Cascading 3.0 User Guide - The Cascading Process Planner

The Cascading Process Planner

For an introduction to the thinking and design behind the query planner, see the The Cascading 3.0 Query Planner blog posting.

FlowConnector

All FlowConnector subclasses provide a custom RuleRegistrySet. The component layers of RuleRegistrySet can be summarized as:

  • RuleRegistrySet consists of one or more RuleRegistry instances

  • RuleRegistry instances consist of Rule instances

A RuleRegistrySet is mutable. Consequently, the default registry can be retrieved via FlowConnector.getRuleRegistrySet() and updated.

The various FlowConnector subclasses are covered in Platforms.

RuleRegistrySet

The RuleRegistrySet manages one or more RuleRegistry instances.

During planning, all registered RuleRegistry instances are used independently to create candidate execution plans. If more than one RuleRegistry is available, the instances are executed in parallel. However, a total plan timeout can be provided in the case a planner executes for too long.

Either the first or "best" plan is utilized when the planner completes, depending on the value of RuleRegistrySet.setSelect(). The value can be one of the following values:

Select.FIRST

The first RuleRegistry instance to complete is used. All remaining planners are cancelled.

Select.COMPARED

The best RuleRegistry instance is used, where "best" is a function of the registred planComparator. The planComparator is set by calling RuleRegistrySet.setPlanComparator().

If not set, the default behavior is to choose the plan with the fewest FlowStep instances. If the number of FlowStep instances among plans is equal, the plan with the fewest FlowNode instances is chosen. Otherwise, the order in which the RuleRegistry instances were registered prevails.

Relying on the fewest FlowStep and FlowNode instances in the resulting plan is a simplified version of cost. If competing plans can be measured differently, supply a custom Comparator to perform the comparison.

Support for multiple registries is important. As Cascading advances, primitives at the Pipe level are added that offer new optimizations or capabilities.

These additions have consequences when related to the set of other pipe primitives being utilized. Each new primitive adds to the time complexity of finding a suitable plan. Frequently two or more primitives working together add yet another topological complexity (some combinations could cause runtime or planner failures if not accounted for).

The end result is that a one-size-fits-all rule set, in order to be safe, makes conservative decisions so that plan time and the resulting execution are reasonable instead of optimal.

By allowing multiple rule registries to be declared in the RuleRegistrySet, any given registry can decide if it is applicable, and if so, will apply itself. If not applicable, it will leave the competition.

For example, the Cascading Apache Tez planner provides one rule registry that supports HashJoin pipes, and one that does not.

Under very complex scenarios, when HashJoins are in play, the planner can execute must faster if it makes some compromises to limit the search space the planner must navigate to find a functional plan. When there are no HashJoins, it isn’t worth the time to find and apply the compromises.

RuleRegistry

The RuleRegistry consists of a set of rules. Rules in short navigate and mutate what is known as an element graph, which is simply a directed acyclic graph (DAG) of Pipe and Tap instances connected by edges (Scope instances).

The input of a rule is an element graph, and the output is a PlannerException, a modified element graph, or a subgraph of the given element graph.

Types of Rules

The following are the main types of rules:

RuleAssert

This rule simply provides an assertion mechanism about the structure of the pipe assembly. If a Pipe or Tap is misplaced or unsupported, it can in effect cause a syntax error. The error message includes the name and location of the problematic element.

RuleTransformer

This rule provides a means to modify or mutate an element graph by either adding or removing elements.

In addition to mutating an element graph, a transformer can annotate the graph with metadata that can inject logic to downstream rules or Cascading itself during runtime. Identifying the accumulated and streamed sides of a HashJoin greatly simplifies the construction of the actual execution logic.

RulePartitioner

This rule provides a means to break a large graph into smaller graphs. This is where a pipe assembly along with attached sources and sinks are broken down into FlowStep, FlowNode, and possibly FlowPipeline instances.

A partitioner can partition the current top-level element graph or repartition an element subgraph into new subgraphs.

See the section on Flow Process Hierarchy for details on the roles these classes play.

Phases for Rules

Rules are applied at different phases. The following is a list of PlanPhase types:

PreBalanceAssembly

The PreBalanceAssembly phase is where most assertions are applied.

BalanceAssembly

The BalanceAssembly phase is where, for example with MapReduce, intermediate Tap instances are injected into the element graph.

PostBalanceAssembly

The PostBalanceAssembly phase provides a means to perform any cleanup after the balancing phase.

PreResolveAssembly

The PreResolveAssembly phase is where any no op Pipe instances or unnecessary Debug or Assertion filters should be removed (depending on the configured DebugLevel or AssertionLevel, respectively.)

ResolveAssembly

The ResolveAssembly is where Cascading performs all field resolution by inspecting the source and sink Taps and any Operations that produce or require fields. The general purpose of this phase is to ensure that all field-level dependencies are satisfied.

No rules are applied in this phase.

PostResolveAssembly

The PostResolveAssembly phase is where any logical optimizations could be applied based on now fully resolved field names in the element graph prior to any subgraph partitioning.

PartitionSteps

The PartitionSteps phase is where the element graphs that represent the work that would be contained in individual FlowStep instances are found.

All MapReduce jobs are separated into individual element subgraphs, each bounded by source and sink Taps, or intermediate, temporary source and sink Taps.

PostSteps

The PostSteps phase provides a means to clean up post-step partitioning.

PartitionNodes

The PartitionNodes phase is where the element graphs that represent the work that would be contained in individual FlowNode instances are found.

PostNodes

The PostNodes phase provides a means to clean up post-node partitioning. Frequently malformed remainder subgraphs can result in rules that were already applied. This phase is the best opportunity to remove these graphs.

PartitionPipelines

The PartitionPipelines phase is where the element graphs that represent the work that would be contained in individual FlowPipeline instances are found.

In the case of MapReduce, a mapper function can have multiple discrete input paths that correspond to different computation paths. Consider joining two files: one pipeline to process the left side and another to process the right side. The side to process within the FlowNode is determined at runtime when the child mapper JVM is instantiated and handed an input split, a portion of one of the input files, which in turn corresponds to one of the pipelines.

PostPipelines

The PostPipelines phase provides a means to clean up post-pipeline partitioning. Frequently malformed remainder subgraphs can result in rules that were applied. This phase is the best opportunity to remove these graphs.

Debugging RuleRegistrySets

You can debug planner rules by enabling the various trace mechanisms outlined in the section on Debugging Planner Failures.