A Buffer expects a set of argument tuples in the same grouping, and may return zero or more result tuples.
A Buffer is very similar to an Aggregator, except that it receives the current grouping Tuple together with an iterator over the arguments it expects for every value Tuple in the current grouping, all in the same method call. This is very similar to the typical Reducer interface in MapReduce, and is best used for operations that need visibility into the previous and next elements in the stream, such as smoothing a series of timestamps where there are missing values.
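The kind of neighbor-aware pass described above can be sketched in plain Java. This is only a model of the idea, not Cascading code: the class GapFiller, its fill method, the use of null to mark a missing reading, and the choice of linear interpolation are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical plain-Java model of a per-grouping pass; not part of Cascading.
class GapFiller {
    // Consumes the values of one grouping in order. A null marks a missing reading.
    // Each run of nulls between two known values is filled by linear interpolation.
    // Leading and trailing nulls stay null (an arbitrary choice for this sketch).
    static List<Double> fill(Iterator<Double> values) {
        List<Double> out = new ArrayList<>();
        Double previous = null; // last known value seen so far
        int pending = 0;        // missing values seen since 'previous'
        while (values.hasNext()) {
            Double current = values.next();
            if (current == null) {
                pending++;
                continue;
            }
            // Emit the deferred gap now that the next known value is visible.
            for (int i = 1; i <= pending; i++) {
                if (previous == null) {
                    out.add(null);
                } else {
                    out.add(previous + (current - previous) * i / (pending + 1));
                }
            }
            out.add(current);
            previous = current;
            pending = 0;
        }
        // A trailing gap has no next value to interpolate toward.
        for (int i = 0; i < pending; i++) {
            out.add(null);
        }
        return out;
    }
}
```

Because the whole grouping arrives through one iterator, the method can defer output until it has seen the element that follows a gap. A per-tuple callback cannot do this without carrying that state between calls.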
A Buffer may only be used with an Every pipe, and it may only follow a GroupBy or CoGroup pipe type.
To create a custom Buffer, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Buffer. Because BaseOperation has been subclassed, the operate method, as defined on the Buffer interface, is the only method that must be implemented.
Example 5.8. Custom Buffer
import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeBuffer extends BaseOperation implements Buffer
{
  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
  {
    // get the group values for the current grouping
    TupleEntry group = bufferCall.getGroup();

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    while( arguments.hasNext() )
    {
      TupleEntry argument = arguments.next();

      // insert some values into the result Tuple based on the arguments
    }

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
  }
}
Buffers should declare both the number of argument values they expect and the field names of the Tuple they return.
For input, Buffers must accept one or more values in a Tuple as arguments. If not specified, the default is to accept any number of values (Operation.ANY). During the planning phase, Cascading verifies that the number of arguments selected is the same as the number of arguments expected.
For output, it's good practice for Buffers to declare the field names they return. If not specified, the default is Fields.UNKNOWN, meaning that an unknown number of fields are returned in each Tuple.
Both declarations, the number of input arguments and the declared result fields, must be made in the constructor, either by passing default values to the super constructor or by accepting the values from the user via a constructor implementation.
Example 5.9. Average Buffer
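import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AverageBuffer extends BaseOperation implements Buffer
{
  public AverageBuffer()
  {
    // expect one argument, and declare a default result field name
    super( 1, new Fields( "average" ) );
  }

  public AverageBuffer( Fields fieldDeclaration )
  {
    // expect one argument, and let the caller name the result field
    super( 1, fieldDeclaration );
  }

  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
  {
    // init the count and sum
    long count = 0;
    long sum = 0;

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    while( arguments.hasNext() )
    {
      count++;
      sum += arguments.next().getInteger( 0 );
    }

    // create a Tuple to hold our result values;
    // cast to double so the division does not truncate the average
    Tuple result = new Tuple( (double) sum / count );

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
  }
}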
The example above implements a Buffer that accepts a single value in each argument Tuple, sums those values across the current grouping, and returns the sum divided by the number of argument Tuples counted, in a new Tuple.
The first constructor above assumes a default field name for the field that this Buffer returns. In practice, it's good to give the user the option of overriding the declared field names, allowing them to prevent possible field name collisions that might cause the planner to fail.
Note that this example is somewhat artificial. In actual practice, an Aggregator would be a better way to compute averages for an entire dataset. A Buffer is better suited for calculating running averages across very large spans, for example.
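As a rough illustration of why a running average fits the Buffer model, the plain-Java sketch below emits one result per input value instead of a single result per grouping. It does not use the Cascading API. The class RunningAverage and its over method are hypothetical names, and "running average" is taken here to mean the cumulative mean of all values seen so far, which is only one possible reading.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical plain-Java model; not part of Cascading.
class RunningAverage {
    // Walks the values of one grouping once and emits the cumulative mean
    // after each value, so N inputs yield N results.
    static List<Double> over(Iterator<Integer> values) {
        List<Double> out = new ArrayList<>();
        long count = 0;
        long sum = 0;
        while (values.hasNext()) {
            sum += values.next();
            count++;
            // Cast before dividing so the mean is not truncated.
            out.add((double) sum / count);
        }
        return out;
    }
}
```

An empty iterator produces an empty list, which mirrors the statement that a Buffer may return zero or more result tuples.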
There are several constraints on the use of Buffers that may not be self-evident. These are detailed in the Javadoc.
As with the Function example above, a Buffer may define a custom context object and implement the prepare() and cleanup() methods to maintain state, or re-use outgoing Tuple instances for efficiency.
Although a result Tuple instance can be re-used, non-primitive values stored inside the Tuple must not be re-used; a Tuple nested inside the result Tuple is one example. If Cascading needs to cache the result Tuple, it makes a shallow copy of the Tuple. It does not clone or copy any child Tuple instances, Java Collections, or any other mutable Object.
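The effect of a shallow copy can be seen in a small plain-Java model. The sketch below stands in for the behavior described above using java.util.ArrayList instead of Tuple. The class ShallowCopyPitfall, its run method, and the "group-N" values are hypothetical, and the only assumption about Cascading is the one stated in the text, namely that a cached copy shares its child objects with the original.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model using lists in place of Tuples; not part of Cascading.
class ShallowCopyPitfall {
    // Runs two rounds that re-use one outgoing 'result' container and caches a
    // shallow copy of it after each round. Returns the cached copies.
    static List<List<Object>> run(boolean reuseChild) {
        List<List<Object>> cache = new ArrayList<>();
        List<Object> result = new ArrayList<>(); // re-used outgoing container
        List<Object> child = new ArrayList<>();  // mutable object nested in the result
        for (int round = 1; round <= 2; round++) {
            if (reuseChild) {
                child.clear();            // unsafe: earlier cached copies still point here
            } else {
                child = new ArrayList<>(); // safe: a fresh child for every result
            }
            child.add(round);
            result.clear();
            result.add("group-" + round);
            result.add(child);
            // Shallow copy: top-level slots are copied, the child is shared.
            cache.add(new ArrayList<>(result));
        }
        return cache;
    }
}
```

With reuseChild set to true, the first cached copy ends up holding the second round's child contents, while its top-level String value is unaffected. With reuseChild set to false, each cached copy keeps its own child.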
Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.