Java Developers Guide to ETL with Cascading

Part 3: Merging multiple data sources in an ETL flow

What You Will See

In Part 3 of the tutorial, we automatically merge records from multiple input files (using 'MultiSourceTap'), and then sort them by IP address.

This capability becomes important when you are implementing the following design patterns:

  • Globbing time-series files stored in different directories (as we did in Part 1 of the tutorial)

  • Importing multiple files with different schemas to do a Join (covered in Part 6)

Run and Validate Your Program

Step 1: Compile your program

$ cd etl-log/part3
$ gradle clean jar

Step 2: To run the Cascading ETL flow in Hadoop pseudo-distributed mode, copy the input log files to Hadoop

$ hadoop fs -put ../data/NASA_access_log_Aug95_head_5k.txt /logs
$ hadoop fs -put ../data/NASA_access_log_Aug95_tail_5k.txt /logs

Step 3: Run your ETL flow

$ hadoop jar ./build/libs/etl.jar /logs/NASA_access_log_Aug95_head_5k.txt /logs/NASA_access_log_Aug95_tail_5k.txt /output

Step 4: View the execution of your ETL flow in real-time through Driven

Depending on how you configured your Driven Plugin, either click the Driven URL from your console or log into the Driven application.

14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

Attached is a live Driven link to execute the Part 3 exercise on the Driven cloud service.

etl-part-3

Figure 1: An example of the performance view in Driven.

Step 5: View contents of the merged tab-separated data

Validate that the rows are sorted by IP address (in descending order).

$ hadoop fs -cat /output/part-00000
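
The ordering can also be checked programmatically instead of by eye. The following is a minimal plain-Java sketch, not part of the tutorial's code. The class name SortCheck and the sample lines are hypothetical. It assumes that the IP address (or host name) is the first tab-separated field of each output line, and that the sort is a plain string comparison, so "9.0.0.1" sorts above "10.0.0.1".

```java
import java.util.List;

public class SortCheck {
    // Assumption: the IP address (or host name) is the first tab-separated field.
    static String ipOf(String line) {
        int tab = line.indexOf('\t');
        return tab < 0 ? line : line.substring(0, tab);
    }

    // Returns true when no line has an IP greater than the line before it,
    // i.e. the lines are in descending (or equal) order by IP.
    static boolean isSortedDescending(List<String> lines) {
        for (int i = 1; i < lines.size(); i++) {
            if (ipOf(lines.get(i - 1)).compareTo(ipOf(lines.get(i))) < 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical sample rows standing in for the contents of /output/part-00000.
        List<String> sample = List.of(
            "host-c.example\tGET /index.html",
            "host-b.example\tGET /images/logo.gif",
            "host-a.example\tGET /index.html");
        System.out.println(isSortedDescending(sample) ? "sorted (descending)" : "NOT sorted");
    }
}
```

To check real output, the sample list could be replaced with the lines printed by the hadoop fs -cat command above.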

What’s Going On?

We will only cover the parts in the code that are different from the previous section.

The cascading.tap.hadoop.GlobHfs tap accepts Hadoop-style "file globbing" expression patterns. This allows multiple paths to be used as a single source, where all paths match the given pattern. This tap is only available when running on the Hadoop platform.

GlobHfs inTap1 = new GlobHfs( new TextLine(), inputPath1 );
GlobHfs inTap2 = new GlobHfs( new TextLine(), inputPath2 );
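
To illustrate how a single glob pattern can select several files, here is a small plain-Java sketch. It uses the glob matcher from java.nio.file as a stand-in: this is not Hadoop's glob implementation, and the two syntaxes may differ in details, but the * wildcard behaves the same way for a simple case like this. The class name GlobSketch and the pattern NASA_access_log_Aug95_*_5k.txt are assumptions made for illustration. The tutorial itself passes the two paths explicitly.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class GlobSketch {
    // Returns the file names that match the glob pattern, in input order.
    // Note: java.nio glob matching is used here only as an analogue of Hadoop globbing.
    static List<String> matching(String glob, List<String> fileNames) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        List<String> matched = new ArrayList<>();
        for (String name : fileNames) {
            if (matcher.matches(Paths.get(name))) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> names = List.of(
            "NASA_access_log_Aug95_head_5k.txt",
            "NASA_access_log_Aug95_tail_5k.txt",
            "README.txt"); // hypothetical extra file that should not match
        // Hypothetical pattern covering both tutorial input files.
        System.out.println(matching("NASA_access_log_Aug95_*_5k.txt", names));
    }
}
```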

The cascading.tap.MultiSourceTap is used to tie multiple tap instances into a single tap for use as an input source. All of the tap instances passed to the new MultiSourceTap must share the same schema.

MultiSourceTap sourceTap = new MultiSourceTap( inTap1, inTap2 );

GroupBy groups the stream on the provided fields and orders the output by those fields. The last boolean parameter indicates whether the sort order is reversed; passing true, as below, produces descending order.

//Sort them by IP address
processPipe = new GroupBy( processPipe, new Fields( "ip" ), true );
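
The combined effect of the MultiSourceTap and the reverse-ordered GroupBy can be pictured with a plain-Java analogue. This is only a sketch of the result, not of how Cascading executes the flow on Hadoop. The class name MergeSortSketch and the sample records are hypothetical, and it assumes that the first whitespace-separated token of each record is the IP address and that comparison is plain string comparison.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class MergeSortSketch {
    // Assumption: the first whitespace-separated token of a record is its IP address.
    static String ipOf(String record) {
        return record.split("\\s+", 2)[0];
    }

    // Concatenates both sources (the role of MultiSourceTap), then orders the
    // combined records by IP in reverse natural order (the role of GroupBy with true).
    static List<String> mergeAndSort(List<String> source1, List<String> source2) {
        List<String> merged = new ArrayList<>(source1);
        merged.addAll(source2);
        merged.sort(Comparator.comparing(MergeSortSketch::ipOf, Comparator.reverseOrder()));
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical records standing in for the head and tail log files.
        List<String> head = List.of("host-b.example GET /a", "host-a.example GET /b");
        List<String> tail = List.of("host-c.example GET /c");
        for (String record : mergeAndSort(head, tail)) {
            System.out.println(record);
        }
    }
}
```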

References

For more details about the particular operations or to understand how some of these steps can be modified for your use case, use the following resources:
