CnC
 All Classes Namespaces Functions Variables Typedefs Enumerator Friends
reduce.h
00001 /* *******************************************************************************
00002  *  Copyright (c) 2007-2014, Intel Corporation
00003  *
00004  *  Redistribution and use in source and binary forms, with or without
00005  *  modification, are permitted provided that the following conditions are met:
00006  *
00007  *  * Redistributions of source code must retain the above copyright notice,
00008  *    this list of conditions and the following disclaimer.
00009  *  * Redistributions in binary form must reproduce the above copyright
00010  *    notice, this list of conditions and the following disclaimer in the
00011  *    documentation and/or other materials provided with the distribution.
00012  *  * Neither the name of Intel Corporation nor the names of its contributors
00013  *    may be used to endorse or promote products derived from this software
00014  *    without specific prior written permission.
00015  *
00016  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00019  *  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
00020  *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00021  *  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
00022  *  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00023  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
00024  *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00025  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00026  ********************************************************************************/
00027 
00028 /*
00029   Reductions for CnC, provided as a re-usable graph.
00030 */
00031 
00032 #ifndef _CnC_REDUCE_H_
00033 #define _CnC_REDUCE_H_
00034 
00035 #include <cnc/internal/cnc_stddef.h>
00036 #include <tbb/concurrent_unordered_map.h>
00037 #include <tbb/combinable.h>
00038 #include <tbb/atomic.h>
00039 #include <tbb/spin_rw_mutex.h>
00040 #include <functional>
00041 
00042 namespace CnC
00043 {
00044 
00045     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > class reduction;
00046 
00047     /// \defgroup reductions Asynchronous Reductions
00048     /// @{
00049 
00050     /// Creates a graph for asynchronous reductions.
00051     ///
00052     /// Takes an input collection and reduces its content with a given
00053     /// operation and selection mechanism.  The computation is done
00054     /// while new items arrive. Not all items need to be available to
00055     /// start or make progress.  Data input is provided by normal puts
00056     /// into the input collection. The final reduced value for a
00057     /// reduction is put into the output collection.
00058     ///
00059     /// Supports multiple concurrent reductions (with the same
00060     /// operation) identified by a reduction id.  For this, a selector
00061     /// functor can be provided to tell which data-item goes to which
00062     /// reduction (maps a data-tag to a reduction-id).
00063     ///
    /// The number of reduced items per reduction-id needs to be provided
00065     /// through a second input collection. You can signal no more
00066     /// incoming values by putting a count < 0. Providing counts
00067     /// late reduces communication and potentially improves performance.
00068     ///
00069     /// Each reduction is independent of other reductions and can
00070     /// finish independently while others are still processing.
    /// Connected graphs can get the reduced values with normal
    /// get-calls (using the desired reduction-id as the tag).
00073     ///
00074     /// The implementation is virtually lock-free. On distributed memory
00075     /// the additional communication is also largely asynchronous.
00076     ///
00077     /// See also \ref reuse
00078     ///
00079     /// \param ctxt the context to which the graph belongs
00080     /// \param name the name of this reduction graph instance
00081     /// \param in   input collection, every item that's put here is
00082     ///             applied to sel and potentially takes part in a reduction
00083     /// \param cnt  input collection; number of items for each reduction
00084     ///             expected to be put here (tag is reduction-id, value is count)
00085     /// \param out  output collection, reduced results are put here with tags as returned by sel
00086     /// \param op   the reduction operation:\n
00087     ///             IType (*)(const IType&, const IType&) const\n
00088     ///             usually a functor
00089     /// \param idty the identity/neutral element for the given operation
00090     /// \param sel  functor, called once for every item put into "in":\n
00091     ///             bool (*)( const ITag & itag, OTag & otag ) const\n
00092     ///             must return true if given element should be used for a reduction, otherwise false;\n
00093     ///             if true, it must set otag to the tag of the reduction it participates in
00094     template< typename Ctxt, typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00095     graph * make_reduce_graph( CnC::context< Ctxt > & ctxt, const std::string & name, 
00096                                CnC::item_collection< ITag, IType, ITuner > & in,
00097                                CnC::item_collection< OTag, CType, CTuner > & cnt,
00098                                CnC::item_collection< OTag, IType, OTuner > & out,
00099                                const ReduceOp & op,
00100                                const IType & idty,
00101                                const Select & sel )
00102     {
00103         return new reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >( ctxt, name, in, cnt, out, op, idty, sel );
00104     }
00105 
00106 
00107     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00108     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    // The below is the implementation; normally users shouldn't need to read it.
    // However, you might want to use it as a template for writing your own reduction.
00111     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00112     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00113 
00114     
#ifdef _DIST_CNC_
    // message tags for dist CnC
    // Each message handled by reduction::recv_msg starts with one of these
    // tags; the payload that follows is listed per tag.
    namespace DISTRED {
        // owner -> its tree: a known count; payload: tag, owner, count (see bcast_count)
        static const char BCASTCOUNT  = 93;
        // toward the owner: count of items reduced locally; payload: tag, count (see send_count)
        static const char GATHERCOUNT = 94;
        // owner -> its tree: reduction complete, values are gathered next; payload: tag, owner
        static const char DONE        = 95;
        // process 0 -> all processes (tree rooted at 0), triggered by flush(); no payload
        static const char ALLDONE     = 96;
        // to the parent in the owner's tree: combined partial value; payload: tag, value
        static const char VALUE       = 97;
        // to the parent in the tree rooted at 0 after ALLDONE: all combined values;
        // payload: number of entries, then (tag, value) pairs
        static const char ALLVALUES   = 98;
    }
#endif
00126 
    // status of each reduction (stored in red_tls::status).
    // A record starts in LOCAL (see the red_tls constructors); it is set to
    // DONE once its result has been put (try_put_value) or, on a non-owner
    // process, sent to the parent (try_send_or_put_value). The owner only
    // finalizes a reduction whose status is FINISH.
    // NOTE(review): CNT_AVAILABLE and BCAST_DONE are set by code outside this
    // part of the file; their exact transitions are not documented here.
    static const int LOCAL         = 0;
    static const int CNT_AVAILABLE = 1;
    static const int BCAST_DONE    = 2;
    static const int FINISH        = 3;
    static const int DONE          = 4;

    // shortcut macro for ITAC instrumentation
    // It refers to m_reduce, which in this file is a member of the callback
    // structs (on_item_put / on_count_put). Without CNC_WITH_ITAC it expands
    // to nothing.
#ifdef CNC_WITH_ITAC
# define TRACE( _m ) static std::string _t_n_( m_reduce->name() + _m ); VT_FUNC( _t_n_.c_str() );
#else
# define TRACE( _m )
#endif

    // M1 == CType(-1). It marks "count not known yet" in red_tls::n and, as a
    // count value, the done-flag (bcast_count sends DONE instead of BCASTCOUNT).
    // It expands to an expression that uses CType, so it only works where the
    // template parameter CType is in scope.
#define M1 static_cast< CType >( -1 )
00142 
00143     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00144     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00145 
    // The actual reduction graph. A complex template construct; use
    // make_reduce_graph to create a reduction graph. Only the constructor,
    // the destructor and flush() are public; everything else is private ("hidden").
    //
    // We use tbb::combinable for the local (shared memory) reduction part.
    //
    // On distributed memory we implement the following asynchronous protocol
    //   - As long as the count of a given reduction is unknown, we proceed as if everything was local.
    //   - As soon as the count arrives, we do the following
    //     0.  assign ownership of the reduction to the count-providing process.
    //         The owner controls the gathering of the distributed counts and values.
    //     1a. if the count is a real count, the owner bcasts the count
    //     1b. if it's a done-flag (-1), we immediately move to 3.
    //     2a. the bcast is immediately followed by a gather of the counts (but not the values)
    //     2b. processes which are not the owner send a message for every additional item that was not gathered in 2a
    //     3.  once the owner sees that all items for the reduction have arrived it bcasts DONE
    //     4.  immediately followed by a gather of the values
    //     5.  when the owner has collected all values, it uses tbb::combinable to combine them and puts the final value
    // As almost everything can happen at the same time, we use transaction-like
    // operations implemented with atomic variables which guide each reduction through its states.
    // We implement our own bcast/gather tree individually for each owner (as its root).
    template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
    class reduction : public CnC::graph
    {
    public:
        // the collections this graph is attached to: input items, per-reduction counts, reduced results
        typedef CnC::item_collection< ITag, IType, ITuner > icoll_type;
        typedef CnC::item_collection< OTag, CType, CTuner > ccoll_type;
        typedef CnC::item_collection< OTag, IType, OTuner > ocoll_type;

        // registers the put-callbacks on 'in' and 'c'; see make_reduce_graph for the arguments
        template< typename Ctxt >
        reduction( CnC::context< Ctxt > & ctxt, const std::string & name, icoll_type & in, ccoll_type & c, ocoll_type & out, const ReduceOp & red, const IType & identity, const Select & sel );
        // does not delete the registered callback objects (they must outlive the collections' use of them)
        ~reduction();
        
        // sometimes you can't tell the number of reduced items until all computation is done.
        // This call finalizes all reductions, no matter if a count was given or not.
        void flush();
        
        // the implementation is "hidden"
    private:
        typedef reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select > reduce_type;
        typedef tbb::spin_rw_mutex mutex_type;

        // state per reduction (one entry per reduction-id in m_reductions);
        // val holds the thread-local partial results
        struct red_tls {
            tbb::combinable< IType > val;
            // items reduced so far; in distributed mode send_count resets it when reporting
            tbb::atomic< CType > nreduced;
            // expected number of items, M1 while unknown
            tbb::atomic< CType > n;
            // serializes the final put in try_put_value
            mutex_type mtx;
#ifdef _DIST_CNC_
            // NOTE(review): presumably tracks outstanding count messages; updated outside this part of the file
            tbb::atomic< int > nCounts;
            // values still outstanding; whoever decrements it to 0 sends/puts the result
            tbb::atomic< int > nValues;
            // process owning this reduction, -1 while unassigned
            int owner;
            
#endif
            // one of LOCAL, CNT_AVAILABLE, BCAST_DONE, FINISH, DONE
            tbb::atomic< int > status;
            red_tls();
            red_tls( const IType & v );
        };
        typedef tbb::concurrent_unordered_map< OTag, red_tls > tls_map_type;

        // callback for the input collection (m_in)
        struct on_item_put : public icoll_type::callback_type
        {
            on_item_put( reduce_type * r );
            void on_put( const ITag & tag, const IType & val );
#ifdef _DIST_CNC_
            void on_value( const OTag & otag, const typename tls_map_type::iterator & i, const IType & val );
#endif
            void add_value( const typename tls_map_type::iterator & i, const IType & val ) const;
        private:

            reduce_type * m_reduce;
        };
        friend struct on_item_put;

        // callback for the count collection (m_cnt)
        struct on_count_put : public ccoll_type::callback_type
        {
            on_count_put( reduce_type * r );
            void on_put( const OTag & otag, const CType & cnt );
#ifdef _DIST_CNC_
            void on_done( const OTag & otag, const typename tls_map_type::iterator & i, const int owner );
            void on_bcastCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt, const int owner );
            void on_gatherCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt );
#endif
        private:
            reduce_type * m_reduce;
        };
        friend struct on_count_put;

        // returns the state record for 'tag', creating it if needed
        typename tls_map_type::iterator get( const OTag & tag );
        // puts the combined value once all expected items are reduced; true if this call put it
        bool try_put_value( const OTag & otag, const typename tls_map_type::iterator & i );
