001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.aggregator;
022
023import java.beans.ConstructorProperties;
024import java.lang.reflect.Type;
025
026import cascading.flow.FlowProcess;
027import cascading.operation.Aggregator;
028import cascading.operation.AggregatorCall;
029import cascading.operation.BaseOperation;
030import cascading.operation.OperationCall;
031import cascading.tuple.Fields;
032import cascading.tuple.Tuple;
033import cascading.tuple.TupleEntry;
034import cascading.tuple.coerce.Coercions;
035import cascading.tuple.type.CoercibleType;
036
037/** Class Average is an {@link Aggregator} that returns the average of all numeric values in the current group. */
038public class Average extends BaseOperation<Average.Context> implements Aggregator<Average.Context>
039  {
040  /** Field FIELD_NAME */
041  public static final String FIELD_NAME = "average";
042
043  /** Field type */
044  private Type type = Double.class;
045  private CoercibleType canonical;
046
047  /** Class Context is used to hold intermediate values. */
048  protected static class Context
049    {
050    private final CoercibleType canonical;
051
052    Tuple tuple = Tuple.size( 1 );
053    double sum = 0.0D;
054    long count = 0L;
055
056    public Context( CoercibleType canonical )
057      {
058      this.canonical = canonical;
059      }
060
061    public Context reset()
062      {
063      sum = 0.0D;
064      count = 0L;
065
066      return this;
067      }
068
069    public Tuple result()
070      {
071      tuple.set( 0, canonical.canonical( sum / count ) );
072
073      return tuple;
074      }
075    }
076
077  /** Constructs a new instance that returns the average of the values encountered in the field name "average". */
078  public Average()
079    {
080    super( 1, new Fields( FIELD_NAME, Double.class ) );
081
082    this.canonical = Coercions.coercibleTypeFor( this.type );
083    }
084
085  /**
086   * Constructs a new instance that returns the average of the values encountered in the given fieldDeclaration field name.
087   *
088   * @param fieldDeclaration of type Fields
089   */
090  @ConstructorProperties({"fieldDeclaration"})
091  public Average( Fields fieldDeclaration )
092    {
093    super( 1, fieldDeclaration );
094
095    if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
096      throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
097
098    if( fieldDeclaration.hasTypes() )
099      this.type = fieldDeclaration.getType( 0 );
100
101    this.canonical = Coercions.coercibleTypeFor( this.type );
102    }
103
104  @Override
105  public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCall )
106    {
107    operationCall.setContext( new Context( canonical ) );
108    }
109
110  @Override
111  public void start( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
112    {
113    aggregatorCall.getContext().reset();
114    }
115
116  @Override
117  public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
118    {
119    Context context = aggregatorCall.getContext();
120    TupleEntry arguments = aggregatorCall.getArguments();
121
122    context.sum += arguments.getDouble( 0 );
123    context.count += 1L;
124    }
125
126  @Override
127  public void complete( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
128    {
129    aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) );
130    }
131
132  private Tuple getResult( AggregatorCall<Context> aggregatorCall )
133    {
134    return aggregatorCall.getContext().result();
135    }
136  }