001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe; 022 023import java.beans.ConstructorProperties; 024import java.util.Set; 025 026import cascading.flow.planner.Scope; 027import cascading.operation.Aggregator; 028import cascading.operation.AssertionLevel; 029import cascading.operation.Buffer; 030import cascading.operation.GroupAssertion; 031import cascading.tuple.Fields; 032 033/** 034 * The Every operator applies an {@link Aggregator} or {@link Buffer} to every grouping. 035 * <p/> 036 * Any number of Every instances may follow other Every, {@link GroupBy}, or {@link CoGroup} instances if they apply an 037 * Aggregator, not a Buffer. If a Buffer, only one Every may follow a GroupBy or CoGroup. 038 * <p/> 039 * Every operators create aggregate values for every grouping they encounter. This aggregate value is added to the current 040 * grouping Tuple. 041 * <p/> 042 * In the case of a CoGroup, the grouping Tuple will be all the grouping keys from all joined streams, 043 * and if an "outer" type join is used, one value on the groupingTuple may be null. 044 * <p/> 045 * Subsequent Every instances can continue to append values to the grouping Tuple. When an Each follows 046 * and Every, the Each applies its operation to the grouping Tuple (thus all child values in the grouping are discarded 047 * and only aggregate values are propagated). 048 */ 049public class Every extends Operator 050 { 051 /** Field AGGREGATOR_ARGUMENTS */ 052 private static final Fields AGGREGATOR_ARGUMENTS = Fields.ALL; 053 /** Field AGGREGATOR_SELECTOR */ 054 private static final Fields AGGREGATOR_SELECTOR = Fields.ALL; 055 /** Field ASSERTION_SELECTOR */ 056 private static final Fields ASSERTION_SELECTOR = Fields.RESULTS; 057 058 /** 059 * Constructor Every creates a new Every instance. 060 * 061 * @param previous previous Pipe to receive input Tuples from 062 * @param aggregator Aggregator to be applied to every input Tuple grouping 063 */ 064 @ConstructorProperties({"previous", "aggregator"}) 065 public Every( Pipe previous, Aggregator aggregator ) 066 { 067 super( previous, AGGREGATOR_ARGUMENTS, aggregator, AGGREGATOR_SELECTOR ); 068 } 069 070 /** 071 * Constructor Every creates a new Every instance. 072 * 073 * @param previous previous Pipe to receive input Tuples from 074 * @param argumentSelector field selector that selects Function arguments from the input Tuple 075 * @param aggregator Aggregator to be applied to every input Tuple grouping 076 */ 077 @ConstructorProperties({"previous", "argumentSelector", "aggregator"}) 078 public Every( Pipe previous, Fields argumentSelector, Aggregator aggregator ) 079 { 080 super( previous, argumentSelector, aggregator, AGGREGATOR_SELECTOR ); 081 } 082 083 /** 084 * Constructor Every creates a new Every instance. 085 * 086 * @param previous previous Pipe to receive input Tuples from 087 * @param argumentSelector field selector that selects Function arguments from the input Tuple 088 * @param aggregator Aggregator to be applied to every input Tuple grouping 089 * @param outputSelector field selector that selects the output Tuple from the grouping and Aggregator results Tuples 090 */ 091 @ConstructorProperties({"previous", "argumentSelector", "aggregator", "outputSelector"}) 092 public Every( Pipe previous, Fields argumentSelector, Aggregator aggregator, Fields outputSelector ) 093 { 094 super( previous, argumentSelector, aggregator, outputSelector ); 095 } 096 097 /** 098 * Constructor Every creates a new Every instance. 099 * 100 * @param previous previous Pipe to receive input Tuples from 101 * @param aggregator Aggregator to be applied to every input Tuple grouping 102 * @param outputSelector field selector that selects the output Tuple from the grouping and Aggregator results Tuples 103 */ 104 @ConstructorProperties({"previous", "aggregator", "outputSelector"}) 105 public Every( Pipe previous, Aggregator aggregator, Fields outputSelector ) 106 { 107 super( previous, AGGREGATOR_ARGUMENTS, aggregator, outputSelector ); 108 } 109 110 /** 111 * Constructor Every creates a new Every instance. 112 * 113 * @param previous previous Pipe to receive input Tuples from 114 * @param buffer Buffer to be applied to every input Tuple grouping 115 */ 116 @ConstructorProperties({"previous", "buffer"}) 117 public Every( Pipe previous, Buffer buffer ) 118 { 119 super( previous, AGGREGATOR_ARGUMENTS, buffer, AGGREGATOR_SELECTOR ); 120 } 121 122 /** 123 * Constructor Every creates a new Every instance. 124 * 125 * @param previous previous Pipe to receive input Tuples from 126 * @param argumentSelector field selector that selects Function arguments from the input Tuple 127 * @param buffer Buffer to be applied to every input Tuple grouping 128 */ 129 @ConstructorProperties({"previous", "argumentSelector", "buffer"}) 130 public Every( Pipe previous, Fields argumentSelector, Buffer buffer ) 131 { 132 super( previous, argumentSelector, buffer, AGGREGATOR_SELECTOR ); 133 } 134 135 /** 136 * Constructor Every creates a new Every instance. 137 * 138 * @param previous previous Pipe to receive input Tuples from 139 * @param argumentSelector field selector that selects Function arguments from the input Tuple 140 * @param buffer Buffer to be applied to every input Tuple grouping 141 * @param outputSelector field selector that selects the output Tuple from the grouping and Buffer results Tuples 142 */ 143 @ConstructorProperties({"previous", "argumentSelector", "buffer", "outputSelector"}) 144 public Every( Pipe previous, Fields argumentSelector, Buffer buffer, Fields outputSelector ) 145 { 146 super( previous, argumentSelector, buffer, outputSelector ); 147 } 148 149 /** 150 * Constructor Every creates a new Every instance. 151 * 152 * @param previous previous Pipe to receive input Tuples from 153 * @param buffer Buffer to be applied to every input Tuple grouping 154 * @param outputSelector field selector that selects the output Tuple from the grouping and Buffer results Tuples 155 */ 156 @ConstructorProperties({"previous", "buffer", "outputSelector"}) 157 public Every( Pipe previous, Buffer buffer, Fields outputSelector ) 158 { 159 super( previous, AGGREGATOR_ARGUMENTS, buffer, outputSelector ); 160 } 161 162 /** 163 * Constructor Every creates a new Every instance. 164 * 165 * @param previous previous Pipe to receive input Tuples from 166 * @param assertionLevel of type AssertionLevel 167 * @param assertion GroupAssertion to be applied to every input Tuple grouping 168 */ 169 @ConstructorProperties({"previous", "assertionLevel", "assertion"}) 170 public Every( Pipe previous, AssertionLevel assertionLevel, GroupAssertion assertion ) 171 { 172 super( previous, AGGREGATOR_ARGUMENTS, assertionLevel, assertion, ASSERTION_SELECTOR ); 173 } 174 175 /** 176 * Constructor Every creates a new Every instance. 177 * 178 * @param previous previous Pipe to receive input Tuples from 179 * @param argumentSelector field selector that selects Function arguments from the input Tuple 180 * @param assertionLevel AssertionLevel to associate with the Assertion 181 * @param assertion GroupAssertion to be applied to every input Tuple grouping 182 */ 183 @ConstructorProperties({"previous", "argumentSelector", "assertionLevel", "assertion"}) 184 public Every( Pipe previous, Fields argumentSelector, AssertionLevel assertionLevel, GroupAssertion assertion ) 185 { 186 super( previous, argumentSelector, assertionLevel, assertion, ASSERTION_SELECTOR ); 187 } 188 189 /** 190 * Method isBuffer returns true if this Every instance holds a {@link cascading.operation.Buffer} operation. 191 * 192 * @return boolean 193 */ 194 public boolean isBuffer() 195 { 196 return operation instanceof Buffer; 197 } 198 199 /** 200 * Method isReducer returns true if this Every instance holds a {@link Aggregator} operation. 201 * 202 * @return boolean 203 */ 204 public boolean isAggregator() 205 { 206 return operation instanceof Aggregator; 207 } 208 209 public boolean isGroupAssertion() 210 { 211 return operation instanceof GroupAssertion; 212 } 213 214 public Aggregator getAggregator() 215 { 216 return (Aggregator) operation; 217 } 218 219 public Buffer getBuffer() 220 { 221 return (Buffer) operation; 222 } 223 224 public GroupAssertion getGroupAssertion() 225 { 226 return (GroupAssertion) operation; 227 } 228 229 @Override 230 public Fields resolveIncomingOperationArgumentFields( Scope incomingScope ) 231 { 232 if( isBuffer() ) 233 return incomingScope.getIncomingBufferArgumentFields(); 234 else 235 return incomingScope.getIncomingAggregatorArgumentFields(); 236 } 237 238 @Override 239 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 240 { 241 if( isBuffer() ) 242 return incomingScope.getIncomingBufferPassThroughFields(); 243 else 244 return incomingScope.getIncomingAggregatorPassThroughFields(); 245 } 246 247 @Override 248 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 249 { 250 Scope incomingScope = getFirst( incomingScopes ); 251 252 if( !isBuffer() && incomingScope.getOutValuesFields().isNone() ) 253 throw new OperatorException( this, "only a Buffer may be preceded by a CoGroup declaring Fields.NONE as the join fields" ); 254 255 Fields argumentFields = resolveArgumentSelector( incomingScopes ); 256 257 verifyArguments( argumentFields ); 258 259 // we currently don't support using result from a previous Every in the current Every 260 verifyAggregatorArguments( argumentFields, incomingScope ); 261 262 Fields declaredFields = resolveDeclared( incomingScopes, argumentFields ); 263 264 verifyDeclaredFields( declaredFields ); 265 266 Fields outgoingGroupingFields = resolveOutgoingGroupingSelector( incomingScopes, argumentFields, declaredFields ); 267 268 verifyOutputSelector( outgoingGroupingFields ); 269 270 Fields outgoingValuesFields = incomingScope.getOutValuesFields(); 271 272 // the incoming fields eligible to be outgoing, for Every only the grouping fields. 273 Fields passThroughFields = resolveIncomingOperationPassThroughFields( incomingScope ); 274 Fields remainderFields = resolveRemainderFields( incomingScopes, argumentFields ); 275 276 return new Scope( getName(), Scope.Kind.EVERY, passThroughFields, remainderFields, argumentFields, declaredFields, outgoingGroupingFields, outgoingValuesFields ); 277 } 278 279 private void verifyAggregatorArguments( Fields argumentFields, Scope incomingScope ) 280 { 281 if( ( !isBuffer() ) && incomingScope.isEvery() && argumentFields.contains( incomingScope.getOperationDeclaredFields() ) ) 282 throw new OperatorException( this, "arguments may not select a declared field from a previous Every" ); 283 } 284 285 Fields resolveOutgoingGroupingSelector( Set<Scope> incomingScopes, Fields argumentSelector, Fields declared ) 286 { 287 try 288 { 289 return resolveOutgoingSelector( incomingScopes, argumentSelector, declared ); 290 } 291 catch( Exception exception ) 292 { 293 if( exception instanceof OperatorException ) 294 throw (OperatorException) exception; 295 296 if( isBuffer() ) 297 throw new OperatorException( this, "could not resolve outgoing values selector in: " + this, exception ); 298 else 299 throw new OperatorException( this, "could not resolve outgoing grouping selector in: " + this, exception ); 300 } 301 } 302 }