001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner;
022
023import java.io.Serializable;
024import java.util.HashSet;
025import java.util.LinkedList;
026import java.util.Map;
027import java.util.Set;
028
029import cascading.tuple.Fields;
030
031import static cascading.tuple.Fields.asDeclaration;
032
033/** Class Scope is an internal representation of the linkages between operations. */
034public class Scope implements Serializable
035  {
036  /** Enum Kind */
037  public enum Kind
038    {
039      TAP, EACH, EVERY, GROUPBY, COGROUP, HASHJOIN, MERGE
040    }
041
042  /** Field name */
043  private String name;
044  private LinkedList<String> priorNames = new LinkedList<>();
045
046  /** Field ordinal */
047  private Integer ordinal = 0; // do not copy
048  /** Field isStreamed is not accumulated. currently only relevant to HashJoin * */
049  private boolean isNonBlocking = true; // do not copy
050
051  // copied
052
053  /** Field kind */
054  private Kind kind;
055  /** Field incomingPassThroughFields */
056  private Fields incomingPassThroughFields;
057  /** Field remainderPassThroughFields */
058  private Fields remainderPassThroughFields;
059
060  /** Field argumentSelector */
061  private Fields operationArgumentFields;
062  /** Field declaredFields */
063  private Fields operationDeclaredFields; // fields declared by the operation
064
065  /** Field groupingSelectors */
066  private Map<String, Fields> keySelectors;
067  /** Field sortingSelectors */
068  private Map<String, Fields> sortingSelectors;
069
070  /** Field outGroupingSelector */
071  private Fields outGroupingSelector;
072  /** Field outGroupingFields */
073  private Fields outGroupingFields; // all key fields
074
075  /** Field outValuesSelector */
076  private Fields outValuesSelector;
077  /** Field outValuesFields */
078  private Fields outValuesFields; // all value fields, includes keys
079
080  /** Default constructor. */
081  public Scope()
082    {
083    }
084
085  /**
086   * Copy constructor
087   *
088   * @param scope of type Scope
089   */
090  public Scope( Scope scope )
091    {
092    this.name = scope.getName();
093    copyFields( scope );
094    }
095
096  /**
097   * Tap constructor
098   *
099   * @param outFields of type Fields
100   */
101  public Scope( Fields outFields )
102    {
103    this.kind = Kind.TAP;
104
105    if( outFields == null )
106      throw new IllegalArgumentException( "fields may not be null" );
107
108    this.outGroupingFields = outFields;
109    this.outValuesFields = outFields;
110    }
111
112  /**
113   * Constructor Scope creates a new Scope instance. Used by classes Each and Every.
114   *
115   * @param name                      of type String
116   * @param kind                      of type Kind
117   * @param incomingPassThroughFields //   * @param remainderPassThroughFields   of type Fields
118   * @param operationArgumentFields   of type Fields
119   * @param operationDeclaredFields   of type Fields
120   * @param outGroupingFields         of type Fields
121   * @param outValuesFields           of type Fields
122   */
123  public Scope( String name, Kind kind, Fields incomingPassThroughFields, Fields remainderPassThroughFields, Fields operationArgumentFields, Fields operationDeclaredFields, Fields outGroupingFields, Fields outValuesFields )
124    {
125    this.name = name;
126    this.kind = kind;
127    this.incomingPassThroughFields = incomingPassThroughFields;
128    this.remainderPassThroughFields = remainderPassThroughFields;
129    this.operationArgumentFields = operationArgumentFields;
130    this.operationDeclaredFields = operationDeclaredFields;
131
132    if( outGroupingFields == null )
133      throw new IllegalArgumentException( "grouping may not be null" );
134
135    if( outValuesFields == null )
136      throw new IllegalArgumentException( "values may not be null" );
137
138    if( kind == Kind.EACH || kind == Kind.EVERY )
139      {
140      this.outGroupingSelector = outGroupingFields;
141      this.outGroupingFields = asDeclaration( outGroupingFields );
142      this.outValuesSelector = outValuesFields;
143      this.outValuesFields = asDeclaration( outValuesFields );
144      }
145    else
146      {
147      throw new IllegalArgumentException( "may not use the constructor for kind: " + kind );
148      }
149    }
150
151  /**
152   * Constructor Scope creates a new Scope instance. Used by the Group class.
153   *
154   * @param name                    of type String
155   * @param operationDeclaredFields of type Fields
156   * @param outGroupingFields       of type Fields
157   * @param keySelectors            of type Map<String, Fields>
158   * @param sortingSelectors        of type Fields
159   * @param outValuesFields         of type Fields
160   * @param kind                    of type boolean
161   */
162  public Scope( String name, Fields operationDeclaredFields, Fields outGroupingFields, Map<String, Fields> keySelectors, Map<String, Fields> sortingSelectors, Fields outValuesFields, Kind kind )
163    {
164    this.name = name;
165    this.kind = kind;
166
167    if( keySelectors == null )
168      throw new IllegalArgumentException( "grouping may not be null" );
169
170    if( outValuesFields == null )
171      throw new IllegalArgumentException( "values may not be null" );
172
173    this.operationDeclaredFields = operationDeclaredFields;
174    this.outGroupingFields = asDeclaration( outGroupingFields );
175    this.keySelectors = keySelectors;
176    this.sortingSelectors = sortingSelectors; // null ok
177    this.outValuesFields = asDeclaration( outValuesFields );
178    }
179
180  /**
181   * Constructor Scope creates a new Scope instance.
182   *
183   * @param name of type String
184   */
185  public Scope( String name )
186    {
187    this.name = name;
188    }
189
190  public void addPriorNames( Scope incoming, Scope outgoing )
191    {
192    priorNames.addAll( 0, outgoing.priorNames );
193    priorNames.addFirst( incoming.getName() );
194    priorNames.addAll( 0, incoming.priorNames );
195    }
196
197  public String getPriorName()
198    {
199    if( priorNames.isEmpty() )
200      return getName();
201
202    return priorNames.getFirst();
203    }
204
205  public Integer getOrdinal()
206    {
207    return ordinal;
208    }
209
210  public void setOrdinal( Integer ordinal )
211    {
212    this.ordinal = ordinal;
213    }
214
215  public void setNonBlocking( boolean isNonBlocking )
216    {
217    this.isNonBlocking = isNonBlocking;
218    }
219
220  public boolean isNonBlocking()
221    {
222    return isNonBlocking;
223    }
224
225  public boolean isSplice()
226    {
227    return isGroupBy() || isCoGroup() || isMerge() || isHashJoin();
228    }
229
230  public boolean isGroup()
231    {
232    return kind == Kind.GROUPBY || kind == Kind.COGROUP;
233    }
234
235  public boolean isGroupBy()
236    {
237    return kind == Kind.GROUPBY;
238    }
239
240  public boolean isCoGroup()
241    {
242    return kind == Kind.COGROUP;
243    }
244
245  public boolean isMerge()
246    {
247    return kind == Kind.MERGE;
248    }
249
250  public boolean isHashJoin()
251    {
252    return kind == Kind.HASHJOIN;
253    }
254
255  /**
256   * Method isEach returns true if this Scope object represents an Each.
257   *
258   * @return the each (type boolean) of this Scope object.
259   */
260  public boolean isEach()
261    {
262    return kind == Kind.EACH;
263    }
264
265  /**
266   * Method isEvery returns true if this Scope object represents an Every.
267   *
268   * @return the every (type boolean) of this Scope object.
269   */
270  public boolean isEvery()
271    {
272    return kind == Kind.EVERY;
273    }
274
275  /**
276   * Method isTap returns true if this Scope object represents a Tap.
277   *
278   * @return the tap (type boolean) of this Scope object.
279   */
280  public boolean isTap()
281    {
282    return kind == Kind.TAP;
283    }
284
285  /**
286   * Method getName returns the name of this Scope object.
287   *
288   * @return the name (type String) of this Scope object.
289   */
290  public String getName()
291    {
292    return name;
293    }
294
295  /**
296   * Method setName sets the name of this Scope object.
297   *
298   * @param name the name of this Scope object.
299   */
300  public void setName( String name )
301    {
302    this.name = name;
303    }
304
305  /**
306   * Method getRemainderFields returns the remainderFields of this Scope object.
307   *
308   * @return the remainderFields (type Fields) of this Scope object.
309   */
310  public Fields getRemainderPassThroughFields()
311    {
312    return remainderPassThroughFields;
313    }
314
315  /**
316   * Method getArgumentSelector returns the argumentSelector of this Scope object.
317   *
318   * @return the argumentSelector (type Fields) of this Scope object.
319   */
320  public Fields getArgumentsSelector()
321    {
322    return operationArgumentFields;
323    }
324
325  /**
326   * Method getArguments returns the arguments of this Scope object.
327   *
328   * @return the arguments (type Fields) of this Scope object.
329   */
330  public Fields getArgumentsDeclarator()
331    {
332    return asDeclaration( operationArgumentFields );
333    }
334
335  /**
336   * Method getDeclaredFields returns the declaredFields of this Scope object.
337   *
338   * @return the declaredFields (type Fields) of this Scope object.
339   */
340  public Fields getOperationDeclaredFields()
341    {
342    return operationDeclaredFields;
343    }
344
345  /**
346   * Method getGroupingSelectors returns the groupingSelectors of this Scope object.
347   *
348   * @return the groupingSelectors (type Map<String, Fields>) of this Scope object.
349   */
350  public Map<String, Fields> getKeySelectors()
351    {
352    return keySelectors;
353    }
354
355  /**
356   * Method getSortingSelectors returns the sortingSelectors of this Scope object.
357   *
358   * @return the sortingSelectors (type Map<String, Fields>) of this Scope object.
359   */
360  public Map<String, Fields> getSortingSelectors()
361    {
362    return sortingSelectors;
363    }
364
365  /**
366   * Method getOutGroupingSelector returns the outGroupingSelector of this Scope object.
367   *
368   * @return the outGroupingSelector (type Fields) of this Scope object.
369   */
370  public Fields getOutGroupingSelector()
371    {
372    return outGroupingSelector;
373    }
374
375  public Fields getIncomingTapFields()
376    {
377    if( isEvery() )
378      return getOutGroupingFields();
379    else
380      return getOutValuesFields();
381    }
382
383  public Fields getIncomingFunctionArgumentFields()
384    {
385    if( isEvery() )
386      return getOutGroupingFields();
387    else
388      return getOutValuesFields();
389    }
390
391  public Fields getIncomingFunctionPassThroughFields()
392    {
393    if( isEvery() )
394      return getOutGroupingFields();
395    else
396      return getOutValuesFields();
397    }
398
399  public Fields getIncomingAggregatorArgumentFields()
400    {
401    if( isEach() || isTap() )
402      throw new IllegalStateException( "Every cannot follow a Tap or an Each" );
403
404    return getOutValuesFields();
405    }
406
407  public Fields getIncomingAggregatorPassThroughFields()
408    {
409    if( isEach() || isTap() )
410      throw new IllegalStateException( "Every cannot follow a Tap or an Each" );
411
412    return getOutGroupingFields();
413    }
414
415  public Fields getIncomingBufferArgumentFields()
416    {
417    if( isEach() || isTap() )
418      throw new IllegalStateException( "Every cannot follow a Tap or an Each" );
419
420    return getOutValuesFields();
421    }
422
423  public Fields getIncomingBufferPassThroughFields()
424    {
425    if( isEach() || isTap() )
426      throw new IllegalStateException( "Every cannot follow a Tap or an Each" );
427
428    return getOutValuesFields();
429    }
430
431  public Fields getIncomingSpliceFields()
432    {
433    if( isEvery() )
434      return getOutGroupingFields();
435    else
436      return getOutValuesFields();
437    }
438
439  /**
440   * Method getOutGroupingFields returns the outGroupingFields of this Scope object.
441   *
442   * @return the outGroupingFields (type Fields) of this Scope object.
443   */
444  public Fields getOutGroupingFields()
445    {
446    if( !isSplice() )
447      return outGroupingFields;
448
449    Fields first = keySelectors.values().iterator().next();
450
451    // if more than one, this is a merge, so same key names are expected
452    if( keySelectors.size() == 1 || isGroupBy() )
453      return first;
454
455    // handling CoGroup only
456
457    // if given by user as resultGroupFields
458    if( outGroupingFields != null )
459      return outGroupingFields;
460
461    // todo throw an exception if we make it this far
462
463    // if all have the same names, then use for grouping
464    Set<Fields> set = new HashSet<Fields>( keySelectors.values() );
465
466    if( set.size() == 1 )
467      return first;
468
469    return Fields.size( first.size() );
470    }
471
472  public Fields getOutGroupingValueFields()
473    {
474    return getOutValuesFields().subtract( getOutGroupingFields() );
475    }
476
477  /**
478   * Method getOutValuesSelector returns the outValuesSelector of this Scope object.
479   *
480   * @return the outValuesSelector (type Fields) of this Scope object.
481   */
482  public Fields getOutValuesSelector()
483    {
484    return outValuesSelector;
485    }
486
487  /**
488   * Method getOutValuesFields returns the outValuesFields of this Scope object.
489   *
490   * @return the outValuesFields (type Fields) of this Scope object.
491   */
492  public Fields getOutValuesFields()
493    {
494    return outValuesFields;
495    }
496
497  /**
498   * Method copyFields copies the given Scope instance fields to this instance.
499   *
500   * @param scope of type Scope
501   */
502  public void copyFields( Scope scope )
503    {
504    this.kind = scope.kind;
505    this.incomingPassThroughFields = scope.incomingPassThroughFields;
506    this.remainderPassThroughFields = scope.remainderPassThroughFields;
507    this.operationArgumentFields = scope.operationArgumentFields;
508    this.operationDeclaredFields = scope.operationDeclaredFields;
509    this.keySelectors = scope.keySelectors;
510    this.sortingSelectors = scope.sortingSelectors;
511    this.outGroupingSelector = scope.outGroupingSelector;
512    this.outGroupingFields = scope.outGroupingFields;
513    this.outValuesSelector = scope.outValuesSelector;
514    this.outValuesFields = scope.outValuesFields;
515    }
516
517  @Override
518  public String toString()
519    {
520    return print();
521    }
522
523  public String print()
524    {
525    StringBuilder buffer = new StringBuilder();
526
527    if( ordinal != null )
528      buffer.append( "[" ).append( ordinal ).append( "]" );
529
530    if( getOutValuesFields() == null )
531      return buffer.toString(); // endpipes
532
533    buffer.append( "\n" );
534
535    if( keySelectors != null && !keySelectors.isEmpty() )
536      {
537      for( String name : keySelectors.keySet() )
538        {
539        if( buffer.length() != 0 && buffer.charAt( buffer.length() - 1 ) != '\n' )
540          buffer.append( " | " );
541
542        buffer.append( name ).append( keySelectors.get( name ).printVerbose() );
543        }
544
545      buffer.append( "\n" );
546      }
547
548    if( outGroupingFields != null )
549      buffer.append( getOutGroupingFields().printVerbose() ).append( "\n" );
550
551    buffer.append( getOutValuesFields().printVerbose() );
552
553    return buffer.toString();
554    }
555  }