001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading; 022 023import java.io.Serializable; 024import java.util.ArrayList; 025import java.util.Collections; 026import java.util.Comparator; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032 033import cascading.flow.Flow; 034import cascading.flow.FlowDef; 035import cascading.flow.FlowStep; 036import cascading.flow.planner.graph.ElementGraph; 037import cascading.operation.Aggregator; 038import cascading.operation.Function; 039import cascading.operation.Identity; 040import cascading.operation.aggregator.Count; 041import cascading.operation.aggregator.First; 042import cascading.operation.expression.ExpressionFunction; 043import cascading.operation.regex.RegexFilter; 044import cascading.operation.regex.RegexSplitter; 045import cascading.pipe.Checkpoint; 046import cascading.pipe.CoGroup; 047import cascading.pipe.Each; 048import cascading.pipe.Every; 049import cascading.pipe.GroupBy; 050import cascading.pipe.HashJoin; 051import cascading.pipe.Merge; 052import cascading.pipe.Pipe; 053import cascading.pipe.joiner.InnerJoin; 054import cascading.pipe.joiner.Joiner; 055import 
cascading.pipe.joiner.LeftJoin; 056import cascading.pipe.joiner.MixedJoin; 057import cascading.pipe.joiner.OuterJoin; 058import cascading.pipe.joiner.RightJoin; 059import cascading.tap.SinkMode; 060import cascading.tap.Tap; 061import cascading.tuple.Fields; 062import cascading.tuple.Hasher; 063import cascading.tuple.Tuple; 064import org.junit.Test; 065 066import static data.InputData.*; 067 068public class JoinFieldedPipesPlatformTest extends PlatformTestCase 069 { 070 public JoinFieldedPipesPlatformTest() 071 { 072 super( true, 4, 1 ); // leave cluster testing enabled 073 } 074 075 @Test 076 public void testCross() throws Exception 077 { 078 getPlatform().copyFromLocal( inputFileLhs ); 079 getPlatform().copyFromLocal( inputFileRhs ); 080 081 Map sources = new HashMap(); 082 083 sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) ); 084 sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) ); 085 086 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE ); 087 088 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 089 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 090 091 Pipe cross = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 092 093 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross ); 094 095 flow.complete(); 096 097 validateLength( flow, 37, null ); 098 099 List<Tuple> values = getSinkAsList( flow ); 100 101 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 102 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 103 } 104 105 @Test 106 public void testJoin() throws Exception 107 { 108 getPlatform().copyFromLocal( inputFileLower ); 109 getPlatform().copyFromLocal( inputFileUpper ); 110 111 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", 
"line" ), inputFileLower ); 112 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 113 114 Map sources = new HashMap(); 115 116 sources.put( "lower", sourceLower ); 117 sources.put( "upper", sourceUpper ); 118 119 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE ); 120 121 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 122 123 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 124 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 125 126 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 127 128 Map<Object, Object> properties = getProperties(); 129 130 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 131 132 flow.complete(); 133 134 validateLength( flow, 5 ); 135 136 List<Tuple> values = getSinkAsList( flow ); 137 138 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 139 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 140 } 141 142 @Test 143 public void testJoinSamePipeName() throws Exception 144 { 145 getPlatform().copyFromLocal( inputFileLower ); 146 getPlatform().copyFromLocal( inputFileUpper ); 147 148 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 149 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 150 151 Map sources = new HashMap(); 152 153 sources.put( "lower", sourceLower ); 154 sources.put( "upper", sourceUpper ); 155 156 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE ); 157 158 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 159 160 Pipe pipeLower = new Pipe( "lower" ); 161 Pipe pipeUpper = new Pipe( "upper" ); 162 163 // these pipes will hide the source name, and 
could cause one to be lost 164 pipeLower = new Pipe( "same", pipeLower ); 165 pipeUpper = new Pipe( "same", pipeUpper ); 166 167 pipeLower = new Each( pipeLower, new Fields( "line" ), splitter ); 168 pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter ); 169 170// pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 171// pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 172 173 pipeLower = new Pipe( "left", pipeLower ); 174 pipeUpper = new Pipe( "right", pipeUpper ); 175 176// pipeLower = new Each( pipeLower, new Debug( true ) ); 177// pipeUpper = new Each( pipeUpper, new Debug( true ) ); 178 179 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 180 181// splice = new Each( splice, new Debug( true ) ); 182 splice = new Pipe( "splice", splice ); 183 splice = new Pipe( "tail", splice ); 184 185 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 186 187 flow.complete(); 188 189 validateLength( flow, 5 ); 190 191 List<Tuple> values = getSinkAsList( flow ); 192 193 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 194 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 195 } 196 197 @Test 198 public void testJoinWithUnknowns() throws Exception 199 { 200 getPlatform().copyFromLocal( inputFileLower ); 201 getPlatform().copyFromLocal( inputFileUpper ); 202 203 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 204 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 205 206 Map sources = new HashMap(); 207 208 sources.put( "lower", sourceLower ); 209 sources.put( "upper", sourceUpper ); 210 211 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE ); 212 213 Function splitter = new RegexSplitter( 
Fields.UNKNOWN, " " ); 214 215 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 216 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 217 218 Pipe splice = new HashJoin( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) ); 219 220 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 221 222 flow.complete(); 223 224 validateLength( flow, 5 ); 225 226 List<Tuple> values = getSinkAsList( flow ); 227 228 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 229 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 230 } 231 232 /** 233 * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with 234 * a new stream using an outerjoin. 235 * 236 * @throws Exception 237 */ 238 @Test 239 public void testJoinFilteredBranch() throws Exception 240 { 241 getPlatform().copyFromLocal( inputFileLower ); 242 getPlatform().copyFromLocal( inputFileUpper ); 243 244 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 245 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 246 247 Map sources = new HashMap(); 248 249 sources.put( "lower", sourceLower ); 250 sources.put( "upper", sourceUpper ); 251 252 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinfilteredbranch" ), SinkMode.REPLACE ); 253 254 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 255 256 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 257 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 258 pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all 259 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 260 261 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, 
new Fields( "num" ), Fields.size( 4 ), new OuterJoin() ); 262 263 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 264 265 flow.complete(); 266 267 validateLength( flow, 5 ); 268 269 List<Tuple> values = getSinkAsList( flow ); 270 271 assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) ); 272 assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) ); 273 } 274 275 @Test 276 public void testJoinSelf() throws Exception 277 { 278 getPlatform().copyFromLocal( inputFileLower ); 279 280 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 281 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 282 283 Map sources = new HashMap(); 284 285 sources.put( "lower", sourceLower ); 286 sources.put( "upper", sourceUpper ); 287 288 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinself" ), SinkMode.REPLACE ); 289 290 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 291 292 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 293 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 294 295 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 296 297 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 298 299 flow.complete(); 300 301 validateLength( flow, 5 ); 302 303 List<Tuple> values = getSinkAsList( flow ); 304 305 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 306 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 307 } 308 309 /** 310 * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join 311 * 312 * @throws Exception when 313 */ 314 @Test 315 public void testJoinAfterEvery() throws Exception 316 { 317 getPlatform().copyFromLocal( inputFileLower ); 318 getPlatform().copyFromLocal( 
inputFileUpper ); 319 320 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 321 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 322 323 Map sources = new HashMap(); 324 325 sources.put( "lower", sourceLower ); 326 sources.put( "upper", sourceUpper ); 327 328 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE ); 329 330 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 331 332 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 333 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) ); 334 pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL ); 335 336 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 337 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 338 pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL ); 339 340 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 341 342 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 343 344 flow.complete(); 345 346 validateLength( flow, 5, null ); 347 348 List<Tuple> values = getSinkAsList( flow ); 349 350 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 351 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 352 } 353 354 @Test 355 public void testJoinInnerSingleField() throws Exception 356 { 357 getPlatform().copyFromLocal( inputFileLowerOffset ); 358 getPlatform().copyFromLocal( inputFileUpper ); 359 360 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 361 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 362 363 Map sources = new HashMap(); 364 365 sources.put( "lower", sourceLower ); 366 sources.put( "upper", 
sourceUpper ); 367 368 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joininnersingle" ), SinkMode.REPLACE ); 369 370 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) ); 371 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) ); 372 373 Pipe join = new HashJoin( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 374 375 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 376 377 flow.complete(); 378 379 validateLength( flow, 3, null ); 380 381 Set<Tuple> results = new HashSet<Tuple>(); 382 383 results.add( new Tuple( "1\t1" ) ); 384 results.add( new Tuple( "5\t5" ) ); 385 386 List<Tuple> actual = getSinkAsList( flow ); 387 388 results.removeAll( actual ); 389 390 assertEquals( 0, results.size() ); 391 } 392 393 /** 394 * 1 a1 395 * 1 a2 396 * 1 a3 397 * 2 b1 398 * 3 c1 399 * 4 d1 400 * 4 d2 401 * 4 d3 402 * 5 e1 403 * 5 e2 404 * 5 e3 405 * 7 g1 406 * 7 g2 407 * 7 g3 408 * 7 g4 409 * 7 g5 410 * null h1 411 * <p/> 412 * 1 A1 413 * 1 A2 414 * 1 A3 415 * 2 B1 416 * 2 B2 417 * 2 B3 418 * 4 D1 419 * 6 F1 420 * 6 F2 421 * null H1 422 * <p/> 423 * 1 a1 1 A1 424 * 1 a1 1 A2 425 * 1 a1 1 A3 426 * 1 a2 1 A1 427 * 1 a2 1 A2 428 * 1 a2 1 A3 429 * 1 a3 1 A1 430 * 1 a3 1 A2 431 * 1 a3 1 A3 432 * 2 b1 2 B1 433 * 2 b1 2 B2 434 * 2 b1 2 B3 435 * 4 d1 4 D1 436 * 4 d2 4 D1 437 * 4 d3 4 D1 438 * null h1 null H1 439 * 440 * @throws Exception 441 */ 442 @Test 443 public void testJoinInner() throws Exception 444 { 445 HashSet<Tuple> results = new HashSet<Tuple>(); 446 447 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 448 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 449 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 450 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 451 results.add( new Tuple( "1", "a2", "1", "A2" ) 
); 452 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 453 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 454 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 455 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 456 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 457 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 458 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 459 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 460 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 461 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 462 results.add( new Tuple( null, "h1", null, "H1" ) ); 463 464 handleJoins( "joininner", new InnerJoin(), results ); 465 } 466 467 /** 468 * /** 469 * 1 a1 470 * 1 a2 471 * 1 a3 472 * 2 b1 473 * 3 c1 474 * 4 d1 475 * 4 d2 476 * 4 d3 477 * 5 e1 478 * 5 e2 479 * 5 e3 480 * 7 g1 481 * 7 g2 482 * 7 g3 483 * 7 g4 484 * 7 g5 485 * null h1 486 * <p/> 487 * 1 A1 488 * 1 A2 489 * 1 A3 490 * 2 B1 491 * 2 B2 492 * 2 B3 493 * 4 D1 494 * 6 F1 495 * 6 F2 496 * null H1 497 * <p/> 498 * 1 a1 1 A1 499 * 1 a1 1 A2 500 * 1 a1 1 A3 501 * 1 a2 1 A1 502 * 1 a2 1 A2 503 * 1 a2 1 A3 504 * 1 a3 1 A1 505 * 1 a3 1 A2 506 * 1 a3 1 A3 507 * 2 b1 2 B1 508 * 2 b1 2 B2 509 * 2 b1 2 B3 510 * 3 c1 null null 511 * 4 d1 4 D1 512 * 4 d2 4 D1 513 * 4 d3 4 D1 514 * 5 e1 null null 515 * 5 e2 null null 516 * 5 e3 null null 517 * null null 6 F1 518 * null null 6 F2 519 * 7 g1 null null 520 * 7 g2 null null 521 * 7 g3 null null 522 * 7 g4 null null 523 * 7 g5 null null 524 * null h1 null H1 525 * 526 * @throws Exception 527 */ 528 @Test 529 public void testJoinOuter() throws Exception 530 { 531 // skip if hadoop cluster mode, outer joins don't behave the same 532 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 533 return; 534 535 Set<Tuple> results = new HashSet<Tuple>(); 536 537 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 538 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 539 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 540 results.add( new 
Tuple( "1", "a2", "1", "A1" ) ); 541 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 542 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 543 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 544 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 545 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 546 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 547 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 548 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 549 results.add( new Tuple( "3", "c1", null, null ) ); 550 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 551 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 552 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 553 results.add( new Tuple( "5", "e1", null, null ) ); 554 results.add( new Tuple( "5", "e2", null, null ) ); 555 results.add( new Tuple( "5", "e3", null, null ) ); 556 results.add( new Tuple( null, null, "6", "F1" ) ); 557 results.add( new Tuple( null, null, "6", "F2" ) ); 558 results.add( new Tuple( "7", "g1", null, null ) ); 559 results.add( new Tuple( "7", "g2", null, null ) ); 560 results.add( new Tuple( "7", "g3", null, null ) ); 561 results.add( new Tuple( "7", "g4", null, null ) ); 562 results.add( new Tuple( "7", "g5", null, null ) ); 563 results.add( new Tuple( null, "h1", null, "H1" ) ); 564 565 handleJoins( "joinouter", new OuterJoin(), results ); 566 } 567 568 /** 569 * 1 a1 570 * 1 a2 571 * 1 a3 572 * 2 b1 573 * 3 c1 574 * 4 d1 575 * 4 d2 576 * 4 d3 577 * 5 e1 578 * 5 e2 579 * 5 e3 580 * 7 g1 581 * 7 g2 582 * 7 g3 583 * 7 g4 584 * 7 g5 585 * null h1 586 * <p/> 587 * 1 A1 588 * 1 A2 589 * 1 A3 590 * 2 B1 591 * 2 B2 592 * 2 B3 593 * 4 D1 594 * 6 F1 595 * 6 F2 596 * null H1 597 * <p/> 598 * 1 a1 1 A1 599 * 1 a1 1 A2 600 * 1 a1 1 A3 601 * 1 a2 1 A1 602 * 1 a2 1 A2 603 * 1 a2 1 A3 604 * 1 a3 1 A1 605 * 1 a3 1 A2 606 * 1 a3 1 A3 607 * 2 b1 2 B1 608 * 2 b1 2 B2 609 * 2 b1 2 B3 610 * 3 c1 null null 611 * 4 d1 4 D1 612 * 4 d2 4 D1 613 * 4 d3 4 D1 614 * 5 e1 null null 615 * 5 e2 null null 616 
* 5 e3 null null 617 * 7 g1 null null 618 * 7 g2 null null 619 * 7 g3 null null 620 * 7 g4 null null 621 * 7 g5 null null 622 * null h1 null H1 623 * 624 * @throws Exception 625 */ 626 @Test 627 public void testJoinInnerOuter() throws Exception 628 { 629 Set<Tuple> results = new HashSet<Tuple>(); 630 631 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 632 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 633 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 634 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 635 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 636 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 637 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 638 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 639 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 640 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 641 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 642 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 643 results.add( new Tuple( "3", "c1", null, null ) ); 644 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 645 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 646 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 647 results.add( new Tuple( "5", "e1", null, null ) ); 648 results.add( new Tuple( "5", "e2", null, null ) ); 649 results.add( new Tuple( "5", "e3", null, null ) ); 650 results.add( new Tuple( "7", "g1", null, null ) ); 651 results.add( new Tuple( "7", "g2", null, null ) ); 652 results.add( new Tuple( "7", "g3", null, null ) ); 653 results.add( new Tuple( "7", "g4", null, null ) ); 654 results.add( new Tuple( "7", "g5", null, null ) ); 655 results.add( new Tuple( null, "h1", null, "H1" ) ); 656 657 handleJoins( "joininnerouter", new LeftJoin(), results ); 658 } 659 660 /** 661 * 1 a1 662 * 1 a2 663 * 1 a3 664 * 2 b1 665 * 3 c1 666 * 4 d1 667 * 4 d2 668 * 4 d3 669 * 5 e1 670 * 5 e2 671 * 5 e3 672 * 7 g1 673 * 7 g2 674 * 7 g3 675 * 7 g4 676 * 7 g5 677 * null h1 678 * <p/> 679 * 1 A1 680 * 1 A2 681 * 1 A3 682 * 2 B1 683 * 2 
B2 684 * 2 B3 685 * 4 D1 686 * 6 F1 687 * 6 F2 688 * null H1 689 * <p/> 690 * 1 a1 1 A1 691 * 1 a1 1 A2 692 * 1 a1 1 A3 693 * 1 a2 1 A1 694 * 1 a2 1 A2 695 * 1 a2 1 A3 696 * 1 a3 1 A1 697 * 1 a3 1 A2 698 * 1 a3 1 A3 699 * 2 b1 2 B1 700 * 2 b1 2 B2 701 * 2 b1 2 B3 702 * 4 d1 4 D1 703 * 4 d2 4 D1 704 * 4 d3 4 D1 705 * null null 6 F1 706 * null null 6 F2 707 * null h1 null H1 708 * 709 * @throws Exception 710 */ 711 @Test 712 public void testJoinOuterInner() throws Exception 713 { 714 // skip if hadoop cluster mode, outer joins don't behave the same 715 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 716 return; 717 718 Set<Tuple> results = new HashSet<Tuple>(); 719 720 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 721 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 722 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 723 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 724 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 725 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 726 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 727 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 728 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 729 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 730 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 731 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 732 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 733 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 734 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 735 results.add( new Tuple( null, null, "6", "F1" ) ); 736 results.add( new Tuple( null, null, "6", "F2" ) ); 737 results.add( new Tuple( null, "h1", null, "H1" ) ); 738 739 handleJoins( "joinouterinner", new RightJoin(), results ); 740 } 741 742 private void handleJoins( String path, Joiner joiner, Set<Tuple> results ) throws Exception 743 { 744 getPlatform().copyFromLocal( inputFileLhsSparse ); 745 getPlatform().copyFromLocal( inputFileRhsSparse ); 746 747 Fields fields = new Fields( "num", 
"char" ).applyTypes( Integer.class, String.class ); 748 Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse ); 749 Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse ); 750 751 Map sources = new HashMap(); 752 753 sources.put( "lower", sourceLower ); 754 sources.put( "upper", sourceUpper ); 755 756 Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE ); 757 758 Pipe pipeLower = new Pipe( "lower" ); 759 Pipe pipeUpper = new Pipe( "upper" ); 760 761 Fields declaredFields = new Fields( "num", "char", "num2", "char2" ); 762 Fields groupingFields = new Fields( "num" ); 763 764 Pipe splice = new HashJoin( pipeLower, groupingFields, pipeUpper, groupingFields, declaredFields, joiner ); 765 766 splice = new Each( splice, Fields.ALL, new Identity(), Fields.RESULTS ); 767 768 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 769 770 flow.complete(); 771 772 validateLength( flow, results.size() ); 773 774 List<Tuple> actual = getSinkAsList( flow ); 775 776 results.removeAll( actual ); 777 778 assertEquals( 0, results.size() ); 779 } 780 781 /** 782 * 1 a 783 * 5 b 784 * 6 c 785 * 5 b 786 * 5 e 787 * <p/> 788 * 1 A 789 * 2 B 790 * 3 C 791 * 4 D 792 * 5 E 793 * <p/> 794 * 1 a 795 * 2 b 796 * 3 c 797 * 4 d 798 * 5 e 799 * <p/> 800 * 1 a 1 A 1 a 801 * - - 2 B 2 b 802 * - - 3 C 3 c 803 * - - 4 D 4 d 804 * 5 b 5 E 5 e 805 * 5 e 5 E 5 e 806 * 807 * @throws Exception 808 */ 809 @Test 810 public void testJoinMixed() throws Exception 811 { 812 // skip if hadoop cluster mode, outer joins don't behave the same 813 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 814 return; 815 816 getPlatform().copyFromLocal( inputFileLowerOffset ); 817 getPlatform().copyFromLocal( inputFileLower ); 818 getPlatform().copyFromLocal( inputFileUpper ); 819 820 Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", 
"line" ), inputFileLowerOffset ); 821 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 822 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 823 824 Map sources = new HashMap(); 825 826 sources.put( "loweroffset", sourceLowerOffset ); 827 sources.put( "lower", sourceLower ); 828 sources.put( "upper", sourceUpper ); 829 830 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinmixed" ), SinkMode.REPLACE ); 831 832 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 833 834 Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter ); 835 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 836 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 837 838 Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower ); 839 Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) ); 840 841 MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} ); 842 Pipe splice = new HashJoin( pipes, fields, Fields.size( 6 ), join ); 843 844 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 845 846 flow.complete(); 847 848 validateLength( flow, 6 ); 849 850 Set<Tuple> results = new HashSet<Tuple>(); 851 852 results.add( new Tuple( "1\ta\t1\tA\t1\ta" ) ); 853 results.add( new Tuple( "null\tnull\t2\tB\t2\tb" ) ); 854 results.add( new Tuple( "null\tnull\t3\tC\t3\tc" ) ); 855 results.add( new Tuple( "null\tnull\t4\tD\t4\td" ) ); 856 results.add( new Tuple( "5\tb\t5\tE\t5\te" ) ); 857 results.add( new Tuple( "5\te\t5\tE\t5\te" ) ); 858 859 List<Tuple> actual = getSinkAsList( flow ); 860 861 results.removeAll( actual ); 862 863 assertEquals( 0, results.size() ); 864 } 865 866 @Test 867 public void testJoinDiffFields() throws Exception 868 { 869 
getPlatform().copyFromLocal( inputFileLower ); 870 getPlatform().copyFromLocal( inputFileUpper ); 871 872 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 873 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 874 875 Map sources = new HashMap(); 876 877 sources.put( "lower", sourceLower ); 878 sources.put( "upper", sourceUpper ); 879 880 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE ); 881 882 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 883 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 884 885 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 886 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 887 888 Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 889 890 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe ); 891 892 flow.complete(); 893 894 validateLength( flow, 5 ); 895 896 List<Tuple> actual = getSinkAsList( flow ); 897 898 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 899 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 900 } 901 902 @Test 903 public void testJoinGroupBy() throws Exception 904 { 905 getPlatform().copyFromLocal( inputFileLower ); 906 getPlatform().copyFromLocal( inputFileUpper ); 907 908 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 909 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 910 911 Map sources = new HashMap(); 912 913 sources.put( "lower", sourceLower ); 914 sources.put( "upper", sourceUpper ); 915 916 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupby" ), SinkMode.REPLACE ); 917 918 Function splitterLower = new 
RegexSplitter( new Fields( "numA", "lower" ), " " ); 919 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 920 921 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 922 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 923 924 Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 925 926 Pipe groupby = new GroupBy( pipe, new Fields( "numA" ) ); 927 928 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby ); 929 930 flow.complete(); 931 932 validateLength( flow, 5, null ); 933 934 List<Tuple> actual = getSinkAsList( flow ); 935 936 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 937 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 938 } 939 940 @Test 941 public void testJoinSamePipe() throws Exception 942 { 943 getPlatform().copyFromLocal( inputFileLower ); 944 945 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 946 947 Map sources = new HashMap(); 948 949 sources.put( "lower", source ); 950 951 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE ); 952 953 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 954 955 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 956 957 Pipe pipe = new HashJoin( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) ); 958 959 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe ); 960 961 flow.complete(); 962 963 validateLength( flow, 5, null ); 964 965 List<Tuple> actual = getSinkAsList( flow ); 966 967 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 968 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 969 } 970 971 @Test 972 public void testJoinSamePipe2() throws Exception 973 { 974 getPlatform().copyFromLocal( 
inputFileLower ); 975 976 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 977 978 Map sources = new HashMap(); 979 980 sources.put( "lower", source ); 981 982 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE ); 983 984 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 985 986 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 987 988 Pipe join = new HashJoin( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 989 990 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 991 992 flow.complete(); 993 994 validateLength( flow, 5, null ); 995 996 List<Tuple> actual = getSinkAsList( flow ); 997 998 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 999 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1000 } 1001 1002 @Test 1003 public void testJoinSamePipe3() throws Exception 1004 { 1005 getPlatform().copyFromLocal( inputFileLower ); 1006 1007 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower ); 1008 1009 Map sources = new HashMap(); 1010 1011 sources.put( "lower", source ); 1012 1013 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE ); 1014 1015 Pipe pipe = new Pipe( "lower" ); 1016 1017 Pipe lhs = new Pipe( "lhs", pipe ); 1018 Pipe rhs = new Pipe( "rhs", pipe ); 1019 1020 Pipe join = new HashJoin( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1021 1022 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 1023 1024 flow.complete(); 1025 1026 validateLength( flow, 5, null ); 1027 1028 List<Tuple> actual = getSinkAsList( flow ); 1029 1030 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1031 assertTrue( actual.contains( new 
Tuple( "2\tb\t2\tb" ) ) ); 1032 } 1033 1034 /** 1035 * Same source as rightmost 1036 * <p/> 1037 * should be a single job as the same file accumulates into the joins 1038 * 1039 * @throws Exception 1040 */ 1041 @Test 1042 public void testJoinAroundJoinRightMost() throws Exception 1043 { 1044 getPlatform().copyFromLocal( inputFileLower ); 1045 getPlatform().copyFromLocal( inputFileUpper ); 1046 1047 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1048 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1049 1050 Map sources = new HashMap(); 1051 1052 sources.put( "lower", sourceLower ); 1053 sources.put( "upper1", sourceUpper ); 1054 sources.put( "upper2", sourceUpper ); 1055 1056 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinrightmost" ), SinkMode.REPLACE ); 1057 1058 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1059 1060 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1061 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1062 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1063 1064 Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1065 1066 splice1 = new Each( splice1, new Identity() ); 1067 1068 Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1069 1070 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1071 1072// flow.writeDOT( "joinaroundrightmost.dot" ); 1073 1074 if( getPlatform().isMapReduce() ) 1075 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1076 1077 flow.complete(); 1078 1079 validateLength( flow, 5, null ); 1080 1081 List<Tuple> 
actual = getSinkAsList( flow ); 1082 1083 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 1084 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) ); 1085 } 1086 1087 /** 1088 * Same source as leftmost 1089 * 1090 * @throws Exception 1091 */ 1092 @Test 1093 public void testJoinAroundJoinLeftMost() throws Exception 1094 { 1095 getPlatform().copyFromLocal( inputFileLower ); 1096 getPlatform().copyFromLocal( inputFileUpper ); 1097 1098 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1099 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1100 1101 Map sources = new HashMap(); 1102 1103 sources.put( "lower", sourceLower ); 1104 sources.put( "upper1", sourceUpper ); 1105 sources.put( "upper2", sourceUpper ); 1106 1107 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinleftmost" ), SinkMode.REPLACE ); 1108 1109 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1110 1111 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1112 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1113 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1114 1115 Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1116 1117 splice1 = new Each( splice1, new Identity() ); 1118 1119 Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1120 1121 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1122 1123// flow.writeDOT( "joinaroundleftmost.dot" ); 1124 1125 if( getPlatform().isMapReduce() ) 1126 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1127 1128 flow.complete(); 1129 
1130 validateLength( flow, 5, null ); 1131 1132 List<Tuple> actual = getSinkAsList( flow ); 1133 1134 assertTrue( actual.contains( new Tuple( "1\tA\t1\tA\t1\ta" ) ) ); 1135 assertTrue( actual.contains( new Tuple( "2\tB\t2\tB\t2\tb" ) ) ); 1136 } 1137 1138 /** 1139 * Upper as leftmost and rightmost forcing two jobs 1140 * 1141 * @throws Exception 1142 */ 1143 @Test 1144 public void testJoinAroundJoinRightMostSwapped() throws Exception 1145 { 1146 getPlatform().copyFromLocal( inputFileLower ); 1147 getPlatform().copyFromLocal( inputFileUpper ); 1148 1149 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1150 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1151 1152 Map sources = new HashMap(); 1153 1154 sources.put( "lower", sourceLower ); 1155 sources.put( "upper1", sourceUpper ); 1156 sources.put( "upper2", sourceUpper ); 1157 1158 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinswapped" ), SinkMode.REPLACE ); 1159 1160 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1161 1162 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1163 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1164 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1165 1166 Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1167 1168 splice1 = new Each( splice1, new Identity() ); 1169 1170 // upper2 becomes leftmost, forcing a tap between the joins 1171 Pipe splice2 = new HashJoin( pipeUpper2, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1172 1173 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1174 1175 if( getPlatform().isMapReduce() ) 1176 
assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1177 1178 flow.complete(); 1179 1180 validateLength( flow, 5, null ); 1181 1182 List<Tuple> actual = getSinkAsList( flow ); 1183 1184 assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\tA" ) ) ); 1185 assertTrue( actual.contains( new Tuple( "2\tB\t2\tb\t2\tB" ) ) ); 1186 } 1187 1188 @Test 1189 public void testJoinGroupByJoin() throws Exception 1190 { 1191 getPlatform().copyFromLocal( inputFileLower ); 1192 getPlatform().copyFromLocal( inputFileUpper ); 1193 getPlatform().copyFromLocal( inputFileJoined ); 1194 1195 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1196 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1197 Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined ); 1198 1199 Map sources = new HashMap(); 1200 1201 sources.put( "lower", sourceLower ); 1202 sources.put( "upper", sourceUpper ); 1203 sources.put( "joined", sourceJoined ); 1204 1205 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupbyjoin" ), SinkMode.REPLACE ); 1206 1207 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 1208 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 1209 Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" ); 1210 1211 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1212 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1213 Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined ); 1214 1215 Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1216 1217 pipe = new GroupBy( pipe, new Fields( "numA" ) ); 1218 1219 pipe = new HashJoin( pipe, new Fields( "numA" ), pipeJoined, new 
Fields( "numC" ) ); 1220 1221 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe ); 1222 1223 if( getPlatform().isMapReduce() ) 1224 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1225 1226 flow.complete(); 1227 1228 validateLength( flow, 5, null ); 1229 1230 List<Tuple> actual = getSinkAsList( flow ); 1231 1232 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\tA" ) ) ); 1233 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tb\tB" ) ) ); 1234 } 1235 1236 /** 1237 * here the same file is fed into the same HashJoin. 1238 * <p/> 1239 * This is three jobs. 1240 * <p/> 1241 * a temp tap is inserted before the accumulated branch for two reasons on the common HashJoin 1242 * <p/> 1243 * it is assumed the accumulated side is filtered down, so pushing to disk will preserve io 1244 * if accumulated side was streamed instead via a fork, only part of the file will accumulate into the HashJoin 1245 * <p/> 1246 * /-T-\ <-- accumulated 1247 * T HJ 1248 * \---/ <-- streamed 1249 * 1250 * @throws Exception 1251 */ 1252 @Test 1253 public void testJoinSameSourceIntoJoin() throws Exception 1254 { 1255 getPlatform().copyFromLocal( inputFileLower ); 1256 getPlatform().copyFromLocal( inputFileUpper ); 1257 1258 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1259 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1260 1261 Map sources = new HashMap(); 1262 1263 sources.put( "lower", sourceLower ); 1264 sources.put( "upper1", sourceUpper ); 1265 sources.put( "upper2", sourceUpper ); 1266 1267 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoin" ), SinkMode.REPLACE ); 1268 1269 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1270 1271 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1272 Pipe pipeUpper1 = new Each( new Pipe( "upper1" 
), new Fields( "line" ), splitter ); 1273 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1274 1275 Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1276 1277 splice1 = new Each( splice1, new Identity() ); 1278 1279 Pipe splice2 = new HashJoin( pipeLower, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1280 1281 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1282 1283// flow.writeDOT( "joinsamesourceintojoin.dot" ); 1284 1285 if( getPlatform().isMapReduce() ) 1286 assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() ); 1287 1288 flow.complete(); 1289 1290 validateLength( flow, 5, null ); 1291 1292 List<Tuple> actual = getSinkAsList( flow ); 1293 1294 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 1295 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) ); 1296 } 1297 1298 @Test 1299 public void testJoinSameSourceIntoJoinSimple() throws Exception 1300 { 1301 getPlatform().copyFromLocal( inputFileUpper ); 1302 1303 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1304 1305 Map sources = new HashMap(); 1306 1307 sources.put( "upper1", sourceUpper ); 1308 sources.put( "upper2", sourceUpper ); 1309 1310 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoinsimple" ), SinkMode.REPLACE ); 1311 1312 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1313 1314 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1315 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1316 1317 Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 
1318 1319 splice1 = new Each( splice1, new Identity() ); 1320 1321 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 ); 1322 1323// flow.writeDOT( "joinsamesourceintojoin.dot" ); 1324 1325 if( getPlatform().isMapReduce() ) 1326 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1327 1328 flow.complete(); 1329 1330 validateLength( flow, 5, null ); 1331 1332 List<Tuple> actual = getSinkAsList( flow ); 1333 1334 assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) ); 1335 assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) ); 1336 } 1337 1338 /** 1339 * Loosely tests for a deadlock when BlockingHashJoinAnnotator rule doesn't excluce the GroupBy from the blocking 1340 * annotation. 1341 * <p/> 1342 * the deadlock is random on the order of the paths traversed from the Source Tap + fork. 1343 * 1344 * @throws Exception 1345 */ 1346 @Test 1347 public void testJoinSameSourceOverGroupByIntoJoinSimple() throws Exception 1348 { 1349 getPlatform().copyFromLocal( inputFileLower ); 1350 getPlatform().copyFromLocal( inputFileUpper ); 1351 1352 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1353 1354 Map sources = new HashMap(); 1355 1356 sources.put( "upper1", sourceUpper ); 1357 sources.put( "upper2", sourceUpper ); 1358 1359 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceovergroupbyintojoinsimple" ), SinkMode.REPLACE ); 1360 1361 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1362 1363 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1364 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1365 1366 pipeUpper1 = new GroupBy( pipeUpper1, new Fields( "num" ) ); 1367 pipeUpper2 = new GroupBy( pipeUpper2, new Fields( "num" ) ); 1368 1369 Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( 
"num1", "char1", "num2", "char2" ) ); 1370 1371 splice1 = new Each( splice1, new Identity() ); 1372 1373 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice1 ); 1374 1375 if( getPlatform().isMapReduce() ) 1376 assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() ); 1377 1378 flow.complete(); 1379 1380 validateLength( flow, 5, null ); 1381 1382 List<Tuple> actual = getSinkAsList( flow ); 1383 1384 assertTrue( actual.contains( new Tuple( "1\tA\t1\tA" ) ) ); 1385 assertTrue( actual.contains( new Tuple( "2\tB\t2\tB" ) ) ); 1386 } 1387 1388 /** 1389 * Tests that two independent streamed sources with loadable tributaries properly plan into a GroupBy 1390 * without loading unused sources 1391 * 1392 * @throws Exception 1393 */ 1394 @Test 1395 public void testJoinsIntoGroupBy() throws Exception 1396 { 1397 getPlatform().copyFromLocal( inputFileLower ); 1398 getPlatform().copyFromLocal( inputFileUpper ); 1399 1400 getPlatform().copyFromLocal( inputFileLhs ); 1401 getPlatform().copyFromLocal( inputFileRhs ); 1402 1403 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1404 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1405 1406 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1407 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1408 1409 Map sources = new HashMap(); 1410 1411 sources.put( "lower", sourceLower ); 1412 sources.put( "upper", sourceUpper ); 1413 sources.put( "lhs", sourceLhs ); 1414 sources.put( "rhs", sourceRhs ); 1415 1416 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintogroupby" ), SinkMode.REPLACE ); 1417 1418 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1419 1420 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1421 Pipe pipeUpper = new Each( new 
Pipe( "upper" ), new Fields( "line" ), splitter ); 1422 1423 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1424 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1425 1426 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1427 1428 upperLower = new Each( upperLower, new Identity() ); 1429 1430 Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1431 1432 lhsRhs = new Each( lhsRhs, new Identity() ); 1433 1434 Pipe grouped = new GroupBy( "merging", Pipe.pipes( upperLower, lhsRhs ), new Fields( "num1" ) ); 1435 1436 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1437 1438 if( getPlatform().isMapReduce() ) 1439 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1440 1441 flow.complete(); 1442 1443 validateLength( flow, 42, null ); 1444 1445 List<Tuple> actual = getSinkAsList( flow ); 1446 1447 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1448 assertTrue( actual.contains( new Tuple( "5\te\t5\tE" ) ) ); 1449 } 1450 1451 @Test 1452 public void testJoinSamePipeAroundGroupBy() throws Exception 1453 { 1454 getPlatform().copyFromLocal( inputFileLower ); 1455 1456 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1457 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipearoundgroupby" ), SinkMode.REPLACE ); 1458 1459 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1460 1461 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1462 1463 Pipe lhsPipe = new Each( new Pipe( "lhs", pipeLower ), new Identity() ); 1464 1465 Pipe rhsPipe = new Each( new Pipe( "rhs", pipeLower ), new Identity() ); 1466 1467 rhsPipe = new GroupBy( rhsPipe, new Fields( "num" 
) ); 1468 1469 rhsPipe = new Each( rhsPipe, new Identity() ); 1470 1471 Pipe pipe = new HashJoin( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1472 1473 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 1474 1475 flow.complete(); 1476 1477 validateLength( flow, 5, null ); 1478 1479 List<Tuple> actual = getSinkAsList( flow ); 1480 1481 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1482 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1483 } 1484 1485 /** 1486 * This test results in two MR jobs because one join feeds into the accumulated side of the second. A mapper 1487 * can only stream on branch at a time forcing a temp file between the mappers. see next test for swapped join 1488 * 1489 * @throws Exception 1490 */ 1491 @Test 1492 public void testJoinsIntoCoGroupLhs() throws Exception 1493 { 1494 getPlatform().copyFromLocal( inputFileLower ); 1495 getPlatform().copyFromLocal( inputFileUpper ); 1496 1497 getPlatform().copyFromLocal( inputFileLhs ); 1498 getPlatform().copyFromLocal( inputFileRhs ); 1499 1500 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1501 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1502 1503 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1504 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1505 1506 Map sources = new HashMap(); 1507 1508 sources.put( "lower", sourceLower ); 1509 sources.put( "upper", sourceUpper ); 1510 sources.put( "lhs", sourceLhs ); 1511 sources.put( "rhs", sourceRhs ); 1512 1513 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhs" ), SinkMode.REPLACE ); 1514 1515 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1516 1517 Pipe pipeLower = new Each( new Pipe( 
"lower" ), new Fields( "line" ), splitter ); 1518 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1519 1520 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1521 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1522 1523 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1524 1525 upperLower = new Each( upperLower, new Identity() ); 1526 1527 Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1528 1529 lhsUpperLower = new Each( lhsUpperLower, new Identity() ); 1530 1531 Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) ); 1532 1533 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1534 1535 if( getPlatform().isMapReduce() ) 1536 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1537 1538 flow.complete(); 1539 1540 validateLength( flow, 37, null ); 1541 1542 List<Tuple> actual = getSinkAsList( flow ); 1543 1544 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta\t1\tA\t1\tA" ) ) ); 1545 assertTrue( actual.contains( new Tuple( "5\ta\t5\te\t5\tE\t5\tA" ) ) ); 1546 } 1547 1548 /** 1549 * This test results in one MR jobs because one join feeds into the streamed side of the second. 
1550 * 1551 * @throws Exception 1552 */ 1553 @Test 1554 public void testJoinsIntoCoGroupLhsSwappedJoin() throws Exception 1555 { 1556 getPlatform().copyFromLocal( inputFileLower ); 1557 getPlatform().copyFromLocal( inputFileUpper ); 1558 1559 getPlatform().copyFromLocal( inputFileLhs ); 1560 getPlatform().copyFromLocal( inputFileRhs ); 1561 1562 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1563 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1564 1565 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1566 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1567 1568 Map sources = new HashMap(); 1569 1570 sources.put( "lower", sourceLower ); 1571 sources.put( "upper", sourceUpper ); 1572 sources.put( "lhs", sourceLhs ); 1573 sources.put( "rhs", sourceRhs ); 1574 1575 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhsswappedjoin" ), SinkMode.REPLACE ); 1576 1577 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1578 1579 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1580 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1581 1582 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1583 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1584 1585 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1586 1587 upperLower = new Each( upperLower, new Identity() ); 1588 1589 Pipe lhsUpperLower = new HashJoin( upperLower, new Fields( "numUpperLower" ), pipeLhs, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower", "numLhs", "charLhs" 
) ); 1590 1591 lhsUpperLower = new Each( lhsUpperLower, new Identity() ); 1592 1593 Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) ); 1594 1595 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1596 1597 if( getPlatform().isMapReduce() ) 1598 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1599 1600 flow.complete(); 1601 1602 validateLength( flow, 37, null ); 1603 1604 List<Tuple> actual = getSinkAsList( flow ); 1605 1606 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) ); 1607 assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) ); 1608 } 1609 1610 @Test 1611 public void testJoinsIntoCoGroupRhs() throws Exception 1612 { 1613 getPlatform().copyFromLocal( inputFileLower ); 1614 getPlatform().copyFromLocal( inputFileUpper ); 1615 1616 getPlatform().copyFromLocal( inputFileLhs ); 1617 getPlatform().copyFromLocal( inputFileRhs ); 1618 1619 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1620 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1621 1622 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1623 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1624 1625 Map sources = new HashMap(); 1626 1627 sources.put( "lower", sourceLower ); 1628 sources.put( "upper", sourceUpper ); 1629 sources.put( "lhs", sourceLhs ); 1630 sources.put( "rhs", sourceRhs ); 1631 1632 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouprhs" ), SinkMode.REPLACE ); 1633 1634 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1635 1636 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1637 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1638 
1639 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1640 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1641 1642 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1643 1644 upperLower = new Each( upperLower, new Identity() ); 1645 1646 Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1647 1648 lhsUpperLower = new Each( lhsUpperLower, new Identity() ); 1649 1650 Pipe grouped = new CoGroup( "cogrouping", pipeRhs, new Fields( "num" ), lhsUpperLower, new Fields( "numLhs" ) ); 1651 1652 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1653 1654 if( getPlatform().isMapReduce() ) 1655 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1656 1657 flow.complete(); 1658 1659 validateLength( flow, 37, null ); 1660 1661 List<Tuple> actual = getSinkAsList( flow ); 1662 1663 assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\ta\t1\tA" ) ) ); 1664 assertTrue( actual.contains( new Tuple( "5\tE\t5\te\t5\te\t5\tE" ) ) ); 1665 } 1666 1667 @Test 1668 public void testJoinsIntoCoGroup() throws Exception 1669 { 1670 getPlatform().copyFromLocal( inputFileLower ); 1671 getPlatform().copyFromLocal( inputFileUpper ); 1672 1673 getPlatform().copyFromLocal( inputFileLhs ); 1674 getPlatform().copyFromLocal( inputFileRhs ); 1675 1676 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1677 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1678 1679 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1680 Tap sourceRhs = getPlatform().getTextFile( new Fields( 
"offset", "line" ), inputFileRhs ); 1681 1682 Map sources = new HashMap(); 1683 1684 sources.put( "lower", sourceLower ); 1685 sources.put( "upper", sourceUpper ); 1686 sources.put( "lhs", sourceLhs ); 1687 sources.put( "rhs", sourceRhs ); 1688 1689 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogroup" ), SinkMode.REPLACE ); 1690 1691 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1692 1693 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1694 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1695 1696 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1697 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1698 1699 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower1", "charUpperLower1", "numUpperLower2", "charUpperLower2" ) ); 1700 1701 upperLower = new Each( upperLower, new Identity() ); 1702 1703 Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "numLhsRhs1", "charLhsRhs1", "numLhsRhs2", "charLhsRhs2" ) ); 1704 1705 lhsRhs = new Each( lhsRhs, new Identity() ); 1706 1707 Pipe grouped = new CoGroup( "cogrouping", upperLower, new Fields( "numUpperLower1" ), lhsRhs, new Fields( "numLhsRhs1" ) ); 1708 1709 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1710 1711 if( getPlatform().isMapReduce() ) 1712 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1713 1714 flow.complete(); 1715 1716 validateLength( flow, 37, null ); 1717 1718 List<Tuple> actual = getSinkAsList( flow ); 1719 1720 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) ); 1721 assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) ); 1722 } 1723 1724 public static class AllComparator implements 
Comparator<Comparable>, Hasher<Comparable>, Serializable
    {

    /** Orders keys by the natural ordering of their {@code toString()} forms. */
    // NOTE(review): unlike hashCode() below, this does not guard against null keys.
    @Override
    public int compare( Comparable lhs, Comparable rhs )
      {
      return lhs.toString().compareTo( rhs.toString() );
      }

    /** Hashes a key by its {@code toString()} form; a null key hashes to 0. */
    @Override
    public int hashCode( Comparable value )
      {
      if( value == null )
        return 0;

      return value.toString().hashCode();
      }
    }

  /**
   * Tests that a Hasher is honored even if the default comparator is null.
   * <p/>
   * The lhs "num" key is converted with {@code Integer.parseInt}, while the rhs "num" key is left as the
   * String produced by the splitter. The key types therefore differ across the two join sides.
   * {@link AllComparator}, which compares and hashes via {@code toString()}, is attached to the lhs key so the
   * two sides can still match.
   *
   * @throws Exception propagated from platform setup, flow planning, or flow execution
   */
  @Test
  public void testJoinWithHasher() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinhasher" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );

    // replace the lhs "num" value with the result of Integer.parseInt, so it no longer has the rhs key's type
    pipeLower = new Each( pipeLower, new Fields( "num" ), new ExpressionFunction( Fields.ARGS, "Integer.parseInt( num )", String.class ), Fields.REPLACE );

    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );

    // attach the toString()-based comparator/hasher to the lhs join key only
    Fields num = new Fields( "num" );
    num.setComparator( "num", new AllComparator() );

    Pipe splice = new HashJoin( pipeLower, num, pipeUpper, new Fields( "num" ), Fields.size( 4 ) );

    Map<Object, Object> properties = getProperties();

    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );

    flow.complete();

    validateLength( flow, 5 );

    List<Tuple> values = getSinkAsList( flow );

    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
    }

  /**
   * Joins on {@link Fields#NONE} on both sides, so every lhs tuple is paired with every rhs tuple (a cross join).
   * <p/>
   * The expected 25 lines presumably come from 5 x 5 input lines. The non-matching pair "1 a / 2 B" being present
   * confirms that no key equality is applied.
   *
   * @throws Exception propagated from platform setup, flow planning, or flow execution
   */
  @Test
  public void testJoinNone() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );

    Pipe splice = new HashJoin( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) );

    Map<Object, Object> properties = getProperties();

    Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );

    flow.complete();

    validateLength( flow, 25 );

    List<Tuple> values = getSinkAsList( flow );

    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
    assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) );
    assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
    }

  /**
   * Splits the output of a single GroupBy/Every into two branches.
   * <p/>
   * Each branch feeds its own HashJoin: one against the "upper" source and one against the "joined" source.
   * Each join writes to its own sink. On MapReduce the plan is expected to have exactly 3 steps.
   *
   * @throws Exception propagated from platform setup, flow planning, or flow execution
   */
  @Test
  public void testGroupBySplitJoins() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );
    getPlatform().copyFromLocal( inputFileUpper );
    getPlatform().copyFromLocal( inputFileJoined );

    Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
    Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );
    Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined );

    Map sources = new HashMap();

    sources.put( "lower", sourceLower );
    sources.put( "upper", sourceUpper );
    sources.put( "joined", sourceJoined );

    Tap lhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE );
    Tap rhsSink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE );

    Map sinks = new HashMap();

    sinks.put( "lhs", lhsSink );
    sinks.put( "rhs", rhsSink );

    Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " );
    Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " );
    // the "joined" input is tab-delimited with three columns, unlike the space-delimited lower/upper inputs
    Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" );

    Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower );
    Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper );
    Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined );

    Pipe pipe = new GroupBy( pipeLower, new Fields( "numA" ) );

    pipe = new Every( pipe, Fields.ALL, new TestIdentityBuffer( new Fields( "numA" ), 5, false ), Fields.RESULTS );

    // split the grouped stream: each branch is joined against a different source and written to its own sink
    Pipe lhsPipe = new Each( pipe, new Identity() );
    lhsPipe = new HashJoin( "lhs", lhsPipe, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) );

    Pipe rhsPipe = new Each( pipe, new Identity() );
    rhsPipe = new HashJoin( "rhs", rhsPipe, new Fields( "numA" ), pipeJoined, new Fields( "numC" ) );

    Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, lhsPipe, rhsPipe );

    if( getPlatform().isMapReduce() )
      assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() );

    flow.complete();

    validateLength( flow.openSink( "lhs" ), 5, null );
    validateLength( flow.openSink( "rhs" ), 5, null );

    List<Tuple> lhsActual = asList( flow, lhsSink );

    assertTrue( lhsActual.contains( new Tuple( "1\ta\t1\tA" ) ) );
    assertTrue( lhsActual.contains( new Tuple( "2\tb\t2\tB" ) ) );

    List<Tuple> rhsActual = asList( flow, rhsSink );

    assertTrue( rhsActual.contains( new Tuple( "1\ta\t1\ta\tA" ) ) );
    assertTrue( rhsActual.contains( new Tuple( "2\tb\t2\tb\tB" ) ) );
    }

  /**
   * Currently we cannot efficiently plan for this case, so it is better to throw an error.
   * <p/>
   * When run against a cluster, a Merge before a GroupBy can hide the streamed/accumulated nature of a branch.
   * On MapReduce and DAG platforms, planning is therefore expected to fail. On other platforms, the flow must run
   * and produce the expected per-id counts.
   * <p/>
   * The commented-out code is for troubleshooting.
   *
   * @throws Exception if planning fails on a platform where it is expected to succeed, or if execution fails
   */
  @Test
  public void testJoinMergeGroupBy() throws Exception
    {
    getPlatform().copyFromLocal( inputFileNums10 );
    getPlatform().copyFromLocal( inputFileNums20 );

    Tap lhsTap = getPlatform().getTextFile( new Fields( "id" ), inputFileNums10 );
    Tap rhsTap = getPlatform().getTextFile( new Fields( "id2" ), inputFileNums20 );

    Pipe lhs = new Pipe( "lhs" );
    Pipe rhs = new Pipe( "rhs" );

    // troubleshooting: swap in a CoGroup for the HashJoin; the sink path below switches to ".../cogroup" automatically
//    Pipe joined = new CoGroup( lhs, new Fields( "id" ), rhs, new Fields( "id2" ) );
    Pipe joined = new HashJoin( lhs, new Fields( "id" ), rhs, new Fields( "id2" ) );

    Pipe pruned = new Each( joined, new Fields( "id2" ), new Identity(), Fields.RESULTS );
//    pruned = new Checkpoint( pruned );
    // rhs is both the accumulated side of the join and a direct input to the Merge
    Pipe merged = new Merge( pruned, rhs );
    Pipe grouped = new GroupBy( merged, new Fields( "id2" ) );
//    Pipe grouped = new GroupBy( Pipe.pipes( pruned, rhs ), new Fields( "id2" ) );
    Aggregator count = new Count( new Fields( "count" ) );
    Pipe counted = new Every( grouped, count );

    String testJoinMerge = "testJoinMergeGroupBy/" + ( ( joined instanceof CoGroup ) ? "cogroup" : "hashjoin" );
    Tap sink = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", null, getOutputPath( testJoinMerge ), SinkMode.REPLACE );

    FlowDef flowDef = FlowDef.flowDef()
      .setName( "join-merge" )
      .addSource( rhs, rhsTap )
      .addSource( lhs, lhsTap )
      .addTailSink( counted, sink );

    boolean failOnPlanner = getPlatform().isMapReduce() || getPlatform().isDAG();

    // only dereferenced after the try below; the catch branch always returns or rethrows
    Flow flow = null;

    try
      {
      flow = getPlatform().getFlowConnector().connect( flowDef );

      // presumably fail() raises an AssertionError (an Error, not an Exception), so it escapes the catch below
      if( failOnPlanner )
        fail( "planner should throw error on plan" );
      }
    catch( Exception exception )
      {
      // a planner error is the expected outcome on MapReduce/DAG platforms
      if( !failOnPlanner )
        throw exception;

      return;
      }

//    flow.writeDOT( "joinmerge.dot" );
//    flow.writeStepsDOT( "joinmerge-steps.dot" );

    flow.complete();

    validateLength( flow, 20 );

    List<Tuple> values = getSinkAsList( flow );
    List<Tuple> expected = new ArrayList<Tuple>();

    // ids 1-10 are counted twice (via the joined branch and via rhs directly); ids 11-20 only once, via rhs
    expected.add( new Tuple( "1", "2" ) );
    expected.add( new Tuple( "10", "2" ) );
    expected.add( new Tuple( "11", "1" ) );
    expected.add( new Tuple( "12", "1" ) );
    expected.add( new Tuple( "13", "1" ) );
    expected.add( new Tuple( "14", "1" ) );
    expected.add( new Tuple( "15", "1" ) );
    expected.add( new Tuple( "16", "1" ) );
    expected.add( new Tuple( "17", "1" ) );
    expected.add( new Tuple( "18", "1" ) );
    expected.add( new Tuple( "19", "1" ) );
    expected.add( new Tuple( "2", "2" ) );
    expected.add( new Tuple( "20", "1" ) );
    expected.add( new Tuple( "3", "2" ) );
    expected.add( new Tuple( "4", "2" ) );
    expected.add( new Tuple( "5", "2" ) );
    expected.add( new Tuple( "6", "2" ) );
    expected.add( new Tuple( "7", "2" ) );
    expected.add( new Tuple( "8", "2" ) );
    expected.add( new Tuple( "9", "2" ) );

    // sort both lists so the comparison does not depend on sink output ordering
    Collections.sort( values );
    Collections.sort( expected );

    assertEquals( expected, values );
    }

  /**
   * Under Tez, a split after a HashJoin can result in the HashJoin being duplicated across nodes, once for each
   * branch of the split. BoundaryBalanceJoinSplitTransformer inserts a Boundary at the split, preventing
   * duplication of the path.
   *
   * @throws Exception propagated from platform setup, flow planning, or flow execution
   */
  @Test
  public void testJoinSplit() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLhs );
    getPlatform().copyFromLocal( inputFileRhs );

    FlowDef flowDef = FlowDef.flowDef()
      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
      .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) )
      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );

    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );

    Pipe join = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );

    // split the join output into two identical branches, each bound by name to its own sink
    Pipe pipeLhs = new Each( new Pipe( "lhsSink", join ), new Identity() );
    Pipe pipeRhs = new Each( new Pipe( "rhsSink", join ), new Identity() );

    flowDef
      .addTail( pipeLhs )
      .addTail( pipeRhs );

    Flow flow = getPlatform().getFlowConnector().connect( flowDef );

    flow.complete();

    // NOTE(review): this flow has two sinks; confirm which one validateLength( flow, ... ) checks
    validateLength( flow, 37, null );

    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );

    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );

    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );

    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );
    }

  /**
   * Catches a situation where BottomUpJoinedBoundariesNodePartitioner may capture an invalid HashJoin sub-graph
   * if the in-bound Boundary is split upon.
   * <p/>
   * Both join sides of the first HashJoin read the same file. The first join's output is split: one branch goes to
   * "lhsSink", and the other feeds a second HashJoin whose result goes to "rhsSink".
   *
   * @throws Exception propagated from platform setup, flow planning, or flow execution
   */
  @Test
  public void testSameSourceJoinSplitIntoJoin() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLhs );
    getPlatform().copyFromLocal( inputFileRhs );

    // "lhs" and "rhs" intentionally read the same file (inputFileLhs); only "joinSecond" reads inputFileRhs
    FlowDef flowDef = FlowDef.flowDef()
      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
      .addSource( "rhs", getPlatform().getTextFile( inputFileLhs ) )
      .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) )
      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );

    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );

    Pipe joinFirst = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );

    Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() );

    Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", "charRHSSecond" ), " " ) );

    // the first join's output is also the streamed (lhs) side of a second HashJoin
    joinSecond = new HashJoin( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) );

    Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() );

    flowDef
      .addTail( pipeLhs )
      .addTail( pipeRhs );

    Flow flow = getPlatform().getFlowConnector().connect( flowDef );

    flow.complete();

    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );

    // both sides came from inputFileLhs, hence lower-case chars on both sides
    assertEquals( 37, values.size() );
    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
    assertTrue( values.contains( new Tuple( "1\ta\t1\tb" ) ) );

    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );

    assertEquals( 109, values.size() );
    assertTrue( values.contains( new Tuple( "1\ta\t1\ta\t1\tA" ) ) );
    assertTrue( values.contains( new Tuple( "1\ta\t1\tb\t1\tB" ) ) );
    }

  /**
   * Checks that a split after a HashJoin does not result in the HashJoin execution being duplicated across
   * multiple nodes, one for each branch in the split.
   * <p/>
   * One branch is grouped into "lhsSink"; the other is CoGrouped with a third source into "rhsSink". On DAG
   * platforms, the HashJoin must appear in exactly one element graph of the first step's node graph.
   *
   * @throws Exception propagated from platform setup, flow planning, or flow execution
   */
  @Test
  public void testJoinSplitBeforeJoin() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLhs );
    getPlatform().copyFromLocal( inputFileRhs );

    FlowDef flowDef = FlowDef.flowDef()
      .addSource( "lhs", getPlatform().getTextFile( inputFileLhs ) )
      .addSource( "rhs", getPlatform().getTextFile( inputFileRhs ) )
      .addSource( "joinSecond", getPlatform().getTextFile( inputFileRhs ) )
      .addSink( "lhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "lhs" ), SinkMode.REPLACE ) )
      .addSink( "rhsSink", getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "rhs" ), SinkMode.REPLACE ) );

    Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) );
    Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) );

    pipeUpper = new Checkpoint( pipeUpper );

    // keep a typed reference so the planned node graphs can be inspected for this exact HashJoin below
    HashJoin hashJoin = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() );

    Pipe joinFirst = hashJoin;

    joinFirst = new Each( joinFirst, new Identity() );

    // branch 1: group the join output into "lhsSink"
    Pipe pipeLhs = new Each( new Pipe( "lhsSink", joinFirst ), new Identity() );

    pipeLhs = new GroupBy( pipeLhs, new Fields( "numLHS" ) );

    // branch 2: CoGroup the join output with a third source into "rhsSink"
    joinFirst = new Each( new Pipe( "lhsSplit", joinFirst ), new Identity() );

    Pipe joinSecond = new Each( "joinSecond", new Fields( "line" ), new RegexSplitter( new Fields( "numRHSSecond", "charRHSSecond" ), " " ) );

    joinSecond = new CoGroup( joinFirst, new Fields( "numLHS" ), joinSecond, new Fields( "numRHSSecond" ) );

    Pipe pipeRhs = new Each( new Pipe( "rhsSink", joinSecond ), new Identity() );

    flowDef
      .addTail( pipeLhs )
      .addTail( pipeRhs );

    Flow flow = getPlatform().getFlowConnector().connect( flowDef );

    if( getPlatform().isDAG() )
      {
      // the HashJoin must be planned into exactly one node, i.e. not duplicated per split branch
      FlowStep flowStep = (FlowStep) flow.getFlowSteps().get( 0 );
      List<ElementGraph> elementGraphs = flowStep.getFlowNodeGraph().getElementGraphs( hashJoin );

      assertEquals( 1, elementGraphs.size() );
      }

    flow.complete();

    List<Tuple> values = asList( flow, flowDef.getSinks().get( "lhsSink" ) );

    assertEquals( 37, values.size() );
    assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) );
    assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) );

    values = asList( flow, flowDef.getSinks().get( "rhsSink" ) );

    assertEquals( 109, values.size() );
    assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
    assertTrue( values.contains( new Tuple( "1\ta\t1\tB\t1\tB" ) ) );
    }

  /**
   * HashJoins the output of a GroupBy/Every with a branch split from that same output.
   * <p/>
   * The split branch is grouped and aggregated twice more before the join. Both join sides therefore share one
   * upstream GroupBy, and the joined "num"/"char" values must line up.
   *
   * @throws Exception propagated from platform setup, flow planning, or flow execution
   */
  @Test
  public void testGroupBySplitGroupByJoin() throws Exception
    {
    getPlatform().copyFromLocal( inputFileLower );

    Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );

    Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE );

    Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

    Pipe pipeFirst = new Pipe( "first" );
    pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter );
    pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) );
    pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL );

    // branch off the first aggregation and group/aggregate it twice more
    Pipe pipeSecond = new Pipe( "second", pipeFirst );
    pipeSecond = new Each( pipeSecond, new Identity() );
    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
    pipeSecond = new Every( pipeSecond, new Fields( "firstFirst" ), new First( new Fields( "secondFirst" ) ), Fields.ALL );
    pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
    pipeSecond = new Every( pipeSecond, new Fields( "secondFirst" ), new First( new Fields( "thirdFirst" ) ), Fields.ALL );

    // rejoin the branch with its own upstream
    Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) );

    Flow flow = getPlatform().getFlowConnector().connect( source, sink, splice );

    flow.complete();

    validateLength( flow, 5, null );

    List<Tuple> values = getSinkAsList( flow );

    assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
    assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
    assertTrue( values.contains( new Tuple( "3\tc\t3\tc" ) ) );
    assertTrue( values.contains( new Tuple( "4\td\t4\td" ) ) );
    assertTrue( values.contains( new Tuple( "5\te\t5\te" ) ) );
    }
  }