The following diagram shows the use of Failure Traps in a pipe assembly.
Failure Traps are similar to Tap sinks (as opposed to Tap sources) in that they allow data to be stored. The difference is that a Tap sink is bound to a particular tail pipe in a pipe assembly and is the primary outlet of a branch in that assembly. Traps can be bound to intermediate pipe assembly branches, just like Stream Assertions, yet they capture only the data that causes an Operation to fail (throw an Exception).
Whenever an operation fails and throws an exception, if there is an associated trap, the offending Tuple is saved to the resource specified by the trap Tap. This allows the job to continue processing without any data loss.
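The behavior described above can be illustrated with a minimal plain-Java sketch. It does not use the Cascading API; the class TrapSketch, the method runWithTrap, and the Result holder are hypothetical names introduced only for illustration, and an in-memory list stands in for the resource behind the trap Tap. The sketch assumes that a failing operation signals the failure with a RuntimeException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of trap semantics; this is NOT the Cascading API.
public class TrapSketch {

    /** Holds the successfully processed values and the records that were trapped. */
    public static final class Result<R, T> {
        public final List<R> output = new ArrayList<>();
        public final List<T> trapped = new ArrayList<>();
    }

    /**
     * Applies the operation to every record. A record whose operation throws
     * is saved to the trapped list, and processing continues with the next record.
     */
    public static <T, R> Result<R, T> runWithTrap(List<T> records, Function<T, R> operation) {
        Result<R, T> result = new Result<>();
        for (T record : records) {
            try {
                result.output.add(operation.apply(record));
            } catch (RuntimeException e) {
                // Keep the offending record instead of failing the whole run.
                result.trapped.add(record);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("200", "404", "not-a-number", "500");
        Result<Integer, String> result = runWithTrap(lines, Integer::parseInt);
        System.out.println("output:  " + result.output);  // [200, 404, 500]
        System.out.println("trapped: " + result.trapped); // [not-a-number]
    }
}
```

The point of the sketch is that the offending input record is preserved as-is, so it can be inspected or reprocessed later, while the remaining records are unaffected.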
By design, clusters are hardware fault-tolerant: lose a node, and the cluster continues working. But fault tolerance for software is a little different. Failure Traps provide a means for the processing to continue without losing track of the data that caused the fault. For high-fidelity applications, traps may not be very useful, since you will likely want any error during processing to stop the application. But for low-fidelity applications such as webpage indexing, where skipping a page or two out of a few million is acceptable, traps can dramatically improve processing reliability.
Example 8.7. Setting Traps
// ...some useful pipes here
// name this pipe assembly segment
assembly = new Pipe( "assertions", assembly );
AssertNotNull notNull = new AssertNotNull();
assembly = new Each( assembly, AssertionLevel.STRICT, notNull );
AssertSizeEquals equals = new AssertSizeEquals( 6 );
assembly = new Each( assembly, AssertionLevel.STRICT, equals );
AssertMatchesAll matchesAll = new AssertMatchesAll( "(GET|HEAD|POST)" );
Fields method = new Fields( "method" );
assembly = new Each( assembly, method, AssertionLevel.STRICT, matchesAll );
// ...some more useful pipes here
FlowDef flowDef = new FlowDef();
flowDef
  .setName( "log-parser" )
  .addSource( "logs", source )
  .addTailSink( assembly, sink );
// set the trap on the "assertions" branch
flowDef.addTrap( "assertions", trap );
FlowConnector flowConnector = new HadoopFlowConnector();
Flow flow = flowConnector.connect( flowDef );
The example above binds a trap tap to the pipe assembly segment named "assertions". Note how we can name branches and segments by using a single Pipe instance, and that the naming applies to all subsequent Pipe instances.
Traps are for exceptional cases, in the same way that Java Exception handling is. Traps are not intended for application flow control, nor are they a means to filter some data into other locations. Applications that need to filter out bad data should do so explicitly, using filters. For more on this, see Handling Good and Bad Data.
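To make the contrast with traps concrete, the following plain-Java sketch separates good and bad records with an explicit predicate rather than relying on exceptions. It is not Cascading code; ExplicitFilterSketch, isValidMethod, and keep are hypothetical names, and the validity rule simply reuses the HTTP methods from the assertion example above as an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Plain-Java illustration of explicit filtering; this is NOT the Cascading API.
public class ExplicitFilterSketch {

    // Hypothetical validity rule, reusing the HTTP methods from the assertion example.
    private static final Pattern VALID_METHOD = Pattern.compile("GET|HEAD|POST");

    /** Returns true only for a non-null value that is exactly GET, HEAD, or POST. */
    public static boolean isValidMethod(String method) {
        return method != null && VALID_METHOD.matcher(method).matches();
    }

    /** Returns the records that satisfy the predicate, in their original order. */
    public static <T> List<T> keep(List<T> records, Predicate<T> predicate) {
        List<T> kept = new ArrayList<>();
        for (T record : records) {
            if (predicate.test(record)) {
                kept.add(record);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> methods = Arrays.asList("GET", "BREW", "POST", null);
        Predicate<String> valid = ExplicitFilterSketch::isValidMethod;
        List<String> good = keep(methods, valid);          // routed to normal processing
        List<String> bad = keep(methods, valid.negate());  // routed elsewhere on purpose
        System.out.println("good: " + good); // [GET, POST]
        System.out.println("bad:  " + bad);  // [BREW, null]
    }
}
```

Here bad data is an expected case that the application handles deliberately, so no exception is thrown and nothing needs to be trapped.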
Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.