A Buffer expects a set of argument Tuples in the same grouping, and may return zero or more result Tuples.
The Buffer is very similar to an Aggregator, except that it receives the current Grouping Tuple and an iterator of all the arguments it expects for every value Tuple in the current grouping, all in a single method call. This is very similar to the typical Reducer interface, and is best used for operations that need greater visibility into the previous and next elements in the stream, for example, smoothing a series of time-stamps where there are missing values.
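To make the "previous and next elements" point concrete, here is a minimal plain-Java sketch (not Cascading code) of the kind of logic a Buffer's operate method could run over one grouping. It assumes the argument values are time-stamps already sorted in ascending order and expected at a fixed interval `step`; `GapFiller` and `fillGaps` are hypothetical names. Because every value in the grouping is reachable from the one iterator, the gap between the previous and the current stamp can be filled in, and the call may emit more results than it received.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; not part of the Cascading API.
final class GapFiller {
  // Walks one grouping's sorted time-stamps and emits every stamp at the
  // given step, inserting the ones missing between neighbouring values.
  static List<Long> fillGaps(Iterator<Long> stamps, long step) {
    if (step <= 0) {
      throw new IllegalArgumentException("step must be positive");
    }
    List<Long> results = new ArrayList<>();
    Long previous = null;
    while (stamps.hasNext()) {
      long current = stamps.next();
      if (previous != null) {
        // fill in any stamps missing between the previous and current value
        for (long t = previous + step; t < current; t += step) {
          results.add(t);
        }
      }
      results.add(current);
      previous = current;
    }
    return results; // an empty grouping yields zero results
  }
}
```

For example, the stamps 0, 10, 30 and 60 with a step of 10 come back as 0, 10, 20, 30, 40, 50 and 60.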
A Buffer may only be used with an Every pipe, and it may only follow a GroupBy or CoGroup pipe type.
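The ordering constraint follows from the contract described earlier: a Buffer needs its grouping to exist before it can be handed an iterator over it. The plain-Java sketch below models that sequence without using Cascading itself; `MiniBuffer` and `GroupThenEvery` are hypothetical names standing in for an Every pipe applying a Buffer after a GroupBy. Records are first grouped and sorted by key, then the buffer is invoked exactly once per grouping.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a Buffer: one call per grouping, receiving the
// group key and an iterator over every value in that grouping.
interface MiniBuffer {
  void operate(String group, Iterator<Integer> arguments, List<String> output);
}

final class GroupThenEvery {
  // Models a GroupBy (collect and sort by key) followed by an Every that
  // applies the buffer to each grouping in turn.
  static List<String> groupThenEvery(List<Map.Entry<String, Integer>> records, MiniBuffer buffer) {
    Map<String, List<Integer>> groups = new TreeMap<>();
    for (Map.Entry<String, Integer> record : records) {
      groups.computeIfAbsent(record.getKey(), k -> new ArrayList<>()).add(record.getValue());
    }
    List<String> output = new ArrayList<>();
    for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
      buffer.operate(group.getKey(), group.getValue().iterator(), output);
    }
    return output;
  }
}
```

A buffer that counts its arguments, passed in as a lambda, would emit one result per key, in sorted key order.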
To create a custom Buffer, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Buffer. Because BaseOperation has been subclassed, the operate method, as defined on the Buffer interface, is the only method that must be implemented.
Example 5.7. Custom Buffer
import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeBuffer extends BaseOperation implements Buffer {
  public void operate( FlowProcess flowProcess, BufferCall bufferCall ) {
    // get the group values for the current grouping
    TupleEntry group = bufferCall.getGroup();

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    while( arguments.hasNext() ) {
      TupleEntry argument = arguments.next();
      // insert some values into the result Tuple based on the arguments
    }

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
  }
}
Buffers should declare both the number of argument values they expect and the field names of the Tuple they will return.
Buffers must accept 1 or more values in a Tuple as arguments; by default they will accept any number (Operation.ANY) of values. Cascading will verify that the number of arguments selected matches the number of arguments expected.
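The argument-count check can be pictured with the minimal sketch below. It is not Cascading's own verification code: `ArityCheck` is a hypothetical class, and its `ANY` constant is a stand-in for Operation.ANY, modeled here as Integer.MAX_VALUE. A Buffer declaring ANY accepts whatever number of arguments is selected; otherwise the selected count must match exactly.

```java
// Hypothetical illustration of the argument-count check; not Cascading code.
final class ArityCheck {
  // Stand-in for Operation.ANY, modeled here as Integer.MAX_VALUE.
  static final int ANY = Integer.MAX_VALUE;

  // Throws if the number of selected argument fields does not satisfy
  // the number of arguments the buffer declares it expects.
  static void verify(int expected, int selected) {
    if (expected < 1) {
      throw new IllegalArgumentException("a buffer must expect 1 or more arguments");
    }
    if (expected != ANY && expected != selected) {
      throw new IllegalArgumentException(
          "expected " + expected + " argument(s), but " + selected + " were selected");
    }
  }
}
```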
Buffers may optionally declare the field names they return; by default, Buffers declare Fields.UNKNOWN.
Both declarations must be made in the constructor, either by passing default values to the super constructor, or by accepting the values from the user via a constructor implementation.
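The two constructor styles can be sketched in plain Java as follows. `SketchBaseOperation` and `SketchBuffer` are hypothetical classes: the base class merely records the two declarations, the way the text describes BaseOperation receiving them, and its `ANY` and `UNKNOWN` constants are placeholders for Operation.ANY and Fields.UNKNOWN, not the real values. The no-argument subclass constructor passes fixed defaults to the super constructor, while the second accepts the field declaration from the user.

```java
import java.util.List;

// Hypothetical stand-in for BaseOperation: it only records the declarations.
abstract class SketchBaseOperation {
  static final int ANY = Integer.MAX_VALUE;       // placeholder for Operation.ANY
  static final List<String> UNKNOWN = List.of();  // placeholder for Fields.UNKNOWN: "not declared"

  final int numArgs;
  final List<String> fieldNames;

  // Defaults: accept any number of arguments, declare no field names.
  SketchBaseOperation() {
    this(ANY, UNKNOWN);
  }

  SketchBaseOperation(int numArgs, List<String> fieldNames) {
    this.numArgs = numArgs;
    this.fieldNames = fieldNames;
  }
}

final class SketchBuffer extends SketchBaseOperation {
  // Passes fixed default values to the super constructor.
  SketchBuffer() {
    super(1, List.of("result"));
  }

  // Lets the caller choose the declared field names.
  SketchBuffer(List<String> fieldNames) {
    super(1, fieldNames);
  }
}
```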
Example 5.8. Average Buffer
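import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AverageBuffer extends BaseOperation implements Buffer {
  // expects one argument, declares the result field "average"
  public AverageBuffer() {
    super( 1, new Fields( "average" ) );
  }

  // expects one argument, lets the user name the result field
  public AverageBuffer( Fields fieldDeclaration ) {
    super( 1, fieldDeclaration );
  }

  public void operate( FlowProcess flowProcess, BufferCall bufferCall ) {
    // init the count and sum
    long count = 0;
    long sum = 0;

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    while( arguments.hasNext() ) {
      count++;
      sum += arguments.next().getInteger( 0 );
    }

    // create a Tuple to hold our result value; cast to double so the
    // average is not truncated by integer division
    Tuple result = new Tuple( (double) sum / count );

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
  }
}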
The example above implements a fully functional Buffer that accepts one value in the argument Tuple, sums that value across all the argument Tuples in the current grouping, and returns the sum divided by the number of argument Tuples counted, in a new Tuple.
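Stripped of the Cascading types, the arithmetic of this Buffer reduces to the plain-Java sketch below; `AverageSketch` and `average` are hypothetical names, and the values are plain Integers rather than TupleEntry objects. Note the cast to double before dividing: dividing two long values truncates toward zero, so the values 1, 2, 3 and 4 (sum 10, count 4) would average to 2 rather than 2.5.

```java
import java.util.Iterator;

// Hypothetical plain-Java mirror of the averaging logic; not Cascading code.
final class AverageSketch {
  // Sums one grouping's values and divides by how many were seen.
  static double average(Iterator<Integer> arguments) {
    long count = 0;
    long sum = 0;
    while (arguments.hasNext()) {
      count++;
      sum += arguments.next();
    }
    if (count == 0) {
      throw new IllegalArgumentException("empty grouping");
    }
    return (double) sum / count; // cast first to avoid integer division
  }
}
```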
The first constructor assumes a default name for the field this Buffer will return, but it is a best practice to always give the user the option to override the declared field names, to prevent any field name collisions that would cause the planner to fail.
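The kind of collision that overriding the declared names avoids can be illustrated with a small plain-Java check. `FieldCollision.collisions` is a hypothetical helper, not the Cascading planner: it simply reports which declared result names already appear among the incoming field names, assuming the results are joined to a stream that already carries those names. If the incoming stream already has a field called "average", the default declaration clashes, and choosing another name through the second constructor removes the clash.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper; not the Cascading planner.
final class FieldCollision {
  // Returns each declared name that already appears among the incoming names.
  static List<String> collisions(List<String> incoming, List<String> declared) {
    Set<String> existing = new HashSet<>(incoming);
    List<String> clashes = new ArrayList<>();
    for (String name : declared) {
      if (existing.contains(name)) {
        clashes.add(name);
      }
    }
    return clashes;
  }
}
```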
Note that this example is somewhat contrived; in practice, an Aggregator should be implemented to compute averages. A Buffer would be better suited for, say, "running averages" across very large spans.
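As a rough illustration of the running-average case, the plain-Java sketch below emits one result per value in a grouping: the mean of all values seen so far. `RunningAverage` is a hypothetical name, and the values are assumed to arrive in the order in which the grouping is sorted. Producing as many results as arguments from a single call that sees the whole grouping fits the Buffer contract of returning zero or more result Tuples.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch; not part of the Cascading API.
final class RunningAverage {
  // Emits, for each value in order, the mean of all values seen so far.
  static List<Double> running(Iterator<Integer> arguments) {
    List<Double> results = new ArrayList<>();
    long count = 0;
    long sum = 0;
    while (arguments.hasNext()) {
      sum += arguments.next();
      count++;
      results.add((double) sum / count);
    }
    return results;
  }
}
```

For the values 2, 4 and 6, this yields 2.0, 3.0 and 4.0.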
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.