#ifdef _DIST_CNC_
        bool send_count( const OTag & otag, const typename tls_map_type::iterator & i, const int to, const bool always );
        static int my_parent_for_root( const int root );
        void try_send_or_put_value( const OTag & otag, const typename tls_map_type::iterator & i );
        void try_send_or_put_all();
        // home-grown bcast that uses a tree for each root
        // at some point something like this should go into the CnC runtime
        // returns the number of messages sent (0, 1 or 2)
        int bcast( CnC::serializer * ser, int root );
        int bcast_count( const OTag & tag, const CType & val, const int root );
        virtual void recv_msg( serializer * ser );
#endif
        icoll_type & m_in;
        ccoll_type & m_cnt;
        ocoll_type & m_out;
        // owned by this graph but intentionally never deleted (see ~reduction)
        on_item_put   * m_ondata;
        on_count_put   * m_oncount;
        ReduceOp     m_op;
        Select       m_sel;
        tls_map_type m_reductions;
        const IType  m_identity;
#ifdef _DIST_CNC_
        // pending ALLDONE/ALLVALUES steps during a distributed flush
        tbb::atomic< int > m_nDones;
        // set once a distributed flush (ALLDONE) has started
        bool         m_alldone;
#endif
    };
00264 
00265     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00266     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00267 
00268     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00269     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls()
00270     {
00271         nreduced = 0;
00272         n = M1;
00273         status = LOCAL;
00274 #ifdef _DIST_CNC_
00275         nCounts = -1;
00276         nValues = -1;
00277         owner = -1;
00278 #endif
00279     } 
00280 
00281     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00282 
00283     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00284     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls( const IType & v )
00285     { 
00286         val = v;
00287         nreduced = 0;
00288         n = M1;
00289         status = LOCAL;
00290 #ifdef _DIST_CNC_
00291         nCounts = -1;
00292         nValues = -1;
00293         owner = -1;
00294 #endif
00295     } 
00296 
00297     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00298     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00299 
00300     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00301     template< typename Ctxt >
00302     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::reduction( CnC::context< Ctxt > & ctxt, const std::string & name,
00303                                                                                                 icoll_type & in, ccoll_type & c, ocoll_type & out,
00304                                                                                                 const ReduceOp & red, const IType & identity, const Select & sel )
00305         : CnC::graph( ctxt, name ),
00306           m_in( in ),
00307           m_cnt( c ),
00308           m_out( out ),
00309           m_ondata( new on_item_put( this ) ),
00310           m_oncount( new on_count_put( this ) ),
00311           m_op( red ),
00312           m_sel( sel ),
00313           m_reductions(),
00314           m_identity( identity )
00315     {
00316         // callback objects must persist the lifetime of collections
00317         m_in.on_put( m_ondata );
00318         m_cnt.on_put( m_oncount );
00319 #ifdef _DIST_CNC_
00320         m_alldone = false;
00321         m_nDones = -1;
00322 #endif
00323     }
00324 
00325     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00326 
00327     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00328     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::~reduction() 
00329     {
00330         // delete m_ondata;
00331         // delete m_oncount;
00332     }
00333 
00334     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00335 
00336     // sometimes you can't tell number of reduced items until all computation is done.
00337     // This call finalizes all reductions, no matter if a count was given or not.
00338     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00339     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::flush()
00340     {
00341 #ifdef _DIST_CNC_
00342         if( Internal::distributor::active() ) {
00343             if( trace_level() > 0 ) {
00344                 Internal::Speaker oss(std::cout);
00345                 oss << this->name() << " flush: bcast ALLDONE";
00346             }
00347             CNC_ASSERT( CnC::tuner_base::myPid() == 0 );
00348             m_alldone = true;
00349             CnC::serializer * ser = this->new_serializer();
00350             (*ser) & DISTRED::ALLDONE;
00351             m_nDones = 1000; // protect from current messages
00352             m_nDones += bcast( ser, 0 );
00353             m_nDones -= 999;
00354             try_send_or_put_all();
00355         } else
00356 #endif
00357         for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
00358             m_out.put( i->first, i->second.val.combine( m_op ) );
00359         }
00360     }
00361     
00362     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00363     typename reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::tls_map_type::iterator
00364     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::get( const OTag & tag )
00365     {
00366         typename tls_map_type::iterator i = m_reductions.find( tag );
00367         if( i == m_reductions.end() ) {
00368             i = m_reductions.insert( typename tls_map_type::value_type( tag, red_tls() ) ).first;
00369         }
00370         return i;
00371     }
00372     
00373     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00374 
00375     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00376     bool reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_put_value( const OTag & otag, const typename tls_map_type::iterator & i )
00377     {
00378         if( trace_level() > 2 ) {
00379             Internal::Speaker oss(std::cout);
00380             oss << this->name() << " try_put_value [" << otag << "]"
00381 #ifdef _DIST_CNC_
00382                 << " nValues " << i->second.nValues << " status " << i->second.status
00383 #endif
00384                 ;
00385         }
00386         if( i->second.nreduced == i->second.n ) {
00387             // setting n could go in parallel, it sets new n and then compares
00388             mutex_type::scoped_lock _lock( i->second.mtx );
00389             if( i->second.status != DONE ) {
00390                 this->m_out.put( otag, i->second.val.combine( this->m_op ) );
00391                 i->second.status = DONE;
00392                 return true;
00393             }
00394         }
00395         return false;
00396     }
00397     
00398     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00399 
00400 #ifdef _DIST_CNC_
00401     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00402     bool reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::send_count( const OTag & otag, const typename tls_map_type::iterator & i,
00403                                                                                                       const int to, const bool always )
00404     {
00405         // we might have a count-put and the last value-put concurrently and all local
00406         // only then the owner/to can be <0 or myself
00407         if( to < 0 || to == CnC::tuner_base::myPid() ) return false;
00408         CType _cnt = i->second.nreduced.fetch_and_store( 0 );
00409         if( always || _cnt > 0 ) { // someone else might have transmitted the combined count already
00410             CnC::serializer * ser = this->new_serializer();
00411             (*ser) & DISTRED::GATHERCOUNT & otag & _cnt;
00412             if( trace_level() > 2 ) {
00413                 Internal::Speaker oss(std::cout);
00414                 oss << this->name() << " send GATHERCOUNT [" << otag << "] " << _cnt << " to " << to;
00415             }
00416             this->send_msg( ser, to );
00417             return true;
00418         }
00419         return false;
00420     }
00421 
00422     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00423 
00424     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00425     int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::my_parent_for_root( const int root )
00426     {
00427         const int mpid = CnC::tuner_base::myPid();
00428         const int nps  = CnC::tuner_base::numProcs();
00429         CNC_ASSERT( root != mpid );
00430         int _p = ( ( ( mpid >= root ? ( mpid - root ) : ( mpid + nps - root ) ) - 1 ) / 2 ) + root;
00431         return _p % nps;
00432     };
00433 
00434     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00435 
00436     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00437     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_send_or_put_value( const OTag & otag, const typename tls_map_type::iterator & i )
00438     {
00439         if( trace_level() > 2 ) {
00440             Internal::Speaker oss(std::cout);
00441             oss << this->name() << " try_send_or_put_value [" << otag << "] nValues " << i->second.nValues << " status " << i->second.status;
00442         }
00443         if( i->second.nValues.fetch_and_decrement() == 1 ) {
00444             if( i->second.owner == CnC::tuner_base::myPid() ) {
00445                 if( i->second.status == FINISH ) {
00446                     CNC_ASSERT( i->second.nreduced == i->second.n || i->second.n == M1 );
00447                     CNC_ASSERT( i->second.nValues == 0 && i->second.status == FINISH );
00448                     i->second.nreduced = i->second.n;
00449                     try_put_value( otag, i );
00450                 }
00451             } else {
00452                 CnC::serializer * ser = this->new_serializer();
00453                 IType _val( i->second.val.combine( this->m_op ) );
00454                 (*ser) & DISTRED::VALUE & otag & _val;
00455                 const int to = my_parent_for_root( i->second.owner );
00456                 if( trace_level() > 2 ) {
00457                     Internal::Speaker oss(std::cout);
00458                     oss << this->name() << " send VALUE [" << otag << "] " << _val << " to " << to;
00459                 }
00460                 this->send_msg( ser, to );
00461                 i->second.status = DONE;
00462             }
00463         }
00464     }
00465         
00466     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00467 
00468     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00469     int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::bcast( CnC::serializer * ser, int root ) 
00470     {
00471         const int mpid = CnC::tuner_base::myPid();
00472         const int nps  = CnC::tuner_base::numProcs();
00473         int _r1 = ( ( mpid >= root ? ( mpid - root ) : ( mpid + nps - root ) ) + 1 ) * 2 - 1;
00474         if( _r1 < nps ) {
00475             if( _r1 < nps-1 ) {
00476                 // we have 2 children
00477                 _r1 = (root + _r1) % nps;
00478                 int _recvrs[2] = { _r1, (_r1+1)%nps };
00479                 this->bcast_msg( ser, _recvrs, 2 );
00480                 return 2;
00481             } else {
00482                 // we have only a single child
00483                 _r1 = (root + _r1) % nps;
00484                 this->send_msg( ser, _r1 );
00485                 return 1;
00486             }
00487         }
00488         delete ser;
00489         // we are a leaf, nothing to be sent
00490         return 0;
00491     }
00492 
00493     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00494 
00495     // home-grown bcast that uses a tree for each root
00496     // at some point something like this should go into the CnC runtime
00497     // returns the number of messages sent (0, 1 or 2)
00498     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00499     int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::bcast_count( const OTag & tag, const CType & val, const int root )
00500     {
00501         CnC::serializer * ser = this->new_serializer();
00502         (*ser) & ( val != M1 ? DISTRED::BCASTCOUNT : DISTRED::DONE ) & tag & root;
00503         if( val != M1 ) (*ser) & val;
00504         int _c = bcast( ser, root );
00505         if( _c && trace_level() > 2 ) {
00506             Internal::Speaker oss(std::cout);
00507             oss << this->name() << " bcast " << (val != M1 ? " BCASTCOUNT [" : " DONE [") << tag << "]";
00508         }
00509         return _c;
00510     };
00511 
00512     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00513 
00514     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00515     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_send_or_put_all()
00516     {
00517         CNC_ASSERT( m_alldone );
00518         if( trace_level() > 2 ) {
00519             Internal::Speaker oss(std::cout);
00520             oss << this->name() << " try_send_or_put_all " << m_nDones;
00521         }
00522         if( --m_nDones == 0 ) {
00523             if( CnC::tuner_base::myPid() == 0 ) {
00524                 for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
00525                     m_out.put( i->first, i->second.val.combine( m_op ) );
00526                 }
00527             } else {
00528                 int _n = m_reductions.size();
00529                 CnC::serializer * ser = this->new_serializer();
00530                 (*ser) & DISTRED::ALLVALUES & _n;
00531                 for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
00532                     IType _val( i->second.val.combine( this->m_op ) );
00533                     (*ser) & i->first & _val;
00534                 }
00535                 const int to = my_parent_for_root( 0 );
00536                 if( trace_level() > 2 ) {
00537                     Internal::Speaker oss(std::cout);
00538                     oss << this->name() << " send ALLVALUES to " << to;
00539                 }
00540                 this->send_msg( ser, to );
00541             }
00542         }
00543     }
00544 
00545     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00546 
00547     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00548     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::recv_msg( serializer * ser )
00549     {
00550         char _op;
00551         (*ser) & _op;
00552 
00553         switch( _op ) {
00554         case DISTRED::GATHERCOUNT : {
00555             OTag _tag;
00556             CType _cnt;
00557             (*ser) & _tag & _cnt;
00558             if( trace_level() > 2 ) {
00559                 Internal::Speaker oss(std::cout);
00560                 oss << this->name() << " recvd GATHERCOUNT [" << _tag << "] " << _cnt;
00561             }
00562             m_oncount->on_gatherCount( _tag, get( _tag ), _cnt );
00563             break;
00564         }
00565         case DISTRED::BCASTCOUNT : {
00566             OTag _tag;
00567             CType _cnt;
00568             int _owner;
00569             (*ser) & _tag & _owner & _cnt ;
00570             if( trace_level() > 2 ) {
00571                 Internal::Speaker oss(std::cout);
00572                 oss << this->name() << " recvd BCASTCOUNT [" << _tag << "] " << _cnt << " " << _owner;
00573             }
00574             m_oncount->on_bcastCount( _tag, get( _tag ), _cnt, _owner );
00575             break;
00576         }
00577         case DISTRED::VALUE : {
00578             OTag _tag;
00579             IType _val;
00580             (*ser) & _tag & _val;
00581             if( trace_level() > 2 ) {
00582                 Internal::Speaker oss(std::cout);
00583                 oss << this->name() << " recvd VALUE [" << _tag << "] " << _val;
00584             }
00585             m_ondata->on_value( _tag, get( _tag ), _val );
00586             break;
00587         }
00588         case DISTRED::DONE : {
00589             OTag _tag;
00590             int _owner;
00591             (*ser) & _tag & _owner;
00592             if( trace_level() > 2 ) {
00593                 Internal::Speaker oss(std::cout);
00594                 oss << this->name() << " recvd DONE [" << _tag << "] " << _owner;
00595             }
00596             m_oncount->on_done( _tag, get( _tag ), _owner );
00597             break;
00598         }
00599         case DISTRED::ALLDONE : {
00600             if( trace_level() > 2 ) {
00601                 Internal::Speaker oss(std::cout);
00602                 oss << this->name() << " recvd ALLDONE";
00603             }
00604             m_alldone = true;
00605             CnC::serializer * ser = this->new_serializer();
00606             (*ser) & DISTRED::ALLDONE;
00607             m_nDones = 1000; // protect from current messages
00608             m_nDones += bcast( ser, 0 );
00609             m_nDones -= 999;
00610             try_send_or_put_all();
00611             break;
00612         }
00613         default:
00614         case DISTRED::ALLVALUES : {
00615             CNC_ASSERT( m_alldone = true );
00616             int _n;
00617             (*ser) & _n;
00618             if( trace_level() > 2 ) {
00619                 Internal::Speaker oss(std::cout);
00620                 oss << this->name() << " recvd ALLVALUES " << _n;
00621             }
00622             while( _n-- ) {
00623                 OTag _tag;
00624                 IType _val;
00625                 (*ser) & _tag & _val;
00626                 const typename tls_map_type::iterator i = get( _tag );
00627                 m_ondata->add_value( i, _val );
00628             }
00629             try_send_or_put_all();
00630             break;
00631         }
00632             CNC_ASSERT_MSG( false, "Unexpected message tag in JOIN" );
00633         }
00634     }
00635 
00636 #endif // _DIST_CNC_
00637         
00638     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00639     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00640     
00641     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00642     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_item_put( reduce_type * r )
00643         : m_reduce( r )
00644     {}
00645     
00646     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00647 
00648     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00649     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::add_value( const typename tls_map_type::iterator & i, const IType & val ) const
00650     {
00651         bool _exists;
00652         IType & _rval = i->second.val.local( _exists );
00653         _rval = m_reduce->m_op( _exists ? _rval : m_reduce->m_identity, val );
00654     }
00655  
00656     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00657 
00658 #ifdef _DIST_CNC_
00659     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00660     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_value( const OTag & otag, const typename tls_map_type::iterator & i, const IType & val )
00661     {
00662         TRACE( "::on_value" );
00663         add_value( i, val );
00664         m_reduce->try_send_or_put_value( otag, i );
00665     }
00666 #endif
00667 
00668     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00669 
00670     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00671     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_put( const ITag & tag, const IType & val )
00672     {
00673         TRACE( "::on_data" );
00674         OTag otag;
00675         if( m_reduce->m_sel( tag, otag ) ) {
00676             typename tls_map_type::iterator i = m_reduce->get( otag );
00677             add_value( i, val );
00678             CType _n = ++i->second.nreduced;
00679             if( m_reduce->trace_level() > 0 ) { 
00680                 Internal::Speaker oss(std::cout);
00681                 oss << m_reduce->name() << " on_put [";
00682                 cnc_format( oss, tag );
00683                 oss << "] for [" << otag << "] " << val << " nred now " << _n << "/" << i->second.n
00684 #ifdef _DIST_CNC_
00685                     << " owner " << i->second.owner << " status " << i->second.status << " nCounts " << i->second.nCounts
00686 #endif
00687                     ;
00688             }
00689 #ifdef _DIST_CNC_
00690             // not yet done, might need some communication
00691             if( Internal::distributor::active() ) {
00692                 // just in case, test if all was local
00693                 // 2 cases
00694                 // 1.: local phase or we are the owner
00695                 // 2.: we know the red-count, but the reduction is not yet done.
00696                 if( i->second.status >= CNT_AVAILABLE ) {
00697                     if( CnC::tuner_base::myPid() == i->second.owner ) {
00698                         // the owner's put was the final put, let's trigger the done propagation
00699                         if( _n == i->second.n && i->second.nCounts <= 0 ) m_reduce->m_oncount->on_done( otag, i, CnC::tuner_base::myPid() );
00700                     } else {
00701                         // 2nd case
00702                         // We have to report the new count (not the value)
00703                         // we use an atomic variable to count, so we do not lose contributions
00704                         // we don't report the value, so nothing gets inconsistent
00705                         // the value is transmitted only when we know nothing more comes in (elsewhere)
00706                         m_reduce->send_count( otag, i, i->second.owner, false );
00707                     }
00708                 } // else means we are still collecting locally (case 1)
00709                 //                        }
00710             } else
00711 #endif
00712                 m_reduce->try_put_value( otag, i );
00713         } else if( m_reduce->trace_level() > 0 ) { 
00714             Internal::Speaker oss(std::cout);
00715             oss << m_reduce->name() << " [";
00716             cnc_format( oss, tag );
00717             oss << "] was not selected";
00718         }
00719     }
00720             
00721     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00722     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00723 
00724     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00725     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_count_put( reduce_type * r )
00726         : m_reduce( r )
00727     {}
00728 
00729     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00730 
00731 #ifdef _DIST_CNC_
    /// Done phase for output tag otag: records the owner and then forwards the done signal
    /// (count M1) through the bcast tree rooted at owner, and tries to send/put the value.
    /// On the owner this runs at most once: only a caller that moves status from
    /// CNT_AVAILABLE to BCAST_DONE proceeds. Non-owners always forward.
    /// NOTE(review): assumes status/nValues are TBB-style atomics, where
    /// compare_and_swap( new_value, comparand ) stores new_value only if the current value
    /// equals comparand and returns the previous value.
    /// @param otag  output tag being reduced
    /// @param i     entry for otag in the reduction's map
    /// @param owner process that owns (roots) the reduction of otag
    template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
    void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_done( const OTag & otag, const typename tls_map_type::iterator & i, const int owner )
    {
        TRACE( "::on_done" );
        if( m_reduce->trace_level() > 2 ) { 
            Internal::Speaker oss(std::cout);
            oss << m_reduce->name() << " on_done for [" << otag << "] nred now " << i->second.nreduced << "/" << i->second.n
                << " owner " << i->second.owner << " status " << i->second.status;
        }
        i->second.owner = owner;
        // short-circuit: the CAS is only attempted on the owner
        if( owner != CnC::tuner_base::myPid() || i->second.status.compare_and_swap( BCAST_DONE, CNT_AVAILABLE ) == CNT_AVAILABLE ) {
            // "forward" through bcast-tree (we are using our home-grown bcast!)
            // nValues is biased by 1000 while broadcasting so that values arriving
            // concurrently cannot bring it to its final value before the result of
            // bcast_count (presumably the number of forwarded messages) has been added.
            i->second.nValues = 1000;
            i->second.nValues += m_reduce->bcast_count( otag, M1, owner );
            // BCAST_DONE -> FINISH; on the owner the status must have been BCAST_DONE
            int _tmp = i->second.status.compare_and_swap( FINISH, BCAST_DONE );
            CNC_ASSERT( owner != CnC::tuner_base::myPid() || _tmp == BCAST_DONE );
            // we leave one more for ourself (no contribution to value)
            // (removing 999 of the 1000 bias leaves that extra 1)
            i->second.nValues -= 999;
            // we might be a leaf or all the values came back between the bcast and now
            m_reduce->try_send_or_put_value( otag, i );
        }
    }
