5.4 Aggregator

An Aggregator expects a stream of tuple groups (the output of a GroupBy or CoGroup pipe), and returns zero or more result tuples for every group. An Aggregator may only be used with an Every pipe, which may follow a GroupBy, a CoGroup, or another Every pipe, but not an Each.

To create a custom Aggregator, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Aggregator. Because BaseOperation supplies default implementations of the other Operation methods, only the start, aggregate, and complete methods defined on the Aggregator interface must be implemented.

Example 5.6. Custom Aggregator

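import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeAggregator extends BaseOperation<SomeAggregator.Context>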
  implements Aggregator<SomeAggregator.Context>
  {
  public static class Context
    {
    Object value;
    }

  public void start( FlowProcess flowProcess,
                     AggregatorCall<Context> aggregatorCall )
    {
    // get the group values for the current grouping
    TupleEntry group = aggregatorCall.getGroup();

    // create a new custom context object
    Context context = new Context();

    // optionally, populate the context object

    // set the context object
    aggregatorCall.setContext( context );
    }

  public void aggregate( FlowProcess flowProcess,
                         AggregatorCall<Context> aggregatorCall )
    {
    // get the current argument values
    TupleEntry arguments = aggregatorCall.getArguments();

    // get the context for this grouping
    Context context = aggregatorCall.getContext();

    // update the context object
    }

  public void complete( FlowProcess flowProcess,
                        AggregatorCall<Context> aggregatorCall )
    {
    Context context = aggregatorCall.getContext();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // insert some values into the result Tuple based on the context

    // return the result Tuple
    aggregatorCall.getOutputCollector().add( result );
    }
  }
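
The order in which the three methods are called can be sketched without Cascading on the classpath. The listing below is a plain-Java model, not Cascading code: MiniAggregator, TracingAggregator, and the run driver are hypothetical stand-ins for the Aggregator interface and the Every pipe. It assumes the contract described above, where start is called once per group, aggregate once per tuple in the group, and complete once after the group's last tuple. As a simplification, start returns the context directly instead of calling setContext on an AggregatorCall.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the Aggregator call order; none of these types are Cascading classes.
class LifecycleSketch
  {
  // hypothetical stand-in for the Aggregator interface
  interface MiniAggregator<C>
    {
    C start( String group );                          // once, when a group begins
    void aggregate( C context, Object argument );     // once per tuple in the group
    void complete( C context, List<Object> output );  // once, after the group's last tuple
    }

  // hypothetical stand-in for the Every pipe: drives the aggregator over each group in turn
  static <C> List<Object> run( Map<String, List<Object>> groups, MiniAggregator<C> aggregator )
    {
    List<Object> output = new ArrayList<>();

    for( Map.Entry<String, List<Object>> group : groups.entrySet() )
      {
      // a fresh context is created for every group
      C context = aggregator.start( group.getKey() );

      for( Object argument : group.getValue() )
        aggregator.aggregate( context, argument );

      aggregator.complete( context, output );
      }

    return output;
    }

  // records every call it receives so the order becomes visible in the result
  static class TracingAggregator implements MiniAggregator<StringBuilder>
    {
    public StringBuilder start( String group )
      {
      return new StringBuilder( "start(" + group + ")" );
      }

    public void aggregate( StringBuilder context, Object argument )
      {
      context.append( " aggregate(" ).append( argument ).append( ")" );
      }

    public void complete( StringBuilder context, List<Object> output )
      {
      output.add( context.append( " complete" ).toString() );
      }
    }

  // two groups: "a" holds the values 1 and 2, "b" holds the value 3
  static List<Object> demo()
    {
    Map<String, List<Object>> groups = new LinkedHashMap<>();
    groups.put( "a", Arrays.<Object>asList( 1, 2 ) );
    groups.put( "b", Arrays.<Object>asList( 3 ) );

    return run( groups, new TracingAggregator() );
    }

  public static void main( String[] args )
    {
    for( Object result : demo() )
      System.out.println( result );
    }
  }
```

Run as written, main prints "start(a) aggregate(1) aggregate(2) complete" followed by "start(b) aggregate(3) complete". Nothing from group "a" leaks into group "b" because each group begins with its own context.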

Whenever possible, Aggregators should declare both the number of argument values they expect and the field names of the Tuple they return. However, these declarations are optional, as explained below.

For input, Aggregators must accept one or more values in a Tuple as arguments. If not specified, the default is to accept any number of values (Operation.ANY). Cascading verifies during planning that the number of arguments selected is the same as the number of arguments expected.
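
As a rough illustration of that planning-time rule, the check can be modeled in a few lines of plain Java. This is not the planner's code: the ArityCheck class, its verify and accepts methods, and the local ANY sentinel are all hypothetical, and the sketch makes no assumption about the value Cascading assigns to Operation.ANY.

```java
// Plain-Java model of the planning-time argument-count check; not Cascading's planner code.
class ArityCheck
  {
  // hypothetical sentinel for "accept any number of arguments";
  // Cascading's own constant is Operation.ANY, whose value is not assumed here
  static final int ANY = -1;

  // throws when the number of selected arguments differs from the declared number
  static void verify( int expected, int selected )
    {
    if( expected != ANY && expected != selected )
      throw new IllegalStateException(
        "operation expects " + expected + " argument(s), but " + selected + " were selected" );
    }

  // convenience wrapper that reports the outcome instead of throwing
  static boolean accepts( int expected, int selected )
    {
    try
      {
      verify( expected, selected );
      return true;
      }
    catch( IllegalStateException exception )
      {
      return false;
      }
    }

  public static void main( String[] args )
    {
    System.out.println( accepts( 1, 1 ) );   // declared 1, selected 1
    System.out.println( accepts( 1, 2 ) );   // declared 1, selected 2
    System.out.println( accepts( ANY, 5 ) ); // nothing declared, any count passes
    }
  }
```

The point of the model is that a mismatch surfaces as a failure before any data is processed, which is why declaring the expected argument count is worthwhile.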

For output, it's good practice for Aggregators to declare the field names they return. If not specified, the default is Fields.UNKNOWN, meaning that an unknown number of fields are returned in each Tuple.

Both declarations, the number of input arguments and the declared result fields, must be made in the constructor, either by passing default values to the super constructor or by accepting the values from the user as constructor arguments.

Example 5.7. Add Tuples Aggregator

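import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AddTuplesAggregator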
    extends BaseOperation<AddTuplesAggregator.Context>
    implements Aggregator<AddTuplesAggregator.Context>
  {
  public static class Context
    {
    long value = 0;
    }

  public AddTuplesAggregator()
    {
    // expects 1 argument, fail otherwise
    super( 1, new Fields( "sum" ) );
    }

  public AddTuplesAggregator( Fields fieldDeclaration )
    {
    // expects 1 argument, fail otherwise
    super( 1, fieldDeclaration );
    }

  public void start( FlowProcess flowProcess,
                     AggregatorCall<Context> aggregatorCall )
    {
    // set the context object, starting at zero
    aggregatorCall.setContext( new Context() );
    }

  public void aggregate( FlowProcess flowProcess,
                         AggregatorCall<Context> aggregatorCall )
    {
    TupleEntry arguments = aggregatorCall.getArguments();
    Context context = aggregatorCall.getContext();

    // add the current argument value to the current sum,
    // reading it as a long to match the type of Context.value
    context.value += arguments.getLong( 0 );
    }

  public void complete( FlowProcess flowProcess,
                        AggregatorCall<Context> aggregatorCall )
    {
    Context context = aggregatorCall.getContext();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // set the sum
    result.add( context.value );

    // return the result Tuple
    aggregatorCall.getOutputCollector().add( result );
    }
  }

The example above implements an Aggregator that accepts a single value in the argument Tuple, sums that value across all the argument tuples in the current grouping, and returns the sum as a new Tuple.
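
To make the arithmetic concrete, here is a plain-Java walk-through of the same summing logic on two small groups. It is a sketch only and uses no Cascading types: the SumSketch class, the sumByGroup method, and the sample data are hypothetical, and each step is labeled with the Aggregator method it stands in for.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java walk-through of the summing logic; no Cascading types are used.
class SumSketch
  {
  // mirrors the Context class of the Aggregator above
  static class Context
    {
    long value = 0;
    }

  // sums each group's values in the order start, aggregate, complete
  static Map<String, Long> sumByGroup( Map<String, List<Integer>> groups )
    {
    Map<String, Long> sums = new LinkedHashMap<>();

    for( Map.Entry<String, List<Integer>> group : groups.entrySet() )
      {
      Context context = new Context();            // start: new context, sum begins at zero

      for( int argument : group.getValue() )
        context.value += argument;                // aggregate: add into the long accumulator

      sums.put( group.getKey(), context.value );  // complete: emit one result for the group
      }

    return sums;
    }

  // hypothetical sample data: group "a" sums to 6, group "b" exceeds the int range
  static Map<String, Long> demo()
    {
    Map<String, List<Integer>> groups = new LinkedHashMap<>();
    groups.put( "a", Arrays.asList( 1, 2, 3 ) );
    groups.put( "b", Arrays.asList( Integer.MAX_VALUE, Integer.MAX_VALUE ) );

    return sumByGroup( groups );
    }

  public static void main( String[] args )
    {
    System.out.println( demo() );
    }
  }
```

Run as written, main prints {a=6, b=4294967294}. The second group shows why the running sum is held in a long: two int values can add up to a total outside the int range.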

The first constructor above declares a default field name ("sum") for the value this Aggregator returns. In practice, it's good to let the user override the declared field names, as the second constructor does, so that they can avoid field name collisions that might cause the planner to fail.

There are several constraints on the use of Aggregators that may not be self-evident. These are detailed in the Javadoc.

Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.