When pipe assemblies are bound to source and sink taps, a
Flow is created. Flows are executable in the
sense that, once they are created, they can be started and will execute
on the specified platform. If the Hadoop platform is specified, the Flow
will execute on a Hadoop cluster.
A Flow is essentially a data processing pipeline that reads data from sources, processes the data as defined by the pipe assembly, and writes data to the sinks. Input source data does not need to exist at the time the Flow is created, but it must exist by the time the Flow is executed (unless it is executed as part of a Cascade - see Cascades for more on this).
The most common pattern is to create a Flow from an existing pipe
assembly. But there are cases where a MapReduce job (if running on
Hadoop) has already been created, and it makes sense to encapsulate it
in a Flow class so that it may participate in a
Cascade and be scheduled with other
Flow instances. Alternatively, via the Riffle
annotations, third-party applications can participate in a
Cascade, and complex algorithms that result in
iterative Flow executions can be encapsulated as a single Flow. All
patterns are covered here.
Example 3.12. Creating a new Flow
HadoopFlowConnector flowConnector = new HadoopFlowConnector();
Flow flow = flowConnector.connect( "flow-name", source, sink, pipe );
To create a Flow, it must be planned through one of the
FlowConnector subclass objects. In Cascading, each platform (i.e.,
local and Hadoop) has its own connectors. The connect()
method is used to create new Flow instances based on a set of sink
taps, source taps, and a pipe assembly. Above is a trivial example
that uses the Hadoop mode connector.
Example 3.13. Binding taps in a Flow
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );
lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );
rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );
join = new Every( join, new SomeAggregator() );

Pipe groupBy = new GroupBy( join );
groupBy = new Every( groupBy, new SomeAggregator() );

// the tail of the assembly
groupBy = new Each( groupBy, new SomeFunction() );

Tap lhsSource = new Hfs( new TextLine(), "lhs.txt" );
Tap rhsSource = new Hfs( new TextLine(), "rhs.txt" );
Tap sink = new Hfs( new TextLine(), "output" );

FlowDef flowDef = new FlowDef()
  .setName( "flow-name" )
  .addSource( rhs, rhsSource )
  .addSource( lhs, lhsSource )
  .addTailSink( groupBy, sink );

Flow flow = new HadoopFlowConnector().connect( flowDef );
The example above expands on our previous pipe assembly example by creating multiple source and sink taps and planning a Flow. Note there are two branches in the pipe assembly - one named "lhs" and the other named "rhs". Internally Cascading uses those names to bind the source taps to the pipe assembly. New in 2.0, a FlowDef can be created to manage the names and taps that must be passed to a FlowConnector.
The FlowConnector constructor accepts a
java.util.Properties object so that default
Cascading and any platform-specific properties can be passed down
through the planner to the platform at runtime. In the case of Hadoop,
any relevant Hadoop
hadoop-default.xml properties may be
added - for instance, properties that tune how the underlying
MapReduce jobs run.
One of the two properties that must always be set for production applications is the application Jar class or Jar path.
Example 3.14. Configuring the Application Jar
Properties properties = new Properties();

// pass in the class name of your application
// this will find the parent jar at runtime
AppProps.setApplicationJarClass( properties, Main.class );

// ALTERNATIVELY ...

// pass in the path to the parent jar
AppProps.setApplicationJarPath( properties, pathToJar );

// pass properties to the connector
FlowConnector flowConnector = new HadoopFlowConnector( properties );
More information on packaging production applications can be found in Executing Processes.
Note the pattern of using a static property-setter method on
AppProps. Since a FlowConnector can be reused,
any properties passed on the constructor will be handed to all the
Flows it is used to create. If Flows need to be created with different
default properties, a new FlowConnector will need to be instantiated
with those properties, or properties will need to be set on a given
Flow instance directly.
When a Flow participates in a Cascade, the
Flow.isSkipFlow() method is consulted before calling
Flow.start() on the flow. The result is
based on the Flow's skip strategy.
By default, isSkipFlow() returns true if any
of the sinks are stale - i.e., the sinks don't exist or the resources
are older than the sources. However, the strategy can be changed via the
Cascade.setFlowSkipStrategy() method, which can
be called before or after a particular Flow
instance has been created.
Cascading provides a choice of two standard skip strategies:
FlowSkipIfSinkNotStale

This strategy is the default. Sinks are treated as stale if they don't exist
or the sink resources are older than the sources. If the
SinkMode for the sink tap is REPLACE, then the tap is treated as stale.

FlowSkipIfSinkExists

This strategy skips the Flow if the sink tap exists, regardless of
age. If the SinkMode for the sink tap is
REPLACE, then the tap is treated as stale.
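The staleness test behind the default strategy can be sketched in plain Java. This is an illustrative stand-in, not Cascading's implementation; the class and method names below are hypothetical:

```java
import java.io.File;

// Hypothetical sketch of the staleness rule used by the default skip
// strategy: a sink is stale if it is missing, or if any source was
// modified after the sink was last written.
public class SinkStaleness
  {
  public static boolean isSinkStale( File sink, File... sources )
    {
    if( !sink.exists() )
      return true; // a missing sink must be (re)built

    for( File source : sources )
      if( source.lastModified() > sink.lastModified() )
        return true; // a source changed after the sink was written

    return false; // sink is up to date, the Flow may be skipped
    }
  }
```

A stale sink means the Flow runs; a fresh sink means it may be skipped.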
Additionally, you can implement custom skip strategies by using the
cascading.flow.FlowSkipStrategy interface.
Note that Flow.start() does not consult the
isSkipFlow() method, and consequently
always tries to start the Flow if called. It is up to the user code to
call isSkipFlow() to determine whether the
current strategy indicates that the Flow should be skipped.
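The shape of a custom strategy can be sketched with a stand-in interface. The real interface is cascading.flow.FlowSkipStrategy, whose exact signature should be checked against the Javadoc; the names below are illustrative only:

```java
// stand-in for cascading.flow.FlowSkipStrategy so this sketch is
// self-contained; the real interface is consulted with the Flow instance
interface SkipStrategy
  {
  boolean skipFlow( Object flow );
  }

// a custom strategy that never skips, forcing every Flow to run
class FlowSkipNever implements SkipStrategy
  {
  public boolean skipFlow( Object flow )
    {
    return false;
    }
  }
```

Such a strategy would be installed via the setFlowSkipStrategy() methods described above.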
If a MapReduce job already exists and needs to be managed by a
Cascade, then the MapReduceFlow class
should be used. To do this, after creating a Hadoop
JobConf instance, simply pass it into the
MapReduceFlow constructor. The resulting
Flow instance can be used like any other Flow instance.
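A sketch of this pattern, assuming Hadoop's classic org.apache.hadoop.mapred API and the Cascading Hadoop planner are on the classpath; the job name, paths, and the exact MapReduceFlow constructor signature are assumptions to check against the Javadoc:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

import cascading.flow.Flow;
import cascading.flow.hadoop.MapReduceFlow;

// configure the raw MapReduce job as usual
JobConf jobConf = new JobConf();
jobConf.setJobName( "existing-mr-job" );
// ... set mapper, reducer, and key/value classes here ...
FileInputFormat.setInputPaths( jobConf, new Path( "input" ) );
FileOutputFormat.setOutputPath( jobConf, new Path( "output" ) );

// wrap the JobConf so the job can be scheduled alongside other Flows
Flow flow = new MapReduceFlow( "mr-flow", jobConf );
```

The wrapped job can then be handed to a Cascade like any planned Flow.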
Any custom Class can be treated as a Flow if given the correct
annotations. Riffle is a set of Java annotations that identify
specific methods on a class as providing specific life-cycle and
dependency functionality. For more information, see the Riffle
documentation and examples. To use with Cascading, a Riffle-annotated
instance must be passed to the
ProcessFlow constructor. The resulting
ProcessFlow instance can
be used like any other Flow instance.
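Wrapping a Riffle-annotated object might look like the following sketch. MyRiffleJob is a hypothetical class, and the ProcessFlow constructor arguments and package should be checked against the Cascading Javadoc:

```java
import cascading.flow.Flow;

// MyRiffleJob is a hypothetical class carrying Riffle annotations
MyRiffleJob job = new MyRiffleJob();

// wrap it so it can be scheduled within a Cascade like any other Flow
Flow flow = new ProcessFlow( "riffle-flow", job );
```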
Since many algorithms need to perform multiple passes over a given data set, a Riffle-annotated Class can be written that internally creates Cascading Flows and executes them until no more passes are needed. This is like nesting Flows or Cascades in a parent Flow, which in turn can participate in a Cascade.
Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.