001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.pipe;
022
023import java.lang.reflect.Type;
024import java.util.ArrayList;
025import java.util.Comparator;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.LinkedHashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033
034import cascading.flow.FlowElement;
035import cascading.flow.planner.DeclaresResults;
036import cascading.flow.planner.Scope;
037import cascading.pipe.joiner.BufferJoin;
038import cascading.pipe.joiner.InnerJoin;
039import cascading.pipe.joiner.Joiner;
040import cascading.tuple.Fields;
041import cascading.tuple.FieldsResolverException;
042import cascading.tuple.TupleException;
043import cascading.tuple.coerce.Coercions;
044import cascading.tuple.type.CoercibleType;
045import cascading.util.Util;
046
047import static java.util.Arrays.asList;
048
049/**
050 * The base class for {@link GroupBy}, {@link CoGroup}, {@link Merge}, and {@link HashJoin}. This class should not be used directly.
051 *
052 * @see GroupBy
053 * @see CoGroup
054 * @see Merge
055 * @see HashJoin
056 */
057public class Splice extends Pipe
058  {
059  static enum Kind
060    {
061      GroupBy, CoGroup, Merge, Join
062    }
063
064  private Kind kind;
065  /** Field spliceName */
066  private String spliceName;
067  /** Field pipes */
068  private final List<Pipe> pipes = new ArrayList<Pipe>();
069  /** Field groupFieldsMap */
070  protected final Map<String, Fields> keyFieldsMap = new LinkedHashMap<String, Fields>(); // keep order
071  /** Field sortFieldsMap */
072  protected Map<String, Fields> sortFieldsMap = new LinkedHashMap<String, Fields>(); // keep order
073  /** Field reverseOrder */
074  private boolean reverseOrder = false;
075  /** Field declaredFields */
076  protected Fields declaredFields;
077  /** Field resultGroupFields */
078  protected Fields resultGroupFields;
079  /** Field repeat */
080  private int numSelfJoins = 0;
081  /** Field coGrouper */
082  private Joiner joiner;
083
084  /** Field pipePos */
085  private transient Map<String, Integer> pipePos;
086
087  /**
088   * Constructor Splice creates a new Splice instance.
089   *
090   * @param lhs            of type Pipe
091   * @param lhsGroupFields of type Fields
092   * @param rhs            of type Pipe
093   * @param rhsGroupFields of type Fields
094   * @param declaredFields of type Fields
095   */
096  protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields )
097    {
098    this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, null, null );
099    }
100
101  /**
102   * Constructor Splice creates a new Splice instance.
103   *
104   * @param lhs               of type Pipe
105   * @param lhsGroupFields    of type Fields
106   * @param rhs               of type Pipe
107   * @param rhsGroupFields    of type Fields
108   * @param declaredFields    of type Fields
109   * @param resultGroupFields of type Fields
110   */
111  protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields )
112    {
113    this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields, null );
114    }
115
116  /**
117   * Constructor Splice creates a new Splice instance.
118   *
119   * @param lhs            of type Pipe
120   * @param lhsGroupFields of type Fields
121   * @param rhs            of type Pipe
122   * @param rhsGroupFields of type Fields
123   * @param declaredFields of type Fields
124   * @param joiner         of type CoGrouper
125   */
126  protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Joiner joiner )
127    {
128    this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ), declaredFields, joiner );
129    }
130
131  /**
132   * Constructor Splice creates a new Splice instance.
133   *
134   * @param lhs               of type Pipe
135   * @param lhsGroupFields    of type Fields
136   * @param rhs               of type Pipe
137   * @param rhsGroupFields    of type Fields
138   * @param declaredFields    of type Fields
139   * @param resultGroupFields of type Fields
140   * @param joiner            of type Joiner
141   */
142  protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
143    {
144    this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ), declaredFields, resultGroupFields, joiner );
145    }
146
147  /**
148   * Constructor Splice creates a new Splice instance.
149   *
150   * @param lhs            of type Pipe
151   * @param lhsGroupFields of type Fields
152   * @param rhs            of type Pipe
153   * @param rhsGroupFields of type Fields
154   * @param joiner         of type CoGrouper
155   */
156  protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Joiner joiner )
157    {
158    this( lhs, lhsGroupFields, rhs, rhsGroupFields, null, joiner );
159    }
160
161  /**
162   * Constructor Splice creates a new Splice instance.
163   *
164   * @param lhs            of type Pipe
165   * @param lhsGroupFields of type Fields
166   * @param rhs            of type Pipe
167   * @param rhsGroupFields of type Fields
168   */
169  protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields )
170    {
171    this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ) );
172    }
173
174  /**
175   * Constructor Splice creates a new Splice instance.
176   *
177   * @param pipes of type Pipe...
178   */
179  protected Splice( Pipe... pipes )
180    {
181    this( pipes, (Fields[]) null );
182    }
183
184  /**
185   * Constructor Splice creates a new Splice instance.
186   *
187   * @param pipes       of type Pipe[]
188   * @param groupFields of type Fields[]
189   */
190  protected Splice( Pipe[] pipes, Fields[] groupFields )
191    {
192    this( null, pipes, groupFields, null, null );
193    }
194
195  /**
196   * Constructor Splice creates a new Splice instance.
197   *
198   * @param spliceName  of type String
199   * @param pipes       of type Pipe[]
200   * @param groupFields of type Fields[]
201   */
202  protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields )
203    {
204    this( spliceName, pipes, groupFields, null, null );
205    }
206
207  /**
208   * Constructor Splice creates a new Splice instance.
209   *
210   * @param spliceName     of type String
211   * @param pipes          of type Pipe[]
212   * @param groupFields    of type Fields[]
213   * @param declaredFields of type Fields
214   */
215  protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields )
216    {
217    this( spliceName, pipes, groupFields, declaredFields, null );
218    }
219
220  /**
221   * Constructor Splice creates a new Splice instance.
222   *
223   * @param spliceName        of type String
224   * @param pipes             of type Pipe[]
225   * @param groupFields       of type Fields[]
226   * @param declaredFields    of type Fields
227   * @param resultGroupFields of type Fields
228   */
229  protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields )
230    {
231    this( spliceName, pipes, groupFields, declaredFields, resultGroupFields, null );
232    }
233
234  /**
235   * Constructor Splice creates a new Splice instance.
236   *
237   * @param pipes          of type Pipe[]
238   * @param groupFields    of type Fields[]
239   * @param declaredFields of type Fields
240   * @param joiner         of type CoGrouper
241   */
242  protected Splice( Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Joiner joiner )
243    {
244    this( null, pipes, groupFields, declaredFields, null, joiner );
245    }
246
247  /**
248   * Constructor Splice creates a new Splice instance.
249   *
250   * @param pipes             of type Pipe[]
251   * @param groupFields       of type Fields[]
252   * @param declaredFields    of type Fields
253   * @param resultGroupFields of type Fields
254   * @param joiner            of type Joiner
255   */
256  protected Splice( Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
257    {
258    this( null, pipes, groupFields, declaredFields, resultGroupFields, joiner );
259    }
260
261  /**
262   * Constructor Splice creates a new Splice instance.
263   *
264   * @param spliceName     of type String
265   * @param pipes          of type Pipe[]
266   * @param groupFields    of type Fields[]
267   * @param declaredFields of type Fields
268   * @param joiner         of type CoGrouper
269   */
270  protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
271    {
272    if( pipes == null )
273      throw new IllegalArgumentException( "pipes array may not be null" );
274
275    setKind();
276    this.spliceName = spliceName;
277
278    int uniques = new HashSet<Pipe>( asList( Pipe.resolvePreviousAll( pipes ) ) ).size();
279
280    if( pipes.length > 1 && uniques == 1 )
281      {
282      if( isMerge() )
283        throw new IllegalArgumentException( "may not merge a pipe with itself without intermediate operations after the split" );
284
285      if( groupFields == null )
286        throw new IllegalArgumentException( "groupFields array may not be null" );
287
288      if( new HashSet<Fields>( asList( groupFields ) ).size() != 1 )
289        throw new IllegalArgumentException( "all groupFields must be identical" );
290
291      addPipe( pipes[ 0 ] );
292      this.numSelfJoins = pipes.length - 1;
293      this.keyFieldsMap.put( pipes[ 0 ].getName(), groupFields[ 0 ] );
294
295      if( resultGroupFields != null && groupFields[ 0 ].size() * pipes.length != resultGroupFields.size() )
296        throw new IllegalArgumentException( "resultGroupFields and cogroup joined fields must be same size" );
297      }
298    else
299      {
300      int last = -1;
301      for( int i = 0; i < pipes.length; i++ )
302        {
303        addPipe( pipes[ i ] );
304
305        if( groupFields == null || groupFields.length == 0 )
306          {
307          addGroupFields( pipes[ i ], Fields.FIRST );
308          continue;
309          }
310
311        if( last != -1 && last != groupFields[ i ].size() )
312          throw new IllegalArgumentException( "all groupFields must be same size" );
313
314        last = groupFields[ i ].size();
315        addGroupFields( pipes[ i ], groupFields[ i ] );
316        }
317
318      if( resultGroupFields != null && last * pipes.length != resultGroupFields.size() )
319        throw new IllegalArgumentException( "resultGroupFields and cogroup resulting joined fields must be same size" );
320      }
321
322    this.declaredFields = declaredFields;
323    this.resultGroupFields = resultGroupFields;
324    this.joiner = joiner;
325
326    verifyCoGrouper();
327    }
328
329  /**
330   * Constructor Splice creates a new Splice instance.
331   *
332   * @param spliceName     of type String
333   * @param lhs            of type Pipe
334   * @param lhsGroupFields of type Fields
335   * @param rhs            of type Pipe
336   * @param rhsGroupFields of type Fields
337   * @param declaredFields of type Fields
338   */
339  protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields )
340    {
341    this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields );
342    this.spliceName = spliceName;
343    }
344
345  /**
346   * Constructor Splice creates a new Splice instance.
347   *
348   * @param spliceName        of type String
349   * @param lhs               of type Pipe
350   * @param lhsGroupFields    of type Fields
351   * @param rhs               of type Pipe
352   * @param rhsGroupFields    of type Fields
353   * @param declaredFields    of type Fields
354   * @param resultGroupFields of type Fields
355   */
356  protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields )
357    {
358    this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields );
359    this.spliceName = spliceName;
360    }
361
362  /**
363   * Constructor Splice creates a new Splice instance.
364   *
365   * @param spliceName     of type String
366   * @param lhs            of type Pipe
367   * @param lhsGroupFields of type Fields
368   * @param rhs            of type Pipe
369   * @param rhsGroupFields of type Fields
370   * @param declaredFields of type Fields
371   * @param joiner         of type CoGrouper
372   */
373  protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Joiner joiner )
374    {
375    this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, joiner );
376    this.spliceName = spliceName;
377    }
378
379  /**
380   * Constructor Splice creates a new Splice instance.
381   *
382   * @param spliceName        of type String
383   * @param lhs               of type Pipe
384   * @param lhsGroupFields    of type Fields
385   * @param rhs               of type Pipe
386   * @param rhsGroupFields    of type Fields
387   * @param declaredFields    of type Fields
388   * @param resultGroupFields of type Fields
389   * @param joiner            of type Joiner
390   */
391  protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
392    {
393    this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields, joiner );
394    this.spliceName = spliceName;
395    }
396
397  /**
398   * Constructor Splice creates a new Splice instance.
399   *
400   * @param spliceName     of type String
401   * @param lhs            of type Pipe
402   * @param lhsGroupFields of type Fields
403   * @param rhs            of type Pipe
404   * @param rhsGroupFields of type Fields
405   * @param joiner         of type CoGrouper
406   */
407  protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Joiner joiner )
408    {
409    this( lhs, lhsGroupFields, rhs, rhsGroupFields, joiner );
410    this.spliceName = spliceName;
411    }
412
413  /**
414   * Constructor Splice creates a new Splice instance.
415   *
416   * @param spliceName     of type String
417   * @param lhs            of type Pipe
418   * @param lhsGroupFields of type Fields
419   * @param rhs            of type Pipe
420   * @param rhsGroupFields of type Fields
421   */
422  protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields )
423    {
424    this( lhs, lhsGroupFields, rhs, rhsGroupFields );
425    this.spliceName = spliceName;
426    }
427
428  /**
429   * Constructor Splice creates a new Splice instance.
430   *
431   * @param spliceName of type String
432   * @param pipes      of type Pipe...
433   */
434  protected Splice( String spliceName, Pipe... pipes )
435    {
436    this( pipes );
437    this.spliceName = spliceName;
438    }
439
440  /**
441   * Constructor Splice creates a new Splice instance.
442   *
443   * @param pipe           of type Pipe
444   * @param groupFields    of type Fields
445   * @param numSelfJoins   of type int
446   * @param declaredFields of type Fields
447   */
448  protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields )
449    {
450    this( pipe, groupFields, numSelfJoins );
451    this.declaredFields = declaredFields;
452    }
453
454  /**
455   * Constructor Splice creates a new Splice instance.
456   *
457   * @param pipe              of type Pipe
458   * @param groupFields       of type Fields
459   * @param numSelfJoins      of type int
460   * @param declaredFields    of type Fields
461   * @param resultGroupFields of type Fields
462   */
463  protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields )
464    {
465    this( pipe, groupFields, numSelfJoins );
466    this.declaredFields = declaredFields;
467    this.resultGroupFields = resultGroupFields;
468
469    if( resultGroupFields != null && groupFields.size() * numSelfJoins != resultGroupFields.size() )
470      throw new IllegalArgumentException( "resultGroupFields and cogroup resulting join fields must be same size" );
471    }
472
473  /**
474   * Constructor Splice creates a new Splice instance.
475   *
476   * @param pipe           of type Pipe
477   * @param groupFields    of type Fields
478   * @param numSelfJoins   of type int
479   * @param declaredFields of type Fields
480   * @param joiner         of type CoGrouper
481   */
482  protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Joiner joiner )
483    {
484    this( pipe, groupFields, numSelfJoins, declaredFields );
485    this.joiner = joiner;
486
487    verifyCoGrouper();
488    }
489
490  /**
491   * Constructor Splice creates a new Splice instance.
492   *
493   * @param pipe              of type Pipe
494   * @param groupFields       of type Fields
495   * @param numSelfJoins      of type int
496   * @param declaredFields    of type Fields
497   * @param resultGroupFields of type Fields
498   * @param joiner            of type Joiner
499   */
500  protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
501    {
502    this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields );
503    this.joiner = joiner;
504
505    verifyCoGrouper();
506    }
507
508  /**
509   * Constructor Splice creates a new Splice instance.
510   *
511   * @param pipe         of type Pipe
512   * @param groupFields  of type Fields
513   * @param numSelfJoins of type int
514   * @param joiner       of type CoGrouper
515   */
516  protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Joiner joiner )
517    {
518    setKind();
519    addPipe( pipe );
520    this.keyFieldsMap.put( pipe.getName(), groupFields );
521    this.numSelfJoins = numSelfJoins;
522    this.joiner = joiner;
523
524    verifyCoGrouper();
525    }
526
527  /**
528   * Constructor Splice creates a new Splice instance.
529   *
530   * @param pipe         of type Pipe
531   * @param groupFields  of type Fields
532   * @param numSelfJoins of type int
533   */
534  protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins )
535    {
536    this( pipe, groupFields, numSelfJoins, (Joiner) null );
537    }
538
539  /**
540   * Constructor Splice creates a new Splice instance.
541   *
542   * @param spliceName     of type String
543   * @param pipe           of type Pipe
544   * @param groupFields    of type Fields
545   * @param numSelfJoins   of type int
546   * @param declaredFields of type Fields
547   */
548  protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields )
549    {
550    this( pipe, groupFields, numSelfJoins, declaredFields );
551    this.spliceName = spliceName;
552    }
553
554  /**
555   * Constructor Splice creates a new Splice instance.
556   *
557   * @param spliceName        of type String
558   * @param pipe              of type Pipe
559   * @param groupFields       of type Fields
560   * @param numSelfJoins      of type int
561   * @param declaredFields    of type Fields
562   * @param resultGroupFields of type Fields
563   */
564  protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields )
565    {
566    this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields );
567    this.spliceName = spliceName;
568    }
569
570  /**
571   * Constructor Splice creates a new Splice instance.
572   *
573   * @param spliceName     of type String
574   * @param pipe           of type Pipe
575   * @param groupFields    of type Fields
576   * @param numSelfJoins   of type int
577   * @param declaredFields of type Fields
578   * @param joiner         of type CoGrouper
579   */
580  protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Joiner joiner )
581    {
582    this( pipe, groupFields, numSelfJoins, declaredFields, joiner );
583    this.spliceName = spliceName;
584    }
585
586  /**
587   * Constructor Splice creates a new Splice instance.
588   *
589   * @param spliceName        of type String
590   * @param pipe              of type Pipe
591   * @param groupFields       of type Fields
592   * @param numSelfJoins      of type int
593   * @param declaredFields    of type Fields
594   * @param resultGroupFields of type Fields
595   * @param joiner            of type Joiner
596   */
597  protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
598    {
599    this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields, joiner );
600    this.spliceName = spliceName;
601    }
602
603  /**
604   * Constructor Splice creates a new Splice instance.
605   *
606   * @param spliceName   of type String
607   * @param pipe         of type Pipe
608   * @param groupFields  of type Fields
609   * @param numSelfJoins of type int
610   * @param joiner       of type CoGrouper
611   */
612  protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Joiner joiner )
613    {
614    this( pipe, groupFields, numSelfJoins, joiner );
615    this.spliceName = spliceName;
616    }
617
618  /**
619   * Constructor Splice creates a new Splice instance.
620   *
621   * @param spliceName   of type String
622   * @param pipe         of type Pipe
623   * @param groupFields  of type Fields
624   * @param numSelfJoins of type int
625   */
626  protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins )
627    {
628    this( pipe, groupFields, numSelfJoins );
629    this.spliceName = spliceName;
630    }
631
632  ////////////
633  // GROUPBY
634  ////////////
635
636  /**
637   * Constructor Splice creates a new Splice instance where grouping occurs on {@link Fields#ALL} fields.
638   *
639   * @param pipe of type Pipe
640   */
641  protected Splice( Pipe pipe )
642    {
643    this( null, pipe, Fields.ALL, null, false );
644    }
645
646  /**
647   * Constructor Splice creates a new Splice instance.
648   *
649   * @param pipe        of type Pipe
650   * @param groupFields of type Fields
651   */
652  protected Splice( Pipe pipe, Fields groupFields )
653    {
654    this( null, pipe, groupFields, null, false );
655    }
656
657  /**
658   * Constructor Splice creates a new Splice instance.
659   *
660   * @param spliceName  of type String
661   * @param pipe        of type Pipe
662   * @param groupFields of type Fields
663   */
664  protected Splice( String spliceName, Pipe pipe, Fields groupFields )
665    {
666    this( spliceName, pipe, groupFields, null, false );
667    }
668
669  /**
670   * Constructor Splice creates a new Splice instance.
671   *
672   * @param pipe        of type Pipe
673   * @param groupFields of type Fields
674   * @param sortFields  of type Fields
675   */
676  protected Splice( Pipe pipe, Fields groupFields, Fields sortFields )
677    {
678    this( null, pipe, groupFields, sortFields, false );
679    }
680
681  /**
682   * Constructor Splice creates a new Splice instance.
683   *
684   * @param spliceName  of type String
685   * @param pipe        of type Pipe
686   * @param groupFields of type Fields
687   * @param sortFields  of type Fields
688   */
689  protected Splice( String spliceName, Pipe pipe, Fields groupFields, Fields sortFields )
690    {
691    this( spliceName, pipe, groupFields, sortFields, false );
692    }
693
694  /**
695   * Constructor Splice creates a new Splice instance.
696   *
697   * @param pipe         of type Pipe
698   * @param groupFields  of type Fields
699   * @param sortFields   of type Fields
700   * @param reverseOrder of type boolean
701   */
702  protected Splice( Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder )
703    {
704    this( null, pipe, groupFields, sortFields, reverseOrder );
705    }
706
707  /**
708   * Constructor Splice creates a new Splice instance.
709   *
710   * @param spliceName   of type String
711   * @param pipe         of type Pipe
712   * @param groupFields  of type Fields
713   * @param sortFields   of type Fields
714   * @param reverseOrder of type boolean
715   */
716  protected Splice( String spliceName, Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder )
717    {
718    this( spliceName, Pipe.pipes( pipe ), groupFields, sortFields, reverseOrder );
719    }
720
721  /**
722   * Constructor Splice creates a new Splice instance.
723   *
724   * @param pipes       of type Pipe
725   * @param groupFields of type Fields
726   */
727  protected Splice( Pipe[] pipes, Fields groupFields )
728    {
729    this( null, pipes, groupFields, null, false );
730    }
731
732  /**
733   * Constructor Splice creates a new Splice instance.
734   *
735   * @param spliceName  of type String
736   * @param pipes       of type Pipe
737   * @param groupFields of type Fields
738   */
739  protected Splice( String spliceName, Pipe[] pipes, Fields groupFields )
740    {
741    this( spliceName, pipes, groupFields, null, false );
742    }
743
744  /**
745   * Constructor Splice creates a new Splice instance.
746   *
747   * @param pipes       of type Pipe
748   * @param groupFields of type Fields
749   * @param sortFields  of type Fields
750   */
751  protected Splice( Pipe[] pipes, Fields groupFields, Fields sortFields )
752    {
753    this( null, pipes, groupFields, sortFields, false );
754    }
755
756  /**
757   * Constructor Splice creates a new Splice instance.
758   *
759   * @param spliceName  of type String
760   * @param pipe        of type Pipe
761   * @param groupFields of type Fields
762   * @param sortFields  of type Fields
763   */
764  protected Splice( String spliceName, Pipe[] pipe, Fields groupFields, Fields sortFields )
765    {
766    this( spliceName, pipe, groupFields, sortFields, false );
767    }
768
769  /**
770   * Constructor Splice creates a new Splice instance.
771   *
772   * @param pipes        of type Pipe
773   * @param groupFields  of type Fields
774   * @param sortFields   of type Fields
775   * @param reverseOrder of type boolean
776   */
777  protected Splice( Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder )
778    {
779    this( null, pipes, groupFields, sortFields, reverseOrder );
780    }
781
782  /**
783   * Constructor Splice creates a new Splice instance.
784   *
785   * @param spliceName   of type String
786   * @param pipes        of type Pipe[]
787   * @param groupFields  of type Fields
788   * @param sortFields   of type Fields
789   * @param reverseOrder of type boolean
790   */
791  protected Splice( String spliceName, Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder )
792    {
793    if( pipes == null )
794      throw new IllegalArgumentException( "pipes array may not be null" );
795
796    if( groupFields == null )
797      throw new IllegalArgumentException( "groupFields may not be null" );
798
799    setKind();
800    this.spliceName = spliceName;
801
802    for( Pipe pipe : pipes )
803      {
804      addPipe( pipe );
805      this.keyFieldsMap.put( pipe.getName(), groupFields );
806
807      if( sortFields != null )
808        this.sortFieldsMap.put( pipe.getName(), sortFields );
809      }
810
811    this.reverseOrder = reverseOrder;
812    this.joiner = new InnerJoin();
813    }
814
815  private void verifyCoGrouper()
816    {
817    if( isJoin() && joiner instanceof BufferJoin )
818      throw new IllegalArgumentException( "invalid joiner, may not use BufferJoiner in a HashJoin" );
819
820    if( joiner == null )
821      {
822      joiner = new InnerJoin();
823      return;
824      }
825
826    if( joiner.numJoins() == -1 )
827      return;
828
829    int joins = Math.max( numSelfJoins, keyFieldsMap.size() - 1 ); // joining two streams is one join
830
831    if( joins != joiner.numJoins() )
832      throw new IllegalArgumentException( "invalid joiner, only accepts " + joiner.numJoins() + " joins, there are: " + joins );
833    }
834
835  private void setKind()
836    {
837    if( this instanceof GroupBy )
838      kind = Kind.GroupBy;
839    else if( this instanceof CoGroup )
840      kind = Kind.CoGroup;
841    else if( this instanceof Merge )
842      kind = Kind.Merge;
843    else
844      kind = Kind.Join;
845    }
846
847  /**
848   * Method getDeclaredFields returns the declaredFields of this Splice object.
849   *
850   * @return the declaredFields (type Fields) of this Splice object.
851   */
852  public Fields getDeclaredFields()
853    {
854    return declaredFields;
855    }
856
857  private void addPipe( Pipe pipe )
858    {
859    if( pipe.getName() == null )
860      throw new IllegalArgumentException( "each input pipe must have a name" );
861
862    pipes.add( pipe ); // allow same pipe
863    }
864
865  private void addGroupFields( Pipe pipe, Fields fields )
866    {
867    if( keyFieldsMap.containsKey( pipe.getName() ) )
868      throw new IllegalArgumentException( "each input pipe branch must be uniquely named" );
869
870    keyFieldsMap.put( pipe.getName(), fields );
871    }
872
873  @Override
874  public String getName()
875    {
876    if( spliceName != null )
877      return spliceName;
878
879    StringBuffer buffer = new StringBuffer();
880
881    for( Pipe pipe : pipes )
882      {
883      if( buffer.length() != 0 )
884        {
885        if( isGroupBy() || isMerge() )
886          buffer.append( "+" );
887        else if( isCoGroup() || isJoin() )
888          buffer.append( "*" ); // more semantically correct
889        }
890
891      buffer.append( pipe.getName() );
892      }
893
894    spliceName = buffer.toString();
895
896    return spliceName;
897    }
898
899  @Override
900  public Pipe[] getPrevious()
901    {
902    return pipes.toArray( new Pipe[ pipes.size() ] );
903    }
904
905  /**
906   * Method getGroupingSelectors returns the groupingSelectors of this Splice object.
907   *
908   * @return the groupingSelectors (type Map<String, Fields>) of this Splice object.
909   */
910  public Map<String, Fields> getKeySelectors()
911    {
912    return keyFieldsMap;
913    }
914
915  /**
916   * Method getSortingSelectors returns the sortingSelectors of this Splice object.
917   *
918   * @return the sortingSelectors (type Map<String, Fields>) of this Splice object.
919   */
920  public Map<String, Fields> getSortingSelectors()
921    {
922    return sortFieldsMap;
923    }
924
925  /**
926   * Method isSorted returns true if this Splice instance is sorting values other than the group fields.
927   *
928   * @return the sorted (type boolean) of this Splice object.
929   */
930  public boolean isSorted()
931    {
932    return !sortFieldsMap.isEmpty();
933    }
934
935  /**
936   * Method isSortReversed returns true if sorting is reversed.
937   *
938   * @return the sortReversed (type boolean) of this Splice object.
939   */
940  public boolean isSortReversed()
941    {
942    return reverseOrder;
943    }
944
945  public synchronized Map<String, Integer> getPipePos()
946    {
947    if( pipePos != null )
948      return pipePos;
949
950    pipePos = new HashMap<String, Integer>();
951
952    int pos = 0;
953    for( Object pipe : pipes )
954      pipePos.put( ( (Pipe) pipe ).getName(), pos++ );
955
956    return pipePos;
957    }
958
959  public Joiner getJoiner()
960    {
961    return joiner;
962    }
963
964  /**
965   * Method isGroupBy returns true if this Splice instance will perform a GroupBy operation.
966   *
967   * @return the groupBy (type boolean) of this Splice object.
968   */
969  public final boolean isGroupBy()
970    {
971    return kind == Kind.GroupBy;
972    }
973
974  public final boolean isCoGroup()
975    {
976    return kind == Kind.CoGroup;
977    }
978
979  public final boolean isMerge()
980    {
981    return kind == Kind.Merge;
982    }
983
984  public final boolean isJoin()
985    {
986    return kind == Kind.Join;
987    }
988
989  public int getNumSelfJoins()
990    {
991    return numSelfJoins;
992    }
993
994  public boolean isSelfJoin()
995    {
996    return numSelfJoins != 0;
997    }
998
999  // FIELDS
1000
1001  @Override
1002  public Scope outgoingScopeFor( Set<Scope> incomingScopes )
1003    {
1004    Map<String, Fields> groupingSelectors = resolveGroupingSelectors( incomingScopes );
1005    Map<String, Fields> sortingSelectors = resolveSortingSelectors( incomingScopes );
1006    Fields declared = resolveDeclared( incomingScopes );
1007
1008    Fields outGroupingFields = resultGroupFields;
1009
1010    if( outGroupingFields == null && isCoGroup() )
1011      outGroupingFields = createJoinFields( incomingScopes, groupingSelectors, declared );
1012
1013    // for Group, the outgoing fields are the same as those declared
1014    Scope.Kind kind = getScopeKind();
1015
1016    return new Scope( getName(), declared, outGroupingFields, groupingSelectors, sortingSelectors, declared, kind );
1017    }
1018
1019  private Scope.Kind getScopeKind()
1020    {
1021    switch( kind )
1022      {
1023      case GroupBy:
1024        return Scope.Kind.GROUPBY;
1025      case CoGroup:
1026        return Scope.Kind.COGROUP;
1027      case Merge:
1028        return Scope.Kind.MERGE;
1029      case Join:
1030        return Scope.Kind.HASHJOIN;
1031      }
1032
1033    throw new IllegalStateException( "unknown kind: " + kind );
1034    }
1035
1036  private Fields createJoinFields( Set<Scope> incomingScopes, Map<String, Fields> groupingSelectors, Fields declared )
1037    {
1038    if( declared.isNone() )
1039      declared = Fields.UNKNOWN;
1040
1041    Map<String, Fields> incomingFields = new HashMap<String, Fields>();
1042
1043    for( Scope scope : incomingScopes )
1044      incomingFields.put( scope.getName(), scope.getIncomingSpliceFields() );
1045
1046    Fields outGroupingFields = Fields.NONE;
1047
1048    int offset = 0;
1049    for( Pipe pipe : pipes ) // need to retain order of pipes
1050      {
1051      String pipeName = pipe.getName();
1052      Fields pipeGroupingSelector = groupingSelectors.get( pipeName );
1053      Fields incomingField = incomingFields.get( pipeName );
1054
1055      if( !pipeGroupingSelector.isNone() )
1056        {
1057        Fields offsetFields = incomingField.selectPos( pipeGroupingSelector, offset );
1058        Fields resolvedSelect = declared.select( offsetFields );
1059
1060        outGroupingFields = outGroupingFields.append( resolvedSelect );
1061        }
1062
1063      offset += incomingField.size();
1064      }
1065
1066    return outGroupingFields;
1067    }
1068
1069  Map<String, Fields> resolveGroupingSelectors( Set<Scope> incomingScopes )
1070    {
1071    try
1072      {
1073      Map<String, Fields> groupingSelectors = getKeySelectors();
1074      Map<String, Fields> groupingFields = resolveSelectorsAgainstIncoming( incomingScopes, groupingSelectors, "grouping" );
1075
1076      if( !verifySameSize( groupingFields ) )
1077        throw new OperatorException( this, "all grouping fields must be same size: " + toString() );
1078
1079      verifySameTypes( groupingSelectors, groupingFields );
1080
1081      return groupingFields;
1082      }
1083    catch( FieldsResolverException exception )
1084      {
1085      throw new OperatorException( this, OperatorException.Kind.grouping, exception.getSourceFields(), exception.getSelectorFields(), exception );
1086      }
1087    catch( RuntimeException exception )
1088      {
1089      throw new OperatorException( this, "could not resolve grouping selector in: " + this, exception );
1090      }
1091    }
1092
1093  private boolean verifySameTypes( Map<String, Fields> groupingSelectors, Map<String, Fields> groupingFields )
1094    {
1095    // create array of field positions with comparators from the grouping selectors
1096    // unsure which side has the comparators declared so make a union
1097    boolean[] hasComparator = new boolean[ groupingFields.values().iterator().next().size() ];
1098
1099    for( Map.Entry<String, Fields> entry : groupingSelectors.entrySet() )
1100      {
1101      Comparator[] comparatorsArray = entry.getValue().getComparators();
1102
1103      for( int i = 0; i < comparatorsArray.length; i++ )
1104        hasComparator[ i ] = hasComparator[ i ] || comparatorsArray[ i ] != null;
1105      }
1106
1107    // compare all the rhs fields with the lhs (lhs and rhs are arbitrary here)
1108    Iterator<Fields> iterator = groupingFields.values().iterator();
1109    Fields lhsFields = iterator.next();
1110    Type[] lhsTypes = lhsFields.getTypes();
1111
1112    // if types are null, no basis for comparison
1113    if( lhsTypes == null )
1114      return true;
1115
1116    while( iterator.hasNext() )
1117      {
1118      Fields rhsFields = iterator.next();
1119      Type[] rhsTypes = rhsFields.getTypes();
1120
1121      // if types are null, no basis for comparison
1122      if( rhsTypes == null )
1123        return true;
1124
1125      for( int i = 0; i < lhsTypes.length; i++ )
1126        {
1127        if( hasComparator[ i ] )
1128          continue;
1129
1130        Type lhs = lhsTypes[ i ];
1131        Type rhs = rhsTypes[ i ];
1132
1133        lhs = getCanonicalType( lhs );
1134        rhs = getCanonicalType( rhs );
1135
1136        if( lhs.equals( rhs ) )
1137          continue;
1138
1139        Fields lhsError = new Fields( lhsFields.get( i ), lhsFields.getType( i ) );
1140        Fields rhsError = new Fields( rhsFields.get( i ), rhsFields.getType( i ) );
1141
1142        throw new OperatorException( this, "grouping fields must declare same types:" + lhsError.printVerbose() + " not same as " + rhsError.printVerbose() );
1143        }
1144      }
1145
1146    return true;
1147    }
1148
1149  private Type getCanonicalType( Type type )
1150    {
1151    if( type instanceof CoercibleType )
1152      type = ( (CoercibleType) type ).getCanonicalType();
1153
1154    // if one side is primitive, normalize to its primitive wrapper type
1155    if( type instanceof Class )
1156      type = Coercions.asNonPrimitive( (Class) type );
1157
1158    return type;
1159    }
1160
1161  private boolean verifySameSize( Map<String, Fields> groupingFields )
1162    {
1163    Iterator<Fields> iterator = groupingFields.values().iterator();
1164    int size = iterator.next().size();
1165
1166    while( iterator.hasNext() )
1167      {
1168      Fields groupingField = iterator.next();
1169
1170      if( groupingField.size() != size )
1171        return false;
1172
1173      size = groupingField.size();
1174      }
1175
1176    return true;
1177    }
1178
1179  private Map<String, Fields> resolveSelectorsAgainstIncoming( Set<Scope> incomingScopes, Map<String, Fields> selectors, String type )
1180    {
1181    Map<String, Fields> resolvedFields = new HashMap<String, Fields>();
1182
1183    for( Scope incomingScope : incomingScopes )
1184      {
1185      Fields selector = selectors.get( incomingScope.getName() );
1186
1187      if( selector == null )
1188        throw new OperatorException( this, "no " + type + " selector found for: " + incomingScope.getName() );
1189
1190      Fields incomingFields;
1191
1192      if( selector.isNone() )
1193        incomingFields = Fields.NONE;
1194      else if( selector.isAll() )
1195        incomingFields = incomingScope.getIncomingSpliceFields();
1196      else if( selector.isGroup() )
1197        incomingFields = incomingScope.getOutGroupingFields();
1198      else if( selector.isValues() )
1199        incomingFields = incomingScope.getOutValuesFields().subtract( incomingScope.getOutGroupingFields() );
1200      else
1201        incomingFields = incomingScope.getIncomingSpliceFields().select( selector );
1202
1203      resolvedFields.put( incomingScope.getName(), incomingFields );
1204      }
1205
1206    return resolvedFields;
1207    }
1208
1209  Map<String, Fields> resolveSortingSelectors( Set<Scope> incomingScopes )
1210    {
1211    try
1212      {
1213      if( getSortingSelectors().isEmpty() )
1214        return null;
1215
1216      return resolveSelectorsAgainstIncoming( incomingScopes, getSortingSelectors(), "sorting" );
1217      }
1218    catch( FieldsResolverException exception )
1219      {
1220      throw new OperatorException( this, OperatorException.Kind.sorting, exception.getSourceFields(), exception.getSelectorFields(), exception );
1221      }
1222    catch( RuntimeException exception )
1223      {
1224      throw new OperatorException( this, "could not resolve sorting selector in: " + this, exception );
1225      }
1226    }
1227
1228  @Override
1229  public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
1230    {
1231    return incomingScope.getIncomingSpliceFields();
1232    }
1233
1234  Fields resolveDeclared( Set<Scope> incomingScopes )
1235    {
1236    try
1237      {
1238      Fields declaredFields = getJoinDeclaredFields();
1239
1240      // Fields.NONE is a flag to the CoGroup the following Buffer will use the JoinerClosure directly
1241      if( declaredFields != null && declaredFields.isNone() )
1242        {
1243        if( !isCoGroup() )
1244          throw new IllegalArgumentException( "Fields.NONE may only be declared as the join fields when using a CoGroup" );
1245
1246        return Fields.NONE;
1247        }
1248
1249      if( declaredFields != null ) // null for GroupBy
1250        {
1251        if( incomingScopes.size() != pipes.size() && isSelfJoin() )
1252          throw new OperatorException( this, "self joins without intermediate operators are not permitted, see 'numSelfJoins' constructor or identity function" );
1253
1254        int size = 0;
1255        boolean foundUnknown = false;
1256
1257        List<Fields> appendableFields = getOrderedResolvedFields( incomingScopes );
1258
1259        for( Fields fields : appendableFields )
1260          {
1261          foundUnknown = foundUnknown || fields.isUnknown();
1262          size += fields.size();
1263          }
1264
1265        // we must relax field checking in the face of unknown fields
1266        if( !foundUnknown && declaredFields.size() != size * ( numSelfJoins + 1 ) )
1267          {
1268          if( isSelfJoin() )
1269            throw new OperatorException( this, "declared grouped fields not same size as grouped values, declared: " + declaredFields.printVerbose() + " != size: " + size * ( numSelfJoins + 1 ) );
1270          else
1271            throw new OperatorException( this, "declared grouped fields not same size as grouped values, declared: " + declaredFields.printVerbose() + " resolved: " + Util.print( appendableFields, "" ) );
1272          }
1273
1274        int i = 0;
1275        for( Fields appendableField : appendableFields )
1276          {
1277          Type[] types = appendableField.getTypes();
1278
1279          if( types == null )
1280            {
1281            i += appendableField.size();
1282            continue;
1283            }
1284
1285          for( Type type : types )
1286            {
1287            if( type != null )
1288              declaredFields = declaredFields.applyType( i, type );
1289
1290            i++;
1291            }
1292          }
1293
1294        return declaredFields;
1295        }
1296
1297      // support merge or cogrouping here
1298      if( isGroupBy() || isMerge() )
1299        {
1300        Iterator<Scope> iterator = incomingScopes.iterator();
1301        Fields commonFields = iterator.next().getIncomingSpliceFields();
1302
1303        while( iterator.hasNext() )
1304          {
1305          Scope incomingScope = iterator.next();
1306          Fields fields = incomingScope.getIncomingSpliceFields();
1307
1308          if( !commonFields.equalsFields( fields ) )
1309            throw new OperatorException( this, "merged streams must declare the same field names, in the same order, expected: " + commonFields.printVerbose() + " found: " + fields.printVerbose() );
1310          }
1311
1312        return commonFields;
1313        }
1314      else
1315        {
1316        List<Fields> appendableFields = getOrderedResolvedFields( incomingScopes );
1317        Fields appendedFields = new Fields();
1318
1319        try
1320          {
1321          // will fail on name collisions
1322          for( Fields appendableField : appendableFields )
1323            appendedFields = appendedFields.append( appendableField );
1324          }
1325        catch( TupleException exception )
1326          {
1327          String fields = "";
1328
1329          for( Fields appendableField : appendableFields )
1330            fields += appendableField.print();
1331
1332          throw new OperatorException( this, "found duplicate field names in joined tuple stream: " + fields, exception );
1333          }
1334
1335        return appendedFields;
1336        }
1337      }
1338    catch( OperatorException exception )
1339      {
1340      throw exception;
1341      }
1342    catch( RuntimeException exception )
1343      {
1344      throw new OperatorException( this, "could not resolve declared fields in: " + this, exception );
1345      }
1346    }
1347
1348  public Fields getJoinDeclaredFields()
1349    {
1350    Fields declaredFields = getDeclaredFields();
1351
1352    if( !( joiner instanceof DeclaresResults ) )
1353      return declaredFields;
1354
1355    if( declaredFields == null && ( (DeclaresResults) joiner ).getFieldDeclaration() != null )
1356      declaredFields = ( (DeclaresResults) joiner ).getFieldDeclaration();
1357
1358    return declaredFields;
1359    }
1360
1361  private List<Fields> getOrderedResolvedFields( Set<Scope> incomingScopes )
1362    {
1363    Map<String, Scope> scopesMap = new HashMap<String, Scope>();
1364
1365    for( Scope incomingScope : incomingScopes )
1366      scopesMap.put( incomingScope.getName(), incomingScope );
1367
1368    List<Fields> appendableFields = new ArrayList<Fields>();
1369
1370    for( Pipe pipe : pipes )
1371      appendableFields.add( scopesMap.get( pipe.getName() ).getIncomingSpliceFields() );
1372    return appendableFields;
1373    }
1374
1375  @Override
1376  public boolean isEquivalentTo( FlowElement element )
1377    {
1378    boolean equivalentTo = super.isEquivalentTo( element );
1379
1380    if( !equivalentTo )
1381      return equivalentTo;
1382
1383    Splice splice = (Splice) element;
1384
1385    if( !keyFieldsMap.equals( splice.keyFieldsMap ) )
1386      return false;
1387
1388    if( !pipes.equals( splice.pipes ) )
1389      return false;
1390
1391    return true;
1392    }
1393
1394  // OBJECT OVERRIDES
1395
1396  @Override
1397  @SuppressWarnings({"RedundantIfStatement"})
1398  public boolean equals( Object object )
1399    {
1400    if( this == object )
1401      return true;
1402    if( object == null || getClass() != object.getClass() )
1403      return false;
1404    if( !super.equals( object ) )
1405      return false;
1406
1407    Splice splice = (Splice) object;
1408
1409    if( spliceName != null ? !spliceName.equals( splice.spliceName ) : splice.spliceName != null )
1410      return false;
1411    if( keyFieldsMap != null ? !keyFieldsMap.equals( splice.keyFieldsMap ) : splice.keyFieldsMap != null )
1412      return false;
1413    if( pipes != null ? !pipes.equals( splice.pipes ) : splice.pipes != null )
1414      return false;
1415
1416    return true;
1417    }
1418
1419  @Override
1420  public int hashCode()
1421    {
1422    int result = super.hashCode();
1423    result = 31 * result + ( pipes != null ? pipes.hashCode() : 0 );
1424    result = 31 * result + ( keyFieldsMap != null ? keyFieldsMap.hashCode() : 0 );
1425    result = 31 * result + ( spliceName != null ? spliceName.hashCode() : 0 );
1426    return result;
1427    }
1428
1429  @Override
1430  public String toString()
1431    {
1432    StringBuilder buffer = new StringBuilder( super.toString() );
1433
1434    buffer.append( "[by:" );
1435
1436    for( String name : keyFieldsMap.keySet() )
1437      {
1438      if( keyFieldsMap.size() > 1 )
1439        buffer.append( " " ).append( name ).append( ":" );
1440
1441      buffer.append( keyFieldsMap.get( name ).printVerbose() );
1442      }
1443
1444    if( isSelfJoin() )
1445      buffer.append( "[numSelfJoins:" ).append( numSelfJoins ).append( "]" );
1446
1447    buffer.append( "]" );
1448
1449    return buffer.toString();
1450    }
1451
1452  @Override
1453  protected void printInternal( StringBuffer buffer, Scope scope )
1454    {
1455    super.printInternal( buffer, scope );
1456    Map<String, Fields> map = scope.getKeySelectors();
1457
1458    if( map != null )
1459      {
1460      buffer.append( "[by:" );
1461
1462      // important to retain incoming pipe order
1463      for( Map.Entry<String, Fields> entry : keyFieldsMap.entrySet() )
1464        {
1465        String name = entry.getKey();
1466
1467        if( map.size() > 1 )
1468          buffer.append( name ).append( ":" );
1469
1470        Fields keys = map.get( name );
1471
1472        // if keys null, this is likely an edge contracted map
1473        if( keys == null )
1474          buffer.append( "<unavailable>" );
1475        else
1476          buffer.append( keys.print() ); // get resolved keys
1477        }
1478
1479      if( isSelfJoin() )
1480        buffer.append( "[numSelfJoins:" ).append( numSelfJoins ).append( "]" );
1481
1482      buffer.append( "]" );
1483      }
1484    }
1485  }