When pipe assemblies are bound to source and sink taps, a Flow is created. Flows are executable in the sense that, once created, they can be started and will execute on the specified platform. If the Hadoop platform is specified, the Flow will execute on a Hadoop cluster.
A Flow is essentially a data processing pipeline that reads data from sources, processes the data as defined by the pipe assembly, and writes data to the sinks. Input source data does not need to exist at the time the Flow is created, but it must exist by the time the Flow is executed (unless it is executed as part of a Cascade - see Cascades for more on this).
The most common pattern is to create a Flow from an existing pipe assembly. But there are cases where a MapReduce job (if running on Hadoop) has already been created, and it makes sense to encapsulate it in a Flow class so that it may participate in a Cascade and be scheduled with other Flow instances. Alternatively, via the Riffle annotations, third-party applications can participate in a Cascade, and complex algorithms that result in iterative Flow executions can be encapsulated as a single Flow. All of these patterns are covered here.
Example 3.12. Creating a new Flow
HadoopFlowConnector flowConnector = new HadoopFlowConnector();
Flow flow = flowConnector.connect( "flow-name", source, sink, pipe );
To create a Flow, it must be planned through one of the FlowConnector subclasses. In Cascading, each platform (i.e., local and Hadoop) has its own connectors. The connect() method is used to create new Flow instances based on a set of source taps, sink taps, and a pipe assembly. Above is a trivial example that uses the Hadoop-mode connector.
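For comparison, the same kind of Flow can be planned for the local platform. The following is a minimal sketch, assuming the file paths shown are placeholders; it uses the local-mode classes cascading.flow.local.LocalFlowConnector, cascading.tap.local.FileTap, and cascading.scheme.local.TextLine.
// plan the same trivial flow on the local in-memory platform
Tap source = new FileTap( new TextLine(), "input.txt" );
Tap sink = new FileTap( new TextLine(), "output" );
Pipe pipe = new Pipe( "flow-name" );
LocalFlowConnector localConnector = new LocalFlowConnector();
Flow flow = localConnector.connect( "flow-name", source, sink, pipe );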
Example 3.13. Binding taps in a Flow
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );
lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );
// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );
rhs = new Each( rhs, new SomeFunction() );
// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );
join = new Every( join, new SomeAggregator() );
Pipe groupBy = new GroupBy( join );
groupBy = new Every( groupBy, new SomeAggregator() );
// the tail of the assembly
groupBy = new Each( groupBy, new SomeFunction() );
Tap lhsSource = new Hfs( new TextLine(), "lhs.txt" );
Tap rhsSource = new Hfs( new TextLine(), "rhs.txt" );
Tap sink = new Hfs( new TextLine(), "output" );
FlowDef flowDef = new FlowDef()
.setName( "flow-name" )
.addSource( rhs, rhsSource )
.addSource( lhs, lhsSource )
.addTailSink( groupBy, sink );
Flow flow = new HadoopFlowConnector().connect( flowDef );
The example above expands on our previous pipe assembly example by creating multiple source and sink taps and planning a Flow. Note there are two branches in the pipe assembly - one named "lhs" and the other named "rhs". Internally Cascading uses those names to bind the source taps to the pipe assembly. New in 2.0, a FlowDef can be created to manage the names and taps that must be passed to a FlowConnector.
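Once planned, a Flow must be started before any data moves. As a minimal sketch using the flow instance from the example above, complete() starts the Flow and blocks until all data has been processed, while start() returns immediately:
// run the flow and wait for it to finish
flow.complete();
// alternatively, run asynchronously
// flow.start();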
The FlowConnector constructor accepts a java.util.Properties object so that default Cascading properties and any platform-specific properties can be passed down through the planner to the platform at runtime. In the case of Hadoop, any relevant Hadoop *-default.xml properties may be added. For instance, it is very common to add mapred.map.tasks.speculative.execution, mapred.reduce.tasks.speculative.execution, or mapred.child.java.opts.
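For example, speculative execution might be disabled and child JVM options set before planning any Flows. A minimal sketch; the property values shown are illustrative:
Properties properties = new Properties();
// disable speculative execution for map and reduce tasks
properties.setProperty( "mapred.map.tasks.speculative.execution", "false" );
properties.setProperty( "mapred.reduce.tasks.speculative.execution", "false" );
// set options for the child task JVMs
properties.setProperty( "mapred.child.java.opts", "-Xmx512m" );
FlowConnector flowConnector = new HadoopFlowConnector( properties );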
One of the two properties that must always be set for production applications is the application Jar class or Jar path.
Example 3.14. Configuring the Application Jar
Properties properties = new Properties();
// pass in the class name of your application
// this will find the parent jar at runtime
properties = AppProps.appProps()
.setName( "sample-app" )
.setVersion( "1.2.3" )
.addTags( "deploy:prod", "team:engineering" )
.setJarClass( Main.class ) // find jar from class
.buildProperties( properties ); // returns a copy
// ALTERNATIVELY ...
// pass in the path to the parent jar
properties = AppProps.appProps()
.setName( "sample-app" )
.setVersion( "1.2.3" )
.addTags( "deploy:prod", "team:engineering" )
.setJarPath( pathToJar ) // set jar path
.buildProperties( properties ); // returns a copy
// pass properties to the connector
FlowConnector flowConnector = new HadoopFlowConnector( properties );
More information on packaging production applications can be found in Executing Processes.
Since the FlowConnector can be reused, any properties passed on the constructor will be handed to all the Flows it is used to create. If Flows need to be created with different default properties, either a new FlowConnector must be instantiated with those properties, or properties must be set directly on a given Pipe or Tap instance via the getConfigDef() or getStepConfigDef() methods.
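A minimal sketch of the second approach; the property names and values are illustrative, and ConfigDef.Mode controls whether a value replaces, updates, or merely provides a default for any existing setting:
// visible only to the operations attached to this pipe
pipe.getConfigDef().setProperty( ConfigDef.Mode.REPLACE, "some.property", "some-value" );
// applied to the underlying job step this pipe participates in
pipe.getStepConfigDef().setProperty( ConfigDef.Mode.DEFAULT, "mapred.child.java.opts", "-Xmx512m" );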
When a Flow participates in a Cascade, the Flow.isSkipFlow() method is consulted before calling Flow.start() on the flow. The result is based on the Flow's skip strategy.
By default, isSkipFlow() returns true if any of the sinks are stale - i.e., the sinks don't exist or the resources are older than the sources. However, the strategy can be changed via the Flow.setFlowSkipStrategy() and Cascade.setFlowSkipStrategy() methods, which can be called before or after a particular Flow instance has been created.
Cascading provides a choice of two standard skip strategies:
cascading.flow.FlowSkipIfSinkNotStale - the default strategy. Sinks are treated as stale if they don't exist or the sink resources are older than the sources. If the SinkMode for the sink tap is REPLACE, then the tap is treated as stale.
cascading.flow.FlowSkipIfSinkExists - skips the Flow if the sink tap exists, regardless of age. If the SinkMode for the sink tap is REPLACE, then the tap is treated as stale.
Additionally, you can provide a custom skip strategy by implementing the interface cascading.flow.FlowSkipStrategy.
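A minimal sketch of a custom strategy; the class name is illustrative. The interface declares a single skipFlow() method that may throw IOException:
public class NeverSkipStrategy implements FlowSkipStrategy
  {
  @Override
  public boolean skipFlow( Flow flow ) throws IOException
    {
    // always report that the flow should run
    return false;
    }
  }
// install the strategy before the flow is executed
flow.setFlowSkipStrategy( new NeverSkipStrategy() );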
Note that Flow.start() does not consult the isSkipFlow() method, and consequently always tries to start the Flow when called. It is up to the user code to call isSkipFlow() to determine whether the current strategy indicates that the Flow should be skipped.
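A minimal sketch of honoring the strategy from user code; note that isSkipFlow() may throw an IOException while it inspects the source and sink taps:
// only start the flow if the current skip strategy allows it
if( !flow.isSkipFlow() )
  flow.start();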
If a MapReduce job already exists and needs to be managed by a Cascade, then the cascading.flow.hadoop.MapReduceFlow class should be used. To do this, after creating a Hadoop JobConf instance, simply pass it into the MapReduceFlow constructor. The resulting Flow instance can be used like any other Flow.
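A minimal sketch, assuming jobConf has already been fully configured with the job's mapper, reducer, and input and output paths, and that flow is a previously planned Flow:
// wrap the raw MapReduce job so it can be scheduled with other Flows
Flow mrFlow = new MapReduceFlow( "mr-flow", jobConf );
Cascade cascade = new CascadeConnector().connect( flow, mrFlow );
cascade.complete();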
Any custom Class can be treated as a Flow if given the correct Riffle annotations. Riffle is a set of Java annotations that identify specific methods on a class as providing specific life-cycle and dependency functionality. For more information, see the Riffle documentation and examples. To use with Cascading, a Riffle-annotated instance must be passed to the cascading.flow.hadoop.ProcessFlow constructor. The resulting ProcessFlow instance can be used like any other Flow instance.
Since many algorithms need to perform multiple passes over a given data set, a Riffle-annotated Class can be written that internally creates Cascading Flows and executes them until no more passes are needed. This is like nesting Flows or Cascades in a parent Flow, which in turn can participate in a Cascade.
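A minimal sketch of this pattern; the class and method names are illustrative, @ProcessComplete is one of the Riffle life-cycle annotations, and a real class would also carry Riffle's dependency annotations (consult the Riffle documentation for the full annotation set):
@riffle.process.Process
public class IterativeProcess
  {
  @ProcessComplete
  public void complete()
    {
    // plan and run internal Flows here until the algorithm converges
    }
  }
// wrap the annotated instance so it can participate in a Cascade
ProcessFlow processFlow = new ProcessFlow( "iterative", new IterativeProcess() );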
Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.