Integrating Cascading with MapReduce APIs

Introduction

In this tutorial, you will learn how to wrap MapReduce code in Cascading flows. This is particularly useful when you are integrating legacy code written with the MapReduce APIs into an ongoing Cascading implementation. There are also scenarios where, for performance reasons, some parts of an application are implemented with the MapReduce APIs and must be integrated with the parts written with the Cascading APIs.

In this tutorial, we will take the standard word count example and wrap it inside a Cascading flow. The code change is simple (only one line), but the implications are important: once the code is wrapped inside a Cascading flow, it can be integrated into a Cascading application alongside other flows, whether those are implemented with the Cascading APIs or even as Hive scripts.
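
For example, once the job is wrapped as a flow, it can be scheduled together with flows written against the Cascading APIs in a single Cascade, which runs them in dependency order. Below is a minimal sketch of that idea; mrFlow and cascadingFlow are hypothetical flows assumed to be built elsewhere (the wrapping itself is shown later in this tutorial).

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;

public class CombinedFlowsSketch
  {
  // mrFlow wraps a legacy MapReduce job; cascadingFlow is any flow
  // built with the Cascading APIs -- both are assumed to exist
  public static void run( Flow mrFlow, Flow cascadingFlow )
    {
    // the connector orders the flows by their source/sink
    // dependencies and runs them as a single unit
    Cascade cascade = new CascadeConnector().connect( mrFlow, cascadingFlow );

    cascade.complete();
    }
  }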

To keep this tutorial focused, we will not discuss how to integrate this flow with other flows written with the Cascading APIs. You can learn how to build and run a Cascading application with multiple, heterogeneous flows by following the Cascading-Hive tutorial:

Feel free to contact us through the Cascading User Group for any questions.

Prerequisites

  1. To follow this tutorial, you will need Java 6 or later, Hadoop (any version will work), and Gradle v1.x installed on your computer.

  2. The code for this tutorial is hosted on GitHub. Clone it onto your local disk:

    $ git clone https://github.com/Cascading/tutorials.git
    $ cd tutorials/cascading-mr
    $ gradle clean jar
  3. Start your local Hadoop installation. If you do not have a local Hadoop environment, you can use our Vagrant setup here: Cascading Hadoop Vagrant

    $ hadoop/sbin/start-dfs.sh
    $ hadoop/sbin/start-yarn.sh
  4. In the data folder of cascading-mr you will find a sample file, wc.dat, that we will use for our word count application. Use the following commands to put the wc.dat file into HDFS. Our sample program will read this file from HDFS.

    $ hadoop fs -mkdir /wc_input
    $ hadoop fs -copyFromLocal ./data/wc.dat /wc_input

Executing the Code

The file WordCount.java contains a single flow that encapsulates the word count example code written with the MapReduce APIs.

Execute this demo application using:

$ hadoop jar ./build/libs/cascading-mr.jar /wc_input /wc_output

If the job completes successfully, you will have a new file in /wc_output containing the word count results.

$ hadoop fs -cat /wc_output/part-00000

Let’s verify that the code was executed as a Cascading flow by following the link available through Driven. Here is an example of how the flow would render:

Understanding the Code

Let’s look inside ./cascading-mr/src/main/java/mr/WordCount.java. We will not cover the basics of Cascading here (we recommend the Impatient Series tutorial for that). Instead, we will focus on the specifics of running the standard word count application, written with the MapReduce APIs, as a Cascading flow.

First, we need to import the following classes.

import cascading.flow.hadoop.MapReduceFlow;
import cascading.property.AppProps;

The first step, setting the application name and tagging the application, is optional. However, tagging your applications and setting app names is good product hygiene and promotes better maintenance.

Properties properties = new Properties();
AppProps.addApplicationTag( properties, "tutorials" );
AppProps.addApplicationTag( properties, "cluster:development" );
AppProps.setApplicationName( properties, "cascading-mapreduce-flow" );

Next, and last, we create a MapReduceFlow, Cascading's flow type for wrapping an existing MapReduce job, and use the JobConf to create and execute the flow (instead of submitting the job through JobClient):

MapReduceFlow flow = new MapReduceFlow( "wordcount", conf, true );

// JobClient.runJob(conf);
flow.complete();
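
To see how everything fits together, here is a minimal, self-contained sketch of the whole pattern, including the JobConf setup that WordCount.java performs but that we have not reproduced above. To keep the sketch runnable on its own, it uses Hadoop's built-in TokenCountMapper and LongSumReducer in place of the tutorial's own mapper and reducer classes, and it copies the application properties into the JobConf by hand; the actual tutorial source may wire these details differently.

import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapred.lib.TokenCountMapper;

import cascading.flow.hadoop.MapReduceFlow;
import cascading.property.AppProps;

public class WordCountSketch
  {
  public static void main( String[] args )
    {
    Properties properties = new Properties();
    AppProps.addApplicationTag( properties, "tutorials" );
    AppProps.setApplicationName( properties, "cascading-mapreduce-flow" );

    // configure the job exactly as you would when submitting it
    // through JobClient -- this part is plain MapReduce code
    JobConf conf = new JobConf( WordCountSketch.class );
    conf.setJobName( "wordcount" );
    conf.setOutputKeyClass( Text.class );
    conf.setOutputValueClass( LongWritable.class );
    conf.setMapperClass( TokenCountMapper.class );
    conf.setCombinerClass( LongSumReducer.class );
    conf.setReducerClass( LongSumReducer.class );
    conf.setInputFormat( TextInputFormat.class );
    conf.setOutputFormat( TextOutputFormat.class );
    FileInputFormat.setInputPaths( conf, new Path( args[ 0 ] ) );
    FileOutputFormat.setOutputPath( conf, new Path( args[ 1 ] ) );

    // one straightforward way to hand the application name and tags
    // to the job configuration
    for( String key : properties.stringPropertyNames() )
      conf.set( key, properties.getProperty( key ) );

    // the one-line change: wrap the JobConf in a flow instead of
    // calling JobClient.runJob( conf )
    MapReduceFlow flow = new MapReduceFlow( "wordcount", conf, true );
    flow.complete();
    }
  }

Note that the third constructor argument, deleteSinkOnInit, tells the flow to delete the output path before running, which makes repeated runs convenient during development.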

Congratulations! You have successfully run an application written with the MapReduce APIs as a Cascading application.