3.5 Source and Sink Taps

All input data comes in from, and all output data goes out to, some instance of cascading.tap.Tap. A tap represents a data resource - such as a file on the local file system, on a Hadoop distributed file system, or on Amazon S3. A tap can be read from, which makes it a source, or written to, which makes it a sink. Or, more commonly, taps act as both sinks and sources when shared between flows.

The platform on which your application is running (Cascading local or Hadoop) determines which specific classes you can use. Details are provided in the sections below.

Schemes

If the Tap is about where the data is and how to access it, the Scheme is about what the data is and how to read it. Every Tap must have a Scheme that describes the data. Cascading provides four Scheme classes:

TextLine

TextLine reads and writes raw text files and returns tuples that, by default, contain two fields. The first field is either the byte offset of the line (on the Hadoop platform) or the line number (on the local platform), and the second field is the actual line of text. When written to, all Tuple values are converted to Strings delimited with the TAB character (\t). A TextLine scheme is provided for both the local and Hadoop modes.

By default TextLine uses the UTF-8 character set. This can be overridden via the appropriate TextLine constructor.
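As a sketch, a Hadoop-platform TextLine tap might be constructed as follows (the field names match the defaults described above; the file path is illustrative):

```java
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

// Declare both default fields; passing only new Fields( "line" )
// instead would discard the byte offset (see Example 3.10)
TextLine scheme = new TextLine( new Fields( "offset", "line" ) );
Tap logTap = new Hfs( scheme, "logs/access.log" ); // hypothetical path
```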

TextDelimited

TextDelimited reads and writes character-delimited files in standard formats such as CSV (comma-separated values), TSV (tab-separated values), and so on. When written to, all Tuple values are converted to Strings and joined with the specified character delimiter. This Scheme can optionally handle quoted values with custom quote characters. Further, TextDelimited can coerce each value to a primitive type when reading a text file. A TextDelimited scheme is provided for both the local and Hadoop modes.

By default TextDelimited uses the UTF-8 character set. This can be overridden via the appropriate TextDelimited constructor.
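A sketch of a comma-delimited source with type coercion might look like this (field names, types, and the path are illustrative; the exact constructor overloads are in the TextDelimited Javadoc):

```java
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

Fields fields = new Fields( "id", "name", "score" );
Class[] types = new Class[] { Integer.TYPE, String.class, Double.TYPE };

// Comma-delimited input; each value is coerced to the matching type
Tap csvSource = new Hfs( new TextDelimited( fields, ",", types ), "input/users.csv" );
```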

SequenceFile

SequenceFile is based on the Hadoop Sequence file, which is a binary format. When written to or read from, all Tuple values are saved in their native binary form. This is the most efficient file format - but be aware that the resulting files are binary and can only be read by Hadoop applications running on the Hadoop platform.
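A SequenceFile tap for intermediate data might be sketched as follows (the field names and path are illustrative):

```java
import cascading.scheme.hadoop.SequenceFile;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

// Tuples are stored in native binary form; no parsing is needed on re-read
Tap intermediate = new Hfs(
    new SequenceFile( new Fields( "id", "name", "score" ) ),
    "intermediate/users", SinkMode.REPLACE );
```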

WritableSequenceFile

Like the SequenceFile Scheme, WritableSequenceFile is based on the Hadoop Sequence file, but it was designed to read and write key and/or value Hadoop Writable objects directly. This is very useful if you have sequence files created by other applications. During writing (sinking), specified key and/or value fields are serialized directly into the sequence file. During reading (sourcing), the key and/or value objects are deserialized and wrapped in a Cascading Tuple object and passed to the downstream pipe assembly. This class is only available when running on the Hadoop platform.
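For example, to source a sequence file written by another Hadoop job, you might construct the scheme like this (the key/value Writable types and the path are assumptions for illustration):

```java
import cascading.scheme.hadoop.WritableSequenceFile;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Assume the external job wrote LongWritable keys and Text values;
// they arrive downstream as the "key" and "value" tuple fields
WritableSequenceFile scheme =
    new WritableSequenceFile( new Fields( "key", "value" ), LongWritable.class, Text.class );
Tap external = new Hfs( scheme, "external/events-seq" ); // hypothetical path
```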

There's a key difference between the TextLine and SequenceFile schemes. With the SequenceFile scheme, data is stored as binary tuples, which can be read without having to be parsed. But with the TextLine option, Cascading must parse each line into a Tuple before processing it, causing a performance hit.

Platform-specific implementation details

Depending on which platform you use (Cascading local or Hadoop), the classes you use to specify schemes will vary. Platform-specific details for each standard scheme are shown below.

Table 3.2. Platform-specific tap scheme classes

Description                                                | Cascading local platform | Hadoop platform
-----------------------------------------------------------|--------------------------|------------------------
Package Name                                               | cascading.scheme.local   | cascading.scheme.hadoop
Read lines of text                                         | TextLine                 | TextLine
Read delimited text (CSV, TSV, etc.)                       | TextDelimited            | TextDelimited
Cascading proprietary efficient binary                     |                          | SequenceFile
External Hadoop application binary (custom Writable type)  |                          | WritableSequenceFile

Sequence File Compression

For best performance when running on the Hadoop platform, enable Sequence File Compression in the Hadoop property settings - either block or record-based compression. Refer to the Hadoop documentation for the available properties and compression types.
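One way to enable block compression is to set the relevant Hadoop properties on the connector. This sketch uses the Hadoop 1.x property names, which were current for this generation of Cascading; newer Hadoop versions use the mapreduce.output.fileoutputformat.* equivalents:

```java
import java.util.Properties;

import cascading.flow.FlowConnector;
import cascading.flow.hadoop.HadoopFlowConnector;

Properties properties = new Properties();

// Hadoop 1.x property names (assumption: adjust for your Hadoop version)
properties.setProperty( "mapred.output.compress", "true" );
properties.setProperty( "mapred.output.compression.type", "BLOCK" );

FlowConnector flowConnector = new HadoopFlowConnector( properties );
```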

Taps

The following sample code creates a new Hadoop FileSystem Tap that can read and write raw text files. Since only one field name is provided, the "offset" field is discarded, resulting in an input tuple stream with only "line" values.

Example 3.10. Creating a new tap

Tap tap = new Hfs( new TextLine( new Fields( "line" ) ), path );

Here are the most commonly used tap types:

FileTap

The cascading.tap.local.FileTap tap is used with the Cascading local platform to access files on the local file system.
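A minimal sketch, using the local-mode TextLine scheme (the path is illustrative):

```java
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

// Reads "data/input.txt" from the local file system, one "line" per tuple
Tap localTap = new FileTap( new TextLine( new Fields( "line" ) ), "data/input.txt" );
```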

Hfs

The cascading.tap.hadoop.Hfs tap uses the current Hadoop default file system, when running on the Hadoop platform.

If Hadoop is configured for "Hadoop local mode" (not to be confused with Cascading local mode), its default file system is the local file system. If configured for distributed mode, its default file system is typically the Hadoop distributed file system.

Note that Hadoop can be forced to use an external file system by specifying a prefix to the URL passed into a new Hfs tap. For instance, using "s3://somebucket/path" tells Hadoop to use the S3 FileSystem implementation to access files in an Amazon S3 bucket. More information on this can be found in the Javadoc.
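Using the S3 URL from the paragraph above, the tap might be constructed like this:

```java
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

// The "s3://" prefix selects Hadoop's S3 FileSystem implementation
Tap s3Tap = new Hfs( new TextLine( new Fields( "line" ) ), "s3://somebucket/path" );
```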

Also provided are six utility taps:

MultiSourceTap

The cascading.tap.MultiSourceTap is used to tie multiple tap instances into a single tap for use as an input source. The only restriction is that all the tap instances passed to a new MultiSourceTap share the same Scheme classes (not necessarily the same Scheme instance).
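A sketch combining two monthly log directories into one source (paths are illustrative; note both child taps use the same Scheme class, TextLine):

```java
import cascading.scheme.hadoop.TextLine;
import cascading.tap.MultiSourceTap;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

Tap janTap = new Hfs( new TextLine( new Fields( "line" ) ), "logs/2012-01" );
Tap febTap = new Hfs( new TextLine( new Fields( "line" ) ), "logs/2012-02" );

// Downstream pipes see a single, merged input source
Tap source = new MultiSourceTap( janTap, febTap );
```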

MultiSinkTap

The cascading.tap.MultiSinkTap is used to tie multiple tap instances into a single tap for use as output sinks. At runtime, for every Tuple output by the pipe assembly, each child tap to the MultiSinkTap will sink the Tuple.
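A sketch writing every output Tuple to two locations at once (paths are illustrative):

```java
import cascading.scheme.hadoop.TextLine;
import cascading.tap.MultiSinkTap;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

Tap archiveTap = new Hfs( new TextLine( new Fields( "line" ) ), "output/archive" );
Tap reportTap  = new Hfs( new TextLine( new Fields( "line" ) ), "output/report" );

// Every outgoing Tuple is sunk by both child taps
Tap sink = new MultiSinkTap( archiveTap, reportTap );
```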

PartitionTap

The cascading.tap.hadoop.PartitionTap and cascading.tap.local.PartitionTap are used to sink tuples into directory paths based on the values in the Tuple. More can be read below in Partition Taps. Note the TemplateTap has been deprecated in favor of the PartitionTap.
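A Hadoop-platform sketch that bins tuples into year/month subdirectories (field names and paths are illustrative; DelimitedPartition is one available Partition implementation):

```java
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.PartitionTap;
import cascading.tap.partition.DelimitedPartition;
import cascading.tap.partition.Partition;
import cascading.tuple.Fields;

// The parent tap defines the root path and how the remaining fields are written
Hfs parentTap = new Hfs( new TextDelimited( new Fields( "entry" ), "\t" ), "output" );

// Each Tuple lands under output/<year>/<month>/ based on its field values
Partition partition = new DelimitedPartition( new Fields( "year", "month" ), "/" );
Tap sink = new PartitionTap( parentTap, partition );
```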

GlobHfs

The cascading.tap.hadoop.GlobHfs tap accepts Hadoop style "file globbing" expression patterns. This allows for multiple paths to be used as a single source, where all paths match the given pattern. This tap is only available when running on the Hadoop platform.
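A sketch matching all of a year's log directories with one glob pattern (the pattern is illustrative):

```java
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.GlobHfs;
import cascading.tuple.Fields;

// Matches every path under logs/ for 2012, e.g. logs/2012-01, logs/2012-02, ...
Tap source = new GlobHfs( new TextLine( new Fields( "line" ) ), "logs/2012-*" );
```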

DecoratorTap

The cascading.tap.DecoratorTap is a utility helper for wrapping an existing Tap with new functionality via subclassing, and/or for adding 'meta-data' to a Tap instance via the generic MetaInfo instance field. Further, on the Hadoop platform, planner-created intermediate and Checkpoint Taps can be wrapped by a DecoratorTap implementation by the Cascading Planner. See cascading.flow.FlowConnectorProps for details.

DistCacheTap

The cascading.tap.hadoop.DistCacheTap is a subclass of cascading.tap.DecoratorTap that can wrap a cascading.tap.hadoop.Hfs instance. It allows writing to HDFS, but reading from the Hadoop Distributed Cache under the right circumstances, specifically when the Tap is being read into the small side of a cascading.pipe.HashJoin.
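A sketch wrapping a small lookup file for use on the small side of a HashJoin (field names and the path are illustrative):

```java
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.DistCacheTap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

Hfs lookupTap =
    new Hfs( new TextDelimited( new Fields( "code", "label" ), "\t" ), "lookup/codes.tsv" );

// When read into the small side of a HashJoin, the data is delivered
// via the Hadoop Distributed Cache instead of direct HDFS reads
Tap cached = new DistCacheTap( lookupTap );
```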

Platform-specific implementation details

Depending on which platform you use (Cascading local or Hadoop), the classes you use to specify file systems will vary. Platform-specific details for each standard tap type are shown below.

Table 3.3. Platform-specific details for setting file system

Description                                                | Either platform | Cascading local platform | Hadoop platform
-----------------------------------------------------------|-----------------|--------------------------|---------------------
Package Name                                               | cascading.tap   | cascading.tap.local      | cascading.tap.hadoop
File access                                                |                 | FileTap                  | Hfs
Multiple Taps as single source                             | MultiSourceTap  |                          |
Multiple Taps as single sink                               | MultiSinkTap    |                          |
Bin/Partition data into multiple files                     |                 | PartitionTap             | PartitionTap
Pattern match multiple files/dirs                          |                 |                          | GlobHfs
Wrapping a Tap with MetaData / Decorating intra-Flow Taps  | DecoratorTap    |                          |
Reading from the Hadoop Distributed Cache                  |                 |                          | DistCacheTap

Copyright © 2007-2012 Concurrent, Inc. All Rights Reserved.