Cascading 3.2 User Guide - Flows

1. Introduction

1.1. What Is Cascading?

2. Diving into the APIs

2.1. Anatomy of a Word-Count Application

3. Cascading Basic Concepts

3.1. Terminology

3.3. Pipes

3.4. Platforms

3.6. Sink Modes

3.7. Flows

4. Tuple Fields

4.1. Field Sets

5. Pipe Assemblies

5.1. Each and Every Pipes

5.2. Merge

5.3. GroupBy

5.4. CoGroup

5.5. HashJoin

6. Flows

6.1. Creating Flows from Pipe Assemblies

7. Cascades

7.1. Creating a Cascade

8. Configuring

8.1. Introduction

9. Local Platform

9.1. Building an Application

10. The Apache Hadoop Platforms

10.1. What is Apache Hadoop?

11. Apache Hadoop MapReduce Platform

11.1. Configuring Applications

11.3. Building

12. Apache Tez Platform

12.1. Configuring Applications

12.2. Building

13. Using and Developing Operations

13.1. Introduction

13.2. Functions

13.3. Filters

13.4. Aggregators

13.5. Buffers

14. Custom Taps and Schemes

14.1. Introduction

14.2. Custom Taps

15. Advanced Processing

15.1. SubAssemblies

16. Built-In Operations

16.1. Identity Function

16.9. Assertions

16.11. Buffers

17. Built-in SubAssemblies

17.1. Optimized Aggregations

18. Cascading Best Practices

18.1. Unit Testing

19. Extending Cascading

19.1. Scripting

20. Cookbook: Code Examples of Cascading Idioms

20.1. Tuples and Fields

20.5. API Usage

21. The Cascading Process Planner

21.1. FlowConnector

21.3. RuleRegistry


Creating Flows from Pipe Assemblies

Example 1. Creating a new Flow
FlowConnector flowConnector = new LocalFlowConnector();

Flow flow =
  flowConnector.connect( "flow-name", source, sink, pipe );

To create a Flow, it must be planned though one of the FlowConnector subclass objects. In Cascading, each platform (i.e., local and Hadoop) has its own connectors. The connect() method is used to create new flow instances based on a set of sink taps, source taps, and a pipe assembly. Above is a trivial example that uses the local-mode connector.

Example 2. Binding taps in a Flow
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );

join = new Every( join, new SomeAggregator() );

Pipe groupBy = new GroupBy( join );

groupBy = new Every( groupBy, new SomeAggregator() );

// the tail of the assembly
groupBy = new Each( groupBy, new SomeFunction() );

Tap lhsSource = new FileTap( new TextLine(), "lhs.txt" );
Tap rhsSource = new FileTap( new TextLine(), "rhs.txt" );

Tap sink = new FileTap( new TextLine(), "output" );

FlowDef flowDef = new FlowDef() (1)
  .setName( "flow-name" )
  .addDescription( "joins lhs and rhs" )
  .addSource( rhs, rhsSource )
  .addSource( lhs, lhsSource )
  .addTailSink( groupBy, sink );

Flow flow = new LocalFlowConnector().connect( flowDef );

<1>The FlowDef class is a fluent API for passing required and optional metadata to the FlowConnector.

The example above expands on our previous pipe assembly example by creating multiple source and sink taps and planning a Flow. Note there are two branches in the pipe assembly — one named "lhs" and the other named "rhs." Internally Cascading uses those names to bind the source taps to the pipe assembly.

Configuring Flows

The FlowConnector constructor accepts the java.util.Property object so that default Cascading and any platform-specific properties can be passed down through the planner to the platform at runtime.

In the case of Hadoop, any relevant Hadoop configuration properties may be added. For instance, it’s very common to add mapreduce.job.reduces to set the number of reducers.

When running on a cluster, one of the two properties that must always be set for production applications is the application JAR class or JAR path.

Example 3. Configuring the application JAR
Properties properties = new Properties();

