5.5 Buffer

A Buffer expects a set of argument Tuples in the same grouping, and may return zero or more result Tuples.

A Buffer is very similar to an Aggregator, except that it receives the current grouping Tuple together with an iterator over the argument values of every value Tuple in the current grouping, all in a single method call. This closely resembles the typical Reducer interface, and is best suited to operations that need visibility into the previous and next elements in the stream, for example, smoothing a series of timestamps that has missing values.
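
To make the "previous and next elements" point concrete, here is a minimal plain-Java sketch of filling missing values in a series by linear interpolation. It does not use the Cascading API: the class GapFillSketch and the method fillGaps are hypothetical names chosen for illustration, and the Iterator of Double values merely stands in for the arguments iterator a Buffer receives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; not part of Cascading.
class GapFillSketch
  {
  // Fills null entries by linear interpolation between the nearest known
  // neighbours. Leading and trailing gaps are left as null.
  static List<Double> fillGaps( Iterator<Double> values )
    {
    // materialize the group so both earlier and later elements are visible
    List<Double> series = new ArrayList<Double>();

    while( values.hasNext() )
      series.add( values.next() );

    List<Double> result = new ArrayList<Double>( series );
    int previous = -1; // index of the last known value seen so far

    for( int i = 0; i < series.size(); i++ )
      {
      if( series.get( i ) == null )
        continue;

      // interpolate across any gap between the previous known value and this one
      if( previous >= 0 )
        {
        double start = series.get( previous );
        double step = ( series.get( i ) - start ) / ( i - previous );

        for( int j = previous + 1; j < i; j++ )
          result.set( j, start + step * ( j - previous ) );
        }

      previous = i;
      }

    return result;
    }

  public static void main( String[] args )
    {
    List<Double> readings = Arrays.asList( 10.0, null, null, 16.0, 18.0 );

    // prints [10.0, 12.0, 14.0, 16.0, 18.0]
    System.out.println( fillGaps( readings.iterator() ) );
    }
  }
```

An operation like this needs the whole grouping at once, which is why it maps more naturally onto a Buffer than onto an Aggregator that sees one value at a time.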

A Buffer may only be used with an Every pipe, and it may only follow a GroupBy or CoGroup pipe type.

To create a custom Buffer, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Buffer. Because BaseOperation has been subclassed, the operate method, as defined on the Buffer interface, is the only method that must be implemented.

Example 5.7. Custom Buffer

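import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeBuffer extends BaseOperation implements Buffer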
  {
  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
    {
    // get the group values for the current grouping
    TupleEntry group = bufferCall.getGroup();

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    while( arguments.hasNext() )
      {
      TupleEntry argument = arguments.next();

      // insert some values into the result Tuple based on the arguments
      }

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
    }
  }

Buffers should declare both the number of argument values they expect and the field names of the Tuple they will return.

Buffers must accept one or more values in a Tuple as arguments. By default, they accept any number (Operation.ANY) of values. Cascading verifies that the number of arguments selected matches the number of arguments expected.

Buffers may optionally declare the field names they return. By default, Buffers declare Fields.UNKNOWN.

Both declarations must be made in the constructor, either by passing default values to the super constructor or by accepting the values from the user as constructor arguments.

Example 5.8. Average Buffer

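import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AverageBuffer extends BaseOperation implements Buffer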
  {

  public AverageBuffer()
    {
    super( 1, new Fields( "average" ) );
    }

  public AverageBuffer( Fields fieldDeclaration )
    {
    super( 1, fieldDeclaration );
    }

  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
    {
    // init the count and sum
    long count = 0;
    long sum = 0;

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    while( arguments.hasNext() )
      {
      count++;
      sum += arguments.next().getInteger( 0 );
      }

    // create a Tuple to hold our result values; cast to double so the
    // division is not truncated to a whole number
    Tuple result = new Tuple( (double) sum / count );

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
    }
  }

The example above implements a fully functional Buffer that accepts one value per argument Tuple, sums these values across all argument Tuples in the current grouping, and returns the sum divided by the number of argument Tuples counted, in a new Tuple.

The first constructor assumes a default field name this Buffer will return, but it is a best practice to always give the user the option to override the declared field names to prevent any field name collisions that would cause the planner to fail.

Note that this example is somewhat contrived. In practice, an Aggregator should be implemented to compute averages. A Buffer would be better suited to computing "running averages" across very large spans, for example.
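
As a rough illustration of the "running average" idea, the following plain-Java sketch emits one average per input value rather than a single result for the whole grouping. It does not use the Cascading API: RunningAverageSketch and runningAverages are hypothetical names, and the returned list stands in for the output collector. In an actual Buffer, each results.add call would presumably correspond to a call to bufferCall.getOutputCollector().add( ... ) inside the while loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; not part of Cascading.
class RunningAverageSketch
  {
  // Returns the average of all values seen so far, once per input value.
  static List<Double> runningAverages( Iterator<Integer> values )
    {
    List<Double> results = new ArrayList<Double>();
    long count = 0;
    long sum = 0;

    while( values.hasNext() )
      {
      count++;
      sum += values.next();

      // emit a result for every argument, not just one per grouping
      results.add( (double) sum / count );
      }

    return results;
    }

  public static void main( String[] args )
    {
    List<Integer> values = Arrays.asList( 2, 4, 9 );

    // prints [2.0, 3.0, 5.0]
    System.out.println( runningAverages( values.iterator() ) );
    }
  }
```

This is the "zero or more result Tuples" behaviour in miniature: the number of results depends on the number of arguments in the grouping, and an empty input produces no output at all.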

Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.