Java Developers Guide to Hive with Cascading

Part 2: Intro to HiveFlow

What You Will See

In Part 2 of the tutorial, we will create several new Hive tables, after which we will run HQL queries from within a HiveFlow.

Run and Validate Your Program

Step 1: Compile your program

$ cd cascading-hive/part2
$ gradle clean jar

Step 2: If you have not already done so in the previous part, copy the data files to Hadoop:

$ hadoop dfs -mkdir /tpcds
$ hadoop dfs -mkdir /tpcds/data
$ hadoop dfs -mkdir /tpcds/taps
$ hadoop dfs -put ../data/* /tpcds/data

Step 3: Run your flow

$ hadoop jar {path}/{to}/tutorials/cascading-hive/part2/build/libs/cascading-hive-1.0.0.jar hivedemo.Main

Step 4: View the execution of your flow in real-time through Driven

Depending on how you configured your Driven Plugin, either click the Driven URL from your console or log into the Driven application. If you are running this locally, you will find it at http://localhost:8080/index.html

14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

You can also use this live link to view the application in Driven.

[Screenshot: the part2 application rendered in Driven]

What’s Going On?

We will only cover the parts of the code that differ from the previous part.

In Part 2 we introduce HiveFlow, a subclass of ProcessFlow for running Hive queries within a Cascading application.
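
Because HiveFlow is just another Cascading Flow, you can run it on its own or hand it to a CascadeConnector, which sequences it with the flows it depends on. A minimal sketch (flow names such as copyFlow and hiveFlow are illustrative, not taken from the tutorial code):

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;

// run the HiveFlow by itself, like any other Flow
hiveFlow.complete();

// or let a Cascade schedule it after the flows that load its source tables;
// copyFlow is a hypothetical Flow that populates the Hive tables first
Cascade cascade = new CascadeConnector().connect( copyFlow, hiveFlow );
cascade.complete();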

In this example we will create two new Hive tables, populate them with data, and run Hive queries against them within a HiveFlow. We will also use the Discard subassembly to trim the unwanted trailing field from each row.

// add a Discard subassembly to each pipe to drop the trailing field from each row in the data files
salesCopyPipe = new Discard( salesCopyPipe, new Fields( "trailing_field" ) );
returnsCopyPipe = new Discard( returnsCopyPipe, new Fields( "trailing_field" ) );
// create Hive query for the sales_catalog table
String salesQuery = "SELECT cs_item_sk, COUNT(cs_item_sk) AS quantity_sold " +
  "FROM sales_catalog " +
  "GROUP BY cs_item_sk ORDER BY quantity_sold DESC LIMIT 20";

// add salesQuery to array for use in HiveFlow
String[] queriesSales = { salesQuery };

// create HiveTableDescriptor for salesQuery results
HiveTableDescriptor topSalesTableDescriptor = new HiveTableDescriptor( "Top_20_Sales", new String[]{"cs_item_sk", "quantity_sold"}, new String[]{"string", "int"} );

// create HiveTap as sink for salesQuery results; REPLACE is the statically
// imported SinkMode.REPLACE, and true turns on strict table validation
HiveTap topSalesTap = new HiveTap( topSalesTableDescriptor, topSalesTableDescriptor.toScheme(), REPLACE, true );

// create HiveFlow running salesQuery, with salesTap (HiveTap) as source and topSalesTap (HiveTap) as sink
HiveFlow topSalesByCategoryHiveFlow = new HiveFlow( "Hive Flow - TopSalesByCategory", queriesSales, Arrays.<Tap>asList( salesTap ), topSalesTap );
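
The returns side of the example follows the same pattern. As a sketch (hypothetical, mirroring the sales code above and the TPC-DS returns_catalog schema; returnsTap is assumed to be the HiveTap over returns_catalog created earlier):

// hypothetical mirror of the sales query, against the returns_catalog table
String returnsQuery = "SELECT cr_item_sk, COUNT(cr_item_sk) AS quantity_returned " +
  "FROM returns_catalog " +
  "GROUP BY cr_item_sk ORDER BY quantity_returned DESC LIMIT 20";

// descriptor and sink tap for the query results, mirroring Top_20_Sales
HiveTableDescriptor topReturnsTableDescriptor = new HiveTableDescriptor( "Top_20_Returns", new String[]{"cr_item_sk", "quantity_returned"}, new String[]{"string", "int"} );
HiveTap topReturnsTap = new HiveTap( topReturnsTableDescriptor, topReturnsTableDescriptor.toScheme(), REPLACE, true );

// returnsTap (assumed) is the source; topReturnsTap is the sink
HiveFlow topReturnsHiveFlow = new HiveFlow( "Hive Flow - TopReturns", new String[]{ returnsQuery }, Arrays.<Tap>asList( returnsTap ), topReturnsTap );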

Next