001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.pipe;
022    
023    import java.lang.reflect.Type;
024    import java.util.ArrayList;
025    import java.util.Comparator;
026    import java.util.HashMap;
027    import java.util.HashSet;
028    import java.util.Iterator;
029    import java.util.LinkedHashMap;
030    import java.util.List;
031    import java.util.Map;
032    import java.util.Set;
033    
034    import cascading.flow.FlowElement;
035    import cascading.flow.planner.DeclaresResults;
036    import cascading.flow.planner.Scope;
037    import cascading.pipe.joiner.BufferJoin;
038    import cascading.pipe.joiner.InnerJoin;
039    import cascading.pipe.joiner.Joiner;
040    import cascading.tuple.Fields;
041    import cascading.tuple.FieldsResolverException;
042    import cascading.tuple.TupleException;
043    import cascading.tuple.coerce.Coercions;
044    import cascading.tuple.type.CoercibleType;
045    import cascading.util.Util;
046    
047    import static java.util.Arrays.asList;
048    
049    /**
050     * The base class for {@link GroupBy}, {@link CoGroup}, {@link Merge}, and {@link HashJoin}. This class should not be used directly.
051     *
052     * @see GroupBy
053     * @see CoGroup
054     * @see Merge
055     * @see HashJoin
056     */
057    public class Splice extends Pipe
058      {
059      static enum Kind
060        {
061          GroupBy, CoGroup, Merge, Join
062        }
063    
064      private Kind kind;
065      /** Field spliceName */
066      private String spliceName;
067      /** Field pipes */
068      private final List<Pipe> pipes = new ArrayList<Pipe>();
069      /** Field groupFieldsMap */
070      protected final Map<String, Fields> keyFieldsMap = new LinkedHashMap<String, Fields>(); // keep order
071      /** Field sortFieldsMap */
072      protected Map<String, Fields> sortFieldsMap = new LinkedHashMap<String, Fields>(); // keep order
073      /** Field reverseOrder */
074      private boolean reverseOrder = false;
075      /** Field declaredFields */
076      protected Fields declaredFields;
077      /** Field resultGroupFields */
078      protected Fields resultGroupFields;
079      /** Field repeat */
080      private int numSelfJoins = 0;
081      /** Field coGrouper */
082      private Joiner joiner;
083    
084      /** Field pipePos */
085      private transient Map<String, Integer> pipePos;
086    
087      /**
088       * Constructor Splice creates a new Splice instance.
089       *
090       * @param lhs            of type Pipe
091       * @param lhsGroupFields of type Fields
092       * @param rhs            of type Pipe
093       * @param rhsGroupFields of type Fields
094       * @param declaredFields of type Fields
095       */
096      protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields )
097        {
098        this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, null, null );
099        }
100    
101      /**
102       * Constructor Splice creates a new Splice instance.
103       *
104       * @param lhs               of type Pipe
105       * @param lhsGroupFields    of type Fields
106       * @param rhs               of type Pipe
107       * @param rhsGroupFields    of type Fields
108       * @param declaredFields    of type Fields
109       * @param resultGroupFields of type Fields
110       */
111      protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields )
112        {
113        this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields, null );
114        }
115    
116      /**
117       * Constructor Splice creates a new Splice instance.
118       *
119       * @param lhs            of type Pipe
120       * @param lhsGroupFields of type Fields
121       * @param rhs            of type Pipe
122       * @param rhsGroupFields of type Fields
123       * @param declaredFields of type Fields
124       * @param joiner         of type CoGrouper
125       */
126      protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Joiner joiner )
127        {
128        this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ), declaredFields, joiner );
129        }
130    
131      /**
132       * Constructor Splice creates a new Splice instance.
133       *
134       * @param lhs               of type Pipe
135       * @param lhsGroupFields    of type Fields
136       * @param rhs               of type Pipe
137       * @param rhsGroupFields    of type Fields
138       * @param declaredFields    of type Fields
139       * @param resultGroupFields of type Fields
140       * @param joiner            of type Joiner
141       */
142      protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
143        {
144        this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ), declaredFields, resultGroupFields, joiner );
145        }
146    
147      /**
148       * Constructor Splice creates a new Splice instance.
149       *
150       * @param lhs            of type Pipe
151       * @param lhsGroupFields of type Fields
152       * @param rhs            of type Pipe
153       * @param rhsGroupFields of type Fields
154       * @param joiner         of type CoGrouper
155       */
156      protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Joiner joiner )
157        {
158        this( lhs, lhsGroupFields, rhs, rhsGroupFields, null, joiner );
159        }
160    
161      /**
162       * Constructor Splice creates a new Splice instance.
163       *
164       * @param lhs            of type Pipe
165       * @param lhsGroupFields of type Fields
166       * @param rhs            of type Pipe
167       * @param rhsGroupFields of type Fields
168       */
169      protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields )
170        {
171        this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ) );
172        }
173    
174      /**
175       * Constructor Splice creates a new Splice instance.
176       *
177       * @param pipes of type Pipe...
178       */
179      protected Splice( Pipe... pipes )
180        {
181        this( pipes, (Fields[]) null );
182        }
183    
184      /**
185       * Constructor Splice creates a new Splice instance.
186       *
187       * @param pipes       of type Pipe[]
188       * @param groupFields of type Fields[]
189       */
190      protected Splice( Pipe[] pipes, Fields[] groupFields )
191        {
192        this( null, pipes, groupFields, null, null );
193        }
194    
195      /**
196       * Constructor Splice creates a new Splice instance.
197       *
198       * @param spliceName  of type String
199       * @param pipes       of type Pipe[]
200       * @param groupFields of type Fields[]
201       */
202      protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields )
203        {
204        this( spliceName, pipes, groupFields, null, null );
205        }
206    
207      /**
208       * Constructor Splice creates a new Splice instance.
209       *
210       * @param spliceName     of type String
211       * @param pipes          of type Pipe[]
212       * @param groupFields    of type Fields[]
213       * @param declaredFields of type Fields
214       */
215      protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields )
216        {
217        this( spliceName, pipes, groupFields, declaredFields, null );
218        }
219    
220      /**
221       * Constructor Splice creates a new Splice instance.
222       *
223       * @param spliceName        of type String
224       * @param pipes             of type Pipe[]
225       * @param groupFields       of type Fields[]
226       * @param declaredFields    of type Fields
227       * @param resultGroupFields of type Fields
228       */
229      protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields )
230        {
231        this( spliceName, pipes, groupFields, declaredFields, resultGroupFields, null );
232        }
233    
234      /**
235       * Constructor Splice creates a new Splice instance.
236       *
237       * @param pipes          of type Pipe[]
238       * @param groupFields    of type Fields[]
239       * @param declaredFields of type Fields
240       * @param joiner         of type CoGrouper
241       */
242      protected Splice( Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Joiner joiner )
243        {
244        this( null, pipes, groupFields, declaredFields, null, joiner );
245        }
246    
247      /**
248       * Constructor Splice creates a new Splice instance.
249       *
250       * @param pipes             of type Pipe[]
251       * @param groupFields       of type Fields[]
252       * @param declaredFields    of type Fields
253       * @param resultGroupFields of type Fields
254       * @param joiner            of type Joiner
255       */
256      protected Splice( Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
257        {
258        this( null, pipes, groupFields, declaredFields, resultGroupFields, joiner );
259        }
260    
261      /**
262       * Constructor Splice creates a new Splice instance.
263       *
264       * @param spliceName     of type String
265       * @param pipes          of type Pipe[]
266       * @param groupFields    of type Fields[]
267       * @param declaredFields of type Fields
268       * @param joiner         of type CoGrouper
269       */
270      protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
271        {
272        if( pipes == null )
273          throw new IllegalArgumentException( "pipes array may not be null" );
274    
275        setKind();
276        this.spliceName = spliceName;
277    
278        int uniques = new HashSet<Pipe>( asList( Pipe.resolvePreviousAll( pipes ) ) ).size();
279    
280        if( pipes.length > 1 && uniques == 1 )
281          {
282          if( isMerge() )
283            throw new IllegalArgumentException( "may not merge a pipe with itself without intermediate operations after the split" );
284    
285          if( groupFields == null )
286            throw new IllegalArgumentException( "groupFields array may not be null" );
287    
288          if( new HashSet<Fields>( asList( groupFields ) ).size() != 1 )
289            throw new IllegalArgumentException( "all groupFields must be identical" );
290    
291          addPipe( pipes[ 0 ] );
292          this.numSelfJoins = pipes.length - 1;
293          this.keyFieldsMap.put( pipes[ 0 ].getName(), groupFields[ 0 ] );
294    
295          if( resultGroupFields != null && groupFields[ 0 ].size() * pipes.length != resultGroupFields.size() )
296            throw new IllegalArgumentException( "resultGroupFields and cogroup joined fields must be same size" );
297          }
298        else
299          {
300          int last = -1;
301          for( int i = 0; i < pipes.length; i++ )
302            {
303            addPipe( pipes[ i ] );
304    
305            if( groupFields == null || groupFields.length == 0 )
306              {
307              addGroupFields( pipes[ i ], Fields.FIRST );
308              continue;
309              }
310    
311            if( last != -1 && last != groupFields[ i ].size() )
312              throw new IllegalArgumentException( "all groupFields must be same size" );
313    
314            last = groupFields[ i ].size();
315            addGroupFields( pipes[ i ], groupFields[ i ] );
316            }
317    
318          if( resultGroupFields != null && last * pipes.length != resultGroupFields.size() )
319            throw new IllegalArgumentException( "resultGroupFields and cogroup resulting joined fields must be same size" );
320          }
321    
322        this.declaredFields = declaredFields;
323        this.resultGroupFields = resultGroupFields;
324        this.joiner = joiner;
325    
326        verifyCoGrouper();
327        }
328    
329      /**
330       * Constructor Splice creates a new Splice instance.
331       *
332       * @param spliceName     of type String
333       * @param lhs            of type Pipe
334       * @param lhsGroupFields of type Fields
335       * @param rhs            of type Pipe
336       * @param rhsGroupFields of type Fields
337       * @param declaredFields of type Fields
338       */
339      protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields )
340        {
341        this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields );
342        this.spliceName = spliceName;
343        }
344    
345      /**
346       * Constructor Splice creates a new Splice instance.
347       *
348       * @param spliceName        of type String
349       * @param lhs               of type Pipe
350       * @param lhsGroupFields    of type Fields
351       * @param rhs               of type Pipe
352       * @param rhsGroupFields    of type Fields
353       * @param declaredFields    of type Fields
354       * @param resultGroupFields of type Fields
355       */
356      protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields )
357        {
358        this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields );
359        this.spliceName = spliceName;
360        }
361    
362      /**
363       * Constructor Splice creates a new Splice instance.
364       *
365       * @param spliceName     of type String
366       * @param lhs            of type Pipe
367       * @param lhsGroupFields of type Fields
368       * @param rhs            of type Pipe
369       * @param rhsGroupFields of type Fields
370       * @param declaredFields of type Fields
371       * @param joiner         of type CoGrouper
372       */
373      protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Joiner joiner )
374        {
375        this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, joiner );
376        this.spliceName = spliceName;
377        }
378    
379      /**
380       * Constructor Splice creates a new Splice instance.
381       *
382       * @param spliceName        of type String
383       * @param lhs               of type Pipe
384       * @param lhsGroupFields    of type Fields
385       * @param rhs               of type Pipe
386       * @param rhsGroupFields    of type Fields
387       * @param declaredFields    of type Fields
388       * @param resultGroupFields of type Fields
389       * @param joiner            of type Joiner
390       */
391      protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
392        {
393        this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields, joiner );
394        this.spliceName = spliceName;
395        }
396    
397      /**
398       * Constructor Splice creates a new Splice instance.
399       *
400       * @param spliceName     of type String
401       * @param lhs            of type Pipe
402       * @param lhsGroupFields of type Fields
403       * @param rhs            of type Pipe
404       * @param rhsGroupFields of type Fields
405       * @param joiner         of type CoGrouper
406       */
407      protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Joiner joiner )
408        {
409        this( lhs, lhsGroupFields, rhs, rhsGroupFields, joiner );
410        this.spliceName = spliceName;
411        }
412    
413      /**
414       * Constructor Splice creates a new Splice instance.
415       *
416       * @param spliceName     of type String
417       * @param lhs            of type Pipe
418       * @param lhsGroupFields of type Fields
419       * @param rhs            of type Pipe
420       * @param rhsGroupFields of type Fields
421       */
422      protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields )
423        {
424        this( lhs, lhsGroupFields, rhs, rhsGroupFields );
425        this.spliceName = spliceName;
426        }
427    
428      /**
429       * Constructor Splice creates a new Splice instance.
430       *
431       * @param spliceName of type String
432       * @param pipes      of type Pipe...
433       */
434      protected Splice( String spliceName, Pipe... pipes )
435        {
436        this( pipes );
437        this.spliceName = spliceName;
438        }
439    
440      /**
441       * Constructor Splice creates a new Splice instance.
442       *
443       * @param pipe           of type Pipe
444       * @param groupFields    of type Fields
445       * @param numSelfJoins   of type int
446       * @param declaredFields of type Fields
447       */
448      protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields )
449        {
450        this( pipe, groupFields, numSelfJoins );
451        this.declaredFields = declaredFields;
452        }
453    
454      /**
455       * Constructor Splice creates a new Splice instance.
456       *
457       * @param pipe              of type Pipe
458       * @param groupFields       of type Fields
459       * @param numSelfJoins      of type int
460       * @param declaredFields    of type Fields
461       * @param resultGroupFields of type Fields
462       */
463      protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields )
464        {
465        this( pipe, groupFields, numSelfJoins );
466        this.declaredFields = declaredFields;
467        this.resultGroupFields = resultGroupFields;
468    
469        if( resultGroupFields != null && groupFields.size() * numSelfJoins != resultGroupFields.size() )
470          throw new IllegalArgumentException( "resultGroupFields and cogroup resulting join fields must be same size" );
471        }
472    
473      /**
474       * Constructor Splice creates a new Splice instance.
475       *
476       * @param pipe           of type Pipe
477       * @param groupFields    of type Fields
478       * @param numSelfJoins   of type int
479       * @param declaredFields of type Fields
480       * @param joiner         of type CoGrouper
481       */
482      protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Joiner joiner )
483        {
484        this( pipe, groupFields, numSelfJoins, declaredFields );
485        this.joiner = joiner;
486    
487        verifyCoGrouper();
488        }
489    
490      /**
491       * Constructor Splice creates a new Splice instance.
492       *
493       * @param pipe              of type Pipe
494       * @param groupFields       of type Fields
495       * @param numSelfJoins      of type int
496       * @param declaredFields    of type Fields
497       * @param resultGroupFields of type Fields
498       * @param joiner            of type Joiner
499       */
500      protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
501        {
502        this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields );
503        this.joiner = joiner;
504    
505        verifyCoGrouper();
506        }
507    
508      /**
509       * Constructor Splice creates a new Splice instance.
510       *
511       * @param pipe         of type Pipe
512       * @param groupFields  of type Fields
513       * @param numSelfJoins of type int
514       * @param joiner       of type CoGrouper
515       */
516      protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Joiner joiner )
517        {
518        setKind();
519        addPipe( pipe );
520        this.keyFieldsMap.put( pipe.getName(), groupFields );
521        this.numSelfJoins = numSelfJoins;
522        this.joiner = joiner;
523    
524        verifyCoGrouper();
525        }
526    
527      /**
528       * Constructor Splice creates a new Splice instance.
529       *
530       * @param pipe         of type Pipe
531       * @param groupFields  of type Fields
532       * @param numSelfJoins of type int
533       */
534      protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins )
535        {
536        this( pipe, groupFields, numSelfJoins, (Joiner) null );
537        }
538    
539      /**
540       * Constructor Splice creates a new Splice instance.
541       *
542       * @param spliceName     of type String
543       * @param pipe           of type Pipe
544       * @param groupFields    of type Fields
545       * @param numSelfJoins   of type int
546       * @param declaredFields of type Fields
547       */
548      protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields )
549        {
550        this( pipe, groupFields, numSelfJoins, declaredFields );
551        this.spliceName = spliceName;
552        }
553    
554      /**
555       * Constructor Splice creates a new Splice instance.
556       *
557       * @param spliceName        of type String
558       * @param pipe              of type Pipe
559       * @param groupFields       of type Fields
560       * @param numSelfJoins      of type int
561       * @param declaredFields    of type Fields
562       * @param resultGroupFields of type Fields
563       */
564      protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields )
565        {
566        this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields );
567        this.spliceName = spliceName;
568        }
569    
570      /**
571       * Constructor Splice creates a new Splice instance.
572       *
573       * @param spliceName     of type String
574       * @param pipe           of type Pipe
575       * @param groupFields    of type Fields
576       * @param numSelfJoins   of type int
577       * @param declaredFields of type Fields
578       * @param joiner         of type CoGrouper
579       */
580      protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Joiner joiner )
581        {
582        this( pipe, groupFields, numSelfJoins, declaredFields, joiner );
583        this.spliceName = spliceName;
584        }
585    
586      /**
587       * Constructor Splice creates a new Splice instance.
588       *
589       * @param spliceName        of type String
590       * @param pipe              of type Pipe
591       * @param groupFields       of type Fields
592       * @param numSelfJoins      of type int
593       * @param declaredFields    of type Fields
594       * @param resultGroupFields of type Fields
595       * @param joiner            of type Joiner
596       */
597      protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields, Joiner joiner )
598        {
599        this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields, joiner );
600        this.spliceName = spliceName;
601        }
602    
603      /**
604       * Constructor Splice creates a new Splice instance.
605       *
606       * @param spliceName   of type String
607       * @param pipe         of type Pipe
608       * @param groupFields  of type Fields
609       * @param numSelfJoins of type int
610       * @param joiner       of type CoGrouper
611       */
612      protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Joiner joiner )
613        {
614        this( pipe, groupFields, numSelfJoins, joiner );
615        this.spliceName = spliceName;
616        }
617    
618      /**
619       * Constructor Splice creates a new Splice instance.
620       *
621       * @param spliceName   of type String
622       * @param pipe         of type Pipe
623       * @param groupFields  of type Fields
624       * @param numSelfJoins of type int
625       */
626      protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins )
627        {
628        this( pipe, groupFields, numSelfJoins );
629        this.spliceName = spliceName;
630        }
631    
632      ////////////
633      // GROUPBY
634      ////////////
635    
636      /**
637       * Constructor Splice creates a new Splice instance where grouping occurs on {@link Fields#ALL} fields.
638       *
639       * @param pipe of type Pipe
640       */
641      protected Splice( Pipe pipe )
642        {
643        this( null, pipe, Fields.ALL, null, false );
644        }
645    
646      /**
647       * Constructor Splice creates a new Splice instance.
648       *
649       * @param pipe        of type Pipe
650       * @param groupFields of type Fields
651       */
652      protected Splice( Pipe pipe, Fields groupFields )
653        {
654        this( null, pipe, groupFields, null, false );
655        }
656    
657      /**
658       * Constructor Splice creates a new Splice instance.
659       *
660       * @param spliceName  of type String
661       * @param pipe        of type Pipe
662       * @param groupFields of type Fields
663       */
664      protected Splice( String spliceName, Pipe pipe, Fields groupFields )
665        {
666        this( spliceName, pipe, groupFields, null, false );
667        }
668    
669      /**
670       * Constructor Splice creates a new Splice instance.
671       *
672       * @param pipe        of type Pipe
673       * @param groupFields of type Fields
674       * @param sortFields  of type Fields
675       */
676      protected Splice( Pipe pipe, Fields groupFields, Fields sortFields )
677        {
678        this( null, pipe, groupFields, sortFields, false );
679        }
680    
681      /**
682       * Constructor Splice creates a new Splice instance.
683       *
684       * @param spliceName  of type String
685       * @param pipe        of type Pipe
686       * @param groupFields of type Fields
687       * @param sortFields  of type Fields
688       */
689      protected Splice( String spliceName, Pipe pipe, Fields groupFields, Fields sortFields )
690        {
691        this( spliceName, pipe, groupFields, sortFields, false );
692        }
693    
694      /**
695       * Constructor Splice creates a new Splice instance.
696       *
697       * @param pipe         of type Pipe
698       * @param groupFields  of type Fields
699       * @param sortFields   of type Fields
700       * @param reverseOrder of type boolean
701       */
702      protected Splice( Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder )
703        {
704        this( null, pipe, groupFields, sortFields, reverseOrder );
705        }
706    
707      /**
708       * Constructor Splice creates a new Splice instance.
709       *
710       * @param spliceName   of type String
711       * @param pipe         of type Pipe
712       * @param groupFields  of type Fields
713       * @param sortFields   of type Fields
714       * @param reverseOrder of type boolean
715       */
716      protected Splice( String spliceName, Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder )
717        {
718        this( spliceName, Pipe.pipes( pipe ), groupFields, sortFields, reverseOrder );
719        }
720    
721      /**
722       * Constructor Splice creates a new Splice instance.
723       *
724       * @param pipes       of type Pipe
725       * @param groupFields of type Fields
726       */
727      protected Splice( Pipe[] pipes, Fields groupFields )
728        {
729        this( null, pipes, groupFields, null, false );
730        }
731    
732      /**
733       * Constructor Splice creates a new Splice instance.
734       *
735       * @param spliceName  of type String
736       * @param pipes       of type Pipe
737       * @param groupFields of type Fields
738       */
739      protected Splice( String spliceName, Pipe[] pipes, Fields groupFields )
740        {
741        this( spliceName, pipes, groupFields, null, false );
742        }
743    
744      /**
745       * Constructor Splice creates a new Splice instance.
746       *
747       * @param pipes       of type Pipe
748       * @param groupFields of type Fields
749       * @param sortFields  of type Fields
750       */
751      protected Splice( Pipe[] pipes, Fields groupFields, Fields sortFields )
752        {
753        this( null, pipes, groupFields, sortFields, false );
754        }
755    
756      /**
757       * Constructor Splice creates a new Splice instance.
758       *
759       * @param spliceName  of type String
760       * @param pipe        of type Pipe
761       * @param groupFields of type Fields
762       * @param sortFields  of type Fields
763       */
764      protected Splice( String spliceName, Pipe[] pipe, Fields groupFields, Fields sortFields )
765        {
766        this( spliceName, pipe, groupFields, sortFields, false );
767        }
768    
769      /**
770       * Constructor Splice creates a new Splice instance.
771       *
772       * @param pipes        of type Pipe
773       * @param groupFields  of type Fields
774       * @param sortFields   of type Fields
775       * @param reverseOrder of type boolean
776       */
777      protected Splice( Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder )
778        {
779        this( null, pipes, groupFields, sortFields, reverseOrder );
780        }
781    
782      /**
783       * Constructor Splice creates a new Splice instance.
784       *
785       * @param spliceName   of type String
786       * @param pipes        of type Pipe[]
787       * @param groupFields  of type Fields
788       * @param sortFields   of type Fields
789       * @param reverseOrder of type boolean
790       */
791      protected Splice( String spliceName, Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder )
792        {
793        if( pipes == null )
794          throw new IllegalArgumentException( "pipes array may not be null" );
795    
796        if( groupFields == null )
797          throw new IllegalArgumentException( "groupFields may not be null" );
798    
799        setKind();
800        this.spliceName = spliceName;
801    
802        for( Pipe pipe : pipes )
803          {
804          addPipe( pipe );
805          this.keyFieldsMap.put( pipe.getName(), groupFields );
806    
807          if( sortFields != null )
808            this.sortFieldsMap.put( pipe.getName(), sortFields );
809          }
810    
811        this.reverseOrder = reverseOrder;
812        this.joiner = new InnerJoin();
813        }
814    
815      private void verifyCoGrouper()
816        {
817        if( isJoin() && joiner instanceof BufferJoin )
818          throw new IllegalArgumentException( "invalid joiner, may not use BufferJoiner in a HashJoin" );
819    
820        if( joiner == null )
821          {
822          joiner = new InnerJoin();
823          return;
824          }
825    
826        if( joiner.numJoins() == -1 )
827          return;
828    
829        int joins = Math.max( numSelfJoins, keyFieldsMap.size() - 1 ); // joining two streams is one join
830    
831        if( joins != joiner.numJoins() )
832          throw new IllegalArgumentException( "invalid joiner, only accepts " + joiner.numJoins() + " joins, there are: " + joins );
833        }
834    
835      private void setKind()
836        {
837        if( this instanceof GroupBy )
838          kind = Kind.GroupBy;
839        else if( this instanceof CoGroup )
840          kind = Kind.CoGroup;
841        else if( this instanceof Merge )
842          kind = Kind.Merge;
843        else
844          kind = Kind.Join;
845        }
846    
847      /**
848       * Method getDeclaredFields returns the declaredFields of this Splice object.
849       *
850       * @return the declaredFields (type Fields) of this Splice object.
851       */
852      public Fields getDeclaredFields()
853        {
854        return declaredFields;
855        }
856    
857      private void addPipe( Pipe pipe )
858        {
859        if( pipe.getName() == null )
860          throw new IllegalArgumentException( "each input pipe must have a name" );
861    
862        pipes.add( pipe ); // allow same pipe
863        }
864    
865      private void addGroupFields( Pipe pipe, Fields fields )
866        {
867        if( keyFieldsMap.containsKey( pipe.getName() ) )
868          throw new IllegalArgumentException( "each input pipe branch must be uniquely named" );
869    
870        keyFieldsMap.put( pipe.getName(), fields );
871        }
872    
873      @Override
874      public String getName()
875        {
876        if( spliceName != null )
877          return spliceName;
878    
879        StringBuffer buffer = new StringBuffer();
880    
881        for( Pipe pipe : pipes )
882          {
883          if( buffer.length() != 0 )
884            {
885            if( isGroupBy() || isMerge() )
886              buffer.append( "+" );
887            else if( isCoGroup() || isJoin() )
888              buffer.append( "*" ); // more semantically correct
889            }
890    
891          buffer.append( pipe.getName() );
892          }
893    
894        spliceName = buffer.toString();
895    
896        return spliceName;
897        }
898    
899      @Override
900      public Pipe[] getPrevious()
901        {
902        return pipes.toArray( new Pipe[ pipes.size() ] );
903        }
904    
905      /**
906       * Method getGroupingSelectors returns the groupingSelectors of this Splice object.
907       *
908       * @return the groupingSelectors (type Map<String, Fields>) of this Splice object.
909       */
910      public Map<String, Fields> getKeySelectors()
911        {
912        return keyFieldsMap;
913        }
914    
915      /**
916       * Method getSortingSelectors returns the sortingSelectors of this Splice object.
917       *
918       * @return the sortingSelectors (type Map<String, Fields>) of this Splice object.
919       */
920      public Map<String, Fields> getSortingSelectors()
921        {
922        return sortFieldsMap;
923        }
924    
925      /**
926       * Method isSorted returns true if this Splice instance is sorting values other than the group fields.
927       *
928       * @return the sorted (type boolean) of this Splice object.
929       */
930      public boolean isSorted()
931        {
932        return !sortFieldsMap.isEmpty();
933        }
934    
935      /**
936       * Method isSortReversed returns true if sorting is reversed.
937       *
938       * @return the sortReversed (type boolean) of this Splice object.
939       */
940      public boolean isSortReversed()
941        {
942        return reverseOrder;
943        }
944    
945      public synchronized Map<String, Integer> getPipePos()
946        {
947        if( pipePos != null )
948          return pipePos;
949    
950        pipePos = new HashMap<String, Integer>();
951    
952        int pos = 0;
953        for( Object pipe : pipes )
954          pipePos.put( ( (Pipe) pipe ).getName(), pos++ );
955    
956        return pipePos;
957        }
958    
959      public Joiner getJoiner()
960        {
961        return joiner;
962        }
963    
964      /**
965       * Method isGroupBy returns true if this Splice instance will perform a GroupBy operation.
966       *
967       * @return the groupBy (type boolean) of this Splice object.
968       */
969      public final boolean isGroupBy()
970        {
971        return kind == Kind.GroupBy;
972        }
973    
974      public final boolean isCoGroup()
975        {
976        return kind == Kind.CoGroup;
977        }
978    
979      public final boolean isMerge()
980        {
981        return kind == Kind.Merge;
982        }
983    
984      public final boolean isJoin()
985        {
986        return kind == Kind.Join;
987        }
988    
989      public int getNumSelfJoins()
990        {
991        return numSelfJoins;
992        }
993    
994      boolean isSelfJoin()
995        {
996        return numSelfJoins != 0;
997        }
998    
999      // FIELDS
1000    
1001      @Override
1002      public Scope outgoingScopeFor( Set<Scope> incomingScopes )
1003        {
1004        Map<String, Fields> groupingSelectors = resolveGroupingSelectors( incomingScopes );
1005        Map<String, Fields> sortingSelectors = resolveSortingSelectors( incomingScopes );
1006        Fields declared = resolveDeclared( incomingScopes );
1007    
1008        Fields outGroupingFields = resultGroupFields;
1009    
1010        if( outGroupingFields == null && isCoGroup() )
1011          outGroupingFields = createJoinFields( incomingScopes, groupingSelectors, declared );
1012    
1013        // for Group, the outgoing fields are the same as those declared
1014        return new Scope( getName(), declared, outGroupingFields, groupingSelectors, sortingSelectors, declared, isGroupBy() );
1015        }
1016    
1017      private Fields createJoinFields( Set<Scope> incomingScopes, Map<String, Fields> groupingSelectors, Fields declared )
1018        {
1019        if( declared.isNone() )
1020          declared = Fields.UNKNOWN;
1021    
1022        Map<String, Fields> incomingFields = new HashMap<String, Fields>();
1023    
1024        for( Scope scope : incomingScopes )
1025          incomingFields.put( scope.getName(), scope.getIncomingSpliceFields() );
1026    
1027        Fields outGroupingFields = Fields.NONE;
1028    
1029        int offset = 0;
1030        for( Pipe pipe : pipes ) // need to retain order of pipes
1031          {
1032          String pipeName = pipe.getName();
1033          Fields pipeGroupingSelector = groupingSelectors.get( pipeName );
1034          Fields incomingField = incomingFields.get( pipeName );
1035    
1036          if( !pipeGroupingSelector.isNone() )
1037            {
1038            Fields offsetFields = incomingField.selectPos( pipeGroupingSelector, offset );
1039            Fields resolvedSelect = declared.select( offsetFields );
1040    
1041            outGroupingFields = outGroupingFields.append( resolvedSelect );
1042            }
1043    
1044          offset += incomingField.size();
1045          }
1046    
1047        return outGroupingFields;
1048        }
1049    
1050      Map<String, Fields> resolveGroupingSelectors( Set<Scope> incomingScopes )
1051        {
1052        try
1053          {
1054          Map<String, Fields> groupingSelectors = getKeySelectors();
1055          Map<String, Fields> groupingFields = resolveSelectorsAgainstIncoming( incomingScopes, groupingSelectors, "grouping" );
1056    
1057          if( !verifySameSize( groupingFields ) )
1058            throw new OperatorException( this, "all grouping fields must be same size: " + toString() );
1059    
1060          verifySameTypes( groupingSelectors, groupingFields );
1061    
1062          return groupingFields;
1063          }
1064        catch( FieldsResolverException exception )
1065          {
1066          throw new OperatorException( this, OperatorException.Kind.grouping, exception.getSourceFields(), exception.getSelectorFields(), exception );
1067          }
1068        catch( RuntimeException exception )
1069          {
1070          throw new OperatorException( this, "could not resolve grouping selector in: " + this, exception );
1071          }
1072        }
1073    
1074      private boolean verifySameTypes( Map<String, Fields> groupingSelectors, Map<String, Fields> groupingFields )
1075        {
1076        // create array of field positions with comparators from the grouping selectors
1077        // unsure which side has the comparators declared so make a union
1078        boolean[] hasComparator = new boolean[ groupingFields.values().iterator().next().size() ];
1079    
1080        for( Map.Entry<String, Fields> entry : groupingSelectors.entrySet() )
1081          {
1082          Comparator[] comparatorsArray = entry.getValue().getComparators();
1083    
1084          for( int i = 0; i < comparatorsArray.length; i++ )
1085            hasComparator[ i ] = hasComparator[ i ] || comparatorsArray[ i ] != null;
1086          }
1087    
1088        // compare all the rhs fields with the lhs (lhs and rhs are arbitrary here)
1089        Iterator<Fields> iterator = groupingFields.values().iterator();
1090        Fields lhsFields = iterator.next();
1091        Type[] lhsTypes = lhsFields.getTypes();
1092    
1093        // if types are null, no basis for comparison
1094        if( lhsTypes == null )
1095          return true;
1096    
1097        while( iterator.hasNext() )
1098          {
1099          Fields rhsFields = iterator.next();
1100          Type[] rhsTypes = rhsFields.getTypes();
1101    
1102          // if types are null, no basis for comparison
1103          if( rhsTypes == null )
1104            return true;
1105    
1106          for( int i = 0; i < lhsTypes.length; i++ )
1107            {
1108            if( hasComparator[ i ] )
1109              continue;
1110    
1111            Type lhs = lhsTypes[ i ];
1112            Type rhs = rhsTypes[ i ];
1113    
1114            lhs = getCanonicalType( lhs );
1115            rhs = getCanonicalType( rhs );
1116    
1117            if( lhs.equals( rhs ) )
1118              continue;
1119    
1120            Fields lhsError = new Fields( lhsFields.get( i ), lhsFields.getType( i ) );
1121            Fields rhsError = new Fields( rhsFields.get( i ), rhsFields.getType( i ) );
1122    
1123            throw new OperatorException( this, "grouping fields must declare same types:" + lhsError.printVerbose() + " not same as " + rhsError.printVerbose() );
1124            }
1125          }
1126    
1127        return true;
1128        }
1129    
1130      private Type getCanonicalType( Type type )
1131        {
1132        if( type instanceof CoercibleType )
1133          type = ( (CoercibleType) type ).getCanonicalType();
1134    
1135        // if one side is primitive, normalize to its primitive wrapper type
1136        if( type instanceof Class )
1137          type = Coercions.asNonPrimitive( (Class) type );
1138    
1139        return type;
1140        }
1141    
1142      private boolean verifySameSize( Map<String, Fields> groupingFields )
1143        {
1144        Iterator<Fields> iterator = groupingFields.values().iterator();
1145        int size = iterator.next().size();
1146    
1147        while( iterator.hasNext() )
1148          {
1149          Fields groupingField = iterator.next();
1150    
1151          if( groupingField.size() != size )
1152            return false;
1153    
1154          size = groupingField.size();
1155          }
1156    
1157        return true;
1158        }
1159    
1160      private Map<String, Fields> resolveSelectorsAgainstIncoming( Set<Scope> incomingScopes, Map<String, Fields> selectors, String type )
1161        {
1162        Map<String, Fields> resolvedFields = new HashMap<String, Fields>();
1163    
1164        for( Scope incomingScope : incomingScopes )
1165          {
1166          Fields selector = selectors.get( incomingScope.getName() );
1167    
1168          if( selector == null )
1169            throw new OperatorException( this, "no " + type + " selector found for: " + incomingScope.getName() );
1170    
1171          Fields incomingFields;
1172    
1173          if( selector.isNone() )
1174            incomingFields = Fields.NONE;
1175          else if( selector.isAll() )
1176            incomingFields = incomingScope.getIncomingSpliceFields();
1177          else if( selector.isGroup() )
1178            incomingFields = incomingScope.getOutGroupingFields();
1179          else if( selector.isValues() )
1180            incomingFields = incomingScope.getOutValuesFields().subtract( incomingScope.getOutGroupingFields() );
1181          else
1182            incomingFields = incomingScope.getIncomingSpliceFields().select( selector );
1183    
1184          resolvedFields.put( incomingScope.getName(), incomingFields );
1185          }
1186    
1187        return resolvedFields;
1188        }
1189    
1190      Map<String, Fields> resolveSortingSelectors( Set<Scope> incomingScopes )
1191        {
1192        try
1193          {
1194          if( getSortingSelectors().isEmpty() )
1195            return null;
1196    
1197          return resolveSelectorsAgainstIncoming( incomingScopes, getSortingSelectors(), "sorting" );
1198          }
1199        catch( FieldsResolverException exception )
1200          {
1201          throw new OperatorException( this, OperatorException.Kind.sorting, exception.getSourceFields(), exception.getSelectorFields(), exception );
1202          }
1203        catch( RuntimeException exception )
1204          {
1205          throw new OperatorException( this, "could not resolve sorting selector in: " + this, exception );
1206          }
1207        }
1208    
1209      @Override
1210      public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope )
1211        {
1212        return incomingScope.getIncomingSpliceFields();
1213        }
1214    
1215      Fields resolveDeclared( Set<Scope> incomingScopes )
1216        {
1217        try
1218          {
1219          Fields declaredFields = getJoinDeclaredFields();
1220    
1221          // Fields.NONE is a flag to the CoGroup the following Buffer will use the JoinerClosure directly
1222          if( declaredFields != null && declaredFields.isNone() )
1223            {
1224            if( !isCoGroup() )
1225              throw new IllegalArgumentException( "Fields.NONE may only be declared as the join fields when using a CoGroup" );
1226    
1227            return Fields.NONE;
1228            }
1229    
1230          if( declaredFields != null ) // null for GroupBy
1231            {
1232            if( incomingScopes.size() != pipes.size() && isSelfJoin() )
1233              throw new OperatorException( this, "self joins without intermediate operators are not permitted, see 'numSelfJoins' constructor or identity function" );
1234    
1235            int size = 0;
1236            boolean foundUnknown = false;
1237    
1238            List<Fields> appendableFields = getOrderedResolvedFields( incomingScopes );
1239    
1240            for( Fields fields : appendableFields )
1241              {
1242              foundUnknown = foundUnknown || fields.isUnknown();
1243              size += fields.size();
1244              }
1245    
1246            // we must relax field checking in the face of unknown fields
1247            if( !foundUnknown && declaredFields.size() != size * ( numSelfJoins + 1 ) )
1248              {
1249              if( isSelfJoin() )
1250                throw new OperatorException( this, "declared grouped fields not same size as grouped values, declared: " + declaredFields.printVerbose() + " != size: " + size * ( numSelfJoins + 1 ) );
1251              else
1252                throw new OperatorException( this, "declared grouped fields not same size as grouped values, declared: " + declaredFields.printVerbose() + " resolved: " + Util.print( appendableFields, "" ) );
1253              }
1254    
1255            int i = 0;
1256            for( Fields appendableField : appendableFields )
1257              {
1258              Type[] types = appendableField.getTypes();
1259    
1260              if( types == null )
1261                {
1262                i += appendableField.size();
1263                continue;
1264                }
1265    
1266              for( Type type : types )
1267                {
1268                if( type != null )
1269                  declaredFields = declaredFields.applyType( i, type );
1270    
1271                i++;
1272                }
1273              }
1274    
1275            return declaredFields;
1276            }
1277    
1278          // support merge or cogrouping here
1279          if( isGroupBy() || isMerge() )
1280            {
1281            Iterator<Scope> iterator = incomingScopes.iterator();
1282            Fields commonFields = iterator.next().getIncomingSpliceFields();
1283    
1284            while( iterator.hasNext() )
1285              {
1286              Scope incomingScope = iterator.next();
1287              Fields fields = incomingScope.getIncomingSpliceFields();
1288    
1289              if( !commonFields.equalsFields( fields ) )
1290                throw new OperatorException( this, "merged streams must declare the same field names, in the same order, expected: " + commonFields.printVerbose() + " found: " + fields.printVerbose() );
1291              }
1292    
1293            return commonFields;
1294            }
1295          else
1296            {
1297            List<Fields> appendableFields = getOrderedResolvedFields( incomingScopes );
1298            Fields appendedFields = new Fields();
1299    
1300            try
1301              {
1302              // will fail on name collisions
1303              for( Fields appendableField : appendableFields )
1304                appendedFields = appendedFields.append( appendableField );
1305              }
1306            catch( TupleException exception )
1307              {
1308              String fields = "";
1309    
1310              for( Fields appendableField : appendableFields )
1311                fields += appendableField.print();
1312    
1313              throw new OperatorException( this, "found duplicate field names in joined tuple stream: " + fields, exception );
1314              }
1315    
1316            return appendedFields;
1317            }
1318          }
1319        catch( OperatorException exception )
1320          {
1321          throw exception;
1322          }
1323        catch( RuntimeException exception )
1324          {
1325          throw new OperatorException( this, "could not resolve declared fields in: " + this, exception );
1326          }
1327        }
1328    
1329      public Fields getJoinDeclaredFields()
1330        {
1331        Fields declaredFields = getDeclaredFields();
1332    
1333        if( !( joiner instanceof DeclaresResults ) )
1334          return declaredFields;
1335    
1336        if( declaredFields == null && ( (DeclaresResults) joiner ).getFieldDeclaration() != null )
1337          declaredFields = ( (DeclaresResults) joiner ).getFieldDeclaration();
1338    
1339        return declaredFields;
1340        }
1341    
1342      private List<Fields> getOrderedResolvedFields( Set<Scope> incomingScopes )
1343        {
1344        Map<String, Scope> scopesMap = new HashMap<String, Scope>();
1345    
1346        for( Scope incomingScope : incomingScopes )
1347          scopesMap.put( incomingScope.getName(), incomingScope );
1348    
1349        List<Fields> appendableFields = new ArrayList<Fields>();
1350    
1351        for( Pipe pipe : pipes )
1352          appendableFields.add( scopesMap.get( pipe.getName() ).getIncomingSpliceFields() );
1353        return appendableFields;
1354        }
1355    
1356      @Override
1357      public boolean isEquivalentTo( FlowElement element )
1358        {
1359        boolean equivalentTo = super.isEquivalentTo( element );
1360    
1361        if( !equivalentTo )
1362          return equivalentTo;
1363    
1364        Splice splice = (Splice) element;
1365    
1366        if( !keyFieldsMap.equals( splice.keyFieldsMap ) )
1367          return false;
1368    
1369        if( !pipes.equals( splice.pipes ) )
1370          return false;
1371    
1372        return true;
1373        }
1374    
1375      // OBJECT OVERRIDES
1376    
1377      @Override
1378      @SuppressWarnings({"RedundantIfStatement"})
1379      public boolean equals( Object object )
1380        {
1381        if( this == object )
1382          return true;
1383        if( object == null || getClass() != object.getClass() )
1384          return false;
1385        if( !super.equals( object ) )
1386          return false;
1387    
1388        Splice splice = (Splice) object;
1389    
1390        if( spliceName != null ? !spliceName.equals( splice.spliceName ) : splice.spliceName != null )
1391          return false;
1392        if( keyFieldsMap != null ? !keyFieldsMap.equals( splice.keyFieldsMap ) : splice.keyFieldsMap != null )
1393          return false;
1394        if( pipes != null ? !pipes.equals( splice.pipes ) : splice.pipes != null )
1395          return false;
1396    
1397        return true;
1398        }
1399    
1400      @Override
1401      public int hashCode()
1402        {
1403        int result = super.hashCode();
1404        result = 31 * result + ( pipes != null ? pipes.hashCode() : 0 );
1405        result = 31 * result + ( keyFieldsMap != null ? keyFieldsMap.hashCode() : 0 );
1406        result = 31 * result + ( spliceName != null ? spliceName.hashCode() : 0 );
1407        return result;
1408        }
1409    
1410      @Override
1411      public String toString()
1412        {
1413        StringBuilder buffer = new StringBuilder( super.toString() );
1414    
1415        buffer.append( "[by:" );
1416    
1417        for( String name : keyFieldsMap.keySet() )
1418          {
1419          if( keyFieldsMap.size() > 1 )
1420            buffer.append( " " ).append( name ).append( ":" );
1421    
1422          buffer.append( keyFieldsMap.get( name ).printVerbose() );
1423          }
1424    
1425        if( isSelfJoin() )
1426          buffer.append( "[numSelfJoins:" ).append( numSelfJoins ).append( "]" );
1427    
1428        buffer.append( "]" );
1429    
1430        return buffer.toString();
1431        }
1432    
1433      @Override
1434      protected void printInternal( StringBuffer buffer, Scope scope )
1435        {
1436        super.printInternal( buffer, scope );
1437        Map<String, Fields> map = scope.getKeySelectors();
1438    
1439        if( map != null )
1440          {
1441          buffer.append( "[by:" );
1442    
1443          // important to retain incoming pipe order
1444          for( Map.Entry<String, Fields> entry : keyFieldsMap.entrySet() )
1445            {
1446            String name = entry.getKey();
1447    
1448            if( map.size() > 1 )
1449              buffer.append( name ).append( ":" );
1450    
1451            buffer.append( map.get( name ).print() ); // get resolved keys
1452            }
1453    
1454          if( isSelfJoin() )
1455            buffer.append( "[numSelfJoins:" ).append( numSelfJoins ).append( "]" );
1456    
1457          buffer.append( "]" );
1458          }
1459        }
1460      }