Cascading 3.2 User Guide - The Apache Hadoop Platform

The Apache Hadoop Platforms

This chapter covers some of the operational mechanics of running an application that uses Cascading with the Hadoop platform, including building the application JAR file and configuring the operating mode.

Cascading requires that Apache Hadoop be installed and correctly configured. Hadoop is an open-source Apache project, which is freely available for download from the Hadoop website.

What is Apache Hadoop?

The Hadoop website describes the technology as "a software platform that lets one easily write and run applications that process vast amounts of data." Hadoop does this by providing a storage layer that holds vast amounts of data and an execution layer that runs an application in parallel across the cluster. The platform distributes the application load as subsets of the stored data across the cluster and coordinates the distributed processing tasks to optimize the compute resources of the environment.

The Hadoop Distributed File System (HDFS) is the storage layer, which serves as a single storage volume that is optimized for many concurrent serialized reads of large data files — where "large" might be measured in gigabytes or petabytes. However, HDFS does have limitations. For example, random access to the data is not really possible in an efficient manner. Also, Hadoop only supports a single writer for output. But this limit helps make Hadoop high-performance and reliable, in part because it allows for the data to be replicated across the cluster. The replication reduces the chance of data loss.

MapReduce is the default execution layer, which relies on a "divide-and-conquer" strategy to manage massive data sets and computing processes. Fully explaining MapReduce is beyond the scope of this document. However, the difficulty of developing real-world applications for the complex MapReduce framework is the original driving force behind the creation of Cascading.

As of Cascading 3.0, Apache Tez is supported as a third platform option, providing an execution layer that can be used in place of MapReduce. See Hadoop 2 MapReduce vs Hadoop 2 Tez below.

Hadoop, according to its documentation, can be configured to run in three modes:

  • Stand-alone mode (i.e., on the local computer, useful for testing and debugging in an IDE)

  • Pseudo-distributed mode (i.e., on an emulated "cluster" of one computer, which generally is not useful)

  • Fully-distributed mode (on a full cluster, for staging or production purposes)

The pseudo-distributed mode does not add value for most purposes. This documentation does not explore this mode further.

Cascading itself can run in local mode or on the Hadoop platform, where Hadoop itself may be in stand-alone or distributed mode.

The primary difference between these two platforms, local or Hadoop, is that local-mode Cascading does not use Hadoop APIs and processes data in memory. In-memory processing shortens application run times, but as a consequence local mode is not as robust or scalable as the Hadoop platform.

Hadoop 1 MapReduce vs. Hadoop 2 MapReduce

Cascading supports both Hadoop 1.x and 2.x by providing two Java dependencies: cascading-hadoop.jar and cascading-hadoop2-mr1.jar. These dependencies can be interchanged, but cascading-hadoop2-mr1.jar introduces new APIs and deprecates older API calls where appropriate. Note that cascading-hadoop2-mr1.jar supports only MapReduce 1 API conventions. This naming scheme allows new API conventions to be introduced without risk of naming collisions on dependencies.

It is extremely important to use the correct Cascading Hadoop dependencies that correspond with the cluster version to which the Cascading application is deployed. There are a number of subtle API and configuration property differences that can be difficult to diagnose.

Hadoop 2 MapReduce vs Hadoop 2 Tez

Apache Hadoop 2 introduces YARN as the resource manager for the cluster. In short, this allows a single physical cluster to contain and execute different computing foundations simultaneously. By default a Hadoop 2 installation includes an implementation of MapReduce that is API-compatible with Apache Hadoop 1.

But YARN also allows other technologies to co-exist and share the same HDFS deployment. Apache Tez is one such technology. A single Hadoop YARN cluster can now execute some applications on MapReduce and others on Tez, where both frameworks could share or pass data via HDFS. The two frameworks can run concurrently without cluster-wide reconfiguration.

Apache Tez has yet to reach a 1.0 status, but it is currently supported by Cascading via the cascading-hadoop2-tez.jar dependency.

Because MapReduce and Tez share HDFS, the two platforms also share most of the APIs provided by Cascading. This greatly simplifies porting applications from MapReduce to Tez. In most cases, the only changes are to the build and to one line of code that switches to the appropriate FlowConnector.

The following sections describe the common features of each supported Hadoop-based platform.

See the Apache Tez Platform documentation of this Cascading User Guide for more information on running Apache Tez.

Configuring Applications

During runtime, Hadoop must be told which application JAR file to push to the cluster. The specific details are described in the platform-specific topics of this Cascading User Guide.

But to remain platform-independent, the AppProps class can be used, which is a helper fluent API for setting application-level configuration settings. Use this class for maximum portability.

Example 1. Configuring the application JAR
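import java.util.Properties;
import cascading.property.AppProps;

Properties properties = AppProps.appProps()
  .setName( "sample-app" )
  .setVersion( "1.2.3" )
  .addTags( "deploy:prod", "team:engineering" )

  // identify the application JAR by a class that it contains
  .setJarClass( Main.class )

  // ALTERNATIVELY, set the path to the JAR directly (use one or the other):
  // .setJarPath( pathToJar )

  .buildProperties();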

Other Hadoop-specific Props classes include:

cascading.tap.hadoop.HfsProps

Allows for setting Hadoop-specific filesystem properties, specifically properties around enabling the "combined input format" support. Combining inputs minimizes the performance penalty around processing large numbers of small files.

cascading.tuple.hadoop.TupleSerializationProps

Allows for setting Hadoop-specific serialization and deserialization properties. See Custom Types.

Building an Application

See the platform-specific topics of this Cascading User Guide for information about building an application for execution on a particular platform.

Executing an Application

Running a Cascading application is the same as running any Hadoop application.

After packaging your application into a single JAR (see Building an Application), you must use bin/hadoop on Hadoop 1, or bin/yarn on Hadoop 2, to submit the application to the cluster.

For example, to execute an application packaged in your-application.jar, call the Hadoop 2 shell script:

Example 2. Running a Cascading application
$HADOOP_HOME/bin/yarn jar your-application.jar [your parameters]

If the configuration scripts in $HADOOP_CONF_DIR are configured to use a cluster, the JAR is pushed into that cluster for execution.

Cascading does not rely on any environment variables like $HADOOP_HOME or $HADOOP_CONF_DIR. Only the executable binary files in bin/hadoop and bin/yarn rely on those environment variables.

Note that even though your-application.jar is passed on the command line to bin/hadoop or bin/yarn, this does not configure Hadoop to push the JAR into the cluster. You must still call one of the property setters mentioned above to set the proper path to the application JAR. If the path is misconfigured, it is likely that one of the internal libraries (found in the lib folder) will be pushed to the cluster instead, and "Class Not Found" exceptions will be thrown on the cluster.

Troubleshooting and Debugging

Before you start debugging, read about debugging and troubleshooting Cascading in the local-mode troubleshooting section.

For planner-related errors that present during runtime when executing a Flow, see The Cascading Process Planner.

Source and Sink Taps

Cascading provides a few HDFS-specific Tap and Scheme implementations. Regardless of the build dependencies, the package names remain consistent.

It is important to understand how Hadoop deals with directories. By default, Hadoop cannot source data from directories with nested subdirectories, and it cannot write to directories that already exist.

However, the good news is that you can simply point the Hfs tap (described below) to a directory of data files. The data files are all used as input. There is no need to enumerate each individual file into a MultiSourceTap. If there are nested directories, use GlobHfs.

Schemes

See the section on Common Schemes for information about Hadoop-specific versions of TextLine and TextDelimited.

The Hadoop version of TextLine does not provide a line number. The offset field is used instead of the num field that is provided in the local-mode version.

SequenceFile

cascading.scheme.hadoop.SequenceFile is based on the Hadoop Sequence file format, which is a binary format. When written to or read from, all Tuple values are saved in a binary form. This is the most efficient file format. However, be aware that the resulting files are binary and can only be read by applications running on the Hadoop platform.

WritableSequenceFile

Like the SequenceFile Scheme, cascading.scheme.hadoop.WritableSequenceFile is based on the Hadoop Sequence file, but it was designed to read and write key and/or value Hadoop Writable objects directly.

This is very useful if you have sequence files created by other applications. During writing (sinking), specified key and/or value fields are serialized directly into the sequence file. During reading (sourcing), the key and/or value objects are deserialized and wrapped in a Cascading Tuple object and passed to the downstream pipe assembly.

For best performance when running on the Hadoop platform, enable sequence file compression (either block or record-based compression) in the Hadoop property settings. Refer to the Hadoop documentation for the available properties and compression types.

Taps

The following sample code creates a new Hadoop filesystem tap that can read and write raw text files. The "offset" field is discarded because only one field name is provided. The resulting input tuple stream has only "line" values.

Example 3. Creating a new tap
Tap tap = new Hfs( new TextLine( new Fields( "line" ) ), path );

Here are the most commonly-used tap types:

Hfs

The cascading.tap.hadoop.Hfs tap uses the current Hadoop default filesystem.

If Hadoop is configured for "Hadoop stand-alone mode," the default filesystem is the local filesystem (using a URI scheme of file://). If Hadoop is configured for distributed mode, the default filesystem is typically the Hadoop distributed file system (hdfs://).

Note that Hadoop can be forced to use an external filesystem by specifying a prefix to the URL passed into a new Hfs tap. For instance, using "s3://somebucket/path" tells Hadoop to use the S3 FileSystem implementation to access files in an Amazon S3 bucket. More information on this can be found in the Javadoc.
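As a rough illustration of how the prefix identifies a filesystem, the following sketch uses only java.net.URI from the JDK to extract the scheme from a path string. The class name, the sample host name, and the "default" label are hypothetical. The actual resolution is performed by Hadoop's FileSystem machinery, not by this code.

```java
import java.net.URI;

// Hypothetical helper: shows which URI scheme a tap path carries.
class PathSchemeSketch {

  // Returns the scheme prefix of the path, or "default" when the path has
  // no prefix, in which case Hadoop would use its configured default filesystem.
  static String schemeOf(String path) {
    String scheme = URI.create(path).getScheme();
    return scheme == null ? "default" : scheme;
  }

  public static void main(String[] args) {
    System.out.println(schemeOf("s3://somebucket/path"));
    System.out.println(schemeOf("hdfs://namenode:8020/logs"));
    System.out.println(schemeOf("/user/data/input"));
  }
}
```

A path without a prefix is the common case, which is why the same application code can run unchanged in stand-alone and distributed mode.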

GlobHfs

The cascading.tap.hadoop.GlobHfs tap accepts Hadoop-style "file globbing" expression patterns. This allows for multiple paths to be used as a single source, where all paths match the given pattern.
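To give a feel for how one pattern can select several paths, the sketch below uses the JDK's java.nio.file.PathMatcher. This is only an analogy: the JDK glob syntax resembles, but is not identical to, Hadoop's globbing, and GlobHfs relies on Hadoop's own implementation. The class name and the sample paths are hypothetical.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Hypothetical helper: tests a path against a glob pattern using the JDK matcher.
class GlobSketch {

  static boolean matches(String glob, String path) {
    PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
    return matcher.matches(Paths.get(path));
  }

  public static void main(String[] args) {
    String glob = "logs/2015-*/part-*";
    // paths from every 2015 sub-directory are selected by the single pattern
    System.out.println(matches(glob, "logs/2015-01/part-00000"));
    System.out.println(matches(glob, "logs/2015-02/part-00003"));
    // a path outside the pattern is not selected
    System.out.println(matches(glob, "logs/2016-01/part-00000"));
  }
}
```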

DistCacheTap

The cascading.tap.hadoop.DistCacheTap is a subclass of the cascading.tap.DecoratorTap, which can wrap an Hfs instance. The tap enables writing to HDFS, but directs read operations to the Hadoop Distributed Cache if the tap is being read into the small side of a cascading.pipe.HashJoin.

At the time of this writing, the Apache Hadoop project does not have documentation for a "Distributed Cache" implementation for Tez, but Cascading leverages YARN’s ability to distribute files transparently.

PartitionTap

See cascading.tap.hadoop.PartitionTap for details.

On Hadoop you can only create subdirectories to partition data into. Hadoop must still write "part files" into each partition sub-directory, and there is no safe mechanism for manipulating "part file" names.

Custom Taps and Schemes

You can integrate Cascading with either remote systems or file types by developing one of the following implementations:

Support of a new file type

This implementation involves introducing a different file format for a file stored on an HDFS filesystem. You only need to create a new Scheme for use with the Hfs tap. See below for details.

Support of a new filesystem

A new file system, like Amazon S3, can be implemented by creating a new Hadoop FileSystem subclass. This implementation can be referenced by registering a new URI scheme (Amazon S3 uses s3://).

Such an implementation allows for the Hfs tap to be used with any pre-existing Scheme implementations (like TextDelimited).

Support of a new data storage system

By creating a new Tap and possibly a compatible Scheme, your application can read or write data on a back-end system, such as a database or key-value store.

Tap and Scheme implementations are typically tied together. The purpose of the Tap and Scheme interface is to provide a consistent interface over the disparate technologies that operate with Cascading. The interface is not designed to hide implementation details.

Before beginning, or for inspiration, please see if any integrations for your file type or storage system already exist by visiting the Cascading extensions page.

Schemes

If the goal is to simply add a new file type to be stored on a Hadoop-compatible filesystem, only a new Scheme needs to be implemented.

As noted above, Tap and Scheme implementations are coupled. In this case the goal is to create a Scheme that works with the Hfs tap. Hfs is, in part, an encapsulation of the Hadoop FileSystem API. Creating a new file type involves only finding or implementing appropriate subclasses of org.apache.hadoop.mapred.FileInputFormat and org.apache.hadoop.mapred.FileOutputFormat.

Every Scheme can set any custom properties that the underlying platform requires, via the sourceConfInit() and sinkConfInit() methods. These methods may be called more than once with new configuration objects, and should be idempotent (that is, the methods do not initialize resources).
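The idempotency requirement can be pictured with a minimal sketch in which java.util.Properties stands in for the platform configuration object. The method below is a hypothetical stand-in, not the Cascading Scheme signature, and the property names and class name are invented for illustration. It only sets values on the object it is given and holds no state, so repeated calls are harmless.

```java
import java.util.Properties;

// Hypothetical stand-in for a Scheme's configuration hook.
class ConfInitSketch {

  // Only sets properties on the supplied configuration; opens no files,
  // connections, or other resources, and keeps no state between calls.
  static void sourceConfInit(Properties conf) {
    conf.setProperty("example.input.format.class", "com.example.MyInputFormat");
    conf.setProperty("example.input.buffer.size", "65536");
  }

  public static void main(String[] args) {
    Properties first = new Properties();
    sourceConfInit(first);
    sourceConfInit(first); // repeated call on the same object

    Properties second = new Properties();
    sourceConfInit(second); // call with a new configuration object

    // both objects end up with identical settings
    System.out.println(first.equals(second));
  }
}
```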

Once the proper FileInputFormat and FileOutputFormat are created, creating the Scheme implementation is a matter of implementing the *ConfInit() methods and the source() and sink() methods to perform the key-value conversion to a Tuple (when sourcing data), and back again (on the sinking of data).

It is highly recommended that you review the Cascading source code first. In many cases, simply subclassing SequenceFile or WritableSequenceFile might be suitable.

Cascading only supports the mapred.* APIs, not the mapreduce.* APIs. Unfortunately, Cascading cannot support both APIs simultaneously on the MapReduce framework, so the more stable original version was chosen. Apache Tez relaxes this restriction, allowing future versions to support Input/OutputFormat implementations from both Java package namespaces.

Taps

If the goal is to simply add a new HDFS-compatible file type, see the Source and Sink Taps section above.

If the goal is to add a new data store type, the following information explains the procedure.

Creating custom taps for HDFS and Hadoop MapReduce requires intermediate to advanced knowledge of Hadoop and the Hadoop FileSystem API.

First the developer must implement Hadoop org.apache.hadoop.mapred.InputFormat and/or org.apache.hadoop.mapred.OutputFormat interfaces so that Hadoop knows how to split and handle the incoming and outgoing data. See the note above on API compatibility.

In order to configure Hadoop to use these implementations, the custom Scheme is responsible for setting the InputFormat and OutputFormat on the Configuration object, using the sinkConfInit() and sourceConfInit() methods.

The Scheme is also responsible for translating the data from the data storage native type to a Cascading Tuple, and then back to the data storage native type when writing.

The Tap implementation is typically straightforward except for one detail. During runtime the openForRead() and openForWrite() methods are always called. Respectively they return a TupleEntryIterator and TupleEntryCollector, which in turn always produce or receive Tuple and TupleEntry instances to consume or store.

There are two cases in which these open methods are called: when the Input and Output arguments on the abstract methods openForRead() and openForWrite()

  • are null (case 1)

  • are not null (case 2)

Case 1: The first case happens when the developer wants to read or write data directly to the underlying system (put/retrieve records into/from a database table). The following scenarios can produce Case 1:

  • Client-side code is running in a test.

  • The application loads data into memory and uses the Tap instance as a convenience.

  • A lookup table is loaded from the Hadoop Distributed Cache.

  • Cascading reads or writes data. For example, when Cascading reads a stream to populate the accumulated side of a HashJoin or when a PartitionTap is writing data to bin directories.

Case 2: The second case, in which the argument is not null, happens when Hadoop does one of the following actions:

  • Initiates the read, sourcing data specified in an InputSplit

  • Manages the write operation from the output of a Map, Reduce, or Vertex task

When the method arguments are null, it is the responsibility of the Tap implementation to create the declared input/output types. When they are not, they must be used directly and not ignored.

On the Hadoop platform, the Input is a RecordReader, which is created by Hadoop and passed to the Tap. This RecordReader is already configured to read data from the current InputSplit.

Also on the Hadoop platform, the Output is an OutputCollector, which is created by Hadoop and passed to the Tap. This OutputCollector is already configured to write data to the current resource.
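The null and non-null cases can be sketched with a deliberately simplified stand-in for a Tap. The class, its constructor, and the use of a plain Iterator as the "input" are all hypothetical and do not reflect the real Cascading signatures. The sketch only shows the branching rule: create the input when none is supplied, otherwise use the supplied one as given.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for a Tap; not the Cascading API.
class OpenForReadSketch {
  private final List<String> backingStore;

  OpenForReadSketch(List<String> backingStore) {
    this.backingStore = backingStore;
  }

  Iterator<String> openForRead(Iterator<String> input) {
    if (input == null)
      return backingStore.iterator(); // case 1: the tap opens the resource itself

    return input; // case 2: the platform-supplied input is used directly
  }

  public static void main(String[] args) {
    OpenForReadSketch tap = new OpenForReadSketch(Arrays.asList("a", "b"));

    // case 1: client-side read, no input supplied
    System.out.println(tap.openForRead(null).next());

    // case 2: input prepared elsewhere, for example by the platform
    Iterator<String> supplied = Arrays.asList("x").iterator();
    System.out.println(tap.openForRead(supplied) == supplied);
  }
}
```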

To ease the development at this stage, you can reuse the classes cascading.tap.hadoop.io.HadoopTupleEntrySchemeIterator and cascading.tap.hadoop.io.HadoopTupleEntrySchemeCollector if doing so is appropriate for your code.

Partial Aggregation instead of Combiners

In Hadoop mode, Cascading does not support MapReduce "Combiners." Combiners are a simple optimization allowing some Reduce functions to run on the Map side of MapReduce. Combiners are very powerful in that they reduce the I/O between the Mappers and Reducers — why send all of your Mapper data to Reducers when you can compute some values on the Map side and combine them in the Reducer?

But Combiners are limited to Associative and Commutative functions only, such as "sum" and "max." In addition, the process requires that the values emitted by the Map task must be serialized, sorted (which involves deserialization and comparison), deserialized again, and operated on — after which the results are again serialized and sorted. Combiners trade CPU for gains in I/O.

Cascading takes a different approach: it performs partial aggregations on the Map side and combines the results on the Reduce side, trading memory instead of CPU for I/O gains. The Cascading mechanism functions by caching values (up to a threshold limit). This bypasses the redundant serialization, deserialization, and sorting. Also, Cascading allows any aggregate function to be implemented, not just Associative and Commutative functions.
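The caching idea can be illustrated with a small, self-contained sketch. It is not Cascading's implementation. It assumes a least-recently-used eviction policy and a per-key sum, and every name in it is hypothetical. Partial sums are held in a bounded cache; when the cache exceeds the threshold, the oldest partial is emitted, and a final step combines all partials per key, as the Reduce side would.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of threshold-bounded partial aggregation (a sum per key).
class PartialSumSketch {
  private final int threshold;
  // access-ordered map: iteration starts at the least recently used key
  private final LinkedHashMap<String, Long> cache = new LinkedHashMap<>(16, 0.75f, true);
  private final List<Map.Entry<String, Long>> emitted = new ArrayList<>();

  PartialSumSketch(int threshold) {
    this.threshold = threshold;
  }

  // Adds the value to the cached partial sum for the key. If the cache grows
  // past the threshold, the least recently used partial is emitted downstream.
  void add(String key, long value) {
    cache.merge(key, value, Long::sum);
    if (cache.size() > threshold) {
      Iterator<Map.Entry<String, Long>> it = cache.entrySet().iterator();
      Map.Entry<String, Long> eldest = it.next();
      emitted.add(new AbstractMap.SimpleImmutableEntry<>(eldest.getKey(), eldest.getValue()));
      it.remove();
    }
  }

  // Emits whatever is still cached and returns every partial result.
  List<Map.Entry<String, Long>> flush() {
    for (Map.Entry<String, Long> e : cache.entrySet())
      emitted.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
    cache.clear();
    return emitted;
  }

  // Stand-in for the Reduce side: combines the partial sums per key.
  static Map<String, Long> combine(List<Map.Entry<String, Long>> partials) {
    Map<String, Long> totals = new LinkedHashMap<>();
    for (Map.Entry<String, Long> p : partials)
      totals.merge(p.getKey(), p.getValue(), Long::sum);
    return totals;
  }

  public static void main(String[] args) {
    PartialSumSketch mapSide = new PartialSumSketch(2);
    for (String key : new String[]{"a", "b", "a", "c", "a", "b"})
      mapSide.add(key, 1L);
    List<Map.Entry<String, Long>> partials = mapSide.flush();
    System.out.println(partials.size() + " partials from 6 inputs");
    System.out.println(combine(partials));
  }
}
```

A larger threshold means fewer partials cross the network at the cost of more memory, which is the trade-off described above.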

Custom Types and Serialization

Cascading supports any class type, except for an enum type, as values stored and passed in Tuple instances.

But for this to work when using the Cascading Hadoop mode, any Class that isn’t a primitive type or a Hadoop Writable type requires a corresponding Hadoop serialization class registered in the Hadoop configuration files for your cluster. Hadoop Writable types work because there is already a generic serialization implementation built into Hadoop. See the Hadoop documentation for information on registering a new serialization helper or creating Writable types.

Registered serialization implementations are automatically inherited by Cascading.

During serialization and deserialization of Tuple instances that contain custom types, the Cascading Tuple serialization framework must store the class name (as a String) before serializing the custom object. This can be very space-inefficient. To overcome this, custom types can add the SerializationToken Java annotation to the custom type class. The SerializationToken annotation expects two arrays — one of integers that are used as tokens, and one of Class name strings. Both arrays must be the same size. The integer tokens must all have values of 128 or greater because the first 128 values are reserved for internal use.

During serialization and deserialization, token values are used instead of the String Class names in order to reduce storage needs.

Serialization tokens may also be stored in the Hadoop configuration files. Alternatively, the tokens can be set as a property passed to the FlowConnector, with the property name cascading.serialization.tokens. The value of this property is a comma-separated list of token=classname values.
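The sketch below illustrates the shape of such a property value by parsing a comma-separated list of token=classname pairs and enforcing the rule that tokens must be 128 or greater. It is not Cascading's own parser. The class name, the com.example class names, and the duplicate check are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for a "token=classname,token=classname" value.
class SerializationTokensSketch {

  static Map<Integer, String> parse(String value) {
    Map<Integer, String> tokens = new LinkedHashMap<>();

    for (String pair : value.split(",")) {
      String[] parts = pair.trim().split("=", 2);
      if (parts.length != 2)
        throw new IllegalArgumentException("expected token=classname, got: " + pair);

      int token = Integer.parseInt(parts[0].trim());
      if (token < 128) // lower values are reserved for internal use
        throw new IllegalArgumentException("token must be 128 or greater: " + token);

      if (tokens.put(token, parts[1].trim()) != null) // assumption: tokens are unique
        throw new IllegalArgumentException("duplicate token: " + token);
    }

    return tokens;
  }

  public static void main(String[] args) {
    String value = "128=com.example.Person, 129=com.example.Address";
    System.out.println(parse(value));
  }
}
```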

Note that Cascading natively serializes and deserializes all primitives. It also serializes and deserializes byte arrays (byte[]) if the developer registers the BytesSerialization class by using TupleSerializationProps.addSerialization( properties, BytesSerialization.class.getName() ). The token 127 is used for the Hadoop BytesWritable class.

By default, Cascading uses lazy deserialization on Tuple elements during comparisons when Hadoop sorts keys during the "shuffle" phase.

Cascading supports custom serialization for custom types, as well as lazy deserialization of custom types during comparisons. This is accomplished by implementing the StreamComparator interface. See the Javadoc for detailed instructions on implementation, and the unit tests for examples.
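The idea behind comparing on streams can be sketched without any Cascading classes. The example assumes a hypothetical record layout (a 4-byte id followed by a string payload) and compares two serialized records by reading only the id from each stream, so the payload is never deserialized. The class and method names are invented here; the real StreamComparator signature should be taken from the Javadoc.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch of comparing serialized records without full deserialization.
class LazyCompareSketch {

  // Assumed layout: 4-byte id, then a UTF-encoded payload.
  static byte[] serialize(int id, String payload) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(id);
      out.writeUTF(payload);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads only the leading id from each stream; the payloads stay untouched.
  static int compare(InputStream lhs, InputStream rhs) {
    try {
      int lhsId = new DataInputStream(lhs).readInt();
      int rhsId = new DataInputStream(rhs).readInt();
      return Integer.compare(lhsId, rhsId);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] first = serialize(7, "first payload");
    byte[] second = serialize(42, "second payload");
    int result = compare(new ByteArrayInputStream(first), new ByteArrayInputStream(second));
    System.out.println(result < 0);
  }
}
```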