The cascading.operation.Identify
function
is used to "shape" a tuple stream. Here are some common patterns.
Here Identity passes its arguments out as results, thanks
to the Fields.ARGS
field declaration.
// incoming -> "ip", "time", "method", "event", "status", "size" Identity identity = new Identity( Fields.ARGS ); pipe = new Each( pipe, new Fields( "ip", "method" ), identity, Fields.RESULTS ); // outgoing -> "ip", "method"
In practice the field declaration can be left out as
Field.ARGS
is the default declaration for the
Identity function. Additionally Fields.RESULTs
can
be left off as it is the default for the
Every
pipe.
// incoming -> "ip", "time", "method", "event", "status", "size" pipe = new Each( pipe, new Fields( "ip", "method" ), new Identity() ); // outgoing -> "ip", "method"
Here Identity renames the incoming arguments. Since Fields.RESULTS is implied, the incoming Tuple is replaced by the arguments selected and given new field names as declared on Identity.
// incoming -> "ip", "method" Identity identity = new Identity( new Fields( "address", "request" ) ); pipe = new Each( pipe, new Fields( "ip", "method" ), identity ); // outgoing -> "address", "request"
In the above example, if there were more fields than "ip" and "method", it would work fine, all the extra fields would be discarded. If the same was true for the next example, the planner would fail.
// incoming -> "ip", "method" Identity identity = new Identity( new Fields( "address", "request" ) ); pipe = new Each( pipe, Fields.ALL, identity ); // outgoing -> "address", "request"
Since Fields.ALL
is the default argument
selector for the Each
pipe, it can be
left out.
// incoming -> "ip", "method" Identity identity = new Identity( new Fields( "address", "request" ) ); pipe = new Each( pipe, identity ); // outgoing -> "address", "request"
Here we rename a single field, but return it along with an input Tuple field as the result.
// incoming -> "ip", "time", "method", "event", "status", "size" Fields fieldSelector = new Fields( "address", "method" ); Identity identity = new Identity( new Fields( "address" ) ); pipe = new Each( pipe, new Fields( "ip" ), identity, fieldSelector ); // outgoing -> "address", "method"
Here we replace the Tuple String values "status" and
"size" with int
and
long
, respectively.
// incoming -> "ip", "time", "method", "event", "status", "size" Identity identity = new Identity( Integer.TYPE, Long.TYPE ); pipe = new Each( pipe, new Fields( "status", "size" ), identity ); // outgoing -> "status", "size"
Or we can replace just the Tuple String value "status"
with int
while keeping all the other
values in the output Tuple.
// incoming -> "ip", "time", "method", "event", "status", "size" Identity identity = new Identity( Integer.TYPE ); pipe = new Each( pipe, new Fields( "status" ), identity, Fields.REPLACE ); // outgoing -> "ip", "time", "method", "event", "status", "size"
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.