When pipe assemblies are bound to source and sink Taps, a Flow is created. Flows are executable in the sense that once created they can be "started" and will begin execution on a configured Hadoop cluster.
Think of a Flow as a data processing workflow that reads data from sources, processes the data as defined by the pipe assembly, and writes data to the sinks. Input source data does not need to exist when the Flow is created, but it must exist when the Flow is executed (unless executed as part of a Cascade, see Cascades).
The most common pattern is to create a Flow from an existing pipe assembly. But there are cases where a MapReduce job has already been created and it makes sense to encapsulate it in a Flow class so that it may participate in a Cascade and be scheduled with other Flow instances. Alternatively, via the Riffle annotations, third-party applications can participate in a Cascade, and complex algorithms that result in iterative Flow executions can be encapsulated as a single Flow. All of these patterns are covered here.
Example 3.11. Creating a new Flow
Flow flow = new FlowConnector().connect( "flow-name", source, sink, pipe );
To create a Flow, it must be planned through the FlowConnector object. The connect() method is used to create new Flow instances based on a set of sink Taps, source Taps, and a pipe assembly. The example above is quite trivial.
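Once planned, a Flow can be run directly. A minimal sketch using the start() and complete() methods discussed later in this section:

// complete() starts the Flow and blocks until execution finishes
flow.complete();

// alternatively, start() returns immediately and the Flow runs in the background
// flow.start();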
Example 3.12. Binding Taps in a Flow
// the "left hand side" assembly head Pipe lhs = new Pipe( "lhs" ); lhs = new Each( lhs, new SomeFunction() ); lhs = new Each( lhs, new SomeFilter() ); // the "right hand side" assembly head Pipe rhs = new Pipe( "rhs" ); rhs = new Each( rhs, new SomeFunction() ); // joins the lhs and rhs Pipe join = new CoGroup( lhs, rhs ); join = new Every( join, new SomeAggregator() ); Pipe groupBy = new GroupBy( join ); groupBy = new Every( groupBy, new SomeAggregator() ); // the tail of the assembly groupBy = new Each( groupBy, new SomeFunction() ); Tap lhsSource = new Hfs( new TextLine(), "lhs.txt" ); Tap rhsSource = new Hfs( new TextLine(), "rhs.txt" ); Tap sink = new Hfs( new TextLine(), "output" ); Map<String, Tap> sources = new HashMap<String, Tap>(); sources.put( "lhs", lhsSource ); sources.put( "rhs", rhsSource ); Flow flow = new FlowConnector().connect( "flow-name", sources, sink, groupBy );
The example above expands on our previous pipe assembly example by creating source and sink Taps and planning a Flow. Note there are two branches in the pipe assembly, one named "lhs" and the other "rhs". Cascading uses those names to bind the source Taps to the pipe assembly. A HashMap of names and taps must be passed to FlowConnector in order to bind Taps to branches.
Since there is only one tail, the "groupBy" pipe, we do not need to bind the sink to a branch name. Nor do we need to pass the heads of the assembly to the FlowConnector, since it can determine the heads of the pipe assembly on the fly. When creating more complex Flows with multiple heads and tails, all Taps will need to be explicitly named, and the proper connect() method will need to be called.
The FlowConnector constructor accepts a java.util.Properties object so that default Cascading and Hadoop properties can be passed down through the planner to the Hadoop runtime. Consequently, any relevant Hadoop hadoop-default.xml properties may be added (mapred.map.tasks.speculative.execution, mapred.reduce.tasks.speculative.execution, or mapred.child.java.opts are very common).
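For example, a minimal sketch of disabling speculative execution and raising the child JVM heap (the property values shown are illustrative):

Properties properties = new Properties();

// disable speculative execution for map and reduce tasks
properties.setProperty( "mapred.map.tasks.speculative.execution", "false" );
properties.setProperty( "mapred.reduce.tasks.speculative.execution", "false" );

// give each child JVM a larger heap
properties.setProperty( "mapred.child.java.opts", "-Xmx512m" );

FlowConnector flowConnector = new FlowConnector( properties );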
One property that must be set for production applications is the application Jar class or Jar path.
Example 3.13. Configuring the Application Jar
Properties properties = new Properties();

// pass in the class name of your application
// this will find the parent jar at runtime
FlowConnector.setApplicationJarClass( properties, Main.class );

// or pass in the path to the parent jar
FlowConnector.setApplicationJarPath( properties, pathToJar );

FlowConnector flowConnector = new FlowConnector( properties );
More information on packaging production applications can be found in Executing Processes.
Note the pattern of using a static property setter method (cascading.flow.FlowConnector.setApplicationJarPath). Other classes that can be used to set properties are cascading.flow.MultiMapReducePlanner and cascading.flow.Flow.
Since the FlowConnector can be reused, any properties passed to its constructor will be handed to all the Flows it is used to create. If Flows need to be created with different default properties, a new FlowConnector will need to be instantiated with those properties.
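For instance, a sketch of two connectors with different defaults (the property values here are illustrative):

Properties devProperties = new Properties();
devProperties.setProperty( "mapred.child.java.opts", "-Xmx256m" );

Properties prodProperties = new Properties();
prodProperties.setProperty( "mapred.child.java.opts", "-Xmx1024m" );

// each connector hands its own defaults to every Flow it creates
FlowConnector devConnector = new FlowConnector( devProperties );
FlowConnector prodConnector = new FlowConnector( prodProperties );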
When a Flow participates in a Cascade, the Flow#isSkip() method is consulted before calling Flow#start() on the flow. By default, isSkip() returns true if any of the sinks are stale in relation to the Flow sources, that is, if they do not exist or are older than the sources.
This behavior is pluggable through the cascading.flow.FlowSkipStrategy interface. A new strategy can be set on a Flow instance after it is created.
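For example, a minimal sketch of a custom strategy that never skips; the single skipFlow() method follows the FlowSkipStrategy interface:

public class FlowSkipNever implements FlowSkipStrategy
  {
  public boolean skipFlow( Flow flow ) throws IOException
    {
    // always report the Flow as runnable, regardless of sink state
    return false;
    }
  }

// install the strategy on an existing Flow instance
flow.setFlowSkipStrategy( new FlowSkipNever() );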
The cascading.flow.FlowSkipIfSinkStale strategy is the default strategy. Sinks are stale if they don't exist or the resources are older than the sources. If the SinkMode for the sink Tap is REPLACE, then the Tap will be treated as stale.
The cascading.flow.FlowSkipIfSinkExists strategy will skip a Flow if the sink Tap exists, regardless of age. If the SinkMode for the sink Tap is REPLACE, then the Tap will be treated as stale.
Note that Flow#start() and Flow#complete() will not consult the isSkip() method, and consequently will always try to start the Flow when called. It is up to user code to call isSkip() to decide whether the current strategy suggests the Flow should be skipped.
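A minimal sketch of such a guard (note that isSkip() may throw an IOException):

// when running a Flow by hand, outside a Cascade, consult isSkip() first
if( flow.isSkip() )
  System.out.println( "sinks are up to date, skipping: " + flow.getName() );
else
  flow.complete(); // runs the Flow and blocks until it finishes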
If a MapReduce job already exists and needs to be managed by a Cascade, then the cascading.flow.MapReduceFlow class should be used. After creating a Hadoop JobConf instance, just pass it into the MapReduceFlow constructor. The resulting Flow instance can be used like any other Flow.
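A minimal sketch, assuming an already configured JobConf (the job details shown are illustrative):

// configure the existing Hadoop job as usual
JobConf jobConf = new JobConf();
jobConf.setJobName( "existing-mr-job" );
// ... set mapper, reducer, input and output paths ...

// wrap the job so it can be scheduled alongside other Flows
Flow mrFlow = new MapReduceFlow( jobConf );

// the wrapped job may now participate in a Cascade
Cascade cascade = new CascadeConnector().connect( flow, mrFlow );
cascade.complete();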
Any custom Class can be treated as a Flow if given the correct Riffle annotations. Riffle is an Apache licensed set of Java Annotations that identify specific methods on a Class as providing specific life-cycle and dependency functionality. See the Riffle documentation and examples. To use with Cascading, a Riffle annotated instance must be passed to the cascading.flow.ProcessFlow constructor. The resulting ProcessFlow instance can be used like any other Flow instance.
Since many algorithms need to make multiple passes over a given data set, a Riffle annotated Class can be written that internally creates Cascading Flows and executes them until no more passes are needed. This is like nesting Flows and Cascades in a parent Flow, which in turn can participate in a Cascade.
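A rough sketch of the pattern; the annotation names follow the Riffle documentation and should be verified against the Riffle release in use:

// class-level annotation marks this as a Riffle process
@riffle.process.Process
public class IterativePasses
  {
  @riffle.process.ProcessComplete
  public void complete()
    {
    // create and run Cascading Flows here until the algorithm converges
    }

  @riffle.process.ProcessStop
  public void stop()
    {
    // stop any running Flows if asked to shut down
    }
  }

// wrap the annotated instance so it behaves like any other Flow
Flow riffleFlow = new ProcessFlow( "iterative-passes", new IterativePasses() );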