001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner;
022
023import java.io.Serializable;
024import java.util.HashSet;
025import java.util.LinkedList;
026import java.util.Map;
027import java.util.Set;
028
029import cascading.tuple.Fields;
030
031import static cascading.tuple.Fields.asDeclaration;
032
033/** Class Scope is an internal representation of the linkages between operations. */
034public class Scope implements Serializable
035  {
036  /** Enum Kind */
037  static public enum Kind
038    {
039      TAP, EACH, EVERY, GROUPBY, COGROUP, HASHJOIN, MERGE
040    }
041
042  /** Field name */
043  private String name;
044  private LinkedList<String> priorNames = new LinkedList<>();
045
046  /** Field ordinal */
047  private Integer ordinal = 0; // do not copy
048  /** Field isStreamed is not accumulated. currently only relevant to HashJoin * */
049  private boolean isNonBlocking = true; // do not copy
050
051  // copied
052
053  /** Field kind */
054  private Kind kind;
055  /** Field incomingPassThroughFields */
056  private Fields incomingPassThroughFields;
057  /** Field remainderPassThroughFields */
058  private Fields remainderPassThroughFields;
059
060  /** Field argumentSelector */
061  private Fields operationArgumentFields;
062  /** Field declaredFields */
063  private Fields operationDeclaredFields; // fields declared by the operation
064
065  /** Field groupingSelectors */
066  private Map<String, Fields> keySelectors;
067  /** Field sortingSelectors */
068  private Map<String, Fields> sortingSelectors;
069
070  /** Field outGroupingSelector */
071  private Fields outGroupingSelector;
072  /** Field outGroupingFields */
073  private Fields outGroupingFields; // all key fields
074
075  /** Field outValuesSelector */
076  private Fields outValuesSelector;
077  /** Field outValuesFields */
078  private Fields outValuesFields; // all value fields, includes keys
079
080  /** Default constructor. */
081  public Scope()
082    {
083    }
084
085  /**
086   * Copy constructor
087   *
088   * @param scope of type Scope
089   */
090  public Scope( Scope scope )
091    {
092    this.name = scope.getName();
093    copyFields( scope );
094    }
095
096  /**
097   * Tap constructor
098   *
099   * @param outFields of type Fields
100   */
101  public Scope( Fields outFields )
102    {
103    this.kind = Kind.TAP;
104
105    if( outFields == null )
106      throw new IllegalArgumentException( "fields may not be null" );
107
108    this.outGroupingFields = outFields;
109    this.outValuesFields = outFields;
110    }
111
112  /**
113   * Constructor Scope creates a new Scope instance. Used by classes Each and Every.
114   *
115   * @param name                      of type String
116   * @param kind                      of type Kind
117   * @param incomingPassThroughFields //   * @param remainderPassThroughFields   of type Fields
118   * @param operationArgumentFields   of type Fields
119   * @param operationDeclaredFields   of type Fields
120   * @param outGroupingFields         of type Fields
121   * @param outValuesFields           of type Fields
122   */
123  public Scope( String name, Kind kind, Fields incomingPassThroughFields, Fields remainderPassThroughFields, Fields operationArgumentFields, Fields operationDeclaredFields, Fields outGroupingFields, Fields outValuesFields )
124    {
125    this.name = name;
126    this.kind = kind;
127    this.incomingPassThroughFields = incomingPassThroughFields;
128    this.remainderPassThroughFields = remainderPassThroughFields;
129    this.operationArgumentFields = operationArgumentFields;
130    this.operationDeclaredFields = operationDeclaredFields;
131
132    if( outGroupingFields == null )
133      throw new IllegalArgumentException( "grouping may not be null" );
134
135    if( outValuesFields == null )
136      throw new IllegalArgumentException( "values may not be null" );
137
138    if( kind == Kind.EACH )
139      {
140      this.outGroupingSelector = outGroupingFields;
141      this.outGroupingFields = asDeclaration( outGroupingFields );
142      this.outValuesSelector = outValuesFields;
143      this.outValuesFields = asDeclaration( outValuesFields );
144      }
145    else if( kind == Kind.EVERY )
146      {
147      this.outGroupingSelector = outGroupingFields;
148      this.outGroupingFields = asDeclaration( outGroupingFields );
149      this.outValuesSelector = outValuesFields;
150      this.outValuesFields = asDeclaration( outValuesFields );
151      }
152    else
153      {
154      throw new IllegalArgumentException( "may not use the constructor for kind: " + kind );
155      }
156    }
157
158  /**
159   * Constructor Scope creates a new Scope instance. Used by the Group class.
160   *
161   * @param name                    of type String
162   * @param operationDeclaredFields of type Fields
163   * @param outGroupingFields       of type Fields
164   * @param keySelectors            of type Map<String, Fields>
165   * @param sortingSelectors        of type Fields
166   * @param outValuesFields         of type Fields
167   * @param kind                    of type boolean
168   */
169  public Scope( String name, Fields operationDeclaredFields, Fields outGroupingFields, Map<String, Fields> keySelectors, Map<String, Fields> sortingSelectors, Fields outValuesFields, Kind kind )
170    {
171    this.name = name;
172    this.kind = kind;
173
174    if( keySelectors == null )
175      throw new IllegalArgumentException( "grouping may not be null" );
176
177    if( outValuesFields == null )
178      throw new IllegalArgumentException( "values may not be null" );
179
180    this.operationDeclaredFields = operationDeclaredFields;
181    this.outGroupingFields = asDeclaration( outGroupingFields );
182    this.keySelectors = keySelectors;
183    this.sortingSelectors = sortingSelectors; // null ok
184    this.outValuesFields = asDeclaration( outValuesFields );
185    }
186
187  /**
188   * Constructor Scope creates a new Scope instance.
189   *
190   * @param name of type String
191   */
192  public Scope( String name )
193    {
194    this.name = name;
195    }
196
197  public void addPriorNames( Scope incoming, Scope outgoing )
198    {
199    priorNames.addAll( 0, outgoing.priorNames );
200    priorNames.addFirst( incoming.getName() );
201    priorNames.addAll( 0, incoming.priorNames );
202    }
203
204  public String getPriorName()
205    {
206    if( priorNames.isEmpty() )
207      return getName();
208
209    return priorNames.getFirst();
210    }
211
212  public Integer getOrdinal()
213    {
214    return ordinal;
215    }
216
217  public void setOrdinal( Integer ordinal )
218    {
219    this.ordinal = ordinal;
220    }
221
222  public void setNonBlocking( boolean isNonBlocking )
223    {
224    this.isNonBlocking = isNonBlocking;
225    }
226
227  public boolean isNonBlocking()
228    {
229    return isNonBlocking;
230    }
231
232  public boolean isSplice()
233    {
234    return isGroupBy() || isCoGroup() || isMerge() || isHashJoin();
235    }
236
237  public boolean isGroup()
238    {
239    return kind == Kind.GROUPBY || kind == Kind.COGROUP;
240    }
241
242  public boolean isGroupBy()
243    {
244    return kind == Kind.GROUPBY;
245    }
246
247  public boolean isCoGroup()
248    {
249    return kind == Kind.COGROUP;
250    }
251
252  public boolean isMerge()
253    {
254    return kind == Kind.MERGE;
255    }
256
257  public boolean isHashJoin()
258    {
259    return kind == Kind.HASHJOIN;
260    }
261
262  /**
263   * Method isEach returns true if this Scope object represents an Each.
264   *
265   * @return the each (type boolean) of this Scope object.
266   */
267  public boolean isEach()
268    {
269    return kind == Kind.EACH;
270    }
271
272  /**
273   * Method isEvery returns true if this Scope object represents an Every.
274   *
275   * @return the every (type boolean) of this Scope object.
276   */
277  public boolean isEvery()
278    {
279    return kind == Kind.EVERY;
280    }
281
282  /**
283   * Method isTap returns true if this Scope object represents a Tap.
284   *
285   * @return the tap (type boolean) of this Scope object.
286   */
287  public boolean isTap()
288    {
289    return kind == Kind.TAP;
290    }
291
292  /**
293   * Method getName returns the name of this Scope object.
294   *
295   * @return the name (type String) of this Scope object.
296   */
297  public String getName()
298    {
299    return name;
300    }
301
302  /**
303   * Method setName sets the name of this Scope object.
304   *
305   * @param name the name of this Scope object.
306   */
307  public void setName( String name )
308    {
309    this.name = name;
310    }
311
312  /**
313   * Method getRemainderFields returns the remainderFields of this Scope object.
314   *
315   * @return the remainderFields (type Fields) of this Scope object.
316   */
317  public Fields getRemainderPassThroughFields()
318    {
319    return remainderPassThroughFields;
320    }
321
322  /**
323   * Method getArgumentSelector returns the argumentSelector of this Scope object.
324   *
325   * @return the argumentSelector (type Fields) of this Scope object.
326   */
327  public Fields getArgumentsSelector()
328    {
329    return operationArgumentFields;
330    }
331
332  /**
333   * Method getArguments returns the arguments of this Scope object.
334   *
335   * @return the arguments (type Fields) of this Scope object.
336   */
337  public Fields getArgumentsDeclarator()
338    {
339    return asDeclaration( operationArgumentFields );
340    }
341
342  /**
343   * Method getDeclaredFields returns the declaredFields of this Scope object.
344   *
345   * @return the declaredFields (type Fields) of this Scope object.
346   */
347  public Fields getOperationDeclaredFields()
348    {
349    return operationDeclaredFields;
350    }
351
352  /**
353   * Method getGroupingSelectors returns the groupingSelectors of this Scope object.
354   *
355   * @return the groupingSelectors (type Map<String, Fields>) of this Scope object.
356   */
357  public Map<String, Fields> getKeySelectors()
358    {
359    return keySelectors;
360    }
361
362  /**
363   * Method getSortingSelectors returns the sortingSelectors of this Scope object.
364   *
365   * @return the sortingSelectors (type Map<String, Fields>) of this Scope object.
366   */
367  public Map<String, Fields> getSortingSelectors()
368    {
369    return sortingSelectors;
370    }
371
372  /**
373   * Method getOutGroupingSelector returns the outGroupingSelector of this Scope object.
374   *
375   * @return the outGroupingSelector (type Fields) of this Scope object.
376   */
377  public Fields getOutGroupingSelector()
378    {
379    return outGroupingSelector;
380    }
381
382  public Fields getIncomingTapFields()
383    {
384    if( isEvery() )
385      return getOutGroupingFields();
386    else
387      return getOutValuesFields();
388    }
389
390  public Fields getIncomingFunctionArgumentFields()
391    {
392    if( isEvery() )
393      return getOutGroupingFields();
394    else
395      return getOutValuesFields();
396    }
397
398  public Fields getIncomingFunctionPassThroughFields()
399    {
400    if( isEvery() )
401      return getOutGroupingFields();
402    else
403      return getOutValuesFields();
404    }
405
406  public Fields getIncomingAggregatorArgumentFields()
407    {
408    if( isEach() || isTap() )
409      throw new IllegalStateException( "Every cannot follow a Tap or an Each" );
410
411    return getOutValuesFields();
412    }
413
414  public Fields getIncomingAggregatorPassThroughFields()
415    {
416    if( isEach() || isTap() )
417      throw new IllegalStateException( "Every cannot follow a Tap or an Each" );
418
419    return getOutGroupingFields();
420    }
421
422  public Fields getIncomingBufferArgumentFields()
423    {
424    if( isEach() || isTap() )
425      throw new IllegalStateException( "Every cannot follow a Tap or an Each" );
426
427    return getOutValuesFields();
428    }
429
430  public Fields getIncomingBufferPassThroughFields()
431    {
432    if( isEach() || isTap() )
433      throw new IllegalStateException( "Every cannot follow a Tap or an Each" );
434
435    return getOutValuesFields();
436    }
437
438  public Fields getIncomingSpliceFields()
439    {
440    if( isEvery() )
441      return getOutGroupingFields();
442    else
443      return getOutValuesFields();
444    }
445
446  /**
447   * Method getOutGroupingFields returns the outGroupingFields of this Scope object.
448   *
449   * @return the outGroupingFields (type Fields) of this Scope object.
450   */
451  public Fields getOutGroupingFields()
452    {
453    if( !isSplice() )
454      return outGroupingFields;
455
456    Fields first = keySelectors.values().iterator().next();
457
458    // if more than one, this is a merge, so same key names are expected
459    if( keySelectors.size() == 1 || isGroupBy() )
460      return first;
461
462    // handling CoGroup only
463
464    // if given by user as resultGroupFields
465    if( outGroupingFields != null )
466      return outGroupingFields;
467
468    // todo throw an exception if we make it this far
469
470    // if all have the same names, then use for grouping
471    Set<Fields> set = new HashSet<Fields>( keySelectors.values() );
472
473    if( set.size() == 1 )
474      return first;
475
476    return Fields.size( first.size() );
477    }
478
479  public Fields getOutGroupingValueFields()
480    {
481    return getOutValuesFields().subtract( getOutGroupingFields() );
482    }
483
484  /**
485   * Method getOutValuesSelector returns the outValuesSelector of this Scope object.
486   *
487   * @return the outValuesSelector (type Fields) of this Scope object.
488   */
489  public Fields getOutValuesSelector()
490    {
491    return outValuesSelector;
492    }
493
494  /**
495   * Method getOutValuesFields returns the outValuesFields of this Scope object.
496   *
497   * @return the outValuesFields (type Fields) of this Scope object.
498   */
499  public Fields getOutValuesFields()
500    {
501    return outValuesFields;
502    }
503
504  /**
505   * Method copyFields copies the given Scope instance fields to this instance.
506   *
507   * @param scope of type Scope
508   */
509  public void copyFields( Scope scope )
510    {
511    this.kind = scope.kind;
512    this.incomingPassThroughFields = scope.incomingPassThroughFields;
513    this.remainderPassThroughFields = scope.remainderPassThroughFields;
514    this.operationArgumentFields = scope.operationArgumentFields;
515    this.operationDeclaredFields = scope.operationDeclaredFields;
516    this.keySelectors = scope.keySelectors;
517    this.sortingSelectors = scope.sortingSelectors;
518    this.outGroupingSelector = scope.outGroupingSelector;
519    this.outGroupingFields = scope.outGroupingFields;
520    this.outValuesSelector = scope.outValuesSelector;
521    this.outValuesFields = scope.outValuesFields;
522    }
523
524  @Override
525  public String toString()
526    {
527    return print();
528    }
529
530  public String print()
531    {
532    StringBuilder buffer = new StringBuilder();
533
534    if( ordinal != null )
535      buffer.append( "[" ).append( ordinal ).append( "]" );
536
537    if( getOutValuesFields() == null )
538      return buffer.toString(); // endpipes
539
540    buffer.append( "\n" );
541
542    if( keySelectors != null && !keySelectors.isEmpty() )
543      {
544      for( String name : keySelectors.keySet() )
545        {
546        if( buffer.length() != 0 && buffer.charAt( buffer.length() - 1 ) != '\n' )
547          buffer.append( " | " );
548
549        buffer.append( name ).append( keySelectors.get( name ).printVerbose() );
550        }
551
552      buffer.append( "\n" );
553      }
554
555    if( outGroupingFields != null )
556      buffer.append( getOutGroupingFields().printVerbose() ).append( "\n" );
557
558    buffer.append( getOutValuesFields().printVerbose() );
559
560    return buffer.toString();
561    }
562  }