00754 
00755     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00756 
    /// Count phase for output tag otag: the expected number of contributions cnt is known,
    /// either set locally on the owner or received in a BCASTCOUNT message. Records owner
    /// and count, switches status from LOCAL to CNT_AVAILABLE, and forwards the count
    /// through the bcast tree rooted at owner. A non-owner with nothing left pending
    /// reports its count towards the owner.
    /// NOTE(review): assumes status/nCounts are TBB-style atomics
    /// (compare_and_swap( new_value, comparand ) returns the previous value).
    /// @param otag  output tag being reduced
    /// @param i     entry for otag in the reduction's map
    /// @param cnt   expected number of contributions (never the done marker M1)
    /// @param owner process that owns (roots) the reduction of otag
    template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
    void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_bcastCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt, const int owner )
    {
        TRACE( "::on_bcast" );
        CNC_ASSERT( cnt != M1 ); // M1 is the "done" flag, not a real count
        i->second.owner = owner;
        i->second.n = cnt;

        // LOCAL -> CNT_AVAILABLE: from now on, local puts report their counts to the owner
        int _tmp = i->second.status.compare_and_swap( CNT_AVAILABLE, LOCAL );
        CNC_ASSERT( _tmp == LOCAL );
        // "forward" through bcast-tree (we are using our home-grown bcast!)
        // nCounts starts at 1 so counts gathered concurrently cannot bring it to 0 before the
        // result of bcast_count has been added; the extra 1 is removed by the decrement below.
        i->second.nCounts = 1;
        i->second.nCounts += m_reduce->bcast_count( otag, cnt, owner );
        // if we are a leaf -> trigger gather to root
        if( --i->second.nCounts == 0 && owner != CnC::tuner_base::myPid() ) {
            m_reduce->send_count( otag, i, my_parent_for_root( owner ), true );
        }
    }
