Java Developers Guide to ETL with Cascading
Part 4: Implement Count and “Top 10” aggregation function
What you will see
In Part 4 of the tutorial, we will count the number of occurrences of IP addresses, and report the top 10 IP addresses. In order to perform this operation, we will first group and sort the records (similar to previous exercise). Then, apply the 'Count' function and retain the top 10 records using the Limit function.
Run and Validate Your Program
Step 1: Compile your program
$ cd etl-log/part4 $ gradle clean jar
Step 2: If you have not done it already from the Part 1, copy the log file to Hadoop:
$ hadoop dfs -mkdir /logs $ hadoop dfs -put ../data/NASA_access_log_Aug95.txt /logs
Step 3: Run your ETL flow
$ hadoop jar ./build/libs/etl.jar /logs/NASA_access_log_Aug95.txt /output
Step 4: View the execution of your ETL flow in real-time through Driven
Depending on how you configured your Driven Plugin, either click the Driven URL from your console or log into the Driven application.
14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished. 14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16 14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON
Attached is a live Driven link to execute Part 4 exercise on the Concurrent cloud service.
Figure 1: An example of the performance view in Driven.
Step 5: View contents of the result file that contain the top 10 commonly referred IP addresses in the input file
$ hadoop fs -cat /output/part-00000
What’s Going On?
We will only cover the parts in the code that are different from the previous section.
// Count occurences of each IP, and store them in field IPcount Pipe countByIpPipe = new Pipe( "countByIpPipe", transformPipe ); countByIpPipe = new GroupBy( countByIpPipe, new Fields( "ip" ) ); countByIpPipe = new Every( countByIpPipe, Fields.GROUP, new Count( new Fields( "IPcount" ) ), Fields.ALL ); // Sort by IPcount Pipe sortedCountByIpPipe = new Pipe( "sortedCountByIpPipe", countByIpPipe ); sortedCountByIpPipe = new GroupBy( sortedCountByIpPipe, new Fields( "IPcount" ), true ); // Create a sortedCountByIpPipe using Each and Retain to limit to top 10 results sortedCountByIpPipe = new Each( sortedCountByIpPipe, new Fields( "IPcount" ), new Limit( 10 ) );
For more details about the particular operations or to understand how some of these steps can be modified for your use case, use the following resources:
Each and Every Pipes: http://docs.cascading.org/cascading/3.0/userguide/ch05-pipe-assemblies.html#each-every