3.8 Flows

When pipe assemblies are bound to source and sink taps, a Flow is created. Flows are executable in the sense that, once they are created, they can be started and will execute on the specified platform. If the Hadoop platform is specified, the Flow will execute on a Hadoop cluster.

A Flow is essentially a data processing pipeline that reads data from sources, processes the data as defined by the pipe assembly, and writes data to the sinks. Input source data does not need to exist at the time the Flow is created, but it must exist by the time the Flow is executed (unless it is executed as part of a Cascade - see Cascades for more on this).

The most common pattern is to create a Flow from an existing pipe assembly. But there are cases where a MapReduce job (if running on Hadoop) has already been created, and it makes sense to encapsulate it in a Flow class so that it may participate in a Cascade and be scheduled with other Flow instances. Alternatively, via the Riffle annotations, third-party applications can participate in a Cascade, and complex algorithms that result in iterative Flow executions can be encapsulated as a single Flow. All patterns are covered here.

Creating Flows from Pipe Assemblies

Example 3.12. Creating a new Flow

HadoopFlowConnector flowConnector = new HadoopFlowConnector();

Flow flow =
  flowConnector.connect( "flow-name", source, sink, pipe );

To create a Flow, it must be planned through one of the FlowConnector subclass objects. In Cascading, each platform (i.e., local and Hadoop) has its own connectors. The connect() method is used to create new Flow instances based on a set of sink taps, source taps, and a pipe assembly. Above is a trivial example that uses the Hadoop platform connector.
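Because each platform has its own connector, the same pipe assembly can be planned for local execution simply by swapping the connector and taps. The following is a minimal sketch, assuming the local platform classes cascading.flow.local.LocalFlowConnector, cascading.tap.local.FileTap, and cascading.scheme.local.TextLine; the file paths are illustrative:

// plan the same assembly for the local platform
FlowConnector localConnector = new LocalFlowConnector();

Tap localSource = new FileTap( new TextLine(), "input.txt" );
Tap localSink = new FileTap( new TextLine(), "output.txt" );

Flow localFlow =
  localConnector.connect( "local-flow-name", localSource, localSink, pipe );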

Example 3.13. Binding taps in a Flow

// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );

join = new Every( join, new SomeAggregator() );

Pipe groupBy = new GroupBy( join );

groupBy = new Every( groupBy, new SomeAggregator() );

// the tail of the assembly
groupBy = new Each( groupBy, new SomeFunction() );

Tap lhsSource = new Hfs( new TextLine(), "lhs.txt" );
Tap rhsSource = new Hfs( new TextLine(), "rhs.txt" );

Tap sink = new Hfs( new TextLine(), "output" );

FlowDef flowDef = new FlowDef()
  .setName( "flow-name" )
  .addSource( rhs, rhsSource )
  .addSource( lhs, lhsSource )
  .addTailSink( groupBy, sink );

Flow flow = new HadoopFlowConnector().connect( flowDef );

The example above expands on our previous pipe assembly example by creating multiple source and sink taps and planning a Flow. Note there are two branches in the pipe assembly - one named "lhs" and the other named "rhs". Internally Cascading uses those names to bind the source taps to the pipe assembly. New in 2.0, a FlowDef can be created to manage the names and taps that must be passed to a FlowConnector.

Configuring Flows

The FlowConnector constructor accepts a java.util.Properties object so that default Cascading properties and any platform-specific properties can be passed down through the planner to the platform at runtime. In the case of Hadoop, any relevant Hadoop *-default.xml properties may be added. For instance, it is very common to add mapred.map.tasks.speculative.execution, mapred.reduce.tasks.speculative.execution, or mapred.child.java.opts.
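For instance, such Hadoop properties can be set on the Properties instance before constructing the connector. A sketch follows; the values shown are illustrative, not recommendations:

Properties properties = new Properties();

// disable speculative execution for map and reduce tasks
properties.setProperty( "mapred.map.tasks.speculative.execution", "false" );
properties.setProperty( "mapred.reduce.tasks.speculative.execution", "false" );

// set options for the child task JVMs
properties.setProperty( "mapred.child.java.opts", "-Xmx512m" );

FlowConnector flowConnector = new HadoopFlowConnector( properties );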

For production applications, one of two properties must always be set: the application Jar class or the Jar path.

Example 3.14. Configuring the Application Jar

Properties properties = new Properties();

// pass in the class name of your application
// this will find the parent jar at runtime
properties = AppProps.appProps()
  .setName( "sample-app" )
  .setVersion( "1.2.3" )
  .addTags( "deploy:prod", "team:engineering" )
  .setJarClass( Main.class ) // find jar from class
  .buildProperties( properties ); // returns a copy

// ALTERNATIVELY ...

// pass in the path to the parent jar
properties = AppProps.appProps()
  .setName( "sample-app" )
  .setVersion( "1.2.3" )
  .addTags( "deploy:prod", "team:engineering" )
  .setJarPath( pathToJar ) // set jar path
  .buildProperties( properties ); // returns a copy


// pass properties to the connector
FlowConnector flowConnector = new HadoopFlowConnector( properties );

More information on packaging production applications can be found in Executing Processes.

Since a FlowConnector can be reused, any properties passed to its constructor are handed to all the Flows it is used to create. If Flows must be created with different default properties, either instantiate a new FlowConnector with those properties, or set properties directly on a given Pipe or Tap instance via the getConfigDef() or getStepConfigDef() methods.
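For example, a property can be scoped to a single pipe, or to the whole step the pipe executes in. The following is a sketch, assuming the cascading.property.ConfigDef API; the property key and values are illustrative:

Pipe pipe = new Pipe( "head" );

// scope a property to this pipe only
pipe.getConfigDef()
  .setProperty( ConfigDef.Mode.REPLACE, "some.property", "some-value" );

// scope a property to the step this pipe participates in
pipe.getStepConfigDef()
  .setProperty( ConfigDef.Mode.DEFAULT, "mapred.child.java.opts", "-Xmx512m" );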

Skipping Flows

When a Flow participates in a Cascade, the Flow.isSkipFlow() method is consulted before calling Flow.start() on the flow. The result is based on the Flow's skip strategy. By default, isSkipFlow() returns true if any of the sinks are stale - i.e., the sinks don't exist or the resources are older than the sources. However, the strategy can be changed via the Flow.setFlowSkipStrategy() and Cascade.setFlowSkipStrategy() methods, which may be called before or after a particular Flow instance has been created.

Cascading provides a choice of two standard skip strategies:

FlowSkipIfSinkNotStale

This strategy - cascading.flow.FlowSkipIfSinkNotStale - is the default. Sinks are treated as stale if they don't exist or the sink resources are older than the sources. If the SinkMode for the sink tap is REPLACE, then the tap is treated as stale.

FlowSkipIfSinkExists

The cascading.flow.FlowSkipIfSinkExists strategy skips the Flow if the sink tap exists, regardless of age. If the SinkMode for the sink tap is REPLACE, then the tap is treated as stale.

Additionally, you can implement custom skip strategies by using the interface cascading.flow.FlowSkipStrategy.
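For example, a minimal custom strategy might never skip a Flow. The following is a sketch, assuming the interface declares a single skipFlow( Flow ) method:

public class SkipNever implements FlowSkipStrategy
  {
  public boolean skipFlow( Flow flow ) throws IOException
    {
    return false; // never skip - always report the Flow as runnable
    }
  }

// install the strategy on a particular Flow instance
flow.setFlowSkipStrategy( new SkipNever() );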

Note that Flow.start() does not consult the isSkipFlow() method, and consequently always tries to start the Flow if called. It is up to the user code to call isSkipFlow() to determine whether the current strategy indicates that the Flow should be skipped.
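In user code this might look like the following minimal sketch:

// honor the skip strategy manually when not running in a Cascade
if( !flow.isSkipFlow() )
  flow.complete();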

Creating Flows from a JobConf

If a MapReduce job already exists and needs to be managed by a Cascade, then the cascading.flow.hadoop.MapReduceFlow class should be used. To do this, after creating a Hadoop JobConf instance simply pass it into the MapReduceFlow constructor. The resulting Flow instance can be used like any other Flow.
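The following is a hedged sketch; the JobConf setup is elided, and the boolean deleteSinkOnInit argument shown is an assumption about the constructor variant used:

JobConf jobConf = new JobConf();
jobConf.setJobName( "existing-mr-job" );

// ... set mapper, reducer, input and output paths as usual ...

// wrap the raw MapReduce job so it can participate in a Cascade
Flow flow = new MapReduceFlow( "mr-flow", jobConf, true );

flow.complete();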

Creating Custom Flows

Any custom Class can be treated as a Flow if given the correct Riffle annotations. Riffle is a set of Java annotations that identify specific methods on a class as providing specific life-cycle and dependency functionality. For more information, see the Riffle documentation and examples. To use with Cascading, a Riffle-annotated instance must be passed to the cascading.flow.hadoop.ProcessFlow constructor. The resulting ProcessFlow instance can be used like any other Flow instance.

Since many algorithms need to perform multiple passes over a given data set, a Riffle-annotated Class can be written that internally creates Cascading Flows and executes them until no more passes are needed. This is like nesting Flows or Cascades in a parent Flow, which in turn can participate in a Cascade.
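The following is a hedged sketch of such a class, using the annotation names documented by the Riffle project; the method bodies and resource paths are purely illustrative:

@riffle.process.Process
public class IterativePasses
  {
  @ProcessStart
  public void start()
    {
    // begin work; may return immediately
    }

  @ProcessStop
  public void stop()
    {
    // cancel any in-flight Flows
    }

  @ProcessComplete
  public void complete()
    {
    // create and run Cascading Flows here, looping until no more passes are needed
    }

  @DependencyIncoming
  public Collection<String> getIncoming()
    {
    return Arrays.asList( "input-path" ); // resources read by this process
    }

  @DependencyOutgoing
  public Collection<String> getOutgoing()
    {
    return Arrays.asList( "output-path" ); // resources written by this process
    }
  }

// wrap the annotated instance so it can participate in a Cascade
Flow flow = new ProcessFlow( "iterative-flow", new IterativePasses() );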
