CnC
 All Classes Namespaces Functions Variables Typedefs Enumerator Friends
join.h
00001 /* *******************************************************************************
00002  *  Copyright (c) 2007-2014, Intel Corporation
00003  *
00004  *  Redistribution and use in source and binary forms, with or without
00005  *  modification, are permitted provided that the following conditions are met:
00006  *
00007  *  * Redistributions of source code must retain the above copyright notice,
00008  *    this list of conditions and the following disclaimer.
00009  *  * Redistributions in binary form must reproduce the above copyright
00010  *    notice, this list of conditions and the following disclaimer in the
00011  *    documentation and/or other materials provided with the distribution.
00012  *  * Neither the name of Intel Corporation nor the names of its contributors
00013  *    may be used to endorse or promote products derived from this software
00014  *    without specific prior written permission.
00015  *
00016  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00019  *  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
00020  *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00021  *  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
00022  *  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00023  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
00024  *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00025  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00026  ********************************************************************************/
00027 
00028 /*
00029   Joins/cross products for CnC, provided as a re-usable graph.
00030 */
00031 
00032 #ifndef _CnC_JOIN_H_
00033 #define _CnC_JOIN_H_
00034 
00035 #include <cnc/internal/cnc_stddef.h>
00036 #include <tbb/queuing_mutex.h>
00037 #include <set>
00038 
00039 namespace CnC
00040 {
00041 
00042     template< typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC > class join; // forward declaration; needed by make_join_graph
00043 
00044     /// Returns a graph that joins 2 tag-collections into a third one.
00045     /// Continuously produces the join product of the 2 input tag-collections
00046     /// and puts it into the output tag-collection.
00047     ///
00048     /// Accepts any tag types; only requires that the output tag-type
00049     /// provides a constructor which accepts (taga, tagb) to construct
00050     /// a joined tag from tag a and b.
00051     ///
00052     /// On distributed memory duplicate output tags might be produced.
00053     /// To avoid duplicate step execution use tag-preservation on the
00054     /// output tag-collection (CnC::preserve_tuner) and suitable distribution
00055     /// functions on the prescribed step-collection (CnC::step_tuner).
00056     ///
00057     /// \param ctxt  the context to which the graph belongs
00058     /// \param name  the name of this join graph instance
00059     /// \param a     first input collection
00060     /// \param b     second input collection
00061     /// \param c     output collection
00062     /// \return      pointer to a join graph instance created with new
00063     template< typename Ctxt, typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC >
00064     graph * make_join_graph( CnC::context< Ctxt > & ctxt, const std::string & name, 
00065                              CnC::tag_collection< TagA, TunerA > & a,
00066                              CnC::tag_collection< TagB, TunerB > & b,
00067                              CnC::tag_collection< TagC, TunerC > & c )
00068     {
00069         return new join< TagA, TunerA, TagB, TunerB, TagC, TunerC >( ctxt, name, a, b, c );
00070     }
00071 
00072     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00073     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00074 
00075     // The actual join graph. A complex template construct; use
00076     // make_join_graph to create a join graph. Only the constructor is
00077     // public, everything else is private ("hidden").
00078     //
00079     // Whenever a new tag arrives, it gets joined with all existing
00080     // tags in the other collection.  The actual join is protected by a
00081     // mutex. This makes it safe on a single process.
00082     //
00083     // We cannot rely on the collection to keep tags because we
00084     // currently have no thread-safe iteration facility. Hence we use
00085     // std::set to keep them privately in the graph.
00086     //
00087     // We need explicit handling of distributed memory. We keep the
00088     // tags locally where they are put and just send a bcast message
00089     // when a new tag arrives. This can produce duplicate output tags.
00090     template< typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC >
00091     class join : public CnC::graph
00092     {
00093         typedef tbb::queuing_mutex mutex_type;
00094         typedef std::set< TagA > seta_type;
00095         typedef std::set< TagB > setb_type;
00096         typedef CnC::tag_collection< TagA, TunerA > colla_type;
00097         typedef CnC::tag_collection< TagB, TunerB > collb_type;
00098         typedef CnC::tag_collection< TagC, TunerC > collc_type;
00099         typedef join< TagA, TunerA, TagB, TunerB, TagC, TunerC > join_type;
00100         enum { JOINA = 'A', JOINB = 'B' }; // op-codes sent with a tag: which input collection it was put into
00101         struct on_a_put; struct on_b_put;  // declare the nested callbacks first, so the friend declarations name them (not namespace-scope structs)
00102         friend struct on_a_put;
00103         friend struct on_b_put;
00104     public: // the constructor hands one put-callback to each input collection
00105         template< typename Ctxt >
00106         join( CnC::context< Ctxt > & ctxt, const std::string & name, colla_type & a, collb_type & b, collc_type & c )
00107             : CnC::graph( ctxt, name ),
00108               m_a( a ),
00109               m_b( b ),
00110               m_c( c ),
00111               m_mutex(), // initializers are listed in member declaration order
00112               m_seta(),
00113               m_setb()
00114         {
00115             // callback objects must persist the lifetime of collections; they are heap-allocated and not deleted here
00116             m_a.on_put( new on_a_put( this ) );
00117             m_b.on_put( new on_b_put( this ) );
00118         }
00119 
00120         // the implementation is "hidden"
00121     private:
00122         // join a given tag with all existing tags in the other set; every joined tag is put into m_c
00123         template< typename Tag, typename Set, typename joiner >
00124         void join_one( const Tag & t, const Set & set, const joiner & j )
00125         {
00126             // in a more advanced version we can go parallel by using parallel_for or a range
00127             for( typename Set::const_iterator i=set.begin(); i!=set.end(); ++i ) { // 'set' is const, so begin() yields a const_iterator
00128                 m_c.put( j( t, *i ) );
00129             }
00130         }
00131 
00132         // join a b-tag with a a-tag (arguments arrive as (b, a), TagC is still built as (a, b))
00133         struct join_ba
00134         { 
00135             TagC operator()( const TagB & b, const TagA & a ) const {
00136                 return TagC( a, b );
00137             }
00138         };
00139         // join a a-tag with a b-tag
00140         struct join_ab
00141         { 
00142             TagC operator()( const TagA & a, const TagB & b ) const {
00143                 return TagC( a, b );
00144             }
00145         };
00146         typedef join_ab join_ab_type;
00147         typedef join_ba join_ba_type;
00148         // serializes op-code and tag and broadcasts them; does nothing unless _DIST_CNC_ is defined and the distributor is active
00149         template< typename Tag >
00150         void send_tag( const Tag & tag, const char op )
00151         {
00152 #ifdef _DIST_CNC_
00153             if( Internal::distributor::active() ) {
00154                 if( this->trace_level() > 2 ) {
00155                     Internal::Speaker oss(std::cout);
00156                     oss << this->name() << "::send_tag JOIN" << (op==JOINA?"A":"B") << " [";
00157                     cnc_format( oss, tag );
00158                     oss << "]";
00159                 }
00160                 CnC::serializer * ser = this->new_serializer();
00161                 (*ser) & op & tag;
00162                 this->bcast_msg( ser );
00163             }
00164 #endif
00165         }
00166 
00167         // callback for collection a: broadcasts the new tag, joins it with all known b-tags and remembers it
00168         struct on_a_put : public colla_type::callback_type
00169         {
00170             on_a_put( join_type * j  ) : m_join( j ) {}
00171 
00172             void on_put( const TagA & tag )
00173             {
00174                 if( m_join->trace_level() > 0 ) {
00175                     Internal::Speaker oss(std::cout);
00176                     oss << m_join->name() << "::on_put_a <";
00177                     cnc_format( oss, tag );
00178                     oss << ">";
00179                 }
00180                 m_join->send_tag( tag, JOINA );
00181                 mutex_type::scoped_lock _lock( m_join->m_mutex ); // guards the join and the insert below
00182                 m_join->join_one( tag, m_join->m_setb, join_ab_type() );
00183                 m_join->m_seta.insert( tag );
00184             }
00185         private:
00186             join_type * m_join; // the graph this callback works for
00187         };
00188 
00189         // callback for collection b: broadcasts the new tag, joins it with all known a-tags and remembers it
00190         struct on_b_put : public collb_type::callback_type
00191         {
00192             on_b_put( join_type * j  ) : m_join( j ) {}
00193 
00194             void on_put( const TagB & tag )
00195             {
00196                 if( m_join->trace_level() > 0 ) {
00197                     Internal::Speaker oss(std::cout);
00198                     oss << m_join->name() << "::on_put_b <";
00199                     cnc_format( oss, tag );
00200                     oss << ">";
00201                 }
00202                 m_join->send_tag( tag, JOINB );
00203                 mutex_type::scoped_lock _lock( m_join->m_mutex ); // guards the join and the insert below
00204                 m_join->join_one( tag, m_join->m_seta, join_ba_type() );
00205                 m_join->m_setb.insert( tag );
00206             }
00207         private:
00208             join_type * m_join; // the graph this callback works for
00209         };
00210         // handles a message as written by send_tag: joins the received tag with the local tags of the other collection; the received tag is not stored
00211 #ifdef _DIST_CNC_
00212         virtual void recv_msg( serializer * ser )
00213         {
00214             char _op;
00215             (*ser) & _op;
00216             switch( _op ) {
00217             case JOINA : {
00218                 TagA _tag;
00219                 (*ser) & _tag;
00220                 if( this->trace_level() > 2 ) {
00221                     Internal::Speaker oss(std::cout);
00222                     oss << this->name() << "::recv_msg JOINA <";
00223                     cnc_format( oss, _tag );
00224                     oss << ">";
00225                 }
00226                 mutex_type::scoped_lock _lock( m_mutex );
00227                 join_one( _tag, m_setb, join_ab_type() );
00228                 break;
00229             }
00230             case JOINB : {
00231                 // _op == JOINB is guaranteed by the case label, no extra assertion needed
00232                 TagB _tag;
00233                 (*ser) & _tag;
00234                 if( this->trace_level() > 2 ) {
00235                     Internal::Speaker oss(std::cout);
00236                     oss << this->name() << "::recv_msg JOINB <";
00237                     cnc_format( oss, _tag );
00238                     oss << ">";
00239                 }
00240                 mutex_type::scoped_lock _lock( m_mutex );
00241                 join_one( _tag, m_seta, join_ba_type() );
00242                 break;
00243             }
00244             default :
00245                 CNC_ASSERT_MSG( false, "Unexpected message tag in JOIN" );
00246             }
00247         }
00248 #endif
00249 
00250         colla_type & m_a;     // first input collection
00251         collb_type & m_b;     // second input collection
00252         collc_type & m_c;     // output collection, receives the joined tags
00253         mutex_type   m_mutex; // serializes joins and updates of the two sets
00254         seta_type    m_seta;  // a-tags put locally so far
00255         setb_type    m_setb;  // b-tags put locally so far
00256     };
00257 
00258 
00259 } // namespace CnC
00260 
00261 #endif //_CnC_JOIN_H_
 All Classes Namespaces Functions Variables Typedefs Enumerator Friends