Java Developers Guide to ETL with Cascading
Part 3: Merging multiple data sources in an ETL flow
What You Will See
In Part 3 of the tutorial, we will automatically merge records from multiple input files (using 'MultiSourceTap') and then sort them by IP address.
This capability becomes important when you are implementing the following design patterns:
- Globbing time-series files stored in different directories (as we did in Part 1 of the tutorial)
- Importing multiple files with different schemas to do a Join (covered in Part 6)
Run and Validate Your Program
Step 1: Compile your program
$ cd etl-log/part3
$ gradle clean jar
Step 2: To run the Cascading ETL flow in Hadoop pseudo-distributed mode, copy the two input log files to Hadoop
$ hadoop fs -put ../data/NASA_access_log_Aug95_head_5k.txt /logs
$ hadoop fs -put ../data/NASA_access_log_Aug95_tail_5k.txt /logs
Step 3: Run your ETL flow
$ hadoop jar ./build/libs/etl.jar /logs/NASA_access_log_Aug95_head_5k.txt /logs/NASA_access_log_Aug95_tail_5k.txt /output
Step 4: View the execution of your ETL flow in real-time through Driven
Depending on how you configured your Driven Plugin, either click the Driven URL from your console or log into the Driven application.
14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON
Attached is a live Driven link for running the Part 3 exercise on the Driven cloud service.
Figure 1: An example of the performance view in Driven.
Step 5: View contents of the merged tab-separated data
Validate that the rows are sorted by IP address (in descending order).
$ hadoop fs -cat /output/part-00000
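Checking the order by eye is tedious for thousands of rows. The following is a minimal sketch of an automated check in plain Java, not part of the tutorial code. It assumes you have saved the output to a local file and that you know which tab-separated column holds the IP address; the class name `SortedOutputCheck` and the column index are hypothetical. The comparison is a plain String comparison, so "descending" here means descending lexicographic order.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the tutorial code.
class SortedOutputCheck {

    // True if the given tab-separated column never increases from one line
    // to the next (String comparison, so the order is lexicographic).
    static boolean isSortedDescending(List<String> lines, int column) {
        String previous = null;
        for (String line : lines) {
            String key = line.split("\t", -1)[column];
            if (previous != null && previous.compareTo(key) < 0) {
                return false;
            }
            previous = key;
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Assumed usage: <local copy of part-00000> <column index of the IP field>.
        // Without arguments, a small built-in sample is checked instead.
        List<String> lines = args.length == 2
            ? Files.readAllLines(Paths.get(args[0]))
            : Arrays.asList("9.0.0.1\tGET", "199.72.81.55\tGET", "10.0.0.1\tGET");
        int column = args.length == 2 ? Integer.parseInt(args[1]) : 0;
        System.out.println(isSortedDescending(lines, column));
    }
}
```

Equal neighbouring values are accepted, since many log rows share the same address.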
What’s Going On?
We will only cover the parts of the code that differ from the previous part of the tutorial.
The cascading.tap.hadoop.GlobHfs tap accepts Hadoop-style "file globbing" expression patterns. This allows multiple paths to be used as a single source, where all paths match the given pattern. This tap is only available when running on the Hadoop platform.
GlobHfs inTap1 = new GlobHfs( new TextLine(), inputPath1 );
GlobHfs inTap2 = new GlobHfs( new TextLine(), inputPath2 );
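To get a feel for how a glob pattern selects several files at once, here is a minimal sketch that uses the JDK's own glob matcher. It is an illustration only: it does not use GlobHfs, and Hadoop's glob syntax may differ in details from the JDK's. The class name `GlobSketch` and the pattern are hypothetical.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-JDK illustration of glob matching; this is NOT how GlobHfs is implemented.
class GlobSketch {

    // Returns the file names that match the glob pattern, in input order.
    static List<String> matching(String glob, List<String> fileNames) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        List<String> matched = new ArrayList<>();
        for (String name : fileNames) {
            if (matcher.matches(Paths.get(name))) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
            "NASA_access_log_Aug95_head_5k.txt",
            "NASA_access_log_Aug95_tail_5k.txt",
            "README.md");
        // The "*" stands for any run of characters within a single file name.
        System.out.println(matching("NASA_access_log_Aug95_*_5k.txt", names));
    }
}
```

With a pattern like this, one tap could cover both log files. The tutorial instead creates one tap per input path so that the taps can be combined explicitly.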
The cascading.tap.MultiSourceTap is used to tie multiple tap instances into a single tap for use as an input source. All of the tap instances passed to the new MultiSourceTap must share the same schema.
MultiSourceTap sourceTap = new MultiSourceTap( inTap1, inTap2 );
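Conceptually, the combined tap behaves like one source that yields the rows of every underlying source. The sketch below is a plain-Java analogy under that assumption, not the MultiSourceTap implementation; the `Source` class, the `readAll` method, and the schema check are hypothetical stand-ins that only illustrate the same-schema rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-JDK analogy for reading several sources as one; NOT Cascading code.
class MultiSourceSketch {

    // Hypothetical stand-in for a tap: a field list (the schema) plus its rows.
    static final class Source {
        final List<String> fields;
        final List<String> rows;

        Source(List<String> fields, List<String> rows) {
            this.fields = fields;
            this.rows = rows;
        }
    }

    // Concatenates the rows of all sources, rejecting any source whose
    // field list differs from that of the first source.
    static List<String> readAll(Source... sources) {
        List<String> merged = new ArrayList<>();
        for (Source source : sources) {
            if (!source.fields.equals(sources[0].fields)) {
                throw new IllegalArgumentException("all sources must share the same schema");
            }
            merged.addAll(source.rows);
        }
        return merged;
    }

    public static void main(String[] args) {
        Source head = new Source(Arrays.asList("line"), Arrays.asList("row-a", "row-b"));
        Source tail = new Source(Arrays.asList("line"), Arrays.asList("row-c"));
        System.out.println(readAll(head, tail));
    }
}
```

The sketch keeps the sources in argument order for simplicity. In a real Hadoop run, the relative order of rows coming from different inputs is generally not something to rely on, which is one reason the flow sorts the merged stream afterwards.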
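GroupBy groups the tuple stream on the provided fields, which also sorts the stream by those fields. When it is given several pipes, it merges them first. The final boolean parameter is the reverse-order flag: passing true sorts the grouping fields in descending order, which is why the output in Step 5 is in descending order.
// Sort by IP address; the final argument (true) reverses the sort order
processPipe = new GroupBy( processPipe, new Fields( "ip" ), true );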
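One consequence worth knowing before you inspect the output: if the "ip" field holds Strings compared in their natural order (an assumption about this flow, since no custom comparator is shown), the descending order is lexicographic rather than numeric. The following plain-Java sketch, which does not use Cascading, shows what that ordering looks like; the class name `IpSortSketch` and the sample addresses are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-JDK illustration of a descending String sort; NOT Cascading code.
class IpSortSketch {

    // Returns a copy of the addresses in descending natural (lexicographic) order.
    static List<String> sortDescending(List<String> ips) {
        List<String> sorted = new ArrayList<>(ips);
        sorted.sort(Comparator.<String>reverseOrder());
        return sorted;
    }

    public static void main(String[] args) {
        List<String> ips = Arrays.asList("10.0.0.1", "9.0.0.1", "199.72.81.55");
        // Prints [9.0.0.1, 199.72.81.55, 10.0.0.1]: "9" sorts above "1" as a character,
        // even though 9 is the smallest first octet numerically.
        System.out.println(sortDescending(ips));
    }
}
```

If numeric ordering of addresses matters for your use case, the values would need to be compared octet by octet instead of as whole strings.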
References
For more details about the particular operations or to understand how some of these steps can be modified for your use case, use the following resources:
Sorting using GroupBy and CoGroup - http://docs.cascading.org/cascading/3.0/userguide/ch05-pipe-assemblies.html#_groupby