001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 import java.lang.reflect.Type; 025 026 import cascading.flow.FlowProcess; 027 import cascading.operation.aggregator.Sum; 028 import cascading.pipe.Pipe; 029 import cascading.tuple.Fields; 030 import cascading.tuple.Tuple; 031 import cascading.tuple.TupleEntry; 032 import cascading.tuple.coerce.Coercions; 033 import cascading.tuple.type.CoercibleType; 034 035 /** 036 * Class SumBy is used to sum values associated with duplicate keys in a tuple stream. 037 * <p/> 038 * Typically finding the sum of field in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Sum} 039 * {@link cascading.operation.Aggregator} operation. 040 * <p/> 041 * If all the values to be summed are all {@code null}, the result value is a function of how null is coerced by the 042 * given {@code sumType}. If a primitive type, {@code 0} will be returned. Otherwise {@code null}. 043 * <p/> 044 * This SubAssembly also uses the {@link SumBy.SumPartials} {@link AggregateBy.Functor} 045 * to sum field values before the GroupBy operator to reduce IO over the network. 046 * <p/> 047 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 048 * in a much simpler mechanism. 049 * <p/> 050 * The {@code threshold} value tells the underlying SumPartials functions how many unique key sums to accumulate 051 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is 052 * bounded by the size of your map task JVM and the typical size of each group key. 053 * <p/> 054 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property 055 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used. 056 * 057 * @see AggregateBy 058 */ 059 public class SumBy extends AggregateBy 060 { 061 /** DEFAULT_THRESHOLD */ 062 @Deprecated 063 public static final int DEFAULT_THRESHOLD = 10000; 064 065 /** 066 * Class SumPartials is a {@link AggregateBy.Functor} that is used to sum observed duplicates from the tuple stream. 067 * <p/> 068 * Use this class typically in tandem with a {@link cascading.operation.aggregator.Sum} 069 * {@link cascading.operation.Aggregator} in order to improve counting performance by removing as many values 070 * as possible before the intermediate {@link cascading.pipe.GroupBy} operator. 071 * 072 * @see SumBy 073 */ 074 public static class SumPartials implements Functor 075 { 076 private final Fields declaredFields; 077 private final Type sumType; 078 private final CoercibleType canonical; 079 080 /** Constructor SumPartials creates a new SumPartials instance. */ 081 public SumPartials( Fields declaredFields ) 082 { 083 this.declaredFields = declaredFields; 084 085 if( !declaredFields.hasTypes() ) 086 throw new IllegalArgumentException( "result type must be declared " ); 087 088 this.sumType = declaredFields.getType( 0 ); 089 090 if( declaredFields.size() != 1 ) 091 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 092 093 this.canonical = Coercions.coercibleTypeFor( this.sumType ); 094 } 095 096 public SumPartials( Fields declaredFields, Class sumType ) 097 { 098 this.declaredFields = declaredFields; 099 this.sumType = sumType; 100 101 if( declaredFields.size() != 1 ) 102 throw new IllegalArgumentException( "declared fields may only have one field, got: " + declaredFields ); 103 104 this.canonical = Coercions.coercibleTypeFor( this.sumType ); 105 } 106 107 @Override 108 public Fields getDeclaredFields() 109 { 110 return declaredFields; 111 } 112 113 @Override 114 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 115 { 116 if( context == null ) 117 return args.getTupleCopy(); 118 else if( args.getObject( 0 ) == null ) 119 return context; 120 121 context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) ); 122 123 return context; 124 } 125 126 @Override 127 public Tuple complete( FlowProcess flowProcess, Tuple context ) 128 { 129 context.set( 0, canonical.canonical( context.getObject( 0 ) ) ); 130 131 return context; 132 } 133 } 134 135 /** 136 * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy} 137 * instance. 138 * 139 * @param valueField of type Fields 140 * @param sumField of type Fields 141 */ 142 @ConstructorProperties({"valueField", "sumField"}) 143 public SumBy( Fields valueField, Fields sumField ) 144 { 145 super( valueField, new SumPartials( sumField ), new Sum( sumField ) ); 146 } 147 148 ////////////// 149 150 /** 151 * Constructor SumBy creates a new SumBy instance. 152 * 153 * @param pipe of type Pipe 154 * @param groupingFields of type Fields 155 * @param valueField of type Fields 156 * @param sumField of type Fields 157 */ 158 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField"}) 159 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField ) 160 { 161 this( null, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 162 } 163 164 /** 165 * Constructor SumBy creates a new SumBy instance. 166 * 167 * @param pipe of type Pipe 168 * @param groupingFields of type Fields 169 * @param valueField of type Fields 170 * @param sumField of type Fields 171 * @param threshold of type int 172 */ 173 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "threshold"}) 174 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 175 { 176 this( null, pipe, groupingFields, valueField, sumField, threshold ); 177 } 178 179 /** 180 * Constructor SumBy creates a new SumBy instance. 181 * 182 * @param name of type String 183 * @param pipe of type Pipe 184 * @param groupingFields of type Fields 185 * @param valueField of type Fields 186 * @param sumField of type Fields 187 */ 188 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField"}) 189 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField ) 190 { 191 this( name, pipe, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 192 } 193 194 /** 195 * Constructor SumBy creates a new SumBy instance. 196 * 197 * @param name of type String 198 * @param pipe of type Pipe 199 * @param groupingFields of type Fields 200 * @param valueField of type Fields 201 * @param sumField of type Fields 202 * @param threshold of type int 203 */ 204 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "threshold"}) 205 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 206 { 207 this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, threshold ); 208 } 209 210 /** 211 * Constructor SumBy creates a new SumBy instance. 212 * 213 * @param pipes of type Pipe[] 214 * @param groupingFields of type Fields 215 * @param valueField of type Fields 216 * @param sumField of type Fields 217 */ 218 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField"}) 219 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField ) 220 { 221 this( null, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 222 } 223 224 /** 225 * Constructor SumBy creates a new SumBy instance. 226 * 227 * @param pipes of type Pipe[] 228 * @param groupingFields of type Fields 229 * @param valueField of type Fields 230 * @param sumField of type Fields 231 * @param threshold of type int 232 */ 233 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "threshold"}) 234 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 235 { 236 this( null, pipes, groupingFields, valueField, sumField, threshold ); 237 } 238 239 /** 240 * Constructor SumBy creates a new SumBy instance. 241 * 242 * @param name of type String 243 * @param pipes of type Pipe[] 244 * @param groupingFields of type Fields 245 * @param valueField of type Fields 246 * @param sumField of type Fields 247 */ 248 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField"}) 249 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField ) 250 { 251 this( name, pipes, groupingFields, valueField, sumField, USE_DEFAULT_THRESHOLD ); 252 } 253 254 /** 255 * Constructor SumBy creates a new SumBy instance. 256 * 257 * @param name of type String 258 * @param pipes of type Pipe[] 259 * @param groupingFields of type Fields 260 * @param valueField of type Fields 261 * @param sumField of type Fields 262 * @param threshold of type int 263 */ 264 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "threshold"}) 265 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, int threshold ) 266 { 267 super( name, pipes, groupingFields, valueField, new SumPartials( sumField ), new Sum( sumField ), threshold ); 268 } 269 270 /////////// 271 272 /** 273 * Constructor SumBy creates a new SumBy instance. Use this constructor when used with a {@link AggregateBy} 274 * instance. 275 * 276 * @param valueField of type Fields 277 * @param sumField of type Fields 278 * @param sumType of type Class 279 */ 280 @ConstructorProperties({"valueField", "sumField", "sumType"}) 281 public SumBy( Fields valueField, Fields sumField, Class sumType ) 282 { 283 super( valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ) ); 284 } 285 286 ////////////// 287 288 /** 289 * Constructor SumBy creates a new SumBy instance. 290 * 291 * @param pipe of type Pipe 292 * @param groupingFields of type Fields 293 * @param valueField of type Fields 294 * @param sumField of type Fields 295 * @param sumType of type Class 296 */ 297 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType"}) 298 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 299 { 300 this( null, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 301 } 302 303 /** 304 * Constructor SumBy creates a new SumBy instance. 305 * 306 * @param pipe of type Pipe 307 * @param groupingFields of type Fields 308 * @param valueField of type Fields 309 * @param sumField of type Fields 310 * @param sumType of type Class 311 * @param threshold of type int 312 */ 313 @ConstructorProperties({"pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 314 public SumBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 315 { 316 this( null, pipe, groupingFields, valueField, sumField, sumType, threshold ); 317 } 318 319 /** 320 * Constructor SumBy creates a new SumBy instance. 321 * 322 * @param name of type String 323 * @param pipe of type Pipe 324 * @param groupingFields of type Fields 325 * @param valueField of type Fields 326 * @param sumField of type Fields 327 * @param sumType of type Class 328 */ 329 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType"}) 330 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 331 { 332 this( name, pipe, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 333 } 334 335 /** 336 * Constructor SumBy creates a new SumBy instance. 337 * 338 * @param name of type String 339 * @param pipe of type Pipe 340 * @param groupingFields of type Fields 341 * @param valueField of type Fields 342 * @param sumField of type Fields 343 * @param sumType of type Class 344 * @param threshold of type int 345 */ 346 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 347 public SumBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 348 { 349 this( name, Pipe.pipes( pipe ), groupingFields, valueField, sumField, sumType, threshold ); 350 } 351 352 /** 353 * Constructor SumBy creates a new SumBy instance. 354 * 355 * @param pipes of type Pipe[] 356 * @param groupingFields of type Fields 357 * @param valueField of type Fields 358 * @param sumField of type Fields 359 * @param sumType of type Class 360 */ 361 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType"}) 362 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 363 { 364 this( null, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 365 } 366 367 /** 368 * Constructor SumBy creates a new SumBy instance. 369 * 370 * @param pipes of type Pipe[] 371 * @param groupingFields of type Fields 372 * @param valueField of type Fields 373 * @param sumField of type Fields 374 * @param sumType of type Class 375 * @param threshold of type int 376 */ 377 @ConstructorProperties({"pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 378 public SumBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 379 { 380 this( null, pipes, groupingFields, valueField, sumField, sumType, threshold ); 381 } 382 383 /** 384 * Constructor SumBy creates a new SumBy instance. 385 * 386 * @param name of type String 387 * @param pipes of type Pipe[] 388 * @param groupingFields of type Fields 389 * @param valueField of type Fields 390 * @param sumField of type Fields 391 * @param sumType of type Class 392 */ 393 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType"}) 394 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType ) 395 { 396 this( name, pipes, groupingFields, valueField, sumField, sumType, USE_DEFAULT_THRESHOLD ); 397 } 398 399 /** 400 * Constructor SumBy creates a new SumBy instance. 401 * 402 * @param name of type String 403 * @param pipes of type Pipe[] 404 * @param groupingFields of type Fields 405 * @param valueField of type Fields 406 * @param sumField of type Fields 407 * @param sumType of type Class 408 * @param threshold of type int 409 */ 410 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "sumField", "sumType", "threshold"}) 411 public SumBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields sumField, Class sumType, int threshold ) 412 { 413 super( name, pipes, groupingFields, valueField, new SumPartials( sumField, sumType ), new Sum( sumField, sumType ), threshold ); 414 } 415 }