An Aggregator expects a set of argument Tuples in the same grouping, and may return zero or more result Tuples.
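The "zero or more results per grouping" contract can be illustrated without Cascading at all. The sketch below is a hypothetical, library-free model, not the Cascading API. It assumes each grouping's argument Tuples hold a single integer, and it emits one result per value that repeats in the grouping, so some groupings produce nothing and others produce several results.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, library-free model of the Aggregator contract: NOT the Cascading API.
// Each grouping's arguments are modeled as a list of single integer values.
class GroupingContractSketch {

    // Emits one result per value that occurs more than once in the grouping,
    // so a grouping may yield zero, one, or many results.
    static List<Integer> duplicatesIn(List<Integer> groupArguments) {
        Map<Integer, Integer> counts = new LinkedHashMap<>(); // keeps first-seen order
        for (int v : groupArguments) {
            counts.merge(v, 1, Integer::sum);
        }
        List<Integer> results = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                results.add(e.getKey());
            }
        }
        return results;
    }
}
```

For example, `duplicatesIn(List.of(1, 2, 3))` returns an empty list (zero results), while `duplicatesIn(List.of(4, 4, 5, 5))` returns `[4, 5]` (two results).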
An Aggregator may only be used with an Every pipe, and an Every pipe may only follow a GroupBy, a CoGroup, or another Every pipe.
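The placement rule above can be stated as a simple check over a sequence of pipe types. This is a hypothetical sketch: Cascading's planner enforces the rule itself, and the string names used here ("Pipe", "Each", "GroupBy", and so on) are only labels for illustration, not how assemblies are represented in Cascading.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model of the placement rule; pipe types are plain strings here.
class EveryPlacementSketch {

    private static final Set<String> VALID_PREDECESSORS = Set.of("GroupBy", "CoGroup", "Every");

    // Returns true if every "Every" in the sequence directly follows
    // a GroupBy, a CoGroup, or another Every.
    static boolean isValidAssembly(List<String> pipes) {
        for (int i = 0; i < pipes.size(); i++) {
            if (!pipes.get(i).equals("Every")) {
                continue;
            }
            if (i == 0 || !VALID_PREDECESSORS.contains(pipes.get(i - 1))) {
                return false;
            }
        }
        return true;
    }
}
```

Under this model, `Pipe, GroupBy, Every, Every` is accepted, while `Pipe, Each, Every` is rejected because the Every pipe does not follow a grouping.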
To create a custom Aggregator, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Aggregator. Because BaseOperation is subclassed, the start, aggregate, and complete methods defined on the Aggregator interface are the only methods that must be implemented.
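To make the call order of those three methods concrete, the sketch below drives a simplified aggregator over several groupings: start once per grouping, aggregate once per argument Tuple, then complete. The interface `SketchAggregator`, the driver `LifecycleDriverSketch`, and its counting example are hypothetical stand-ins and are not Cascading classes; in particular, this sketch returns the context from start instead of calling a setContext method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified mirror of the start/aggregate/complete contract (not Cascading types).
interface SketchAggregator<C> {
    C start(String groupKey);                       // once per grouping, before any arguments
    void aggregate(C context, int argument);        // once per argument Tuple in the grouping
    void complete(C context, List<Object> output);  // once per grouping, after the last argument
}

class LifecycleDriverSketch {

    // Drives one aggregator over every grouping in order and collects everything it emits.
    static <C> List<Object> run(Map<String, List<Integer>> groups, SketchAggregator<C> aggregator) {
        List<Object> output = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            C context = aggregator.start(group.getKey());
            for (int argument : group.getValue()) {
                aggregator.aggregate(context, argument);
            }
            aggregator.complete(context, output);
        }
        return output;
    }

    // Per-grouping state for the counting example below.
    static final class CountContext {
        String key;
        long count;
    }

    // Example aggregator: counts the argument values in each grouping.
    static SketchAggregator<CountContext> counter() {
        return new SketchAggregator<CountContext>() {
            public CountContext start(String groupKey) {
                CountContext c = new CountContext();
                c.key = groupKey;
                return c;
            }
            public void aggregate(CountContext c, int argument) {
                c.count++;
            }
            public void complete(CountContext c, List<Object> output) {
                output.add(c.key + "=" + c.count);
            }
        };
    }
}
```

Because a fresh context is created in start for each grouping, no state leaks from one grouping into the next.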
Example 5.5. Custom Aggregator
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeAggregator extends BaseOperation<SomeAggregator.Context>
    implements Aggregator<SomeAggregator.Context> {

  public static class Context {
    Object value;
  }

  public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) {
    // get the group values for the current grouping
    TupleEntry group = aggregatorCall.getGroup();

    // create a new custom context object
    Context context = new Context();

    // optionally, populate the context object

    // set the context object
    aggregatorCall.setContext( context );
  }

  public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) {
    // get the current argument values
    TupleEntry arguments = aggregatorCall.getArguments();

    // get the context for this grouping
    Context context = aggregatorCall.getContext();

    // update the context object
  }

  public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) {
    Context context = aggregatorCall.getContext();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // insert some values into the result Tuple based on the context

    // return the result Tuple
    aggregatorCall.getOutputCollector().add( result );
  }
}
Aggregators should declare both the number of argument values they expect and the field names of the Tuple they will return.
An Aggregator must accept one or more values in its argument Tuple; by default, it accepts any number (Operation.ANY) of values. Cascading verifies that the number of arguments selected matches the number of arguments expected.
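The verification described above amounts to a simple rule: at least one argument must be selected, and unless the operation accepts any number, the selected count must equal the declared count. The sketch below is a hypothetical model of that rule only; `ANY_SKETCH` is a local stand-in for Operation.ANY and its value here is arbitrary, not Cascading's.

```java
// Hypothetical model of the argument-count check; not Cascading's implementation.
class ArityCheckSketch {

    // Local stand-in for Operation.ANY; the value is arbitrary for this sketch.
    static final int ANY_SKETCH = -1;

    // Throws if the selected argument count is not acceptable for the declared count.
    static void verifyArguments(int expected, int selected) {
        if (selected < 1) {
            throw new IllegalArgumentException("an Aggregator needs at least one argument value");
        }
        if (expected != ANY_SKETCH && expected != selected) {
            throw new IllegalArgumentException(
                "expected " + expected + " argument values, but " + selected + " were selected");
        }
    }
}
```

Under this model, declaring one argument and selecting two fields fails, whereas declaring ANY_SKETCH accepts any positive count.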
Aggregators may optionally declare the field names they return; by default, Aggregators declare Fields.UNKNOWN.
Both declarations must be made in the constructor, either by passing default values to the super constructor or by accepting the values from the user through a constructor argument.
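The two constructor styles can be sketched with plain Java classes. `SketchBaseOperation` and `SketchSumOperation` are hypothetical stand-ins for BaseOperation and a custom Aggregator; a `List<String>` takes the place of Fields, and `ANY_SKETCH` and `UNKNOWN_SKETCH` are illustrative placeholders for Operation.ANY and Fields.UNKNOWN, not their real values.

```java
import java.util.List;

// Hypothetical stand-in for BaseOperation, showing only where declarations are made.
class SketchBaseOperation {
    static final int ANY_SKETCH = -1;                    // placeholder for Operation.ANY
    static final List<String> UNKNOWN_SKETCH = List.of(); // placeholder for Fields.UNKNOWN

    final int numArgs;
    final List<String> fieldDeclaration;

    // Defaults when a subclass declares nothing: any argument count, unknown fields.
    SketchBaseOperation() {
        this(ANY_SKETCH, UNKNOWN_SKETCH);
    }

    SketchBaseOperation(int numArgs, List<String> fieldDeclaration) {
        this.numArgs = numArgs;
        this.fieldDeclaration = fieldDeclaration;
    }
}

// Hypothetical subclass offering both constructor styles.
class SketchSumOperation extends SketchBaseOperation {

    // Default values passed to the super constructor.
    SketchSumOperation() {
        super(1, List.of("sum"));
    }

    // Field names accepted from the user.
    SketchSumOperation(List<String> fieldDeclaration) {
        super(1, fieldDeclaration);
    }
}
```

Either way, the declarations are fixed once construction finishes, which is when the planner can inspect them.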
Example 5.6. Add Tuples Aggregator
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AddTuplesAggregator extends BaseOperation<AddTuplesAggregator.Context>
    implements Aggregator<AddTuplesAggregator.Context> {

  public static class Context {
    long value = 0;
  }

  public AddTuplesAggregator() {
    // expects 1 argument, fail otherwise
    super( 1, new Fields( "sum" ) );
  }

  public AddTuplesAggregator( Fields fieldDeclaration ) {
    // expects 1 argument, fail otherwise
    super( 1, fieldDeclaration );
  }

  public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) {
    // set the context object, starting at zero
    aggregatorCall.setContext( new Context() );
  }

  public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) {
    TupleEntry arguments = aggregatorCall.getArguments();
    Context context = aggregatorCall.getContext();

    // add the current argument value to the current sum
    context.value += arguments.getInteger( 0 );
  }

  public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) {
    Context context = aggregatorCall.getContext();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // set the sum
    result.add( context.value );

    // return the result Tuple
    aggregatorCall.getOutputCollector().add( result );
  }
}
The example above implements a fully functional Aggregator that accepts one value in the argument Tuple, sums that value across all argument Tuples in the current grouping, and returns the result as a new Tuple.
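As a quick way to check what AddTuplesAggregator produces, the following library-free sketch computes the same per-grouping sums. `AddTuplesTraceSketch` is hypothetical, the groupings are made-up sample data modeled as lists of integers, and the comments map each step to the corresponding method in the example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, library-free trace of AddTuplesAggregator's per-grouping behavior.
class AddTuplesTraceSketch {

    static Map<String, Long> sumsByGroup(Map<String, List<Integer>> groups) {
        Map<String, Long> results = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            long value = 0;                        // start(): new Context, value = 0
            for (int argument : group.getValue()) {
                value += argument;                 // aggregate(): add each argument value
            }
            results.put(group.getKey(), value);    // complete(): emit one "sum" result
        }
        return results;
    }
}
```

For a grouping holding 1, 2, and 3 the sum is 6. A grouping holding Integer.MAX_VALUE twice sums to 4294967294, which would overflow an int; keeping the accumulator as a long, as the example's Context does, avoids that.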
The first constructor assumes a default field name that this Aggregator will return, but it is a best practice to always give the user the option to override the declared field names, to prevent field name collisions that would cause the planner to fail.
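To see why an overridable field name matters, consider appending an Aggregator's declared result fields to fields already present. The sketch below is a hypothetical model, not the planner's actual check: it rejects a second field named "sum", and shows that supplying a different name resolves the collision.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of a field name collision; not Cascading's planner logic.
class FieldCollisionSketch {

    // Appends the declared result fields to the incoming fields,
    // failing if any declared name is already present.
    static List<String> appendFields(List<String> incoming, List<String> declared) {
        Set<String> seen = new HashSet<>(incoming);
        List<String> result = new ArrayList<>(incoming);
        for (String name : declared) {
            if (!seen.add(name)) {
                throw new IllegalStateException("duplicate field name: " + name);
            }
            result.add(name);
        }
        return result;
    }
}
```

With incoming fields `word` and `sum`, declaring `sum` again fails, while declaring `total` (as a user could through the second constructor) yields `word, sum, total`.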
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.