Cascading 3.0 User Guide - Pipe Assemblies
Pipe Assemblies
Each and Every Pipes
The Each and Every pipes perform operations on tuple data — for instance, perform a search-and-replace on tuple contents, filter out some of the tuples based on their contents, or count the number of tuples in a stream that share a common field value.
Here is the syntax for these pipes:
new Each( previousPipe, argumentSelector, operation, outputSelector )
new Every( previousPipe, argumentSelector, operation, outputSelector )
Both types take the following arguments on the constructor:
- incoming Pipe instance
- argument selector
- Operation instance
- output selector
"Selectors" are Fields instances that define the field positions and names that should be retrieved or returned.
The key difference between Each and Every is that an Each pipe operates on individual tuples, and an Every pipe operates on groups of tuples sent out by GroupBy or CoGroup pipes.
An Each pipe applies operations that are subclasses of the Function and Filter classes (described in the Javadoc). For example, using Each you can parse lines from a log file into their constituent fields, filter out all lines except the HTTP GET requests, and replace the "time string" fields with date fields.
Similarly, since the Every pipe works on tuple groups (the output of a GroupBy or CoGroup pipe), it applies operations that are subclasses of Aggregator and Buffer. For example, you could use GroupBy to group the output of the above Each pipe by date, then use an Every pipe to count the "GET" requests per date. The pipe would then emit the operation results as the date and count for each group.
In the syntax shown at the start of this section, the argument selector specifies fields from the input tuple to use as input values. If the argument selector is not specified, the whole input tuple (Fields.ALL) is passed to the operation as a set of argument values.
Most Operation subclasses declare result fields (shown as "declared fields" in the diagram). The output selector specifies the fields of the output Tuple from the fields of the input Tuple and the operation result. This new output Tuple becomes the input Tuple to the next pipe in the pipe assembly. If the output selector is Fields.ALL, the output is the input Tuple plus the operation result, merged into a single Tuple.
Note that it is possible for a Function, Aggregator, or Buffer to return more than one output Tuple per input Tuple. In this case, the input tuple is duplicated as many times as necessary to create the necessary output tuples. This is similar to the reiteration of values that happens during a join. If a function is designed to always emit three result tuples for every input tuple, each of the three outgoing tuples will consist of the selected input tuple values plus one of the three sets of function result values.
If the output selector is not specified for an Each pipe performing a Function operation, the operation results are returned by default (Fields.RESULTS), discarding the input tuple values in the tuple stream. (This is not true of Filters, which either discard the input tuple or return it intact, and thus do not use an output selector.)
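The effect of the output selector on a Function that emits several result tuples can be sketched in plain Java. This is an illustration only and does not use the Cascading API: the class EachSketch, the Selector enum, the applyEach method, and the word-splitting function are all hypothetical stand-ins, and tuples are modeled as simple lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration only; none of these names are Cascading classes.
class EachSketch {
  // Stand-ins for the Fields.RESULTS and Fields.ALL output selectors.
  enum Selector { RESULTS, ALL }

  // Applies fn to one input tuple. fn may return several result tuples;
  // with ALL, the input values are repeated in front of each result.
  static List<List<Object>> applyEach(List<Object> input,
                                      Function<List<Object>, List<List<Object>>> fn,
                                      Selector selector) {
    List<List<Object>> out = new ArrayList<>();
    for (List<Object> result : fn.apply(input)) {
      List<Object> tuple = new ArrayList<>();
      if (selector == Selector.ALL) {
        tuple.addAll(input); // input tuple duplicated once per result tuple
      }
      tuple.addAll(result);
      out.add(tuple);
    }
    return out;
  }

  // Hypothetical function: emits one result tuple per whitespace-separated word.
  static List<List<Object>> splitWords(List<Object> input) {
    List<List<Object>> results = new ArrayList<>();
    for (String word : input.get(0).toString().split("\\s+")) {
      results.add(List.of((Object) word));
    }
    return results;
  }

  public static void main(String[] args) {
    List<Object> line = List.of((Object) "a b c");
    System.out.println(applyEach(line, EachSketch::splitWords, Selector.RESULTS));
    System.out.println(applyEach(line, EachSketch::splitWords, Selector.ALL));
  }
}
```

With the RESULTS stand-in, only the three word tuples survive. With the ALL stand-in, each of the three outgoing tuples carries the original line value followed by one word.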
For the Every pipe, the Aggregator results are appended to the input Tuple (Fields.ALL) by default.
Note that the Every pipe associates Aggregator results with the current group Tuple (the unique keys that define the group). For example, if you are grouping on the field "department" and counting the number of "names" grouped by that department, the resulting output Fields will be ["department","num_employees"]. If you are also adding up the salaries associated with each "name" in each "department", the output Fields will be ["department","num_employees","total_salaries"].
This is only true for chains of Aggregator Operations — you are not allowed to chain Buffer operations, as explained below.
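The way chained Aggregator results accumulate on the grouping key can be sketched in plain Java. The sketch below is an illustration under stated assumptions, not Cascading code: the class EverySketch, the method countAndSum, and the sample rows (department, name, salary) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; this is not the Cascading API.
class EverySketch {
  // Hypothetical sample rows: {department, name, salary}.
  static final List<Object[]> SAMPLE = Arrays.asList(
      new Object[] {"eng", "ann", 100L},
      new Object[] {"eng", "bob", 120L},
      new Object[] {"ops", "cy", 90L});

  // Returns one row per department: [department, num_employees, total_salaries].
  static List<List<Object>> countAndSum(List<Object[]> rows) {
    // TreeMap keeps the departments in natural order.
    Map<String, long[]> groups = new TreeMap<>();
    for (Object[] row : rows) {
      long[] acc = groups.computeIfAbsent((String) row[0], k -> new long[2]);
      acc[0] += 1;             // first aggregation: count the names
      acc[1] += (Long) row[2]; // second aggregation: sum the salaries
    }
    List<List<Object>> out = new ArrayList<>();
    for (Map.Entry<String, long[]> e : groups.entrySet()) {
      // Each result is attached to the grouping key, not to the individual rows.
      out.add(Arrays.<Object>asList(e.getKey(), e.getValue()[0], e.getValue()[1]));
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(countAndSum(SAMPLE)); // [[eng, 2, 220], [ops, 1, 90]]
  }
}
```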
When the Every pipe is used with a Buffer operation, instead of an Aggregator, the behavior is different. Instead of being associated with the current grouping tuple, the operation results are associated with the current values tuple. This is analogous to how an Each pipe works with a Function. This approach may seem slightly unintuitive, but provides much more flexibility.
To put it another way, the results of the Buffer operation are not automatically appended to the current keys that define the group; the Buffer emits the keys only if they are relevant. It is also possible for a Buffer to return more than one result Tuple per unique grouping. A Buffer may or may not emulate an Aggregator, since an Aggregator is just a special, optimized case of a Buffer.
Merge
The Merge pipe is very simple. It accepts two or more streams that have the same fields, and emits a single stream containing all the tuples from all the input streams. Thus a merge is just a mingling of all the tuples from the input streams, as if shuffling multiple card decks into one. Note that the output of Merge is in arbitrary order.
Pipe merge = new Merge( lhs, rhs );
The example above simply combines all the tuples from two existing streams ("lhs" and "rhs") into a new tuple stream ("merge").
GroupBy
GroupBy groups the tuples of a stream based on common values in specified fields. If passed multiple streams as inputs, it performs a merge before the grouping. As with Merge, a GroupBy requires that multiple input streams share the same field structure.
The output of GroupBy is suitable for the Every pipe, which performs Aggregator and Buffer operations, such as counting, totaling, or averaging groups of tuples that have a common grouping value (e.g., the same date). By default, GroupBy performs no secondary sort, so within each group the tuples are in arbitrary order. For instance, when grouping on "lastname," the tuples [doe, john] and [doe, jane] are placed in arbitrary sequence within the same group.
Secondary Sorting
If multilevel sorting is desired, the names of the sort fields must be specified to the GroupBy instance, as seen below. In this example, value1 and value2 arrive in their natural sort order (assuming they implement java.lang.Comparable).
Fields groupFields = new Fields( "group1", "group2" );
Fields sortFields = new Fields( "value1", "value2" );
Pipe groupBy = new GroupBy( assembly, groupFields, sortFields );
If the developer does not care about the order of value2, it can be omitted from the sortFields Fields constructor.
In the next example, we reverse the order of value1 while keeping the natural order of value2.
Fields groupFields = new Fields( "group1", "group2" );
Fields sortFields = new Fields( "value1", "value2" );
sortFields.setComparator( "value1", Collections.reverseOrder() );
Pipe groupBy = new GroupBy( assembly, groupFields, sortFields );
Whenever there is an implied sort during grouping or secondary sorting, a custom java.util.Comparator can optionally be supplied to the grouping Fields or secondary sort Fields. This allows the developer to use the Fields.setComparator() call to control the sort.
To sort or group on non-Java-comparable classes, consider creating a custom Comparator.
The following example is more practical: tuples are grouped by date (year, month, and day), and the code reverses the order of the tuples within each grouping by time of day.
Fields groupFields = new Fields( "year", "month", "day" );
Fields sortFields = new Fields( "hour", "minute", "second" );
sortFields.setComparators(
  Collections.reverseOrder(),   // hour
  Collections.reverseOrder(),   // minute
  Collections.reverseOrder() ); // second
Pipe groupBy = new GroupBy( assembly, groupFields, sortFields );
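To make the effect of the reversed secondary sort concrete, the following plain-Java sketch groups rows by date and orders each group by time of day, latest first. It only mimics the resulting order and is not how Cascading implements GroupBy; the class SecondarySortSketch, the method groupAndSort, and the sample rows are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; it mimics the ordering, not Cascading's GroupBy.
class SecondarySortSketch {
  // Hypothetical sample rows: {year, month, day, hour, minute, second}.
  static final List<int[]> SAMPLE = List.of(
      new int[] {2024, 1, 2, 9, 0, 0},
      new int[] {2024, 1, 1, 8, 30, 0},
      new int[] {2024, 1, 1, 17, 5, 0},
      new int[] {2024, 1, 1, 8, 45, 0});

  static List<String> groupAndSort(List<int[]> rows) {
    // Group on the date; zero-padded keys sort chronologically in a TreeMap.
    Map<String, List<int[]>> groups = new TreeMap<>();
    for (int[] r : rows) {
      String key = String.format(Locale.ROOT, "%04d-%02d-%02d", r[0], r[1], r[2]);
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(r);
    }
    // Secondary sort: hour, minute, second, each in reverse order.
    Comparator<int[]> byTimeDescending = (a, b) -> {
      for (int i = 3; i <= 5; i++) {
        int c = Integer.compare(b[i], a[i]); // arguments swapped: larger first
        if (c != 0) {
          return c;
        }
      }
      return 0;
    };
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, List<int[]>> e : groups.entrySet()) {
      e.getValue().sort(byTimeDescending);
      for (int[] r : e.getValue()) {
        out.add(String.format(Locale.ROOT, "%s %02d:%02d:%02d",
            e.getKey(), r[3], r[4], r[5]));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    groupAndSort(SAMPLE).forEach(System.out::println);
  }
}
```

For the sample rows, the group for 2024-01-01 comes first with its times in the order 17:05:00, 08:45:00, 08:30:00, followed by the single row for 2024-01-02.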
CoGroup
The CoGroup pipe is similar to GroupBy, but it performs a join instead of a merge. CoGroup accepts two or more input streams and groups them on one or more specified keys. The join operation is performed on equal key values, similar to a SQL join.
The output stream contains all the fields in the input streams.
As with SQL, the join can be inner, outer, left, or right. Self-joins are permitted, as well as mixed joins (for three or more streams) and custom joins. Null fields in the input streams become corresponding null fields in the output stream.
Since the output is grouped, it is suitable for the Every pipe, which performs Aggregator and Buffer operations — such as counting, totaling, or averaging groups of tuples that have a common value (e.g., the same date).
The output stream is sorted by the natural order of the grouping fields. To control this order, at least the first groupingFields value given should be an instance of Fields containing Comparator instances for the appropriate fields. This allows fine-grained control of the sort grouping order.
Field Names
In a join operation, all the field names used in any of the input tuples must be unique; duplicate field names are not allowed. If the names overlap there is a collision, as shown in the following diagram.
In this figure, two streams are to be joined on the "url" field, resulting in a new Tuple that contains fields from the two input tuples. However, the resulting tuple would include two fields with the same name ("url"), which is unworkable. To handle the conflict, developers can use the declaredFields argument (described in the Javadoc) to declare unique field names for the output tuple, as in the following example.
Fields common = new Fields( "url" );
Fields declared = new Fields(
  "url1", "word", "wd_count", "url2", "sentence", "snt_count"
);
Pipe join =
  new CoGroup( lhs, common, rhs, common, declared, new InnerJoin() );
This revised figure demonstrates the use of declared field names to prevent a planning failure.
It might seem preferable for Cascading to automatically recognize the duplication and simply merge the identically named fields, saving effort for the developer. However, consider the case of an outer type join in which one field (or set of fields used for the join) for a given join side happens to be null. Discarding one of the duplicate fields would lose this information.
Further, the internal implementation for reading tuples relies on field position and not field names. The field names are a device for the developer. This approach allows the behavior of the CoGroup to be deterministic and consistent.
The Joiner class
In the CoGroup example above, a Joiner class (InnerJoin) is specified to perform a join on our data. There are five Joiner subclasses, as shown in the following diagram.
In CoGroup, the join is performed after all the input streams are first co-grouped by their common keys. Cascading must create a "bag" of data for every grouping in the input streams, consisting of all the Tuple instances associated with that grouping.
As mentioned previously, joins in Cascading are analogous to joins in SQL. The most commonly used type of join is the inner join, which is the default in CoGroup. An inner join tries to match each Tuple on the "lhs" with every Tuple on the "rhs," based on matching field values. If either side of an inner join has no tuples for a given value, no tuples are joined. An outer join, conversely, allows for either side to be empty and simply substitutes a Tuple containing null values for the nonexistent tuple.
The following sample data is used in the discussion below to explain and compare the different types of join:
LHS = [0,a] [1,b] [2,c]
RHS = [0,A] [2,C] [3,D]
In each join type below, the values are joined on the first tuple position (the join key), which is a numeric value. Note that, when Cascading joins tuples, the resulting Tuple contains all the incoming values from incoming tuple streams, and does not discard the duplicate key fields. As mentioned above, on outer joins where there is no equivalent key in the alternate stream, null values are used.
For example, using the data above, the result Tuple of an inner join with a join key value of 2 would be [2,c,2,C]. The result Tuple of an outer join with a join key value of 1 would be [1,b,null,null].
- InnerJoin
  An inner join only returns a joined Tuple if neither bag for the join key is empty.
  [0,a,0,A] [2,c,2,C]
- OuterJoin
  An outer join performs a join if one bag (left or right) for the join key is empty or if neither bag is empty.
  [0,a,0,A] [1,b,null,null] [2,c,2,C] [null,null,3,D]
- LeftJoin
  A left join can also be stated as a left inner and right outer join, where it is acceptable for the right bag to be empty (but not the left).
  [0,a,0,A] [1,b,null,null] [2,c,2,C]
- RightJoin
  A right join can also be stated as a left outer and right inner join, where it is acceptable for the left bag to be empty (but not the right).
  [0,a,0,A] [2,c,2,C] [null,null,3,D]
- MixedJoin
  A mixed join is where three or more tuple streams are joined, using a small Boolean array to specify each of the join types to use. For more information, see the cascading.pipe.cogroup.MixedJoin class in the Javadoc.
- Custom
  Developers can build on cascading.pipe.cogroup.Joiner to create custom join operations.
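The inner, outer, left, and right results listed above can be reproduced with a short plain-Java sketch over the same LHS and RHS sample data. It mimics the join results only and is not Cascading's Joiner code; the class JoinSketch, the Type enum, and the join and bag methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java illustration only; it mimics the results, not Cascading's Joiner classes.
class JoinSketch {
  enum Type { INNER, OUTER, LEFT, RIGHT }

  // Sample data from the text; each row is {key, value}.
  static final List<Object[]> LHS = Arrays.asList(
      new Object[] {0, "a"}, new Object[] {1, "b"}, new Object[] {2, "c"});
  static final List<Object[]> RHS = Arrays.asList(
      new Object[] {0, "A"}, new Object[] {2, "C"}, new Object[] {3, "D"});

  // Collects the "bag" of rows on one side that share the given join key.
  static List<Object[]> bag(List<Object[]> rows, Integer key) {
    List<Object[]> bag = new ArrayList<>();
    for (Object[] r : rows) {
      if (key.equals(r[0])) {
        bag.add(r);
      }
    }
    return bag;
  }

  // Joins on position 0. An empty bag is replaced by a [null, null] row
  // when the join type allows that side to be empty.
  static List<List<Object>> join(List<Object[]> lhs, List<Object[]> rhs, Type type) {
    Set<Integer> keys = new TreeSet<>(); // sorted union of the join keys
    for (Object[] r : lhs) keys.add((Integer) r[0]);
    for (Object[] r : rhs) keys.add((Integer) r[0]);

    boolean leftMayBeEmpty = type == Type.OUTER || type == Type.RIGHT;
    boolean rightMayBeEmpty = type == Type.OUTER || type == Type.LEFT;
    List<Object[]> nullBag = Collections.singletonList(new Object[] {null, null});

    List<List<Object>> out = new ArrayList<>();
    for (Integer key : keys) {
      List<Object[]> left = bag(lhs, key);
      List<Object[]> right = bag(rhs, key);
      if (left.isEmpty() && !leftMayBeEmpty) continue;
      if (right.isEmpty() && !rightMayBeEmpty) continue;
      if (left.isEmpty()) left = nullBag;
      if (right.isEmpty()) right = nullBag;
      for (Object[] l : left) {
        for (Object[] r : right) {
          // Both key fields are kept, as in the result tuples shown above.
          out.add(Arrays.asList(l[0], l[1], r[0], r[1]));
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    for (Type type : Type.values()) {
      System.out.println(type + " " + join(LHS, RHS, type));
    }
  }
}
```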
Scaling
CoGroup attempts to store the entire unique-keys tuple "bag" from the right-hand stream in memory for rapid joining to the left-hand stream. If the bag is very large, it may exceed a configurable threshold and be spilled to disk, reducing performance and potentially causing a memory error (if the threshold value is too large). Thus it is usually best to put the stream with the largest groupings on the left-hand side and, if necessary, adjust the spill threshold as described in the Javadoc.
HashJoin
HashJoin performs a join (similar to a SQL join) on two or more streams, and emits a stream of tuples that contain fields from all of the input streams. With a join, the tuples in the different input streams do not typically contain the same set of fields.
As with CoGroup, the field names must all be unique, including the names of the key fields, to avoid duplicate field names in the emitted Tuple. If necessary, use the declaredFields argument to specify unique field names for the output.
An inner join is performed by default, but you can choose inner, outer, left, right, or mixed (three or more streams). Self-joins are permitted. Developers can also create custom Joiners if desired. For more information on types of joins, refer to The Joiner class or the Javadoc.
Fields lhsFields = new Fields( "fieldA", "fieldB" );
Fields rhsFields = new Fields( "fieldC", "fieldD" );
Pipe join =
  new HashJoin( lhs, lhsFields, rhs, rhsFields, new InnerJoin() );
The example above performs an inner join on two streams ("lhs" and "rhs"), based on common values in two fields. The field names that are specified in lhsFields and rhsFields are among the field names previously declared for the two input streams.
Scaling
For joins that do not require grouping, HashJoin provides faster execution than CoGroup, but it operates within stricter limitations. It is optimized for joining one or more small streams to no more than one large stream.
Unlike CoGroup, HashJoin attempts to keep the entire right-hand stream in memory for rapid comparison (not just the current grouping, as no grouping is performed for a HashJoin). Thus a very large tuple stream in the right-hand stream may exceed a configurable spill-to-disk threshold, reducing performance and potentially causing a memory error. For this reason, it’s advisable to use the smaller stream on the right-hand side. Additionally, it may be helpful to adjust the spill threshold as described in the Javadoc.
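The reason the right-hand stream should be the smaller one can be seen in a generic in-memory hash join, sketched below in plain Java. This is a textbook build-and-probe inner join, offered as an assumption about the general technique rather than as Cascading's HashJoin implementation; it ignores spilling to disk, and the class HashJoinSketch and method innerHashJoin are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of a generic in-memory hash join (inner join only).
// It is not Cascading's HashJoin and does not model the spill threshold.
class HashJoinSketch {
  // Same sample data as the join discussion; each row is {key, value}.
  static final List<Object[]> LHS = Arrays.asList(
      new Object[] {0, "a"}, new Object[] {1, "b"}, new Object[] {2, "c"});
  static final List<Object[]> RHS = Arrays.asList(
      new Object[] {0, "A"}, new Object[] {2, "C"}, new Object[] {3, "D"});

  static List<List<Object>> innerHashJoin(List<Object[]> lhs, List<Object[]> rhs) {
    // Build phase: the entire right-hand side is held in memory, keyed on position 0.
    Map<Object, List<Object[]>> table = new HashMap<>();
    for (Object[] r : rhs) {
      table.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r);
    }
    // Probe phase: the left-hand side is streamed once, with no grouping or sorting.
    List<List<Object>> out = new ArrayList<>();
    for (Object[] l : lhs) {
      List<Object[]> matches = table.getOrDefault(l[0], Collections.emptyList());
      for (Object[] r : matches) {
        out.add(Arrays.asList(l[0], l[1], r[0], r[1]));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(innerHashJoin(LHS, RHS)); // [[0, a, 0, A], [2, c, 2, C]]
  }
}
```

In this sketch, memory use grows with the size of the right-hand input, while the left-hand input is only iterated. That is the trade-off behind the advice to place the smaller stream on the right.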
Due to the potential difficulties of using HashJoin (as compared to the slower but much more reliable CoGroup), developers should thoroughly understand this class before attempting to use it in a production environment.
Frequently the HashJoin is fed a filtered-down stream of Tuples from what was originally a very large file. To prevent the large file from being replicated throughout a cluster, use a Checkpoint pipe at the point where the data has been filtered down to its smallest size, before it enters the HashJoin. The Tuple stream is then persisted to disk, and a new FlowStep (MapReduce job) is created to read the smaller data set more efficiently. Not all platforms support checkpointing.