001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.pipe.assembly; 022 023 import java.beans.ConstructorProperties; 024 import java.io.Serializable; 025 import java.util.ArrayList; 026 import java.util.Arrays; 027 import java.util.Collections; 028 import java.util.Comparator; 029 import java.util.LinkedHashMap; 030 import java.util.List; 031 import java.util.Map; 032 033 import cascading.flow.FlowProcess; 034 import cascading.management.annotation.Property; 035 import cascading.management.annotation.PropertyConfigured; 036 import cascading.management.annotation.PropertyDescription; 037 import cascading.management.annotation.Visibility; 038 import cascading.operation.Aggregator; 039 import cascading.operation.BaseOperation; 040 import cascading.operation.Function; 041 import cascading.operation.FunctionCall; 042 import cascading.operation.OperationCall; 043 import cascading.pipe.Each; 044 import cascading.pipe.Every; 045 import cascading.pipe.GroupBy; 046 import cascading.pipe.Pipe; 047 import cascading.pipe.SubAssembly; 048 import cascading.tuple.Fields; 049 import cascading.tuple.Tuple; 050 import cascading.tuple.TupleEntry; 051 import cascading.tuple.TupleEntryCollector; 052 import 
cascading.tuple.util.TupleHasher; 053 import cascading.tuple.util.TupleViews; 054 import org.slf4j.Logger; 055 import org.slf4j.LoggerFactory; 056 057 /** 058 * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations. 059 * <p/> 060 * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the 061 * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and 062 * completed Reduce side. Summing is associative and commutative. 063 * <p/> 064 * AggregateBy also supports operations that are not associative/commutative like 'counting'. Counting 065 * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be 066 * transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over 067 * two, and a hack) 068 * <p/> 069 * Think of this mechanism as a MapReduce Combiner, but more efficient as no values are serialized, 070 * deserialized, saved to disk, and multi-pass sorted in the process, which consume cpu resources in trade of 071 * memory and a little or no IO. 072 * <p/> 073 * Further, Combiners are limited to only associative/commutative operations. 074 * <p/> 075 * Additionally the Cascading planner can move the Map side optimization 076 * to the previous Reduce operation further increasing IO performance (between the preceding Reduce and Map phase which 077 * is over HDFS). 078 * <p/> 079 * The second role of the AggregateBy class is to allow for composition of AggregateBy 080 * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed 081 * in parallel on the same grouping keys. 082 * </p> 083 * Custom AggregateBy classes can be created by sub-classing this class and implementing a special 084 * {@link Functor} for use on the Map side. 
Multiple Functor instances are managed by the {@link CompositeFunction}
 * class allowing them all to share the same LRU value map for more efficiency.
 * <p/>
 * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to
 * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used
 * for secondary sorting (see {@link GroupBy} {@code sortFields}). This feature is used by {@link FirstBy} to
 * control which Tuple is seen first for a grouping.
 * <p/>
 * To tune the LRU, set the {@code threshold} value to a high enough value to utilize available memory. Or set a
 * default value via the {@link #AGGREGATE_BY_THRESHOLD} property. The current default ({@link CompositeFunction#DEFAULT_THRESHOLD})
 * is {@code 10,000} unique keys. Note "flushes" from the LRU will be logged in threshold increments along with memory
 * information.
 * <p/>
 * Note using an AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow}.
 * And passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy.
 * <p/>
 * Also note that {@link Unique} is not a CompositeAggregator and is slightly more optimized internally.
 * <p/>
 * As of Cascading 2.6 AggregateBy honors the {@link cascading.tuple.Hasher} interface for storing keys in the cache.
 *
 * @see SumBy
 * @see CountBy
 * @see Unique
 */
