Multitool is a simple command line interface for building large-scale data processing jobs based on Cascading. It provides the functions of many popular Unix/Linux command line utilities which are typically connected by pipes - such as grep, sed, awk, cut, join, uniq, wc, tr, cat, sort - with Apache Hadoop doing the heavy lifting.

Multitool supports all Cascading platforms, namely hadoop, hadoop2-mr1, and local. The hadoop platform is for Hadoop distributions based on Hadoop 1. The hadoop2-mr1 platform is for all distributions based on Hadoop 2 (YARN), and the local platform runs everything in a local JVM without Hadoop. The compatibility page shows whether your version of Hadoop is compatible with Cascading and Multitool.

The code of Multitool is hosted on GitHub.

Installation

To install the latest version of Multitool do the following:

> curl http://files.cascading.org/multitool/2.6/install-multitool.sh | bash

This script downloads and installs the latest multitool shell script into ~/.multitool/ and updates any local .bashrc file.

Setting the platform

As explained above, Multitool supports all Cascading platforms. For Multitool to work, you have to tell it which platform to use. The ways of setting the platform are described below in order of precedence:

The first way of setting the platform is the --platform <name> command line switch. Valid platform names are hadoop, hadoop2-mr1, and local. The next possibility is the MULTITOOL_PLATFORM environment variable. If that is not set, Multitool tries the CASCADING_PLATFORM environment variable. If that is also not set, it looks for a file called .cascading in the current working directory. The .cascading file is a Java properties file and can set the platform via the keys multitool.platform.name or cascading.platform.name. If no such file is present in the current working directory, Multitool looks for a .cascading file in the user’s $HOME directory, which may contain the same keys.

If none of the above settings can be found, the Multitool shell wrapper will choose hadoop as the default platform.
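
The lookup order described above can be sketched as a small Bash function. This is only an illustration of the documented precedence, not the code of the actual shell wrapper. The function names read_platform_key and resolve_platform are hypothetical, the sketch assumes plain key=value lines in the .cascading file, and it assumes that multitool.platform.name wins over cascading.platform.name when both keys are present (mirroring the order of the environment variables).

```shell
#!/usr/bin/env bash
# Illustrative sketch of the documented platform lookup order.
# Not the real wrapper code; function names are hypothetical.

# Print the platform stored in a properties file, if any.
# Assumes simple "key=value" lines without surrounding whitespace.
read_platform_key() {
  local file="$1" key value
  for key in multitool.platform.name cascading.platform.name; do
    value=$(awk -F'=' -v key="$key" '$1 == key { print substr($0, length(key) + 2); exit }' "$file")
    if [ -n "$value" ]; then
      printf '%s\n' "$value"
      return 0
    fi
  done
  return 1
}

# Usage: resolve_platform [value-of-the-platform-switch]
resolve_platform() {
  local from_switch="${1:-}"
  if [ -n "$from_switch" ]; then
    printf '%s\n' "$from_switch"                               # 1. command line switch
  elif [ -n "${MULTITOOL_PLATFORM:-}" ]; then
    printf '%s\n' "$MULTITOOL_PLATFORM"                        # 2. MULTITOOL_PLATFORM
  elif [ -n "${CASCADING_PLATFORM:-}" ]; then
    printf '%s\n' "$CASCADING_PLATFORM"                        # 3. CASCADING_PLATFORM
  elif [ -f .cascading ]; then
    read_platform_key .cascading || printf 'hadoop\n'          # 4. file in the working directory
  elif [ -f "$HOME/.cascading" ]; then
    read_platform_key "$HOME/.cascading" || printf 'hadoop\n'  # 5. file in $HOME
  else
    printf 'hadoop\n'                                          # 6. wrapper default
  fi
}
```

With these definitions, resolve_platform hadoop2-mr1 prints hadoop2-mr1 regardless of the environment, while a bare resolve_platform falls through the environment variables and the .cascading files before printing hadoop.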

Locating your Hadoop

If you use the hadoop platform, Multitool will try to locate the hadoop command in your environment by searching the PATH and by inspecting the HADOOP_HOME environment variable. If no usable hadoop script can be found, Multitool will fail to run. Likewise, if you choose to run on the hadoop2-mr1 platform, it tries to locate the yarn script that ships with Hadoop 2 based distributions. It will look in the PATH and in the HADOOP_YARN_HOME and HADOOP_HOME directories.

If you do not have any version of Hadoop installed, you can use the local platform and still benefit from the power of Multitool and Cascading.
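
The launcher lookup described in this section could look roughly like the following Bash sketch. It is not the wrapper's actual code. The function names find_launcher and launcher_for_platform are hypothetical, and two details are assumptions: that the PATH is checked before the home directories, and that the scripts sit in a bin/ subdirectory of HADOOP_HOME or HADOOP_YARN_HOME.

```shell
#!/usr/bin/env bash
# Illustrative sketch of locating the Hadoop launcher script.
# Not the real wrapper code; function names and the bin/ layout are assumptions.

# Usage: find_launcher <script-name> [home-dir ...]
# Prints the first usable launcher, or returns 1 if none is found.
find_launcher() {
  local name="$1" dir
  shift
  if command -v "$name" >/dev/null 2>&1; then
    command -v "$name"                      # found on the PATH
    return 0
  fi
  for dir in "$@"; do
    if [ -n "$dir" ] && [ -x "$dir/bin/$name" ]; then
      printf '%s\n' "$dir/bin/$name"        # found below a home directory
      return 0
    fi
  done
  return 1
}

# Usage: launcher_for_platform <hadoop|hadoop2-mr1>
launcher_for_platform() {
  case "$1" in
    hadoop)      find_launcher hadoop "${HADOOP_HOME:-}" ;;
    hadoop2-mr1) find_launcher yarn "${HADOOP_YARN_HOME:-}" "${HADOOP_HOME:-}" ;;
    *)           return 2 ;;                # e.g. local, which needs no launcher
  esac
}
```

A non-zero exit status from launcher_for_platform corresponds to the situation in which Multitool would fail to run on that platform.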

Invocation

The preferred way of running Multitool is the shell wrapper that comes with the distribution:

> multitool param1 param2 .. paramN

It is also possible to download the hadoop and/or hadoop2 jar files compatible with your Hadoop version. This is considered an expert mode of using Multitool. If you are unsure, stick with the full client install.

If you choose to work with the jar files directly, you can download them like so:

For Hadoop 1:

> wget -i http://files.cascading.org/multitool/2.6/latest-hadoop-jar.txt

To run it do this:

> hadoop jar multitool-hadoop-<version>.jar param1 param2 .. paramN

For Hadoop 2:

> wget -i http://files.cascading.org/multitool/2.6/latest-hadoop2-mr1-jar.txt

To run it do this:

> yarn jar multitool-hadoop2-mr1-<version>.jar param1 param2 .. paramN

Please note that you have to set the platform to a valid value, even if you work with the jar files.

All downloads are hosted on a public S3 bucket, which makes it easy to deploy Multitool in a bootstrap action on an Amazon EMR cluster.

Update

If you want to update Multitool to the latest version, you can do that like so:

> multitool --selfupdate

This feature is only available in the shell wrapper and not in the jar files.

Why use Multitool?

Multitool can run on your laptop with a small data set. The very same commands can also be used to run on a large Hadoop cluster with petabytes of data. For example, you can prototype a large, complex ETL workflow using sample data at a Linux command line prompt, then deploy that same app at scale on a Hadoop cluster.

Another benefit is learning about efficient MapReduce programming, because Multitool can generate DOT files to represent its workflows. These can be viewed with popular drawing apps such as OmniGraffle and Visio. In other words, you can work through a problem with small data sets on a command line, ensuring that correct results get produced. Then generate a DOT file to see how to program the same workflow in Cascading. In addition to Java, that could also be coded in Python, Scala, Clojure, or Ruby - with all the benefits of libraries, tools, and software lifecycle process which those languages provide.

The data to run the examples is included in the full client install of Multitool and can be found in the data/ directory in the root of the installation directory.

Example 1: Grep

Consider a file called days.txt which contains the following text:

I read in the newspapers they are going to have 30 minutes of intellectual stuff on television every Monday from 7:30 to 8.
Sometimes it pays to stay in bed in Monday, rather than spending the rest of the week debugging Monday's code.
Monday's child is fair in face, Tuesday's child is full of grace, Wednesday's child is full of woe.
Monday religion is better than Sunday profession.
A schedule so tight that it would only work if I didn't sleep on Monday nights.
We're still investigating. I heard that Monday or Tuesday we will probably be having a press conference announcing more.
We take time to go to a restaurant two times a week. A little candlelight, dinner, soft music and dancing. She goes Tuesdays, I go Fridays.

One could use the grep utility to find text lines in that file which match a given pattern. For example, to write all the lines containing the string Tuesday into a file called output/tuesday.txt:

> grep Tuesday data/days.txt > output/tuesday.txt

The results in the output/tuesday.txt file would be:

Monday's child is fair in face, Tuesday's child is full of grace, Wednesday's child is full of woe.
We're still investigating. I heard that Monday or Tuesday we will probably be having a press conference announcing more.
We take time to go to a restaurant two times a week. A little candlelight, dinner, soft music and dancing. She goes Tuesdays, I go Fridays.

To use Multitool for the exact same work:

> multitool source=data/days.txt select=Tuesday sink=output/tuesday.txt

Note that Multitool uses Cascading operations for its parameters - a source for input and a sink for output in this case. That allows for a graph of MapReduce jobs to be connected together as a workflow, run as a single application.

Example 2: Word Count

Everybody loves the ubiquitous MapReduce example of Word Count, so let’s take a look at it in terms of Multitool compared with Linux command line utilities. First, the Multitool version:

> multitool source=data/days.txt \
          expr="\$0.toLowerCase()" \
          gen gen.delim=" " \
          group=0 \
          count \
          group=1 \
          group.secondary=0 \
          group.secondary.reverse=true \
          sink=output/ \
          sink.part=1

Note that you may need to escape some characters when running from a Linux shell, such as the $ in the expr argument above, which the shell would otherwise expand inside double quotes. That may not be needed in other Hadoop environments, such as Elastic MapReduce.

Log output from an example run is stored in a gist.

The first 10 lines in the results would be:

to      5
a       5
of      4
monday  4
is      4
in      4
i       4
the     3
child   3
we      2

The same work performed with Linux command line utilities would be:

> cat data/days.txt | tr 'A-Z' 'a-z' | tr " " "\n" | sort | uniq -c | tr -s " " | cut -f 2,3 -d" " | sort -nr | awk -F'[ ]' '{ printf "%s\t%s\n", $2, $1 }'

These two different approaches use roughly the same length of command lines. However, the Multitool approach is stated in terms of set operations used in database theory, and is much closer conceptually to what’s happening in MapReduce. When you need to think in terms of data processing with MapReduce, then Multitool provides a simpler approach. It also scales as big as your Hadoop cluster.

We can look at this differently by adding another Multitool parameter --dot=flow.dot to generate a file flow.dot, which depicts the MapReduce workflow as a directed acyclic graph or DAG.

> multitool --dot=flow.dot \
          source=data/days.txt \
          expr="\$0.toLowerCase()" \
          gen gen.delim=" " \
          group=0 \
          count \
          group=1 \
          group.secondary=0 \
          group.secondary.reverse=true \
          sink=output/ sink.part=1

The generated graph (in the center column of the following diagram) shows how this workflow is built out of two MapReduce jobs connected in a flow:

Directed graph for Word Count example

The first mapper phase loads from days.txt and processes the text. It converts each line to lowercase, splits it on the spaces, then emits the resulting words as tuples for the following reducer. The first reducer phase performs a group by to count the occurrences of each word. The second mapper phase simply loads this input (an identity mapper), and the second reducer phase sorts the results in decreasing order based on count.
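
The flow.dot file written by the --dot switch is in the Graphviz DOT language, so besides drawing apps it can also be rendered with the Graphviz dot tool, if that is installed. The helper below is only a convenience sketch: render_flow is a hypothetical name and not part of Multitool, and PNG is just the output format chosen for this illustration.

```shell
#!/usr/bin/env bash
# Convenience sketch: render a DOT file to PNG with Graphviz.
# render_flow is a hypothetical helper, not part of Multitool.

# Usage: render_flow flow.dot [flow.png]
render_flow() {
  local dot_file="$1"
  local png_file="${2:-${dot_file%.dot}.png}"
  if ! command -v dot >/dev/null 2>&1; then
    echo "Graphviz 'dot' command not found on the PATH" >&2
    return 127
  fi
  dot -Tpng "$dot_file" -o "$png_file"
}
```

Calling render_flow flow.dot would write flow.png next to the DOT file, provided Graphviz is available.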

Back to the point about learning to program MapReduce efficiently, let’s examine each of the Multitool commands which were used:

> multitool source=data/days.txt \
          expr="\$0.toLowerCase()" \
          gen \
          gen.delim=" " \
          group=0 count \
          group=1 \
          group.secondary=0 \
          group.secondary.reverse=true \
          sink=output/ sink.part=1

The source argument specifies that the input should be read from the file days.txt for the initial map phase. That is called a tap in Cascading.

The expr argument applies a function to the data, in this case a Java String function to convert the data to lower case.

The two gen arguments split the input text using a space delimiter, where the resulting stream of words is used for the input tuples to the rest of the flow.

The next two arguments, group=0 and count, specify that fields emitted by the regular expression for the preceding gen should be grouped and counted according to the first column, i.e. count the number of occurrences of each word in the stream. This will produce two columns: words and counts. Note that a group must be used before a count argument.

The next set of group arguments causes the output to be sorted in descending order, based on the counts.

Finally, the two sink arguments write the output to the “output/” directory, where “sink.part=1” means that all output should be consolidated into a single file.
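
To make the mapping between arguments and processing steps concrete, the walk-through above can be mirrored locally with standard utilities, one annotated stage per Multitool argument. This is a rough emulation for small files, not what Multitool executes. The function name word_count is hypothetical, and the final sort is written to match the ordering of the sample output shown earlier (counts descending, words in reverse order within the same count).

```shell
#!/usr/bin/env bash
# Local emulation of the word count flow with standard utilities.
# word_count is a hypothetical helper; each stage is annotated with the
# Multitool argument it stands in for.

# Usage: word_count <text-file>
word_count() {
  local tab
  tab=$(printf '\t')
  tr 'A-Z' 'a-z' < "$1" |                     # expr: lower-case every line
    tr ' ' '\n' |                             # gen gen.delim=" ": one word per tuple
    LC_ALL=C sort | uniq -c |                 # group=0 count: occurrences per word
    awk '{ printf "%s\t%s\n", $2, $1 }' |     # emit word<TAB>count
    LC_ALL=C sort -t "$tab" -k2,2nr -k1,1r    # group=1 plus reversed secondary sort on the word
}
```

Unlike the Multitool version, this pipeline runs on a single machine and holds no notion of separate map and reduce phases; it is only meant as a reading aid for the arguments.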

Example 3: Join

Next, let’s join a view of the data in days.txt with a view of the data in another file called rhymes.txt which contains the following text:

Hey diddle diddle! The cat and the fiddle the cow jumped over the moon
Humpty Dumpty sat on a wall, Humpty Dumpty had a great fall. All the King's horses, and all the King's men cannot put Humpty Dumpty together again.
Jack be nimble, Jack be quick, Jack jump over the candlestick.
Hickory, dickory dock! The mouse ran up the clock. The clock struck one, and down her un, hickory, dickory, dock!
If you sneeze on Monday, you sneeze for danger; sneeze on a Tuesday, kiss a stranger.

For this example, we take two flows, one based on days.txt which we label as lhs and another based on rhymes.txt which we label as rhs, apply unique to both, and then join them to produce the output in a directory output/join/:

> multitool source=data/days.txt \
          source.name=lhs \
          expr="\$0.toLowerCase()" gen gen.delim="[^a-z]+" \
          group=0 unique \
          source=data/rhymes.txt source.name=rhs \
          expr="\$0.toLowerCase()" gen gen.delim="[^a-z]+" \
          group=0 unique \
          join join.lhs=lhs join.rhs=rhs \
          cut=0 \
          sink=output/join sink.part=1

The results provide the intersection of words common to both input files:

a
and
be
if
monday
on
s
the
tuesday

Since this workflow is a graph and not a linear pipeline, this example becomes more complex to state as a pipe of Unix/Linux command line utilities. We could use temporary files, and multiple command lines. However, keep in mind that the Multitool example only had two flows being joined, while it could have many flows joined in a complex DAG. That’s where the power of Cascading really begins to shine.
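
As a rough illustration of the temporary-file approach mentioned above, the two branches and the join could be written with standard utilities as follows. The helper names unique_words and common_words are hypothetical, and the sketch only covers this particular two-way intersection of words, not joins in general.

```shell
#!/usr/bin/env bash
# Sketch of the same join with standard utilities and temporary files.
# unique_words and common_words are hypothetical helper names.

# Print the sorted, de-duplicated lower-case words of a file,
# splitting on every run of characters other than a-z.
unique_words() {
  tr 'A-Z' 'a-z' < "$1" | tr -cs 'a-z' '\n' | grep -v '^$' | LC_ALL=C sort -u
}

# Usage: common_words <lhs-file> <rhs-file>
common_words() {
  local lhs rhs
  lhs=$(mktemp)
  rhs=$(mktemp)
  unique_words "$1" > "$lhs"         # one temporary file per branch of the flow
  unique_words "$2" > "$rhs"
  LC_ALL=C comm -12 "$lhs" "$rhs"    # keep only the lines present in both files
  rm -f "$lhs" "$rhs"
}
```

It would be called as common_words data/days.txt data/rhymes.txt. Every additional branch needs another temporary file and another comm or join step, which is the bookkeeping that a single Multitool flow avoids.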

CLI Options Reference

Usage:

first tap must be a source and last tap must be a sink

Table 1. Options:

--selfupdate               Installs the latest version of Multitool (only available in the shell wrapper)
-h/--help                  show this help text
--markdown                 generate help text as GitHub Flavored Markdown
--appname=name             sets cascading application name
--tags=comma,separated     sets cascading application tags, comma separated
--platform=name            name of the cascading platform to use
--version                  prints the version and exits
--dot=filename             write a plan DOT file, then exit

Table 2. Taps:

sink                       a URL to the output path
sink.select                fields to sink
sink.replace               set true if output should be overwritten
sink.compress              compression: enable, disable, or default
sink.writeheader           set true to write field names as the first line
sink.delim                 delimiter used to separate fields
sink.seqfile               write to a sequence file instead of text; writeheader, delim, and compress are ignored
source                     a URL to the input data
source.name                name of this source, required if more than one
source.skipheader          set true if the first line should be skipped
source.hasheader           set true if the first line should be used for field names
source.delim               delimiter used to separate fields
source.seqfile             read from a sequence file instead of text; specify N fields, or true

Table 3. Operations:

gen                        split the first field, and return the given result fields as new tuples
gen.delim                  regex delimiter, default: \t (TAB)
unique                     return the first value in each grouping
reject                     regex, matches are discarded; all fields are matched unless args is specified
reject.args                fields to match against
count                      count the number of values in the grouping
select                     regex, matches are kept; all fields are matched unless args is given
select.args                fields to match against
sexpr                      use java expression as filter, e.g. $0 != null
sexpr.args                 the fields to use as arguments
sum                        sum the values in the grouping
pgen                       parse the first field with given regex, return as new tuples
expr                       use java expression as function, e.g. $0.toLowerCase()
expr.args                  the fields to use as arguments
debug                      print tuples to stdout of task jvm
debug.prefix               a value to distinguish which branch debug output is coming from
join                       what fields to join and group on, grouped fields are sorted
join.lhs                   source name of the lhs of the join
join.lhs.group             lhs fields to group on, default FIRST
join.rhs                   source name of the rhs of the join
join.rhs.group             rhs fields to group on, default FIRST
join.joiner                join type: inner, outer, left, right
join.name                  branch name
discard                    narrow the stream, removing the given fields. 0 for first, -1 for last
concat                     join the given fields, will join ALL by default
concat.delim               delimiter, default: \t (TAB)
replace                    regex whose matches are replaced
replace.replace            replacement string
replace.replaceAll         true if pattern should be applied more than once
filename                   include the filename from which the current value was found (not supported on the local platform)
filename.append            append the filename to the record (not supported on the local platform)
filename.only              only return the filename (not supported on the local platform)
retain                     narrow the stream to the given fields. 0 for first, -1 for last
cut                        split the first field, and return the given fields, or all fields. 0 for first, -1 for last
cut.delim                  regex delimiter, default: \t (TAB)
parse                      parse the first field with given regex
parse.groups               regex groups, comma delimited
group                      what fields to group/sort on, grouped fields are sorted
group.secondary            fields to secondary sort on
group.secondary.reverse    set true to reverse secondary sort