9.2 Stream Shaping

Split (branch) a Tuple Stream
Pipe pipe = new Pipe( "head" );
pipe = new Each( pipe, new SomeFunction() );
// ...

// split left with the branch name 'lhs'
Pipe lhs = new Pipe( "lhs", pipe );
lhs = new Each( lhs, new SomeFunction() );
// ...

// split right with the branch name 'rhs'
Pipe rhs = new Pipe( "rhs", pipe );
rhs = new Each( rhs, new SomeFunction() );
// ...
Copy a field value
Fields argument = new Fields( "field" );
Identity identity = new Identity( new Fields( "copy" ) );

// identity copies the incoming argument to the result tuple
pipe = new Each( pipe, argument, identity, Fields.ALL );
Discard (drop) a field
// incoming -> "keepField", "dropField"
pipe = new Each( pipe, new Fields( "keepField" ), new Identity(),
  Fields.RESULTS );
// outgoing -> "keepField"
Rename a field
// a simple SubAssembly that uses Identity internally
pipe = new Rename( pipe, new Fields( "from" ), new Fields( "to" ) );
Coerce field values from Strings to primitives
Fields arguments = new Fields( "longField", "booleanField" );
Class types[] = new Class[]{long.class, boolean.class};
Identity identity = new Identity( types );

// convert from string to given type, inline replace values
pipe = new Each( pipe, arguments, identity, Fields.REPLACE );
Insert constant values into a stream
Fields fields = new Fields( "constant1", "constant2" );
pipe = new Each( pipe, new Insert( fields, "value1", "value2" ),
  Fields.ALL );

Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.