5.5 Buffer

A Buffer expects a set of argument tuples in the same grouping, and may return zero or more result tuples.

A Buffer is very similar to an Aggregator, except that it receives the current grouping Tuple and an iterator over all the arguments it expects, for every value Tuple in the current grouping, all in a single method call. This is very similar to the typical Reducer interface in MapReduce, and is best used for operations that need visibility into the previous and next elements in the stream, such as smoothing a series of timestamps where there are missing values.
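The calling contract described above can be modeled in plain Java, without Cascading. The sketch below is hypothetical: `SimpleBuffer` and `run` are illustrative names, not Cascading APIs, and the grouping is done in memory in first-seen key order, whereas Cascading groups (and sorts) tuples across the cluster. What it shows is the shape of the call: one `operate` invocation per grouping, receiving the group key and an iterator over every value in that grouping, much like a MapReduce reduce call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupedCallSketch
  {
  // Hypothetical stand-in for a Buffer: called once per grouping with the
  // group key and an iterator over every argument value in that grouping.
  interface SimpleBuffer
    {
    void operate( String group, Iterator<Integer> arguments, List<String> output );
    }

  // Groups (key, value) records by key in first-seen order, then invokes the
  // buffer exactly once per group.
  static List<String> run( List<Map.Entry<String, Integer>> records, SimpleBuffer buffer )
    {
    Map<String, List<Integer>> groups = new LinkedHashMap<>();

    for( Map.Entry<String, Integer> record : records )
      groups.computeIfAbsent( record.getKey(), k -> new ArrayList<>() ).add( record.getValue() );

    List<String> output = new ArrayList<>();

    for( Map.Entry<String, List<Integer>> group : groups.entrySet() )
      buffer.operate( group.getKey(), group.getValue().iterator(), output );

    return output;
    }

  public static void main( String[] args )
    {
    List<Map.Entry<String, Integer>> records = List.of(
      Map.entry( "a", 1 ), Map.entry( "b", 10 ), Map.entry( "a", 3 ) );

    // this buffer emits one result per group: the key and how many values it saw
    List<String> out = run( records, ( group, arguments, output ) -> {
      int count = 0;
      while( arguments.hasNext() )
        {
        arguments.next();
        count++;
        }
      output.add( group + "=" + count );
    } );

    System.out.println( out ); // [a=2, b=1]
    }
  }
```

Because the whole grouping arrives through one iterator, the buffer can keep local state (here, a counter) across all of a group's values within a single call.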

A Buffer may only be used with an Every pipe, and it may only follow a GroupBy or CoGroup pipe type.

To create a custom Buffer, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Buffer. Because BaseOperation has been subclassed, the operate method, as defined on the Buffer interface, is the only method that must be implemented.

Example 5.8. Custom Buffer

import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeBuffer extends BaseOperation implements Buffer
  {
  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
    {
    // get the group values for the current grouping
    TupleEntry group = bufferCall.getGroup();

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    while( arguments.hasNext() )
      {
      TupleEntry argument = arguments.next();

      // insert some values into the result Tuple based on the arguments
      }

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
    }
  }

Buffers should declare both the number of argument values they expect and the field names of the Tuple they return.

For input, Buffers must accept one or more values in a Tuple as arguments. If not specified, the default is to accept any number of values (Operation.ANY). During the planning phase, Cascading verifies that the number of arguments selected is the same as the number of arguments expected.
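The planner-time check described above amounts to a simple comparison. The following plain-Java sketch is illustrative only: `ArgumentCheckSketch` and `argumentsMatch` are hypothetical names, and the `ANY` constant is an assumed stand-in for Cascading's `Operation.ANY` sentinel, not a copy of Cascading's planner code.

```java
public class ArgumentCheckSketch
  {
  // Assumption: illustrative stand-in for Cascading's Operation.ANY sentinel.
  static final int ANY = Integer.MAX_VALUE;

  // Hypothetical planner-style check: passes when the operation accepts any
  // number of arguments, or when the selected count matches exactly.
  static boolean argumentsMatch( int expected, int selected )
    {
    return expected == ANY || expected == selected;
    }

  public static void main( String[] args )
    {
    System.out.println( argumentsMatch( 1, 1 ) );   // true: one argument expected, one selected
    System.out.println( argumentsMatch( 1, 2 ) );   // false: this assembly would be rejected
    System.out.println( argumentsMatch( ANY, 3 ) ); // true: the default accepts any count
    }
  }
```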

For output, it's good practice for Buffers to declare the field names they return. If not specified, the default is Fields.UNKNOWN, meaning that an unknown number of fields are returned in each Tuple.

Both declarations, the number of input arguments and the declared result fields, must be made in the constructor, either by passing default values to the super constructor or by accepting the values from the user via a constructor implementation.

Example 5.9. Average Buffer

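import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AverageBuffer extends BaseOperation implements Buffer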
  {

  public AverageBuffer()
    {
    super( 1, new Fields( "average" ) );
    }

  public AverageBuffer( Fields fieldDeclaration )
    {
    super( 1, fieldDeclaration );
    }

  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
    {
    // init the count and sum
    long count = 0;
    long sum = 0;

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    while( arguments.hasNext() )
      {
      count++;
      sum += arguments.next().getInteger( 0 );
      }

    // create a Tuple to hold our result value; cast to double so the
    // division does not truncate the fractional part of the average
    Tuple result = new Tuple( (double) sum / count );

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
    }
  }

The example above implements a Buffer that accepts one value in the argument Tuple, sums these values across all argument tuples in the current grouping, and returns the sum divided by the number of argument tuples counted, in a new Tuple.
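The type of that final division matters. In Java, dividing two `long` values truncates toward zero, so an average computed as `sum / count` on longs silently loses its fractional part; casting one operand to `double` preserves it. The small plain-Java sketch below (the class and method names are hypothetical) shows the difference for a grouping whose arguments are 1 and 2.

```java
public class AverageDivisionSketch
  {
  // Both operands are long, so the fractional part is dropped.
  static long truncatedAverage( long sum, long count )
    {
    return sum / count;
    }

  // Casting one operand to double keeps the fractional part.
  static double average( long sum, long count )
    {
    return (double) sum / count;
    }

  public static void main( String[] args )
    {
    // arguments 1 and 2 in one grouping: sum = 3, count = 2
    System.out.println( truncatedAverage( 3, 2 ) ); // 1
    System.out.println( average( 3, 2 ) );          // 1.5
    }
  }
```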

The first constructor above assumes a default field name for the field that this Buffer returns. In practice, it's good to give users the option of overriding the declared field names, as the second constructor does, so that they can avoid field name collisions that might cause the planner to fail.

Note that this example is somewhat artificial. In practice, an Aggregator is a better way to compute a simple average over each grouping. A Buffer is better suited to tasks such as calculating running averages across very large spans.
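As a rough illustration of the running-average case, the plain-Java sketch below computes a moving average over one grouping's values and emits one result per input, which also shows that a Buffer may return many result tuples for a single grouping. It assumes the values arrive in the desired order (for example, via sort fields on the GroupBy). `runningAverages` and the window size are hypothetical; inside a real Buffer each result would be wrapped in a Tuple and added through `bufferCall.getOutputCollector()` rather than collected into a List.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class RunningAverageSketch
  {
  // Emits one moving average per input value, averaging over at most the
  // last `window` values seen so far in this grouping.
  static List<Double> runningAverages( Iterator<Long> values, int window )
    {
    if( window < 1 )
      throw new IllegalArgumentException( "window must be >= 1" );

    Deque<Long> recent = new ArrayDeque<>();
    long sum = 0;
    List<Double> results = new ArrayList<>();

    while( values.hasNext() )
      {
      long value = values.next();
      recent.addLast( value );
      sum += value;

      // drop the oldest value once the window is full
      if( recent.size() > window )
        sum -= recent.removeFirst();

      results.add( (double) sum / recent.size() );
      }

    return results;
    }

  public static void main( String[] args )
    {
    List<Long> series = List.of( 2L, 4L, 6L, 8L );
    System.out.println( runningAverages( series.iterator(), 2 ) ); // [2.0, 3.0, 5.0, 7.0]
    }
  }
```

The sliding window keeps memory bounded by the window size, so the per-group state stays small even when a grouping spans a very large number of values.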

There are several constraints on the use of Buffers that may not be self-evident. These are detailed in the Javadoc.

As with the Function example above, a Buffer may define a custom context object and implement the prepare() and cleanup() methods to maintain state, or re-use outgoing Tuple instances for efficiency.

Although a result Tuple instance can be re-used, non-primitive values stored inside it must not be re-used, for example a Tuple nested inside the result Tuple. If Cascading needs to cache the result Tuple, it makes a shallow copy of it: it does not clone or copy any child Tuple instances, Java Collections, or other mutable objects.
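The shallow-copy hazard can be reproduced with ordinary Java collections. In the sketch below, a `java.util.ArrayList` stands in for the result Tuple and another list stands in for a nested Tuple; this is an analogy, not Cascading code, and the method names are hypothetical. Mutating the shared nested object after the copy changes the "cached" result, while allocating a fresh nested object for each new result leaves it intact.

```java
import java.util.ArrayList;
import java.util.List;

public class ShallowCopySketch
  {
  // Re-uses the nested object after the result was shallow-copied, so the
  // cached copy, which shares that object, sees the new value.
  static List<Object> reuseNested()
    {
    List<String> nested = new ArrayList<>();
    nested.add( "first" );
    List<Object> result = new ArrayList<>();
    result.add( nested );

    List<Object> cached = new ArrayList<>( result ); // shallow copy, shares 'nested'

    nested.set( 0, "second" ); // mutate the shared nested object for the "next" result
    return cached;
    }

  // Re-uses only the outer result and allocates a fresh nested object, so
  // the cached copy keeps its original contents.
  static List<Object> freshNested()
    {
    List<String> nested = new ArrayList<>();
    nested.add( "first" );
    List<Object> result = new ArrayList<>();
    result.add( nested );

    List<Object> cached = new ArrayList<>( result ); // shallow copy, shares 'nested'

    List<String> next = new ArrayList<>();
    next.add( "second" );
    result.set( 0, next ); // outer result re-used, nested object is new
    return cached;
    }

  public static void main( String[] args )
    {
    System.out.println( reuseNested() ); // [[second]]
    System.out.println( freshNested() ); // [[first]]
    }
  }
```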

Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.