3.3 Source and Sink Taps

All input data comes from, and all output data is sent to, an instance of cascading.tap.Tap.

A Tap represents a resource like a data file on the local file system, on a Hadoop distributed file system, or even on Amazon S3. A Tap can be read from, which makes it a "source", or written to, which makes it a "sink". More commonly, a Tap acts as both a sink and a source when shared between Flows.

All Taps must have a Scheme associated with them. If the Tap is about where the data is and how to get it, the Scheme is about what the data is. Cascading provides four Scheme classes: TextLine, TextDelimited, SequenceFile, and WritableSequenceFile.

TextLine

TextLine reads and writes raw text files and, by default, returns Tuples with two field names, "offset" and "line". These values are inherited from Hadoop, where "offset" is the byte offset of the line within the file. When written to, all Tuple values are converted to Strings and joined with the TAB character (\t).

TextDelimited

TextDelimited reads and writes character delimited files (csv, tsv, etc). When written to, all Tuple values are converted to Strings and joined with the given character delimiter. This Scheme can optionally handle quoted values with custom quote characters. Further, TextDelimited can coerce each value to a primitive type.
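For illustration, a TextDelimited Scheme for a comma separated file might be constructed as follows. The field names, types, and path are hypothetical; the classes live in the cascading.scheme, cascading.tap, and cascading.tuple packages.

```java
// Hypothetical columns of a CSV file. The Class[] argument asks
// TextDelimited to coerce each value to the given type when sourcing.
Fields fields = new Fields( "id", "name", "price" );
Class[] types = new Class[]{ long.class, String.class, double.class };

Tap tap = new Hfs( new TextDelimited( fields, ",", types ), "items.csv" );
```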

SequenceFile

SequenceFile is based on the Hadoop Sequence file, which is a binary format. When written or read from, all Tuple values are saved in their native binary form. This is the most efficient file format, but being binary, the result files can only be read by Hadoop applications.

WritableSequenceFile

WritableSequenceFile is based on the Hadoop Sequence file, like the SequenceFile Scheme, except it was designed to read and write key and/or value Hadoop Writable objects directly. This is very useful if you have sequence files created by other applications. During writing (sinking) specified key and/or value fields will be serialized directly into the sequence file. During reading (sourcing) the key and/or value objects will be deserialized and wrapped in a Cascading Tuple and passed to the downstream pipe assembly.
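As a sketch, sourcing a sequence file written by another Hadoop application might look like this, assuming both key and value are Hadoop Text Writables (the field names and path are hypothetical):

```java
// "key" and "value" will hold the deserialized Writable objects,
// wrapped in a Tuple and passed to the downstream pipe assembly.
Fields fields = new Fields( "key", "value" );

Tap tap = new Hfs( new WritableSequenceFile( fields, Text.class, Text.class ), "input/events" );
```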

The fundamental difference between the TextLine and SequenceFile schemes is that Tuples stored in a SequenceFile remain Tuples, so they do not need to be parsed when read. Thus a typical Cascading application reads raw text files and parses each line into a Tuple for processing. The final Tuples are saved via the SequenceFile scheme so that future applications can read the file directly into Tuple instances without the parsing step.
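This typical pattern can be sketched with a pair of Taps; the paths and field names here are hypothetical:

```java
// Source raw text; the pipe assembly (not shown) parses each "line"
// into the "first" and "second" fields.
Tap source = new Hfs( new TextLine( new Fields( "line" ) ), "input/raw" );

// Sink the parsed Tuples in binary form, so later Flows can read
// them back without re-parsing.
Tap sink = new Hfs( new SequenceFile( new Fields( "first", "second" ) ), "output/parsed" );
```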

For performance reasons, it is advised that sequence file compression be enabled via the Hadoop properties. Either block- or record-based compression can be enabled. See the Hadoop documentation for the available properties and compression types.

Example 3.9. Creating a new Tap

Tap tap = new Hfs( new TextLine( new Fields( "line" ) ), path );

The above example creates a new Hadoop FileSystem Tap that can read/write raw text files. Since only one field name was provided, the "offset" field is discarded, resulting in an input tuple stream with only "line" values.

The three most commonly used Tap classes are Hfs, Dfs, and Lfs. The MultiSourceTap, MultiSinkTap, and TemplateTap are utility Taps.

Lfs

The cascading.tap.Lfs Tap is used to reference local files, that is, files on the same machine on which your Cascading application is started. Even if a remote Hadoop cluster is configured, if an Lfs Tap is used as either a source or sink in a Flow, Cascading will be forced to run in "local mode" and not on the cluster. This is useful when creating applications that read local files and import them into the Hadoop distributed file system.

Dfs

The cascading.tap.Dfs Tap is used to reference files on the Hadoop distributed file system.

Hfs

The cascading.tap.Hfs Tap uses the current Hadoop default file system. If Hadoop is configured for "local mode", its default file system will be the local file system. If configured as a cluster, the default file system is likely the Hadoop distributed file system. The Hfs Tap is convenient when writing Cascading applications that may or may not be run on a cluster. Lfs and Dfs subclass the Hfs Tap.

MultiSourceTap

The cascading.tap.MultiSourceTap is used to tie multiple Tap instances into a single Tap for use as an input source. The only restriction is that all the Tap instances passed to a new MultiSourceTap share the same Scheme classes (not necessarily the same Scheme instance).
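A minimal sketch, assuming two hypothetical directories of log files that share the same Scheme class:

```java
// Both child Taps use TextLine, satisfying the shared-Scheme restriction.
Tap logs2007 = new Hfs( new TextLine( new Fields( "line" ) ), "logs/2007" );
Tap logs2008 = new Hfs( new TextLine( new Fields( "line" ) ), "logs/2008" );

// Reads from both paths as if they were a single input source.
Tap source = new MultiSourceTap( logs2007, logs2008 );
```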

MultiSinkTap

The cascading.tap.MultiSinkTap is used to tie multiple Tap instances into a single Tap for use as an output sink. At runtime, every Tuple output by the pipe assembly is sunk to each child Tap of the MultiSinkTap.
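For example, to write every output Tuple to two hypothetical locations at once:

```java
// Each Tuple sunk by the pipe assembly is written to both child Taps.
Tap copyA = new Hfs( new TextLine( new Fields( "line" ) ), "output/copyA" );
Tap copyB = new Hfs( new TextLine( new Fields( "line" ) ), "output/copyB" );

Tap sink = new MultiSinkTap( copyA, copyB );
```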

TemplateTap

The cascading.tap.TemplateTap is used to sink tuples into directory paths based on the values in the Tuple. More can be read below in Template Taps.

GlobHfs

The cascading.tap.GlobHfs Tap accepts Hadoop style 'file globbing' expression patterns. This allows for multiple paths to be used as a single source, where all paths match the given pattern.
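As a sketch, all files matching a hypothetical date pattern can be sourced as one input:

```java
// Every path matching the glob pattern is read as a single source.
Tap source = new GlobHfs( new TextLine( new Fields( "line" ) ), "logs/2008-01-*.log" );
```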

Keep in mind that Hadoop cannot source data from directories with nested sub-directories, and it cannot write to directories that already exist. But you can simply point the Hfs Tap at a directory of data files and they will all be used as input; there is no need to enumerate each individual file into a MultiSourceTap.

To get around existing directories, the Hadoop related Taps allow for a SinkMode value to be set when constructed.

Example 3.10. Overwriting An Existing Resource

Tap tap = new Hfs( new TextLine( new Fields( "line" ) ), path, SinkMode.REPLACE );

Here are the modes made available by the built-in Tap types.

SinkMode.KEEP

This is the default behavior. If the resource exists, attempting to write to it will fail.

SinkMode.REPLACE

This allows Cascading to delete the file immediately after the Flow is started.

SinkMode.UPDATE

Allows for new Tap types that have a concept of update or append, for example, updating records in a database. It is up to the Tap to decide how to implement its "update" semantics. When Cascading sees update mode, it knows not to attempt to delete the resource first, and not to fail because the resource already exists.

Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.