Cascading for the Impatient, Part 6
Part 6 - TDD at scale
This part of the Impatient series extends the TF-IDF app to show best practices for test-driven development (TDD) at scale. We will incorporate unit tests into the build (something we should have done sooner), plus show how to leverage TDD features which are unique to Cascading: checkpoints, traps, assertions, etc. These features are based on using Checkpoint, Debug, and AssertMatches.
We will keep building on this example to show how to leverage 'local mode'.
Theory
At first glance, the notion of test-driven development (TDD) might seem a bit antithetical in the context of Big Data. After all, TDD is all about short development cycles, writing automated test cases which are intended to fail, and lots of refactoring. Those descriptions would not appear to fit with batch jobs involving terabytes of data and huge clusters running apps that take days to complete.
Stated in a different way, according to Kent Beck, TDD "encourages simple designs and inspires confidence." That statement actually fits well with Cascading. The API is intended to provide simple design patterns for working with data - GroupBy, Join, Count, Regex, Filter - so that the need for writing custom functions becomes relatively rare. That speaks directly to encouraging simple designs. The practice in Cascading of modeling business process and orchestrating MapReduce workflows - that speaks to "inspiring confidence" in a big way.
So now we will let the cat out of the bag on a little secret… Working with unstructured data at scale has been shown to be quite valuable (Google, Amazon, LinkedIn, Twitter, etc.); however, most of the "heavy lifting" which we perform in MapReduce workflows is essentially cleaning up data. DJ Patil explained this point quite eloquently in Data Jujitsu: "It's impossible to overstress this: 80% of the work in any data project is in cleaning the data… Work done up front in getting clean data will be amply repaid over the course of the project."
Cleaning up the data allows for subsequent use of sampling techniques, dimensionality reduction, and other practices which help alleviate some of the bottlenecks which might otherwise be encountered in Big Data. In other words, there are great use cases for formalisms which help demonstrate that "dirty" data at scale has been cleaned up. Those turn out to be quite valuable in practice.
However, TDD practices tend to be based on unit tests or mocks… how does one write a quick unit test for a Godzilla-sized dataset?
The short answer is: you don’t. However, you can greatly reduce the need for writing unit test coverage by limiting the amount of custom code required. Hopefully we have shown that aspect of Cascading by now. Beyond that aspect, you can use sampling techniques to quantify the confidence for an app running correctly. You can also define system tests at scale in relatively simple ways. Furthermore, you can define contingencies for what to do when assumptions fail… as they inevitably do, at scale.
Let’s discuss sampling… generally speaking, large MapReduce workflows are relatively opaque processes which are difficult to observe. However, Cascading provides two techniques for observing portions of a workflow. One very simple approach is to insert a Debug into a pipe, to see the tuple values passing through a particular part of a workflow. Debug output goes to the log instead of a file, but it can be turned off, e.g., with a command line option. If the data is large, one can use a Sample filter to sample the tuple values which get written to the log.
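As a quick sketch of that combination - the pipe name myPipe and the 1% sampling rate here are our own illustration, not code from the sample app - Sample takes a random seed and the fraction of tuples to keep:

// keep roughly 1% of the tuples, then log them along with their field names
myPipe = new Each( myPipe, new Sample( 42, 0.01 ) );
myPipe = new Each( myPipe, DebugLevel.VERBOSE, new Debug( true ) );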
Another approach is to use a Checkpoint, which forces intermediate data to be written out to HDFS. This may also become important for performance reasons, i.e., forcing results to disk to avoid recomputing – e.g., when there are multiple uses for the output of a pipe downstream such as with the right side of a HashJoin. Sampling may be performed either before (like with Debug) or after the data gets persisted to HDFS.
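As a minimal sketch of that HashJoin case - the pipe names lhsPipe and rhsPipe and the join field "id" are assumptions for illustration, not part of the sample app:

// persist the right side of the join, so it is computed once rather than re-derived
Checkpoint rhsCheck = new Checkpoint( "rhsCheck", rhsPipe );
Pipe joined = new HashJoin( lhsPipe, new Fields( "id" ), rhsCheck, new Fields( "id" ) );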
Next, let's talk about system tests. Cascading includes support for stream assertions. These provide mechanisms for asserting that the values in a tuple stream meet certain criteria - similar to the assert keyword in Java, or an assertNotNull() in a unit test. We can assert patterns strictly as unit tests during development, then run testing against regression data. For performance reasons, we might use command line options to turn off assertions in production. Or keep them, if a use case requires that level of guarantees.
Lastly, what to do when assumptions fail? One lesson of working with data at scale is that the best assumptions will inevitably fail. Unexpected things happen, and 80% of the work will be cleaning up problems. Cascading defines failure traps which capture data that causes an Operation to fail, e.g., throw an Exception. For example, perhaps 99% of the cases in your log files can be rolled up into a set of standard reports… but 1% requires manual review. Great, process the 99% which work and shunt the 1% failure cases into a special file, marked for manual review. Keep in mind, however, that traps are intended for handling exceptional cases. If you know in advance how to categorize good vs. bad data, then use a filter instead of a trap.
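To make that last point concrete, here is a minimal sketch of the filter alternative - our own illustration, using the same known-good pattern that the stream assertion below will use. RegexFilter matches against the tuple (tab-delimited, by default) and keeps only the tuples that match:

// discard known-bad records up front, rather than trapping them downstream
docPipe = new Each( docPipe, new RegexFilter( "doc\\d+\\s.*" ) );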
Meanwhile, a conceptual diagram for this implementation of TF-IDF in Cascading is shown below:
Source
All code and data are in the part6 subdirectory of the GitHub project.
Let's add a unit test and show how it works in this example. In the Gradle build script build.gradle, we need to add JUnit and the other testing dependencies:
testCompile group: 'cascading', name: 'cascading-platform', version: "3.0.0", classifier: 'tests'
testCompile group: 'cascading', name: 'cascading-hadoop2-mr1', version: "3.0.0", classifier: 'tests'
testCompile group: 'cascading', name: 'cascading-core', version: "3.0.0", classifier: 'tests'
testCompile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: "2.4.1"
testCompile group: 'org.apache.hadoop', name: 'hadoop-minicluster', version: "2.4.1"
testCompile group: 'junit', name: 'junit', version: '4.11'
testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.5'
testCompile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.5'
Then we will add a test task:
test {
  include 'impatient/**'

  // make the standard streams (err and out) visible at the console when running tests
  testLogging.showStandardStreams = true

  // listen to test execution events
  beforeTest { descriptor ->
    logger.lifecycle("Running test: " + descriptor)
  }

  onOutput { descriptor, event ->
    logger.lifecycle("Test: " + descriptor + " produced standard out/err: " + event.message )
  }
}
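With that task in place, the unit tests run as part of the build; you can also invoke them on their own from the project root:

gradle test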
A little restructuring of the source directories is required; see our GitHub code repo, where it is all set up properly. Then we add a unit test for our custom function to 'scrub' tokens, which was created in Part 3. This goes into a new class ScrubTest.java:
import static org.junit.Assert.assertEquals;

import cascading.tuple.Fields;
import org.junit.Test;

public class ScrubTest
  {
  @Test
  public void testMain() throws Exception
    {
    Fields fieldDeclaration = new Fields( "doc_id", "token" );
    ScrubFunction scrub = new ScrubFunction( fieldDeclaration );

    // verify that scrubText() trims whitespace and lowercases the token text
    assertEquals( "Scrub", "foo bar", scrub.scrubText( "FoO BAR " ) );
    }
  }
This is a particularly good place for a unit test. Scrubbing tokens is a likely point at which edge cases get encountered at scale. In practice, you’d probably want to define even more unit tests.
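For example, a couple of likely edge cases - these particular inputs are our own guesses, written against the assumption (based on the test above) that scrubText() simply trims whitespace and lowercases its argument:

@Test
public void testEdgeCases() throws Exception
  {
  ScrubFunction scrub = new ScrubFunction( new Fields( "doc_id", "token" ) );

  // assumed behavior: trim plus lowercase
  assertEquals( "empty", "", scrub.scrubText( "" ) );
  assertEquals( "whitespace", "rain", scrub.scrubText( "  Rain " ) );
  }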
Next, going back to the Main.java module, let's add sink taps for writing out trapped data and checkpointed data:
String trapPath = args[ 4 ];
String checkPath = args[ 5 ];
Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath );
Tap checkTap = new Hfs( new TextDelimited( true, "\t" ), checkPath );
Next we'll modify the head of the existing pipe assembly for TF-IDF to incorporate a stream assertion. We use an AssertMatches to define the expected pattern for the input data. Then we apply AssertionLevel.STRICT to force validation of the data:
// use a stream assertion to validate the input data
Pipe docPipe = new Pipe( "token" );
AssertMatches assertMatches = new AssertMatches( "doc\\d+\\s.*" );
docPipe = new Each( docPipe, AssertionLevel.STRICT, assertMatches );
Next we'll add a Debug and DebugLevel.VERBOSE to the D branch, to trace the tuple values in the flow there:
// example use of a debug, to observe tuple stream; turn off below
dfPipe = new Each( dfPipe, DebugLevel.VERBOSE, new Debug( true ) );
Next we’ll add a Checkpoint after the join of the DF and D branches. That forces the tuples at this point in the workflow to be persisted to HDFS:
// create a checkpoint, to observe the intermediate data in DF stream
Checkpoint idfCheck = new Checkpoint( "checkpoint", idfPipe );
Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfCheck, df_token );
Next we have a somewhat more complex set of taps to connect in the FlowDef, to include output data for the TDD-related features:
// connect the taps, pipes, traps, checkpoints, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
.setName( "tfidf" )
.addSource( docPipe, docTap )
.addSource( stopPipe, stopTap )
.addTailSink( tfidfPipe, tfidfTap )
.addTailSink( wcPipe, wcTap )
.addTrap( docPipe, trapTap )
.addCheckpoint( idfCheck, checkTap );
Last, we’ll specify the verbosity level for the debug trace, and the strictness level for the stream assertion:
// set to DebugLevel.VERBOSE for trace, or DebugLevel.NONE in production
flowDef.setDebugLevel( DebugLevel.VERBOSE );
// set to AssertionLevel.STRICT for all assertions, or AssertionLevel.NONE in production
flowDef.setAssertionLevel( AssertionLevel.STRICT );
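One way to wire those levels to the command line - a sketch of our own; the extra "production" argument is hypothetical, not part of the sample app:

// hypothetical seventh argument: pass "production" to disable tracing and assertions
boolean production = args.length > 6 && "production".equals( args[ 6 ] );
flowDef.setDebugLevel( production ? DebugLevel.NONE : DebugLevel.VERBOSE );
flowDef.setAssertionLevel( production ? AssertionLevel.NONE : AssertionLevel.STRICT );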
Modify the Main method to make those changes, then build a JAR file. You should be good to go. For those keeping score, the resulting physical plan in MapReduce for Part 6 now uses twelve mappers and nine reducers. In other words, we added one mapper as the overhead for gaining lots of test features.
The diagram for the Cascading flow will be in the dot/ subdirectory after the app runs. Here we have annotated it to show where the mapper and reducer phases are running, and also the sections which were added since Part 5:
Build
To build the sample app from the command line use:
gradle clean jar
Run
Running this version is as easy as:
rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc data/en.stop output/tfidf output/trap output/check
The output log should include a warning, based on the stream assertion, which looks like this:
12/08/06 14:15:07 WARN stream.TrapHandler: exception trap on branch: 'token', for fields: [{2}:'doc_id', 'text'] tuple: ['zoink', 'null']
cascading.operation.AssertionException: argument tuple: ['zoink', 'null'] did not match: doc\d+\s.*
  at cascading.operation.assertion.BaseAssertion.throwFail(BaseAssertion.java:107)
  at cascading.operation.assertion.AssertMatches.doAssert(AssertMatches.java:84)
  at cascading.flow.stream.ValueAssertionEachStage.receive(ValueAssertionEachStage.java:57)
  at cascading.flow.stream.ValueAssertionEachStage.receive(ValueAssertionEachStage.java:33)
  at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
  at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
  at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:124)
  at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
  at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
  at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
That is expected behavior. We directed the API to show a warning when stream assertions fail; the data which caused this warning will get trapped.
Not too far after that point in the log, there should be debug output which looks like the following:
12/08/06 14:15:46 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[ ['df_count', 'df_token', 'lhs_join']]"][DF/93669/]
['df_count', 'df_token', 'lhs_join']
['1', 'air', '1']
['3', 'area', '1']
['1', 'australia', '1']
['1', 'broken', '1']
plus several more lines. That is the result of our debug trace.
Output text gets stored in the partition files under output/tfidf, which you can then verify - along with the trapped and checkpointed data:
more output/tfidf/part-00000
more output/trap/part-m-00001-00000
more output/check/part-00000
Notice the data tuple in output/trap:
zoink null
That did not match the regex doc\\d+\\s.* which was specified by the stream assertion.
Here’s a log file from our run of the sample app, part 6. If your run looks terribly different, something is probably not set up correctly.
To run this same app on the Amazon AWS Elastic MapReduce service, based on their command line interface, use the following commands. Be sure to replace temp.cascading.org with your own S3 bucket name:
s3cmd put build/libs/impatient.jar s3://temp.cascading.org/impatient/part6.jar
s3cmd put data/rain.txt s3://temp.cascading.org/impatient/
s3cmd put data/en.stop s3://temp.cascading.org/impatient/
elastic-mapreduce --create --name "TF-IDF" \
--jar s3n://temp.cascading.org/impatient/part6.jar \
--arg s3n://temp.cascading.org/impatient/rain.txt \
--arg s3n://temp.cascading.org/impatient/out/wc \
--arg s3n://temp.cascading.org/impatient/en.stop \
--arg s3n://temp.cascading.org/impatient/out/tfidf \
--arg s3n://temp.cascading.org/impatient/out/trap \
--arg s3n://temp.cascading.org/impatient/out/check
Driven
If you have not installed the Driven plugin, but would like to explore an instance of a historical run of Part 6, visit this link.
That’s all folks!