001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.operation.aggregator;
022
023import java.beans.ConstructorProperties;
024import java.lang.reflect.Type;
025
026import cascading.flow.FlowProcess;
027import cascading.operation.Aggregator;
028import cascading.operation.AggregatorCall;
029import cascading.operation.BaseOperation;
030import cascading.operation.OperationCall;
031import cascading.tuple.Fields;
032import cascading.tuple.Tuple;
033import cascading.tuple.TupleEntry;
034import cascading.tuple.coerce.Coercions;
035import cascading.tuple.type.CoercibleType;
036import cascading.util.Pair;
037
038/** Class Sum is an {@link Aggregator} that returns the sum of all numeric values in the current group. */
039public class Sum extends BaseOperation<Pair<Double[], Tuple>> implements Aggregator<Pair<Double[], Tuple>>
040  {
041  /** Field FIELD_NAME */
042  public static final String FIELD_NAME = "sum";
043
044  /** Field type */
045  private Type type = Double.class;
046  private CoercibleType canonical;
047
048  /** Constructor Sum creates a new Sum instance that accepts one argument and returns a single field named "sum". */
049  public Sum()
050    {
051    super( 1, new Fields( FIELD_NAME, Double.class ) );
052    this.canonical = Coercions.coercibleTypeFor( this.type );
053    }
054
055  /**
056   * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts
057   * only 1 argument.
058   * <p/>
059   * If the given {@code fieldDeclaration} has a type, it will be used to coerce the result value.
060   *
061   * @param fieldDeclaration of type Fields
062   */
063  @ConstructorProperties({"fieldDeclaration"})
064  public Sum( Fields fieldDeclaration )
065    {
066    super( 1, fieldDeclaration );
067
068    if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
069      throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
070
071    if( fieldDeclaration.hasTypes() )
072      this.type = fieldDeclaration.getType( 0 );
073
074    this.canonical = Coercions.coercibleTypeFor( this.type );
075    }
076
077  /**
078   * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts
079   * only 1 argument. The return result is coerced into the given Class type.
080   *
081   * @param fieldDeclaration of type Fields
082   * @param type             of type Class
083   */
084  @ConstructorProperties({"fieldDeclaration", "type"})
085  public Sum( Fields fieldDeclaration, Class type )
086    {
087    this( fieldDeclaration.applyTypes( type ) );
088    this.type = type;
089    this.canonical = Coercions.coercibleTypeFor( this.type );
090    }
091
092  public Type getType()
093    {
094    return type;
095    }
096
097  @Override
098  public void prepare( FlowProcess flowProcess, OperationCall<Pair<Double[], Tuple>> operationCall )
099    {
100    operationCall.setContext( new Pair<Double[], Tuple>( new Double[]{null}, Tuple.size( 1 ) ) );
101    }
102
103  @Override
104  public void start( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
105    {
106    aggregatorCall.getContext().getLhs()[ 0 ] = null;
107    aggregatorCall.getContext().getRhs().set( 0, null );
108    }
109
110  @Override
111  public void aggregate( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
112    {
113    TupleEntry arguments = aggregatorCall.getArguments();
114
115    if( arguments.getObject( 0 ) == null )
116      return;
117
118    Double[] sum = aggregatorCall.getContext().getLhs();
119
120    double value = sum[ 0 ] == null ? 0 : sum[ 0 ];
121    sum[ 0 ] = value + arguments.getDouble( 0 );
122    }
123
124  @Override
125  public void complete( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
126    {
127    aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) );
128    }
129
130  protected Tuple getResult( AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
131    {
132    aggregatorCall.getContext().getRhs().set( 0, canonical.canonical( aggregatorCall.getContext().getLhs()[ 0 ] ) );
133
134    return aggregatorCall.getContext().getRhs();
135    }
136
137  @Override
138  public boolean equals( Object object )
139    {
140    if( this == object )
141      return true;
142    if( !( object instanceof Sum ) )
143      return false;
144    if( !super.equals( object ) )
145      return false;
146
147    Sum sum = (Sum) object;
148
149    if( type != null ? !type.equals( sum.type ) : sum.type != null )
150      return false;
151
152    return true;
153    }
154
155  @Override
156  public int hashCode()
157    {
158    int result = super.hashCode();
159    result = 31 * result + ( type != null ? type.hashCode() : 0 );
160    return result;
161    }
162  }