001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    import java.lang.reflect.Type;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.operation.Aggregator;
028    import cascading.operation.AggregatorCall;
029    import cascading.operation.BaseOperation;
030    import cascading.operation.OperationCall;
031    import cascading.pipe.Pipe;
032    import cascading.tuple.Fields;
033    import cascading.tuple.Tuple;
034    import cascading.tuple.TupleEntry;
035    import cascading.tuple.coerce.Coercions;
036    import cascading.tuple.type.CoercibleType;
037    
038    /**
039     * Class AverageBy is used to average values associated with duplicate keys in a tuple stream.
040     * <p/>
041     * Typically finding the average value in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Average}
042     * {@link cascading.operation.Aggregator} operation.
043     * <p/>
044     * If the given {@code averageFields} has an associated type, this type will be used to coerce the resulting average value,
045     * otherwise the result will be a {@link Double}.
046     * <p/>
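 * If {@code include} is {@link Include#NO_NULLS}, {@code null} values will not be included in the average: they are
 * neither summed nor counted, and a grouping that contains only {@code null} values yields a {@code null} average.
 * By default (and for backwards compatibility) {@code null} values are included, {@link Include#ALL}, and are treated as zero.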
049     * <p/>
 * This SubAssembly uses the {@link cascading.pipe.assembly.AverageBy.AveragePartials} {@link cascading.pipe.assembly.AggregateBy.Functor}
 * and the {@link AverageFinal} Aggregator: the Functor counts and sums as many field values as possible before the GroupBy
 * operator to reduce IO over the network, and the Aggregator combines those partial sums and counts into the final average.
052     * <p/>
053     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
054     * in a much simpler mechanism.
055     * <p/>
 * The {@code threshold} value tells the underlying AveragePartials Functor how many unique key sums and counts to accumulate
 * in the LRU cache before emitting the least recently used entry.
058     * <p/>
059     * By default, either the value of {@link #AGGREGATE_BY_THRESHOLD} System property or {@link AggregateBy#DEFAULT_THRESHOLD}
060     * will be used.
061     *
062     * @see cascading.pipe.assembly.AggregateBy
063     */
064    public class AverageBy extends AggregateBy
065      {
066      /** DEFAULT_THRESHOLD */
067      @Deprecated
068      public static final int DEFAULT_THRESHOLD = 10000;
069    
070      public enum Include
071        {
072          ALL,
073          NO_NULLS
074        }
075    
076      /**
077       * Class AveragePartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count and sum observed duplicates from the tuple stream.
078       *
079       * @see cascading.pipe.assembly.AverageBy
080       */
081      public static class AveragePartials implements Functor
082        {
083        private final Fields declaredFields;
084        private final Include include;
085    
086        /**
087         * Constructor AveragePartials creates a new AveragePartials instance.
088         *
089         * @param declaredFields of type Fields
090         */
091        public AveragePartials( Fields declaredFields )
092          {
093          this.declaredFields = declaredFields;
094          this.include = Include.ALL;
095          }
096    
097        public AveragePartials( Fields declaredFields, Include include )
098          {
099          this.declaredFields = declaredFields;
100          this.include = include;
101          }
102    
103        @Override
104        public Fields getDeclaredFields()
105          {
106          Fields sumName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".sum", Double.class );
107          Fields countName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".count", Long.class );
108    
109          return sumName.append( countName );
110          }
111    
112        @Override
113        public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
114          {
115          if( context == null )
116            context = Tuple.size( 2 );
117    
118          if( include == Include.NO_NULLS && args.getObject( 0 ) == null )
119            return context;
120    
121          context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) );
122          context.set( 1, context.getLong( 1 ) + 1 );
123    
124          return context;
125          }
126    
127        @Override
128        public Tuple complete( FlowProcess flowProcess, Tuple context )
129          {
130          return context;
131          }
132        }
133    
134      /**
135       * Class AverageFinal is used to finalize the average operation on the Reduce side of the process. It must be used
136       * in tandem with a {@link AveragePartials} Functor.
137       */
138      public static class AverageFinal extends BaseOperation<AverageFinal.Context> implements Aggregator<AverageFinal.Context>
139        {
140        /** Class Context is used to hold intermediate values. */
141        protected static class Context
142          {
143          long nulls = 0L;
144          double sum = 0.0D;
145          long count = 0L;
146          Type type = Double.class;
147          CoercibleType canonical;
148    
149          Tuple tuple = Tuple.size( 1 );
150    
151          public Context( Fields fieldDeclaration )
152            {
153            if( fieldDeclaration.hasTypes() )
154              this.type = fieldDeclaration.getType( 0 );
155    
156            this.canonical = Coercions.coercibleTypeFor( this.type );
157            }
158    
159          public Context reset()
160            {
161            nulls = 0L;
162            sum = 0.0D;
163            count = 0L;
164            tuple.set( 0, null );
165    
166            return this;
167            }
168    
169          public Tuple result()
170            {
171            // we only saw null from upstream, so return null
172            if( count == 0 && nulls != 0 )
173              return tuple;
174    
175            tuple.set( 0, canonical.canonical( sum / count ) );
176    
177            return tuple;
178            }
179          }
180    
181        /**
182         * Constructs a new instance that returns the average of the values encountered in the given fieldDeclaration field name.
183         *
184         * @param fieldDeclaration of type Fields
185         */
186        public AverageFinal( Fields fieldDeclaration )
187          {
188          super( 2, makeFieldDeclaration( fieldDeclaration ) );
189    
190          if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
191            throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
192          }
193    
194        private static Fields makeFieldDeclaration( Fields fieldDeclaration )
195          {
196          if( fieldDeclaration.hasTypes() )
197            return fieldDeclaration;
198    
199          return fieldDeclaration.applyTypes( Double.class );
200          }
201    
202        @Override
203        public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
204          {
205          operationCall.setContext( new Context( getFieldDeclaration() ) );
206          }
207    
208        @Override
209        public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
210          {
211          aggregatorCall.getContext().reset();
212          }
213    
214        @Override
215        public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
216          {
217          Context context = aggregatorCall.getContext();
218          TupleEntry arguments = aggregatorCall.getArguments();
219    
220          if( arguments.getObject( 0 ) == null )
221            {
222            context.nulls++;
223            return;
224            }
225    
226          context.sum += arguments.getDouble( 0 );
227          context.count += arguments.getLong( 1 );
228          }
229    
230        @Override
231        public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
232          {
233          aggregatorCall.getOutputCollector().add( aggregatorCall.getContext().result() );
234          }
235        }
236    
237      /////////
238    
239      /**
240       * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy}
241       * instance.
242       *
243       * @param valueField   of type Fields
244       * @param averageField of type Fields
245       */
246      @ConstructorProperties({"valueField", "averageField"})
247      public AverageBy( Fields valueField, Fields averageField )
248        {
249        super( valueField, new AveragePartials( averageField ), new AverageFinal( averageField ) );
250        }
251    
252      /**
253       * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy}
254       * instance.
255       *
256       * @param valueField   of type Fields
257       * @param averageField of type Fields
258       * @param include      of type boolean
259       */
260      @ConstructorProperties({"valueField", "averageField", "include"})
261      public AverageBy( Fields valueField, Fields averageField, Include include )
262        {
263        super( valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ) );
264        }
265    
266      //////////////
267    
268      /**
269       * Constructor AverageBy creates a new AverageBy instance.
270       *
271       * @param pipe           of type Pipe
272       * @param groupingFields of type Fields
273       * @param valueField     of type Fields
274       * @param averageField   of type Fields
275       */
276      @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField"})
277      public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField )
278        {
279        this( null, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
280        }
281    
282      /**
283       * Constructor AverageBy creates a new AverageBy instance.
284       *
285       * @param pipe           of type Pipe
286       * @param groupingFields of type Fields
287       * @param valueField     of type Fields
288       * @param averageField   of type Fields
289       * @param threshold      of type int
290       */
291      @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "threshold"})
292      public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
293        {
294        this( null, pipe, groupingFields, valueField, averageField, threshold );
295        }
296    
297      /**
298       * Constructor AverageBy creates a new AverageBy instance.
299       *
300       * @param name           of type String
301       * @param pipe           of type Pipe
302       * @param groupingFields of type Fields
303       * @param valueField     of type Fields
304       * @param averageField   of type Fields
305       */
306      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField"})
307      public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField )
308        {
309        this( name, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
310        }
311    
312      /**
313       * Constructor AverageBy creates a new AverageBy instance.
314       *
315       * @param name           of type String
316       * @param pipe           of type Pipe
317       * @param groupingFields of type Fields
318       * @param valueField     of type Fields
319       * @param averageField   of type Fields
320       * @param threshold      of type int
321       */
322      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "threshold"})
323      public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
324        {
325        this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, threshold );
326        }
327    
328      /**
329       * Constructor AverageBy creates a new AverageBy instance.
330       *
331       * @param pipes          of type Pipe[]
332       * @param groupingFields of type Fields
333       * @param valueField     of type Fields
334       * @param averageField   of type Fields
335       */
336      @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField"})
337      public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField )
338        {
339        this( null, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
340        }
341    
342      /**
343       * Constructor AverageBy creates a new AverageBy instance.
344       *
345       * @param pipes          of type Pipe[]
346       * @param groupingFields of type Fields
347       * @param valueField     of type Fields
348       * @param averageField   of type Fields
349       * @param threshold      of type int
350       */
351      @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "threshold"})
352      public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
353        {
354        this( null, pipes, groupingFields, valueField, averageField, threshold );
355        }
356    
357      /**
358       * Constructor AverageBy creates a new AverageBy instance.
359       *
360       * @param name           of type String
361       * @param pipes          of type Pipe[]
362       * @param groupingFields of type Fields
363       * @param valueField     of type Fields
364       * @param averageField   of type Fields
365       */
366      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField"})
367      public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField )
368        {
369        this( name, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD );
370        }
371    
372      /**
373       * Constructor AverageBy creates a new AverageBy instance.
374       *
375       * @param name           of type String
376       * @param pipes          of type Pipe[]
377       * @param groupingFields of type Fields
378       * @param valueField     of type Fields
379       * @param averageField   of type Fields
380       * @param threshold      of type int
381       */
382      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "threshold"})
383      public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold )
384        {
385        super( name, pipes, groupingFields, valueField, new AveragePartials( averageField ), new AverageFinal( averageField ), threshold );
386        }
387    
388      /**
389       * Constructor AverageBy creates a new AverageBy instance.
390       *
391       * @param pipe           of type Pipe
392       * @param groupingFields of type Fields
393       * @param valueField     of type Fields
394       * @param averageField   of type Fields
395       * @param include        of type boolean
396       */
397      @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include"})
398      public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include )
399        {
400        this( null, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
401        }
402    
403      /**
404       * Constructor AverageBy creates a new AverageBy instance.
405       *
406       * @param pipe           of type Pipe
407       * @param groupingFields of type Fields
408       * @param valueField     of type Fields
409       * @param averageField   of type Fields
410       * @param include        of type boolean
411       * @param threshold      of type int
412       */
413      @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include", "threshold"})
414      public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
415        {
416        this( null, pipe, groupingFields, valueField, averageField, include, threshold );
417        }
418    
419      /**
420       * Constructor AverageBy creates a new AverageBy instance.
421       *
422       * @param name           of type String
423       * @param pipe           of type Pipe
424       * @param groupingFields of type Fields
425       * @param valueField     of type Fields
426       * @param averageField   of type Fields
427       * @param include        of type boolean
428       */
429      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include"})
430      public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include )
431        {
432        this( name, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
433        }
434    
435      /**
436       * Constructor AverageBy creates a new AverageBy instance.
437       *
438       * @param name           of type String
439       * @param pipe           of type Pipe
440       * @param groupingFields of type Fields
441       * @param valueField     of type Fields
442       * @param averageField   of type Fields
443       * @param include        of type boolean
444       * @param threshold      of type int
445       */
446      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include", "threshold"})
447      public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
448        {
449        this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, include, threshold );
450        }
451    
452      /**
453       * Constructor AverageBy creates a new AverageBy instance.
454       *
455       * @param pipes          of type Pipe[]
456       * @param groupingFields of type Fields
457       * @param valueField     of type Fields
458       * @param averageField   of type Fields
459       * @param include        of type boolean
460       */
461      @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include"})
462      public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include )
463        {
464        this( null, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
465        }
466    
467      /**
468       * Constructor AverageBy creates a new AverageBy instance.
469       *
470       * @param pipes          of type Pipe[]
471       * @param groupingFields of type Fields
472       * @param valueField     of type Fields
473       * @param averageField   of type Fields
474       * @param include        of type boolean
475       * @param threshold      of type int
476       */
477      @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include", "threshold"})
478      public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
479        {
480        this( null, pipes, groupingFields, valueField, averageField, include, threshold );
481        }
482    
483      /**
484       * Constructor AverageBy creates a new AverageBy instance.
485       *
486       * @param name           of type String
487       * @param pipes          of type Pipe[]
488       * @param groupingFields of type Fields
489       * @param valueField     of type Fields
490       * @param averageField   of type Fields
491       * @param include        of type boolean
492       */
493      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include"})
494      public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include )
495        {
496        this( name, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD );
497        }
498    
499      /**
500       * Constructor AverageBy creates a new AverageBy instance.
501       *
502       * @param name           of type String
503       * @param pipes          of type Pipe[]
504       * @param groupingFields of type Fields
505       * @param valueField     of type Fields
506       * @param averageField   of type Fields
507       * @param include        of type boolean
508       * @param threshold      of type int
509       */
510      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include", "threshold"})
511      public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold )
512        {
513        super( name, pipes, groupingFields, valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ), threshold );
514        }
515      }