// pass in the class name of your application
// this will find the parent jar at runtime
properties = AppProps.appProps()
  .setName( "sample-app" )
  .setVersion( "1.2.3" )
  .addTags( "cluster:east", "deploy:prod", "team:engineering" )
  .setJarClass( Main.class ) // find jar from class
  .buildProperties( properties ); // returns a copy


// pass in the path to the parent jar
properties = AppProps.appProps()
  .setName( "sample-app" )
  .setVersion( "1.2.3" )
  .addTags( "cluster:east", "deploy:prod", "team:engineering" )
  .setJarPath( pathToJar ) // set jar path
  .buildProperties( properties ); // returns a copy

// pass properties to the connector
FlowConnector flowConnector = new LocalFlowConnector( properties );

Since the FlowConnector can be reused, any properties passed on the constructor are handed to all the flows it is used to create. If flows need to be created with different default properties, a new FlowConnector must be instantiated with those properties. Alternatively, properties can be set on a given Pipe or Tap instance directly — with the getConfigDef(), getNodeConfigDef() or getStepConfigDef() methods.

Skipping Flows

When a Flow participates in a Cascade, the Flow.isSkipFlow() method is checked before calling Flow.start() on the Flow. The result is based on the skip strategy of the Flow.

By default, isSkipFlow() returns true if any of the sinks are stale — i.e., the sinks do not exist or the resources are older than the sources. However, the strategy can be changed with either the Flow.setFlowSkipStrategy() method or the Cascade.setFlowSkipStrategy() method.

Cascading provides a choice of two standard skip strategies:


This strategy — cascading.flow.FlowSkipIfSinkNotStale — is the default. Sinks are treated as stale if they do not exist or the sink resources are older than the sources. If the SinkMode for the sink tap is REPLACE, then the tap is treated as stale.


The cascading.flow.FlowSkipIfSinkExists strategy skips the Flow if the sink tap exists, regardless of age. If the SinkMode for the sink tap is REPLACE, then the tap is treated as stale.

Additionally, you can implement custom skip strategies by implementing the interface cascading.flow.FlowSkipStrategy.

Note that Flow.start() does not check the isSkipFlow() method, and consequently always tries to start the Flow if called. The user code determines whether or not to call isSkipFlow() for assessing if the programming logic indicates that the Flow should be skipped.

Creating Custom Flows

Custom classes can be treated as flows if given the correct Riffle annotations. Riffle is a set of Java annotations that identify specific methods on a class as providing specific life-cycle and dependency functionality. For more information, see the Riffle documentation and examples. To use with Cascading, a Riffle-annotated instance must be passed to the cascading.flow.process.ProcessFlow constructor method. The resulting ProcessFlow instance can be used like any other flow instance.

Since many algorithms need to perform multiple passes over a given data set, a Riffle-annotated class can be written that internally creates Cascading flows and executes them until no more passes are needed. This is like nesting flows or Cascades in a parent Flow, which in turn can participate in a Cascade.

Process Levels in the Flow Hierarchy

A Flow is a parent of other process levels, which have other specific roles.


The prior sections provide a detailed description of flows, but at its essence flows are business-oriented units-of-work. The consumed and produced data has some level of durability. A flow satisfies a business or architectural need.


A FlowStep represents a unit of platform-managed work.

Hadoop MapReduce term: job
Apache Tez term: DAG


A FlowNode represents the complete unit-of-work that conceptually fits in a single JVM and what becomes parallelized by handling subsets of the input data source.

The FlowNode may represent multiple data paths, where one path is selected at runtime depending on which input data set is actually being processed.

Hadoop MapReduce terms: mapper and reducer
Apache Tez term: vertex


The FlowSlice is the smallest unit of parallelization. At runtime, a FlowSlice represents the actual JVM-executing Cascading code and the data path(s) being executed in that JVM.

If the FlowNode being parallelized has FlowPipelines, one of those pipelines is represented here.

Hadoop MapReduce and Apache Tez terms: task attempt

Runtime Metrics

Developers can retrieve runtime metrics (counters) of each process level in a platform-independent way.