8.8 Partial Aggregation instead of Combiners

In Hadoop mode, Cascading does not support MapReduce "Combiners". Combiners are a simple optimization that allows some Reduce functions to run on the Map side of MapReduce. Combiners are very powerful in that they reduce the I/O between the Mappers and Reducers: why send all of your Mapper data to Reducers when some values can be computed on the Map side and combined in the Reducer? But Combiners are limited to Associative and Commutative functions only, such as "sum" and "max". Further, the process requires that the values emitted by the Map task be serialized, sorted (which involves deserialization and comparison), deserialized again, and operated on, after which the results are serialized and sorted once more. Combiners trade CPU for gains in I/O.

Cascading takes a different approach. It provides a mechanism to perform partial aggregations on the Map side and combine the results on the Reduce side, but it trades memory, instead of CPU, for I/O gains by caching values (up to a threshold limit). This bypasses the redundant serialization, deserialization, and sorting steps. Cascading also allows any aggregate function to be implemented, not just Associative and Commutative ones.
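To make the trade-off concrete, the following is a minimal, hypothetical sketch of the idea in plain Java (not Cascading's actual implementation): partial results are accumulated in a bounded in-memory cache on the Map side and flushed downstream whenever the threshold is exceeded, so values never pass through the Map-side serialize/sort/deserialize cycle.

import java.util.LinkedHashMap;
import java.util.Map;

// Simplified illustration of Map-side partial aggregation: partial sums are
// held in a bounded in-memory cache and flushed downstream when the cache
// exceeds its threshold. The Reduce side then combines the partials per key.
public class PartialSumCache
  {
  private final int threshold;
  private final Map<String, Long> cache = new LinkedHashMap<String, Long>();

  public PartialSumCache( int threshold )
    {
    this.threshold = threshold;
    }

  public void add( String key, long value )
    {
    Long current = cache.get( key );
    cache.put( key, current == null ? value : current + value );

    if( cache.size() > threshold )
      flush();
    }

  // emit all cached partial results and reset the cache
  public void flush()
    {
    for( Map.Entry<String, Long> entry : cache.entrySet() )
      emit( entry.getKey(), entry.getValue() );

    cache.clear();
    }

  private void emit( String key, long partialSum )
    {
    System.out.println( key + "\t" + partialSum ); // stand-in for emitting a tuple
    }
  }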

Cascading supports a few built-in partial aggregate operations, including AverageBy, CountBy, SumBy, and FirstBy. These are actually SubAssemblies, not Operations, and are subclasses of the AggregateBy SubAssembly. For more on this, see the section on AggregateBy.

Using partial aggregate operations is quite easy. They are actually less verbose than a standard aggregate operation; for comparison, an equivalent GroupBy form is sketched after Example 8.10.

Example 8.10. Using a SumBy

import cascading.pipe.Pipe;
import cascading.pipe.assembly.SumBy;
import cascading.tuple.Fields;

Pipe assembly = new Pipe( "assembly" );

// ...
Fields groupingFields = new Fields( "date" );
Fields valueField = new Fields( "size" );
Fields sumField = new Fields( "total-size" );

// sum the "size" values for all tuples sharing the same "date"
assembly =
  new SumBy( assembly, groupingFields, valueField, sumField, long.class );
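
For comparison, the same aggregation written with a standard GroupBy and the Sum Aggregator might look like the following sketch (using the same fields as above). Note that this form sends every tuple through the full Map-side sort and shuffle described earlier:

import cascading.operation.aggregator.Sum;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

Pipe assembly = new Pipe( "assembly" );

// ...
Fields groupingFields = new Fields( "date" );
Fields valueField = new Fields( "size" );
Fields sumField = new Fields( "total-size" );

// group all tuples by "date", then sum the "size" values within each group
assembly = new GroupBy( assembly, groupingFields );
assembly = new Every( assembly, valueField, new Sum( sumField, long.class ), Fields.ALL );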

Composing multiple partial aggregate operations over the same grouping works a little differently.

Example 8.11. Composing partials with AggregateBy

import cascading.pipe.Pipe;
import cascading.pipe.assembly.AggregateBy;
import cascading.pipe.assembly.CountBy;
import cascading.pipe.assembly.SumBy;
import cascading.tuple.Fields;

Pipe assembly = new Pipe( "assembly" );

// ...
Fields groupingFields = new Fields( "date" );

// note we do not pass the parent assembly Pipe into SumBy or CountBy;
// the enclosing AggregateBy performs the grouping for both
Fields valueField = new Fields( "size" );
Fields sumField = new Fields( "total-size", long.class );
SumBy sumBy = new SumBy( valueField, sumField );

Fields countField = new Fields( "num-events" );
CountBy countBy = new CountBy( countField );

assembly = new AggregateBy( assembly, groupingFields, sumBy, countBy );

Note that a GroupBy Pipe is embedded in each of the resulting assemblies above. In the AggregateBy case, however, only one GroupBy is performed, and all of the partial aggregations are computed simultaneously. Note also that, depending on the final pipe assembly, the Hadoop planner may place the Map-side partial aggregate functions into the previous Reduce operation, further improving the performance of the application.
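
If memory use is a concern, the size of the value cache can be tuned. As a hedged sketch, assuming the AggregateBy constructor variant that accepts an explicit threshold (see the AggregateBy section for the authoritative API), the composed example above might become:

// hypothetical threshold value; bounds the number of unique grouping keys
// cached in memory before partial results are flushed downstream
int threshold = 10000;
assembly = new AggregateBy( assembly, groupingFields, threshold, sumBy, countBy );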
