The Scalding QuickStart Tutorial

Part 5: Joins

We did some data processing on two branches in parts 3 and 4 and obtained the IPs that accessed a particular product category (branch 1) and the IPs that are of interest to us (branch 2). We are now in a position to answer the question: which IPs with a significant score (60 or higher) have accessed our product category?

To do this, we will have to join the two branches on the IP address. Let’s go through the code from top to bottom and see what’s going on.

Step 1: Parse, clean and split the input into two branches

We’ve already done this in earlier parts, and we show the code here for completeness:

  val input = TextLine(args("input"))
  val output = Tsv(args("output"))

  val inputFields = 'line
  val regexFields = ('ip, 'time, 'request, 'response, 'size)
  val regexString = "^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$"

  val filteredInput = input
    .read
    .mapTo('line -> regexFields) {
      te: TupleEntry =>
        val regex = new Regex(regexString)
        val split = regex
          .findFirstMatchIn(te.getString("line"))
          .get
          .subgroups
        (split(0), split(1), split(2), split(3), split(4))
    }.filterNot('size) {
      size: String => size == "-"
    }

  val branch1 = new RichPipe(filteredInput)
  val branch2 = new RichPipe(filteredInput)

Our task is to process branch1. We do that next.

Step 2: Process branch 1

This was completed in part 3, and we show the code here for completeness:

  val input = TextLine(args("input"))
  val output = Tsv(args("output"))

  val inputFields = 'line
  val regexFields = ('ip, 'time, 'request, 'response, 'size)

  val filteredInput = input.read.mapTo('line -> regexFields) {
    te: TupleEntry =>
      val regex = new Regex("^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$")
      val split = regex.findFirstMatchIn(te.getString("line")).get.subgroups
      (split(0), split(1), split(2), split(3), split(4))
  }.filterNot('size) { size: String => size == "-" }

  val branch1 = new RichPipe(filteredInput)
  val branch2 = new RichPipe(filteredInput)

  val processedBranch1 = branch1.filterNot('request) { req: String => req == "GET /images/" }
    .project('ip)
    .unique('ip)
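
As an aside: if I recall the fields-based API correctly, unique('ip) already restricts the pipe to the field(s) you pass it, which would make the preceding project('ip) redundant. A shorter, hedged equivalent (double-check against your Scalding version):

  // Sketch only: assumes unique('ip) discards all fields other than 'ip,
  // as the fields-based API reference describes.
  val processedBranch1 = branch1
    .filterNot('request) { req: String => req == "GET /images/" }
    .unique('ip)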

Step 3: Process branch 2

We did this in part 4, and we show it here for completeness:

  val processedBranch2 = branch2
    .project('ip)
    .unique('ip)
    .rename('ip -> 'userip)
    .map('userip -> 'score) { ip: String =>
      var hash = 7
      for (i <- 0 to ip.length) {
        hash = hash * 31 + ip.indexOf(i)
      }
      hash % 100
    }.filterNot('score) { score: Int => score < 60 }
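
Note that filterNot('score) { score: Int => score < 60 } keeps the rows whose score is 60 or higher. If you prefer to state the condition positively, here is a small sketch of the same filter using Scalding's filter operator (scoredBranch2 is a hypothetical name for the pipe produced by the map('userip -> 'score) step above, introduced only for this sketch):

  // Same semantics as the filterNot above: keep scores of 60 or higher.
  val processedBranch2 = scoredBranch2
    .filter('score) { score: Int => score >= 60 }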

Step 4: Join the branches

Now we’re ready to join our branches on IP. In Scalding, joins are performed with the various join operators. We use joinWithSmaller, where the smaller of the two pipes is the one passed as the argument to the method. After joining, we discard the now-redundant 'ip field, since a join produces the union of the fields of all the streams being joined. Finally, we group on 'userip and sort on 'score. The code to do this is shown below:

  val joinedBranch = processedBranch2
    .joinWithSmaller('userip -> 'ip, processedBranch1)
    .discard('ip)
    .groupBy('userip) { group => group.sortBy('score) }
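
As an aside, joinWithSmaller is not the only join operator in Scalding's fields-based API. Here is a hedged sketch of how the same join could be expressed with joinWithLarger and joinWithTiny (check the exact signatures against your Scalding version):

  // If processedBranch1 were the larger of the two pipes, the argument
  // position flips and joinWithLarger is the appropriate operator:
  val joinedLarger = processedBranch2
    .joinWithLarger('userip -> 'ip, processedBranch1)

  // If processedBranch1 is small enough to replicate to every mapper,
  // joinWithTiny performs a map-side join and avoids a reduce phase:
  val joinedTiny = processedBranch2
    .joinWithTiny('userip -> 'ip, processedBranch1)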

In the end, we write out our final output:

  joinedBranch.write(output)
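
To tie the pieces together, here is a sketch of the complete Job assembled from the fragments above. The package and class name are assumptions based on the etl.Main entry point used in the run command below (the tutorial's actual source may wire things up differently, for example via Scalding's Tool runner); the imports are the usual ones for a Scalding fields-based job:

  package etl

  import com.twitter.scalding._
  import cascading.tuple.TupleEntry
  import scala.util.matching.Regex

  // Sketch: class and package name assumed from the `etl.Main` entry point
  // used in the run step below.
  class Main(args: Args) extends Job(args) {

    val input = TextLine(args("input"))
    val output = Tsv(args("output"))

    val regexFields = ('ip, 'time, 'request, 'response, 'size)
    val regexString = "^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$"

    // Step 1: parse, clean and split the input into two branches
    val filteredInput = input
      .read
      .mapTo('line -> regexFields) { te: TupleEntry =>
        val split = new Regex(regexString)
          .findFirstMatchIn(te.getString("line")).get.subgroups
        (split(0), split(1), split(2), split(3), split(4))
      }
      .filterNot('size) { size: String => size == "-" }

    val branch1 = new RichPipe(filteredInput)
    val branch2 = new RichPipe(filteredInput)

    // Step 2: IPs that accessed the product category
    val processedBranch1 = branch1
      .filterNot('request) { req: String => req == "GET /images/" }
      .project('ip)
      .unique('ip)

    // Step 3: score each IP and keep scores of 60 or higher
    val processedBranch2 = branch2
      .project('ip)
      .unique('ip)
      .rename('ip -> 'userip)
      .map('userip -> 'score) { ip: String =>
        var hash = 7
        for (i <- 0 to ip.length) {
          hash = hash * 31 + ip.indexOf(i)
        }
        hash % 100
      }
      .filterNot('score) { score: Int => score < 60 }

    // Step 4: join on IP, drop the duplicate key, group and sort by score
    val joinedBranch = processedBranch2
      .joinWithSmaller('userip -> 'ip, processedBranch1)
      .discard('ip)
      .groupBy('userip) { group => group.sortBy('score) }

    joinedBranch.write(output)
  }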

This finishes the programming discussion for the tutorial. Now let’s run the program and see its final result.

Run and Validate Your Program

Step 1: Compile

From your terminal, go into the scalding-data-processing folder and type:

$ cd scalding-data-processing/part5
$ gradle clean fatjar

This will compile the code and create a "fat jar", a jar file that bundles all the required dependencies. The fat jar will be placed in the build/libs/ folder.

The build.gradle file is identical to other parts. Please see the explanation in part 1 for the dependencies required.

Step 2: Prepare the input and output directories in HDFS, if you haven’t already done so

$ hadoop fs -mkdir logs
$ hadoop fs -mkdir output
$ hadoop fs -put ../data/NASA_access_log_Aug95.txt logs

Step 3: Run the program

$ yarn jar build/libs/part5-fat.jar etl.Main --hdfs --input logs/NASA_access_log_Aug95.txt --output output/out.txt

Step 4: View the execution graph in Driven

Depending on how you configured your Driven plugin, either click the Driven URL from your console or log into the Driven application.

14/12/11 12:01:53 INFO state.AppStats: shutdown hook finished.
14/12/11 12:01:53 INFO rest.DrivenDocumentService: *http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16*
14/12/11 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON


Figure 1: An example of the application’s view in Driven.

Here’s a Driven link to see this part’s execution graph on the Driven cloud service.

Note

If you registered at http://driven.io and installed the Driven API key, you will have access to the “All Applications” view, which tracks all your historical application runs. This view becomes more interesting over time, when you want to track trends, identify outlier behavior, or monitor applications based on their termination status.

Open your Driven-enabled app to track the progress of your application in real time. Make sure that you have set the Refresh feature to ON. By default, Driven updates the visualization every 30 seconds.

Note

Driven lets you visually track the progress of your application in real time. This feature comes in very handy for sanity-checking the progress of large, complex jobs. In addition, as data applications grow more complex, the graph is an excellent way to review the architecture of your data-driven application. Examples of quick checks include ensuring that most of the filtering of data pipes is done ahead of a join, establishing points where checkpoints have to be introduced, and validating that the business requirements are aligned with the actual implementation of the data transformation.

Now it gets interesting to explore the application in the Driven Performance View, where you can observe the intermediate Taps being created at each step. As applications get more complex or data sets become larger, the performance view becomes very important for understanding how your code is decomposed into Mappers and Reducers and the cost (execution time) associated with each step, helping you answer questions such as, "How much did the join cost me?"

Step 5: Validate output

Let’s view what the output folder contains. Do:

$ hadoop fs -cat output/out.txt/* > out.txt
$ tail out.txt

You should see the following on your screen:

zeta.coe.neu.edu    60
zeus.esy.com    64
zeus.nic.dtag.de    60
zeus.shim.org.sg    60
zig.taynet.co.uk    60
zippo2.zippo.com    60
zorch.w3.org    64
zorro.sev.se    64
zuul.lcp.com    64
zuul.tfn.com    64

This tail snippet shows that our final output consists of all the users who have a score of 60 or higher and who have accessed our product category (/images/).

This finishes our tutorial.