001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading;
022
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.planner.FlowPlanner;
import cascading.flow.stream.graph.StreamGraph;
import cascading.operation.Aggregator;
import cascading.operation.Buffer;
import cascading.operation.ConcreteCall;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TupleListCollector;
import cascading.util.Util;
import junit.framework.TestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
060
061/**
062 * Class CascadingTestCase is the base class for all Cascading tests.
063 * <p/>
064 * It included a few helpful utility methods for testing Cascading applications.
065 */
066@RunWith(BlockJUnit4ClassRunner.class)
067public abstract class CascadingTestCase extends TestCase implements Serializable
068  {
069  public static final String ROOT_OUTPUT_PATH = "test.output.root";
070  public static final String ROOT_PLAN_PATH = "test.plan.root";
071  public static final String TEST_TRACEPLAN_ENABLED = "test.traceplan.enabled";
072
073  private String outputPath;
074  private String planPath;
075
076  @Rule
077  public transient TestName name = new TestName();
078
079  static class TestFlowProcess extends FlowProcess.NullFlowProcess
080    {
081    private final Map<Object, Object> properties;
082
083    public TestFlowProcess( Map<Object, Object> properties )
084      {
085      this.properties = properties;
086      }
087
088    @Override
089    public Object getProperty( String key )
090      {
091      return properties.get( key );
092      }
093    }
094
095  public CascadingTestCase()
096    {
097    }
098
099  public CascadingTestCase( String name )
100    {
101    super( name );
102    }
103
104  @Override
105  @Before
106  public void setUp() throws Exception
107    {
108    super.setUp();
109
110    if( Boolean.getBoolean( TEST_TRACEPLAN_ENABLED ) )
111      {
112      System.setProperty( FlowPlanner.TRACE_PLAN_PATH, Util.join( "/", getPlanPath(), "planner" ) );
113      System.setProperty( FlowPlanner.TRACE_PLAN_TRANSFORM_PATH, Util.join( "/", getPlanPath(), "planner" ) );
114      System.setProperty( FlowPlanner.TRACE_STATS_PATH, Util.join( "/", getPlanPath(), "planner" ) );
115      System.setProperty( "platform." + StreamGraph.DOT_FILE_PATH, Util.join( "/", getPlanPath(), "stream" ) ); // pass down
116      }
117    }
118
119  @Override
120  @After
121  public void tearDown() throws Exception
122    {
123    super.tearDown();
124    }
125
126  protected static String getTestOutputRoot()
127    {
128    return System.getProperty( ROOT_OUTPUT_PATH, "build/test/output" ).replace( ":", "_" );
129    }
130
131  protected static String getTestPlanRoot()
132    {
133    return System.getProperty( ROOT_PLAN_PATH, "build/test/plan" ).replace( ":", "_" );
134    }
135
136  protected String[] getOutputPathElements()
137    {
138    return new String[]{getTestOutputRoot(), getTestCaseName(), getTestName()};
139    }
140
141  protected String[] getPlanPathElements()
142    {
143    return new String[]{getTestPlanRoot(), getTestCaseName(), getTestName()};
144    }
145
146  protected String getOutputPath()
147    {
148    if( outputPath == null )
149      outputPath = Util.join( getOutputPathElements(), File.separator );
150
151    return outputPath;
152    }
153
154  protected String getPlanPath()
155    {
156    if( planPath == null )
157      planPath = Util.join( getPlanPathElements(), File.separator );
158
159    return planPath;
160    }
161
162  public String getTestCaseName()
163    {
164    return getClass().getSimpleName().replaceAll( "^(.*)Test.*$", "$1" ).toLowerCase();
165    }
166
167  public String getTestName()
168    {
169    return name.getMethodName();
170    }
171
172  public static void validateLength( Flow flow, int length ) throws IOException
173    {
174    validateLength( flow, length, -1 );
175    }
176
177  public static void validateLength( Flow flow, int length, String name ) throws IOException
178    {
179    validateLength( flow, length, -1, null, name );
180    }
181
182  public static void validateLength( Flow flow, int length, int size ) throws IOException
183    {
184    validateLength( flow, length, size, null, null );
185    }
186
187  public static void validateLength( Flow flow, int length, int size, Pattern regex ) throws IOException
188    {
189    validateLength( flow, length, size, regex, null );
190    }
191
192  public static void validateLength( Flow flow, int length, Pattern regex, String name ) throws IOException
193    {
194    validateLength( flow, length, -1, regex, name );
195    }
196
197  public static void validateLength( Flow flow, int length, int size, Pattern regex, String name ) throws IOException
198    {
199    TupleEntryIterator iterator = name == null ? flow.openSink() : flow.openSink( name );
200    validateLength( iterator, length, size, regex );
201    }
202
203  public static void validateLength( TupleEntryIterator iterator, int length )
204    {
205    validateLength( iterator, length, -1, null );
206    }
207
208  public static void validateLength( TupleEntryIterator iterator, int length, int size )
209    {
210    validateLength( iterator, length, size, null );
211    }
212
213  public static void validateLength( TupleEntryIterator iterator, int length, Pattern regex )
214    {
215    validateLength( iterator, length, -1, regex );
216    }
217
218  public static void validateLength( TupleEntryIterator iterator, int length, int size, Pattern regex )
219    {
220    int count = 0;
221
222    while( iterator.hasNext() )
223      {
224      TupleEntry tupleEntry = iterator.next();
225
226      if( size != -1 )
227        assertEquals( "wrong number of elements", size, tupleEntry.size() );
228
229      if( regex != null )
230        assertTrue( "regex: " + regex + " does not match: " + tupleEntry.getTuple().toString(), regex.matcher( tupleEntry.getTuple().toString() ).matches() );
231
232      count++;
233      }
234
235    try
236      {
237      iterator.close();
238      }
239    catch( IOException exception )
240      {
241      throw new RuntimeException( exception );
242      }
243
244    assertEquals( "wrong number of lines", length, count );
245    }
246
247  public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields )
248    {
249    return invokeFunction( function, new TupleEntry( arguments ), resultFields );
250    }
251
252  public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields, Map<Object, Object> properties )
253    {
254    return invokeFunction( function, new TupleEntry( arguments ), resultFields, properties );
255    }
256
257  public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields )
258    {
259    return invokeFunction( function, arguments, resultFields, new HashMap<Object, Object>() );
260    }
261
262  public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields, Map<Object, Object> properties )
263    {
264    FlowProcess flowProcess = new TestFlowProcess( properties );
265    ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );
266    TupleListCollector collector = new TupleListCollector( resultFields, true );
267
268    operationCall.setArguments( arguments );
269    operationCall.setOutputCollector( collector );
270
271    function.prepare( flowProcess, operationCall );
272    function.operate( flowProcess, operationCall );
273    function.cleanup( flowProcess, operationCall );
274
275    return collector;
276    }
277
278  public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields )
279    {
280    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
281
282    return invokeFunction( function, entries, resultFields );
283    }
284
285  public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
286    {
287    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
288
289    return invokeFunction( function, entries, resultFields, properties );
290    }
291
292  public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields )
293    {
294    return invokeFunction( function, argumentsArray, resultFields, new HashMap<Object, Object>() );
295    }
296
297  public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
298    {
299    FlowProcess flowProcess = new TestFlowProcess( properties );
300    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
301    TupleListCollector collector = new TupleListCollector( resultFields, true );
302
303    function.prepare( flowProcess, operationCall );
304    operationCall.setOutputCollector( collector );
305
306    for( TupleEntry arguments : argumentsArray )
307      {
308      operationCall.setArguments( arguments );
309      function.operate( flowProcess, operationCall );
310      }
311
312    function.flush( flowProcess, operationCall );
313    function.cleanup( flowProcess, operationCall );
314
315    return collector;
316    }
317
318  public static boolean invokeFilter( Filter filter, Tuple arguments )
319    {
320    return invokeFilter( filter, new TupleEntry( arguments ) );
321    }
322
323  public static boolean invokeFilter( Filter filter, Tuple arguments, Map<Object, Object> properties )
324    {
325    return invokeFilter( filter, new TupleEntry( arguments ), properties );
326    }
327
328  public static boolean invokeFilter( Filter filter, TupleEntry arguments )
329    {
330    return invokeFilter( filter, arguments, new HashMap<Object, Object>() );
331    }
332
333  public static boolean invokeFilter( Filter filter, TupleEntry arguments, Map<Object, Object> properties )
334    {
335    FlowProcess flowProcess = new TestFlowProcess( properties );
336    ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );
337
338    operationCall.setArguments( arguments );
339
340    filter.prepare( flowProcess, operationCall );
341
342    boolean isRemove = filter.isRemove( flowProcess, operationCall );
343
344    filter.cleanup( flowProcess, operationCall );
345
346    return isRemove;
347    }
348
349  public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray )
350    {
351    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
352
353    return invokeFilter( filter, entries, Collections.emptyMap() );
354    }
355
356  public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray, Map<Object, Object> properties )
357    {
358    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
359
360    return invokeFilter( filter, entries, properties );
361    }
362
363  public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray )
364    {
365    return invokeFilter( filter, argumentsArray, Collections.emptyMap() );
366    }
367
368  public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray, Map<Object, Object> properties )
369    {
370    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
371
372    FlowProcess flowProcess = new TestFlowProcess( properties );
373
374    filter.prepare( flowProcess, operationCall );
375
376    boolean[] results = new boolean[ argumentsArray.length ];
377
378    for( int i = 0; i < argumentsArray.length; i++ )
379      {
380      operationCall.setArguments( argumentsArray[ i ] );
381
382      results[ i ] = filter.isRemove( flowProcess, operationCall );
383      }
384
385    filter.flush( flowProcess, operationCall );
386    filter.cleanup( flowProcess, operationCall );
387
388    return results;
389    }
390
391  public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields )
392    {
393    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
394
395    return invokeAggregator( aggregator, entries, resultFields );
396    }
397
398  public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
399    {
400    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
401
402    return invokeAggregator( aggregator, entries, resultFields, properties );
403    }
404
405  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields )
406    {
407    return invokeAggregator( aggregator, null, argumentsArray, resultFields );
408    }
409
410  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
411    {
412    return invokeAggregator( aggregator, null, argumentsArray, resultFields, properties );
413    }
414
415  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
416    {
417    return invokeAggregator( aggregator, group, argumentsArray, resultFields, Collections.emptyMap() );
418    }
419
420  public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
421    {
422    FlowProcess flowProcess = new TestFlowProcess( properties );
423    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
424
425    operationCall.setGroup( group );
426
427    aggregator.prepare( flowProcess, operationCall );
428
429    aggregator.start( flowProcess, operationCall );
430
431    for( TupleEntry arguments : argumentsArray )
432      {
433      operationCall.setArguments( arguments );
434      aggregator.aggregate( flowProcess, operationCall );
435      }
436
437    TupleListCollector collector = new TupleListCollector( resultFields, true );
438    operationCall.setOutputCollector( collector );
439
440    aggregator.complete( flowProcess, operationCall );
441
442    aggregator.cleanup( null, operationCall );
443
444    return collector;
445    }
446
447  public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields )
448    {
449    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
450
451    return invokeBuffer( buffer, entries, resultFields );
452    }
453
454  public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
455    {
456    TupleEntry[] entries = makeArgumentsArray( argumentsArray );
457
458    return invokeBuffer( buffer, entries, resultFields, properties );
459    }
460
461  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields )
462    {
463    return invokeBuffer( buffer, null, argumentsArray, resultFields );
464    }
465
466  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
467    {
468    return invokeBuffer( buffer, null, argumentsArray, resultFields, properties );
469    }
470
471  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
472    {
473    return invokeBuffer( buffer, group, argumentsArray, resultFields, Collections.emptyMap() );
474    }
475
476  public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields, Map<Object, Object> properties )
477    {
478    FlowProcess flowProcess = new TestFlowProcess( properties );
479    ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
480
481    operationCall.setGroup( group );
482
483    buffer.prepare( flowProcess, operationCall );
484    TupleListCollector collector = new TupleListCollector( resultFields, true );
485    operationCall.setOutputCollector( collector );
486
487    operationCall.setArgumentsIterator( Arrays.asList( argumentsArray ).iterator() );
488
489    buffer.operate( flowProcess, operationCall );
490
491    buffer.cleanup( null, operationCall );
492
493    return collector;
494    }
495
496  private static TupleEntry[] makeArgumentsArray( Tuple[] argumentsArray )
497    {
498    TupleEntry[] entries = new TupleEntry[ argumentsArray.length ];
499
500    for( int i = 0; i < argumentsArray.length; i++ )
501      entries[ i ] = new TupleEntry( argumentsArray[ i ] );
502
503    return entries;
504    }
505
506  public static List<Tuple> getSourceAsList( Flow flow ) throws IOException
507    {
508    return asCollection( flow, (Tap) flow.getSourcesCollection().iterator().next(), Fields.ALL, new ArrayList<Tuple>() );
509    }
510
511  public static List<Tuple> getSinkAsList( Flow flow ) throws IOException
512    {
513    return asCollection( flow, flow.getSink(), Fields.ALL, new ArrayList<Tuple>() );
514    }
515
516  public static List<Tuple> asList( Flow flow, Tap tap ) throws IOException
517    {
518    return asCollection( flow, tap, Fields.ALL, new ArrayList<Tuple>() );
519    }
520
521  public static List<Tuple> asList( Flow flow, Tap tap, Fields selector ) throws IOException
522    {
523    return asCollection( flow, tap, selector, new ArrayList<Tuple>() );
524    }
525
526  public static Set<Tuple> asSet( Flow flow, Tap tap ) throws IOException
527    {
528    return asCollection( flow, tap, Fields.ALL, new HashSet<Tuple>() );
529    }
530
531  public static Set<Tuple> asSet( Flow flow, Tap tap, Fields selector ) throws IOException
532    {
533    return asCollection( flow, tap, selector, new HashSet<Tuple>() );
534    }
535
536  public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, C collection ) throws IOException
537    {
538    return asCollection( flow, tap, Fields.ALL, collection );
539    }
540
541  public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException
542    {
543    try (TupleEntryIterator iterator = flow.openTapForRead( tap ))
544      {
545      return asCollection( iterator, selector, collection );
546      }
547    }
548
549  public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, C result )
550    {
551    while( iterator.hasNext() )
552      result.add( iterator.next().getTupleCopy() );
553
554    return result;
555    }
556
557  public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, Fields selector, C result )
558    {
559    while( iterator.hasNext() )
560      result.add( iterator.next().selectTupleCopy( selector ) );
561
562    return result;
563    }
564  }