Cascading does not support so-called MapReduce Combiners. Combiners are very powerful in that they reduce the IO between the Mappers and Reducers. Why send all your Mapper data to the Reducers when you can compute some values Map side and combine them in the Reducer? But Combiners are limited to Associative and Commutative functions only, like 'sum' and 'max'. And in order to work, values emitted from the Map task must be serialized, sorted (deserialized and compared), deserialized again and operated on, after which the results are again serialized and sorted. Combiners trade CPU for gains in IO.
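The restriction above can be seen with a little arithmetic. The sketch below (plain Java, not Hadoop or Cascading code; all names are illustrative) shows why 'sum' is safe to combine: partial sums computed over any subsets of the data, in any order, add up to the same total. A function like 'mean' is not directly combinable, because averaging the partial averages gives the wrong answer when the partitions are of unequal size.

```java
// Illustrative sketch: why Combiners require associative, commutative
// functions. Not Hadoop or Cascading code.
public class CombinerMath
  {
  // 'sum' is associative and commutative, so partial sums
  // computed Map side can safely be summed again Reduce side
  public static long sum( long[] values )
    {
    long total = 0;

    for( long value : values )
      total += value;

    return total;
    }

  public static void main( String[] args )
    {
    // pretend two Mappers each saw part of the data
    long partialA = sum( new long[]{3, 5} );    // 8
    long partialB = sum( new long[]{7, 9} );    // 16

    // the Reducer sums the partials: same result as summing everything
    System.out.println( sum( new long[]{3, 5, 7, 9} ) == partialA + partialB ); // true

    // 'mean' is not combinable this way: with an uneven split of
    // { 3 } and { 5, 7, 9 }, the partial means are 3.0 and 7.0,
    // the mean of means is 5.0, but the true mean of all four is 6.0
    double meanOfMeans = ( 3.0 + ( 5 + 7 + 9 ) / 3.0 ) / 2;
    System.out.println( meanOfMeans ); // 5.0, not 6.0
    }
  }
```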
Cascading takes a different approach by providing a mechanism to perform partial aggregations Map side and then combine them Reduce side. But Cascading chooses to trade memory for IO gains by caching values (up to a threshold). This approach bypasses the unnecessary serialization, deserialization, and sorting steps. It also allows any aggregate function to be implemented, not just Associative and Commutative ones.
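The general idea behind such a threshold-bounded cache can be sketched in plain Java. This is not Cascading's internal implementation and all names here are illustrative: grouping keys map to running sums held in memory, and once the cache grows past its threshold, the eldest entry is flushed downstream as a partial result for the Reduce side to combine.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of a Map side partial-aggregation cache.
// Not Cascading's actual implementation or API.
public class PartialSumCache
  {
  private final Map<String, Long> cache;

  public PartialSumCache( final int threshold )
    {
    // a LinkedHashMap in access order acts as an LRU cache;
    // once the threshold is exceeded, the eldest entry is
    // emitted downstream as a partial result and evicted
    this.cache = new LinkedHashMap<String, Long>( 16, 0.75f, true )
      {
      @Override
      protected boolean removeEldestEntry( Map.Entry<String, Long> eldest )
        {
        if( size() <= threshold )
          return false;

        emitPartial( eldest.getKey(), eldest.getValue() );
        return true;
        }
      };
    }

  // fold a value into the running sum for its grouping key
  public void add( String key, long value )
    {
    Long current = cache.get( key );
    cache.put( key, current == null ? value : current + value );
    }

  // current partial sum for a key, or null if not cached
  public Long currentSum( String key )
    {
    return cache.get( key );
    }

  protected void emitPartial( String key, long sum )
    {
    // in a real flow this partial would be sent to the Reducer,
    // which sums the partials per key to produce the final total
    System.out.println( key + "\t" + sum );
    }
  }
```

Because each cached entry already holds an aggregated value, only one (or a few) tuples per grouping key leave the Map task, instead of one tuple per input value.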
Cascading has a few built-in partial aggregate operations. These "operations" are actually SubAssemblies; specifically, they are implementations of the AggregateBy SubAssembly.
Using partial aggregate operations is quite easy; they are actually less verbose than a standard Aggregate operation.
Example 6.8. Using a SumBy
    Pipe assembly = new Pipe( "assembly" );

    // ...

    Fields groupingFields = new Fields( "date" );
    Fields valueField = new Fields( "size" );
    Fields sumField = new Fields( "total-size" );
    assembly = new SumBy( assembly, groupingFields, valueField, sumField, long.class );
To compose multiple partial aggregate operations, things work slightly differently.
Example 6.9. Composing partials with AggregateBy
    Pipe assembly = new Pipe( "assembly" );

    // ...

    Fields groupingFields = new Fields( "date" );

    // note we do not pass the parent assembly Pipe in
    Fields valueField = new Fields( "size" );
    Fields sumField = new Fields( "total-size" );
    SumBy sumBy = new SumBy( valueField, sumField, long.class );

    Fields countField = new Fields( "num-events" );
    CountBy countBy = new CountBy( countField );

    assembly = new AggregateBy( assembly, groupingFields, sumBy, countBy );
It is important to note that a GroupBy is embedded in the resulting assemblies above. But only one GroupBy will be performed in the case of the AggregateBy; all of the partial aggregations will be performed simultaneously. It is also important to note that, depending on the final pipe assembly, the Map side partial aggregate functions may be planned into the previous Reduce operation in Hadoop, further improving the performance of the application.
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.