Cascading 3.2 User Guide - Diving into the APIs
Diving into the APIs
Anatomy of a Word-Count Application
The most common example presented to new developers is an application that counts words. It is the data processing equivalent of a "Hello World" application.
In a word-counting application, a document is parsed into individual words and the frequency (count) of each word is calculated. In the previous paragraph, for example, "is" appears twice and "equivalent" appears once.
The following code example uses the default Cascading API to read each line of text from a document file, parse it into words, and then count the number of times each word appears.
// define the source Tap; the sink Tap is defined further below
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath ); (1)
// the 'head' of the pipe assembly
Pipe assembly = new Pipe( "wordcount" );
// For each input Tuple
// parse out each word into a new Tuple with the field name "word"
// regular expressions are optional in Cascading
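// this pattern matches runs of non-space characters that begin and end
// with a Unicode letter (\pL), treating each such run as one word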
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function ); (2)
// group the Tuple stream by the "word" value
assembly = new GroupBy( assembly, new Fields( "word" ) ); (3)
// For every Tuple group
// count the number of occurrences of "word" and store result in
// a field named "count"
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count ); (4)
Scheme sinkScheme = new TextDelimited( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE ); (5)
// initialize app properties, tell Hadoop which jar file to use
Properties properties = AppProps.appProps() (6)
.setName( "word-count-application" )
.setJarClass( Main.class )
.buildProperties();
// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties ); (7)
Flow flow = flowConnector.connect( "word-count", source, sink, assembly ); (8)
// execute the flow, block until complete
flow.complete(); (9)
(1) Read each line of text from a file and give it the field name "line"
(2) Parse each "line" into words with the RegexGenerator object, which returns each word in the field named "word"
(3) Sort and group all the tuples on the "word" field, using the GroupBy object
(4) Count the number of elements in each group, using the Count object, and store this value in the "count" field
(5) Write out the "word" and "count" fields
(6) Set application-specific metadata to allow the application to run
(7) Choose the platform for execution
(8) Plan the unit of work (a Flow) to be executed
(9) Start the flow and wait until it is completed
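For a sense of the end-to-end behavior: given a hypothetical input file containing the single line "Hello World Hello", the sink would hold tab-delimited word/count pairs (setting aside how output may be split across part files on a cluster):
Hello	2
World	1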
Several features of this example are worth highlighting:
- The pipe assembly is not coupled to the data (i.e., the Tap instances) until the last moment before execution. File paths or references are not embedded in the pipe assembly. Instead, the pipe assembly is specified independent of data inputs and outputs. The only dependency is the data scheme (i.e., the field names). A sketch of this re-binding follows this list.
- In Cascading, every input or output file has field names associated with its contents, and every processing element of the pipe assembly either expects the specified fields from upstream or creates them. This allows developers to easily self-document their code. The Cascading planner "fails fast" if an expected dependency between elements is not satisfied, for instance, if a needed field name is missing or incorrect.
- Pipe assemblies are assembled through constructor chaining. This may seem odd, but it is done for two reasons. First, constructor chaining allows each object to be immutable. Second, it prevents developers from creating "cycles" (i.e., recursive loops) in the resulting pipe assembly; a cycle would make it impossible to derive a directed acyclic graph (DAG) from the pipe assembly. (If looping processes are desired, there are safer approaches to achieving this result.)
- The very first Pipe instance has a name. That instance is the head of this particular pipe assembly. Pipe assemblies can have any number of heads and any number of tails. Although the tail in this example does not have an explicit name, in a more complex assembly it would (since this assembly is a single branch, the tail inherits the head name).
Heads and tails of pipe assemblies are assigned names to disambiguate them. One reason is that names are used to bind sources and sinks to pipes during planning. (The example above is an exception: because there is only one head and one tail, and consequently only one source and one sink, the binding is unmistakable.) Another reason is that naming pipes contributes to the self-documentation of pipe assemblies, especially where there are splits, joins, and merges in the assembly.
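To make the decoupling and name-based binding concrete, here is a minimal sketch that re-plans the same assembly against the Cascading local platform by swapping in different Taps (the file paths are hypothetical; FileTap, LocalFlowConnector, and the TextLine and TextDelimited schemes used here are the local-platform variants from the cascading-local module):
// bind the same "wordcount" assembly to local-filesystem Taps;
// nothing in the assembly itself references these paths
Tap localSource = new FileTap( new TextLine( new Fields( "line" ) ), "input.txt" );
Tap localSink = new FileTap( new TextDelimited( new Fields( "word", "count" ) ), "output.txt", SinkMode.REPLACE );
// sources are bound to heads by pipe name; an assembly with several
// heads would have one map entry per named head
Map<String, Tap> sources = new HashMap<String, Tap>();
sources.put( "wordcount", localSource );
// planning fails fast here, with a PlannerException, if the bound Taps
// cannot satisfy the fields the assembly expects
Flow localFlow = new LocalFlowConnector().connect( "word-count-local", sources, localSink, assembly );
localFlow.complete();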
Fluid: An Alternative Fluent API
The example above uses the default "raw" Cascading API. This API is handy when creating very complex business logic, or when creating frameworks that need to generate Cascading assemblies algorithmically. Lingual, an ANSI SQL layer over Cascading, leverages this API.
An alternative API is called Fluid. Currently Fluid is maintained in a different project, but there are plans to make it available as part of all future Cascading releases. The goal is to provide a build plugin that generates fluent APIs from any custom code that is run in the build.
The following example shows how the same word-count application could be coded with the Fluid fluent API (with all comments removed).
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath ); (1)
Pipe assembly = Fluid.assembly()
.startBranch( "wordcount" )
.each( new Fields( "line" ) ) (2)
.function(
Fluid.function()
.RegexGenerator()
.fieldDeclaration( new Fields( "word" ) )
.patternString( "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)" ).end()
)
.outgoing( Fields.RESULTS )
.groupBy( new Fields( "word" ) ) (3)
.every( Fields.ALL ) (4)
.aggregator(
Fluid.aggregator().Count( new Fields( "count" ) )
)
.outgoing( Fields.ALL )
.completeGroupBy()
.completeBranch();
Scheme sinkScheme = new TextDelimited( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE ); (5)
Properties properties = AppProps.appProps() (6)
.setName( "word-count-application" )
.setJarClass( Main.class )
.buildProperties();
FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties ); (7)
Flow flow = flowConnector.connect( "word-count", source, sink, assembly ); (8)
flow.complete(); (9)
(1) Read each line of text from a file and give it the field name "line"
(2) Parse each "line" into words with the RegexGenerator object, which returns each word in the field named "word"
(3) Sort and group all the tuples on the "word" field, using the GroupBy object
(4) Count the number of elements in each group, using the Count object, and store this value in the "count" field
(5) Write out the "word" and "count" fields
(6) Set application-specific metadata to allow the application to run
(7) Choose the platform for execution
(8) Plan the unit of work (a Flow) to be executed
(9) Start the flow and wait until it is completed
The remainder of this Cascading User Guide focuses on the default Cascading Java API introduced above.