Java Developers Guide to Hive with Cascading

Part 3: Hive flows within a Cascade

What You Will See

In Part 3 of the tutorial, we will really dig in. You will see HashJoins, CoGroups, filters, Each pipes, subassemblies, and Hive flows running within Cascades.

Run and Validate Your Program

Step 1: Compile your program

$ cd cascading-hive/part3
$ gradle clean jar

Step 2: If you have not already done so in Part 1 or Part 2, copy the data files to HDFS:

$ hadoop dfs -mkdir /tpcds
$ hadoop dfs -mkdir /tpcds/data
$ hadoop dfs -mkdir /tpcds/taps
$ hadoop dfs -put ../data/* /tpcds/data

Step 3: Run your flow

$ hadoop jar {path}/{to}/tutorials/cascading-hive/part3/build/libs/cascading-hive-1.0.0.jar hivedemo.Main

Step 4: View the execution of your flow in real-time through Driven

Depending on how you configured your Driven Plugin, either click the Driven URL from your console or log into the Driven application. If you are running this locally, you will find it at http://localhost:8080/index.html

14/08/28 12:01:53 INFO state.AppStats: shutdown hook finished.
14/08/28 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/08/28 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

You can also use this live link to view the application in Driven.


What’s Going On?

The logic in Part 3 breaks down into the following steps. Let’s take a closer look:

Step 1: Declare our Fields, table columns and column types

// create Cascading Fields for date_dim data
public static final Fields DATE_DIM_FIELDS = new Fields(...)
// create Hive table fields for date_dim data
public static final String[] DATE_DIM_TABLE_FIELDS = new String[]{...}
// create Hive column types for date_dim data
public static final String[] DATE_DIM_TABLE_TYPES = new String[]{...}

// continue for additional files and tables
// ...
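The three declarations stay in lockstep: position i describes the same column as a Cascading field, a Hive column name, and a Hive column type. Below is a minimal sketch with two illustrative columns; the real date_dim schema is much wider, and the names and types shown are assumptions rather than the tutorial's actual declarations.

import cascading.tuple.Fields;

// illustrative only: two columns stand in for the full date_dim schema
public static final Fields DATE_DIM_FIELDS = new Fields( "d_date_sk", "d_year" );
// the Hive column names mirror the Cascading fields position for position
public static final String[] DATE_DIM_TABLE_FIELDS = new String[]{"d_date_sk", "d_year"};
// and each position carries the matching Hive column type
public static final String[] DATE_DIM_TABLE_TYPES = new String[]{"int", "string"};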

Step 2: Initialize the application

Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
// add ApplicationTag for Driven identification and search functionality
AppProps.addApplicationTag( properties, "Cascading-Hive Demo Part3" );
HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

Step 3: Filter data using RegexFilter and Each pipes - source from HDFS, sink to Hive

// we will add all flows to this array to be added to the Cascade later
List<Flow> queryFlows = new ArrayList<Flow>();

// create FlowDef for date filter flow
FlowDef dateDimFilterFlow = FlowDef.flowDef();
// give name to FlowDef for Driven visibility
dateDimFilterFlow.setName( "FilterDateDim (Hive Sink)" );
// create initial Pipe
Pipe inputFilesPipe = new Pipe( "datedim_filter" );
// create RegexFilter to filter for all data from 2002
RegexFilter regexFilter = new RegexFilter( "2002" );
// create Each pipe to iterate over each record and apply regexFilter
inputFilesPipe = new Each( inputFilesPipe, new Fields( "d_year" ), regexFilter );
// add source and pipe to dateDimFilterFlow
dateDimFilterFlow.addSource( inputFilesPipe, new Hfs( new TextDelimited( DATE_DIM_FIELDS, "|" ), "/tpcds/data/date_dim.dat" ) );

// create HiveTableDescriptor for date_dim data
HiveTableDescriptor dateDimSinkTableDescriptor = new HiveTableDescriptor( "filtered_date_dim", DATE_DIM_TABLE_FIELDS, DATE_DIM_TABLE_TYPES );
// create HiveTap as sink
HiveTap dateDimSinkTap = new HiveTap( dateDimSinkTableDescriptor, dateDimSinkTableDescriptor.toScheme(), REPLACE, true );
// add tail sink to dateDimFilterFlow
dateDimFilterFlow.addTailSink( inputFilesPipe, dateDimSinkTap );

// add dateDimFilterFlow to queryFlows ArrayList for later use
queryFlows.add( flowConnector.connect( dateDimFilterFlow ) );

// repeat for Demographics and Store data
// ...
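By default, RegexFilter keeps the tuples whose argument matches the pattern and drops the rest; passing removeMatch = true inverts that behaviour. A minimal sketch of both variants, reusing the d_year field from above:

import cascading.operation.regex.RegexFilter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

Pipe datePipe = new Pipe( "datedim_filter" );
// keep only tuples whose d_year value matches "2002" (the default behaviour)
datePipe = new Each( datePipe, new Fields( "d_year" ), new RegexFilter( "2002" ) );
// or, inverted: drop every tuple from 2002 and keep the rest
// datePipe = new Each( datePipe, new Fields( "d_year" ), new RegexFilter( "2002", true ) );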

Step 4: Perform a series of HashJoins

Map<String, Tap> sources = new HashMap<String, Tap>();
Map<String, Tap> sinks = new HashMap<String, Tap>();
// create Hive table for sales<>item join results
HiveTableDescriptor storeSaleItemSinkTableDescriptor = new HiveTableDescriptor( "StoreSalesItemJoin", STORE_SALES_TABLE_FIELDS, STORE_SALES_TABLE_TYPES );
HiveTap storeSaleItemSink = new HiveTap( storeSaleItemSinkTableDescriptor, storeSaleItemSinkTableDescriptor.toScheme(), REPLACE, true );
sinks.put( "StoreSalesItemJoin", storeSaleItemSink );

// everything joins against store_sales so put that in first.
Tap storeSales = new Hfs( new TextDelimited( STORE_SALES_FIELDS, "|" ), "/tpcds/data/store_sales.dat" );
sources.put( "StoreSales", storeSales );
Pipe storeSalesPipe = new Pipe( "StoreSales" );

// JOIN item on (store_sales.ss_item_sk = item.i_item_sk)
Tap item = new Hfs( new TextDelimited( ITEM_FIELDS, "|" ), "/tpcds/data/item.dat" );
sources.put( "Item", item );
Pipe itemPipe = new Pipe( "Item" );
Pipe storeSalesItemJoin = new HashJoin( "StoreSalesItemJoin", storeSalesPipe, new Fields( "ss_item_sk" ), itemPipe, new Fields( "i_item_sk" ) );

// continue for joins on date_dim, store_sales, customer_demographics
// ...

// wire all the join flows together
queryFlows.add( flowConnector.connect( "JoinStoreSales (Hive Sources)", sources, sinks, storeSalesItemJoin, storeSalesDateDimJoin, storeSalesCustomerDemographicsJoin, storeSalesStoreJoin ) );
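HashJoin loads its right-hand stream into memory, so it suits joining the large store_sales stream (always kept on the left) against the smaller dimension tables, with each join's output becoming the left side of the next. The sketch below shows that chaining pattern; the DateDim pipe name and the ss_sold_date_sk / d_date_sk key fields are illustrative and not copied from the tutorial code.

import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

Pipe storeSalesPipe = new Pipe( "StoreSales" );
Pipe itemPipe = new Pipe( "Item" );
Pipe dateDimPipe = new Pipe( "DateDim" ); // hypothetical pipe name

// the large fact stream sits on the left, the smaller dimension on the right
Pipe joined = new HashJoin( "StoreSalesItemJoin", storeSalesPipe, new Fields( "ss_item_sk" ), itemPipe, new Fields( "i_item_sk" ) );
// the result of one join feeds the left side of the next (key names assumed)
joined = new HashJoin( "StoreSalesDateDimJoin", joined, new Fields( "ss_sold_date_sk" ), dateDimPipe, new Fields( "d_date_sk" ) );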

Step 5: Strip out extraneous fields using Retain

/*
 * Strip out extraneous fields now
 */
Fields finalFields = new Fields( new Comparable[]{"i_item_id", "s_state", "ss_quantity", "ss_list_price", "ss_coupon_amt", "ss_sales_price"}, new Type[]{String.class, String.class, Double.class, Double.class, Double.class, Double.class} );
FlowDef fieldRemovingFlowDef = FlowDef.flowDef();
fieldRemovingFlowDef.setName( "RemoveExtraFields" );
Pipe allFieldsPipe = new Pipe( "AllFields" );
Pipe fieldRemovingPipe = new Retain( allFieldsPipe, finalFields );
fieldRemovingFlowDef.addSource( fieldRemovingPipe, storeSaleCustDemSink );
HiveTableDescriptor redactedFieldsTapTableDescriptor = new HiveTableDescriptor( "AllFields", SALES_REPORT_FIELDS, SALES_REPORT_TYPES );
HiveTap redactedFieldsTap = new HiveTap( redactedFieldsTapTableDescriptor, redactedFieldsTapTableDescriptor.toScheme(), REPLACE, true );
fieldRemovingFlowDef.addTailSink( fieldRemovingPipe, redactedFieldsTap );
queryFlows.add( flowConnector.connect( fieldRemovingFlowDef ) );
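Retain keeps only the listed fields and drops everything else; Discard, used in Step 7, is its inverse. A minimal sketch over a hypothetical stream carrying fields "a", "b" and "c":

import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.assembly.Retain;
import cascading.tuple.Fields;

Pipe examplePipe = new Pipe( "Example" ); // hypothetical stream with fields "a", "b", "c"
// keep only "a" and "b"
Pipe kept = new Retain( examplePipe, new Fields( "a", "b" ) );
// the same result, expressed by naming the field to drop instead
Pipe dropped = new Discard( examplePipe, new Fields( "c" ) );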

Step 6: Calculate averages using HiveFlow

Fields groupingFields = new Fields( "i_item_id", "s_state" );
// average quantity Hive query
String queryAvgQuantity = "SELECT i_item_id, AVG(ss_quantity), s_state FROM AllFields GROUP BY i_item_id, s_state";
String[] queriesAvgQuantity = {queryAvgQuantity};
// Hive table for average quantity results
HiveTableDescriptor avgQuantityTableDescriptor = new HiveTableDescriptor( "QuantityAverage", new String[]{"i_item_id", "ss_quantity", "s_state"}, new String[]{"string", "int", "string"} );
HiveTap quantityAverageTap = new HiveTap( avgQuantityTableDescriptor, avgQuantityTableDescriptor.toScheme(), REPLACE, true );
// quantity average Hive flow
HiveFlow avgQuantityHiveFlow = new HiveFlow( "Hive Flow - CalculateAverageQuantity", queriesAvgQuantity, Arrays.<Tap>asList( redactedFieldsTap ), quantityAverageTap );
// add avgQuantityHiveFlow to queryFlows ArrayList for later use
queryFlows.add( avgQuantityHiveFlow );

// continue for average price, average coupon amount, average sales price
// ...
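The same averages could be computed natively in Cascading with the AverageBy subassembly; the tutorial deliberately uses HiveFlow here so the Hive queries run as peers of the Cascading flows inside one Cascade. A rough native equivalent, as a sketch (the avg_quantity output field name is an assumption):

import cascading.pipe.Pipe;
import cascading.pipe.assembly.AverageBy;
import cascading.tuple.Fields;

Pipe allFieldsPipe = new Pipe( "AllFields" );
// group by item and state, averaging ss_quantity into a new avg_quantity field
Pipe avgQuantityPipe = new AverageBy( allFieldsPipe, new Fields( "i_item_id", "s_state" ), new Fields( "ss_quantity" ), new Fields( "avg_quantity" ) );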

Step 7: Join averages using CoGroup and discard unwanted fields using Discard

/*
 * Join the averages together
 */
Map<String, Tap> reportSources = new HashMap<String, Tap>();
Map<String, Tap> reportSinks = new HashMap<String, Tap>();

reportSources.put( "QuantityAveragePipe", quantityAverageTap );
Pipe quantityAveragePipe = new Pipe( "QuantityAveragePipe" );
reportSources.put( "ListPriceAverage", listPipeAverageTap );
Pipe listPriceAveragePipe = new Pipe( "ListPriceAverage" );
reportSources.put( "CouponAmountAverage", couponAmountAverageTap );
Pipe couponAmountAveragePipe = new Pipe( "CouponAmountAverage" );
reportSources.put( "SalePriceAverage", salePriceAverageTap );
Pipe salePriceAveragePipe = new Pipe( "SalePriceAverage" );

Fields junkFields = new Fields( "i_item_id_junk", "s_state_junk" );

// cogroup quantityAveragePipe & listPriceAveragePipe on "i_item_id" and "s_state"
Pipe salesReportPipe = new CoGroup( "SalesReportQL", quantityAveragePipe, groupingFields, listPriceAveragePipe, groupingFields, new Fields( "i_item_id", "s_state", "ss_quantity", "i_item_id_junk", "s_state_junk", "ss_list_price" ) );
// strip unnecessary fields from salesReportPipe
salesReportPipe = new Discard( salesReportPipe, junkFields );
// cogroup salesReportPipe & couponAmountAveragePipe on "i_item_id" and "s_state"
salesReportPipe = new CoGroup( "SalesReportQLC", salesReportPipe, groupingFields, couponAmountAveragePipe, groupingFields, new Fields( "i_item_id", "s_state", "ss_quantity", "ss_list_price", "i_item_id_junk", "s_state_junk", "ss_coupon_amt" ) );
// strip unnecessary fields from salesReportPipe
salesReportPipe = new Discard( salesReportPipe, junkFields );
// cogroup salesReportPipe & salePriceAveragePipe on "i_item_id" and "s_state"
salesReportPipe = new CoGroup( "SalesReport", salesReportPipe, groupingFields, salePriceAveragePipe, groupingFields, new Fields( "i_item_id", "s_state", "ss_quantity", "ss_list_price", "ss_coupon_amt", "i_item_id_junk", "s_state_junk", "ss_sales_price" ) );
// strip unnecessary fields from salesReportPipe
salesReportPipe = new Discard( salesReportPipe, junkFields );
// create report output Hfs sinks
reportSinks.put( "SalesReportQL", getOutputTap( "SalesReportQL", Fields.ALL ) );
reportSinks.put( "SalesReportQLC", getOutputTap( "SalesReportQLC", Fields.ALL ) );
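CoGroup emits every field from both sides and requires each declared output field name to be unique, which is why the right-hand copies of the join keys are declared as *_junk and immediately removed with Discard. A stripped-down illustration of the pattern, using hypothetical pipes and fields:

import cascading.pipe.CoGroup;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.tuple.Fields;

Pipe lhs = new Pipe( "lhs" ); // hypothetical fields: "id", "value_a"
Pipe rhs = new Pipe( "rhs" ); // hypothetical fields: "id", "value_b"
// both sides carry "id", so the declared fields rename the right-hand copy
Pipe joined = new CoGroup( "JoinOnId", lhs, new Fields( "id" ), rhs, new Fields( "id" ), new Fields( "id", "value_a", "id_junk", "value_b" ) );
// then the renamed duplicate is thrown away
joined = new Discard( joined, new Fields( "id_junk" ) );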

Step 8: Create final reports

// create SalesReport Hive table and add as sink
HiveTableDescriptor allReportTableDescriptor = new HiveTableDescriptor( "SalesReport", SALES_REPORT_FIELDS, SALES_REPORT_TYPES );
HiveTap allReportTap = new HiveTap( allReportTableDescriptor, allReportTableDescriptor.toScheme(), REPLACE, true );
sinks.put( "SalesReport", allReportTap );
reportSinks.put( "SalesReport", allReportTap );
queryFlows.add( flowConnector.connect( "GenerateReport (Hive Sources)", reportSources, reportSinks, salesReportPipe ) );

// finalReport Hive query
String query1 = "SELECT * FROM SalesReport GROUP BY i_item_id, s_state LIMIT 100";
String[] queries = {query1};
// finalReport Hive table
HiveTableDescriptor finalReportTableDescriptor = new HiveTableDescriptor( "FinalReport", SALES_REPORT_FIELDS, SALES_REPORT_TYPES );
// finalReport HiveTap
HiveTap finalReportTap = new HiveTap( finalReportTableDescriptor, finalReportTableDescriptor.toScheme(), REPLACE, true );
// finalReport HiveFlow
HiveFlow finalHiveFlow = new HiveFlow( "Hive Flow - Format Report", queries, Arrays.<Tap>asList( allReportTap ), finalReportTap );
queryFlows.add( finalHiveFlow );

Step 9: Connect all flows and complete Cascade

// create, connect (all flows from queryFlows) and complete cascade
CascadeConnector connector = new CascadeConnector();
Cascade cascade = connector.connect( queryFlows.toArray( new Flow[ 0 ] ) );
cascade.complete();
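CascadeConnector orders the flows by the taps they read and write, so they run in dependency order regardless of the order they were added to queryFlows, and complete() blocks until the whole Cascade has finished. For a non-blocking launch, Cascade also offers start(); a minimal sketch:

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;

Cascade cascade = new CascadeConnector().connect( queryFlows.toArray( new Flow[ 0 ] ) );
// complete() blocks the driver until every flow has run
cascade.complete();
// alternatively, start() returns immediately and the flows run in the background
// cascade.start();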

References

For more details about the particular operations or to understand how some of these steps can be modified for your use case, use the following resources: