001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading; 022 023import java.io.IOException; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029 030import cascading.cascade.Cascades; 031import cascading.flow.Flow; 032import cascading.flow.FlowConnectorProps; 033import cascading.flow.FlowDef; 034import cascading.flow.FlowProps; 035import cascading.operation.Function; 036import cascading.operation.Identity; 037import cascading.operation.Insert; 038import cascading.operation.aggregator.Count; 039import cascading.operation.aggregator.First; 040import cascading.operation.regex.RegexFilter; 041import cascading.operation.regex.RegexSplitGenerator; 042import cascading.operation.regex.RegexSplitter; 043import cascading.pipe.CoGroup; 044import cascading.pipe.Each; 045import cascading.pipe.Every; 046import cascading.pipe.GroupBy; 047import cascading.pipe.Pipe; 048import cascading.pipe.assembly.Discard; 049import cascading.pipe.assembly.Rename; 050import cascading.pipe.assembly.Retain; 051import cascading.pipe.joiner.InnerJoin; 052import cascading.pipe.joiner.Joiner; 053import cascading.pipe.joiner.LeftJoin; 054import cascading.pipe.joiner.MixedJoin; 055import cascading.pipe.joiner.OuterJoin; 056import cascading.pipe.joiner.RightJoin; 057import cascading.tap.SinkMode; 058import cascading.tap.Tap; 059import cascading.tuple.Fields; 060import cascading.tuple.Tuple; 061import cascading.util.NullNotEquivalentComparator; 062import org.junit.Test; 063 064import static data.InputData.*; 065 066public class CoGroupFieldedPipesPlatformTest extends PlatformTestCase 067 { 068 public CoGroupFieldedPipesPlatformTest() 069 { 070 super( true, 4, 1 ); // leave cluster testing enabled 071 } 072 073 @Test 074 public void testCross() throws Exception 075 { 076 getPlatform().copyFromLocal( inputFileLhs ); 077 getPlatform().copyFromLocal( inputFileRhs ); 078 079 Map sources = new HashMap(); 080 081 sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) ); 082 sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) ); 083 084 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE ); 085 086 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 087 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 088 089 Pipe cross = new CoGroup( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 090 091 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross ); 092 093 flow.complete(); 094 095 validateLength( flow, 37, null ); 096 097 List<Tuple> values = getSinkAsList( flow ); 098 099 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 100 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 101 } 102 103 @Test 104 public void testCoGroup() throws Exception 105 { 106 getPlatform().copyFromLocal( inputFileLower ); 107 getPlatform().copyFromLocal( inputFileUpper ); 108 109 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 110 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 111 112 Map sources = new HashMap(); 113 114 sources.put( "lower", sourceLower ); 115 sources.put( "upper", sourceUpper ); 116 117 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroup" ), SinkMode.REPLACE ); 118 119 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 120 121 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 122 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 123 124 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new InnerJoin( Fields.size( 4 ) ) ); 125 126 Map<Object, Object> properties = getProperties(); 127 128 // make sure hasher is getting called, but does nothing special 129 FlowProps.setDefaultTupleElementComparator( properties, getPlatform().getStringComparator( false ).getClass().getCanonicalName() ); 130 131 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 132 133 flow.complete(); 134 135 validateLength( flow, 5 ); 136 137 List<Tuple> values = getSinkAsList( flow ); 138 139 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 140 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 141 } 142 143 @Test 144 public void testCoGroupSamePipeName() throws Exception 145 { 146 getPlatform().copyFromLocal( inputFileLower ); 147 getPlatform().copyFromLocal( inputFileUpper ); 148 149 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 150 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 151 152 Map sources = new HashMap(); 153 154 sources.put( "lower", sourceLower ); 155 sources.put( "upper", sourceUpper ); 156 157 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE ); 158 159 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 160 161 Pipe pipeLower = new Pipe( "lower" ); 162 Pipe pipeUpper = new Pipe( "upper" ); 163 164 // these pipes will hide the source name, and could cause one to be lost 165 pipeLower = new Pipe( "same", pipeLower ); 166 pipeUpper = new Pipe( "same", pipeUpper ); 167 168 pipeLower = new Each( pipeLower, new Fields( "line" ), splitter ); 169 pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter ); 170 171// pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 172// pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 173 174 pipeLower = new Pipe( "left", pipeLower ); 175 pipeUpper = new Pipe( "right", pipeUpper ); 176 177// pipeLower = new Each( pipeLower, new Debug( true ) ); 178// pipeUpper = new Each( pipeUpper, new Debug( true ) ); 179 180 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 181 182// splice = new Each( splice, new Debug( true ) ); 183 splice = new Pipe( "splice", splice ); 184 splice = new Pipe( "tail", splice ); 185 186 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 187 188 flow.complete(); 189 190 validateLength( flow, 5 ); 191 192 List<Tuple> values = getSinkAsList( flow ); 193 194 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 195 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 196 } 197 198 @Test 199 public void testCoGroupWithUnknowns() throws Exception 200 { 201 getPlatform().copyFromLocal( inputFileLower ); 202 getPlatform().copyFromLocal( inputFileUpper ); 203 204 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 205 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 206 207 Map sources = new HashMap(); 208 209 sources.put( "lower", sourceLower ); 210 sources.put( "upper", sourceUpper ); 211 212 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE ); 213 214 Function splitter = new RegexSplitter( Fields.UNKNOWN, " " ); 215 216 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 217 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 218 219 Pipe splice = new CoGroup( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) ); 220 221 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 222 223 flow.complete(); 224 225 validateLength( flow, 5 ); 226 227 List<Tuple> values = getSinkAsList( flow ); 228 229 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 230 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 231 } 232 233 /** 234 * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with 235 * a new stream using an outerjoin. 236 * 237 * @throws Exception 238 */ 239 @Test 240 public void testCoGroupFilteredBranch() throws Exception 241 { 242 getPlatform().copyFromLocal( inputFileLower ); 243 getPlatform().copyFromLocal( inputFileUpper ); 244 245 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 246 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 247 248 Map sources = new HashMap(); 249 250 sources.put( "lower", sourceLower ); 251 sources.put( "upper", sourceUpper ); 252 253 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupfilteredbranch" ), SinkMode.REPLACE ); 254 255 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 256 257 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 258 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 259 pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all 260 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 261 262 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() ); 263 264 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 265 266 flow.complete(); 267 268 validateLength( flow, 5 ); 269 270 List<Tuple> values = getSinkAsList( flow ); 271 272 assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) ); 273 assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) ); 274 } 275 276 @Test 277 public void testCoGroupSelf() throws Exception 278 { 279 getPlatform().copyFromLocal( inputFileLower ); 280 281 // intentionally creating multiple instances that are equivalent 282 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 283 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 284 285 Map sources = new HashMap(); 286 287 sources.put( "lower", sourceLower ); 288 sources.put( "upper", sourceUpper ); 289 290 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupself" ), SinkMode.REPLACE ); 291 292 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 293 294 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 295 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 296 297 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 298 299 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 300 301 flow.complete(); 302 303 validateLength( flow, 5 ); 304 305 List<Tuple> values = getSinkAsList( flow ); 306 307 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 308 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 309 } 310 311 @Test 312 public void testSplitCoGroupSelf() throws Exception 313 { 314 getPlatform().copyFromLocal( inputFileLower ); 315 316 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 317 318 Map sources = new HashMap(); 319 320 sources.put( "lowerLhs", source ); 321 sources.put( "upperLhs", source ); 322 sources.put( "lowerRhs", source ); 323 sources.put( "upperRhs", source ); 324 325 Tap sinkLhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/lhs" ), SinkMode.REPLACE ); 326 Tap sinkRhs = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "splitcogroupself/rhs" ), SinkMode.REPLACE ); 327 328 Map sinks = new HashMap(); 329 330 sinks.put( "lhs", sinkLhs ); 331 sinks.put( "rhs", sinkRhs ); 332 333 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 334 335 Pipe pipeLowerLhs = new Each( new Pipe( "lowerLhs" ), new Fields( "line" ), splitter ); 336 Pipe pipeUpperLhs = new Each( new Pipe( "upperLhs" ), new Fields( "line" ), splitter ); 337 338 Pipe spliceLhs = new CoGroup( "lhs", pipeLowerLhs, new Fields( "num" ), pipeUpperLhs, new Fields( "num" ), Fields.size( 4 ) ); 339 340 Pipe pipeLowerRhs = new Each( new Pipe( "lowerRhs" ), new Fields( "line" ), splitter ); 341 Pipe pipeUpperRhs = new Each( new Pipe( "upperRhs" ), new Fields( "line" ), splitter ); 342 343 Pipe spliceRhs = new CoGroup( "rhs", pipeLowerRhs, new Fields( "num" ), pipeUpperRhs, new Fields( "num" ), Fields.size( 4 ) ); 344 345 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, spliceLhs, spliceRhs ); 346 347 flow.complete(); 348 349 List<Tuple> values = asList( flow, sinkLhs ); 350 351 assertEquals( 5, values.size() ); 352 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 353 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 354 355 values = asList( flow, sinkRhs ); 356 357 assertEquals( 5, values.size() ); 358 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 359 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 360 } 361 362 /** 363 * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join 364 * 365 * @throws Exception when 366 */ 367 @Test 368 public void testCoGroupAfterEvery() throws Exception 369 { 370 getPlatform().copyFromLocal( inputFileLower ); 371 getPlatform().copyFromLocal( inputFileUpper ); 372 373 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 374 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 375 376 Map sources = new HashMap(); 377 378 sources.put( "lower", sourceLower ); 379 sources.put( "upper", sourceUpper ); 380 381 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE ); 382 383 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 384 385 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 386 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) ); 387 pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL ); 388 389 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 390 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 391 pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL ); 392 393 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 394 395 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 396 397 flow.complete(); 398 399 validateLength( flow, 5, null ); 400 401 List<Tuple> values = getSinkAsList( flow ); 402 403 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 404 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 405 } 406 407 /** 408 * Tests that CoGroup properly resolves fields when following an Every 409 * 410 * @throws Exception 411 */ 412 @Test 413 public void testCoGroupAfterEveryNoDeclared() throws Exception 414 { 415 getPlatform().copyFromLocal( inputFileLower ); 416 getPlatform().copyFromLocal( inputFileUpper ); 417 418 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 419 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 420 421 Map sources = new HashMap(); 422 423 sources.put( "lower", sourceLower ); 424 sources.put( "upper", sourceUpper ); 425 426 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "aftereverynodeclared" ), SinkMode.REPLACE ); 427 428 Function splitter1 = new RegexSplitter( new Fields( "num1", "char1" ), " " ); 429 430 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter1 ); 431 pipeLower = new Each( pipeLower, new Insert( new Fields( "one", "two", "three", "four" ), "one", "two", "three", "four" ), Fields.ALL ); 432 pipeLower = new GroupBy( pipeLower, new Fields( "num1" ) ); 433 pipeLower = new Every( pipeLower, new Fields( "char1" ), new First(), Fields.ALL ); 434 435 Function splitter2 = new RegexSplitter( new Fields( "num2", "char2" ), " " ); 436 437 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter2 ); 438 pipeUpper = new GroupBy( pipeUpper, new Fields( "num2" ) ); 439 pipeUpper = new Every( pipeUpper, new Fields( "char2" ), new First(), Fields.ALL ); 440 441 Pipe splice = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 442 443 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 444 445 flow.complete(); 446 447 validateLength( flow, 5, null ); 448 449 List<Tuple> values = getSinkAsList( flow ); 450 451 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 452 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 453 } 454 455 @Test 456 public void testCoGroupInnerSingleField() throws Exception 457 { 458 getPlatform().copyFromLocal( inputFileLowerOffset ); 459 getPlatform().copyFromLocal( inputFileUpper ); 460 461 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 462 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 463 464 Map sources = new HashMap(); 465 466 sources.put( "lower", sourceLower ); 467 sources.put( "upper", sourceUpper ); 468 469 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupinnersingle" ), SinkMode.REPLACE ); 470 471 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) ); 472 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) ); 473 474 Pipe join = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 475 476 join = new Every( join, new Count() ); 477 478// join = new Each( join, new Debug( true ) ); 479 480 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 481 482 flow.complete(); 483 484 validateLength( flow, 2, null ); 485 486 Set<Tuple> results = new HashSet<Tuple>(); 487 488 results.add( new Tuple( "1\t1\t1" ) ); 489 results.add( new Tuple( "5\t5\t2" ) ); 490 491 List<Tuple> actual = getSinkAsList( flow ); 492 493 results.removeAll( actual ); 494 495 assertEquals( 0, results.size() ); 496 } 497 498 /** 499 * 1 a1 500 * 1 a2 501 * 1 a3 502 * 2 b1 503 * 3 c1 504 * 4 d1 505 * 4 d2 506 * 4 d3 507 * 5 e1 508 * 5 e2 509 * 5 e3 510 * 7 g1 511 * 7 g2 512 * 7 g3 513 * 7 g4 514 * 7 g5 515 * null h1 516 * <p/> 517 * 1 A1 518 * 1 A2 519 * 1 A3 520 * 2 B1 521 * 2 B2 522 * 2 B3 523 * 4 D1 524 * 6 F1 525 * 6 F2 526 * null H1 527 * <p/> 528 * 1 a1 1 A1 529 * 1 a1 1 A2 530 * 1 a1 1 A3 531 * 1 a2 1 A1 532 * 1 a2 1 A2 533 * 1 a2 1 A3 534 * 1 a3 1 A1 535 * 1 a3 1 A2 536 * 1 a3 1 A3 537 * 2 b1 2 B1 538 * 2 b1 2 B2 539 * 2 b1 2 B3 540 * 4 d1 4 D1 541 * 4 d2 4 D1 542 * 4 d3 4 D1 543 * null h1 null H1 544 * 545 * @throws Exception 546 */ 547 @Test 548 public void testCoGroupInner() throws Exception 549 { 550 HashSet<Tuple> results = new HashSet<Tuple>(); 551 552 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 553 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 554 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 555 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 556 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 557 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 558 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 559 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 560 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 561 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 562 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 563 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 564 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 565 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 566 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 567 results.add( new Tuple( null, "h1", null, "H1" ) ); 568 569 handleJoins( "cogroupinner", new InnerJoin(), results, 8, false, null ); 570 handleJoins( "cogroupinner-resultgroup", new InnerJoin(), results, 8, true, null ); 571 } 572 573 /** 574 * 1 a1 575 * 1 a2 576 * 1 a3 577 * 2 b1 578 * 3 c1 579 * 4 d1 580 * 4 d2 581 * 4 d3 582 * 5 e1 583 * 5 e2 584 * 5 e3 585 * 7 g1 586 * 7 g2 587 * 7 g3 588 * 7 g4 589 * 7 g5 590 * null h1 591 * <p/> 592 * 1 A1 593 * 1 A2 594 * 1 A3 595 * 2 B1 596 * 2 B2 597 * 2 B3 598 * 4 D1 599 * 6 F1 600 * 6 F2 601 * null H1 602 * <p/> 603 * 1 a1 1 A1 604 * 1 a1 1 A2 605 * 1 a1 1 A3 606 * 1 a2 1 A1 607 * 1 a2 1 A2 608 * 1 a2 1 A3 609 * 1 a3 1 A1 610 * 1 a3 1 A2 611 * 1 a3 1 A3 612 * 2 b1 2 B1 613 * 2 b1 2 B2 614 * 2 b1 2 B3 615 * 4 d1 4 D1 616 * 4 d2 4 D1 617 * 4 d3 4 D1 618 * 619 * @throws Exception 620 */ 621 @Test 622 public void testCoGroupInnerNull() throws Exception 623 { 624 HashSet<Tuple> results = new HashSet<Tuple>(); 625 626 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 627 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 628 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 629 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 630 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 631 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 632 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 633 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 634 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 635 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 636 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 637 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 638 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 639 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 640 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 641 642 handleJoins( "cogroupinnernull", new InnerJoin(), results, 9, false, new NullNotEquivalentComparator() ); 643 handleJoins( "cogroupinnernull-resultgroup", new InnerJoin(), results, 9, true, new NullNotEquivalentComparator() ); 644 } 645 646 /** 647 * 1 a1 648 * 1 a2 649 * 1 a3 650 * 2 b1 651 * 3 c1 652 * 4 d1 653 * 4 d2 654 * 4 d3 655 * 5 e1 656 * 5 e2 657 * 5 e3 658 * 7 g1 659 * 7 g2 660 * 7 g3 661 * 7 g4 662 * 7 g5 663 * null h1 664 * <p/> 665 * 1 A1 666 * 1 A2 667 * 1 A3 668 * 2 B1 669 * 2 B2 670 * 2 B3 671 * 4 D1 672 * 6 F1 673 * 6 F2 674 * null H1 675 * <p/> 676 * 1 a1 1 A1 677 * 1 a1 1 A2 678 * 1 a1 1 A3 679 * 1 a2 1 A1 680 * 1 a2 1 A2 681 * 1 a2 1 A3 682 * 1 a3 1 A1 683 * 1 a3 1 A2 684 * 1 a3 1 A3 685 * 2 b1 2 B1 686 * 2 b1 2 B2 687 * 2 b1 2 B3 688 * 3 c1 null null 689 * 4 d1 4 D1 690 * 4 d2 4 D1 691 * 4 d3 4 D1 692 * 5 e1 null null 693 * 5 e2 null null 694 * 5 e3 null null 695 * null null 6 F1 696 * null null 6 F2 697 * 7 g1 null null 698 * 7 g2 null null 699 * 7 g3 null null 700 * 7 g4 null null 701 * 7 g5 null null 702 * null h1 null H1 703 * 704 * @throws Exception 705 */ 706 @Test 707 public void testCoGroupOuter() throws Exception 708 { 709 Set<Tuple> results = new HashSet<Tuple>(); 710 711 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 712 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 713 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 714 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 715 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 716 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 717 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 718 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 719 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 720 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 721 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 722 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 723 results.add( new Tuple( "3", "c1", null, null ) ); 724 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 725 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 726 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 727 results.add( new Tuple( "5", "e1", null, null ) ); 728 results.add( new Tuple( "5", "e2", null, null ) ); 729 results.add( new Tuple( "5", "e3", null, null ) ); 730 results.add( new Tuple( null, null, "6", "F1" ) ); 731 results.add( new Tuple( null, null, "6", "F2" ) ); 732 results.add( new Tuple( "7", "g1", null, null ) ); 733 results.add( new Tuple( "7", "g2", null, null ) ); 734 results.add( new Tuple( "7", "g3", null, null ) ); 735 results.add( new Tuple( "7", "g4", null, null ) ); 736 results.add( new Tuple( "7", "g5", null, null ) ); 737 results.add( new Tuple( null, "h1", null, "H1" ) ); 738 739 handleJoins( "cogroupouter", new OuterJoin(), results, 8, false, null ); 740 handleJoins( "cogroupouter-resultgroup", new OuterJoin(), results, 8, true, null ); 741 } 742 743 /** 744 * 1 a1 745 * 1 a2 746 * 1 a3 747 * 2 b1 748 * 3 c1 749 * 4 d1 750 * 4 d2 751 * 4 d3 752 * 5 e1 753 * 5 e2 754 * 5 e3 755 * 7 g1 756 * 7 g2 757 * 7 g3 758 * 7 g4 759 * 7 g5 760 * null h1 761 * <p/> 762 * 1 A1 763 * 1 A2 764 * 1 A3 765 * 2 B1 766 * 2 B2 767 * 2 B3 768 * 4 D1 769 * 6 F1 770 * 6 F2 771 * null H1 772 * <p/> 773 * 1 a1 1 A1 774 * 1 a1 1 A2 775 * 1 a1 1 A3 776 * 1 a2 1 A1 777 * 1 a2 1 A2 778 * 1 a2 1 A3 779 * 1 a3 1 A1 780 * 1 a3 1 A2 781 * 1 a3 1 A3 782 * 2 b1 2 B1 783 * 2 b1 2 B2 784 * 2 b1 2 B3 785 * 3 c1 null null 786 * 4 d1 4 D1 787 * 4 d2 4 D1 788 * 4 d3 4 D1 789 * 5 e1 null null 790 * 5 e2 null null 791 * 5 e3 null null 792 * null null 6 F1 793 * null null 6 F2 794 * 7 g1 null null 795 * 7 g2 null null 796 * 7 g3 null null 797 * 7 g4 null null 798 * 7 g5 null null 799 * null h1 null null 800 * null null null H1 801 * 802 * @throws Exception 803 */ 804 @Test 805 public void testCoGroupOuterNull() throws Exception 806 { 807 Set<Tuple> results = new HashSet<Tuple>(); 808 809 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 810 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 811 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 812 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 813 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 814 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 815 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 816 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 817 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 818 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 819 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 820 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 821 results.add( new Tuple( "3", "c1", null, null ) ); 822 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 823 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 824 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 825 results.add( new Tuple( "5", "e1", null, null ) ); 826 results.add( new Tuple( "5", "e2", null, null ) ); 827 results.add( new Tuple( "5", "e3", null, null ) ); 828 results.add( new Tuple( null, null, "6", "F1" ) ); 829 results.add( new Tuple( null, null, "6", "F2" ) ); 830 results.add( new Tuple( "7", "g1", null, null ) ); 831 results.add( new Tuple( "7", "g2", null, null ) ); 832 results.add( new Tuple( "7", "g3", null, null ) ); 833 results.add( new Tuple( "7", "g4", null, null ) ); 834 results.add( new Tuple( "7", "g5", null, null ) ); 835 results.add( new Tuple( null, "h1", null, null ) ); 836 results.add( new Tuple( null, null, null, "H1" ) ); 837 838 handleJoins( "cogroupouternull", new OuterJoin(), results, 9, false, new NullNotEquivalentComparator() ); 839 handleJoins( "cogroupouternull-resultgroup", new OuterJoin(), results, 9, true, new NullNotEquivalentComparator() ); 840 } 841 842 /** 843 * 1 a1 844 * 1 a2 845 * 1 a3 846 * 2 b1 847 * 3 c1 848 * 4 d1 849 * 4 d2 850 * 4 d3 851 * 5 e1 852 * 5 e2 853 * 5 e3 854 * 7 g1 855 * 7 g2 856 * 7 g3 857 * 7 g4 858 * 7 g5 859 * null h1 860 * <p/> 861 * 1 A1 862 * 1 A2 863 * 1 A3 864 * 2 B1 865 * 2 B2 866 * 2 B3 867 * 4 D1 868 * 6 F1 869 * 6 F2 870 * null H1 871 * <p/> 872 * 1 a1 1 A1 873 * 1 a1 1 A2 874 * 1 a1 1 A3 875 * 1 a2 1 A1 876 * 1 a2 1 A2 877 * 1 a2 1 A3 878 * 1 a3 1 A1 879 * 1 a3 1 A2 880 * 1 a3 1 A3 881 * 2 b1 2 B1 882 * 2 b1 2 B2 883 * 2 b1 2 B3 884 * 3 c1 null null 885 * 4 d1 4 D1 886 * 4 d2 4 D1 887 * 4 d3 4 D1 888 * 5 e1 null null 889 * 5 e2 null null 890 * 5 e3 null null 891 * 7 g1 null null 892 * 7 g2 null null 893 * 7 g3 null null 894 * 7 g4 null null 895 * 7 g5 null null 896 * null h1 null H1 897 * 898 * @throws Exception 899 */ 900 @Test 901 public void testCoGroupInnerOuter() throws Exception 902 { 903 Set<Tuple> results = new HashSet<Tuple>(); 904 905 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 906 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 907 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 908 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 909 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 910 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 911 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 912 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 913 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 914 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 915 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 916 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 917 results.add( new Tuple( "3", "c1", null, null ) ); 918 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 919 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 920 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 921 results.add( new Tuple( "5", "e1", null, null ) ); 922 results.add( new Tuple( "5", "e2", null, null ) ); 923 results.add( new Tuple( "5", "e3", null, null ) ); 924 results.add( new Tuple( "7", "g1", null, null ) ); 925 results.add( new Tuple( "7", "g2", null, null ) ); 926 results.add( new Tuple( "7", "g3", null, null ) ); 927 results.add( new Tuple( "7", "g4", null, null ) ); 928 results.add( new Tuple( "7", "g5", null, null ) ); 929 results.add( new Tuple( null, "h1", null, "H1" ) ); 930 931 handleJoins( "cogroupinnerouter", new LeftJoin(), results, 8, false, null ); 932 handleJoins( "cogroupinnerouter-resultgroup", new LeftJoin(), results, 8, true, null ); 933 } 934 935 /** 936 * 1 a1 937 * 1 a2 938 * 1 a3 939 * 2 b1 940 * 3 c1 941 * 4 d1 942 * 4 d2 943 * 4 d3 944 * 5 e1 945 * 5 e2 946 * 5 e3 947 * 7 g1 948 * 7 g2 949 * 7 g3 950 * 7 g4 951 * 7 g5 952 * null h1 953 * <p/> 954 * 1 A1 955 * 1 A2 956 * 1 A3 957 * 2 B1 958 * 2 B2 959 * 2 B3 960 * 4 D1 961 * 6 F1 962 * 6 F2 963 * null H1 964 * <p/> 965 * 1 a1 1 A1 966 * 1 a1 1 A2 967 * 1 a1 1 A3 968 * 1 a2 1 A1 969 * 1 a2 1 A2 970 * 1 a2 1 A3 971 * 1 a3 1 A1 972 * 1 a3 1 A2 973 * 1 a3 1 A3 974 * 2 b1 2 B1 975 * 2 b1 2 B2 976 * 2 b1 2 B3 977 * 3 c1 null null 978 * 4 d1 4 D1 979 * 4 d2 4 D1 980 * 4 d3 4 D1 981 * 5 e1 null null 982 * 5 e2 null null 983 * 5 e3 null null 984 * 7 g1 null null 985 * 7 g2 null null 986 * 7 g3 null null 987 * 7 g4 null null 988 * 7 g5 null null 989 * null h1 null null 990 * 991 * @throws Exception 992 */ 993 @Test 994 public void testCoGroupInnerOuterNull() throws Exception 995 { 996 Set<Tuple> results = new HashSet<Tuple>(); 997 998 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 999 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 1000 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 1001 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 1002 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 1003 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 1004 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 1005 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 1006 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 1007 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 1008 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 1009 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 1010 results.add( new Tuple( "3", "c1", null, null ) ); 1011 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 1012 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 1013 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 1014 results.add( new Tuple( "5", "e1", null, null ) ); 1015 results.add( new Tuple( "5", "e2", null, null ) ); 1016 results.add( new Tuple( "5", "e3", null, null ) ); 1017 results.add( new Tuple( "7", "g1", null, null ) ); 1018 results.add( new Tuple( "7", "g2", null, null ) ); 1019 results.add( new Tuple( "7", "g3", null, null ) ); 1020 results.add( new Tuple( "7", "g4", null, null ) ); 1021 results.add( new Tuple( "7", "g5", null, null ) ); 1022 results.add( new Tuple( null, "h1", null, null ) ); 1023 1024 handleJoins( "cogroupinnerouternull", new LeftJoin(), results, 9, false, new NullNotEquivalentComparator() ); 1025 handleJoins( "cogroupinnerouternull-resultgroup", new LeftJoin(), results, 9, true, new NullNotEquivalentComparator() ); 1026 } 1027 1028 /** 1029 * 1 a1 1030 * 1 a2 1031 * 1 a3 1032 * 2 b1 1033 * 3 c1 1034 * 4 d1 1035 * 4 d2 1036 * 4 d3 1037 * 5 e1 1038 * 5 e2 1039 * 5 e3 1040 * 7 g1 1041 * 7 g2 1042 * 7 g3 1043 * 7 g4 1044 * 7 g5 1045 * null h1 1046 * <p/> 1047 * 1 A1 1048 * 1 A2 1049 * 1 A3 1050 * 2 B1 1051 * 2 B2 1052 * 2 B3 1053 * 4 D1 1054 * 6 F1 1055 * 6 F2 1056 * null H1 1057 * <p/> 1058 * 1 a1 1 A1 1059 * 1 a1 1 A2 1060 * 1 a1 1 A3 1061 * 1 a2 1 A1 1062 * 1 a2 1 A2 1063 * 1 a2 1 A3 1064 * 1 a3 1 A1 1065 * 1 a3 1 A2 1066 * 1 a3 1 A3 1067 * 2 b1 2 B1 1068 * 2 b1 2 B2 1069 * 2 b1 2 B3 1070 * 4 d1 4 D1 1071 * 4 d2 4 D1 1072 * 4 d3 4 D1 1073 * null null 6 F1 1074 * null null 6 F2 1075 * null h1 null H1 1076 * 1077 * @throws Exception 1078 */ 1079 @Test 1080 public void testCoGroupOuterInner() throws Exception 1081 { 1082 Set<Tuple> results = new HashSet<Tuple>(); 1083 1084 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 1085 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 1086 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 1087 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 1088 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 1089 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 1090 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 1091 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 1092 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 1093 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 1094 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 1095 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 1096 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 1097 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 1098 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 1099 results.add( new Tuple( null, null, "6", "F1" ) ); 1100 results.add( new Tuple( null, null, "6", "F2" ) ); 1101 results.add( new Tuple( null, "h1", null, "H1" ) ); 1102 1103 handleJoins( "cogroupouterinner", new RightJoin(), results, 8, false, null ); 1104 handleJoins( "cogroupouterinner-resultgroup", new RightJoin(), results, 8, true, null ); 1105 } 1106 1107 /** 1108 * 1 a1 1109 * 1 a2 1110 * 1 a3 1111 * 2 b1 1112 * 3 c1 1113 * 4 d1 1114 * 4 d2 1115 * 4 d3 1116 * 5 e1 1117 * 5 e2 1118 * 5 e3 1119 * 7 g1 1120 * 7 g2 1121 * 7 g3 1122 * 7 g4 1123 * 7 g5 1124 * null h1 1125 * <p/> 1126 * 1 A1 1127 * 1 A2 1128 * 1 A3 1129 * 2 B1 1130 * 2 B2 1131 * 2 B3 1132 * 4 D1 1133 * 6 F1 1134 * 6 F2 1135 * null H1 1136 * <p/> 1137 * 1 a1 1 A1 1138 * 1 a1 1 A2 1139 * 1 a1 1 A3 1140 * 1 a2 1 A1 1141 * 1 a2 1 A2 1142 * 1 a2 1 A3 1143 * 1 a3 1 A1 1144 * 1 a3 1 A2 1145 * 1 a3 1 A3 1146 * 2 b1 2 B1 1147 * 2 b1 2 B2 1148 * 2 b1 2 B3 1149 * 4 d1 4 D1 1150 * 4 d2 4 D1 1151 * 4 d3 4 D1 1152 * null null 6 F1 1153 * null null 6 F2 1154 * null null null H1 1155 * 1156 * @throws Exception 1157 */ 1158 @Test 1159 public void testCoGroupOuterInnerNull() throws Exception 1160 { 1161 Set<Tuple> results = new HashSet<Tuple>(); 1162 1163 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 1164 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 1165 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 1166 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 1167 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 1168 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 1169 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 1170 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 1171 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 1172 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 1173 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 1174 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 1175 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 1176 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 1177 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 1178 results.add( new Tuple( null, null, "6", "F1" ) ); 1179 results.add( new Tuple( null, null, "6", "F2" ) ); 1180 results.add( new Tuple( null, null, null, "H1" ) ); 1181 1182 handleJoins( "cogroupouterinnernull", new RightJoin(), results, 9, false, new NullNotEquivalentComparator() ); 1183 handleJoins( "cogroupouterinnernull-resultgroup", new RightJoin(), results, 9, true, new NullNotEquivalentComparator() ); 1184 } 1185 1186 private void handleJoins( String path, Joiner joiner, Set<Tuple> results, int numGroups, boolean useResultGroupFields, NullNotEquivalentComparator comparator ) throws Exception 1187 { 1188 results = new HashSet<Tuple>( results ); 1189 1190 getPlatform().copyFromLocal( inputFileLhsSparse ); 1191 getPlatform().copyFromLocal( inputFileRhsSparse ); 1192 1193 Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class ); 1194 Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse ); 1195 Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse ); 1196 1197 Map sources = new HashMap(); 1198 1199 sources.put( "lower", sourceLower ); 1200 sources.put( "upper", sourceUpper ); 1201 1202 Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE ); 1203 1204 Pipe pipeLower = new Pipe( "lower" ); 1205 Pipe pipeUpper = new Pipe( "upper" ); 1206 1207 Fields declaredFields = new Fields( "num", "char", "num2", "char2" ); 1208 1209 Fields groupFields = new Fields( "num" ); 1210 1211 if( comparator != null ) 1212 groupFields.setComparator( 0, comparator ); 1213 1214 Pipe splice; 1215 if( useResultGroupFields ) 1216 splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, new Fields( "num", "num2" ), joiner ); 1217 else 1218 splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, joiner ); 1219 1220 splice = new Every( splice, Fields.ALL, new TestIdentityBuffer( new Fields( "num", "num2" ), numGroups, true ), Fields.RESULTS ); 1221 1222 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 1223 1224 flow.complete(); 1225 1226 validateLength( flow, results.size() ); 1227 1228 List<Tuple> actual = getSinkAsList( flow ); 1229 1230 results.removeAll( actual ); 1231 1232 assertEquals( 0, results.size() ); 1233 } 1234 1235 /** 1236 * 1 a 1237 * 5 b 1238 * 6 c 1239 * 5 b 1240 * 5 e 1241 * <p/> 1242 * 1 A 1243 * 2 B 1244 * 3 C 1245 * 4 D 1246 * 5 E 1247 * <p/> 1248 * 1 a 1249 * 2 b 1250 * 3 c 1251 * 4 d 1252 * 5 e 1253 * <p/> 1254 * 1 a 1 A 1 a 1255 * - - 2 B 2 b 1256 * - - 3 C 3 c 1257 * - - 4 D 4 d 1258 * 5 b 5 E 5 e 1259 * 5 e 5 E 5 e 1260 * 1261 * @throws Exception 1262 */ 1263 @Test 1264 public void testCoGroupMixed() throws Exception 1265 { 1266 getPlatform().copyFromLocal( inputFileLowerOffset ); 1267 getPlatform().copyFromLocal( inputFileLower ); 1268 getPlatform().copyFromLocal( inputFileUpper ); 1269 1270 Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 1271 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1272 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1273 1274 Map sources = new HashMap(); 1275 1276 sources.put( "loweroffset", sourceLowerOffset ); 1277 sources.put( "lower", sourceLower ); 1278 sources.put( "upper", sourceUpper ); 1279 1280 Tap sink = getPlatform().getDelimitedFile( Fields.size( 6, String.class ), "\t", getOutputPath( "cogroupmixed" ), SinkMode.REPLACE ); 1281 1282 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1283 1284 Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter ); 1285 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1286 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1287 1288 Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower ); 1289 Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) ); 1290 1291 MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} ); 1292 Pipe splice = new CoGroup( pipes, fields, Fields.size( 6 ), join ); 1293 1294 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 1295 1296 flow.complete(); 1297 1298 validateLength( flow, 6 ); 1299 1300 Set<Tuple> results = new HashSet<Tuple>(); 1301 1302 results.add( new Tuple( "1", "a", "1", "A", "1", "a" ) ); 1303 results.add( new Tuple( null, null, "2", "B", "2", "b" ) ); 1304 results.add( new Tuple( null, null, "3", "C", "3", "c" ) ); 1305 results.add( new Tuple( null, null, "4", "D", "4", "d" ) ); 1306 results.add( new Tuple( "5", "b", "5", "E", "5", "e" ) ); 1307 results.add( new Tuple( "5", "e", "5", "E", "5", "e" ) ); 1308 1309 List<Tuple> actual = getSinkAsList( flow ); 1310 1311 results.removeAll( actual ); 1312 1313 assertEquals( 0, results.size() ); 1314 } 1315 1316 @Test 1317 public void testCoGroupDiffFields() throws Exception 1318 { 1319 getPlatform().copyFromLocal( inputFileLower ); 1320 getPlatform().copyFromLocal( inputFileUpper ); 1321 1322 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1323 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1324 1325 Map sources = new HashMap(); 1326 1327 sources.put( "lower", sourceLower ); 1328 sources.put( "upper", sourceUpper ); 1329 1330 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE ); 1331 1332 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 1333 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 1334 1335 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1336 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1337 1338 Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1339 1340 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1341 1342 flow.complete(); 1343 1344 validateLength( flow, 5 ); 1345 1346 List<Tuple> actual = getSinkAsList( flow ); 1347 1348 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1349 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1350 } 1351 1352 @Test 1353 public void testCoGroupGroupBy() throws Exception 1354 { 1355 getPlatform().copyFromLocal( inputFileLower ); 1356 getPlatform().copyFromLocal( inputFileUpper ); 1357 1358 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1359 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1360 1361 Map sources = new HashMap(); 1362 1363 sources.put( "lower", sourceLower ); 1364 sources.put( "upper", sourceUpper ); 1365 1366 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupgroupby" ), SinkMode.REPLACE ); 1367 1368 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 1369 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 1370 1371 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1372 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1373 1374 Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1375 1376 Pipe groupby = new GroupBy( cogroup, new Fields( "numA" ) ); 1377 1378 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby ); 1379 1380 flow.complete(); 1381 1382 validateLength( flow, 5, null ); 1383 1384 List<Tuple> actual = getSinkAsList( flow ); 1385 1386 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1387 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1388 } 1389 1390 @Test 1391 public void testCoGroupSamePipe() throws Exception 1392 { 1393 getPlatform().copyFromLocal( inputFileLower ); 1394 1395 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1396 1397 Map sources = new HashMap(); 1398 1399 sources.put( "lower", source ); 1400 1401 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE ); 1402 1403 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1404 1405 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1406 1407 Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) ); 1408 1409 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1410 1411 flow.complete(); 1412 1413 validateLength( flow, 5, null ); 1414 1415 List<Tuple> actual = getSinkAsList( flow ); 1416 1417 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1418 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1419 } 1420 1421 @Test 1422 public void testCoGroupSamePipe2() throws Exception 1423 { 1424 getPlatform().copyFromLocal( inputFileLower ); 1425 1426 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1427 1428 Map sources = new HashMap(); 1429 1430 sources.put( "lower", source ); 1431 1432 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE ); 1433 1434 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1435 1436 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1437 1438 Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1439 1440 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1441 1442 flow.complete(); 1443 1444 validateLength( flow, 5, null ); 1445 1446 List<Tuple> actual = getSinkAsList( flow ); 1447 1448 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1449 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1450 } 1451 1452 @Test 1453 public void testCoGroupSamePipe3() throws Exception 1454 { 1455 getPlatform().copyFromLocal( inputFileLower ); 1456 1457 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower ); 1458 1459 Map sources = new HashMap(); 1460 1461 sources.put( "lower", source ); 1462 1463 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE ); 1464 1465 Pipe pipe = new Pipe( "lower" ); 1466 1467 Pipe lhs = new Pipe( "lhs", pipe ); 1468 Pipe rhs = new Pipe( "rhs", pipe ); 1469 1470 Pipe cogroup = new CoGroup( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1471 1472 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1473 1474 flow.complete(); 1475 1476 validateLength( flow, 5, null ); 1477 1478 List<Tuple> actual = getSinkAsList( flow ); 1479 1480 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1481 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1482 } 1483 1484 @Test 1485 public void testCoGroupAroundCoGroup() throws Exception 1486 { 1487 getPlatform().copyFromLocal( inputFileLower ); 1488 getPlatform().copyFromLocal( inputFileUpper ); 1489 1490 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1491 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1492 1493 Map sources = new HashMap(); 1494 1495 sources.put( "lower", sourceLower ); 1496 sources.put( "upper1", sourceUpper ); 1497 sources.put( "upper2", sourceUpper ); 1498 1499 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupacogroup" ), SinkMode.REPLACE ); 1500 1501 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1502 1503 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1504 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1505 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1506 1507 Pipe splice1 = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1508 1509 splice1 = new Each( splice1, new Identity() ); 1510 1511 Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1512 1513 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1514 1515 flow.complete(); 1516 1517 validateLength( flow, 5, null ); 1518 1519 List<Tuple> actual = getSinkAsList( flow ); 1520 1521 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 1522 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) ); 1523 } 1524 1525 @Test 1526 public void testCoGroupAroundCoGroupWithout() throws Exception 1527 { 1528 runCoGroupAroundCoGroup( null, "cogroupacogroupopt1" ); 1529 } 1530 1531 @Test 1532 public void testCoGroupAroundCoGroupWith() throws Exception 1533 { 1534 // hack to get classname 1535 runCoGroupAroundCoGroup( getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ).getScheme().getClass(), "cogroupacogroupopt2" ); 1536 } 1537 1538 private void runCoGroupAroundCoGroup( Class schemeClass, String stringPath ) throws IOException 1539 { 1540 getPlatform().copyFromLocal( inputFileNums20 ); 1541 getPlatform().copyFromLocal( inputFileNums10 ); 1542 1543 Tap source10 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ); 1544 Tap source20 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums20 ); 1545 1546 Map sources = new HashMap(); 1547 1548 sources.put( "source20", source20 ); 1549 sources.put( "source101", source10 ); 1550 sources.put( "source102", source10 ); 1551 1552 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( stringPath ), SinkMode.REPLACE ); 1553 1554 Pipe pipeNum20 = new Pipe( "source20" ); 1555 Pipe pipeNum101 = new Pipe( "source101" ); 1556 Pipe pipeNum102 = new Pipe( "source102" ); 1557 1558 Pipe splice1 = new CoGroup( pipeNum20, new Fields( "num" ), pipeNum101, new Fields( "num" ), new Fields( "num1", "num2" ) ); 1559 1560 Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeNum102, new Fields( "num" ), new Fields( "num1", "num2", "num3" ) ); 1561 1562 splice2 = new Each( splice2, new Identity() ); 1563 1564 Map<Object, Object> properties = getPlatform().getProperties(); 1565 1566 if( getPlatform().isMapReduce() ) 1567 FlowConnectorProps.setIntermediateSchemeClass( properties, schemeClass ); 1568 1569 Flow flow = getPlatform().getFlowConnector( properties ).connect( "cogroupopt", sources, sink, splice2 ); 1570 1571 if( getPlatform().isMapReduce() ) 1572 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1573 1574 flow.complete(); 1575 1576 validateLength( flow, 10 ); 1577 1578 List<Tuple> actual = getSinkAsList( flow ); 1579 1580 assertTrue( actual.contains( new Tuple( "1\t1\t1" ) ) ); 1581 assertTrue( actual.contains( new Tuple( "10\t10\t10" ) ) ); 1582 } 1583 1584 @Test 1585 public void testCoGroupDiffFieldsSameFile() throws Exception 1586 { 1587 getPlatform().copyFromLocal( inputFileLower ); 1588 getPlatform().copyFromLocal( inputFileUpper ); 1589 1590 Tap sourceOffsetLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1591 Tap sourceLower = getPlatform().getTextFile( new Fields( "line" ), inputFileLower ); 1592 1593 Map sources = new HashMap(); 1594 1595 sources.put( "offsetLower", sourceOffsetLower ); 1596 sources.put( "lower", sourceLower ); 1597 1598 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samefiledifffields" ), SinkMode.REPLACE ); 1599 1600 Function splitterLower = new RegexSplitter( new Fields( "numA", "left" ), " " ); 1601 Function splitterUpper = new RegexSplitter( new Fields( "numB", "right" ), " " ); 1602 1603 Pipe offsetLower = new Pipe( "offsetLower" ); 1604 offsetLower = new Discard( offsetLower, new Fields( "offset" ) ); 1605 offsetLower = new Each( offsetLower, new Fields( "line" ), splitterLower ); 1606 1607 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterUpper ); 1608 1609 Pipe cogroup = new CoGroup( offsetLower, new Fields( "numA" ), pipeLower, new Fields( "numB" ) ); 1610 1611 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1612 1613 flow.complete(); 1614 1615 validateLength( flow, 5 ); 1616 1617 List<Tuple> actual = getSinkAsList( flow ); 1618 1619 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1620 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1621 } 1622 1623 @Test 1624 public void testJoinNone() throws Exception 1625 { 1626 getPlatform().copyFromLocal( inputFileLower ); 1627 getPlatform().copyFromLocal( inputFileUpper ); 1628 1629 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1630 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1631 1632 Map sources = new HashMap(); 1633 1634 sources.put( "lower", sourceLower ); 1635 sources.put( "upper", sourceUpper ); 1636 1637 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE ); 1638 1639 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1640 1641 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1642 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1643 1644 Pipe splice = new CoGroup( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) ); 1645 1646 Map<Object, Object> properties = getProperties(); 1647 1648 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 1649 1650 flow.complete(); 1651 1652 validateLength( flow, 25 ); 1653 1654 List<Tuple> values = getSinkAsList( flow ); 1655 1656 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1657 assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) ); 1658 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1659 } 1660 1661 @Test 1662 public void testMultiJoin() throws Exception 1663 { 1664 getPlatform().copyFromLocal( inputFileCrossX2 ); 1665 1666 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 ); 1667 Tap innerSink = getPlatform().getTextFile( getOutputPath( "inner" ), SinkMode.REPLACE ); 1668 Tap outerSink = getPlatform().getTextFile( getOutputPath( "outer" ), SinkMode.REPLACE ); 1669 Tap leftSink = getPlatform().getTextFile( getOutputPath( "left" ), SinkMode.REPLACE ); 1670 Tap rightSink = getPlatform().getTextFile( getOutputPath( "right" ), SinkMode.REPLACE ); 1671 1672 Pipe uniques = new Pipe( "unique" ); 1673 1674 uniques = new Each( uniques, new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1675 1676 uniques = new GroupBy( uniques, new Fields( "word" ) ); 1677 1678 uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE ); 1679 1680// uniques = new Each( uniques, new Debug( true ) ); 1681 1682 Pipe fielded = new Pipe( "fielded" ); 1683 1684 fielded = new Each( fielded, new Fields( "line" ), new RegexSplitter( "\\s" ) ); 1685 1686// fielded = new Each( fielded, new Debug( true ) ); 1687 1688 Pipe inner = new CoGroup( "inner", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new InnerJoin() ); 1689 Pipe outer = new CoGroup( "outer", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new OuterJoin() ); 1690 Pipe left = new CoGroup( "left", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new LeftJoin() ); 1691 Pipe right = new CoGroup( "right", fielded, new Fields( 0 ), uniques, new Fields( "word" ), new RightJoin() ); 1692 1693 Pipe[] heads = Pipe.pipes( uniques, fielded ); 1694 Map<String, Tap> sources = Cascades.tapsMap( heads, Tap.taps( source, source ) ); 1695 1696 Pipe[] tails = Pipe.pipes( inner, outer, left, right ); 1697 Map<String, Tap> sinks = Cascades.tapsMap( tails, Tap.taps( innerSink, outerSink, leftSink, rightSink ) ); 1698 1699 Flow flow = getPlatform().getFlowConnector().connect( "multi-joins", sources, sinks, tails ); 1700 1701 flow.complete(); 1702 1703 validateLength( flow.openTapForRead( innerSink ), 74 ); 1704 validateLength( flow.openTapForRead( outerSink ), 84 ); 1705 validateLength( flow.openTapForRead( leftSink ), 74 ); 1706 validateLength( flow.openTapForRead( rightSink ), 84 ); 1707 } 1708 1709 /** 1710 * This test checks that a valid node is created when adding back remainders during unique 1711 * path sub-graph partitioning. 1712 */ 1713 @Test 1714 public void testMultiJoinWithSplits() throws Exception 1715 { 1716 getPlatform().copyFromLocal( inputFileCrossX2 ); 1717 1718 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileCrossX2 ); 1719 Tap innerSinkLhs = getPlatform().getTextFile( getOutputPath( "innerLhs" ), SinkMode.REPLACE ); 1720 Tap uniqueSinkLhs = getPlatform().getTextFile( getOutputPath( "uniquesLhs" ), SinkMode.REPLACE ); 1721 Tap innerSinkRhs = getPlatform().getTextFile( getOutputPath( "innerRhs" ), SinkMode.REPLACE ); 1722 Tap uniqueSinkRhs = getPlatform().getTextFile( getOutputPath( "uniquesRhs" ), SinkMode.REPLACE ); 1723 1724 Pipe incoming = new Pipe( "incoming" ); 1725 Pipe uniquesLhs; 1726 Pipe innerLhs; 1727 Pipe uniquesRhs; 1728 Pipe innerRhs; 1729 1730 { 1731 Pipe generatorLhs = new Each( new Pipe( "genLhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1732 1733 generatorLhs = new Each( generatorLhs, new Identity() ); 1734 1735 Pipe generatorRhs = new Each( new Pipe( "genRhsLhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1736 1737 Pipe uniques = new Each( new Pipe( "uniquesLhs", generatorLhs ), new Identity() ); 1738 1739 uniques = new GroupBy( uniques, new Fields( "word" ) ); 1740 1741 uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE ); 1742 1743 Pipe lhs = new Pipe( "lhs", generatorLhs ); 1744 1745 lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) ); 1746 1747 Pipe rhs = new Pipe( "rhs", generatorRhs ); 1748 1749 rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) ); 1750 1751 Pipe inner = new CoGroup( "innerLhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() ); 1752 1753 uniquesLhs = uniques; 1754 innerLhs = inner; 1755 } 1756 1757 { 1758 Pipe generatorLhs = new Each( new Pipe( "genLhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1759 1760 generatorLhs = new Each( generatorLhs, new Identity() ); 1761 1762 Pipe generatorRhs = new Each( new Pipe( "genRhsRhs", incoming ), new Fields( "line" ), new RegexSplitGenerator( new Fields( "word" ), "\\s" ) ); 1763 1764 Pipe uniques = new Each( new Pipe( "uniquesRhs", generatorLhs ), new Identity() ); 1765 1766 uniques = new GroupBy( uniques, new Fields( "word" ) ); 1767 1768 uniques = new Every( uniques, new Fields( "word" ), new First( Fields.ARGS ), Fields.REPLACE ); 1769 1770 Pipe lhs = new Pipe( "lhs", generatorLhs ); 1771 1772 lhs = new Rename( lhs, new Fields( "word" ), new Fields( "lhs" ) ); 1773 1774 Pipe rhs = new Pipe( "rhs", generatorRhs ); 1775 1776 rhs = new Rename( rhs, new Fields( "word" ), new Fields( "rhs" ) ); 1777 1778 Pipe inner = new CoGroup( "innerRhs", lhs, new Fields( 0 ), rhs, new Fields( 0 ), new InnerJoin() ); 1779 1780 uniquesRhs = uniques; 1781 innerRhs = inner; 1782 } 1783 1784 FlowDef flowDef = FlowDef.flowDef() 1785 .setName( "multi-joins" ) 1786 .addSource( "incoming", source ) 1787 .addTailSink( innerLhs, innerSinkLhs ) 1788 .addTailSink( uniquesLhs, uniqueSinkLhs ) 1789 .addTailSink( innerRhs, innerSinkRhs ) 1790 .addTailSink( uniquesRhs, uniqueSinkRhs ); 1791 1792 Flow flow = getPlatform().getFlowConnector().connect( flowDef ); 1793 1794 flow.complete(); 1795 1796 validateLength( flow.openTapForRead( innerSinkLhs ), 3900 ); 1797 validateLength( flow.openTapForRead( uniqueSinkLhs ), 15 ); 1798 validateLength( flow.openTapForRead( innerSinkRhs ), 3900 ); 1799 validateLength( flow.openTapForRead( uniqueSinkRhs ), 15 ); 1800 } 1801 1802 @Test 1803 public void testSameSourceGroupSplitCoGroup() throws Exception 1804 { 1805 getPlatform().copyFromLocal( inputFileLower ); 1806 1807 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ).applyTypes( Long.TYPE, String.class ), inputFileLower ); 1808 1809 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath(), SinkMode.REPLACE ); 1810 1811 Function splitter = new RegexSplitter( new Fields( "num", "char" ).applyTypes( String.class, String.class ), " " ); 1812 1813 Pipe sourcePipe = new Each( new Pipe( "source" ), new Fields( "line" ), splitter ); 1814 sourcePipe = new GroupBy( sourcePipe, new Fields( "num" ) ); 1815 sourcePipe = new Every( sourcePipe, new Fields( "char" ), new First( new Fields( "first", String.class ) ), Fields.ALL ); 1816 1817 Pipe lhsPipe = new Retain( new Pipe( "lhs", sourcePipe ), Fields.ALL ); 1818 Pipe rhsPipe = new Retain( new Pipe( "rhs", sourcePipe ), Fields.ALL ); 1819 1820 Pipe splice = new CoGroup( lhsPipe, new Fields( "num" ), rhsPipe, new Fields( "num" ), Fields.size( 4 ) ); 1821 1822 Map<Object, Object> properties = getPlatform().getProperties(); 1823 1824 properties.put( "cascading.serialization.types.required", "true" ); 1825 1826 Flow flow = getPlatform().getFlowConnector( properties ).connect( source, sink, splice ); 1827 1828 flow.complete(); 1829 1830 validateLength( flow, 5, null ); 1831 1832 List<Tuple> values = getSinkAsList( flow ); 1833 1834 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1835 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1836 } 1837 }