Cascading for the Impatient, Part 5

Part 5 - Implementing TF-IDF in Cascading

This part builds on the Word Count app from the previous chapter and now implements TF-IDF in Cascading. We’ll show how to use a SumBy and a CoGroup to aggregate the data needed, and then how to use an ExpressionFunction to calculate the TF-IDF weights. We also continue to show best practices for workflow orchestration and test-driven development (TDD) at scale.

Theory

Fortunately, all of the data required to calculate TF-IDF weights was already available in our Word Count example in Part 4. However, we’ll need to revise the overall workflow, adding more pipe assemblies to it.

TF-IDF calculates a metric for each token which indicates how "important" that token is to a document within the context of a collection of documents. The metric is based on relative frequencies: on one hand, tokens which appear in most documents tend to have very low TF-IDF weights; on the other hand, tokens which are less common but appear multiple times in a few documents tend to have very high TF-IDF weights. Consequently, the TF-IDF algorithm gets used to drive the indexing in some text search engines, such as Apache Lucene. In particular, TF-IDF provides an effective way to rank documents for a search query. For a good discussion of this in gory detail, see the Similarity class in Lucene.

Note that in the literature, token and term may be used interchangeably for this sample app. More advanced text analytics might look at sequences of words, in which case a term becomes a more complex structure. However, we are only looking at single words.

We will need the following components to calculate TF-IDF:

  • term count: number of times a given term appears in a given document

  • document frequency: the number of documents in which a given term appears

  • number of terms: total number of terms in a given document

  • document count: total number of documents
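
Putting those pieces together, the weight computed by this sample app (it matches the ExpressionFunction we will write further below) is essentially:

tfidf( token, doc ) = tf_count * log( n_docs / ( 1 + df_count ) )

where tf_count is the term count of the token in the document, df_count is its document frequency, and n_docs is the document count. The number of terms per document would be used by variants that normalize the term frequency, but this sample keeps the raw count.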

Slight modifications to Word Count provide the means to get both term count and document frequency, while the other two components get calculated almost as by-products. In this sense, we get to leverage Cascading by re-using the results of some pipes within our workflow. A conceptual diagram for this implementation of TF-IDF in Cascading is shown below:

[Image: plumb5, a conceptual diagram of the TF-IDF workflow in Cascading]

Source

Look in the part5 directory; you will find everything needed for this example there.

First, let’s add another sink tap to write the TF-IDF weights as an output data set:

String tfidfPath = args[ 3 ];
Tap tfidfTap = new Hfs( new TextDelimited( true, "\t" ), tfidfPath );

Next we’ll modify the existing pipe assemblies for Word Count, beginning immediately after the "stop words" filter. We add the following line to retain only the doc_id and token fields:

tokenPipe = new Retain( tokenPipe, fieldSelector );
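
Here fieldSelector is assumed to be the same selector declared earlier in Main when the tokenizer emitted its results; if your Part 4 code named it differently, adjust accordingly. Roughly:

// hypothetical declaration, carried over from the tokenizer step
Fields fieldSelector = new Fields( "doc_id", "token" );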

Then we re-use the intermediate results from tokenPipe, creating three different branches in the workflow. The first addresses term counts:

// one branch of the flow tallies the token counts for term frequency (TF)
Pipe tfPipe = new Pipe( "TF", tokenPipe );
tfPipe = new GroupBy( tfPipe, new Fields( "doc_id", "token" ) );
Fields tf_count = new Fields( "tf_count" );
tfPipe = new Every( tfPipe, Fields.ALL, new Count( tf_count ), Fields.ALL );
Fields tf_token = new Fields( "tf_token" );
tfPipe = new Rename( tfPipe, token, tf_token );

At that point, we have TF values for each token.
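Concretely, each tuple leaving this branch carries the fields doc_id, tf_token, and tf_count: one row per document/token pair with its raw term count.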

In a second branch we’ll calculate D, the total number of documents, in a way which can be consumed later in a join. This uses a built-in partial aggregate operation called SumBy:

// one branch counts the number of documents (D)
Fields doc_id = new Fields( "doc_id" );
Fields tally = new Fields( "tally" );
Fields rhs_join = new Fields( "rhs_join" );
Fields n_docs = new Fields( "n_docs" );
Pipe dPipe = new Unique( "D", tokenPipe, doc_id );
dPipe = new Each( dPipe, new Insert( tally, 1 ), Fields.ALL );
dPipe = new Each( dPipe, new Insert( rhs_join, 1 ), Fields.ALL );
dPipe = new SumBy( dPipe, rhs_join, tally, n_docs, long.class );

This part may seem less than intuitive, and it is a bit odd. We need the total document count as a field in each tuple on the RHS of the join. That keeps our processing parallel, allowing this calculation to scale out horizontally.
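
To make that concrete, here is a rough sketch of what the "D" branch reduces to, assuming the five-document rain.txt data set used throughout this series:

rhs_join   n_docs
1          5

Every tuple on the "DF" branch will carry lhs_join = 1, so the HashJoin below effectively attaches n_docs to each of its rows.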

The third branch calculates DF as a step toward inverse document frequency per token:

// one branch tallies the token counts for document frequency (DF)
Pipe dfPipe = new Unique( "DF", tokenPipe, Fields.ALL );
dfPipe = new GroupBy( dfPipe, token );
Fields df_count = new Fields( "df_count" );
Fields df_token = new Fields( "df_token" );
Fields lhs_join = new Fields( "lhs_join" );
dfPipe = new Every( dfPipe, Fields.ALL, new Count( df_count ), Fields.ALL );
dfPipe = new Rename( dfPipe, token, df_token );
dfPipe = new Each( dfPipe, new Insert( lhs_join, 1 ), Fields.ALL );
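
At this point each distinct token appears in exactly one tuple, with the fields df_token, df_count, and lhs_join (the constant 1 inserted for the join below).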

Now we have all the components needed to calculate TF-IDF weights. We’ll use two kinds of joins - a HashJoin followed by a CoGroup - to merge the three branches together:

// join to bring together all the components for calculating TF-IDF
// the D side of the join is smaller, so it goes on the RHS
Pipe idfPipe = new HashJoin( dfPipe, lhs_join, dPipe, rhs_join );

// the IDF side of the join is smaller, so it goes on the RHS
Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfPipe, df_token );
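
If you trace the fields through the two joins: after the HashJoin, idfPipe carries one tuple per distinct token with the fields df_token, df_count, lhs_join, rhs_join, and n_docs; the CoGroup then matches each TF tuple against its token, producing tuples with doc_id, tf_token, and tf_count plus those IDF-side fields, which is everything the ExpressionFunction below needs.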

Then we calculate the weights using an ExpressionFunction in Cascading:

// calculate the TF-IDF weights, per token, per document
Fields tfidf = new Fields( "tfidf" );
String expression = "(double) tf_count * Math.log( (double) n_docs / ( 1.0 + df_count ) )";
ExpressionFunction tfidfExpression = new ExpressionFunction( tfidf, expression, Double.class );
Fields tfidfArguments = new Fields( "tf_count", "df_count", "n_docs" );
tfidfPipe = new Each( tfidfPipe, tfidfArguments, tfidfExpression, Fields.ALL );
fieldSelector = new Fields( "tf_token", "doc_id", "tfidf" );
tfidfPipe = new Retain( tfidfPipe, fieldSelector );
tfidfPipe = new Rename( tfidfPipe, tf_token, token );
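
Each tuple has now been trimmed back down to the three fields token, doc_id, and tfidf, which is what gets written to the tfidfTap sink.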

Now we can get back to the remainder of the workflow. We will keep the actual Word Count metrics, since those are useful for testing:

// keep track of the word counts, which are useful for QA
Pipe wcPipe = new Pipe( "wc", tfPipe );

Fields count = new Fields( "count" );
wcPipe = new SumBy( wcPipe, tf_token, tf_count, count, long.class );
wcPipe = new Rename( wcPipe, tf_token, token );

Last, we will add another sink tap to the FlowDef, to include output data for our TF-IDF weights:

// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
 .setName( "tfidf" )
 .addSource( docPipe, docTap )
 .addSource( stopPipe, stopTap )
 .addTailSink( tfidfPipe, tfidfTap )
 .addTailSink( wcPipe, wcTap );

We’ll change the name of the resulting Flow too, to keep our code properly descriptive:

// write a DOT file and run the flow
Flow tfidfFlow = flowConnector.connect( flowDef );
tfidfFlow.writeDOT( "dot/tfidf.dot" );
tfidfFlow.complete();

Modify the Main method to make those changes, then build a JAR file. You should be good to go. For those keeping score, the resulting physical plan in Cascading for Part 5 now uses eleven mappers and nine reducers, roughly a 5x jump from the previous example.

The diagram for the Cascading flow will be in the dot/ subdirectory after the app runs. Here we have annotated it to show where the mapper and reducer phases are running, and also the sections which were added since Part 4:

[Image: tfidf1, the annotated flow diagram showing mapper and reducer phases and the sections added since Part 4]

Build

To build the sample app from the command line, use Gradle as always:

gradle clean jar

Run

Before running this sample app, you will need to have a supported release of Apache Hadoop installed. Here is what was used to develop and test our example code:

$ hadoop version
Hadoop 2.4.1

To run this incarnation of our app, do this:

rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc data/en.stop output/tfidf

Driven

Let’s see how Driven helps you visualize this application. Depending upon how you installed the Driven plugin, start your Driven-enabled application.

If you did not install the Driven plugin, you can still explore a historical Part 5 run in Driven by visiting this link.

Make sure that your refresh is set to ON; observe how Driven renders which part of the code is currently being executed and which parts have already completed. In addition, from the upper panel, you can track what percentage of your end-to-end flow is complete.

Alternate between the Logical and the Physical view to explore the intermediate Tap and Pipe subassemblies in the code.

[Image: driven-part5, the Driven application view for Part 5]

Note

Driven lets you visually track the progress of your application in real time. While this particular application view is not intended to replace a more formal operational monitoring view, it comes in very handy for sanity-checking the progress of large, complex jobs. In addition, as data applications become more complex, the graph is an excellent way to review the architecture of your data-driven application. Examples of quick checks include ensuring that most of the filtering in the data pipes happens ahead of a join, establishing points where checkpoints need to be introduced, and validating that the business requirements are aligned with the actual implementation of the data transformation.

Now it gets interesting to explore the application in the Driven Performance View. You can observe the intermediate Taps being created in each step. As applications get more complex, or the data sets become larger, the performance view becomes very important for understanding how your code gets decomposed into mappers and reducers and what each of those steps costs in execution time, helping answer questions such as "how much did the join cost me?"

[Image: driven-part5-b, the Driven performance view for Part 5]

The bottom half of the screen contains the 'Timeline View', which gives details associated with each flow run. You can click 'Add Columns' to visualize other signals too.

To learn how best to interpret the timing counters, read Understanding Timing Counters.

Output text gets stored in partition files under output/tfidf, which you can then verify:

more output/tfidf/part-00000

BTW, did you notice what the TF-IDF weights for the tokens rain and shadow were? Those represent what the documents have in common. How do those compare with weights for the other tokens? Conversely, consider the weights for australia (high weight) or area (different weights).

Here’s a log file from our run of the sample app, part 5. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum.

Next

Part 6 of Cascading for the Impatient explores testing with Cascading.