cascading.pipe.assembly
Class AggregateBy

java.lang.Object
  extended by cascading.pipe.Pipe
      extended by cascading.pipe.SubAssembly
          extended by cascading.pipe.assembly.AggregateBy
All Implemented Interfaces:
FlowElement, Serializable
Direct Known Subclasses:
AverageBy, CountBy, FirstBy, MaxBy, MinBy, SumBy

public class AggregateBy
extends SubAssembly

Class AggregateBy is a SubAssembly that serves two roles for handling aggregate operations.

The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and completed Reduce side. Summing is associative and commutative.

AggregateBy also supports operations that are not associative/commutative, such as 'counting'. Counting would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over two, and a hack.)
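The counting case described above can be sketched in plain Java. This is only an illustration of the idea, not Cascading code: the class PartialCountSketch and its methods are hypothetical names, and ordinary collections stand in for the Map and Reduce tasks.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: plain Java collections stand in for Map and Reduce tasks.
final class PartialCountSketch {

    // "Map side": count the occurrences of each key within one input split.
    static Map<String, Long> countSplit(List<String> keys) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // "Reduce side": sum the partial counts produced by every split.
    static Map<String, Long> sumPartials(List<Map<String, Long>> partials) {
        Map<String, Long> totals = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            for (Map.Entry<String, Long> entry : partial.entrySet()) {
                totals.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
        return totals;
    }
}
```

Note that the two sides perform different operations (count, then sum), which is the distinction the paragraph above draws against plain summing.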

Think of this mechanism as a MapReduce Combiner, but more efficient: no values are serialized, deserialized, saved to disk, or multi-pass sorted in the process, all of which consume CPU resources. Instead, it trades memory for little or no IO.

Further, Combiners are limited to only associative/commutative operations.

Additionally, the Cascading planner can move the Map side optimization to the previous Reduce operation, further increasing IO performance (between the preceding Reduce and Map phase, which is over HDFS).

The second role of the AggregateBy class is to allow for composition of AggregateBy sub-classes. That is, SumBy and CountBy AggregateBy sub-classes can be performed in parallel on the same grouping keys.

Custom AggregateBy classes can be created by sub-classing this class and implementing a special AggregateBy.Functor for use on the Map side. Multiple Functor instances are managed by the AggregateBy.CompositeFunction class, allowing them all to share the same LRU value map for greater efficiency.
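As a rough sketch of several functors sharing one value map, the plain Java below keeps one context slot per functor for each grouping key. Everything here is an assumption made for illustration: MiniFunctor is a hypothetical stand-in and does not mirror the real AggregateBy.Functor signature, and the map is unbounded rather than an LRU.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: not the Cascading Functor or CompositeFunction API.
final class CompositeSketch {

    // Hypothetical functor: folds one value into a running partial result.
    interface MiniFunctor {
        // context is null the first time a grouping key is seen.
        long aggregate(Long context, long value);
    }

    static final MiniFunctor SUM = (context, value) -> context == null ? value : context + value;
    static final MiniFunctor COUNT = (context, value) -> context == null ? 1L : context + 1L;

    // One shared map: each grouping key maps to one context slot per functor.
    static Map<String, Long[]> aggregate(String[] keys, long[] values, MiniFunctor... functors) {
        Map<String, Long[]> shared = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            Long[] contexts = shared.computeIfAbsent(keys[i], k -> new Long[functors.length]);
            for (int f = 0; f < functors.length; f++) {
                contexts[f] = functors[f].aggregate(contexts[f], values[i]);
            }
        }
        return shared;
    }
}
```

Because every functor reads and writes its own slot under the same key, each grouping key is looked up once per incoming value, however many functors are composed.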

AggregateBy instances return argumentFields which are used internally to control the values passed to internal Functor instances. If any argumentFields also have Comparators, they will be used for secondary sorting (see GroupBy sortFields). This feature is used by FirstBy to control which Tuple is seen first for a grouping.

To tune the LRU, set the threshold value high enough to utilize available memory, or set a default value via the AGGREGATE_BY_THRESHOLD property. The current default (AggregateBy.CompositeFunction.DEFAULT_THRESHOLD) is 10,000 unique keys. Note that "flushes" from the LRU will be logged in threshold increments along with memory information.
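To make the role of the threshold concrete, here is a minimal sketch of a bounded, access-ordered cache of partial sums, written with java.util.LinkedHashMap. It is an assumption about the general technique, not the CompositeFunction implementation: the class LruPartialSumSketch, its methods, and the "key=value" output format are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a bounded cache of partial sums with LRU eviction.
final class LruPartialSumSketch {
    private final int threshold;
    // accessOrder = true: iteration runs from least to most recently used.
    private final LinkedHashMap<String, Long> cache = new LinkedHashMap<>(16, 0.75f, true);
    // Stands in for partial results emitted toward the Reduce side.
    final List<String> emitted = new ArrayList<>();

    LruPartialSumSketch(int threshold) {
        this.threshold = threshold;
    }

    // Fold a value into the partial sum for its key, evicting if over threshold.
    void add(String key, long value) {
        cache.merge(key, value, Long::sum);
        if (cache.size() > threshold) {
            Iterator<Map.Entry<String, Long>> it = cache.entrySet().iterator();
            Map.Entry<String, Long> eldest = it.next();
            emitted.add(eldest.getKey() + "=" + eldest.getValue());
            it.remove();
        }
    }

    // Emit whatever is still cached, for example at the end of the input.
    void flushAll() {
        for (Map.Entry<String, Long> entry : cache.entrySet()) {
            emitted.add(entry.getKey() + "=" + entry.getValue());
        }
        cache.clear();
    }
}
```

With a threshold smaller than the number of distinct keys, the same key can be emitted more than once with different partial values, so the Reduce side aggregation is still needed to combine them. A larger threshold keeps more keys in memory and produces fewer partial results.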

Note that using an AggregateBy instance automatically inserts a GroupBy into the resulting Flow. Passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy.

Also note that Unique is not an AggregateBy and is slightly more optimized internally.

Keep in mind the Hasher interface is not honored here (for storing keys in the cache). Thus arrays of primitives and objects, such as byte[], will not be properly stored. This is a known issue and will be resolved in a future release.
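The underlying Java behavior can be seen in isolation: arrays inherit identity-based equals and hashCode from Object, so two arrays with the same contents are distinct hash-map keys. The sketch below uses a plain HashMap rather than the AggregateBy cache, and the ByteBuffer wrapping is shown only for contrast; it is an assumption for illustration, not a workaround documented on this page.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: shows why raw arrays make poor hash-map keys.
final class ArrayKeySketch {

    // Raw byte[] keys compare by identity, so equal contents do not collapse.
    static int distinctRawKeys(byte[]... keys) {
        Map<byte[], Integer> cache = new HashMap<>();
        for (byte[] key : keys) {
            cache.merge(key, 1, Integer::sum);
        }
        return cache.size();
    }

    // ByteBuffer compares by content, so equal contents share one entry.
    static int distinctWrappedKeys(byte[]... keys) {
        Map<ByteBuffer, Integer> cache = new HashMap<>();
        for (byte[] key : keys) {
            cache.merge(ByteBuffer.wrap(key), 1, Integer::sum);
        }
        return cache.size();
    }
}
```

Passing two separate arrays that both hold {1, 2, 3} yields two entries in the raw-keyed map and one entry in the wrapped-key map.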

See Also:
SumBy, CountBy, Unique, Serialized Form

Nested Class Summary
static class AggregateBy.CompositeFunction
          Class CompositeFunction takes multiple Functor instances and manages them as a single Function.
static class AggregateBy.Flush
           
static interface AggregateBy.Functor
          Interface Functor provides a means to create a simple function for use with the AggregateBy.CompositeFunction class.
 
Field Summary
static String AGGREGATE_BY_THRESHOLD
           
static int DEFAULT_THRESHOLD
           
static int USE_DEFAULT_THRESHOLD
           
 
Fields inherited from class cascading.pipe.Pipe
configDef, parent, stepConfigDef
 
Constructor Summary
protected AggregateBy(Fields argumentFields, AggregateBy.Functor functor, Aggregator aggregator)
          Constructor AggregateBy creates a new AggregateBy instance.
  AggregateBy(Pipe pipe, Fields groupingFields, AggregateBy... assemblies)
          Constructor AggregateBy creates a new AggregateBy instance.
  AggregateBy(Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies)
          Constructor AggregateBy creates a new AggregateBy instance.
protected AggregateBy(String name, int threshold)
          Constructor AggregateBy creates a new AggregateBy instance.
  AggregateBy(String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies)
          Constructor AggregateBy creates a new AggregateBy instance.
protected AggregateBy(String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, AggregateBy.Functor functor, Aggregator aggregator, int threshold)
           
  AggregateBy(String name, Pipe[] pipes, Fields groupingFields, int threshold, AggregateBy... assemblies)
          Constructor AggregateBy creates a new AggregateBy instance.
  AggregateBy(String name, Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies)
          Constructor AggregateBy creates a new AggregateBy instance.
 
Method Summary
protected  Aggregator[] getAggregators()
           
protected  Fields[] getArgumentFields()
           
 Fields[] getFieldDeclarations()
          Method getFieldDeclarations returns an array of Fields where each Fields element in the array corresponds to the field declaration of the given Aggregator operations.
protected  AggregateBy.Functor[] getFunctors()
           
 GroupBy getGroupBy()
          Method getGroupBy returns the internal GroupBy instance so that any custom properties can be set on it via Pipe.getStepConfigDef().
 Fields getGroupingFields()
          Method getGroupingFields returns the Fields this instance will be grouping against.
protected  void initialize(Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, AggregateBy.Functor[] functors, Aggregator[] aggregators)
           
protected  void initialize(Fields groupingFields, Pipe[] pipes, Fields argumentFields, AggregateBy.Functor functor, Aggregator aggregator)
           
protected  void verify()
          Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns.
 
Methods inherited from class cascading.pipe.SubAssembly
getName, getPrevious, getTailNames, getTails, setPrevious, setTails, unwind
 
Methods inherited from class cascading.pipe.Pipe
equals, getConfigDef, getHeads, getParent, getStepConfigDef, getTrace, hasConfigDef, hashCode, hasStepConfigDef, id, isEquivalentTo, named, names, outgoingScopeFor, pipes, print, printInternal, resolveIncomingOperationArgumentFields, resolveIncomingOperationPassThroughFields, setParent, toString
 
Methods inherited from class java.lang.Object
clone, finalize, getClass, notify, notifyAll, wait, wait, wait
 

Field Detail

USE_DEFAULT_THRESHOLD

public static final int USE_DEFAULT_THRESHOLD
See Also:
Constant Field Values

DEFAULT_THRESHOLD

public static final int DEFAULT_THRESHOLD
See Also:
Constant Field Values

AGGREGATE_BY_THRESHOLD

public static final String AGGREGATE_BY_THRESHOLD
See Also:
Constant Field Values
Constructor Detail

AggregateBy

protected AggregateBy(String name,
                      int threshold)
Constructor AggregateBy creates a new AggregateBy instance.

Parameters:
name - of type String
threshold - of type int

AggregateBy

protected AggregateBy(Fields argumentFields,
                      AggregateBy.Functor functor,
                      Aggregator aggregator)
Constructor AggregateBy creates a new AggregateBy instance.

Parameters:
argumentFields - of type Fields
functor - of type Functor
aggregator - of type Aggregator

AggregateBy

@ConstructorProperties(value={"pipe","groupingFields","assemblies"})
public AggregateBy(Pipe pipe,
                   Fields groupingFields,
                   AggregateBy... assemblies)
Constructor AggregateBy creates a new AggregateBy instance.

Parameters:
pipe - of type Pipe
groupingFields - of type Fields
assemblies - of type AggregateBy...

AggregateBy

@ConstructorProperties(value={"pipe","groupingFields","threshold","assemblies"})
public AggregateBy(Pipe pipe,
                   Fields groupingFields,
                   int threshold,
                   AggregateBy... assemblies)
Constructor AggregateBy creates a new AggregateBy instance.

Parameters:
pipe - of type Pipe
groupingFields - of type Fields
threshold - of type int
assemblies - of type AggregateBy...

AggregateBy

@ConstructorProperties(value={"name","pipe","groupingFields","threshold","assemblies"})
public AggregateBy(String name,
                   Pipe pipe,
                   Fields groupingFields,
                   int threshold,
                   AggregateBy... assemblies)
Constructor AggregateBy creates a new AggregateBy instance.

Parameters:
name - of type String
pipe - of type Pipe
groupingFields - of type Fields
threshold - of type int
assemblies - of type AggregateBy...

AggregateBy

@ConstructorProperties(value={"name","pipes","groupingFields","assemblies"})
public AggregateBy(String name,
                   Pipe[] pipes,
                   Fields groupingFields,
                   AggregateBy... assemblies)
Constructor AggregateBy creates a new AggregateBy instance.

Parameters:
name - of type String
pipes - of type Pipe[]
groupingFields - of type Fields
assemblies - of type AggregateBy...

AggregateBy

@ConstructorProperties(value={"name","pipes","groupingFields","threshold","assemblies"})
public AggregateBy(String name,
                   Pipe[] pipes,
                   Fields groupingFields,
                   int threshold,
                   AggregateBy... assemblies)
Constructor AggregateBy creates a new AggregateBy instance.

Parameters:
name - of type String
pipes - of type Pipe[]
groupingFields - of type Fields
threshold - of type int
assemblies - of type AggregateBy...

AggregateBy

protected AggregateBy(String name,
                      Pipe[] pipes,
                      Fields groupingFields,
                      Fields argumentFields,
                      AggregateBy.Functor functor,
                      Aggregator aggregator,
                      int threshold)
Method Detail

initialize

protected void initialize(Fields groupingFields,
                          Pipe[] pipes,
                          Fields argumentFields,
                          AggregateBy.Functor functor,
                          Aggregator aggregator)

initialize

protected void initialize(Fields groupingFields,
                          Pipe[] pipes,
                          Fields[] argumentFields,
                          AggregateBy.Functor[] functors,
                          Aggregator[] aggregators)

verify

protected void verify()
Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns.


getGroupingFields

public Fields getGroupingFields()
Method getGroupingFields returns the Fields this instance will be grouping against.

Returns:
the current grouping fields

getFieldDeclarations

public Fields[] getFieldDeclarations()
Method getFieldDeclarations returns an array of Fields where each Fields element in the array corresponds to the field declaration of the given Aggregator operations.

Note the actual Fields values are returned, not planner resolved Fields.

Returns:
an array of Fields

getArgumentFields

protected Fields[] getArgumentFields()

getFunctors

protected AggregateBy.Functor[] getFunctors()

getAggregators

protected Aggregator[] getAggregators()

getGroupBy

public GroupBy getGroupBy()
Method getGroupBy returns the internal GroupBy instance so that any custom properties can be set on it via Pipe.getStepConfigDef().

Returns:
GroupBy type


Copyright © 2007-2013 Concurrent, Inc. All Rights Reserved.