Cascading SubAssemblies are reusable pipe assemblies that are linked into larger pipe assemblies. Think of them as subroutines in a programming language. They help organize complex pipe assemblies and allow commonly used pipe assemblies to be packaged into libraries for inclusion by other users.
To create a SubAssembly, the cascading.pipe.SubAssembly class must be subclassed.
Example 6.1. Creating a SubAssembly
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;

// SomeFunction, SomeFilter and SomeAggregator stand for your own operations
public class SomeSubAssembly extends SubAssembly {
  public SomeSubAssembly( Pipe lhs, Pipe rhs ) {
    // continue assembling against lhs
    lhs = new Each( lhs, new SomeFunction() );
    lhs = new Each( lhs, new SomeFilter() );

    // continue assembling against rhs
    rhs = new Each( rhs, new SomeFunction() );

    // joins the lhs and rhs
    Pipe join = new CoGroup( lhs, rhs );
    join = new Every( join, new SomeAggregator() );
    join = new GroupBy( join );
    join = new Every( join, new SomeAggregator() );

    // the tail of the assembly
    join = new Each( join, new SomeFunction() );

    // must register all assembly tails
    setTails( join );
  }
}
In the above example, the pipes we wish to continue assembling against are passed in through the constructor, and on the last line the 'join' pipe is registered as a tail by calling setTails(). Registering the tails is what allows a SubAssembly to be nested within larger pipe assemblies or within other SubAssemblies.
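To make the role of setTails() concrete, here is a minimal, self-contained Java sketch that does not depend on Cascading. ToyPipe, ToySubAssembly, JoinAssembly, OuterAssembly and NestingDemo are hypothetical classes invented for this illustration, not Cascading's API. The only assumption modeled is that a SubAssembly behaves like a Pipe whose upstream side is its registered tails; under that assumption, a pipe attached after a nested assembly can still be traced back to the original heads.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a pipe (NOT Cascading's Pipe class):
// a name plus the pipes that feed into it.
class ToyPipe {
    private final String name;
    protected ToyPipe[] previous;

    ToyPipe(String name, ToyPipe... previous) {
        this.name = name;
        this.previous = previous;
    }

    String getName() { return name; }

    // Walks upstream and collects the names of head pipes (pipes with no predecessors).
    Set<String> headNames() {
        Set<String> heads = new LinkedHashSet<>();
        if (previous.length == 0) {
            heads.add(name);
        }
        for (ToyPipe p : previous) {
            heads.addAll(p.headNames());
        }
        return heads;
    }
}

// Hypothetical stand-in for a SubAssembly: it is itself a pipe, and the
// tails registered via setTails() become its upstream side.
abstract class ToySubAssembly extends ToyPipe {
    private ToyPipe[] tails = new ToyPipe[0];

    ToySubAssembly(String name) {
        super(name);
    }

    protected void setTails(ToyPipe... tails) {
        this.tails = tails.clone();
        this.previous = tails.clone(); // downstream pipes see the tails as upstream
    }

    ToyPipe[] getTails() { return tails.clone(); }
}

// Hypothetical inner assembly: joins two branches into a single tail.
class JoinAssembly extends ToySubAssembly {
    JoinAssembly(ToyPipe lhs, ToyPipe rhs) {
        super("join-assembly");
        setTails(new ToyPipe("join", lhs, rhs));
    }
}

// Hypothetical outer assembly that nests JoinAssembly like any other pipe.
class OuterAssembly extends ToySubAssembly {
    OuterAssembly(ToyPipe lhs, ToyPipe rhs) {
        super("outer-assembly");
        ToyPipe inner = new JoinAssembly(lhs, rhs);
        setTails(new ToyPipe("post-join", inner));
    }
}

class NestingDemo {
    public static void main(String[] args) {
        ToyPipe lhs = new ToyPipe("lhs");
        ToyPipe rhs = new ToyPipe("rhs");
        ToyPipe tail = new ToyPipe("final", new OuterAssembly(lhs, rhs));
        System.out.println(tail.headNames()); // prints [lhs, rhs]
    }
}
```

Because the toy SubAssembly reports its tails as its predecessors, the outer assembly, the inner assembly and the final pipe all behave as links in one chain, which is the property the text attributes to registering tails.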
Example 6.2. Using a SubAssembly
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

// our custom SubAssembly
Pipe pipe = new SomeSubAssembly( lhs, rhs );

pipe = new Each( pipe, new SomeFunction() );
As shown above, a SubAssembly is included in a new pipe assembly just like any other Pipe.
If we had a SubAssembly that represented a split, that is, one with two or more tails, we could use the getTails() method to retrieve the array of tails set internally by the setTails() method.
Example 6.3. Creating a Split SubAssembly
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;

public class SplitSubAssembly extends SubAssembly {
  public SplitSubAssembly( Pipe pipe ) {
    // continue assembling against the incoming pipe
    pipe = new Each( pipe, new SomeFunction() );

    // first branch of the split
    Pipe lhs = new Pipe( "lhs", pipe );
    lhs = new Each( lhs, new SomeFunction() );

    // second branch of the split
    Pipe rhs = new Pipe( "rhs", pipe );
    rhs = new Each( rhs, new SomeFunction() );

    // must register all assembly tails
    setTails( lhs, rhs );
  }
}
Example 6.4. Using a Split SubAssembly
// the assembly head
Pipe head = new Pipe( "head" );

// our custom SubAssembly
SubAssembly pipe = new SplitSubAssembly( head );

// grab the split branches
Pipe lhs = new Each( pipe.getTails()[ 0 ], new SomeFunction() );
Pipe rhs = new Each( pipe.getTails()[ 1 ], new SomeFunction() );
To rephrase, if a SubAssembly does not split the incoming Tuple stream, the SubAssembly instance can be passed directly to the next Pipe instance. But if the SubAssembly splits the stream into multiple branches, each branch tail must be passed to the setTails() method, and the getTails() method should be called to get a handle to the correct branch to pass to the next Pipe instances.
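The two cases can be modeled with another small, self-contained Java sketch that does not depend on Cascading. ToyPipe, ToySubAssembly, ChainAssembly, SplitAssembly and SplitDemo are hypothetical stand-ins rather than Cascading classes. The rule that a ToyPipe refuses a multi-tail assembly as a direct predecessor is a choice made in this toy to make the ambiguity explicit; it is not a description of how Cascading itself reacts.

```java
// Hypothetical stand-in for a pipe (NOT Cascading's Pipe class):
// a name plus the pipes feeding into it.
class ToyPipe {
    final String name;
    final ToyPipe[] previous;

    ToyPipe(String name, ToyPipe... previous) {
        for (ToyPipe p : previous) {
            // Toy rule: a split assembly is ambiguous as a single predecessor,
            // so this model rejects it and forces callers to pick a branch.
            if (p instanceof ToySubAssembly && ((ToySubAssembly) p).getTails().length > 1) {
                throw new IllegalArgumentException(
                    "split assembly '" + p.name + "': pick a branch via getTails()");
            }
        }
        this.name = name;
        this.previous = previous;
    }
}

// Hypothetical stand-in for a SubAssembly that records its branch tails.
abstract class ToySubAssembly extends ToyPipe {
    private ToyPipe[] tails = new ToyPipe[0];

    ToySubAssembly(String name) {
        super(name);
    }

    // Registers every branch tail, mirroring the role of setTails() in the text.
    protected void setTails(ToyPipe... tails) { this.tails = tails.clone(); }

    // Returns a copy so callers cannot replace the registered tails.
    ToyPipe[] getTails() { return tails.clone(); }
}

// Non-split assembly: a single tail, so the instance can be used directly.
class ChainAssembly extends ToySubAssembly {
    ChainAssembly(ToyPipe pipe) {
        super("chain");
        setTails(new ToyPipe("step", pipe));
    }
}

// Split assembly: two tails, so callers select a branch with getTails().
class SplitAssembly extends ToySubAssembly {
    SplitAssembly(ToyPipe pipe) {
        super("split");
        ToyPipe common = new ToyPipe("common", pipe);
        setTails(new ToyPipe("lhs", common), new ToyPipe("rhs", common));
    }
}

class SplitDemo {
    public static void main(String[] args) {
        ToyPipe head = new ToyPipe("head");

        // Non-split: pass the assembly straight to the next pipe.
        ToyPipe next = new ToyPipe("next", new ChainAssembly(head));

        // Split: fetch each branch tail explicitly.
        ToySubAssembly split = new SplitAssembly(head);
        ToyPipe left = new ToyPipe("left", split.getTails()[0]);
        ToyPipe right = new ToyPipe("right", split.getTails()[1]);

        // prints "chain lhs rhs"
        System.out.println(next.previous[0].name + " " + left.previous[0].name + " " + right.previous[0].name);
    }
}
```

In this sketch the single-tail assembly is handed on as is, while each branch of the split assembly is reached only through getTails(), matching the two cases described above.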
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.