public class AggregateBy extends SubAssembly
  {
  private static final Logger LOG = LoggerFactory.getLogger( AggregateBy.class );

  /** Sentinel threshold meaning "resolve at runtime": first the {@link #AGGREGATE_BY_THRESHOLD} property, then {@link #DEFAULT_THRESHOLD}. */
  public static final int USE_DEFAULT_THRESHOLD = 0;
  public static final int DEFAULT_THRESHOLD = CompositeFunction.DEFAULT_THRESHOLD;
  /** Property name for overriding the map-side LRU threshold at runtime. */
  public static final String AGGREGATE_BY_THRESHOLD = "cascading.aggregateby.threshold";

  private String name; // name handed to the inserted GroupBy pipe
  private int threshold; // max number of unique grouping keys cached map-side before eldest entries are flushed
  private Fields groupingFields; // fields grouped on by the inserted GroupBy
  private Fields[] argumentFields; // one Fields selector per composed aggregation
  private Functor[] functors; // map-side partial aggregations, parallel to argumentFields
  private Aggregator[] aggregators; // reduce-side completions, parallel to functors
  private transient GroupBy groupBy; // the GroupBy this assembly inserts; not serialized with the assembly

  /** Counters reporting map-side LRU cache behavior (flushes, hits, misses). */
  public enum Cache
    {
      Num_Keys_Flushed,
      Num_Keys_Hit,
      Num_Keys_Missed
    }

  /** @deprecated superseded by {@link Cache#Num_Keys_Flushed}; both counters are still incremented. */
  @Deprecated
  public enum Flush
    {
      Num_Keys_Flushed
    }

  /**
   * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class.
   * <p/>
   * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs.
   */
  public interface Functor extends Serializable
    {
    /**
     * Method getDeclaredFields returns the declaredFields of this Functor object.
     *
     * @return the declaredFields (type Fields) of this Functor object.
     */
    Fields getDeclaredFields();

    /**
     * Method aggregate operates on the given args in tandem (optionally) with the given context values.
     * <p/>
     * The context argument is the result of the previous call to this method. Use it to store values between aggregate
     * calls (the current count, or sum of the args).
     * <p/>
     * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. All subsequent
     * invocations context will be the value returned on the previous invocation.
     *
     * @param flowProcess of type FlowProcess
     * @param args        of type TupleEntry
     * @param context     of type Tuple
     * @return the new or updated context Tuple
     */
    Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context );

    /**
     * Method complete allows the final aggregate computation to be performed before the return value is collected.
     * <p/>
     * The number of values in the returned {@link Tuple} instance must match the number of declaredFields.
     * <p/>
     * It is safe to return the context object as the result value.
     *
     * @param flowProcess of type FlowProcess
     * @param context     of type Tuple
     * @return Tuple
     */
    Tuple complete( FlowProcess flowProcess, Tuple context );
    }

  /**
   * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}.
   *
   * @see Functor
   */
  public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context>
    {
    public static final int DEFAULT_THRESHOLD = 10000;

    private int threshold = 0; // 0 == USE_DEFAULT_THRESHOLD; resolved in prepare() from the property or DEFAULT_THRESHOLD
    private final Fields groupingFields;
    private final Fields[] argumentFields; // argument selector per functor
    private final Fields[] functorFields; // declared fields of each functor, parallel to functors
    private final Functor[] functors;
    private final TupleHasher tupleHasher; // non-null only when a functor field carries a custom hasher/comparator

    public static class Context
      {
      LinkedHashMap<Tuple, Tuple[]> lru; // access-ordered cache: grouping key -> per-functor contexts
      TupleEntry[] arguments; // reusable narrow TupleEntry views, one per functor
      Tuple result; // reusable composite view: grouping key followed by each functor's result
      }

    /**
     * Constructor CompositeFunction creates a new CompositeFunction instance.
     *
     * @param groupingFields of type Fields
     * @param argumentFields of type Fields
     * @param functor        of type Functor
     * @param threshold      of type int
     */
    public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int threshold )
      {
      this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, threshold );
      }

    /**
     * Constructor CompositeFunction creates a new CompositeFunction instance.
     *
     * @param groupingFields of type Fields
     * @param argumentFields of type Fields[]
     * @param functors       of type Functor[]
     * @param threshold      of type int
     */
    public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int threshold )
      {
      super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information
      this.groupingFields = groupingFields;
      this.argumentFields = argumentFields;
      this.functors = functors;
      this.threshold = threshold;

      this.functorFields = new Fields[ functors.length ];

      for( int i = 0; i < functors.length; i++ )
        this.functorFields[ i ] = functors[ i ].getDeclaredFields();

      // honor any custom Hasher declared on the functor fields when hashing cache keys
      Comparator[] hashers = TupleHasher.merge( functorFields );

      if( !TupleHasher.isNull( hashers ) )
        this.tupleHasher = new TupleHasher( null, hashers );
      else
        this.tupleHasher = null;
      }

    // declared fields of this operation: the grouping fields followed by every functor's declared fields
    private static Fields getFields( Fields groupingFields, Functor[] functors )
      {
      Fields fields = groupingFields;

      for( Functor functor : functors )
        fields = fields.append( functor.getDeclaredFields() );

      return fields;
      }

    @Override
    public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall )
      {
      // resolve a runtime threshold: property value if set and positive, otherwise the compiled-in default
      if( threshold == 0 )
        {
        Integer value = flowProcess.getIntegerProperty( AGGREGATE_BY_THRESHOLD );

        if( value != null && value > 0 )
          threshold = value;
        else
          threshold = DEFAULT_THRESHOLD;
        }

      LOG.info( "using threshold value: {}", threshold );

      Fields[] fields = new Fields[ functors.length + 1 ];

      fields[ 0 ] = groupingFields;

      for( int i = 0; i < functors.length; i++ )
        fields[ i + 1 ] = functors[ i ].getDeclaredFields();

      final Context context = new Context();

      context.arguments = new TupleEntry[ functors.length ];

      // build one reusable narrow view per functor, selecting only its argument positions
      for( int i = 0; i < context.arguments.length; i++ )
        {
        Fields resolvedArgumentFields = operationCall.getArgumentFields();

        int[] pos;

        if( argumentFields[ i ].isAll() )
          pos = resolvedArgumentFields.getPos();
        else
          pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL

        Tuple narrow = TupleViews.createNarrow( pos );

        Fields currentFields;

        if( this.argumentFields[ i ].isSubstitution() )
          currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator
        else
          currentFields = Fields.asDeclaration( this.argumentFields[ i ] );

        context.arguments[ i ] = new TupleEntry( currentFields, narrow );
        }

      context.result = TupleViews.createComposite( fields );

      // access-ordered LRU; when over threshold the eldest entry is completed and emitted downstream immediately
      context.lru = new LinkedHashMap<Tuple, Tuple[]>( threshold, 0.75f, true )
        {
        long flushes = 0;

        @Override
        protected boolean removeEldestEntry( Map.Entry<Tuple, Tuple[]> eldest )
          {
          boolean doRemove = size() > threshold;

          if( doRemove )
            {
            completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, eldest );
            flowProcess.increment( Cache.Num_Keys_Flushed, 1 );
            flowProcess.increment( Flush.Num_Keys_Flushed, 1 ); // deprecated counter, incremented for backwards compatibility

            if( flushes % threshold == 0 ) // every multiple, write out data
              {
              Runtime runtime = Runtime.getRuntime();
              long freeMem = runtime.freeMemory() / 1024 / 1024;
              long maxMem = runtime.maxMemory() / 1024 / 1024;
              long totalMem = runtime.totalMemory() / 1024 / 1024;

              LOG.info( "flushed keys num times: {}, with threshold: {}", flushes + 1, threshold );
              LOG.info( "mem on flush (mb), free: " + freeMem + ", total: " + totalMem + ", max: " + maxMem );

              float percent = (float) totalMem / (float) maxMem;

              if( percent < 0.80F )
                LOG.info( "total mem is {}% of max mem, to better utilize unused memory consider increasing current LRU threshold with system property \"{}\"", (int) ( percent * 100.0F ), AGGREGATE_BY_THRESHOLD );
              }

            flushes++;
            }

          return doRemove;
          }
        };

      operationCall.setContext( context );
      }

    @Override
    public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall )
      {
      TupleEntry arguments = functionCall.getArguments();
      // copy the grouping values out of the incoming view; the key must be stable while cached
      Tuple key = TupleHasher.wrapTuple( this.tupleHasher, arguments.selectTupleCopy( groupingFields ) );

      Context context = functionCall.getContext();
      Tuple[] functorContext = context.lru.get( key );

      if( functorContext == null )
        {
        functorContext = new Tuple[ functors.length ]; // one context slot per functor, initially null
        context.lru.put( key, functorContext );
        flowProcess.increment( Cache.Num_Keys_Missed, 1 );
        }
      else
        {
        flowProcess.increment( Cache.Num_Keys_Hit, 1 );
        }

      // re-point each narrow view at the incoming tuple, then fold the arguments into each functor's context
      for( int i = 0; i < functors.length; i++ )
        {
        TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() );
        functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] );
        }
      }

    @Override
    public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall )
      {
      // need to drain context
      // NOTE(review): assumes prepare() ran and the context is non-null; a second flush after
      // setContext( null ) would NPE -- confirm flush is invoked at most once per prepare
      TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector();

      Tuple result = operationCall.getContext().result;
      LinkedHashMap<Tuple, Tuple[]> context = operationCall.getContext().lru;

      for( Map.Entry<Tuple, Tuple[]> entry : context.entrySet() )
        completeFunctors( flowProcess, collector, result, entry );

      operationCall.setContext( null );
      }

    // completes every functor for one cached entry and emits a single composite tuple: key + per-functor results
    private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry )
      {
      Tuple[] results = new Tuple[ functors.length + 1 ];

      results[ 0 ] = entry.getKey();

      Tuple[] values = entry.getValue();

      for( int i = 0; i < functors.length; i++ )
        results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] );

      TupleViews.reset( result, results );

      outputCollector.add( result );
      }

    @Override
    public boolean equals( Object object )
      {
      if( this == object )
        return true;
      if( !( object instanceof CompositeFunction ) )
        return false;
      if( !super.equals( object ) )
        return false;

      CompositeFunction that = (CompositeFunction) object;

      // NOTE(review): threshold is not part of equality -- presumably deliberate, since it is
      // resolved at runtime in prepare(); confirm before relying on it for operation identity
      if( !Arrays.equals( argumentFields, that.argumentFields ) )
        return false;
      if( !Arrays.equals( functorFields, that.functorFields ) )
        return false;
      if( !Arrays.equals( functors, that.functors ) )
        return false;
      if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null )
        return false;

      return true;
      }

    @Override
    public int hashCode()
      {
      int result = super.hashCode();
      result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 );
      result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 );
      result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 );
      result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 );
      return result;
      }
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param name      of type String
   * @param threshold of type int
   */
  protected AggregateBy( String name, int threshold )
    {
    this.name = name;
    this.threshold = threshold;
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param argumentFields of type Fields
   * @param functor        of type Functor
   * @param aggregator     of type Aggregator
   */
  protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator )
    {
    this.argumentFields = Fields.fields( argumentFields );
    this.functors = new Functor[]{functor};
    this.aggregators = new Aggregator[]{aggregator};
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"pipe", "groupingFields", "assemblies"} )
  public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies )
    {
    this( null, Pipe.pipes( pipe ), groupingFields, 0, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param threshold      of type int
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"pipe", "groupingFields", "threshold", "assemblies"} )
  public AggregateBy( Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies )
    {
    this( null, Pipe.pipes( pipe ), groupingFields, threshold, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param name           of type String
   * @param pipe           of type Pipe
   * @param groupingFields of type Fields
   * @param threshold      of type int
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"name", "pipe", "groupingFields", "threshold", "assemblies"} )
  public AggregateBy( String name, Pipe pipe, Fields groupingFields, int threshold, AggregateBy... assemblies )
    {
    this( name, Pipe.pipes( pipe ), groupingFields, threshold, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"name", "pipes", "groupingFields", "assemblies"} )
  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies )
    {
    this( name, pipes, groupingFields, 0, assemblies );
    }

  /**
   * Constructor AggregateBy creates a new AggregateBy instance.
   *
   * @param name           of type String
   * @param pipes          of type Pipe[]
   * @param groupingFields of type Fields
   * @param threshold      of type int
   * @param assemblies     of type AggregateBy...
   */
  @ConstructorProperties( {"name", "pipes", "groupingFields", "threshold", "assemblies"} )
  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int threshold, AggregateBy... assemblies )
    {
    this( name, threshold );

    // flatten the argument fields, functors and aggregators of every child assembly into parallel lists
    List<Fields> arguments = new ArrayList<Fields>();
    List<Functor> functors = new ArrayList<Functor>();
    List<Aggregator> aggregators = new ArrayList<Aggregator>();

    for( int i = 0; i < assemblies.length; i++ )
      {
      AggregateBy assembly = assemblies[ i ];

      Collections.addAll( arguments, assembly.getArgumentFields() );
      Collections.addAll( functors, assembly.getFunctors() );
      Collections.addAll( aggregators, assembly.getAggregators() );
      }

    initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) );
    }

  protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int threshold )
    {
    this( name, threshold );
    initialize( groupingFields, pipes, argumentFields, functor, aggregator );
    }

  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator )
    {
    initialize( groupingFields, pipes, Fields.fields( argumentFields ),
      new Functor[]{functor},
      new Aggregator[]{aggregator} );
    }

  /**
   * Wires the assembly: a map-side {@link Each} with a shared {@link CompositeFunction} on every incoming pipe,
   * a single {@link GroupBy} (with secondary sort when any argument field declares a comparator), and one
   * reduce-side {@link Every} per aggregator.
   */
  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators )
    {
    setPrevious( pipes );

    this.groupingFields = groupingFields;
    this.argumentFields = argumentFields;
    this.functors = functors;
    this.aggregators = aggregators;

    verify(); // give sub-classes a chance to reject invalid arguments before wiring pipes

    // retain any comparators on the argument fields so they can drive secondary sorting
    Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields );
    Fields argumentSelector = Fields.merge( this.groupingFields, sortFields );

    if( argumentSelector.equals( Fields.NONE ) )
      argumentSelector = Fields.ALL;

    Pipe[] functions = new Pipe[ pipes.length ];

    // one CompositeFunction instance is shared across all incoming branches
    CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, threshold );

    for( int i = 0; i < functions.length; i++ )
      functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS );

    groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null );

    Pipe pipe = groupBy;

    // chain one Every per aggregator, each consuming its functor's declared fields
    for( int i = 0; i < aggregators.length; i++ )
      pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL );

    setTails( pipe );
    }

  /** Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. */
  protected void verify()
    {

    }

  /**
   * Method getGroupingFields returns the Fields this instance will be grouping against.
   *
   * @return the current grouping fields
   */
  public Fields getGroupingFields()
    {
    return groupingFields;
    }

  /**
   * Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the
   * field declaration of the given Aggregator operations.
   * <p/>
   * Note the actual Fields values are returned, not planner resolved Fields.
619 * 620 * @return and array of Fields 621 */ 622 public Fields[] getFieldDeclarations() 623 { 624 Fields[] fields = new Fields[ this.aggregators.length ]; 625 626 for( int i = 0; i < aggregators.length; i++ ) 627 fields[ i ] = aggregators[ i ].getFieldDeclaration(); 628 629 return fields; 630 } 631 632 protected Fields[] getArgumentFields() 633 { 634 return argumentFields; 635 } 636 637 protected Functor[] getFunctors() 638 { 639 return functors; 640 } 641 642 protected Aggregator[] getAggregators() 643 { 644 return aggregators; 645 } 646 647 /** 648 * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties 649 * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}. 650 * 651 * @return GroupBy type 652 */ 653 public GroupBy getGroupBy() 654 { 655 return groupBy; 656 } 657 658 @Property( name = "threshold", visibility = Visibility.PUBLIC ) 659 @PropertyDescription( "Threshold of the aggregation." ) 660 @PropertyConfigured( value = AGGREGATE_BY_THRESHOLD, defaultValue = "10000" ) 661 public int getThreshold() 662 { 663 return threshold; 664 } 665 }