A Function expects a stream of individual argument Tuples, and returns zero or more result Tuples for each of them. Like a Filter, a Function is used with an Each pipe, which may follow any pipe type.
To create a custom Function, subclass the class cascading.operation.BaseOperation and implement the interface cascading.operation.Function. Since BaseOperation has been subclassed, the operate method, as defined on the Function interface, is the only method that must be implemented.
Example 5.1. Custom Function

public class SomeFunction extends BaseOperation implements Function
  {
  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
    // get the arguments TupleEntry
    TupleEntry arguments = functionCall.getArguments();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // insert some values into the result Tuple

    // return the result Tuple
    functionCall.getOutputCollector().add( result );
    }
  }
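A Function does nothing until it is bound into a pipe assembly. The following is a minimal sketch of wiring the Function from Example 5.1 into an Each pipe; the pipe name "assembly" and the field name "line" are hypothetical:

```java
// bind SomeFunction into a pipe assembly (field names are hypothetical)
Pipe assembly = new Pipe( "assembly" );

// apply SomeFunction to the "line" field of every Tuple in the stream,
// keeping only the Function's result fields in the output
assembly = new Each( assembly, new Fields( "line" ), new SomeFunction(), Fields.RESULTS );
```

The third and fourth constructor arguments, the argument selector and the output selector, control which incoming fields the Function sees and which fields continue downstream.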
Whenever possible, functions should declare both the number of argument values they expect and the field names of the Tuple they return. However, these declarations are optional, as explained below.
For input, functions must accept one or more values in a Tuple as arguments. If not specified, the default is to accept any number of values (Operation.ANY). Cascading verifies during planning that the number of arguments selected matches the number of arguments expected.
For output, it is good practice to declare the field names that a function returns. If not specified, the default is Fields.UNKNOWN, meaning that an unknown number of fields are returned in each Tuple.
Both declarations - the number of input arguments and the declared result fields - must be made in the constructor, either by passing default values to the super constructor or by accepting the values from the user via a constructor implementation.
Example 5.2. Add Values Function

public class AddValuesFunction extends BaseOperation implements Function
  {
  public AddValuesFunction()
    {
    // expects 2 arguments, fail otherwise
    super( 2, new Fields( "sum" ) );
    }

  public AddValuesFunction( Fields fieldDeclaration )
    {
    // expects 2 arguments, fail otherwise
    super( 2, fieldDeclaration );
    }

  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
    // get the arguments TupleEntry
    TupleEntry arguments = functionCall.getArguments();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // sum the two arguments
    int sum = arguments.getInteger( 0 ) + arguments.getInteger( 1 );

    // add the sum value to the result Tuple
    result.add( sum );

    // return the result Tuple
    functionCall.getOutputCollector().add( result );
    }
  }
The example above implements a Function that accepts two values in the argument Tuple, adds them together, and returns the result in a new Tuple.

The first constructor above assumes a default field name for the field that this Function returns. In practice, it is good to give the user the option of overriding the declared field names, allowing them to prevent possible field-name collisions that might cause the planner to fail.
This line is especially important:
int sum = arguments.getInteger( 0 ) + arguments.getInteger( 1 );
Note that ordinal numbers, not field names, are used here to get argument values. If field names had been used, the AddValuesFunction would have been coupled to the incoming stream.
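Because the Function addresses its arguments by position, the Each pipe's argument selector, not the Function itself, decides which incoming fields are used. The following sketch illustrates this; the field names "lhs" and "rhs" are hypothetical:

```java
// any two incoming fields can be selected as the arguments; the
// Function sees them only as positions 0 and 1, so it remains
// decoupled from the names in the incoming stream
assembly = new Each( assembly, new Fields( "lhs", "rhs" ),
    new AddValuesFunction(), Fields.ALL );
```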
Example 5.3. Add Values Function and Context

public class EfficientAddValuesFunction
  extends BaseOperation<Tuple> implements Function<Tuple>
  {
  public EfficientAddValuesFunction()
    {
    // expects 2 arguments, fail otherwise
    super( 2, new Fields( "sum" ) );
    }

  public EfficientAddValuesFunction( Fields fieldDeclaration )
    {
    // expects 2 arguments, fail otherwise
    super( 2, fieldDeclaration );
    }

  @Override
  public void prepare( FlowProcess flowProcess, OperationCall<Tuple> call )
    {
    // create a reusable Tuple of size 1
    call.setContext( Tuple.size( 1 ) );
    }

  public void operate( FlowProcess flowProcess, FunctionCall<Tuple> call )
    {
    // get the arguments TupleEntry
    TupleEntry arguments = call.getArguments();

    // get our previously created Tuple
    Tuple result = call.getContext();

    // sum the two arguments
    int sum = arguments.getInteger( 0 ) + arguments.getInteger( 1 );

    // set the sum value on the result Tuple
    result.set( 0, sum );

    // return the result Tuple
    call.getOutputCollector().add( result );
    }

  @Override
  public void cleanup( FlowProcess flowProcess, OperationCall<Tuple> call )
    {
    call.setContext( null );
    }
  }
This example, a minor variation on the previous one, introduces the use of a "context" object and the prepare() and cleanup() methods.
All Operations allow for a context object, which is simply a user-defined object that holds state between calls to the operate() method. This allows a given instance of the Operation to be thread-safe on a platform that uses multiple threads of execution rather than multiple processes. It also allows initialization of complex resources to be deferred until the Operation is engaged.
The prepare() and cleanup() methods are invoked once per thread of execution and, in the case of the Hadoop platform, only on the cluster side, never on the client.
In the above example, a Tuple is used as the context; a more complex type isn't necessary. Also note that the Tuple isn't storing state, but is re-used to reduce the number of new Object instances created. In Cascading, it is perfectly safe to output the same Tuple instance from operate(). The call functionCall.getOutputCollector().add( result ) will not return until the result Tuple has been processed or persisted downstream.
Because a result Tuple instance can be re-used, mutable non-primitive values stored inside it must not be re-used - for example, storing a new Tuple inside the result Tuple. If Cascading needs to cache the result Tuple, it makes a shallow copy of the Tuple; it will not clone or copy any child Tuple instances (or Java Collections, or any other mutable Object).
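The hazard of shallow copies of a re-used container can be demonstrated with plain Java collections, which is an analogy for the Tuple behavior described above (this sketch uses java.util.List rather than the Cascading Tuple class):

```java
import java.util.ArrayList;
import java.util.List;

public class ShallowCopyDemo
  {
  public static void main( String[] args )
    {
    // a mutable child object stored inside a re-used container
    List<Integer> child = new ArrayList<>();
    child.add( 1 );

    List<Object> result = new ArrayList<>();
    result.add( child );

    // a shallow copy duplicates the reference, not the child itself
    List<Object> cached = new ArrayList<>( result );

    // mutating the re-used child silently changes the cached copy too
    child.set( 0, 99 );

    System.out.println( cached.get( 0 ) ); // prints [99], not [1]
    }
  }
```

Re-using the outer container is safe because the framework copies it before caching; re-using anything stored inside it is not, because that copy is shallow.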
Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.