001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe;
022    
023    import java.beans.ConstructorProperties;
024    import java.util.Set;
025    
026    import cascading.flow.planner.Scope;
027    import cascading.operation.Assertion;
028    import cascading.operation.AssertionLevel;
029    import cascading.operation.Debug;
030    import cascading.operation.DebugLevel;
031    import cascading.operation.Filter;
032    import cascading.operation.Function;
033    import cascading.operation.ValueAssertion;
034    import cascading.tuple.Fields;
035    import cascading.tuple.Tuple;
036    
037    /**
038     * The Each operator applies either a {@link Function} or a {@link Filter} to each entry in the {@link Tuple}
039     * stream. Any number of Each operators can follow an Each, {@link Splice}, or {@link Every}
040     * operator.
041     */
042    public class Each extends Operator
043      {
044      /** Field FUNCTION_SELECTOR */
045      private static final Fields FUNCTION_SELECTOR = Fields.RESULTS;
046      /** Field FILTER_SELECTOR */
047      private static final Fields FILTER_SELECTOR = Fields.RESULTS;
048    
049      ///////////////////
050      // TAKE FUNCTIONS
051      ///////////////////
052    
053      /**
054       * Pass all fields to the given function, only return fields declared by the function.
055       *
056       * @param name     name for this branch of Pipes
057       * @param function Function to be applied to each input Tuple
058       */
059      @ConstructorProperties({"name", "function"})
060      public Each( String name, Function function )
061        {
062        super( name, function, FUNCTION_SELECTOR );
063        }
064    
065      /**
066       * Only pass argumentFields to the given function, only return fields declared by the function.
067       *
068       * @param name             name for this branch of Pipes
069       * @param argumentSelector field selector that selects Function arguments from the input Tuple
070       * @param function         Function to be applied to each input Tuple
071       */
072      @ConstructorProperties({"name", "argumentSelector", "function"})
073      public Each( String name, Fields argumentSelector, Function function )
074        {
075        super( name, argumentSelector, function, FUNCTION_SELECTOR );
076        }
077    
078      /**
079       * Only pass argumentFields to the given function, only return fields selected by the outputSelector.
080       *
081       * @param name             name for this branch of Pipes
082       * @param argumentSelector field selector that selects Function arguments from the input Tuple
083       * @param function         Function to be applied to each input Tuple
084       * @param outputSelector   field selector that selects the output Tuple from the input and Function results Tuples
085       */
086      @ConstructorProperties({"name", "argumentSelector", "function", "outputSelector"})
087      public Each( String name, Fields argumentSelector, Function function, Fields outputSelector )
088        {
089        super( name, argumentSelector, function, outputSelector );
090        }
091    
092      /**
093       * Only return fields selected by the outputSelector.
094       *
095       * @param name           name for this branch of Pipes
096       * @param function       Function to be applied to each input Tuple
097       * @param outputSelector field selector that selects the output Tuple from the input and Function results Tuples
098       */
099      @ConstructorProperties({"name", "function", "outputSelector"})
100      public Each( String name, Function function, Fields outputSelector )
101        {
102        super( name, function, outputSelector );
103        }
104    
105      /**
106       * Pass all fields to the given function, only return fields declared by the function.
107       *
108       * @param previous previous Pipe to receive input Tuples from
109       * @param function Function to be applied to each input Tuple
110       */
111      @ConstructorProperties({"previous", "function"})
112      public Each( Pipe previous, Function function )
113        {
114        super( previous, function, FUNCTION_SELECTOR );
115        }
116    
117      /**
118       * Only pass argumentFields to the given function, only return fields declared by the function.
119       *
120       * @param previous         previous Pipe to receive input Tuples from
121       * @param argumentSelector field selector that selects Function arguments from the input Tuple
122       * @param function         Function to be applied to each input Tuple
123       */
124      @ConstructorProperties({"previous", "argumentSelector", "function"})
125      public Each( Pipe previous, Fields argumentSelector, Function function )
126        {
127        super( previous, argumentSelector, function, FUNCTION_SELECTOR );
128        }
129    
130      /**
131       * Only pass argumentFields to the given function, only return fields selected by the outputSelector.
132       *
133       * @param previous         previous Pipe to receive input Tuples from
134       * @param argumentSelector field selector that selects Function arguments from the input Tuple
135       * @param function         Function to be applied to each input Tuple
136       * @param outputSelector   field selector that selects the output Tuple from the input and Function results Tuples
137       */
138      @ConstructorProperties({"previous", "argumentSelector", "function", "outputSelector"})
139      public Each( Pipe previous, Fields argumentSelector, Function function, Fields outputSelector )
140        {
141        super( previous, argumentSelector, function, outputSelector );
142        }
143    
144      /**
145       * Only pass argumentFields to the given function, only return fields selected by the outputSelector.
146       *
147       * @param previous       previous Pipe to receive input Tuples from
148       * @param function       Function to be applied to each input Tuple
149       * @param outputSelector field selector that selects the output Tuple from the input and Function results Tuples
150       */
151      @ConstructorProperties({"previous", "function", "outputSelector"})
152      public Each( Pipe previous, Function function, Fields outputSelector )
153        {
154        super( previous, function, outputSelector );
155        }
156    
157      /////////////////
158      // TAKE FILTERS
159      /////////////////
160    
161      /**
162       * Constructor Each creates a new Each instance.
163       *
164       * @param name   name for this branch of Pipes
165       * @param filter Filter to be applied to each input Tuple
166       */
167      @ConstructorProperties({"name", "filter"})
168      public Each( String name, Filter filter )
169        {
170        super( name, filter, FILTER_SELECTOR );
171        }
172    
173      /**
174       * Constructor Each creates a new Each instance.
175       *
176       * @param name             name for this branch of Pipes
177       * @param argumentSelector field selector that selects Function arguments from the input Tuple
178       * @param filter           Filter to be applied to each input Tuple
179       */
180      @ConstructorProperties({"name", "argumentSelector", "filter"})
181      public Each( String name, Fields argumentSelector, Filter filter )
182        {
183        super( name, argumentSelector, filter, FILTER_SELECTOR );
184        }
185    
186      /**
187       * Constructor Each creates a new Each instance.
188       *
189       * @param previous previous Pipe to receive input Tuples from
190       * @param filter   Filter to be applied to each input Tuple
191       */
192      @ConstructorProperties({"previous", "filter"})
193      public Each( Pipe previous, Filter filter )
194        {
195        super( previous, filter, FILTER_SELECTOR );
196        }
197    
198      /**
199       * Constructor Each creates a new Each instance.
200       *
201       * @param previous         previous Pipe to receive input Tuples from
202       * @param argumentSelector field selector that selects Function arguments from the input Tuple
203       * @param filter           Filter to be applied to each input Tuple
204       */
205      @ConstructorProperties({"previous", "argumentSelector", "filter"})
206      public Each( Pipe previous, Fields argumentSelector, Filter filter )
207        {
208        super( previous, argumentSelector, filter, FILTER_SELECTOR );
209        }
210    
211      ///////////////
212      // ASSERTIONS
213      ///////////////
214    
215      /**
216       * Constructor Each creates a new Each instance.
217       *
218       * @param name           name for this branch of Pipes
219       * @param assertionLevel AssertionLevel to associate with the Assertion
220       * @param assertion      Assertion to be applied to each input Tuple
221       */
222      @ConstructorProperties({"name", "assertionLevel", "assertion"})
223      public Each( String name, AssertionLevel assertionLevel, Assertion assertion )
224        {
225        super( name, assertionLevel, assertion, FILTER_SELECTOR );
226        }
227    
228      /**
229       * @param name             name for this branch of Pipes
230       * @param argumentSelector field selector that selects Function arguments from the input Tuple
231       * @param assertionLevel   AssertionLevel to associate with the Assertion
232       * @param assertion        Assertion to be applied to each input Tuple
233       */
234      @ConstructorProperties({"name", "argumentSelector", "assertionLevel", "assertion"})
235      public Each( String name, Fields argumentSelector, AssertionLevel assertionLevel, Assertion assertion )
236        {
237        super( name, argumentSelector, assertionLevel, assertion, FILTER_SELECTOR );
238        }
239    
240      /**
241       * @param previous       previous Pipe to receive input Tuples from
242       * @param assertionLevel AssertionLevel to associate with the Assertion
243       * @param assertion      Assertion to be applied to each input Tuple
244       */
245      @ConstructorProperties({"previous", "assertionLevel", "assertion"})
246      public Each( Pipe previous, AssertionLevel assertionLevel, Assertion assertion )
247        {
248        super( previous, assertionLevel, assertion, FILTER_SELECTOR );
249        }
250    
251      /**
252       * @param previous         previous Pipe to receive input Tuples from
253       * @param argumentSelector field selector that selects Function arguments from the input Tuple
254       * @param assertionLevel   AssertionLevel to associate with the Assertion
255       * @param assertion        Assertion to be applied to each input Tuple
256       */
257      @ConstructorProperties({"previous", "argumentSelector", "assertionLevel", "assertion"})
258      public Each( Pipe previous, Fields argumentSelector, AssertionLevel assertionLevel, Assertion assertion )
259        {
260        super( previous, argumentSelector, assertionLevel, assertion, FILTER_SELECTOR );
261        }
262    
263      //////////
264      //DEBUG
265      //////////
266    
267      /**
268       * @param name             name for this branch of Pipes
269       * @param argumentSelector field selector that selects Function arguments from the input Tuple
270       * @param debugLevel       DebugLevel to associate with the Debug
271       * @param debug            Debug to be applied to each input Tuple
272       */
273      @ConstructorProperties({"name", "argumentSelector", "debugLevel", "debug"})
274      public Each( String name, Fields argumentSelector, DebugLevel debugLevel, Debug debug )
275        {
276        super( name, argumentSelector, debugLevel, debug, FILTER_SELECTOR );
277        }
278    
279      /**
280       * @param previous   previous Pipe to receive input Tuples from
281       * @param debugLevel DebugLevel to associate with the Debug
282       * @param debug      Debug to be applied to each input Tuple
283       */
284      @ConstructorProperties({"previous", "debugLevel", "debug"})
285      public Each( Pipe previous, DebugLevel debugLevel, Debug debug )
286        {
287        super( previous, debugLevel, debug, FILTER_SELECTOR );
288        }
289    
290      /**
291       * @param previous         previous Pipe to receive input Tuples from
292       * @param argumentSelector field selector that selects Function arguments from the input Tuple
293       * @param debugLevel       DebugLevel to associate with the Debug
294       * @param debug            Debug to be applied to each input Tuple
295       */
296      @ConstructorProperties({"previous", "argumentSelector", "debugLevel", "debug"})
297      public Each( Pipe previous, Fields argumentSelector, DebugLevel debugLevel, Debug debug )
298        {
299        super( previous, argumentSelector, debugLevel, debug, FILTER_SELECTOR );
300        }
301    
302      @Override
303      protected void verifyOperation()
304        {
305        // backwards compatibility with 1.0
306        if( plannerLevel == null && operation instanceof Debug )
307          plannerLevel = DebugLevel.DEFAULT;
308    
309        super.verifyOperation();
310    
311        if( !argumentSelector.isArgSelector() )
312          throw new IllegalArgumentException( "invalid argument selector: " + argumentSelector );
313    
314        if( !operation.getFieldDeclaration().isDeclarator() )
315          throw new IllegalArgumentException( "invalid field declaration: " + operation.getFieldDeclaration() );
316    
317        if( !outputSelector.isOutSelector() )
318          throw new IllegalArgumentException( "invalid output selector: " + outputSelector );
319        }
320    
321      public Function getFunction()
322        {
323        return (Function) operation;
324        }
325    
326      public Filter getFilter()
327        {
328        return (Filter) operation;
329        }
330    
331      public ValueAssertion getValueAssertion()
332        {
333        return (ValueAssertion) operation;
334        }
335    
336      public boolean isFunction()
337        {
338        return operation instanceof Function;
339        }
340    
341      public boolean isFilter()
342        {
343        return operation instanceof Filter;
344        }
345    
346      public boolean isValueAssertion()
347        {
348        return operation instanceof ValueAssertion;
349        }
350    
351      // FIELDS
352    
353      @Override
354      public Fields resolveIncomingOperationArgumentFields( Scope incomingScope )
355        {
356        return incomingScope.getIncomingFunctionArgumentFields();
357        }
358    
359      @Override
360      public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
361        {
362        return incomingScope.getIncomingFunctionPassThroughFields();
363        }
364    
365      @Override
366      public Scope outgoingScopeFor( Set<Scope> incomingScopes )
367        {
368        Fields argumentFields = resolveArgumentSelector( incomingScopes );
369    
370        verifyArguments( argumentFields );
371    
372        Fields declaredFields = resolveDeclared( incomingScopes, argumentFields );
373    
374        verifyDeclaredFields( declaredFields );
375    
376        Fields outgoingValuesFields = resolveOutgoingValuesSelector( incomingScopes, argumentFields, declaredFields );
377    
378        verifyOutputSelector( outgoingValuesFields );
379    
380        Fields outgoingGroupingFields = Fields.asDeclaration( outgoingValuesFields );
381    
382        // the incoming fields eligible to be outgoing
383        Fields passThroughFields = resolveIncomingOperationPassThroughFields( getFirst( incomingScopes ) );
384        Fields remainderFields = resolveRemainderFields( incomingScopes, argumentFields );
385    
386        return new Scope( getName(), Scope.Kind.EACH, passThroughFields, remainderFields, argumentFields, declaredFields, outgoingGroupingFields, outgoingValuesFields );
387        }
388    
389      Fields resolveOutgoingValuesSelector( Set<Scope> incomingScopes, Fields argumentFields, Fields declaredFields )
390        {
391        try
392          {
393          return resolveOutgoingSelector( incomingScopes, argumentFields, declaredFields );
394          }
395        catch( Exception exception )
396          {
397          if( exception instanceof OperatorException )
398            throw (OperatorException) exception;
399    
400          throw new OperatorException( this, "could not resolve outgoing values selector in: " + this, exception );
401          }
402        }
403      }