001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading; 022 023 import java.io.IOException; 024 import java.io.Serializable; 025 import java.util.ArrayList; 026 import java.util.Arrays; 027 import java.util.Collection; 028 import java.util.HashSet; 029 import java.util.List; 030 import java.util.Set; 031 import java.util.regex.Pattern; 032 033 import cascading.flow.Flow; 034 import cascading.flow.FlowProcess; 035 import cascading.operation.Aggregator; 036 import cascading.operation.Buffer; 037 import cascading.operation.ConcreteCall; 038 import cascading.operation.Filter; 039 import cascading.operation.Function; 040 import cascading.tap.Tap; 041 import cascading.tuple.Fields; 042 import cascading.tuple.Tuple; 043 import cascading.tuple.TupleEntry; 044 import cascading.tuple.TupleEntryIterator; 045 import cascading.tuple.TupleListCollector; 046 import junit.framework.TestCase; 047 048 /** 049 * Class CascadingTestCase is the base class for all Cascading tests. 050 * <p/> 051 * It included a few helpful utility methods for testing Cascading applications. 052 */ 053 public class CascadingTestCase extends TestCase implements Serializable 054 { 055 public CascadingTestCase() 056 { 057 } 058 059 public CascadingTestCase( String name ) 060 { 061 super( name ); 062 } 063 064 public static void validateLength( Flow flow, int length ) throws IOException 065 { 066 validateLength( flow, length, -1 ); 067 } 068 069 public static void validateLength( Flow flow, int length, String name ) throws IOException 070 { 071 validateLength( flow, length, -1, null, name ); 072 } 073 074 public static void validateLength( Flow flow, int length, int size ) throws IOException 075 { 076 validateLength( flow, length, size, null, null ); 077 } 078 079 public static void validateLength( Flow flow, int length, int size, Pattern regex ) throws IOException 080 { 081 validateLength( flow, length, size, regex, null ); 082 } 083 084 public static void validateLength( Flow flow, int length, Pattern regex, String name ) throws IOException 085 { 086 validateLength( flow, length, -1, regex, name ); 087 } 088 089 public static void validateLength( Flow flow, int length, int size, Pattern regex, String name ) throws IOException 090 { 091 TupleEntryIterator iterator = name == null ? flow.openSink() : flow.openSink( name ); 092 validateLength( iterator, length, size, regex ); 093 } 094 095 public static void validateLength( TupleEntryIterator iterator, int length ) 096 { 097 validateLength( iterator, length, -1, null ); 098 } 099 100 public static void validateLength( TupleEntryIterator iterator, int length, int size ) 101 { 102 validateLength( iterator, length, size, null ); 103 } 104 105 public static void validateLength( TupleEntryIterator iterator, int length, Pattern regex ) 106 { 107 validateLength( iterator, length, -1, regex ); 108 } 109 110 public static void validateLength( TupleEntryIterator iterator, int length, int size, Pattern regex ) 111 { 112 int count = 0; 113 114 while( iterator.hasNext() ) 115 { 116 TupleEntry tupleEntry = iterator.next(); 117 118 if( size != -1 ) 119 assertEquals( "wrong number of elements", size, tupleEntry.size() ); 120 121 if( regex != null ) 122 assertTrue( "regex: " + regex + " does not match: " + tupleEntry.getTuple().toString(), regex.matcher( tupleEntry.getTuple().toString() ).matches() ); 123 124 count++; 125 } 126 127 try 128 { 129 iterator.close(); 130 } 131 catch( IOException exception ) 132 { 133 throw new RuntimeException( exception ); 134 } 135 136 assertEquals( "wrong number of lines", length, count ); 137 } 138 139 public static TupleListCollector invokeFunction( Function function, Tuple arguments, Fields resultFields ) 140 { 141 return invokeFunction( function, new TupleEntry( arguments ), resultFields ); 142 } 143 144 public static TupleListCollector invokeFunction( Function function, TupleEntry arguments, Fields resultFields ) 145 { 146 ConcreteCall operationCall = new ConcreteCall( arguments.getFields() ); 147 TupleListCollector collector = new TupleListCollector( resultFields, true ); 148 149 operationCall.setArguments( arguments ); 150 operationCall.setOutputCollector( collector ); 151 152 function.prepare( FlowProcess.NULL, operationCall ); 153 function.operate( FlowProcess.NULL, operationCall ); 154 function.cleanup( FlowProcess.NULL, operationCall ); 155 156 return collector; 157 } 158 159 public static TupleListCollector invokeFunction( Function function, Tuple[] argumentsArray, Fields resultFields ) 160 { 161 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 162 163 return invokeFunction( function, entries, resultFields ); 164 } 165 166 public static TupleListCollector invokeFunction( Function function, TupleEntry[] argumentsArray, Fields resultFields ) 167 { 168 ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() ); 169 TupleListCollector collector = new TupleListCollector( resultFields, true ); 170 171 function.prepare( FlowProcess.NULL, operationCall ); 172 operationCall.setOutputCollector( collector ); 173 174 for( TupleEntry arguments : argumentsArray ) 175 { 176 operationCall.setArguments( arguments ); 177 function.operate( FlowProcess.NULL, operationCall ); 178 } 179 180 function.cleanup( FlowProcess.NULL, operationCall ); 181 182 return collector; 183 } 184 185 public static boolean invokeFilter( Filter filter, Tuple arguments ) 186 { 187 return invokeFilter( filter, new TupleEntry( arguments ) ); 188 } 189 190 public static boolean invokeFilter( Filter filter, TupleEntry arguments ) 191 { 192 ConcreteCall operationCall = new ConcreteCall( arguments.getFields() ); 193 194 operationCall.setArguments( arguments ); 195 196 filter.prepare( FlowProcess.NULL, operationCall ); 197 198 boolean isRemove = filter.isRemove( FlowProcess.NULL, operationCall ); 199 200 filter.cleanup( FlowProcess.NULL, operationCall ); 201 202 return isRemove; 203 } 204 205 public static boolean[] invokeFilter( Filter filter, Tuple[] argumentsArray ) 206 { 207 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 208 209 return invokeFilter( filter, entries ); 210 } 211 212 public static boolean[] invokeFilter( Filter filter, TupleEntry[] argumentsArray ) 213 { 214 ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() ); 215 216 filter.prepare( FlowProcess.NULL, operationCall ); 217 218 boolean[] results = new boolean[ argumentsArray.length ]; 219 220 for( int i = 0; i < argumentsArray.length; i++ ) 221 { 222 operationCall.setArguments( argumentsArray[ i ] ); 223 224 results[ i ] = filter.isRemove( FlowProcess.NULL, operationCall ); 225 } 226 227 filter.flush( FlowProcess.NULL, operationCall ); 228 filter.cleanup( FlowProcess.NULL, operationCall ); 229 230 return results; 231 } 232 233 public static TupleListCollector invokeAggregator( Aggregator aggregator, Tuple[] argumentsArray, Fields resultFields ) 234 { 235 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 236 237 return invokeAggregator( aggregator, entries, resultFields ); 238 } 239 240 public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry[] argumentsArray, Fields resultFields ) 241 { 242 return invokeAggregator( aggregator, null, argumentsArray, resultFields ); 243 } 244 245 public static TupleListCollector invokeAggregator( Aggregator aggregator, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields ) 246 { 247 ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() ); 248 249 operationCall.setGroup( group ); 250 251 aggregator.prepare( FlowProcess.NULL, operationCall ); 252 253 aggregator.start( FlowProcess.NULL, operationCall ); 254 255 for( TupleEntry arguments : argumentsArray ) 256 { 257 operationCall.setArguments( arguments ); 258 aggregator.aggregate( FlowProcess.NULL, operationCall ); 259 } 260 261 TupleListCollector collector = new TupleListCollector( resultFields, true ); 262 operationCall.setOutputCollector( collector ); 263 264 aggregator.complete( FlowProcess.NULL, operationCall ); 265 266 aggregator.cleanup( null, operationCall ); 267 268 return collector; 269 } 270 271 public static TupleListCollector invokeBuffer( Buffer buffer, Tuple[] argumentsArray, Fields resultFields ) 272 { 273 TupleEntry[] entries = makeArgumentsArray( argumentsArray ); 274 275 return invokeBuffer( buffer, entries, resultFields ); 276 } 277 278 public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry[] argumentsArray, Fields resultFields ) 279 { 280 return invokeBuffer( buffer, null, argumentsArray, resultFields ); 281 } 282 283 public static TupleListCollector invokeBuffer( Buffer buffer, TupleEntry group, TupleEntry[] argumentsArray, Fields resultFields ) 284 { 285 ConcreteCall operationCall = new ConcreteCall( argumentsArray[ 0 ].getFields() ); 286 287 operationCall.setGroup( group ); 288 289 buffer.prepare( FlowProcess.NULL, operationCall ); 290 TupleListCollector collector = new TupleListCollector( resultFields, true ); 291 operationCall.setOutputCollector( collector ); 292 293 operationCall.setArgumentsIterator( Arrays.asList( argumentsArray ).iterator() ); 294 295 buffer.operate( FlowProcess.NULL, operationCall ); 296 297 buffer.cleanup( null, operationCall ); 298 299 return collector; 300 } 301 302 private static TupleEntry[] makeArgumentsArray( Tuple[] argumentsArray ) 303 { 304 TupleEntry[] entries = new TupleEntry[ argumentsArray.length ]; 305 306 for( int i = 0; i < argumentsArray.length; i++ ) 307 entries[ i ] = new TupleEntry( argumentsArray[ i ] ); 308 309 return entries; 310 } 311 312 public static List<Tuple> getSourceAsList( Flow flow ) throws IOException 313 { 314 return asCollection( flow, (Tap) flow.getSourcesCollection().iterator().next(), Fields.ALL, new ArrayList<Tuple>() ); 315 } 316 317 public static List<Tuple> getSinkAsList( Flow flow ) throws IOException 318 { 319 return asCollection( flow, flow.getSink(), Fields.ALL, new ArrayList<Tuple>() ); 320 } 321 322 public static List<Tuple> asList( Flow flow, Tap tap ) throws IOException 323 { 324 return asCollection( flow, tap, Fields.ALL, new ArrayList<Tuple>() ); 325 } 326 327 public static List<Tuple> asList( Flow flow, Tap tap, Fields selector ) throws IOException 328 { 329 return asCollection( flow, tap, selector, new ArrayList<Tuple>() ); 330 } 331 332 public static Set<Tuple> asSet( Flow flow, Tap tap ) throws IOException 333 { 334 return asCollection( flow, tap, Fields.ALL, new HashSet<Tuple>() ); 335 } 336 337 public static Set<Tuple> asSet( Flow flow, Tap tap, Fields selector ) throws IOException 338 { 339 return asCollection( flow, tap, selector, new HashSet<Tuple>() ); 340 } 341 342 public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, C collection ) throws IOException 343 { 344 return asCollection( flow, tap, Fields.ALL, collection ); 345 } 346 347 public static <C extends Collection<Tuple>> C asCollection( Flow flow, Tap tap, Fields selector, C collection ) throws IOException 348 { 349 TupleEntryIterator iterator = flow.openTapForRead( tap ); 350 351 try 352 { 353 return asCollection( iterator, selector, collection ); 354 } 355 finally 356 { 357 iterator.close(); 358 } 359 } 360 361 public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, C result ) 362 { 363 while( iterator.hasNext() ) 364 result.add( iterator.next().getTupleCopy() ); 365 366 return result; 367 } 368 369 public static <C extends Collection<Tuple>> C asCollection( TupleEntryIterator iterator, Fields selector, C result ) 370 { 371 while( iterator.hasNext() ) 372 result.add( iterator.next().selectTupleCopy( selector ) ); 373 374 return result; 375 } 376 }