Java Developers Guide to Hive with Cascading

Part 1: Simple file copy from HDFS to Hive

What You Will See

In Part 1 of the tutorial, we will cover several basic but vital operations involved in using Hive with Cascading:

  • Creating a simple Cascading source Tap (Hfs)

  • Creating a new Hive table using Cascading’s HiveTableDescriptor

  • Creating a simple Cascading sink Tap (HiveTap)

For the purposes of this tutorial, we will be using sample retail transaction data files from the TPC-DS benchmark dataset.

Run and Validate Your Program

Step 1: Compile your program

$ cd cascading-hive/part1
$ gradle clean jar

Step 2: To run the flow in Hadoop pseudo-distributed mode, copy the data files to HDFS

$ hadoop dfs -mkdir /tpcds
$ hadoop dfs -mkdir /tpcds/data
$ hadoop dfs -mkdir /tpcds/taps
$ hadoop dfs -put ../data/* /tpcds/data

Step 3: Run the application

$ hadoop jar {path}/{to}/tutorials/cascading-hive/part1/build/libs/cascading-hive-1.0.0.jar hivedemo.Main

Step 4: View the execution of your flow in real-time through Driven

Depending on how you configured your Driven plugin, either click the Driven URL from your console or log into the Driven application. If you are running Driven locally, you will find it at http://localhost:8080/index.html

14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: *http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16*
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

You can also use this live link to view the application in Driven.

(screenshot: the part1 application as rendered in Driven)

Step 5: Review the console output and ensure that the results from "select * from call_center" have been printed:

---------------------- Hive JDBC --------------------------
data from hive table copy: key=5,value=AAAAAAAAEAAAAAAA
data from hive table copy: key=6,value=AAAAAAAAEAAAAAAA
data from hive table copy: key=1,value=AAAAAAAABAAAAAAA
data from hive table copy: key=2,value=AAAAAAAACAAAAAAA
data from hive table copy: key=3,value=AAAAAAAACAAAAAAA
data from hive table copy: key=4,value=AAAAAAAAEAAAAAAA
------------------------------------------------------------

What’s Going On?

In Part 1 we create a new Hive table and populate it with the contents of a file on HDFS using Cascading. We accomplish this with the following steps in our Cascading application:

Step 1: Initialize the application

Hadoop flow connectors enable the flow to run on a Hadoop cluster. cascading.flow.hadoop.HadoopFlowConnector provides a planner for running Cascading on an Apache Hadoop 1 cluster; cascading.flow.hadoop2.Hadoop2MR1FlowConnector provides a planner for running Cascading on an Apache Hadoop 2 cluster.

Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
// add ApplicationTag for Driven identification and search functionality
AppProps.addApplicationTag( properties, "Cascading-Hive Demo Part1" );
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );
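
If your cluster runs Apache Hadoop 2, only the connector class changes. A minimal sketch using the Hadoop 2 planner (the variable name is illustrative):

// same properties as above; on Hadoop 2 only the planner class differs
Hadoop2MR1FlowConnector flowConnector2 = new Hadoop2MR1FlowConnector( properties );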

Step 2: Set up the source and sink (Cascading Taps)

A Tap models the integration of a Cascading application with an external data store. A Tap gives your flow the ability to read data from and write data to many external systems: a file on a local file system, a database, a file on the Hadoop Distributed File System (HDFS), or, as shown in this tutorial, an Apache Hive table. The Hive table will be created automatically if it does not exist; if it already exists, its contents are replaced each time the application runs.
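
For instance, reading the same file from the local file system instead of HDFS only requires a different Tap. A minimal sketch using cascading.tap.hadoop.Lfs (the relative path is an assumption):

// a Tap over a pipe-delimited file on the local file system (path is illustrative)
Tap localTap = new Lfs( new TextDelimited( CALL_CENTER_FIELDS, "|" ), "data/call_center.dat" );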

The HiveTableDescriptor encapsulates information about a Hive table, such as the table name, column names, column types, and partitioning (a partitioned variant is sketched after the snippets below). The class can convert this information into Hive-specific or Cascading-specific objects, acting as a translator between the concepts of a Hive table and the concepts of a Cascading Tap/Scheme.

// create initial Hfs tap to read call_center.dat from local HDFS
Tap inTapCallCenter = new Hfs( new TextDelimited( CALL_CENTER_FIELDS, "|" ), "/tpcds/data/call_center.dat" );
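
The field constants referenced in this step are defined in Main.java. Their shape is roughly as follows; this is a truncated, illustrative sketch, since the real class lists every call_center column:

// illustrative, truncated shape of the constants defined in Main.java
public static final Fields CALL_CENTER_FIELDS =
    new Fields( "cc_call_center_sk", "cc_call_center_id" /* ...remaining columns... */ );
public static final String[] CALL_CENTER_TABLE_FIELDS =
    { "cc_call_center_sk", "cc_call_center_id" /* ... */ };
public static final String[] CALL_CENTER_TABLE_TYPES =
    { "int", "string" /* ... */ };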

// create HiveTableDescriptor to describe and create (if needed) call_center Hive table
HiveTableDescriptor sinkTableDescriptor = new HiveTableDescriptor( "call_center", CALL_CENTER_TABLE_FIELDS, CALL_CENTER_TABLE_TYPES );

// create HiveTap sinkTap using the HiveTableDescriptor and its matching Scheme;
// REPLACE is the statically imported SinkMode, and 'true' requests strict
// validation of an existing table against the descriptor
HiveTap sinkTap = new HiveTap( sinkTableDescriptor, sinkTableDescriptor.toScheme(), REPLACE, true );
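
HiveTableDescriptor can also describe partitioned tables. A minimal sketch, assuming the constructor overload that accepts partition keys and an illustrative choice of partition column (cascading-hive additionally provides a HivePartitionTap for writing to partitioned tables):

// illustrative: describe a call_center-like table partitioned on one of its columns
HiveTableDescriptor partitionedDescriptor = new HiveTableDescriptor( "call_center_partitioned",
    CALL_CENTER_TABLE_FIELDS, CALL_CENTER_TABLE_TYPES, new String[]{ "cc_country" } );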

Step 3: Create a Pipe, Flow and Cascade and execute it

// create a simple Pipe for copying data from HDFS to Hive
Pipe copyPipe = new Pipe( "copyPipe" );

// create the flow using the HadoopFlowConnector from Step 1 (recreating it here would
// discard the properties set above), inTapCallCenter (Hfs) as the source, sinkTap
// (HiveTap) as the sink, and copyPipe to connect them
Flow flow1 = flowConnector.connect( "flow1", inTapCallCenter, sinkTap, copyPipe );

// create, connect and complete cascade
CascadeConnector connector = new CascadeConnector();
Cascade cascade = connector.connect( flow1 );
cascade.complete();
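
Since this application contains only a single Flow, the Cascade is not strictly necessary; the flow can also be run directly:

// equivalent for a single flow: skip the Cascade and run the flow itself
flow1.complete();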

Step 4: Create a standard JDBC connection to Apache Hive and print the table

In Part 1 of this tutorial we use a plain JDBC connection simply to retrieve all contents of the newly created and populated Hive table and print them to the console.

// register the Hive JDBC driver (HiveServer1 / embedded)
Class.forName( "org.apache.hadoop.hive.jdbc.HiveDriver" );

// create local connection to Hive - your connection parameters may differ
Connection con = DriverManager.getConnection( "jdbc:hive://", "", "" );
Statement stmt = con.createStatement();

// select * from call_center Hive table we just populated
ResultSet rs = stmt.executeQuery( "select * from call_center" );
System.out.println( "----------------------Hive JDBC --------------------------" );
while( rs.next() )
  System.out.printf( "data from hive table copy: key=%s,value=%s\n", rs.getString( 1 ), rs.getString( 2 ) );

System.out.println( "---------------------------------------------------------" );
stmt.close();
con.close();
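
The driver class and URL used above are for the embedded, HiveServer1-style JDBC interface. If you connect through HiveServer2 instead, a sketch of the equivalent setup (host, port, and database are assumptions for a local HiveServer2):

// HiveServer2 variant: note the different driver class and URL scheme
Class.forName( "org.apache.hive.jdbc.HiveDriver" );
Connection con = DriverManager.getConnection( "jdbc:hive2://localhost:10000/default", "", "" );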

Next