001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.operation.aggregator;
022    
023    import java.beans.ConstructorProperties;
024    import java.lang.reflect.Type;
025    
026    import cascading.flow.FlowProcess;
027    import cascading.operation.Aggregator;
028    import cascading.operation.AggregatorCall;
029    import cascading.operation.BaseOperation;
030    import cascading.operation.OperationCall;
031    import cascading.tuple.Fields;
032    import cascading.tuple.Tuple;
033    import cascading.tuple.TupleEntry;
034    import cascading.tuple.coerce.Coercions;
035    import cascading.tuple.type.CoercibleType;
036    import cascading.util.Pair;
037    
038    /** Class Sum is an {@link Aggregator} that returns the sum of all numeric values in the current group. */
039    public class Sum extends BaseOperation<Pair<Double[], Tuple>> implements Aggregator<Pair<Double[], Tuple>>
040      {
041      /** Field FIELD_NAME */
042      public static final String FIELD_NAME = "sum";
043    
044      /** Field type */
045      private Type type = Double.class;
046      private CoercibleType canonical;
047    
048    
049      /** Constructor Sum creates a new Sum instance that accepts one argument and returns a single field named "sum". */
050      public Sum()
051        {
052        super( 1, new Fields( FIELD_NAME, Double.class ) );
053        this.canonical = Coercions.coercibleTypeFor( this.type );
054        }
055    
056      /**
057       * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts
058       * only 1 argument.
059       * <p/>
060       * If the given {@code fieldDeclaration} has a type, it will be used to coerce the result value.
061       *
062       * @param fieldDeclaration of type Fields
063       */
064      @ConstructorProperties({"fieldDeclaration"})
065      public Sum( Fields fieldDeclaration )
066        {
067        super( 1, fieldDeclaration );
068    
069        if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 )
070          throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() );
071    
072        if( fieldDeclaration.hasTypes() )
073          this.type = fieldDeclaration.getType( 0 );
074    
075        this.canonical = Coercions.coercibleTypeFor( this.type );
076        }
077    
078      /**
079       * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts
080       * only 1 argument. The return result is coerced into the given Class type.
081       *
082       * @param fieldDeclaration of type Fields
083       * @param type             of type Class
084       */
085      @ConstructorProperties({"fieldDeclaration", "type"})
086      public Sum( Fields fieldDeclaration, Class type )
087        {
088        this( fieldDeclaration.applyTypes( type ) );
089        this.type = type;
090        this.canonical = Coercions.coercibleTypeFor( this.type );
091        }
092    
093      public Type getType()
094        {
095        return type;
096        }
097    
098      @Override
099      public void prepare( FlowProcess flowProcess, OperationCall<Pair<Double[], Tuple>> operationCall )
100        {
101        operationCall.setContext( new Pair<Double[], Tuple>( new Double[]{null}, Tuple.size( 1 ) ) );
102        }
103    
104      @Override
105      public void start( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
106        {
107        aggregatorCall.getContext().getLhs()[ 0 ] = null;
108        aggregatorCall.getContext().getRhs().set( 0, null );
109        }
110    
111      @Override
112      public void aggregate( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
113        {
114        TupleEntry arguments = aggregatorCall.getArguments();
115    
116        if( arguments.getObject( 0 ) == null )
117          return;
118    
119        Double[] sum = aggregatorCall.getContext().getLhs();
120    
121        double value = sum[ 0 ] == null ? 0 : sum[ 0 ];
122        sum[ 0 ] = value + arguments.getDouble( 0 );
123        }
124    
125      @Override
126      public void complete( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
127        {
128        aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) );
129        }
130    
131      protected Tuple getResult( AggregatorCall<Pair<Double[], Tuple>> aggregatorCall )
132        {
133        aggregatorCall.getContext().getRhs().set( 0, canonical.canonical( aggregatorCall.getContext().getLhs()[ 0 ] ) );
134    
135        return aggregatorCall.getContext().getRhs();
136        }
137    
138      @Override
139      public boolean equals( Object object )
140        {
141        if( this == object )
142          return true;
143        if( !( object instanceof Sum ) )
144          return false;
145        if( !super.equals( object ) )
146          return false;
147    
148        Sum sum = (Sum) object;
149    
150        if( type != null ? !type.equals( sum.type ) : sum.type != null )
151          return false;
152    
153        return true;
154        }
155    
156      @Override
157      public int hashCode()
158        {
159        int result = super.hashCode();
160        result = 31 * result + ( type != null ? type.hashCode() : 0 );
161        return result;
162        }
163      }