Re-using CnC graphs (reductions, cross/join...)

CnC programs can be written so that they can be re-used. An example could be a CnC graph which implements matrix multiplication or a generic reduction. The idea is to reuse such an implementation, either multiple times in the same application or in different applications.

In the abstract, every CnC graph has input collections and output collections. The input collections of a given sub-graph must be the output collections of another graph (and/or the environment). Similarly, an output collection of a sub-graph is used as input by another graph (and/or the environment). To make a graph re-usable, all we need is a way of connecting its input and output collections to the rest of the application. The internals of such a re-usable graph are of no interest to the outside - it might be a normal CnC graph or something else. In any case, all communication with the graph happens with the normal semantics of CnC collections. At instantiation time, we simply give it the input and output collections that we want it to work on.

Using Sub-Graphs

The CnC distribution ships with a few generic predefined graphs. Some of them are relatively complex template classes, as they aim to be as generic as possible. To facilitate their instantiation they are accompanied by a creator function. It accepts the input/output collections as its arguments and returns a parametrized instance of the re-usable graph, fully wired with the given collections. That's it: we now just use the input/output collections as normal, and the sub-graph will automatically "execute" behind the scenes.

Example: Using A Generic Reduction Graph

Here's a simple example (samples/count_words/count_words/count_words.cpp) which uses the CnC reduction to count the occurrences of strings in a file. The global graph consists of a collection for all the lines in the file (blocks), a collection with counts of strings per block (counts_per_block) and the collection with the final count per string (counts). Here's how the graph is instantiated (and wired) by assigning the result of CnC::make_reduce_graph to reduce:

        : tags( *this, "tags" ),
          blocks( *this, "blocks" ),
          counts_per_block( *this, "counts_per_block" ),
          counts( *this, "counts" ),
          red_counts( *this, "red_counts" ),
          steps( *this, "counter" ),
          reduce( NULL )
    {
        // here we wire the graph/reduction
        reduce = CnC::make_reduce_graph( *this,            // context
                                         "reduce",         // name
                                         counts_per_block, // input collection
                                         red_counts,       // number of items per reduction
                                         counts,           // the final result for each reduction
                                         std::plus<size_type>(), // the reduction operation
                                         size_type(0),     // identity element
                                         // we use a lambda as the selector
                                         // it maps the item to the reduction identified by t.second (the string)
                                         // e.g. it reduces over all blocks
                                         []( const tag_type & t, std::string & _s )->bool{ _s = t.second; return true; } );
        tags.prescribes( steps, *this );
        steps.consumes( blocks );
        steps.produces( counts_per_block );
        //CnC::debug::trace( *reduce, 3 );
    }

Providing data as input to the reduction is a normal put. In this example, the step computes the number of occurrences for a given word and line and then just puts the computed value.

// get block/line (tag.first) and determine number of occurrences of string (tag.second)
// put result into counts_per_block
int counter::execute( const tag_type & t, cw_context & ctxt ) const
{
    std::string _str;
    ctxt.blocks.get( t.first, _str );
    const size_type _sz = _str.size();
    size_type _pos = -1;
    size_type _cnt = 0;
    while( ( _pos = _str.find( t.second, _pos+1 ) ) < _sz ) ++_cnt;
    // always put results, even if 0
    // this allows us to determine the number of reduced items in main
    ctxt.counts_per_block.put( t, _cnt );
    return 0;
}

Similarly, using the result of the reduction is a normal get on the graph's output collection. As in this example, we can also iterate over the output collection (in safe state only):

    // iterate and write out all keys/value pairs
    for( auto i = ctxt.counts.begin(); i != ctxt.counts.end(); ++i ) {
        std::cout << i->first << " \t " << *i->second << std::endl;
    }

Please see samples/count_words/count_words/count_words.cpp for the details of the code. You will notice that using the reduction (created by CnC::make_reduce_graph) requires providing the count of items that will be reduced in each reduction. Some might think that this is a strange requirement. In fact, it's a semantic requirement; similar information is required by all reduction implementations in other languages as well. Separating it from providing the data actually increases flexibility and the opportunity for asynchronous operation.
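The role of the per-reduction count can be sketched in plain C++ (illustrative only, not the CnC API; keyed_reduction, set_count, put and result are made-up names): a result for a key becomes available exactly when the announced number of contributions has arrived, independently of the order in which count and data are provided.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>

// Sketch of the semantics: one reduction per key, finalized when the
// announced contribution count for that key has been consumed.
template< typename Key, typename Value, typename Op = std::plus< Value > >
class keyed_reduction
{
public:
    keyed_reduction( Value identity, Op op = Op() ) : m_identity( identity ), m_op( op ) {}

    // analogous to putting the count into the count collection (red_counts)
    void set_count( const Key & k, int n ) { m_counts[k] += n; }

    // analogous to putting an item into the input collection (counts_per_block)
    void put( const Key & k, const Value & v ) {
        auto it = m_partial.find( k );
        if( it == m_partial.end() ) it = m_partial.emplace( k, m_identity ).first;
        it->second = m_op( it->second, v );
        if( --m_counts[k] == 0 ) m_done[k] = it->second; // all contributions seen
    }

    // the final value is visible only after the announced count was reached
    bool result( const Key & k, Value & out ) const {
        auto it = m_done.find( k );
        if( it == m_done.end() ) return false;
        out = it->second;
        return true;
    }

private:
    Value m_identity;
    Op m_op;
    std::map< Key, int > m_counts;
    std::map< Key, Value > m_partial, m_done;
};
```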

What If The Number Of Reduced Items Is Unknown?

Of course there are cases in which the exact number of reduced items is not known until the processing is finished. Instead of knowing the number of reduced items, we then know when the reduction can be completed. For such cases the reduction allows you to provide a done-flag, either for all reductions in the reduce-graph (by calling flush on the reduce-graph) or for individual reductions (by putting -1 as the count).

An example is counting all words in a file (rather than a given set of words); see samples/count_all_words/count_all_words/count_all_words.cpp. Because we do not know which words we count, there is no way to say how many contributions there will be. So when the processing is done, we finalize the reduction by calling flush():

    // A reduction needs number of reduced items.
    // We do not know the count in advance.
    // We do not even know which words are going to show up.
    // We have to wait until processing is done
    // and then tell the reduction it can do the final pass
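The effect of such a done-flag can be sketched in plain C++ (illustrative only, not the CnC API; the class and member names are made up): contributions accumulate with unknown multiplicity, and results become visible only after flush() declares that no further input will arrive.

```cpp
#include <cassert>
#include <map>
#include <string>

// Sketch: reduction with an unknown number of contributions per word,
// finalized by an explicit flush() instead of a per-key count.
class word_count_reduction
{
public:
    void put( const std::string & word, int n ) { m_partial[word] += n; }

    // the "final pass": no more contributions will arrive
    void flush() { m_flushed = true; }

    bool result( const std::string & word, int & out ) const {
        if( !m_flushed ) return false; // not finalized yet
        auto it = m_partial.find( word );
        out = it == m_partial.end() ? 0 : it->second;
        return true;
    }

private:
    std::map< std::string, int > m_partial;
    bool m_flushed = false;
};
```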

Please refer to CnC::make_reduce_graph for details on using CnC reductions.


An obvious application for the reduction is of course the popular map-reduce paradigm. The above example contains almost everything needed to write a map-reduce framework based on CnC. Such a CnC-based framework combines the ease of map-reduce with the flexibility of CnC, as it allows embedding map-reduce algorithms naturally into CnC programs without introducing artificial constraints. As a proof of concept we implemented a mapreduce_context (samples/count_all_words/mapreduce/mapreduce.h) and used it to implement the same algorithm (count-all-words). To use the mapreduce_context you only need to provide the map operation (working on a stream, as other map-reduce frameworks do), the reduce operation, and then put the data files (see samples/count_all_words/mapreduce/mapreduce.cpp).

Connecting Several Graphs

The above examples use a single graph to which the environment provides input and from which it takes the output at the end. A more interesting case is connecting two graphs together and letting the asynchronous CnC semantics play out. Let's sum the entries of a 2d matrix using 2 reductions: the first reduction computes the sum of each row. We then feed this output (the sums of the rows) into a second reduction, which sums those into a single value. We don't need barriers or anything like that. Following the CnC spirit, we just declare the dependencies - and everything can happen asynchronously. We simply let the same item-collection (sum1) serve as the output of the first reduction and as the input of the second. This implies that as soon as the first reduction (red1) produces something into sum1, the second reduction (red2) can operate on it:

        tags.prescribes( sc, *this );
        sc.produces( a );
        // first reduction inputs a and outputs sum1
        red1 = CnC::make_reduce_graph( *this, "red_row",
                                       a,            // input collection: our matrix
                                       cnt1,         // count of items per reduction
                                       sum1,         // output collection: sum per row
                                       std::plus<int>(),  // reduction operation '+'
                                       0,            // identity element
                                       selector() ); // selector
        // second reduction inputs sum1 and outputs sum2
        red2 = CnC::make_reduce_graph( *this, "red_red",
                                       sum1,         // the input collection is the output of red1
                                       cnt2,         // count of items per reduction
                                       sum2,         // the final output: sum of all values
                                       std::plus<int>(),  // reduction operation '+'
                                       0,            // identity element
                                       selector() ); // selector

In this example the step simply puts the items, and the environment consumes the final sum. We hope the code in samples/reduce_2d/reduce_2d/reduce_2d.cpp is documented well enough to understand how it works.
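The dataflow of the two chained reductions can be sketched in plain, synchronous C++ (illustrative names only; in CnC the second stage can of course start as soon as the first produces its first row sum):

```cpp
#include <cassert>
#include <map>
#include <numeric>
#include <vector>

// First reduction: one sum per row.
// The returned map corresponds to the shared collection sum1.
std::map< int, int > reduce_rows( const std::vector< std::vector< int > > & matrix )
{
    std::map< int, int > row_sums;
    for( std::size_t r = 0; r < matrix.size(); ++r )
        row_sums[(int)r] = std::accumulate( matrix[r].begin(), matrix[r].end(), 0 );
    return row_sums;
}

// Second reduction: consumes the output of the first (sum1)
// and produces the single final value (sum2).
int reduce_all( const std::map< int, int > & row_sums )
{
    int total = 0;
    for( const auto & kv : row_sums ) total += kv.second;
    return total;
}
```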

Writing Re-Usable Sub-Graphs

When writing a CnC::graph, three hurdles need to be taken; they are discussed in the following sections.

A CnC::graph defines a piece of an application. Such a graph is part of the overall static CnC structure: it gets instantiated with the rest of CnC and exists until the surrounding CnC context goes away. Such a sub-graph can contain anything it wants, as long as its communication with the CnC environment goes through CnC collections. It can have internal state and/or have internal collections that are not accessible from the outside.

Renaming Input And Output Collections

The graph's connections to and from the CnC context are CnC collections from which the graph takes its inputs and into which it puts its output. These input/output collections live outside the graph, but the graph uses them. A generic re-usable graph definition should make as few assumptions as possible about these connecting collections - in particular it should not rely on their names on the outside. Moreover, different instantiations of the same graph might be used in the same application, using different collections as input and output.

This implies that the input/output collections should be parameters of a generic re-usable graph. Hence such a graph should accept the connecting collections as arguments of its constructor. Internally it will store references to them and use those in the implementation of its functionality.

Accessing Input Data

The conventional way of getting data from collections (using ::get()) is one way to get to the input data. However, this is possible only from within (graph-internal) CnC steps. Step-like data access limits what can be done to what a CnC step can express (e.g. it is not possible to operate on a stream of incoming data with unknown tags).

To lift this restriction CnC provides a callback interface to tag- and item collections. A graph can register callbacks on each collection. Whenever something is put into the collection the registered callback is called, providing the tag (and data) of what's currently put. Every collection defines the type of this callback: CnC::item_collection::callback_type and CnC::tag_collection::callback_type. The callback needs to implement the corresponding on_put method (see CnC::item_collection::callback_type and CnC::tag_collection::callback_type) and is registered with a collection by calling CnC::item_collection::on_put or CnC::tag_collection::on_put. The callback object can carry state and keep references to the graph. This provides a very flexible mechanism to implement almost anything within a CnC graph, even if it's - internally! - not obeying the CnC semantics.

Let's look at a simple example of such a callback. We need a class which derives from CnC::item_collection::callback_type and implements the on_put method:

    // the callback for incoming data just pushes the items on our internal queue
    struct on_data : public IC_IN::callback_type
    {
        on_data( blocking_queue & q ) : m_q( q ) {}
        void on_put( const typename IC_IN::tag_type & /*tag*/, const typename IC_IN::data_type & val )
        {
            m_q.push( val );
        }
        blocking_queue & m_q;
    };

on_put simply stores the incoming data in an internal queue. In the graph's constructor we register an instance of our callback:

        m_input.on_put( new on_data( m_queue ) );
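The blocking_queue used here is internal to the sample, not part of the public CnC API; assuming only the push/pop interface shown above, a minimal thread-safe sketch might look like:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Sketch of a blocking queue: push never blocks, pop blocks until an
// element is available (as used by the graph's worker thread).
template< typename T >
class blocking_queue
{
public:
    void push( const T & v )
    {
        {
            std::lock_guard< std::mutex > lk( m_mtx );
            m_q.push_back( v );
        }
        m_cv.notify_one();
    }

    void pop( T & v )
    {
        std::unique_lock< std::mutex > lk( m_mtx );
        m_cv.wait( lk, [this]{ return !m_q.empty(); } ); // block until non-empty
        v = m_q.front();
        m_q.pop_front();
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    std::deque< T > m_q;
};
```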

The graph can now operate on its internal copies of the data in any way it wants. When communicating back to the CnC environment, the graph must use its output collections. As a simple example, let's have a thread pop each datum from the internal queue, sleep for a while and then produce control tags and output data:

            do {
                // get next input
                queue->pop( _item );
                // and do some work, we just sleep, but it could be anything
                // ideally of course this would use TBB tasks
                int _tm = rand() % 1111;
                std::this_thread::sleep_for( std::chrono::milliseconds(_tm) );
                // now produce our output, to make it a little more interesting let's make it conditional
                if( _tm % 4 ) {
                    dataout->put( _tag, _item + _tm );
                    tagout->put( _tag );
                }
            } while( _tag < CnC::tuner_base::myPid()*1000+7 );

Please see samples/hidden_graph/hidden_graph/hidden_graph.cpp for the full code.

The hidden_graph example uses its own thread and so requires additional care (see Detection Of Graph Quiescence/Termination).

Writing Generic Graphs

As a statically typed language, C++ requires the types of tags and items to be known at compile time. A generic graph definition needs to adjust to these types at compile time. In C++ this is achieved with templates, and the template arguments of such a "templated graph" depend at least on the input and output collections. As CnC's collection templates accept not only the tag and data types but also a tuner type, an increasing number of input/output collections quickly leads to a very long list of template parameters.

Instantiating an object of a template class requires specifying all template arguments in the type definition. Template functions are easier to use, as the C++ compiler can deduce the template arguments automatically. Hence it is recommended to accompany every template graph with a generator function accepting all input/output collections and other initialization parameters.
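This pattern can be sketched as follows (my_graph and make_my_graph are made-up names; CnC::make_reduce_graph follows the same idea): the class carries the collection types as template parameters, while the generator function lets the compiler deduce them from its arguments.

```cpp
#include <vector>

// Sketch of a heavily templated graph: the connecting collections are
// template parameters and are stored as references.
template< typename ICIn, typename ICOut >
class my_graph
{
public:
    my_graph( ICIn & in, ICOut & out ) : m_in( in ), m_out( out ) {}
    ICIn & m_in;
    ICOut & m_out;
};

// Generator function: the compiler deduces ICIn/ICOut from the arguments,
// so the user never spells out the template argument list.
template< typename ICIn, typename ICOut >
my_graph< ICIn, ICOut > * make_my_graph( ICIn & in, ICOut & out )
{
    return new my_graph< ICIn, ICOut >( in, out );
}
```

Returning a pointer mirrors the usage shown above, where the constructor first initializes reduce( NULL ) and then assigns the result of the creator function.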

In addition to the mere type-declaration issue, the internals of a (hidden) graph might actually operate on tags and items. A generic definition must ensure that those operations are valid for all types, and it should be possible for a user to provide the functionality needed to make the graph work on custom types. For example, the graph might use the operator '+' on items. If you overload operator '+' for your custom type, it'll work just fine. Instead of relying on overloading, you can also make such operators (template) parameters of your graph. The reduction graph is an example: it lets you provide the reduction operation.

Note that this is not only true for the data items but also for the tags. Whenever the graph works on a tag, it makes assumptions about what the tag means or what kind of information it can extract from it. For example, if the graph semantically operates on a one-dimensional data set, it should still be possible to use the graph if the actual tag-space is 2-dimensional (or even higher). In other words, even if the graph's operation is one-dimensional, it should be able to operate on a higher-dimensional item-collection (e.g. apply a reduction to every row of a 2-dimensional matrix). As with the above operators, this can be addressed by letting the user provide the functionality to extract the information the graph needs. An example is the selector function in the reduction (see also reduce_2d for a sample use).
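For example, a selector reducing over the rows of a 2-dimensional tag-space might look like this sketch (illustrative; the exact selector signature is defined by CnC::make_reduce_graph):

```cpp
#include <utility>

// Sketch of a selector: the item tag is a 2d coordinate (row, column);
// the selector maps each item to the reduction identified by its row,
// yielding one reduction per row.
struct row_selector
{
    typedef std::pair< int, int > tag_type; // (row, column)

    bool operator()( const tag_type & t, int & reduction_id ) const
    {
        reduction_id = t.first; // reduce over all columns of this row
        return true;            // returning false would exclude the item
    }
};
```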

Detection Of Graph Quiescence/Termination

If a graph executes code only within (internal) CnC steps or within the execution scope of the callbacks, the runtime can automatically detect when the graph execution is finished. Examples of such graphs are CnC::reduction (as returned by CnC::make_reduce_graph) and CnC::join (as returned by CnC::make_join_graph).

Restricting executed code to steps and callbacks will suffice in many cases. Some algorithms, however, might require starting threads or reacting to events outside the CnC scope (such as sensors). In such cases the graph is responsible for communicating to the CnC runtime whether it's currently active or quiescent. As soon as the runtime finds everything in a quiescent state, it will terminate the execution.

For signaling activity and/or quiescence, CnC::graph provides CnC::graph::leave_quiescence and CnC::graph::enter_quiescence. A call to enter_quiescence must be paired with a preceding call to leave_quiescence, but may otherwise happen at any time. leave_quiescence must be called only within the constructor of the graph, within an (internal) CnC step or within an on_put callback.
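Conceptually, the pairing rule amounts to an activity counter; the following sketch (not the CnC implementation) illustrates the invariant: the graph is quiescent exactly when every leave_quiescence has been matched by an enter_quiescence.

```cpp
#include <atomic>
#include <cassert>

// Sketch of how paired leave/enter calls could be tracked.
class quiescence_tracker
{
public:
    void leave_quiescence() { ++m_active; }

    void enter_quiescence()
    {
        int prev = m_active.fetch_sub( 1 );
        assert( prev > 0 ); // must be paired with a preceding leave_quiescence
    }

    // quiescent exactly when no unmatched leave_quiescence remains
    bool quiescent() const { return m_active.load() == 0; }

private:
    std::atomic< int > m_active{ 0 };
};
```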

Our hidden_graph example starts a thread which operates on the data. This happens outside any CnC call, so we must communicate when the graph leaves and enters quiescence. As a graph is born in the quiescent state, we call leave_quiescence (to enter the active state) in the constructor and then register our callback:

    // the constructor tells the runtime that the graph is not quiescent
    // it also starts the thread and registers the on_put callback
    template< typename Ctxt >
    hidden_graph( CnC::context< Ctxt > & ctxt, const std::string & name, IC_IN & ic1, IC_OUT & ic2, TC_OUT & tc )
        : CnC::graph( ctxt, name ),
          m_input( ic1 ),
          m_dataout( ic2 ),
          m_tagout( tc ),
          m_thread( observe_and_compute(), &m_queue, &m_dataout, &m_tagout, this )
    {
        // we started a thread, we are not quiescent until it exits
        leave_quiescence();
        // register the callback
        m_input.on_put( new on_data( m_queue ) );
    }

When our thread is done, it calls enter_quiescence before it terminates.

            } while( _tag < CnC::tuner_base::myPid()*1000+7 );
            // we are done, before exiting we must inform the runtime
            graph->enter_quiescence();
            CnC::Internal::Speaker oss;
            oss << "done observe_and_compute";

Note that this is a very simple and artificial example. In particular, our graph has only one region of activity. Conceptually and in practice, however, there is no problem with letting the graph alternate between quiescent and active states - as long as it obeys the above rules.

The usual caveats apply to distributed memory: each remote "clone" of a sub-graph must report quiescence for the runtime to detect global quiescence. This might require explicit communication (see Sub-Graphs on Distributed Memory).

Sub-Graphs on Distributed Memory

Sub-graphs, in particular hidden ones, might require explicitly addressing distributed memory. A "clone" of the graph exists on each process; hence, as soon as you use non-CnC facilities, you have to explicitly take care of any communication that's needed between the siblings.

Examples for such explicit communication are reductions (reduce.h) and joins (join.h).

The tuners can be used in the usual way to configure where items go. If your graph has internal CnC steps, you might consider letting the user provide a tuner for them and so control their distribution.

Please see CnC::graph, CnC::item_collection and CnC::tag_collection for the details about communication between siblings and where callbacks get executed.

Next: Runtime Options
