3.5 Flows

When pipe assemblies are bound to source and sink Taps, a Flow is created. Flows are executable in the sense that once created they can be "started" and will begin execution on a configured Hadoop cluster.

Think of a Flow as a data processing workflow that reads data from sources, processes it as defined by the pipe assembly, and writes data to the sinks. Input source data does not need to exist when the Flow is created, but it must exist when the Flow is executed (unless it is executed as part of a Cascade, see Cascades).
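
For example, once a Flow has been planned (as in Example 3.11 below), it can be started asynchronously or run to completion using the start() and complete() methods discussed later in this section:

// begin execution on the configured cluster and return immediately
flow.start();

// or, block the current thread until the Flow has finished
flow.complete();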

The most common pattern is to create a Flow from an existing pipe assembly. But there are cases where a MapReduce job has already been created, and it makes sense to encapsulate it in a Flow class so that it may participate in a Cascade and be scheduled alongside other Flow instances. Alternatively, via Riffle annotations, third-party applications can participate in a Cascade, and complex algorithms that require iterative Flow executions can be encapsulated as a single Flow. All of these patterns are covered here.

Creating Flows from Pipe Assemblies

Example 3.11. Creating a new Flow

Flow flow = new FlowConnector().connect( "flow-name", source, sink, pipe );

To create a Flow, it must be planned through the FlowConnector object. The connect() method creates new Flow instances from a set of source Taps, sink Taps, and a pipe assembly. The example above is quite trivial.

Example 3.12. Binding Taps in a Flow

// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );

join = new Every( join, new SomeAggregator() );

Pipe groupBy = new GroupBy( join );

groupBy = new Every( groupBy, new SomeAggregator() );

// the tail of the assembly
groupBy = new Each( groupBy, new SomeFunction() );

Tap lhsSource = new Hfs( new TextLine(), "lhs.txt" );
Tap rhsSource = new Hfs( new TextLine(), "rhs.txt" );

Tap sink = new Hfs( new TextLine(), "output" );

Map<String, Tap> sources = new HashMap<String, Tap>();

sources.put( "lhs", lhsSource );
sources.put( "rhs", rhsSource );

Flow flow = new FlowConnector().connect( "flow-name", sources, sink, groupBy );

The example above expands on our previous pipe assembly example by creating source and sink Taps and planning a Flow. Note there are two branches in the pipe assembly, one named "lhs" and the other "rhs". Cascading uses those names to bind the source Taps to the pipe assembly. A HashMap of names and taps must be passed to FlowConnector in order to bind Taps to branches.

Since there is only one tail, the "groupBy" pipe, we don't need to bind the sink to a branch name. Nor do we need to pass the heads of the assembly to the FlowConnector; it can determine the heads of the pipe assembly on the fly. When creating more complex Flows with multiple heads and tails, all Taps will need to be explicitly named, and the proper connect() method will need to be called, as sketched below.
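
For example, if the assembly above also had two tails, the sinks would need to be bound by name as well. The following sketch uses hypothetical tail pipes (lhsTail, rhsTail) and sink Taps (lhsSink, rhsSink), and assumes the connect() variant that accepts a Map of sinks:

Map<String, Tap> sinks = new HashMap<String, Tap>();

// bind each sink Tap to the name of the tail pipe it serves
sinks.put( "lhsTail", lhsSink );
sinks.put( "rhsTail", rhsSink );

Flow flow = new FlowConnector().connect( "flow-name", sources, sinks, lhsTail, rhsTail );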

Configuring Flows

The FlowConnector constructor accepts a java.util.Properties object so that default Cascading and Hadoop properties can be passed down through the planner to the Hadoop runtime. Consequently, any relevant Hadoop hadoop-default.xml properties may be added (mapred.map.tasks.speculative.execution, mapred.reduce.tasks.speculative.execution, and mapred.child.java.opts are very common).
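
For example, a sketch of passing a few of these Hadoop properties through the FlowConnector (the values shown are only illustrative):

Properties properties = new Properties();

// forward standard Hadoop settings through the planner to the runtime
properties.setProperty( "mapred.map.tasks.speculative.execution", "false" );
properties.setProperty( "mapred.reduce.tasks.speculative.execution", "false" );
properties.setProperty( "mapred.child.java.opts", "-Xmx512m" );

FlowConnector flowConnector = new FlowConnector( properties );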

One property that must be set for production applications is the application Jar class or Jar path.

Example 3.13. Configuring the Application Jar

Properties properties = new Properties();

// pass in the class name of your application
// this will find the parent jar at runtime
FlowConnector.setApplicationJarClass( properties, Main.class );

// or pass in the path to the parent jar
FlowConnector.setApplicationJarPath( properties, pathToJar );

FlowConnector flowConnector = new FlowConnector( properties );

More information on packaging production applications can be found in Executing Processes.

Note the pattern of using a static property setter method (cascading.flow.FlowConnector.setApplicationJarPath). Other classes that can be used to set properties are cascading.flow.MultiMapReducePlanner and cascading.flow.Flow.

Since a FlowConnector can be reused, any properties passed to its constructor will be handed to all the Flows it is used to create. If Flows need to be created with different default properties, a new FlowConnector will need to be instantiated with those properties.

Skipping Flows

When a Flow participates in a Cascade, the Flow#isSkip() method is consulted before calling Flow#start() on the flow. By default, isSkip() returns true when none of the sinks are stale in relation to the Flow sources, where a sink is stale if it does not exist or its resources are older than the sources. In other words, a Flow is skipped when its outputs are already up to date.

This behavior is pluggable via the cascading.flow.FlowSkipStrategy interface. A new strategy can be set on a Flow instance after it is created.

FlowSkipIfSinkStale

The cascading.flow.FlowSkipIfSinkStale strategy is the default strategy. Sinks are considered stale if they do not exist or their resources are older than the sources. If the SinkMode for the sink Tap is REPLACE, the Tap is always treated as stale, so the Flow will not be skipped.

FlowSkipIfSinkExists

The cascading.flow.FlowSkipIfSinkExists strategy will skip a Flow if the sink Tap exists, regardless of age. If the SinkMode for the sink Tap is REPLACE, then the Tap will be treated as stale.
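
For example, assuming Flow exposes a setFlowSkipStrategy() setter (the setter name is an assumption; the mechanism itself is described above), the exists-based strategy could be installed on an existing Flow like this:

// replace the default stale-based strategy with the exists-based one
flow.setFlowSkipStrategy( new FlowSkipIfSinkExists() );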

Note that Flow#start() and Flow#complete() do not consult the isSkip() method and consequently will always try to start the Flow when called. It is up to user code to call isSkip() to decide whether the current strategy suggests the Flow should be skipped, as sketched below.
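
A sketch of such a guard in user code, using the methods named above:

// honor the current skip strategy before running the Flow directly
if( !flow.isSkip() )
  flow.complete();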

Creating Flows from a JobConf

If a MapReduce job already exists and needs to be managed by a Cascade, then the cascading.flow.MapReduceFlow class should be used. After creating a Hadoop JobConf instance, just pass it into the MapReduceFlow constructor. The resulting Flow instance can be used like any other Flow.
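
A minimal sketch, assuming a fully configured JobConf and a MapReduceFlow constructor that accepts it directly:

JobConf jobConf = new JobConf();
jobConf.setJobName( "existing-mr-job" );
// ... set the mapper, reducer, input and output paths as usual ...

// wrap the existing job so it can participate in a Cascade
Flow mrFlow = new MapReduceFlow( jobConf );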

Creating Custom Flows

Any custom Class can be treated as a Flow if given the correct Riffle annotations. Riffle is an Apache-licensed set of Java annotations that identify specific methods on a Class as providing specific life-cycle and dependency functionality. See the Riffle documentation and examples. To use one with Cascading, a Riffle-annotated instance must be passed to the cascading.flow.ProcessFlow constructor. The resulting ProcessFlow instance can be used like any other Flow instance.

Since many algorithms need to make multiple passes over a given data set, a Riffle-annotated Class can be written that internally creates Cascading Flows and executes them until no more passes are needed. This is like nesting Flows and Cascades in a parent Flow, which in turn can participate in a Cascade.
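
As an illustration only, such a Class might look roughly like the following. The annotation names and the ProcessFlow constructor signature here are assumptions drawn from the Riffle documentation, not verified against a specific release:

// hypothetical sketch of a Riffle-annotated iterative process
@riffle.process.Process
public class IterativeProcess
{
  @riffle.process.ProcessComplete
  public void complete()
  {
    // internally create and run Cascading Flows until no more passes are needed
  }

  @riffle.process.ProcessStop
  public void stop()
  {
    // stop any currently running internal Flow
  }
}

// wrap the annotated instance so it can be scheduled like any other Flow
Flow flow = new ProcessFlow( "iterative", new IterativeProcess() );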

Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.