Java Developers Guide to ETL with Cascading

Part 6: Implementing a Branch & a Join

What You Will See

In Part 6 of the tutorial, we will demonstrate a design pattern that branches a pipe and performs joins. For that, we will create two branches using Cascading HashJoin to join them.

  • The first branch uses filtering to find the users who have accessed a given page.

  • The second branch creates a subset of users and associates a randomly generated number.

  • Next, the code performs a HashJoin between the two branches.

  • Finally, we sort on score and filter any users with a score less than 60.

Run and Validate Your Program

Step 1: Compile your program

$ cd etl-log/part6
$ gradle clean jar

Step 2: If you have not done it already from the Part 1, copy the log file to Hadoop:

$ hadoop dfs -mkdir /logs
$ hadoop dfs -put ../data/NASA_access_log_Aug95.txt /logs

Step 3: Run your ETL flow

$ hadoop jar ./build/libs/etl.jar /logs/NASA_access_log_Aug95.txt /output

Step 4: View the execution of your ETL flow in real-time through Driven

Depending on how you configured your Driven Plugin, either click the Driven URL from your console or log into the Driven application.

14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

Attached is a live Driven link to execute Part 6 exercise on the Concurrent cloud service.

etl-part-6

Figure 1: An example of the performance view in Driven.

Note

If you registered at http://driven.io and installed the Driven API key, you will have accces to the “All Applications” view that tracks all your historical application runs. This view starts becoming interesting over a period of time when you want to track trending, identify outlier behavior, or monitor applications based on their termination status

Open your Driven-enabled app to track the progress of your application in real-time. Make sure that you have set the Refresh feature to ON. By default, the Driven updates the visualization every 30 seconds.

Note

Driven lets you visually track the progress of your application in real-time. This feature comes in very handy to sanity-check the progress of large, complex jobs. In addition, as the data applications get complex, the graph is an excellent way to review the architecture for your data-driven application. Examples of quick checks that can be conducted include ensuring that much of filtering of data pipes is done ahead of a join, establishing points where checkpoints have to be introduced, validating that the business requirements are aligned with the actual implementation of the data transformation function…

Now, it gets interesting to start exploring the application in the Driven Performance View. You can observe the intermediate Taps being created in each step. As applications get more complex, or the data sets become larger, the performance view becomes very important to understand how your code steps get decomposed into Mappers and Reducers, the cost associated with such steps (execution time), helping address such questions as, "how much did the join cost me?"

Step 5: View contents of the result file that contain the sorted score of users with a score greater than 60

$ hadoop fs -cat /output/part-00000

What’s Going On?

We will only cover the parts in the code that are different from the previous section.

Step 1: Create two branches from the pipe. From first branch, filter the lines with "GET /images/". In the second branch, add score field to the tuple stream.

// 1st branch: filter IPs which have accessed GET /images/
Pipe filteredPipe = new Pipe( "filteredPipe", transformPipe );
filteredPipe = new Each( filteredPipe, new Fields( "request" ), new RegexFilter( "GET /images/" ) );
filteredPipe = new Retain( filteredPipe, new Fields( "ip" ) );
filteredPipe = new Unique( filteredPipe, new Fields( "ip" ) );

// 2nd branch: add a generated score
Pipe userPipe = new Pipe( "userPipe", transformPipe );

userPipe = new Rename( userPipe, new Fields( "ip" ), new Fields( "userip" ) );

// Add a field score generated from userip using hash function
userPipe = new Each( userPipe, new Fields( "userip" ), new ScoreNumber( new Fields( "score" ) ), Fields.ALL );

Step 2: Filter out scores less than 60

cascading.operation.expression.ExpressionFilter evaluates a Boolean expression, assigning argument Tuple values to variables in the expression. If the expression returns true, the Tuple is removed from the stream. ExpressionFilter coerces the value into the specified type if necessary to make the comparison.

// Filter out IPs with score lower than 60, (hint: ExpressionFilter can be used)
ExpressionFilter filterScore = new ExpressionFilter( "score < 60", Integer.TYPE );
userPipe = new Each( userPipe, new Fields( "score" ), filterScore );

Step 3: Add checkpoint after filter and before HashJoin

A Checkpoint forces intermediate data to be written to HDFS. By default, the Checkpoint is anonymous, and is cleaned up immediately after the flow completes. Checkpoints are typically used to:

  • Debug complex flows

  • Restart partially-failed flows - When using a Checkpoint pipe in a flow and the flow fails, a future execution of the flow will be restarted after the last successful FlowStep writing to a checkpoint file

  • Optimize HashJoins - Frequently the HashJoin is fed a filtered down stream of tuples from what was originally a very large file. To prevent the large file from being replicated throughout a cluster, use a Checkpoint at the point where the data has been filtered to its smallest but before it is streamed into a HashJoin

Checkpoint userPipeCheckpoint = new Checkpoint( "checkpoint", userPipe ) ;

Step 4: Perform HashJoin

HashJoin performs a join on two or more streams based on join fields. The output is the combined tuple joined on field(s) that contain fields from all of the input streams. HashJoin are optimized for joining one or more small streams to one large stream. They attempt to keep the entire right-hand stream in memory for rapid comparison. This construct is very useful when implementing joins with lookup tables (which can be stored in memory).

Often, applications attempt to reduce the size of the stream (by applying filters) so that they can attempt to use HashJoin by placing the reduced stream in RHS. In this case, ensure that you apply a Checkpoint after the completion of the filtering and before commencing on performing a join.

// We will now join both branches with a HashJoin on IP address, creating "userip" and using InnerJoin
Pipe join = new HashJoin( filteredPipe, new Fields( "ip" ), userPipe, new Fields( "userip" ), new InnerJoin() );

References

For more details about the particular operations or to understand how some of these steps can be modified for your use case, use the following resources: