The most common example presented to new Hadoop (and MapReduce) developers is an application that counts words. It is the Hadoop equivalent to a "Hello World" application.
In the word-counting application, a document is parsed into individual words and the frequency of each word is counted. In the preceding paragraph, for example, "is" appears twice and "equivalent" appears once.
The following code example uses Cascading to read each line of text from our document file, parse it into words, then count the number of times each word appears.
Example 2.1. Word Counting
// define source and sink Taps.
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );
Scheme sinkScheme = new TextDelimited( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );
// the 'head' of the pipe assembly
Pipe assembly = new Pipe( "wordcount" );
// For each input Tuple
// parse out each word into a new Tuple with the field name "word"
// regular expressions are optional in Cascading
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );
// group the Tuple stream by the "word" value
assembly = new GroupBy( assembly, new Fields( "word" ) );
// For every Tuple group
// count the number of occurrences of "word" and store result in
// a field named "count"
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );
// initialize app properties, tell Hadoop which jar file to use
Properties properties = AppProps.appProps()
  .setName( "word-count-application" )
  .setJarClass( Main.class )
  .buildProperties();
// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new HadoopFlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );
// execute the flow, block until complete
flow.complete();
Several features of this example are worth highlighting.
First, notice that the pipe assembly is not coupled to the data (i.e., the Tap instances) until the last moment before execution. File paths or references are not embedded in the pipe assembly; instead, the pipe assembly is specified independent of data inputs and outputs. The only dependency is the data scheme, i.e., the field names. In Cascading, every input or output file has field names associated with it, and every processing element of the pipe assembly either expects the specified fields or creates them. This allows developers to easily self-document their code, and allows the Cascading planner to "fail fast" if an expected dependency between elements isn't satisfied - for instance, if a needed field name is missing or incorrect. (If more information is desired on the planner, see MapReduce Job Planner.)
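The fail-fast idea can be pictured with a small plain-Java sketch. It is not how the Cascading planner is implemented: the FieldCheckSketch class, its Step type, and the validate method are hypothetical names invented for illustration, and the model is simplified so that fields only accumulate from one step to the next.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of fail-fast field checking; NOT Cascading's planner.
final class FieldCheckSketch {

    // A processing step that expects some fields and declares the fields it creates.
    static final class Step {
        final String name;
        final Set<String> expects;
        final Set<String> creates;

        Step(String name, List<String> expects, List<String> creates) {
            this.name = name;
            this.expects = new LinkedHashSet<>(expects);
            this.creates = new LinkedHashSet<>(creates);
        }
    }

    // Walks the steps in order, before any data is read, and throws at the
    // first step whose expected fields are not available.
    // Simplification (assumption): fields are never dropped between steps.
    static Set<String> validate(List<String> sourceFields, List<Step> steps) {
        Set<String> available = new LinkedHashSet<>(sourceFields);
        for (Step step : steps) {
            if (!available.containsAll(step.expects)) {
                Set<String> missing = new LinkedHashSet<>(step.expects);
                missing.removeAll(available);
                throw new IllegalStateException(
                    "step '" + step.name + "' is missing fields " + missing);
            }
            available.addAll(step.creates);
        }
        return available;
    }

    public static void main(String[] args) {
        List<Step> steps = Arrays.asList(
            new Step("parse", Arrays.asList("line"), Arrays.asList("word")),
            new Step("group", Arrays.asList("wrod"), Arrays.<String>asList()), // typo on purpose
            new Step("count", Arrays.asList("word"), Arrays.asList("count")));
        try {
            validate(Arrays.asList("line"), steps);
        } catch (IllegalStateException e) {
            // The misspelled field is reported before any processing starts.
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is only the ordering: the check runs over field names, so a misspelled name surfaces at planning time rather than partway through a long-running job.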
Also notice that pipe assemblies are assembled through constructor chaining. This may seem odd, but it is done for two reasons. First, it keeps the code more concise. Second, it prevents developers from creating "cycles" (i.e., recursive loops) in the resulting pipe assembly. Pipe assemblies are intended to be Directed Acyclic Graphs (DAGs), and in keeping with this, the Cascading planner is not designed to handle processes that feed themselves. (If desired, there are safer approaches to achieving this result.)
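One way to see why constructor chaining rules out cycles is the following plain-Java sketch. The Node class is a hypothetical stand-in, not Cascading's Pipe: its predecessor is fixed in the constructor and there is no setter, so a node can only refer to a node that already existed when it was created.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pipe; NOT Cascading's Pipe class.
final class ChainSketch {

    static final class Node {
        final String name;
        final Node previous; // fixed at construction; there is no setter

        Node(String name) {
            this(name, null);
        }

        Node(String name, Node previous) {
            this.name = name;
            this.previous = previous;
        }
    }

    // Walks from the tail back to the head and returns the names head-first.
    // The walk always terminates: each node can only point at an older node.
    static List<String> path(Node tail) {
        List<String> names = new ArrayList<>();
        for (Node node = tail; node != null; node = node.previous) {
            names.add(0, node.name);
        }
        return names;
    }

    public static void main(String[] args) {
        // Same reassignment pattern as the word-count assembly in Example 2.1.
        Node assembly = new Node("wordcount");
        assembly = new Node("each", assembly);
        assembly = new Node("groupBy", assembly);
        assembly = new Node("every", assembly);
        System.out.println(path(assembly)); // prints [wordcount, each, groupBy, every]
    }
}
```

Because the only way to link two nodes is to pass the earlier one into the later one's constructor, there is no statement a developer could write that would make an earlier node depend on a later one.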
Finally, notice that the very first Pipe instance has a name. That instance is the head of this particular pipe assembly. Pipe assemblies can have any number of heads, and any number of tails. Although the tail in this example does not have a name, in a more complex assembly it would. In general, heads and tails of pipe assemblies are assigned names to disambiguate them. One reason is that names are used to bind sources and sinks to pipes during planning. (The example above is an exception, because there is only one head and one tail - and consequently only one source and one sink - so the binding is unmistakable.) Another reason is that the naming of pipes contributes to self-documentation of pipe assemblies, especially where there are splits, joins, and merges in the assembly.
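The role of names in binding can be sketched in plain Java. This is a hypothetical model, not Cascading's planner: the BindingSketch class and its bind method are invented here, and sources are represented by plain strings. It shows the two cases described above: with several heads each name must match a source, while with exactly one head and one source the pairing is unambiguous.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of name-based binding; NOT Cascading's planner.
final class BindingSketch {

    // Pairs each head name with a source.
    // headNames: names of the assembly's heads; sources: source identifiers keyed by name.
    static Map<String, String> bind(List<String> headNames, Map<String, String> sources) {
        Map<String, String> bound = new LinkedHashMap<>();
        if (headNames.size() == 1 && sources.size() == 1) {
            // One head and one source: the pairing is unambiguous, so the names need not match.
            bound.put(headNames.get(0), sources.values().iterator().next());
            return bound;
        }
        for (String head : headNames) {
            String source = sources.get(head);
            if (source == null) {
                throw new IllegalArgumentException("no source bound to head '" + head + "'");
            }
            bound.put(head, source);
        }
        return bound;
    }

    public static void main(String[] args) {
        // Hypothetical head names and source labels for an assembly with two heads.
        Map<String, String> sources = new LinkedHashMap<>();
        sources.put("lhs", "first-input");
        sources.put("rhs", "second-input");
        System.out.println(bind(List.of("lhs", "rhs"), sources));
    }
}
```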
To sum up, the example word-counting application will:
read each line of text from a file and give it the field name "line",
parse each "line" into words with the RegexGenerator object, which returns each word in the field named "word",
sort and group all the tuples on the "word" field, using the GroupBy object,
count the number of elements in each group, using the Count object, and store this value in the "count" field,
and write out the "word" and "count" fields.
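For comparison, the same steps can be sketched in plain Java with no Cascading or Hadoop involved. The WordCountSketch class and its methods are hypothetical names used only for illustration; the regular expression is the one from Example 2.1, and the tab-separated output is an assumption about how the "word" and "count" fields would be written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical plain-Java illustration of the steps; not part of Cascading.
final class WordCountSketch {

    // Same regular expression as in Example 2.1.
    private static final Pattern WORD =
        Pattern.compile("(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)");

    // Parse each line into words, group on the word, and count each group.
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>(); // keeps the words sorted
        for (String line : lines) {
            Matcher matcher = WORD.matcher(line);
            while (matcher.find()) {
                counts.merge(matcher.group(), 1L, Long::sum);
            }
        }
        return counts;
    }

    // Render the "word" and "count" values as tab-separated lines (assumed format).
    static List<String> format(Map<String, Long> counts) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            out.add(entry.getKey() + "\t" + entry.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        // The opening paragraph of this section, one sentence per line.
        List<String> lines = Arrays.asList(
            "The most common example presented to new Hadoop (and MapReduce) developers is an application that counts words.",
            "It is the Hadoop equivalent to a \"Hello World\" application.");
        for (String row : format(countWords(lines))) {
            System.out.println(row);
        }
    }
}
```

Unlike the Cascading version, this sketch holds every count in memory on a single machine, which is exactly the limitation that running the flow on Hadoop is meant to remove.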
Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.