001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.pipe; 022 023import java.lang.reflect.Type; 024import java.util.ArrayList; 025import java.util.Comparator; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.LinkedHashMap; 030import java.util.List; 031import java.util.Map; 032import java.util.Set; 033 034import cascading.flow.FlowElement; 035import cascading.flow.planner.DeclaresResults; 036import cascading.flow.planner.Scope; 037import cascading.pipe.joiner.BufferJoin; 038import cascading.pipe.joiner.InnerJoin; 039import cascading.pipe.joiner.Joiner; 040import cascading.tuple.Fields; 041import cascading.tuple.FieldsResolverException; 042import cascading.tuple.TupleException; 043import cascading.tuple.coerce.Coercions; 044import cascading.tuple.type.CoercibleType; 045import cascading.util.Util; 046 047import static java.util.Arrays.asList; 048 049/** 050 * The base class for {@link GroupBy}, {@link CoGroup}, {@link Merge}, and {@link HashJoin}. This class should not be used directly. 051 * 052 * @see GroupBy 053 * @see CoGroup 054 * @see Merge 055 * @see HashJoin 056 */ 057public class Splice extends Pipe 058 { 059 static enum Kind 060 { 061 GroupBy, CoGroup, Merge, Join 062 } 063 064 private Kind kind; 065 /** Field spliceName */ 066 private String spliceName; 067 /** Field pipes */ 068 private final List<Pipe> pipes = new ArrayList<Pipe>(); 069 /** Field groupFieldsMap */ 070 protected final Map<String, Fields> keyFieldsMap = new LinkedHashMap<String, Fields>(); // keep order 071 /** Field sortFieldsMap */ 072 protected Map<String, Fields> sortFieldsMap = new LinkedHashMap<String, Fields>(); // keep order 073 /** Field reverseOrder */ 074 private boolean reverseOrder = false; 075 /** Field declaredFields */ 076 protected Fields declaredFields; 077 /** Field resultGroupFields */ 078 protected Fields resultGroupFields; 079 /** Field repeat */ 080 private int numSelfJoins = 0; 081 /** Field coGrouper */ 082 private Joiner joiner; 083 084 /** Field pipePos */ 085 private transient Map<String, Integer> pipePos; 086 087 /** 088 * Constructor Splice creates a new Splice instance. 089 * 090 * @param lhs of type Pipe 091 * @param lhsGroupFields of type Fields 092 * @param rhs of type Pipe 093 * @param rhsGroupFields of type Fields 094 * @param declaredFields of type Fields 095 */ 096 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields ) 097 { 098 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, null, null ); 099 } 100 101 /** 102 * Constructor Splice creates a new Splice instance. 103 * 104 * @param lhs of type Pipe 105 * @param lhsGroupFields of type Fields 106 * @param rhs of type Pipe 107 * @param rhsGroupFields of type Fields 108 * @param declaredFields of type Fields 109 * @param resultGroupFields of type Fields 110 */ 111 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields ) 112 { 113 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields, null ); 114 } 115 116 /** 117 * Constructor Splice creates a new Splice instance. 118 * 119 * @param lhs of type Pipe 120 * @param lhsGroupFields of type Fields 121 * @param rhs of type Pipe 122 * @param rhsGroupFields of type Fields 123 * @param declaredFields of type Fields 124 * @param joiner of type CoGrouper 125 */ 126 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Joiner joiner ) 127 { 128 this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ), declaredFields, joiner ); 129 } 130 131 /** 132 * Constructor Splice creates a new Splice instance. 133 * 134 * @param lhs of type Pipe 135 * @param lhsGroupFields of type Fields 136 * @param rhs of type Pipe 137 * @param rhsGroupFields of type Fields 138 * @param declaredFields of type Fields 139 * @param resultGroupFields of type Fields 140 * @param joiner of type Joiner 141 */ 142 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 143 { 144 this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ), declaredFields, resultGroupFields, joiner ); 145 } 146 147 /** 148 * Constructor Splice creates a new Splice instance. 149 * 150 * @param lhs of type Pipe 151 * @param lhsGroupFields of type Fields 152 * @param rhs of type Pipe 153 * @param rhsGroupFields of type Fields 154 * @param joiner of type CoGrouper 155 */ 156 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Joiner joiner ) 157 { 158 this( lhs, lhsGroupFields, rhs, rhsGroupFields, null, joiner ); 159 } 160 161 /** 162 * Constructor Splice creates a new Splice instance. 163 * 164 * @param lhs of type Pipe 165 * @param lhsGroupFields of type Fields 166 * @param rhs of type Pipe 167 * @param rhsGroupFields of type Fields 168 */ 169 protected Splice( Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields ) 170 { 171 this( Pipe.pipes( lhs, rhs ), Fields.fields( lhsGroupFields, rhsGroupFields ) ); 172 } 173 174 /** 175 * Constructor Splice creates a new Splice instance. 176 * 177 * @param pipes of type Pipe... 178 */ 179 protected Splice( Pipe... pipes ) 180 { 181 this( pipes, (Fields[]) null ); 182 } 183 184 /** 185 * Constructor Splice creates a new Splice instance. 186 * 187 * @param pipes of type Pipe[] 188 * @param groupFields of type Fields[] 189 */ 190 protected Splice( Pipe[] pipes, Fields[] groupFields ) 191 { 192 this( null, pipes, groupFields, null, null ); 193 } 194 195 /** 196 * Constructor Splice creates a new Splice instance. 197 * 198 * @param spliceName of type String 199 * @param pipes of type Pipe[] 200 * @param groupFields of type Fields[] 201 */ 202 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields ) 203 { 204 this( spliceName, pipes, groupFields, null, null ); 205 } 206 207 /** 208 * Constructor Splice creates a new Splice instance. 209 * 210 * @param spliceName of type String 211 * @param pipes of type Pipe[] 212 * @param groupFields of type Fields[] 213 * @param declaredFields of type Fields 214 */ 215 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields ) 216 { 217 this( spliceName, pipes, groupFields, declaredFields, null ); 218 } 219 220 /** 221 * Constructor Splice creates a new Splice instance. 222 * 223 * @param spliceName of type String 224 * @param pipes of type Pipe[] 225 * @param groupFields of type Fields[] 226 * @param declaredFields of type Fields 227 * @param resultGroupFields of type Fields 228 */ 229 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields ) 230 { 231 this( spliceName, pipes, groupFields, declaredFields, resultGroupFields, null ); 232 } 233 234 /** 235 * Constructor Splice creates a new Splice instance. 236 * 237 * @param pipes of type Pipe[] 238 * @param groupFields of type Fields[] 239 * @param declaredFields of type Fields 240 * @param joiner of type CoGrouper 241 */ 242 protected Splice( Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Joiner joiner ) 243 { 244 this( null, pipes, groupFields, declaredFields, null, joiner ); 245 } 246 247 /** 248 * Constructor Splice creates a new Splice instance. 249 * 250 * @param pipes of type Pipe[] 251 * @param groupFields of type Fields[] 252 * @param declaredFields of type Fields 253 * @param resultGroupFields of type Fields 254 * @param joiner of type Joiner 255 */ 256 protected Splice( Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 257 { 258 this( null, pipes, groupFields, declaredFields, resultGroupFields, joiner ); 259 } 260 261 /** 262 * Constructor Splice creates a new Splice instance. 263 * 264 * @param spliceName of type String 265 * @param pipes of type Pipe[] 266 * @param groupFields of type Fields[] 267 * @param declaredFields of type Fields 268 * @param joiner of type CoGrouper 269 */ 270 protected Splice( String spliceName, Pipe[] pipes, Fields[] groupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 271 { 272 if( pipes == null ) 273 throw new IllegalArgumentException( "pipes array may not be null" ); 274 275 setKind(); 276 this.spliceName = spliceName; 277 278 int uniques = new HashSet<Pipe>( asList( Pipe.resolvePreviousAll( pipes ) ) ).size(); 279 280 if( pipes.length > 1 && uniques == 1 ) 281 { 282 if( isMerge() ) 283 throw new IllegalArgumentException( "may not merge a pipe with itself without intermediate operations after the split" ); 284 285 if( groupFields == null ) 286 throw new IllegalArgumentException( "groupFields array may not be null" ); 287 288 if( new HashSet<Fields>( asList( groupFields ) ).size() != 1 ) 289 throw new IllegalArgumentException( "all groupFields must be identical" ); 290 291 addPipe( pipes[ 0 ] ); 292 this.numSelfJoins = pipes.length - 1; 293 this.keyFieldsMap.put( pipes[ 0 ].getName(), groupFields[ 0 ] ); 294 295 if( resultGroupFields != null && groupFields[ 0 ].size() * pipes.length != resultGroupFields.size() ) 296 throw new IllegalArgumentException( "resultGroupFields and cogroup joined fields must be same size" ); 297 } 298 else 299 { 300 int last = -1; 301 for( int i = 0; i < pipes.length; i++ ) 302 { 303 addPipe( pipes[ i ] ); 304 305 if( groupFields == null || groupFields.length == 0 ) 306 { 307 addGroupFields( pipes[ i ], Fields.FIRST ); 308 continue; 309 } 310 311 if( last != -1 && last != groupFields[ i ].size() ) 312 throw new IllegalArgumentException( "all groupFields must be same size" ); 313 314 last = groupFields[ i ].size(); 315 addGroupFields( pipes[ i ], groupFields[ i ] ); 316 } 317 318 if( resultGroupFields != null && last * pipes.length != resultGroupFields.size() ) 319 throw new IllegalArgumentException( "resultGroupFields and cogroup resulting joined fields must be same size" ); 320 } 321 322 this.declaredFields = declaredFields; 323 this.resultGroupFields = resultGroupFields; 324 this.joiner = joiner; 325 326 verifyCoGrouper(); 327 } 328 329 /** 330 * Constructor Splice creates a new Splice instance. 331 * 332 * @param spliceName of type String 333 * @param lhs of type Pipe 334 * @param lhsGroupFields of type Fields 335 * @param rhs of type Pipe 336 * @param rhsGroupFields of type Fields 337 * @param declaredFields of type Fields 338 */ 339 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields ) 340 { 341 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields ); 342 this.spliceName = spliceName; 343 } 344 345 /** 346 * Constructor Splice creates a new Splice instance. 347 * 348 * @param spliceName of type String 349 * @param lhs of type Pipe 350 * @param lhsGroupFields of type Fields 351 * @param rhs of type Pipe 352 * @param rhsGroupFields of type Fields 353 * @param declaredFields of type Fields 354 * @param resultGroupFields of type Fields 355 */ 356 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields ) 357 { 358 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields ); 359 this.spliceName = spliceName; 360 } 361 362 /** 363 * Constructor Splice creates a new Splice instance. 364 * 365 * @param spliceName of type String 366 * @param lhs of type Pipe 367 * @param lhsGroupFields of type Fields 368 * @param rhs of type Pipe 369 * @param rhsGroupFields of type Fields 370 * @param declaredFields of type Fields 371 * @param joiner of type CoGrouper 372 */ 373 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Joiner joiner ) 374 { 375 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, joiner ); 376 this.spliceName = spliceName; 377 } 378 379 /** 380 * Constructor Splice creates a new Splice instance. 381 * 382 * @param spliceName of type String 383 * @param lhs of type Pipe 384 * @param lhsGroupFields of type Fields 385 * @param rhs of type Pipe 386 * @param rhsGroupFields of type Fields 387 * @param declaredFields of type Fields 388 * @param resultGroupFields of type Fields 389 * @param joiner of type Joiner 390 */ 391 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 392 { 393 this( lhs, lhsGroupFields, rhs, rhsGroupFields, declaredFields, resultGroupFields, joiner ); 394 this.spliceName = spliceName; 395 } 396 397 /** 398 * Constructor Splice creates a new Splice instance. 399 * 400 * @param spliceName of type String 401 * @param lhs of type Pipe 402 * @param lhsGroupFields of type Fields 403 * @param rhs of type Pipe 404 * @param rhsGroupFields of type Fields 405 * @param joiner of type CoGrouper 406 */ 407 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields, Joiner joiner ) 408 { 409 this( lhs, lhsGroupFields, rhs, rhsGroupFields, joiner ); 410 this.spliceName = spliceName; 411 } 412 413 /** 414 * Constructor Splice creates a new Splice instance. 415 * 416 * @param spliceName of type String 417 * @param lhs of type Pipe 418 * @param lhsGroupFields of type Fields 419 * @param rhs of type Pipe 420 * @param rhsGroupFields of type Fields 421 */ 422 protected Splice( String spliceName, Pipe lhs, Fields lhsGroupFields, Pipe rhs, Fields rhsGroupFields ) 423 { 424 this( lhs, lhsGroupFields, rhs, rhsGroupFields ); 425 this.spliceName = spliceName; 426 } 427 428 /** 429 * Constructor Splice creates a new Splice instance. 430 * 431 * @param spliceName of type String 432 * @param pipes of type Pipe... 433 */ 434 protected Splice( String spliceName, Pipe... pipes ) 435 { 436 this( pipes ); 437 this.spliceName = spliceName; 438 } 439 440 /** 441 * Constructor Splice creates a new Splice instance. 442 * 443 * @param pipe of type Pipe 444 * @param groupFields of type Fields 445 * @param numSelfJoins of type int 446 * @param declaredFields of type Fields 447 */ 448 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields ) 449 { 450 this( pipe, groupFields, numSelfJoins ); 451 this.declaredFields = declaredFields; 452 } 453 454 /** 455 * Constructor Splice creates a new Splice instance. 456 * 457 * @param pipe of type Pipe 458 * @param groupFields of type Fields 459 * @param numSelfJoins of type int 460 * @param declaredFields of type Fields 461 * @param resultGroupFields of type Fields 462 */ 463 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields ) 464 { 465 this( pipe, groupFields, numSelfJoins ); 466 this.declaredFields = declaredFields; 467 this.resultGroupFields = resultGroupFields; 468 469 if( resultGroupFields != null && groupFields.size() * numSelfJoins != resultGroupFields.size() ) 470 throw new IllegalArgumentException( "resultGroupFields and cogroup resulting join fields must be same size" ); 471 } 472 473 /** 474 * Constructor Splice creates a new Splice instance. 475 * 476 * @param pipe of type Pipe 477 * @param groupFields of type Fields 478 * @param numSelfJoins of type int 479 * @param declaredFields of type Fields 480 * @param joiner of type CoGrouper 481 */ 482 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Joiner joiner ) 483 { 484 this( pipe, groupFields, numSelfJoins, declaredFields ); 485 this.joiner = joiner; 486 487 verifyCoGrouper(); 488 } 489 490 /** 491 * Constructor Splice creates a new Splice instance. 492 * 493 * @param pipe of type Pipe 494 * @param groupFields of type Fields 495 * @param numSelfJoins of type int 496 * @param declaredFields of type Fields 497 * @param resultGroupFields of type Fields 498 * @param joiner of type Joiner 499 */ 500 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 501 { 502 this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields ); 503 this.joiner = joiner; 504 505 verifyCoGrouper(); 506 } 507 508 /** 509 * Constructor Splice creates a new Splice instance. 510 * 511 * @param pipe of type Pipe 512 * @param groupFields of type Fields 513 * @param numSelfJoins of type int 514 * @param joiner of type CoGrouper 515 */ 516 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins, Joiner joiner ) 517 { 518 setKind(); 519 addPipe( pipe ); 520 this.keyFieldsMap.put( pipe.getName(), groupFields ); 521 this.numSelfJoins = numSelfJoins; 522 this.joiner = joiner; 523 524 verifyCoGrouper(); 525 } 526 527 /** 528 * Constructor Splice creates a new Splice instance. 529 * 530 * @param pipe of type Pipe 531 * @param groupFields of type Fields 532 * @param numSelfJoins of type int 533 */ 534 protected Splice( Pipe pipe, Fields groupFields, int numSelfJoins ) 535 { 536 this( pipe, groupFields, numSelfJoins, (Joiner) null ); 537 } 538 539 /** 540 * Constructor Splice creates a new Splice instance. 541 * 542 * @param spliceName of type String 543 * @param pipe of type Pipe 544 * @param groupFields of type Fields 545 * @param numSelfJoins of type int 546 * @param declaredFields of type Fields 547 */ 548 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields ) 549 { 550 this( pipe, groupFields, numSelfJoins, declaredFields ); 551 this.spliceName = spliceName; 552 } 553 554 /** 555 * Constructor Splice creates a new Splice instance. 556 * 557 * @param spliceName of type String 558 * @param pipe of type Pipe 559 * @param groupFields of type Fields 560 * @param numSelfJoins of type int 561 * @param declaredFields of type Fields 562 * @param resultGroupFields of type Fields 563 */ 564 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields ) 565 { 566 this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields ); 567 this.spliceName = spliceName; 568 } 569 570 /** 571 * Constructor Splice creates a new Splice instance. 572 * 573 * @param spliceName of type String 574 * @param pipe of type Pipe 575 * @param groupFields of type Fields 576 * @param numSelfJoins of type int 577 * @param declaredFields of type Fields 578 * @param joiner of type CoGrouper 579 */ 580 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Joiner joiner ) 581 { 582 this( pipe, groupFields, numSelfJoins, declaredFields, joiner ); 583 this.spliceName = spliceName; 584 } 585 586 /** 587 * Constructor Splice creates a new Splice instance. 588 * 589 * @param spliceName of type String 590 * @param pipe of type Pipe 591 * @param groupFields of type Fields 592 * @param numSelfJoins of type int 593 * @param declaredFields of type Fields 594 * @param resultGroupFields of type Fields 595 * @param joiner of type Joiner 596 */ 597 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Fields declaredFields, Fields resultGroupFields, Joiner joiner ) 598 { 599 this( pipe, groupFields, numSelfJoins, declaredFields, resultGroupFields, joiner ); 600 this.spliceName = spliceName; 601 } 602 603 /** 604 * Constructor Splice creates a new Splice instance. 605 * 606 * @param spliceName of type String 607 * @param pipe of type Pipe 608 * @param groupFields of type Fields 609 * @param numSelfJoins of type int 610 * @param joiner of type CoGrouper 611 */ 612 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins, Joiner joiner ) 613 { 614 this( pipe, groupFields, numSelfJoins, joiner ); 615 this.spliceName = spliceName; 616 } 617 618 /** 619 * Constructor Splice creates a new Splice instance. 620 * 621 * @param spliceName of type String 622 * @param pipe of type Pipe 623 * @param groupFields of type Fields 624 * @param numSelfJoins of type int 625 */ 626 protected Splice( String spliceName, Pipe pipe, Fields groupFields, int numSelfJoins ) 627 { 628 this( pipe, groupFields, numSelfJoins ); 629 this.spliceName = spliceName; 630 } 631 632 //////////// 633 // GROUPBY 634 //////////// 635 636 /** 637 * Constructor Splice creates a new Splice instance where grouping occurs on {@link Fields#ALL} fields. 638 * 639 * @param pipe of type Pipe 640 */ 641 protected Splice( Pipe pipe ) 642 { 643 this( null, pipe, Fields.ALL, null, false ); 644 } 645 646 /** 647 * Constructor Splice creates a new Splice instance. 648 * 649 * @param pipe of type Pipe 650 * @param groupFields of type Fields 651 */ 652 protected Splice( Pipe pipe, Fields groupFields ) 653 { 654 this( null, pipe, groupFields, null, false ); 655 } 656 657 /** 658 * Constructor Splice creates a new Splice instance. 659 * 660 * @param spliceName of type String 661 * @param pipe of type Pipe 662 * @param groupFields of type Fields 663 */ 664 protected Splice( String spliceName, Pipe pipe, Fields groupFields ) 665 { 666 this( spliceName, pipe, groupFields, null, false ); 667 } 668 669 /** 670 * Constructor Splice creates a new Splice instance. 671 * 672 * @param pipe of type Pipe 673 * @param groupFields of type Fields 674 * @param sortFields of type Fields 675 */ 676 protected Splice( Pipe pipe, Fields groupFields, Fields sortFields ) 677 { 678 this( null, pipe, groupFields, sortFields, false ); 679 } 680 681 /** 682 * Constructor Splice creates a new Splice instance. 683 * 684 * @param spliceName of type String 685 * @param pipe of type Pipe 686 * @param groupFields of type Fields 687 * @param sortFields of type Fields 688 */ 689 protected Splice( String spliceName, Pipe pipe, Fields groupFields, Fields sortFields ) 690 { 691 this( spliceName, pipe, groupFields, sortFields, false ); 692 } 693 694 /** 695 * Constructor Splice creates a new Splice instance. 696 * 697 * @param pipe of type Pipe 698 * @param groupFields of type Fields 699 * @param sortFields of type Fields 700 * @param reverseOrder of type boolean 701 */ 702 protected Splice( Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder ) 703 { 704 this( null, pipe, groupFields, sortFields, reverseOrder ); 705 } 706 707 /** 708 * Constructor Splice creates a new Splice instance. 709 * 710 * @param spliceName of type String 711 * @param pipe of type Pipe 712 * @param groupFields of type Fields 713 * @param sortFields of type Fields 714 * @param reverseOrder of type boolean 715 */ 716 protected Splice( String spliceName, Pipe pipe, Fields groupFields, Fields sortFields, boolean reverseOrder ) 717 { 718 this( spliceName, Pipe.pipes( pipe ), groupFields, sortFields, reverseOrder ); 719 } 720 721 /** 722 * Constructor Splice creates a new Splice instance. 723 * 724 * @param pipes of type Pipe 725 * @param groupFields of type Fields 726 */ 727 protected Splice( Pipe[] pipes, Fields groupFields ) 728 { 729 this( null, pipes, groupFields, null, false ); 730 } 731 732 /** 733 * Constructor Splice creates a new Splice instance. 734 * 735 * @param spliceName of type String 736 * @param pipes of type Pipe 737 * @param groupFields of type Fields 738 */ 739 protected Splice( String spliceName, Pipe[] pipes, Fields groupFields ) 740 { 741 this( spliceName, pipes, groupFields, null, false ); 742 } 743 744 /** 745 * Constructor Splice creates a new Splice instance. 746 * 747 * @param pipes of type Pipe 748 * @param groupFields of type Fields 749 * @param sortFields of type Fields 750 */ 751 protected Splice( Pipe[] pipes, Fields groupFields, Fields sortFields ) 752 { 753 this( null, pipes, groupFields, sortFields, false ); 754 } 755 756 /** 757 * Constructor Splice creates a new Splice instance. 758 * 759 * @param spliceName of type String 760 * @param pipe of type Pipe 761 * @param groupFields of type Fields 762 * @param sortFields of type Fields 763 */ 764 protected Splice( String spliceName, Pipe[] pipe, Fields groupFields, Fields sortFields ) 765 { 766 this( spliceName, pipe, groupFields, sortFields, false ); 767 } 768 769 /** 770 * Constructor Splice creates a new Splice instance. 771 * 772 * @param pipes of type Pipe 773 * @param groupFields of type Fields 774 * @param sortFields of type Fields 775 * @param reverseOrder of type boolean 776 */ 777 protected Splice( Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder ) 778 { 779 this( null, pipes, groupFields, sortFields, reverseOrder ); 780 } 781 782 /** 783 * Constructor Splice creates a new Splice instance. 784 * 785 * @param spliceName of type String 786 * @param pipes of type Pipe[] 787 * @param groupFields of type Fields 788 * @param sortFields of type Fields 789 * @param reverseOrder of type boolean 790 */ 791 protected Splice( String spliceName, Pipe[] pipes, Fields groupFields, Fields sortFields, boolean reverseOrder ) 792 { 793 if( pipes == null ) 794 throw new IllegalArgumentException( "pipes array may not be null" ); 795 796 if( groupFields == null ) 797 throw new IllegalArgumentException( "groupFields may not be null" ); 798 799 setKind(); 800 this.spliceName = spliceName; 801 802 for( Pipe pipe : pipes ) 803 { 804 addPipe( pipe ); 805 this.keyFieldsMap.put( pipe.getName(), groupFields ); 806 807 if( sortFields != null ) 808 this.sortFieldsMap.put( pipe.getName(), sortFields ); 809 } 810 811 this.reverseOrder = reverseOrder; 812 this.joiner = new InnerJoin(); 813 } 814 815 private void verifyCoGrouper() 816 { 817 if( isJoin() && joiner instanceof BufferJoin ) 818 throw new IllegalArgumentException( "invalid joiner, may not use BufferJoiner in a HashJoin" ); 819 820 if( joiner == null ) 821 { 822 joiner = new InnerJoin(); 823 return; 824 } 825 826 if( joiner.numJoins() == -1 ) 827 return; 828 829 int joins = Math.max( numSelfJoins, keyFieldsMap.size() - 1 ); // joining two streams is one join 830 831 if( joins != joiner.numJoins() ) 832 throw new IllegalArgumentException( "invalid joiner, only accepts " + joiner.numJoins() + " joins, there are: " + joins ); 833 } 834 835 private void setKind() 836 { 837 if( this instanceof GroupBy ) 838 kind = Kind.GroupBy; 839 else if( this instanceof CoGroup ) 840 kind = Kind.CoGroup; 841 else if( this instanceof Merge ) 842 kind = Kind.Merge; 843 else 844 kind = Kind.Join; 845 } 846 847 /** 848 * Method getDeclaredFields returns the declaredFields of this Splice object. 849 * 850 * @return the declaredFields (type Fields) of this Splice object. 851 */ 852 public Fields getDeclaredFields() 853 { 854 return declaredFields; 855 } 856 857 private void addPipe( Pipe pipe ) 858 { 859 if( pipe.getName() == null ) 860 throw new IllegalArgumentException( "each input pipe must have a name" ); 861 862 pipes.add( pipe ); // allow same pipe 863 } 864 865 private void addGroupFields( Pipe pipe, Fields fields ) 866 { 867 if( keyFieldsMap.containsKey( pipe.getName() ) ) 868 throw new IllegalArgumentException( "each input pipe branch must be uniquely named" ); 869 870 keyFieldsMap.put( pipe.getName(), fields ); 871 } 872 873 @Override 874 public String getName() 875 { 876 if( spliceName != null ) 877 return spliceName; 878 879 StringBuffer buffer = new StringBuffer(); 880 881 for( Pipe pipe : pipes ) 882 { 883 if( buffer.length() != 0 ) 884 { 885 if( isGroupBy() || isMerge() ) 886 buffer.append( "+" ); 887 else if( isCoGroup() || isJoin() ) 888 buffer.append( "*" ); // more semantically correct 889 } 890 891 buffer.append( pipe.getName() ); 892 } 893 894 spliceName = buffer.toString(); 895 896 return spliceName; 897 } 898 899 @Override 900 public Pipe[] getPrevious() 901 { 902 return pipes.toArray( new Pipe[ pipes.size() ] ); 903 } 904 905 /** 906 * Method getGroupingSelectors returns the groupingSelectors of this Splice object. 907 * 908 * @return the groupingSelectors (type Map<String, Fields>) of this Splice object. 909 */ 910 public Map<String, Fields> getKeySelectors() 911 { 912 return keyFieldsMap; 913 } 914 915 /** 916 * Method getSortingSelectors returns the sortingSelectors of this Splice object. 917 * 918 * @return the sortingSelectors (type Map<String, Fields>) of this Splice object. 919 */ 920 public Map<String, Fields> getSortingSelectors() 921 { 922 return sortFieldsMap; 923 } 924 925 /** 926 * Method isSorted returns true if this Splice instance is sorting values other than the group fields. 927 * 928 * @return the sorted (type boolean) of this Splice object. 929 */ 930 public boolean isSorted() 931 { 932 return !sortFieldsMap.isEmpty(); 933 } 934 935 /** 936 * Method isSortReversed returns true if sorting is reversed. 937 * 938 * @return the sortReversed (type boolean) of this Splice object. 939 */ 940 public boolean isSortReversed() 941 { 942 return reverseOrder; 943 } 944 945 public synchronized Map<String, Integer> getPipePos() 946 { 947 if( pipePos != null ) 948 return pipePos; 949 950 pipePos = new HashMap<String, Integer>(); 951 952 int pos = 0; 953 for( Object pipe : pipes ) 954 pipePos.put( ( (Pipe) pipe ).getName(), pos++ ); 955 956 return pipePos; 957 } 958 959 public Joiner getJoiner() 960 { 961 return joiner; 962 } 963 964 /** 965 * Method isGroupBy returns true if this Splice instance will perform a GroupBy operation. 966 * 967 * @return the groupBy (type boolean) of this Splice object. 968 */ 969 public final boolean isGroupBy() 970 { 971 return kind == Kind.GroupBy; 972 } 973 974 public final boolean isCoGroup() 975 { 976 return kind == Kind.CoGroup; 977 } 978 979 public final boolean isMerge() 980 { 981 return kind == Kind.Merge; 982 } 983 984 public final boolean isJoin() 985 { 986 return kind == Kind.Join; 987 } 988 989 public int getNumSelfJoins() 990 { 991 return numSelfJoins; 992 } 993 994 public boolean isSelfJoin() 995 { 996 return numSelfJoins != 0; 997 } 998 999 // FIELDS 1000 1001 @Override 1002 public Scope outgoingScopeFor( Set<Scope> incomingScopes ) 1003 { 1004 Map<String, Fields> groupingSelectors = resolveGroupingSelectors( incomingScopes ); 1005 Map<String, Fields> sortingSelectors = resolveSortingSelectors( incomingScopes ); 1006 Fields declared = resolveDeclared( incomingScopes ); 1007 1008 Fields outGroupingFields = resultGroupFields; 1009 1010 if( outGroupingFields == null && isCoGroup() ) 1011 outGroupingFields = createJoinFields( incomingScopes, groupingSelectors, declared ); 1012 1013 // for Group, the outgoing fields are the same as those declared 1014 Scope.Kind kind = getScopeKind(); 1015 1016 return new Scope( getName(), declared, outGroupingFields, groupingSelectors, sortingSelectors, declared, kind ); 1017 } 1018 1019 private Scope.Kind getScopeKind() 1020 { 1021 switch( kind ) 1022 { 1023 case GroupBy: 1024 return Scope.Kind.GROUPBY; 1025 case CoGroup: 1026 return Scope.Kind.COGROUP; 1027 case Merge: 1028 return Scope.Kind.MERGE; 1029 case Join: 1030 return Scope.Kind.HASHJOIN; 1031 } 1032 1033 throw new IllegalStateException( "unknown kind: " + kind ); 1034 } 1035 1036 private Fields createJoinFields( Set<Scope> incomingScopes, Map<String, Fields> groupingSelectors, Fields declared ) 1037 { 1038 if( declared.isNone() ) 1039 declared = Fields.UNKNOWN; 1040 1041 Map<String, Fields> incomingFields = new HashMap<String, Fields>(); 1042 1043 for( Scope scope : incomingScopes ) 1044 incomingFields.put( scope.getName(), scope.getIncomingSpliceFields() ); 1045 1046 Fields outGroupingFields = Fields.NONE; 1047 1048 int offset = 0; 1049 for( Pipe pipe : pipes ) // need to retain order of pipes 1050 { 1051 String pipeName = pipe.getName(); 1052 Fields pipeGroupingSelector = groupingSelectors.get( pipeName ); 1053 Fields incomingField = incomingFields.get( pipeName ); 1054 1055 if( !pipeGroupingSelector.isNone() ) 1056 { 1057 Fields offsetFields = incomingField.selectPos( pipeGroupingSelector, offset ); 1058 Fields resolvedSelect = declared.select( offsetFields ); 1059 1060 outGroupingFields = outGroupingFields.append( resolvedSelect ); 1061 } 1062 1063 offset += incomingField.size(); 1064 } 1065 1066 return outGroupingFields; 1067 } 1068 1069 Map<String, Fields> resolveGroupingSelectors( Set<Scope> incomingScopes ) 1070 { 1071 try 1072 { 1073 Map<String, Fields> groupingSelectors = getKeySelectors(); 1074 Map<String, Fields> groupingFields = resolveSelectorsAgainstIncoming( incomingScopes, groupingSelectors, "grouping" ); 1075 1076 if( !verifySameSize( groupingFields ) ) 1077 throw new OperatorException( this, "all grouping fields must be same size: " + toString() ); 1078 1079 verifySameTypes( groupingSelectors, groupingFields ); 1080 1081 return groupingFields; 1082 } 1083 catch( FieldsResolverException exception ) 1084 { 1085 throw new OperatorException( this, OperatorException.Kind.grouping, exception.getSourceFields(), exception.getSelectorFields(), exception ); 1086 } 1087 catch( RuntimeException exception ) 1088 { 1089 throw new OperatorException( this, "could not resolve grouping selector in: " + this, exception ); 1090 } 1091 } 1092 1093 private boolean verifySameTypes( Map<String, Fields> groupingSelectors, Map<String, Fields> groupingFields ) 1094 { 1095 // create array of field positions with comparators from the grouping selectors 1096 // unsure which side has the comparators declared so make a union 1097 boolean[] hasComparator = new boolean[ groupingFields.values().iterator().next().size() ]; 1098 1099 for( Map.Entry<String, Fields> entry : groupingSelectors.entrySet() ) 1100 { 1101 Comparator[] comparatorsArray = entry.getValue().getComparators(); 1102 1103 for( int i = 0; i < comparatorsArray.length; i++ ) 1104 hasComparator[ i ] = hasComparator[ i ] || comparatorsArray[ i ] != null; 1105 } 1106 1107 // compare all the rhs fields with the lhs (lhs and rhs are arbitrary here) 1108 Iterator<Fields> iterator = groupingFields.values().iterator(); 1109 Fields lhsFields = iterator.next(); 1110 Type[] lhsTypes = lhsFields.getTypes(); 1111 1112 // if types are null, no basis for comparison 1113 if( lhsTypes == null ) 1114 return true; 1115 1116 while( iterator.hasNext() ) 1117 { 1118 Fields rhsFields = iterator.next(); 1119 Type[] rhsTypes = rhsFields.getTypes(); 1120 1121 // if types are null, no basis for comparison 1122 if( rhsTypes == null ) 1123 return true; 1124 1125 for( int i = 0; i < lhsTypes.length; i++ ) 1126 { 1127 if( hasComparator[ i ] ) 1128 continue; 1129 1130 Type lhs = lhsTypes[ i ]; 1131 Type rhs = rhsTypes[ i ]; 1132 1133 lhs = getCanonicalType( lhs ); 1134 rhs = getCanonicalType( rhs ); 1135 1136 if( lhs.equals( rhs ) ) 1137 continue; 1138 1139 Fields lhsError = new Fields( lhsFields.get( i ), lhsFields.getType( i ) ); 1140 Fields rhsError = new Fields( rhsFields.get( i ), rhsFields.getType( i ) ); 1141 1142 throw new OperatorException( this, "grouping fields must declare same types:" + lhsError.printVerbose() + " not same as " + rhsError.printVerbose() ); 1143 } 1144 } 1145 1146 return true; 1147 } 1148 1149 private Type getCanonicalType( Type type ) 1150 { 1151 if( type instanceof CoercibleType ) 1152 type = ( (CoercibleType) type ).getCanonicalType(); 1153 1154 // if one side is primitive, normalize to its primitive wrapper type 1155 if( type instanceof Class ) 1156 type = Coercions.asNonPrimitive( (Class) type ); 1157 1158 return type; 1159 } 1160 1161 private boolean verifySameSize( Map<String, Fields> groupingFields ) 1162 { 1163 Iterator<Fields> iterator = groupingFields.values().iterator(); 1164 int size = iterator.next().size(); 1165 1166 while( iterator.hasNext() ) 1167 { 1168 Fields groupingField = iterator.next(); 1169 1170 if( groupingField.size() != size ) 1171 return false; 1172 1173 size = groupingField.size(); 1174 } 1175 1176 return true; 1177 } 1178 1179 private Map<String, Fields> resolveSelectorsAgainstIncoming( Set<Scope> incomingScopes, Map<String, Fields> selectors, String type ) 1180 { 1181 Map<String, Fields> resolvedFields = new HashMap<String, Fields>(); 1182 1183 for( Scope incomingScope : incomingScopes ) 1184 { 1185 Fields selector = selectors.get( incomingScope.getName() ); 1186 1187 if( selector == null ) 1188 throw new OperatorException( this, "no " + type + " selector found for: " + incomingScope.getName() ); 1189 1190 Fields incomingFields; 1191 1192 if( selector.isNone() ) 1193 incomingFields = Fields.NONE; 1194 else if( selector.isAll() ) 1195 incomingFields = incomingScope.getIncomingSpliceFields(); 1196 else if( selector.isGroup() ) 1197 incomingFields = incomingScope.getOutGroupingFields(); 1198 else if( selector.isValues() ) 1199 incomingFields = incomingScope.getOutValuesFields().subtract( incomingScope.getOutGroupingFields() ); 1200 else 1201 incomingFields = incomingScope.getIncomingSpliceFields().select( selector ); 1202 1203 resolvedFields.put( incomingScope.getName(), incomingFields ); 1204 } 1205 1206 return resolvedFields; 1207 } 1208 1209 Map<String, Fields> resolveSortingSelectors( Set<Scope> incomingScopes ) 1210 { 1211 try 1212 { 1213 if( getSortingSelectors().isEmpty() ) 1214 return null; 1215 1216 return resolveSelectorsAgainstIncoming( incomingScopes, getSortingSelectors(), "sorting" ); 1217 } 1218 catch( FieldsResolverException exception ) 1219 { 1220 throw new OperatorException( this, OperatorException.Kind.sorting, exception.getSourceFields(), exception.getSelectorFields(), exception ); 1221 } 1222 catch( RuntimeException exception ) 1223 { 1224 throw new OperatorException( this, "could not resolve sorting selector in: " + this, exception ); 1225 } 1226 } 1227 1228 @Override 1229 public Fields resolveIncomingOperationPassThroughFields( Scope incomingScope ) 1230 { 1231 return incomingScope.getIncomingSpliceFields(); 1232 } 1233 1234 Fields resolveDeclared( Set<Scope> incomingScopes ) 1235 { 1236 try 1237 { 1238 Fields declaredFields = getJoinDeclaredFields(); 1239 1240 // Fields.NONE is a flag to the CoGroup the following Buffer will use the JoinerClosure directly 1241 if( declaredFields != null && declaredFields.isNone() ) 1242 { 1243 if( !isCoGroup() ) 1244 throw new IllegalArgumentException( "Fields.NONE may only be declared as the join fields when using a CoGroup" ); 1245 1246 return Fields.NONE; 1247 } 1248 1249 if( declaredFields != null ) // null for GroupBy 1250 { 1251 if( incomingScopes.size() != pipes.size() && isSelfJoin() ) 1252 throw new OperatorException( this, "self joins without intermediate operators are not permitted, see 'numSelfJoins' constructor or identity function" ); 1253 1254 int size = 0; 1255 boolean foundUnknown = false; 1256 1257 List<Fields> appendableFields = getOrderedResolvedFields( incomingScopes ); 1258 1259 for( Fields fields : appendableFields ) 1260 { 1261 foundUnknown = foundUnknown || fields.isUnknown(); 1262 size += fields.size(); 1263 } 1264 1265 // we must relax field checking in the face of unknown fields 1266 if( !foundUnknown && declaredFields.size() != size * ( numSelfJoins + 1 ) ) 1267 { 1268 if( isSelfJoin() ) 1269 throw new OperatorException( this, "declared grouped fields not same size as grouped values, declared: " + declaredFields.printVerbose() + " != size: " + size * ( numSelfJoins + 1 ) ); 1270 else 1271 throw new OperatorException( this, "declared grouped fields not same size as grouped values, declared: " + declaredFields.printVerbose() + " resolved: " + Util.print( appendableFields, "" ) ); 1272 } 1273 1274 int i = 0; 1275 for( Fields appendableField : appendableFields ) 1276 { 1277 Type[] types = appendableField.getTypes(); 1278 1279 if( types == null ) 1280 { 1281 i += appendableField.size(); 1282 continue; 1283 } 1284 1285 for( Type type : types ) 1286 { 1287 if( type != null ) 1288 declaredFields = declaredFields.applyType( i, type ); 1289 1290 i++; 1291 } 1292 } 1293 1294 return declaredFields; 1295 } 1296 1297 // support merge or cogrouping here 1298 if( isGroupBy() || isMerge() ) 1299 { 1300 Iterator<Scope> iterator = incomingScopes.iterator(); 1301 Fields commonFields = iterator.next().getIncomingSpliceFields(); 1302 1303 while( iterator.hasNext() ) 1304 { 1305 Scope incomingScope = iterator.next(); 1306 Fields fields = incomingScope.getIncomingSpliceFields(); 1307 1308 if( !commonFields.equalsFields( fields ) ) 1309 throw new OperatorException( this, "merged streams must declare the same field names, in the same order, expected: " + commonFields.printVerbose() + " found: " + fields.printVerbose() ); 1310 } 1311 1312 return commonFields; 1313 } 1314 else 1315 { 1316 List<Fields> appendableFields = getOrderedResolvedFields( incomingScopes ); 1317 Fields appendedFields = new Fields(); 1318 1319 try 1320 { 1321 // will fail on name collisions 1322 for( Fields appendableField : appendableFields ) 1323 appendedFields = appendedFields.append( appendableField ); 1324 } 1325 catch( TupleException exception ) 1326 { 1327 String fields = ""; 1328 1329 for( Fields appendableField : appendableFields ) 1330 fields += appendableField.print(); 1331 1332 throw new OperatorException( this, "found duplicate field names in joined tuple stream: " + fields, exception ); 1333 } 1334 1335 return appendedFields; 1336 } 1337 } 1338 catch( OperatorException exception ) 1339 { 1340 throw exception; 1341 } 1342 catch( RuntimeException exception ) 1343 { 1344 throw new OperatorException( this, "could not resolve declared fields in: " + this, exception ); 1345 } 1346 } 1347 1348 public Fields getJoinDeclaredFields() 1349 { 1350 Fields declaredFields = getDeclaredFields(); 1351 1352 if( !( joiner instanceof DeclaresResults ) ) 1353 return declaredFields; 1354 1355 if( declaredFields == null && ( (DeclaresResults) joiner ).getFieldDeclaration() != null ) 1356 declaredFields = ( (DeclaresResults) joiner ).getFieldDeclaration(); 1357 1358 return declaredFields; 1359 } 1360 1361 private List<Fields> getOrderedResolvedFields( Set<Scope> incomingScopes ) 1362 { 1363 Map<String, Scope> scopesMap = new HashMap<String, Scope>(); 1364 1365 for( Scope incomingScope : incomingScopes ) 1366 scopesMap.put( incomingScope.getName(), incomingScope ); 1367 1368 List<Fields> appendableFields = new ArrayList<Fields>(); 1369 1370 for( Pipe pipe : pipes ) 1371 appendableFields.add( scopesMap.get( pipe.getName() ).getIncomingSpliceFields() ); 1372 return appendableFields; 1373 } 1374 1375 @Override 1376 public boolean isEquivalentTo( FlowElement element ) 1377 { 1378 boolean equivalentTo = super.isEquivalentTo( element ); 1379 1380 if( !equivalentTo ) 1381 return equivalentTo; 1382 1383 Splice splice = (Splice) element; 1384 1385 if( !keyFieldsMap.equals( splice.keyFieldsMap ) ) 1386 return false; 1387 1388 if( !pipes.equals( splice.pipes ) ) 1389 return false; 1390 1391 return true; 1392 } 1393 1394 // OBJECT OVERRIDES 1395 1396 @Override 1397 @SuppressWarnings({"RedundantIfStatement"}) 1398 public boolean equals( Object object ) 1399 { 1400 if( this == object ) 1401 return true; 1402 if( object == null || getClass() != object.getClass() ) 1403 return false; 1404 if( !super.equals( object ) ) 1405 return false; 1406 1407 Splice splice = (Splice) object; 1408 1409 if( spliceName != null ? !spliceName.equals( splice.spliceName ) : splice.spliceName != null ) 1410 return false; 1411 if( keyFieldsMap != null ? !keyFieldsMap.equals( splice.keyFieldsMap ) : splice.keyFieldsMap != null ) 1412 return false; 1413 if( pipes != null ? !pipes.equals( splice.pipes ) : splice.pipes != null ) 1414 return false; 1415 1416 return true; 1417 } 1418 1419 @Override 1420 public int hashCode() 1421 { 1422 int result = super.hashCode(); 1423 result = 31 * result + ( pipes != null ? pipes.hashCode() : 0 ); 1424 result = 31 * result + ( keyFieldsMap != null ? keyFieldsMap.hashCode() : 0 ); 1425 result = 31 * result + ( spliceName != null ? spliceName.hashCode() : 0 ); 1426 return result; 1427 } 1428 1429 @Override 1430 public String toString() 1431 { 1432 StringBuilder buffer = new StringBuilder( super.toString() ); 1433 1434 buffer.append( "[by:" ); 1435 1436 for( String name : keyFieldsMap.keySet() ) 1437 { 1438 if( keyFieldsMap.size() > 1 ) 1439 buffer.append( " " ).append( name ).append( ":" ); 1440 1441 buffer.append( keyFieldsMap.get( name ).printVerbose() ); 1442 } 1443 1444 if( isSelfJoin() ) 1445 buffer.append( "[numSelfJoins:" ).append( numSelfJoins ).append( "]" ); 1446 1447 buffer.append( "]" ); 1448 1449 return buffer.toString(); 1450 } 1451 1452 @Override 1453 protected void printInternal( StringBuffer buffer, Scope scope ) 1454 { 1455 super.printInternal( buffer, scope ); 1456 Map<String, Fields> map = scope.getKeySelectors(); 1457 1458 if( map != null ) 1459 { 1460 buffer.append( "[by:" ); 1461 1462 // important to retain incoming pipe order 1463 for( Map.Entry<String, Fields> entry : keyFieldsMap.entrySet() ) 1464 { 1465 String name = entry.getKey(); 1466 1467 if( map.size() > 1 ) 1468 buffer.append( name ).append( ":" ); 1469 1470 Fields keys = map.get( name ); 1471 1472 // if keys null, this is likely an edge contracted map 1473 if( keys == null ) 1474 buffer.append( "<unavailable>" ); 1475 else 1476 buffer.append( keys.print() ); // get resolved keys 1477 } 1478 1479 if( isSelfJoin() ) 1480 buffer.append( "[numSelfJoins:" ).append( numSelfJoins ).append( "]" ); 1481 1482 buffer.append( "]" ); 1483 } 1484 } 1485 }