6. Advanced Processing

6.1 SubAssemblies

Cascading SubAssemblies are reusable pipe assemblies that are linked into larger pipe assemblies. Think of them as subroutines in a programming language. They help organize complex pipe assemblies and allow commonly used pipe assemblies to be packaged into libraries for use by others.

To create a SubAssembly, the cascading.pipe.SubAssembly class must be subclassed.

Example 6.1. Creating a SubAssembly

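import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;

// SomeFunction, SomeFilter, and SomeAggregator are placeholders for your own operations
public class SomeSubAssembly extends SubAssembly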
  {
  public SomeSubAssembly( Pipe lhs, Pipe rhs )
    {
    // continue assembling against lhs
    lhs = new Each( lhs, new SomeFunction() );
    lhs = new Each( lhs, new SomeFilter() );

    // continue assembling against rhs
    rhs = new Each( rhs, new SomeFunction() );

    // joins the lhs and rhs
    Pipe join = new CoGroup( lhs, rhs );

    join = new Every( join, new SomeAggregator() );

    join = new GroupBy( join );

    join = new Every( join, new SomeAggregator() );

    // the tail of the assembly
    join = new Each( join, new SomeFunction() );

    // must register all assembly tails
    setTails( join );
    }
  }

In the above example, the pipes we wish to continue assembling against are passed in through the constructor, and the last line registers the 'join' pipe as the tail. Registering the tail is what allows SubAssemblies to be nested within larger pipe assemblies or within other SubAssemblies.
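To make the nesting idea concrete, here is a minimal sketch in which one SubAssembly is built inside another. It does not use the Cascading classes: SketchPipe, SketchSubAssembly, CleanAssembly, ReportAssembly, and NestingSketch are hypothetical stand-ins that model only a pipe name, a link to the previous pipe, and a single registered tail, so the example runs without the library on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pipe: just a name and a link to the previous pipe.
class SketchPipe {
  final String name;
  final SketchPipe previous;

  SketchPipe(String name, SketchPipe previous) {
    this.name = name;
    this.previous = previous;
  }

  // The pipe to visit next when walking upstream toward the head.
  SketchPipe upstream() {
    return previous;
  }
}

// Hypothetical stand-in for a SubAssembly with one tail: walking upstream
// from the assembly continues at whatever tail its constructor registered.
class SketchSubAssembly extends SketchPipe {
  private SketchPipe tail;

  SketchSubAssembly(String name) {
    super(name, null);
  }

  protected void setTails(SketchPipe tail) {
    this.tail = tail;
  }

  @Override
  SketchPipe upstream() {
    return tail;
  }
}

// Inner assembly: two steps chained onto the incoming pipe.
class CleanAssembly extends SketchSubAssembly {
  CleanAssembly(SketchPipe pipe) {
    super("clean");
    pipe = new SketchPipe("parse", pipe);
    pipe = new SketchPipe("filter", pipe);
    setTails(pipe);
  }
}

// Outer assembly: uses CleanAssembly like any other pipe, then adds a step.
class ReportAssembly extends SketchSubAssembly {
  ReportAssembly(SketchPipe pipe) {
    super("report");
    pipe = new CleanAssembly(pipe);
    pipe = new SketchPipe("summarize", pipe);
    setTails(pipe);
  }
}

class NestingSketch {
  // Collects pipe names from the given pipe back to the head.
  static List<String> trace(SketchPipe pipe) {
    List<String> names = new ArrayList<>();
    for (SketchPipe p = pipe; p != null; p = p.upstream())
      names.add(p.name);
    return names;
  }

  public static void main(String[] args) {
    SketchPipe head = new SketchPipe("head", null);
    SketchPipe tail = new SketchPipe("export", new ReportAssembly(head));
    System.out.println(trace(tail));
    // prints [export, report, summarize, clean, filter, parse, head]
  }
}
```

The trace shows that the outer assembly's tail leads back through the nested assembly to the original head, which is the property that registering tails is meant to provide. With the real library, the same shape would be written with Pipe, Each, and SubAssembly as in Example 6.1.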

Example 6.2. Using a SubAssembly

// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

// our custom SubAssembly
Pipe pipe = new SomeSubAssembly( lhs, rhs );

pipe = new Each( pipe, new SomeFunction() );

As shown above, a SubAssembly with a single tail can be passed to the next pipe just like any other Pipe instance.

If a SubAssembly represents a split, that is, it has two or more tails, use the getTails() method to retrieve the array of tails registered internally by the setTails() method.

Example 6.3. Creating a Split SubAssembly

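import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;

// SomeFunction is a placeholder for your own operation
public class SplitSubAssembly extends SubAssembly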
  {
  public SplitSubAssembly( Pipe pipe )
    {
    // continue assembling against the incoming pipe
    pipe = new Each( pipe, new SomeFunction() );

    Pipe lhs = new Pipe( "lhs", pipe );
    lhs = new Each( lhs, new SomeFunction() );

    Pipe rhs = new Pipe( "rhs", pipe );
    rhs = new Each( rhs, new SomeFunction() );

    // must register all assembly tails
    setTails( lhs, rhs );
    }
  }

Example 6.4. Using a Split SubAssembly

// the assembly head
Pipe head = new Pipe( "head" );

// our custom SubAssembly
SubAssembly pipe = new SplitSubAssembly( head );

// grab the split branches
Pipe lhs = new Each( pipe.getTails()[ 0 ], new SomeFunction() );
Pipe rhs = new Each( pipe.getTails()[ 1 ], new SomeFunction() );

To summarize: if a SubAssembly does not split the incoming Tuple stream, the SubAssembly instance can be passed directly to the next Pipe instance. If the SubAssembly splits the stream into multiple branches, each branch tail must be passed to the setTails() method, and callers should use the getTails() method to get a handle to the correct branch to pass to the next Pipe instance.
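Picking a branch by array position, as in Example 6.4, depends on the order in which the tails were registered. One possible alternative is to select a tail by its branch name. The helper below is a sketch, not part of Cascading: TailLookup and byName are hypothetical names, and the method is generic so that it can be run here with plain strings standing in for pipes.

```java
import java.util.function.Function;

class TailLookup {
  // Returns the first tail whose name matches, or throws if none does.
  // nameOf extracts the name from a tail; it is supplied by the caller.
  static <T> T byName(T[] tails, Function<T, String> nameOf, String name) {
    for (T tail : tails)
      if (name.equals(nameOf.apply(tail)))
        return tail;
    throw new IllegalArgumentException("no tail named: " + name);
  }

  public static void main(String[] args) {
    // Strings stand in for Pipe instances so the sketch runs on its own.
    String[] tails = { "lhs", "rhs" };
    System.out.println(byName(tails, Function.<String>identity(), "rhs"));
    // prints rhs
  }
}
```

Against Example 6.4 this might be called as TailLookup.byName( pipe.getTails(), Pipe::getName, "rhs" ), assuming each tail reports the name of the branch it was built on through getName(). Failing with an exception on an unknown name surfaces a mismatched branch when the assembly is constructed rather than when the flow runs.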

Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.