00775 
00776     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00777 
    /// Gather phase for output tag otag: a GATHERCOUNT message reported cnt reduced
    /// contributions. Adds them to nreduced. Once no more reports are pending (nCounts drops
    /// to 0 or below), the owner starts the done phase if the reduced count equals the
    /// expected count n; any other process passes its count on towards the owner.
    /// @param otag output tag being reduced
    /// @param i    entry for otag in the reduction's map
    /// @param cnt  number of reduced contributions reported (never the done marker M1)
    template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
    void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_gatherCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt )
    {
        TRACE( "::on_gather" );
        CNC_ASSERT( cnt != M1 ); // M1 is the "done" flag, not a real count
        i->second.nreduced += cnt;
        if( m_reduce->trace_level() > 2 ) { 
            Internal::Speaker oss(std::cout);
            oss << m_reduce->name() << " on_gatherCount [" << otag << "] now " << i->second.nreduced << "/" << i->second.n << " nCounts " << i->second.nCounts;
        }
        // at root, we might need to trigger done phase
        if( --i->second.nCounts <= 0 ) { // there might be extra counts
            if( i->second.owner == CnC::tuner_base::myPid() ) {
                // all expected contributions accounted for -> start the done phase
                if( i->second.n == i->second.nreduced ) {
                    on_done( otag, i, CnC::tuner_base::myPid() );
                }
            } else {
                CNC_ASSERT( i->second.nCounts == 0 ); // extra counts occur only on root
                // pass the accumulated count one step up the tree towards the owner
                m_reduce->send_count( otag, i, my_parent_for_root( i->second.owner ), true );
            }
        }
    }
