/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.pipe.assembly;

import java.beans.ConstructorProperties;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import cascading.flow.FlowProcess;
import cascading.management.annotation.Property;
import cascading.management.annotation.PropertyConfigured;
import cascading.management.annotation.PropertyDescription;
import cascading.management.annotation.Visibility;
import cascading.operation.Aggregator;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.util.TupleHasher;
import cascading.tuple.util.TupleViews;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations.
 * <p/>
 * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the
 * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and
 * completed Reduce side. Summing is associative and commutative.
 * <p/>
 * AggregateBy also supports operations that are not associative/commutative, like 'counting'. Counting
 * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be
 * transposed to summing on both the Map and Reduce sides by emitting 1's before the first sum, but that's three
 * operations instead of two, and a hack.)
 * <p/>
 * Think of this mechanism as a MapReduce Combiner, but more efficient in that no values are serialized,
 * deserialized, saved to disk, or multi-pass sorted in the process, all of which consume CPU resources in exchange for
 * memory and little or no IO.
 * <p/>
 * Further, Combiners are limited to associative/commutative operations only.
 * <p/>
 * Additionally, the Cascading planner can move the Map side optimization
 * to the previous Reduce operation, further increasing IO performance (between the preceding Reduce and Map phases, which
 * communicate over HDFS).
 * <p/>
 * The second role of the AggregateBy class is to allow for composition of AggregateBy
 * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed
 * in parallel on the same grouping keys.
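 * <p/>
 * For example, a minimal sketch of summing and counting in parallel under a single grouping (the field names
 * here are illustrative only):
 * <pre>{@code
 * Pipe assembly = new Pipe( "assembly" );
 *
 * // sum the "value" field and count occurrences, both per "key", in a single GroupBy
 * SumBy sumBy = new SumBy( new Fields( "value" ), new Fields( "sum" ), double.class );
 * CountBy countBy = new CountBy( new Fields( "count" ) );
 *
 * assembly = new AggregateBy( assembly, new Fields( "key" ), sumBy, countBy );
 * }</pre>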
 * <p/>
 * Custom AggregateBy classes can be created by sub-classing this class and implementing a special
 * {@link Functor} for use on the Map side. Multiple Functor instances are managed by the {@link CompositeFunction}
 * class, allowing them all to share the same LRU value map for greater efficiency.
 * <p/>
 * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to
 * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used
 * for secondary sorting (see the {@link GroupBy} {@code sortFields}). This feature is used by {@link FirstBy} to
 * control which Tuple is seen first for a grouping.
 * <p/>
 * To tune the LRU, set the {@code threshold} value to a high enough value to utilize available memory. Or set a
 * default value via the {@link #AGGREGATE_BY_THRESHOLD} property. The current default ({@link CompositeFunction#DEFAULT_THRESHOLD})
 * is {@code 10,000} unique keys. Note "flushes" from the LRU will be logged in threshold increments along with memory
 * information.
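 * <p/>
 * For example, a sketch of raising the default threshold through the properties handed to the flow connector
 * (the value shown is arbitrary; size it to the available heap):
 * <pre>{@code
 * Properties properties = new Properties();
 *
 * // allow roughly 100,000 distinct grouping keys to be cached Map side before flushing
 * properties.setProperty( AggregateBy.AGGREGATE_BY_THRESHOLD, "100000" );
 * }</pre>
 * Alternatively, pass an explicit {@code threshold} value to any of the AggregateBy constructors that accept one.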
 * <p/>
 * Note that using an AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow},
 * and passing multiple AggregateBy instances to a parent AggregateBy instance still results in a single GroupBy.
 * <p/>
 * Also note that {@link Unique} is not an AggregateBy sub-class and is slightly more optimized internally.
 * <p/>
 * As of Cascading 2.6, AggregateBy honors the {@link cascading.tuple.Hasher} interface for storing keys in the cache.
 *
 * @see SumBy
 * @see CountBy
 * @see Unique
 */
public class AggregateBy extends SubAssembly
  {
  private static final Logger LOG = LoggerFactory.getLogger( AggregateBy.class );

  public static final int USE_DEFAULT_THRESHOLD = 0;
  public static final int DEFAULT_THRESHOLD = CompositeFunction.DEFAULT_THRESHOLD;
  public static final String AGGREGATE_BY_THRESHOLD = "cascading.aggregateby.threshold";

  private String name;
  private int threshold;
  private Fields groupingFields;
  private Fields[] argumentFields;
  private Functor[] functors;
  private Aggregator[] aggregators;
  private transient GroupBy groupBy;

  public enum Cache
    {
      Num_Keys_Flushed,
      Num_Keys_Hit,
      Num_Keys_Missed
    }

  @Deprecated
  public enum Flush
    {
      Num_Keys_Flushed
    }

  /**
   * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class.
   * <p/>
   * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs.
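   * <p/>
   * A minimal sketch of a Functor that tracks a running maximum of its single argument Map side (the declared
   * field name is an illustrative assumption); a matching {@link Aggregator} would finish the work Reduce side:
   * <pre>{@code
   * public class MaxFunctor implements AggregateBy.Functor
   *   {
   *   public Fields getDeclaredFields()
   *     {
   *     return new Fields( "max" );
   *     }
   *
   *   public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
   *     {
   *     double value = args.getDouble( 0 );
   *
   *     if( context == null ) // first value seen for this grouping key
   *       return new Tuple( value );
   *
   *     if( value > context.getDouble( 0 ) )
   *       context.set( 0, value );
   *
   *     return context;
   *     }
   *
   *   public Tuple complete( FlowProcess flowProcess, Tuple context )
   *     {
   *     return context; // safe to return the context as the result
   *     }
   *   }
   * }</pre>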
   */
  public interface Functor extends Serializable
    {
    /**
     * Method getDeclaredFields returns the declaredFields of this Functor object.
     *
     * @return the declaredFields (type Fields) of this Functor object.
     */
    Fields getDeclaredFields();

    /**
     * Method aggregate operates on the given args, optionally in tandem with the given context values.
     * <p/>
     * The context argument is the result of the previous call to this method. Use it to store values between aggregate
     * calls (the current count, or sum of the args).
     * <p/>
     * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. On all subsequent
     * invocations, context will be the value returned by the previous invocation.
     *
     * @param flowProcess of type FlowProcess
     * @param args        of type TupleEntry
     * @param context     of type Tuple
     * @return Tuple
     */
    Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context );

    /**
     * Method complete allows the final aggregate computation to be performed before the return value is collected.
     * <p/>
     * The number of values in the returned {@link Tuple} instance must match the number of declaredFields.
     * <p/>
     * It is safe to return the context object as the result value.
     *
     * @param flowProcess of type FlowProcess
     * @param context     of type Tuple
     * @return Tuple
     */
    Tuple complete( FlowProcess flowProcess, Tuple context );
    }

  /**
   * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}.
   *
   * @see Functor
   */
  public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context>
    {
    public static final int DEFAULT_THRESHOLD = 10000;

    private int threshold = 0;
    private final Fields groupingFields;
    private final Fields[] argumentFields;
    private final Fields[] functorFields;
    private final Functor[] functors;
    private final TupleHasher tupleHasher;

    public static class Context
      {
      LinkedHashMap<Tuple, Tuple[]> lru;
      TupleEntry[] arguments;
      Tuple result;
      }

    /**
     * Constructor CompositeFunction creates a new CompositeFunction instance.
     *
     * @param groupingFields of type Fields
     * @param argumentFields of type Fields
     * @param functor        of type Functor
     * @param threshold      of type int
     */
    public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int threshold )
      {
      this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, threshold );
      }

    /**
     * Constructor CompositeFunction creates a new CompositeFunction instance.
     *
     * @param groupingFields of type Fields
     * @param argumentFields of type Fields[]
     * @param functors       of type Functor[]
     * @param threshold      of type int
     */
    public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int threshold )
      {
      super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information
      this.groupingFields = groupingFields;
      this.argumentFields = argumentFields;
      this.functors = functors;
      this.threshold = threshold;

      this.functorFields = new Fields[ functors.length ];

      for( int i = 0; i < functors.length; i++ )
        this.functorFields[ i ] = functors[ i ].getDeclaredFields();

      Comparator[] hashers = TupleHasher.merge( functorFields );

      if( !TupleHasher.isNull( hashers ) )
        this.tupleHasher = new TupleHasher( null, hashers );
      else
        this.tupleHasher = null;
      }

    private static Fields getFields( Fields groupingFields, Functor[] functors )
      {
      Fields fields = groupingFields;

      for( Functor functor : functors )
        fields = fields.append( functor.getDeclaredFields() );

      return fields;
      }

    @Override
    public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall )
      {
      if( threshold == 0 )
        {
        Integer value = flowProcess.getIntegerProperty( AGGREGATE_BY_THRESHOLD );

        if( value != null && value > 0 )
          threshold = value;
        else
          threshold = DEFAULT_THRESHOLD;
        }

      LOG.info( "using threshold value: {}", threshold );

      Fields[] fields = new Fields[ functors.length + 1 ];

      fields[ 0 ] = groupingFields;

      for( int i = 0; i < functors.length; i++ )
        fields[ i + 1 ] = functors[ i ].getDeclaredFields();

      final Context context = new Context();

      context.arguments = new TupleEntry[ functors.length ];

      for( int i = 0; i < context.arguments.length; i++ )
        {
        Fields resolvedArgumentFields = operationCall.getArgumentFields();

        int[] pos;

        if( argumentFields[ i ].isAll() )
          pos = resolvedArgumentFields.getPos();
        else
          pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL

        Tuple narrow = TupleViews.createNarrow( pos );

        Fields currentFields;

        if( this.argumentFields[ i ].isSubstitution() )
          currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator
        else
          currentFields = Fields.asDeclaration( this.argumentFields[ i ] );

        context.arguments[ i ] = new TupleEntry( currentFields, narrow );
        }

      context.result = TupleViews.createComposite( fields );

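      // the cache is an access-ordered LRU; once it grows past threshold, the eldest entry is
      // completed and emitted Map side as a partial result (see removeEldestEntry below)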
      context.lru = new LinkedHashMap<Tuple, Tuple[]>( threshold, 0.75f, true )
      {
      long flushes = 0;

      @Override
      protected boolean removeEldestEntry( Map.Entry<Tuple, Tuple[]> eldest )
        {
        boolean doRemove = size() > threshold;

        if( doRemove )
          {
          completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, eldest );
          flowProcess.increment( Cache.Num_Keys_Flushed, 1 );
          flowProcess.increment( Flush.Num_Keys_Flushed, 1 );

          if( flushes % threshold == 0 ) // every multiple, write out data
            {
            Runtime runtime = Runtime.getRuntime();
            long freeMem = runtime.freeMemory() / 1024 / 1024;
            long maxMem = runtime.maxMemory() / 1024 / 1024;
            long totalMem = runtime.totalMemory() / 1024 / 1024;

            LOG.info( "flushed keys num times: {}, with threshold: {}", flushes + 1, threshold );
            LOG.info( "mem on flush (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );

            float percent = (float) totalMem / (float) maxMem;

            if( percent < 0.80F )
              LOG.info( "total mem is {}% of max mem, to better utilize unused memory consider increasing current LRU threshold with system property \"{}\"", (int) ( percent * 100.0F ), AGGREGATE_BY_THRESHOLD );
            }

          flushes++;
          }

        return doRemove;
        }
      };

      operationCall.setContext( context );
      }

    @Override
    public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall )
      {
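      // copy the grouping key out of the incoming arguments (wrapping it so any custom Hasher is honored),
      // then look up this key's partial aggregation state in the LRU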
      TupleEntry arguments = functionCall.getArguments();
      Tuple key = TupleHasher.wrapTuple( this.tupleHasher, arguments.selectTupleCopy( groupingFields ) );

      Context context = functionCall.getContext();
      Tuple[] functorContext = context.lru.get( key );

      if( functorContext == null )
        {
        functorContext = new Tuple[ functors.length ];
        context.lru.put( key, functorContext );
        flowProcess.increment( Cache.Num_Keys_Missed, 1 );
        }
      else
        {
        flowProcess.increment( Cache.Num_Keys_Hit, 1 );
        }

      for( int i = 0; i < functors.length; i++ )
        {
        TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() );
        functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] );
        }
      }

    @Override
    public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall )
      {
      // need to drain context
      TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector();

      Tuple result = operationCall.getContext().result;
      LinkedHashMap<Tuple, Tuple[]> context = operationCall.getContext().lru;

      for( Map.Entry<Tuple, Tuple[]> entry : context.entrySet() )
        completeFunctors( flowProcess, collector, result, entry );

      operationCall.setContext( null );
      }

    private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry )
      {
      Tuple[] results = new Tuple[ functors.length + 1 ];

      results[ 0 ] = entry.getKey();

      Tuple[] values = entry.getValue();

      for( int i = 0; i < functors.length; i++ )
        results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] );

      TupleViews.reset( result, results );

      outputCollector.add( result );
      }

    @Override
    public boolean equals( Object object )
      {
      if( this == object )
        return true;
      if( !( object instanceof CompositeFunction ) )
        return false;
      if( !super.equals( object ) )
        return false;

      CompositeFunction that = (CompositeFunction) object;

      if( !Arrays.equals( argumentFields, that.argumentFields ) )
        return false;
      if( !Arrays.equals( functorFields, that.functorFields ) )
        return false;
      if( !Arrays.equals( functors, that.functors ) )
        return false;
      if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null )
        return false;

      return true;
      }

    @Override
    public int hashCode()
      {
      int result = super.hashCode();
      result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 );
      result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 );
      result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 );
      result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 );
      return result;
      }
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param name      of type String
   * @param threshold of type int
   */
  protected AggregateBy( String name, int threshold )
    {
    this.name = name;
    this.threshold = threshold;
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param argumentFields of type Fields
   * @param functor        of type Functor
   * @param aggregator     of type Aggregator
   */
  protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator )
    {
    this.argumentFields = Fields.fields( argumentFields );
    this.functors = new Functor[]{functor};
    this.aggregators = new Aggregator[]{aggregator};
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"pipe", "groupingFields", "assemblies"} )
  public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies )
    {
    this( null, Pipe.pipes( pipe ), groupingFields, 0, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param threshold      of type int
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"pipe", "groupingFields", "threshold", "assemblies"} )
  public AggregateBy( Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies )
    {
    this( null, Pipe.pipes( pipe ), groupingFields, threshold, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param name           of type String
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param threshold      of type int
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"name", "pipe", "groupingFields", "threshold", "assemblies"} )
  public AggregateBy( String name, Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies )
    {
    this( name, Pipe.pipes( pipe ), groupingFields, threshold, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"name", "pipes", "groupingFields", "assemblies"} )
  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies )
    {
    this( name, pipes, groupingFields, 0, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param threshold      of type int
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"name", "pipes", "groupingFields", "threshold", "assemblies"} )
  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int threshold, AggregateBy... assemblies )
    {
    this( name, threshold );

    List<Fields> arguments = new ArrayList<Fields>();
    List<Functor> functors = new ArrayList<Functor>();
    List<Aggregator> aggregators = new ArrayList<Aggregator>();

    for( int i = 0; i < assemblies.length; i++ )
      {
      AggregateBy assembly = assemblies[ i ];

      Collections.addAll( arguments, assembly.getArgumentFields() );
      Collections.addAll( functors, assembly.getFunctors() );
      Collections.addAll( aggregators, assembly.getAggregators() );
      }

    initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) );
    }

  protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int threshold )
    {
    this( name, threshold );
    initialize( groupingFields, pipes, argumentFields, functor, aggregator );
    }

  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator )
    {
    initialize( groupingFields, pipes, Fields.fields( argumentFields ),
      new Functor[]{functor},
      new Aggregator[]{aggregator} );
    }

  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators )
    {
    setPrevious( pipes );

    this.groupingFields = groupingFields;
    this.argumentFields = argumentFields;
    this.functors = functors;
    this.aggregators = aggregators;

    verify();

    Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields );
    Fields argumentSelector = Fields.merge( this.groupingFields, sortFields );

    if( argumentSelector.equals( Fields.NONE ) )
      argumentSelector = Fields.ALL;

    Pipe[] functions = new Pipe[ pipes.length ];

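    // one CompositeFunction instance performs the Map side partial aggregation for all Functors,
    // so they all share a single LRU cache keyed on the grouping fields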
    CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, threshold );

    for( int i = 0; i < functions.length; i++ )
      functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS );

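    // a single GroupBy gathers the partial results from every incoming branch; any Comparators found
    // on the argument fields are applied as secondary sort fields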
    groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null );

    Pipe pipe = groupBy;

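    // each Aggregator completes, Reduce side, the partial values declared by its corresponding Functor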
    for( int i = 0; i < aggregators.length; i++ )
      pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL );

    setTails( pipe );
    }

  /** Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. */
  protected void verify()
    {

    }

  /**
   * Method getGroupingFields returns the Fields this instance will be grouping against.
   *
   * @return the current grouping fields
   */
  public Fields getGroupingFields()
    {
    return groupingFields;
    }

  /**
   * Method getFieldDeclarations returns an array of Fields where each Fields element in the array corresponds to the
   * field declaration of the given Aggregator operations.
   * <p/>
   * Note the actual Fields values are returned, not planner resolved Fields.
   *
   * @return an array of Fields
   */
  public Fields[] getFieldDeclarations()
    {
    Fields[] fields = new Fields[ this.aggregators.length ];

    for( int i = 0; i < aggregators.length; i++ )
      fields[ i ] = aggregators[ i ].getFieldDeclaration();

    return fields;
    }

  protected Fields[] getArgumentFields()
    {
    return argumentFields;
    }

  protected Functor[] getFunctors()
    {
    return functors;
    }

  protected Aggregator[] getAggregators()
    {
    return aggregators;
    }

  /**
   * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties
   * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}.
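   * <p/>
   * For example, a sketch of setting a property on the underlying step (the AggregateBy construction and the
   * property name are illustrative only):
   * <pre>{@code
   * AggregateBy aggregateBy = new AggregateBy( assembly, new Fields( "key" ), new CountBy( new Fields( "count" ) ) );
   *
   * aggregateBy.getGroupBy().getStepConfigDef().setProperty( "some.step.property", "some-value" );
   * }</pre>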
   *
   * @return GroupBy type
   */
  public GroupBy getGroupBy()
    {
    return groupBy;
    }

  @Property( name = "threshold", visibility = Visibility.PUBLIC )
  @PropertyDescription( "Threshold of the aggregation." )
  @PropertyConfigured( value = AGGREGATE_BY_THRESHOLD, defaultValue = "10000" )
  public int getThreshold()
    {
    return threshold;
    }
  }