Counting words in a document is the most common example presented to new Hadoop (and MapReduce) developers, it is the Hadoop equivalent to the "Hello World" application.
Word counting is where a document is parsed into individual words, and the frequency of those words are counted.
For example, if we counted the last paragraph "is" would be counted twice, and "document" counted once.
In the code example below, we will use Cascading to read each line of text from a file (our document), parse it into words, then count the number of time the word is encountered.
Example 2.1. Word Counting
// define source and sink Taps. Scheme sourceScheme = new TextLine( new Fields( "line" ) ); Tap source = new Hfs( sourceScheme, inputPath ); Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) ); Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE ); // the 'head' of the pipe assembly Pipe assembly = new Pipe( "wordcount" ); // For each input Tuple // parse out each word into a new Tuple with the field name "word" // regular expressions are optional in Cascading String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)"; Function function = new RegexGenerator( new Fields( "word" ), regex ); assembly = new Each( assembly, new Fields( "line" ), function ); // group the Tuple stream by the "word" value assembly = new GroupBy( assembly, new Fields( "word" ) ); // For every Tuple group // count the number of occurrences of "word" and store result in // a field named "count" Aggregator count = new Count( new Fields( "count" ) ); assembly = new Every( assembly, count ); // initialize app properties, tell Hadoop which jar file to use Properties properties = new Properties(); FlowConnector.setApplicationJarClass( properties, Main.class ); // plan a new Flow from the assembly using the source and sink Taps // with the above properties FlowConnector flowConnector = new FlowConnector( properties ); Flow flow = flowConnector.connect( "word-count", source, sink, assembly ); // execute the flow, block until complete flow.complete();
There are a couple things to take away from this example.
First, the pipe assembly is not coupled to the data (the Tap instances) until the last moment before execution. That is, file paths or references are not embedded in the pipe assembly. The pipe assembly remains independent of which data it processes until execution. The only dependency is what the data looks like, its "scheme", or the field names that make it up.
That brings up fields. Every input and output file has field names associated with it, and every processing element of the pipe assembly either expects certain fields, or creates new fields. This allows the developer to self document their code, and allows the Cascading planner to "fail fast" during planning if a dependency between elements isn't satisfied (used a missing or wrong field name).
It is also important to point out that pipe assemblies are assembled through constructor chaining. This may seem odd but is done for two reasons. It keeps the code more concise. And it prevents developers from creating "cycles" in the resulting pipe assembly. Pipe assemblies are Directed Acyclic Graphs (or DAGs). The Cascading planner cannot handle processes that feed themselves, that have cycles (not to say there are ways around this that are much safer).
Notice the very first
Pipe instance has a name. That
instance is the "head" of this particular pipe assembly. Pipe assemblies
can have any number of heads, and any number of tails. This example does
not name the tail assembly, but for complex assemblies, tails must be
named for reasons described below.
Heads and tails of pipe assemblies generally need names, this is how sources and sinks are "bound" to them during planning. In our example above, there is only one head and one tail, and subsequently only one source and one sink, respectively. So naming in this case is optional, it's obvious what goes where. Naming is also useful for self documenting pipe assemblies, especially where there are splits, joins, and merges in the assembly.
To paraphrase, our example will:
read each line of text from a file and give it the field name "line",
parse each "line" into words by the
object which in turn returns each word in the field named
groups on the field named "word" using the
then counts the number of elements in each grouping using the
Count() object and stores this value in the "count"
finally the "word" and "count" fields are written out.
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.