Integrating Cascading with Teradata

Introduction

In this tutorial, you will use Cascading to move data from HDFS to a Teradata instance, and then write the contents of the Teradata table back to HDFS. The example demonstrates the use of Teradata JDBC FASTEXPORT. This is particularly useful when moving bulk data to and from Teradata instances.

Feel free to contact us through the Cascading User Group with any questions.

Prerequisites

  1. To follow the tutorial, you will need Java 6+, Hadoop (any version will work), and Gradle v1.x installed on your computer. You will also need CRUD permissions and connection credentials for a Teradata database. If you do not have a live Teradata instance, please see "Setting up Teradata" below.

Note: Please ensure that you are using Gradle v1.x.

  1. If you do not already have the two necessary Teradata jar files on hand, go to TeraJDBC and download them to your local machine. This tutorial assumes the usage of version 14.10.00.39 for both tdgssconfig.jar and terajdbc4.jar.

Note: If you use a different version of the Teradata jar files, update the "ext.teradataVersion" variable in ./cascading-teradata/build.gradle as well as the "-Dversion" value in the following commands (shown here with 15.00.00.20) so that both match your jars. Also note that version 14+ is required.

  1. Run the following commands to install the Teradata jars into your local Maven repository.

    $ mvn install:install-file -Dfile={path-to-terajdbc4.jar} -DgroupId=com.teradata \
      -DartifactId=terajdbc4 -Dversion=15.00.00.20 -Dpackaging=jar
    $ mvn install:install-file -Dfile={path-to-tdgssconfig.jar} -DgroupId=com.teradata \
      -DartifactId=tdgssconfig -Dversion=15.00.00.20 -Dpackaging=jar
  2. This tutorial depends on cascading-jdbc-teradata. To use it, clone the cascading-jdbc repository from GitHub and build it on your machine.

Clone the code onto your local disk:

    $ git clone https://github.com/Cascading/cascading-jdbc.git
    $ cd cascading-jdbc
  1. Ensure that the cascading-jdbc-teradata subproject compiles by running the command below.

    Note: If your Teradata instance is not accessible, or the connection parameters are incorrect, the following command will fail.

    $ gradle clean install -i \
        -Dcascading.jdbc.url.teradata="jdbc:teradata://{td.instance.hostname}/USER={user},PASSWORD={pass}"
    1. This step will take a few minutes to build and run all the tests.

    2. This step will fail if your Teradata instance has not been set up (see Setting up Teradata) or the instance is currently unreachable.

  2. The sample code for this tutorial is hosted on GitHub. Clone the code onto your local disk:

    $ git clone https://github.com/Cascading/tutorials.git
    $ cd tutorials
    $ gradle :cascading-teradata:jar
  3. Start your local Hadoop installation. If you do not have a local Hadoop environment, you can use our Vagrant setup: Cascading Hadoop Vagrant

    $ hadoop/sbin/start-dfs.sh
    $ hadoop/sbin/start-yarn.sh
  4. In the resources folder of cascading-teradata you will find sampledata.csv. Use the following command to put the sampledata.csv file into HDFS. The sample program will read this file from HDFS and upload it to Teradata.

    $ hadoop fs -copyFromLocal /{path}/{to}/sampledata.csv /{hdfs}/{path}

Setting up Teradata

It is advisable to set up a simple sandbox environment to run this tutorial and to experiment further with your Cascading-Teradata integration. Follow the instructions at this link to spin up a fresh Teradata instance: Teradata Express 14

After you have launched and configured your sandbox Teradata instance ensure that:

  1. Your Teradata instance is up and running by logging into the server and running the following command:

    $ pdestate -a
    1. If the output is not "PDE state is RUN/STARTED." then run the following command to start Teradata:

      # /etc/init.d/tpa start
  2. If you are using AWS, make sure you have modified your security group to ensure that you and your application can communicate with the server.

  3. You have created a new Teradata user, e.g. "dbadmin", that has full CRUD permissions.

  4. You have downloaded and configured Teradata Studio, which will allow you to explore your Teradata instance as you run your tests.

Using Teradata with Cascading

The file 'SampleFlow.java' contains two flows that illustrate Teradata's integration with Cascading through Cascading's JDBCTap and Hfs tap. Together they demonstrate the entire lifecycle of moving data from HDFS to Teradata and back:

  1. Write data from local HDFS to Teradata. The Teradata table "cascading_test_table" will be created automatically.

  2. Write data from Teradata back to local HDFS in the directory /CascadingTeradataDemo/Sink, where further data transformations can be performed.

Execute this demo application using:

$ hadoop jar /{path}/{to}/cascading-teradata-sample.jar /{hdfs}/{path}/{to}/sampledata.csv \
    "jdbc:teradata://{teradata.instance.hostname}" {db user} {db password}

If the task completes successfully, you will have a new table in Teradata, "cascading_test_table", and a new directory in HDFS under /CascadingTeradataDemo whose name starts with Sink (the sample code shown later appends a timestamp to the directory name). It contains the part-XXXX files from the M/R job that extracted the DB data.

$ hadoop fs -ls /CascadingTeradataDemo

Understanding the Code

Let’s look inside ./cascading-teradata/src/main/java/teradata/SampleFlow.java. We will not cover the basics of Cascading (and recommend that you use the Impatient Series tutorial for that). Instead, we will focus on specifics for communicating with a Teradata instance using Cascading.

First, we need to import the following packages.

import cascading.jdbc.JDBCScheme;
import cascading.jdbc.JDBCTap;
import cascading.jdbc.TableDesc;
import cascading.jdbc.db.TeradataDBInputFormat;
import cascading.tuple.Fields;
import cascading.pipe.Pipe;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

Next, we set the variables from the parameters passed into the application.

String srcFile = args[0];        // file to read
String connStr = args[1];        // Teradata connection string,
    // e.g. connStr = "jdbc:teradata://ec2-1-2-3-4.compute-1.amazonaws.com"
String dbUser = args[2];         // DB username
String dbPass = args[3];         // DB password
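
The snippet above reads args[0] through args[3] directly, so a missing argument would surface as an ArrayIndexOutOfBoundsException. One way to fail earlier with a readable message is sketched below. The class SampleArgs and its parse method are hypothetical names used for illustration only; they are not part of SampleFlow.java, and the prefix check is an assumption based on the connection string format shown in this tutorial.

```java
// Hypothetical helper (not part of SampleFlow.java): checks the four
// command-line arguments before any taps are created.
class SampleArgs {
    final String srcFile;   // HDFS path of the file to read
    final String connStr;   // Teradata JDBC connection string
    final String dbUser;    // DB username
    final String dbPass;    // DB password

    private SampleArgs(String srcFile, String connStr, String dbUser, String dbPass) {
        this.srcFile = srcFile;
        this.connStr = connStr;
        this.dbUser = dbUser;
        this.dbPass = dbPass;
    }

    // Returns the parsed arguments, or throws IllegalArgumentException
    // with a usage message if they are missing or malformed.
    static SampleArgs parse(String[] args) {
        if (args == null || args.length != 4) {
            throw new IllegalArgumentException(
                "usage: <hdfs source file> <jdbc url> <db user> <db password>");
        }
        // Assumption: every connection string in this tutorial uses this prefix.
        if (!args[1].startsWith("jdbc:teradata://")) {
            throw new IllegalArgumentException(
                "connection string must start with jdbc:teradata://");
        }
        return new SampleArgs(args[0], args[1], args[2], args[3]);
    }
}
```

With such a helper, the assignments above could read their values from the object returned by SampleArgs.parse(args) instead of indexing into args directly.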

If all arguments are present and sampledata.csv was successfully added to your local HDFS, we can create the Hfs source tap, the two JDBCTaps for the Teradata upload and export, and the Hfs sink tap.

// Create Hfs source tap
Tap inTap = new Hfs(new TextDelimited(new Fields("startIpNum", "endIpNum", "locId"),
    false, ","), srcFile);

// Create JDBCTap for uploading data to Teradata
Tap uploadTeradataTap = obj.createTeraDataDbTap("cascading_test_table", connStr,
    dbUser, dbPass);

// Create JDBCTap for exporting data from Teradata using FASTEXPORT
Tap exportTeradataTap = obj.createTeraDataDbTap("cascading_test_table", connStr +
    "/TYPE=FASTEXPORT", dbUser, dbPass);

// Create Hfs sink tap for writing data to HDFS from Teradata
Tap sinkTap = new Hfs(new TextDelimited(new Fields("startIpNum", "endIpNum", "locId"),
    false, ","), "/CascadingTeradataDemo/Sink_" + getTimestamp() );
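
The export tap above appends "/TYPE=FASTEXPORT" to the connection string, which works when connStr contains only the host name, as in the run command shown earlier. Teradata JDBC URLs separate additional parameters with commas after the first slash, so a connection string that already carries parameters would need a comma instead. The sketch below handles both cases and also shows one possible shape for the getTimestamp() helper, whose implementation is not shown in this tutorial. The class TeradataTapNames, the method withFastExport, and the timestamp pattern are all assumptions made for illustration.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Hypothetical helpers (not taken from SampleFlow.java).
class TeradataTapNames {
    private static final String PREFIX = "jdbc:teradata://";

    // Adds TYPE=FASTEXPORT to a Teradata JDBC URL, choosing the separator
    // based on whether the URL already has a parameter section.
    // Assumes connStr starts with "jdbc:teradata://".
    static String withFastExport(String connStr) {
        int slash = connStr.indexOf('/', PREFIX.length());
        if (slash < 0) {
            return connStr + "/TYPE=FASTEXPORT";   // host only
        }
        if (slash == connStr.length() - 1) {
            return connStr + "TYPE=FASTEXPORT";    // host followed by a bare slash
        }
        return connStr + ",TYPE=FASTEXPORT";       // host/PARAM=value,...
    }

    // One possible getTimestamp(): a 14-digit local time stamp,
    // e.g. 20140101123000. The real helper may use a different format.
    static String getTimestamp() {
        return new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date());
    }

    // Builds the sink directory name used by the Hfs sink tap above.
    static String sinkPath(String timestamp) {
        return "/CascadingTeradataDemo/Sink_" + timestamp;
    }
}
```

Because the sink directory name includes a timestamp, each run writes to a fresh directory rather than colliding with the output of a previous run.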

Once the taps are created, we create our simple pipes. At this stage you can add more pipes for data transformation. For this exercise we will just copy the data back and forth.

// Create two simple copy pipes - here you can add further pipes for data transformation
Pipe sourceCopyPipe = new Pipe( "sourcePipe" );
Pipe sinkCopyPipe = new Pipe( "sinkPipe" );

Now that we have our pipes, let’s create some flows.

// Create and connect flows
HadoopFlowConnector flowConnector = new HadoopFlowConnector();
Flow flow1 = flowConnector.connect( "flow1", inTap, uploadTeradataTap, sourceCopyPipe );
Flow flow2 = flowConnector.connect( "flow2", exportTeradataTap, sinkTap, sinkCopyPipe );

With our taps, pipes and flows in hand, let’s create, connect and complete a Cascade.

// Create, connect and complete cascade
CascadeConnector connector = new CascadeConnector();
Cascade cascade = connector.connect( flow1, flow2 );
cascade.complete();

Congratulations! You've successfully used Cascading to write from HDFS to Teradata and back from Teradata into HDFS.