= Data Processing on Amazon Web Services (AWS)

== Part 2: Simple ETL using Lingual JDBC with S3, EMR and Redshift

=== What You Will See

Part 2 of this tutorial has two components: a shell script that manages compilation, the required data transfers and EMR cluster creation, and a Java application (part2/src/main/java/cascading/aws/SampleFlow.java) that is executed on the EMR cluster.

==== emrExample.sh

When you call the script cascading-aws/part2/src/scripts/emrExample.sh with the required arguments, the following will take place:

  1. clean and compile part 2 of the tutorial

  2. use the AWS CLI to create a new S3 bucket, or delete its contents if the bucket already exists

  3. use the AWS CLI to push three data files and the compiled jar file to the S3 bucket

  4. use the AWS CLI to create an EMR cluster; while creating the cluster, a bootstrap action also installs the Cascading SDK, which includes Cascading Lingual

==== SampleFlow.java

When the compiled jar file is executed on the EMR cluster, the following will occur:

  1. Source the data files from the S3 bucket

  2. Strip extraneous fields from the data using Retain (a Cascading SubAssembly)

  3. Aggregate and join the data using SQL via Cascading Lingual-JDBC

  4. Sink results to S3 and Redshift.

NOTE: Cascading’s RedshiftTap has a constructor argument "boolean useDirectInsert". When true, the tap loads data into Redshift with traditional INSERT statements; when false, it uses Redshift’s COPY command. Part 2 demonstrates direct insert, and part 3 uses the COPY command (a sketch of the COPY variant appears after the sink-tap code in the walkthrough below).

=== Run and Validate Your Program

Step 1: If you have not done so already, please refer to the prerequisites section for environment setup.

Step 2: Call the script from the tutorials/cascading-aws home directory.

$ cd ./[PATH]/[TO]/cascading-tutorials/cascading-aws/
$ part2/src/scripts/emrExample.sh [REDSHIFT_URL] [REDSHIFT_USER] [REDSHIFT_PASSWORD] [S3_BUCKET] \
    [AWS_ACCESS_KEY] [AWS_SECRET_KEY]

The script logs all calls and output to the console. When it is complete, you should see the following:

{
    "ClusterId": "[CLUSTER_ID]"
}

Step 3: Inspect AWS S3 for the bucket you specified when calling the emrExample.sh script. Ensure that the bucket exists and that it contains one jar file and three .dat files. The EMR cluster will also write its log files to [BUCKET]/logs.

Step 4: Inspect your AWS EMR console for the new cluster. Here you can monitor the progress of each step and gather other valuable information. If there are any errors during execution, you will find them by selecting the cluster in question and then selecting "Steps", where you can view the logs for any failed jobs.

Step 5: View your new table (part2_results) in Redshift using SQL Workbench/J.

If the task completes successfully, you will have a new table in Redshift, "part2_results", and three new folders in your s3TempDir containing part-XXXX files from the MapReduce tasks.
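
If you prefer to check from code rather than SQL Workbench/J, a quick JDBC query will confirm that the table was loaded. The snippet below is a sketch, not part of the tutorial source: it assumes the Redshift (or PostgreSQL) JDBC driver is on your classpath and reuses the same JDBC URL, user and password you passed to emrExample.sh; the column names match the RedshiftTableDesc created in SampleFlow.java.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifyPart2Results
  {
  public static void main( String[] args ) throws Exception
    {
    // args: [REDSHIFT_URL] [REDSHIFT_USER] [REDSHIFT_PASSWORD] -- the same values passed to emrExample.sh
    try( Connection connection = DriverManager.getConnection( args[ 0 ], args[ 1 ], args[ 2 ] );
         Statement statement = connection.createStatement();
         ResultSet results = statement.executeQuery(
           "select category, day_name, sales_count from part2_results order by sales_count desc limit 10" ) )
      {
      // print the ten (category, day name) combinations with the highest sales counts
      while( results.next() )
        System.out.println( results.getString( "category" ) + "\t"
          + results.getString( "day_name" ) + "\t" + results.getInt( "sales_count" ) );
      }
    }
  }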

You can also use this live link to view the application in Driven.


=== What’s Going On?

==== emrExample.sh

First, let’s take a closer look at the shell script (./cascading-aws/part2/src/scripts/emrExample.sh). The primary points of interest include:

Cleaning and compiling the program:

# clean and build part 2 application
gradle :part2:clean :part2:jar

AWS CLI commands to create or empty S3 bucket and transfer required files:

# create the bucket or delete the contents if it exists
aws s3 mb s3://$BUCKET || aws s3 rm s3://$BUCKET --recursive

# create tmp bucket or delete the contents if it exists
aws s3 mb s3://$BUCKET/tmp || aws s3 rm s3://$BUCKET/tmp --recursive

# push required data files to S3
aws s3 cp data/date_dim.dat s3://$BUCKET/
aws s3 cp data/store_sales.dat s3://$BUCKET/
aws s3 cp data/item.dat s3://$BUCKET/

# push built jar file to S3
aws s3 cp $BUILD/$NAME s3://$BUCKET/$NAME

The AWS CLI command to create an EMR cluster provides all the information needed to run the application we just compiled and pushed to S3. Notice the bootstrap action that installs the Cascading SDK:

INSTALL_SDK_URL="s3://files.cascading.org/sdk/2.6/install-cascading-sdk.sh"

aws emr create-cluster \
  --ami-version 3.3.1 \
  --instance-type $INSTANCE_TYPE \
  --instance-count $INSTANCE_COUNT \
  --bootstrap-actions Path=$INSTALL_SDK_URL,Name=SDK_LINGUAL_BOOTSTRAP \
  --name "cascading-aws-tutorial-2" \
  --visible-to-all-users \
  --enable-debugging \
  --auto-terminate \
  --no-termination-protected \
  --log-uri s3n://$BUCKET/logs/ \
  --steps Type=CUSTOM_JAR,Name=Part2,ActionOnFailure=TERMINATE_CLUSTER,Jar=s3n://$BUCKET/$NAME,Args=$REDSHIFT_URL,$REDSHIFT_USER,$REDSHIFT_PASSWORD,$AWS_ACCESS_KEY,$AWS_SECRET_KEY,$BUCKET

==== SampleFlow.java

Now let’s take a look at the Java application (./cascading-aws/part2/src/main/java/cascading/aws/SampleFlow.java). We begin by reading our command-line arguments.

String hfsDataDir = args[0];
String s3ResultsDir = args[1];
String redshiftJdbcUrl = args[2];
String redshiftUsername = args[3];
String redshiftPassword  = args[4];
String accessKey = args[5];
String secretKey = args[6];

After formatting a few variables, initializing our application, and instantiating our AWSCredentials object, we need to declare the field names and types for the incoming data.

Fields DATE_DIM_FIELDS = new Fields( "d_date_sk", "d_date_id", "d_date", ... , "d_trailing_field" );
Class[] DATE_DIM_TABLE_TYPES = new Class[]{Integer.class, String.class, ... , String.class};
...
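
The AWSCredentials object mentioned above is not shown in this excerpt. A minimal sketch, assuming the (accessKey, secretKey) constructor provided by the cascading-jdbc-redshift library, would be:

// Sketch only: build the credentials object handed to the RedshiftTap further below.
// Assumes cascading-jdbc-redshift's AWSCredentials exposes an (accessKey, secretKey) constructor.
AWSCredentials awsCredentials = new AWSCredentials( accessKey, secretKey );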

Then we define the SQL statement that we will use to aggregate and join the data.

String statement = ("select count(store_sales.\"ss_item_sk\") as sales_count, items.\"i_category\" as category, dates.\"d_day_name\" " +
"from \"example\".\"dates\" as dates " +
"join \"example\".\"store_sales\" as store_sales on dates.\"d_date_sk\" = store_sales.\"ss_sold_date_sk\" " +
"join \"example\".\"items\" as items on items.\"i_item_sk\" = store_sales.\"ss_item_sk\" " +
"where items.\"i_category\" is not null " +
"group by items.\"i_category\", dates.\"d_day_name\" order by count(store_sales.\"ss_item_sk\") desc ");

Since we are only interested in a few of the fields in each data file, let’s filter out the unnecessary ones to expedite processing.

//we only want these two fields from the dates file
Fields retainDates = new Fields( "d_day_name", "d_date_sk" );
//we only want these two fields from sales file
Fields retainSales = new Fields( "ss_item_sk", "ss_sold_date_sk" );
//we only want these two fields from items file
Fields retainItems = new Fields( "i_category", "i_item_sk" );

Pipe retainDatesPipe = new Pipe( "retainDates" );
retainDatesPipe = new Retain( retainDatesPipe, retainDates );

Pipe retainSalesPipe = new Pipe( "retainStoreSales" );
retainSalesPipe = new Retain( retainSalesPipe, retainSales );

Pipe retainItemsPipe = new Pipe( "retainItems" );
retainItemsPipe = new Retain( retainItemsPipe, retainItems );

Now that we’re working with our desired data set let’s instantiate our source and sink Taps.

// source taps
Tap datesDataTap = new Hfs( new TextDelimited( DATE_DIM_FIELDS, "|",
  DATE_DIM_TABLE_TYPES ), hfsDataDir + "/date_dim.dat" );
Tap salesDataTap = new Hfs( new TextDelimited( STORE_SALES_FIELDS, "|",
  STORE_SALES_TABLE_TYPES ), hfsDataDir + "/store_sales.dat" );
Tap itemsDataTap = new Hfs( new TextDelimited( ITEM_FIELDS, "|",
  ITEM_FIELDS_TYPES ), hfsDataDir + "/item.dat" );

// sink taps
Tap resultsDatesTap = new Hfs( new TextDelimited( new Fields( "d_day_name", "d_date_sk" ) ),
  "s3n://" + accessKey + ":" + secretKey + "@" + s3ResultsDir + "/dates", SinkMode.REPLACE );
Tap resultsItemsTap = new Hfs( new TextDelimited( new Fields( "i_category", "i_item_sk" ) ),
  "s3n://" + accessKey + ":" + secretKey + "@" + s3ResultsDir + "/items", SinkMode.REPLACE );
Tap resultsSalesTap = new Hfs( new TextDelimited( new Fields( "ss_item_sk", "ss_sold_date_sk" ) ),
  "s3n://" + accessKey + ":" + secretKey + "@" + s3ResultsDir +  "/sales", SinkMode.REPLACE );

// define result fields
Fields resultsFields = new Fields( "$0", "$1", "$2" ).applyTypes( Long.class, String.class, String.class );
// create RedshiftTableDesc for Redshift Table
RedshiftTableDesc resultsTapDesc = new RedshiftTableDesc( "part2_results", new String[]{"sales_count", "category", "day_name"}, new String[]{"int", "varchar(100)", "varchar(100)"}, null, null );
// create Redshift output final tap
Tap resultsTap = new RedshiftTap( redshiftJdbcUrl, redshiftUsername, redshiftPassword, "s3://" + s3ResultsDir + "/part2-tmp", awsCredentials, resultsTapDesc, new RedshiftScheme( resultsFields, resultsTapDesc ), SinkMode.REPLACE, true, true );
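
The note near the top of this tutorial describes the useDirectInsert constructor argument; in the call above it is one of the two trailing booleans (assumed here to be the last one). For comparison, a minimal sketch of the COPY-based variant used in part 3 could look like the following, reusing the variables defined above:

// Sketch only (not part of SampleFlow.java): the same constructor as above, but with the last
// boolean (assumed to be useDirectInsert) set to false, so the tap stages the data under the
// S3 temp path and loads it into Redshift with a single COPY command instead of per-row INSERTs.
Tap copyResultsTap = new RedshiftTap( redshiftJdbcUrl, redshiftUsername, redshiftPassword,
  "s3://" + s3ResultsDir + "/part2-tmp", awsCredentials, resultsTapDesc,
  new RedshiftScheme( resultsFields, resultsTapDesc ), SinkMode.REPLACE, true, false );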

With our Pipes and Taps in hand we can now create our Flow definitions.

FlowDef flowDefSales = FlowDef.flowDef().setName( "retain sales info flow" )
  .addSource( retainSalesPipe, salesDataTap )
  .addTailSink( retainSalesPipe, resultsSalesTap );

FlowDef flowDefItems = FlowDef.flowDef().setName( "retain items info flow" )
  .addSource( retainItemsPipe, itemsDataTap )
  .addTailSink( retainItemsPipe, resultsItemsTap );

FlowDef flowDefDates = FlowDef.flowDef().setName( "retain dates info flow" )
  .addSource( retainDatesPipe, datesDataTap )
  .addTailSink( retainDatesPipe, resultsDatesTap );

// Final flow that sources from the three previous flows. Notice here how we define our schema.table names
// as sources (ie, "example.store_sales") that are used by the SQL query.
FlowDef flowDef = FlowDef.flowDef().setName( "sql flow" )
  .addSource( "example.store_sales", resultsSalesTap )      //declares SQL table name "example.store_sales"
  .addSource( "example.items", resultsItemsTap )            //declares SQL table name "example.items"
  .addSource( "example.dates", resultsDatesTap )            //declares SQL table name "example.dates"
  .addSink( "part2_results", resultsTap );

// Add SQLPlanner to final flow def
SQLPlanner sqlPlanner = new SQLPlanner().setSql( statement );
flowDef.addAssemblyPlanner( sqlPlanner );

All that’s left to do now is connect our flows and run them in a Cascade.

Flow flow1 = new HadoopFlowConnector().connect( flowDefSales );
Flow flow2 = new HadoopFlowConnector().connect( flowDefItems );
Flow flow3 = new HadoopFlowConnector().connect( flowDefDates );
Flow flow4 = new HadoopFlowConnector().connect( flowDef );

List<Flow> queryFlows = new ArrayList<Flow>();
queryFlows.add( flow1 );
queryFlows.add( flow2 );
queryFlows.add( flow3 );
queryFlows.add( flow4 );

CascadeConnector connector = new CascadeConnector();
Cascade cascade = connector.connect( queryFlows.toArray( new Flow[ 0 ] ) );
cascade.complete();

=== Reference for Advanced AWS and Cascading Users

Users who are already familiar with Redshift, Cascading and Lingual can add the compiled
`cascading-redshift` library to their existing projects. The libraries are hosted on
http://conjars.org[conjars.org] and can be included in an existing Maven or Gradle project by
adding the conjars repo `http://conjars.org/repo/` to your repo list and then adding either:

Maven:

<dependency>
  <groupId>cascading</groupId>
  <artifactId>cascading-jdbc-redshift</artifactId>
  <version>2.6.1</version>
</dependency>

Gradle:

`compile group: 'cascading', name: 'cascading-redshift', version: '2.7.0'`

Congratulations, you just ran SQL on Hadoop using Lingual-JDBC and Cascading!

=== References
. Cascading SDK - http://www.cascading.org/sdk/
. Lingual home page - http://www.cascading.org/projects/lingual/
. AWS Command Line Interface - http://aws.amazon.com/cli/
. Cascading SubAssemblies - http://docs.cascading.org/cascading/3.0/userguide/ch15-advanced.html#subassemblies

== Next:
=== Part 3
link:part3.html[ETL on EMR with Cascading using S3 and Redshift]