00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #ifndef _CnC_REDUCE_H_
00033 #define _CnC_REDUCE_H_
00034
00035 #include <cnc/internal/cnc_stddef.h>
00036 #include <tbb/concurrent_unordered_map.h>
00037 #include <tbb/combinable.h>
00038 #include <tbb/atomic.h>
00039 #include <tbb/spin_rw_mutex.h>
00040 #include <functional>
00041
00042 namespace CnC
00043 {
00044
/// Forward declaration of the reduction graph class; it is defined further down in this header.
template< typename ITag, typename IType, typename ITuner,
          typename CType, typename CTuner,
          typename OTag, typename OTuner,
          typename ReduceOp, typename Select >
class reduction;
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094 template< typename Ctxt, typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00095 graph * make_reduce_graph( CnC::context< Ctxt > & ctxt, const std::string & name,
00096 CnC::item_collection< ITag, IType, ITuner > & in,
00097 CnC::item_collection< OTag, CType, CTuner > & cnt,
00098 CnC::item_collection< OTag, IType, OTuner > & out,
00099 const ReduceOp & op,
00100 const IType & idty,
00101 const Select & sel )
00102 {
00103 return new reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >( ctxt, name, in, cnt, out, op, idty, sel );
00104 }
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
#ifdef _DIST_CNC_

/// Op-codes written as the first byte of every message exchanged between the
/// processes of a distributed reduction; recv_msg dispatches on them.
namespace DISTRED {
    static const char BCASTCOUNT  = 93; ///< count for a tag, broadcast together with its owner
    static const char GATHERCOUNT = 94; ///< number of items reduced so far, sent towards the owner
    static const char DONE        = 95; ///< reduction for a tag is complete (sent without a count)
    static const char ALLDONE     = 96; ///< flush: all reductions are to be finished
    static const char VALUE       = 97; ///< partial value for one tag, sent to the parent
    static const char ALLVALUES   = 98; ///< partial values for all tags, sent to the parent
}
#endif
00126
00127
// States a single reduction passes through (kept in red_tls::status).
// The numeric order matters: code compares with ">= CNT_AVAILABLE".
static const int LOCAL         = 0; // initial state, no count seen yet
static const int CNT_AVAILABLE = 1; // a count (or "done") has been received for the tag
static const int BCAST_DONE    = 2; // owner has started broadcasting DONE
static const int FINISH        = 3; // DONE broadcast issued, waiting for partial values
static const int DONE          = 4; // result was put, or the partial value was sent on
00133
00134
// TRACE( _m ): when built with ITAC support it hands the graph name plus the given
// suffix to VT_FUNC; otherwise it expands to nothing.
// It relies on a variable named m_reduce being in scope at the point of use.
#ifdef CNC_WITH_ITAC
# define TRACE( _m ) static std::string _t_n_( m_reduce->name() + _m ); VT_FUNC( _t_n_.c_str() );
#else
# define TRACE( _m )
#endif

// M1: the value -1 converted to the count type; used as the "no count" marker.
// It relies on a type named CType being in scope at the point of use.
#define M1 static_cast< CType >( -1 )
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00168 class reduction : public CnC::graph
00169 {
00170 public:
00171 typedef CnC::item_collection< ITag, IType, ITuner > icoll_type;
00172 typedef CnC::item_collection< OTag, CType, CTuner > ccoll_type;
00173 typedef CnC::item_collection< OTag, IType, OTuner > ocoll_type;
00174
00175 template< typename Ctxt >
00176 reduction( CnC::context< Ctxt > & ctxt, const std::string & name, icoll_type & in, ccoll_type & c, ocoll_type & out, const ReduceOp & red, const IType & identity, const Select & sel );
00177 ~reduction();
00178
00179
00180
00181 void flush();
00182
00183
00184 private:
00185 typedef reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select > reduce_type;
00186 typedef tbb::spin_rw_mutex mutex_type;
00187
00188
00189 struct red_tls {
00190 tbb::combinable< IType > val;
00191 tbb::atomic< CType > nreduced;
00192 tbb::atomic< CType > n;
00193 mutex_type mtx;
00194 #ifdef _DIST_CNC_
00195 tbb::atomic< int > nCounts;
00196 tbb::atomic< int > nValues;
00197 int owner;
00198
00199 #endif
00200 tbb::atomic< int > status;
00201 red_tls();
00202 red_tls( const IType & v );
00203 };
00204 typedef tbb::concurrent_unordered_map< OTag, red_tls > tls_map_type;
00205
00206
00207 struct on_item_put : public icoll_type::callback_type
00208 {
00209 on_item_put( reduce_type * r );
00210 void on_put( const ITag & tag, const IType & val );
00211 #ifdef _DIST_CNC_
00212 void on_value( const OTag & otag, const typename tls_map_type::iterator & i, const IType & val );
00213 #endif
00214 void add_value( const typename tls_map_type::iterator & i, const IType & val ) const;
00215 private:
00216
00217 reduce_type * m_reduce;
00218 };
00219 friend struct on_item_put;
00220
00221
00222 struct on_count_put : public ccoll_type::callback_type
00223 {
00224 on_count_put( reduce_type * r );
00225 void on_put( const OTag & otag, const CType & cnt );
00226 #ifdef _DIST_CNC_
00227 void on_done( const OTag & otag, const typename tls_map_type::iterator & i, const int owner );
00228 void on_bcastCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt, const int owner );
00229 void on_gatherCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt );
00230 #endif
00231 private:
00232 reduce_type * m_reduce;
00233 };
00234 friend struct on_count_put;
00235
00236 typename tls_map_type::iterator get( const OTag & tag );
00237 bool try_put_value( const OTag & otag, const typename tls_map_type::iterator & i );
00238 #ifdef _DIST_CNC_
00239 bool send_count( const OTag & otag, const typename tls_map_type::iterator & i, const int to, const bool always );
00240 static int my_parent_for_root( const int root );
00241 void try_send_or_put_value( const OTag & otag, const typename tls_map_type::iterator & i );
00242 void try_send_or_put_all();
00243
00244
00245
00246 int bcast( CnC::serializer * ser, int root );
00247 int bcast_count( const OTag & tag, const CType & val, const int root );
00248 virtual void recv_msg( serializer * ser );
00249 #endif
00250 icoll_type & m_in;
00251 ccoll_type & m_cnt;
00252 ocoll_type & m_out;
00253 on_item_put * m_ondata;
00254 on_count_put * m_oncount;
00255 ReduceOp m_op;
00256 Select m_sel;
00257 tls_map_type m_reductions;
00258 const IType m_identity;
00259 #ifdef _DIST_CNC_
00260 tbb::atomic< int > m_nDones;
00261 bool m_alldone;
00262 #endif
00263 };
00264
00265
00266
00267
00268 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00269 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls()
00270 {
00271 nreduced = 0;
00272 n = M1;
00273 status = LOCAL;
00274 #ifdef _DIST_CNC_
00275 nCounts = -1;
00276 nValues = -1;
00277 owner = -1;
00278 #endif
00279 }
00280
00281
00282
00283 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00284 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls( const IType & v )
00285 {
00286 val = v;
00287 nreduced = 0;
00288 n = M1;
00289 status = LOCAL;
00290 #ifdef _DIST_CNC_
00291 nCounts = -1;
00292 nValues = -1;
00293 owner = -1;
00294 #endif
00295 }
00296
00297
00298
00299
00300 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00301 template< typename Ctxt >
00302 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::reduction( CnC::context< Ctxt > & ctxt, const std::string & name,
00303 icoll_type & in, ccoll_type & c, ocoll_type & out,
00304 const ReduceOp & red, const IType & identity, const Select & sel )
00305 : CnC::graph( ctxt, name ),
00306 m_in( in ),
00307 m_cnt( c ),
00308 m_out( out ),
00309 m_ondata( new on_item_put( this ) ),
00310 m_oncount( new on_count_put( this ) ),
00311 m_op( red ),
00312 m_sel( sel ),
00313 m_reductions(),
00314 m_identity( identity )
00315 {
00316
00317 m_in.on_put( m_ondata );
00318 m_cnt.on_put( m_oncount );
00319 #ifdef _DIST_CNC_
00320 m_alldone = false;
00321 m_nDones = -1;
00322 #endif
00323 }
00324
00325
00326
00327 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00328 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::~reduction()
00329 {
00330
00331
00332 }
00333
00334
00335
00336
00337
00338 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00339 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::flush()
00340 {
00341 #ifdef _DIST_CNC_
00342 if( Internal::distributor::active() ) {
00343 if( trace_level() > 0 ) {
00344 Internal::Speaker oss(std::cout);
00345 oss << this->name() << " flush: bcast ALLDONE";
00346 }
00347 CNC_ASSERT( CnC::tuner_base::myPid() == 0 );
00348 m_alldone = true;
00349 CnC::serializer * ser = this->new_serializer();
00350 (*ser) & DISTRED::ALLDONE;
00351 m_nDones = 1000;
00352 m_nDones += bcast( ser, 0 );
00353 m_nDones -= 999;
00354 try_send_or_put_all();
00355 } else
00356 #endif
00357 for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
00358 m_out.put( i->first, i->second.val.combine( m_op ) );
00359 }
00360 }
00361
00362 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00363 typename reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::tls_map_type::iterator
00364 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::get( const OTag & tag )
00365 {
00366 typename tls_map_type::iterator i = m_reductions.find( tag );
00367 if( i == m_reductions.end() ) {
00368 i = m_reductions.insert( typename tls_map_type::value_type( tag, red_tls() ) ).first;
00369 }
00370 return i;
00371 }
00372
00373
00374
00375 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00376 bool reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_put_value( const OTag & otag, const typename tls_map_type::iterator & i )
00377 {
00378 if( trace_level() > 2 ) {
00379 Internal::Speaker oss(std::cout);
00380 oss << this->name() << " try_put_value [" << otag << "]"
00381 #ifdef _DIST_CNC_
00382 << " nValues " << i->second.nValues << " status " << i->second.status
00383 #endif
00384 ;
00385 }
00386 if( i->second.nreduced == i->second.n ) {
00387
00388 mutex_type::scoped_lock _lock( i->second.mtx );
00389 if( i->second.status != DONE ) {
00390 this->m_out.put( otag, i->second.val.combine( this->m_op ) );
00391 i->second.status = DONE;
00392 return true;
00393 }
00394 }
00395 return false;
00396 }
00397
00398
00399
00400 #ifdef _DIST_CNC_
00401 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00402 bool reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::send_count( const OTag & otag, const typename tls_map_type::iterator & i,
00403 const int to, const bool always )
00404 {
00405
00406
00407 if( to < 0 || to == CnC::tuner_base::myPid() ) return false;
00408 CType _cnt = i->second.nreduced.fetch_and_store( 0 );
00409 if( always || _cnt > 0 ) {
00410 CnC::serializer * ser = this->new_serializer();
00411 (*ser) & DISTRED::GATHERCOUNT & otag & _cnt;
00412 if( trace_level() > 2 ) {
00413 Internal::Speaker oss(std::cout);
00414 oss << this->name() << " send GATHERCOUNT [" << otag << "] " << _cnt << " to " << to;
00415 }
00416 this->send_msg( ser, to );
00417 return true;
00418 }
00419 return false;
00420 }
00421
00422
00423
00424 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00425 int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::my_parent_for_root( const int root )
00426 {
00427 const int mpid = CnC::tuner_base::myPid();
00428 const int nps = CnC::tuner_base::numProcs();
00429 CNC_ASSERT( root != mpid );
00430 int _p = ( ( ( mpid >= root ? ( mpid - root ) : ( mpid + nps - root ) ) - 1 ) / 2 ) + root;
00431 return _p % nps;
00432 };
00433
00434
00435
00436 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00437 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_send_or_put_value( const OTag & otag, const typename tls_map_type::iterator & i )
00438 {
00439 if( trace_level() > 2 ) {
00440 Internal::Speaker oss(std::cout);
00441 oss << this->name() << " try_send_or_put_value [" << otag << "] nValues " << i->second.nValues << " status " << i->second.status;
00442 }
00443 if( i->second.nValues.fetch_and_decrement() == 1 ) {
00444 if( i->second.owner == CnC::tuner_base::myPid() ) {
00445 if( i->second.status == FINISH ) {
00446 CNC_ASSERT( i->second.nreduced == i->second.n || i->second.n == M1 );
00447 CNC_ASSERT( i->second.nValues == 0 && i->second.status == FINISH );
00448 i->second.nreduced = i->second.n;
00449 try_put_value( otag, i );
00450 }
00451 } else {
00452 CnC::serializer * ser = this->new_serializer();
00453 IType _val( i->second.val.combine( this->m_op ) );
00454 (*ser) & DISTRED::VALUE & otag & _val;
00455 const int to = my_parent_for_root( i->second.owner );
00456 if( trace_level() > 2 ) {
00457 Internal::Speaker oss(std::cout);
00458 oss << this->name() << " send VALUE [" << otag << "] " << _val << " to " << to;
00459 }
00460 this->send_msg( ser, to );
00461 i->second.status = DONE;
00462 }
00463 }
00464 }
00465
00466
00467
00468 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00469 int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::bcast( CnC::serializer * ser, int root )
00470 {
00471 const int mpid = CnC::tuner_base::myPid();
00472 const int nps = CnC::tuner_base::numProcs();
00473 int _r1 = ( ( mpid >= root ? ( mpid - root ) : ( mpid + nps - root ) ) + 1 ) * 2 - 1;
00474 if( _r1 < nps ) {
00475 if( _r1 < nps-1 ) {
00476
00477 _r1 = (root + _r1) % nps;
00478 int _recvrs[2] = { _r1, (_r1+1)%nps };
00479 this->bcast_msg( ser, _recvrs, 2 );
00480 return 2;
00481 } else {
00482
00483 _r1 = (root + _r1) % nps;
00484 this->send_msg( ser, _r1 );
00485 return 1;
00486 }
00487 }
00488 delete ser;
00489
00490 return 0;
00491 }
00492
00493
00494
00495
00496
00497
00498 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00499 int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::bcast_count( const OTag & tag, const CType & val, const int root )
00500 {
00501 CnC::serializer * ser = this->new_serializer();
00502 (*ser) & ( val != M1 ? DISTRED::BCASTCOUNT : DISTRED::DONE ) & tag & root;
00503 if( val != M1 ) (*ser) & val;
00504 int _c = bcast( ser, root );
00505 if( _c && trace_level() > 2 ) {
00506 Internal::Speaker oss(std::cout);
00507 oss << this->name() << " bcast " << (val != M1 ? " BCASTCOUNT [" : " DONE [") << tag << "]";
00508 }
00509 return _c;
00510 };
00511
00512
00513
00514 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00515 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_send_or_put_all()
00516 {
00517 CNC_ASSERT( m_alldone );
00518 if( trace_level() > 2 ) {
00519 Internal::Speaker oss(std::cout);
00520 oss << this->name() << " try_send_or_put_all " << m_nDones;
00521 }
00522 if( --m_nDones == 0 ) {
00523 if( CnC::tuner_base::myPid() == 0 ) {
00524 for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
00525 m_out.put( i->first, i->second.val.combine( m_op ) );
00526 }
00527 } else {
00528 int _n = m_reductions.size();
00529 CnC::serializer * ser = this->new_serializer();
00530 (*ser) & DISTRED::ALLVALUES & _n;
00531 for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
00532 IType _val( i->second.val.combine( this->m_op ) );
00533 (*ser) & i->first & _val;
00534 }
00535 const int to = my_parent_for_root( 0 );
00536 if( trace_level() > 2 ) {
00537 Internal::Speaker oss(std::cout);
00538 oss << this->name() << " send ALLVALUES to " << to;
00539 }
00540 this->send_msg( ser, to );
00541 }
00542 }
00543 }
00544
00545
00546
00547 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00548 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::recv_msg( serializer * ser )
00549 {
00550 char _op;
00551 (*ser) & _op;
00552
00553 switch( _op ) {
00554 case DISTRED::GATHERCOUNT : {
00555 OTag _tag;
00556 CType _cnt;
00557 (*ser) & _tag & _cnt;
00558 if( trace_level() > 2 ) {
00559 Internal::Speaker oss(std::cout);
00560 oss << this->name() << " recvd GATHERCOUNT [" << _tag << "] " << _cnt;
00561 }
00562 m_oncount->on_gatherCount( _tag, get( _tag ), _cnt );
00563 break;
00564 }
00565 case DISTRED::BCASTCOUNT : {
00566 OTag _tag;
00567 CType _cnt;
00568 int _owner;
00569 (*ser) & _tag & _owner & _cnt ;
00570 if( trace_level() > 2 ) {
00571 Internal::Speaker oss(std::cout);
00572 oss << this->name() << " recvd BCASTCOUNT [" << _tag << "] " << _cnt << " " << _owner;
00573 }
00574 m_oncount->on_bcastCount( _tag, get( _tag ), _cnt, _owner );
00575 break;
00576 }
00577 case DISTRED::VALUE : {
00578 OTag _tag;
00579 IType _val;
00580 (*ser) & _tag & _val;
00581 if( trace_level() > 2 ) {
00582 Internal::Speaker oss(std::cout);
00583 oss << this->name() << " recvd VALUE [" << _tag << "] " << _val;
00584 }
00585 m_ondata->on_value( _tag, get( _tag ), _val );
00586 break;
00587 }
00588 case DISTRED::DONE : {
00589 OTag _tag;
00590 int _owner;
00591 (*ser) & _tag & _owner;
00592 if( trace_level() > 2 ) {
00593 Internal::Speaker oss(std::cout);
00594 oss << this->name() << " recvd DONE [" << _tag << "] " << _owner;
00595 }
00596 m_oncount->on_done( _tag, get( _tag ), _owner );
00597 break;
00598 }
00599 case DISTRED::ALLDONE : {
00600 if( trace_level() > 2 ) {
00601 Internal::Speaker oss(std::cout);
00602 oss << this->name() << " recvd ALLDONE";
00603 }
00604 m_alldone = true;
00605 CnC::serializer * ser = this->new_serializer();
00606 (*ser) & DISTRED::ALLDONE;
00607 m_nDones = 1000;
00608 m_nDones += bcast( ser, 0 );
00609 m_nDones -= 999;
00610 try_send_or_put_all();
00611 break;
00612 }
00613 default:
00614 case DISTRED::ALLVALUES : {
00615 CNC_ASSERT( m_alldone = true );
00616 int _n;
00617 (*ser) & _n;
00618 if( trace_level() > 2 ) {
00619 Internal::Speaker oss(std::cout);
00620 oss << this->name() << " recvd ALLVALUES " << _n;
00621 }
00622 while( _n-- ) {
00623 OTag _tag;
00624 IType _val;
00625 (*ser) & _tag & _val;
00626 const typename tls_map_type::iterator i = get( _tag );
00627 m_ondata->add_value( i, _val );
00628 }
00629 try_send_or_put_all();
00630 break;
00631 }
00632 CNC_ASSERT_MSG( false, "Unexpected message tag in JOIN" );
00633 }
00634 }
00635
00636 #endif // _DIST_CNC_
00637
00638
00639
00640
00641 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00642 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_item_put( reduce_type * r )
00643 : m_reduce( r )
00644 {}
00645
00646
00647
00648 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00649 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::add_value( const typename tls_map_type::iterator & i, const IType & val ) const
00650 {
00651 bool _exists;
00652 IType & _rval = i->second.val.local( _exists );
00653 _rval = m_reduce->m_op( _exists ? _rval : m_reduce->m_identity, val );
00654 }
00655
00656
00657
#ifdef _DIST_CNC_
/// Called when a partial value for otag arrived from another process:
/// folds it in and then checks whether the value can be put or passed on.
template< typename ITag, typename IType, typename ITuner,
          typename CType, typename CTuner,
          typename OTag, typename OTuner,
          typename ReduceOp, typename Select >
