001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading; 022 023 import java.io.Serializable; 024 import java.util.ArrayList; 025 import java.util.Collections; 026 import java.util.Comparator; 027 import java.util.HashMap; 028 import java.util.HashSet; 029 import java.util.List; 030 import java.util.Map; 031 import java.util.Set; 032 033 import cascading.flow.Flow; 034 import cascading.flow.FlowDef; 035 import cascading.operation.Aggregator; 036 import cascading.operation.Function; 037 import cascading.operation.Identity; 038 import cascading.operation.aggregator.Count; 039 import cascading.operation.aggregator.First; 040 import cascading.operation.expression.ExpressionFunction; 041 import cascading.operation.regex.RegexFilter; 042 import cascading.operation.regex.RegexSplitter; 043 import cascading.pipe.CoGroup; 044 import cascading.pipe.Each; 045 import cascading.pipe.Every; 046 import cascading.pipe.GroupBy; 047 import cascading.pipe.HashJoin; 048 import cascading.pipe.Merge; 049 import cascading.pipe.Pipe; 050 import cascading.pipe.joiner.InnerJoin; 051 import cascading.pipe.joiner.Joiner; 052 import cascading.pipe.joiner.LeftJoin; 053 import cascading.pipe.joiner.MixedJoin; 054 import 
cascading.pipe.joiner.OuterJoin; 055 import cascading.pipe.joiner.RightJoin; 056 import cascading.tap.SinkMode; 057 import cascading.tap.Tap; 058 import cascading.tuple.Fields; 059 import cascading.tuple.Hasher; 060 import cascading.tuple.Tuple; 061 import org.junit.Test; 062 063 import static data.InputData.*; 064 065 066 public class JoinFieldedPipesPlatformTest extends PlatformTestCase 067 { 068 public JoinFieldedPipesPlatformTest() 069 { 070 super( true, 4, 1 ); // leave cluster testing enabled 071 } 072 073 @Test 074 public void testCross() throws Exception 075 { 076 getPlatform().copyFromLocal( inputFileLhs ); 077 getPlatform().copyFromLocal( inputFileRhs ); 078 079 Map sources = new HashMap(); 080 081 sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) ); 082 sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) ); 083 084 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE ); 085 086 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 087 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 088 089 Pipe cross = new HashJoin( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 090 091 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross ); 092 093 flow.complete(); 094 095 validateLength( flow, 37, null ); 096 097 List<Tuple> values = getSinkAsList( flow ); 098 099 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 100 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 101 } 102 103 @Test 104 public void testJoin() throws Exception 105 { 106 getPlatform().copyFromLocal( inputFileLower ); 107 getPlatform().copyFromLocal( inputFileUpper ); 108 109 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 110 Tap sourceUpper = 
getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 111 112 Map sources = new HashMap(); 113 114 sources.put( "lower", sourceLower ); 115 sources.put( "upper", sourceUpper ); 116 117 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE ); 118 119 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 120 121 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 122 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 123 124 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 125 126 Map<Object, Object> properties = getProperties(); 127 128 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 129 130 flow.complete(); 131 132 validateLength( flow, 5 ); 133 134 List<Tuple> values = getSinkAsList( flow ); 135 136 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 137 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 138 } 139 140 @Test 141 public void testJoinSamePipeName() throws Exception 142 { 143 getPlatform().copyFromLocal( inputFileLower ); 144 getPlatform().copyFromLocal( inputFileUpper ); 145 146 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 147 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 148 149 Map sources = new HashMap(); 150 151 sources.put( "lower", sourceLower ); 152 sources.put( "upper", sourceUpper ); 153 154 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE ); 155 156 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 157 158 Pipe pipeLower = new Pipe( "lower" ); 159 Pipe pipeUpper = new Pipe( "upper" ); 160 161 // these pipes will hide the source name, and could cause one to be lost 162 pipeLower = new 
Pipe( "same", pipeLower ); 163 pipeUpper = new Pipe( "same", pipeUpper ); 164 165 pipeLower = new Each( pipeLower, new Fields( "line" ), splitter ); 166 pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter ); 167 168 // pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 169 // pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 170 171 pipeLower = new Pipe( "left", pipeLower ); 172 pipeUpper = new Pipe( "right", pipeUpper ); 173 174 // pipeLower = new Each( pipeLower, new Debug( true ) ); 175 // pipeUpper = new Each( pipeUpper, new Debug( true ) ); 176 177 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 178 179 // splice = new Each( splice, new Debug( true ) ); 180 splice = new Pipe( "splice", splice ); 181 splice = new Pipe( "tail", splice ); 182 183 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 184 185 flow.complete(); 186 187 validateLength( flow, 5 ); 188 189 List<Tuple> values = getSinkAsList( flow ); 190 191 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 192 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 193 } 194 195 @Test 196 public void testJoinWithUnknowns() throws Exception 197 { 198 getPlatform().copyFromLocal( inputFileLower ); 199 getPlatform().copyFromLocal( inputFileUpper ); 200 201 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 202 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 203 204 Map sources = new HashMap(); 205 206 sources.put( "lower", sourceLower ); 207 sources.put( "upper", sourceUpper ); 208 209 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE ); 210 211 Function splitter = new RegexSplitter( Fields.UNKNOWN, " " ); 212 213 Pipe pipeLower = new 
Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 214 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 215 216 Pipe splice = new HashJoin( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) ); 217 218 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 219 220 flow.complete(); 221 222 validateLength( flow, 5 ); 223 224 List<Tuple> values = getSinkAsList( flow ); 225 226 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 227 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 228 } 229 230 /** 231 * This test intentionally filters out all values on the "upper" branch so the intermediate tap is empty. That branch is 232 * then hash-joined (HashJoin, not CoGroup) with the unfiltered "lower" stream using an OuterJoin; the assertions expect lower rows padded with null right-hand values. 233 * 234 * @throws Exception on any failure in the test body 235 */ 236 @Test 237 public void testJoinFilteredBranch() throws Exception 238 { 239 getPlatform().copyFromLocal( inputFileLower ); 240 getPlatform().copyFromLocal( inputFileUpper ); 241 242 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 243 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 244 245 Map sources = new HashMap(); 246 247 sources.put( "lower", sourceLower ); 248 sources.put( "upper", sourceUpper ); 249 250 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinfilteredbranch" ), SinkMode.REPLACE ); 251 252 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 253 254 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 255 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 256 pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all 257 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 258 259 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new 
OuterJoin() ); 260 261 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 262 263 flow.complete(); 264 265 validateLength( flow, 5 ); 266 267 List<Tuple> values = getSinkAsList( flow ); 268 269 assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) ); 270 assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) ); 271 } 272 /** Hash-joins inputFileLower against itself: the "lower" and "upper" taps both read the same file, are split into "num" and "char", and joined on "num"; the assertions expect rows pairing each line with itself. */ 273 @Test 274 public void testJoinSelf() throws Exception 275 { 276 getPlatform().copyFromLocal( inputFileLower ); 277 278 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 279 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 280 281 Map sources = new HashMap(); 282 283 sources.put( "lower", sourceLower ); 284 sources.put( "upper", sourceUpper ); 285 286 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinself" ), SinkMode.REPLACE ); 287 288 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 289 290 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 291 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 292 293 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 294 295 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 296 297 flow.complete(); 298 299 validateLength( flow, 5 ); 300 301 List<Tuple> values = getSinkAsList( flow ); 302 303 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 304 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 305 } 306 307 /** 308 * Method testJoinAfterEvery hash-joins two branches that each end in a GroupBy followed by an Every running First. NOTE(review): the earlier wording said a tmp tap is inserted after the Every - presumably still the intent for HashJoin; confirm against the planner. 309 * 310 * @throws Exception on any failure in the test body 311 */ 312 @Test 313 public void testJoinAfterEvery() throws Exception 314 { 315 getPlatform().copyFromLocal( inputFileLower ); 316 getPlatform().copyFromLocal( inputFileUpper ); 317 318 Tap sourceLower = 
getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 319 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 320 321 Map sources = new HashMap(); 322 323 sources.put( "lower", sourceLower ); 324 sources.put( "upper", sourceUpper ); 325 326 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE ); 327 328 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 329 330 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 331 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) ); 332 pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL ); 333 334 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 335 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 336 pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL ); 337 338 Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 339 340 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 341 342 flow.complete(); 343 344 validateLength( flow, 5, null ); 345 346 List<Tuple> values = getSinkAsList( flow ); 347 348 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 349 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 350 } 351 352 @Test 353 public void testJoinInnerSingleField() throws Exception 354 { 355 getPlatform().copyFromLocal( inputFileLowerOffset ); 356 getPlatform().copyFromLocal( inputFileUpper ); 357 358 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 359 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 360 361 Map sources = new HashMap(); 362 363 sources.put( "lower", sourceLower ); 364 sources.put( "upper", sourceUpper ); 365 366 Tap sink = 
getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joininnersingle" ), SinkMode.REPLACE ); 367 368 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) ); 369 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) ); 370 371 Pipe join = new HashJoin( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 372 373 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 374 375 flow.complete(); 376 377 validateLength( flow, 3, null ); 378 379 Set<Tuple> results = new HashSet<Tuple>(); 380 381 results.add( new Tuple( "1\t1" ) ); 382 results.add( new Tuple( "5\t5" ) ); 383 384 List<Tuple> actual = getSinkAsList( flow ); 385 386 results.removeAll( actual ); 387 388 assertEquals( 0, results.size() ); 389 } 390 391 /** 392 * 1 a1 393 * 1 a2 394 * 1 a3 395 * 2 b1 396 * 3 c1 397 * 4 d1 398 * 4 d2 399 * 4 d3 400 * 5 e1 401 * 5 e2 402 * 5 e3 403 * 7 g1 404 * 7 g2 405 * 7 g3 406 * 7 g4 407 * 7 g5 408 * null h1 409 * <p/> 410 * 1 A1 411 * 1 A2 412 * 1 A3 413 * 2 B1 414 * 2 B2 415 * 2 B3 416 * 4 D1 417 * 6 F1 418 * 6 F2 419 * null H1 420 * <p/> 421 * 1 a1 1 A1 422 * 1 a1 1 A2 423 * 1 a1 1 A3 424 * 1 a2 1 A1 425 * 1 a2 1 A2 426 * 1 a2 1 A3 427 * 1 a3 1 A1 428 * 1 a3 1 A2 429 * 1 a3 1 A3 430 * 2 b1 2 B1 431 * 2 b1 2 B2 432 * 2 b1 2 B3 433 * 4 d1 4 D1 434 * 4 d2 4 D1 435 * 4 d3 4 D1 436 * null h1 null H1 437 * 438 * @throws Exception 439 */ 440 @Test 441 public void testJoinInner() throws Exception 442 { 443 HashSet<Tuple> results = new HashSet<Tuple>(); 444 445 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 446 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 447 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 448 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 449 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 450 results.add( new Tuple( 
"1", "a2", "1", "A3" ) ); 451 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 452 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 453 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 454 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 455 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 456 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 457 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 458 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 459 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 460 results.add( new Tuple( null, "h1", null, "H1" ) ); 461 462 handleJoins( "joininner", new InnerJoin(), results ); 463 } 464 465 /** 466 * Runs handleJoins with an OuterJoin. The three listings below are presumably the lhs input rows, the rhs input rows, and the expected joined rows (the last listing matches the tuples added to the expected set). 467 * 1 a1 468 * 1 a2 469 * 1 a3 470 * 2 b1 471 * 3 c1 472 * 4 d1 473 * 4 d2 474 * 4 d3 475 * 5 e1 476 * 5 e2 477 * 5 e3 478 * 7 g1 479 * 7 g2 480 * 7 g3 481 * 7 g4 482 * 7 g5 483 * null h1 484 * <p/> 485 * 1 A1 486 * 1 A2 487 * 1 A3 488 * 2 B1 489 * 2 B2 490 * 2 B3 491 * 4 D1 492 * 6 F1 493 * 6 F2 494 * null H1 495 * <p/> 496 * 1 a1 1 A1 497 * 1 a1 1 A2 498 * 1 a1 1 A3 499 * 1 a2 1 A1 500 * 1 a2 1 A2 501 * 1 a2 1 A3 502 * 1 a3 1 A1 503 * 1 a3 1 A2 504 * 1 a3 1 A3 505 * 2 b1 2 B1 506 * 2 b1 2 B2 507 * 2 b1 2 B3 508 * 3 c1 null null 509 * 4 d1 4 D1 510 * 4 d2 4 D1 511 * 4 d3 4 D1 512 * 5 e1 null null 513 * 5 e2 null null 514 * 5 e3 null null 515 * null null 6 F1 516 * null null 6 F2 517 * 7 g1 null null 518 * 7 g2 null null 519 * 7 g3 null null 520 * 7 g4 null null 521 * 7 g5 null null 522 * null h1 null H1 523 * 524 * @throws Exception on any failure in the test body 525 */ 526 @Test 527 public void testJoinOuter() throws Exception 528 { 529 // skip if hadoop cluster mode, outer joins don't behave the same 530 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 531 return; 532 533 Set<Tuple> results = new HashSet<Tuple>(); 534 535 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 536 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 537 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 538 results.add( new Tuple( "1", "a2", "1", "A1" ) 
); 539 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 540 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 541 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 542 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 543 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 544 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 545 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 546 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 547 results.add( new Tuple( "3", "c1", null, null ) ); 548 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 549 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 550 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 551 results.add( new Tuple( "5", "e1", null, null ) ); 552 results.add( new Tuple( "5", "e2", null, null ) ); 553 results.add( new Tuple( "5", "e3", null, null ) ); 554 results.add( new Tuple( null, null, "6", "F1" ) ); 555 results.add( new Tuple( null, null, "6", "F2" ) ); 556 results.add( new Tuple( "7", "g1", null, null ) ); 557 results.add( new Tuple( "7", "g2", null, null ) ); 558 results.add( new Tuple( "7", "g3", null, null ) ); 559 results.add( new Tuple( "7", "g4", null, null ) ); 560 results.add( new Tuple( "7", "g5", null, null ) ); 561 results.add( new Tuple( null, "h1", null, "H1" ) ); 562 563 handleJoins( "joinouter", new OuterJoin(), results ); 564 } 565 566 /** 567 * 1 a1 568 * 1 a2 569 * 1 a3 570 * 2 b1 571 * 3 c1 572 * 4 d1 573 * 4 d2 574 * 4 d3 575 * 5 e1 576 * 5 e2 577 * 5 e3 578 * 7 g1 579 * 7 g2 580 * 7 g3 581 * 7 g4 582 * 7 g5 583 * null h1 584 * <p/> 585 * 1 A1 586 * 1 A2 587 * 1 A3 588 * 2 B1 589 * 2 B2 590 * 2 B3 591 * 4 D1 592 * 6 F1 593 * 6 F2 594 * null H1 595 * <p/> 596 * 1 a1 1 A1 597 * 1 a1 1 A2 598 * 1 a1 1 A3 599 * 1 a2 1 A1 600 * 1 a2 1 A2 601 * 1 a2 1 A3 602 * 1 a3 1 A1 603 * 1 a3 1 A2 604 * 1 a3 1 A3 605 * 2 b1 2 B1 606 * 2 b1 2 B2 607 * 2 b1 2 B3 608 * 3 c1 null null 609 * 4 d1 4 D1 610 * 4 d2 4 D1 611 * 4 d3 4 D1 612 * 5 e1 null null 613 * 5 e2 null null 614 * 5 e3 null null 615 * 7 g1 
null null 616 * 7 g2 null null 617 * 7 g3 null null 618 * 7 g4 null null 619 * 7 g5 null null 620 * null h1 null H1 621 * 622 * @throws Exception 623 */ 624 @Test 625 public void testJoinInnerOuter() throws Exception 626 { 627 Set<Tuple> results = new HashSet<Tuple>(); 628 629 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 630 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 631 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 632 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 633 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 634 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 635 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 636 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 637 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 638 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 639 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 640 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 641 results.add( new Tuple( "3", "c1", null, null ) ); 642 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 643 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 644 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 645 results.add( new Tuple( "5", "e1", null, null ) ); 646 results.add( new Tuple( "5", "e2", null, null ) ); 647 results.add( new Tuple( "5", "e3", null, null ) ); 648 results.add( new Tuple( "7", "g1", null, null ) ); 649 results.add( new Tuple( "7", "g2", null, null ) ); 650 results.add( new Tuple( "7", "g3", null, null ) ); 651 results.add( new Tuple( "7", "g4", null, null ) ); 652 results.add( new Tuple( "7", "g5", null, null ) ); 653 results.add( new Tuple( null, "h1", null, "H1" ) ); 654 655 handleJoins( "joininnerouter", new LeftJoin(), results ); 656 } 657 658 /** 659 * 1 a1 660 * 1 a2 661 * 1 a3 662 * 2 b1 663 * 3 c1 664 * 4 d1 665 * 4 d2 666 * 4 d3 667 * 5 e1 668 * 5 e2 669 * 5 e3 670 * 7 g1 671 * 7 g2 672 * 7 g3 673 * 7 g4 674 * 7 g5 675 * null h1 676 * <p/> 677 * 1 A1 678 * 1 A2 679 * 1 A3 680 * 2 B1 681 * 2 B2 682 * 2 B3 683 * 4 D1 
684 * 6 F1 685 * 6 F2 686 * null H1 687 * <p/> 688 * 1 a1 1 A1 689 * 1 a1 1 A2 690 * 1 a1 1 A3 691 * 1 a2 1 A1 692 * 1 a2 1 A2 693 * 1 a2 1 A3 694 * 1 a3 1 A1 695 * 1 a3 1 A2 696 * 1 a3 1 A3 697 * 2 b1 2 B1 698 * 2 b1 2 B2 699 * 2 b1 2 B3 700 * 4 d1 4 D1 701 * 4 d2 4 D1 702 * 4 d3 4 D1 703 * null null 6 F1 704 * null null 6 F2 705 * null h1 null H1 706 * 707 * @throws Exception 708 */ 709 @Test 710 public void testJoinOuterInner() throws Exception 711 { 712 // skip if hadoop cluster mode, outer joins don't behave the same 713 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 714 return; 715 716 Set<Tuple> results = new HashSet<Tuple>(); 717 718 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 719 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 720 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 721 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 722 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 723 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 724 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 725 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 726 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 727 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 728 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 729 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 730 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 731 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 732 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 733 results.add( new Tuple( null, null, "6", "F1" ) ); 734 results.add( new Tuple( null, null, "6", "F2" ) ); 735 results.add( new Tuple( null, "h1", null, "H1" ) ); 736 737 handleJoins( "joinouterinner", new RightJoin(), results ); 738 } 739 /** Shared driver for the sparse-input join tests. Stages inputFileLhsSparse and inputFileRhsSparse, reads both as space-delimited ("num", "char") sources, hash-joins them on "num" with the supplied joiner, passes the result through Identity, and writes four tab-delimited String columns to getOutputPath( path ). It then calls validateLength with results.size() and asserts that every expected tuple occurs in the sink. The expected set is emptied via removeAll as part of that check, so callers must not reuse it afterwards. @param path output sub-path handed to getOutputPath @param joiner join strategy passed to the HashJoin @param results expected tuples; mutated by this method @throws Exception on any failure in the helper body */ 740 private void handleJoins( String path, Joiner joiner, Set<Tuple> results ) throws Exception 741 { 742 getPlatform().copyFromLocal( inputFileLhsSparse ); 743 getPlatform().copyFromLocal( inputFileRhsSparse ); 744 745 Fields fields = new Fields( "num", "char" ).applyTypes( 
Integer.class, String.class ); 746 Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse ); 747 Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse ); 748 749 Map sources = new HashMap(); 750 751 sources.put( "lower", sourceLower ); 752 sources.put( "upper", sourceUpper ); 753 754 Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE ); 755 756 Pipe pipeLower = new Pipe( "lower" ); 757 Pipe pipeUpper = new Pipe( "upper" ); 758 759 Fields declaredFields = new Fields( "num", "char", "num2", "char2" ); 760 Fields groupingFields = new Fields( "num" ); 761 762 Pipe splice = new HashJoin( pipeLower, groupingFields, pipeUpper, groupingFields, declaredFields, joiner ); 763 764 splice = new Each( splice, Fields.ALL, new Identity(), Fields.RESULTS ); 765 766 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 767 768 flow.complete(); 769 770 validateLength( flow, results.size() ); 771 772 List<Tuple> actual = getSinkAsList( flow ); 773 774 results.removeAll( actual ); 775 776 assertEquals( 0, results.size() ); 777 } 778 779 /** 780 * 1 a 781 * 5 b 782 * 6 c 783 * 5 b 784 * 5 e 785 * <p/> 786 * 1 A 787 * 2 B 788 * 3 C 789 * 4 D 790 * 5 E 791 * <p/> 792 * 1 a 793 * 2 b 794 * 3 c 795 * 4 d 796 * 5 e 797 * <p/> 798 * 1 a 1 A 1 a 799 * - - 2 B 2 b 800 * - - 3 C 3 c 801 * - - 4 D 4 d 802 * 5 b 5 E 5 e 803 * 5 e 5 E 5 e 804 * 805 * @throws Exception 806 */ 807 @Test 808 public void testJoinMixed() throws Exception 809 { 810 // skip if hadoop cluster mode, outer joins don't behave the same 811 if( getPlatform().isMapReduce() && getPlatform().isUseCluster() ) 812 return; 813 814 getPlatform().copyFromLocal( inputFileLowerOffset ); 815 getPlatform().copyFromLocal( inputFileLower ); 816 getPlatform().copyFromLocal( inputFileUpper ); 817 818 Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), 
inputFileLowerOffset ); 819 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 820 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 821 822 Map sources = new HashMap(); 823 824 sources.put( "loweroffset", sourceLowerOffset ); 825 sources.put( "lower", sourceLower ); 826 sources.put( "upper", sourceUpper ); 827 828 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinmixed" ), SinkMode.REPLACE ); 829 830 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 831 832 Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter ); 833 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 834 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 835 836 Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower ); 837 Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) ); 838 839 MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} ); 840 Pipe splice = new HashJoin( pipes, fields, Fields.size( 6 ), join ); 841 842 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 843 844 flow.complete(); 845 846 validateLength( flow, 6 ); 847 848 Set<Tuple> results = new HashSet<Tuple>(); 849 850 results.add( new Tuple( "1\ta\t1\tA\t1\ta" ) ); 851 results.add( new Tuple( "null\tnull\t2\tB\t2\tb" ) ); 852 results.add( new Tuple( "null\tnull\t3\tC\t3\tc" ) ); 853 results.add( new Tuple( "null\tnull\t4\tD\t4\td" ) ); 854 results.add( new Tuple( "5\tb\t5\tE\t5\te" ) ); 855 results.add( new Tuple( "5\te\t5\tE\t5\te" ) ); 856 857 List<Tuple> actual = getSinkAsList( flow ); 858 859 results.removeAll( actual ); 860 861 assertEquals( 0, results.size() ); 862 } 863 864 @Test 865 public void testJoinDiffFields() throws Exception 866 { 867 
getPlatform().copyFromLocal( inputFileLower ); 868 getPlatform().copyFromLocal( inputFileUpper ); 869 870 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 871 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 872 873 Map sources = new HashMap(); 874 875 sources.put( "lower", sourceLower ); 876 sources.put( "upper", sourceUpper ); 877 878 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE ); 879 880 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 881 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 882 883 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 884 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 885 886 Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 887 888 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe ); 889 890 flow.complete(); 891 892 validateLength( flow, 5 ); 893 894 List<Tuple> actual = getSinkAsList( flow ); 895 896 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 897 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 898 } 899 900 @Test 901 public void testJoinGroupBy() throws Exception 902 { 903 getPlatform().copyFromLocal( inputFileLower ); 904 getPlatform().copyFromLocal( inputFileUpper ); 905 906 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 907 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 908 909 Map sources = new HashMap(); 910 911 sources.put( "lower", sourceLower ); 912 sources.put( "upper", sourceUpper ); 913 914 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupby" ), SinkMode.REPLACE ); 915 916 Function splitterLower = new 
RegexSplitter( new Fields( "numA", "lower" ), " " ); 917 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 918 919 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 920 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 921 922 Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 923 924 Pipe groupby = new GroupBy( pipe, new Fields( "numA" ) ); 925 926 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby ); 927 928 flow.complete(); 929 930 validateLength( flow, 5, null ); 931 932 List<Tuple> actual = getSinkAsList( flow ); 933 934 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 935 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 936 } 937 938 @Test 939 public void testJoinSamePipe() throws Exception 940 { 941 getPlatform().copyFromLocal( inputFileLower ); 942 943 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 944 945 Map sources = new HashMap(); 946 947 sources.put( "lower", source ); 948 949 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE ); 950 951 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 952 953 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 954 955 Pipe pipe = new HashJoin( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) ); 956 957 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe ); 958 959 flow.complete(); 960 961 validateLength( flow, 5, null ); 962 963 List<Tuple> actual = getSinkAsList( flow ); 964 965 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 966 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 967 } 968 969 @Test 970 public void testJoinSamePipe2() throws Exception 971 { 972 getPlatform().copyFromLocal( 
inputFileLower ); 973 974 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 975 976 Map sources = new HashMap(); 977 978 sources.put( "lower", source ); 979 980 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE ); 981 982 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 983 984 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 985 986 Pipe join = new HashJoin( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 987 988 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 989 990 flow.complete(); 991 992 validateLength( flow, 5, null ); 993 994 List<Tuple> actual = getSinkAsList( flow ); 995 996 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 997 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 998 } 999 1000 @Test 1001 public void testJoinSamePipe3() throws Exception 1002 { 1003 getPlatform().copyFromLocal( inputFileLower ); 1004 1005 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower ); 1006 1007 Map sources = new HashMap(); 1008 1009 sources.put( "lower", source ); 1010 1011 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE ); 1012 1013 Pipe pipe = new Pipe( "lower" ); 1014 1015 Pipe lhs = new Pipe( "lhs", pipe ); 1016 Pipe rhs = new Pipe( "rhs", pipe ); 1017 1018 Pipe join = new HashJoin( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1019 1020 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 1021 1022 flow.complete(); 1023 1024 validateLength( flow, 5, null ); 1025 1026 List<Tuple> actual = getSinkAsList( flow ); 1027 1028 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1029 assertTrue( actual.contains( new 
Tuple( "2\tb\t2\tb" ) ) ); 1030 } 1031 1032 /** 1033 * Same source as rightmost 1034 * <p/> 1035 * should be a single job as the same file accumulates into the joins 1036 * 1037 * @throws Exception 1038 */ 1039 @Test 1040 public void testJoinAroundJoinRightMost() throws Exception 1041 { 1042 getPlatform().copyFromLocal( inputFileLower ); 1043 getPlatform().copyFromLocal( inputFileUpper ); 1044 1045 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1046 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1047 1048 Map sources = new HashMap(); 1049 1050 sources.put( "lower", sourceLower ); 1051 sources.put( "upper1", sourceUpper ); 1052 sources.put( "upper2", sourceUpper ); 1053 1054 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinrightmost" ), SinkMode.REPLACE ); 1055 1056 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1057 1058 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1059 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1060 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1061 1062 Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1063 1064 splice1 = new Each( splice1, new Identity() ); 1065 1066 Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1067 1068 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1069 1070 // flow.writeDOT( "joinaroundrightmost.dot" ); 1071 1072 if( getPlatform().isMapReduce() ) 1073 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1074 1075 flow.complete(); 1076 1077 validateLength( flow, 5, null ); 1078 1079 List<Tuple> 
actual = getSinkAsList( flow ); 1080 1081 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 1082 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) ); 1083 } 1084 1085 /** 1086 * Same source as leftmost 1087 * 1088 * @throws Exception 1089 */ 1090 @Test 1091 public void testJoinAroundJoinLeftMost() throws Exception 1092 { 1093 getPlatform().copyFromLocal( inputFileLower ); 1094 getPlatform().copyFromLocal( inputFileUpper ); 1095 1096 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1097 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1098 1099 Map sources = new HashMap(); 1100 1101 sources.put( "lower", sourceLower ); 1102 sources.put( "upper1", sourceUpper ); 1103 sources.put( "upper2", sourceUpper ); 1104 1105 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinleftmost" ), SinkMode.REPLACE ); 1106 1107 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1108 1109 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1110 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1111 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1112 1113 Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1114 1115 splice1 = new Each( splice1, new Identity() ); 1116 1117 Pipe splice2 = new HashJoin( splice1, new Fields( "num1" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1118 1119 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1120 1121 // flow.writeDOT( "joinaroundleftmost.dot" ); 1122 1123 if( getPlatform().isMapReduce() ) 1124 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1125 1126 flow.complete(); 
1127 1128 validateLength( flow, 5, null ); 1129 1130 List<Tuple> actual = getSinkAsList( flow ); 1131 1132 assertTrue( actual.contains( new Tuple( "1\tA\t1\tA\t1\ta" ) ) ); 1133 assertTrue( actual.contains( new Tuple( "2\tB\t2\tB\t2\tb" ) ) ); 1134 } 1135 1136 /** 1137 * Upper as leftmost and rightmost forcing two jobs 1138 * 1139 * @throws Exception 1140 */ 1141 @Test 1142 public void testJoinAroundJoinRightMostSwapped() throws Exception 1143 { 1144 getPlatform().copyFromLocal( inputFileLower ); 1145 getPlatform().copyFromLocal( inputFileUpper ); 1146 1147 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1148 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1149 1150 Map sources = new HashMap(); 1151 1152 sources.put( "lower", sourceLower ); 1153 sources.put( "upper1", sourceUpper ); 1154 sources.put( "upper2", sourceUpper ); 1155 1156 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinaroundjoinswapped" ), SinkMode.REPLACE ); 1157 1158 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1159 1160 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1161 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1162 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1163 1164 Pipe splice1 = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1165 1166 splice1 = new Each( splice1, new Identity() ); 1167 1168 // upper2 becomes leftmost, forcing a tap between the joins 1169 Pipe splice2 = new HashJoin( pipeUpper2, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1170 1171 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1172 1173 if( getPlatform().isMapReduce() ) 
1174 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1175 1176 flow.complete(); 1177 1178 validateLength( flow, 5, null ); 1179 1180 List<Tuple> actual = getSinkAsList( flow ); 1181 1182 assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\tA" ) ) ); 1183 assertTrue( actual.contains( new Tuple( "2\tB\t2\tb\t2\tB" ) ) ); 1184 } 1185 1186 @Test 1187 public void testJoinGroupByJoin() throws Exception 1188 { 1189 getPlatform().copyFromLocal( inputFileLower ); 1190 getPlatform().copyFromLocal( inputFileUpper ); 1191 getPlatform().copyFromLocal( inputFileJoined ); 1192 1193 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1194 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1195 Tap sourceJoined = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileJoined ); 1196 1197 Map sources = new HashMap(); 1198 1199 sources.put( "lower", sourceLower ); 1200 sources.put( "upper", sourceUpper ); 1201 sources.put( "joined", sourceJoined ); 1202 1203 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joingroupbyjoin" ), SinkMode.REPLACE ); 1204 1205 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 1206 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 1207 Function splitterJoined = new RegexSplitter( new Fields( "numC", "lowerC", "upperC" ), "\t" ); 1208 1209 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1210 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1211 Pipe pipeJoined = new Each( new Pipe( "joined" ), new Fields( "line" ), splitterJoined ); 1212 1213 Pipe pipe = new HashJoin( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1214 1215 pipe = new GroupBy( pipe, new Fields( "numA" ) ); 1216 1217 pipe = new HashJoin( pipe, new Fields( "numA" ), pipeJoined, new 
Fields( "numC" ) ); 1218 1219 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, pipe ); 1220 1221 if( getPlatform().isMapReduce() ) 1222 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1223 1224 flow.complete(); 1225 1226 validateLength( flow, 5, null ); 1227 1228 List<Tuple> actual = getSinkAsList( flow ); 1229 1230 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\tA" ) ) ); 1231 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tb\tB" ) ) ); 1232 } 1233 1234 /** 1235 * here the same file is fed into the same HashJoin. 1236 * <p/> 1237 * This is three jobs. 1238 * <p/> 1239 * a temp tap is inserted before the accumulated branch for two reasons on the common HashJoin 1240 * <p/> 1241 * it is assumed the accumulated side is filtered down, so pushing to disk will preserve io 1242 * if accumulated side was streamed instead via a fork, only part of the file will accumulate into the HashJoin 1243 * <p/> 1244 * /-T-\ <-- accumulated 1245 * T HJ 1246 * \---/ <-- streamed 1247 * 1248 * @throws Exception 1249 */ 1250 @Test 1251 public void testJoinSameSourceIntoJoin() throws Exception 1252 { 1253 getPlatform().copyFromLocal( inputFileLower ); 1254 getPlatform().copyFromLocal( inputFileUpper ); 1255 1256 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1257 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1258 1259 Map sources = new HashMap(); 1260 1261 sources.put( "lower", sourceLower ); 1262 sources.put( "upper1", sourceUpper ); 1263 sources.put( "upper2", sourceUpper ); 1264 1265 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsamesourceintojoin" ), SinkMode.REPLACE ); 1266 1267 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1268 1269 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1270 Pipe pipeUpper1 = new Each( new Pipe( "upper1" 
), new Fields( "line" ), splitter ); 1271 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1272 1273 Pipe splice1 = new HashJoin( pipeUpper1, new Fields( "num" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1274 1275 splice1 = new Each( splice1, new Identity() ); 1276 1277 Pipe splice2 = new HashJoin( pipeLower, new Fields( "num" ), splice1, new Fields( "num1" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1278 1279 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1280 1281 // flow.writeDOT( "joinsamesourceintojoin.dot" ); 1282 1283 if( getPlatform().isMapReduce() ) 1284 assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() ); 1285 1286 flow.complete(); 1287 1288 validateLength( flow, 5, null ); 1289 1290 List<Tuple> actual = getSinkAsList( flow ); 1291 1292 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 1293 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) ); 1294 } 1295 1296 /** 1297 * Tests that two independent streamed sources with loadable tributaries properly plan into a GroupBy 1298 * without loading unused sources 1299 * 1300 * @throws Exception 1301 */ 1302 @Test 1303 public void testJoinsIntoGroupBy() throws Exception 1304 { 1305 getPlatform().copyFromLocal( inputFileLower ); 1306 getPlatform().copyFromLocal( inputFileUpper ); 1307 1308 getPlatform().copyFromLocal( inputFileLhs ); 1309 getPlatform().copyFromLocal( inputFileRhs ); 1310 1311 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1312 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1313 1314 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1315 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1316 1317 Map sources = new HashMap(); 1318 1319 
sources.put( "lower", sourceLower ); 1320 sources.put( "upper", sourceUpper ); 1321 sources.put( "lhs", sourceLhs ); 1322 sources.put( "rhs", sourceRhs ); 1323 1324 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintogroupby" ), SinkMode.REPLACE ); 1325 1326 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1327 1328 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1329 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1330 1331 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1332 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1333 1334 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1335 1336 upperLower = new Each( upperLower, new Identity() ); 1337 1338 Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1339 1340 lhsRhs = new Each( lhsRhs, new Identity() ); 1341 1342 Pipe grouped = new GroupBy( "merging", Pipe.pipes( upperLower, lhsRhs ), new Fields( "num1" ) ); 1343 1344 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1345 1346 if( getPlatform().isMapReduce() ) 1347 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1348 1349 flow.complete(); 1350 1351 validateLength( flow, 42, null ); 1352 1353 List<Tuple> actual = getSinkAsList( flow ); 1354 1355 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1356 assertTrue( actual.contains( new Tuple( "5\te\t5\tE" ) ) ); 1357 } 1358 1359 @Test 1360 public void testJoinSamePipeAroundGroupBy() throws Exception 1361 { 1362 getPlatform().copyFromLocal( inputFileLower ); 1363 1364 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1365 Tap sink = 
getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipearoundgroupby" ), SinkMode.REPLACE ); 1366 1367 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1368 1369 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1370 1371 Pipe lhsPipe = new Each( new Pipe( "lhs", pipeLower ), new Identity() ); 1372 1373 Pipe rhsPipe = new Each( new Pipe( "rhs", pipeLower ), new Identity() ); 1374 1375 rhsPipe = new GroupBy( rhsPipe, new Fields( "num" ) ); 1376 1377 rhsPipe = new Each( rhsPipe, new Identity() ); 1378 1379 Pipe pipe = new HashJoin( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1380 1381 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 1382 1383 flow.complete(); 1384 1385 validateLength( flow, 5, null ); 1386 1387 List<Tuple> actual = getSinkAsList( flow ); 1388 1389 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1390 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1391 } 1392 1393 /** 1394 * This test results in two MR jobs because one join feeds into the accumulated side of the second. A mapper 1395 * can only stream on branch at a time forcing a temp file between the mappers. 
see next test for swapped join 1396 * 1397 * @throws Exception 1398 */ 1399 @Test 1400 public void testJoinsIntoCoGroupLhs() throws Exception 1401 { 1402 getPlatform().copyFromLocal( inputFileLower ); 1403 getPlatform().copyFromLocal( inputFileUpper ); 1404 1405 getPlatform().copyFromLocal( inputFileLhs ); 1406 getPlatform().copyFromLocal( inputFileRhs ); 1407 1408 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1409 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1410 1411 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1412 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1413 1414 Map sources = new HashMap(); 1415 1416 sources.put( "lower", sourceLower ); 1417 sources.put( "upper", sourceUpper ); 1418 sources.put( "lhs", sourceLhs ); 1419 sources.put( "rhs", sourceRhs ); 1420 1421 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhs" ), SinkMode.REPLACE ); 1422 1423 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1424 1425 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1426 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1427 1428 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1429 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1430 1431 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1432 1433 upperLower = new Each( upperLower, new Identity() ); 1434 1435 Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", 
"char2UpperLower" ) ); 1436 1437 lhsUpperLower = new Each( lhsUpperLower, new Identity() ); 1438 1439 Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) ); 1440 1441 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1442 1443 if( getPlatform().isMapReduce() ) 1444 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1445 1446 flow.complete(); 1447 1448 validateLength( flow, 37, null ); 1449 1450 List<Tuple> actual = getSinkAsList( flow ); 1451 1452 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta\t1\tA\t1\tA" ) ) ); 1453 assertTrue( actual.contains( new Tuple( "5\ta\t5\te\t5\tE\t5\tA" ) ) ); 1454 } 1455 1456 /** 1457 * This test results in one MR jobs because one join feeds into the streamed side of the second. 1458 * 1459 * @throws Exception 1460 */ 1461 @Test 1462 public void testJoinsIntoCoGroupLhsSwappedJoin() throws Exception 1463 { 1464 getPlatform().copyFromLocal( inputFileLower ); 1465 getPlatform().copyFromLocal( inputFileUpper ); 1466 1467 getPlatform().copyFromLocal( inputFileLhs ); 1468 getPlatform().copyFromLocal( inputFileRhs ); 1469 1470 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1471 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1472 1473 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1474 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1475 1476 Map sources = new HashMap(); 1477 1478 sources.put( "lower", sourceLower ); 1479 sources.put( "upper", sourceUpper ); 1480 sources.put( "lhs", sourceLhs ); 1481 sources.put( "rhs", sourceRhs ); 1482 1483 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouplhsswappedjoin" ), SinkMode.REPLACE ); 1484 1485 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " 
); 1486 1487 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1488 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1489 1490 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1491 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1492 1493 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1494 1495 upperLower = new Each( upperLower, new Identity() ); 1496 1497 Pipe lhsUpperLower = new HashJoin( upperLower, new Fields( "numUpperLower" ), pipeLhs, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower", "numLhs", "charLhs" ) ); 1498 1499 lhsUpperLower = new Each( lhsUpperLower, new Identity() ); 1500 1501 Pipe grouped = new CoGroup( "cogrouping", lhsUpperLower, new Fields( "numLhs" ), pipeRhs, new Fields( "num" ) ); 1502 1503 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1504 1505 if( getPlatform().isMapReduce() ) 1506 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1507 1508 flow.complete(); 1509 1510 validateLength( flow, 37, null ); 1511 1512 List<Tuple> actual = getSinkAsList( flow ); 1513 1514 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) ); 1515 assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) ); 1516 } 1517 1518 @Test 1519 public void testJoinsIntoCoGroupRhs() throws Exception 1520 { 1521 getPlatform().copyFromLocal( inputFileLower ); 1522 getPlatform().copyFromLocal( inputFileUpper ); 1523 1524 getPlatform().copyFromLocal( inputFileLhs ); 1525 getPlatform().copyFromLocal( inputFileRhs ); 1526 1527 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1528 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", 
"line" ), inputFileUpper ); 1529 1530 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1531 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1532 1533 Map sources = new HashMap(); 1534 1535 sources.put( "lower", sourceLower ); 1536 sources.put( "upper", sourceUpper ); 1537 sources.put( "lhs", sourceLhs ); 1538 sources.put( "rhs", sourceRhs ); 1539 1540 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogrouprhs" ), SinkMode.REPLACE ); 1541 1542 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1543 1544 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1545 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1546 1547 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1548 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1549 1550 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1551 1552 upperLower = new Each( upperLower, new Identity() ); 1553 1554 Pipe lhsUpperLower = new HashJoin( pipeLhs, new Fields( "num" ), upperLower, new Fields( "numUpperLower" ), new Fields( "numLhs", "charLhs", "numUpperLower", "charUpperLower", "num2UpperLower", "char2UpperLower" ) ); 1555 1556 lhsUpperLower = new Each( lhsUpperLower, new Identity() ); 1557 1558 Pipe grouped = new CoGroup( "cogrouping", pipeRhs, new Fields( "num" ), lhsUpperLower, new Fields( "numLhs" ) ); 1559 1560 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1561 1562 if( getPlatform().isMapReduce() ) 1563 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1564 1565 flow.complete(); 1566 1567 validateLength( flow, 37, null ); 1568 1569 List<Tuple> actual = getSinkAsList( 
flow ); 1570 1571 assertTrue( actual.contains( new Tuple( "1\tA\t1\ta\t1\ta\t1\tA" ) ) ); 1572 assertTrue( actual.contains( new Tuple( "5\tE\t5\te\t5\te\t5\tE" ) ) ); 1573 } 1574 1575 @Test 1576 public void testJoinsIntoCoGroup() throws Exception 1577 { 1578 getPlatform().copyFromLocal( inputFileLower ); 1579 getPlatform().copyFromLocal( inputFileUpper ); 1580 1581 getPlatform().copyFromLocal( inputFileLhs ); 1582 getPlatform().copyFromLocal( inputFileRhs ); 1583 1584 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1585 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1586 1587 Tap sourceLhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLhs ); 1588 Tap sourceRhs = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileRhs ); 1589 1590 Map sources = new HashMap(); 1591 1592 sources.put( "lower", sourceLower ); 1593 sources.put( "upper", sourceUpper ); 1594 sources.put( "lhs", sourceLhs ); 1595 sources.put( "rhs", sourceRhs ); 1596 1597 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinsintocogroup" ), SinkMode.REPLACE ); 1598 1599 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1600 1601 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1602 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1603 1604 Pipe pipeLhs = new Each( new Pipe( "lhs" ), new Fields( "line" ), splitter ); 1605 Pipe pipeRhs = new Each( new Pipe( "rhs" ), new Fields( "line" ), splitter ); 1606 1607 Pipe upperLower = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new Fields( "numUpperLower1", "charUpperLower1", "numUpperLower2", "charUpperLower2" ) ); 1608 1609 upperLower = new Each( upperLower, new Identity() ); 1610 1611 Pipe lhsRhs = new HashJoin( pipeLhs, new Fields( "num" ), pipeRhs, new Fields( "num" ), new Fields( 
"numLhsRhs1", "charLhsRhs1", "numLhsRhs2", "charLhsRhs2" ) ); 1612 1613 lhsRhs = new Each( lhsRhs, new Identity() ); 1614 1615 Pipe grouped = new CoGroup( "cogrouping", upperLower, new Fields( "numUpperLower1" ), lhsRhs, new Fields( "numLhsRhs1" ) ); 1616 1617 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, grouped ); 1618 1619 if( getPlatform().isMapReduce() ) 1620 assertEquals( "wrong number of steps", 1, flow.getFlowSteps().size() ); 1621 1622 flow.complete(); 1623 1624 validateLength( flow, 37, null ); 1625 1626 List<Tuple> actual = getSinkAsList( flow ); 1627 1628 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\ta\t1\tA" ) ) ); 1629 assertTrue( actual.contains( new Tuple( "5\te\t5\tE\t5\te\t5\tE" ) ) ); 1630 } 1631 1632 public static class AllComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable 1633 { 1634 1635 @Override 1636 public int compare( Comparable lhs, Comparable rhs ) 1637 { 1638 return lhs.toString().compareTo( rhs.toString() ); 1639 } 1640 1641 @Override 1642 public int hashCode( Comparable value ) 1643 { 1644 if( value == null ) 1645 return 0; 1646 1647 return value.toString().hashCode(); 1648 } 1649 } 1650 1651 /** 1652 * Tests Hasher being honored even if default comparator is null. 
1653 * 1654 * @throws Exception 1655 */ 1656 @Test 1657 public void testJoinWithHasher() throws Exception 1658 { 1659 getPlatform().copyFromLocal( inputFileLower ); 1660 getPlatform().copyFromLocal( inputFileUpper ); 1661 1662 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1663 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1664 1665 Map sources = new HashMap(); 1666 1667 sources.put( "lower", sourceLower ); 1668 sources.put( "upper", sourceUpper ); 1669 1670 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinhasher" ), SinkMode.REPLACE ); 1671 1672 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1673 1674 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1675 1676 pipeLower = new Each( pipeLower, new Fields( "num" ), new ExpressionFunction( Fields.ARGS, "Integer.parseInt( num )", String.class ), Fields.REPLACE ); 1677 1678 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1679 1680 Fields num = new Fields( "num" ); 1681 num.setComparator( "num", new AllComparator() ); 1682 1683 Pipe splice = new HashJoin( pipeLower, num, pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 1684 1685 Map<Object, Object> properties = getProperties(); 1686 1687 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 1688 1689 flow.complete(); 1690 1691 validateLength( flow, 5 ); 1692 1693 List<Tuple> values = getSinkAsList( flow ); 1694 1695 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1696 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1697 } 1698 1699 @Test 1700 public void testJoinNone() throws Exception 1701 { 1702 getPlatform().copyFromLocal( inputFileLower ); 1703 getPlatform().copyFromLocal( inputFileUpper ); 1704 1705 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), 
inputFileLower ); 1706 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1707 1708 Map sources = new HashMap(); 1709 1710 sources.put( "lower", sourceLower ); 1711 sources.put( "upper", sourceUpper ); 1712 1713 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE ); 1714 1715 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1716 1717 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1718 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1719 1720 Pipe splice = new HashJoin( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) ); 1721 1722 Map<Object, Object> properties = getProperties(); 1723 1724 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 1725 1726 flow.complete(); 1727 1728 validateLength( flow, 25 ); 1729 1730 List<Tuple> values = getSinkAsList( flow ); 1731 1732 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1733 assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) ); 1734 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1735 } 1736 1737 /** 1738 * When run against a cluster a Merge before a GroupBy can hide the streamed/accumulated nature of a branch. 1739 * 1740 * commented code is for troubleshooting. 
1741 * @throws Exception 1742 */ 1743 @Test 1744 public void testJoinMergeGroupBy() throws Exception 1745 { 1746 getPlatform().copyFromLocal( inputFileNums10 ); 1747 getPlatform().copyFromLocal( inputFileNums20 ); 1748 1749 Tap lhsTap = getPlatform().getTextFile( new Fields( "id" ), inputFileNums10 ); 1750 Tap rhsTap = getPlatform().getTextFile( new Fields( "id2" ), inputFileNums20 ); 1751 1752 Pipe lhs = new Pipe( "lhs" ); 1753 Pipe rhs = new Pipe( "rhs" ); 1754 1755 // Pipe joined = new CoGroup( messages, new Fields( "id" ), people, new Fields( "id2" ) ); 1756 Pipe joined = new HashJoin( lhs, new Fields( "id" ), rhs, new Fields( "id2" ) ); 1757 1758 Pipe pruned = new Each( joined, new Fields( "id2" ), new Identity(), Fields.RESULTS ); 1759 // pruned = new Checkpoint( pruned ); 1760 Pipe merged = new Merge( pruned, rhs ); 1761 Pipe grouped = new GroupBy( merged, new Fields( "id2" ) ); 1762 // Pipe grouped = new GroupBy( Pipe.pipes( pruned, people ), new Fields( "id2" ) ); 1763 Aggregator count = new Count( new Fields( "count" ) ); 1764 Pipe counted = new Every( grouped, count ); 1765 1766 String testJoinMerge = "testJoinMergeGroupBy/" + ( ( joined instanceof CoGroup ) ? 
"cogroup" : "hashjoin" ); 1767 Tap sink = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", null, getOutputPath( testJoinMerge ), SinkMode.REPLACE ); 1768 1769 FlowDef flowDef = FlowDef.flowDef() 1770 .addSource( rhs, rhsTap ) 1771 .addSource( lhs, lhsTap ) 1772 .addTailSink( counted, sink ); 1773 1774 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 1775 1776 // flow.writeDOT( "joinmerge.dot" ); 1777 // flow.writeStepsDOT( "joinmerge-steps.dot" ); 1778 1779 flow.complete(); 1780 1781 validateLength( flow, 20 ); 1782 1783 List<Tuple> values = getSinkAsList( flow ); 1784 List<Tuple> expected = new ArrayList<Tuple>(); 1785 1786 expected.add( new Tuple( "1", "2" ) ); 1787 expected.add( new Tuple( "10", "2" ) ); 1788 expected.add( new Tuple( "11", "1" ) ); 1789 expected.add( new Tuple( "12", "1" ) ); 1790 expected.add( new Tuple( "13", "1" ) ); 1791 expected.add( new Tuple( "14", "1" ) ); 1792 expected.add( new Tuple( "15", "1" ) ); 1793 expected.add( new Tuple( "16", "1" ) ); 1794 expected.add( new Tuple( "17", "1" ) ); 1795 expected.add( new Tuple( "18", "1" ) ); 1796 expected.add( new Tuple( "19", "1" ) ); 1797 expected.add( new Tuple( "2", "2" ) ); 1798 expected.add( new Tuple( "20", "1" ) ); 1799 expected.add( new Tuple( "3", "2" ) ); 1800 expected.add( new Tuple( "4", "2" ) ); 1801 expected.add( new Tuple( "5", "2" ) ); 1802 expected.add( new Tuple( "6", "2" ) ); 1803 expected.add( new Tuple( "7", "2" ) ); 1804 expected.add( new Tuple( "8", "2" ) ); 1805 expected.add( new Tuple( "9", "2" ) ); 1806 1807 Collections.sort(values); 1808 Collections.sort(expected); 1809 1810 assertEquals( expected, values ); 1811 } 1812 }