New to Cascading 2, and only supported by the Hadoop planner, is the ability to "checkpoint" data within a Flow by using the cascading.pipe.Checkpoint Pipe. That is, a Tuple stream can be persisted to disk at nearly any arbitrary point in the assembly. Doing so forces a new FlowStep (a MapReduce job when using Hadoop) after the checkpoint position.
By default a checkpoint is anonymous and is cleaned up immediately after the Flow completes.

This feature is useful in conjunction with a HashJoin where the small side of the join starts out extremely large but is filtered down to fit into memory before being read into the HashJoin. By forcing a checkpoint before the HashJoin, only the small, filtered version of the data is replicated across the cluster. Without the checkpoint, the full, unfiltered file would likely be replicated to every node on which the FlowStep executes.
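For example, a checkpoint could be placed after the filter on the small side, immediately before the HashJoin. The following is a minimal sketch only; SomeFilter is a placeholder (as elsewhere in this guide) and the join Fields are assumed:

// the large, streamed side of the join
Pipe large = new Pipe( "large" );

// the small side, filtered down before being accumulated into memory
Pipe small = new Pipe( "small" );
small = new Each( small, new SomeFilter() );

// force a new FlowStep here so only the filtered Tuples are replicated
small = new Checkpoint( small ); // anonymous, cleaned up after the Flow completes

// the small (checkpointed) side is accumulated into memory by the HashJoin
Pipe join = new HashJoin( large, new Fields( "id" ), small, new Fields( "id" ) );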
Alternatively, checkpointing is useful for debugging when used with an explicit checkpoint Tap, where the Tap specifies a TextDelimited Scheme without any declared Fields, as the following example demonstrates.
Example 8.8. Adding a Checkpoint
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );
lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );
// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );
rhs = new Each( rhs, new SomeFunction() );
// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );
join = new Every( join, new SomeAggregator() );
// we want to see the data passing through this point
Checkpoint checkpoint = new Checkpoint( "checkpoint", join );
Pipe groupBy = new GroupBy( checkpoint );
groupBy = new Every( groupBy, new SomeAggregator() );
// the tail of the assembly
groupBy = new Each( groupBy, new SomeFunction() );
Tap lhsSource = new Hfs( new TextLine(), "lhs.txt" );
Tap rhsSource = new Hfs( new TextLine(), "rhs.txt" );
Tap sink = new Hfs( new TextLine(), "output" );
// write all data as a tab delimited file, with headers
Tap checkpointTap =
  new Hfs( new TextDelimited( true, "\t" ), "checkpoint" );

FlowDef flowDef = new FlowDef()
  .setName( "flow-name" )
  .addSource( rhs, rhsSource )
  .addSource( lhs, lhsSource )
  .addTailSink( groupBy, sink )
  .addCheckpoint( checkpoint, checkpointTap ); // bind the checkpoint tap
Flow flow = new HadoopFlowConnector().connect( flowDef );
As can be seen above, we instantiate a new Checkpoint pipe by passing it the previous Every pipe. This will be the point at which data is persisted. Since we wish to keep the data after the Flow has completed, we create a checkpointTap that saves the data as a TAB-delimited text file. We also specify, via the TextDelimited constructor, that field names should be written out as a header. Finally, the Tap is bound to the Checkpoint pipe using the FlowDef.
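Once connected, the Flow is run in the usual way; because the checkpoint Tap was bound explicitly, the TAB-delimited data in the "checkpoint" directory remains available for inspection after the Flow completes. A minimal usage sketch:

// run the Flow to completion; the checkpointed data is left behind in "checkpoint"
flow.complete();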