9.5 API Usage

Pass properties to a custom Operation
// set property on Flow
Properties properties = new Properties();
properties.put( "key", "value" );
FlowConnector flowConnector = new FlowConnector( properties );
// ...

// get the property from within an Operation (Function, Filter, etc)
String value = (String) flowProcess.getProperty( "key" );
Bind multiple sources and sinks to a Flow
Pipe headLeft = new Pipe( "headLeft" );
// do something interesting

Pipe headRight = new Pipe( "headRight" );
// do something interesting

// merge the two input streams
Pipe merged = new GroupBy( headLeft, headRight, new Fields( "common" ) );
// ...

// branch the merged stream
Pipe tailLeft = new Pipe( "tailLeft", merged );
// filter out values to the left
tailLeft = new Each( tailLeft, new SomeFilter() );

Pipe tailRight = new Pipe( "tailRight", merged );
// filter out values to the right
tailRight = new Each( tailRight, new SomeFilter() );

// source taps
Tap sourceLeft = new Hfs( new Fields( "some-fields" ), "some/path" );
Tap sourceRight = new Hfs( new Fields( "some-fields" ), "some/path" );

Pipe[] pipesArray = Pipe.pipes( headLeft, headRight );
Tap[] tapsArray = Tap.taps( sourceLeft, sourceRight );

// a convenience function for creating branch names to tap maps
Map<String, Tap> sources = Cascades.tapsMap( pipesArray, tapsArray );

// sink taps
Tap sinkLeft = new Hfs( new Fields( "some-fields" ), "some/path" );
Tap sinkRight = new Hfs( new Fields( "some-fields" ), "some/path" );

pipesArray = Pipe.pipes( tailLeft, tailRight );
tapsArray = Tap.taps( sinkLeft, sinkRight );

// or create the Map manually
Map<String, Tap> sinks = new HashMap<String, Tap>();
sinks.put( tailLeft.getName(), sinkLeft );
sinks.put( tailRight.getName(), sinkRight );

// set property on Flow
FlowConnector flowConnector = new FlowConnector();

Flow flow = flowConnector.connect( "flow-name", sources, sinks, tailLeft, tailRight );

Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.