Java Developers Guide to ETL with Cascading
Part 5: Advanced Aggregation: Implementing moving averages
What You Will See
Part 5 is an advanced aggregation exercise that computes rolling averages in an ETL flow. It introduces no new Cascading constructs. Instead, it illustrates the design pattern for implementing rolling averages and other trending computations that are common in data applications.
Run and Validate Your Program
Step 1: Compile your program
$ cd etl-log/part5
$ gradle clean jar
Step 2: If you have not already done so in Part 1, copy the log file to Hadoop:
$ hadoop dfs -mkdir /logs
$ hadoop dfs -put ../data/NASA_access_log_Aug95.txt /logs
Step 3: Run your ETL flow
$ hadoop jar ./build/libs/etl.jar /logs/NASA_access_log_Aug95.txt /output
Step 4: View the execution of your ETL flow in real time through Driven
Depending on how you configured your Driven Plugin, either click the Driven URL from your console or log into the Driven application.
14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON
Attached is a live Driven link for running the Part 5 exercise on the Concurrent cloud service.
Figure 1: An example of the performance view in Driven.
Note: Driven exposes three views of the DAG: Logical, Physical and Contracted. The Logical View lets you inspect and visualize the Tap, Pipe and other Cascading constructs that you specified in your code. With the Physical View, you can also inspect intermediate Tap and Pipe Assemblies embedded in your code. In our case, we can see that the Retain function was used in the subassembly.
Step 5: View the contents of the result file, which contains the moving average of bandwidth size.
$ hadoop fs -cat /output/part-00000
What’s Going On?
Note: The comments in the snippet explain the moving average operation.
// Create a new Each pipe, moving_avgSizeByMinPipe, that augments each tuple with the day derived from the "time" field
Pipe moving_avgSizeByMinPipe = new Each( filterResponsePipe, new Fields( "time" ), new DayForTimestamp(), Fields.ALL );
// Calculates a 60-minute moving average of bandwidth
// Groups by day, then calculates the 60-minute moving average on the "size" field
moving_avgSizeByMinPipe = new GroupBy( moving_avgSizeByMinPipe, new Fields( "day" ) );
moving_avgSizeByMinPipe = new Every( moving_avgSizeByMinPipe, new Fields( "size", "time" ),
new MovingAverageBuffer( new Fields( "min", "moving_average" ) ), Fields.RESULTS );
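The source of MovingAverageBuffer is not shown in this part, so the following is only a minimal plain-Java sketch of the trailing-window idea behind a 60-minute moving average, not the tutorial's actual implementation. It assumes the rows of one group (one day) arrive sorted by time, that times are epoch milliseconds, and that sizes are byte counts. The class name MovingAverageSketch and the method movingAverage are hypothetical names chosen for illustration.

```java
public class MovingAverageSketch {

    /**
     * Computes a trailing moving average over samples sorted by time.
     * Assumption: timesMillis is ascending and windowMillis is positive.
     * For each sample i, the average covers every sample whose time is
     * less than windowMillis older than sample i, including sample i itself.
     */
    public static double[] movingAverage(long[] timesMillis, long[] sizes, long windowMillis) {
        double[] averages = new double[sizes.length];
        long sum = 0;   // running total of sizes currently inside the window
        int start = 0;  // index of the oldest sample still inside the window

        for (int i = 0; i < sizes.length; i++) {
            sum += sizes[i];
            // Drop samples that have fallen out of the trailing window.
            while (timesMillis[i] - timesMillis[start] >= windowMillis) {
                sum -= sizes[start];
                start++;
            }
            averages[i] = (double) sum / (i - start + 1);
        }
        return averages;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Hypothetical sample data: request times and response sizes.
        long[] times = {0, 10 * minute, 59 * minute, 61 * minute};
        long[] sizes = {100, 200, 300, 400};

        double[] averages = movingAverage(times, sizes, 60 * minute);
        for (int i = 0; i < averages.length; i++) {
            System.out.println((times[i] / minute) + "\t" + averages[i]);
        }
    }
}
```

In a Cascading flow, the same loop would sit inside a Buffer, which receives all the tuples of a group through an iterator and can emit one result tuple per input row. Keeping a running sum and a start index means each row is added and removed at most once, so the work per group stays linear in the number of rows.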