Cookbook: Code Examples of Cascading Idioms
This chapter demonstrates some common idioms used in Cascading applications.
Tuples and Fields
- Copy a Tuple instance
Tuple original = new Tuple( "john", "doe" );
// call copy constructor
Tuple copy = new Tuple( original );
assert copy.getObject( 0 ).equals( "john" );
assert copy.getObject( 1 ).equals( "doe" );
- Nest a Tuple instance within a Tuple
Tuple parent = new Tuple();
parent.add( new Tuple( "john", "doe" ) );
assert ( (Tuple) parent.getObject( 0 ) ).getObject( 0 ).equals( "john" );
assert ( (Tuple) parent.getObject( 0 ) ).getObject( 1 ).equals( "doe" );
- Build a longer Fields instance
Fields first = new Fields( "first" );
Fields middle = new Fields( "middle" );
Fields last = new Fields( "last" );
Fields full = first.append( middle ).append( last );
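// full is now equivalent to new Fields( "first", "middle", "last" )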
- Remove a field from a longer Fields instance
Fields full = new Fields( "first", "middle", "last" );
Fields firstLast = full.subtract( new Fields( "middle" ) );
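// firstLast is now equivalent to new Fields( "first", "last" )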
Stream Shaping
- Split (branch) a Tuple stream
Pipe pipe = new Pipe( "head" );
pipe = new Each( pipe, new SomeFunction() );
// ...
// split left with the branch name 'lhs'
Pipe lhs = new Pipe( "lhs", pipe );
lhs = new Each( lhs, new SomeFunction() );
// ...
// split right with the branch name 'rhs'
Pipe rhs = new Pipe( "rhs", pipe );
rhs = new Each( rhs, new SomeFunction() );
// ...
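// each branch continues as an independent tail; bind every remaining tail
// to its own sink tap (see "Bind multiple sources and sinks to a Flow" below)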
- Copy a field value
Fields argument = new Fields( "field" );
Identity identity = new Identity( new Fields( "copy" ) );
// identity copies the incoming argument to the result tuple
pipe = new Each( pipe, argument, identity, Fields.ALL );
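// outgoing -> all incoming fields plus "copy", holding the same value as "field"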
- Discard (drop) a field
// incoming -> "keepField", "dropField"
pipe = new Discard( pipe, new Fields( "dropField" ) );
// outgoing -> "keepField"
- Retain (keep) a field
// incoming -> "keepField", "dropField"
pipe = new Retain( pipe, new Fields( "keepField" ) );
// outgoing -> "keepField"
- Rename a field
// a simple SubAssembly that uses Identity internally
pipe = new Rename( pipe, new Fields( "from" ), new Fields( "to" ) );
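// the incoming "from" field is renamed to "to"; all other fields pass through unchanged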
- Coerce field values from Strings to primitives
Fields fields = new Fields( "longField", "booleanField" );
Class[] types = new Class[]{ long.class, boolean.class };
// convert to given type
pipe = new Coerce( pipe, fields, types );
- Insert constant values into a stream
Fields fields = new Fields( "constant1", "constant2" );
Insert function = new Insert( fields, "value1", "value2" );
pipe = new Each( pipe, function, Fields.ALL );
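// outgoing -> all incoming fields plus "constant1" = "value1" and "constant2" = "value2"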
Common Operations
- Parse a String date/time value
// convert string date/time field to a long
// milliseconds "timestamp" value
String format = "yyyy:MM:dd:HH:mm:ss.SSS";
DateParser parser = new DateParser( new Fields( "ts" ), format );
pipe = new Each( pipe, new Fields( "datetime" ), parser, Fields.ALL );
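// outgoing -> all incoming fields plus the long "ts" field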
- Format a timestamp to a date/time value
// convert a long milliseconds "timestamp" value to a string
String format = "HH:mm:ss.SSS";
DateFormatter formatter =
new DateFormatter( new Fields( "datetime" ), format );
pipe = new Each( pipe, new Fields( "ts" ), formatter, Fields.ALL );
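// outgoing -> all incoming fields plus the formatted "datetime" string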
Stream Ordering
- Remove duplicate tuples in a stream
// remove all duplicates from the stream
pipe = new Unique( pipe, Fields.ALL );
- Create a list of unique values
// narrow stream to just ips
pipe = new Retain( pipe, new Fields( "ip" ) );
// find all unique 'ip' values
pipe = new Unique( pipe, new Fields( "ip" ) );
- Find first occurrence in time of a unique value
// group on all unique 'ip' values
// secondary sort on 'datetime'; the natural sort order is ascending
pipe = new GroupBy( pipe, new Fields( "ip" ), new Fields( "datetime" ) );
// take the first 'ip' tuple in the group which has the
// oldest 'datetime' value
pipe = new Every( pipe, Fields.ALL, new First(), Fields.RESULTS );
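// outgoing -> the earliest tuple for each unique 'ip' value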
API Usage
- Pass properties to a custom Operation
// set property on Flow
Properties properties = new Properties();
properties.put( "key", "value" );
FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties );
// ...
// get the property from within an Operation (Function, Filter, etc)
String value = (String) flowProcess.getProperty( "key" );
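For context, below is a minimal sketch of a custom Function that reads the property; the class name PropertyAwareFunction and the use of the operation context to hold the value are illustrative assumptions, not part of the guide.
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;

public class PropertyAwareFunction extends BaseOperation<String> implements Function<String>
  {
  public PropertyAwareFunction()
    {
    // declare Fields.ARGS so the arguments pass through unchanged
    super( Fields.ARGS );
    }

  @Override
  public void prepare( FlowProcess flowProcess, OperationCall<String> operationCall )
    {
    // read the property given to the FlowConnector and keep it as the operation context
    operationCall.setContext( (String) flowProcess.getProperty( "key" ) );
    }

  @Override
  public void operate( FlowProcess flowProcess, FunctionCall<String> functionCall )
    {
    // the property value is available here via functionCall.getContext()
    functionCall.getOutputCollector().add( functionCall.getArguments() );
    }
  }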
- Bind multiple sources and sinks to a Flow
Pipe headLeft = new Pipe( "headLeft" );
// do something interesting
Pipe headRight = new Pipe( "headRight" );
// do something interesting
// merge the two input streams
Pipe merged = new GroupBy( headLeft, headRight, new Fields( "common" ) );
// ...
// branch the merged stream
Pipe tailLeft = new Pipe( "tailLeft", merged );
// filter values on the left branch
tailLeft = new Each( tailLeft, new SomeFilter() );
Pipe tailRight = new Pipe( "tailRight", merged );
// filter values on the right branch
tailRight = new Each( tailRight, new SomeFilter() );
// source taps
Scheme inLeftScheme =
new TextDelimited( new Fields( "some-fields" ) );
Tap sourceLeft = new Hfs( inLeftScheme, "some/path" );
Scheme inRightScheme =
new TextDelimited( new Fields( "some-fields" ) );
Tap sourceRight = new Hfs( inRightScheme, "some/path" );
// sink taps
Scheme outLeftScheme =
new TextDelimited( new Fields( "some-fields" ) );
Tap sinkLeft = new Hfs( outLeftScheme, "some/path" );
Scheme outRightScheme =
new TextDelimited( new Fields( "some-fields" ) );
Tap sinkRight = new Hfs( outRightScheme, "some/path" );
FlowDef flowDef = new FlowDef()
.setName( "flow-name" );
// bind source taps to Pipe heads
flowDef
.addSource( headLeft, sourceLeft )
.addSource( headRight, sourceRight );
// bind sink taps to Pipe tails
flowDef
.addSink( tailLeft, sinkLeft )
.addTailSink( tailRight, sinkRight );
// ALTERNATIVELY ...
// add named source taps -- the name must match the head pipe to bind to
flowDef
.addSource( "headLeft", sourceLeft ) // headLeft.getName()
.addSource( "headRight", sourceRight ); // headRight.getName()
// add named sink taps
flowDef
.addSink( "tailLeft", sinkLeft ) // tailLeft.getName()
.addSink( "tailRight", sinkRight ); // tailRight.getName()
// add tails -- heads are reachable from the tails
flowDef
.addTail( tailLeft )
.addTail( tailRight );
// create the Flow from the FlowDef
FlowConnector flowConnector = new Hadoop2MR1FlowConnector();
Flow flow = flowConnector.connect( flowDef );
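Once connected, the Flow can be run; a brief usage sketch:
// start the Flow and block until it completes
flow.complete();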