001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading;
022    
023    import java.io.IOException;
024    import java.io.Serializable;
025    import java.util.ArrayList;
026    import java.util.Arrays;
027    import java.util.Collection;
028    import java.util.HashSet;
029    import java.util.List;
030    import java.util.Set;
031    import java.util.regex.Pattern;
032    
033    import cascading.flow.Flow;
034    import cascading.flow.FlowProcess;
035    import cascading.operation.Aggregator;
036    import cascading.operation.Buffer;
037    import cascading.operation.ConcreteCall;
038    import cascading.operation.Filter;
039    import cascading.operation.Function;
040    import cascading.tap.Tap;
041    import cascading.tuple.Fields;
042    import cascading.tuple.Tuple;
043    import cascading.tuple.TupleEntry;
044    import cascading.tuple.TupleEntryIterator;
045    import cascading.tuple.TupleListCollector;
046    import junit.framework.TestCase;
047    
048    /**
049     * Class CascadingTestCase is the base class for all Cascading tests.
050     * <p/>
051     * It included a few helpful utility methods for testing Cascading applications.
052     */
053    public class CascadingTestCase extends TestCase implements Serializable
054      {
055      public CascadingTestCase()
056        {
057        }
058    
059      public CascadingTestCase( String name )
060        {
061        super( name );
062        }
063    
064      public static void validateLength( Flow flow, int length ) throws IOException
065        {
066        validateLength( flow, length, -1 );
067        }
068    
069      public static void validateLength( Flow flow, int length, String name ) throws IOException
070        {
071        validateLength( flow, length, -1, null, name );
072        }
073    
074      public static void validateLength( Flow flow, int length, int size ) throws IOException
075        {
076        validateLength( flow, length, size, null, null );
077        }
078    
079      public static void validateLength( Flow flow, int length, int size, Pattern regex ) throws IOException
080        {
081        validateLength( flow, length, size, regex, null );
082        }
083    
084      public static void validateLength( Flow flow, int length, Pattern regex, String name ) throws IOException
085        {
086        validateLength( flow, length, -1, regex, name );
087        }
088    
089      public static void validateLength( Flow flow, int length, int size, Pattern regex, String name ) throws IOException
090        {
091        TupleEntryIterator iterator = name == null ? flow.openSink() : flow.openSink( name );
092        validateLength( iterator, length, size, regex );
093        }
094    
095      public static void validateLength( TupleEntryIterator iterator, int length )
096        {
097        validateLength( iterator, length, -1, null );
098        }
099    
100      public static void validateLength( TupleEntryIterator iterator, int length, int size )
101        {
102        validateLength( iterator, length, size, null );
103        }
104    
105      public static void validateLength( TupleEntryIterator iterator, int length, Pattern regex )
106        {
107        validateLength( iterator, length, -1, regex );
108        }
109    
110      public static void validateLength( TupleEntryIterator iterator, int length, int size, Pattern regex )
111        {
112        int count = 0;
113    
114        while( iterator.hasNext() )
115          {
116          TupleEntry tupleEntry = iterator.next();
117    
118          if( size != -1 )
119            assertEquals( "wrong number of elements", size, tupleEntry.size() );
120    
121          if( regex != null )
122            assertTrue( "regex: " + regex + " does not match: " + tupleEntry.getTuple().toString(), regex.matcher( tupleEntry.getTuple().toString() ).matches() );
123    
124          count++;
125          }
126    
127        try
128          {
129          iterator.close();
130          }
131        catch( IOException exception )
132          {
133          throw new RuntimeException( exception );
134          }
135    
136        assertEquals( "wrong number of lines", length, count );
137        }
138    
139      public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields )
140        {
141        return invokeFunction( function, new TupleEntry( arguments ), resultFields );
142        }
143    
144      public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields )
145        {
146        ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );
147        TupleListCollector collector = new TupleListCollector( resultFields, true );
148    
149        operationCall.setArguments( arguments );
150        operationCall.setOutputCollector( collector );
151    
152        function.prepare( FlowProcess.NULL, operationCall );
153        function.operate( FlowProcess.NULL, operationCall );
154        function.cleanup( FlowProcess.NULL, operationCall );
155    
156        return collector;
157        }
158    
159      public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields )
160        {
161        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
162    
163        return invokeFunction( function, entries, resultFields );
164        }
165    
166      public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields )
167        {
168        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
169        TupleListCollector collector = new TupleListCollector( resultFields, true );
170    
171        function.prepare( FlowProcess.NULL, operationCall );
172        operationCall.setOutputCollector( collector );
173    
174        for( TupleEntry arguments : argumentsArray )
175          {
176          operationCall.setArguments( arguments );
177          function.operate( FlowProcess.NULL, operationCall );
178          }
179    
180        function.cleanup( FlowProcess.NULL, operationCall );
181    
182        return collector;
183        }
184    
185      public static boolean invokeFilter( Filter filter, Tuple arguments )
186        {
187        return invokeFilter( filter, new TupleEntry( arguments ) );
188        }
189    
190      public static boolean invokeFilter( Filter filter, TupleEntry arguments )
191        {
192        ConcreteCall operationCall = new ConcreteCall( arguments.getFields() );
193    
194        operationCall.setArguments( arguments );
195    
196        filter.prepare( FlowProcess.NULL, operationCall );
197    
198        boolean isRemove = filter.isRemove( FlowProcess.NULL, operationCall );
199    
200        filter.cleanup( FlowProcess.NULL, operationCall );
201    
202        return isRemove;
203        }
204    
205      public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray )
206        {
207        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
208    
209        return invokeFilter( filter, entries );
210        }
211    
212      public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray )
213        {
214        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
215    
216        filter.prepare( FlowProcess.NULL, operationCall );
217    
218        boolean[] results = new boolean[ argumentsArray.length ];
219    
220        for( int i = 0; i < argumentsArray.length; i++ )
221          {
222          operationCall.setArguments( argumentsArray[ i ] );
223    
224          results[ i ] = filter.isRemove( FlowProcess.NULL, operationCall );
225          }
226    
227        filter.flush( FlowProcess.NULL, operationCall );
228        filter.cleanup( FlowProcess.NULL, operationCall );
229    
230        return results;
231        }
232    
233      public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields )
234        {
235        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
236    
237        return invokeAggregator( aggregator, entries, resultFields );
238        }
239    
240      public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields )
241        {
242        return invokeAggregator( aggregator, null, argumentsArray, resultFields );
243        }
244    
245      public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
246        {
247        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
248    
249        operationCall.setGroup( group );
250    
251        aggregator.prepare( FlowProcess.NULL, operationCall );
252    
253        aggregator.start( FlowProcess.NULL, operationCall );
254    
255        for( TupleEntry arguments : argumentsArray )
256          {
257          operationCall.setArguments( arguments );
258          aggregator.aggregate( FlowProcess.NULL, operationCall );
259          }
260    
261        TupleListCollector collector = new TupleListCollector( resultFields, true );
262        operationCall.setOutputCollector( collector );
263    
264        aggregator.complete( FlowProcess.NULL, operationCall );
265    
266        aggregator.cleanup( null, operationCall );
267    
268        return collector;
269        }
270    
271      public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields )
272        {
273        TupleEntry[] entries = makeArgumentsArray( argumentsArray );
274    
275        return invokeBuffer( buffer, entries, resultFields );
276        }
277    
278      public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields )
279        {
280        return invokeBuffer( buffer, null, argumentsArray, resultFields );
281        }
282    
283      public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields )
284        {
285        ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() );
286    
287        operationCall.setGroup( group );
288    
289        buffer.prepare( FlowProcess.NULL, operationCall );
290        TupleListCollector collector = new TupleListCollector( resultFields, true );
291        operationCall.setOutputCollector( collector );
292    
293        operationCall.setArgumentsIterator( Arrays.asList( argumentsArray ).iterator() );
294    
295        buffer.operate( FlowProcess.NULL, operationCall );
296    
297        buffer.cleanup( null, operationCall );
298    
299        return collector;
300        }
301    
302      private static TupleEntry[] makeArgumentsArray( Tuple[] argumentsArray )
303        {
304        TupleEntry[] entries = new TupleEntry[ argumentsArray.length ];
305    
306        for( int i = 0; i < argumentsArray.length; i++ )
307          entries[ i ] = new TupleEntry( argumentsArray[ i ] );
308    
309        return entries;
310        }
311    
312      public static List<Tuple> getSourceAsList( Flow flow ) throws IOException
313        {
314        return asCollection( flow, (Tap) flow.getSourcesCollection().iterator().next(), Fields.ALL, new ArrayList<Tuple>() );
315        }
316    
317      public static List<Tuple> getSinkAsList( Flow flow ) throws IOException
318        {
319        return asCollection( flow, flow.getSink(), Fields.ALL, new ArrayList<Tuple>() );
320        }
321    
322      public static List<Tuple> asList( Flow flow, Tap tap ) throws IOException
323        {
324        return asCollection( flow, tap, Fields.ALL, new ArrayList<Tuple>() );
325        }
326    
327      public static List<Tuple> asList( Flow flow, Tap tap, Fields selector ) throws IOException
328        {
329        return asCollection( flow, tap, selector, new ArrayList<Tuple>() );
330        }
331    
332      public static Set<Tuple> asSet( Flow flow, Tap tap ) throws IOException
333        {
334        return asCollection( flow, tap, Fields.ALL, new HashSet<Tuple>() );
335        }
336    
337      public static Set<Tuple> asSet( Flow flow, Tap tap, Fields selector ) throws IOException
338        {
339        return asCollection( flow, tap, selector, new HashSet<Tuple>() );
340        }
341    
342      public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, C collection ) throws IOException
343        {
344        return asCollection( flow, tap, Fields.ALL, collection );
345        }
346    
347      public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException
348        {
349        TupleEntryIterator iterator = flow.openTapForRead( tap );
350    
351        try
352          {
353          return asCollection( iterator, selector, collection );
354          }
355        finally
356          {
357          iterator.close();
358          }
359        }
360    
361      public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, C result )
362        {
363        while( iterator.hasNext() )
364          result.add( iterator.next().getTupleCopy() );
365    
366        return result;
367        }
368    
369      public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, Fields selector, C result )
370        {
371        while( iterator.hasNext() )
372          result.add( iterator.next().selectTupleCopy( selector ) );
373    
374        return result;
375        }
376      }