The Scalding QuickStart Tutorial

Part 3: Filtering Data

In this part, we will filter data in one of the branches obtained in part 2. Recall that we are simulating a way to gauge users' interest in a particular product category. Here we narrow down the stream to contain only the requests which went to a particular area of our website ("GET /images/").

Step 1: Parse, clean and split the input into two branches

We’ve already done this in part 2, and we show the code here for completeness:

  val input = TextLine(args("input"))
  val output = Tsv(args("output"))

  val inputFields = 'line
  val regexFields = ('ip, 'time, 'request, 'response, 'size)
  val regexString = "^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$"

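  // Requires scala.util.matching.Regex and cascading.tuple.TupleEntry to be imported.
  val filteredInput = input
    .read
    .mapTo('line -> regexFields) {
      te: TupleEntry =>
        val regex = new Regex(regexString)
        // Note: .get throws if a line does not match the pattern
        val split = regex
          .findFirstMatchIn(te.getString("line"))
          .get
          .subgroups

        // Emit the five captured groups as one tuple
        (split(0), split(1), split(2), split(3), split(4))
    }
    // Drop records whose size field is "-"
    .filterNot('size) {
      size: String => size == "-"
    }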

  val branch1 = new RichPipe(filteredInput)
  val branch2 = new RichPipe(filteredInput)
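
To see what the regular expression above extracts, here is a minimal standalone sketch in plain Scala, without Scalding. The log line is a made-up sample in the same format as the access log, and the names ParseSketch, sampleLine and parse are ours, not part of the tutorial code. Unlike the pipeline above, the sketch returns an Option instead of calling .get, so a non-matching line yields None rather than an exception.

```scala
import scala.util.matching.Regex

object ParseSketch {
  // Same pattern as regexString in the tutorial code
  val regexString = "^([^ ]*) \\S+ \\S+ \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([^ ]*).*$"
  val regex = new Regex(regexString)

  // Hypothetical log line, invented for illustration
  val sampleLine =
    "host.example.com - - [01/Aug/1995:00:00:01 -0400] \"GET /images/logo.gif HTTP/1.0\" 200 786"

  // Returns (ip, time, request, response, size), or None if the line does not match
  def parse(line: String): Option[(String, String, String, String, String)] =
    regex.findFirstMatchIn(line).map { m =>
      val g = m.subgroups
      (g(0), g(1), g(2), g(3), g(4))
    }
}
```

Calling ParseSketch.parse(ParseSketch.sampleLine) yields the five fields in the order of regexFields. Note that the request field contains the whole request line (method, path and protocol), not just the path.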

Our task is to filter branch 1. We do that next.

Step 2: Filter data in branch 1

We need to narrow down branch1 to include only those IP addresses which requested our newly introduced product. Let’s assume that our newly introduced product lives in the "GET /images/" section. This is a contrived example, but it is not too far from a real case; you can easily substitute a real product category here. Our task boils down to filtering out the requests that did not go to the images section:

val processedBranch1 = branch1.filter('request) { req: String => req.startsWith("GET /images/") }

This keeps every request that went to the images section of our website. The predicate uses startsWith because the request field holds the full request line (method, path and protocol), so an exact comparison with "GET /images/" would never match. We are only interested in IP addresses, so we can discard all other fields (request, size, etc.). In Scalding, the construct which retains only the specified fields from a tuple stream is called a Project.

val processedBranch1 = branch1.filter('request) { req: String => req.startsWith("GET /images/") }
                       .project('ip)

At this point we have a stream consisting only of the IP addresses that visited the images section. However, the same IP address could have accessed the section multiple times. We need to remove duplicate IPs from this stream, and we can do it by using Unique. The Unique operator uses an internal first-N buffer to efficiently find the unique elements in a tuple stream.

val processedBranch1 = branch1.filter('request) { req: String => req.startsWith("GET /images/") }
                       .project('ip)
                       .unique('ip)
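
The three steps of this section (keep the image requests, retain only the IP field, drop duplicates) can be tried out on ordinary Scala collections before running anything on Hadoop. The following is a rough analogy only, not Scalding code: the LogEntry case class and the sample records are invented for illustration, and collection methods stand in for the pipe operators.

```scala
object FilterSketch {
  // Hypothetical record type standing in for the
  // ('ip, 'time, 'request, 'response, 'size) tuple
  case class LogEntry(ip: String, time: String, request: String,
                      response: String, size: String)

  // Made-up sample records
  val entries = Seq(
    LogEntry("a.example.com", "01/Aug/1995:00:00:01 -0400", "GET /images/logo.gif HTTP/1.0", "200", "786"),
    LogEntry("b.example.com", "01/Aug/1995:00:00:02 -0400", "GET /shuttle/countdown/ HTTP/1.0", "200", "3985"),
    LogEntry("a.example.com", "01/Aug/1995:00:00:03 -0400", "GET /images/ksc.gif HTTP/1.0", "200", "5866"),
    LogEntry("c.example.com", "01/Aug/1995:00:00:04 -0400", "GET /images/logo.gif HTTP/1.0", "200", "786")
  )

  val imageIps: Seq[String] = entries
    .filter(_.request.startsWith("GET /images/")) // keep image requests only
    .map(_.ip)                                    // analogous to project: keep one field
    .distinct                                     // analogous to unique: drop duplicates
}
```

Here imageIps contains a.example.com and c.example.com once each. One difference from the real job: distinct keeps the order of first occurrence, whereas the output of the Hadoop job comes back sorted.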

Finally, let’s write this stream to the output path so we can validate it later.

processedBranch1.write(output)

Run and Validate Your Program

Step 1: Compile

From your terminal, go into the part3 folder of scalding-data-processing and build the project:

$ cd scalding-data-processing/part3
$ gradle clean fatjar

This will compile the code and create a "fat jar", a jar file which contains all the required dependencies inside it. The fat jar will be placed in the build/libs/ folder.

The build.gradle file is identical to the one used in parts 1 and 2. Please see the explanation in part 1 for the dependencies required.

Step 2: Prepare the input and output directories in HDFS (only if you haven’t done so already)

$ hadoop fs -mkdir logs
$ hadoop fs -mkdir output
$ hadoop fs -put ../data/NASA_access_log_Aug95.txt logs

Step 3: Run the program

$ yarn jar build/libs/part3-fat.jar etl.Main --hdfs --input logs/NASA_access_log_Aug95.txt --output output/out.txt

Step 4: View the execution graph in Driven

Depending on how you configured your Driven plugin, either click the Driven URL from your console or log into the Driven application.

14/12/11 12:01:53 INFO state.AppStats: shutdown hook finished.
14/12/11 12:01:53 INFO rest.DrivenDocumentService: http://localhost:8080/driven/3B8BE330F87B4DF9BA9C7CABB3E1BC16
14/12/11 12:01:53 INFO rest.DrivenDocumentService: messaging version 1.0-eap-57:JSON

Figure 1: An example of the application’s view in Driven.

Here’s a link to see this part’s execution graph on the Driven cloud service.

Step 5: Validate output

Let’s look at what the output folder contains:

$ hadoop fs -cat output/out.txt/* > out.txt
$ tail out.txt

You should see the following on your screen:

zuul.tfn.com
zvezda.mit.edu
zweerts.et.tudelft.nl
zzdshing.slip.cc.uq.oz.au
zzdtarli.slip.cc.uq.oz.au
zzfgshs.slip.cc.uq.oz.au
zzghodso.slip.cc.uq.oz.au
zzjpolla.slip.cc.uq.oz.au
zztduffy.slip.cc.uq.oz.au
zzzzzzzz.mindspring.com

Note how the output consists only of IP addresses; all other fields have been removed. This is the result of the Project operator. Also note that there are no duplicate IPs, and that the output is sorted. This is the result of the Unique operator, which is a reduce-side operation in MapReduce: keys are sorted during the shuffle before they reach the reducer.
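
If you would rather check these two properties programmatically than by eye, a small helper such as the following could be used. This is only a sketch: the name OutputCheck is ours, and it assumes the lines of out.txt have already been read into memory. It also assumes the job ran with a single reducer; with several reducers each part file is sorted on its own, so the concatenated file may not be globally sorted.

```scala
object OutputCheck {
  // True when every line is strictly greater than the one before it,
  // which implies both "sorted" and "no duplicates".
  def isStrictlyAscending(lines: Seq[String]): Boolean =
    lines.zip(lines.drop(1)).forall { case (a, b) => a.compareTo(b) < 0 }
}
```

For example, the lines could be loaded with scala.io.Source.fromFile("out.txt").getLines().toSeq and passed to OutputCheck.isStrictlyAscending.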

In the next part, we continue our data operations on the other branch.

Next: Part 4 - Implementing custom functions