Cascading for the Impatient, Part 4

Part 4 - Implementing a Stop Word Filter

In the previous installement we showed how to write a custom Operation for a Cascading application. If you have not read that yet, it is probably best to start there.

Today’s lesson takes that same Word Count app and expands on it to implement a stop words filter, which is a list of tokens to nix from the stream. We’ll show how to use HashJoin on two pipes, so we can perform that filtering at scale. Again, this code is leading toward an implementation of TF-IDF in Cascading. We’ll show best practices for workflow orchestration and test-driven development (TDD) at scale.

Theory

The first question to consider is, why do we want to use a stop words list? After all, the TF-IDF algorithm is supposed to filter out the less significant words anyway. Why would we need to include additional filtering if the TF-IDF is implemented correctly?

Use of a stop words list originated in work by Hans Peter Luhn at IBM Research, during the dawn of computing. The reasons for it are two-fold. On one hand, consider that the most common words in any given natural language are generally not useful for text analytics. For example in English, words such as the'', of'', ``and'' are probably not what you want to search, and probably not interesting for Word Count metrics. They represent the long tail of the token distribution: high frequency, low semantic value. Consequently, they cause the bulk of the processing. Natural languages tend to have on the order of 10^5 words, so the potential size of any stop words list is nicely bounded. Filtering those high-frequency words out of the token stream reduces the amount processing required later in the workflow, dramatically.

On the other hand, you may also want to remove some words explicitly from the token stream. This almost always comes up in practice, especially when working with public discussions such as social network comments. Think about it, what are some of the most common words posted online in comments? Words which are not the most common words in 'polite' English? Based on the math for TF-IDF, those would tend to get ranked highest. Do you really want those words to bubble up to the 'most significant' positions in your text analytics? In automated systems which leverage unsupervised learning, this can lead to highly embarrassing situations. Caveat machinator.

Next, let’s consider about working with a Joiner in Cascading. We will have two pipes, one for the 'scrubbed' token stream and another for the stop words list. We want to filter all instances of tokens from the stop words list out of the token stream. If we were not working in MapReduce, a naive approach would simply load the stop words list into a hashtable, then iterate through our token stream to lookup each token in the hashtable and delete it if found. If we were coding in Hadoop directly, a less naive approach would be to put the stop words list into the distributed cache and have a job step which loads it during setup, then rinse/lather/repeat from the naive coding approach described above.

Instead we want to leverage the workflow orchestration in Cascading. One might try to write a custom operation in Cascading, as we did in Part 3 e.g., a custom Filter. That sounds like extra work, plus also extra code to verify and maintain, when the built-in primatives will to tend to be more efficient anyway.

Cascading provides for joins on pipes, and conceptually a Left Outer Join would solve our requirement to filter stop words. Think of joining the token stream with the stop words list. When the result is non-null, the join has identified a stop word. Discard it.

Understand that there’s a big problem with using joins in MapReduce. Outside of the context of a relational database, arbitrary joins do not work efficiently. Suppose you have N items in one tuple stream and M items in another, and want to join them? In the general case, for an arbitrary join, that requires N x M operations and also introduces a data dependeny, such that the join cannot be performed in parallel. If both N and M are relatively large, say in the millions of tuples, then we’d end up processing 10^12 operations on a single processor which kind of defeats the purpose, in terms of leveraging MapReduce.

Fortunately, if some of that data is sparse then we can use specific variants of joins to compute efficiently in parallel. Cascading includes a HashJoin which joins two or more tuple streams into a single stream via a Joiner - when all but one tuple stream are small enough to fit into memory. In other words, given some insights about the ``shape'' of the data, when you have a large data set (non-sparse) you can join with one or more small data sets (sparse) in memory.

A join has a left-hand side (LHS) and a right-hand side (RHS); in Cascading we put the sparser data on the right-hand side. So the HashJoin implements a non-blocking asymmetrical join'' or replicated join'', where the left-most side will not block (accumulate into memory) in order to complete the join, but the right-most sides will.

Recall that stop words lists tend to be bounded at approximately 10^5, which is relatively sparse when compared with an arbitrarily large token stream. In typical ``web scale'' text analytics use cases for TF-IDF, that might be in the range billions of tokens, i.e., several orders of magnitude larger than our largest possible stop words list. Sounds like a great use case for HashJoin.

A conceptual diagram for this implementation of Word Count in Cascading is shown as:

plumb4

Source

You can find all code in the part4 sub-directory. The input data stays the same as in the earlier parts of the series.

This example in Part 4 uses a HashJoin in Cascading to implement a stop words list, filtering some words out of the token stream prior to counting.

First, let’s add another source tap to read the stop words list as an input data set:

String stopPath = args[ 2 ];
Fields stop = new Fields( "stop" );
Tap stopTap = new Hfs( new TextDelimited( stop, true, "\t" ), stopPath );

Next we’ll insert another pipe into the assembly, placing tokenPipe between our 'scrub' and 'count' sections of our workflow. That’s where the HashJoin gets performed, implementing a left join:

// perform a left join to remove stop words, discarding the rows
// which joined with stop words, i.e., were non-null after left join
Pipe stopPipe = new Pipe( "stop" );
Pipe tokenPipe = new HashJoin( docPipe, token, stopPipe, stop, new LeftJoin() );

Next we discard the non-null results from the left join, using a RegexFilter:

tokenPipe = new Each( tokenPipe, stop, new RegexFilter( "^$" ) );

Now this new tokenPipe can be fitted back into the wcPipe which we used before. The workflow continues on much the same from there:

Pipe wcPipe = new Pipe( "wc", tokenPipe );

Last, we’ll add the additional source tap to the FlowDef, to include input data for our stop words list:

// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
 .setName( "wc" )
 .addSource( docPipe, docTap )
 .addSource( stopPipe, stopTap )
 .addTailSink( wcPipe, wcTap );

Modify the Main method to make those changes, then build a JAR file. You should be good to go. For those keeping score, the resulting physical plan in MapReduce for Part 4 still uses one mapper and one reducer.

The diagram for the Cascading flow will be in the dot/ subdirectory after the app runs. Here we have annotated it to show where the mapper and reducer phases are running, and also the section which was added since Part 3:

wc_part4

Build

To build the sample app from the command line use:

gradle clean jar

Run

To run the app, do the following:

rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc data/en.stop

Driven

Open your Driven-enabled app to track the progress of your application in real-time. Make sure that you have set the Refresh feature to ON. By default, the Driven updates the visualization every 30 seconds.

If you have not installed the Driven plugin, you can still explore Part 4 through Driven by following this link

Note

If you registered at http://driven.io and installed the Driven API key, you will have accces to the 'All Applications' view that tracks all your historical application runs. This view starts becoming interesting over a period of time when you want to track trending, identify outlier behavior, or monitor applications based on their termination status

driven-part4

You can get detailed insights into how your Cascading steps translated into Map Reduce by clicking on your Flow name in the timeline view ("wc").

driven-part4-b

Output text gets stored in the partition file output/wc which you can then verify:

more output/wc/part-00000

Here is a log file from our run of the sample app, part 4. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum.

Next

Part 5 of Cascading for the Impatient implements the TF-IDF with Cascading.