void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_value( const OTag & otag, const typename tls_map_type::iterator & i, const IType & val )
{
    TRACE( "::on_value" );
    this->add_value( i, val );
    m_reduce->try_send_or_put_value( otag, i );
}
#endif
00667
00668
00669
00670 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00671 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_put( const ITag & tag, const IType & val )
00672 {
00673 TRACE( "::on_data" );
00674 OTag otag;
00675 if( m_reduce->m_sel( tag, otag ) ) {
00676 typename tls_map_type::iterator i = m_reduce->get( otag );
00677 add_value( i, val );
00678 CType _n = ++i->second.nreduced;
00679 if( m_reduce->trace_level() > 0 ) {
00680 Internal::Speaker oss(std::cout);
00681 oss << m_reduce->name() << " on_put [";
00682 cnc_format( oss, tag );
00683 oss << "] for [" << otag << "] " << val << " nred now " << _n << "/" << i->second.n
00684 #ifdef _DIST_CNC_
00685 << " owner " << i->second.owner << " status " << i->second.status << " nCounts " << i->second.nCounts
00686 #endif
00687 ;
00688 }
00689 #ifdef _DIST_CNC_
00690
00691 if( Internal::distributor::active() ) {
00692
00693
00694
00695
00696 if( i->second.status >= CNT_AVAILABLE ) {
00697 if( CnC::tuner_base::myPid() == i->second.owner ) {
00698
00699 if( _n == i->second.n && i->second.nCounts <= 0 ) m_reduce->m_oncount->on_done( otag, i, CnC::tuner_base::myPid() );
00700 } else {
00701
00702
00703
00704
00705
00706 m_reduce->send_count( otag, i, i->second.owner, false );
00707 }
00708 }
00709
00710 } else
00711 #endif
00712 m_reduce->try_put_value( otag, i );
00713 } else if( m_reduce->trace_level() > 0 ) {
00714 Internal::Speaker oss(std::cout);
00715 oss << m_reduce->name() << " [";
00716 cnc_format( oss, tag );
00717 oss << "] was not selected";
00718 }
00719 }
00720
00721
00722
00723
00724 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00725 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_count_put( reduce_type * r )
00726 : m_reduce( r )
00727 {}
00728
00729
00730
00731 #ifdef _DIST_CNC_
00732 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00733 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_done( const OTag & otag, const typename tls_map_type::iterator & i, const int owner )
00734 {
00735 TRACE( "::on_done" );
00736 if( m_reduce->trace_level() > 2 ) {
00737 Internal::Speaker oss(std::cout);
00738 oss << m_reduce->name() << " on_done for [" << otag << "] nred now " << i->second.nreduced << "/" << i->second.n
00739 << " owner " << i->second.owner << " status " << i->second.status;
00740 }
00741 i->second.owner = owner;
00742 if( owner != CnC::tuner_base::myPid() || i->second.status.compare_and_swap( BCAST_DONE, CNT_AVAILABLE ) == CNT_AVAILABLE ) {
00743
00744 i->second.nValues = 1000;
00745 i->second.nValues += m_reduce->bcast_count( otag, M1, owner );
00746 int _tmp = i->second.status.compare_and_swap( FINISH, BCAST_DONE );
00747 CNC_ASSERT( owner != CnC::tuner_base::myPid() || _tmp == BCAST_DONE );
00748
00749 i->second.nValues -= 999;
00750
00751 m_reduce->try_send_or_put_value( otag, i );
00752 }
00753 }
00754
00755
00756
00757 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00758 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_bcastCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt, const int owner )
00759 {
00760 TRACE( "::on_bcast" );
00761 CNC_ASSERT( cnt != M1 );
00762 i->second.owner = owner;
00763 i->second.n = cnt;
00764
00765 int _tmp = i->second.status.compare_and_swap( CNT_AVAILABLE, LOCAL );
00766 CNC_ASSERT( _tmp == LOCAL );
00767
00768 i->second.nCounts = 1;
00769 i->second.nCounts += m_reduce->bcast_count( otag, cnt, owner );
00770
00771 if( --i->second.nCounts == 0 && owner != CnC::tuner_base::myPid() ) {
00772 m_reduce->send_count( otag, i, my_parent_for_root( owner ), true );
00773 }
00774 }
00775
00776
00777
00778 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00779 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_gatherCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt )
00780 {
00781 TRACE( "::on_gather" );
00782 CNC_ASSERT( cnt != M1 );
00783 i->second.nreduced += cnt;
00784 if( m_reduce->trace_level() > 2 ) {
00785 Internal::Speaker oss(std::cout);
00786 oss << m_reduce->name() << " on_gatherCount [" << otag << "] now " << i->second.nreduced << "/" << i->second.n << " nCounts " << i->second.nCounts;
00787 }
00788
00789 if( --i->second.nCounts <= 0 ) {
00790 if( i->second.owner == CnC::tuner_base::myPid() ) {
00791 if( i->second.n == i->second.nreduced ) {
00792 on_done( otag, i, CnC::tuner_base::myPid() );
00793 }
00794 } else {
00795 CNC_ASSERT( i->second.nCounts == 0 );
00796 m_reduce->send_count( otag, i, my_parent_for_root( i->second.owner ), true );
00797 }
00798 }
00799 }
00800 #endif // _DIST_CNC_
00801
00802
00803
00804 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00805 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_put( const OTag & otag, const CType & cnt )
00806 {
00807 TRACE( "::on_cnt" );
00808 typename tls_map_type::iterator i = m_reduce->get( otag );
00809
00810 #ifdef _DIST_CNC_
00811 if( Internal::distributor::active() ) {
00812 if( cnt != M1 ) {
00813 i->second.n = cnt;
00814
00815 if( ! m_reduce->try_put_value( otag, i ) ) {
00816 on_bcastCount( otag, i, cnt, CnC::tuner_base::myPid() );
00817 }
00818 } else {
00819
00820
00821 int _tmp = i->second.status.compare_and_swap( CNT_AVAILABLE, LOCAL );
00822 CNC_ASSERT( _tmp == LOCAL );
00823 i->second.nCounts = 0;
00824 on_done( otag, i, CnC::tuner_base::myPid() );
00825 }
00826 } else
00827 #endif
00828 {
00829 if( cnt != M1 ) i->second.n = cnt;
00830 else i->second.n = i->second.nreduced;
00831 m_reduce->try_put_value( otag, i );
00832 }
00833 }
00834
00835
00836
00837 }
00838
00839
00840
00841 #endif //_CnC_REDUCE_H_