Cascading for the Impatient, Part 3

Part 3 - Scrubbing

In the previous installement we have implemented the famous Word Count as a Cascading application.

This part takes the same app and stretches it even more. We’ll show how to write a custom Operation. Again, this code is leading toward an implementation of TF-IDF in Cascading. We’ll show best practices for workflow orchestration and test-driven development (TDD) at scale.


This example in Part 3 uses a custom operation in Cascading to ``scrub'' the token stream, prior to counting the tokens. Previously we used a RegexSplitGenerator to tokenize the text, which is a built-in operation and works pretty well. However, one thing you’ll find in working with most any text analytics at scale is that there are lots and lots of edge cases. Cleaning up the edge cases generally represents the bulk of the engineering work in text analytics. Hyphens, exponents, different kinds of quotes, etc. If you try to incorporate every possible edge case into a regex, that tends to becomes complex and brittle. Identifying edge cases is an iterative process, based on learnings over time, based on operations at scale. Also, each application will tend to have its own nuances, which makes it difficult to leverage standard libraries for text processing.

So we subclass the Operation class in Cascading and work through all the edge cases, adding more as they get identified. An added benefit is that we can also add unit tests to our custom class, with test coverage increasing as each new issue gets found. More about unit tests later.

Operations get used in Cascading to perform the T'' part of ETL. In other words, operations act on the data to transform the tuple stream, filter it, analyze it, etc. Think what roles command line utilities such as grep or awk perform in Unix shell scripts. Cascading provides a rich library of standard operations, to encode the business logic'' of transforming Big Data''. It is relatively simple to develop your own custom operations, and our text scrubbing'' example here shows a good use case. However, if you find yourself starting to develop _lots_ of custom operations every time you begin to write a Cascading app, that’s a good indication that you need to step back and reevaluate. On one hand, the standard operations have been developed over the years and they tend to cover a large class of MapReduce applications. On the other hand, if you are not careful while defining a custom operation, you may inadvertently introduce a bottleneck. The standard operations encapsulate best practices and design patterns for parallelism. Something to think about.

Meanwhile, a conceptual diagram for this implementation of Word Count in Cascading is shown as:



You can find the code of this part in the part3 subdirectory. The input data stays the same as in the earlier code

First, let’s make room for using our custom operation ScrubFunction to clean up the token stream. We place it into the docPipe assembly, immediately after the regex which splits the raw text:

Fields scrubArguments = new Fields( "doc_id", "token" );
docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS );

Next, we need to define a constructor for our custom operation:

public ScrubFunction( Fields fieldDeclaration )
  super( 2, fieldDeclaration );

The fieldDeclaration parameter allows us to name the tuple fields from the flow. Since we’ll output the results as TSV, a header will be created from the tuple fields.

Next, we define an operate method. In other words, we define the function which operates on the tuple stream. This is essentially a wrapper which pulls tuples from the input stream, applies our scrubText method to each token, then inserts new tuples into the output stream:

public void operate( FlowProcess flowProcess, FunctionCall functionCall )
  TupleEntry argument = functionCall.getArguments();
  String doc_id = argument.getString( 0 );
  String token = scrubText( argument.getString( 1 ) );

  if( token.length() > 0 )
    Tuple result = new Tuple();
    result.add( doc_id );
    result.add( token );
    functionCall.getOutputCollector().add( result );

Last but not least, we define the scrubText method to clean up tokens. This version is relatively simple, and in practice it would have many more cases. It’s also relatively simple to write unit tests against:

public String scrubText( String text )
  return text.trim().toLowerCase();

Place that first set of source lines all into a Main method, create an additional class for ScrubFunction for the rest of the source shown, then build a JAR file. You should be good to go.

The diagram for the Cascading flow will be in the dot/ subdirectory after the app runs. Here we have annotated it to show where the mapper and reducer phases are running, and also the section which was added since Part 2:



To build the sample app from the command line use:

gradle clean jar


Run the app like this:

rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc


Explore with Driven how your application got composed into Cascading.

If you did not install the Driven plugin, you can still explore Part 3 through Driven by link


If you ran your application on a distributed Hadoop cluster, your flow name ("wc") in the timeline view will have a link into how the steps got composed into Mappers and Reducers.


Click on the step link ("(1/1) output/wc") in the Performance view if you want to explore how your application executed from the Hadoop job dashboard.


Driven exposes three views of the DAG — Logical, Physical and Contracted. The Logical View lets you inspect and visualize the Tap, Pipe and other Cascading constructs that you specified in your code. With Physical View, you can also inspect intermediate Tap and Pipe Assemblies embedded in your code. In our case, we can see that the Retain function was used in the subassembly.

The bottom half of the screen contains the 'Timeline View', which will give details associated with each flow run. You can click on the 'Add Columns' to visualize other signals too.

To understand how best to understand the timing counters, read Understanding Timing Counters

Output text gets stored in the partition file output/wc1 which you can then verify:

more output/wc/part-00000

Here’s a log file from our run of the sample app, part 3. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum.


In Part 4 of Cascading for the Impatient you will learn how to implement a stop word filter.