Cascading for the Impatient, Part 3
Part 3 - Scrubbing
In the previous installment we implemented the famous Word Count as a Cascading application.
This part takes the same app and stretches it even more. We’ll show how to write a custom Operation. Again, this code is leading toward an implementation of TF-IDF in Cascading. We’ll show best practices for workflow orchestration and test-driven development (TDD) at scale.
Theory
This example in Part 3 uses a custom operation in Cascading to "scrub" the token stream prior to counting the tokens. Previously we used a RegexSplitGenerator to tokenize the text, which is a built-in operation and works pretty well. However, one thing you’ll find in working with most any text analytics at scale is that there are lots and lots of edge cases: hyphens, exponents, different kinds of quotes, and so on. Cleaning up those edge cases generally represents the bulk of the engineering work in text analytics. If you try to incorporate every possible edge case into a regex, it tends to become complex and brittle. Identifying edge cases is an iterative process, based on lessons learned from operating at scale. Also, each application tends to have its own nuances, which makes it difficult to leverage standard libraries for text processing.
So we subclass Cascading’s BaseOperation to create a custom operation and work through all the edge cases there, adding more as they get identified. An added benefit is that we can also add unit tests to our custom class, with test coverage increasing as each new issue gets found. More about unit tests later.
Operations get used in Cascading to perform the "T" part of ETL. In other words, operations act on the data to transform the tuple stream, filter it, analyze it, etc. Think of the roles that command-line utilities such as grep or awk perform in Unix shell scripts. Cascading provides a rich library of standard operations to encode the "business logic" of transforming Big Data. It is relatively simple to develop your own custom operations, and our text "scrubbing" example here shows a good use case. However, if you find yourself starting to develop _lots_ of custom operations every time you begin to write a Cascading app, that’s a good indication that you need to step back and reevaluate. On one hand, the standard operations have been developed over the years and they tend to cover a large class of MapReduce applications. On the other hand, if you are not careful while defining a custom operation, you may inadvertently introduce a bottleneck. The standard operations encapsulate best practices and design patterns for parallelism. Something to think about.
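As a point of reference, many simple clean-up steps can be handled by the built-in operations alone. The sketch below is illustrative only and not part of the sample app: it drops empty tokens with the standard ExpressionFilter instead of any custom code, assuming docPipe is an existing pipe carrying a "token" field:
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

// remove tuples whose "token" field is empty after trimming,
// using a built-in operation rather than a custom Operation subclass
docPipe = new Each( docPipe, new Fields( "token" ),
  new ExpressionFilter( "token.trim().isEmpty()", String.class ) );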
Meanwhile, a conceptual diagram for this implementation of Word Count in Cascading is shown as:
Source
You can find the source code for this part in the part3 subdirectory. The input data stays the same as in the earlier parts.
First, let’s make room for using our custom operation ScrubFunction
to clean up
the token stream. We place it into the docPipe
assembly, immediately after the
regex which splits the raw text:
Fields scrubArguments = new Fields( "doc_id", "token" );
docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS );
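For context, here is roughly how that line fits into the docPipe assembly carried over from Part 2. This is a sketch rather than the verbatim source, so the regex pattern and field names may differ slightly from what is in the part3 subdirectory:
Fields token = new Fields( "token" );
Fields text = new Fields( "text" );

// split the raw text into a token stream, as in Part 2
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
Pipe docPipe = new Each( "token", text, splitter, new Fields( "doc_id", "token" ) );

// scrub each token before counting
Fields scrubArguments = new Fields( "doc_id", "token" );
docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS );

// then group by token and count, exactly as before
Pipe wcPipe = new Pipe( "wc", docPipe );
wcPipe = new GroupBy( wcPipe, token );
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );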
Next, we need to define a constructor for our custom operation:
public ScrubFunction( Fields fieldDeclaration )
{
super( 2, fieldDeclaration );
}
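That constructor lives in a class which extends BaseOperation and implements Function. A minimal skeleton, with the imports included for reference, looks roughly like this:
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class ScrubFunction extends BaseOperation implements Function
{
  // expects two arguments per tuple: doc_id and token
  public ScrubFunction( Fields fieldDeclaration )
  {
    super( 2, fieldDeclaration );
  }

  // operate() and scrubText() are shown below
}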
The fieldDeclaration
parameter allows us to name the tuple fields from the
flow. Since we’ll output the results as TSV, a header will be created from the
tuple fields.
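As a reminder from the earlier parts, the sink Tap uses a TextDelimited scheme with a header, so the field names declared here become the column names in the output. A sketch, assuming the same sink as in Part 2; wcPath stands for the output path taken from the command-line arguments, and the package names assume the Hadoop platform in Cascading 2.x:
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

// write TSV with a header row built from the tuple field names
Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );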
Next, we define an operate
method. In other words, we define the function which
operates on the tuple stream. This is essentially a wrapper which pulls tuples
from the input stream, applies our scrubText method to each token, then inserts
new tuples into the output stream:
public void operate( FlowProcess flowProcess, FunctionCall functionCall )
{
// grab the incoming argument tuple: ( doc_id, token )
TupleEntry argument = functionCall.getArguments();
String doc_id = argument.getString( 0 );
String token = scrubText( argument.getString( 1 ) );

// only emit a result tuple when the scrubbed token is non-empty
if( token.length() > 0 )
{
Tuple result = new Tuple();
result.add( doc_id );
result.add( token );
functionCall.getOutputCollector().add( result );
}
}
Last but not least, we define the scrubText
method to clean up tokens. This
version is relatively simple, and in practice it would have many more cases.
It’s also relatively simple to write unit tests against:
public String scrubText( String text )
{
return text.trim().toLowerCase();
}
Place that first set of source lines into the main method of the Main class, create an additional ScrubFunction class for the rest of the source shown, then build a JAR file. You should be good to go.
The diagram for the Cascading flow will be in the dot/
subdirectory after the
app runs. Here we have annotated it to show where the mapper and reducer phases
are running, and also the section which was added since Part 2:
Build
To build the sample app from the command line use:
gradle clean jar
Run
Run the app like this:
rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc
Driven
Use Driven to explore how your application was composed as a Cascading flow.
If you did not install the Driven plugin, you can still explore Part 3 through Driven via this link.
If you ran your application on a distributed Hadoop cluster, the flow name ("wc") in the timeline view will link to a view of how the steps were composed into mappers and reducers.
Click on the step link ("(1/1) output/wc") in the Performance view if you want to explore how your application executed in the Hadoop job dashboard.
Note: Driven exposes three views of the DAG: Logical, Physical, and Contracted. The Logical View lets you inspect and visualize the Tap, Pipe, and other Cascading constructs that you specified in your code. With the Physical View, you can also inspect intermediate Taps and Pipe assemblies embedded in your code. In our case, we can see that the Retain function was used in the subassembly.
The bottom half of the screen contains the 'Timeline View', which gives details associated with each run of the flow. You can click 'Add Columns' to visualize other signals too.
To learn how best to interpret the timing counters, read Understanding Timing Counters.
Output text gets stored in the partition file output/wc, which you can then verify:
more output/wc/part-00000
Here’s a log file from our run of the sample app, part 3. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum.
Next
In Part 4 of Cascading for the Impatient you will learn how to implement a stop word filter.