Cascading 3.0 User Guide - Cascading Basic Concepts
Terminology
The Cascading processing model is loosely based on a metaphor of "pipes and filters".
The Cascading API allows the developer to assemble pipes into pipelines (or pipe assemblies) that split, merge, group, or join streams of data while applying operations to each data record (called a tuple) or to groups of records.
Pipe assemblies are specified independently of the data source they are to process. So before a pipe assembly can be executed, it must be bound to taps, i.e., data sources and sinks. The result of binding one or more pipe assemblies to taps is a flow, which is executed on a computer or cluster.
Multiple flows can be grouped together and executed as a single unit of work or process. In this context, if one flow depends on the output of another, it is not executed until all of its data dependencies are satisfied. Such a collection of flows is called a cascade.
Table 1 provides short descriptions of some elementary Cascading terms. These entities are described in more detail throughout this chapter.
Term | Description |
---|---|
Pipe | An element of a branch or pipe assembly; connects to other pipes |
Tuple | Data record, like an array |
Branch | Simple chain of pipes without forks or merges |
Pipe Assembly | Interconnected set of pipe branches; looks like a DAG |
Tuple Stream | Series of tuples passing through a pipe branch or assembly |
Tap | Data source or data destination for a pipe assembly |
Flow | Pipe assemblies and their associated taps that form executable code |
Cascade | An ordered collection of flows |
Pipe Assemblies
Pipe assemblies define what work should be done with tuple streams, which are read from tap sources and written to tap sinks. The work performed on the data stream may include actions such as filtering, transforming, organizing, and calculating.
Pipe assemblies may use multiple sources and multiple sinks, and may define splits, merges, and joins to manipulate the tuple streams.
Pipe Assembly Workflow
Pipe assemblies are created by chaining cascading.pipe.Pipe classes and subclasses together. Chaining is accomplished by passing the previous Pipe instances to the constructor of the next Pipe instance.
Example 1 below demonstrates this type of chaining. The specific operations performed are not important in the example; the point is to show the general flow of the data streams.
The code in the example creates two pipes, a "left-hand side" (lhs) and a "right-hand side" (rhs), and performs some processing on both using the Each pipe.
Then the two pipes are joined into one, using the CoGroup pipe, and several operations are performed on the joined pipe using Every and GroupBy. The diagram after the example provides a visual representation of the workflow.
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );
lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );
// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );
rhs = new Each( rhs, new SomeFunction() );
// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );
join = new Every( join, new SomeAggregator() );
join = new GroupBy( join );
join = new Every( join, new SomeAggregator() );
// the tail of the assembly
join = new Each( join, new SomeFunction() );
The following diagram is a visual representation of the example above.
Common Stream Patterns
As data moves through the pipe, streams may be separated or combined for various purposes. Here are the three basic patterns:
- Split
-
A split takes a single stream and sends it down multiple paths — that is, it feeds a single Pipe instance into two or more subsequent separate Pipe instances with unique branch names.
- Merge
-
A merge combines two or more streams that have identical fields into a single stream. This is done by passing two or more Pipe instances to a Merge or GroupBy pipe. This is also called a union.
- Join
-
A join combines data from two or more streams that have different fields, based on common field values (analogous to a SQL join). This is done by passing two or more Pipe instances to a HashJoin or CoGroup pipe. The code sequence and diagram above give an example.
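To make the difference between a merge and a join concrete, here is a minimal plain-Java sketch that models tuples as maps from field name to value. It deliberately does not use the Cascading API: the class StreamPatterns and its methods merge, innerJoin, and tuple are hypothetical names, and the nested-loop join is for illustration only, not a description of how CoGroup or HashJoin work internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the merge and join patterns; not part of the Cascading API.
class StreamPatterns {

    // Merge (union): both streams are assumed to carry the same field names.
    static List<Map<String, Object>> merge(List<Map<String, Object>> lhs,
                                           List<Map<String, Object>> rhs) {
        List<Map<String, Object>> out = new ArrayList<>(lhs);
        out.addAll(rhs);
        return out;
    }

    // Inner join: pair up tuples whose values match on the given key fields.
    // Assumes the field names of the two streams do not collide.
    static List<Map<String, Object>> innerJoin(List<Map<String, Object>> lhs, String lhsKey,
                                               List<Map<String, Object>> rhs, String rhsKey) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : lhs) {
            for (Map<String, Object> r : rhs) {
                if (Objects.equals(l.get(lhsKey), r.get(rhsKey))) {
                    Map<String, Object> joined = new LinkedHashMap<>(l);
                    joined.putAll(r);
                    out.add(joined);
                }
            }
        }
        return out;
    }

    // Build a tuple from alternating field names and values.
    static Map<String, Object> tuple(Object... nameValuePairs) {
        Map<String, Object> t = new LinkedHashMap<>();
        for (int i = 0; i < nameValuePairs.length; i += 2)
            t.put((String) nameValuePairs[i], nameValuePairs[i + 1]);
        return t;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> users =
            Arrays.asList(tuple("id", 1, "name", "ann"), tuple("id", 2, "name", "bob"));
        List<Map<String, Object>> moreUsers = Arrays.asList(tuple("id", 3, "name", "cy"));
        List<Map<String, Object>> orders = Arrays.asList(tuple("userId", 1, "total", 30));

        System.out.println(merge(users, moreUsers));                 // identical fields
        System.out.println(innerJoin(users, "id", orders, "userId")); // different fields
    }
}
```

In the sketch, the merged stream keeps the original field names, while the joined stream carries the fields of both inputs.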
Data Processing
In addition to directing the tuple streams with splits, merges, and joins, pipe assemblies can examine, filter, organize, and transform the tuple data as the streams move through them. To facilitate this, the values in a tuple are usually given field names (naming is optional), just as database columns are given names, so that they can be referenced or selected.
The following terminology is used:
- Operation
-
Operations (cascading.operation.Operation) accept an input argument Tuple, and output zero or more result tuples. There are a few subtypes of operations defined below. Cascading has a number of generic Operations that can be used, or developers can create their own custom Operations.
- Tuple
-
In Cascading, data is processed as a stream of tuples (cascading.tuple.Tuple), which are composed of fields, much like a database record or row. A Tuple is effectively an array of (field) values, where each value can be any java.lang.Object Java type (or byte[] array). For information on supporting nonprimitive types, see Custom Types.
- Fields
-
Fields (cascading.tuple.Fields) are used either to declare the field names for fields in a Tuple, or reference field values in a Tuple. They can be strings (such as "firstname" or "birthdate"), integers (for the field position — starting at 0 for the first position or starting at -1 for the last position), or one of the predefined Fields sets (such as Fields.ALL, which selects all values in the Tuple, like an asterisk in SQL). See Field Sets for more information.
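The positional selectors mentioned above (0 for the first value, -1 for the last) can be pictured with a small plain-Java sketch. The class PositionSelector and its methods are hypothetical and exist only to illustrate the indexing convention; this is not the cascading.tuple.Fields implementation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that mirrors the positional convention described above.
class PositionSelector {

    // Translate a selector (0 = first value, -1 = last value) into a list index.
    static int resolve(int position, int tupleSize) {
        int index = position < 0 ? tupleSize + position : position;
        if (index < 0 || index >= tupleSize)
            throw new IllegalArgumentException(
                "position " + position + " is outside a tuple of size " + tupleSize);
        return index;
    }

    // Pick one value out of a tuple modeled as a plain list.
    static Object select(List<Object> tuple, int position) {
        return tuple.get(resolve(position, tuple.size()));
    }

    public static void main(String[] args) {
        List<Object> tuple = Arrays.<Object>asList("ann", "smith", "1990-01-01");
        System.out.println(select(tuple, 0));   // first value
        System.out.println(select(tuple, -1));  // last value
    }
}
```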
Pipes
The code for the sample pipe assembly above, Chaining pipes, consists almost entirely of a series of Pipe constructors. This section describes the various Pipe classes in detail.
The base class cascading.pipe.Pipe and its subclasses are shown in the diagram below.
Types of Pipes
- Each
-
These pipes analyze, transform, or filter data based on the contents of tuples. The Each pipe operates on individual tuples in the stream, applying functions or filters such as conditionally replacing certain field values, removing tuples that have values outside a target range, etc.
You can also use Each to split or branch a stream, simply by routing the output of an Each into a different pipe or sink.
As with other types of pipe, you can use Each to specify a list of fields to output. Specifying an output list removes unwanted fields from a stream.
- Merge
-
Just as Each can be used to split one stream into two, Merge can be used to combine two or more streams having the same fields into one stream.
A Merge accepts two or more streams that have identical fields, and emits a single stream of tuples (in arbitrary order) that contains all the tuples from all the specified input streams. Thus a Merge is just a mingling of all the tuples from the input streams, as if shuffling multiple card decks into one.
Use Merge when no grouping is required (i.e., no aggregator or buffer operations will be performed). In that case, Merge is much faster than GroupBy (see below) for merging.
To combine streams that have different fields, based on one or more common values, use CoGroup or HashJoin.
- GroupBy
-
GroupBy groups the tuples of a stream based on common values in a specified field.
If passed multiple streams as inputs, it performs a merge before the grouping. As with Merge, a GroupBy requires that multiple input streams share the same field structure.
The purpose of grouping is typically to prepare a stream for processing by the Every pipe, which performs aggregator and buffer operations on the groups, such as counting, totaling, or averaging values within that group.
It should be clear that "grouping" here essentially means sorting all the tuples into groups based on the value of a particular field. However, within a given group, the tuples are in arbitrary order unless you specify a secondary sort key. For most purposes, a secondary sort is not required and only increases the execution time.
- Every
-
The Every pipe operates on a tuple stream that has been grouped (by GroupBy or CoGroup) on the values of a particular field, such as "timestamp" or "zipcode". It’s used to apply aggregator or buffer operations such as counting, totaling, or averaging field values within each group. Thus the Every class is only for use on the output of GroupBy or CoGroup, and cannot be used with the output of Each, Merge, or HashJoin.
An Every instance may follow another Every instance, so Aggregator operations can be chained. This is not true for Buffer operations.
- CoGroup
-
CoGroup performs a join on two or more streams, similar to a SQL join, and groups the single resulting output stream on the values of specified fields. As with SQL, the join can be inner, outer, left, or right. Self-joins are permitted, as well as mixed joins (for three or more streams) and custom joins. Null fields in the input streams become corresponding null fields in the output stream.
The resulting output stream contains fields from all the input streams. If the streams contain any field names in common, they must be renamed to avoid duplicate field names in the resulting tuples.
- HashJoin
-
HashJoin performs a join on two or more streams, similar to a SQL join, and emits a single stream in arbitrary order. As with SQL, the join can be inner, outer, left, or right. Self-joins are permitted, as well as mixed joins (for three or more streams) and custom joins. Null fields in the input streams become corresponding null fields in the output stream.
For applications that do not require grouping, HashJoin provides faster execution than CoGroup, but only within certain prescribed cases. It is optimized for joining one or more small streams to no more than one large stream. Developers should thoroughly understand the limitations of this class, as described below, before attempting to use it.
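The relationship between GroupBy and Every described above (group first, then apply an aggregator to each group) can be sketched in plain Java. This is an illustrative model only: the class GroupThenAggregate and its methods are hypothetical, tuples are modeled as two-element string arrays holding a key and a numeric value, and none of this uses the Cascading API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of grouping followed by per-group aggregation.
class GroupThenAggregate {

    // Grouping: collect values under their grouping key, with keys kept sorted.
    // Within a group, values stay in arrival order (no secondary sort).
    static Map<String, List<Integer>> groupBy(List<String[]> tuples) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (String[] t : tuples)
            groups.computeIfAbsent(t[0], k -> new ArrayList<>()).add(Integer.parseInt(t[1]));
        return groups;
    }

    // Aggregation: emit one result per group, here {count, sum}.
    static Map<String, int[]> countAndSum(Map<String, List<Integer>> groups) {
        Map<String, int[]> results = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : groups.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) sum += v;
            results.put(e.getKey(), new int[]{e.getValue().size(), sum});
        }
        return results;
    }

    public static void main(String[] args) {
        List<String[]> tuples = Arrays.asList(
            new String[]{"94110", "3"},
            new String[]{"10001", "5"},
            new String[]{"94110", "4"});
        for (Map.Entry<String, int[]> e : countAndSum(groupBy(tuples)).entrySet())
            System.out.println(e.getKey() + " count=" + e.getValue()[0] + " sum=" + e.getValue()[1]);
    }
}
```

The aggregation step only makes sense on grouped input, which is the reason Every is restricted to the output of GroupBy or CoGroup.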
The following table summarizes the different types of pipes.
Pipe type | Purpose | Input | Output |
---|---|---|---|
Pipe | instantiate a pipe; create or name a branch | name | a (named) pipe |
SubAssembly | create nested subassemblies | | |
Each | apply a filter or function, or branch a stream | tuple stream (grouped or not) | a tuple stream, optionally filtered or transformed |
Merge | merge two or more streams with identical fields | two or more tuple streams | a tuple stream, unsorted |
GroupBy | sort/group on field values; optionally merge two or more streams with identical fields | one or more tuple streams with identical fields | a single tuple stream, grouped on key field(s) with optional secondary sort |
Every | apply aggregator or buffer operation | grouped tuple stream | a tuple stream plus new fields with operation results |
CoGroup | join one or more streams on matching field values | one or more tuple streams | a single tuple stream, joined on key field(s) |
HashJoin | join one or more streams on matching field values | one or more tuple streams | a tuple stream in arbitrary order |
Platforms
Cascading supports pluggable planners that allow it to execute on different platforms. Planners are invoked by an associated FlowConnector subclass. Currently, four planners are provided, as described below:
- LocalFlowConnector
-
The cascading.flow.local.LocalFlowConnector provides a "local" mode planner for running Cascading completely in memory on the current computer. This allows for fast execution of Flows using local files or any other compatible custom Tap and Scheme classes.
The local mode planner and platform were not designed to scale beyond available memory, CPU, or disk on the current machine. Thus any memory-intensive processes that use GroupBy, CoGroup, or HashJoin are likely to fail when handling moderately large files.
Local mode is useful for development, testing, and interactive data exploration with sample sets.
- HadoopFlowConnector
-
The cascading.flow.hadoop.HadoopFlowConnector provides a planner for running Cascading on an Apache Hadoop 1.x cluster. This allows Cascading to execute with extremely large data sets over a cluster of computing nodes.
Note: Hadoop 1.x provides only the MapReduce model for distributed computation.
- Hadoop2MR1FlowConnector
-
The cascading.flow.hadoop2.Hadoop2MR1FlowConnector provides a planner for running Cascading on an Apache Hadoop 2.x cluster. This class is roughly equivalent to the above HadoopFlowConnector except it uses Hadoop 2 specific properties and is compiled with Hadoop 2 API binaries.
The underlying planner and execution run using the default MapReduce Hadoop API.
- Hadoop2TezFlowConnector
-
The cascading.flow.tez.Hadoop2TezFlowConnector provides a planner for running Cascading on an Apache Hadoop 2.x cluster with Apache Tez as an installed YARN application. Tez installation instructions are beyond the scope of this document.
Cascading’s support for pluggable planners allows a pipe assembly to be executed on an arbitrary platform, using platform-specific Tap and Scheme classes that hide the platform-related I/O details from the developer. For example, Hadoop uses org.apache.hadoop.mapred.InputFormat to read data, but local mode functions smoothly with a java.io.FileInputStream. This detail is hidden from developers unless they are creating custom Tap and Scheme classes.
Sourcing and Sinking Data
All input data comes in from, and all output data goes out to, some instance of a cascading.tap.Tap and cascading.scheme.Scheme pair. A Tap knows where data is located and how to access it: for example, a file on the local filesystem, on a Hadoop distributed file system, or on Amazon S3. A Scheme knows what the data is and how to read or write it: for example, the column/field names and whether the file is text or binary.
A tap can be read from, which makes it a source, or written to, which makes it a sink. Or, more commonly, taps act as both sinks and sources when shared between flows (a tap may not be used as both a source and sink in the same flow).
The platform on which your application is running determines which specific Tap and Scheme classes you can use. Details are provided in subsequent chapters.
Common Schemes
Some Schemes are functionally common across platforms but remain platform specific in implementation. To prevent naming collisions, each class has a platform-specific package name.
Cascading provides two common Scheme types:
- TextLine
-
TextLine reads and writes raw text files and returns tuples that, by default, contain two fields specific to the platform used. The first field is either the byte offset or line number, and the second field is the actual line of text. When written to, all Tuple values are converted to Strings delimited with the tab character (\t). A TextLine scheme is provided for both the local and Hadoop modes.
By default TextLine uses the UTF-8 character set. This can be overridden on the appropriate TextLine constructor.
- TextDelimited
-
TextDelimited reads and writes character-delimited files in standard formats such as CSV (comma-separated values), TSV (tab-separated values), and so on. When written to, all Tuple values are converted to Strings and joined with the specified character delimiter. This Scheme can optionally handle quoted values with custom quote characters. Further, TextDelimited can coerce each value to a primitive type when reading a text file. A TextDelimited scheme is provided for both the local and Hadoop modes.
By default TextDelimited uses the UTF-8 character set. This can be overridden on the appropriate TextDelimited constructor.
Platform-Specific Implementation Details
Depending on which platform you use (Cascading local, Hadoop MapReduce, or Apache Tez), the classes you use to specify schemes will vary. Platform-specific details for each standard scheme are shown in Table 2.
Description | Cascading local platform | Hadoop platform |
---|---|---|
Package Name | cascading.scheme.local | cascading.scheme.hadoop |
Read lines of text | TextLine | TextLine |
Read delimited text (CSV, TSV, etc.) | TextDelimited | TextDelimited |
Common Taps
The following sample code creates a new local filesystem Tap that can read and write raw text files. Since only one field name is provided, the "num" field is discarded, resulting in an input tuple stream with only "line" values.
Tap tap = new FileTap( new TextLine( new Fields( "line" ) ), path );
Some Taps are functionally common across platforms but remain platform-specific in implementation. To prevent naming collisions, each class has a platform-specific package name.
Cascading provides one common Tap type:
- PartitionTap
-
The cascading.tap.hadoop.PartitionTap and cascading.tap.local.PartitionTap are used to sink tuples into directory paths based on the values in the Tuple. More can be read below in PartitionTap.
Utility taps can be used to combine other Tap instances into a single Tap. You might also find it useful to subclass a utility tap when creating Tap types for other purposes. Utility taps are platform-independent.
There are three utility taps:
- MultiSourceTap
-
The cascading.tap.MultiSourceTap is used to tie multiple tap instances into a single tap for use as an input source. The only restriction is that all the tap instances passed to a new MultiSourceTap must share the same Scheme class (not necessarily the same Scheme instance).
- MultiSinkTap
-
The cascading.tap.MultiSinkTap is used to tie multiple tap instances into a single tap for use as output sinks. At runtime, for every Tuple output by the pipe assembly, each child tap to the MultiSinkTap will sink the Tuple.
- DecoratorTap
-
The cascading.tap.DecoratorTap is a utility helper for wrapping an existing Tap with new functionality via a subclass and/or adding "metadata" to a Tap instance via the generic MetaInfo instance field. Furthermore, on the Hadoop platform, planner-created intermediate and Checkpoint Taps can be wrapped by a DecoratorTap implementation by the Cascading planner.
Platform-Specific Implementation Details
Depending on which platform you use (Cascading local or Hadoop), the classes you use to specify filesystems vary. Platform-specific details for each standard tap type are shown in Table 3.
Description | Either platform | Cascading local platform | Hadoop platform |
---|---|---|---|
Package Name | cascading.tap | cascading.tap.local | cascading.tap.hadoop |
Multiple Taps as single source | MultiSourceTap | | |
Multiple Taps as single sink | MultiSinkTap | | |
Bin/Partition data into multiple files | | PartitionTap | PartitionTap |
Wrapping a Tap with Metadata / Decorating intra-Flow Taps | DecoratorTap | | |
To learn more about creating custom Tap or Scheme implementations, read the chapter on Custom Taps and Schemes.
Visit http://cascading.org/extensions/ for a list of available Tap or Scheme implementations.
Sink Modes
Tap tap =
new FileTap( new TextLine( new Fields( "line" ) ), path, SinkMode.REPLACE );
All applications created with Cascading read data from one or more sources, process it, then write data to one or more sinks. This is done via the various Tap classes, where each class abstracts different types of back-end systems that store data as files, tables, blobs, and so on. But in order to sink data, some systems require that the resource (e.g., a file) be deleted before processing can begin. Other systems may allow for appending or updating of a resource (typical with database tables).
When creating a new Tap instance, a cascading.tap.SinkMode may be provided so that the Tap will know how to handle any existing resources. Note that not all Taps support all SinkMode values — for example, Hadoop does not support appending operations (updates) from a MapReduce job.
The available SinkModes are:
- SinkMode.KEEP
-
This is the default behavior. If the resource exists, attempting to write over it fails.
- SinkMode.REPLACE
-
This allows Cascading to delete the file immediately after the Flow is started.
- SinkMode.UPDATE
-
Allows for new tap types that can update or append — for example, to update or add records in a database. Each tap may implement this functionality in its own way. Cascading recognizes this update mode, and if a resource exists, does not fail or attempt to delete it.
Note that Cascading itself only uses these labels internally to know when to automatically call deleteResource() on the Tap or to leave the Tap alone. It is up to the Tap implementation to actually perform a write or update when processing starts. Thus, when start() or complete() is called on a Flow, any sink Tap labeled SinkMode.REPLACE will have its deleteResource() method called.
Conversely, if a Flow is in a Cascade and the Tap is set to SinkMode.KEEP or SinkMode.REPLACE, deleteResource() is called if and only if the sink is stale (i.e., older than the source). This allows a Cascade to behave like a compiler build file, only running Flows that should be run. For more information, see Skipping Flows.
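The sink-mode rules above can be summarized as a small decision procedure. The following plain-Java sketch is a model only: the class SinkModeSketch, its Mode enum, and the methods prepareSink and isStale are hypothetical stand-ins that operate on local files, not cascading.tap.SinkMode or any Tap implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical model of the sink-mode rules described above.
class SinkModeSketch {

    enum Mode { KEEP, REPLACE, UPDATE }

    // Stand-alone Flow: decide what happens to an existing sink before writing.
    // Returns true if the existing resource was deleted.
    static boolean prepareSink(Path sink, Mode mode) throws IOException {
        if (!Files.exists(sink)) return false;
        switch (mode) {
            case KEEP:
                // Default behavior: writing over an existing resource fails.
                throw new IOException("sink already exists: " + sink);
            case REPLACE:
                Files.delete(sink);
                return true;
            default:
                // UPDATE: neither fail nor delete; the tap decides how to append or update.
                return false;
        }
    }

    // Inside a Cascade: a KEEP or REPLACE sink is deleted only when it is stale,
    // that is, older than the source.
    static boolean isStale(long sinkModifiedMillis, long sourceModifiedMillis) {
        return sinkModifiedMillis < sourceModifiedMillis;
    }

    public static void main(String[] args) throws IOException {
        Path sink = Files.createTempFile("sink", ".txt");
        System.out.println(prepareSink(sink, Mode.UPDATE));   // resource is left alone
        System.out.println(prepareSink(sink, Mode.REPLACE));  // resource is deleted
        System.out.println(Files.exists(sink));
    }
}
```

The staleness check is what lets a Cascade skip Flows whose output is already newer than their input.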
Flows
When pipe assemblies are bound to source and sink taps, a Flow is created. Flows are executable in the sense that, once they are created, they can be started and will execute on the specified platform. If the Hadoop platform is specified, the Flow will execute on a Hadoop cluster.
A Flow is essentially a data-processing pipeline that reads data from sources, processes the data as defined by the pipe assembly, and writes data to the sinks. Input source data does not need to exist at the time the Flow is created, but it must exist by the time the Flow is executed.
The most common pattern is to create a Flow from an existing pipe assembly. But there are cases where a MapReduce job (if running on Hadoop MapReduce) has already been created, and it makes sense to encapsulate it in a Flow class so that it may participate in a Cascade and be scheduled with other Flow instances.
Alternatively, custom Flows can be created so that third-party applications can participate in a Cascade, and complex algorithms that result in iterative Flow executions can be encapsulated as a single Flow.
The chapter on Flows provides further detail.