Java Developers Guide to ETL with Cascading

Part 1: Simple file copy with partitioning

What You Will See

In Part 1 of the tutorial, we will cover several basic but vital operations involved in an ETL flow:

  • Extracting log (unstructured) data and creating tuples for processing in the Cascading flow

  • Filtering unwanted log data with error codes

  • Storing processed data in a tab-delimited format, partitioned by day (binning)

For the purposes of this tutorial, we will use a sample Apache log file from the NASA archives.

Run and Validate Your Program

Step 1: Compile your program

$ cd etl-log/part1
$ gradle clean jar

Step 2: To run the Cascading ETL flow in Hadoop pseudo-distributed mode, copy the input log file to Hadoop

$ hadoop dfs -mkdir /logs
$ hadoop dfs -put ../data/NASA_access_log_Aug95.txt /logs

Step 3: Run your ETL flow

$ hadoop jar ./build/libs/etl.jar /logs/NASA_access_log_Aug95.txt /output

Step 4: View the execution of your ETL flow in real time through Driven

Depending on how you configured your Driven plugin, either click the Driven URL from your console or log into the Driven application.

14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

Attached is a live Driven link to execute the Part 1 exercise on the Driven cloud service.

etl-part-1

Figure 1: An example of the performance view in Driven with details of a tuple.

We will get additional insights in later parts as we create more complex applications. The screenshot shows the two key components of the application developer view. The top half helps you visualize the graph associated with your application, showing all the dependencies between the different Cascading steps and flows. Clicking on the two taps (green circles) gives you additional attribute information, including a reference to the source code where the Tap was defined.

The bottom half of the screen contains the 'Timeline View', which gives details associated with each flow run. You can click 'Add Columns' to explore other counters too. As your applications get more complex, these counters will help you determine whether a particular run-time behavior is caused by the code, the infrastructure, or the network.

Step 5: Validate that the output data is stored in tab-separated format (binned by day)

$ hadoop dfs -ls /output

Step 6: View contents of the tab-separated data

$ hadoop dfs -cat /output/1/part-00000-00000

Step 7: Validate that the contents trapped include log messages with error codes

$ hadoop dfs -cat /output/trap/part-m-00001-00000

What’s Going On?

In the first part, we perform the following tasks in our Cascading application:

Step 1: Initialize the application

Hadoop flow connectors enable the flow to run on Hadoop. The cascading.flow.hadoop.HadoopFlowConnector provides a planner for running Cascading on an Apache Hadoop 1 cluster; cascading.flow.hadoop2.Hadoop2MR1FlowConnector provides a planner for running Cascading on an Apache Hadoop 2 cluster.

Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

Step 2: Set up sources and sinks (Cascading Taps)

A Tap models the integration of a Cascading application with a data source or sink. Taps give your ETL flow the ability to read data from, or write data to, multiple external systems. A Tap could represent a file on a local file system or, as in our tutorial, a file on the Hadoop Distributed File System (HDFS). In addition, Cascading provides four utility taps: MultiSourceTap, MultiSinkTap, PartitionTap, and GlobHfs.

  • The cascading.tap.MultiSourceTap is used to tie multiple tap instances into a single tap for use as an input source.

  • The cascading.tap.MultiSinkTap is used to tie multiple tap instances into a single tap for use as output sinks.

  • The cascading.tap.hadoop.PartitionTap is used to sink tuples into directory paths based on the values in the Tuple.

  • The cascading.tap.hadoop.GlobHfs tap accepts Hadoop style "file globbing" expression patterns. This allows for multiple paths to be used as a single source, where all paths match the given pattern.

Data taps are also available for integrating Cascading with several other data sources; the full list is available at http://www.cascading.org/extensions/

// Input file
String inputPath = args[ 0 ];

// Output file
String outputPath = args[ 1 ];
Hfs inTap = new Hfs( new TextLine(), inputPath );

// Create a sink tap to write to the Hfs; by default, TextDelimited writes all fields out
Hfs outTap = new Hfs( new TextDelimited( true, "\t" ), outputPath, SinkMode.REPLACE );

Step 3: Parse input data and store them in fields

Pipes define the processing of the data and control the flow of data by applying operations to each tuple or to groups of tuples. You can chain pipes to form a branch.

Each() operates on each individual tuple. Using the Each() pipe, you can perform operations on individual tuples such as:

  • Parse lines from a logfile into their constituent fields.

  • Filter out all lines based on a line value.

  • Replace timestring fields with date fields.

  • Conditionally replace field values.

  • Remove tuples that have values outside a target range.

  • Specify a list of fields to output, thereby removing unwanted fields from a stream.

// Declare the field names used to parse out of the log file
Fields apacheFields = new Fields( "ip", "time", "request", "response", "size" );

// Define the regular expression used to parse the log file
String apacheRegex = "^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$";

// Declare the groups from the above regex. Each group will be given a field name from 'apacheFields'
int[] allGroups = {1, 2, 3, 4, 5};

// Create the parser
RegexParser parser = new RegexParser( apacheFields, apacheRegex, allGroups );

// Create the main import pipe element; the parser takes the "line" field as its input argument
Pipe processPipe = new Each( "processPipe", new Fields( "line" ), parser, Fields.RESULTS );
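To see what the regular expression above captures, the following is a minimal sketch in plain Java (no Cascading classes) that applies the same expression to a single line and names the groups after the entries in apacheFields. The class name ApacheLogLineSketch, the parse helper, and the sample line are illustrative assumptions rather than part of the tutorial code, and returning null for a non-matching line is a choice made for this sketch only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the tutorial sources
public class ApacheLogLineSketch {
    // Same expression as apacheRegex in the tutorial code
    static final Pattern APACHE = Pattern.compile(
        "^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$");

    // Same names as apacheFields; group i + 1 is stored under FIELD_NAMES[i]
    static final String[] FIELD_NAMES = {"ip", "time", "request", "response", "size"};

    // Returns field name -> captured value, or null when the line does not match
    static Map<String, String> parse(String line) {
        Matcher matcher = APACHE.matcher(line);
        if (!matcher.matches()) {
            return null;
        }
        Map<String, String> tuple = new LinkedHashMap<>();
        for (int i = 0; i < FIELD_NAMES.length; i++) {
            tuple.put(FIELD_NAMES[i], matcher.group(i + 1));
        }
        return tuple;
    }

    public static void main(String[] args) {
        // Illustrative sample line in the Apache access log layout (an assumption)
        String line = "in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "
            + "\"GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0\" 200 1839";
        parse(line).forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

Running the sketch against a few lines of your own log file is a quick way to check the expression before submitting the flow to Hadoop.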

Step 4: Trap records based on specific field values

Traps are useful for capturing data that would otherwise have caused an operation to fail by throwing an exception. Failure traps allow processing to continue without losing track of the data that caused the fault. They are similar to tap sinks in that they allow data to be stored. Traps only capture data that causes an Operation to fail (i.e. throws an Exception).

Traps are for exceptional cases, in the same way that Java Exception handling is used.

Since traps are expensive, applications that need to filter bad data should do so explicitly using filters. We will cover how to use filters in Part 2.

// Assertion that fails for 404 responses; tuples that fail it are diverted to the trap
AssertExpression assertExp = new AssertExpression( "response != 404", Long.class );
processPipe = new Each( processPipe, AssertionLevel.VALID, assertExp );
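The trap behavior described above can be pictured with a small plain-Java sketch: a check throws for a 404 response, and the loop that applies the check stores the offending record in a separate list instead of stopping. This is only a conceptual model under stated assumptions; the names TrapSketch, assertNot404, and process are hypothetical, and the sketch does not reproduce how Cascading implements traps internally.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual model of a failure trap; not Cascading code
public class TrapSketch {
    // Stand-in for the assertion "response != 404": throws when the check fails.
    // Each record is {request, response}, an assumed layout for this sketch.
    static void assertNot404(String[] record) {
        long response = Long.parseLong(record[1]);
        if (response == 404) {
            throw new IllegalStateException("assertion failed: response != 404");
        }
    }

    // Records that pass the check go to 'sink'; records whose check throws go to 'trap'
    static void process(List<String[]> records, List<String[]> sink, List<String[]> trap) {
        for (String[] record : records) {
            try {
                assertNot404(record);
                sink.add(record);
            } catch (RuntimeException e) {
                // Keep the offending record and continue with the next one
                trap.add(record);
            }
        }
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"/index.html", "200"});
        records.add(new String[] {"/missing.gif", "404"});
        records.add(new String[] {"/images/logo.gif", "304"});

        List<String[]> sink = new ArrayList<>();
        List<String[]> trap = new ArrayList<>();
        process(records, sink, trap);

        System.out.println("sink=" + sink.size() + " trap=" + trap.size()); // prints "sink=2 trap=1"
    }
}
```

The try/catch around every record hints at why an explicit filter is the better choice when bad records are common rather than exceptional.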

Step 5: Store data (partitioned by day)

To store data partitioned by day, we first need to parse the time field into a timestamp and derive the day from it.

// Apply a text parser that converts the "time" string into a timestamp, replacing the original value
DateParser dateParser = new DateParser( new Fields( "time" ), "dd/MMM/yyyy:HH:mm:ss Z" );

processPipe = new Each( processPipe, new Fields( "time" ), dateParser, Fields.REPLACE );

// Augment the tuple with day for time
processPipe = new Each( processPipe, new Fields( "time" ), new DayForTimestamp(), Fields.ALL );
processPipe = new GroupBy( processPipe, new Fields( "day" ) );

// Create a TextDelimited scheme
TextDelimited scheme = new TextDelimited( new Fields( "day", "ip", "time", "request", "size" ), true, "\t" );

// Create DelimitedPartition instance used to partition the files based on Fields("day")
DelimitedPartition partition = new DelimitedPartition( new Fields( "day" ), "-" );

// Create the Partition Tap
Tap daysTap = new PartitionTap( outTap, partition, SinkMode.REPLACE );
Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), outputPath + "/trap", SinkMode.REPLACE );
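The chain from time string to output directory can be sketched in plain Java as follows. The sketch assumes that the day is the day of the month (consistent with the /output/1 directory listed earlier), evaluates it in UTC so the result is repeatable, and models the partition directory as the partition values joined by the delimiter. The class DayPartitionSketch and its methods are hypothetical names for illustration; the tutorial's DayForTimestamp function and Cascading's DelimitedPartition may differ in detail, for example in the time zone they use.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper for illustration; not part of the tutorial sources
public class DayPartitionSketch {
    // Parse the Apache "time" field into epoch milliseconds, using the tutorial's pattern
    static long toTimestamp(String time) {
        SimpleDateFormat format = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
        try {
            return format.parse(time).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable time: " + time, e);
        }
    }

    // Assumption: the day is the day of month, evaluated in UTC for repeatability
    static int dayOfMonth(long timestamp) {
        Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ENGLISH);
        calendar.setTimeInMillis(timestamp);
        return calendar.get(Calendar.DAY_OF_MONTH);
    }

    // Join the partition values with the delimiter to form a relative directory name
    static String partitionDir(String delimiter, Object... values) {
        StringBuilder dir = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                dir.append(delimiter);
            }
            dir.append(values[i]);
        }
        return dir.toString();
    }

    public static void main(String[] args) {
        long timestamp = toTimestamp("01/Aug/1995:00:00:01 -0400");
        int day = dayOfMonth(timestamp);
        System.out.println("timestamp = " + timestamp);
        System.out.println("day = " + day);
        System.out.println("directory = " + partitionDir("-", day));
    }
}
```

With a single partition field the delimiter never appears in the directory name; it only matters when you partition on more than one field.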

Step 6: Create a flow (unit of execution) by connecting taps to pipes

FlowDef is a “fluent way” to define a Flow. A FlowDef manages the names and taps that must be passed to a FlowConnector.

FlowDef flowDef = FlowDef.flowDef()
    .setName( "part1" )
    .addSource( processPipe, inTap )
    .addTailSink( processPipe, daysTap )
    .addTrap( "processPipe", trapTap );

// Set the assertion level before connecting, so the planner sees it
flowDef.setAssertionLevel( AssertionLevel.VALID );

// Plan the flow
Flow wcFlow = flowConnector.connect( flowDef );

// Run the flow and wait for it to finish
wcFlow.complete();

References

For more details about the particular operations or to understand how some of these steps can be modified for your use case, use the following resources:
