In Hadoop mode, Cascading does not support MapReduce "Combiners". Combiners are a simple optimization that allows some Reduce functions to run on the Map side of MapReduce. Combiners are very powerful in that they reduce the I/O between the Mappers and Reducers - why send all of your Mapper data to Reducers when you can compute some values on the Map side and combine them in the Reducer? But Combiners are limited to Associative and Commutative functions only, such as "sum" and "max". And the process requires that the values emitted by the Map task be serialized, sorted (which involves deserialization and comparison), deserialized again, and operated on - after which the results are again serialized and sorted. In short, Combiners trade CPU for gains in I/O.
Cascading takes a different approach. It provides a mechanism to perform partial aggregations on the Map side and combine the results on the Reduce side, but trades memory, instead of CPU, for I/O gains by caching values (up to a threshold limit). This bypasses the redundant serialization, deserialization, and sorting. Also, Cascading allows any aggregate function to be implemented - not just Associative and Commutative functions.
Cascading supports a few built-in partial aggregate operations, including AverageBy, CountBy, SumBy, and FirstBy. These are actually SubAssemblies, not Operations, and are subclasses of the AggregateBy SubAssembly. For more on this, see the section on AggregateBy.
Using partial aggregate operations is quite easy. They are actually less verbose than a standard Aggregate operation.
Example 8.10. Using a SumBy
Pipe assembly = new Pipe( "assembly" );
// ...
Fields groupingFields = new Fields( "date" );
Fields valueField = new Fields( "size" );
Fields sumField = new Fields( "total-size" );
// sum the "size" values for each unique "date", declaring "total-size"
assembly =
  new SumBy( assembly, groupingFields, valueField, sumField, long.class );
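For comparison, here is a sketch of the same aggregation written the standard way, with an explicit GroupBy and the Sum Aggregator. The field names are reused from Example 8.10, and the "verbose" Pipe name is illustrative only; note that this form performs no Map side partial aggregation:

Pipe verbose = new Pipe( "verbose" );
// ...
// group all tuples by "date", then apply the Sum Aggregator
// to the "size" values within each group
verbose = new GroupBy( verbose, new Fields( "date" ) );
verbose = new Every( verbose, new Fields( "size" ),
  new Sum( new Fields( "total-size" ), long.class ), Fields.ALL );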
Composing multiple partial aggregate operations is done a little differently.
Example 8.11. Composing partials with AggregateBy
Pipe assembly = new Pipe( "assembly" );
// ...
Fields groupingFields = new Fields( "date" );
// note we do not pass the parent assembly Pipe into the SumBy or
// CountBy below; it is passed once to the enclosing AggregateBy
Fields valueField = new Fields( "size" );
Fields sumField = new Fields( "total-size", long.class );
SumBy sumBy = new SumBy( valueField, sumField );
Fields countField = new Fields( "num-events" );
CountBy countBy = new CountBy( countField );
assembly = new AggregateBy( assembly, groupingFields, sumBy, countBy );
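The size of the Map side value cache can also be tuned. As a sketch, assuming the AggregateBy constructor variant that accepts an explicit int threshold, the call above could instead cap the cache at a chosen number of unique grouping keys:

// a sketch: cache partial results for up to 10,000 unique grouping keys
// before flushing them downstream (the threshold value is illustrative)
assembly = new AggregateBy( assembly, groupingFields, 10000, sumBy, countBy );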
It is important to note that a GroupBy Pipe is embedded in the resulting assemblies above. But in the case of the AggregateBy, only one GroupBy is performed, and all of the partial aggregations are executed simultaneously. It is also worth noting that, depending on the final pipe assembly, the Map side partial aggregate functions may be planned into the previous Reduce operation in Hadoop, further improving the performance of the application.
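Because the GroupBy is embedded, the assembly above connects into a Flow like any other Pipe; no explicit GroupBy needs to be added for the partial aggregations. A minimal sketch, assuming "source" and "sink" Taps have been defined elsewhere:

// a minimal sketch: connect the assembly ending in the AggregateBy above;
// the "source" and "sink" Taps are assumed to be defined elsewhere
FlowDef flowDef = FlowDef.flowDef()
  .setName( "partials" )
  .addSource( assembly, source )
  .addTailSink( assembly, sink );
Flow flow = new HadoopFlowConnector().connect( flowDef );
flow.complete();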