Cascading 3.0 User Guide - Cookbook: Code Examples of Cascading Idioms

This chapter demonstrates some common idioms used in Cascading applications.

Tuples and Fields

Copy a Tuple instance
Tuple original = new Tuple( "john", "doe" );

// call copy constructor
Tuple copy = new Tuple( original );

assert copy.getObject( 0 ).equals( "john" );
assert copy.getObject( 1 ).equals( "doe" );
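
The copy constructor builds a new Tuple that holds the same values. It does not nest the original; compare the next recipe. The plain-Java sketch below illustrates the distinction, with java.util.List standing in for Tuple. It assumes the copy is shallow: element references are shared, but the two containers are independent. The class name TupleCopySketch is hypothetical and is not part of Cascading.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy: a java.util.List stands in for a Cascading Tuple.
class TupleCopySketch {
  // Copies the element references into a new, independent list
  // (assumed here to mirror the Tuple copy constructor).
  static List<Object> copyOf( List<Object> original ) {
    return new ArrayList<>( original );
  }

  public static void main( String[] args ) {
    List<Object> original = new ArrayList<>( Arrays.asList( "john", "doe" ) );
    List<Object> copy = copyOf( original );

    copy.set( 1, "smith" );         // changing the copy ...
    System.out.println( original ); // ... leaves the original untouched: [john, doe]
    System.out.println( copy );     // [john, smith]
  }
}
```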

Nest a Tuple instance within a Tuple
Tuple parent = new Tuple();
parent.add( new Tuple( "john", "doe" ) );

assert ( (Tuple) parent.getObject( 0 ) ).getObject( 0 ).equals( "john" );
assert ( (Tuple) parent.getObject( 0 ) ).getObject( 1 ).equals( "doe" );
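
Unlike the copy constructor, add() stores the given Tuple as a single element, so the parent has a size of one and its only element is the inner tuple. The sketch below models this with plain Java lists. TupleNestSketch is a hypothetical illustration, not a Cascading class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TupleNestSketch {
  // Adds the whole child list as ONE element of a new parent list.
  static List<Object> nest( List<?> child ) {
    List<Object> parent = new ArrayList<>();
    parent.add( child );
    return parent;
  }

  public static void main( String[] args ) {
    List<Object> parent = nest( Arrays.asList( "john", "doe" ) );
    List<?> inner = (List<?>) parent.get( 0 );

    System.out.println( parent.size() );  // 1
    System.out.println( inner.get( 0 ) ); // john
  }
}
```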

Build a longer Fields instance
Fields first = new Fields( "first" );
Fields middle = new Fields( "middle" );
Fields last = new Fields( "last" );

Fields full = first.append( middle ).append( last );
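
append() returns a new Fields instance and leaves first, middle, and last unchanged. The field names keep their left-to-right order. The sketch below models field names as a List<String>. This is a simplification: it does not model how Cascading handles duplicate or positional fields. FieldsAppendSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FieldsAppendSketch {
  // Returns a new list with the names of 'right' after those of 'left'; neither input is modified.
  static List<String> append( List<String> left, List<String> right ) {
    List<String> result = new ArrayList<>( left );
    result.addAll( right );
    return result;
  }

  public static void main( String[] args ) {
    List<String> first = Arrays.asList( "first" );
    List<String> middle = Arrays.asList( "middle" );
    List<String> last = Arrays.asList( "last" );

    // mirrors first.append( middle ).append( last )
    System.out.println( append( append( first, middle ), last ) ); // [first, middle, last]
  }
}
```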

Remove a field from a longer Fields instance
Fields full = new Fields( "first", "middle", "last" );

Fields firstLast = full.subtract( new Fields( "middle" ) );
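
Similarly, subtract() yields a new Fields instance containing the remaining names. This sketch assumes they stay in their original order. It uses the same hypothetical list-based model and again ignores positional fields. FieldsSubtractSketch is not a Cascading class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FieldsSubtractSketch {
  // Returns the names of 'full' that do not appear in 'remove', in their original order.
  static List<String> subtract( List<String> full, List<String> remove ) {
    List<String> result = new ArrayList<>( full );
    result.removeAll( remove );
    return result;
  }

  public static void main( String[] args ) {
    List<String> full = Arrays.asList( "first", "middle", "last" );

    // mirrors full.subtract( new Fields( "middle" ) )
    System.out.println( subtract( full, Arrays.asList( "middle" ) ) ); // [first, last]
  }
}
```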

Stream Shaping

Split (branch) a Tuple stream
Pipe pipe = new Pipe( "head" );
pipe = new Each( pipe, new SomeFunction() );
// ...

// split left with the branch name 'lhs'
Pipe lhs = new Pipe( "lhs", pipe );
lhs = new Each( lhs, new SomeFunction() );
// ...

// split right with the branch name 'rhs'
Pipe rhs = new Pipe( "rhs", pipe );
rhs = new Each( rhs, new SomeFunction() );
// ...

Copy a field value
Fields argument = new Fields( "field" );
Identity identity = new Identity( new Fields( "copy" ) );

// identity copies the incoming argument to the result tuple
pipe = new Each( pipe, argument, identity, Fields.ALL );
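
Passing Fields.ALL as the output selector keeps every incoming field and appends the function's results. The stream therefore gains a "copy" field alongside "field". The sketch below models a tuple as a LinkedHashMap from field name to value to show the resulting field order. CopyFieldSketch and its method are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CopyFieldSketch {
  // Models Each( pipe, argument, identity, Fields.ALL ): all incoming fields are kept
  // and the Identity result (the argument's value under a new name) is appended.
  static Map<String, Object> copyField( Map<String, Object> incoming, String from, String to ) {
    Map<String, Object> outgoing = new LinkedHashMap<>( incoming ); // Fields.ALL keeps the input
    outgoing.put( to, incoming.get( from ) );                       // the function result
    return outgoing;
  }

  public static void main( String[] args ) {
    Map<String, Object> incoming = new LinkedHashMap<>();
    incoming.put( "field", "value" );

    System.out.println( copyField( incoming, "field", "copy" ) ); // {field=value, copy=value}
  }
}
```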

Discard (drop) a field
// incoming -> "keepField", "dropField"
pipe = new Discard( pipe, new Fields( "dropField" ) );
// outgoing -> "keepField"

Retain (keep) a field
// incoming -> "keepField", "dropField"
pipe = new Retain( pipe, new Fields( "keepField" ) );
// outgoing -> "keepField"
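
For a stream with exactly these two fields, Discard and Retain are complementary. Dropping "dropField" and keeping "keepField" produce the same outgoing shape. The map-based sketch below shows both operations side by side. The names are hypothetical, and tuples are modeled as a LinkedHashMap.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class DiscardRetainSketch {
  // Returns a copy of the tuple without the named fields (models Discard).
  static Map<String, Object> discard( Map<String, Object> incoming, Set<String> drop ) {
    Map<String, Object> outgoing = new LinkedHashMap<>( incoming );
    outgoing.keySet().removeAll( drop );
    return outgoing;
  }

  // Returns a copy of the tuple with only the named fields (models Retain).
  static Map<String, Object> retain( Map<String, Object> incoming, Set<String> keep ) {
    Map<String, Object> outgoing = new LinkedHashMap<>( incoming );
    outgoing.keySet().retainAll( keep );
    return outgoing;
  }

  public static void main( String[] args ) {
    Map<String, Object> tuple = new LinkedHashMap<>();
    tuple.put( "keepField", "a" );
    tuple.put( "dropField", "b" );

    System.out.println( discard( tuple, Collections.singleton( "dropField" ) ) ); // {keepField=a}
    System.out.println( retain( tuple, Collections.singleton( "keepField" ) ) );  // {keepField=a}
  }
}
```

Retain is the safer choice when upstream assemblies may add fields later, because it names what survives rather than what is removed.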

Rename a field
// a simple SubAssembly that uses Identity internally
pipe = new Rename( pipe, new Fields( "from" ), new Fields( "to" ) );

Coerce field values from Strings to primitives
Fields fields = new Fields( "longField", "booleanField" );
Class<?>[] types = new Class<?>[]{long.class, boolean.class};

// convert each field to the given type
pipe = new Coerce( pipe, fields, types );
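
Coerce parses each named field into the requested type. As a rough plain-Java analogy, the conversion for these two types resembles Long.parseLong and Boolean.parseBoolean. This is an assumption made for illustration; Cascading's own coercion rules, for example for null or empty strings, may differ. CoerceSketch is hypothetical.

```java
class CoerceSketch {
  // Parses a string into the boxed form of the requested primitive type.
  static Object coerce( String value, Class<?> type ) {
    if ( type == long.class )
      return Long.parseLong( value );
    if ( type == boolean.class )
      return Boolean.parseBoolean( value ); // any text other than "true" (ignoring case) yields false
    throw new IllegalArgumentException( "unsupported type: " + type );
  }

  public static void main( String[] args ) {
    String[] values = { "42", "TRUE" };
    Class<?>[] types = { long.class, boolean.class };

    for ( int i = 0; i < values.length; i++ ) {
      Object coerced = coerce( values[ i ], types[ i ] );
      System.out.println( coerced + " (" + coerced.getClass().getSimpleName() + ")" );
    }
    // prints "42 (Long)" then "true (Boolean)"
  }
}
```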

Insert constant values into a stream
Fields fields = new Fields( "constant1", "constant2" );
Insert function = new Insert( fields, "value1", "value2" );

pipe = new Each( pipe, function, Fields.ALL );

Common Operations

Parse a String date/time value
// convert string date/time field to a long
// milliseconds "timestamp" value
String format = "yyyy:MM:dd:HH:mm:ss.SSS";
DateParser parser = new DateParser( new Fields( "ts" ), format );

pipe = new Each( pipe, new Fields( "datetime" ), parser, Fields.ALL );
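
The format string follows java.text.SimpleDateFormat pattern syntax; this sketch assumes DateParser interprets it the same way. The sketch parses a value with the same pattern using only the JDK. It pins the time zone to UTC so the result is reproducible. That is an assumption, so check which zone your DateParser instance uses before relying on exact values. DateParseSketch is hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

class DateParseSketch {
  // Parses a "yyyy:MM:dd:HH:mm:ss.SSS" string into epoch milliseconds, interpreting it as UTC.
  static long parse( String datetime ) throws ParseException {
    // SimpleDateFormat is not thread-safe, so a new instance is created per call.
    SimpleDateFormat parser = new SimpleDateFormat( "yyyy:MM:dd:HH:mm:ss.SSS" );
    parser.setTimeZone( TimeZone.getTimeZone( "UTC" ) );
    return parser.parse( datetime ).getTime();
  }

  public static void main( String[] args ) throws ParseException {
    System.out.println( parse( "1970:01:01:00:00:01.500" ) ); // 1500
  }
}
```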

Format a timestamp to a date/time value
// convert a long milliseconds "timestamp" value to a string
String format = "HH:mm:ss.SSS";
DateFormatter formatter =
  new DateFormatter( new Fields( "datetime" ), format );

pipe = new Each( pipe, new Fields( "ts" ), formatter, Fields.ALL );
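
The pattern HH:mm:ss.SSS contains no date fields. Timestamps on different days that share a time of day therefore format to the same string. The JDK-only sketch below shows this. It uses a hypothetical DateFormatSketch class and assumes UTC.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

class DateFormatSketch {
  // Formats epoch milliseconds as "HH:mm:ss.SSS" in UTC; the date portion is discarded.
  static String format( long timestamp ) {
    SimpleDateFormat formatter = new SimpleDateFormat( "HH:mm:ss.SSS" );
    formatter.setTimeZone( TimeZone.getTimeZone( "UTC" ) );
    return formatter.format( new Date( timestamp ) );
  }

  public static void main( String[] args ) {
    long oneDay = 86_400_000L;
    System.out.println( format( 3_723_004L ) );          // 01:02:03.004
    System.out.println( format( oneDay + 3_723_004L ) ); // 01:02:03.004 again, one day later
  }
}
```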

Stream Ordering

Remove duplicate tuples in a stream
// remove all duplicates from the stream
pipe = new Unique( pipe, Fields.ALL );
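
With Fields.ALL, two tuples count as duplicates only when every field value is equal. The sketch below expresses the same rule in plain Java. It models tuples as lists, whose equals() and hashCode() compare element by element. It keeps the first-seen order, which a Cascading Unique is not assumed to preserve. UniqueSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

class UniqueSketch {
  // Drops tuples that are equal on every field, keeping the first occurrence of each.
  static List<List<Object>> unique( List<List<Object>> tuples ) {
    LinkedHashSet<List<Object>> seen = new LinkedHashSet<>( tuples ); // preserves first-seen order
    return new ArrayList<>( seen );
  }

  public static void main( String[] args ) {
    List<List<Object>> tuples = Arrays.asList(
      Arrays.<Object>asList( "10.0.0.1", "GET" ),
      Arrays.<Object>asList( "10.0.0.1", "GET" ),
      Arrays.<Object>asList( "10.0.0.1", "POST" ) );

    System.out.println( unique( tuples ) ); // [[10.0.0.1, GET], [10.0.0.1, POST]]
  }
}
```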

Create a list of unique values
// narrow stream to just ips
pipe = new Retain( pipe, new Fields( "ip" ) );
// find all unique 'ip' values
pipe = new Unique( pipe, new Fields( "ip" ) );

Find first occurrence in time of a unique value
// group on all unique 'ip' values
// secondary sort on 'datetime' in ascending (natural) order
pipe = new GroupBy( pipe, new Fields( "ip" ), new Fields( "datetime" ) );
// take the first 'ip' tuple in the group which has the
// oldest 'datetime' value
pipe = new Every( pipe, Fields.ALL, new First(), Fields.RESULTS );
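
Grouping on "ip" with an ascending secondary sort on "datetime" means the first tuple in each group is the earliest one. The plain-Java sketch below computes the same answer without sorting: it keeps the minimum timestamp per ip. It models "datetime" as epoch milliseconds (an assumption) and uses hypothetical names (FirstOccurrenceSketch, Hit).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FirstOccurrenceSketch {
  // Hypothetical log record: a client ip and a timestamp in epoch milliseconds.
  static final class Hit {
    final String ip;
    final long datetime;

    Hit( String ip, long datetime ) {
      this.ip = ip;
      this.datetime = datetime;
    }
  }

  // For each ip, keeps the smallest (oldest) datetime seen; TreeMap orders results by ip.
  static Map<String, Long> firstSeen( List<Hit> hits ) {
    Map<String, Long> first = new TreeMap<>();
    for ( Hit hit : hits )
      first.merge( hit.ip, hit.datetime, Long::min );
    return first;
  }

  public static void main( String[] args ) {
    List<Hit> hits = Arrays.asList(
      new Hit( "10.0.0.2", 300L ),
      new Hit( "10.0.0.1", 200L ),
      new Hit( "10.0.0.2", 100L ) );

    System.out.println( firstSeen( hits ) ); // {10.0.0.1=200, 10.0.0.2=100}
  }
}
```

The GroupBy and First version emits the whole first tuple of each group. This sketch keeps only the timestamp, which is enough to show the ordering rule.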

API Usage

Pass properties to a custom Operation
// set property on Flow
Properties properties = new Properties();
properties.put( "key", "value" );
FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties );
// ...

// get the property from within an Operation (Function, Filter, etc.)
String value = (String) flowProcess.getProperty( "key" );

Bind multiple sources and sinks to a Flow
Pipe headLeft = new Pipe( "headLeft" );
// do something interesting

Pipe headRight = new Pipe( "headRight" );
// do something interesting

// merge both input streams and group them on the 'common' field
Pipe merged = new GroupBy( headLeft, headRight, new Fields( "common" ) );
// ...

// branch the merged stream
Pipe tailLeft = new Pipe( "tailLeft", merged );
// filter values on the left branch
tailLeft = new Each( tailLeft, new SomeFilter() );

Pipe tailRight = new Pipe( "tailRight", merged );
// filter values on the right branch
tailRight = new Each( tailRight, new SomeFilter() );

// source taps
Scheme inLeftScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sourceLeft = new Hfs( inLeftScheme, "some/path" );

Scheme inRightScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sourceRight = new Hfs( inRightScheme, "some/path" );

// sink taps
Scheme outLeftScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sinkLeft = new Hfs( outLeftScheme, "some/path" );

Scheme outRightScheme =
  new TextDelimited( new Fields( "some-fields" ) );
Tap sinkRight = new Hfs( outRightScheme, "some/path" );

FlowDef flowDef = new FlowDef()
  .setName( "flow-name" );

// bind source taps to Pipe heads
flowDef
  .addSource( headLeft, sourceLeft )
  .addSource( headRight, sourceRight );

// bind sink taps to Pipe tails
flowDef
  .addTailSink( tailLeft, sinkLeft )
  .addTailSink( tailRight, sinkRight );

// ALTERNATIVELY ...

// add source taps by name; each name must match
// the name of the head pipe to bind to
flowDef
  .addSource( "headLeft", sourceLeft )    // headLeft.getName()
  .addSource( "headRight", sourceRight ); // headRight.getName()

// add named sink taps
flowDef
  .addSink( "tailLeft", sinkLeft )    // tailLeft.getName()
  .addSink( "tailRight", sinkRight ); // tailRight.getName()

// add tails -- heads are reachable from the tails
flowDef
  .addTail( tailLeft )
  .addTail( tailRight );

// create a FlowConnector and plan the Flow from the FlowDef
FlowConnector flowConnector = new Hadoop2MR1FlowConnector();

Flow flow = flowConnector.connect( flowDef );
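
Binding by name works because each tap is keyed on a pipe's name (the headLeft.getName() comments above). Once the tails are added, the heads are reachable by walking upstream from them. The sketch below is a hypothetical plain-Java model of that idea, not Cascading's planner. Node stands in for a Pipe, and unboundHeads reports head names that have no source bound to them.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class FlowBindingSketch {
  // Hypothetical stand-in for a Pipe: a name plus the pipes it reads from.
  static final class Node {
    final String name;
    final List<Node> previous;

    Node( String name, Node... previous ) {
      this.name = name;
      this.previous = Arrays.asList( previous );
    }
  }

  // Walks upstream from every tail and collects the names of nodes with no predecessors.
  static Set<String> headNames( List<Node> tails ) {
    Set<String> heads = new TreeSet<>();
    Set<Node> visited = new HashSet<>(); // identity-based: Node does not override equals()
    Deque<Node> pending = new ArrayDeque<>( tails );

    while ( !pending.isEmpty() ) {
      Node node = pending.pop();
      if ( !visited.add( node ) )
        continue; // shared upstream nodes (here, the merge) are visited once
      if ( node.previous.isEmpty() )
        heads.add( node.name );
      pending.addAll( node.previous );
    }
    return heads;
  }

  // Returns the head names that have no source bound to them by name.
  static Set<String> unboundHeads( List<Node> tails, Map<String, String> sourcesByName ) {
    Set<String> missing = new TreeSet<>( headNames( tails ) );
    missing.removeAll( sourcesByName.keySet() );
    return missing;
  }

  public static void main( String[] args ) {
    Node headLeft = new Node( "headLeft" );
    Node headRight = new Node( "headRight" );
    Node merged = new Node( "merged", headLeft, headRight );
    List<Node> tails = Arrays.asList( new Node( "tailLeft", merged ), new Node( "tailRight", merged ) );

    Map<String, String> sourcesByName = new HashMap<>();
    sourcesByName.put( "headLeft", "some/path" ); // headRight deliberately left unbound

    System.out.println( headNames( tails ) );                   // [headLeft, headRight]
    System.out.println( unboundHeads( tails, sourcesByName ) ); // [headRight]
  }
}
```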