Multitool is a simple command line interface for building large-scale data processing jobs based on Cascading. It provides the functions of many popular Unix/Linux command line utilities which are typically connected by pipes - such as grep, sed, awk, cut, join, uniq, wc, tr, cat, and sort - with Apache Hadoop doing the heavy lifting.
Multitool supports all Cascading platforms, namely hadoop, hadoop2-mr1, and local. The hadoop platform is for Hadoop distributions based on Hadoop 1, the hadoop2-mr1 platform is for all distributions based on Hadoop 2 (YARN), and the local platform runs everything in a local JVM without Hadoop. The compatibility page shows whether your version of Hadoop is compatible with Cascading and Multitool.
The code of Multitool is hosted on GitHub.
Installation
To install the latest version of Multitool do the following:
> curl http://files.cascading.org/multitool/2.6/install-multitool.sh | bash
This script downloads and installs the latest multitool shell script into ~/.multitool/ and updates any local .bashrc file.
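Since the installer updates your .bashrc, you may need to open a new shell or source that file before the wrapper is on your PATH. You can then check that the install worked by asking Multitool for its version (the --version switch is listed in the options reference below):
> multitool --version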
Setting the platform
As explained above, Multitool supports all Cascading platforms. In order for Multitool to work, you have to tell it which platform you want to use. The ways of setting the platform are listed below, in order of precedence:
The first way of setting the platform is the --platform <name> command line switch. Valid platform names are hadoop, hadoop2-mr1, and local. The next possibility is setting the MULTITOOL_PLATFORM environment variable. If that is not set, Multitool will try to use the CASCADING_PLATFORM environment variable. If that is also not set, it will look for a file called .cascading in the current working directory. The .cascading file is a Java properties file and can be used to set the platform via the keys multitool.platform.name or cascading.platform.name. If no such file is present in the current working directory, Multitool will look for a .cascading file in the user’s $HOME directory, which may contain the same keys. The Multitool shell wrapper will choose hadoop as the default platform if none of the above settings can be found.
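For example, to make local the default platform for everything you run in the current session or directory, you could either export the environment variable:
> export MULTITOOL_PLATFORM=local
or, equivalently, create a one-line .cascading properties file in the working directory (local is just one of the valid platform names listed above):
> echo "multitool.platform.name=local" > .cascading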
Invocation
The preferred way of running Multitool is using the shell wrapper coming with the distribution:
> multitool param1 param2 .. paramN
It is also possible to download the hadoop and/or hadoop2 jar files compatible with your Hadoop version. This is considered an expert mode of using Multitool. If you are unsure, stick with the full client install.
If you choose to work with the jar files directly, you can download them like so:
Hadoop 1:
> wget -i http://files.cascading.org/multitool/2.6/latest-hadoop-jar.txt
To run it do this:
> hadoop jar multitool-hadoop-<version>.jar param1 param2 .. paramN
For Hadoop 2:
> wget -i http://files.cascading.org/multitool/2.6/latest-hadoop2-mr1-jar.txt
To run it do this:
> yarn jar multitool-hadoop2-mr1-<version>.jar param1 param2 .. paramN
All downloads are hosted on a public S3 bucket, which makes it easy to deploy Multitool via a bootstrap action on an Amazon EMR cluster.
Update
If you want to update Multitool to the latest version, you can do so like this:
> multitool --selfupdate
This feature is only available in the shell wrapper and not in the jar files.
Why use Multitool?
Multitool can run on your laptop with a small data set. The very same commands can also be used to run on a large Hadoop cluster with petabytes of data. For example, you can prototype a large, complex ETL workflow simply using sample data on a Linux command line prompt. Then deploy that same app at scale on a Hadoop cluster.
Another benefit is learning about efficient MapReduce programming, because Multitool can generate DOT files to represent its workflows. These can be viewed with popular drawing apps such as OmniGraffle and Visio. In other words, you can work through a problem with small data sets on a command line, ensuring that correct results get produced. Then generate a DOT file to see how to program the same workflow in Cascading. In addition to Java, that could also be coded in Python, Scala, Clojure, or Ruby - with all the benefits of libraries, tools, and software lifecycle process which those languages provide.
Example 1: Grep
Consider having a file called days.txt which contains the following text:
I read in the newspapers they are going to have 30 minutes of intellectual stuff on television every Monday from 7:30 to 8.
Sometimes it pays to stay in bed in Monday, rather than spending the rest of the week debugging Monday's code.
Monday's child is fair in face, Tuesday's child is full of grace, Wednesday's child is full of woe.
Monday religion is better than Sunday profession.
A schedule so tight that it would only work if I didn't sleep on Monday nights.
We're still investigating. I heard that Monday or Tuesday we will probably be having a press conference announcing more.
We take time to go to a restaurant two times a week. A little candlelight, dinner, soft music and dancing. She goes Tuesdays, I go Fridays.
One could use the grep utility to find text lines in that file which match a given pattern. For example, to write all the lines containing the string Tuesday into a file called output/tuesday.txt:
> grep Tuesday data/days.txt > output/tuesday.txt
The results in the output/tuesday.txt file would be:
Monday's child is fair in face, Tuesday's child is full of grace, Wednesday's child is full of woe.
We're still investigating. I heard that Monday or Tuesday we will probably be having a press conference announcing more.
We take time to go to a restaurant two times a week. A little candlelight, dinner, soft music and dancing. She goes Tuesdays, I go Fridays.
To use Multitool for the exact same work:
> multitool source=data/days.txt select=Tuesday sink=output/tuesday.txt
Note that Multitool uses Cascading operations for its parameters - a source for input and a sink for output in this case. That allows a graph of MapReduce jobs to be connected together as a workflow and run as a single application.
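Multitool also has a reject parameter (see the options reference below), which discards matching lines, much as grep -v keeps everything except the matches. As a sketch, the inverse of the example above might look like this (the output path is just an illustrative name):
> multitool source=data/days.txt reject=Tuesday sink=output/no-tuesday.txt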
Example 2: Word Count
Everybody loves the ubiquitous MapReduce example of Word Count, so let’s take a look at it in terms of Multitool compared with Linux command line utilities. First, the Multitool version:
> multitool source=data/days.txt \
    expr="\$0.toLowerCase()" \
    gen gen.delim=" " \
    group=0 \
    count \
    group=1 \
    group.secondary=0 \
    group.secondary.reverse=true \
    sink=output/ \
    sink.part=1
Note that you may need to escape some characters when running from a Linux shell, such as in the expr statement above. That may not be needed in other Hadoop environments, such as Elastic MapReduce.
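Assuming a Bourne-style shell, one way to sidestep the escaping is to single-quote the expression, since the shell does not expand $0 inside single quotes:
expr="\$0.toLowerCase()"
expr='$0.toLowerCase()'
Both forms pass the same expression through to Multitool.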
Log output from an example run is stored in a gist.
The first 10 lines in the results would be:
to      5
a       5
of      4
monday  4
is      4
in      4
i       4
the     3
child   3
we      2
The same work performed with Linux command line utilities would be:
> cat data/days.txt | tr [A-Z] [a-z] | tr " " \\n | sort | uniq -c | tr -s " " | cut -f 2,3 -d" " | sort -nr | awk -F'[ ]' '{ printf "%s\t%s\n", $2, $1 }'
These two different approaches use roughly the same length of command lines. However, the Multitool approach is stated in terms of set operations used in database theory, and is much closer conceptually to what’s happening in MapReduce. When you need to think in terms of data processing with MapReduce, then Multitool provides a simpler approach. It also scales as big as your Hadoop cluster.
We can look at this differently by adding another Multitool parameter --dot=flow.dot to generate a file flow.dot, which depicts the MapReduce workflow as a directed acyclic graph or DAG.
> multitool --dot=flow.dot \
    source=data/days.txt \
    expr="\$0.toLowerCase()" \
    gen gen.delim=" " \
    group=0 \
    count \
    group=1 \
    group.secondary=0 \
    group.secondary.reverse=true \
    sink=output/ sink.part=1
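Besides opening the file in OmniGraffle or Visio, as mentioned above, you can render it on the command line if you happen to have Graphviz installed:
> dot -Tpng flow.dot -o flow.png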
The generated graph shows how this workflow is built out of two MapReduce jobs connected in a flow:
The first mapper phase loads from days.txt and processes the text. It splits each line on the spaces, converts the words to lowercase, then emits those words as tuples for the following reducer. The first reducer phase performs a group by to count the occurrence of each word. The second mapper phase simply loads this input (an identity mapper), and the second reducer phase sorts the results in decreasing order based on count.
Back to the point about learning to program MapReduce efficiently, let’s examine each of the Multitool commands which were used:
> multitool source=data/days.txt \
    expr="\$0.toLowerCase()" \
    gen \
    gen.delim=" " \
    group=0 count \
    group=1 \
    group.secondary=0 \
    group.secondary.reverse=true \
    sink=output/ sink.part=1
The source argument specifies that the input should be read from the file days.txt for the initial map phase. That is called a tap in Cascading.
The expr argument applies a function to the data, in this case a Java String function to convert the data to lower case.
The two gen arguments split the input text using a space delimiter, where the resulting stream of words is used for the input tuples to the rest of the flow.
The next two arguments, group=0 and count, specify that the fields emitted by the preceding gen should be grouped and counted according to the first column, i.e. count the number of occurrences of each word in the stream. This will produce two columns: words and counts. Note that a group must be used before a count argument.
The next set of group arguments causes the output to be sorted in descending order, based on the counts.
Finally, the two sink arguments write the output to the “output/” directory, where “sink.part=1” means that all output should be consolidated into a single file.
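The sink.* parameters listed in the options reference below give further control over how the output is written. As a sketch, writing the same counts as a single comma-delimited file with a header line might look like this (the exact field names in the header depend on the flow):
> multitool source=data/days.txt \
    expr="\$0.toLowerCase()" \
    gen gen.delim=" " \
    group=0 count \
    sink=output/ sink.part=1 sink.delim="," sink.writeheader=true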
Example 3: Join
Next, let’s join a view of the data in days.txt with a view of the data in another file called rhymes.txt which contains the following text:
Hey diddle diddle! The cat and the fiddle the cow jumped over the moon
Humpty Dumpty sat on a wall, Humpty Dumpty had a great fall.
All the King's horses, and all the King's men cannot put Humpty Dumpty together again.
Jack be nimble, Jack be quick, Jack jump over the candlestick.
Hickory, dickory dock! The mouse ran up the clock. The clock struck one, and down he run, hickory, dickory, dock!
If you sneeze on Monday, you sneeze for danger; sneeze on a Tuesday, kiss a stranger.
For this example, we take two flows, one based on days.txt which we label as lhs, and another based on rhymes.txt which we label as rhs, apply unique to both, and then join them to produce the output in a directory output/join/:
> multitool source=data/days.txt \
    source.name=lhs \
    expr="\$0.toLowerCase()" gen gen.delim="[^a-z]+" \
    group=0 unique \
    source=data/rhymes.txt source.name=rhs \
    expr="\$0.toLowerCase()" gen gen.delim="[^a-z]+" \
    group=0 unique \
    join join.lhs=lhs join.rhs=rhs \
    cut=0 \
    sink=output/join sink.part=1
The results provide the intersection of words common to both input files:
a
and
be
if
monday
on
s
the
tuesday
Since this workflow is a graph and not a linear pipeline, this example becomes more complex to state as a pipe of Unix/Linux command line utilities. We could use temporary files, and multiple command lines. However, keep in mind that the Multitool example only had two flows being joined, while it could have many flows joined in a complex DAG. That’s where the power of Cascading really begins to shine.
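To illustrate the point, a rough command line equivalent of the join above needs temporary files and several steps, for example something along these lines (the temporary file names are only illustrative; comm -12 keeps the lines common to both sorted inputs):
> tr 'A-Z' 'a-z' < data/days.txt   | tr -cs 'a-z' '\n' | sort -u > /tmp/lhs.txt
> tr 'A-Z' 'a-z' < data/rhymes.txt | tr -cs 'a-z' '\n' | sort -u > /tmp/rhs.txt
> comm -12 /tmp/lhs.txt /tmp/rhs.txt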
CLI Options Reference
The first tap must be a source and the last tap must be a sink.
--selfupdate              installs the latest version of Multitool (only available in the shell wrapper)
-h/--help                 show this help text
--markdown                generate help text as GitHub Flavored Markdown
--appname=name            sets the Cascading application name
--tags=comma,separated    sets the Cascading application tags, comma separated
--platform=name           name of the Cascading platform to use
--version                 prints the version and exits
--dot=filename            write a plan DOT file, then exit

sink                      a URL to the output path
sink.select               fields to sink
sink.replace              set true if output should be overwritten
sink.compress             compression: enable, disable, or default
sink.writeheader          set true to write field names as the first line
sink.delim                delimiter used to separate fields
sink.seqfile              write to a sequence file instead of text; writeheader, delim, and compress are ignored

source                    a URL to the input data
source.name               name of this source, required if more than one
source.skipheader         set true if the first line should be skipped
source.hasheader          set true if the first line should be used for field names
source.delim              delimiter used to separate fields
source.seqfile            read from a sequence file instead of text; specify N fields, or

gen                       split the first field, and return the given result fields as new tuples
gen.delim                 regex delimiter, default:

unique                    return the first value in each grouping

reject                    regex, matches are discarded; all fields are matched unless args is specified
reject.args               fields to match against

count                     count the number of values in the grouping

select                    regex, matches are kept; matches against all fields unless args is given
select.args               fields to match against

sexpr                     use a Java expression as a filter, e.g. $0 != null
sexpr.args                the fields to use as arguments

sum                       sum the values in the grouping

pgen                      parse the first field with the given regex, return as new tuples

expr                      use a Java expression as a function, e.g. $0.toLowerCase()
expr.args                 the fields to use as arguments

debug                     print tuples to stdout of the task JVM
debug.prefix              a value to distinguish which branch debug output is coming from

join                      what fields to join and group on; grouped fields are sorted
join.lhs                  source name of the lhs of the join
join.lhs.group            lhs fields to group on, default FIRST
join.rhs                  source name of the rhs of the join
join.rhs.group            rhs fields to group on, default FIRST
join.joiner               join type: inner, outer, left, right
join.name                 branch name

discard                   narrow the stream by removing the given fields; 0 for first, -1 for last

concat                    join the given fields, will join ALL by default
concat.delim              delimiter, default:

replace                   apply the replace regex
replace.replace           replacement string
replace.replaceAll        true if the pattern should be applied more than once

filename                  include the filename from which the current value was found (not supported on the local platform)
filename.append           append the filename to the record (not supported on the local platform)
filename.only             only return the filename (not supported on the local platform)

retain                    narrow the stream to the given fields; 0 for first, -1 for last

cut                       split the first field, and return the given fields, or all fields; 0 for first, -1 for last
cut.delim                 regex delimiter, default:

parse                     parse the first field with the given regex
parse.groups              regex groups, comma delimited

group                     what fields to group/sort on; grouped fields are sorted
group.secondary           fields to secondary sort on
group.secondary.reverse   set true to reverse the secondary sort