Java Developers Guide to Hive with Cascading
Part 2: Intro to HiveFlow
What You Will See
In Part 2 of the tutorial, we will create several new Hive tables, after which we will run HQL queries from within HiveFlows.
Run and Validate Your Program
Step 1: Compile your program
$ cd cascading-hive/part2
$ gradle clean jar
Step 2: If you have not done it already from the previous part, copy the data files to Hadoop:
$ hadoop dfs -mkdir /tpcds
$ hadoop dfs -mkdir /tpcds/data
$ hadoop dfs -mkdir /tpcds/taps
$ hadoop dfs -put ../data/* /tpcds/data
Step 3: Run your flow
$ hadoop jar {path}/{to}/tutorials/cascading-hive/part2/build/libs/cascading-hive-1.0.0.jar hivedemo.Main
Step 4: View the execution of your flow in real-time through Driven
Depending on how you configured your Driven Plugin, either click the Driven URL from your console or log into the Driven application. If you are running Driven locally, you will find it at http://localhost:8080/index.html
14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON
You can also use this live link to view the application in Driven.
What’s Going On?
We will only cover the parts in the code that are different from the previous section.
In Part 2 we introduce the HiveFlow. HiveFlow is a subclass of ProcessFlow that runs Hive queries as part of a Cascading application.
In this example we will create two new Hive tables, populate them with data, and run Hive queries against them within a HiveFlow. We will also use the Discard subassembly to trim the unwanted trailing field from each row.
// add Discard subassembly to each pipe to discard trailing field off each row in data files
salesCopyPipe = new Discard( salesCopyPipe, new Fields( "trailing_field" ) );
returnsCopyPipe = new Discard( returnsCopyPipe, new Fields( "trailing_field" ) );
// create Hive query for sales_catalog tables
String salesQuery = "SELECT cs_item_sk, COUNT(cs_item_sk) AS quantity_sold " +
"FROM sales_catalog " +
"GROUP BY cs_item_sk ORDER BY quantity_sold DESC LIMIT 20";
// add salesQuery to array for use in HiveFlow
String[] queriesSales = { salesQuery };
// create HiveTableDescriptor for salesQuery results
HiveTableDescriptor topSalesTableDescriptor = new HiveTableDescriptor( "Top_20_Sales", new String[]{"cs_item_sk", "quantity_sold"}, new String[]{"string", "int"} );
// create HiveTap as sink for salesQuery results
HiveTap topSalesTap = new HiveTap( topSalesTableDescriptor, topSalesTableDescriptor.toScheme(), REPLACE, true );
// create HiveFlow using salesQuery, salesTap (HiveTap) as sources, topSalesTap (HiveTap) as sink
HiveFlow topSalesByCategoryHiveFlow = new HiveFlow( "Hive Flow - TopSalesByCategory", queriesSales, Arrays.<Tap>asList( salesTap ), topSalesTap );
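The aggregation that salesQuery performs — counting rows per cs_item_sk, sorting by that count, and keeping the top N — can be sketched in plain Java to make the query's semantics concrete. This is an illustration only; the in-memory data and the topSales helper are hypothetical and not part of the tutorial code:

```java
import java.util.*;
import java.util.stream.*;

public class TopSalesSketch {
    // Mirrors: SELECT cs_item_sk, COUNT(cs_item_sk) AS quantity_sold
    //          FROM sales_catalog GROUP BY cs_item_sk
    //          ORDER BY quantity_sold DESC LIMIT n
    static List<Map.Entry<String, Long>> topSales( List<String> itemKeys, int n )
      {
      // GROUP BY cs_item_sk with COUNT(cs_item_sk)
      Map<String, Long> counts = itemKeys.stream()
        .collect( Collectors.groupingBy( k -> k, Collectors.counting() ) );

      // ORDER BY quantity_sold DESC LIMIT n
      return counts.entrySet().stream()
        .sorted( Map.Entry.<String, Long>comparingByValue().reversed() )
        .limit( n )
        .collect( Collectors.toList() );
      }

    public static void main( String[] args )
      {
      List<String> keys = Arrays.asList( "42", "7", "42", "42", "7", "13" );
      List<Map.Entry<String, Long>> top = topSales( keys, 2 );
      System.out.println( top.get( 0 ).getKey() + "=" + top.get( 0 ).getValue() ); // 42=3
      System.out.println( top.get( 1 ).getKey() + "=" + top.get( 1 ).getValue() ); // 7=2
      }
}
```

Hive performs this same grouping and ordering in a distributed fashion across the cluster, but the result shape is identical: one row per item key with its sale count, limited to the top twenty.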
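Because a HiveFlow is an ordinary Cascading Flow, it can be run by calling complete() on it, or sequenced after the table-population flows in a Cascade so that Cascading resolves the dependency order. A minimal sketch, assuming the copy flows from the earlier part are named salesFlow and returnsFlow (those names are illustrative, not from the listing above):

```java
// Hedged sketch: wire the HiveFlow after the flows that populate
// sales_catalog and returns_catalog, then run everything in order.
import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;

Cascade cascade = new CascadeConnector().connect(
  salesFlow, returnsFlow, topSalesByCategoryHiveFlow );

cascade.complete(); // runs all flows in dependency order
```

Running the flows through a Cascade also means each flow appears in Driven as part of a single application, which makes the execution timeline in Step 4 easier to follow.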