5.4 Aggregator

An Aggregator expects a set of argument Tuples in the same grouping, and may return zero or more result Tuples.

An Aggregator may only be used with an Every pipe, and it may only follow a GroupBy, CoGroup, or another Every pipe type.
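The placement rule can be modeled with a small validator. This is a hypothetical plain-Java sketch, not Cascading's planner: `PipeKind` and `EveryPlacement.isValid` are invented names, and a pipe assembly is simplified to a flat, ordered list of pipe kinds.

```java
import java.util.List;

// Hypothetical, simplified model of a pipe assembly: each element is one pipe type.
enum PipeKind { EACH, GROUP_BY, CO_GROUP, EVERY }

final class EveryPlacement
  {
  // Returns true if every EVERY pipe directly follows a GROUP_BY, CO_GROUP, or another EVERY.
  static boolean isValid( List<PipeKind> assembly )
    {
    for( int i = 0; i < assembly.size(); i++ )
      {
      if( assembly.get( i ) != PipeKind.EVERY )
        continue;

      // an EVERY at the head of the assembly has no grouping to aggregate over
      if( i == 0 )
        return false;

      PipeKind previous = assembly.get( i - 1 );

      if( previous != PipeKind.GROUP_BY && previous != PipeKind.CO_GROUP && previous != PipeKind.EVERY )
        return false;
      }

    return true;
    }
  }
```

In this model, `EACH, GROUP_BY, EVERY, EVERY` is valid because the second Every follows the first, while `EACH, EVERY` is rejected.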

To create a custom Aggregator, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Aggregator. Because BaseOperation has been subclassed, the start, aggregate, and complete methods, as defined on the Aggregator interface, are the only methods that must be implemented.
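To make the call order concrete, the following plain-Java sketch simulates how the three methods are driven for each grouping. It does not use Cascading at all: `SimpleAggregator`, `LifecycleDriver`, and `CountingAggregator` are hypothetical names, and groupings are modeled as a map from a group key to a list of integer argument values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Aggregator contract; this is NOT the Cascading API.
interface SimpleAggregator<C>
  {
  C start( String group );                   // once, when a new grouping begins
  void aggregate( C context, int argument ); // once per argument value in the grouping
  List<Object> complete( C context );        // once, after the last argument; may emit zero or more results
  }

// Drives an aggregator over each grouping in order, collecting every emitted result.
final class LifecycleDriver
  {
  static <C> List<Object> run( Map<String, List<Integer>> groupings, SimpleAggregator<C> aggregator )
    {
    List<Object> results = new ArrayList<>();

    for( Map.Entry<String, List<Integer>> grouping : groupings.entrySet() )
      {
      C context = aggregator.start( grouping.getKey() );

      for( int argument : grouping.getValue() )
        aggregator.aggregate( context, argument );

      results.addAll( aggregator.complete( context ) );
      }

    return results;
    }
  }

// Example aggregator that counts values per grouping and records the order of calls.
final class CountingAggregator implements SimpleAggregator<int[]>
  {
  final List<String> trace = new ArrayList<>();

  public int[] start( String group )
    {
    trace.add( "start " + group );
    return new int[]{ 0 }; // fresh context for this grouping
    }

  public void aggregate( int[] context, int argument )
    {
    trace.add( "aggregate " + argument );
    context[ 0 ]++;
    }

  public List<Object> complete( int[] context )
    {
    trace.add( "complete" );
    List<Object> results = new ArrayList<>();
    results.add( context[ 0 ] ); // emit one result per grouping; an empty list would emit none
    return results;
    }
  }
```

For groupings `a -> [1, 2]` and `b -> [5]` (in that order), the driver calls start, then aggregate once per value, then complete for `a`, and repeats the sequence for `b`, producing one count per grouping. Because each grouping gets a fresh context from start, no state leaks between groupings.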

Example 5.5. Custom Aggregator

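import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeAggregator extends BaseOperation<SomeAggregator.Context>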
  implements Aggregator<SomeAggregator.Context>
  {
  public static class Context
    {
    Object value;
    }

  public void start( FlowProcess flowProcess,
                     AggregatorCall<Context> aggregatorCall )
    {
    // get the group values for the current grouping
    TupleEntry group = aggregatorCall.getGroup();

    // create a new custom context object
    Context context = new Context();

    // optionally, populate the context object

    // set the context object
    aggregatorCall.setContext( context );
    }

  public void aggregate( FlowProcess flowProcess,
                         AggregatorCall<Context> aggregatorCall )
    {
    // get the current argument values
    TupleEntry arguments = aggregatorCall.getArguments();

    // get the context for this grouping
    Context context = aggregatorCall.getContext();

    // update the context object
    }

  public void complete( FlowProcess flowProcess,
                        AggregatorCall<Context> aggregatorCall )
    {
    Context context = aggregatorCall.getContext();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // insert some values into the result Tuple based on the context

    // return the result Tuple
    aggregatorCall.getOutputCollector().add( result );
    }
  }

Aggregators should declare both the number of argument values they expect and the field names of the Tuple they will return.

Aggregators must accept one or more values in a Tuple as arguments; by default, they accept any number (Operation.ANY) of values. Cascading verifies that the number of arguments selected matches the number of arguments expected.

Aggregators may optionally declare the field names they return; by default, Aggregators declare Fields.UNKNOWN.

Both declarations must be made in the constructor, either by passing default values to the super constructor or by accepting the values from the user via a constructor implementation.
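A rough sketch of the argument check described above, assuming a simplified model in which the selected arguments are a list of field names. `ArgumentCheck` is a hypothetical name, not Cascading's planner code, and its local `ANY` constant only stands in for `Operation.ANY`; the value `-1` is an arbitrary sentinel chosen here, not a claim about Cascading's constant.

```java
import java.util.List;

// Hypothetical sketch of a plan-time argument check; NOT Cascading's planner code.
final class ArgumentCheck
  {
  // Local sentinel standing in for Operation.ANY ("accept any number of arguments").
  static final int ANY = -1;

  static void verify( int expectedArgs, List<String> selectedFields )
    {
    // Aggregators always need at least one argument value.
    if( selectedFields.isEmpty() )
      throw new IllegalArgumentException( "at least one argument field must be selected" );

    // ANY accepts whatever number of fields was selected.
    if( expectedArgs == ANY )
      return;

    if( selectedFields.size() != expectedArgs )
      throw new IllegalArgumentException( "expected " + expectedArgs + " argument(s), but "
        + selectedFields.size() + " field(s) were selected: " + selectedFields );
    }
  }
```

In this model, an Aggregator declared to take one argument fails fast when two fields are selected, rather than misbehaving later at run time.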

Example 5.6. Add Tuples Aggregator

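import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AddTuplesAggregator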
    extends BaseOperation<AddTuplesAggregator.Context>
    implements Aggregator<AddTuplesAggregator.Context>
  {
  public static class Context
    {
    long value = 0;
    }

  public AddTuplesAggregator()
    {
    // expects 1 argument, fail otherwise
    super( 1, new Fields( "sum" ) );
    }

  public AddTuplesAggregator( Fields fieldDeclaration )
    {
    // expects 1 argument, fail otherwise
    super( 1, fieldDeclaration );
    }

  public void start( FlowProcess flowProcess,
                     AggregatorCall<Context> aggregatorCall )
    {
    // set the context object, starting at zero
    aggregatorCall.setContext( new Context() );
    }

  public void aggregate( FlowProcess flowProcess,
                         AggregatorCall<Context> aggregatorCall )
    {
    TupleEntry arguments = aggregatorCall.getArguments();
    Context context = aggregatorCall.getContext();

    // add the current argument value to the current sum
    context.value += arguments.getInteger( 0 );
    }

  public void complete( FlowProcess flowProcess,
                        AggregatorCall<Context> aggregatorCall )
    {
    Context context = aggregatorCall.getContext();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // set the sum
    result.add( context.value );

    // return the result Tuple
    aggregatorCall.getOutputCollector().add( result );
    }
  }

The example above implements a fully functional Aggregator that accepts one value in the argument Tuple, adds up the argument values of all Tuples in the current grouping, and returns the sum as a new Tuple.
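Stripped of Cascading types, the per-grouping logic of AddTuplesAggregator reduces to the loop below. This is a hypothetical plain-Java mirror (`AddTuplesSketch` and `sumGrouping` are invented names) that assumes the argument values of one grouping arrive as a list of integers. It also shows why the Context holds a `long`: the sum of many integer values can exceed the range of `int`.

```java
import java.util.List;

// Plain-Java mirror of AddTuplesAggregator's logic for a single grouping (no Cascading types).
final class AddTuplesSketch
  {
  // Mirrors the Context class: a long accumulator starting at zero.
  static final class Context
    {
    long value = 0;
    }

  static long sumGrouping( List<Integer> argumentValues )
    {
    Context context = new Context();     // start(): fresh context per grouping

    for( int argument : argumentValues ) // aggregate(): once per argument Tuple
      context.value += argument;         // int is widened to long before adding

    return context.value;                // complete(): the single result value
    }
  }
```

For example, `sumGrouping( Arrays.asList( Integer.MAX_VALUE, 1 ) )` returns 2147483648, a value an `int` accumulator would have wrapped around to a negative number.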

The first constructor assumes a default field name ("sum") for the value this Aggregator returns. However, it is a best practice to always give the user the option to override the declared field names, so that field name collisions, which would cause the planner to fail, can be avoided.
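The kind of collision this guards against can be modeled as follows, assuming a simplified view in which an Every's result fields are the grouping fields followed by the Aggregator's declared fields, and any repeated name is rejected. `FieldCollisionCheck` is a hypothetical name; this is not how the Cascading planner is implemented.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of how an Every's result fields are assembled.
final class FieldCollisionCheck
  {
  // Appends the declared fields to the grouping fields, failing on any duplicate name.
  static List<String> resultFields( List<String> groupingFields, List<String> declaredFields )
    {
    List<String> result = new ArrayList<>( groupingFields );
    Set<String> seen = new HashSet<>( groupingFields );

    for( String name : declaredFields )
      {
      // Set.add returns false if the name is already present
      if( !seen.add( name ) )
        throw new IllegalStateException( "field name collision: " + name );

      result.add( name );
      }

    return result;
    }
  }
```

In this model, grouping on a field already named `sum` and then using the default constructor would fail, while `new AddTuplesAggregator( new Fields( "total" ) )` would declare `total` and avoid the clash.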

Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.