001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe;
022
023import java.util.Set;
024
025import cascading.flow.planner.Scope;
026import cascading.operation.BaseOperation;
027import cascading.operation.Operation;
028import cascading.operation.PlannedOperation;
029import cascading.operation.PlannerLevel;
030import cascading.tuple.Fields;
031import cascading.tuple.FieldsResolverException;
032import cascading.tuple.TupleException;
033
034/**
035 * An Operator is a type of {@link Pipe}. Operators pass specified arguments to a given {@link cascading.operation.BaseOperation}.
036 * </p>
037 * The argFields value select the input fields used by the operation. By default the whole input Tuple is passes as arguments.
038 * The outFields value select the fields in the result Tuple returned by this Pipe. By default, the operation results
039 * of the given operation replace the input Tuple.
040 */
041public abstract class Operator extends Pipe
042  {
043  /** Field operation */
044  protected final Operation operation;
045  /** Field argumentSelector */
046  protected Fields argumentSelector = Fields.ALL; // use wildcard. let the operation choose
047  /** Field outputSelector */
048  protected Fields outputSelector = Fields.RESULTS;  // this is overridden by the subclasses via the ctor
049  /** Field assertionLevel */
050  protected PlannerLevel plannerLevel; // do not initialize a default
051
052  protected Operator( Operation operation )
053    {
054    this.operation = operation;
055    verifyOperation();
056    }
057
058  protected Operator( String name, Operation operation )
059    {
060    super( name );
061    this.operation = operation;
062    verifyOperation();
063    }
064
065  protected Operator( String name, Operation operation, Fields outputSelector )
066    {
067    super( name );
068    this.operation = operation;
069    this.outputSelector = outputSelector;
070    verifyOperation();
071    }
072
073  protected Operator( String name, Fields argumentSelector, Operation operation )
074    {
075    super( name );
076    this.operation = operation;
077    this.argumentSelector = argumentSelector;
078    verifyOperation();
079    }
080
081  protected Operator( String name, Fields argumentSelector, Operation operation, Fields outputSelector )
082    {
083    super( name );
084    this.operation = operation;
085    this.argumentSelector = argumentSelector;
086    this.outputSelector = outputSelector;
087    verifyOperation();
088    }
089
090  protected Operator( Pipe previous, Operation operation )
091    {
092    super( previous );
093    this.operation = operation;
094    verifyOperation();
095    }
096
097  protected Operator( Pipe previous, Fields argumentSelector, Operation operation )
098    {
099    super( previous );
100    this.operation = operation;
101    this.argumentSelector = argumentSelector;
102    verifyOperation();
103    }
104
105  protected Operator( Pipe previous, Fields argumentSelector, Operation operation, Fields outputSelector )
106    {
107    super( previous );
108    this.operation = operation;
109    this.argumentSelector = argumentSelector;
110    this.outputSelector = outputSelector;
111    verifyOperation();
112    }
113
114  protected Operator( Pipe previous, Operation operation, Fields outputSelector )
115    {
116    super( previous );
117    this.operation = operation;
118    this.outputSelector = outputSelector;
119    verifyOperation();
120    }
121
122  protected Operator( String name, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
123    {
124    super( name );
125    this.plannerLevel = plannerLevel;
126    this.operation = operation;
127    this.outputSelector = outputSelector;
128    verifyOperation();
129    }
130
131  protected Operator( String name, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
132    {
133    super( name );
134    this.plannerLevel = plannerLevel;
135    this.operation = operation;
136    this.argumentSelector = argumentSelector;
137    this.outputSelector = outputSelector;
138    verifyOperation();
139    }
140
141  protected Operator( Pipe previous, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
142    {
143    super( previous );
144    this.plannerLevel = plannerLevel;
145    this.operation = operation;
146    this.outputSelector = outputSelector;
147    verifyOperation();
148    }
149
150  protected Operator( Pipe previous, Fields argumentSelector, PlannerLevel plannerLevel, PlannedOperation operation, Fields outputSelector )
151    {
152    super( previous );
153    this.plannerLevel = plannerLevel;
154    this.operation = operation;
155    this.argumentSelector = argumentSelector;
156    this.outputSelector = outputSelector;
157    verifyOperation();
158    }
159
160  protected void verifyOperation()
161    {
162    if( operation == null )
163      throw new IllegalArgumentException( "operation may not be null" );
164
165    if( argumentSelector == null )
166      throw new IllegalArgumentException( "argumentSelector may not be null" );
167
168    if( outputSelector == null )
169      throw new IllegalArgumentException( "outputSelector may not be null" );
170
171    if( operation instanceof PlannedOperation )
172      {
173      if( plannerLevel == null )
174        throw new IllegalArgumentException( "planner level may not be null" );
175      else if( plannerLevel.isNoneLevel() )
176        throw new IllegalArgumentException( "given planner level: " + plannerLevel.getClass().getName() + ", may not be NONE" );
177      }
178    }
179
180  /**
181   * Method getOperation returns the operation managed by this Operator object.
182   *
183   * @return the operation (type Operation) of this Operator object.
184   */
185  public Operation getOperation()
186    {
187    return operation;
188    }
189
190  /**
191   * Method getArgumentSelector returns the argumentSelector of this Operator object.
192   *
193   * @return the argumentSelector (type Fields) of this Operator object.
194   */
195  public Fields getArgumentSelector()
196    {
197    return argumentSelector;
198    }
199
200  /**
201   * Method getFieldDeclaration returns the fieldDeclaration of this Operator object.
202   *
203   * @return the fieldDeclaration (type Fields) of this Operator object.
204   */
205  public Fields getFieldDeclaration()
206    {
207    return operation.getFieldDeclaration();
208    }
209
210  /**
211   * Method getOutputSelector returns the outputSelector of this Operator object.
212   *
213   * @return the outputSelector (type Fields) of this Operator object.
214   */
215  public Fields getOutputSelector()
216    {
217    return outputSelector;
218    }
219
220  /**
221   * Method getPlannerLevel returns the plannerLevel of this Operator object.
222   *
223   * @return the plannerLevel (type PlannerLevel) of this Operator object.
224   */
225  public PlannerLevel getPlannerLevel()
226    {
227    return plannerLevel;
228    }
229
230  /**
231   * Method hasPlannerLevel returns true if this Operator object holds a {@link PlannedOperation} object with an associated
232   * {@link PlannerLevel} level.
233   *
234   * @return boolean
235   */
236  public boolean hasPlannerLevel()
237    {
238    return plannerLevel != null;
239    }
240
241  // FIELDS
242
243  protected Fields resolveRemainderFields( Set<Scope> incomingScopes, Fields argumentFields )
244    {
245    Fields fields = resolveIncomingOperationArgumentFields( getFirst( incomingScopes ) );
246
247    if( fields.isUnknown() )
248      return fields;
249
250    return fields.subtract( argumentFields );
251    }
252
253  public abstract Scope outgoingScopeFor( Set<Scope> incomingScopes );
254
255  void verifyDeclaredFields( Fields declared )
256    {
257    if( declared.isDefined() && declared.size() == 0 )
258      throw new OperatorException( this, "field declaration: " + getFieldDeclaration().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" );
259    }
260
261  void verifyOutputSelector( Fields outputSelector )
262    {
263    if( outputSelector.isDefined() && outputSelector.size() == 0 )
264      throw new OperatorException( this, "output selector: " + getOutputSelector().printVerbose() + ", resolves to an empty field set, current grouping is on all fields" );
265    }
266
267  void verifyArguments( Fields argumentSelector )
268    {
269    if( argumentSelector.isUnknown() )
270      return;
271
272    if( operation.getNumArgs() != Operation.ANY && argumentSelector.size() < operation.getNumArgs() )
273      throw new OperatorException( this, "resolved wrong number of arguments: " + argumentSelector.printVerbose() + ", expected: " + operation.getNumArgs() );
274    }
275
276  Fields resolveOutgoingSelector( Set<Scope> incomingScopes, Fields argumentFields, Fields declaredFields )
277    {
278    Scope incomingScope = getFirst( incomingScopes );
279    Fields outputSelector = getOutputSelector();
280
281    if( outputSelector.isResults() )
282      return declaredFields;
283
284    if( outputSelector.isArguments() )
285      return argumentFields;
286
287    if( outputSelector.isGroup() )
288      return incomingScope.getOutGroupingFields();
289
290    if( outputSelector.isValues() )
291      return incomingScope.getOutGroupingValueFields();
292
293    Fields incomingFields = resolveIncomingOperationPassThroughFields( incomingScope );
294
295    // not part of resolve as we need the argumentFields
296    if( outputSelector.isSwap() )
297      return Fields.asDeclaration( incomingFields.subtract( argumentFields ) ).append( declaredFields );
298
299    try
300      {
301      return Fields.resolve( outputSelector, Fields.asDeclaration( incomingFields ), declaredFields );
302      }
303    catch( TupleException exception )
304      {
305      throw new OperatorException( this, incomingFields, declaredFields, outputSelector, exception );
306      }
307    }
308
309  Fields resolveArgumentSelector( Set<Scope> incomingScopes )
310    {
311    Fields argumentSelector = getArgumentSelector();
312
313    try
314      {
315      Scope incomingScope = getFirst( incomingScopes );
316
317      if( argumentSelector.isAll() )
318        return resolveIncomingOperationArgumentFields( incomingScope );
319
320      if( argumentSelector.isGroup() )
321        return incomingScope.getOutGroupingFields();
322
323      if( argumentSelector.isValues() )
324        return incomingScope.getOutGroupingValueFields();
325
326      return resolveIncomingOperationArgumentFields( incomingScope ).select( argumentSelector );
327      }
328    catch( FieldsResolverException exception )
329      {
330      throw new OperatorException( this, OperatorException.Kind.argument, exception.getSourceFields(), argumentSelector, exception );
331      }
332    catch( Exception exception )
333      {
334      throw new OperatorException( this, "unable to resolve argument selector: " + argumentSelector.printVerbose(), exception );
335      }
336    }
337
338  Fields resolveDeclared( Set<Scope> incomingScopes, Fields arguments )
339    {
340    Fields fieldDeclaration = getFieldDeclaration();
341
342    if( getOutputSelector().isReplace() )
343      {
344      if( arguments.isDefined() && fieldDeclaration.isDefined() && arguments.size() != fieldDeclaration.size() )
345        throw new OperatorException( this, "during REPLACE both the arguments selector and field declaration must be the same size, arguments: " + arguments.printVerbose() + " declaration: " + fieldDeclaration.printVerbose() );
346
347      if( fieldDeclaration.isArguments() ) // there is no type info, so inherit it
348        return arguments;
349
350      return arguments.project( fieldDeclaration );
351      }
352
353    try
354      {
355      Scope incomingScope = getFirst( incomingScopes );
356
357      if( fieldDeclaration.isUnknown() )
358        return fieldDeclaration;
359
360      if( fieldDeclaration.isArguments() )
361        return Fields.asDeclaration( arguments );
362
363      if( fieldDeclaration.isAll() )
364        return resolveIncomingOperationPassThroughFields( incomingScope );
365
366      if( fieldDeclaration.isGroup() )
367        return incomingScope.getOutGroupingFields();
368
369      // VALUES is the diff between all fields and group fields
370      if( fieldDeclaration.isValues() )
371        return incomingScope.getOutGroupingValueFields();
372
373      }
374    catch( Exception exception )
375      {
376      throw new OperatorException( this, "could not resolve declared fields in:  " + this, exception );
377      }
378
379    return fieldDeclaration;
380    }
381
382  // OBJECT OVERRIDES
383
384  @Override
385  public String toString()
386    {
387    return super.toString() + "[" + operation + "]";
388    }
389
390  @Override
391  protected void printInternal( StringBuffer buffer, Scope scope )
392    {
393    super.printInternal( buffer, scope );
394    buffer.append( "[" );
395    BaseOperation.printOperationInternal( operation, buffer, scope );
396    buffer.append( "]" );
397    }
398
399  @SuppressWarnings({"RedundantIfStatement"})
400  public boolean equals( Object object )
401    {
402    if( this == object )
403      return true;
404    if( object == null || getClass() != object.getClass() )
405      return false;
406    if( !super.equals( object ) )
407      return false;
408
409    Operator operator = (Operator) object;
410
411    if( argumentSelector != null ? !argumentSelector.equals( operator.argumentSelector ) : operator.argumentSelector != null )
412      return false;
413    if( operation != null ? !operation.equals( operator.operation ) : operator.operation != null )
414      return false;
415    if( outputSelector != null ? !outputSelector.equals( operator.outputSelector ) : operator.outputSelector != null )
416      return false;
417
418    return true;
419    }
420
421  @Override
422  public int hashCode()
423    {
424    int result = super.hashCode();
425    result = 31 * result + ( operation != null ? operation.hashCode() : 0 );
426    result = 31 * result + ( argumentSelector != null ? argumentSelector.hashCode() : 0 );
427    result = 31 * result + ( outputSelector != null ? outputSelector.hashCode() : 0 );
428    return result;
429    }
430  }