// set property on Flow Properties properties = new Properties(); properties.put( "key", "value" ); FlowConnector flowConnector = new FlowConnector( properties ); // ... // get the property from within an Operation (Function, Filter, etc) String value = (String) flowProcess.getProperty( "key" );
Pipe headLeft = new Pipe( "headLeft" ); // do something interesting Pipe headRight = new Pipe( "headRight" ); // do something interesting // merge the two input streams Pipe merged = new GroupBy( headLeft, headRight, new Fields( "common" ) ); // ... // branch the merged stream Pipe tailLeft = new Pipe( "tailLeft", merged ); // filter out values to the left tailLeft = new Each( tailLeft, new SomeFilter() ); Pipe tailRight = new Pipe( "tailRight", merged ); // filter out values to the right tailRight = new Each( tailRight, new SomeFilter() ); // source taps Tap sourceLeft = new Hfs( new Fields( "some-fields" ), "some/path" ); Tap sourceRight = new Hfs( new Fields( "some-fields" ), "some/path" ); Pipe[] pipesArray = Pipe.pipes( headLeft, headRight ); Tap[] tapsArray = Tap.taps( sourceLeft, sourceRight ); // a convenience function for creating branch names to tap maps Map<String, Tap> sources = Cascades.tapsMap( pipesArray, tapsArray ); // sink taps Tap sinkLeft = new Hfs( new Fields( "some-fields" ), "some/path" ); Tap sinkRight = new Hfs( new Fields( "some-fields" ), "some/path" ); pipesArray = Pipe.pipes( tailLeft, tailRight ); tapsArray = Tap.taps( sinkLeft, sinkRight ); // or create the Map manually Map<String, Tap> sinks = new HashMap<String, Tap>(); sinks.put( tailLeft.getName(), sinkLeft ); sinks.put( tailRight.getName(), sinkRight ); // set property on Flow FlowConnector flowConnector = new FlowConnector(); Flow flow = flowConnector.connect( "flow-name", sources, sinks, tailLeft, tailRight );
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.