New to Cascading 2, and only supported by the Hadoop planner, is
the ability to "checkpoint" data within a Flow by using the
Checkpoint Pipe. That is, a Tuple stream can be persisted to
disk at almost any arbitrary point. Doing so forces a new FlowStep
(a MapReduce job when using Hadoop) after the checkpoint position.
By default the checkpoint is anonymous and is cleaned up immediately after the Flow completes.

This feature is useful in conjunction with a HashJoin where the small side of the join starts out extremely large but is filtered down to fit into memory before being read into the HashJoin. By forcing a checkpoint before the HashJoin, only the small, filtered version of the data is replicated across the cluster. Without the checkpoint, the full, unfiltered file would likely be replicated to every node where the FlowStep is executing.
Alternatively, checkpointing is useful for debugging when used with a checkpoint Tap, where the Tap has specified a TextDelimited Scheme without any declared Fields.
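To make the HashJoin case above concrete, the small side might be filtered and checkpointed before the join. This is a minimal sketch, not from the guide: SomeFilter, the pipe names, and the join field "id" are hypothetical placeholders, and the anonymous Checkpoint constructor is used since the persisted data need not survive the Flow.

```java
// hypothetical sketch: checkpoint the filtered small side before a HashJoin
Pipe large = new Pipe( "large" );

Pipe small = new Pipe( "small" );
small = new Each( small, new SomeFilter() ); // shrink the stream to fit in memory
small = new Checkpoint( small ); // anonymous checkpoint: forces a new FlowStep here

// only the filtered, checkpointed data is replicated to the cluster nodes
Pipe joined = new HashJoin( large, new Fields( "id" ),
                            small, new Fields( "id" ) );
```

Without the Checkpoint, the planner would replicate the original, unfiltered small-side source to every node running the join step.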
Example 8.8. Adding a Checkpoint
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );
lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );
rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );
join = new Every( join, new SomeAggregator() );

// we want to see the data passing through this point
Checkpoint checkpoint = new Checkpoint( "checkpoint", join );

Pipe groupBy = new GroupBy( checkpoint );
groupBy = new Every( groupBy, new SomeAggregator() );

// the tail of the assembly
groupBy = new Each( groupBy, new SomeFunction() );

Tap lhsSource = new Hfs( new TextLine(), "lhs.txt" );
Tap rhsSource = new Hfs( new TextLine(), "rhs.txt" );
Tap sink = new Hfs( new TextLine(), "output" );

// write all data as a tab delimited file, with headers
Tap checkpointTap = new Hfs( new TextDelimited( true, "\t" ), "checkpoint" );

FlowDef flowDef = new FlowDef()
  .setName( "flow-name" )
  .addSource( rhs, rhsSource )
  .addSource( lhs, lhsSource )
  .addTailSink( groupBy, sink )
  .addCheckpoint( checkpoint, checkpointTap ); // bind the checkpoint tap

Flow flow = new HadoopFlowConnector().connect( flowDef );
As can be seen above, we instantiate a new
Checkpoint Pipe by passing it the previous
Pipe. This will be
the point at which data is persisted. Since we wish to keep the data
after the Flow has completed, we create a
checkpointTap that saves the data as a TAB delimited text
file. We also specify that field names should be written out into a
header on the first line. Finally, the checkpointTap is bound to the
Checkpoint Pipe using the FlowDef.
Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.