001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe.assembly;
022
023import java.beans.ConstructorProperties;
024import java.lang.reflect.Type;
025
026import cascading.flow.FlowProcess;
027import cascading.operation.aggregator.Sum;
028import cascading.pipe.Pipe;
029import cascading.tuple.Fields;
030import cascading.tuple.Tuple;
031import cascading.tuple.TupleEntry;
032import cascading.tuple.coerce.Coercions;
033import cascading.tuple.type.CoercibleType;
034
035/**
036 * Class SumBy is used to sum values associated with duplicate keys in a tuple stream.
037 * <p/>
 * Typically finding the sum of a field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Sum}
039 * {@link cascading.operation.Aggregator} operation.
040 * <p/>
 * If all the values to be summed are {@code null}, the result value is a function of how null is coerced by the
042 * given {@code sumType}. If a primitive type, {@code 0} will be returned. Otherwise {@code null}.
043 * <p/>
044 * This SubAssembly also uses the {@link SumBy.SumPartials} {@link AggregateBy.Functor}
045 * to sum field values before the GroupBy operator to reduce IO over the network.
046 * <p/>
047 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results
048 * in a much simpler mechanism.
049 * <p/>
050 * The {@code threshold} value tells the underlying SumPartials functions how many unique key sums to accumulate
051 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is
052 * bounded by the size of your map task JVM and the typical size of each group key.
053 * <p/>
054 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property
055 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used.
056 *
057 * @see AggregateBy
058 */
059public class SumBy extends AggregateBy
060  {
061  /**
062   * Class SumPartials is a {@link AggregateBy.Functor} that is used to sum observed duplicates from the tuple stream.
063   * <p/>
064   * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum}
065   * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values
066   * as possible before the intermediate {@link cascading.pipe.GroupBy} operator.
067   *
068   * @see SumBy
069   */
070  public static class SumPartials implements Functor
071    {
072    private final Fields declaredFields;
073    private final Type sumType;
074    private final CoercibleType canonical;
075
076    /** Constructor SumPartials creates a new SumPartials instance. */
077    public SumPartials( Fields declaredFields )
078      {
079      this.declaredFields = declaredFields;
080
081      if( !declaredFields.hasTypes() )
082        throw new IllegalArgumentException( "result type must be declared " );
083
084      this.sumType = declaredFields.getType( 0 );
085
086      if( declaredFields.size() != 1 )
087        throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
088
089      this.canonical = Coercions.coercibleTypeFor( this.sumType );
090      }
091
092    public SumPartials( Fields declaredFields, Class sumType )
093      {
094      this.declaredFields = declaredFields;
095      this.sumType = sumType;
096
097      if( declaredFields.size() != 1 )
098        throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields );
099
100      this.canonical = Coercions.coercibleTypeFor( this.sumType );
101      }
102
103    @Override
104    public Fields getDeclaredFields()
105      {
106      return declaredFields;
107      }
108
109    @Override
110    public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context )
111      {
112      if( context == null )
113        return args.getTupleCopy();
114      else if( args.getObject( 0 ) == null )
115        return context;
116
117      context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) );
118
119      return context;
120      }
121
122    @Override
123    public Tuple complete( FlowProcess flowProcess, Tuple context )
124      {
125      context.set( 0, canonical.canonical( context.getObject( 0 ) ) );
126
127      return context;
128      }
129    }
130
131  /**
132   * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy}
133   * instance.
134   *
135   * @param valueField of type Fields
136   * @param sumField   of type Fields
137   */
138  @ConstructorProperties({"valueField", "sumField"})
139  public SumBy( Fields valueField, Fields sumField )
140    {
141    super( valueField, new SumPartials( sumField ), new Sum( sumField ) );
142    }
143
144  //////////////
145
146  /**
147   * Constructor SumBy creates a new SumBy instance.
148   *
149   * @param pipe           of type Pipe
150   * @param groupingFields of type Fields
151   * @param valueField     of type Fields
152   * @param sumField       of type Fields
153   */
154  @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField"})
155  public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField )
156    {
157    this( null, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
158    }
159
160  /**
161   * Constructor SumBy creates a new SumBy instance.
162   *
163   * @param pipe           of type Pipe
164   * @param groupingFields of type Fields
165   * @param valueField     of type Fields
166   * @param sumField       of type Fields
167   * @param threshold      of type int
168   */
169  @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "threshold"})
170  public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
171    {
172    this( null, pipe, groupingFields, valueField, sumField, threshold );
173    }
174
175  /**
176   * Constructor SumBy creates a new SumBy instance.
177   *
178   * @param name           of type String
179   * @param pipe           of type Pipe
180   * @param groupingFields of type Fields
181   * @param valueField     of type Fields
182   * @param sumField       of type Fields
183   */
184  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField"})
185  public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField )
186    {
187    this( name, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
188    }
189
190  /**
191   * Constructor SumBy creates a new SumBy instance.
192   *
193   * @param name           of type String
194   * @param pipe           of type Pipe
195   * @param groupingFields of type Fields
196   * @param valueField     of type Fields
197   * @param sumField       of type Fields
198   * @param threshold      of type int
199   */
200  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "threshold"})
201  public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
202    {
203    this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, threshold );
204    }
205
206  /**
207   * Constructor SumBy creates a new SumBy instance.
208   *
209   * @param pipes          of type Pipe[]
210   * @param groupingFields of type Fields
211   * @param valueField     of type Fields
212   * @param sumField       of type Fields
213   */
214  @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField"})
215  public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField )
216    {
217    this( null, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
218    }
219
220  /**
221   * Constructor SumBy creates a new SumBy instance.
222   *
223   * @param pipes          of type Pipe[]
224   * @param groupingFields of type Fields
225   * @param valueField     of type Fields
226   * @param sumField       of type Fields
227   * @param threshold      of type int
228   */
229  @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "threshold"})
230  public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
231    {
232    this( null, pipes, groupingFields, valueField, sumField, threshold );
233    }
234
235  /**
236   * Constructor SumBy creates a new SumBy instance.
237   *
238   * @param name           of type String
239   * @param pipes          of type Pipe[]
240   * @param groupingFields of type Fields
241   * @param valueField     of type Fields
242   * @param sumField       of type Fields
243   */
244  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField"})
245  public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField )
246    {
247    this( name, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD );
248    }
249
250  /**
251   * Constructor SumBy creates a new SumBy instance.
252   *
253   * @param name           of type String
254   * @param pipes          of type Pipe[]
255   * @param groupingFields of type Fields
256   * @param valueField     of type Fields
257   * @param sumField       of type Fields
258   * @param threshold      of type int
259   */
260  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "threshold"})
261  public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold )
262    {
263    super( name, pipes, groupingFields, valueField, new SumPartials( sumField ), new Sum( sumField ), threshold );
264    }
265
266///////////
267
268  /**
269   * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy}
270   * instance.
271   *
272   * @param valueField of type Fields
273   * @param sumField   of type Fields
274   * @param sumType    of type Class
275   */
276  @ConstructorProperties({"valueField", "sumField", "sumType"})
277  public SumBy( Fields valueField, Fields sumField, Class sumType )
278    {
279    super( valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ) );
280    }
281
282//////////////
283
284  /**
285   * Constructor SumBy creates a new SumBy instance.
286   *
287   * @param pipe           of type Pipe
288   * @param groupingFields of type Fields
289   * @param valueField     of type Fields
290   * @param sumField       of type Fields
291   * @param sumType        of type Class
292   */
293  @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType"})
294  public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
295    {
296    this( null, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
297    }
298
299  /**
300   * Constructor SumBy creates a new SumBy instance.
301   *
302   * @param pipe           of type Pipe
303   * @param groupingFields of type Fields
304   * @param valueField     of type Fields
305   * @param sumField       of type Fields
306   * @param sumType        of type Class
307   * @param threshold      of type int
308   */
309  @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
310  public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
311    {
312    this( null, pipe, groupingFields, valueField, sumField, sumType, threshold );
313    }
314
315  /**
316   * Constructor SumBy creates a new SumBy instance.
317   *
318   * @param name           of type String
319   * @param pipe           of type Pipe
320   * @param groupingFields of type Fields
321   * @param valueField     of type Fields
322   * @param sumField       of type Fields
323   * @param sumType        of type Class
324   */
325  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType"})
326  public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
327    {
328    this( name, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
329    }
330
331  /**
332   * Constructor SumBy creates a new SumBy instance.
333   *
334   * @param name           of type String
335   * @param pipe           of type Pipe
336   * @param groupingFields of type Fields
337   * @param valueField     of type Fields
338   * @param sumField       of type Fields
339   * @param sumType        of type Class
340   * @param threshold      of type int
341   */
342  @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
343  public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
344    {
345    this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, sumType, threshold );
346    }
347
348  /**
349   * Constructor SumBy creates a new SumBy instance.
350   *
351   * @param pipes          of type Pipe[]
352   * @param groupingFields of type Fields
353   * @param valueField     of type Fields
354   * @param sumField       of type Fields
355   * @param sumType        of type Class
356   */
357  @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType"})
358  public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
359    {
360    this( null, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
361    }
362
363  /**
364   * Constructor SumBy creates a new SumBy instance.
365   *
366   * @param pipes          of type Pipe[]
367   * @param groupingFields of type Fields
368   * @param valueField     of type Fields
369   * @param sumField       of type Fields
370   * @param sumType        of type Class
371   * @param threshold      of type int
372   */
373  @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
374  public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
375    {
376    this( null, pipes, groupingFields, valueField, sumField, sumType, threshold );
377    }
378
379  /**
380   * Constructor SumBy creates a new SumBy instance.
381   *
382   * @param name           of type String
383   * @param pipes          of type Pipe[]
384   * @param groupingFields of type Fields
385   * @param valueField     of type Fields
386   * @param sumField       of type Fields
387   * @param sumType        of type Class
388   */
389  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType"})
390  public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType )
391    {
392    this( name, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD );
393    }
394
395  /**
396   * Constructor SumBy creates a new SumBy instance.
397   *
398   * @param name           of type String
399   * @param pipes          of type Pipe[]
400   * @param groupingFields of type Fields
401   * @param valueField     of type Fields
402   * @param sumField       of type Fields
403   * @param sumType        of type Class
404   * @param threshold      of type int
405   */
406  @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"})
407  public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold )
408    {
409    super( name, pipes, groupingFields, valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ), threshold );
410    }
411  }