The cascading.operation.Identity function is used to "shape" a tuple stream. The following common patterns
illustrate how Cascading "field algebra" works. (Note that, in actual
practice, some of these example tasks might be better performed with
helper subassemblies such as cascading.pipe.assembly.Rename, cascading.pipe.assembly.Retain, and cascading.pipe.assembly.Discard.)
Here Identity passes its arguments out as results, thanks to the Fields.ARGS field declaration.
// incoming -> "ip", "time", "method", "event", "status", "size"
Identity identity = new Identity( Fields.ARGS );
Fields ipMethod = new Fields( "ip", "method" );
pipe = new Each( pipe, ipMethod, identity, Fields.RESULTS );
// outgoing -> "ip", "method"
In practice the field declaration can be left out, as Fields.ARGS is the default declaration for the
Identity function. Fields.RESULTS can also be left off, as it is the default output selector for the Each
pipe. Thus, simpler code yields the same result:
// incoming -> "ip", "time", "method", "event", "status", "size"
pipe = new Each( pipe, new Fields( "ip", "method" ), new Identity() );
// outgoing -> "ip", "method"
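The selection behavior described above can be mimicked in plain Java without any Cascading classes. The sketch below is a toy model, not Cascading API: the class IdentitySelectSketch, the method selectResults, and the sample values are all hypothetical, and a tuple is approximated as an ordered map from field name to value.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these names are hypothetical and are not part of Cascading.
public class IdentitySelectSketch {

    // Mimics Each + Identity with Fields.ARGS and Fields.RESULTS:
    // the selected arguments become the results, and only results are emitted.
    static Map<String, Object> selectResults(Map<String, Object> incoming,
                                             List<String> argumentSelector) {
        Map<String, Object> results = new LinkedHashMap<>();
        for (String name : argumentSelector) {
            if (!incoming.containsKey(name)) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            results.put(name, incoming.get(name));
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("ip", "10.0.0.1");        // sample values are made up
        incoming.put("time", "01/Jan/2012");
        incoming.put("method", "GET");
        incoming.put("status", "200");

        Map<String, Object> outgoing =
            selectResults(incoming, Arrays.asList("ip", "method"));
        System.out.println(outgoing.keySet()); // prints [ip, method]
    }
}
```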
Here Identity renames the incoming arguments. Since Fields.RESULTS is implied, the incoming Tuple is replaced by the selected arguments and given new field names as declared on Identity.
// incoming -> "ip", "method"
Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, new Fields( "ip", "method" ), identity );
// outgoing -> "address", "request"
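In the example above, if the incoming stream had more fields than "ip" and "method", the code would still work: the extra fields would simply be discarded. In the next example, however, any extra incoming fields would cause the planner to fail, because Fields.ALL passes every incoming field to Identity as an argument while only two result fields are declared.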
// incoming -> "ip", "method"
Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, Fields.ALL, identity );
// outgoing -> "address", "request"
Since Fields.ALL is the default argument selector for the Each pipe, it can be
left out, as shown below. Again, both the example above and the one below
will fail unless there are exactly two fields in the tuples of
the incoming stream.
// incoming -> "ip", "method"
Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, identity );
// outgoing -> "address", "request"
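The requirement that the number of arguments match the number of declared fields can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: IdentityRenameSketch and rename are hypothetical names, a null selector stands in for Fields.ALL, and the exception thrown here merely plays the role of the planner failure, whose actual type and message are not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these names are hypothetical and are not part of Cascading.
public class IdentityRenameSketch {

    // Mimics Identity with a field declaration: each argument value is
    // re-emitted under the declared name at the same position.
    // A null argumentSelector stands in for Fields.ALL.
    static Map<String, Object> rename(Map<String, Object> incoming,
                                      List<String> argumentSelector,
                                      List<String> declared) {
        List<String> arguments = argumentSelector != null
            ? argumentSelector
            : new ArrayList<String>(incoming.keySet());
        if (arguments.size() != declared.size()) {
            // Stand-in for the planner failure described in the text.
            throw new IllegalArgumentException(
                "declared " + declared.size() + " fields for "
                + arguments.size() + " arguments");
        }
        Map<String, Object> results = new LinkedHashMap<>();
        for (int i = 0; i < arguments.size(); i++) {
            results.put(declared.get(i), incoming.get(arguments.get(i)));
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("ip", "10.0.0.1");   // sample values are made up
        incoming.put("method", "GET");
        List<String> declared = Arrays.asList("address", "request");

        // Exactly two incoming fields: the rename succeeds.
        System.out.println(rename(incoming, null, declared).keySet());

        // A third incoming field breaks the all-fields variant.
        incoming.put("status", "200");
        try {
            rename(incoming, null, declared);
        } catch (IllegalArgumentException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

With an explicit selector of "ip" and "method", the same call succeeds regardless of how many other fields are present, which matches the contrast drawn in the text.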
Here we rename a single field and return it, along with an input Tuple field, as the result. All other fields are dropped.
// incoming -> "ip", "time", "method", "event", "status", "size"
Fields fieldSelector = new Fields( "address", "method" );
Identity identity = new Identity( new Fields( "address" ) );
pipe = new Each( pipe, new Fields( "ip" ), identity, fieldSelector );
// outgoing -> "address", "method"
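One way to picture the output selector in this example is as a selection over the incoming fields plus the function results. The plain-Java sketch below assumes that reading; OutputSelectorSketch and applyOutputSelector are hypothetical names, and field-name clashes between input and results are not modeled.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these names are hypothetical and are not part of Cascading.
public class OutputSelectorSketch {

    // Mimics an Each output selector: names are picked from the incoming
    // fields combined with the function results. Name clashes are ignored here.
    static Map<String, Object> applyOutputSelector(Map<String, Object> incoming,
                                                   Map<String, Object> results,
                                                   List<String> outputSelector) {
        Map<String, Object> all = new LinkedHashMap<>(incoming);
        all.putAll(results);
        Map<String, Object> outgoing = new LinkedHashMap<>();
        for (String name : outputSelector) {
            if (!all.containsKey(name)) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            outgoing.put(name, all.get(name));
        }
        return outgoing;
    }

    public static void main(String[] args) {
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("ip", "10.0.0.1");   // sample values are made up
        incoming.put("method", "GET");
        incoming.put("status", "200");

        // Identity declared as "address" re-emits the "ip" argument value.
        Map<String, Object> results = new LinkedHashMap<>();
        results.put("address", incoming.get("ip"));

        System.out.println(applyOutputSelector(
            incoming, results, Arrays.asList("address", "method")));
        // prints {address=10.0.0.1, method=GET}
    }
}
```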
Here we replace the Tuple String values "status" and "size" with int and long values, respectively. All other
fields are dropped.
// incoming -> "ip", "time", "method", "event", "status", "size"
Identity identity = new Identity( Integer.TYPE, Long.TYPE );
pipe = new Each( pipe, new Fields( "status", "size" ), identity );
// outgoing -> "status", "size"
Or we can replace just the Tuple String value "status" with an int, while keeping all the other
values in the output Tuple.
// incoming -> "ip", "time", "method", "event", "status", "size"
Identity identity = new Identity( Integer.TYPE );
pipe = new Each( pipe, new Fields( "status" ), identity, Fields.REPLACE );
// outgoing -> "ip", "time", "method", "event", "status", "size"
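The effect of coercing one field in place while keeping the rest of the tuple can be sketched in plain Java. This is a toy model, not Cascading API: CoerceReplaceSketch and coerceToInt are hypothetical names, and the use of Integer.parseInt is an assumption made for illustration rather than a description of how Cascading coerces values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: these names are hypothetical and are not part of Cascading.
public class CoerceReplaceSketch {

    // Mimics Identity( Integer.TYPE ) combined with Fields.REPLACE:
    // the named field is overwritten with an int value, and every other
    // field keeps its value and its position.
    static Map<String, Object> coerceToInt(Map<String, Object> incoming,
                                           String field) {
        if (!incoming.containsKey(field)) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        Map<String, Object> outgoing = new LinkedHashMap<>(incoming);
        // Assumption: parse the String form of the value as a decimal int.
        outgoing.put(field, Integer.parseInt(String.valueOf(incoming.get(field))));
        return outgoing;
    }

    public static void main(String[] args) {
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("ip", "10.0.0.1");   // sample values are made up
        incoming.put("status", "200");
        incoming.put("size", "5120");

        Map<String, Object> outgoing = coerceToInt(incoming, "status");
        System.out.println(outgoing.keySet());
        // prints [ip, status, size]
        System.out.println(outgoing.get("status").getClass().getSimpleName());
        // prints Integer
    }
}
```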
Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.