001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading; 022 023import java.io.File; 024import java.io.IOException; 025import java.io.Serializable; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Collection; 029import java.util.Collections; 030import java.util.HashMap; 031import java.util.HashSet; 032import java.util.List; 033import java.util.Map; 034import java.util.Set; 035import java.util.regex.Pattern; 036 037import cascading.flow.Flow; 038import cascading.flow.FlowProcess; 039import cascading.flow.planner.FlowPlanner; 040import cascading.flow.stream.graph.StreamGraph; 041import cascading.operation.Aggregator; 042import cascading.operation.Buffer; 043import cascading.operation.ConcreteCall; 044import cascading.operation.Filter; 045import cascading.operation.Function; 046import cascading.tap.Tap; 047import cascading.tuple.Fields; 048import cascading.tuple.Tuple; 049import cascading.tuple.TupleEntry; 050import cascading.tuple.TupleEntryIterator; 051import cascading.tuple.TupleListCollector; 052import cascading.util.Util; 053import junit.framework.TestCase; 054import org.junit.After; 055import org.junit.Before; 056import org.junit.Rule; 057import org.junit.rules.TestName; 058import org.junit.runner.RunWith; 059import org.junit.runners.BlockJUnit4ClassRunner; 060 061/** 062 * Class CascadingTestCase is the base class for all Cascading tests. 063 * <p/> 064 * It included a few helpful utility methods for testing Cascading applications. 065 */ 066@RunWith(BlockJUnit4ClassRunner.class) 067public abstract class CascadingTestCase extends TestCase implements Serializable 068 { 069 public static final String ROOT_OUTPUT_PATH = "test.output.root"; 070 public static final String ROOT_PLAN_PATH = "test.plan.root"; 071 public static final String TEST_TRACEPLAN_ENABLED = "test.traceplan.enabled"; 072 073 private String outputPath; 074 private String planPath; 075 076 @Rule 077 public transient TestName name = new TestName(); 078 079 static class TestFlowProcess extends FlowProcess.NullFlowProcess 080 { 081 private final Map<Object, Object> properties; 082 083 public TestFlowProcess( Map<Object, Object> properties ) 084 { 085 this.properties = properties; 086 } 087 088 @Override 089 public Object getProperty( String key ) 090 { 091 return properties.get( key ); 092 } 093 } 094 095 public CascadingTestCase() 096 { 097 } 098 099 public CascadingTestCase( String name ) 100 { 101 super( name ); 102 } 103 104 @Override 105 @Before 106 public void setUp() throws Exception 107 { 108 super.setUp(); 109 110 if( Boolean.getBoolean( TEST_TRACEPLAN_ENABLED ) ) 111 { 112 System.setProperty( FlowPlanner.TRACE_PLAN_PATH, Util.join( "/", getPlanPath(), "planner" ) ); 113 System.setProperty( FlowPlanner.TRACE_PLAN_TRANSFORM_PATH, Util.join( "/", getPlanPath(), "planner" ) ); 114 System.setProperty( FlowPlanner.TRACE_STATS_PATH, Util.join( "/", getPlanPath(), "planner" ) ); 115 System.setProperty( "platform." + StreamGraph.DOT_FILE_PATH, Util.join( "/", getPlanPath(), "stream" ) ); // pass down 116 } 117 } 118 119 @Override 120 @After 121 public void tearDown() throws Exception 122 { 123 super.tearDown(); 124 } 125 126 protected static String getTestOutputRoot() 127 { 128 return System.getProperty( ROOT_OUTPUT_PATH, "build/test/output" ).replace( ":", "_" ); 129 } 130 131 protected static String getTestPlanRoot() 132 { 133 return System.getProperty( ROOT_PLAN_PATH, "build/test/plan" ).replace( ":", "_" ); 134 } 135 136 protected String[] getOutputPathElements() 137 { 138 return new String[]{getTestOutputRoot(), getTestCaseName(), getTestName()}; 139 } 140 141 protected String[] getPlanPathElements() 142 { 143 return new String[]{getTestPlanRoot(), getTestCaseName(), getTestName()}; 144 } 145 146 protected String getOutputPath() 147 { 148 if( outputPath == null ) 149 outputPath = Util.join( getOutputPathElements(), File.separator ); 150 151 return outputPath; 152 } 153 154 protected String getPlanPath() 155 { 156 if( planPath == null ) 157 planPath = Util.join( getPlanPathElements(), File.separator ); 158 159 return planPath; 160 } 161 162 public String getTestCaseName() 163 { 164 return getClass().getSimpleName().replaceAll( "^(.*)Test.*$", "$1" ).toLowerCase(); 165 } 166 167 public String getTestName() 168 { 169 return name.getMethodName(); 170 } 171 172 public static void validateLength( Flow flow, int length ) throws IOException 173 { 174 validateLength( flow, length, -1 ); 175 } 176 177 public static void validateLength( Flow flow, int length, String name ) throws IOException 178 { 179 validateLength( flow, length, -1, null, name ); 180 } 181 182 public static void validateLength( Flow flow, int length, int size ) throws IOException 183 { 184 validateLength( flow, length, size, null, null ); 185 } 186 187 public static void validateLength( Flow flow, int length, int size, Pattern regex ) throws IOException 188 { 189 validateLength( flow, length, size, regex, null ); 190 } 191 192 public static void validateLength( Flow flow, int length, Pattern regex, String name ) throws IOException 193 { 194 validateLength( flow, length, -1, regex, name ); 195 } 196 197 public static void validateLength( Flow flow, int length, int size, Pattern regex, String name ) throws IOException 198 { 199 TupleEntryIterator iterator = name == null ? flow.openSink() : flow.openSink( name ); 200 validateLength( iterator, length, size, regex ); 201 } 202 203 public static void validateLength( TupleEntryIterator iterator, int length ) 204 { 205 validateLength( iterator, length, -1, null ); 206 } 207 208 public static void validateLength( TupleEntryIterator iterator, int length, int size ) 209 { 210 validateLength( iterator, length, size, null ); 211 } 212 213 public static void validateLength( TupleEntryIterator iterator, int length, Pattern regex ) 214 { 215 validateLength( iterator, length, -1, regex ); 216 } 217 218 public static void validateLength( TupleEntryIterator iterator, int length, int size, Pattern regex ) 219 { 220 int count = 0; 221 222 while( iterator.hasNext() ) 223 { 224 TupleEntry tupleEntry = iterator.next(); 225 226 if( size != -1 ) 227 assertEquals( "wrong number of elements", size, tupleEntry.size() ); 228 229 if( regex != null ) 230 assertTrue( "regex: " + regex + " does not match: " + tupleEntry.getTuple().toString(), regex.matcher( tupleEntry.getTuple().toString() ).matches() ); 231 232 count++; 233 } 234 235 try 236 { 237 iterator.close(); 238 } 239 catch( IOException exception ) 240 { 241 throw new RuntimeException( exception ); 242 } 243 244 assertEquals( "wrong number of lines", length, count ); 245 } 246 247 public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields ) 248 { 249 return invokeFunction( function, new TupleEntry( arguments ), resultFields ); 250 } 251 252 public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields, Map<Object, Object> properties ) 253 { 254 return invokeFunction( function, new TupleEntry( arguments ), resultFields, properties ); 255 } 256 257 public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields ) 258 { 259 return invokeFunction( function, arguments, resultFields, new HashMap<Object, Object>() ); 260 } 261 262 public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields, Map<Object, Object> properties ) 263 { 264 FlowProcess flowProcess = new TestFlowProcess( properties ); 265 ConcreteCall operationCall = new ConcreteCall( arguments.getFields() ); 266 TupleListCollector collector = new TupleListCollector( resultFields, true ); 267 268 operationCall.setArguments( arguments ); 269 operationCall.setOutputCollector( collector ); 270 271 function.prepare( flowProcess, operationCall ); 272 function.operate( flowProcess, operationCall ); 273 function.cleanup( flowProcess, operationCall ); 274 275 return collector; 276 } 277 278 public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields ) 279 { 280 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 281 282 return invokeFunction( function, entries, resultFields ); 283 } 284 285 public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties ) 286 { 287 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 288 289 return invokeFunction( function, entries, resultFields, properties ); 290 } 291 292 public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields ) 293 { 294 return invokeFunction( function, argumentsArray, resultFields, new HashMap<Object, Object>() ); 295 } 296 297 public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties ) 298 { 299 FlowProcess flowProcess = new TestFlowProcess( properties ); 300 ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() ); 301 TupleListCollector collector = new TupleListCollector( resultFields, true ); 302 303 function.prepare( flowProcess, operationCall ); 304 operationCall.setOutputCollector( collector ); 305 306 for( TupleEntry arguments : argumentsArray ) 307 { 308 operationCall.setArguments( arguments ); 309 function.operate( flowProcess, operationCall ); 310 } 311 312 function.flush( flowProcess, operationCall ); 313 function.cleanup( flowProcess, operationCall ); 314 315 return collector; 316 } 317 318 public static boolean invokeFilter( Filter filter, Tuple arguments ) 319 { 320 return invokeFilter( filter, new TupleEntry( arguments ) ); 321 } 322 323 public static boolean invokeFilter( Filter filter, Tuple arguments, Map<Object, Object> properties ) 324 { 325 return invokeFilter( filter, new TupleEntry( arguments ), properties ); 326 } 327 328 public static boolean invokeFilter( Filter filter, TupleEntry arguments ) 329 { 330 return invokeFilter( filter, arguments, new HashMap<Object, Object>() ); 331 } 332 333 public static boolean invokeFilter( Filter filter, TupleEntry arguments, Map<Object, Object> properties ) 334 { 335 FlowProcess flowProcess = new TestFlowProcess( properties ); 336 ConcreteCall operationCall = new ConcreteCall( arguments.getFields() ); 337 338 operationCall.setArguments( arguments ); 339 340 filter.prepare( flowProcess, operationCall ); 341 342 boolean isRemove = filter.isRemove( flowProcess, operationCall ); 343 344 filter.cleanup( flowProcess, operationCall ); 345 346 return isRemove; 347 } 348 349 public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray ) 350 { 351 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 352 353 return invokeFilter( filter, entries, Collections.emptyMap() ); 354 } 355 356 public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray, Map<Object, Object> properties ) 357 { 358 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 359 360 return invokeFilter( filter, entries, properties ); 361 } 362 363 public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray ) 364 { 365 return invokeFilter( filter, argumentsArray, Collections.emptyMap() ); 366 } 367 368 public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray, Map<Object, Object> properties ) 369 { 370 ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() ); 371 372 FlowProcess flowProcess = new TestFlowProcess( properties ); 373 374 filter.prepare( flowProcess, operationCall ); 375 376 boolean[] results = new boolean[ argumentsArray.length ]; 377 378 for( int i = 0; i < argumentsArray.length; i++ ) 379 { 380 operationCall.setArguments( argumentsArray[ i ] ); 381 382 results[ i ] = filter.isRemove( flowProcess, operationCall ); 383 } 384 385 filter.flush( flowProcess, operationCall ); 386 filter.cleanup( flowProcess, operationCall ); 387 388 return results; 389 } 390 391 public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields ) 392 { 393 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 394 395 return invokeAggregator( aggregator, entries, resultFields ); 396 } 397 398 public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties ) 399 { 400 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 401 402 return invokeAggregator( aggregator, entries, resultFields, properties ); 403 } 404 405 public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields ) 406 { 407 return invokeAggregator( aggregator, null, argumentsArray, resultFields ); 408 } 409 410 public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties ) 411 { 412 return invokeAggregator( aggregator, null, argumentsArray, resultFields, properties ); 413 } 414 415 public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields ) 416 { 417 return invokeAggregator( aggregator, group, argumentsArray, resultFields, Collections.emptyMap() ); 418 } 419 420 public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties ) 421 { 422 FlowProcess flowProcess = new TestFlowProcess( properties ); 423 ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() ); 424 425 operationCall.setGroup( group ); 426 427 aggregator.prepare( flowProcess, operationCall ); 428 429 aggregator.start( flowProcess, operationCall ); 430 431 for( TupleEntry arguments : argumentsArray ) 432 { 433 operationCall.setArguments( arguments ); 434 aggregator.aggregate( flowProcess, operationCall ); 435 } 436 437 TupleListCollector collector = new TupleListCollector( resultFields, true ); 438 operationCall.setOutputCollector( collector ); 439 440 aggregator.complete( flowProcess, operationCall ); 441 442 aggregator.cleanup( null, operationCall ); 443 444 return collector; 445 } 446 447 public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields ) 448 { 449 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 450 451 return invokeBuffer( buffer, entries, resultFields ); 452 } 453 454 public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties ) 455 { 456 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 457 458 return invokeBuffer( buffer, entries, resultFields, properties ); 459 } 460 461 public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields ) 462 { 463 return invokeBuffer( buffer, null, argumentsArray, resultFields ); 464 } 465 466 public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties ) 467 { 468 return invokeBuffer( buffer, null, argumentsArray, resultFields, properties ); 469 } 470 471 public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields ) 472 { 473 return invokeBuffer( buffer, group, argumentsArray, resultFields, Collections.emptyMap() ); 474 } 475 476 public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties ) 477 { 478 FlowProcess flowProcess = new TestFlowProcess( properties ); 479 ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() ); 480 481 operationCall.setGroup( group ); 482 483 buffer.prepare( flowProcess, operationCall ); 484 TupleListCollector collector = new TupleListCollector( resultFields, true ); 485 operationCall.setOutputCollector( collector ); 486 487 operationCall.setArgumentsIterator( Arrays.asList( argumentsArray ).iterator() ); 488 489 buffer.operate( flowProcess, operationCall ); 490 491 buffer.cleanup( null, operationCall ); 492 493 return collector; 494 } 495 496 private static TupleEntry[] makeArgumentsArray( Tuple[] argumentsArray ) 497 { 498 TupleEntry[] entries = new TupleEntry[ argumentsArray.length ]; 499 500 for( int i = 0; i < argumentsArray.length; i++ ) 501 entries[ i ] = new TupleEntry( argumentsArray[ i ] ); 502 503 return entries; 504 } 505 506 public static List<Tuple> getSourceAsList( Flow flow ) throws IOException 507 { 508 return asCollection( flow, (Tap) flow.getSourcesCollection().iterator().next(), Fields.ALL, new ArrayList<Tuple>() ); 509 } 510 511 public static List<Tuple> getSinkAsList( Flow flow ) throws IOException 512 { 513 return asCollection( flow, flow.getSink(), Fields.ALL, new ArrayList<Tuple>() ); 514 } 515 516 public static List<Tuple> asList( Flow flow, Tap tap ) throws IOException 517 { 518 return asCollection( flow, tap, Fields.ALL, new ArrayList<Tuple>() ); 519 } 520 521 public static List<Tuple> asList( Flow flow, Tap tap, Fields selector ) throws IOException 522 { 523 return asCollection( flow, tap, selector, new ArrayList<Tuple>() ); 524 } 525 526 public static Set<Tuple> asSet( Flow flow, Tap tap ) throws IOException 527 { 528 return asCollection( flow, tap, Fields.ALL, new HashSet<Tuple>() ); 529 } 530 531 public static Set<Tuple> asSet( Flow flow, Tap tap, Fields selector ) throws IOException 532 { 533 return asCollection( flow, tap, selector, new HashSet<Tuple>() ); 534 } 535 536 public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, C collection ) throws IOException 537 { 538 return asCollection( flow, tap, Fields.ALL, collection ); 539 } 540 541 public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException 542 { 543 try (TupleEntryIterator iterator = flow.openTapForRead( tap )) 544 { 545 return asCollection( iterator, selector, collection ); 546 } 547 } 548 549 public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, C result ) 550 { 551 while( iterator.hasNext() ) 552 result.add( iterator.next().getTupleCopy() ); 553 554 return result; 555 } 556 557 public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, Fields selector, C result ) 558 { 559 while( iterator.hasNext() ) 560 result.add( iterator.next().selectTupleCopy( selector ) ); 561 562 return result; 563 } 564 }