001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.operation.aggregator; 022 023 import java.beans.ConstructorProperties; 024 import java.lang.reflect.Type; 025 026 import cascading.flow.FlowProcess; 027 import cascading.operation.Aggregator; 028 import cascading.operation.AggregatorCall; 029 import cascading.operation.BaseOperation; 030 import cascading.operation.OperationCall; 031 import cascading.tuple.Fields; 032 import cascading.tuple.Tuple; 033 import cascading.tuple.TupleEntry; 034 import cascading.tuple.coerce.Coercions; 035 import cascading.tuple.type.CoercibleType; 036 import cascading.util.Pair; 037 038 /** Class Sum is an {@link Aggregator} that returns the sum of all numeric values in the current group. */ 039 public class Sum extends BaseOperation<Pair<Double[], Tuple>> implements Aggregator<Pair<Double[], Tuple>> 040 { 041 /** Field FIELD_NAME */ 042 public static final String FIELD_NAME = "sum"; 043 044 /** Field type */ 045 private Type type = Double.class; 046 private CoercibleType canonical; 047 048 049 /** Constructor Sum creates a new Sum instance that accepts one argument and returns a single field named "sum". */ 050 public Sum() 051 { 052 super( 1, new Fields( FIELD_NAME, Double.class ) ); 053 this.canonical = Coercions.coercibleTypeFor( this.type ); 054 } 055 056 /** 057 * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts 058 * only 1 argument. 059 * <p/> 060 * If the given {@code fieldDeclaration} has a type, it will be used to coerce the result value. 061 * 062 * @param fieldDeclaration of type Fields 063 */ 064 @ConstructorProperties({"fieldDeclaration"}) 065 public Sum( Fields fieldDeclaration ) 066 { 067 super( 1, fieldDeclaration ); 068 069 if( !fieldDeclaration.isSubstitution() && fieldDeclaration.size() != 1 ) 070 throw new IllegalArgumentException( "fieldDeclaration may only declare 1 field, got: " + fieldDeclaration.size() ); 071 072 if( fieldDeclaration.hasTypes() ) 073 this.type = fieldDeclaration.getType( 0 ); 074 075 this.canonical = Coercions.coercibleTypeFor( this.type ); 076 } 077 078 /** 079 * Constructs a new instance that returns the fields declared in fieldDeclaration and accepts 080 * only 1 argument. The return result is coerced into the given Class type. 081 * 082 * @param fieldDeclaration of type Fields 083 * @param type of type Class 084 */ 085 @ConstructorProperties({"fieldDeclaration", "type"}) 086 public Sum( Fields fieldDeclaration, Class type ) 087 { 088 this( fieldDeclaration.applyTypes( type ) ); 089 this.type = type; 090 this.canonical = Coercions.coercibleTypeFor( this.type ); 091 } 092 093 public Type getType() 094 { 095 return type; 096 } 097 098 @Override 099 public void prepare( FlowProcess flowProcess, OperationCall<Pair<Double[], Tuple>> operationCall ) 100 { 101 operationCall.setContext( new Pair<Double[], Tuple>( new Double[]{null}, Tuple.size( 1 ) ) ); 102 } 103 104 @Override 105 public void start( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall ) 106 { 107 aggregatorCall.getContext().getLhs()[ 0 ] = null; 108 aggregatorCall.getContext().getRhs().set( 0, null ); 109 } 110 111 @Override 112 public void aggregate( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall ) 113 { 114 TupleEntry arguments = aggregatorCall.getArguments(); 115 116 if( arguments.getObject( 0 ) == null ) 117 return; 118 119 Double[] sum = aggregatorCall.getContext().getLhs(); 120 121 double value = sum[ 0 ] == null ? 0 : sum[ 0 ]; 122 sum[ 0 ] = value + arguments.getDouble( 0 ); 123 } 124 125 @Override 126 public void complete( FlowProcess flowProcess, AggregatorCall<Pair<Double[], Tuple>> aggregatorCall ) 127 { 128 aggregatorCall.getOutputCollector().add( getResult( aggregatorCall ) ); 129 } 130 131 protected Tuple getResult( AggregatorCall<Pair<Double[], Tuple>> aggregatorCall ) 132 { 133 aggregatorCall.getContext().getRhs().set( 0, canonical.canonical( aggregatorCall.getContext().getLhs()[ 0 ] ) ); 134 135 return aggregatorCall.getContext().getRhs(); 136 } 137 138 @Override 139 public boolean equals( Object object ) 140 { 141 if( this == object ) 142 return true; 143 if( !( object instanceof Sum ) ) 144 return false; 145 if( !super.equals( object ) ) 146 return false; 147 148 Sum sum = (Sum) object; 149 150 if( type != null ? !type.equals( sum.type ) : sum.type != null ) 151 return false; 152 153 return true; 154 } 155 156 @Override 157 public int hashCode() 158 { 159 int result = super.hashCode(); 160 result = 31 * result + ( type != null ? type.hashCode() : 0 ); 161 return result; 162 } 163 }