001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow; 022 023 import java.util.ArrayList; 024 import java.util.Arrays; 025 import java.util.Collection; 026 import java.util.HashMap; 027 import java.util.List; 028 import java.util.Map; 029 030 import cascading.operation.AssertionLevel; 031 import cascading.operation.DebugLevel; 032 import cascading.pipe.Checkpoint; 033 import cascading.pipe.Pipe; 034 import cascading.property.UnitOfWorkDef; 035 import cascading.tap.Tap; 036 037 /** 038 * Class FlowDef is a fluent interface for defining a {@link Flow}. 039 * <p/> 040 * This allows for ad-hoc building of Flow data and meta-data like tags. 041 * <p/> 042 * Instead of calling one of the {@link FlowConnector} connect methods, {@link FlowConnector#connect(FlowDef)} 043 * can be called. 044 */ 045 public class FlowDef extends UnitOfWorkDef<FlowDef> 046 { 047 protected Map<String, Tap> sources = new HashMap<String, Tap>(); 048 protected Map<String, Tap> sinks = new HashMap<String, Tap>(); 049 protected Map<String, Tap> traps = new HashMap<String, Tap>(); 050 protected Map<String, Tap> checkpoints = new HashMap<String, Tap>(); 051 052 protected List<String> classPath = new ArrayList<String>(); 053 protected List<Pipe> tails = new ArrayList<Pipe>(); 054 protected List<AssemblyPlanner> assemblyPlanners = new ArrayList<AssemblyPlanner>(); 055 056 protected AssertionLevel assertionLevel; 057 protected DebugLevel debugLevel; 058 059 protected String runID; 060 061 /** 062 * Creates a new instance of a FlowDef. 063 * 064 * @return a FlowDef 065 */ 066 public static FlowDef flowDef() 067 { 068 return new FlowDef(); 069 } 070 071 /** Constructor FlowDef creates a new FlowDef instance. */ 072 public FlowDef() 073 { 074 } 075 076 /** 077 * Method getAssemblyPlanners returns the current registered AssemblyPlanners. 078 * 079 * @return a List of AssemblyPlanner instances 080 */ 081 public List<AssemblyPlanner> getAssemblyPlanners() 082 { 083 return assemblyPlanners; 084 } 085 086 /** 087 * Method addAssemblyPlanner adds new AssemblyPlanner instances to be evaluated. 088 * 089 * @param assemblyPlanner of type AssemblyPlanner 090 * @return a FlowDef 091 */ 092 public FlowDef addAssemblyPlanner( AssemblyPlanner assemblyPlanner ) 093 { 094 assemblyPlanners.add( assemblyPlanner ); 095 096 return this; 097 } 098 099 /** 100 * Method getSources returns the sources of this FlowDef object. 101 * 102 * @return the sources (type Map<String, Tap>) of this FlowDef object. 103 */ 104 public Map<String, Tap> getSources() 105 { 106 return sources; 107 } 108 109 /** 110 * Method getSourcesCopy returns a copy of the sources Map. 111 * 112 * @return the sourcesCopy (type Map<String, Tap>) of this FlowDef object. 113 */ 114 public Map<String, Tap> getSourcesCopy() 115 { 116 return new HashMap<String, Tap>( sources ); 117 } 118 119 /** 120 * Method addSource adds a new named source {@link Tap} for use in the resulting {@link Flow}. 121 * 122 * @param name of String 123 * @param source of Tap 124 * @return FlowDef 125 */ 126 public FlowDef addSource( String name, Tap source ) 127 { 128 if( sources.containsKey( name ) ) 129 throw new IllegalArgumentException( "cannot add duplicate source: " + name ); 130 131 sources.put( name, source ); 132 return this; 133 } 134 135 /** 136 * Method addSource adds a new source {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}. 137 * <p/> 138 * If the given pipe is not a head pipe, it will be resolved. If more than one is found, an 139 * {@link IllegalArgumentException} will be thrown. 140 * 141 * @param pipe of Pipe 142 * @param source of Tap 143 * @return FlowDef 144 */ 145 public FlowDef addSource( Pipe pipe, Tap source ) 146 { 147 if( pipe == null ) 148 throw new IllegalArgumentException( "pipe may not be null" ); 149 150 Pipe[] heads = pipe.getHeads(); 151 152 if( heads.length != 1 ) 153 throw new IllegalArgumentException( "pipe has too many heads, found: " + Arrays.toString( Pipe.names( heads ) ) ); 154 155 addSource( heads[ 0 ].getName(), source ); 156 return this; 157 } 158 159 /** 160 * Method addSources adds a map of name and {@link Tap} pairs. 161 * 162 * @param sources of Map<String, Tap> 163 * @return FlowDef 164 */ 165 public FlowDef addSources( Map<String, Tap> sources ) 166 { 167 if( sources != null ) 168 { 169 for( Map.Entry<String, Tap> entry : sources.entrySet() ) 170 addSource( entry.getKey(), entry.getValue() ); 171 } 172 173 return this; 174 } 175 176 /** 177 * Method getSinks returns the sinks of this FlowDef object. 178 * 179 * @return the sinks (type Map<String, Tap>) of this FlowDef object. 180 */ 181 public Map<String, Tap> getSinks() 182 { 183 return sinks; 184 } 185 186 /** 187 * Method getSinksCopy returns a copy of the sink Map. 188 * 189 * @return the sinksCopy (type Map<String, Tap>) of this FlowDef object. 190 */ 191 public Map<String, Tap> getSinksCopy() 192 { 193 return new HashMap<String, Tap>( sinks ); 194 } 195 196 /** 197 * Method addSink adds a new named sink {@link Tap} for use in the resulting {@link Flow}. 198 * 199 * @param name of String 200 * @param sink of Tap 201 * @return FlowDef 202 */ 203 public FlowDef addSink( String name, Tap sink ) 204 { 205 if( sinks.containsKey( name ) ) 206 throw new IllegalArgumentException( "cannot add duplicate sink: " + name ); 207 208 sinks.put( name, sink ); 209 return this; 210 } 211 212 /** 213 * Method addSink adds a new sink {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}. 214 * 215 * @param tail of Pipe 216 * @param sink of Tap 217 * @return FlowDef 218 */ 219 public FlowDef addSink( Pipe tail, Tap sink ) 220 { 221 addSink( tail.getName(), sink ); 222 return this; 223 } 224 225 /** 226 * Method addTailSink adds the tail {@link Pipe} and sink {@link Tap} to this FlowDef. 227 * <p/> 228 * This is a convenience method for adding both a tail and sink simultaneously. There isn't a similar method 229 * for heads and sources as the head Pipe can always be derived. 230 * 231 * @param tail of Pipe 232 * @param sink of Tap 233 * @return FlowDef 234 */ 235 public FlowDef addTailSink( Pipe tail, Tap sink ) 236 { 237 addSink( tail.getName(), sink ); 238 addTail( tail ); 239 return this; 240 } 241 242 /** 243 * Method addSinks adds a Map of the named and {@link Tap} pairs. 244 * 245 * @param sinks of Map<String, Tap> 246 * @return FlowDef 247 */ 248 public FlowDef addSinks( Map<String, Tap> sinks ) 249 { 250 if( sinks != null ) 251 { 252 for( Map.Entry<String, Tap> entry : sinks.entrySet() ) 253 addSink( entry.getKey(), entry.getValue() ); 254 } 255 256 return this; 257 } 258 259 /** 260 * Method getTraps returns the traps of this FlowDef object. 261 * 262 * @return the traps (type Map<String, Tap>) of this FlowDef object. 263 */ 264 public Map<String, Tap> getTraps() 265 { 266 return traps; 267 } 268 269 /** 270 * Method getTrapsCopy returns a copy of the trap Map. 271 * 272 * @return the trapsCopy (type Map<String, Tap>) of this FlowDef object. 273 */ 274 public Map<String, Tap> getTrapsCopy() 275 { 276 return new HashMap<String, Tap>( traps ); 277 } 278 279 /** 280 * Method addTrap adds a new named trap {@link Tap} for use in the resulting {@link Flow}. 281 * 282 * @param name of String 283 * @param trap of Tap 284 * @return FlowDef 285 */ 286 public FlowDef addTrap( String name, Tap trap ) 287 { 288 if( traps.containsKey( name ) ) 289 throw new IllegalArgumentException( "cannot add duplicate trap: " + name ); 290 291 traps.put( name, trap ); 292 return this; 293 } 294 295 /** 296 * Method addTrap adds a new trap {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}. 297 * 298 * @param pipe of Pipe 299 * @param trap of Tap 300 * @return FlowDef 301 */ 302 public FlowDef addTrap( Pipe pipe, Tap trap ) 303 { 304 addTrap( pipe.getName(), trap ); 305 return this; 306 } 307 308 /** 309 * Method addTraps adds a Map of the names and {@link Tap} pairs. 310 * 311 * @param traps of Map<String, Tap> 312 * @return FlowDef 313 */ 314 public FlowDef addTraps( Map<String, Tap> traps ) 315 { 316 if( traps != null ) 317 { 318 for( Map.Entry<String, Tap> entry : traps.entrySet() ) 319 addTrap( entry.getKey(), entry.getValue() ); 320 } 321 322 return this; 323 } 324 325 /** 326 * Method getCheckpoints returns the checkpoint taps of this FlowDef object. 327 * 328 * @return the checkpoints (type Map<String, Tap>) of this FlowDef object. 329 */ 330 public Map<String, Tap> getCheckpoints() 331 { 332 return checkpoints; 333 } 334 335 /** 336 * Method getCheckpointsCopy returns a copy of the checkpoint tap Map. 337 * 338 * @return the checkpointsCopy (type Map<String, Tap>) of this FlowDef object. 339 */ 340 public Map<String, Tap> getCheckpointsCopy() 341 { 342 return new HashMap<String, Tap>( checkpoints ); 343 } 344 345 /** 346 * Method addCheckpoint adds a new named checkpoint {@link Tap} for use in the resulting {@link Flow}. 347 * 348 * @param name of String 349 * @param checkpoint of Tap 350 * @return FlowDef 351 */ 352 public FlowDef addCheckpoint( String name, Tap checkpoint ) 353 { 354 if( checkpoints.containsKey( name ) ) 355 throw new IllegalArgumentException( "cannot add duplicate checkpoint: " + name ); 356 357 checkpoints.put( name, checkpoint ); 358 return this; 359 } 360 361 /** 362 * Method addCheckpoint adds a new checkpoint {@link Tap} named after the given {@link Checkpoint} for use in the resulting {@link Flow}. 363 * 364 * @param pipe of Pipe 365 * @param checkpoint of Tap 366 * @return FlowDef 367 */ 368 public FlowDef addCheckpoint( Checkpoint pipe, Tap checkpoint ) 369 { 370 addCheckpoint( pipe.getName(), checkpoint ); 371 return this; 372 } 373 374 /** 375 * Method addCheckpoints adds a Map of the names and {@link Tap} pairs. 376 * 377 * @param checkpoints of Map<String, Tap> 378 * @return FlowDef 379 */ 380 public FlowDef addCheckpoints( Map<String, Tap> checkpoints ) 381 { 382 if( checkpoints != null ) 383 { 384 for( Map.Entry<String, Tap> entry : checkpoints.entrySet() ) 385 addCheckpoint( entry.getKey(), entry.getValue() ); 386 } 387 388 return this; 389 } 390 391 /** 392 * Method getTails returns all the current pipe assembly tails the FlowDef holds. 393 * 394 * @return the tails (type List<Pipe>) of this FlowDef object. 395 */ 396 public List<Pipe> getTails() 397 { 398 return tails; 399 } 400 401 /** 402 * Method getTailsArray returns all the current pipe assembly tails the FlowDef holds. 403 * 404 * @return the tailsArray (type Pipe[]) of this FlowDef object. 405 */ 406 public Pipe[] getTailsArray() 407 { 408 return tails.toArray( new Pipe[ tails.size() ] ); 409 } 410 411 /** 412 * Method addTail adds a new {@link Pipe} to this FlowDef that represents a tail in a pipe assembly. 413 * <p/> 414 * Be sure to add a sink tap that has the same name as this tail. 415 * 416 * @param tail of Pipe 417 * @return FlowDef 418 */ 419 public FlowDef addTail( Pipe tail ) 420 { 421 if( tail != null ) 422 this.tails.add( tail ); 423 424 return this; 425 } 426 427 /** 428 * Method addTails adds a Collection of tails. 429 * 430 * @param tails of Collection<Pipe> 431 * @return FlowDef 432 */ 433 public FlowDef addTails( Collection<Pipe> tails ) 434 { 435 for( Pipe tail : tails ) 436 addTail( tail ); 437 438 return this; 439 } 440 441 /** 442 * Method addTails adds an array of tails. 443 * 444 * @param tails of Pipe... 445 * @return FlowDef 446 */ 447 public FlowDef addTails( Pipe... tails ) 448 { 449 for( Pipe tail : tails ) 450 addTail( tail ); 451 452 return this; 453 } 454 455 public FlowDef setAssertionLevel( AssertionLevel assertionLevel ) 456 { 457 this.assertionLevel = assertionLevel; 458 459 return this; 460 } 461 462 public AssertionLevel getAssertionLevel() 463 { 464 return assertionLevel; 465 } 466 467 public FlowDef setDebugLevel( DebugLevel debugLevel ) 468 { 469 this.debugLevel = debugLevel; 470 471 return this; 472 } 473 474 public DebugLevel getDebugLevel() 475 { 476 return debugLevel; 477 } 478 479 /** 480 * Method setRunID sets the checkpoint run or execution ID to be used to find prior failed runs against 481 * this runID. 482 * <p/> 483 * When given, and a {@link Flow} fails to execute, a subsequent attempt to run the same Flow with the same 484 * runID will allow the Flow instance to start where it left off. 485 * <p/> 486 * Not all planners support this feature. 487 * <p/> 488 * A Flow name is required when using a runID. 489 * 490 * @param runID of type String 491 * @return FlowDef 492 */ 493 public FlowDef setRunID( String runID ) 494 { 495 if( runID != null && runID.isEmpty() ) 496 return this; 497 498 this.runID = runID; 499 500 return this; 501 } 502 503 public String getRunID() 504 { 505 return runID; 506 } 507 508 public List<String> getClassPath() 509 { 510 return classPath; 511 } 512 513 /** 514 * Adds each given artifact to the classpath the assembly will execute under allowing 515 * {@link cascading.pipe.Operator}s to dynamically load classes and resources from a {@link ClassLoader}. 516 * 517 * @param artifact a jar or other file String path 518 * @return FlowDef 519 */ 520 public FlowDef addToClassPath( String artifact ) 521 { 522 if( artifact == null || artifact.isEmpty() ) 523 return this; 524 525 classPath.add( artifact ); 526 527 return this; 528 } 529 }