Diving into the APIs
Anatomy of a Word-Count Application
The most common example presented to new developers is an application that counts words. It is the data processing equivalent of a "Hello World" application.
In a word-counting application, a document is parsed into individual words and the frequency (count) of each word is calculated. In the paragraph above, for example, "is" appears twice and "equivalent" appears once.
The following code example uses the default Cascading API to read each line of text from a document file, parse it into words, and then count the number of times each word appears.
// define source and sink Taps.
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath ); (1)
// the 'head' of the pipe assembly
Pipe assembly = new Pipe( "wordcount" );
// For each input Tuple
// parse out each word into a new Tuple with the field name "word"
// regular expressions are optional in Cascading
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function ); (2)
// group the Tuple stream by the "word" value
assembly = new GroupBy( assembly, new Fields( "word" ) ); (3)
// For every Tuple group
// count the number of occurrences of "word" and store result in
// a field named "count"
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count ); (4)
Scheme sinkScheme = new TextDelimited( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE ); (5)
// initialize app properties, tell Hadoop which jar file to use
Properties properties = AppProps.appProps() (6)
.setName( "word-count-application" )
.setJarClass( Main.class )
.buildProperties();
// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties ); (7)
Flow flow = flowConnector.connect( "word-count", source, sink, assembly ); (8)
// execute the flow, block until complete
flow.complete(); (9)
(1) Read each line of text from a file and give it the field name "line"
(2) Parse each "line" into words with the RegexGenerator object, which returns each word in the field named "word"
(3) Sort and group all the tuples on the "word" field, using the GroupBy object
(4) Count the number of elements in each group, using the Count object, and store this value in the "count" field
(5) Write out the "word" and "count" fields
(6) Set application-specific metadata to allow the application to run
(7) Choose the platform for execution
(8) Plan the unit of work (a Flow) to be executed
(9) Start the flow and wait until it is completed
Several features of this example are worth highlighting:
- The pipe assembly is not coupled to the data (i.e., the Tap instances) until the last moment before execution. File paths or references are not embedded in the pipe assembly. Instead, the pipe assembly is specified independently of data inputs and outputs; the only dependency is the data scheme (i.e., the field names). A short sketch after this list shows the same assembly bound to a different set of Taps.
- In Cascading, every input or output file has field names associated with its contents, and every processing element of the pipe assembly either expects the specified fields from upstream or creates them. This allows developers to easily self-document their code, and the Cascading planner "fails fast" if an expected dependency between elements is not satisfied, for instance, if a needed field name is missing or incorrect.
- Pipe assemblies are assembled through constructor chaining. This may seem odd, but it is done for two reasons. First, constructor chaining allows each object to be immutable. Second, it prevents developers from creating "cycles" (i.e., recursive loops) in the resulting pipe assembly. Recursive loops would prevent the planner from deriving a clean directed acyclic graph (DAG) from the assembly. (If looping processes are desired, there are safer approaches to achieving this result.)
- The very first Pipe instance has a name; that instance is the head of this particular pipe assembly. Pipe assemblies can have any number of heads and any number of tails. Although the tail in this example does not have an explicit name, in a more complex assembly it would (since this assembly is a single branch, the tail inherits the head name).
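As a concrete illustration of the first point, the fragment below is a minimal sketch (not part of the original example) that reuses the unchanged "wordcount" assembly from the listing above, but binds it to local-mode Taps instead of Hadoop Taps. It assumes the cascading-local module is available (TextLine, TextDelimited, and FileTap here are the local variants from cascading.scheme.local and cascading.tap.local), and the file paths are hypothetical.
// a sketch reusing the 'assembly' variable built in the listing above;
// only the Taps and the planner change, the business logic does not
Tap localSource = new FileTap( new TextLine( new Fields( "line" ) ), "data/input.txt" );
Tap localSink = new FileTap( new TextDelimited( new Fields( "word", "count" ) ),
  "output/wordcount", SinkMode.REPLACE );
// plan the same assembly for in-memory local execution instead of Hadoop
FlowConnector localConnector = new LocalFlowConnector();
Flow localFlow = localConnector.connect( "word-count-local", localSource, localSink, assembly );
localFlow.complete();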
Heads and tails of pipe assemblies are assigned names to disambiguate them. One reason is that names are used to bind sources and sinks to pipes during planning. (The example above is an exception: because there is only one head and one tail, and consequently one source and one sink, the binding is unambiguous.) Another reason is that naming pipes contributes to the self-documentation of pipe assemblies, especially where there are splits, joins, and merges in the assembly.
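The fragment below is a hypothetical sketch (not part of the word-count example) of how names come into play when there is more than one head: two named branches are merged, the tail is given an explicit name, and a FlowDef binds a Tap to each head and to the tail. The Tap variables lhsSourceTap, rhsSourceTap, and sinkTap, as well as the flowConnector, are assumed to be created as in the earlier listings.
// two named heads; each source Tap is assumed to declare a "word" field
Pipe lhs = new Pipe( "lhs" );
Pipe rhs = new Pipe( "rhs" );
// merge the branches into a single stream and count words as before
Pipe merged = new Merge( lhs, rhs );
merged = new GroupBy( merged, new Fields( "word" ) );
merged = new Every( merged, new Count( new Fields( "count" ) ) );
// give the tail an explicit name
Pipe tail = new Pipe( "merged-count", merged );
// bind a Tap to each head and to the tail via a FlowDef
FlowDef flowDef = FlowDef.flowDef()
  .setName( "two-headed-word-count" )
  .addSource( lhs, lhsSourceTap )
  .addSource( rhs, rhsSourceTap )
  .addTailSink( tail, sinkTap );
Flow flow = flowConnector.connect( flowDef );
The FlowDef makes the name-to-Tap bindings explicit, which is what the planner relies on when an assembly has several heads or tails.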
Fluid: An Alternative Fluent API
The example above uses the default "raw" Cascading API. This API is handy when creating very complex business logic, or when building frameworks that need to generate Cascading assemblies algorithmically. Lingual, an ANSI SQL layer over Cascading, leverages this API.
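To make the point about generating assemblies algorithmically concrete, the fragment below is a hypothetical sketch (it is not taken from Lingual or from this guide): because pipes are plain Java objects chained through constructors, an assembly can be grown in an ordinary loop from runtime data. RegexFilter is cascading.operation.regex.RegexFilter; the pipe name and patterns are made up for illustration.
// hypothetical sketch: extend a branch with one filter per pattern supplied at runtime
String[] requiredPatterns = { "\\pL", "^[^#]" }; // made-up patterns; lines that do not match are dropped
Pipe filtered = new Pipe( "cleanse" );
for( String pattern : requiredPatterns )
  // each iteration wraps the previous pipe in a new Each, extending the chain
  filtered = new Each( filtered, new Fields( "line" ), new RegexFilter( pattern ) );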
An alternative API is called Fluid. Currently Fluid is maintained in a different project, but there are plans to make it available as part of all future Cascading releases. The goal is to provide a build plugin that generates fluent APIs from any custom code that is run in the build.
The listing below shows how the same word-count application could be coded with the Fluid fluent API (with all comments removed).
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath ); (1)
Pipe assembly = Fluid.assembly()
  .startBranch( "wordcount" )
    .each( new Fields( "line" ) ) (2)
      .function(
        Fluid.function()
          .RegexGenerator()
          .fieldDeclaration( new Fields( "word" ) )
          .patternString( "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)" ).end()
      )
      .outgoing( Fields.RESULTS )
    .groupBy( new Fields( "word" ) ) (3)
      .every( Fields.ALL ) (4)
        .aggregator(
          Fluid.aggregator().Count( new Fields( "count" ) )
        )
        .outgoing( Fields.ALL )
      .completeGroupBy()
  .completeBranch();
Scheme sinkScheme = new TextDelimited( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE ); (5)
Properties properties = AppProps.appProps() (6)
.setName( "word-count-application" )
.setJarClass( Main.class )
.buildProperties();
FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties ); (7)
Flow flow = flowConnector.connect( "word-count", source, sink, assembly ); (8)
flow.complete(); (9)
(1) Read each line of text from a file and give it the field name "line"
(2) Parse each "line" into words with the RegexGenerator object, which returns each word in the field named "word"
(3) Sort and group all the tuples on the "word" field, using the GroupBy object
(4) Count the number of elements in each group, using the Count object, and store this value in the "count" field
(5) Write out the "word" and "count" fields
(6) Set application-specific metadata to allow the application to run
(7) Choose the platform for execution
(8) Plan the unit of work (a Flow) to be executed
(9) Start the flow and wait until it is completed
The remainder of this Cascading User Guide focuses on the default Cascading Java API introduced in the first example above.