00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #ifndef _CnC_JOIN_H_
00033 #define _CnC_JOIN_H_
00034
00035 #include <cnc/internal/cnc_stddef.h>
00036 #include <tbb/queuing_mutex.h>
00037 #include <set>
00038
00039 namespace CnC
00040 {
00041
00042 template< typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC > class join;
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063 template< typename Ctxt, typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC >
00064 graph * make_join_graph( CnC::context< Ctxt > & ctxt, const std::string & name,
00065 CnC::tag_collection< TagA, TunerA > & a,
00066 CnC::tag_collection< TagB, TunerB > & b,
00067 CnC::tag_collection< TagC, TunerC > & c )
00068 {
00069 return new join< TagA, TunerA, TagB, TunerB, TagC, TunerC >( ctxt, name, a, b, c );
00070 }
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090 template< typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC >
00091 class join : public CnC::graph
00092 {
00093 typedef tbb::queuing_mutex mutex_type;
00094 typedef std::set< TagA > seta_type;
00095 typedef std::set< TagB > setb_type;
00096 typedef CnC::tag_collection< TagA, TunerA > colla_type;
00097 typedef CnC::tag_collection< TagB, TunerB > collb_type;
00098 typedef CnC::tag_collection< TagC, TunerC > collc_type;
00099 typedef join< TagA, TunerA, TagB, TunerB, TagC, TunerC > join_type;
00100 enum { JOINA = 'A', JOINB = 'B' };
00101 friend struct on_a_put;
00102 friend struct on_b_put;
00103
00104 public:
00105 template< typename Ctxt >
00106 join( CnC::context< Ctxt > & ctxt, const std::string & name, colla_type & a, collb_type & b, collc_type & c )
00107 : CnC::graph( ctxt, name ),
00108 m_a( a ),
00109 m_b( b ),
00110 m_c( c ),
00111 m_seta(),
00112 m_setb(),
00113 m_mutex()
00114 {
00115
00116 m_a.on_put( new on_a_put( this ) );
00117 m_b.on_put( new on_b_put( this ) );
00118 }
00119
00120
00121 private:
00122
00123 template< typename Tag, typename Set, typename joiner >
00124 void join_one( const Tag & t, const Set & set, const joiner & j )
00125 {
00126
00127 for( typename Set::iterator i=set.begin(); i!=set.end(); ++i ) {
00128 m_c.put( j( t, *i ) );
00129 }
00130 }
00131
00132
00133 struct join_ba
00134 {
00135 TagC operator()( const TagB & b, const TagA & a ) const {
00136 return TagC( a, b );
00137 }
00138 };
00139
00140 struct join_ab
00141 {
00142 TagC operator()( const TagA & a, const TagB & b ) const {
00143 return TagC( a, b );
00144 }
00145 };
00146 typedef join_ab join_ab_type;
00147 typedef join_ba join_ba_type;
00148
00149 template< typename Tag >
00150 void send_tag( const Tag & tag, const char op )
00151 {
00152 #ifdef _DIST_CNC_
00153 if( Internal::distributor::active() ) {
00154 if( this->trace_level() > 2 ) {
00155 Internal::Speaker oss(std::cout);
00156 oss << this->name() << "::send_tag JOIN" << (op==JOINA?"A":"B") << " [";
00157 cnc_format( oss, tag );
00158 oss << "]";
00159 }
00160 CnC::serializer * ser = this->new_serializer();
00161 (*ser) & op & tag;
00162 this->bcast_msg( ser );
00163 }
00164 #endif
00165 }
00166
00167
00168 struct on_a_put : public colla_type::callback_type
00169 {
00170 on_a_put( join_type * j ) : m_join( j ) {}
00171
00172 void on_put( const TagA & tag )
00173 {
00174 if( m_join->trace_level() > 0 ) {
00175 Internal::Speaker oss(std::cout);
00176 oss << m_join->name() << "::on_put_a <";
00177 cnc_format( oss, tag );
00178 oss << ">";
00179 }
00180 m_join->send_tag( tag, JOINA );
00181 mutex_type::scoped_lock _lock( m_join->m_mutex );
00182 m_join->join_one( tag, m_join->m_setb, join_ab_type() );
00183 m_join->m_seta.insert( tag );
00184 }
00185 private:
00186 join_type * m_join;
00187 };
00188
00189
00190 struct on_b_put : public collb_type::callback_type
00191 {
00192 on_b_put( join_type * j ) : m_join( j ) {}
00193
00194 void on_put( const TagB & tag )
00195 {
00196 if( m_join->trace_level() > 0 ) {
00197 Internal::Speaker oss(std::cout);
00198 oss << m_join->name() << "::on_put_b <";
00199 cnc_format( oss, tag );
00200 oss << ">";
00201 }
00202 m_join->send_tag( tag, JOINB );
00203 mutex_type::scoped_lock _lock( m_join->m_mutex );
00204 m_join->join_one( tag, m_join->m_seta, join_ba_type() );
00205 m_join->m_setb.insert( tag );
00206 }
00207 private:
00208 join_type * m_join;
00209 };
00210
00211 #ifdef _DIST_CNC_
00212 virtual void recv_msg( serializer * ser )
00213 {
00214 char _op;
00215 (*ser) & _op;
00216 switch( _op ) {
00217 case JOINA : {
00218 TagA _tag;
00219 (*ser) & _tag;
00220 if( this->trace_level() > 2 ) {
00221 Internal::Speaker oss(std::cout);
00222 oss << this->name() << "::recv_msg JOINA <";
00223 cnc_format( oss, _tag );
00224 oss << ">";
00225 }
00226 mutex_type::scoped_lock _lock( m_mutex );
00227 join_one( _tag, m_setb, join_ab_type() );
00228 break;
00229 }
00230 case JOINB : {
00231 CNC_ASSERT_MSG( _op == JOINB, "Unexpected message tag" );
00232 TagB _tag;
00233 (*ser) & _tag;
00234 if( this->trace_level() > 2 ) {
00235 Internal::Speaker oss(std::cout);
00236 oss << this->name() << "::recv_msg JOINB <";
00237 cnc_format( oss, _tag );
00238 oss << ">";
00239 }
00240 mutex_type::scoped_lock _lock( m_mutex );
00241 join_one( _tag, m_seta, join_ba_type() );
00242 break;
00243 }
00244 default :
00245 CNC_ASSERT_MSG( false, "Unexpected message tag in JOIN" );
00246 }
00247 }
00248 #endif
00249
00250 colla_type & m_a;
00251 collb_type & m_b;
00252 collc_type & m_c;
00253 mutex_type m_mutex;
00254 seta_type m_seta;
00255 setb_type m_setb;
00256 };
00257
00258
00259 }
00260
00261 #endif //_CnC_JOIN_H_