001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe.assembly;
022
023import java.beans.ConstructorProperties;
024import java.io.Serializable;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.List;
030import java.util.Map;
031
032import cascading.CascadingException;
033import cascading.flow.FlowProcess;
034import cascading.management.annotation.Property;
035import cascading.management.annotation.PropertyConfigured;
036import cascading.management.annotation.PropertyDescription;
037import cascading.management.annotation.Visibility;
038import cascading.operation.Aggregator;
039import cascading.operation.BaseOperation;
040import cascading.operation.Function;
041import cascading.operation.FunctionCall;
042import cascading.operation.OperationCall;
043import cascading.pipe.Each;
044import cascading.pipe.Every;
045import cascading.pipe.GroupBy;
046import cascading.pipe.Pipe;
047import cascading.pipe.SubAssembly;
048import cascading.provider.FactoryLoader;
049import cascading.tuple.Fields;
050import cascading.tuple.Tuple;
051import cascading.tuple.TupleEntry;
052import cascading.tuple.TupleEntryCollector;
053import cascading.tuple.util.TupleHasher;
054import cascading.tuple.util.TupleViews;
055import cascading.util.cache.BaseCacheFactory;
056import cascading.util.cache.CacheEvictionCallback;
057import cascading.util.cache.CascadingCache;
058
059/**
060 * Class AggregateBy is a {@link SubAssembly} that serves two roles for handling aggregate operations.
061 * <p/>
062 * The first role is as a base class for composable aggregate operations that have a MapReduce Map side optimization for the
063 * Reduce side aggregation. For example 'summing' a value within a grouping can be performed partially Map side and
064 * completed Reduce side. Summing is associative and commutative.
065 * <p/>
066 * AggregateBy also supports operations that are not associative/commutative like 'counting'. Counting
067 * would result in 'counting' value occurrences Map side but summing those counts Reduce side. (Yes, counting can be
068 * transposed to summing Map and Reduce sides by emitting 1's before the first sum, but that's three operations over
069 * two, and a hack)
070 * <p/>
071 * Think of this mechanism as a MapReduce Combiner, but more efficient as no values are serialized,
072 * deserialized, saved to disk, and multi-pass sorted in the process, which consume cpu resources in trade of
073 * memory and a little or no IO.
074 * <p/>
075 * Further, Combiners are limited to only associative/commutative operations.
076 * <p/>
077 * Additionally the Cascading planner can move the Map side optimization
078 * to the previous Reduce operation further increasing IO performance (between the preceding Reduce and Map phase which
079 * is over HDFS).
080 * <p/>
081 * The second role of the AggregateBy class is to allow for composition of AggregateBy
082 * sub-classes. That is, {@link SumBy} and {@link CountBy} AggregateBy sub-classes can be performed
083 * in parallel on the same grouping keys.
 * <p/>
085 * Custom AggregateBy classes can be created by sub-classing this class and implementing a special
086 * {@link Functor} for use on the Map side. Multiple Functor instances are managed by the {@link CompositeFunction}
087 * class allowing them all to share the same LRU value map for more efficiency.
088 * <p/>
089 * AggregateBy instances return {@code argumentFields} which are used internally to control the values passed to
090 * internal Functor instances. If any argumentFields also have {@link java.util.Comparator}s, they will be used
 * for secondary sorting (see {@link GroupBy} {@code sortFields}). This feature is used by {@link FirstBy} to
092 * control which Tuple is seen first for a grouping.
093 * <p/>
094 * To tune the LRU, set the {@code capacity} value to a high enough value to utilize available memory. Or set a
095 * default value via the {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CAPACITY} property. The current default
096 * ({@link cascading.util.cache.BaseCacheFactory#DEFAULT_CAPACITY})
 * is {@code 10,000} unique keys.
098 * <p/>
099 * The LRU cache is pluggable and defaults to {@link cascading.util.cache.LRUHashMapCache}. It can be changed
100 * by setting {@link cascading.pipe.assembly.AggregateByProps#AGGREGATE_BY_CACHE_FACTORY} property to the name of a sub-class of
101 * {@link cascading.util.cache.BaseCacheFactory}.
102 * <p/>
 * Note using an AggregateBy instance automatically inserts a {@link GroupBy} into the resulting {@link cascading.flow.Flow}.
104 * And passing multiple AggregateBy instances to a parent AggregateBy instance still results in one GroupBy.
105 * <p/>
 * Also note that {@link Unique} is not an AggregateBy and is slightly more optimized internally.
107 * <p/>
108 * As of Cascading 2.6 AggregateBy honors the {@link cascading.tuple.Hasher} interface for storing keys in the cache.
109 *
110 * @see SumBy
111 * @see CountBy
112 * @see Unique
113 * @see cascading.util.cache.LRUHashMapCacheFactory
114 * @see cascading.util.cache.DirectMappedCacheFactory
115 * @see cascading.util.cache.LRUHashMapCache
116 * @see cascading.util.cache.DirectMappedCache
117 */
118public class AggregateBy extends SubAssembly
119  {
120  public static final int USE_DEFAULT_THRESHOLD = 0;
121
122  private String name;
123  private int capacity;
124  private Fields groupingFields;
125  private Fields[] argumentFields;
126  private Functor[] functors;
127  private Aggregator[] aggregators;
128  private transient GroupBy groupBy;
129
130  public enum Cache
131    {
132      Num_Keys_Flushed,
133      Num_Keys_Hit,
134      Num_Keys_Missed
135    }
136
137  /**
138   * Interface Functor provides a means to create a simple function for use with the {@link CompositeFunction} class.
139   * <p/>
140   * Note the {@link FlowProcess} argument provides access to the underlying properties and counter APIs.
141   */
142  public interface Functor extends Serializable
143    {
144    /**
145     * Method getDeclaredFields returns the declaredFields of this Functor object.
146     *
147     * @return the declaredFields (type Fields) of this Functor object.
148     */
149    Fields getDeclaredFields();
150
151    /**
152     * Method aggregate operates on the given args in tandem (optionally) with the given context values.
153     * <p/>
154     * The context argument is the result of the previous call to this method. Use it to store values between aggregate
155     * calls (the current count, or sum of the args).
156     * <p/>
157     * On the very first invocation of aggregate for a given grouping key, context will be {@code null}. All subsequent
158     * invocations context will be the value returned on the previous invocation.
159     *
160     * @param flowProcess of type FlowProcess
161     * @param args        of type TupleEntry
162     * @param context     of type Tuple   @return Tuple
163     */
164    Tuple aggregate( FlowProcess flowProcess, TupleEntry args, Tuple context );
165
166    /**
167     * Method complete allows the final aggregate computation to be performed before the return value is collected.
168     * <p/>
169     * The number of values in the returned {@link Tuple} instance must match the number of declaredFields.
170     * <p/>
171     * It is safe to return the context object as the result value.
172     *
173     * @param flowProcess of type FlowProcess
174     * @param context     of type Tuple  @return Tuple
175     */
176    Tuple complete( FlowProcess flowProcess, Tuple context );
177    }
178
179  /**
180   * Class CompositeFunction takes multiple Functor instances and manages them as a single {@link Function}.
181   *
182   * @see Functor
183   */
184  public static class CompositeFunction extends BaseOperation<CompositeFunction.Context> implements Function<CompositeFunction.Context>
185    {
186    private int capacity = 0;
187    private final Fields groupingFields;
188    private final Fields[] argumentFields;
189    private final Fields[] functorFields;
190    private final Functor[] functors;
191    private final TupleHasher tupleHasher;
192
193    public static class Context
194      {
195      CascadingCache<Tuple, Tuple[]> lru;
196      TupleEntry[] arguments;
197      Tuple result;
198      }
199
200    /**
201     * Constructor CompositeFunction creates a new CompositeFunction instance.
202     *
203     * @param groupingFields of type Fields
204     * @param argumentFields of type Fields
205     * @param functor        of type Functor
206     * @param capacity       of type int
207     */
208    public CompositeFunction( Fields groupingFields, Fields argumentFields, Functor functor, int capacity )
209      {
210      this( groupingFields, Fields.fields( argumentFields ), new Functor[]{functor}, capacity );
211      }
212
213    /**
214     * Constructor CompositeFunction creates a new CompositeFunction instance.
215     *
216     * @param groupingFields of type Fields
217     * @param argumentFields of type Fields[]
218     * @param functors       of type Functor[]
219     * @param capacity       of type int
220     */
221    public CompositeFunction( Fields groupingFields, Fields[] argumentFields, Functor[] functors, int capacity )
222      {
223      super( getFields( groupingFields, functors ) ); // todo: groupingFields should lookup incoming type information
224      this.groupingFields = groupingFields;
225      this.argumentFields = argumentFields;
226      this.functors = functors;
227      this.capacity = capacity;
228
229      this.functorFields = new Fields[ functors.length ];
230
231      for( int i = 0; i < functors.length; i++ )
232        this.functorFields[ i ] = functors[ i ].getDeclaredFields();
233
234      Comparator[] hashers = TupleHasher.merge( functorFields );
235      if( !TupleHasher.isNull( hashers ) )
236        this.tupleHasher = new TupleHasher( null, hashers );
237      else
238        this.tupleHasher = null;
239      }
240
241    private static Fields getFields( Fields groupingFields, Functor[] functors )
242      {
243      Fields fields = groupingFields;
244
245      for( Functor functor : functors )
246        fields = fields.append( functor.getDeclaredFields() );
247
248      return fields;
249      }
250
251    @Override
252    public void prepare( final FlowProcess flowProcess, final OperationCall<CompositeFunction.Context> operationCall )
253      {
254      Fields[] fields = new Fields[ functors.length + 1 ];
255
256      fields[ 0 ] = groupingFields;
257
258      for( int i = 0; i < functors.length; i++ )
259        fields[ i + 1 ] = functors[ i ].getDeclaredFields();
260
261      final Context context = new Context();
262
263      context.arguments = new TupleEntry[ functors.length ];
264
265      for( int i = 0; i < context.arguments.length; i++ )
266        {
267        Fields resolvedArgumentFields = operationCall.getArgumentFields();
268
269        int[] pos;
270
271        if( argumentFields[ i ].isAll() )
272          pos = resolvedArgumentFields.getPos();
273        else
274          pos = resolvedArgumentFields.getPos( argumentFields[ i ] ); // returns null if selector is ALL
275
276        Tuple narrow = TupleViews.createNarrow( pos );
277
278        Fields currentFields;
279
280        if( this.argumentFields[ i ].isSubstitution() )
281          currentFields = resolvedArgumentFields.select( this.argumentFields[ i ] ); // attempt to retain comparator
282        else
283          currentFields = Fields.asDeclaration( this.argumentFields[ i ] );
284
285        context.arguments[ i ] = new TupleEntry( currentFields, narrow );
286        }
287
288      context.result = TupleViews.createComposite( fields );
289
290      class AggregateByEviction implements CacheEvictionCallback<Tuple, Tuple[]>
291        {
292        @Override
293        public void evict( Map.Entry<Tuple, Tuple[]> entry )
294          {
295          completeFunctors( flowProcess, ( (FunctionCall) operationCall ).getOutputCollector(), context.result, entry );
296          flowProcess.increment( Cache.Num_Keys_Flushed, 1 );
297          }
298        }
299
300      FactoryLoader loader = FactoryLoader.getInstance();
301
302      BaseCacheFactory<Tuple, Tuple[], ?> factory = loader.loadFactoryFrom( flowProcess, AggregateByProps.AGGREGATE_BY_CACHE_FACTORY, AggregateByProps.DEFAULT_CACHE_FACTORY_CLASS );
303
304      if( factory == null )
305        throw new CascadingException( "unable to load cache factory, please check your '" + AggregateByProps.AGGREGATE_BY_CACHE_FACTORY + "' setting." );
306
307      CascadingCache<Tuple, Tuple[]> cache = factory.create( flowProcess );
308
309      cache.setCacheEvictionCallback( new AggregateByEviction() );
310
311      Integer cacheCapacity = capacity;
312      if( capacity == 0 )
313        {
314        cacheCapacity = flowProcess.getIntegerProperty( AggregateByProps.AGGREGATE_BY_CAPACITY );
315
316        if( cacheCapacity == null )
317          cacheCapacity = AggregateByProps.AGGREGATE_BY_DEFAULT_CAPACITY;
318        }
319      cache.setCapacity( cacheCapacity.intValue() );
320      cache.initialize();
321
322      context.lru = cache;
323
324      operationCall.setContext( context );
325      }
326
327    @Override
328    public void operate( FlowProcess flowProcess, FunctionCall<CompositeFunction.Context> functionCall )
329      {
330      TupleEntry arguments = functionCall.getArguments();
331      Tuple key = TupleHasher.wrapTuple( this.tupleHasher, arguments.selectTupleCopy( groupingFields ) );
332
333      Context context = functionCall.getContext();
334      Tuple[] functorContext = context.lru.get( key );
335
336      if( functorContext == null )
337        {
338        functorContext = new Tuple[ functors.length ];
339        context.lru.put( key, functorContext );
340        flowProcess.increment( Cache.Num_Keys_Missed, 1 );
341        }
342      else
343        {
344        flowProcess.increment( Cache.Num_Keys_Hit, 1 );
345        }
346
347      for( int i = 0; i < functors.length; i++ )
348        {
349        TupleViews.reset( context.arguments[ i ].getTuple(), arguments.getTuple() );
350        functorContext[ i ] = functors[ i ].aggregate( flowProcess, context.arguments[ i ], functorContext[ i ] );
351        }
352      }
353
354    @Override
355    public void flush( FlowProcess flowProcess, OperationCall<CompositeFunction.Context> operationCall )
356      {
357      // need to drain context
358      TupleEntryCollector collector = ( (FunctionCall) operationCall ).getOutputCollector();
359
360      Tuple result = operationCall.getContext().result;
361      Map<Tuple, Tuple[]> context = operationCall.getContext().lru;
362
363      for( Map.Entry<Tuple, Tuple[]> entry : context.entrySet() )
364        completeFunctors( flowProcess, collector, result, entry );
365
366      operationCall.setContext( null );
367      }
368
369    private void completeFunctors( FlowProcess flowProcess, TupleEntryCollector outputCollector, Tuple result, Map.Entry<Tuple, Tuple[]> entry )
370      {
371      Tuple[] results = new Tuple[ functors.length + 1 ];
372
373      results[ 0 ] = entry.getKey();
374
375      Tuple[] values = entry.getValue();
376
377      for( int i = 0; i < functors.length; i++ )
378        results[ i + 1 ] = functors[ i ].complete( flowProcess, values[ i ] );
379
380      TupleViews.reset( result, results );
381
382      outputCollector.add( result );
383      }
384
385    @Override
386    public boolean equals( Object object )
387      {
388      if( this == object )
389        return true;
390      if( !( object instanceof CompositeFunction ) )
391        return false;
392      if( !super.equals( object ) )
393        return false;
394
395      CompositeFunction that = (CompositeFunction) object;
396
397      if( !Arrays.equals( argumentFields, that.argumentFields ) )
398        return false;
399      if( !Arrays.equals( functorFields, that.functorFields ) )
400        return false;
401      if( !Arrays.equals( functors, that.functors ) )
402        return false;
403      if( groupingFields != null ? !groupingFields.equals( that.groupingFields ) : that.groupingFields != null )
404        return false;
405
406      return true;
407      }
408
409    @Override
410    public int hashCode()
411      {
412      int result = super.hashCode();
413      result = 31 * result + ( groupingFields != null ? groupingFields.hashCode() : 0 );
414      result = 31 * result + ( argumentFields != null ? Arrays.hashCode( argumentFields ) : 0 );
415      result = 31 * result + ( functorFields != null ? Arrays.hashCode( functorFields ) : 0 );
416      result = 31 * result + ( functors != null ? Arrays.hashCode( functors ) : 0 );
417      return result;
418      }
419    }
420
421  /**
422   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
423   *
424   * @param name     of type String
425   * @param capacity of type int
426   */
427  protected AggregateBy( String name, int capacity )
428    {
429    this.name = name;
430    this.capacity = capacity;
431    }
432
433  /**
434   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
435   *
436   * @param argumentFields of type Fields
437   * @param functor        of type Functor
438   * @param aggregator     of type Aggregator
439   */
440  protected AggregateBy( Fields argumentFields, Functor functor, Aggregator aggregator )
441    {
442    this.argumentFields = Fields.fields( argumentFields );
443    this.functors = new Functor[]{functor};
444    this.aggregators = new Aggregator[]{aggregator};
445    }
446
447  /**
448   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
449   *
450   * @param pipe           of type Pipe
451   * @param groupingFields of type Fields
452   * @param assemblies     of type CompositeAggregator...
453   */
454  @ConstructorProperties({"pipe", "groupingFields", "assemblies"})
455  public AggregateBy( Pipe pipe, Fields groupingFields, AggregateBy... assemblies )
456    {
457    this( null, Pipe.pipes( pipe ), groupingFields, 0, assemblies );
458    }
459
460  /**
461   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
462   *
463   * @param pipe           of type Pipe
464   * @param groupingFields of type Fields
465   * @param capacity       of type int
466   * @param assemblies     of type CompositeAggregator...
467   */
468  @ConstructorProperties({"pipe", "groupingFields", "capacity", "assemblies"})
469  public AggregateBy( Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies )
470    {
471    this( null, Pipe.pipes( pipe ), groupingFields, capacity, assemblies );
472    }
473
474  /**
475   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
476   *
477   * @param pipe           of type Pipe
478   * @param groupingFields of type Fields
479   * @param capacity       of type int
480   * @param assemblies     of type CompositeAggregator...
481   */
482  @ConstructorProperties({"name", "pipe", "groupingFields", "capacity", "assemblies"})
483  public AggregateBy( String name, Pipe pipe, Fields groupingFields, int capacity, AggregateBy... assemblies )
484    {
485    this( name, Pipe.pipes( pipe ), groupingFields, capacity, assemblies );
486    }
487
488  /**
489   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
490   *
491   * @param name           of type String
492   * @param pipes          of type Pipe[]
493   * @param groupingFields of type Fields
494   * @param assemblies     of type CompositeAggregator...
495   */
496  @ConstructorProperties({"name", "pipes", "groupingFields", "assemblies"})
497  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, AggregateBy... assemblies )
498    {
499    this( name, pipes, groupingFields, 0, assemblies );
500    }
501
502  /**
503   * Constructor CompositeAggregator creates a new CompositeAggregator instance.
504   *
505   * @param name           of type String
506   * @param pipes          of type Pipe[]
507   * @param groupingFields of type Fields
508   * @param capacity       of type int
509   * @param assemblies     of type CompositeAggregator...
510   */
511  @ConstructorProperties({"name", "pipes", "groupingFields", "capacity", "assemblies"})
512  public AggregateBy( String name, Pipe[] pipes, Fields groupingFields, int capacity, AggregateBy... assemblies )
513    {
514    this( name, capacity );
515
516    List<Fields> arguments = new ArrayList<Fields>();
517    List<Functor> functors = new ArrayList<Functor>();
518    List<Aggregator> aggregators = new ArrayList<Aggregator>();
519
520    for( int i = 0; i < assemblies.length; i++ )
521      {
522      AggregateBy assembly = assemblies[ i ];
523
524      Collections.addAll( arguments, assembly.getArgumentFields() );
525      Collections.addAll( functors, assembly.getFunctors() );
526      Collections.addAll( aggregators, assembly.getAggregators() );
527      }
528
529    initialize( groupingFields, pipes, arguments.toArray( new Fields[ arguments.size() ] ), functors.toArray( new Functor[ functors.size() ] ), aggregators.toArray( new Aggregator[ aggregators.size() ] ) );
530    }
531
532  protected AggregateBy( String name, Pipe[] pipes, Fields groupingFields, Fields argumentFields, Functor functor, Aggregator aggregator, int capacity )
533    {
534    this( name, capacity );
535    initialize( groupingFields, pipes, argumentFields, functor, aggregator );
536    }
537
538  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields argumentFields, Functor functor, Aggregator aggregator )
539    {
540    initialize( groupingFields, pipes, Fields.fields( argumentFields ),
541      new Functor[]{functor},
542      new Aggregator[]{aggregator} );
543    }
544
545  protected void initialize( Fields groupingFields, Pipe[] pipes, Fields[] argumentFields, Functor[] functors, Aggregator[] aggregators )
546    {
547    setPrevious( pipes );
548
549    this.groupingFields = groupingFields;
550    this.argumentFields = argumentFields;
551    this.functors = functors;
552    this.aggregators = aggregators;
553
554    verify();
555
556    Fields sortFields = Fields.copyComparators( Fields.merge( this.argumentFields ), this.argumentFields );
557    Fields argumentSelector = Fields.merge( this.groupingFields, sortFields );
558
559    if( argumentSelector.equals( Fields.NONE ) )
560      argumentSelector = Fields.ALL;
561
562    Pipe[] functions = new Pipe[ pipes.length ];
563
564    CompositeFunction function = new CompositeFunction( this.groupingFields, this.argumentFields, this.functors, capacity );
565
566    for( int i = 0; i < functions.length; i++ )
567      functions[ i ] = new Each( pipes[ i ], argumentSelector, function, Fields.RESULTS );
568
569    groupBy = new GroupBy( name, functions, this.groupingFields, sortFields.hasComparators() ? sortFields : null );
570
571    Pipe pipe = groupBy;
572
573    for( int i = 0; i < aggregators.length; i++ )
574      pipe = new Every( pipe, this.functors[ i ].getDeclaredFields(), this.aggregators[ i ], Fields.ALL );
575
576    setTails( pipe );
577    }
578
579  /** Method verify should be overridden by sub-classes if any values must be tested before the calling constructor returns. */
580  protected void verify()
581    {
582
583    }
584
585  /**
586   * Method getGroupingFields returns the Fields this instances will be grouping against.
587   *
588   * @return the current grouping fields
589   */
590  public Fields getGroupingFields()
591    {
592    return groupingFields;
593    }
594
595  /**
596   * Method getFieldDeclarations returns an array of Fields where each Field element in the array corresponds to the
597   * field declaration of the given Aggregator operations.
598   * <p/>
599   * Note the actual Fields values are returned, not planner resolved Fields.
600   *
601   * @return and array of Fields
602   */
603  public Fields[] getFieldDeclarations()
604    {
605    Fields[] fields = new Fields[ this.aggregators.length ];
606
607    for( int i = 0; i < aggregators.length; i++ )
608      fields[ i ] = aggregators[ i ].getFieldDeclaration();
609
610    return fields;
611    }
612
613  protected Fields[] getArgumentFields()
614    {
615    return argumentFields;
616    }
617
618  protected Functor[] getFunctors()
619    {
620    return functors;
621    }
622
623  protected Aggregator[] getAggregators()
624    {
625    return aggregators;
626    }
627
628  /**
629   * Method getGroupBy returns the internal {@link GroupBy} instance so that any custom properties
630   * can be set on it via {@link cascading.pipe.Pipe#getStepConfigDef()}.
631   *
632   * @return GroupBy type
633   */
634  public GroupBy getGroupBy()
635    {
636    return groupBy;
637    }
638
639  @Property(name = "capacity", visibility = Visibility.PUBLIC)
640  @PropertyDescription("Capacity of the aggregation cache.")
641  @PropertyConfigured(value = AggregateByProps.AGGREGATE_BY_CAPACITY, defaultValue = "10000")
642  public int getCapacity()
643    {
644    return capacity;
645    }
646  }