Cascading 3.0 User Guide - Advanced Processing

Advanced Processing


SubAssemblies

In Cascading, a SubAssembly is a reusable pipe assembly that can be joined with other instances of a SubAssembly to form a larger pipe assembly. SubAssemblies are much like subroutines in a larger program. SubAssemblies are a good way to organize complex pipe assemblies, and they allow for commonly used pipe assemblies to be packaged into libraries for inclusion in other projects by other users.

Many prebuilt SubAssemblies are available in the core Cascading library. See Built-in SubAssemblies for details.

To create a SubAssembly, subclass the cascading.pipe.SubAssembly class.

Example 1. Creating a SubAssembly
public class SomeSubAssembly extends SubAssembly
  {
  public SomeSubAssembly( Pipe lhs, Pipe rhs )
    {
    // must register incoming pipes
    setPrevious( lhs, rhs );

    // continue assembling against lhs
    lhs = new Each( lhs, new SomeFunction() );
    lhs = new Each( lhs, new SomeFilter() );

    // continue assembling against rhs
    rhs = new Each( rhs, new SomeFunction() );

    // joins the lhs and rhs
    Pipe join = new CoGroup( lhs, rhs );

    join = new Every( join, new SomeAggregator() );

    join = new GroupBy( join );

    join = new Every( join, new SomeAggregator() );

    // the tail of the assembly
    join = new Each( join, new SomeFunction() );

    // must register all assembly tails
    setTails( join );
    }
  }

Notice that in Example 1:

  1. The pipes to be configured and joined are passed in as parameters with the constructor.

  2. The incoming pipes are registered.

  3. The pipes are joined to form a pipe assembly (a tail).

  4. The tail is registered.

Example 2 demonstrates how to include a SubAssembly in a new pipe assembly.

Example 2. Using a SubAssembly
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

// our custom SubAssembly
Pipe pipe = new SomeSubAssembly( lhs, rhs );

pipe = new Each( pipe, new SomeFunction() );

In a SubAssembly that represents a split — that is, a SubAssembly with two or more tails — you can use the getTails() method to access the array of tails set internally by the setTails() method.

Example 3. Creating a split SubAssembly
public class SplitSubAssembly extends SubAssembly
  {
  public SplitSubAssembly( Pipe pipe )
    {
    // must register incoming pipe
    setPrevious( pipe );

    // continue assembling against pipe
    pipe = new Each( pipe, new SomeFunction() );

    // first branch of the split
    Pipe lhs = new Pipe( "lhs", pipe );
    lhs = new Each( lhs, new SomeFunction() );

    // second branch of the split
    Pipe rhs = new Pipe( "rhs", pipe );
    rhs = new Each( rhs, new SomeFunction() );

    // must register all assembly tails
    setTails( lhs, rhs );
    }
  }

Example 4. Using a split SubAssembly
// the assembly head
Pipe head = new Pipe( "head" );

// our custom SubAssembly
SubAssembly pipe = new SplitSubAssembly( head );

// grab the split branches
Pipe lhs = new Each( pipe.getTails()[ 0 ], new SomeFunction() );
Pipe rhs = new Each( pipe.getTails()[ 1 ], new SomeFunction() );

To rephrase, if a SubAssembly does not split the incoming Tuple stream, the SubAssembly instance can be passed directly to the next Pipe instance. But, if the SubAssembly splits the stream into multiple branches, handles will be needed to access them. The solution is to pass each branch tail to the setTails() method and to call the getTails() method to get handles for the desired branches. The handles can be passed to subsequent instances of Pipe.

Stream Assertions


Assertion pipes can be inserted into a pipe assembly between other pipes, between pipes and taps, or both.

Stream assertions are simply a mechanism for asserting that one or more values in a Tuple stream meet certain criteria. This is similar to the Java language assert keyword or a unit test. Common examples are assert not null and assert matches.

Assertions are treated like any other function or aggregator in Cascading. They are embedded directly into the pipe assembly by the developer. By default, if an assertion fails, the processing fails. As an alternative, an assertion failure can be caught by a failure Trap.

Assertions may be more, or less, desirable in different contexts. For this reason, stream assertions can be treated as either "strict" or "validating." Strict assertions make sense when running tests against regression data. Such data sets should be small and should represent many of the edge cases that the processing assembly must robustly support. Validating assertions, on the other hand, make more sense when running tests in staging or when using data that may vary in quality due to an unmanaged source.

And of course there are cases where assertions are unnecessary because they only would impede processing.

Cascading can be instructed to plan out (i.e., omit) strict assertions (leaving the validating assertions) or both strict and validating assertions when building the Flow. For optimal performance, Cascading actually leaves the undesired assertions out of the final Flow rather than merely disabling them.

Example 5. Adding assertions
// incoming -> "ip", "time", "method", "event", "status", "size"

AssertNotNull notNull = new AssertNotNull();
assembly = new Each( assembly, AssertionLevel.STRICT, notNull );

AssertSizeEquals equals = new AssertSizeEquals( 6 );
assembly = new Each( assembly, AssertionLevel.STRICT, equals );

AssertMatchesAll matchesAll = new AssertMatchesAll( "(GET|HEAD|POST)" );
assembly = new Each( assembly, new Fields( "method" ),
  AssertionLevel.STRICT, matchesAll );

// outgoing -> "ip", "time", "method", "event", "status", "size"

Again, assertions are added to a pipe assembly like any other operation, except that the AssertionLevel must be set to tell the planner how to treat the assertion during planning.

Example 6. Planning out assertions
// FlowDef is a fluent way to define a Flow
FlowDef flowDef = new FlowDef();

// bind the taps and pipes
flowDef
  .addSource( assembly.getName(), source )
  .addSink( assembly.getName(), sink )
  .addTail( assembly );

// removes all assertions from the Flow
flowDef
  .setAssertionLevel( AssertionLevel.NONE );

Flow flow = new Hadoop2MR1FlowConnector().connect( flowDef );

To configure the planner to remove some or all assertions, a property can be set via the FlowConnectorProps.setAssertionLevel() method or directly on the FlowDef instance. An example of setting the FlowDef instance is shown in Example 6.

Assertion-level properties

AssertionLevel.NONE
Removes all assertions.

AssertionLevel.VALID
Retains VALID assertions but removes STRICT ones.

AssertionLevel.STRICT
Retains all assertions (the Cascading planner default value).
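
The pruning rule behind these levels can be modelled in a few lines of plain Java. The sketch below is not Cascading code: the Level enum and the retained() method are hypothetical names that only mirror the behavior described above (NONE removes everything, VALID keeps validating assertions only, STRICT keeps all).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the planner's pruning rule; not the Cascading API.
final class AssertionPlanSketch
  {
  // ordered from least to most strict, mirroring NONE < VALID < STRICT
  enum Level
    {
    NONE, VALID, STRICT
    }

  // returns the levels of the assertions that would remain in the planned Flow
  static List<Level> retained( Level plannerLevel, List<Level> assertions )
    {
    List<Level> kept = new ArrayList<Level>();

    for( Level assertion : assertions )
      {
      // an assertion survives only if it is no stricter than the planner level
      if( plannerLevel != Level.NONE && assertion.compareTo( plannerLevel ) <= 0 )
        kept.add( assertion );
      }

    return kept;
    }

  public static void main( String[] args )
    {
    List<Level> assembly = Arrays.asList( Level.STRICT, Level.VALID, Level.STRICT );

    // show what each planner level would leave in the assembly
    for( Level level : Level.values() )
      System.out.println( level + " -> " + retained( level, assembly ) );
    }
  }
```

In the sketch, pruned assertions are simply absent from the returned list, which corresponds to the planner leaving them out of the Flow instead of disabling them at run time.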

Failure Traps

Cascading provides the ability to trap the data and associated diagnostics that cause Java exceptions to be thrown from an Operation or Tap.

Typically if an exception is thrown cluster side, Cascading stops the complete executing Flow and forces the Flow.complete() method to throw an exception on the client side. Obviously if this exception is not handled, the client application will exit.

To prevent the shutdown, a trap Tap can be bound to whole branches. When an exception is encountered, the argument data is saved to the location specified by the trap Tap, including any specific diagnostic fields that may aid in resolving persistent issues.

The following diagram shows the use of traps in a pipe assembly.


Failure Traps are similar to tap sinks (as opposed to tap sources) in that they allow data to be stored. The difference is that Tap sinks are bound to a particular tail pipe in a pipe assembly and are the primary outlet of a branch in a pipe assembly. Traps can be bound to intermediate pipe assembly branches, but they only capture data that cause an Operation to fail (those that throw an exception).

Whenever an operation fails and throws an exception, if there is an associated trap, the offending Tuple is saved to the resource specified by the trap Tap. This allows the job to continue processing, while saving any "bad" data for future inspection.

By design, clusters are hardware fault-tolerant - lose a node, and the cluster continues working. But fault tolerance for software is a little different. Failure Traps provide a means for the processing to continue without losing track of the data that caused the fault. For high-fidelity applications, this may not be very useful, since you likely will want any errors during processing to cause the application to stop. But for low-fidelity applications, such as webpage indexing, where skipping a page or two out of a few million is acceptable, this can dramatically improve processing reliability.
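
The trap behavior described above can be illustrated without a cluster. The following is a plain-Java model, not Cascading code: TrapSketch, parseSize() and the two lists are hypothetical stand-ins for an Operation, a sink and a trap Tap. It shows only the idea that a failing record is set aside with a diagnostic while processing continues.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of trap behavior; class and method names are hypothetical.
final class TrapSketch
  {
  final List<Integer> results = new ArrayList<Integer>(); // plays the role of the sink
  final List<String> trapped = new ArrayList<String>();   // plays the role of the trap

  // the "operation": parses a size value, throwing on malformed input
  static int parseSize( String value )
    {
    return Integer.parseInt( value.trim() );
    }

  void process( List<String> values )
    {
    for( String value : values )
      {
      try
        {
        results.add( parseSize( value ) );
        }
      catch( RuntimeException exception )
        {
        // save the offending argument plus a diagnostic, then keep going
        trapped.add( value + "\t" + exception.getClass().getSimpleName() );
        }
      }
    }

  public static void main( String[] args )
    {
    TrapSketch sketch = new TrapSketch();

    sketch.process( Arrays.asList( "512", "n/a", " 2048 " ) );

    System.out.println( "sink: " + sketch.results );
    System.out.println( "trap: " + sketch.trapped );
    }
  }
```

Without the catch block, the first malformed value would end the whole run, which is the default behavior a trap is meant to avoid.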

Example 7. Setting traps
// ...some useful pipes here

// name this pipe assembly segment
assembly = new Pipe( "assertions", assembly );

AssertNotNull notNull = new AssertNotNull();
assembly = new Each( assembly, AssertionLevel.STRICT, notNull );

AssertSizeEquals equals = new AssertSizeEquals( 6 );
assembly = new Each( assembly, AssertionLevel.STRICT, equals );

AssertMatchesAll matchesAll = new AssertMatchesAll( "(GET|HEAD|POST)" );
Fields method = new Fields( "method" );
assembly =
  new Each( assembly, method, AssertionLevel.STRICT, matchesAll );

// ...some more useful pipes here

FlowDef flowDef = new FlowDef();

flowDef
  .setName( "log-parser" )
  .addSource( "logs", source )
  .addTailSink( assembly, sink );

// set the trap on the "assertions" branch
flowDef
  .addTrap( "assertions", trap );

FlowConnector flowConnector = new Hadoop2MR1FlowConnector();
Flow flow =
  flowConnector.connect( flowDef );

The example above binds a trap Tap to the pipe assembly segment named "assertions." Note how we can name branches and segments by using a single Pipe instance. The naming applies to all subsequent Pipe instances.

Traps are for exceptional cases, in the same way that Java Exception handling is. Traps are not intended for application flow control, and they are not a means to filter some data into other locations. Applications that need to filter out bad data should do so explicitly, using filters. For more on this, see Handling Good and Bad Data.

Optionally, the following diagnostic information may be captured along with the argument Tuple values.

  • element-trace - the file and line number in which the failed operation was instantiated

  • throwable-message - the Throwable#getMessage() value

  • throwable-stacktrace - the "cleansed" Throwable#printStackTrace()

See the cascading.tap.TrapProps Javadoc for more details.


Checkpointing

Checkpointing is the ability to collapse a tuple stream within a Flow at any point as a way to improve the reliability or performance of a Flow. This is accomplished by using the cascading.pipe.Checkpoint Pipe.

Checkpointing forces all tuple stream data to be written to disk, shared filesystem, or some other proprietary means provided by the underlying platform. The data is written at the end of a Pipe, prior to processing of the next Pipe in a stream.

By default a Checkpoint is anonymous and is cleaned up immediately after the Flow completes.

This feature is useful when used in conjunction with a HashJoin where the small side of the join starts out extremely large but is filtered down to fit into memory before being read into the HashJoin. By forcing a Checkpoint before the HashJoin, only the small filtered version of the data is replicated over the cluster. Without the Checkpoint, it is likely that the full, unfiltered file will be replicated to every node on which the pipe assembly is executing.

On some platforms, checkpointing can allow for a Flow to be restarted after a transient failure. See Restarting a Checkpointed Flow below.

Alternatively, checkpointing is useful for debugging when used with a Checkpoint Tap, where the Tap has specified a TextDelimited Scheme without any declared Fields.

Example 8. Adding a Checkpoint
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );

join = new Every( join, new SomeAggregator() );

// we want to see the data passing through this point
Checkpoint checkpoint = new Checkpoint( "checkpoint", join );

Pipe groupBy = new GroupBy( checkpoint );

groupBy = new Every( groupBy, new SomeAggregator() );

// the tail of the assembly
groupBy = new Each( groupBy, new SomeFunction() );

Tap lhsSource = new Hfs( new TextLine(), "lhs.txt" );
Tap rhsSource = new Hfs( new TextLine(), "rhs.txt" );

Tap sink = new Hfs( new TextLine(), "output" );

// write all data as a tab delimited file, with headers
Tap checkpointTap =
  new Hfs( new TextDelimited( true, "\t" ), "checkpoint" );

FlowDef flowDef = new FlowDef()
  .setName( "flow-name" )
  .addSource( rhs, rhsSource )
  .addSource( lhs, lhsSource )
  .addTailSink( groupBy, sink )
  .addCheckpoint( checkpoint, checkpointTap ); // bind the checkpoint tap

Flow flow = new Hadoop2MR1FlowConnector().connect( flowDef );

As can be seen above, we instantiate a new Checkpoint Pipe by passing it the previous Every Pipe. This will be the point at which data is persisted.

Example 8 is for running Cascading on the Hadoop platform. Cascading in local mode ignores Checkpoint pipes.

In Example 8:

  1. A checkpointTap that saves the data as a tab-delimited text file is created to keep the data after the Flow has completed.

  2. The first argument of the TextDelimited constructor (true) specifies that field names should be written out as a header line.

  3. The Tap is bound to the Checkpoint Pipe using the FlowDef.

Using a TextDelimited file as an intermediate representation within a Flow may result in subtle coercion errors when field types are not provided consistently and when dealing with complex (nonprimitive) data types.

Restarting a Checkpointed Flow

When using Checkpoint pipes in a Flow and the Flow fails, a future execution of the Flow can be restarted after the last successful FlowStep writing to a Checkpoint file. In other words, a Flow will only restart from the last Checkpoint Pipe location.

This feature requires that the following conditions are met:

  • The failed Flow is planned with a runID string value set on the FlowDef.

  • The restarted Flow uses the same runID string value as the failed Flow used.

  • The restarted Flow should be (roughly) equivalent to the previous, failed attempt — see the cautions below.

Restartable Flows are only supported by some platforms.

Example 9. Setting runID
FlowDef flowDef = new FlowDef()
  .setName( "flow-name" )
  .addSource( rhs, rhsSource )
  .addSource( lhs, lhsSource )
  .addTailSink( groupBy, sink )
  .addCheckpoint( checkpoint, checkpointTap )
  .setRunID( "some-unique-value" ); // re-use this id to restart this flow

Flow flow = new Hadoop2MR1FlowConnector().connect( flowDef );

The example above is for Cascading running on the Hadoop platform. Cascading in local mode ignores Checkpoint pipes.

Caution should be used when using restarted Checkpoint Flows. If the input data has changed or the pipe assembly has significantly been altered, the Flow may fail or there may be undetectable errors.

Note that when using a runID, all Flow instances must use a unique value except for those that attempt to restart the Flow. The runID value is used to scope the directories for the temporary checkpoint files to prevent file name collisions.

On successful completion of a Flow with a runID, any temporary checkpoint files are removed.

Flow and Cascade Event Handling

Each Flow and Cascade has the ability to execute callbacks via an event listener. This ability is useful when an external application needs to be notified that a Flow or Cascade has started, halted, completed, or thrown an exception.

For instance, at the completion of a Flow that runs on an Amazon EC2 Hadoop cluster, an Amazon SQS message can be sent to notify another application to fetch the job results from S3 or begin the shutdown of the cluster.

Flows support event listeners through the cascading.flow.FlowListener interface. Cascades support event listeners through the cascading.cascade.CascadeListener interface. Both interfaces support four events:


onStarting
The onStarting event begins when a Flow or Cascade instance receives the start() message.

onStopping
The onStopping event begins when a Flow or Cascade instance receives the stop() message.

onCompleted
The onCompleted event begins when a Flow or Cascade instance has completed all work, regardless of success or failure. If an exception was thrown, onThrowable will be called before this event.

Success or failure can be tested on the given Flow instance via flow.getFlowStats().getStatus().

onThrowable
The onThrowable event begins if any internal job client throws a Throwable type. This Throwable is passed as an argument to the event. onThrowable should return true if the given throwable was handled, and should not be thrown again from the Flow.complete() or Cascade.complete() methods.
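
The ordering of these callbacks can be modelled in plain Java. The sketch below does not use Cascading's FlowListener or CascadeListener: the Listener interface, the run() method and the Recorder class are hypothetical, and onStopping is left out for brevity. It only illustrates that onThrowable fires before onCompleted and that returning true suppresses the rethrow.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the callback order; these are not Cascading's own types.
final class ListenerSketch
  {
  interface Listener
    {
    void onStarting();

    void onCompleted();

    boolean onThrowable( Throwable throwable ); // true means "handled"
    }

  // runs the work, firing callbacks; rethrows only if the listener did not handle the failure
  static void run( Runnable work, Listener listener )
    {
    listener.onStarting();

    RuntimeException unhandled = null;

    try
      {
      work.run();
      }
    catch( RuntimeException exception )
      {
      if( !listener.onThrowable( exception ) )
        unhandled = exception;
      }

    // fired on success and on failure alike, after onThrowable
    listener.onCompleted();

    if( unhandled != null )
      throw unhandled;
    }

  // records each event name so the ordering can be inspected
  static final class Recorder implements Listener
    {
    final List<String> events = new ArrayList<String>();
    final boolean handle;

    Recorder( boolean handle )
      {
      this.handle = handle;
      }

    public void onStarting()
      {
      events.add( "onStarting" );
      }

    public void onCompleted()
      {
      events.add( "onCompleted" );
      }

    public boolean onThrowable( Throwable throwable )
      {
      events.add( "onThrowable" );
      return handle;
      }
    }

  public static void main( String[] args )
    {
    Recorder recorder = new Recorder( true );

    run( () -> { throw new IllegalStateException( "simulated failure" ); }, recorder );

    System.out.println( recorder.events );
    }
  }
```

A listener that sends a notification, such as the SQS message mentioned earlier, would typically do so from its completion callback, since that one fires whether or not the work succeeded.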


PartitionTap

The PartitionTap Tap class provides a simple means to break large data sets into smaller sets based on data item values. This is also commonly called binning the data, where each "bin" of data is named after some data value(s) shared by the members of that bin. For example, this is a simple way to organize log files by month and year.

Example 10. PartitionTap
TextDelimited scheme =
  new TextDelimited( new Fields( "entry" ), "\t" );
FileTap parentTap = new FileTap( scheme, path );

// dirs named "[year]-[month]"
DelimitedPartition partition = new DelimitedPartition( new Fields( "year", "month" ), "-" );
Tap monthsTap = new PartitionTap( parentTap, partition, SinkMode.REPLACE );

In the example above, a parent FileTap tap is constructed and passed to the constructor of a PartitionTap instance, along with a cascading.tap.partition.DelimitedPartition "partitioner".

If more complex path formatting is necessary, you may implement the cascading.tap.partition.Partition interface.

It is important to see in the above example that the parentTap only sinks "entry" fields to a text-delimited file. But the monthsTap expects "year", "month", and "entry" fields from the tuple stream.

When the PartitionTap is a sink, the "year" and "month" values are stored in the directory name of each partition, so there is no need to store them redundantly in the text-delimited file. When reading from a PartitionTap, the directory name is parsed and its values are added to the outgoing tuple stream.
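
The round trip between partition values and a directory name can be sketched in plain Java. This is a model of the idea only, not the DelimitedPartition implementation: PartitionPathSketch, toPartition() and toValues() are hypothetical names, and the sketch assumes that no value contains the delimiter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Plain-Java model of a delimited partition; names are hypothetical, not the Cascading API.
final class PartitionPathSketch
  {
  // sink side: join the partition values into a directory name
  static String toPartition( String delimiter, String... values )
    {
    return String.join( delimiter, values );
    }

  // source side: split the directory name back into the partition values
  static List<String> toValues( String delimiter, String partition )
    {
    // quote the delimiter so it is matched literally, not as a regular expression
    return Arrays.asList( partition.split( Pattern.quote( delimiter ) ) );
    }

  public static void main( String[] args )
    {
    String directory = toPartition( "-", "2014", "05" );

    System.out.println( directory + " -> " + toValues( "-", directory ) );
    }
  }
```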

One last thing to keep in mind is where writing happens when executing on a cluster. By doing a GroupBy on the values used to define the partition, binning will happen during the grouping (reducer or partitioning) phase, and will likely scale much better in cases where there are a very large number of unique partitions that will result in a large number of directories or files.

Partial Aggregation instead of Combiners

Cascading implements a mechanism to perform partial aggregations in order to reduce the amount of transmitted data so that a complete aggregation can be completed downstream. This implementation allows any aggregate function to be implemented, not just associative and commutative functions.

Cascading provides a few built-in partial aggregate operations, including AverageBy, CountBy, SumBy, and FirstBy. These are actually SubAssemblies, not Operations, and are subclasses of the AggregateBy SubAssembly. For more on this, see the section on AggregateBy.

Using partial aggregate operations is quite easy. They are actually less verbose than a standard Aggregate operation.

Example 11. Using a SumBy
Pipe assembly = new Pipe( "assembly" );

// ...
Fields groupingFields = new Fields( "date" );
Fields valueField = new Fields( "size" );
Fields sumField = new Fields( "total-size" );
assembly =
  new SumBy( assembly, groupingFields, valueField, sumField, long.class );

For composing multiple partial aggregate operations, things are done a little differently.

Example 12. Composing partials with AggregateBy
Pipe assembly = new Pipe( "assembly" );

// ...
Fields groupingFields = new Fields( "date" );

// note we do not pass the parent assembly Pipe in
Fields valueField = new Fields( "size" );
Fields sumField = new Fields( "total-size", long.class );
SumBy sumBy = new SumBy( valueField, sumField );

Fields countField = new Fields( "num-events" );
CountBy countBy = new CountBy( countField );

assembly = new AggregateBy( assembly, groupingFields, sumBy, countBy );

Important: A GroupBy Pipe is embedded in the resulting assemblies above. But only one GroupBy is performed in the case of the AggregateBy, and all of the partial aggregations are performed simultaneously.
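
The idea of computing several partial aggregates in one pass and merging them in a single grouping can be modelled in plain Java. The sketch below is not how AggregateBy is implemented: PartialAggregationSketch, partial() and merge() are hypothetical names, and each input list stands in for the rows seen by one upstream task. Each row is assumed to be a { date, size } pair.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of partial aggregation; names are hypothetical, not the Cascading API.
final class PartialAggregationSketch
  {
  // one pass over a chunk of { date, size } rows, producing { sum, count } per date
  static Map<String, long[]> partial( List<String[]> rows )
    {
    Map<String, long[]> partials = new TreeMap<String, long[]>();

    for( String[] row : rows )
      {
      long[] acc = partials.get( row[ 0 ] );

      if( acc == null )
        {
        acc = new long[ 2 ];
        partials.put( row[ 0 ], acc );
        }

      acc[ 0 ] += Long.parseLong( row[ 1 ] ); // partial "total-size"
      acc[ 1 ] += 1;                          // partial "num-events"
      }

    return partials;
    }

  // the single downstream grouping: merge the partials that share a date
  static Map<String, long[]> merge( List<Map<String, long[]>> allPartials )
    {
    Map<String, long[]> totals = new TreeMap<String, long[]>();

    for( Map<String, long[]> partials : allPartials )
      {
      for( Map.Entry<String, long[]> entry : partials.entrySet() )
        {
        long[] acc = totals.get( entry.getKey() );

        if( acc == null )
          {
          acc = new long[ 2 ];
          totals.put( entry.getKey(), acc );
          }

        acc[ 0 ] += entry.getValue()[ 0 ];
        acc[ 1 ] += entry.getValue()[ 1 ];
        }
      }

    return totals;
    }

  public static void main( String[] args )
    {
    List<String[]> first = Arrays.asList( new String[]{"2014-05-01", "100"}, new String[]{"2014-05-01", "50"} );
    List<String[]> second = Arrays.asList( new String[]{"2014-05-01", "25"}, new String[]{"2014-05-02", "3"} );

    Map<String, long[]> totals = merge( Arrays.asList( partial( first ), partial( second ) ) );

    for( Map.Entry<String, long[]> entry : totals.entrySet() )
      System.out.println( entry.getKey() + " total-size=" + entry.getValue()[ 0 ] + " num-events=" + entry.getValue()[ 1 ] );
    }
  }
```

Only one small { sum, count } pair per date leaves each chunk, which is the reduction in transmitted data that partial aggregation is meant to provide.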