001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe.assembly; 022 023import java.beans.ConstructorProperties; 024import java.lang.reflect.Type; 025 026import cascading.flow.FlowProcess; 027import cascading.operation.aggregator.Sum; 028import cascading.pipe.Pipe; 029import cascading.tuple.Fields; 030import cascading.tuple.Tuple; 031import cascading.tuple.TupleEntry; 032import cascading.tuple.coerce.Coercions; 033import cascading.tuple.type.CoercibleType; 034 035/** 036 * Class SumBy is used to sum values associated with duplicate keys in a tuple stream. 037 * <p/> 038 * Typically finding the sum of field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Sum} 039 * {@link cascading.operation.Aggregator} operation. 040 * <p/> 041 * If all the values to be summed are all {@code null}, the result value is a function of how null is coerced by the 042 * given {@code sumType}. If a primitive type, {@code 0} will be returned. Otherwise {@code null}. 043 * <p/> 044 * This SubAssembly also uses the {@link SumBy.SumPartials} {@link AggregateBy.Functor} 045 * to sum field values before the GroupBy operator to reduce IO over the network. 046 * <p/> 047 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 048 * in a much simpler mechanism. 049 * <p/> 050 * The {@code threshold} value tells the underlying SumPartials functions how many unique key sums to accumulate 051 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is 052 * bounded by the size of your map task JVM and the typical size of each group key. 053 * <p/> 054 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property 055 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used. 056 * 057 * @see AggregateBy 058 */ 059public class SumBy extends AggregateBy 060 { 061 /** 062 * Class SumPartials is a {@link AggregateBy.Functor} that is used to sum observed duplicates from the tuple stream. 063 * <p/> 064 * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum} 065 * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values 066 * as possible before the intermediate {@link cascading.pipe.GroupBy} operator. 067 * 068 * @see SumBy 069 */ 070 public static class SumPartials implements Functor 071 { 072 private final Fields declaredFields; 073 private final Type sumType; 074 private final CoercibleType canonical; 075 076 /** Constructor SumPartials creates a new SumPartials instance. */ 077 public SumPartials( Fields declaredFields ) 078 { 079 this.declaredFields = declaredFields; 080 081 if( !declaredFields.hasTypes() ) 082 throw new IllegalArgumentException( "result type must be declared " ); 083 084 this.sumType = declaredFields.getType( 0 ); 085 086 if( declaredFields.size() != 1 ) 087 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 088 089 this.canonical = Coercions.coercibleTypeFor( this.sumType ); 090 } 091 092 public SumPartials( Fields declaredFields, Class sumType ) 093 { 094 this.declaredFields = declaredFields; 095 this.sumType = sumType; 096 097 if( declaredFields.size() != 1 ) 098 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 099 100 this.canonical = Coercions.coercibleTypeFor( this.sumType ); 101 } 102 103 @Override 104 public Fields getDeclaredFields() 105 { 106 return declaredFields; 107 } 108 109 @Override 110 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 111 { 112 if( context == null ) 113 return args.getTupleCopy(); 114 else if( args.getObject( 0 ) == null ) 115 return context; 116 117 context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) ); 118 119 return context; 120 } 121 122 @Override 123 public Tuple complete( FlowProcess flowProcess, Tuple context ) 124 { 125 context.set( 0, canonical.canonical( context.getObject( 0 ) ) ); 126 127 return context; 128 } 129 } 130 131 /** 132 * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy} 133 * instance. 134 * 135 * @param valueField of type Fields 136 * @param sumField of type Fields 137 */ 138 @ConstructorProperties({"valueField", "sumField"}) 139 public SumBy( Fields valueField, Fields sumField ) 140 { 141 super( valueField, new SumPartials( sumField ), new Sum( sumField ) ); 142 } 143 144 ////////////// 145 146 /** 147 * Constructor SumBy creates a new SumBy instance. 148 * 149 * @param pipe of type Pipe 150 * @param groupingFields of type Fields 151 * @param valueField of type Fields 152 * @param sumField of type Fields 153 */ 154 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField"}) 155 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField ) 156 { 157 this( null, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 158 } 159 160 /** 161 * Constructor SumBy creates a new SumBy instance. 162 * 163 * @param pipe of type Pipe 164 * @param groupingFields of type Fields 165 * @param valueField of type Fields 166 * @param sumField of type Fields 167 * @param threshold of type int 168 */ 169 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "threshold"}) 170 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 171 { 172 this( null, pipe, groupingFields, valueField, sumField, threshold ); 173 } 174 175 /** 176 * Constructor SumBy creates a new SumBy instance. 177 * 178 * @param name of type String 179 * @param pipe of type Pipe 180 * @param groupingFields of type Fields 181 * @param valueField of type Fields 182 * @param sumField of type Fields 183 */ 184 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField"}) 185 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField ) 186 { 187 this( name, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 188 } 189 190 /** 191 * Constructor SumBy creates a new SumBy instance. 192 * 193 * @param name of type String 194 * @param pipe of type Pipe 195 * @param groupingFields of type Fields 196 * @param valueField of type Fields 197 * @param sumField of type Fields 198 * @param threshold of type int 199 */ 200 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "threshold"}) 201 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 202 { 203 this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, threshold ); 204 } 205 206 /** 207 * Constructor SumBy creates a new SumBy instance. 208 * 209 * @param pipes of type Pipe[] 210 * @param groupingFields of type Fields 211 * @param valueField of type Fields 212 * @param sumField of type Fields 213 */ 214 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField"}) 215 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField ) 216 { 217 this( null, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 218 } 219 220 /** 221 * Constructor SumBy creates a new SumBy instance. 222 * 223 * @param pipes of type Pipe[] 224 * @param groupingFields of type Fields 225 * @param valueField of type Fields 226 * @param sumField of type Fields 227 * @param threshold of type int 228 */ 229 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "threshold"}) 230 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 231 { 232 this( null, pipes, groupingFields, valueField, sumField, threshold ); 233 } 234 235 /** 236 * Constructor SumBy creates a new SumBy instance. 237 * 238 * @param name of type String 239 * @param pipes of type Pipe[] 240 * @param groupingFields of type Fields 241 * @param valueField of type Fields 242 * @param sumField of type Fields 243 */ 244 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField"}) 245 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField ) 246 { 247 this( name, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 248 } 249 250 /** 251 * Constructor SumBy creates a new SumBy instance. 252 * 253 * @param name of type String 254 * @param pipes of type Pipe[] 255 * @param groupingFields of type Fields 256 * @param valueField of type Fields 257 * @param sumField of type Fields 258 * @param threshold of type int 259 */ 260 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "threshold"}) 261 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 262 { 263 super( name, pipes, groupingFields, valueField, new SumPartials( sumField ), new Sum( sumField ), threshold ); 264 } 265 266/////////// 267 268 /** 269 * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy} 270 * instance. 271 * 272 * @param valueField of type Fields 273 * @param sumField of type Fields 274 * @param sumType of type Class 275 */ 276 @ConstructorProperties({"valueField", "sumField", "sumType"}) 277 public SumBy( Fields valueField, Fields sumField, Class sumType ) 278 { 279 super( valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ) ); 280 } 281 282////////////// 283 284 /** 285 * Constructor SumBy creates a new SumBy instance. 286 * 287 * @param pipe of type Pipe 288 * @param groupingFields of type Fields 289 * @param valueField of type Fields 290 * @param sumField of type Fields 291 * @param sumType of type Class 292 */ 293 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType"}) 294 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 295 { 296 this( null, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 297 } 298 299 /** 300 * Constructor SumBy creates a new SumBy instance. 301 * 302 * @param pipe of type Pipe 303 * @param groupingFields of type Fields 304 * @param valueField of type Fields 305 * @param sumField of type Fields 306 * @param sumType of type Class 307 * @param threshold of type int 308 */ 309 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 310 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 311 { 312 this( null, pipe, groupingFields, valueField, sumField, sumType, threshold ); 313 } 314 315 /** 316 * Constructor SumBy creates a new SumBy instance. 317 * 318 * @param name of type String 319 * @param pipe of type Pipe 320 * @param groupingFields of type Fields 321 * @param valueField of type Fields 322 * @param sumField of type Fields 323 * @param sumType of type Class 324 */ 325 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType"}) 326 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 327 { 328 this( name, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 329 } 330 331 /** 332 * Constructor SumBy creates a new SumBy instance. 333 * 334 * @param name of type String 335 * @param pipe of type Pipe 336 * @param groupingFields of type Fields 337 * @param valueField of type Fields 338 * @param sumField of type Fields 339 * @param sumType of type Class 340 * @param threshold of type int 341 */ 342 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 343 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 344 { 345 this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, sumType, threshold ); 346 } 347 348 /** 349 * Constructor SumBy creates a new SumBy instance. 350 * 351 * @param pipes of type Pipe[] 352 * @param groupingFields of type Fields 353 * @param valueField of type Fields 354 * @param sumField of type Fields 355 * @param sumType of type Class 356 */ 357 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType"}) 358 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 359 { 360 this( null, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 361 } 362 363 /** 364 * Constructor SumBy creates a new SumBy instance. 365 * 366 * @param pipes of type Pipe[] 367 * @param groupingFields of type Fields 368 * @param valueField of type Fields 369 * @param sumField of type Fields 370 * @param sumType of type Class 371 * @param threshold of type int 372 */ 373 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 374 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 375 { 376 this( null, pipes, groupingFields, valueField, sumField, sumType, threshold ); 377 } 378 379 /** 380 * Constructor SumBy creates a new SumBy instance. 381 * 382 * @param name of type String 383 * @param pipes of type Pipe[] 384 * @param groupingFields of type Fields 385 * @param valueField of type Fields 386 * @param sumField of type Fields 387 * @param sumType of type Class 388 */ 389 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType"}) 390 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 391 { 392 this( name, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 393 } 394 395 /** 396 * Constructor SumBy creates a new SumBy instance. 397 * 398 * @param name of type String 399 * @param pipes of type Pipe[] 400 * @param groupingFields of type Fields 401 * @param valueField of type Fields 402 * @param sumField of type Fields 403 * @param sumType of type Class 404 * @param threshold of type int 405 */ 406 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 407 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 408 { 409 super( name, pipes, groupingFields, valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ), threshold ); 410 } 411 }