001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe.assembly; 022 023import java.beans.ConstructorProperties; 024import java.lang.reflect.Type; 025 026import cascading.flow.FlowProcess; 027import cascading.operation.Aggregator; 028import cascading.operation.AggregatorCall; 029import cascading.operation.BaseOperation; 030import cascading.operation.OperationCall; 031import cascading.pipe.Pipe; 032import cascading.tuple.Fields; 033import cascading.tuple.Tuple; 034import cascading.tuple.TupleEntry; 035import cascading.tuple.coerce.Coercions; 036import cascading.tuple.type.CoercibleType; 037 038/** 039 * Class AverageBy is used to average values associated with duplicate keys in a tuple stream. 040 * <p/> 041 * Typically finding the average value in a tuple stream relies on a {@link cascading.pipe.GroupBy} and a {@link cascading.operation.aggregator.Average} 042 * {@link cascading.operation.Aggregator} operation. 043 * <p/> 044 * If the given {@code averageFields} has an associated type, this type will be used to coerce the resulting average value, 045 * otherwise the result will be a {@link Double}. 046 * <p/> 047 * If {@code include} is {@link Include#NO_NULLS}, {@code null} values will not be included in the average (converted to zero). 048 * By default (and for backwards compatibility) {@code null} values are included, {@link Include#ALL}. 049 * <p/> 050 * This SubAssembly uses the {@link cascading.pipe.assembly.AverageBy.AveragePartials} {@link cascading.pipe.assembly.AggregateBy.Functor} 051 * and private {@link AverageFinal} Aggregator to count and sum as many field values before the GroupBy operator to reduce IO over the network. 052 * <p/> 053 * This strategy is similar to using {@code combiners}, except no sorting or serialization is invoked and results 054 * in a much simpler mechanism. 055 * <p/> 056 * The {@code threshold} value tells the underlying AveragePartials functions how many unique key sums and counts to accumulate 057 * in the LRU cache, before emitting the least recently used entry. This accumulation happens map-side, and thus is 058 * bounded by the size of your map task JVM and the typical size of each group key. 059 * <p/> 060 * By default, either the value of {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} System property 061 * or {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_DEFAULT_CAPACITY} will be used. 062 * 063 * @see cascading.pipe.assembly.AggregateBy 064 */ 065public class AverageBy extends AggregateBy 066 { 067 public enum Include 068 { 069 ALL, 070 NO_NULLS 071 } 072 073 /** 074 * Class AveragePartials is a {@link cascading.pipe.assembly.AggregateBy.Functor} that is used to count and sum observed duplicates from the tuple stream. 075 * 076 * @see cascading.pipe.assembly.AverageBy 077 */ 078 public static class AveragePartials implements Functor 079 { 080 private final Fields declaredFields; 081 private final Include include; 082 083 /** 084 * Constructor AveragePartials creates a new AveragePartials instance. 085 * 086 * @param declaredFields of type Fields 087 */ 088 public AveragePartials( Fields declaredFields ) 089 { 090 this.declaredFields = declaredFields; 091 this.include = Include.ALL; 092 } 093 094 public AveragePartials( Fields declaredFields, Include include ) 095 { 096 this.declaredFields = declaredFields; 097 this.include = include; 098 } 099 100 @Override 101 public Fields getDeclaredFields() 102 { 103 Fields sumName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".sum", Double.class ); 104 Fields countName = new Fields( AverageBy.class.getPackage().getName() + "." + declaredFields.get( 0 ) + ".count", Long.class ); 105 106 return sumName.append( countName ); 107 } 108 109 @Override 110 public Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context ) 111 { 112 if( context == null ) 113 context = Tuple.size( 2 ); 114 115 if( include == Include.NO_NULLS && args.getObject( 0 ) == null ) 116 return context; 117 118 context.set( 0, context.getDouble( 0 ) + args.getDouble( 0 ) ); 119 context.set( 1, context.getLong( 1 ) + 1 ); 120 121 return context; 122 } 123 124 @Override 125 public Tuple complete( FlowProcess flowProcess, Tuple context ) 126 { 127 return context; 128 } 129 } 130 131 /** 132 * Class AverageFinal is used to finalize the average operation on the Reduce side of the process. It must be used 133 * in tandem with a {@link AveragePartials} Functor. 134 */ 135 public static class AverageFinal extends BaseOperation<AverageFinal.Context> implements Aggregator<AverageFinal.Context> 136 { 137 /** Class Context is used to hold intermediate values. */ 138 protected static class Context 139 { 140 long nulls = 0L; 141 double sum = 0.0D; 142 long count = 0L; 143 Type type = Double.class; 144 CoercibleType canonical; 145 146 Tuple tuple = Tuple.size( 1 ); 147 148 public Context( Fields fieldDeclaration ) 149 { 150 if( fieldDeclaration.hasTypes() ) 151 this.type = fieldDeclaration.getType( 0 ); 152 153 this.canonical = Coercions.coercibleTypeFor( this.type ); 154 } 155 156 public Context reset() 157 { 158 nulls = 0L; 159 sum = 0.0D; 160 count = 0L; 161 tuple.set( 0, null ); 162 163 return this; 164 } 165 166 public Tuple result() 167 { 168 // we only saw null from upstream, so return null 169 if( count == 0 && nulls != 0 ) 170 return tuple; 171 172 tuple.set( 0, canonical.canonical( sum / count ) ); 173 174 return tuple; 175 } 176 } 177 178 /** 179 * Constructs a new instance that returns the average of the values encountered in the given fieldDeclaration field name. 180 * 181 * @param fieldDeclaration of type Fields 182 */ 183 public AverageFinal( Fields fieldDeclaration ) 184 { 185 super( 2, makeFieldDeclaration( fieldDeclaration ) ); 186 187 if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 ) 188 throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() ); 189 } 190 191 private static Fields makeFieldDeclaration( Fields fieldDeclaration ) 192 { 193 if( fieldDeclaration.hasTypes() ) 194 return fieldDeclaration; 195 196 return fieldDeclaration.applyTypes( Double.class ); 197 } 198 199 @Override 200 public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall ) 201 { 202 operationCall.setContext( new Context( getFieldDeclaration() ) ); 203 } 204 205 @Override 206 public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 207 { 208 aggregatorCall.getContext().reset(); 209 } 210 211 @Override 212 public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 213 { 214 Context context = aggregatorCall.getContext(); 215 TupleEntry arguments = aggregatorCall.getArguments(); 216 217 if( arguments.getObject( 0 ) == null ) 218 { 219 context.nulls++; 220 return; 221 } 222 223 context.sum += arguments.getDouble( 0 ); 224 context.count += arguments.getLong( 1 ); 225 } 226 227 @Override 228 public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall ) 229 { 230 aggregatorCall.getOutputCollector().add( aggregatorCall.getContext().result() ); 231 } 232 } 233 234 ///////// 235 236 /** 237 * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 238 * instance. 239 * 240 * @param valueField of type Fields 241 * @param averageField of type Fields 242 */ 243 @ConstructorProperties({"valueField", "averageField"}) 244 public AverageBy( Fields valueField, Fields averageField ) 245 { 246 super( valueField, new AveragePartials( averageField ), new AverageFinal( averageField ) ); 247 } 248 249 /** 250 * Constructor AverageBy creates a new AverageBy instance. Use this constructor when used with a {@link cascading.pipe.assembly.AggregateBy} 251 * instance. 252 * 253 * @param valueField of type Fields 254 * @param averageField of type Fields 255 * @param include of type boolean 256 */ 257 @ConstructorProperties({"valueField", "averageField", "include"}) 258 public AverageBy( Fields valueField, Fields averageField, Include include ) 259 { 260 super( valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ) ); 261 } 262 263 ////////////// 264 265 /** 266 * Constructor AverageBy creates a new AverageBy instance. 267 * 268 * @param pipe of type Pipe 269 * @param groupingFields of type Fields 270 * @param valueField of type Fields 271 * @param averageField of type Fields 272 */ 273 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField"}) 274 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField ) 275 { 276 this( null, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 277 } 278 279 /** 280 * Constructor AverageBy creates a new AverageBy instance. 281 * 282 * @param pipe of type Pipe 283 * @param groupingFields of type Fields 284 * @param valueField of type Fields 285 * @param averageField of type Fields 286 * @param threshold of type int 287 */ 288 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "threshold"}) 289 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 290 { 291 this( null, pipe, groupingFields, valueField, averageField, threshold ); 292 } 293 294 /** 295 * Constructor AverageBy creates a new AverageBy instance. 296 * 297 * @param name of type String 298 * @param pipe of type Pipe 299 * @param groupingFields of type Fields 300 * @param valueField of type Fields 301 * @param averageField of type Fields 302 */ 303 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField"}) 304 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField ) 305 { 306 this( name, pipe, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 307 } 308 309 /** 310 * Constructor AverageBy creates a new AverageBy instance. 311 * 312 * @param name of type String 313 * @param pipe of type Pipe 314 * @param groupingFields of type Fields 315 * @param valueField of type Fields 316 * @param averageField of type Fields 317 * @param threshold of type int 318 */ 319 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "threshold"}) 320 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 321 { 322 this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, threshold ); 323 } 324 325 /** 326 * Constructor AverageBy creates a new AverageBy instance. 327 * 328 * @param pipes of type Pipe[] 329 * @param groupingFields of type Fields 330 * @param valueField of type Fields 331 * @param averageField of type Fields 332 */ 333 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField"}) 334 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField ) 335 { 336 this( null, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 337 } 338 339 /** 340 * Constructor AverageBy creates a new AverageBy instance. 341 * 342 * @param pipes of type Pipe[] 343 * @param groupingFields of type Fields 344 * @param valueField of type Fields 345 * @param averageField of type Fields 346 * @param threshold of type int 347 */ 348 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "threshold"}) 349 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 350 { 351 this( null, pipes, groupingFields, valueField, averageField, threshold ); 352 } 353 354 /** 355 * Constructor AverageBy creates a new AverageBy instance. 356 * 357 * @param name of type String 358 * @param pipes of type Pipe[] 359 * @param groupingFields of type Fields 360 * @param valueField of type Fields 361 * @param averageField of type Fields 362 */ 363 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField"}) 364 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField ) 365 { 366 this( name, pipes, groupingFields, valueField, averageField, USE_DEFAULT_THRESHOLD ); 367 } 368 369 /** 370 * Constructor AverageBy creates a new AverageBy instance. 371 * 372 * @param name of type String 373 * @param pipes of type Pipe[] 374 * @param groupingFields of type Fields 375 * @param valueField of type Fields 376 * @param averageField of type Fields 377 * @param threshold of type int 378 */ 379 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "threshold"}) 380 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, int threshold ) 381 { 382 super( name, pipes, groupingFields, valueField, new AveragePartials( averageField ), new AverageFinal( averageField ), threshold ); 383 } 384 385 /** 386 * Constructor AverageBy creates a new AverageBy instance. 387 * 388 * @param pipe of type Pipe 389 * @param groupingFields of type Fields 390 * @param valueField of type Fields 391 * @param averageField of type Fields 392 * @param include of type boolean 393 */ 394 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include"}) 395 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 396 { 397 this( null, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 398 } 399 400 /** 401 * Constructor AverageBy creates a new AverageBy instance. 402 * 403 * @param pipe of type Pipe 404 * @param groupingFields of type Fields 405 * @param valueField of type Fields 406 * @param averageField of type Fields 407 * @param include of type boolean 408 * @param threshold of type int 409 */ 410 @ConstructorProperties({"pipe", "groupingFields", "valueField", "averageField", "include", "threshold"}) 411 public AverageBy( Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 412 { 413 this( null, pipe, groupingFields, valueField, averageField, include, threshold ); 414 } 415 416 /** 417 * Constructor AverageBy creates a new AverageBy instance. 418 * 419 * @param name of type String 420 * @param pipe of type Pipe 421 * @param groupingFields of type Fields 422 * @param valueField of type Fields 423 * @param averageField of type Fields 424 * @param include of type boolean 425 */ 426 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include"}) 427 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 428 { 429 this( name, pipe, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 430 } 431 432 /** 433 * Constructor AverageBy creates a new AverageBy instance. 434 * 435 * @param name of type String 436 * @param pipe of type Pipe 437 * @param groupingFields of type Fields 438 * @param valueField of type Fields 439 * @param averageField of type Fields 440 * @param include of type boolean 441 * @param threshold of type int 442 */ 443 @ConstructorProperties({"name", "pipe", "groupingFields", "valueField", "averageField", "include", "threshold"}) 444 public AverageBy( String name, Pipe pipe, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 445 { 446 this( name, Pipe.pipes( pipe ), groupingFields, valueField, averageField, include, threshold ); 447 } 448 449 /** 450 * Constructor AverageBy creates a new AverageBy instance. 451 * 452 * @param pipes of type Pipe[] 453 * @param groupingFields of type Fields 454 * @param valueField of type Fields 455 * @param averageField of type Fields 456 * @param include of type boolean 457 */ 458 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include"}) 459 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 460 { 461 this( null, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 462 } 463 464 /** 465 * Constructor AverageBy creates a new AverageBy instance. 466 * 467 * @param pipes of type Pipe[] 468 * @param groupingFields of type Fields 469 * @param valueField of type Fields 470 * @param averageField of type Fields 471 * @param include of type boolean 472 * @param threshold of type int 473 */ 474 @ConstructorProperties({"pipes", "groupingFields", "valueField", "averageField", "include", "threshold"}) 475 public AverageBy( Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 476 { 477 this( null, pipes, groupingFields, valueField, averageField, include, threshold ); 478 } 479 480 /** 481 * Constructor AverageBy creates a new AverageBy instance. 482 * 483 * @param name of type String 484 * @param pipes of type Pipe[] 485 * @param groupingFields of type Fields 486 * @param valueField of type Fields 487 * @param averageField of type Fields 488 * @param include of type boolean 489 */ 490 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include"}) 491 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include ) 492 { 493 this( name, pipes, groupingFields, valueField, averageField, include, USE_DEFAULT_THRESHOLD ); 494 } 495 496 /** 497 * Constructor AverageBy creates a new AverageBy instance. 498 * 499 * @param name of type String 500 * @param pipes of type Pipe[] 501 * @param groupingFields of type Fields 502 * @param valueField of type Fields 503 * @param averageField of type Fields 504 * @param include of type boolean 505 * @param threshold of type int 506 */ 507 @ConstructorProperties({"name", "pipes", "groupingFields", "valueField", "averageField", "include", "threshold"}) 508 public AverageBy( String name, Pipe[] pipes, Fields groupingFields, Fields valueField, Fields averageField, Include include, int threshold ) 509 { 510 super( name, pipes, groupingFields, valueField, new AveragePartials( averageField, include ), new AverageFinal( averageField ), threshold ); 511 } 512 }