3.2 Pipe Assemblies

Pipe assemblies define what work should be done against tuple streams, which are read from tap sources and written to tap sinks. The work performed on the data stream may include actions such as filtering, transforming, organizing, and calculating. Pipe assemblies may use multiple sources and multiple sinks, and may define splits, merges, and joins to manipulate the tuple streams.

Pipe Assembly Workflow

Pipe assemblies are created by chaining together instances of cascading.pipe.Pipe and its subclasses. Chaining is accomplished by passing the previous Pipe instance (or instances) to the constructor of the next Pipe instance.

The following example demonstrates this type of chaining. It creates two pipes - a "left-hand side" (lhs) and a "right-hand side" (rhs) - and performs some processing on them both, using the Each pipe. Then it joins the two pipes into one, using the CoGroup pipe, and performs several operations on the joined pipe using Every and GroupBy. The specific operations performed are not important in the example; the point is to show the general flow of the data streams. The diagram after the example gives a visual representation of the workflow.

Example 3.1. Chaining Pipes

// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );

join = new Every( join, new SomeAggregator() );

join = new GroupBy( join );

join = new Every( join, new SomeAggregator() );

// the tail of the assembly
join = new Each( join, new SomeFunction() );

The following diagram is a visual representation of the example above.

Common Stream Patterns

As data moves through the pipe, streams may be separated or combined for various purposes. Here are the three basic patterns:


A split takes a single stream and sends it down multiple paths - that is, it feeds a single Pipe instance into two or more subsequent separate Pipe instances with unique branch names.


A merge combines two or more streams that have identical fields into a single stream. This is done by passing two or more Pipe instances to a Merge or GroupBy pipe.


A join combines data from two or more streams that have different fields, based on common field values (analogous to a SQL join). This is done by passing two or more Pipe instances to a HashJoin or CoGroup pipe. The code sequence and diagram above give an example.
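
The semantics of the three patterns can be sketched in plain Java on small in-memory lists. This is an illustration only and does not use the Cascading API: the class StreamPatterns and its helpers row, merge, and join are hypothetical names invented for this sketch, rows are modeled as maps from field name to value, and the join shown is a simple inner join in which a shared field name appears only once in the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Cascading.
class StreamPatterns {

    // Builds one row from alternating field-name/value arguments.
    static Map<String, Object> row(Object... namesAndValues) {
        Map<String, Object> r = new LinkedHashMap<>();
        for (int i = 0; i + 1 < namesAndValues.length; i += 2) {
            r.put((String) namesAndValues[i], namesAndValues[i + 1]);
        }
        return r;
    }

    // Merge: streams with identical fields are combined into a single stream.
    static List<Map<String, Object>> merge(List<Map<String, Object>> a,
                                           List<Map<String, Object>> b) {
        List<Map<String, Object>> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    // Join: rows with different fields are paired wherever they agree on the
    // value of a common field (a simplified inner join).
    static List<Map<String, Object>> join(List<Map<String, Object>> lhs,
                                          List<Map<String, Object>> rhs,
                                          String key) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : lhs) {
            for (Map<String, Object> r : rhs) {
                Object value = l.get(key);
                if (value != null && value.equals(r.get(key))) {
                    Map<String, Object> joined = new LinkedHashMap<>(l);
                    joined.putAll(r); // the shared key field collapses to one entry
                    out.add(joined);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> users = Arrays.asList(
            row("id", 1, "name", "ann"), row("id", 2, "name", "bob"));
        List<Map<String, Object>> moreUsers = Arrays.asList(
            row("id", 3, "name", "cy"));
        List<Map<String, Object>> orders = Arrays.asList(
            row("id", 1, "total", 30), row("id", 1, "total", 12));

        // Split: the same stream feeds two independent downstream branches.
        List<Map<String, Object>> branchForMerge = users;
        List<Map<String, Object>> branchForJoin = users;

        System.out.println(merge(branchForMerge, moreUsers));
        System.out.println(join(branchForJoin, orders, "id"));
    }
}
```

In this sketch the merge inputs share the fields id and name, while the join inputs differ (name versus total) and are related only through the common id value, which mirrors the distinction drawn in the descriptions above.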

Data Processing

In addition to directing the tuple streams - using splits, merges, and joins - pipe assemblies can examine, filter, organize, and transform the tuple data as the streams move through the pipe assemblies. To facilitate this, the values in the tuple are typically given field names, just as database columns are given names, so that they may be referenced or selected. The following terminology is used:


Operations (cascading.operation.Operation) accept an input argument Tuple, and output zero or more result tuples. There are a few sub-types of operations defined below. Cascading has a number of generic Operations that can be used, or developers can create their own custom Operations.


In Cascading, data is processed as a stream of Tuples (cascading.tuple.Tuple), which are composed of fields, much like a database record or row. A Tuple is effectively an array of (field) values, where each value can be of any java.lang.Object type (or a byte[] array). For information on supporting non-primitive types, see Custom Types.


Fields (cascading.tuple.Fields) are used either to declare the names of the fields in a Tuple or to reference field values in a Tuple. A field can be specified as a string (such as "firstname" or "birthdate"), as an integer (the field position, starting at 0 for the first position, or at -1 for the last position), or as one of the predefined Fields sets (such as Fields.ALL, which selects all values in the Tuple, like an asterisk in SQL). For more on Fields sets, see Field Algebra.
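
The way a field name or position picks a value out of a Tuple can be sketched in plain Java. This is an illustration only and does not use the Cascading classes: FieldSelection, resolve, and select are hypothetical names, the tuple is modeled as an Object[] with a parallel list of field names, and the sketch assumes that a negative position counts back from the end of the tuple, so that -1 is the last value.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of Cascading.
class FieldSelection {

    // Resolves a selector (a field name or an integer position) to an index.
    static int resolve(List<String> names, Object selector) {
        if (selector instanceof String) {
            int byName = names.indexOf(selector);
            if (byName < 0) {
                throw new IllegalArgumentException("unknown field: " + selector);
            }
            return byName;
        }
        int position = (Integer) selector;
        // Negative positions count back from the end: -1 is the last value.
        int index = position < 0 ? names.size() + position : position;
        if (index < 0 || index >= names.size()) {
            throw new IllegalArgumentException("position out of range: " + position);
        }
        return index;
    }

    // Returns the tuple values chosen by the given selectors, in selector order.
    static Object[] select(List<String> names, Object[] tuple, Object... selectors) {
        Object[] out = new Object[selectors.length];
        for (int i = 0; i < selectors.length; i++) {
            out[i] = tuple[resolve(names, selectors[i])];
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("firstname", "lastname", "birthdate");
        Object[] tuple = {"ann", "lee", "1990-01-01"};

        // Select the first field by name and the last field by position.
        System.out.println(Arrays.toString(select(names, tuple, "firstname", -1)));
    }
}
```

Names and positions can be mixed freely in one selection here, which is the reason the selectors are typed as Object in this sketch.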

Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.