001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe;
022
023import java.util.Set;
024
025import cascading.flow.FlowElement;
026import cascading.flow.planner.Scope;
027import cascading.operation.BaseOperation;
028import cascading.operation.Operation;
029import cascading.operation.PlannedOperation;
030import cascading.operation.PlannerLevel;
031import cascading.tuple.Fields;
032import cascading.tuple.FieldsResolverException;
033import cascading.tuple.TupleException;
034
035/**
036 * An Operator is a type of {@link Pipe}. Operators pass specified arguments to a given {@link cascading.operation.BaseOperation}.
037 * </p>
038 * The argFields value select the input fields used by the operation. By default the whole input Tuple is passes as arguments.
039 * The outFields value select the fields in the result Tuple returned by this Pipe. By default, the operation results
040 * of the given operation replace the input Tuple.
041 */
042public abstract class Operator extends Pipe
043  {
044  /** Field operation */
045  protected final Operation operation;
046  /** Field argumentSelector */
047  protected Fields argumentSelector = Fields.ALL; // use wildcard. let the operation choose
048  /** Field outputSelector */
049  protected Fields outputSelector = Fields.RESULTS;  // this is overridden by the subclasses via the ctor
050  /** Field assertionLevel */
051  protected PlannerLevel plannerLevel; // do not initialize a default
052
053  protected Operator( Operation operation )
054    {
055    this.operation = operation;
056    verifyOperation();
057    }
058
059  protected Operator( String name, Operation operation )
060    {
061    super( name );
062    this.operation = operation;
063    verifyOperation();
064    }
065
066  protected Operator( String name, Operation operation, Fields outputSelector )
067    {
068    super( name );
069    this.operation = operation;
070    this.outputSelector = outputSelector;
071    verifyOperation();
072    }
073
074  protected Operator( String name, Fields argumentSelector, Operation operation )
075    {
076    super( name );
077    this.operation = operation;
078    this.argumentSelector = argumentSelector;
079    verifyOperation();
080    }
081
082  protected Operator( String name, Fields argumentSelector, Operation operation, Fields outputSelector )
083    {
084    super( name );
085    this.operation = operation;
086    this.argumentSelector = argumentSelector;
087    this.outputSelector = outputSelector;
088    verifyOperation();
089    }
090
091  protected Operator( Pipe previous, Operation operation )
092    {
093    super( previous );
094    this.operation = operation;
095    verifyOperation();
096    }
097
098  protected Operator( Pipe previous, Fields argumentSelector, Operation operation )
099    {
100    super( previous );
101    this.operation = operation;
102    this.argumentSelector = argumentSelector;
103    verifyOperation();
104    }
105
106  protected Operator( Pipe previous, Fields argumentSelector, Operation operation, Fields outputSelector )
107    {
108    super( previous );
109    this.operation = operation;
110    this.argumentSelector = argumentSelector;
111    this.outputSelector = outputSelector;
112    verifyOperation();
113    }
114
115  protected Operator( Pipe previous, Operation operation, Fields outputSelector )
116    {
117    super( previous );
118    this.operation = operation;
119    this.outputSelector = outputSelector;
120    verifyOperation();
121    }
122
123  protected Operator( String name, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
124    {
125    super( name );
126    this.plannerLevel = plannerLevel;
127    this.operation = operation;
128    this.outputSelector = outputSelector;
129    verifyOperation();
130    }
131
132  protected Operator( String name, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
133    {
134    super( name );
135    this.plannerLevel = plannerLevel;
136    this.operation = operation;
137    this.argumentSelector = argumentSelector;
138    this.outputSelector = outputSelector;
139    verifyOperation();
140    }
141
142  protected Operator( Pipe previous, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
143    {
144    super( previous );
145    this.plannerLevel = plannerLevel;
146    this.operation = operation;
147    this.outputSelector = outputSelector;
148    verifyOperation();
149    }
150
151  protected Operator( Pipe previous, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
152    {
153    super( previous );
154    this.plannerLevel = plannerLevel;
155    this.operation = operation;
156    this.argumentSelector = argumentSelector;
157    this.outputSelector = outputSelector;
158    verifyOperation();
159    }
160
161  protected void verifyOperation()
162    {
163    if( operation == null )
164      throw new IllegalArgumentException( "operation may not be null" );
165
166    if( argumentSelector == null )
167      throw new IllegalArgumentException( "argumentSelector may not be null" );
168
169    if( outputSelector == null )
170      throw new IllegalArgumentException( "outputSelector may not be null" );
171
172    if( operation instanceof PlannedOperation )
173      {
174      if( plannerLevel == null )
175        throw new IllegalArgumentException( "planner level may not be null" );
176      else if( plannerLevel.isNoneLevel() )
177        throw new IllegalArgumentException( "given planner level: " + plannerLevel.getClass().getName() + ", may not be NONE" );
178      }
179    }
180
181  /**
182   * Method getOperation returns the operation managed by this Operator object.
183   *
184   * @return the operation (type Operation) of this Operator object.
185   */
186  public Operation getOperation()
187    {
188    return operation;
189    }
190
191  /**
192   * Method getArgumentSelector returns the argumentSelector of this Operator object.
193   *
194   * @return the argumentSelector (type Fields) of this Operator object.
195   */
196  public Fields getArgumentSelector()
197    {
198    return argumentSelector;
199    }
200
201  /**
202   * Method getFieldDeclaration returns the fieldDeclaration of this Operator object.
203   *
204   * @return the fieldDeclaration (type Fields) of this Operator object.
205   */
206  public Fields getFieldDeclaration()
207    {
208    return operation.getFieldDeclaration();
209    }
210
211  /**
212   * Method getOutputSelector returns the outputSelector of this Operator object.
213   *
214   * @return the outputSelector (type Fields) of this Operator object.
215   */
216  public Fields getOutputSelector()
217    {
218    return outputSelector;
219    }
220
221  /**
222   * Method getPlannerLevel returns the plannerLevel of this Operator object.
223   *
224   * @return the plannerLevel (type PlannerLevel) of this Operator object.
225   */
226  public PlannerLevel getPlannerLevel()
227    {
228    return plannerLevel;
229    }
230
231  /**
232   * Method hasPlannerLevel returns true if this Operator object holds a {@link PlannedOperation} object with an associated
233   * {@link PlannerLevel} level.
234   *
235   * @return boolean
236   */
237  public boolean hasPlannerLevel()
238    {
239    return plannerLevel != null;
240    }
241
242  // FIELDS
243
244  protected Fields resolveRemainderFields( Set<Scope> incomingScopes, Fields argumentFields )
245    {
246    Fields fields = resolveIncomingOperationArgumentFields( getFirst( incomingScopes ) );
247
248    if( fields.isUnknown() )
249      return fields;
250
251    return fields.subtract( argumentFields );
252    }
253
254  public abstract Scope outgoingScopeFor( Set<Scope> incomingScopes );
255
256  void verifyDeclaredFields( Fields declared )
257    {
258    if( declared.isDefined() && declared.size() == 0 )
259      throw new OperatorException( this, "field declaration: " + getFieldDeclaration().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" );
260    }
261
262  void verifyOutputSelector( Fields outputSelector )
263    {
264    if( outputSelector.isDefined() && outputSelector.size() == 0 )
265      throw new OperatorException( this, "output selector: " + getOutputSelector().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" );
266    }
267
268  void verifyArguments( Fields argumentSelector )
269    {
270    if( argumentSelector.isUnknown() )
271      return;
272
273    if( operation.getNumArgs() != Operation.ANY && argumentSelector.size() < operation.getNumArgs() )
274      throw new OperatorException( this, "resolved wrong number of arguments: " + argumentSelector.printVerbose() + ", expected: " + operation.getNumArgs() );
275    }
276
277  Fields resolveOutgoingSelector( Set<Scope> incomingScopes, Fields argumentFields, Fields declaredFields )
278    {
279    Scope incomingScope = getFirst( incomingScopes );
280    Fields outputSelector = getOutputSelector();
281
282    if( outputSelector.isResults() )
283      return declaredFields;
284
285    if( outputSelector.isArguments() )
286      return argumentFields;
287
288    if( outputSelector.isGroup() )
289      return incomingScope.getOutGroupingFields();
290
291    if( outputSelector.isValues() )
292      return incomingScope.getOutGroupingValueFields();
293
294    Fields incomingFields = resolveIncomingOperationPassThroughFields( incomingScope );
295
296    // not part of resolve as we need the argumentFields
297    if( outputSelector.isSwap() )
298      return Fields.asDeclaration( incomingFields.subtract( argumentFields ) ).append( declaredFields );
299
300    try
301      {
302      return Fields.resolve( outputSelector, Fields.asDeclaration( incomingFields ), declaredFields );
303      }
304    catch( TupleException exception )
305      {
306      throw new OperatorException( this, incomingFields, declaredFields, outputSelector, exception );
307      }
308    }
309
310  Fields resolveArgumentSelector( Set<Scope> incomingScopes )
311    {
312    Fields argumentSelector = getArgumentSelector();
313
314    try
315      {
316      Scope incomingScope = getFirst( incomingScopes );
317
318      if( argumentSelector.isAll() )
319        return resolveIncomingOperationArgumentFields( incomingScope );
320
321      if( argumentSelector.isGroup() )
322        return incomingScope.getOutGroupingFields();
323
324      if( argumentSelector.isValues() )
325        return incomingScope.getOutGroupingValueFields();
326
327      return resolveIncomingOperationArgumentFields( incomingScope ).select( argumentSelector );
328      }
329    catch( FieldsResolverException exception )
330      {
331      throw new OperatorException( this, OperatorException.Kind.argument, exception.getSourceFields(), argumentSelector, exception );
332      }
333    catch( Exception exception )
334      {
335      throw new OperatorException( this, "unable to resolve argument selector: " + argumentSelector.printVerbose(), exception );
336      }
337    }
338
339  Fields resolveDeclared( Set<Scope> incomingScopes, Fields arguments )
340    {
341    Fields fieldDeclaration = getFieldDeclaration();
342
343    if( getOutputSelector().isReplace() )
344      {
345      if( arguments.isDefined() && fieldDeclaration.isDefined() && arguments.size() != fieldDeclaration.size() )
346        throw new OperatorException( this, "during REPLACE both the arguments selector and field declaration must be the same size, arguments: " + arguments.printVerbose() + " declaration: " + fieldDeclaration.printVerbose() );
347
348      if( fieldDeclaration.isArguments() ) // there is no type info, so inherit it
349        return arguments;
350
351      return arguments.project( fieldDeclaration );
352      }
353
354    try
355      {
356      Scope incomingScope = getFirst( incomingScopes );
357
358      if( fieldDeclaration.isUnknown() )
359        return fieldDeclaration;
360
361      if( fieldDeclaration.isArguments() )
362        return Fields.asDeclaration( arguments );
363
364      if( fieldDeclaration.isAll() )
365        return resolveIncomingOperationPassThroughFields( incomingScope );
366
367      if( fieldDeclaration.isGroup() )
368        return incomingScope.getOutGroupingFields();
369
370      // VALUES is the diff between all fields and group fields
371      if( fieldDeclaration.isValues() )
372        return incomingScope.getOutGroupingValueFields();
373
374      }
375    catch( Exception exception )
376      {
377      throw new OperatorException( this, "could not resolve declared fields in:  " + this, exception );
378      }
379
380    return fieldDeclaration;
381    }
382
383  // OBJECT OVERRIDES
384
385  @Override
386  public String toString()
387    {
388    return super.toString() + "[" + operation + "]";
389    }
390
391  @Override
392  protected void printInternal( StringBuffer buffer, Scope scope )
393    {
394    super.printInternal( buffer, scope );
395    buffer.append( "[" );
396    BaseOperation.printOperationInternal( operation, buffer, scope );
397    buffer.append( "]" );
398    }
399
400  @Override
401  public boolean isEquivalentTo( FlowElement element )
402    {
403    boolean equivalentTo = super.isEquivalentTo( element );
404
405    if( !equivalentTo )
406      return false;
407
408    Operator operator = (Operator) element;
409
410    equivalentTo = argumentSelector.equals( operator.argumentSelector );
411
412    if( !equivalentTo )
413      return false;
414
415    if( !operation.equals( operator.operation ) )
416      return false;
417
418    equivalentTo = outputSelector.equals( operator.outputSelector );
419
420    if( !equivalentTo )
421      return false;
422
423    return true;
424    }
425
426  @SuppressWarnings({"RedundantIfStatement"})
427  public boolean equals( Object object )
428    {
429    if( this == object )
430      return true;
431    if( object == null || getClass() != object.getClass() )
432      return false;
433    if( !super.equals( object ) )
434      return false;
435
436    Operator operator = (Operator) object;
437
438    if( argumentSelector != null ? !argumentSelector.equals( operator.argumentSelector ) : operator.argumentSelector != null )
439      return false;
440    if( operation != null ? !operation.equals( operator.operation ) : operator.operation != null )
441      return false;
442    if( outputSelector != null ? !outputSelector.equals( operator.outputSelector ) : operator.outputSelector != null )
443      return false;
444
445    return true;
446    }
447
448  @Override
449  public int hashCode()
450    {
451    int result = super.hashCode();
452    result = 31 * result + ( operation != null ? operation.hashCode() : 0 );
453    result = 31 * result + ( argumentSelector != null ? argumentSelector.hashCode() : 0 );
454    result = 31 * result + ( outputSelector != null ? outputSelector.hashCode() : 0 );
455    return result;
456    }
457  }