Java Developers Guide to ETL with Cascading

Part 4: Implement Count and “Top 10” aggregation function

What you will see

In Part 4 of the tutorial, we will count the number of occurrences of IP addresses, and report the top 10 IP addresses. In order to perform this operation, we will first group and sort the records (similar to previous exercise). Then, apply the 'Count' function and retain the top 10 records using the Limit function.

Run and Validate Your Program

Step 1: Compile your program

$ cd etl-log/part4
$ gradle clean jar

Step 2: If you have not done it already from the Part 1, copy the log file to Hadoop:

$ hadoop dfs -mkdir /logs
$ hadoop dfs -put ../data/NASA_access_log_Aug95.txt /logs

Step 3: Run your ETL flow

$ hadoop jar ./build/libs/etl.jar /logs/NASA_access_log_Aug95.txt /output

Step 4: View the execution of your ETL flow in real-time through Driven

Depending on how you configured your Driven Plugin, either click the Driven URL from your console or log into the Driven application.

14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

Attached is a live Driven link to execute Part 4 exercise on the Concurrent cloud service.

etl-part-4

Figure 1: An example of the performance view in Driven.

Step 5: View contents of the result file that contain the top 10 commonly referred IP addresses in the input file

$ hadoop fs -cat /output/part-00000

What’s Going On?

We will only cover the parts in the code that are different from the previous section.

// Count occurences of each IP, and store them in field IPcount
Pipe countByIpPipe = new Pipe( "countByIpPipe", transformPipe );
countByIpPipe = new GroupBy( countByIpPipe, new Fields( "ip" ) );
countByIpPipe = new Every( countByIpPipe, Fields.GROUP, new Count( new Fields( "IPcount" ) ), Fields.ALL );

// Sort by IPcount
Pipe sortedCountByIpPipe = new Pipe( "sortedCountByIpPipe", countByIpPipe );
sortedCountByIpPipe = new GroupBy( sortedCountByIpPipe, new Fields( "IPcount" ), true );

// Create a sortedCountByIpPipe using Each and Retain to limit to top 10 results
sortedCountByIpPipe = new Each( sortedCountByIpPipe, new Fields( "IPcount" ), new Limit( 10 ) );

References

For more details about the particular operations or to understand how some of these steps can be modified for your use case, use the following resources:

Next