2. Diving In

The most common example presented to new Hadoop (and MapReduce) developers is an application that counts words. It is the Hadoop equivalent to a "Hello World" application.

In the word-counting application, a document is parsed into individual words and the frequency of each word is counted. In the last paragraph, for example, "is" appears twice and "equivalent" appears once.

The following code example uses Cascading to read each line of text from our document file, parse it into words, then count the number of times each word appears.

Example 2.1. Word Counting

// define source and sink Taps.
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );

Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

// the 'head' of the pipe assembly
Pipe assembly = new Pipe( "wordcount" );

// For each input Tuple
// parse out each word into a new Tuple with the field name "word"
// regular expressions are optional in Cascading
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );

// group the Tuple stream by the "word" value
assembly = new GroupBy( assembly, new Fields( "word" ) );

// For every Tuple group
// count the number of occurrences of "word" and store result in
// a field named "count"
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );

// initialize app properties, tell Hadoop which jar file to use
Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );

// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new HadoopFlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

// execute the flow, block until complete
flow.complete();

Several features of this example are worth highlighting.

First, notice that the pipe assembly is not coupled to the data (i.e., the Tap instances) until the last moment before execution. File paths or references are not embedded in the pipe assembly; instead, the pipe assembly is specified independent of data inputs and outputs. The only dependency is the data scheme, i.e., the field names. In Cascading, every input or output file has field names associated with it, and every processing element of the pipe assembly either expects the specified fields or creates them. This allows developers to easily self-document their code, and allows the Cascading planner to "fail fast" if an expected dependency between elements isn't satisfied - for instance, if a needed field name is missing or incorrect. (If more information is desired on the planner, see MapReduce Job Planner.)

Also notice that pipe assemblies are assembled through constructor chaining. This may seem odd, but it is done for two reasons. First, it keeps the code more concise. Second, it prevents developers from creating "cycles" (i.e., recursive loops) in the resulting pipe assembly. Pipe assemblies are intended to be Directed Acyclic Graphs (DAG's), and in keeping with this, the Cascading planner is not designed to handle processes that feed themselves. (If desired, there are safer approaches to achieving this result.

Finally, notice that the very first Pipe instance has a name. That instance is the head of this particular pipe assembly. Pipe assemblies can have any number of heads, and any number of tails. Although the tail in this example does not have a name, in a more complex assembly it would. In general, heads and tails of pipe assemblies are assigned names to disambiguate them. One reason is that names are used to bind sources and sinks to pipes during planning. (The example above is an exception, because there is only one head and one tail - and consequently only one source and one sink - so the binding is unmistakable.) Another reason is that the naming of pipes contributes to self-documention of pipe assemblies, especially where there are splits, joins, and merges in the assembly.

To sum up, the example word-counting application will:

Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.