Cascading 3.2 User Guide - Cookbook: Code Examples of Cascading Idioms
Cookbook: Code Examples of Cascading Idioms
This chapter demonstrates some common idioms used in Cascading applications.
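The snippets that follow are fragments, not complete programs. For reference, the classes they use live in roughly the following packages; this import list is indicative rather than exhaustive, and the Hadoop-specific entries apply only to the multi-source/sink example at the end of the chapter:
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.operation.Identity;
import cascading.operation.Insert;
import cascading.operation.aggregator.First;
import cascading.operation.text.DateFormatter;
import cascading.operation.text.DateParser;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Coerce;
import cascading.pipe.assembly.Discard;
import cascading.pipe.assembly.Rename;
import cascading.pipe.assembly.Retain;
import cascading.pipe.assembly.Unique;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;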
Tuples and Fields
- Copy a Tuple instance
Tuple original = new Tuple( "john", "doe" );
// call copy constructor
Tuple copy = new Tuple( original );
assert copy.getObject( 0 ).equals( "john" );
assert copy.getObject( 1 ).equals( "doe" );
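Note that the copy constructor is shallow: the new Tuple holds references to the same element objects as the original, which matters when elements are mutable. An illustrative check:
// the copy shares element references with the original (shallow copy)
assert copy.getObject( 0 ) == original.getObject( 0 );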
- Nest a Tuple instance within a Tuple
Tuple parent = new Tuple();
parent.add( new Tuple( "john", "doe" ) );
assert ( (Tuple) parent.getObject( 0 ) ).getObject( 0 ).equals( "john" );
assert ( (Tuple) parent.getObject( 0 ) ).getObject( 1 ).equals( "doe" );
- Build a longer Fields instance
Fields first = new Fields( "first" );
Fields middle = new Fields( "middle" );
Fields last = new Fields( "last" );
Fields full = first.append( middle ).append( last );
- Remove a field from a longer Fields instance
Fields full = new Fields( "first", "middle", "last" );
Fields firstLast = full.subtract( new Fields( "middle" ) );
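A few illustrative checks on the results of the append and subtract calls above, assuming Fields.size() and Fields.contains() behave as their names suggest:
assert full.size() == 3;
assert firstLast.size() == 2;
assert !firstLast.contains( new Fields( "middle" ) );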
Stream Shaping
- Split (branch) a Tuple stream
Pipe pipe = new Pipe( "head" );
pipe = new Each( pipe, new SomeFunction() );
// ...
// split left with the branch name 'lhs'
Pipe lhs = new Pipe( "lhs", pipe );
lhs = new Each( lhs, new SomeFunction() );
// ...
// split right with the branch name 'rhs'
Pipe rhs = new Pipe( "rhs", pipe );
rhs = new Each( rhs, new SomeFunction() );
// ...
- Copy a field value
// incoming -> "field"
Fields argument = new Fields( "field" );
Identity identity = new Identity( new Fields( "copy" ) );
// identity copies the incoming argument to the result tuple;
// Fields.ALL appends the result to the incoming tuple
pipe = new Each( pipe, argument, identity, Fields.ALL );
// outgoing -> "field", "copy"
- Discard (drop) a field
// incoming -> "keepField", "dropField"
pipe = new Discard( pipe, new Fields( "dropField" ) );
// outgoing -> "keepField"
- Retain (keep) a field
// incoming -> "keepField", "dropField"
pipe = new Retain( pipe, new Fields( "keepField" ) );
// outgoing -> "keepField"
- Rename a field
// a simple SubAssembly that uses Identity internally
pipe = new Rename( pipe, new Fields( "from" ), new Fields( "to" ) );
- Coerce field values from Strings to primitives
Fields fields = new Fields( "longField", "booleanField" );
Class[] types = new Class[]{ long.class, boolean.class };
// convert to given type
pipe = new Coerce( pipe, fields, types );
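Alternatively, in Cascading 3.x the types can be declared on the Fields instance itself; a sketch, assuming the typed Fields constructors and the single-argument Coerce variant:
// declare the target types directly on the Fields instance
Fields typedFields =
  new Fields( "longField", long.class )
    .append( new Fields( "booleanField", boolean.class ) );
pipe = new Coerce( pipe, typedFields );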
- Insert constant values into a stream
Fields fields = new Fields( "constant1", "constant2" );
Insert function = new Insert( fields, "value1", "value2" );
pipe = new Each( pipe, function, Fields.ALL );
Common Operations
- Parse a String date/time value
// convert string date/time field to a long
// milliseconds "timestamp" value
String format = "yyyy:MM:dd:HH:mm:ss.SSS";
DateParser parser = new DateParser( new Fields( "ts" ), format );
pipe = new Each( pipe, new Fields( "datetime" ), parser, Fields.ALL );
- Format a timestamp as a date/time value
// convert a long milliseconds "timestamp" value to a string
String format = "HH:mm:ss.SSS";
DateFormatter formatter =
new DateFormatter( new Fields( "datetime" ), format );
pipe = new Each( pipe, new Fields( "ts" ), formatter, Fields.ALL );
Stream Ordering
- Remove duplicate tuples in a stream
// remove all duplicates from the stream
pipe = new Unique( pipe, Fields.ALL );
- Create a list of unique values
// narrow stream to just ips
pipe = new Retain( pipe, new Fields( "ip" ) );
// find all unique 'ip' values
pipe = new Unique( pipe, new Fields( "ip" ) );
- Find first occurrence in time of a unique value
// group on all unique 'ip' values
// secondary sort on 'datetime'; the natural sort order is ascending
pipe = new GroupBy( pipe, new Fields( "ip" ), new Fields( "datetime" ) );
// take the first tuple in each group, which holds the
// oldest 'datetime' value
pipe = new Every( pipe, Fields.ALL, new First(), Fields.RESULTS );
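If the stream already arrives oldest-first per key, the FirstBy subassembly (from cascading.pipe.assembly, covered under Optimized Aggregations) can be a cheaper alternative since it partially aggregates map-side. Note the semantic difference: FirstBy keeps the first value it encounters per group rather than sorting, so this sketch is only equivalent under that ordering assumption:
// partial-aggregation alternative; keeps the first 'datetime' value
// encountered for each 'ip' group (assumes oldest-first input order)
pipe = new FirstBy( pipe, new Fields( "ip" ), new Fields( "datetime" ) );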
API Usage
- Pass properties to a custom Operation
// set property on Flow
Properties properties = new Properties();
properties.put( "key", "value" );
FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties );
// ...
// get the property from within an Operation (Function, Filter, etc.)
String value = (String) flowProcess.getProperty( "key" );
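For context, here is a minimal sketch of a custom Function that reads the property from within its operate() method; the class name is illustrative:
public class PropertyAwareFunction extends BaseOperation implements Function
  {
  @Override
  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
    // read the property that was set on the FlowConnector above
    String value = (String) flowProcess.getProperty( "key" );
    // ... use the value to parameterize processing
    }
  }
This assumes the usual imports from cascading.operation (BaseOperation, Function, FunctionCall) and cascading.flow (FlowProcess).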
- Bind multiple sources and sinks to a Flow
Pipe headLeft = new Pipe( "headLeft" );
// do something interesting
Pipe headRight = new Pipe( "headRight" );
// do something interesting
// merge the two input streams
Pipe merged = new GroupBy( headLeft, headRight, new Fields( "common" ) );
// ...
// branch the merged stream
Pipe tailLeft = new Pipe( "tailLeft", merged );
// filter values on the left branch
tailLeft = new Each( tailLeft, new SomeFilter() );
Pipe tailRight = new Pipe( "tailRight", merged );
// filter values on the right branch
tailRight = new Each( tailRight, new SomeFilter() );
// source taps
Scheme inLeftScheme =
new TextDelimited( new Fields( "some-fields" ) );
Tap sourceLeft = new Hfs( inLeftScheme, "some/path" );
Scheme inRightScheme =
new TextDelimited( new Fields( "some-fields" ) );
Tap sourceRight = new Hfs( inRightScheme, "some/path" );
// sink taps
Scheme outLeftScheme =
new TextDelimited( new Fields( "some-fields" ) );
Tap sinkLeft = new Hfs( outLeftScheme, "some/path" );
Scheme outRightScheme =
new TextDelimited( new Fields( "some-fields" ) );
Tap sinkRight = new Hfs( outRightScheme, "some/path" );
FlowDef flowDef = new FlowDef()
.setName( "flow-name" );
// bind source taps to Pipe heads
flowDef
.addSource( headLeft, sourceLeft )
.addSource( headRight, sourceRight );
// bind sink taps to Pipe tails
flowDef
.addSink( tailLeft, sinkLeft )
.addTailSink( tailRight, sinkRight );
// ALTERNATIVELY ...
// add named source taps
// the head pipe name to bind to
flowDef
.addSource( "headLeft", sourceLeft ) // headLeft.getName()
.addSource( "headRight", sourceRight ); // headRight.getName()
// add named sink taps
flowDef
.addSink( "tailLeft", sinkLeft ) // tailLeft.getName()
.addSink( "tailRight", sinkRight ); // tailRight.getName()
// add tails -- heads are reachable from the tails
flowDef
.addTail( tailLeft )
.addTail( tailRight );
// create the platform FlowConnector and build the Flow
FlowConnector flowConnector = new Hadoop2MR1FlowConnector();
Flow flow = flowConnector.connect( flowDef );
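To run the assembled Flow and block until it finishes:
// execute the Flow and wait for completion
flow.complete();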