5.2 Functions

A Function expects a stream of individual argument Tuples, and returns zero or more result Tuples for each of them. Like a Filter, a Function is used with an Each pipe, which may follow any pipe type.

To create a custom Function, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Function. Because BaseOperation supplies default implementations of the other Operation methods, the operate() method, as declared on the Function interface, is the only method that must be implemented.

Example 5.1. Custom Function

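import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeFunction extends BaseOperation implements Function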
  {
  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
    // get the arguments TupleEntry
    TupleEntry arguments = functionCall.getArguments();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // insert some values into the result Tuple

    // return the result Tuple
    functionCall.getOutputCollector().add( result );
    }
  }

Whenever possible, functions should declare both the number of argument values they expect and the field names of the Tuple they return. However, these declarations are optional, as explained below.

For input, functions must accept one or more values in a Tuple as arguments. If no argument count is declared, the default is to accept any number of values (Operation.ANY). Cascading verifies during planning that the number of arguments selected matches the number of arguments expected.

For output, it is good practice to declare the field names that a function returns. If they are not specified, the default is Fields.UNKNOWN, meaning that an unknown number of fields is returned in each Tuple.

Both declarations, the number of input arguments and the declared result fields, must be made in the constructor, either by passing default values to the super constructor or by accepting the values from the user through a constructor argument.

Example 5.2. Add Values Function

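import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AddValuesFunction extends BaseOperation implements Function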
  {
  public AddValuesFunction()
    {
    // expects 2 arguments, fail otherwise
    super( 2, new Fields( "sum" ) );
    }

  public AddValuesFunction( Fields fieldDeclaration )
    {
    // expects 2 arguments, fail otherwise
    super( 2, fieldDeclaration );
    }

  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
    // get the arguments TupleEntry
    TupleEntry arguments = functionCall.getArguments();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // sum the two arguments
    int sum = arguments.getInteger( 0 ) + arguments.getInteger( 1 );

    // add the sum value to the result Tuple
    result.add( sum );

    // return the result Tuple
    functionCall.getOutputCollector().add( result );
    }
  }

The example above implements a Function that accepts two values in the argument Tuple, adds them together, and returns the result in a new Tuple.

The first constructor above assumes a default field name for the field that this Function returns. In practice, it's good to give the user the option of overriding the declared field names, allowing them to prevent possible field name collisions that might cause the planner to fail.

This line is especially important:

int sum = arguments.getInteger( 0 ) +
          arguments.getInteger( 1 );

Note that ordinal positions, not field names, are used here to get the argument values. If field names had been used, the AddValuesFunction would have been coupled to the field names of the incoming stream, and could be applied only to streams that use those names.
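
The decoupling can be illustrated outside of Cascading. The following is a minimal plain-Java sketch, not Cascading code: the class OrdinalArgumentsSketch and its methods addValues() and select() are hypothetical stand-ins for operate() and for the argument selection that the caller performs. It assumes only that the caller chooses which named fields to pass, and that the function reads them by position.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrdinalArgumentsSketch
  {
  // hypothetical stand-in for operate(): reads its two arguments by position only
  public static int addValues( List<Integer> arguments )
    {
    return arguments.get( 0 ) + arguments.get( 1 );
    }

  // hypothetical stand-in for argument selection: the caller picks the named fields, in order
  public static List<Integer> select( Map<String, Integer> record, String... fieldNames )
    {
    List<Integer> arguments = new ArrayList<>();

    for( String fieldName : fieldNames )
      arguments.add( record.get( fieldName ) );

    return arguments;
    }

  public static void main( String[] args )
    {
    Map<String, Integer> order = new LinkedHashMap<>();
    order.put( "price", 40 );
    order.put( "tax", 2 );

    Map<String, Integer> score = new LinkedHashMap<>();
    score.put( "home", 3 );
    score.put( "away", 1 );

    // the same function body serves two streams with unrelated field names
    System.out.println( addValues( select( order, "price", "tax" ) ) ); // prints 42
    System.out.println( addValues( select( score, "home", "away" ) ) ); // prints 4
    }
  }
```

Because addValues() never mentions "price" or "home", only the call site has to change when the incoming field names change.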

Example 5.3. Add Values Function and Context

public class EfficientAddValuesFunction
  extends BaseOperation<Tuple> implements Function<Tuple>
  {
  public EfficientAddValuesFunction()
    {
    // expects 2 arguments, fail otherwise
    super( 2, new Fields( "sum" ) );
    }

  public EfficientAddValuesFunction( Fields fieldDeclaration )
    {
    // expects 2 arguments, fail otherwise
    super( 2, fieldDeclaration );
    }

  @Override
  public void prepare( FlowProcess flowProcess, OperationCall<Tuple> call )
    {
    // create a reusable Tuple of size 1
    call.setContext( Tuple.size( 1 ) );
    }

  public void operate( FlowProcess flowProcess, FunctionCall<Tuple> call )
    {
    // get the arguments TupleEntry
    TupleEntry arguments = call.getArguments();

    // get our previously created Tuple
    Tuple result = call.getContext();

    // sum the two arguments
    int sum = arguments.getInteger( 0 ) + arguments.getInteger( 1 );

    // set the sum value on the result Tuple
    result.set( 0, sum );

    // return the result Tuple
    call.getOutputCollector().add( result );
    }

  @Override
  public void cleanup( FlowProcess flowProcess, OperationCall<Tuple> call )
    {
    call.setContext( null );
    }
  }

This example, a minor variation on the previous one, introduces the use of a "context" object and the prepare() and cleanup() methods.

All Operations allow for a context object, which is simply a user-defined object that holds state between calls to the operate() method. Keeping state in the context rather than in instance fields allows a given instance of the Operation to be thread safe on a platform that may use multiple threads of execution rather than multiple processes. It also allows initialization of complex resources to be deferred until the Operation is engaged.

The prepare() and cleanup() methods are invoked once per thread of execution, and in the case of the Hadoop platform, only on the cluster side, never on the client.
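
The lifecycle described above can be modeled in plain Java. This is a simplified sketch, not Cascading's implementation: Call, CountingOperation, drive() and runConcurrently() are hypothetical names, and Call merely stands in for the per-thread object that carries the context. It assumes one prepare(), many operate() calls, and one cleanup() per thread of execution, with a single operation instance shared by both threads.

```java
public class ContextPerThreadSketch
  {
  // hypothetical stand-in for the per-thread call object that carries the context
  static class Call
    {
    int[] context; // created in prepare(), released in cleanup()
    int total;     // what the thread observed, recorded at cleanup()
    }

  // the operation keeps no mutable fields of its own; all state lives in the Call
  static class CountingOperation
    {
    void prepare( Call call ) { call.context = new int[ 1 ]; }

    void operate( Call call ) { call.context[ 0 ]++; }

    void cleanup( Call call )
      {
      call.total = call.context[ 0 ];
      call.context = null;
      }
    }

  // one thread of execution: prepare once, operate many times, cleanup once
  static void drive( CountingOperation operation, Call call, int invocations )
    {
    operation.prepare( call );

    for( int i = 0; i < invocations; i++ )
      operation.operate( call );

    operation.cleanup( call );
    }

  // shares one operation instance between two threads, each with its own Call
  public static int[] runConcurrently( int first, int second )
    {
    CountingOperation shared = new CountingOperation();
    Call callA = new Call();
    Call callB = new Call();

    Thread a = new Thread( () -> drive( shared, callA, first ) );
    Thread b = new Thread( () -> drive( shared, callB, second ) );

    a.start();
    b.start();

    try
      {
      a.join();
      b.join();
      }
    catch( InterruptedException exception )
      {
      Thread.currentThread().interrupt();
      throw new IllegalStateException( exception );
      }

    return new int[]{ callA.total, callB.total };
    }

  public static void main( String[] args )
    {
    int[] totals = runConcurrently( 1000, 2500 );
    System.out.println( totals[ 0 ] + " " + totals[ 1 ] ); // prints 1000 2500
    }
  }
```

Each thread sees only its own count because the counter lives in the context, not in a field of the shared operation.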

In the above example, a Tuple is used as the context; a more complex type isn't necessary. Also note that the Tuple isn't storing state between calls, but is reused to reduce the number of new Object instances created. In Cascading, it is perfectly safe to output the same Tuple instance from operate() on every call: call.getOutputCollector().add( result ) does not return until the result Tuple has been processed or persisted downstream.
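
Why that guarantee matters can be seen in a small plain-Java sketch. None of this is Cascading code: the Collector interface and the methods emitSums(), consumeImmediately() and readLater() are hypothetical. The first collector finishes with each result before add() returns, which is the behavior described above; the second keeps a reference and reads it later, which is the situation that would make reuse unsafe.

```java
import java.util.ArrayList;
import java.util.List;

public class ReusedResultSketch
  {
  // hypothetical stand-in for an output collector
  interface Collector
    {
    void add( int[] result );
    }

  // stand-in for operate() with a reused context object: overwrite slot 0, then emit it
  static void emitSums( int[][] pairs, int[] reused, Collector collector )
    {
    for( int[] pair : pairs )
      {
      reused[ 0 ] = pair[ 0 ] + pair[ 1 ];
      collector.add( reused );
      }
    }

  // consumes each result before add() returns, so reuse is safe
  public static List<Integer> consumeImmediately( int[][] pairs )
    {
    List<Integer> seen = new ArrayList<>();
    emitSums( pairs, new int[ 1 ], result -> seen.add( result[ 0 ] ) );
    return seen;
    }

  // keeps references and reads them later, so every entry shows the last value written
  public static List<Integer> readLater( int[][] pairs )
    {
    List<int[]> retained = new ArrayList<>();
    emitSums( pairs, new int[ 1 ], retained::add );

    List<Integer> seen = new ArrayList<>();

    for( int[] result : retained )
      seen.add( result[ 0 ] );

    return seen;
    }

  public static void main( String[] args )
    {
    int[][] pairs = { { 1, 2 }, { 3, 4 }, { 5, 6 } };

    System.out.println( consumeImmediately( pairs ) ); // prints [3, 7, 11]
    System.out.println( readLater( pairs ) );          // prints [11, 11, 11]
    }
  }
```

The second result shows what a custom Operation should avoid doing with incoming objects it does not own: holding a reference to a reused instance beyond the current call.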

Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.