8.4 Checkpointing

New in Cascading 2, and supported only by the Hadoop planner, is the ability to "checkpoint" data within a Flow by using the cascading.pipe.Checkpoint Pipe. That is, a Tuple stream can be persisted to disk at almost any point in the assembly. Doing so forces a new FlowStep (a MapReduce job when using Hadoop) after the checkpoint position.

By default the checkpoint is anonymous and is cleaned up immediately after the Flow completes. An anonymous checkpoint is useful in conjunction with a HashJoin when the small side of the join starts out extremely large but is filtered down to fit into memory before being read into the HashJoin. Forcing a checkpoint before the HashJoin means that only the small, filtered version of the data is replicated over the cluster. Without the checkpoint, it is likely that the full unfiltered file will be replicated to every node on which the FlowStep is executing.
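The saving described above can be estimated with simple arithmetic. The following is a minimal sketch in plain Java, not part of the Cascading API: the class name ReplicationEstimate, the method replicatedBytes, and all sizes and node counts are hypothetical, and the model assumes exactly one uncompressed copy of the small side is shipped to each node.

```java
// Back-of-envelope model only; every size and node count here is hypothetical.
final class ReplicationEstimate {

  // Bytes shipped when one copy of the join's small side goes to each node.
  // Assumption: exactly one uncompressed copy per node.
  static long replicatedBytes(long sideBytes, int nodes) {
    if (sideBytes < 0 || nodes < 0)
      throw new IllegalArgumentException("size and node count must be non-negative");
    return Math.multiplyExact(sideBytes, (long) nodes);
  }

  public static void main(String[] args) {
    long unfiltered = 50L * 1024 * 1024 * 1024; // 50 GiB before filtering
    long filtered = 200L * 1024 * 1024;         // 200 MiB after filtering
    int nodes = 40;

    // No checkpoint: the unfiltered data is what gets replicated.
    long withoutCheckpoint = replicatedBytes(unfiltered, nodes);
    // Checkpoint after the filter: only the filtered data is replicated.
    long withCheckpoint = replicatedBytes(filtered, nodes);

    System.out.println("without checkpoint: " + withoutCheckpoint + " bytes");
    System.out.println("with checkpoint:    " + withCheckpoint + " bytes");
    System.out.println("reduction factor:   " + (withoutCheckpoint / withCheckpoint));
  }
}
```

Under these assumptions the reduction factor is just the ratio of the unfiltered size to the filtered size, independent of the node count. The sketch ignores the one-time cost of writing and re-reading the checkpoint itself.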

Alternatively, checkpointing is useful for debugging when the Checkpoint is bound to a checkpoint Tap whose Scheme is a TextDelimited with no declared Fields, so that all data passing through the checkpoint is written out.

Example 8.8. Adding a Checkpoint

// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );

join = new Every( join, new SomeAggregator() );

// we want to see the data passing through this point
Checkpoint checkpoint = new Checkpoint( "checkpoint", join );

Pipe groupBy = new GroupBy( checkpoint );

groupBy = new Every( groupBy, new SomeAggregator() );

// the tail of the assembly
groupBy = new Each( groupBy, new SomeFunction() );

Tap lhsSource = new Hfs( new TextLine(), "lhs.txt" );
Tap rhsSource = new Hfs( new TextLine(), "rhs.txt" );

Tap sink = new Hfs( new TextLine(), "output" );

// write all data as a tab delimited file, with headers
Tap checkpointTap =
  new Hfs( new TextDelimited( true, "\t" ), "checkpoint" );

FlowDef flowDef = new FlowDef()
  .setName( "flow-name" )
  .addSource( rhs, rhsSource )
  .addSource( lhs, lhsSource )
  .addTailSink( groupBy, sink )
  .addCheckpoint( checkpoint, checkpointTap ); // bind the checkpoint tap

Flow flow = new HadoopFlowConnector().connect( flowDef );

As can be seen above, we instantiate a new Checkpoint Pipe by passing it the previous Every Pipe. This is the point at which data is persisted. Since we wish to keep the data after the Flow has completed, we create a checkpointTap that saves the data as a tab-delimited text file. Passing true to the TextDelimited constructor also specifies that the field names should be written out as a header line. Finally, the Tap is bound to the Checkpoint Pipe using the FlowDef.

Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.