001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Set;
025    
026    import cascading.flow.planner.Scope;
027    import cascading.operation.Aggregator;
028    import cascading.operation.AssertionLevel;
029    import cascading.operation.Buffer;
030    import cascading.operation.GroupAssertion;
031    import cascading.tuple.Fields;
032    
033    /**
034     * The Every operator applies an {@link Aggregator} or {@link Buffer} to every grouping.
035     * <p/>
036     * Any number of Every instances may follow other Every, {@link GroupBy}, or {@link CoGroup} instances if they apply an
037     * Aggregator, not a Buffer. If a Buffer, only one Every may follow a GroupBy or CoGroup.
038     * <p/>
039     * Every operators create aggregate values for every grouping they encounter. This aggregate value is added to the current
040     * grouping Tuple.
041     * <p/>
042     * In the case of a CoGroup, the grouping Tuple will be all the grouping keys from all joined streams,
043     * and if an "outer" type join is used, one value on the grouping Tuple may be null.
044     * <p/>
045     * Subsequent Every instances can continue to append values to the grouping Tuple. When an Each follows
046     * an Every, the Each applies its operation to the grouping Tuple (thus all child values in the grouping are discarded
047     * and only aggregate values are propagated).
048     */
049    public class Every extends Operator
050      {
051      /** Field AGGREGATOR_ARGUMENTS */
052      private static final Fields AGGREGATOR_ARGUMENTS = Fields.ALL;
053      /** Field AGGREGATOR_SELECTOR */
054      private static final Fields AGGREGATOR_SELECTOR = Fields.ALL;
055      /** Field ASSERTION_SELECTOR */
056      private static final Fields ASSERTION_SELECTOR = Fields.RESULTS;
057    
058      /**
059       * Constructor Every creates a new Every instance.
060       *
061       * @param previous   previous Pipe to receive input Tuples from
062       * @param aggregator Aggregator to be applied to every input Tuple grouping
063       */
064      @ConstructorProperties({"previous", "aggregator"})
065      public Every( Pipe previous, Aggregator aggregator )
066        {
067        super( previous, AGGREGATOR_ARGUMENTS, aggregator, AGGREGATOR_SELECTOR );
068        }
069    
070      /**
071       * Constructor Every creates a new Every instance.
072       *
073       * @param previous         previous Pipe to receive input Tuples from
074       * @param argumentSelector field selector that selects Function arguments from the input Tuple
075       * @param aggregator       Aggregator to be applied to every input Tuple grouping
076       */
077      @ConstructorProperties({"previous", "argumentSelector", "aggregator"})
078      public Every( Pipe previous, Fields argumentSelector, Aggregator aggregator )
079        {
080        super( previous, argumentSelector, aggregator, AGGREGATOR_SELECTOR );
081        }
082    
083      /**
084       * Constructor Every creates a new Every instance.
085       *
086       * @param previous         previous Pipe to receive input Tuples from
087       * @param argumentSelector field selector that selects Function arguments from the input Tuple
088       * @param aggregator       Aggregator to be applied to every input Tuple grouping
089       * @param outputSelector   field selector that selects the output Tuple from the grouping and Aggregator results Tuples
090       */
091      @ConstructorProperties({"previous", "argumentSelector", "aggregator", "outputSelector"})
092      public Every( Pipe previous, Fields argumentSelector, Aggregator aggregator, Fields outputSelector )
093        {
094        super( previous, argumentSelector, aggregator, outputSelector );
095        }
096    
097      /**
098       * Constructor Every creates a new Every instance.
099       *
100       * @param previous       previous Pipe to receive input Tuples from
101       * @param aggregator     Aggregator to be applied to every input Tuple grouping
102       * @param outputSelector field selector that selects the output Tuple from the grouping and Aggregator results Tuples
103       */
104      @ConstructorProperties({"previous", "aggregator", "outputSelector"})
105      public Every( Pipe previous, Aggregator aggregator, Fields outputSelector )
106        {
107        super( previous, AGGREGATOR_ARGUMENTS, aggregator, outputSelector );
108        }
109    
110      /**
111       * Constructor Every creates a new Every instance.
112       *
113       * @param previous previous Pipe to receive input Tuples from
114       * @param buffer   Buffer to be applied to every input Tuple grouping
115       */
116      @ConstructorProperties({"previous", "buffer"})
117      public Every( Pipe previous, Buffer buffer )
118        {
119        super( previous, AGGREGATOR_ARGUMENTS, buffer, AGGREGATOR_SELECTOR );
120        }
121    
122      /**
123       * Constructor Every creates a new Every instance.
124       *
125       * @param previous         previous Pipe to receive input Tuples from
126       * @param argumentSelector field selector that selects Function arguments from the input Tuple
127       * @param buffer           Buffer to be applied to every input Tuple grouping
128       */
129      @ConstructorProperties({"previous", "argumentSelector", "buffer"})
130      public Every( Pipe previous, Fields argumentSelector, Buffer buffer )
131        {
132        super( previous, argumentSelector, buffer, AGGREGATOR_SELECTOR );
133        }
134    
135      /**
136       * Constructor Every creates a new Every instance.
137       *
138       * @param previous         previous Pipe to receive input Tuples from
139       * @param argumentSelector field selector that selects Function arguments from the input Tuple
140       * @param buffer           Buffer to be applied to every input Tuple grouping
141       * @param outputSelector   field selector that selects the output Tuple from the grouping and Buffer results Tuples
142       */
143      @ConstructorProperties({"previous", "argumentSelector", "buffer", "outputSelector"})
144      public Every( Pipe previous, Fields argumentSelector, Buffer buffer, Fields outputSelector )
145        {
146        super( previous, argumentSelector, buffer, outputSelector );
147        }
148    
149      /**
150       * Constructor Every creates a new Every instance.
151       *
152       * @param previous       previous Pipe to receive input Tuples from
153       * @param buffer         Buffer to be applied to every input Tuple grouping
154       * @param outputSelector field selector that selects the output Tuple from the grouping and Buffer results Tuples
155       */
156      @ConstructorProperties({"previous", "buffer", "outputSelector"})
157      public Every( Pipe previous, Buffer buffer, Fields outputSelector )
158        {
159        super( previous, AGGREGATOR_ARGUMENTS, buffer, outputSelector );
160        }
161    
162      /**
163       * Constructor Every creates a new Every instance.
164       *
165       * @param previous       previous Pipe to receive input Tuples from
166       * @param assertionLevel of type AssertionLevel
167       * @param assertion      GroupAssertion to be applied to every input Tuple grouping
168       */
169      @ConstructorProperties({"previous", "assertionLevel", "assertion"})
170      public Every( Pipe previous, AssertionLevel assertionLevel, GroupAssertion assertion )
171        {
172        super( previous, AGGREGATOR_ARGUMENTS, assertionLevel, assertion, ASSERTION_SELECTOR );
173        }
174    
175      /**
176       * Constructor Every creates a new Every instance.
177       *
178       * @param previous         previous Pipe to receive input Tuples from
179       * @param argumentSelector field selector that selects Function arguments from the input Tuple
180       * @param assertionLevel   AssertionLevel to associate with the Assertion
181       * @param assertion        GroupAssertion to be applied to every input Tuple grouping
182       */
183      @ConstructorProperties({"previous", "argumentSelector", "assertionLevel", "assertion"})
184      public Every( Pipe previous, Fields argumentSelector, AssertionLevel assertionLevel, GroupAssertion assertion )
185        {
186        super( previous, argumentSelector, assertionLevel, assertion, ASSERTION_SELECTOR );
187        }
188    
189      /**
190       * Method isBuffer returns true if this Every instance holds a {@link cascading.operation.Buffer} operation.
191       *
192       * @return boolean
193       */
194      public boolean isBuffer()
195        {
196        return operation instanceof Buffer;
197        }
198    
199      /**
200       * Method isReducer returns true if this Every instance holds a {@link Aggregator} operation.
201       *
202       * @return boolean
203       */
204      public boolean isAggregator()
205        {
206        return operation instanceof Aggregator;
207        }
208    
209      public boolean isGroupAssertion()
210        {
211        return operation instanceof GroupAssertion;
212        }
213    
214      public Aggregator getAggregator()
215        {
216        return (Aggregator) operation;
217        }
218    
219      public Buffer getBuffer()
220        {
221        return (Buffer) operation;
222        }
223    
224      public GroupAssertion getGroupAssertion()
225        {
226        return (GroupAssertion) operation;
227        }
228    
229      @Override
230      public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
231        {
232        if( isBuffer() )
233          return incomingScope.getIncomingBufferArgumentFields();
234        else
235          return incomingScope.getIncomingAggregatorArgumentFields();
236        }
237    
238      @Override
239      public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
240        {
241        if( isBuffer() )
242          return incomingScope.getIncomingBufferPassThroughFields();
243        else
244          return incomingScope.getIncomingAggregatorPassThroughFields();
245        }
246    
247      @Override
248      public Scope outgoingScopeFor( Set<Scope> incomingScopes )
249        {
250        Scope incomingScope = getFirst( incomingScopes );
251    
252        if( !isBuffer() && incomingScope.getOutValuesFields().isNone() )
253          throw new OperatorException( this, "only a Buffer may be preceded by a CoGroup declaring Fields.NONE as the join fields" );
254    
255        Fields argumentFields = resolveArgumentSelector( incomingScopes );
256    
257        verifyArguments( argumentFields );
258    
259        // we currently don't support using result from a previous Every in the current Every
260        verifyAggregatorArguments( argumentFields, incomingScope );
261    
262        Fields declaredFields = resolveDeclared( incomingScopes, argumentFields );
263    
264        verifyDeclaredFields( declaredFields );
265    
266        Fields outgoingGroupingFields = resolveOutgoingGroupingSelector( incomingScopes, argumentFields, declaredFields );
267    
268        verifyOutputSelector( outgoingGroupingFields );
269    
270        Fields outgoingValuesFields = incomingScope.getOutValuesFields();
271    
272        // the incoming fields eligible to be outgoing, for Every only the grouping fields.
273        Fields passThroughFields = resolveIncomingOperationPassThroughFields( incomingScope );
274        Fields remainderFields = resolveRemainderFields( incomingScopes, argumentFields );
275    
276        return new Scope( getName(), Scope.Kind.EVERY, passThroughFields, remainderFields, argumentFields, declaredFields, outgoingGroupingFields, outgoingValuesFields );
277        }
278    
279      private void verifyAggregatorArguments( Fields argumentFields, Scope incomingScope )
280        {
281        if( ( !isBuffer() ) && incomingScope.isEvery() && argumentFields.contains( incomingScope.getOperationDeclaredFields() ) )
282          throw new OperatorException( this, "arguments may not select a declared field from a previous Every" );
283        }
284    
285      Fields resolveOutgoingGroupingSelector( Set<Scope> incomingScopes, Fields argumentSelector, Fields declared )
286        {
287        try
288          {
289          return resolveOutgoingSelector( incomingScopes, argumentSelector, declared );
290          }
291        catch( Exception exception )
292          {
293          if( exception instanceof OperatorException )
294            throw (OperatorException) exception;
295    
296          if( isBuffer() )
297            throw new OperatorException( this, "could not resolve outgoing values selector in: " + this, exception );
298          else
299            throw new OperatorException( this, "could not resolve outgoing grouping selector in: " + this, exception );
300          }
301        }
302      }