Java Developers Guide to ETL with Cascading

Part 2: Using Filters to extract data patterns

What You Will See

In Part 2 of the tutorial, we will create a simple filter to remove log entries that match a specific pattern (tuples with no size). Filters are easy to implement and are powerful Cascading constructs that can be applied to many data patterns, for example to:

  • Separate unwanted data and store it in a different file for separate analysis

  • Implement in-line data quality checks

  • Perform different processing logic based on content (you can use filters to create multiple branches from the incoming pipe, a topic we will cover in later parts of the tutorial; a rough sketch follows this list)
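
As a rough illustration of the first and third patterns, here is a minimal sketch that branches one incoming pipe into a "valid" branch and a "rejects" branch using two complementary ExpressionFilters. The incomingPipe variable, the pipe names, and the "size" field are assumptions for illustration; adapt them to the flow you built in Part 1.

import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

public class BranchingSketch {

    // "incomingPipe" and the "size" field are hypothetical; adapt them to your own flow
    public static Pipe[] branchBySize(Pipe incomingPipe) {

        // Branch 1: keep only well-formed entries.
        // ExpressionFilter removes every tuple for which the expression evaluates to true.
        Pipe validPipe = new Pipe("valid", incomingPipe);
        validPipe = new Each(validPipe, new Fields("size"),
            new ExpressionFilter("size.equals(\"-\")", String.class));

        // Branch 2: keep only the rejected entries, so they can be sunk to a separate Tap
        Pipe rejectPipe = new Pipe("rejects", incomingPipe);
        rejectPipe = new Each(rejectPipe, new Fields("size"),
            new ExpressionFilter("!size.equals(\"-\")", String.class));

        return new Pipe[] { validPipe, rejectPipe };
    }
}

Each branch can then be bound to its own sink Tap in the FlowDef, which gives you the "store unwanted data in a different file" pattern from the first bullet.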

Run and Validate Your Program

Step 1: Compile your program

$ cd etl-log/part2
$ gradle clean jar

Step 2: If you have not already done so in the previous part, copy the log file to Hadoop:

$ hadoop dfs -mkdir /logs
$ hadoop dfs -put ../data/NASA_access_log_Aug95.txt /logs

Step 3: Run your ETL flow

$ hadoop jar ./build/libs/etl.jar /logs/NASA_access_log_Aug95.txt /output

Step 4: View the execution of your ETL flow in real-time through Driven

Depending on how you configured your Driven Plugin, either click the Driven URL from your console or log into the Driven application.

14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

Attached is a live Driven link to execute the Part 2 exercise on the Driven cloud service.

Figure 1: An example of the performance view in Driven.

  1. The first thing you will see is a graph (a Directed Acyclic Graph, or DAG, in formal parlance) that shows all the steps in your code and their dependencies. The circles represent the Taps, and you can inspect the function, the GroupBy, and the Count aggregator used by your code by clicking on each step.

  2. Click on each step of the DAG. You will see additional details about the specific operator, and a reference to the line of code where that step was invoked.

  3. The timeline chart visualizes how the application executed in your environment. You can see details about the time taken by the flow to execute, and get additional insights by clicking on the "Add Columns" button.

  4. If you executed your application on the Hadoop cluster in distributed mode, you will get additional insights into how your Cascading flows mapped into mappers and reducers. Note that the 'Performance View' is only available if you ran your application on Hadoop (distributed mode).

  5. In the timeline view, click on your flow name link. You will see how your application logic was decomposed into mappers and reducers. You can also see the number of tasks created for each step, which is important for understanding performance bottlenecks.

     As your applications become more complex, the 'Performance View' becomes essential for
     understanding the behavior of your application.

If you registered and configured the Driven API key, you will also have an “All Applications” view, where you can see all the applications that are running, or have run, in the Hadoop cluster for a given user. You can customize the view to display additional attributes such as application name, ID, and owner. In addition, you can customize the view to filter the information based on status and dates.

Step 5: View the contents of the tab-separated output and verify that it does not contain any rows with zero size

$ hadoop dfs -cat /output/part-00000

What’s Going On?

We will only cover the parts of the code that differ from the previous part.

A filter is responsible for testing each tuple to see if it should be removed from the stream. A filter either discards the input tuple or returns it intact.

// We have to filter out the tuples with no size ("-" instead), such as those with 404 and 30x response codes
ExpressionFilter filterResponse =
    new ExpressionFilter("size.equals(\"-\")", String.class);

processPipe = new Each(processPipe, new Fields("size"), filterResponse);
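
ExpressionFilter removes every tuple for which the given Java expression evaluates to true, so any entry whose size field contains "-" is dropped from the stream. The same check could also be written as a custom filter by implementing Cascading's Filter interface; the sketch below is one possible version (the class name NoSizeFilter is ours, not part of the tutorial code).

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Filter;
import cascading.operation.FilterCall;

public class NoSizeFilter extends BaseOperation implements Filter {

    // Return true to remove the current tuple from the stream, false to keep it
    @Override
    public boolean isRemove(FlowProcess flowProcess, FilterCall filterCall) {
        String size = filterCall.getArguments().getString("size");
        return "-".equals(size);
    }
}

Wiring it in with processPipe = new Each(processPipe, new Fields("size"), new NoSizeFilter()); would behave the same as the ExpressionFilter above.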

References

For more details about the particular operations or to understand how some of these steps can be modified for your use case, use the following resources:

Next