001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading; 022 023 import java.io.Serializable; 024 import java.util.ArrayList; 025 import java.util.Collection; 026 import java.util.Comparator; 027 import java.util.HashMap; 028 import java.util.List; 029 import java.util.Map; 030 import java.util.regex.Pattern; 031 032 import cascading.cascade.Cascades; 033 import cascading.flow.Flow; 034 import cascading.operation.Debug; 035 import cascading.operation.Filter; 036 import cascading.operation.Function; 037 import cascading.operation.Identity; 038 import cascading.operation.Insert; 039 import cascading.operation.NoOp; 040 import cascading.operation.aggregator.Count; 041 import cascading.operation.aggregator.First; 042 import cascading.operation.expression.ExpressionFunction; 043 import cascading.operation.filter.And; 044 import cascading.operation.function.UnGroup; 045 import cascading.operation.regex.RegexFilter; 046 import cascading.operation.regex.RegexParser; 047 import cascading.operation.regex.RegexSplitter; 048 import cascading.pipe.Each; 049 import cascading.pipe.Every; 050 import cascading.pipe.GroupBy; 051 import cascading.pipe.Merge; 052 import cascading.pipe.Pipe; 053 import cascading.tap.MultiSourceTap; 054 import cascading.tap.SinkMode; 055 import cascading.tap.Tap; 056 import cascading.tuple.Fields; 057 import cascading.tuple.Hasher; 058 import cascading.tuple.Tuple; 059 import cascading.tuple.TupleEntryIterator; 060 import org.junit.Test; 061 062 import static cascading.ComparePlatformsTest.NONDETERMINISTIC; 063 import static data.InputData.*; 064 065 066 public class FieldedPipesPlatformTest extends PlatformTestCase 067 { 068 public FieldedPipesPlatformTest() 069 { 070 super( true ); // leave cluster testing enabled 071 } 072 073 @Test 074 public void testSimpleGroup() throws Exception 075 { 076 getPlatform().copyFromLocal( inputFileApache ); 077 078 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 079 080 Pipe pipe = new Pipe( "test" ); 081 082 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) ); 083 084 pipe = new GroupBy( pipe, new Fields( "ip" ) ); 085 086 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) ); 087 088 Tap sink = getPlatform().getTextFile( getOutputPath( "simple" ), SinkMode.REPLACE ); 089 090 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 091 092 flow.complete(); 093 094 validateLength( flow.openSource(), 10 ); // validate source, this once, as a sanity check 095 validateLength( flow, 8, null ); 096 } 097 098 @Test 099 public void testSimpleChain() throws Exception 100 { 101 getPlatform().copyFromLocal( inputFileApache ); 102 103 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 104 105 Pipe pipe = new Pipe( "test" ); 106 107 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) ); 108 109 pipe = new GroupBy( pipe, new Fields( "ip" ) ); 110 111 pipe = new Every( pipe, new Count( new Fields( "count1" ) ) ); 112 pipe = new Every( pipe, new Count( new Fields( "count2" ) ) ); 113 pipe = new Every( pipe, new Count( new Fields( "count3" ) ) ); 114 pipe = new Every( pipe, new Count( new Fields( "count4" ) ) ); 115 116 Tap sink = getPlatform().getTabDelimitedFile( Fields.ALL, getOutputPath( "simplechain" ), SinkMode.REPLACE ); 117 118 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 119 120 flow.complete(); 121 122 validateLength( flow, 8, 5 ); 123 } 124 125 @Test 126 public void testChainEndingWithEach() throws Exception 127 { 128 getPlatform().copyFromLocal( inputFileApache ); 129 130 Pipe pipe = new Pipe( "test" ); 131 132 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) ); 133 134 pipe = new GroupBy( pipe, new Fields( "ip" ) ); 135 136 pipe = new Every( pipe, new Count( new Fields( "count1" ) ) ); 137 pipe = new Every( pipe, new Count( new Fields( "count2" ) ) ); 138 139 pipe = new Each( pipe, new Fields( "count1", "count2" ), new ExpressionFunction( new Fields( "sum" ), "count1 + count2", int.class ), Fields.ALL ); 140 141 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 142 Tap sink = getPlatform().getTextFile( getOutputPath( "chaineach" ), SinkMode.REPLACE ); 143 144 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 145 146 flow.complete(); 147 148 validateLength( flow, 8, null ); 149 } 150 151 // also tests the RegexSplitter 152 153 @Test 154 public void testNoGroup() throws Exception 155 { 156 getPlatform().copyFromLocal( inputFileApache ); 157 158 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache ); 159 160 Pipe pipe = new Pipe( "test" ); 161 162 pipe = new Each( pipe, new RegexSplitter( "\\s+" ), new Fields( 1 ) ); 163 164 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "nogroup" ), SinkMode.REPLACE ); 165 166 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 167 168 flow.complete(); 169 170 validateLength( flow, 10, null ); 171 172 List<Tuple> results = getSinkAsList( flow ); 173 174 assertTrue( results.contains( new Tuple( "75.185.76.245" ) ) ); 175 } 176 177 @Test 178 public void testCopy() throws Exception 179 { 180 getPlatform().copyFromLocal( inputFileApache ); 181 182 Tap source = getPlatform().getTextFile( new Fields( "line" ), inputFileApache ); 183 184 Pipe pipe = new Pipe( "test" ); 185 186 Tap sink = getPlatform().getTextFile( getOutputPath( "copy" ), SinkMode.REPLACE ); 187 188 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 189 190 flow.complete(); 191 192 validateLength( flow, 10, null ); 193 } 194 195 @Test 196 public void testSimpleMerge() throws Exception 197 { 198 getPlatform().copyFromLocal( inputFileLower ); 199 getPlatform().copyFromLocal( inputFileUpper ); 200 201 Tap sourceLower = getPlatform().getTextFile( inputFileLower ); 202 Tap sourceUpper = getPlatform().getTextFile( inputFileUpper ); 203 204 Map sources = new HashMap(); 205 206 sources.put( "lower", sourceLower ); 207 sources.put( "upper", sourceUpper ); 208 209 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 210 211 // using null pos so all fields are written 212 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "simplemerge" ), SinkMode.REPLACE ); 213 214 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 215 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 216 217 Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ), null, false ); 218 219 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 220 221 flow.complete(); 222 223 validateLength( flow, 10 ); 224 225 Collection results = getSinkAsList( flow ); 226 227 assertTrue( "missing value", results.contains( new Tuple( "1\ta" ) ) ); 228 assertTrue( "missing value", results.contains( new Tuple( "1\tA" ) ) ); 229 assertTrue( "missing value", results.contains( new Tuple( "2\tb" ) ) ); 230 assertTrue( "missing value", results.contains( new Tuple( "2\tB" ) ) ); 231 assertTrue( "missing value", results.contains( new Tuple( "3\tc" ) ) ); 232 assertTrue( "missing value", results.contains( new Tuple( "3\tC" ) ) ); 233 } 234 235 /** 236 * Specifically tests GroupBy will return the correct grouping fields to the following Every 237 * 238 * @throws Exception 239 */ 240 @Test 241 public void testSimpleMergeThree() throws Exception 242 { 243 getPlatform().copyFromLocal( inputFileLower ); 244 getPlatform().copyFromLocal( inputFileUpper ); 245 getPlatform().copyFromLocal( inputFileLowerOffset ); 246 247 Tap sourceLower = getPlatform().getTextFile( inputFileLower ); 248 Tap sourceUpper = getPlatform().getTextFile( inputFileUpper ); 249 Tap sourceLowerOffset = getPlatform().getTextFile( inputFileLowerOffset ); 250 251 Map sources = new HashMap(); 252 253 sources.put( "lower", sourceLower ); 254 sources.put( "upper", sourceUpper ); 255 sources.put( "offset", sourceLowerOffset ); 256 257 Tap sink = getPlatform().getTextFile( getOutputPath( "simplemergethree" ), SinkMode.REPLACE ); 258 259 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 260 261 Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter ); 262 Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter ); 263 Pipe pipeOffset = new Each( new Pipe( "offset" ), new Fields( "line" ), splitter ); 264 265 Pipe splice = new GroupBy( "merge", Pipe.pipes( pipeLower, pipeUpper, pipeOffset ), new Fields( "num" ), new Fields( "char" ) ); 266 267 splice = new Every( splice, new Fields( "char" ), new First( new Fields( "first" ) ) ); 268 269 splice = new Each( splice, new Fields( "num", "first" ), new Identity() ); 270 271 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 272 273 flow.complete(); 274 275 validateLength( flow, 6 ); 276 } 277 278 /** 279 * same test as MergePipesTest, but to test that chained groupby don't exhibit similar failures 280 * 281 * @throws Exception 282 */ 283 @Test 284 public void testSameSourceMergeThreeChainGroup() throws Exception 285 { 286 getPlatform().copyFromLocal( inputFileLower ); 287 288 Tap sourceLower = getPlatform().getTextFile( inputFileLower ); 289 290 Map sources = new HashMap(); 291 292 sources.put( "split", sourceLower ); 293 294 Tap sink = getPlatform().getTextFile( getOutputPath( "samemergethreechaingroup" ), SinkMode.REPLACE ); 295 296 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 297 298 Pipe pipe = new Pipe( "split" ); 299 300 Pipe pipeLower = new Each( new Pipe( "lower", pipe ), new Fields( "line" ), splitter ); 301 Pipe pipeUpper = new Each( new Pipe( "upper", pipe ), new Fields( "line" ), splitter ); 302 Pipe pipeOffset = new Each( new Pipe( "offset", pipe ), new Fields( "line" ), splitter ); 303 304 //put group before merge to test path counts 305 Pipe splice = new GroupBy( Pipe.pipes( pipeLower, pipeUpper ), new Fields( "num" ) ); 306 307 // this group has its incoming paths counted, gated by the previous group 308 splice = new GroupBy( Pipe.pipes( splice, pipeOffset ), new Fields( "num" ) ); 309 310 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 311 312 if( getPlatform().isMapReduce() ) 313 assertEquals( "wrong num jobs", 2, flow.getFlowSteps().size() ); 314 315 flow.complete(); 316 317 validateLength( flow, 15 ); 318 } 319 320 @Test 321 public void testUnGroup() throws Exception 322 { 323 getPlatform().copyFromLocal( inputFileJoined ); 324 325 Tap source = getPlatform().getTextFile( inputFileJoined ); 326 Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped" ), SinkMode.REPLACE ); 327 328 Pipe pipe = new Pipe( "test" ); 329 330 pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) ); 331 332 pipe = new Each( pipe, new UnGroup( new Fields( "num", "char" ), new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) ); 333 334 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 335 336 flow.complete(); 337 338 validateLength( flow, 10 ); 339 } 340 341 @Test 342 public void testUnGroupAnon() throws Exception 343 { 344 getPlatform().copyFromLocal( inputFileJoined ); 345 346 Tap source = getPlatform().getTextFile( inputFileJoined ); 347 Tap sink = getPlatform().getTextFile( getOutputPath( "ungroupedanon" ), SinkMode.REPLACE ); 348 349 Pipe pipe = new Pipe( "test" ); 350 351 pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ) ) ); 352 353 pipe = new Each( pipe, new UnGroup( new Fields( "num" ), Fields.fields( new Fields( "lower" ), new Fields( "upper" ) ) ) ); 354 355 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 356 357 flow.complete(); 358 359 validateLength( flow, 10 ); 360 } 361 362 @Test 363 public void testUnGroupBySize() throws Exception 364 { 365 getPlatform().copyFromLocal( inputFileJoinedExtra ); 366 367 Tap source = getPlatform().getTextFile( inputFileJoinedExtra ); 368 Tap sink = getPlatform().getTextFile( getOutputPath( "ungrouped_size" ), SinkMode.REPLACE ); 369 370 Pipe pipe = new Pipe( "test" ); 371 372 pipe = new Each( pipe, new Fields( "line" ), new RegexSplitter( new Fields( "num1", "num2", "lower", "upper" ) ) ); 373 374 pipe = new Each( pipe, new UnGroup( new Fields( "num1", "num2", "char" ), new Fields( "num1", "num2" ), 1 ) ); 375 376 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 377 378 flow.complete(); 379 380 List<Tuple> tuples = asList( flow, sink ); 381 assertEquals( 10, tuples.size() ); 382 383 List<Object> values = new ArrayList<Object>( ); 384 for (Tuple tuple: tuples) 385 values.add( tuple.getObject( 1 ) ); 386 387 assertTrue( values.contains( "1\t1\ta" ) ); 388 assertTrue( values.contains( "1\t1\tA" ) ); 389 assertTrue( values.contains( "2\t2\tb" ) ); 390 assertTrue( values.contains( "2\t2\tB" ) ); 391 assertTrue( values.contains( "3\t3\tc" ) ); 392 assertTrue( values.contains( "3\t3\tC" ) ); 393 assertTrue( values.contains( "4\t4\td" ) ); 394 assertTrue( values.contains( "4\t4\tD" ) ); 395 assertTrue( values.contains( "5\t5\te" ) ); 396 assertTrue( values.contains( "5\t5\tE" ) ); 397 } 398 399 @Test 400 public void testFilter() throws Exception 401 { 402 getPlatform().copyFromLocal( inputFileApache ); 403 404 Tap source = getPlatform().getTextFile( inputFileApache ); 405 Tap sink = getPlatform().getTextFile( getOutputPath( "filter" ), SinkMode.REPLACE ); 406 407 Pipe pipe = new Pipe( "test" ); 408 409 Filter filter = new RegexFilter( "^68.*" ); 410 411 pipe = new Each( pipe, new Fields( "line" ), filter ); 412 413 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 414 415 flow.complete(); 416 417 validateLength( flow, 3 ); 418 } 419 420 @Test 421 public void testLogicFilter() throws Exception 422 { 423 getPlatform().copyFromLocal( inputFileApache ); 424 425 Tap source = getPlatform().getTextFile( inputFileApache ); 426 Tap sink = getPlatform().getTextFile( getOutputPath( "logicfilter" ), SinkMode.REPLACE ); 427 428 Pipe pipe = new Pipe( "test" ); 429 430 Filter filter = new And( new RegexFilter( "^68.*$" ), new RegexFilter( "^1000.*$" ) ); 431 432 pipe = new Each( pipe, new Fields( "line" ), filter ); 433 434 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 435 436 flow.complete(); 437 438 validateLength( flow, 3 ); 439 } 440 441 @Test 442 public void testFilterComplex() throws Exception 443 { 444 getPlatform().copyFromLocal( inputFileApache ); 445 446 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 447 Tap sink = getPlatform().getTextFile( getOutputPath( "filtercomplex" ), SinkMode.REPLACE ); 448 449 Pipe pipe = new Pipe( "test" ); 450 451 pipe = new Each( pipe, new Fields( "line" ), TestConstants.APACHE_COMMON_PARSER ); 452 453 pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) ); 454 pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^POST" ) ); 455 456 pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL ); 457 458 pipe = new GroupBy( pipe, new Fields( "value" ) ); 459 460 pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) ); 461 462 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 463 464 flow.complete(); 465 466 validateLength( flow, 1, null ); 467 } 468 469 /** 470 * Intentionally filters all values out to test next mr job behaves 471 * 472 * @throws Exception 473 */ 474 @Test 475 public void testFilterAll() throws Exception 476 { 477 getPlatform().copyFromLocal( inputFileApache ); 478 479 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 480 Tap sink = getPlatform().getTextFile( getOutputPath( "filterall" ), SinkMode.REPLACE ); 481 482 Pipe pipe = new Pipe( "test" ); 483 484 String regex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$"; 485 Fields fieldDeclaration = new Fields( "ip", "time", "method", "event", "status", "size" ); 486 int[] groups = {1, 2, 3, 4, 5, 6}; 487 RegexParser function = new RegexParser( fieldDeclaration, regex, groups ); 488 pipe = new Each( pipe, new Fields( "line" ), function ); 489 490 pipe = new Each( pipe, new Fields( "method" ), new RegexFilter( "^fobar" ) ); // intentionally filtering all 491 492 pipe = new GroupBy( pipe, new Fields( "method" ) ); 493 494 pipe = new Each( pipe, new Fields( "method" ), new Identity( new Fields( "value" ) ), Fields.ALL ); 495 496 pipe = new GroupBy( pipe, new Fields( "value" ) ); 497 498 pipe = new Every( pipe, new Count(), new Fields( "value", "count" ) ); 499 500 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 501 502 flow.complete(); 503 504 validateLength( flow, 0, null ); 505 } 506 507 // public void testLimitFilter() throws Exception 508 // { 509 // copyFromLocal( inputFileApache ); 510 // 511 // Tap source = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileApache ); 512 // Tap sink = new Lfs( new TextLine(), outputPath + "/limitfilter", true ); 513 // 514 // Pipe pipe = new Pipe( "test" ); 515 // 516 // Filter filter = new Limit( 7 ); 517 // 518 // pipe = new Each( pipe, new Fields( "line" ), filter ); 519 // 520 // Flow flow = new FlowConnector( getProperties() ).connect( source, sink, pipe ); 521 // 522 //// flow.writeDOT( "flow.dot" ); 523 // 524 // flow.complete(); 525 // 526 // validateLength( flow, 7, null ); 527 // } 528 529 // 530 531 @Test 532 public void testSplit() throws Exception 533 { 534 getPlatform().copyFromLocal( inputFileApache ); 535 536 // 46 192 537 538 Tap source = getPlatform().getTextFile( inputFileApache ); 539 Tap sink1 = getPlatform().getTextFile( getOutputPath( "split1" ), SinkMode.REPLACE ); 540 Tap sink2 = getPlatform().getTextFile( getOutputPath( "split2" ), SinkMode.REPLACE ); 541 542 Pipe pipe = new Pipe( "split" ); 543 544 pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) ); 545 546 Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) ); 547 Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) ); 548 549 Map sources = new HashMap(); 550 sources.put( "split", source ); 551 552 Map sinks = new HashMap(); 553 sinks.put( "left", sink1 ); 554 sinks.put( "right", sink2 ); 555 556 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right ); 557 558 flow.complete(); 559 560 validateLength( flow, 1, "left" ); 561 validateLength( flow, 2, "right" ); 562 } 563 564 /** 565 * verifies non-safe rules apply in the proper place 566 * 567 * @throws Exception 568 */ 569 @Test 570 public void testSplitNonSafe() throws Exception 571 { 572 getPlatform().copyFromLocal( inputFileApache ); 573 574 // 46 192 575 576 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 577 Tap sink1 = getPlatform().getTextFile( getOutputPath( "nonsafesplit1" ), SinkMode.REPLACE ); 578 Tap sink2 = getPlatform().getTextFile( getOutputPath( "nonsafesplit2" ), SinkMode.REPLACE ); 579 580 Pipe pipe = new Pipe( "split" ); 581 582 // run job on non-safe operation, forces 3 mr jobs. 583 pipe = new Each( pipe, new TestFunction( new Fields( "ignore" ), new Tuple( 1 ), false ), new Fields( "line" ) ); 584 585 pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) ); 586 587 Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) ); 588 Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) ); 589 590 Map sources = new HashMap(); 591 sources.put( "split", source ); 592 593 Map sinks = new HashMap(); 594 sinks.put( "left", sink1 ); 595 sinks.put( "right", sink2 ); 596 597 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right ); 598 599 flow.complete(); 600 601 validateLength( flow, 1, "left" ); 602 validateLength( flow, 2, "right" ); 603 } 604 605 @Test 606 public void testSplitSameSourceMerged() throws Exception 607 { 608 getPlatform().copyFromLocal( inputFileApache ); 609 610 // 46 192 611 612 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 613 Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemerged" ), SinkMode.REPLACE ); 614 615 Pipe pipe = new Pipe( "split" ); 616 617 pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) ); 618 619 Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "line" ), new RegexFilter( ".*46.*" ) ); 620 Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "line" ), new RegexFilter( ".*102.*" ) ); 621 622 Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) ); 623 624 Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged ); 625 626 flow.complete(); 627 628 validateLength( flow, 3 ); 629 } 630 631 /** 632 * verifies not inserting Identity between groups works 633 * 634 * @throws Exception 635 */ 636 @Test 637 public void testSplitOut() throws Exception 638 { 639 getPlatform().copyFromLocal( inputFileApache ); 640 641 Tap sourceLower = getPlatform().getTextFile( new Fields( "num", "line" ), inputFileApache ); 642 643 Map sources = new HashMap(); 644 645 sources.put( "lower1", sourceLower ); 646 647 // using null pos so all fields are written 648 Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitout1" ), SinkMode.REPLACE ); 649 Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitout2" ), SinkMode.REPLACE ); 650 651 Map sinks = new HashMap(); 652 653 sinks.put( "output1", sink1 ); 654 sinks.put( "output2", sink2 ); 655 656 Pipe pipeLower1 = new Pipe( "lower1" ); 657 658 Pipe left = new GroupBy( "output1", pipeLower1, new Fields( 0 ) ); 659 Pipe right = new GroupBy( "output2", left, new Fields( 0 ) ); 660 661 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, Pipe.pipes( left, right ) ); 662 663 // flow.writeDOT( "spit.dot" ); 664 665 flow.complete(); 666 667 validateLength( flow, 10, "output1" ); 668 validateLength( flow, 10, "output2" ); 669 670 assertEquals( 10, asSet( flow, sink1 ).size() ); 671 assertEquals( 10, asSet( flow, sink2 ).size() ); 672 } 673 674 @Test 675 public void testSplitComplex() throws Exception 676 { 677 getPlatform().copyFromLocal( inputFileApache ); 678 679 // 46 192 680 681 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 682 Tap sink1 = getPlatform().getTextFile( getOutputPath( "splitcomp1" ), SinkMode.REPLACE ); 683 Tap sink2 = getPlatform().getTextFile( getOutputPath( "splitcomp2" ), SinkMode.REPLACE ); 684 685 Pipe pipe = new Pipe( "split" ); 686 687 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) ); 688 689 pipe = new GroupBy( pipe, new Fields( "ip" ) ); 690 691 pipe = new Every( pipe, new Fields( "ip" ), new Count(), new Fields( "ip", "count" ) ); 692 693 pipe = new Each( pipe, new Fields( "ip" ), new RegexFilter( "^68.*" ) ); 694 695 Pipe left = new Each( new Pipe( "left", pipe ), new Fields( "ip" ), new RegexFilter( ".*46.*" ) ); 696 697 Pipe right = new Each( new Pipe( "right", pipe ), new Fields( "ip" ), new RegexFilter( ".*102.*" ) ); 698 699 Map sources = Cascades.tapsMap( "split", source ); 700 Map sinks = Cascades.tapsMap( Pipe.pipes( left, right ), Tap.taps( sink1, sink2 ) ); 701 702 Flow flow = getPlatform().getFlowConnector().connect( sources, sinks, left, right ); 703 704 flow.complete(); 705 706 validateLength( flow, 1, "left" ); 707 validateLength( flow, 1, "right" ); 708 } 709 710 @Test 711 public void testConcatenation() throws Exception 712 { 713 getPlatform().copyFromLocal( inputFileLower ); 714 getPlatform().copyFromLocal( inputFileUpper ); 715 716 Tap sourceLower = getPlatform().getTextFile( inputFileLower ); 717 Tap sourceUpper = getPlatform().getTextFile( inputFileUpper ); 718 719 Tap source = new MultiSourceTap( sourceLower, sourceUpper ); 720 721 Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " ); 722 723 // using null pos so all fields are written 724 Tap sink = getPlatform().getTextFile( getOutputPath( "complexconcat" ), SinkMode.REPLACE ); 725 726 Pipe pipe = new Each( new Pipe( "concat" ), new Fields( "line" ), splitter ); 727 728 Pipe splice = new GroupBy( pipe, new Fields( "num" ) ); 729 730 Flow countFlow = getPlatform().getFlowConnector().connect( source, sink, splice ); 731 732 countFlow.complete(); 733 734 validateLength( countFlow, 10, null ); 735 } 736 737 @Test 738 public void testGeneratorAggregator() throws Exception 739 { 740 getPlatform().copyFromLocal( inputFileApache ); 741 742 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 743 744 Pipe pipe = new Pipe( "test" ); 745 746 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) ); 747 748 pipe = new GroupBy( pipe, new Fields( "ip" ) ); 749 750 pipe = new Every( pipe, new TestAggregator( new Fields( "count1" ), new Fields( "ip" ), new Tuple( "first1" ), new Tuple( "first2" ) ) ); 751 pipe = new Every( pipe, new TestAggregator( new Fields( "count2" ), new Fields( "ip" ), new Tuple( "second" ), new Tuple( "second2" ), new Tuple( "second3" ) ) ); 752 753 Tap sink = getPlatform().getTextFile( getOutputPath( "generatoraggregator" ), SinkMode.REPLACE ); 754 755 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 756 757 flow.complete(); 758 759 validateLength( flow, 8 * 2 * 3, null ); 760 } 761 762 /** 763 * If the sinks have the same scheme as a temp tap, replace the temp tap 764 * 765 * @throws Exception 766 */ 767 @Test 768 public void testChainedTaps() throws Exception 769 { 770 getPlatform().copyFromLocal( inputFileApache ); 771 772 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 773 774 Pipe pipe = new Each( new Pipe( "first" ), new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) ); 775 pipe = new GroupBy( pipe, new Fields( "ip" ) ); 776 777 pipe = new Each( new Pipe( "second", pipe ), new Fields( "ip" ), new RegexFilter( "7" ) ); 778 pipe = new GroupBy( pipe, new Fields( "ip" ) ); 779 780 pipe = new Each( new Pipe( "third", pipe ), new Fields( "ip" ), new RegexFilter( "6" ) ); 781 pipe = new GroupBy( pipe, new Fields( "ip" ) ); 782 783 Tap sinkFirst = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/first" ), SinkMode.REPLACE ); 784 Tap sinkSecond = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/second" ), SinkMode.REPLACE ); 785 Tap sinkThird = getPlatform().getTabDelimitedFile( new Fields( "ip" ), getOutputPath( "chainedtaps/third" ), SinkMode.REPLACE ); 786 787 Map<String, Tap> sinks = Cascades.tapsMap( new String[]{"first", "second", 788 "third"}, Tap.taps( sinkFirst, sinkSecond, sinkThird ) ); 789 790 Flow flow = getPlatform().getFlowConnector().connect( source, sinks, pipe ); 791 792 if( getPlatform().isMapReduce() ) 793 assertEquals( "wrong number of steps", 3, flow.getFlowSteps().size() ); 794 795 flow.complete(); 796 797 validateLength( flow, 3 ); 798 } 799 800 @Test 801 public void testReplace() throws Exception 802 { 803 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 804 Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "offset", "line" ), getOutputPath( "replace" ), SinkMode.REPLACE ); 805 806 Pipe pipe = new Pipe( "test" ); 807 808 Function parser = new RegexParser( new Fields( 0 ), "^[^ ]*" ); 809 pipe = new Each( pipe, new Fields( "line" ), parser, Fields.REPLACE ); 810 pipe = new Each( pipe, new Fields( "line" ), new Identity( Fields.ARGS ), Fields.REPLACE ); 811 pipe = new Each( pipe, new Fields( "line" ), new Identity( new Fields( "line" ) ), Fields.REPLACE ); 812 813 pipe = new Each( pipe, new Debug( true ) ); 814 815 Flow flow = getPlatform().getFlowConnector( disableDebug() ).connect( source, sink, pipe ); 816 817 flow.complete(); 818 819 validateLength( flow, 10, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) ); 820 } 821 822 @Test 823 public void testSwap() throws Exception 824 { 825 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 826 Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ipaddress" ), getOutputPath( "swap" ), SinkMode.REPLACE ); 827 828 Pipe pipe = new Pipe( "test" ); 829 830 Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" ); 831 pipe = new Each( pipe, new Fields( "line" ), parser, Fields.SWAP ); 832 pipe = new GroupBy( pipe, new Fields( "ip" ) ); 833 pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) ); 834 pipe = new Each( pipe, new Fields( "ip" ), new Identity( new Fields( "ipaddress" ) ), Fields.SWAP ); 835 836 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 837 838 flow.complete(); 839 840 validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) ); 841 } 842 843 @Test 844 public void testNone() throws Exception 845 { 846 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 847 Tap sink = getPlatform().getTextFile( new Fields( "offset", "line" ), new Fields( "count", "ip" ), getOutputPath( "none" ), SinkMode.REPLACE ); 848 849 Pipe pipe = new Pipe( "test" ); 850 851 Function parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" ); 852 pipe = new Each( pipe, new Fields( "line" ), parser, Fields.ALL ); 853 pipe = new Each( pipe, new Fields( "line" ), new NoOp(), Fields.SWAP ); // declares Fields.NONE 854 pipe = new GroupBy( pipe, new Fields( "ip" ) ); 855 pipe = new Every( pipe, new Fields( "ip" ), new Count( new Fields( "count" ) ) ); 856 pipe = new Each( pipe, Fields.NONE, new Insert( new Fields( "ipaddress" ), "1.2.3.4" ), Fields.ALL ); 857 858 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 859 860 flow.complete(); 861 862 validateLength( flow, 8, 2, Pattern.compile( "^\\d+\\s\\d+\\s[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}$" ) ); 863 } 864 865 /** 866 * this tests a merge on two pipes with the same source and name. 867 * 868 * @throws Exception 869 */ 870 @Test 871 public void testSplitSameSourceMergedSameName() throws Exception 872 { 873 getPlatform().copyFromLocal( inputFileApache ); 874 875 // 46 192 876 877 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 878 Tap sink = getPlatform().getTextFile( getOutputPath( "splitsourcemergedsamename" ), SinkMode.REPLACE ); 879 880 Pipe pipe = new Pipe( "split" ); 881 882 pipe = new Each( pipe, new Fields( "line" ), new RegexFilter( "^68.*" ) ); 883 884 Pipe left = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*46.*" ) ); 885 Pipe right = new Each( pipe, new Fields( "line" ), new RegexFilter( ".*102.*" ) ); 886 887 Pipe merged = new GroupBy( "merged", Pipe.pipes( left, right ), new Fields( "line" ) ); 888 889 Flow flow = getPlatform().getFlowConnector().connect( source, sink, merged ); 890 891 flow.complete(); 892 893 validateLength( flow, 3 ); 894 } 895 896 /** 897 * Catches failure to properly resolve the grouping fields as incoming to the second group-by 898 * 899 * @throws Exception 900 */ 901 @Test 902 public void testGroupGroup() throws Exception 903 { 904 getPlatform().copyFromLocal( inputFileApache ); 905 906 Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileApache ); 907 908 Pipe pipe = new Pipe( "test" ); 909 910 pipe = new Each( pipe, new Fields( "line" ), new RegexParser( new Fields( "ip" ), "^[^ ]*" ), new Fields( "ip" ) ); 911 912 pipe = new GroupBy( pipe, new Fields( "ip" ) ); 913 914 pipe = new Every( pipe, new Count(), new Fields( "ip", "count" ) ); 915 916 pipe = new GroupBy( pipe, new Fields( "ip" ), new Fields( "count" ) ); 917 918 Tap sink = getPlatform().getTextFile( getOutputPath( "groupgroup" ), SinkMode.REPLACE ); 919 920 Flow flow = getPlatform().getFlowConnector().connect( source, sink, pipe ); 921 922 flow.complete(); 923 924 validateLength( flow, 8, null ); 925 } 926 927 928 public static class LowerComparator implements Comparator<Comparable>, Hasher<Comparable>, Serializable 929 { 930 @Override 931 public int compare( Comparable lhs, Comparable rhs ) 932 { 933 return lhs.toString().toLowerCase().compareTo( rhs.toString().toLowerCase() ); 934 } 935 936 @Override 937 public int hashCode( Comparable value ) 938 { 939 return value.toString().toLowerCase().hashCode(); 940 } 941 } 942 943 @Test 944 public void testGroupByInsensitive() throws Exception 945 { 946 getPlatform().copyFromLocal( inputFileLower ); 947 getPlatform().copyFromLocal( inputFileUpper ); 948 949 Tap sourceLower = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileLower ); 950 Tap sourceUpper = getPlatform().getDelimitedFile( new Fields( "num", "char" ), " ", inputFileUpper ); 951 952 Map sources = new HashMap(); 953 954 sources.put( "lower", sourceLower ); 955 sources.put( "upper", sourceUpper ); 956 957 Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "insensitivegrouping" + NONDETERMINISTIC ), SinkMode.REPLACE ); 958 959 Pipe pipeLower = new Pipe( "lower" ); 960 Pipe pipeUpper = new Pipe( "upper" ); 961 962 Pipe merge = new Merge( pipeLower, pipeUpper ); 963 964 Fields charFields = new Fields( "char" ); 965 charFields.setComparator( "char", new LowerComparator() ); 966 967 Pipe splice = new GroupBy( merge, charFields ); 968 969 splice = new Every( splice, new Fields( "char" ), new Count() ); 970 971 Flow flow = getPlatform().getFlowConnector().connect( sources, sink, splice ); 972 973 flow.complete(); 974 975 // we can't guarantee if the grouping key will be upper or lower 976 validateLength( flow, 5, 1, Pattern.compile( "^\\w+\\s2$" ) ); 977 } 978 }