12.5 API Usage

Pass properties to a custom Operation
// set property on Flow: the Properties instance is handed to the
// FlowConnector that will be used to create the Flow
Properties properties = new Properties();
// setProperty (rather than the inherited Hashtable.put) only accepts
// String keys and values, which the String cast further down relies on
properties.setProperty( "key", "value" );
FlowConnector flowConnector = new HadoopFlowConnector( properties );
// ...

// get the property from within an Operation (Function, Filter, etc)
// the result is cast, so the value stored under "key" must be a String
String value = (String) flowProcess.getProperty( "key" );
Bind multiple sources and sinks to a Flow
// two independent head pipes; the names given here are what the
// name-based addSource calls further down refer to
Pipe headLeft = new Pipe( "headLeft" );
// do something interesting

Pipe headRight = new Pipe( "headRight" );
// do something interesting

// merge the two input streams, grouping on the "common" field
Pipe merged = new GroupBy( headLeft, headRight, new Fields( "common" ) );
// ...

// branch the merged stream: each branch is a new named Pipe that
// continues from merged
Pipe tailLeft = new Pipe( "tailLeft", merged );
// filter out values to the left
tailLeft = new Each( tailLeft, new SomeFilter() );

Pipe tailRight = new Pipe( "tailRight", merged );
// filter out values to the right
tailRight = new Each( tailRight, new SomeFilter() );

// source taps, one per head pipe
// ("some-fields" and "some/path" are placeholders)
Scheme inLeftScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sourceLeft = new Hfs( inLeftScheme, "some/path" );

Scheme inRightScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sourceRight = new Hfs( inRightScheme, "some/path" );

// sink taps, one per tail pipe
Scheme outLeftScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sinkLeft = new Hfs( outLeftScheme, "some/path" );

Scheme outRightScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sinkRight = new Hfs( outRightScheme, "some/path" );

FlowDef flowDef = new FlowDef()
  .setName( "flow-name" );

// bind source Taps to Pipe heads
flowDef
  .addSource( headLeft, sourceLeft )
  .addSource( headRight, sourceRight );

// bind sink Taps to Pipe tails
// addTailSink is used for BOTH branches: it adds the Pipe as a tail and
// binds its sink in one call, whereas addSink( Pipe, Tap ) only binds the
// sink and would leave the tail to be added separately via addTail
flowDef
  .addTailSink( tailLeft, sinkLeft )
  .addTailSink( tailRight, sinkRight );

// ALTERNATIVELY ...
// (the name-based form below is shown in place of the Pipe-based
// bindings above)

// add named source Taps
// the head pipe name to bind to
flowDef
  .addSource( "headLeft", sourceLeft )    // headLeft.getName()
  .addSource( "headRight", sourceRight ); // headRight.getName()

// add named sink Taps
flowDef
  .addSink( "tailLeft", sinkLeft )    // tailLeft.getName()
  .addSink( "tailRight", sinkRight ); // tailRight.getName()

// add tails -- heads are reachable from the tails
// needed in this form because the name-based addSink calls above do not
// pass the tail Pipes themselves
flowDef
  .addTail( tailLeft )
  .addTail( tailRight );

// create the connector (no properties are passed in this example)
FlowConnector flowConnector = new HadoopFlowConnector();

// plan the Flow from everything registered on the FlowDef
Flow flow = flowConnector.connect( flowDef );

Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.