001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    import java.io.Serializable;
025    import java.util.ArrayList;
026    import java.util.Arrays;
027    import java.util.Collections;
028    import java.util.LinkedHashMap;
029    import java.util.List;
030    import java.util.Map;
031    
032    import cascading.flow.FlowProcess;
033    import cascading.operation.Aggregator;
034    import cascading.operation.BaseOperation;
035    import cascading.operation.Function;
036    import cascading.operation.FunctionCall;
037    import cascading.operation.OperationCall;
038    import cascading.pipe.Each;
039    import cascading.pipe.Every;
040    import cascading.pipe.GroupBy;
041    import cascading.pipe.Pipe;
042    import cascading.pipe.SubAssembly;
043    import cascading.tuple.Fields;
044    import cascading.tuple.Tuple;
045    import cascading.tuple.TupleEntry;
046    import cascading.tuple.TupleEntryCollector;
047    import cascading.tuple.util.TupleViews;
048    import org.slf4j.Logger;
049    import org.slf4j.LoggerFactory;
050    
051    /**
052     * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations.
053     * <p/>
054     * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the
055     * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and
056     * completed Reduce side. Summing is associative and commutative.
057     * <p/>
058     * AggregateBy also supports operations that are not associative/commutative like 'counting'. Counting
059     * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be
060     * transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over
061     * two, and a hack)
062     * <p/>
063     * Think of this mechanism as a MapReduce Combiner, but more efficient as no values are serialized,
064     * deserialized, saved to disk, and multi-pass sorted in the process, which consume cpu resources in trade of
065     * memory and a little or no IO.
066     * <p/>
067     * Further, Combiners are limited to only associative/commutative operations.
068     * <p/>
069     * Additionally the Cascading planner can move the Map side optimization
070     * to the previous Reduce operation further increasing IO performance (between the preceding Reduce and Map phase which
071     * is over HDFS).
072     * <p/>
073     * The second role of the AggregateBy class is to allow for composition of AggregateBy
074     * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed
075     * in parallel on the same grouping keys.
076     * </p>
077     * Custom AggregateBy classes can be created by sub-classing this class and implementing a special
078     * {@link Functor} for use on the Map side. Multiple Functor instances are managed by the {@link CompositeFunction}
079     * class allowing them all to share the same LRU value map for more efficiency.
080     * <p/>
081     * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to
082     * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used
083     * to for secondary sorting (see {@link GroupBy} {@code sortFields}. This feature is used by {@link FirstBy} to
084     * control which Tuple is seen first for a grouping.
085     * <p/>
086     * <p/>
087     * To tune the LRU, set the {@code threshold} value to a high enough value to utilize available memory. Or set a
088     * default value via the {@link #AGGREGATE_BY_THRESHOLD} property. The current default ({@link CompositeFunction#DEFAULT_THRESHOLD})
089     * is {@code 10, 000} unique keys. Note "flushes" from the LRU will be logged in threshold increments along with memory
090     * information.
091     * <p/>
092     * Note using a AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow}.
093     * And passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy.
094     * <p/>
095     * Also note that {@link Unique} is not a CompositeAggregator and is slightly more optimized internally.
096     * <p/>
097     * Keep in mind the {@link cascading.tuple.Hasher} interface is not honored here (for storing keys in the cache). Thus
098     * arrays of primitives and object, like {@code byte[]} will not be properly stored. This is a known issue and will
099     * be resolved in a future release.
100     *
101     * @see SumBy
102     * @see CountBy
103     * @see Unique
104     */
105    public class AggregateBy extends SubAssembly
106      {
107      private static final Logger LOG = LoggerFactory.getLogger( AggregateBy.class );
108    
109      public static final int USE_DEFAULT_THRESHOLD = 0;
110      public static final int DEFAULT_THRESHOLD = CompositeFunction.DEFAULT_THRESHOLD;
111      public static final String AGGREGATE_BY_THRESHOLD = "cascading.aggregateby.threshold";
112    
113      private String name;
114      private int threshold;
115      private Fields groupingFields;
116      private Fields[] argumentFields;
117      private Functor[] functors;
118      private Aggregator[] aggregators;
119      private transient GroupBy groupBy;
120    
121      public enum Flush
122        {
123          Num_Keys_Flushed
124        }
125    
126      /**
127       * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class.
128       * <p/>
129       * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs.
130       */
131      public interface Functor extends Serializable
132        {
133        /**
134         * Method getDeclaredFields returns the declaredFields of this Functor object.
135         *
136         * @return the declaredFields (type Fields) of this Functor object.
137         */
138        Fields getDeclaredFields();
139    
140        /**
141         * Method aggregate operates on the given args in tandem (optionally) with the given context values.
142         * <p/>
143         * The context argument is the result of the previous call to this method. Use it to store values between aggregate
144         * calls (the current count, or sum of the args).
145         * <p/>
146         * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. All subsequent
147         * invocations context will be the value returned on the previous invocation.
148         *
149         * @param flowProcess of type FlowProcess
150         * @param args        of type TupleEntry
151         * @param context     of type Tuple   @return Tuple
152         */
153        Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context );
154    
155        /**
156         * Method complete allows the final aggregate computation to be performed before the return value is collected.
157         * <p/>
158         * The number of values in the returned {@link Tuple} instance must match the number of declaredFields.
159         * <p/>
160         * It is safe to return the context object as the result value.
161         *
162         * @param flowProcess of type FlowProcess
163         * @param context     of type Tuple  @return Tuple
164         */
165        Tuple complete( FlowProcess flowProcess, Tuple context );
166        }
167    
168      /**
169       * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}.
170       *
171       * @see Functor
172       */
173      public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context>
174        {
175        public static final int DEFAULT_THRESHOLD = 10000;
176    
177        private int threshold = 0;
178        private final Fields groupingFields;
179        private final Fields[] argumentFields;
180        private final Fields[] functorFields;
181        private final Functor[] functors;
182    
183        public static class Context
184          {
185          LinkedHashMap<Tuple, Tuple[]> lru;
186          TupleEntry[] arguments;
187          Tuple result;
188          }
189    
190        /**
191         * Constructor CompositeFunction creates a new CompositeFunction instance.
192         *
193         * @param groupingFields of type Fields
194         * @param argumentFields of type Fields
195         * @param functor        of type Functor
196         * @param threshold      of type int
197         */
198        public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int threshold )
199          {
200          this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, threshold );
201          }
202    
203        /**
204         * Constructor CompositeFunction creates a new CompositeFunction instance.
205         *
206         * @param groupingFields of type Fields
207         * @param argumentFields of type Fields[]
208         * @param functors       of type Functor[]
209         * @param threshold      of type int
210         */
211        public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int threshold )
212          {
213          super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information
214          this.groupingFields = groupingFields;
215          this.argumentFields = argumentFields;
216          this.functors = functors;
217          this.threshold = threshold;
218    
219          this.functorFields = new Fields[ functors.length ];
220    
221          for( int i = 0; i < functors.length; i++ )
222            this.functorFields[ i ] = functors[ i ].getDeclaredFields();
223          }
224    
225        private static Fields getFields( Fields groupingFields, Functor[] functors )
226          {
227          Fields fields = groupingFields;
228    
229          for( Functor functor : functors )
230            fields = fields.append( functor.getDeclaredFields() );
231    
232          return fields;
233          }
234    
235        @Override
236        public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall )
237          {
238          if( threshold == 0 )
239            {
240            Integer value = flowProcess.getIntegerProperty( AGGREGATE_BY_THRESHOLD );
241    
242            if( value != null && value > 0 )
243              threshold = value;
244            else
245              threshold = DEFAULT_THRESHOLD;
246            }
247    
248          LOG.info( "using threshold value: {}", threshold );
249    
250          Fields[] fields = new Fields[ functors.length + 1 ];
251    
252          fields[ 0 ] = groupingFields;
253    
254          for( int i = 0; i < functors.length; i++ )
255            fields[ i + 1 ] = functors[ i ].getDeclaredFields();
256    
257          final Context context = new Context();
258    
259          context.arguments = new TupleEntry[ functors.length ];
260    
261          for( int i = 0; i < context.arguments.length; i++ )
262            {
263            Fields resolvedArgumentFields = operationCall.getArgumentFields();
264    
265            int[] pos;
266    
267            if( argumentFields[ i ].isAll() )
268              pos = resolvedArgumentFields.getPos();
269            else
270              pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL
271    
272            Tuple narrow = TupleViews.createNarrow( pos );
273    
274            Fields currentFields;
275    
276            if( this.argumentFields[ i ].isSubstitution() )
277              currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator
278            else
279              currentFields = Fields.asDeclaration( this.argumentFields[ i ] );
280    
281            context.arguments[ i ] = new TupleEntry( currentFields, narrow );
282            }
283    
284          context.result = TupleViews.createComposite( fields );
285    
286          context.lru = new LinkedHashMap<Tuple, Tuple[]>( threshold, 0.75f, true )
287          {
288          long flushes = 0;
289    
290          @Override
291          protected boolean removeEldestEntry( Map.Entry<Tuple, Tuple[]> eldest )
292            {
293            boolean doRemove = size() > threshold;
294    
295            if( doRemove )
296              {
297              completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, eldest );
298              flowProcess.increment( Flush.Num_Keys_Flushed, 1 );
299    
300              if( flushes % threshold == 0 ) // every multiple, write out data
301                {
302                Runtime runtime = Runtime.getRuntime();
303                long freeMem = runtime.freeMemory() / 1024 / 1024;
304                long maxMem = runtime.maxMemory() / 1024 / 1024;
305                long totalMem = runtime.totalMemory() / 1024 / 1024;
306    
307                LOG.info( "flushed keys num times: {}, with threshold: {}", flushes + 1, threshold );
308                LOG.info( "mem on flush (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );
309    
310                float percent = (float) totalMem / (float) maxMem;
311    
312                if( percent < 0.80F )
313                  LOG.info( "total mem is {}% of max mem, to better utilize unused memory consider increasing current LRU threshold with system property \"{}\"", (int) ( percent * 100.0F ), AGGREGATE_BY_THRESHOLD );
314                }
315    
316              flushes++;
317              }
318    
319            return doRemove;
320            }
321          };
322    
323          operationCall.setContext( context );
324          }
325    
326        @Override
327        public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall )
328          {
329          TupleEntry arguments = functionCall.getArguments();
330          Tuple key = arguments.selectTupleCopy( groupingFields );
331    
332          Context context = functionCall.getContext();
333          Tuple[] functorContext = context.lru.get( key );
334    
335          if( functorContext == null )
336            {
337            functorContext = new Tuple[ functors.length ];
338            context.lru.put( key, functorContext );
339            }
340    
341          for( int i = 0; i < functors.length; i++ )
342            {
343            TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() );
344            functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] );
345            }
346          }
347    
348        @Override
349        public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall )
350          {
351          // need to drain context
352          TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector();
353    
354          Tuple result = operationCall.getContext().result;
355          LinkedHashMap<Tuple, Tuple[]> context = operationCall.getContext().lru;
356    
357          for( Map.Entry<Tuple, Tuple[]> entry : context.entrySet() )
358            completeFunctors( flowProcess, collector, result, entry );
359    
360          operationCall.setContext( null );
361          }
362    
363        private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry )
364          {
365          Tuple[] results = new Tuple[ functors.length + 1 ];
366    
367          results[ 0 ] = entry.getKey();
368    
369          Tuple[] values = entry.getValue();
370    
371          for( int i = 0; i < functors.length; i++ )
372            results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] );
373    
374          TupleViews.reset( result, results );
375    
376          outputCollector.add( result );
377          }
378    
379        @Override
380        public boolean equals( Object object )
381          {
382          if( this == object )
383            return true;
384          if( !( object instanceof CompositeFunction ) )
385            return false;
386          if( !super.equals( object ) )
387            return false;
388    
389          CompositeFunction that = (CompositeFunction) object;
390    
391          if( !Arrays.equals( argumentFields, that.argumentFields ) )
392            return false;
393          if( !Arrays.equals( functorFields, that.functorFields ) )
394            return false;
395          if( !Arrays.equals( functors, that.functors ) )
396            return false;
397          if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null )
398            return false;
399    
400          return true;
401          }
402    
403        @Override
404        public int hashCode()
405          {
406          int result = super.hashCode();
407          result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 );
408          result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 );
409          result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 );
410          result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 );
411          return result;
412          }
413        }
414    
415      /**
416       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
417       *
418       * @param name      of type String
419       * @param threshold of type int
420       */
421      protected AggregateBy( String name, int threshold )
422        {
423        this.name = name;
424        this.threshold = threshold;
425        }
426    
427      /**
428       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
429       *
430       * @param argumentFields of type Fields
431       * @param functor        of type Functor
432       * @param aggregator     of type Aggregator
433       */
434      protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator )
435        {
436        this.argumentFields = Fields.fields( argumentFields );
437        this.functors = new Functor[]{functor};
438        this.aggregators = new Aggregator[]{aggregator};
439        }
440    
441      /**
442       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
443       *
444       * @param pipe           of type Pipe
445       * @param groupingFields of type Fields
446       * @param assemblies     of type CompositeAggregator...
447       */
448      @ConstructorProperties({"pipe", "groupingFields", "assemblies"})
449      public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies )
450        {
451        this( null, Pipe.pipes( pipe ), groupingFields, 0, assemblies );
452        }
453    
454      /**
455       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
456       *
457       * @param pipe           of type Pipe
458       * @param groupingFields of type Fields
459       * @param threshold      of type int
460       * @param assemblies     of type CompositeAggregator...
461       */
462      @ConstructorProperties({"pipe", "groupingFields", "threshold", "assemblies"})
463      public AggregateBy( Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies )
464        {
465        this( null, Pipe.pipes( pipe ), groupingFields, threshold, assemblies );
466        }
467    
468      /**
469       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
470       *
471       * @param pipe           of type Pipe
472       * @param groupingFields of type Fields
473       * @param threshold      of type int
474       * @param assemblies     of type CompositeAggregator...
475       */
476      @ConstructorProperties({"name", "pipe", "groupingFields", "threshold", "assemblies"})
477      public AggregateBy( String name, Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies )
478        {
479        this( name, Pipe.pipes( pipe ), groupingFields, threshold, assemblies );
480        }
481    
482      /**
483       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
484       *
485       * @param name           of type String
486       * @param pipes          of type Pipe[]
487       * @param groupingFields of type Fields
488       * @param assemblies     of type CompositeAggregator...
489       */
490      @ConstructorProperties({"name", "pipes", "groupingFields", "assemblies"})
491      public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies )
492        {
493        this( name, pipes, groupingFields, 0, assemblies );
494        }
495    
496      /**
497       * Constructor CompositeAggregator creates a new CompositeAggregator instance.
498       *
499       * @param name           of type String
500       * @param pipes          of type Pipe[]
501       * @param groupingFields of type Fields
502       * @param threshold      of type int
503       * @param assemblies     of type CompositeAggregator...
504       */
505      @ConstructorProperties({"name", "pipes", "groupingFields", "threshold", "assemblies"})
506      public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int threshold, AggregateBy... assemblies )
507        {
508        this( name, threshold );
509    
510        List<Fields> arguments = new ArrayList<Fields>();
511        List<Functor> functors = new ArrayList<Functor>();
512        List<Aggregator> aggregators = new ArrayList<Aggregator>();
513    
514        for( int i = 0; i < assemblies.length; i++ )
515          {
516          AggregateBy assembly = assemblies[ i ];
517    
518          Collections.addAll( arguments, assembly.getArgumentFields() );
519          Collections.addAll( functors, assembly.getFunctors() );
520          Collections.addAll( aggregators, assembly.getAggregators() );
521          }
522    
523        initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) );
524        }
525    
526      protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int threshold )
527        {
528        this( name, threshold );
529        initialize( groupingFields, pipes, argumentFields, functor, aggregator );
530        }
531    
532      protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator )
533        {
534        initialize( groupingFields, pipes, Fields.fields( argumentFields ),
535          new Functor[]{functor},
536          new Aggregator[]{aggregator} );
537        }
538    
539      protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators )
540        {
541        setPrevious( pipes );
542    
543        this.groupingFields = groupingFields;
544        this.argumentFields = argumentFields;
545        this.functors = functors;
546        this.aggregators = aggregators;
547    
548        verify();
549    
550        Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields );
551        Fields argumentSelector = Fields.merge( this.groupingFields, sortFields );
552    
553        if( argumentSelector.equals( Fields.NONE ) )
554          argumentSelector = Fields.ALL;
555    
556        Pipe[] functions = new Pipe[ pipes.length ];
557    
558        CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, threshold );
559    
560        for( int i = 0; i < functions.length; i++ )
561          functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS );
562    
563        groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null );
564    
565        Pipe pipe = groupBy;
566    
567        for( int i = 0; i < aggregators.length; i++ )
568          pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL );
569    
570        setTails( pipe );
571        }
572    
573      /** Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. */
574      protected void verify()
575        {
576    
577        }
578    
579      /**
580       * Method getGroupingFields returns the Fields this instances will be grouping against.
581       *
582       * @return the current grouping fields
583       */
584      public Fields getGroupingFields()
585        {
586        return groupingFields;
587        }
588    
589      /**
590       * Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the
591       * field declaration of the given Aggregator operations.
592       * <p/>
593       * Note the actual Fields values are returned, not planner resolved Fields.
594       *
595       * @return and array of Fields
596       */
597      public Fields[] getFieldDeclarations()
598        {
599        Fields[] fields = new Fields[ this.aggregators.length ];
600    
601        for( int i = 0; i < aggregators.length; i++ )
602          fields[ i ] = aggregators[ i ].getFieldDeclaration();
603    
604        return fields;
605        }
606    
607      protected Fields[] getArgumentFields()
608        {
609        return argumentFields;
610        }
611    
612      protected Functor[] getFunctors()
613        {
614        return functors;
615        }
616    
617      protected Aggregator[] getAggregators()
618        {
619        return aggregators;
620        }
621    
622      /**
623       * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties
624       * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}.
625       *
626       * @return GroupBy type
627       */
628      public GroupBy getGroupBy()
629        {
630        return groupBy;
631        }
632      }