00800 #endif // _DIST_CNC_
00801 
00802     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00803 
00804     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00805     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_put( const OTag & otag, const CType & cnt )
00806     {
00807         TRACE( "::on_cnt" );
00808         typename tls_map_type::iterator i = m_reduce->get( otag );
00809                 
00810 #ifdef _DIST_CNC_
00811         if( Internal::distributor::active() ) {
00812             if( cnt != M1 ) { //is this a normal count?
00813                 i->second.n = cnt;
00814                 // just in case all was local, we try to finish
00815                 if( ! m_reduce->try_put_value( otag, i ) ) {
00816                     on_bcastCount( otag, i, cnt, CnC::tuner_base::myPid() );
00817                 }
00818             } else { // this is the done flag
00819                 // we have no idea what was put remotely
00820                 // -> we trigger the final gather phase
00821                 int _tmp = i->second.status.compare_and_swap( CNT_AVAILABLE, LOCAL );
00822                 CNC_ASSERT( _tmp == LOCAL );
00823                 i->second.nCounts = 0;
00824                 on_done( otag, i, CnC::tuner_base::myPid() );
00825             }
00826         } else
00827 #endif
00828             {
00829                 if( cnt != M1 ) i->second.n = cnt;
00830                 else i->second.n = i->second.nreduced;
00831                 m_reduce->try_put_value( otag, i );
00832             }
00833     }
00834 
00835     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00836 
00837 } // namespace CnC
00838 
00839 /// @}
00840 
00841 #endif //_CnC_REDUCE_H_
 All Classes Namespaces Functions Variables Typedefs Enumerator Friends