// Set a property on the Flow by handing a Properties object to the connector.
// Properties.setProperty is used instead of the inherited Hashtable.put so the
// key and value are guaranteed to be Strings.
Properties properties = new Properties();
properties.setProperty( "key", "value" );
FlowConnector flowConnector = new HadoopFlowConnector( properties );
// ...
// get the property from within an Operation (Function, Filter, etc);
// getProperty is cast here because the value is consumed as a String
String value = (String) flowProcess.getProperty( "key" );
Pipe headLeft = new Pipe( "headLeft" );
// do something interesting
Pipe headRight = new Pipe( "headRight" );
// do something interesting
// merge the two input streams
Pipe merged = new GroupBy( headLeft, headRight, new Fields( "common" ) );
// ...
// branch the merged stream
Pipe tailLeft = new Pipe( "tailLeft", merged );
// filter out values to the left
tailLeft = new Each( tailLeft, new SomeFilter() );
Pipe tailRight = new Pipe( "tailRight", merged );
// filter out values to the right
tailRight = new Each( tailRight, new SomeFilter() );
// source taps: a delimited-text Scheme per input, each wrapped in an Hfs Tap
Scheme inLeftScheme = new TextDelimited( new Fields( "some-fields" ) );
Scheme inRightScheme = new TextDelimited( new Fields( "some-fields" ) );
Tap sourceLeft = new Hfs( inLeftScheme, "some/path" );
Tap sourceRight = new Hfs( inRightScheme, "some/path" );

// sink taps: same construction, one per output branch
Scheme outLeftScheme = new TextDelimited( new Fields( "some-fields" ) );
Scheme outRightScheme = new TextDelimited( new Fields( "some-fields" ) );
Tap sinkLeft = new Hfs( outLeftScheme, "some/path" );
Tap sinkRight = new Hfs( outRightScheme, "some/path" );
// Describe the Flow: name it, then bind Taps to the assembly's heads and tails.
FlowDef flowDef = new FlowDef()
  .setName( "flow-name" );
// bind source Taps to Pipe heads
flowDef
  .addSource( headLeft, sourceLeft )
  .addSource( headRight, sourceRight );
// bind sink Taps to Pipe tails
// addTailSink is used for BOTH branches: it binds the sink and also registers
// the Pipe as a tail. addSink( Pipe, Tap ) only binds the sink under the pipe's
// name, so with no addTail call in this variant the left branch would never be
// registered as a tail of the Flow.
flowDef
  .addTailSink( tailLeft, sinkLeft )
  .addTailSink( tailRight, sinkRight );
// ALTERNATIVELY ...
// add named source Taps
// the head pipe name to bind to
flowDef
.addSource( "headLeft", sourceLeft ) // headLeft.getName()
.addSource( "headRight", sourceRight ); // headRight.getName()
// add named sink Taps
flowDef
.addSink( "tailLeft", sinkLeft ) // tailLeft.getName()
.addSink( "tailRight", sinkRight ); // tailRight.getName()
// add tails -- heads are reachable from the tails
flowDef
.addTail( tailLeft )
.addTail( tailRight );
// create a HadoopFlowConnector (no properties are passed to it here)
FlowConnector flowConnector = new HadoopFlowConnector();
// connect the FlowDef built above; the connector returns the resulting Flow
Flow flow = flowConnector.connect( flowDef );
Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.