Cascading 3.0 User Guide - Custom Taps and Schemes

Custom Taps and Schemes

Introduction

Cascading is designed to be easily configured and enhanced by developers. In addition to creating custom Operations, developers can create custom Tap and Scheme classes that let applications connect to external systems or read/write data to proprietary formats.

A Tap represents something physical, like a file or a database table. Accordingly, Tap implementations are responsible for life-cycle issues around the resource they represent, such as tests for resource existence or deletion of a resource (e.g., dropping a remote SQL table).

A Scheme represents a format or representation — such as a text format for a file, the columns in a table, etc. Schemes are used to convert between the source data’s native format and a cascading.tuple.Tuple instance.

Creating custom taps and schemes is an involved process and requires knowledge of the underlying platform. Not only do Tap and Scheme classes manage the access to a resource or the format of the resource, they must also selectively hide details of the underlying platform to retain platform independence.

For examples of how to implement a custom Tap and Scheme for different platforms, see Cascading Extensions for links to the source code of similar extensions.

Custom Taps

All custom Tap classes must subclass the cascading.tap.Tap abstract class and implement the required methods. The method getIdentifier() must return a String that uniquely identifies the resource the Tap instance is managing. Any two Tap instances with the same fully-qualified identifier value are considered equal.
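The identifier-based equality described above can be pictured with a small standalone sketch. The class SketchTap and its distinctResources() helper are hypothetical: the class does not extend cascading.tap.Tap and only mirrors the idea that two instances with the same identifier compare as equal.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a Tap; it does NOT extend cascading.tap.Tap.
final class SketchTap {
    private final String identifier;

    SketchTap(String identifier) {
        this.identifier = Objects.requireNonNull(identifier);
    }

    // Mirrors the role of getIdentifier(): a unique name for the managed resource.
    String getIdentifier() {
        return identifier;
    }

    // Two sketch taps are equal when their identifiers are equal.
    @Override
    public boolean equals(Object other) {
        return other instanceof SketchTap
            && identifier.equals(((SketchTap) other).identifier);
    }

    @Override
    public int hashCode() {
        return identifier.hashCode();
    }

    // Counts how many distinct resources the given taps refer to.
    static int distinctResources(SketchTap... taps) {
        Set<SketchTap> unique = new HashSet<>();
        for (SketchTap tap : taps) {
            unique.add(tap);
        }
        return unique.size();
    }
}
```

In this sketch, two instances that name the same resource collapse into one entry when placed in a set, which is why the identifier should be fully qualified.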

A Tap can set any custom properties the underlying platform requires, via the methods sourceConfInit() (for when it is used as a source tap) and sinkConfInit() (for when it is used as a sink tap). These two methods may be called more than once with new configuration objects and should be idempotent.
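As a rough illustration of what idempotent means here, the following sketch uses java.util.Properties as a stand-in for a platform configuration object. The class name, the method signature, and the property keys are all assumptions made for the example; the real sourceConfInit() takes platform-specific arguments.

```java
import java.util.Properties;

// Hypothetical stand-in; the real sourceConfInit() receives platform-specific arguments.
final class IdempotentConfSketch {
    static final String INPUT_KEY = "sketch.input.path";     // hypothetical property name
    static final String FORMAT_KEY = "sketch.input.format";  // hypothetical property name

    // Sets each property to a fixed value rather than appending to it,
    // so calling the method again leaves the configuration unchanged.
    static void sourceConfInit(Properties conf, String identifier) {
        conf.setProperty(INPUT_KEY, identifier);
        conf.setProperty(FORMAT_KEY, "text");
    }
}
```

Because the method only overwrites keys with the same values, it gives the same result whether it is called once or several times, and whether it is given a fresh configuration object or one it has already seen.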

Tuples are always read (sourced) from the openForRead() method via a TupleEntryIterator — i.e., openForRead() is always called in the same process that reads the data. The Tap returns a TupleEntryIterator that is responsible for iterating across the resource, returning a TupleEntry instance (and Tuple instance) for each "record" in the resource. TupleEntryIterator.close() is always called when no more entries will be read. See TupleEntrySchemeIterator in the Javadoc for more information.

On some platforms, openForRead() is called with a pre-instantiated Input type. Typically this Input type should be used instead of instantiating a new instance of the appropriate type.

Similarly, when writing (sinking) Tuple data, a Tap is always used to sink data from the openForWrite() method via the TupleEntryCollector. Here again, openForWrite() is always called in the process in which data will be written. The Tap returns a TupleEntryCollector that will accept and store any number of TupleEntry or Tuple instances for each record that is processed or created by a given Flow. TupleEntryCollector.close() is always called when no more entries will be written. See TupleEntrySchemeCollector in the Javadoc for more information.

Again, on some platforms, openForWrite() will be called with a pre-instantiated Output type. Typically this Output type should be used instead of instantiating a new instance of the appropriate type.

Both the TupleEntrySchemeIterator and TupleEntrySchemeCollector should be used to hold any state or resources necessary to communicate with any remote services. For example, when connecting to a SQL database, any JDBC drivers should be created in the constructor and cleaned up in close().
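The pattern of acquiring a resource in the constructor and releasing it in close() might look like the sketch below. LineRecordIterator is a hypothetical class that reads lines from a java.io.Reader; it is not TupleEntrySchemeIterator and does not use any Cascading types.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the iterator a Tap returns from openForRead().
final class LineRecordIterator implements Iterator<String>, Closeable {
    private final BufferedReader reader; // resource acquired in the constructor
    private String next;                 // look-ahead record, null at end of input
    private boolean closed;

    LineRecordIterator(Reader input) {
        this.reader = new BufferedReader(input);
        advance();
    }

    // Reads the next line into the look-ahead slot.
    private void advance() {
        try {
            next = reader.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return !closed && next != null;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String current = next;
        advance();
        return current;
    }

    // Releases the underlying reader; calling it more than once is harmless.
    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        try {
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isClosed() {
        return closed;
    }
}
```

A collector for the write side would follow the same shape: open the connection or stream when it is constructed, and flush and release it in close().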

Note that the Tap is not responsible for reading or writing data to the Input or Output type. This is delegated to the Scheme passed to the constructor of the Tap. Consequently, the Scheme is responsible for configuring the Input and Output types it will be reading and writing.

Important: Consider how a Tap and Scheme implementation that is not based on HDFS should behave in the face of a failure. The Tap class has a few methods to help clean up after a failed Flow.

Custom Schemes

All custom Scheme classes must subclass the cascading.scheme.Scheme abstract class and implement the required methods.

A Scheme is ultimately responsible for sourcing and sinking Tuples of data. Consequently a Scheme must be programmed to automatically present the correct Fields during sourcing and to accept specific Fields during sinking. Therefore, the constructors on the base Scheme type must be set with the source and sink Fields.

For example, the TextLine Scheme sources different Fields than it sinks. The TextDelimited Scheme, on the other hand, forces the source and sink Fields to be the same.

The retrieveSourceFields() and retrieveSinkFields() methods allow a custom Scheme to fetch its source and sink Fields immediately before the Cascading planner is invoked. For example, the Scheme could fetch Fields from the header of a file, as the TextDelimited class does. Similarly, the presentSourceFields() and presentSinkFields() methods notify the Scheme of the Fields that the planner expects the Scheme to handle. For example, the Scheme could use the presented Fields to write the field names as a header, again as the TextDelimited class does.
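The header-based case can be sketched without any Cascading types. The helper class and both method names below are hypothetical; they only show the two directions involved, deriving field names from a header line and producing a header line from field names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper; not part of Cascading.
final class HeaderFieldsSketch {
    // Similar in spirit to retrieveSourceFields(): derive field names from a header line.
    // The delimiter is quoted so that characters like "|" are treated literally.
    static List<String> fieldsFromHeader(String headerLine, String delimiter) {
        return Arrays.asList(headerLine.split(Pattern.quote(delimiter), -1));
    }

    // Similar in spirit to presentSinkFields(): turn the presented names into a header line.
    static String headerFromFields(List<String> fieldNames, String delimiter) {
        return String.join(delimiter, fieldNames);
    }
}
```

A real Scheme would return a Fields instance rather than a list of strings, and would read the header through the Tap rather than receive it as a parameter.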

A Scheme can set any custom properties that the underlying platform requires, via the sourceConfInit() and sinkConfInit() methods. These methods may be called more than once with new configuration objects and should be idempotent.

Data is always sourced through the source() method and always sunk through the sink() method. The sequence of method calls for a Scheme is the following:

  1. The sourcePrepare() or sinkPrepare() method is called.

  2. The source() or sink() method is called.

  3. After all values have been read or written, the sourceCleanup() or sinkCleanup() method is called.

The *Prepare() methods allow a Scheme to initialize any state necessary — for example, to create a new java.util.regex.Matcher instance for use against all record reads. Conversely, the *Cleanup() methods allow for clearing up any resources. These methods are always called in the same process space as their associated source() and sink() calls. In the case of a clustered platform, this is usually on the cluster side.
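The prepare, source, and cleanup sequence can be sketched as follows. RegexSourceSketch is a hypothetical class that does not extend cascading.scheme.Scheme; its methods take plain strings instead of the call objects a real Scheme receives, and the key=value record format is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the source side of a Scheme's life cycle.
final class RegexSourceSketch {
    private static final Pattern KEY_VALUE = Pattern.compile("(\\w+)=(\\w+)");
    private Matcher matcher; // per-process state, created in sourcePrepare()

    // Step 1: create the Matcher once so it can be reused for every record.
    void sourcePrepare() {
        matcher = KEY_VALUE.matcher("");
    }

    // Step 2: parse one record; returns null when the record does not match.
    String[] source(String record) {
        matcher.reset(record);
        if (!matcher.matches()) {
            return null;
        }
        return new String[] { matcher.group(1), matcher.group(2) };
    }

    // Step 3: drop the state once all records have been read.
    void sourceCleanup() {
        matcher = null;
    }

    // Drives the three steps in order, as the surrounding framework would.
    static List<String[]> readAll(List<String> records) {
        RegexSourceSketch scheme = new RegexSourceSketch();
        List<String[]> parsed = new ArrayList<>();
        scheme.sourcePrepare();
        try {
            for (String record : records) {
                String[] fields = scheme.source(record);
                if (fields != null) {
                    parsed.add(fields);
                }
            }
        } finally {
            scheme.sourceCleanup();
        }
        return parsed;
    }
}
```

Reusing one Matcher through reset() avoids allocating a new instance per record, which is the kind of state the *Prepare() methods are meant to set up.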

Calls to *ConfInit() are usually on the client side.

Schemes are also responsible for converting canonical data types to supported types for the system that the Scheme is wrapping, and vice versa. See the section on Field Typing for more information.

Taps with File and Nonfile Resources

Tap classes that are meant to manage resources that are files should implement the cascading.tap.type.FileType interface. This interface allows a user to directly access a Tap to manipulate the remote filesystem or to retrieve child resources (files in subdirectories).

Nonfile resources should not be managed by a Tap that implements the cascading.tap.type.FileType interface. For example, a "JDBCTap" implementation does not treat tables as directories with child files.

Tap Life-Cycle Methods

The Tap interface has a number of life-cycle methods that allow the underlying resource to be managed by the Tap instance.

Existence

These methods test for the existence of the resource managed by the Tap, or delete it.

  • resourceExists() - called frequently; should be idempotent

  • deleteResource() - called only if the SinkMode is SinkMode.REPLACE, or by Cascading itself when the resource is stale

Initialization

These methods allow for client-side initialization of a remote resource that will be shared across parallelized instances on the cluster side. For example, a database table can be created if it does not exist, so that data may be written to it from the cluster. Note that this is not the same as initializing a JDBC Driver on the client side and sharing it with the cluster; Driver initialization must happen in openForWrite() or openForRead().

  • prepareResourceForRead() - initialize any shared remote resource for reading

  • prepareResourceForWrite() - initialize any shared remote resource for writing
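The create-if-missing behavior described above might be sketched like this. SharedTableSketch is hypothetical: an in-memory set stands in for the remote database, and the methods take a table name rather than the configuration object that the real Tap methods receive.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a remote database shared by all cluster-side tasks.
final class SharedTableSketch {
    private final Set<String> tables = new TreeSet<>();
    private int createCount; // how many times a table was actually created

    boolean resourceExists(String table) {
        return tables.contains(table);
    }

    // Mirrors the intent of prepareResourceForWrite(): create the table once,
    // on the client side, before any parallel writers start.
    boolean prepareResourceForWrite(String table) {
        if (!resourceExists(table)) {
            tables.add(table);
            createCount++;
        }
        return true;
    }

    int createCount() {
        return createCount;
    }
}
```

Checking for existence first means a repeated call does not try to create the table a second time.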

Reading and Writing

These methods may be called on the client or cluster side. Either method can be invoked by Cascading during execution or by a developer wishing direct access to the underlying data managed by the Tap instance. One of these methods must be called to read or write data.

These methods are described in more detail above.

  • openForRead()

  • openForWrite()

Transactional

These methods notify a given Tap instance if the parent Flow was successful or if there was a failure. They are called on the client side so that any remote shared resources can be cleaned up or any changes written can be committed/persisted. They are only invoked if the Tap instance is used as a sink.

  • commitResource() - commit the saved values written to the resource

  • rollbackResource() - revert the resource to its original state
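One way to picture the commit and rollback pair is a sink that stages its writes until the outcome of the Flow is known. StagedSinkSketch below is a hypothetical in-memory model, not a Tap; a real implementation would typically stage data in a temporary table or location on the remote system.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory sink that stages writes until the flow's outcome is known.
final class StagedSinkSketch {
    private final List<String> committed = new ArrayList<>();
    private final List<String> staged = new ArrayList<>();

    // Records written during the flow go to the staging area only.
    void write(String record) {
        staged.add(record);
    }

    // Mirrors the intent of commitResource(): make the staged writes permanent.
    boolean commitResource() {
        committed.addAll(staged);
        staged.clear();
        return true;
    }

    // Mirrors the intent of rollbackResource(): discard the staged writes,
    // leaving previously committed data untouched.
    boolean rollbackResource() {
        staged.clear();
        return true;
    }

    // Returns a read-only snapshot of the committed records.
    List<String> committedRecords() {
        return Collections.unmodifiableList(new ArrayList<>(committed));
    }
}
```

With this shape, a failed Flow leaves the resource exactly as it was after the last successful commit.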