Java Developers Guide to ETL with Cascading

Part 5: Advanced Aggregation: Implementing moving averages

What You Will See

Part 5 is an advanced aggregation exercise that computes rolling averages in an ETL flow. No new Cascading constructs are introduced; rather, this exercise illustrates the design pattern for implementing rolling averages and other trending computations that are common in data applications.
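At its core, the pattern is a sliding window: keep recent observations in a queue, evict the ones that age out of the window, and average whatever remains. Before looking at the Cascading code, here is a minimal, framework-free sketch of that idea in plain Java (the 60-minute window matches this exercise; the class and method names are illustrative):

import java.util.ArrayDeque;
import java.util.Deque;

public class SlidingWindowAverage
  {
  private static final long WINDOW_MILLIS = 60 * 60 * 1000L; // 60 minutes

  private final Deque<long[]> window = new ArrayDeque<long[]>(); // [timestampMillis, value]
  private long sum = 0;

  /** Feed observations in timestamp order; returns the current windowed average. */
  public double observe( long timestampMillis, long value )
    {
    window.addLast( new long[] { timestampMillis, value } );
    sum += value;

    // evict observations that have fallen out of the 60-minute window
    while( window.peekFirst()[ 0 ] < timestampMillis - WINDOW_MILLIS )
      sum -= window.removeFirst()[ 1 ];

    return (double) sum / window.size();
    }
  }

The rest of this exercise shows how the same logic is packaged as a Cascading Buffer so it can run inside a grouped, distributed flow.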

Run and Validate Your Program

Step 1: Compile your program

$ cd etl-log/part5
$ gradle clean jar

Step 2: If you have not already done so in Part 1, copy the log file to Hadoop:

$ hadoop fs -mkdir /logs
$ hadoop fs -put ../data/NASA_access_log_Aug95.txt /logs

Step 3: Run your ETL flow

$ hadoop jar ./build/libs/etl.jar /logs/NASA_access_log_Aug95.txt /output

Step 4: View the execution of your ETL flow in real time through Driven

Depending on how you configured your Driven Plugin, either click the Driven URL from your console or log into the Driven application.

14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

Attached is a live Driven link to the execution of the Part 5 exercise on the Concurrent cloud service.

etl-part-5

Figure 1: An example of the performance view in Driven.

Note

Driven exposes three views of the DAG: Logical, Physical, and Contracted. The Logical View lets you inspect and visualize the Taps, Pipes, and other Cascading constructs that you specified in your code. With the Physical View, you can also inspect intermediate Taps and pipe assemblies embedded in your code. In our case, we can see that the Retain function was used in the subassembly.
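For reference, Retain is a built-in Cascading subassembly that narrows the tuple stream to a named set of fields. A call like the following, which is illustrative rather than the exact code from this exercise (parsedLogPipe and the field names are assumptions), shows up as its own node in the Physical View:

import cascading.pipe.Pipe;
import cascading.pipe.assembly.Retain;
import cascading.tuple.Fields;

// keep only the fields needed downstream; every other field is discarded
Pipe trimmedPipe = new Retain( parsedLogPipe, new Fields( "time", "size" ) );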

Step 5: View the contents of the result file, which contains the moving average of the bandwidth (the "size" field).

$ hadoop fs -cat /output/part-00000

What’s Going On?

Note: The comments in the snippet explain the moving average operation.

// Create a new Each pipe named moving_avgSizeByMinPipe that augments each tuple with the day parsed from the timestamp
Pipe moving_avgSizeByMinPipe = new Each( filterResponsePipe, new Fields( "time" ), new DayForTimestamp(), Fields.ALL );

// Calculate a 60-minute moving average of the bandwidth (the "size" field):
// group by day, then compute the 60-minute moving average over "size" within each group
moving_avgSizeByMinPipe = new GroupBy( moving_avgSizeByMinPipe, new Fields( "day" ) );
moving_avgSizeByMinPipe = new Every( moving_avgSizeByMinPipe, new Fields( "size", "time" ),
                                        new MovingAverageBuffer( new Fields( "min", "moving_average" ) ), Fields.RESULTS );
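
MovingAverageBuffer is the custom Buffer supplied with the exercise code; its source is in the part5 project. As a rough illustration of how such a Buffer can be written, here is a minimal sketch. It assumes the "time" argument is an epoch timestamp in milliseconds, that tuples arrive time-ordered within each group, and that the output "min" field is simply the epoch minute; the real class in the exercise may differ in any of these details:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class MovingAverageBuffer extends BaseOperation implements Buffer
  {
  private static final long WINDOW_MILLIS = 60 * 60 * 1000L; // 60 minutes

  public MovingAverageBuffer( Fields fieldDeclaration )
    {
    super( 2, fieldDeclaration ); // expects two arguments: "size" and "time"
    }

  @Override
  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
    {
    Deque<long[]> window = new ArrayDeque<long[]>(); // [timestampMillis, size]
    long windowSum = 0;

    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    while( arguments.hasNext() )
      {
      TupleEntry entry = arguments.next();
      long size = entry.getLong( "size" );
      long time = entry.getLong( "time" );

      window.addLast( new long[] { time, size } );
      windowSum += size;

      // evict observations older than the 60-minute window
      while( window.peekFirst()[ 0 ] < time - WINDOW_MILLIS )
        windowSum -= window.removeFirst()[ 1 ];

      // emit one ("min", "moving_average") tuple per input tuple
      bufferCall.getOutputCollector().add( new Tuple( time / 60000L, (double) windowSum / window.size() ) );
      }
    }
  }

Because the sketch relies on time order within each group, a real flow would typically request a secondary sort, for example new GroupBy( pipe, new Fields( "day" ), new Fields( "time" ) ), so the Buffer sees each day's tuples in timestamp order.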

Next