Cascading 3.3 User Guide - Using and Developing Operations

Using and Developing Operations

Introduction

Previous sections of this guide covered setting up sources and sinks, shaping the data streams, referencing the data fields, and so on. Within this Pipe framework, Operations are used to act upon the data — e.g., alter it, filter it, analyze it, or transform it. You can use the standard Operations in the Cascading library to create powerful and robust applications by combining them in chains (much like UNIX operations, such as sed, grep, sort, uniq, and awk). And if you want to go further, it’s also very simple to develop custom Operations in Cascading.

There are four kinds of Operations: Function, Filter, Aggregator, and Buffer.

Operations typically take a multivalued Tuple as input. All Operations can return zero or more multivalued Tuple results, except Filter, which simply returns a Boolean indicating whether to discard the current Tuple.

A Function, for instance, can parse a string passed by an argument Tuple and return a new Tuple for every value parsed (i.e., one Tuple for each "word"), or it may create a single Tuple with every parsed value included as an element in one Tuple object (e.g., one Tuple with "first-name" and "last-name" fields).

In theory, a Function can be used as a Filter by not emitting a Tuple result. However, the Filter type is optimized for filtering, and can be combined with logical Operations such as Not, And, Or, etc.

During runtime, Operations actually receive arguments as one or more instances of the TupleEntry object. The TupleEntry object holds the current Tuple, and a Fields object that defines field names for positions within the Tuple.

Except for Filter, all Operations must declare result Fields. If the actual output does not match the declaration, the process fails. For example, consider a Function written to parse words out of a String and return a new Tuple for each word. If it declares that its intended output is a Tuple with a single field named "word," and then returns more values in the Tuple beyond that single "word," processing halts. However, Operations designed to return arbitrary numbers of values in a result Tuple may declare Fields.UNKNOWN.
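
The declaration check described above can be pictured with a small plain-Java sketch. This is a toy model under stated assumptions, not Cascading code: DeclaredFieldsSketch, CheckingCollector, and emitWords are hypothetical names, and the size comparison only approximates the idea that a result must match its declared fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these are hypothetical stand-ins, not Cascading classes.
class DeclaredFieldsSketch {
    /** Collects result tuples and rejects any whose size differs from the declaration. */
    static class CheckingCollector {
        private final List<String> declaredFields;
        private final List<List<Object>> results = new ArrayList<>();

        CheckingCollector(String... declaredFields) {
            this.declaredFields = Arrays.asList(declaredFields);
        }

        void add(Object... values) {
            if (values.length != declaredFields.size()) {
                throw new IllegalStateException(
                    "declared " + declaredFields + " but received " + values.length + " values");
            }
            results.add(Arrays.asList(values.clone()));
        }

        List<List<Object>> results() {
            return results;
        }
    }

    /** Emits one single-value tuple per whitespace-separated word. */
    static void emitWords(String line, CheckingCollector out) {
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(word);
            }
        }
    }

    public static void main(String[] args) {
        CheckingCollector out = new CheckingCollector("word");
        emitWords("to be or not", out);
        System.out.println(out.results().size() + " tuples accepted");
        try {
            out.add("first-name", "last-name"); // two values against one declared field
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the sketch, the four single-value results are accepted, while the two-value result is rejected because only one field was declared.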

The Cascading planner always attempts to "fail fast" where possible by checking the field name dependencies between Pipes and Operations, but there may be some cases the planner cannot assess.

All Operations must be wrapped by either an Each or an Every pipe instance. The pipe is responsible for passing in an argument Tuple and accepting the resulting output Tuple from the Operation. In addition, the pipe merges or replaces the incoming Tuple values with the results of the Operation.

By default, the Cascading planner assumes that Operations are "safe." A safe Operation is idempotent: it has no side effects and can safely execute multiple times on the exact same record or Tuple. If a custom Operation is not idempotent, its isSafe() method must return false. This value influences how the Cascading planner renders the Flow under certain circumstances.

Functions

A Function expects a stream of individual argument Tuples, and returns zero or more result Tuples for each of them. Like a Filter, a Function is used with an Each pipe, which may follow any pipe type.

To create a custom Function, subclass the cascading.operation.BaseOperation class and implement the cascading.operation.Function interface. Since the BaseOperation has been subclassed, the operate method, as defined on the Function interface, is the only method that must be implemented.

Example 1. Custom Function
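import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeFunction extends BaseOperation implements Function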
  {
  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
    // get the arguments TupleEntry
    TupleEntry arguments = functionCall.getArguments();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // insert some values into the result Tuple

    // return the result Tuple
    functionCall.getOutputCollector().add( result );
    }
  }

Whenever possible, functions should declare both the number of argument values they expect and the field names of the Tuple they return. However, these declarations are optional, as explained below.

For input, functions must accept one or more values in a Tuple as arguments. If not specified, the default is to accept any number of values (Operation.ANY). Cascading verifies during planning that the number of arguments selected matches the number of arguments expected.

For output, it is a good practice to declare the field names that a function returns. If not specified, the default is Fields.UNKNOWN, meaning that an unknown number of fields are returned in each Tuple.

Both declarations — the number of input arguments and declared result fields — must be done on the constructor, either by passing default values to the super constructor or by accepting the values from the user via a constructor implementation.

Example 2. AddValuesFunction
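import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AddValuesFunction extends BaseOperation implements Function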
  {
  public AddValuesFunction()
    {
    // expects 2 arguments, fail otherwise
    super( 2, new Fields( "sum" ) );
    }

  public AddValuesFunction( Fields fieldDeclaration )
    {
    // expects 2 arguments, fail otherwise
    super( 2, fieldDeclaration );
    }

  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
    // get the arguments TupleEntry
    TupleEntry arguments = functionCall.getArguments();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // sum the two arguments
    int sum = arguments.getInteger( 0 ) + arguments.getInteger( 1 );

    // add the sum value to the result Tuple
    result.add( sum );

    // return the result Tuple
    functionCall.getOutputCollector().add( result );
    }
  }

The example above implements a Function that accepts two values in the argument Tuple, adds them together, and returns the result in a new Tuple.

The first constructor above assumes a default field name for the field that this Function returns. In practice, it’s good to give the user the option of overriding the declared field names, allowing them to prevent possible field name collisions that might cause the planner to fail.

This line is especially important:

int sum = arguments.getInteger( 0 ) + arguments.getInteger( 1 );

Note that ordinal positions, not field names, are used here to get the argument values. If field names were used, AddValuesFunction would be coupled to the field names of the incoming stream and could not be reused with differently named fields.

Example 3. AddValuesFunction and Context
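import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class EfficientAddValuesFunction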
  extends BaseOperation<Tuple> implements Function<Tuple>
  {
  public EfficientAddValuesFunction()
    {
    // expects 2 arguments, fail otherwise
    super( 2, new Fields( "sum" ) );
    }

  public EfficientAddValuesFunction( Fields fieldDeclaration )
    {
    // expects 2 arguments, fail otherwise
    super( 2, fieldDeclaration );
    }

  @Override
  public void prepare( FlowProcess flowProcess, OperationCall<Tuple> call )
    {
    // create a reusable Tuple of size 1
    call.setContext( Tuple.size( 1 ) );
    }

  public void operate( FlowProcess flowProcess, FunctionCall<Tuple> call )
    {
    // get the arguments TupleEntry
    TupleEntry arguments = call.getArguments();

    // get our previously created Tuple
    Tuple result = call.getContext();

    // sum the two arguments
    int sum = arguments.getInteger( 0 ) + arguments.getInteger( 1 );

    // set the sum value on the result Tuple
    result.set( 0, sum );

    // return the result Tuple
    call.getOutputCollector().add( result );
    }

  @Override
  public void cleanup( FlowProcess flowProcess, OperationCall<Tuple> call )
    {
    call.setContext( null );
    }
  }

This example, a minor variation on the previous one, introduces the use of a "context" object and prepare() and cleanup() methods.

All Operations allow for a context object, simply a user-defined object that can hold user state information between calls to the operate() method. This allows for a given instance of the Operation to be thread safe on a platform that may use multiple threads of execution versus multiple processes. It also allows deferring initialization of complex resources until the Operation is engaged.

The prepare() and cleanup() methods are invoked once per thread of execution. In a clustered-platform environment, the methods are invoked only on the cluster side and never on the client.

In Example 3, a Tuple is used as the context; a more complex type is not necessary. Also note that the Tuple is not storing state, but is reused to reduce the number of new Object instances. In Cascading, it is perfectly safe to output the same Tuple instance from operate(). The call to call.getOutputCollector().add( result ) does not return until the resulting Tuple has been processed, copied, or persisted downstream.
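
To see why reusing one result object can be safe, consider the following plain-Java sketch. It is a toy model, not Cascading code: TupleReuseSketch and CopyingCollector are hypothetical names, an int array stands in for the Tuple, and the collector models only the "copied" case by cloning each value before add() returns.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types are Cascading classes.
class TupleReuseSketch {
    /** Stand-in for an output collector that copies each value before add() returns. */
    static class CopyingCollector {
        final List<int[]> received = new ArrayList<>();

        void add(int[] tuple) {
            // The caller may overwrite 'tuple' after this call, so keep a copy.
            received.add(tuple.clone());
        }
    }

    /** Sums each pair, writing every sum into the same one-element array. */
    static List<int[]> sumPairs(int[][] pairs) {
        CopyingCollector out = new CopyingCollector();
        int[] result = new int[1]; // created once, like the context Tuple in prepare()
        for (int[] pair : pairs) {
            result[0] = pair[0] + pair[1]; // overwrite the previous sum
            out.add(result);
        }
        return out.received;
    }

    public static void main(String[] args) {
        for (int[] sum : sumPairs(new int[][] {{1, 2}, {3, 4}})) {
            System.out.println(sum[0]); // prints 3, then 7
        }
    }
}
```

Because the collector finishes with each value before control returns to the loop, overwriting the shared array on the next iteration does not disturb results that were already emitted.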

Filters

A Filter expects a stream of individual argument Tuples and returns a boolean value for each one, stating whether it should be discarded. Like a Function, a Filter is used with an Each pipe, which may follow any pipe type.

To create a custom Filter, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Filter. Because BaseOperation has been subclassed, the isRemove method, as defined on the Filter interface, is the only method that must be implemented.

Example 4. Custom Filter
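import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.tuple.TupleEntry;

public class SomeFilter extends BaseOperation implements Filter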
  {
  public boolean isRemove( FlowProcess flowProcess, FilterCall call )
    {
    // get the arguments TupleEntry
    TupleEntry arguments = call.getArguments();

    // initialize the return result
    boolean isRemove = false;

    // test the argument values and set isRemove accordingly

    return isRemove;
    }
  }

Filters must accept one or more values in a Tuple as arguments. When coding a Filter, declare the number of argument values that you want the Filter to accept. If not specified, the default is to accept any number of values (Operation.ANY). Cascading verifies during planning that the number of arguments selected matches the number of arguments expected.

The number of arguments declaration must be done on the constructor, either by passing a default value to the super constructor or by accepting the value from the user via a constructor implementation.

Example 5. StringLengthFilter
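import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.tuple.TupleEntry;

public class StringLengthFilter extends BaseOperation implements Filter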
  {
  public StringLengthFilter()
    {
    // expects 2 arguments, fail otherwise
    super( 2 );
    }

  public boolean isRemove( FlowProcess flowProcess, FilterCall call )
    {
    // get the arguments TupleEntry
    TupleEntry arguments = call.getArguments();

    // filter out the current Tuple if the first argument length is greater
    // than the second argument integer value
    return arguments.getString( 0 ).length() > arguments.getInteger( 1 );
    }
  }

The example above implements a Filter that accepts two arguments and filters out the current Tuple if the String length of the first argument is greater than the integer value of the second argument.
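
The direction of the test is easy to get backwards: a return value of true means the Tuple is removed, not kept. The following plain-Java sketch restates only the comparison from StringLengthFilter so it can be tried without Cascading on the classpath. StringLengthCheck is a hypothetical name used for illustration.

```java
// Plain-Java restatement of the comparison in StringLengthFilter.isRemove();
// no Cascading types are involved.
class StringLengthCheck {
    /** Returns true when the value should be discarded, mirroring isRemove(). */
    static boolean isRemove(String value, int maxLength) {
        return value.length() > maxLength;
    }

    public static void main(String[] args) {
        System.out.println(isRemove("cascading", 5)); // true: 9 characters, discarded
        System.out.println(isRemove("tez", 5));       // false: 3 characters, kept
    }
}
```

A value whose length equals the limit is kept, because the comparison is strictly greater-than.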

Aggregators

An Aggregator expects a stream of tuple groups (the output of a GroupBy or CoGroup pipe), and returns zero or more result tuples for every group.

An Aggregator may only be used with an Every pipe — an Every may follow a GroupBy, a CoGroup, or another Every pipe, but not an Each.

To create a custom Aggregator, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Aggregator. Because BaseOperation has been subclassed, the start, aggregate, and complete methods, as defined on the Aggregator interface, are the only methods that must be implemented.

Example 6. Custom Aggregator
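import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeAggregator extends BaseOperation<SomeAggregator.Context>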
  implements Aggregator<SomeAggregator.Context>
  {
  public static class Context
    {
    Object value;
    }

  public void start( FlowProcess flowProcess,
                     AggregatorCall<Context> aggregatorCall )
    {
    // get the group values for the current grouping
    TupleEntry group = aggregatorCall.getGroup();

    // create a new custom context object
    Context context = new Context();

    // optionally, populate the context object

    // set the context object
    aggregatorCall.setContext( context );
    }

  public void aggregate( FlowProcess flowProcess,
                         AggregatorCall<Context> aggregatorCall )
    {
    // get the current argument values
    TupleEntry arguments = aggregatorCall.getArguments();

    // get the context for this grouping
    Context context = aggregatorCall.getContext();

    // update the context object
    }

  public void complete( FlowProcess flowProcess,
                        AggregatorCall<Context> aggregatorCall )
    {
    Context context = aggregatorCall.getContext();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // insert some values into the result Tuple based on the context

    // return the result Tuple
    aggregatorCall.getOutputCollector().add( result );
    }
  }

Whenever possible, Aggregators should declare both the number of argument values they expect and the field names of the Tuple they return. However, these declarations are optional, as explained below.

For input, Aggregators must accept one or more values in a Tuple as arguments. If not specified, the default is to accept any number of values (Operation.ANY). Cascading verifies during planning that the number of arguments selected is the same as the number of arguments expected.

For output, it is best practice to code an Aggregator so that it declares the fields that are returned. If not specified, the default is Fields.UNKNOWN, meaning that an unknown number of fields are returned in each Tuple.

Both declarations — the number of input arguments and declared result fields — must be done on the constructor, either by passing default values to the super constructor or by accepting the values from the user via a constructor implementation.

Example 7. AddTuplesAggregator
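import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AddTuplesAggregator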
  extends BaseOperation<AddTuplesAggregator.Context>
  implements Aggregator<AddTuplesAggregator.Context>
  {
  public static class Context
    {
    long value = 0;
    }

  public AddTuplesAggregator()
    {
    // expects 1 argument, fail otherwise
    super( 1, new Fields( "sum" ) );
    }

  public AddTuplesAggregator( Fields fieldDeclaration )
    {
    // expects 1 argument, fail otherwise
    super( 1, fieldDeclaration );
    }

  public void start( FlowProcess flowProcess,
                     AggregatorCall<Context> aggregatorCall )
    {
    // set the context object, starting at zero
    aggregatorCall.setContext( new Context() );
    }

  public void aggregate( FlowProcess flowProcess,
                         AggregatorCall<Context> aggregatorCall )
    {
    TupleEntry arguments = aggregatorCall.getArguments();
    Context context = aggregatorCall.getContext();

    // add the current argument value to the current sum
    context.value += arguments.getInteger( 0 );
    }

  public void complete( FlowProcess flowProcess,
                        AggregatorCall<Context> aggregatorCall )
    {
    Context context = aggregatorCall.getContext();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // set the sum
    result.add( context.value );

    // return the result Tuple
    aggregatorCall.getOutputCollector().add( result );
    }
  }

The example above implements an Aggregator that accepts a single value in the argument Tuple, sums that value across all argument Tuples in the current grouping, and returns the result in a new Tuple.
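
The order in which the three methods are called can be sketched in plain Java. This is a toy driver, not Cascading code: AggregatorLifecycleSketch and sumByGroup are hypothetical names, a Map of lists stands in for the grouped stream, and the loop only approximates how the start, aggregate, and complete steps relate to each group.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy driver only: a hypothetical stand-in for the per-group call sequence.
class AggregatorLifecycleSketch {
    /** Mirrors the Context class in AddTuplesAggregator. */
    static class Context {
        long value = 0;
    }

    /** Sums the values of each group using start / aggregate / complete steps. */
    static Map<String, Long> sumByGroup(Map<String, List<Integer>> groups) {
        Map<String, Long> results = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            Context context = new Context();            // start(): fresh context per group
            for (int argument : group.getValue()) {
                context.value += argument;              // aggregate(): once per argument tuple
            }
            results.put(group.getKey(), context.value); // complete(): emit one result
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        groups.put("a", Arrays.asList(1, 2, 3));
        groups.put("b", Arrays.asList(10));
        System.out.println(sumByGroup(groups)); // prints {a=6, b=10}
    }
}
```

Creating the context in the start step is what keeps the sum of one group from leaking into the next.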

The first constructor above assumes a default field name that this Aggregator returns. In practice, it’s good to give the user the option of overriding the declared field names, allowing them to prevent possible field name collisions that might cause the planner to fail.

There are several constraints on the use of Aggregators that may not be self-evident. These constraints are detailed in the Javadoc.

Buffers

A Buffer expects a set of argument tuples in the same grouping, and may return zero or more result tuples.

A Buffer is very similar to an Aggregator, except that it receives the current grouping Tuple together with an Iterator over the argument Tuples for every value Tuple in the current grouping, all in a single method call.

Buffers are similar to the typical Reducer interface in MapReduce and are best used for operations that need visibility into the previous and next elements in the stream for the current group, such as smoothing a series of timestamps where there are missing values or creating a running average.

A Buffer may only be used with an Every pipe, and it may only follow a GroupBy or CoGroup pipe type.

To create a custom Buffer, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Buffer. Because BaseOperation has been subclassed, the operate method, as defined on the Buffer interface, is the only method that must be implemented.

Example 8. Custom Buffer
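import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeBuffer extends BaseOperation implements Buffer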
  {
  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
    {
    // get the group values for the current grouping
    TupleEntry group = bufferCall.getGroup();

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    while( arguments.hasNext() )
      {
      TupleEntry argument = arguments.next();

      // insert some values into the result Tuple based on the arguments
      }

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
    }
  }

Buffers should declare both the number of argument values they expect and the field names of the Tuple they return.

For input, Buffers must accept one or more values in a Tuple as arguments. If not specified, the default is to accept any number of values (Operation.ANY). During the planning phase, Cascading verifies that the number of arguments selected is the same as the number of arguments expected.

For output, it’s good practice for Buffers to declare the field names they return. If not specified, the default is Fields.UNKNOWN, meaning that an unknown number of fields are returned in each Tuple.

Both declarations — the number of input arguments and declared result fields — must be done on the constructor, either by passing default values to the super constructor or by accepting the values from the user via a constructor implementation.

Example 9. Average Buffer
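import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AverageBuffer extends BaseOperation implements Buffer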
  {

  public AverageBuffer()
    {
    super( 1, new Fields( "average" ) );
    }

  public AverageBuffer( Fields fieldDeclaration )
    {
    super( 1, fieldDeclaration );
    }

  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
    {
    // init the count and sum
    long count = 0;
    long sum = 0;

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    while( arguments.hasNext() )
      {
      count++;
      sum += arguments.next().getInteger( 0 );
      }

    // create a Tuple to hold the average; cast to double to avoid integer division
    Tuple result = new Tuple( (double) sum / count );

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
    }
  }

The example above implements a Buffer that accepts a single value in the argument Tuple, sums that value across all argument Tuples in the current grouping, and returns the sum divided by the number of argument Tuples in a new Tuple.

The first constructor above assumes a default field name for the field that this Buffer returns. In practice, it’s good to give the user the option of overriding the declared field names, allowing them to prevent possible field name collisions that might cause the planner to fail.

Note that this example is somewhat artificial. In actual practice, an Aggregator would be a better way to compute averages for an entire dataset. A Buffer is better suited for calculating running averages across very large spans, for example.
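
As a rough illustration of the running-average case, the following plain-Java sketch emits one value per element while walking an Iterator, which is the access pattern a Buffer provides. It is a sketch under stated assumptions, not Cascading code: RunningAverageSketch is a hypothetical name, an Iterator of Integer stands in for the argument iterator, and "running average" is taken here to mean the cumulative average of the values seen so far.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy model only: an Iterator<Integer> stands in for the Buffer's argument iterator.
class RunningAverageSketch {
    /** Returns the cumulative average after each element of the group. */
    static List<Double> runningAverages(Iterator<Integer> arguments) {
        List<Double> results = new ArrayList<>();
        long count = 0;
        long sum = 0;
        while (arguments.hasNext()) {
            count++;
            sum += arguments.next();
            // one result per input value, rather than one per group
            results.add((double) sum / count);
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [2.0, 3.0, 5.0]
        System.out.println(runningAverages(Arrays.asList(2, 4, 9).iterator()));
    }
}
```

Unlike the whole-group average, this computation emits a result for every argument, which is why it needs the iterator-style access of a Buffer.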

There are several constraints on the use of Buffers that may not be self-evident. These constraints are detailed in the Javadoc.

As with the Function example above, a Buffer may define a custom context object and implement the prepare() and cleanup() methods to maintain shared state, or re-use outgoing Tuple instances for efficiency.

Operation and BaseOperation

In all of the above sections, the cascading.operation.BaseOperation class was subclassed. This class is an implementation of the cascading.operation.Operation interface, and provides a few default method implementations. It is not strictly required to extend BaseOperation when implementing this interface, but it is very convenient to do so.

When developing custom operations, the developer may need to initialize and destroy a resource. For example, when doing pattern matching, you might need to initialize a java.util.regex.Matcher and use it in a thread-safe way. Or you might need to open, and eventually close, a remote connection. But for performance reasons, the operation should not create or destroy the connection for each Tuple or every Tuple group that passes through. Nor should user code store any state values in a Class instance or static field.

For this reason, the interface Operation declares two methods: prepare() and cleanup().

In the case of a clustered platform, the prepare() and cleanup() methods are called once per cluster-side unit of processing (typically a JVM).

The prepare() method is called before any argument Tuple is passed. The cleanup() method is called after all Tuple arguments are processed in the operation.

Within each of these methods, the developer can initialize or destroy a "context" object that can hold an open-socket connection or Matcher instance. This context is user defined, and is the same mechanism used by the Aggregator operation — except that the Aggregator is also given the opportunity to initialize and destroy its context, via the start() and complete() methods.

Note that if a "context" object is used, its type should be declared in the subclass declaration using Java generics notation, as with BaseOperation<Tuple> in Example 3.
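
The Matcher case mentioned above might look like the following plain-Java sketch. It is a toy model, not Cascading code: MatcherContextSketch, Call, and ContainsDigits are hypothetical names, and Call only stands in for the call object that carries the context. The Matcher is created once in the prepare step, reused for every value through reset(), and released in the cleanup step, so no state lives in an instance or static field of the operation.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy model only: prepare/operate/cleanup here are hypothetical stand-ins,
// not the Cascading interfaces.
class MatcherContextSketch {
    /** Hypothetical holder, playing the role of the call object that carries the context. */
    static class Call<C> {
        private C context;

        C getContext() {
            return context;
        }

        void setContext(C context) {
            this.context = context;
        }
    }

    /** Hypothetical operation whose context type (Matcher) is declared with generics. */
    static class ContainsDigits {
        private final String regex = "\\d+"; // configuration, not per-call state

        void prepare(Call<Matcher> call) {
            // build the Matcher once, before any value is processed
            call.setContext(Pattern.compile(regex).matcher(""));
        }

        boolean operate(Call<Matcher> call, String value) {
            // reset(...) re-targets the existing Matcher instead of allocating a new one
            return call.getContext().reset(value).find();
        }

        void cleanup(Call<Matcher> call) {
            call.setContext(null);
        }
    }

    public static void main(String[] args) {
        Call<Matcher> call = new Call<>();
        ContainsDigits operation = new ContainsDigits();
        operation.prepare(call);
        System.out.println(operation.operate(call, "order 42"));   // true
        System.out.println(operation.operate(call, "no numbers")); // false
        operation.cleanup(call);
    }
}
```

Keeping the Matcher in the context rather than in a field means each unit of processing works with its own instance, which matters because a Matcher is not safe for concurrent use.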