In Cascading, SubAssemblies are reusable pipe assemblies that are linked into larger pipe assemblies. They function much like subroutines in a larger program. SubAssemblies are a good way to organize complex pipe assemblies, and they allow commonly used pipe assemblies to be packaged into libraries for inclusion in other projects by other users.
To create a SubAssembly, subclass the cascading.pipe.SubAssembly class.
Example 8.1. Creating a SubAssembly
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;

// SomeFunction, SomeFilter, and SomeAggregator stand in for your own operations
public class SomeSubAssembly extends SubAssembly
  {
  public SomeSubAssembly( Pipe lhs, Pipe rhs )
    {
    // must register incoming pipes
    setPrevious( lhs, rhs );

    // continue assembling against lhs
    lhs = new Each( lhs, new SomeFunction() );
    lhs = new Each( lhs, new SomeFilter() );

    // continue assembling against rhs
    rhs = new Each( rhs, new SomeFunction() );

    // joins the lhs and rhs
    Pipe join = new CoGroup( lhs, rhs );
    join = new Every( join, new SomeAggregator() );
    join = new GroupBy( join );
    join = new Every( join, new SomeAggregator() );

    // the tail of the assembly
    join = new Each( join, new SomeFunction() );

    // must register all assembly tails
    setTails( join );
    }
  }
In the example above, we pass in (as constructor parameters) the pipes that we wish to continue assembling against. The first statement of the constructor registers the incoming "previous" pipes, and the last statement registers the outgoing "join" pipe as a tail. This allows SubAssemblies to be nested within larger pipe assemblies or other SubAssemblies.
Example 8.2. Using a SubAssembly
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );
// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );
// our custom SubAssembly
Pipe pipe = new SomeSubAssembly( lhs, rhs );
pipe = new Each( pipe, new SomeFunction() );
The example above demonstrates how to include a SubAssembly in a new pipe assembly.
Note that in a SubAssembly that represents a split (that is, a SubAssembly with two or more tails), you can use the getTails() method to access the array of tails set internally by the setTails() method.
Example 8.3. Creating a Split SubAssembly
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;

// SomeFunction stands in for your own operation
public class SplitSubAssembly extends SubAssembly
  {
  public SplitSubAssembly( Pipe pipe )
    {
    // must register incoming pipe
    setPrevious( pipe );

    // continue assembling against pipe
    pipe = new Each( pipe, new SomeFunction() );

    // first branch of the split
    Pipe lhs = new Pipe( "lhs", pipe );
    lhs = new Each( lhs, new SomeFunction() );

    // second branch of the split
    Pipe rhs = new Pipe( "rhs", pipe );
    rhs = new Each( rhs, new SomeFunction() );

    // must register all assembly tails
    setTails( lhs, rhs );
    }
  }
Example 8.4. Using a Split SubAssembly
// the assembly head
Pipe head = new Pipe( "head" );
// our custom SubAssembly
SubAssembly pipe = new SplitSubAssembly( head );
// grab the split branches
Pipe lhs = new Each( pipe.getTails()[ 0 ], new SomeFunction() );
Pipe rhs = new Each( pipe.getTails()[ 1 ], new SomeFunction() );
To rephrase, if a SubAssembly does not split the incoming Tuple stream, the SubAssembly instance can be passed directly to the next Pipe instance. But if the SubAssembly splits the stream into multiple branches, handles will be needed to access them. The solution is to pass each branch tail to the setTails() method, and call the getTails() method to get handles for the desired branches, which can be passed to subsequent instances of Pipe.
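Example 8.4 picks its branches by array position, which relies on getTails() returning the tails in the order they were passed to setTails(). The following is a minimal sketch of an alternative that selects a branch by name. To compile without the Cascading jars it uses small stand-in Pipe and SubAssembly classes that model only names and tails; they are not the library classes. SplitSketch and the tailNamed() helper are hypothetical names introduced for illustration. It also assumes that each branch tail still reports the branch name ("lhs" or "rhs") given to it in Example 8.3.

```java
import java.util.Arrays;

// Stand-ins for cascading.pipe.Pipe and cascading.pipe.SubAssembly so that the
// sketch compiles on its own. They model only pipe names and registered tails.
class TailLookupSketch
  {
  static class Pipe
    {
    private final String name;

    Pipe( String name )
      {
      this.name = name;
      }

    String getName()
      {
      return name;
      }
    }

  static class SubAssembly extends Pipe
    {
    private Pipe[] tails = new Pipe[ 0 ];

    SubAssembly( String name )
      {
      super( name );
      }

    protected void setTails( Pipe... tails )
      {
      this.tails = tails;
      }

    Pipe[] getTails()
      {
      // hand out a copy so callers cannot reorder the registered tails
      return Arrays.copyOf( tails, tails.length );
      }
    }

  // Hypothetical split assembly with the same branch names as Example 8.3
  static class SplitSketch extends SubAssembly
    {
    SplitSketch( Pipe head )
      {
      super( head.getName() );
      setTails( new Pipe( "lhs" ), new Pipe( "rhs" ) );
      }
    }

  // Hypothetical helper: find a branch tail by name instead of by array index
  static Pipe tailNamed( SubAssembly assembly, String name )
    {
    for( Pipe tail : assembly.getTails() )
      {
      if( name.equals( tail.getName() ) )
        return tail;
      }

    throw new IllegalArgumentException( "no tail named: " + name );
    }

  public static void main( String[] args )
    {
    SubAssembly split = new SplitSketch( new Pipe( "head" ) );

    // select the branch by name rather than by its position in the array
    Pipe rhs = tailNamed( split, "rhs" );

    System.out.println( rhs.getName() );
    }
  }
```

A lookup like this keeps calling code working if the author of the SubAssembly later reorders the arguments to setTails(), at the cost of failing at assembly time when a branch is renamed.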
Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.