Pipe assemblies define what work should be done against a tuple stream, where during runtime tuple streams are read from Tap sources and are written to Tap sinks. Pipe assemblies may have multiple sources and multiple sinks and they can define splits, merges, and joins to manipulate how the tuple streams interact.
There are only six Pipe types: Pipe, Each, GroupBy, CoGroup, Every, and SubAssembly.
The cascading.pipe.Pipe class is used to name branches of pipe assemblies. These names are used during planning to bind Taps as either sources or sinks (or as traps, an advanced topic). It is also the base class for all other pipes.
The cascading.pipe.Each pipe applies an Operation to each Tuple that passes through it.
The cascading.pipe.GroupBy pipe manages one input Tuple stream and does exactly as it sounds, that is, it groups the stream on selected fields in the tuple stream. GroupBy also allows for "merging" of two or more tuple streams that share the same field names.
The cascading.pipe.CoGroup pipe allows for "joins" on a common set of values, just like a SQL join. The output tuple stream of CoGroup is the joined input tuple streams, where a join can be an Inner, Outer, Left, or Right join.
The cascading.pipe.Every pipe applies an Aggregator (like count or sum) or a Buffer (a sliding window) Operation to every group of Tuples that passes through it.
The cascading.pipe.SubAssembly class allows for nesting reusable pipe assemblies into a Pipe class for inclusion in a larger pipe assembly. See the section on SubAssemblies.
Pipe assemblies are created by chaining cascading.pipe.Pipe classes and Pipe subclasses together. Chaining is accomplished by passing the previous Pipe instance to the constructor of the next Pipe instance.
Example 3.1. Chaining Pipes
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );
lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );
rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );
join = new Every( join, new SomeAggregator() );
join = new GroupBy( join );
join = new Every( join, new SomeAggregator() );

// the tail of the assembly
join = new Each( join, new SomeFunction() );
The above example, if visualized, would look like the diagram below.
Here are some common stream patterns.
A split takes a single stream and sends it down one or more paths. This is simply achieved by passing a given Pipe instance to two or more subsequent Pipe instances. Note you can use the Pipe class to name each branch (branch names are useful for binding Failure Traps).
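A split can be sketched by passing the same Pipe instance to the constructors of two new branches. This is a sketch only; the branch names and Operations used here are hypothetical:

```java
// a single named head
Pipe assembly = new Pipe( "log" );

// two branches splitting from the same upstream pipe;
// each branch is named via the Pipe( name, previous ) constructor
Pipe errors = new Pipe( "errors", assembly );
errors = new Each( errors, new SomeErrorFilter() );

Pipe metrics = new Pipe( "metrics", assembly );
metrics = new Each( metrics, new SomeMetricFunction() );
```

Each branch can then be bound to its own sink Tap by name during planning.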
A merge is where two or more streams with the exact same Fields (and types) are treated as a single stream. This is achieved by passing two or more Pipe instances to a GroupBy pipe.
A join is where two or more streams are connected by one or more common values. See the previous diagram for an example.
Besides defining the paths tuple streams take through splits, merges, grouping, and joining, pipe assemblies also transform and/or filter the stored values in each Tuple. This is accomplished by applying an Operation to each Tuple or group of Tuples as the tuple stream passes through the pipe assembly. To do that, the values in the Tuple typically are given field names, in the same fashion columns are named in a database so that they may be referenced or selected.
Operations (cascading.operation.Operation) accept an input argument Tuple and output zero or more result Tuples. There are a few sub-types of Operations defined below. Cascading has a number of generic Operations that can be reused, or developers can create their own custom Operations.
In Cascading, we call each record of data a Tuple (cascading.tuple.Tuple), and a series of Tuples is a tuple stream. Think of a Tuple as an array of values where each value can be any java.lang.Object Java type (or byte array). See the section on Custom Types for supporting non-primitive types.
Fields (cascading.tuple.Fields) are used to either declare the field names in a Tuple, or to reference values in a Tuple as a selector. Fields can either be string names ("first_name"), integer positions (-1 for the last value), or a substitution (Fields.ALL to select all values in the Tuple, like an asterisk (*) in SQL; see Field Algebra).
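The three kinds of Fields can be sketched as follows; the field names used here are hypothetical:

```java
// declare fields by string name
Fields names = new Fields( "first_name", "last_name" );

// reference values by integer position; -1 selects the last value
Fields firstAndLast = new Fields( 0, -1 );

// a substitution selecting all values in the Tuple
Fields all = Fields.ALL;
```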
The Each and Every pipe types are the only pipes that can be used to apply Operations to the tuple stream. The Each pipe applies an Operation to "each" Tuple as it passes through the pipe assembly. The Every pipe applies an Operation to "every" group of Tuples as they pass through the pipe assembly, on the tail end of a GroupBy or CoGroup pipe.
new Each( previousPipe, argumentSelector, operation, outputSelector )
new Every( previousPipe, argumentSelector, operation, outputSelector )
Both the Each and Every pipes take a Pipe instance, an argument selector, an Operation instance, and an output selector in their constructors, where each selector is a Fields instance.
The Each pipe may only apply Functions and Filters to the tuple stream, as these operations may only operate on one Tuple at a time. The Every pipe may only apply Aggregators and Buffers to the tuple stream, as these operations may only operate on groups of tuples, one grouping at a time.
The "argument selector" selects values from the input Tuple to be passed to the Operation as argument values. Most Operations declare result fields, "declared fields" in the diagram. The "output selector" selects the output Tuple from an "appended" version of the input Tuple and the Operation result Tuple. This new output Tuple becomes the input Tuple to the next pipe in the pipe assembly.
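As a sketch of the selector flow, consider an Each pipe that parses one field out of a line of text. The field names here are hypothetical, and the example assumes Cascading's built-in RegexParser Function:

```java
// the Function declares the "ip" result field
Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );

// argument selector: pass only the "line" field to the Function;
// output selector: keep only "ip" from the appended
// input Tuple + result Tuple
Pipe assembly = new Each( previousPipe, new Fields( "line" ), parser, new Fields( "ip" ) );
```

Downstream pipes now see Tuples containing only the "ip" field.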
Note that if a Function or Aggregator emits more than one Tuple, this process will be repeated for each result Tuple against the original input Tuple. Depending on the output selector, input Tuple values could be duplicated across each output Tuple.
If the argument selector is not given, the whole input Tuple (Fields.ALL) is passed to the Operation as argument values. If the result selector is not given on an Each pipe, the Operation results are returned by default (Fields.RESULTS), replacing the input Tuple values in the tuple stream. This really only applies to Functions; Filters either discard the input Tuple or return the input Tuple intact, so there is no opportunity to provide an output selector.
On an Every pipe, the Aggregator results are appended to the input Tuple (Fields.ALL is the default).
It is important to note that the Every pipe associates Aggregator results with the current group Tuple. For example, if you are grouping on the field "department" and counting the number of "names" grouped by that department, the output Fields would be ["department","num_employees"]. If you were also adding up the salaries associated with each "name" in each "department", the output Fields would be ["department","num_employees","total_salaries"]. This is only true for chains of Aggregator Operations; you may not chain Buffer Operations. When an Every pipe is used with a Buffer, the behavior is slightly different.
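The "department" example above can be sketched as a pipe assembly using Cascading's built-in Count and Sum Aggregators; the field names are hypothetical:

```java
// group all Tuples by "department"
Pipe assembly = new Pipe( "employees" );
assembly = new GroupBy( assembly, new Fields( "department" ) );

// count the "name" values in each department group
assembly = new Every( assembly, new Fields( "name" ),
  new Count( new Fields( "num_employees" ) ) );

// chain a second Aggregator summing the salaries per group
assembly = new Every( assembly, new Fields( "salary" ),
  new Sum( new Fields( "total_salaries" ) ) );

// output Fields: ["department", "num_employees", "total_salaries"]
```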
Instead of associating the Buffer results with the current grouping Tuple, they are associated with the current values Tuple, just like an Each pipe does with a Function. This might be slightly more confusing, but provides much more flexibility.
The GroupBy and CoGroup pipes serve two roles. First, they emit sorted grouped tuple streams, allowing Operations to be applied to sets of related Tuple instances, where "sorted" means the tuple groups are emitted from the GroupBy and CoGroup pipes in the sort order of the field values the groups were grouped on.
Second, they allow two or more streams to be either merged or joined, where merging allows two or more tuple streams originating from different sources to be treated as a single stream, and joining allows two or more streams to be "joined" (in the SQL sense) on a common key or set of values in a Tuple.
It is not required that an Every pipe directly follow a GroupBy or CoGroup; an Each may follow immediately after. But an Every may not follow an Each.
It is important to note that, for both GroupBy and CoGroup, the values being grouped on must be the same type. If your application attempts to GroupBy on the field "size", but the value alternates between a String and a Long, Hadoop will fail internally attempting to apply a Java Comparator to them. This also holds true for the secondary sorting sort-by fields in GroupBy, described below.
GroupBy accepts one or more tuple
streams. If two or more, they must all have the same field names (this
is also called a merge, see below).
Example 3.2. Grouping a Tuple Stream
Pipe groupBy = new GroupBy( assembly, new Fields( "group1", "group2" ) );
The example above simply creates a new tuple stream where Tuples with the same values in "group1" and "group2" can be processed as a set by an Aggregator or Buffer Operation. The resulting stream of tuples will be sorted by the values in "group1" and "group2".
Example 3.3. Merging a Tuple Stream
Pipe pipes = Pipe.pipes( lhs, rhs );
Pipe merge = new GroupBy( pipes, new Fields( "group1", "group2" ) );
This example merges two streams ("lhs" and "rhs") into one tuple stream and groups the resulting stream on the fields "group1" and "group2", in the same fashion as the previous example.
CoGroup accepts two or more tuple streams and does not require any common field names. The grouping fields must be provided for each tuple stream.
Example 3.4. Joining a Tuple Stream
Fields lhsFields = new Fields( "fieldA", "fieldB" );
Fields rhsFields = new Fields( "fieldC", "fieldD" );
Pipe join = new CoGroup( lhs, lhsFields, rhs, rhsFields, new InnerJoin() );
This example joins two streams ("lhs" and "rhs") on common values. Note that common field names are not required here. Actually, if there were any common field names, the Cascading planner would throw an error as duplicate field names are not allowed.
This is significant because of the nature of joining streams.
The first stage of joining has to do with identifying field names that represent the grouping key for a given stream. The second stage is emitting a new Tuple with the joined values, this includes the grouping values, and the other values.
In the above example, we see what "logically" happens during a join. Here we join two streams on the "url" field, which happens to be common to both streams. The result is simply two Tuple instances with the same "url" appended together into a new Tuple. In practice this would fail, since the result Tuple has duplicate field names. The CoGroup pipe has a declaredFields argument allowing the developer to declare new, unique field names for the resulting tuple.
Example 3.5. Joining a Tuple Stream with Duplicate Fields
Fields common = new Fields( "url" );
Fields declared = new Fields( "url1", "word", "wd_count", "url2", "sentence", "snt_count" );
Pipe join = new CoGroup( lhs, common, rhs, common, declared, new InnerJoin() );
Here we see an example of what the developer could have named the fields so the planner would not fail.
It is important to note that Cascading could just magically create a new Tuple by removing the duplicate grouping field names so the user isn't left renaming them. In the above example, the duplicate "url" columns could be collapsed into one, as they are the same value. This is not done because field names are a user convenience; the primary mechanism for manipulating Tuples is through positions, not field names. So the result of every Pipe (Each, Every, CoGroup, GroupBy) needs to be deterministic. This gives Cascading a big performance boost, provides a means for sub-assemblies to be built without coupling to any "domain level" concepts (like "first_name" or "url"), and allows for higher level abstractions to be built on top of Cascading simply.
In the example above, we explicitly set a Joiner class to join our data. The reason CoGroup is named "CoGroup" and not "Join" is because joining data is done after all the parallel streams are co-grouped by their common keys. The details are not terribly important, but note that a "bag" of data for every input tuple stream must be created before a join operation can be performed. Each bag consists of all the Tuple instances associated with a given grouping Tuple.
Above we see two bags, one for each tuple stream ("lhs" and "rhs"). Each Tuple in a bag is independent, but all Tuples in both bags have the same "url" value, since we are grouping on "url" from the previous example. A Joiner will match up every Tuple on the "lhs" with a Tuple on the "rhs". An InnerJoin is the most common: each Tuple on the "lhs" is matched with every Tuple on the "rhs". This is the default behavior one would see in SQL when doing a join. If one of the bags was empty, no Tuples would be joined. An OuterJoin allows for either bag to be empty; if that is the case, a Tuple with null values would be substituted.
Above we see all supported Joiner types.
LHS = [0,a] [1,b] [2,c]
RHS = [0,A] [2,C] [3,D]

Using the above simple data sets, we will define each join type, where the values are joined on the first position, a numeric value. Note that when Cascading joins Tuples, the resulting Tuple will contain all the incoming values; the duplicate common key(s) is not discarded. And on outer joins, where there is no equivalent key in the alternate stream, null values are used as placeholders.
An Inner join will only return a joined Tuple if neither bag is empty.

[0,a,0,A] [2,c,2,C]
An Outer join will return a joined Tuple even if either the left or the right bag is empty.
[0,a,0,A] [1,b,null,null] [2,c,2,C] [null,null,3,D]
A Left join can also be stated as a Left Inner and Right Outer join, where it is fine if the right bag is empty.
[0,a,0,A] [1,b,null,null] [2,c,2,C]
A Right join can also be stated as a Left Outer and Right Inner join, where it is fine if the left bag is empty.
[0,a,0,A] [2,c,2,C] [null,null,3,D]
A Mixed join is where three or more tuple streams are joined, and each pair must be joined differently.
A custom join is where the developer subclasses the Joiner class.
By virtue of the reduce method in the MapReduce model underlying both GroupBy and CoGroup, all groups of Tuples will be locally sorted by their grouping values. That is, both the Every and Each Operations will receive groups in their natural sort order. But the values associated within those groups are not sorted. That is, if we group on 'lastname' with tuples like [jane, doe], the 'firstname' values will arrive in an arbitrary order to the Aggregator.
In the below example, we provide sorting fields to the GroupBy instance. Now "value1" and "value2" will arrive in their natural sort order (assuming their values are Comparable).
Example 3.6. Secondary Sorting
Fields groupFields = new Fields( "group1", "group2" );
Fields sortFields = new Fields( "value1", "value2" );
Pipe groupBy = new GroupBy( assembly, groupFields, sortFields );
If we didn't care about the order of "value2", we could have left it out of the sortFields Fields instance. In the next example, we reverse the order of "value1" while keeping the natural order of "value2".
Example 3.7. Reversing Secondary Sort Order
Fields groupFields = new Fields( "group1", "group2" );
Fields sortFields = new Fields( "value1", "value2" );
sortFields.setComparator( "value1", Collections.reverseOrder() );
Pipe groupBy = new GroupBy( assembly, groupFields, sortFields );
Whenever there is an implied sort, during grouping or secondary sorting, a custom java.util.Comparator can be supplied to the grouping Fields or secondary sort Fields to influence the sorting. Creating a custom Comparator also allows otherwise non-Comparable classes to be sorted and/or grouped on.
Here is a more practical example, where we group by the 'day of the year', but want to reverse the order of the Tuples within that grouping by 'time of day'.
Example 3.8. Reverse Order by Time
Fields groupFields = new Fields( "year", "month", "day" );
Fields sortFields = new Fields( "hour", "minute", "second" );
sortFields.setComparators(
  Collections.reverseOrder(),   // hour
  Collections.reverseOrder(),   // minute
  Collections.reverseOrder() ); // second
Pipe groupBy = new GroupBy( assembly, groupFields, sortFields );
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.