001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe.assembly;
022    
023    import java.beans.ConstructorProperties;
024    import java.lang.reflect.Type;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.operation.aggregator.Sum;
028    import cascading.pipe.Pipe;
029    import cascading.tuple.Fields;
030    import cascading.tuple.Tuple;
031    import cascading.tuple.TupleEntry;
032    import cascading.tuple.coerce.Coercions;
033    import cascading.tuple.type.CoercibleType;
034    
035    /**
036     * Class SumBy is used to sum values associated with duplicate keys in a tuple stream.
037     * <p/>
 * Typically, finding the sum of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a
 * {@link cascading.operation.aggregator.Sum} {@link cascading.operation.Aggregator} operation.
040     * <p/>
 * If all of the values to be summed are {@code null}, the result value depends on how {@code null} is coerced by
 * the given {@code sumType}: {@code 0} is returned for a primitive type, otherwise {@code null}.
043     * <p/>
044     * This SubAssembly also uses the {@link SumBy.SumPartials} {@link AggregateBy.Functor}
045     * to sum field values before the GroupBy operator to reduce IO over the network.
046     * <p/>
047     * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
048     * in a much simpler mechanism.
049     * <p/>
050     * The {@code threshold} value tells the underlying SumPartials functions how many unique key sums to accumulate
051     * in the LRU cache, before emitting the least recently used entry.
052     * <p/>
053     * By default, either the value of {@link #AGGREGATE_BY_THRESHOLD} System property or {@link AggregateBy#DEFAULT_THRESHOLD}
054     * will be used.
055     *
056     * @see AggregateBy
057     */
058    public class SumBy extends AggregateBy
059      {
060      /** DEFAULT_THRESHOLD */
061      @Deprecated
062      public static final int DEFAULT_THRESHOLD = 10000;
063    
064      /**
065       * Class SumPartials is a {@link AggregateBy.Functor} that is used to sum observed duplicates from the tuple stream.
066       * <p/>
067       * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum}
068       * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values
069       * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
070       *
071       * @see SumBy
072       */
073      public static class SumPartials implements Functor
074        {
075        private final Fields declaredFields;
076        private final Type sumType;
077        private final CoercibleType canonical;
078    
079        /** Constructor SumPartials creates a new SumPartials instance. */
080        public SumPartials( Fields declaredFields )
081          {
082          this.declaredFields = declaredFields;
083    
084          if( !declaredFields.hasTypes() )
085            throw new IllegalArgumentException( "result type must be declared " );
086    
087          this.sumType = declaredFields.getType( 0 );
088    
089          if( declaredFields.size() != 1 )
090            throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
091    
092          this.canonical = Coercions.coercibleTypeFor( this.sumType );
093          }
094    
095        public SumPartials( Fields declaredFields, Class sumType )
096          {
097          this.declaredFields = declaredFields;
098          this.sumType = sumType;
099    
100          if( declaredFields.size() != 1 )
101            throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
102    
103          this.canonical = Coercions.coercibleTypeFor( this.sumType );
104          }
105    
106        @Override
107        public Fields getDeclaredFields()
108          {
109          return declaredFields;
110          }
111    
112        @Override
113        public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
114          {
115          if( context == null )
116            return args.getTupleCopy();
117          else if( args.getObject( 0 ) == null )
118            return context;
119    
120          context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) );
121    
122          return context;
123          }
124    
125        @Override
126        public Tuple complete( FlowProcess flowProcess, Tuple context )
127          {
128          context.set( 0, canonical.canonical( context.getObject( 0 ) ) );
129    
130          return context;
131          }
132        }
133    
134      /**
135       * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy}
136       * instance.
137       *
138       * @param valueField of type Fields
139       * @param sumField   of type Fields
140       */
141      @ConstructorProperties({"valueField", "sumField"})
142      public SumBy( Fields valueField, Fields sumField )
143        {
144        super( valueField, new SumPartials( sumField ), new Sum( sumField ) );
145        }
146    
147      //////////////
148    
149      /**
150       * Constructor SumBy creates a new SumBy instance.
151       *
152       * @param pipe           of type Pipe
153       * @param groupingFields of type Fields
154       * @param valueField     of type Fields
155       * @param sumField       of type Fields
156       */
157      @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField"})
158      public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField )
159        {
160        this( null, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
161        }
162    
163      /**
164       * Constructor SumBy creates a new SumBy instance.
165       *
166       * @param pipe           of type Pipe
167       * @param groupingFields of type Fields
168       * @param valueField     of type Fields
169       * @param sumField       of type Fields
170       * @param threshold      of type int
171       */
172      @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "threshold"})
173      public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
174        {
175        this( null, pipe, groupingFields, valueField, sumField, threshold );
176        }
177    
178      /**
179       * Constructor SumBy creates a new SumBy instance.
180       *
181       * @param name           of type String
182       * @param pipe           of type Pipe
183       * @param groupingFields of type Fields
184       * @param valueField     of type Fields
185       * @param sumField       of type Fields
186       */
187      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField"})
188      public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField )
189        {
190        this( name, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
191        }
192    
193      /**
194       * Constructor SumBy creates a new SumBy instance.
195       *
196       * @param name           of type String
197       * @param pipe           of type Pipe
198       * @param groupingFields of type Fields
199       * @param valueField     of type Fields
200       * @param sumField       of type Fields
201       * @param threshold      of type int
202       */
203      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "threshold"})
204      public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
205        {
206        this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, threshold );
207        }
208    
209      /**
210       * Constructor SumBy creates a new SumBy instance.
211       *
212       * @param pipes          of type Pipe[]
213       * @param groupingFields of type Fields
214       * @param valueField     of type Fields
215       * @param sumField       of type Fields
216       */
217      @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField"})
218      public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField )
219        {
220        this( null, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
221        }
222    
223      /**
224       * Constructor SumBy creates a new SumBy instance.
225       *
226       * @param pipes          of type Pipe[]
227       * @param groupingFields of type Fields
228       * @param valueField     of type Fields
229       * @param sumField       of type Fields
230       * @param threshold      of type int
231       */
232      @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "threshold"})
233      public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
234        {
235        this( null, pipes, groupingFields, valueField, sumField, threshold );
236        }
237    
238      /**
239       * Constructor SumBy creates a new SumBy instance.
240       *
241       * @param name           of type String
242       * @param pipes          of type Pipe[]
243       * @param groupingFields of type Fields
244       * @param valueField     of type Fields
245       * @param sumField       of type Fields
246       */
247      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField"})
248      public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField )
249        {
250        this( name, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
251        }
252    
253      /**
254       * Constructor SumBy creates a new SumBy instance.
255       *
256       * @param name           of type String
257       * @param pipes          of type Pipe[]
258       * @param groupingFields of type Fields
259       * @param valueField     of type Fields
260       * @param sumField       of type Fields
261       * @param threshold      of type int
262       */
263      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "threshold"})
264      public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
265        {
266        super( name, pipes, groupingFields, valueField, new SumPartials( sumField ), new Sum( sumField ), threshold );
267        }
268    
269    ///////////
270    
271      /**
272       * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy}
273       * instance.
274       *
275       * @param valueField of type Fields
276       * @param sumField   of type Fields
277       * @param sumType    of type Class
278       */
279      @ConstructorProperties({"valueField", "sumField", "sumType"})
280      public SumBy( Fields valueField, Fields sumField, Class sumType )
281        {
282        super( valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ) );
283        }
284    
285    //////////////
286    
287      /**
288       * Constructor SumBy creates a new SumBy instance.
289       *
290       * @param pipe           of type Pipe
291       * @param groupingFields of type Fields
292       * @param valueField     of type Fields
293       * @param sumField       of type Fields
294       * @param sumType        of type Class
295       */
296      @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType"})
297      public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
298        {
299        this( null, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
300        }
301    
302      /**
303       * Constructor SumBy creates a new SumBy instance.
304       *
305       * @param pipe           of type Pipe
306       * @param groupingFields of type Fields
307       * @param valueField     of type Fields
308       * @param sumField       of type Fields
309       * @param sumType        of type Class
310       * @param threshold      of type int
311       */
312      @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
313      public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
314        {
315        this( null, pipe, groupingFields, valueField, sumField, sumType, threshold );
316        }
317    
318      /**
319       * Constructor SumBy creates a new SumBy instance.
320       *
321       * @param name           of type String
322       * @param pipe           of type Pipe
323       * @param groupingFields of type Fields
324       * @param valueField     of type Fields
325       * @param sumField       of type Fields
326       * @param sumType        of type Class
327       */
328      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType"})
329      public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
330        {
331        this( name, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
332        }
333    
334      /**
335       * Constructor SumBy creates a new SumBy instance.
336       *
337       * @param name           of type String
338       * @param pipe           of type Pipe
339       * @param groupingFields of type Fields
340       * @param valueField     of type Fields
341       * @param sumField       of type Fields
342       * @param sumType        of type Class
343       * @param threshold      of type int
344       */
345      @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
346      public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
347        {
348        this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, sumType, threshold );
349        }
350    
351      /**
352       * Constructor SumBy creates a new SumBy instance.
353       *
354       * @param pipes          of type Pipe[]
355       * @param groupingFields of type Fields
356       * @param valueField     of type Fields
357       * @param sumField       of type Fields
358       * @param sumType        of type Class
359       */
360      @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType"})
361      public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
362        {
363        this( null, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
364        }
365    
366      /**
367       * Constructor SumBy creates a new SumBy instance.
368       *
369       * @param pipes          of type Pipe[]
370       * @param groupingFields of type Fields
371       * @param valueField     of type Fields
372       * @param sumField       of type Fields
373       * @param sumType        of type Class
374       * @param threshold      of type int
375       */
376      @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
377      public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
378        {
379        this( null, pipes, groupingFields, valueField, sumField, sumType, threshold );
380        }
381    
382      /**
383       * Constructor SumBy creates a new SumBy instance.
384       *
385       * @param name           of type String
386       * @param pipes          of type Pipe[]
387       * @param groupingFields of type Fields
388       * @param valueField     of type Fields
389       * @param sumField       of type Fields
390       * @param sumType        of type Class
391       */
392      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType"})
393      public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
394        {
395        this( name, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
396        }
397    
398      /**
399       * Constructor SumBy creates a new SumBy instance.
400       *
401       * @param name           of type String
402       * @param pipes          of type Pipe[]
403       * @param groupingFields of type Fields
404       * @param valueField     of type Fields
405       * @param sumField       of type Fields
406       * @param sumType        of type Class
407       * @param threshold      of type int
408       */
409      @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
410      public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
411        {
412        super( name, pipes, groupingFields, valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ), threshold );
413        }
414      }