001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.aggregator;
022    
023    import java.beans.ConstructorProperties;
024    import java.lang.reflect.Type;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.operation.Aggregator;
028    import cascading.operation.AggregatorCall;
029    import cascading.operation.BaseOperation;
030    import cascading.operation.OperationCall;
031    import cascading.tuple.Fields;
032    import cascading.tuple.Tuple;
033    import cascading.tuple.TupleEntry;
034    import cascading.tuple.coerce.Coercions;
035    import cascading.tuple.type.CoercibleType;
036    
037    /** Class Average is an {@link Aggregator} that returns the average of all numeric values in the current group. */
038    public class Average extends BaseOperation<Average.Context> implements Aggregator<Average.Context>
039      {
040      /** Field FIELD_NAME */
041      public static final String FIELD_NAME = "average";
042    
043      /** Field type */
044      private Type type = Double.class;
045      private CoercibleType canonical;
046    
047      /** Class Context is used to hold intermediate values. */
048      protected static class Context
049        {
050        private final CoercibleType canonical;
051    
052        Tuple tuple = Tuple.size( 1 );
053        double sum = 0.0D;
054        long count = 0L;
055    
056        public Context( CoercibleType canonical )
057          {
058          this.canonical = canonical;
059          }
060    
061        public Context reset()
062          {
063          sum = 0.0D;
064          count = 0L;
065    
066          return this;
067          }
068    
069        public Tuple result()
070          {
071          tuple.set( 0, canonical.canonical( sum / count ) );
072    
073          return tuple;
074          }
075        }
076    
077      /** Constructs a new instance that returns the average of the values encountered in the field name "average". */
078      public Average()
079        {
080        super( 1, new Fields( FIELD_NAME, Double.class ) );
081    
082        this.canonical = Coercions.coercibleTypeFor( this.type );
083        }
084    
085      /**
086       * Constructs a new instance that returns the average of the values encountered in the given fieldDeclaration field name.
087       *
088       * @param fieldDeclaration of type Fields
089       */
090      @ConstructorProperties({"fieldDeclaration"})
091      public Average( Fields fieldDeclaration )
092        {
093        super( 1, fieldDeclaration );
094    
095        if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
096          throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
097    
098        if( fieldDeclaration.hasTypes() )
099          this.type = fieldDeclaration.getType( 0 );
100    
101        this.canonical = Coercions.coercibleTypeFor( this.type );
102        }
103    
104      @Override
105      public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
106        {
107        operationCall.setContext( new Context( canonical ) );
108        }
109    
110      @Override
111      public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
112        {
113        aggregatorCall.getContext().reset();
114        }
115    
116      @Override
117      public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
118        {
119        Context context = aggregatorCall.getContext();
120        TupleEntry arguments = aggregatorCall.getArguments();
121    
122        context.sum += arguments.getDouble( 0 );
123        context.count += 1L;
124        }
125    
126      @Override
127      public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
128        {
129        aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) );
130        }
131    
132      private Tuple getResult( AggregatorCall<Context> aggregatorCall )
133        {
134        return aggregatorCall.getContext().result();
135        }
136      }