8. Advanced Processing

8.1 SubAssemblies

In Cascading, SubAssemblies are reusable pipe assemblies that are linked into larger pipe assemblies. They function much like subroutines in a larger program. SubAssemblies are a good way to organize complex pipe assemblies, and they allow for commonly-used pipe assemblies to be packaged into libraries for inclusion in other projects by other users.

To create a SubAssembly, subclass the cascading.pipe.SubAssembly class.

Example 8.1. Creating a SubAssembly

public class SomeSubAssembly extends SubAssembly
  {
  public SomeSubAssembly( Pipe lhs, Pipe rhs )
    {
    // must register incoming pipes
    setPrevious( lhs, rhs );

    // continue assembling against lhs
    lhs = new Each( lhs, new SomeFunction() );
    lhs = new Each( lhs, new SomeFilter() );

    // continue assembling against rhs
    rhs = new Each( rhs, new SomeFunction() );

    // joins the lhs and rhs
    Pipe join = new CoGroup( lhs, rhs );

    join = new Every( join, new SomeAggregator() );

    join = new GroupBy( join );

    join = new Every( join, new SomeAggregator() );

    // the tail of the assembly
    join = new Each( join, new SomeFunction() );

    // must register all assembly tails
    setTails( join );
    }
  }

In the example above, we pass in (as parameters via the constructor) the pipes that we wish to continue assembling against, in the first line we register the incoming "previous" pipes, and in the last line we register the outgoing "join" pipe as a tail. This allows SubAssemblies to be nested within larger pipe assemblies or other SubAssemblies.

Example 8.2. Using a SubAssembly

// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

// our custom SubAssembly
Pipe pipe = new SomeSubAssembly( lhs, rhs );

pipe = new Each( pipe, new SomeFunction() );

The example above demonstrates how to include a SubAssembly into a new pipe assembly.

Note that in a SubAssembly that represents a split - that is, a SubAssembly with two or more tails - you can use the getTails() method to access the array of tails set internally by the setTails() method.

Example 8.3. Creating a Split SubAssembly

public class SplitSubAssembly extends SubAssembly
  {
  public SplitSubAssembly( Pipe pipe )
    {
    // must register incoming pipe
    setPrevious( pipe );

    // continue assembling against pipe
    pipe = new Each( pipe, new SomeFunction() );

    Pipe lhs = new Pipe( "lhs", pipe );
    lhs = new Each( lhs, new SomeFunction() );

    Pipe rhs = new Pipe( "rhs", pipe );
    rhs = new Each( rhs, new SomeFunction() );

    // must register all assembly tails
    setTails( lhs, rhs );
    }
  }

Example 8.4. Using a Split SubAssembly

// the "left hand side" assembly head
Pipe head = new Pipe( "head" );

// our custom SubAssembly
SubAssembly pipe = new SplitSubAssembly( head );

// grab the split branches
Pipe lhs = new Each( pipe.getTails()[ 0 ], new SomeFunction() );
Pipe rhs = new Each( pipe.getTails()[ 1 ], new SomeFunction() );

To rephrase, if a SubAssembly does not split the incoming Tuple stream, the SubAssembly instance can be passed directly to the next Pipe instance. But, if the SubAssembly splits the stream into multiple branches, handles will be needed to access them. The solution is to pass each branch tail to the setTails() method, and call the getTails() method to get handles for the desired branches, which can be passed to subsequent instances of Pipe.

Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.