The Scalding QuickStart Tutorial

Part 4: Implementing custom functions

In this part, we will perform data processing on the other branch using unique function and a custom function. Recall that we’re trying to simulate the case of gauging interest in a particular product category by some users. We want to select only those users who have bought products from our website in the past. Now, in a real case, we would have collected this information separately. For this tutorial’s purposes, we will artificially assign scores to each user based on a hash function computed on its IP address. We will see how to implement a custom function which can transform the incoming data stream.

Let’s go through the code from top to bottom and see what’s going on.

Step 1: Parse, clean and split the input into two branches

We’ve already done this in earlier parts, and we show the code here for completeness:

  val input = TextLine(args("input"))
  val output = Tsv(args("output"))

  val inputFields = 'line
  val regexFields = ('ip, 'time, 'request, 'response, 'size)
  val regexString = "^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$"

  val filteredInput = input
    .read
    .mapTo('line -> regexFields) {
    te: TupleEntry =>
      val regex = new Regex(regexString)
      val split = regex
        .findFirstMatchIn(te.getString("line"))
        .get
        .subgroups(split(0), split(1), split(2), split(3), split(4))
  }.filterNot('size) {
    size: String =>  size == "-"
  }

  val branch1 = new RichPipe(filteredInput)
  val branch2 = new RichPipe(filteredInput)

Step 2: Process branch 1

This has been completed in part 1, and we show the code here for completeness:

val input = TextLine(args("input"))
  val output= Tsv(args("output"))

  val inputFields = 'line
  val regexFields = ('ip, 'time, 'request, 'response, 'size)

  val filteredInput = input.read.mapTo('line -> regexFields) {
    te: TupleEntry =>
      val regex = new Regex("^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$")
      val split = regex.findFirstMatchIn(te.getString("line")).get.subgroups
      (split(0), split(1), split(2), split(3), split(4))
  }.filterNot('size) { size: String => size == "-" }

  val branch1 = new RichPipe(filteredInput)
  val branch2 = new RichPipe(filteredInput)

  val processedBranch1 = branch1.filterNot('request) { req: String => req == "GET /images/" }
    .project('ip)
    .unique('ip)

Step 3: Process branch 2

In this step, we need to assign a score between 0 to 100 to each IP address, and then retain only those IP addresses which have a score greater than 60.

Let’s first remove unnecessary fields, and retain only the unique IP addresses.

  val processedBranch2 =  branch2
    .project('ip)
    .unique('ip)

For reasons explained in the next part, we need to rename the IP field to something else. The rename operator can be used for this purpose:

  val processedBranch2 =  branch2
    .project('ip)
    .unique('ip)
    .rename('ip -> 'userip)

Now we have a stream consisting only of "userip" fields, and we are ready to assign a score to each of them. Since we are introducing a new field, we use the Map operator. The general form of Map operator is:

pipe.map('inputField -> 'outputField) { function }

The Map operator shown above will inject an additional field called outputField in the input stream by operating on the inputField present in each tuple. The operation is defined by the function given inside the curly braces. In our case, we can do a simple scoring by taking the hash of the ip address:

  val processedBranch2 =  branch2
    .project('ip)
    .unique('ip)
    .rename('ip -> 'userip)
    .map('userip -> 'score) {
      ip : String =>
      var i = 0
      var hash = 7
      for(i <- 0 to ip.length) {
        hash = hash * 31 + ip.indexOf(i)
      }
      hash % 100
    }

Now we have a tuple stream containing IPs and their scores. We’re however, only interested in IPs whose scores are greater than 60. We can use the Filter operator to reject all other IP addresses. The complete processing code of branch 2 thus becomes:

  val processedBranch2 =  branch2
    .project('ip)
    .unique('ip)
    .rename('ip -> 'userip)
    .map('userip -> 'score) {
      ip : String =>
      var i = 0
      var hash = 7
      for(i <- 0 to ip.length) {
        hash = hash * 31 + ip.indexOf(i)
      }
      hash % 100
    }.filterNot('score) { score : Int => score < 60 }

In the end, we write out the branch 2 so that we can verify it later.

  processedBranch2.write(output)

Run and Validate Your Program

Step 1: Compile

Go into the scalding-data-processing folder from your terminal, and type:

$ cd scalding-data-processing/part4
$ gradle clean fatjar

This will compile the code and create a "fat-jar", a jar file which contains all the required dependencies inside it. The fatjar will be present in the build/libs/ folder.

The build.gradle file is identical to other parts. Please see the explanation in part 1 for the dependencies required.

Step 2: Prepare the input and output directories in HDFS, only if you haven’t done already

$ hadoop fs -mkdir logs
$ hadoop fs -mkdir output
$ hadoop fs -put ../data/NASA_access_log_Aug95.txt logs

Step 3: Run the program

$ yarn jar build/libs/part4-fat.jar etl.Main --hdfs --input logs/NASA_access_log_Aug95.txt --output output/out.txt

Step 4: View the execution graph in Driven

Depending on how you configured your Driven plugin, either click the Driven URL from your console or log into the Driven application.

14/12/11 12:01:53 INFO state.AppStats: shutdown hook finished.
14/12/11 12:01:53 INFO rest.DrivenDocumentService: *http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16*
14/12/11 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

part4

Figure 1: An example of the application’s view in Driven.

Here’s a Driven link to see this part’s execution graph on the Driven cloud service.

Note

Driven exposes three views of the DAG — Logical, Physical and Contracted. The Logical View lets you inspect and visualize the Tap, Pipe and other Cascading constructs that you specified in your code. With Physical View, you can also inspect intermediate Tap and Pipe Assemblies embedded in your code. In our case, we can see that the Retain function was used in the subassembly.

Step 5: Validate output

Let’s view what the output folder contains. Do:

$ hadoop fs -cat output/out.txt/* > out.txt
$ tail out.txt

You should see the following on your screen:

zeta.coe.neu.edu    60
zeus.esy.com    64
zeus.nic.dtag.de    60
zeus.shim.org.sg    60
zig.taynet.co.uk    60
zippo2.zippo.com    60
zorch.w3.org    64
zorro.sev.se    64
zuul.lcp.com    64
zuul.tfn.com    64

Note how the output consists only of IP addresses and their scores. Only those IPs are included which have a score higher than 60.

In the next part, we finish our tutorial with a discussion on joins.

Next: Part 5 - Joins