Cascading for the Impatient, Part 2
Part 2 - Word count
In our first installment of this series we showed how to create the simplest possible Cascading application. If you have not read that yet, it’s probably best to start there.
Today’s lesson takes the same app and stretches it a bit further. Undoubtedly you have seen Word Count before. We’d feel remiss if we did not provide a Word Count example. It is the “Hello World” of MapReduce apps. Fortunately, this code is one of the basic steps toward developing an implementation of a word scoring algorithm such as TF-IDF. How convenient. We will also show how to use Cascading to generate a visualization of your MapReduce app.
Theory
In Part 1 of this series, our example code showed how to move data from point A to point B. It was simply a distributed file copy: loading data via distributed tasks, an instance of the “L” in ETL.
That may seem overly simple, and it may seem like Cascading is overkill for that kind of work. However, moving important data from point A to point B reliably can be a crucial job to perform. This helps illustrate one of the key reasons to use Cascading.
Let’s use an analogy of building a small ferris wheel. With a little bit of imagination and some background in welding, a person could cobble one together using old bicycle parts. In fact, those DIY ferris wheels show up at events such as Maker Faire. Starting out, a person might construct a little ferris wheel, just for demo. It might not hold anything larger than hamsters, but it’s not a hard problem. With a bit more skill, a person could probably build a somewhat larger instance, one that’s big enough for small children to ride.
Let me ask this: how robust would a DIY ferris wheel need to be before you let your kids ride on it? That’s precisely part of the challenge at an event like Maker Faire. Makers must be able to build a device such as a ferris wheel out of spare bicycle parts which is robust enough that strangers will let their kids ride. Let’s hope those welds were made using best practices and good materials, to avoid catastrophes.
That’s a key reason why Cascading was created. When you need to move a few GB from point A to point B, it’s probably simple enough to write a Bash script, or just use a single command line copy. When your work requires some reshaping of the data, then a few lines of Python will probably work fine. Run that Python code from your Bash script and you are done. I have used that approach many times, when it fit the use case requirements. However, suppose you are not moving just GB around? Suppose you’re moving TB, or PB? Bash scripts won’t get you very far. Also think about this: suppose your app not only needs to move data from point A to point B, but it must run within the constraints of an enterprise IT shop. Millions of dollars and potentially even some jobs ride on the fact that your app performs correctly. Day in and day out. That’s not unlike strangers trusting a ferris wheel; they want to make sure it was not just built out of spare bicycle parts by some amateur welder. Robustness is key.
Or taking this analogy a few steps in another interesting direction… Perhaps you are not only moving data and reshaping it a little, but you are applying some interesting machine learning algorithms, some natural language processing, resequencing genes… who knows. Those imply lots of resource use, lots of potential expense in case of failures. Or lots of customer exposure. You will want to use an application framework which is significantly more robust than a bunch of scripts cobbled together.
With Cascading, you can package your entire MapReduce application, including its orchestration and testing, within a single JAR file. You define all of that within the context of one programming language - whether that language may be Java, Scala, Clojure, Python, Ruby, etc. That way your tests are included within a single program, not spread across several scripts written in different languages. Having a single JAR file define the app also allows for typical tooling required in enterprise IT: unit tests, stream assertions, revision control, continuous integration, Maven repos, role-based configuration management, advanced schedulers, monitoring and notifications, etc.
Those are key reasons why we make Cascading, why people use it for robust MapReduce apps which run at scale.
Meanwhile, a conceptual diagram for this implementation of Word Count in Cascading is shown as:
Source
The code of this example is in the part2 sub-directory of the GitHub repository.
The input data stays the same as in Part 1 of the series.
Note that the names of the taps have changed. Instead of inTap and outTap, we’re now using docTap and wcTap. We’ll be adding more taps, so it helps to have more descriptive names. That makes it simpler to follow all the plumbing.
Previously we defined a simple pipe to connect the taps. This example shows a more complex pipe. We use a generator inside of an Each, to split the document text into a token stream. We use a regex to split on word boundaries:
Fields token = new Fields( "token" );
Fields text = new Fields( "text" );
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
// only returns "token"
Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );
Out of that pipe, we’ll get a tuple stream of token values to feed into the count. You can change the regex to handle more complex cases of splitting tokens, without having to write a different generator class.
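To see what that regex does to a line of text, here is a minimal plain-Java sketch. It assumes the generator splits the same way java.util.regex.Pattern.split does. The class name TokenSplitSketch and the sample sentence are made up for illustration and are not part of the sample app.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Plain-Java illustration only; this is not part of the Cascading API.
class TokenSplitSketch {
    // Same regex as in the tutorial: split on space, brackets, parentheses, comma, period.
    static final Pattern DELIMITERS = Pattern.compile("[ \\[\\]\\(\\),.]");

    // Returns the tokens that one line of text is split into.
    static List<String> split(String text) {
        return Arrays.asList(DELIMITERS.split(text));
    }

    public static void main(String[] args) {
        // Adjacent delimiters such as " (" leave an empty token behind.
        System.out.println(split("dry area (leeward side)."));
        // prints [dry, area, , leeward, side]
    }
}
```

Note the empty token where a space is followed by a parenthesis. If the generator behaves the same way, empty tokens like that get counted along with the real words.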
Next, we use a GroupBy to group the tuples by token, then an Every with a Count aggregator to count the occurrences of each token:
Pipe wcPipe = new Pipe( "wc", docPipe );
wcPipe = new GroupBy( wcPipe, token );
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );
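For intuition, the grouping and counting step computes roughly what this plain-Java sketch does on an in-memory list. The class name WordCountSketch is hypothetical, and the sorted TreeMap is our own choice for the illustration. Cascading performs this kind of aggregation across distributed tasks rather than in a single map.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; this is not part of the Cascading API.
class WordCountSketch {
    // Groups identical tokens and counts each group, keeping the keys sorted.
    static Map<String, Long> count(List<String> tokens) {
        Map<String, Long> counts = new TreeMap<>();
        for (String token : tokens) {
            // Start at 1 for a new token, otherwise add 1 to the running count.
            counts.merge(token, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("rain", "shadow", "rain")));
        // prints {rain=2, shadow=1}
    }
}
```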
From that pipe, we’ll have a resulting tuple stream of token and count for the output. So we connect up the plumbing with a FlowDef:
FlowDef flowDef = FlowDef.flowDef().setName( "wc" )
.addSource( docPipe, docTap )
.addTailSink( wcPipe, wcTap );
Finally, we generate a DOT file, to depict the Cascading flow graphically. You can load the DOT file into OmniGraffle or Visio. Those diagrams are really helpful for troubleshooting MapReduce workflows in Cascading:
Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wc.dot" );
wcFlow.complete();
Place all of those source lines into a Main method, then build a JAR file. You should be good to go.
If you want to read in more detail about the classes in the Cascading API which were used, see the Cascading User Guide and JavaDoc.
Build
To build the sample app from the command line use:
gradle clean jar
Run
Clear the output directory (Apache Hadoop insists, if you’re running in standalone mode) and run the app:
rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/rain
Notice how those command line arguments align with args[] in the source. The file data/rain.txt gets read, TSV row by TSV row, and its text is split into tokens. The resulting token counts get stored in the partition file under output/rain, which you can then verify:
more output/rain/part-00000
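If you would rather check the result programmatically, a sketch like the following could parse the partition file. It assumes the output is tab-separated with a header row followed by one token and one count per row. OutputParseSketch is a hypothetical helper, not part of the sample app.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for inspecting the output; not part of the sample app.
class OutputParseSketch {
    // Parses rows of the form "token<TAB>count" into a map, in file order.
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        // Assumption: the first line is a header row, so start at index 1.
        for (int i = 1; i < lines.size(); i++) {
            String[] cols = lines.get(i).split("\t", -1);
            if (cols.length != 2) {
                continue; // skip rows that do not have exactly two columns
            }
            counts.put(cols[0], Long.parseLong(cols[1].trim()));
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the partition file named in the tutorial.
        String path = args.length > 0 ? args[0] : "output/rain/part-00000";
        parse(Files.readAllLines(Paths.get(path)))
            .forEach((token, count) -> System.out.println(token + " -> " + count));
    }
}
```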
Here’s a log file from our run of the sample app, part 2. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum.
So that is our Word Count example. Twenty lines of yummy goodness.
Driven
Now, we will start viewing some interesting details and getting some application insights through Driven.
If you have not installed the Driven plugin, you can still explore how Driven will visualize Part 2 by visiting this link.
You can inspect the application run either by following the URL provided in the command output, or by visiting http://trial.driven.io if you registered your key.
- The first thing you will see is a graph (a Directed Acyclic Graph, or DAG, in formal parlance) that shows all the steps in your code and their dependencies. The circles represent the taps, and you can inspect the regex function, the GroupBy, and the Count used by your code by clicking on each step.
- Click on each step of the DAG. You will see additional details about the specific operator, and a reference to the line of code where that step was invoked.
- The timeline chart visualizes how the application executed in your environment. You can see details about the time taken by the flow to execute, and get additional insights by clicking on the "Add Columns" button.
- If you executed your application on a Hadoop cluster in distributed mode, you will get additional insights regarding how your Cascading flows mapped into mappers and reducers. Note that the 'Performance View' is only available if you ran your application on Hadoop in distributed mode.
- In the timeline view, click on your flow name link ("wc"). You will see which part of your application logic went into the mapper (the regex function and the GroupBy), and which part became part of the reducer (of course, the count). You can also see how many slices were created for the shard.
As your applications become more complex, the 'Performance View' becomes essential for understanding the behavior of your application.
If you registered and configured the Driven API key, you will also have an “All Application” view, where you can see all the applications that are running, or have run, in the Hadoop cluster for a given user. You can customize the view to display additional attributes such as application name, ID, and owner. In addition, you can filter the view based on status and dates.
To learn how best to interpret the timing counters, read Understanding Timing Counters.
Next
Learn how to write custom Operations to clean your data in Part 3 of Cascading for the Impatient.