001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading; 022 023 import java.io.IOException; 024 import java.util.HashMap; 025 import java.util.HashSet; 026 import java.util.List; 027 import java.util.Map; 028 import java.util.Set; 029 030 import cascading.flow.Flow; 031 import cascading.flow.FlowConnectorProps; 032 import cascading.flow.FlowProps; 033 import cascading.operation.Function; 034 import cascading.operation.Identity; 035 import cascading.operation.Insert; 036 import cascading.operation.aggregator.Count; 037 import cascading.operation.aggregator.First; 038 import cascading.operation.regex.RegexFilter; 039 import cascading.operation.regex.RegexSplitter; 040 import cascading.pipe.CoGroup; 041 import cascading.pipe.Each; 042 import cascading.pipe.Every; 043 import cascading.pipe.GroupBy; 044 import cascading.pipe.Pipe; 045 import cascading.pipe.assembly.Discard; 046 import cascading.pipe.joiner.InnerJoin; 047 import cascading.pipe.joiner.Joiner; 048 import cascading.pipe.joiner.LeftJoin; 049 import cascading.pipe.joiner.MixedJoin; 050 import cascading.pipe.joiner.OuterJoin; 051 import cascading.pipe.joiner.RightJoin; 052 import cascading.tap.SinkMode; 053 import cascading.tap.Tap; 054 import cascading.tuple.Fields; 055 import cascading.tuple.Tuple; 056 import cascading.util.NullNotEquivalentComparator; 057 import org.junit.Test; 058 059 import static data.InputData.*; 060 061 062 public class CoGroupFieldedPipesPlatformTest extends PlatformTestCase 063 { 064 public CoGroupFieldedPipesPlatformTest() 065 { 066 super( true, 4, 1 ); // leave cluster testing enabled 067 } 068 069 @Test 070 public void testCross() throws Exception 071 { 072 getPlatform().copyFromLocal( inputFileLhs ); 073 getPlatform().copyFromLocal( inputFileRhs ); 074 075 Map sources = new HashMap(); 076 077 sources.put( "lhs", getPlatform().getTextFile( inputFileLhs ) ); 078 sources.put( "rhs", getPlatform().getTextFile( inputFileRhs ) ); 079 080 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cross" ), SinkMode.REPLACE ); 081 082 Pipe pipeLower = new Each( "lhs", new Fields( "line" ), new RegexSplitter( new Fields( "numLHS", "charLHS" ), " " ) ); 083 Pipe pipeUpper = new Each( "rhs", new Fields( "line" ), new RegexSplitter( new Fields( "numRHS", "charRHS" ), " " ) ); 084 085 Pipe cross = new CoGroup( pipeLower, new Fields( "numLHS" ), pipeUpper, new Fields( "numRHS" ), new InnerJoin() ); 086 087 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cross ); 088 089 flow.complete(); 090 091 validateLength( flow, 37, null ); 092 093 List<Tuple> values = getSinkAsList( flow ); 094 095 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 096 assertTrue( values.contains( new Tuple( "1\ta\t1\tB" ) ) ); 097 } 098 099 @Test 100 public void testCoGroup() throws Exception 101 { 102 getPlatform().copyFromLocal( inputFileLower ); 103 getPlatform().copyFromLocal( inputFileUpper ); 104 105 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 106 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 107 108 Map sources = new HashMap(); 109 110 sources.put( "lower", sourceLower ); 111 sources.put( "upper", sourceUpper ); 112 113 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroup" ), SinkMode.REPLACE ); 114 115 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 116 117 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 118 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 119 120 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), new InnerJoin( Fields.size( 4 ) ) ); 121 122 Map<Object, Object> properties = getProperties(); 123 124 // make sure hasher is getting called, but does nothing special 125 FlowProps.setDefaultTupleElementComparator( properties, getPlatform().getStringComparator( false ).getClass().getCanonicalName() ); 126 127 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 128 129 flow.complete(); 130 131 validateLength( flow, 5 ); 132 133 List<Tuple> values = getSinkAsList( flow ); 134 135 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 136 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 137 } 138 139 @Test 140 public void testCoGroupSamePipeName() throws Exception 141 { 142 getPlatform().copyFromLocal( inputFileLower ); 143 getPlatform().copyFromLocal( inputFileUpper ); 144 145 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 146 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 147 148 Map sources = new HashMap(); 149 150 sources.put( "lower", sourceLower ); 151 sources.put( "upper", sourceUpper ); 152 153 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "renamedpipes" ), SinkMode.REPLACE ); 154 155 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 156 157 Pipe pipeLower = new Pipe( "lower" ); 158 Pipe pipeUpper = new Pipe( "upper" ); 159 160 // these pipes will hide the source name, and could cause one to be lost 161 pipeLower = new Pipe( "same", pipeLower ); 162 pipeUpper = new Pipe( "same", pipeUpper ); 163 164 pipeLower = new Each( pipeLower, new Fields( "line" ), splitter ); 165 pipeUpper = new Each( pipeUpper, new Fields( "line" ), splitter ); 166 167 // pipeLower = new Each( pipeLower, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 168 // pipeUpper = new Each( pipeUpper, new Fields( "num", "char" ), new Identity( new Fields( "num", "char" ) ) ); 169 170 pipeLower = new Pipe( "left", pipeLower ); 171 pipeUpper = new Pipe( "right", pipeUpper ); 172 173 // pipeLower = new Each( pipeLower, new Debug( true ) ); 174 // pipeUpper = new Each( pipeUpper, new Debug( true ) ); 175 176 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 177 178 // splice = new Each( splice, new Debug( true ) ); 179 splice = new Pipe( "splice", splice ); 180 splice = new Pipe( "tail", splice ); 181 182 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 183 184 flow.complete(); 185 186 validateLength( flow, 5 ); 187 188 List<Tuple> values = getSinkAsList( flow ); 189 190 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 191 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 192 } 193 194 @Test 195 public void testCoGroupWithUnknowns() throws Exception 196 { 197 getPlatform().copyFromLocal( inputFileLower ); 198 getPlatform().copyFromLocal( inputFileUpper ); 199 200 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 201 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 202 203 Map sources = new HashMap(); 204 205 sources.put( "lower", sourceLower ); 206 sources.put( "upper", sourceUpper ); 207 208 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "unknown" ), SinkMode.REPLACE ); 209 210 Function splitter = new RegexSplitter( Fields.UNKNOWN, " " ); 211 212 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 213 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 214 215 Pipe splice = new CoGroup( pipeLower, new Fields( 0 ), pipeUpper, new Fields( 0 ), Fields.size( 4 ) ); 216 217 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 218 219 flow.complete(); 220 221 validateLength( flow, 5 ); 222 223 List<Tuple> values = getSinkAsList( flow ); 224 225 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 226 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 227 } 228 229 /** 230 * this test intentionally filters out all values so the intermediate tap is empty. this tap is cogrouped with 231 * a new stream using an outerjoin. 232 * 233 * @throws Exception 234 */ 235 @Test 236 public void testCoGroupFilteredBranch() throws Exception 237 { 238 getPlatform().copyFromLocal( inputFileLower ); 239 getPlatform().copyFromLocal( inputFileUpper ); 240 241 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 242 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 243 244 Map sources = new HashMap(); 245 246 sources.put( "lower", sourceLower ); 247 sources.put( "upper", sourceUpper ); 248 249 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupfilteredbranch" ), SinkMode.REPLACE ); 250 251 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 252 253 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 254 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 255 pipeUpper = new Each( pipeUpper, new Fields( "num" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all 256 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 257 258 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ), new OuterJoin() ); 259 260 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 261 262 flow.complete(); 263 264 validateLength( flow, 5 ); 265 266 List<Tuple> values = getSinkAsList( flow ); 267 268 assertTrue( values.contains( new Tuple( "1\ta\tnull\tnull" ) ) ); 269 assertTrue( values.contains( new Tuple( "2\tb\tnull\tnull" ) ) ); 270 } 271 272 @Test 273 public void testCoGroupSelf() throws Exception 274 { 275 getPlatform().copyFromLocal( inputFileLower ); 276 277 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 278 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 279 280 Map sources = new HashMap(); 281 282 sources.put( "lower", sourceLower ); 283 sources.put( "upper", sourceUpper ); 284 285 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupself" ), SinkMode.REPLACE ); 286 287 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 288 289 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 290 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 291 292 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 293 294 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 295 296 flow.complete(); 297 298 validateLength( flow, 5 ); 299 300 List<Tuple> values = getSinkAsList( flow ); 301 302 assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) ); 303 assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) ); 304 } 305 306 /** 307 * Method testCoGroupAfterEvery tests that a tmp tap is inserted after the Every in the cogroup join 308 * 309 * @throws Exception when 310 */ 311 @Test 312 public void testCoGroupAfterEvery() throws Exception 313 { 314 getPlatform().copyFromLocal( inputFileLower ); 315 getPlatform().copyFromLocal( inputFileUpper ); 316 317 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 318 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 319 320 Map sources = new HashMap(); 321 322 sources.put( "lower", sourceLower ); 323 sources.put( "upper", sourceUpper ); 324 325 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "afterevery" ), SinkMode.REPLACE ); 326 327 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 328 329 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 330 pipeLower = new GroupBy( pipeLower, new Fields( "num" ) ); 331 pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL ); 332 333 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 334 pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) ); 335 pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL ); 336 337 Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) ); 338 339 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 340 341 flow.complete(); 342 343 validateLength( flow, 5, null ); 344 345 List<Tuple> values = getSinkAsList( flow ); 346 347 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 348 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 349 } 350 351 /** 352 * Tests that CoGroup properly resolves fields when following an Every 353 * 354 * @throws Exception 355 */ 356 @Test 357 public void testCoGroupAfterEveryNoDeclared() throws Exception 358 { 359 getPlatform().copyFromLocal( inputFileLower ); 360 getPlatform().copyFromLocal( inputFileUpper ); 361 362 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 363 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 364 365 Map sources = new HashMap(); 366 367 sources.put( "lower", sourceLower ); 368 sources.put( "upper", sourceUpper ); 369 370 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "aftereverynodeclared" ), SinkMode.REPLACE ); 371 372 Function splitter1 = new RegexSplitter( new Fields( "num1", "char1" ), " " ); 373 374 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter1 ); 375 pipeLower = new Each( pipeLower, new Insert( new Fields( "one", "two", "three", "four" ), "one", "two", "three", "four" ), Fields.ALL ); 376 pipeLower = new GroupBy( pipeLower, new Fields( "num1" ) ); 377 pipeLower = new Every( pipeLower, new Fields( "char1" ), new First(), Fields.ALL ); 378 379 Function splitter2 = new RegexSplitter( new Fields( "num2", "char2" ), " " ); 380 381 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter2 ); 382 pipeUpper = new GroupBy( pipeUpper, new Fields( "num2" ) ); 383 pipeUpper = new Every( pipeUpper, new Fields( "char2" ), new First(), Fields.ALL ); 384 385 Pipe splice = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 386 387 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 388 389 flow.complete(); 390 391 validateLength( flow, 5, null ); 392 393 List<Tuple> values = getSinkAsList( flow ); 394 395 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 396 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 397 } 398 399 @Test 400 public void testCoGroupInnerSingleField() throws Exception 401 { 402 getPlatform().copyFromLocal( inputFileLowerOffset ); 403 getPlatform().copyFromLocal( inputFileUpper ); 404 405 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 406 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 407 408 Map sources = new HashMap(); 409 410 sources.put( "lower", sourceLower ); 411 sources.put( "upper", sourceUpper ); 412 413 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupinnersingle" ), SinkMode.REPLACE ); 414 415 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), new RegexSplitter( new Fields( "num1", "char" ), " " ), new Fields( "num1" ) ); 416 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), new RegexSplitter( new Fields( "num2", "char" ), " " ), new Fields( "num2" ) ); 417 418 Pipe join = new CoGroup( pipeLower, new Fields( "num1" ), pipeUpper, new Fields( "num2" ) ); 419 420 join = new Every( join, new Count() ); 421 422 // join = new Each( join, new Debug( true ) ); 423 424 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, join ); 425 426 flow.complete(); 427 428 validateLength( flow, 2, null ); 429 430 Set<Tuple> results = new HashSet<Tuple>(); 431 432 results.add( new Tuple( "1\t1\t1" ) ); 433 results.add( new Tuple( "5\t5\t2" ) ); 434 435 List<Tuple> actual = getSinkAsList( flow ); 436 437 results.removeAll( actual ); 438 439 assertEquals( 0, results.size() ); 440 } 441 442 /** 443 * 1 a1 444 * 1 a2 445 * 1 a3 446 * 2 b1 447 * 3 c1 448 * 4 d1 449 * 4 d2 450 * 4 d3 451 * 5 e1 452 * 5 e2 453 * 5 e3 454 * 7 g1 455 * 7 g2 456 * 7 g3 457 * 7 g4 458 * 7 g5 459 * null h1 460 * <p/> 461 * 1 A1 462 * 1 A2 463 * 1 A3 464 * 2 B1 465 * 2 B2 466 * 2 B3 467 * 4 D1 468 * 6 F1 469 * 6 F2 470 * null H1 471 * <p/> 472 * 1 a1 1 A1 473 * 1 a1 1 A2 474 * 1 a1 1 A3 475 * 1 a2 1 A1 476 * 1 a2 1 A2 477 * 1 a2 1 A3 478 * 1 a3 1 A1 479 * 1 a3 1 A2 480 * 1 a3 1 A3 481 * 2 b1 2 B1 482 * 2 b1 2 B2 483 * 2 b1 2 B3 484 * 4 d1 4 D1 485 * 4 d2 4 D1 486 * 4 d3 4 D1 487 * null h1 null H1 488 * 489 * @throws Exception 490 */ 491 @Test 492 public void testCoGroupInner() throws Exception 493 { 494 HashSet<Tuple> results = new HashSet<Tuple>(); 495 496 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 497 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 498 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 499 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 500 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 501 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 502 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 503 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 504 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 505 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 506 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 507 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 508 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 509 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 510 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 511 results.add( new Tuple( null, "h1", null, "H1" ) ); 512 513 handleJoins( "cogroupinner", new InnerJoin(), results, 8, false, null ); 514 handleJoins( "cogroupinner-resultgroup", new InnerJoin(), results, 8, true, null ); 515 } 516 517 /** 518 * 1 a1 519 * 1 a2 520 * 1 a3 521 * 2 b1 522 * 3 c1 523 * 4 d1 524 * 4 d2 525 * 4 d3 526 * 5 e1 527 * 5 e2 528 * 5 e3 529 * 7 g1 530 * 7 g2 531 * 7 g3 532 * 7 g4 533 * 7 g5 534 * null h1 535 * <p/> 536 * 1 A1 537 * 1 A2 538 * 1 A3 539 * 2 B1 540 * 2 B2 541 * 2 B3 542 * 4 D1 543 * 6 F1 544 * 6 F2 545 * null H1 546 * <p/> 547 * 1 a1 1 A1 548 * 1 a1 1 A2 549 * 1 a1 1 A3 550 * 1 a2 1 A1 551 * 1 a2 1 A2 552 * 1 a2 1 A3 553 * 1 a3 1 A1 554 * 1 a3 1 A2 555 * 1 a3 1 A3 556 * 2 b1 2 B1 557 * 2 b1 2 B2 558 * 2 b1 2 B3 559 * 4 d1 4 D1 560 * 4 d2 4 D1 561 * 4 d3 4 D1 562 * 563 * @throws Exception 564 */ 565 @Test 566 public void testCoGroupInnerNull() throws Exception 567 { 568 HashSet<Tuple> results = new HashSet<Tuple>(); 569 570 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 571 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 572 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 573 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 574 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 575 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 576 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 577 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 578 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 579 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 580 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 581 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 582 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 583 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 584 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 585 586 handleJoins( "cogroupinnernull", new InnerJoin(), results, 9, false, new NullNotEquivalentComparator() ); 587 handleJoins( "cogroupinnernull-resultgroup", new InnerJoin(), results, 9, true, new NullNotEquivalentComparator() ); 588 } 589 590 /** 591 * 1 a1 592 * 1 a2 593 * 1 a3 594 * 2 b1 595 * 3 c1 596 * 4 d1 597 * 4 d2 598 * 4 d3 599 * 5 e1 600 * 5 e2 601 * 5 e3 602 * 7 g1 603 * 7 g2 604 * 7 g3 605 * 7 g4 606 * 7 g5 607 * null h1 608 * <p/> 609 * 1 A1 610 * 1 A2 611 * 1 A3 612 * 2 B1 613 * 2 B2 614 * 2 B3 615 * 4 D1 616 * 6 F1 617 * 6 F2 618 * null H1 619 * <p/> 620 * 1 a1 1 A1 621 * 1 a1 1 A2 622 * 1 a1 1 A3 623 * 1 a2 1 A1 624 * 1 a2 1 A2 625 * 1 a2 1 A3 626 * 1 a3 1 A1 627 * 1 a3 1 A2 628 * 1 a3 1 A3 629 * 2 b1 2 B1 630 * 2 b1 2 B2 631 * 2 b1 2 B3 632 * 3 c1 null null 633 * 4 d1 4 D1 634 * 4 d2 4 D1 635 * 4 d3 4 D1 636 * 5 e1 null null 637 * 5 e2 null null 638 * 5 e3 null null 639 * null null 6 F1 640 * null null 6 F2 641 * 7 g1 null null 642 * 7 g2 null null 643 * 7 g3 null null 644 * 7 g4 null null 645 * 7 g5 null null 646 * null h1 null H1 647 * 648 * @throws Exception 649 */ 650 @Test 651 public void testCoGroupOuter() throws Exception 652 { 653 Set<Tuple> results = new HashSet<Tuple>(); 654 655 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 656 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 657 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 658 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 659 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 660 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 661 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 662 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 663 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 664 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 665 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 666 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 667 results.add( new Tuple( "3", "c1", null, null ) ); 668 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 669 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 670 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 671 results.add( new Tuple( "5", "e1", null, null ) ); 672 results.add( new Tuple( "5", "e2", null, null ) ); 673 results.add( new Tuple( "5", "e3", null, null ) ); 674 results.add( new Tuple( null, null, "6", "F1" ) ); 675 results.add( new Tuple( null, null, "6", "F2" ) ); 676 results.add( new Tuple( "7", "g1", null, null ) ); 677 results.add( new Tuple( "7", "g2", null, null ) ); 678 results.add( new Tuple( "7", "g3", null, null ) ); 679 results.add( new Tuple( "7", "g4", null, null ) ); 680 results.add( new Tuple( "7", "g5", null, null ) ); 681 results.add( new Tuple( null, "h1", null, "H1" ) ); 682 683 handleJoins( "cogroupouter", new OuterJoin(), results, 8, false, null ); 684 handleJoins( "cogroupouter-resultgroup", new OuterJoin(), results, 8, true, null ); 685 } 686 687 /** 688 * 1 a1 689 * 1 a2 690 * 1 a3 691 * 2 b1 692 * 3 c1 693 * 4 d1 694 * 4 d2 695 * 4 d3 696 * 5 e1 697 * 5 e2 698 * 5 e3 699 * 7 g1 700 * 7 g2 701 * 7 g3 702 * 7 g4 703 * 7 g5 704 * null h1 705 * <p/> 706 * 1 A1 707 * 1 A2 708 * 1 A3 709 * 2 B1 710 * 2 B2 711 * 2 B3 712 * 4 D1 713 * 6 F1 714 * 6 F2 715 * null H1 716 * <p/> 717 * 1 a1 1 A1 718 * 1 a1 1 A2 719 * 1 a1 1 A3 720 * 1 a2 1 A1 721 * 1 a2 1 A2 722 * 1 a2 1 A3 723 * 1 a3 1 A1 724 * 1 a3 1 A2 725 * 1 a3 1 A3 726 * 2 b1 2 B1 727 * 2 b1 2 B2 728 * 2 b1 2 B3 729 * 3 c1 null null 730 * 4 d1 4 D1 731 * 4 d2 4 D1 732 * 4 d3 4 D1 733 * 5 e1 null null 734 * 5 e2 null null 735 * 5 e3 null null 736 * null null 6 F1 737 * null null 6 F2 738 * 7 g1 null null 739 * 7 g2 null null 740 * 7 g3 null null 741 * 7 g4 null null 742 * 7 g5 null null 743 * null h1 null null 744 * null null null H1 745 * 746 * @throws Exception 747 */ 748 @Test 749 public void testCoGroupOuterNull() throws Exception 750 { 751 Set<Tuple> results = new HashSet<Tuple>(); 752 753 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 754 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 755 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 756 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 757 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 758 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 759 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 760 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 761 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 762 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 763 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 764 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 765 results.add( new Tuple( "3", "c1", null, null ) ); 766 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 767 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 768 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 769 results.add( new Tuple( "5", "e1", null, null ) ); 770 results.add( new Tuple( "5", "e2", null, null ) ); 771 results.add( new Tuple( "5", "e3", null, null ) ); 772 results.add( new Tuple( null, null, "6", "F1" ) ); 773 results.add( new Tuple( null, null, "6", "F2" ) ); 774 results.add( new Tuple( "7", "g1", null, null ) ); 775 results.add( new Tuple( "7", "g2", null, null ) ); 776 results.add( new Tuple( "7", "g3", null, null ) ); 777 results.add( new Tuple( "7", "g4", null, null ) ); 778 results.add( new Tuple( "7", "g5", null, null ) ); 779 results.add( new Tuple( null, "h1", null, null ) ); 780 results.add( new Tuple( null, null, null, "H1" ) ); 781 782 handleJoins( "cogroupouternull", new OuterJoin(), results, 9, false, new NullNotEquivalentComparator() ); 783 handleJoins( "cogroupouternull-resultgroup", new OuterJoin(), results, 9, true, new NullNotEquivalentComparator() ); 784 } 785 786 /** 787 * 1 a1 788 * 1 a2 789 * 1 a3 790 * 2 b1 791 * 3 c1 792 * 4 d1 793 * 4 d2 794 * 4 d3 795 * 5 e1 796 * 5 e2 797 * 5 e3 798 * 7 g1 799 * 7 g2 800 * 7 g3 801 * 7 g4 802 * 7 g5 803 * null h1 804 * <p/> 805 * 1 A1 806 * 1 A2 807 * 1 A3 808 * 2 B1 809 * 2 B2 810 * 2 B3 811 * 4 D1 812 * 6 F1 813 * 6 F2 814 * null H1 815 * <p/> 816 * 1 a1 1 A1 817 * 1 a1 1 A2 818 * 1 a1 1 A3 819 * 1 a2 1 A1 820 * 1 a2 1 A2 821 * 1 a2 1 A3 822 * 1 a3 1 A1 823 * 1 a3 1 A2 824 * 1 a3 1 A3 825 * 2 b1 2 B1 826 * 2 b1 2 B2 827 * 2 b1 2 B3 828 * 3 c1 null null 829 * 4 d1 4 D1 830 * 4 d2 4 D1 831 * 4 d3 4 D1 832 * 5 e1 null null 833 * 5 e2 null null 834 * 5 e3 null null 835 * 7 g1 null null 836 * 7 g2 null null 837 * 7 g3 null null 838 * 7 g4 null null 839 * 7 g5 null null 840 * null h1 null H1 841 * 842 * @throws Exception 843 */ 844 @Test 845 public void testCoGroupInnerOuter() throws Exception 846 { 847 Set<Tuple> results = new HashSet<Tuple>(); 848 849 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 850 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 851 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 852 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 853 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 854 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 855 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 856 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 857 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 858 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 859 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 860 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 861 results.add( new Tuple( "3", "c1", null, null ) ); 862 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 863 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 864 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 865 results.add( new Tuple( "5", "e1", null, null ) ); 866 results.add( new Tuple( "5", "e2", null, null ) ); 867 results.add( new Tuple( "5", "e3", null, null ) ); 868 results.add( new Tuple( "7", "g1", null, null ) ); 869 results.add( new Tuple( "7", "g2", null, null ) ); 870 results.add( new Tuple( "7", "g3", null, null ) ); 871 results.add( new Tuple( "7", "g4", null, null ) ); 872 results.add( new Tuple( "7", "g5", null, null ) ); 873 results.add( new Tuple( null, "h1", null, "H1" ) ); 874 875 handleJoins( "cogroupinnerouter", new LeftJoin(), results, 8, false, null ); 876 handleJoins( "cogroupinnerouter-resultgroup", new LeftJoin(), results, 8, true, null ); 877 } 878 879 /** 880 * 1 a1 881 * 1 a2 882 * 1 a3 883 * 2 b1 884 * 3 c1 885 * 4 d1 886 * 4 d2 887 * 4 d3 888 * 5 e1 889 * 5 e2 890 * 5 e3 891 * 7 g1 892 * 7 g2 893 * 7 g3 894 * 7 g4 895 * 7 g5 896 * null h1 897 * <p/> 898 * 1 A1 899 * 1 A2 900 * 1 A3 901 * 2 B1 902 * 2 B2 903 * 2 B3 904 * 4 D1 905 * 6 F1 906 * 6 F2 907 * null H1 908 * <p/> 909 * 1 a1 1 A1 910 * 1 a1 1 A2 911 * 1 a1 1 A3 912 * 1 a2 1 A1 913 * 1 a2 1 A2 914 * 1 a2 1 A3 915 * 1 a3 1 A1 916 * 1 a3 1 A2 917 * 1 a3 1 A3 918 * 2 b1 2 B1 919 * 2 b1 2 B2 920 * 2 b1 2 B3 921 * 3 c1 null null 922 * 4 d1 4 D1 923 * 4 d2 4 D1 924 * 4 d3 4 D1 925 * 5 e1 null null 926 * 5 e2 null null 927 * 5 e3 null null 928 * 7 g1 null null 929 * 7 g2 null null 930 * 7 g3 null null 931 * 7 g4 null null 932 * 7 g5 null null 933 * null h1 null null 934 * 935 * @throws Exception 936 */ 937 @Test 938 public void testCoGroupInnerOuterNull() throws Exception 939 { 940 Set<Tuple> results = new HashSet<Tuple>(); 941 942 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 943 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 944 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 945 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 946 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 947 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 948 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 949 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 950 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 951 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 952 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 953 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 954 results.add( new Tuple( "3", "c1", null, null ) ); 955 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 956 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 957 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 958 results.add( new Tuple( "5", "e1", null, null ) ); 959 results.add( new Tuple( "5", "e2", null, null ) ); 960 results.add( new Tuple( "5", "e3", null, null ) ); 961 results.add( new Tuple( "7", "g1", null, null ) ); 962 results.add( new Tuple( "7", "g2", null, null ) ); 963 results.add( new Tuple( "7", "g3", null, null ) ); 964 results.add( new Tuple( "7", "g4", null, null ) ); 965 results.add( new Tuple( "7", "g5", null, null ) ); 966 results.add( new Tuple( null, "h1", null, null ) ); 967 968 handleJoins( "cogroupinnerouternull", new LeftJoin(), results, 9, false, new NullNotEquivalentComparator() ); 969 handleJoins( "cogroupinnerouternull-resultgroup", new LeftJoin(), results, 9, true, new NullNotEquivalentComparator() ); 970 } 971 972 /** 973 * 1 a1 974 * 1 a2 975 * 1 a3 976 * 2 b1 977 * 3 c1 978 * 4 d1 979 * 4 d2 980 * 4 d3 981 * 5 e1 982 * 5 e2 983 * 5 e3 984 * 7 g1 985 * 7 g2 986 * 7 g3 987 * 7 g4 988 * 7 g5 989 * null h1 990 * <p/> 991 * 1 A1 992 * 1 A2 993 * 1 A3 994 * 2 B1 995 * 2 B2 996 * 2 B3 997 * 4 D1 998 * 6 F1 999 * 6 F2 1000 * null H1 1001 * <p/> 1002 * 1 a1 1 A1 1003 * 1 a1 1 A2 1004 * 1 a1 1 A3 1005 * 1 a2 1 A1 1006 * 1 a2 1 A2 1007 * 1 a2 1 A3 1008 * 1 a3 1 A1 1009 * 1 a3 1 A2 1010 * 1 a3 1 A3 1011 * 2 b1 2 B1 1012 * 2 b1 2 B2 1013 * 2 b1 2 B3 1014 * 4 d1 4 D1 1015 * 4 d2 4 D1 1016 * 4 d3 4 D1 1017 * null null 6 F1 1018 * null null 6 F2 1019 * null h1 null H1 1020 * 1021 * @throws Exception 1022 */ 1023 @Test 1024 public void testCoGroupOuterInner() throws Exception 1025 { 1026 Set<Tuple> results = new HashSet<Tuple>(); 1027 1028 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 1029 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 1030 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 1031 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 1032 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 1033 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 1034 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 1035 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 1036 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 1037 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 1038 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 1039 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 1040 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 1041 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 1042 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 1043 results.add( new Tuple( null, null, "6", "F1" ) ); 1044 results.add( new Tuple( null, null, "6", "F2" ) ); 1045 results.add( new Tuple( null, "h1", null, "H1" ) ); 1046 1047 handleJoins( "cogroupouterinner", new RightJoin(), results, 8, false, null ); 1048 handleJoins( "cogroupouterinner-resultgroup", new RightJoin(), results, 8, true, null ); 1049 } 1050 1051 /** 1052 * 1 a1 1053 * 1 a2 1054 * 1 a3 1055 * 2 b1 1056 * 3 c1 1057 * 4 d1 1058 * 4 d2 1059 * 4 d3 1060 * 5 e1 1061 * 5 e2 1062 * 5 e3 1063 * 7 g1 1064 * 7 g2 1065 * 7 g3 1066 * 7 g4 1067 * 7 g5 1068 * null h1 1069 * <p/> 1070 * 1 A1 1071 * 1 A2 1072 * 1 A3 1073 * 2 B1 1074 * 2 B2 1075 * 2 B3 1076 * 4 D1 1077 * 6 F1 1078 * 6 F2 1079 * null H1 1080 * <p/> 1081 * 1 a1 1 A1 1082 * 1 a1 1 A2 1083 * 1 a1 1 A3 1084 * 1 a2 1 A1 1085 * 1 a2 1 A2 1086 * 1 a2 1 A3 1087 * 1 a3 1 A1 1088 * 1 a3 1 A2 1089 * 1 a3 1 A3 1090 * 2 b1 2 B1 1091 * 2 b1 2 B2 1092 * 2 b1 2 B3 1093 * 4 d1 4 D1 1094 * 4 d2 4 D1 1095 * 4 d3 4 D1 1096 * null null 6 F1 1097 * null null 6 F2 1098 * null null null H1 1099 * 1100 * @throws Exception 1101 */ 1102 @Test 1103 public void testCoGroupOuterInnerNull() throws Exception 1104 { 1105 Set<Tuple> results = new HashSet<Tuple>(); 1106 1107 results.add( new Tuple( "1", "a1", "1", "A1" ) ); 1108 results.add( new Tuple( "1", "a1", "1", "A2" ) ); 1109 results.add( new Tuple( "1", "a1", "1", "A3" ) ); 1110 results.add( new Tuple( "1", "a2", "1", "A1" ) ); 1111 results.add( new Tuple( "1", "a2", "1", "A2" ) ); 1112 results.add( new Tuple( "1", "a2", "1", "A3" ) ); 1113 results.add( new Tuple( "1", "a3", "1", "A1" ) ); 1114 results.add( new Tuple( "1", "a3", "1", "A2" ) ); 1115 results.add( new Tuple( "1", "a3", "1", "A3" ) ); 1116 results.add( new Tuple( "2", "b1", "2", "B1" ) ); 1117 results.add( new Tuple( "2", "b1", "2", "B2" ) ); 1118 results.add( new Tuple( "2", "b1", "2", "B3" ) ); 1119 results.add( new Tuple( "4", "d1", "4", "D1" ) ); 1120 results.add( new Tuple( "4", "d2", "4", "D1" ) ); 1121 results.add( new Tuple( "4", "d3", "4", "D1" ) ); 1122 results.add( new Tuple( null, null, "6", "F1" ) ); 1123 results.add( new Tuple( null, null, "6", "F2" ) ); 1124 results.add( new Tuple( null, null, null, "H1" ) ); 1125 1126 handleJoins( "cogroupouterinnernull", new RightJoin(), results, 9, false, new NullNotEquivalentComparator() ); 1127 handleJoins( "cogroupouterinnernull-resultgroup", new RightJoin(), results, 9, true, new NullNotEquivalentComparator() ); 1128 } 1129 1130 private void handleJoins( String path, Joiner joiner, Set<Tuple> results, int numGroups, boolean useResultGroupFields, NullNotEquivalentComparator comparator ) throws Exception 1131 { 1132 results = new HashSet<Tuple>( results ); 1133 1134 getPlatform().copyFromLocal( inputFileLhsSparse ); 1135 getPlatform().copyFromLocal( inputFileRhsSparse ); 1136 1137 Fields fields = new Fields( "num", "char" ).applyTypes( Integer.class, String.class ); 1138 Tap sourceLower = getPlatform().getDelimitedFile( fields, " ", inputFileLhsSparse ); 1139 Tap sourceUpper = getPlatform().getDelimitedFile( fields, " ", inputFileRhsSparse ); 1140 1141 Map sources = new HashMap(); 1142 1143 sources.put( "lower", sourceLower ); 1144 sources.put( "upper", sourceUpper ); 1145 1146 Tap sink = getPlatform().getDelimitedFile( Fields.size( 4, String.class ), "\t", getOutputPath( path ), SinkMode.REPLACE ); 1147 1148 Pipe pipeLower = new Pipe( "lower" ); 1149 Pipe pipeUpper = new Pipe( "upper" ); 1150 1151 Fields declaredFields = new Fields( "num", "char", "num2", "char2" ); 1152 1153 Fields groupFields = new Fields( "num" ); 1154 1155 if( comparator != null ) 1156 groupFields.setComparator( 0, comparator ); 1157 1158 Pipe splice; 1159 if( useResultGroupFields ) 1160 splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, new Fields( "num", "num2" ), joiner ); 1161 else 1162 splice = new CoGroup( pipeLower, groupFields, pipeUpper, groupFields, declaredFields, joiner ); 1163 1164 splice = new Every( splice, Fields.ALL, new TestIdentityBuffer( new Fields( "num", "num2" ), numGroups, true ), Fields.RESULTS ); 1165 1166 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 1167 1168 flow.complete(); 1169 1170 validateLength( flow, results.size() ); 1171 1172 List<Tuple> actual = getSinkAsList( flow ); 1173 1174 results.removeAll( actual ); 1175 1176 assertEquals( 0, results.size() ); 1177 } 1178 1179 /** 1180 * 1 a 1181 * 5 b 1182 * 6 c 1183 * 5 b 1184 * 5 e 1185 * <p/> 1186 * 1 A 1187 * 2 B 1188 * 3 C 1189 * 4 D 1190 * 5 E 1191 * <p/> 1192 * 1 a 1193 * 2 b 1194 * 3 c 1195 * 4 d 1196 * 5 e 1197 * <p/> 1198 * 1 a 1 A 1 a 1199 * - - 2 B 2 b 1200 * - - 3 C 3 c 1201 * - - 4 D 4 d 1202 * 5 b 5 E 5 e 1203 * 5 e 5 E 5 e 1204 * 1205 * @throws Exception 1206 */ 1207 @Test 1208 public void testCoGroupMixed() throws Exception 1209 { 1210 getPlatform().copyFromLocal( inputFileLowerOffset ); 1211 getPlatform().copyFromLocal( inputFileLower ); 1212 getPlatform().copyFromLocal( inputFileUpper ); 1213 1214 Tap sourceLowerOffset = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLowerOffset ); 1215 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1216 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1217 1218 Map sources = new HashMap(); 1219 1220 sources.put( "loweroffset", sourceLowerOffset ); 1221 sources.put( "lower", sourceLower ); 1222 sources.put( "upper", sourceUpper ); 1223 1224 Tap sink = getPlatform().getDelimitedFile( Fields.size( 6, String.class ), "\t", getOutputPath( "cogroupmixed" ), SinkMode.REPLACE ); 1225 1226 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1227 1228 Pipe pipeLowerOffset = new Each( new Pipe( "loweroffset" ), new Fields( "line" ), splitter ); 1229 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1230 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1231 1232 Pipe[] pipes = Pipe.pipes( pipeLowerOffset, pipeUpper, pipeLower ); 1233 Fields[] fields = Fields.fields( new Fields( "num" ), new Fields( "num" ), new Fields( "num" ) ); 1234 1235 MixedJoin join = new MixedJoin( new boolean[]{MixedJoin.OUTER, MixedJoin.INNER, MixedJoin.OUTER} ); 1236 Pipe splice = new CoGroup( pipes, fields, Fields.size( 6 ), join ); 1237 1238 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 1239 1240 flow.complete(); 1241 1242 validateLength( flow, 6 ); 1243 1244 Set<Tuple> results = new HashSet<Tuple>(); 1245 1246 results.add( new Tuple( "1", "a", "1", "A", "1", "a" ) ); 1247 results.add( new Tuple( null, null, "2", "B", "2", "b" ) ); 1248 results.add( new Tuple( null, null, "3", "C", "3", "c" ) ); 1249 results.add( new Tuple( null, null, "4", "D", "4", "d" ) ); 1250 results.add( new Tuple( "5", "b", "5", "E", "5", "e" ) ); 1251 results.add( new Tuple( "5", "e", "5", "E", "5", "e" ) ); 1252 1253 List<Tuple> actual = getSinkAsList( flow ); 1254 1255 results.removeAll( actual ); 1256 1257 assertEquals( 0, results.size() ); 1258 } 1259 1260 @Test 1261 public void testCoGroupDiffFields() throws Exception 1262 { 1263 getPlatform().copyFromLocal( inputFileLower ); 1264 getPlatform().copyFromLocal( inputFileUpper ); 1265 1266 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1267 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1268 1269 Map sources = new HashMap(); 1270 1271 sources.put( "lower", sourceLower ); 1272 sources.put( "upper", sourceUpper ); 1273 1274 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "difffields" ), SinkMode.REPLACE ); 1275 1276 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 1277 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 1278 1279 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1280 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1281 1282 Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1283 1284 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1285 1286 flow.complete(); 1287 1288 validateLength( flow, 5 ); 1289 1290 List<Tuple> actual = getSinkAsList( flow ); 1291 1292 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1293 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1294 } 1295 1296 @Test 1297 public void testCoGroupGroupBy() throws Exception 1298 { 1299 getPlatform().copyFromLocal( inputFileLower ); 1300 getPlatform().copyFromLocal( inputFileUpper ); 1301 1302 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1303 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1304 1305 Map sources = new HashMap(); 1306 1307 sources.put( "lower", sourceLower ); 1308 sources.put( "upper", sourceUpper ); 1309 1310 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupgroupby" ), SinkMode.REPLACE ); 1311 1312 Function splitterLower = new RegexSplitter( new Fields( "numA", "lower" ), " " ); 1313 Function splitterUpper = new RegexSplitter( new Fields( "numB", "upper" ), " " ); 1314 1315 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterLower ); 1316 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitterUpper ); 1317 1318 Pipe cogroup = new CoGroup( pipeLower, new Fields( "numA" ), pipeUpper, new Fields( "numB" ) ); 1319 1320 Pipe groupby = new GroupBy( cogroup, new Fields( "numA" ) ); 1321 1322 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, groupby ); 1323 1324 flow.complete(); 1325 1326 validateLength( flow, 5, null ); 1327 1328 List<Tuple> actual = getSinkAsList( flow ); 1329 1330 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1331 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1332 } 1333 1334 @Test 1335 public void testCoGroupSamePipe() throws Exception 1336 { 1337 getPlatform().copyFromLocal( inputFileLower ); 1338 1339 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1340 1341 Map sources = new HashMap(); 1342 1343 sources.put( "lower", source ); 1344 1345 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe" ), SinkMode.REPLACE ); 1346 1347 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1348 1349 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1350 1351 Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), 1, new Fields( "num1", "char1", "num2", "char2" ) ); 1352 1353 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1354 1355 flow.complete(); 1356 1357 validateLength( flow, 5, null ); 1358 1359 List<Tuple> actual = getSinkAsList( flow ); 1360 1361 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1362 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1363 } 1364 1365 @Test 1366 public void testCoGroupSamePipe2() throws Exception 1367 { 1368 getPlatform().copyFromLocal( inputFileLower ); 1369 1370 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1371 1372 Map sources = new HashMap(); 1373 1374 sources.put( "lower", source ); 1375 1376 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe2" ), SinkMode.REPLACE ); 1377 1378 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1379 1380 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1381 1382 Pipe cogroup = new CoGroup( pipeLower, new Fields( "num" ), pipeLower, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1383 1384 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1385 1386 flow.complete(); 1387 1388 validateLength( flow, 5, null ); 1389 1390 List<Tuple> actual = getSinkAsList( flow ); 1391 1392 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1393 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1394 } 1395 1396 @Test 1397 public void testCoGroupSamePipe3() throws Exception 1398 { 1399 getPlatform().copyFromLocal( inputFileLower ); 1400 1401 Tap source = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower ); 1402 1403 Map sources = new HashMap(); 1404 1405 sources.put( "lower", source ); 1406 1407 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samepipe3" ), SinkMode.REPLACE ); 1408 1409 Pipe pipe = new Pipe( "lower" ); 1410 1411 Pipe lhs = new Pipe( "lhs", pipe ); 1412 Pipe rhs = new Pipe( "rhs", pipe ); 1413 1414 Pipe cogroup = new CoGroup( lhs, new Fields( "num" ), rhs, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1415 1416 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1417 1418 flow.complete(); 1419 1420 validateLength( flow, 5, null ); 1421 1422 List<Tuple> actual = getSinkAsList( flow ); 1423 1424 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1425 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1426 } 1427 1428 @Test 1429 public void testCoGroupAroundCoGroup() throws Exception 1430 { 1431 getPlatform().copyFromLocal( inputFileLower ); 1432 getPlatform().copyFromLocal( inputFileUpper ); 1433 1434 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1435 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1436 1437 Map sources = new HashMap(); 1438 1439 sources.put( "lower", sourceLower ); 1440 sources.put( "upper1", sourceUpper ); 1441 sources.put( "upper2", sourceUpper ); 1442 1443 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "cogroupacogroup" ), SinkMode.REPLACE ); 1444 1445 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1446 1447 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1448 Pipe pipeUpper1 = new Each( new Pipe( "upper1" ), new Fields( "line" ), splitter ); 1449 Pipe pipeUpper2 = new Each( new Pipe( "upper2" ), new Fields( "line" ), splitter ); 1450 1451 Pipe splice1 = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper1, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2" ) ); 1452 1453 splice1 = new Each( splice1, new Identity() ); 1454 1455 Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeUpper2, new Fields( "num" ), new Fields( "num1", "char1", "num2", "char2", "num3", "char3" ) ); 1456 1457 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice2 ); 1458 1459 flow.complete(); 1460 1461 validateLength( flow, 5, null ); 1462 1463 List<Tuple> actual = getSinkAsList( flow ); 1464 1465 assertTrue( actual.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) ); 1466 assertTrue( actual.contains( new Tuple( "2\tb\t2\tB\t2\tB" ) ) ); 1467 } 1468 1469 @Test 1470 public void testCoGroupAroundCoGroupWithout() throws Exception 1471 { 1472 runCoGroupAroundCoGroup( null, "cogroupacogroupopt1" ); 1473 } 1474 1475 @Test 1476 public void testCoGroupAroundCoGroupWith() throws Exception 1477 { 1478 // hack to get classname 1479 runCoGroupAroundCoGroup( getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ).getScheme().getClass(), "cogroupacogroupopt2" ); 1480 } 1481 1482 private void runCoGroupAroundCoGroup( Class schemeClass, String stringPath ) throws IOException 1483 { 1484 getPlatform().copyFromLocal( inputFileNums20 ); 1485 getPlatform().copyFromLocal( inputFileNums10 ); 1486 1487 Tap source10 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums10 ); 1488 Tap source20 = getPlatform().getDelimitedFile( new Fields( "num" ), "\t", inputFileNums20 ); 1489 1490 Map sources = new HashMap(); 1491 1492 sources.put( "source20", source20 ); 1493 sources.put( "source101", source10 ); 1494 sources.put( "source102", source10 ); 1495 1496 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( stringPath ), SinkMode.REPLACE ); 1497 1498 Pipe pipeNum20 = new Pipe( "source20" ); 1499 Pipe pipeNum101 = new Pipe( "source101" ); 1500 Pipe pipeNum102 = new Pipe( "source102" ); 1501 1502 Pipe splice1 = new CoGroup( pipeNum20, new Fields( "num" ), pipeNum101, new Fields( "num" ), new Fields( "num1", "num2" ) ); 1503 1504 Pipe splice2 = new CoGroup( splice1, new Fields( "num1" ), pipeNum102, new Fields( "num" ), new Fields( "num1", "num2", "num3" ) ); 1505 1506 splice2 = new Each( splice2, new Identity() ); 1507 1508 Map<Object, Object> properties = getPlatform().getProperties(); 1509 1510 if( getPlatform().isMapReduce() ) 1511 FlowConnectorProps.setIntermediateSchemeClass( properties, schemeClass ); 1512 1513 Flow flow = getPlatform().getFlowConnector( properties ).connect( "cogroupopt", sources, sink, splice2 ); 1514 1515 if( getPlatform().isMapReduce() ) 1516 assertEquals( "wrong number of steps", 2, flow.getFlowSteps().size() ); 1517 1518 flow.complete(); 1519 1520 validateLength( flow, 10 ); 1521 1522 List<Tuple> actual = getSinkAsList( flow ); 1523 1524 assertTrue( actual.contains( new Tuple( "1\t1\t1" ) ) ); 1525 assertTrue( actual.contains( new Tuple( "10\t10\t10" ) ) ); 1526 } 1527 1528 @Test 1529 public void testCoGroupDiffFieldsSameFile() throws Exception 1530 { 1531 getPlatform().copyFromLocal( inputFileLower ); 1532 getPlatform().copyFromLocal( inputFileUpper ); 1533 1534 Tap sourceOffsetLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1535 Tap sourceLower = getPlatform().getTextFile( new Fields( "line" ), inputFileLower ); 1536 1537 Map sources = new HashMap(); 1538 1539 sources.put( "offsetLower", sourceOffsetLower ); 1540 sources.put( "lower", sourceLower ); 1541 1542 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "samefiledifffields" ), SinkMode.REPLACE ); 1543 1544 Function splitterLower = new RegexSplitter( new Fields( "numA", "left" ), " " ); 1545 Function splitterUpper = new RegexSplitter( new Fields( "numB", "right" ), " " ); 1546 1547 Pipe offsetLower = new Pipe( "offsetLower" ); 1548 offsetLower = new Discard( offsetLower, new Fields( "offset" ) ); 1549 offsetLower = new Each( offsetLower, new Fields( "line" ), splitterLower ); 1550 1551 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitterUpper ); 1552 1553 Pipe cogroup = new CoGroup( offsetLower, new Fields( "numA" ), pipeLower, new Fields( "numB" ) ); 1554 1555 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, cogroup ); 1556 1557 flow.complete(); 1558 1559 validateLength( flow, 5 ); 1560 1561 List<Tuple> actual = getSinkAsList( flow ); 1562 1563 assertTrue( actual.contains( new Tuple( "1\ta\t1\ta" ) ) ); 1564 assertTrue( actual.contains( new Tuple( "2\tb\t2\tb" ) ) ); 1565 } 1566 1567 @Test 1568 public void testJoinNone() throws Exception 1569 { 1570 getPlatform().copyFromLocal( inputFileLower ); 1571 getPlatform().copyFromLocal( inputFileUpper ); 1572 1573 Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower ); 1574 Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper ); 1575 1576 Map sources = new HashMap(); 1577 1578 sources.put( "lower", sourceLower ); 1579 sources.put( "upper", sourceUpper ); 1580 1581 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "joinnone" ), SinkMode.REPLACE ); 1582 1583 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 1584 1585 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 1586 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 1587 1588 Pipe splice = new CoGroup( pipeLower, Fields.NONE, pipeUpper, Fields.NONE, Fields.size( 4 ) ); 1589 1590 Map<Object, Object> properties = getProperties(); 1591 1592 Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice ); 1593 1594 flow.complete(); 1595 1596 validateLength( flow, 25 ); 1597 1598 List<Tuple> values = getSinkAsList( flow ); 1599 1600 assertTrue( values.contains( new Tuple( "1\ta\t1\tA" ) ) ); 1601 assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) ); 1602 assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) ); 1603 } 1604 }