An Aggregator expects a stream of tuple groups (the output of a GroupBy or CoGroup pipe), and returns zero or more result tuples for every group. An Aggregator may only be used with an Every pipe, which may follow a GroupBy, a CoGroup, or another Every pipe, but not an Each.
To create a custom Aggregator, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Aggregator. Because BaseOperation has been subclassed, the start, aggregate, and complete methods, as defined on the Aggregator interface, are the only methods that must be implemented.
Example 5.6. Custom Aggregator
    import cascading.flow.FlowProcess;
    import cascading.operation.Aggregator;
    import cascading.operation.AggregatorCall;
    import cascading.operation.BaseOperation;
    import cascading.tuple.Tuple;
    import cascading.tuple.TupleEntry;

    public class SomeAggregator extends BaseOperation<SomeAggregator.Context>
        implements Aggregator<SomeAggregator.Context>
      {
      // holds the intermediate state for a single grouping
      public static class Context
        {
        Object value;
        }

      public void start( FlowProcess flowProcess,
                         AggregatorCall<Context> aggregatorCall )
        {
        // get the group values for the current grouping
        TupleEntry group = aggregatorCall.getGroup();

        // create a new custom context object
        Context context = new Context();

        // optionally, populate the context object

        // set the context object
        aggregatorCall.setContext( context );
        }

      public void aggregate( FlowProcess flowProcess,
                             AggregatorCall<Context> aggregatorCall )
        {
        // get the current argument values
        TupleEntry arguments = aggregatorCall.getArguments();

        // get the context for this grouping
        Context context = aggregatorCall.getContext();

        // update the context object
        }

      public void complete( FlowProcess flowProcess,
                            AggregatorCall<Context> aggregatorCall )
        {
        Context context = aggregatorCall.getContext();

        // create a Tuple to hold our result values
        Tuple result = new Tuple();

        // insert some values into the result Tuple based on the context

        // return the result Tuple
        aggregatorCall.getOutputCollector().add( result );
        }
      }
Whenever possible, Aggregators should declare both the number of argument values they expect and the field names of the Tuple they return. However, these declarations are optional, as explained below.
For input, Aggregators must accept one or more values in a Tuple as arguments. If not specified, the default is to accept any number of values (Operation.ANY). Cascading verifies during planning that the number of arguments selected is the same as the number of arguments expected.
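The rule can be illustrated with a small plain-Java model. This is only a sketch of the idea, not Cascading's planner code: the class ArgumentCountSketch, the method accepts, and the sentinel value chosen for ANY are all hypothetical names introduced for illustration.

```java
// Hypothetical model of an argument-count check; this is not Cascading code.
class ArgumentCountSketch {
    // Assumed sentinel meaning "any number of arguments",
    // standing in for the role Operation.ANY plays in the text.
    static final int ANY = Integer.MAX_VALUE;

    // True when the number of selected arguments satisfies the declaration.
    static boolean accepts(int expected, int selected) {
        return expected == ANY || expected == selected;
    }

    public static void main(String[] args) {
        System.out.println(accepts(1, 1));   // declared 1, selected 1
        System.out.println(accepts(1, 2));   // declared 1, selected 2
        System.out.println(accepts(ANY, 5)); // no declaration, selected 5
    }
}
```

In this model, an operation that declares one argument rejects a selection of two fields, while an operation that declares nothing accepts whatever is selected.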
For output, it's good practice for Aggregators to declare the field names they return. If not specified, the default is Fields.UNKNOWN, meaning that an unknown number of fields are returned in each Tuple.
Both declarations, the number of input arguments and the declared result fields, must be made in the constructor, either by passing default values to the super constructor or by accepting the values from the user via a constructor implementation.
Example 5.7. Add Tuples Aggregator
    import cascading.flow.FlowProcess;
    import cascading.operation.Aggregator;
    import cascading.operation.AggregatorCall;
    import cascading.operation.BaseOperation;
    import cascading.tuple.Fields;
    import cascading.tuple.Tuple;
    import cascading.tuple.TupleEntry;

    public class AddTuplesAggregator
        extends BaseOperation<AddTuplesAggregator.Context>
        implements Aggregator<AddTuplesAggregator.Context>
      {
      // holds the running sum for a single grouping
      public static class Context
        {
        long value = 0;
        }

      public AddTuplesAggregator()
        {
        // expects 1 argument, fail otherwise; result field defaults to "sum"
        super( 1, new Fields( "sum" ) );
        }

      public AddTuplesAggregator( Fields fieldDeclaration )
        {
        // expects 1 argument, fail otherwise; result field chosen by the caller
        super( 1, fieldDeclaration );
        }

      public void start( FlowProcess flowProcess,
                         AggregatorCall<Context> aggregatorCall )
        {
        // set the context object, starting at zero
        aggregatorCall.setContext( new Context() );
        }

      public void aggregate( FlowProcess flowProcess,
                             AggregatorCall<Context> aggregatorCall )
        {
        TupleEntry arguments = aggregatorCall.getArguments();
        Context context = aggregatorCall.getContext();

        // add the current argument value to the current sum
        context.value += arguments.getInteger( 0 );
        }

      public void complete( FlowProcess flowProcess,
                            AggregatorCall<Context> aggregatorCall )
        {
        Context context = aggregatorCall.getContext();

        // create a Tuple to hold our result values
        Tuple result = new Tuple();

        // set the sum
        result.add( context.value );

        // return the result Tuple
        aggregatorCall.getOutputCollector().add( result );
        }
      }
The example above implements an Aggregator that accepts a single value in the argument Tuple, sums that value across all the argument tuples in the current grouping, and returns the sum as a new Tuple.
The first constructor above assumes a default field name that this Aggregator returns. In practice, it's good to give the user the option of overriding the declared field names, allowing them to prevent possible field name collisions that might cause the planner to fail.
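To make the per-group call order concrete, the following is a minimal plain-Java model of how a summing aggregator is driven: start once when a group begins, aggregate once per argument value, and complete once when the group ends. It deliberately avoids the Cascading classes so it can run on its own. The class SumLifecycleSketch and its run driver are hypothetical stand-ins for the framework, not part of the Cascading API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the per-group call order; none of these names are Cascading API.
class SumLifecycleSketch {
    // Plays the role of the Context object held for one grouping.
    static final class Context {
        long value = 0;
    }

    // Called once when a new group begins: create fresh state.
    static Context start() {
        return new Context();
    }

    // Called once for every argument value in the group: update the state.
    static void aggregate(Context context, int argument) {
        context.value += argument;
    }

    // Called once after the last value of the group: produce the result.
    static long complete(Context context) {
        return context.value;
    }

    // Hypothetical driver standing in for the framework.
    static Map<String, Long> run(Map<String, List<Integer>> groups) {
        Map<String, Long> results = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            Context context = start();
            for (int argument : group.getValue()) {
                aggregate(context, argument);
            }
            results.put(group.getKey(), complete(context));
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        groups.put("a", Arrays.asList(1, 2, 3));
        groups.put("b", Arrays.asList(10, 20));
        System.out.println(run(groups)); // prints {a=6, b=30}
    }
}
```

Because a new Context is created in start for every group, no state leaks from one grouping into the next, which is the same reason the examples above call setContext inside start rather than in the constructor.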
There are several constraints on the use of Aggregators that may not be self-evident. These are detailed in the Javadoc.
Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.