The Scalding QuickStart Tutorial

Part 2: Implementing branches

In this part we’ll split the data stream we obtained in the first part into two separate streams. Let’s walk through the code to see how it is done.

Step 1: Read the input file, parse and filter it

We already did this in part 1; the only difference is that we will write the output to two directories instead of one. We indicate this by defining two output taps:

  val output1= Tsv(args("output1"))
  val output2= Tsv(args("output2"))

The code for this step is repeated here for completeness.

  val input = TextLine(args("input"))
  val output1= Tsv(args("output1"))
  val output2= Tsv(args("output2"))

  val inputFields = 'line
  val regexFields = ('ip, 'time, 'request, 'response, 'size)

  val filteredInput = input.read.mapTo('line -> regexFields) {
    te: TupleEntry =>
      val regex = new Regex("^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$")
      val split = regex.findFirstMatchIn(te.getString("line")).get.subgroups
      (split(0), split(1), split(2), split(3), split(4))
  }.filterNot('size) {
    size: String => size == "-"
  }

At the end of this step, the five fields extracted from each input log line are held in an object called filteredInput. In Scalding, such a stream of fields flows through a pipe, and the class that wraps a pipe and provides the field-based operations we have been using is called a RichPipe; filteredInput can therefore be used as a RichPipe.
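
To make this relationship concrete, here is a minimal sketch (an illustration only, assuming Scalding’s field-based API; the helper name is hypothetical). A RichPipe simply wraps a plain Cascading Pipe, and inside a Job an implicit conversion lets us call RichPipe methods such as mapTo and filterNot on a Pipe directly.

  import com.twitter.scalding.RichPipe
  import cascading.pipe.Pipe

  // Illustrative helper only: wrapping a Pipe explicitly gives the same
  // RichPipe operations that the implicit conversion provides inside a Job.
  // This explicit wrapping is exactly what the branching step below does.
  def asRichPipe(pipe: Pipe): RichPipe = new RichPipe(pipe)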

Step 2: Branch the stream

The filteredInput pipe is now ready to be split. We do this by wrapping it in two new RichPipes:

  val branch1 = new RichPipe(filteredInput)
  val branch2 = new RichPipe(filteredInput)

Now both branch1 and branch2 will contain a copy of the data stream flowing through the filteredInput RichPipe.
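
Because the two branches are independent RichPipes, a transformation applied to one does not affect the other. The snippet below is a hypothetical illustration of this (the filter condition and value name are not part of this tutorial’s code); we apply real per-branch operations starting in part 3.

  // Hypothetical example: keep only successful (HTTP 200) requests in branch1,
  // while branch2 still carries the full, untouched stream.
  val successfulOnly = branch1.filter('response) { response: String => response == "200" }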

Step 3: Write the output

As shown in part 1, we can write the stream flowing through a pipe by calling the write method. Since we now have two streams, we write them to two different locations:

  branch1.write(output1)
  branch2.write(output2)
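
For reference, here is a minimal sketch of how the three steps above might fit together in a single Job class. The package and class name (etl.Main) are taken from the run command in the next section; the exact layout of the tutorial’s source may differ slightly.

  package etl

  import com.twitter.scalding._
  import cascading.tuple.TupleEntry
  import scala.util.matching.Regex

  class Main(args: Args) extends Job(args) {

    val input   = TextLine(args("input"))
    val output1 = Tsv(args("output1"))
    val output2 = Tsv(args("output2"))

    val regexFields = ('ip, 'time, 'request, 'response, 'size)

    // Step 1: parse each log line into five fields, dropping entries without a size.
    val filteredInput = input.read.mapTo('line -> regexFields) {
      te: TupleEntry =>
        val regex = new Regex("^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$")
        val split = regex.findFirstMatchIn(te.getString("line")).get.subgroups
        (split(0), split(1), split(2), split(3), split(4))
    }.filterNot('size) { size: String => size == "-" }

    // Step 2: branch the stream into two independent RichPipes.
    val branch1 = new RichPipe(filteredInput)
    val branch2 = new RichPipe(filteredInput)

    // Step 3: write each branch to its own sink.
    branch1.write(output1)
    branch2.write(output2)
  }

When the job runs, Scalding’s Args parses command-line options of the form --key value, so the --input, --output1, and --output2 options used in the run command below supply the values returned by args("input"), args("output1"), and args("output2").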

Now that we’ve gone through the code, let’s build and run our program.

Run and Validate Your Program

Step 1: Compile

From your terminal, go into the scalding-data-processing folder and type:

$ cd scalding-data-processing/part2
$ gradle clean fatjar

This will compile the code and create a "fat jar", a jar file that contains all the required dependencies. The fat jar will be placed in the build/libs/ folder.

The build.gradle file is identical to part 1. Please see the explanation in part 1 for the dependencies required.

Step 2: Prepare the input and output directories in HDFS, if you haven’t already done so

$ hadoop fs -mkdir logs
$ hadoop fs -mkdir output1
$ hadoop fs -mkdir output2
$ hadoop fs -put ../data/NASA_access_log_Aug95.txt logs

Step 3: Run the program

$ yarn jar build/libs/part2-fat.jar etl.Main --hdfs --input logs/NASA_access_log_Aug95.txt --output1 output1/out.txt --output2 output2/out.txt

Step 4: View the execution graph in Driven

Depending on how you configured your Driven plugin, either click the Driven URL from your console or log into the Driven application.

14/12/11 12:01:53 INFO state.AppStats: shutdown hook finished.
14/12/11 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/12/11 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

Figure 1: An example of the application’s view in Driven.

Here’s a Driven link to see this part’s execution graph on the Driven cloud service.

  1. The first thing you will see is a graph, formally a Directed Acyclic Graph (DAG), that shows all the steps in your code and their dependencies. The circles represent the Taps, and you can inspect each operation used by your code by clicking on the corresponding step.

  2. Click on each step of the DAG. You will see additional details about the specific operator, and a reference to the line of code where that step was invoked.

  3. The timeline chart visualizes how the application executed in your environment. You can see details about the time the flow took to execute, and get additional insights by clicking the "Add Columns" button.

  4. If you executed your application on a Hadoop cluster in distributed mode, you will get additional insights into how your Cascading flows mapped into mappers and reducers. Note that the 'Performance View' is only available if you ran your application on Hadoop (distributed mode).

  5. In the timeline view, click on your flow name link. You will see how your application logic was decomposed into mappers and reducers. You can also see the number of tasks created for each step, which is important for understanding performance bottlenecks.

     As your applications become more complex, the 'Performance View' becomes essential for understanding the behavior of your application.

If you registered and configured the Driven API key, you will also have an "All Applications" view, where you can see all the applications that are running, or have run, in the Hadoop cluster for a given user. You can customize the view to display additional attributes such as application name, ID, and owner. In addition, you can filter the view based on status and dates.

Step 5: Validate output

Let’s see what the first output folder contains. Do:

$ hadoop fs -cat output1/out.txt/* > out1.txt
$ less out1.txt

You should see the following on your screen:

in24.inetnebr.com       01/Aug/1995:00:00:01 -0400      GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0    200     1839
uplherc.upl.com 01/Aug/1995:00:00:07 -0400      GET / HTTP/1.0  304     0
uplherc.upl.com 01/Aug/1995:00:00:08 -0400      GET /images/ksclogo-medium.gif HTTP/1.0 304     0
uplherc.upl.com 01/Aug/1995:00:00:08 -0400      GET /images/MOSAIC-logosmall.gif HTTP/1.0       304     0
uplherc.upl.com 01/Aug/1995:00:00:08 -0400      GET /images/USA-logosmall.gif HTTP/1.0  304     0

Since we split the stream, and sent two exact copies of it to two different files, let’s see what the other directory contains:

$ hadoop fs -cat output2/out.txt/* > out2.txt
$ less out2.txt

You should again see the same output as shown above.

Let’s verify that the two outputs are identical:

$ diff out1.txt out2.txt
$

Since diff reports no differences (and exits with status 0), the two outputs are exact copies of each other. We have now seen how to split one stream into two.

In the next part, we continue our data operations on one of the branches.

Next: Part 3 - Filtering data