/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import cascading.operation.AssertionLevel;
import cascading.operation.DebugLevel;
import cascading.pipe.Checkpoint;
import cascading.pipe.Pipe;
import cascading.property.UnitOfWorkDef;
import cascading.tap.Tap;
import cascading.util.Util;

/**
 * Class FlowDef is a fluent interface for defining a {@link Flow}.
 * <p>
 * This allows for ad-hoc building of Flow data and meta-data, like tags.
 * <p>
 * Instead of calling one of the {@link FlowConnector} connect methods, {@link FlowConnector#connect(FlowDef)}
 * can be called.
 * <p>
 * All {@code add*} and {@code set*} methods return this instance so calls may be chained. The plain getters
 * ({@link #getSources()}, {@link #getSinks()}, {@link #getTraps()}, {@link #getCheckpoints()},
 * {@link #getTails()}, ...) return the internal collections themselves, not copies; use the
 * {@code get*Copy()} variants when an independent {@link Map} is required.
 */
public class FlowDef extends UnitOfWorkDef<FlowDef>
  {
  /** Source taps keyed by name. */
  protected Map<String, Tap> sources = new HashMap<String, Tap>();
  /** Sink taps keyed by name. */
  protected Map<String, Tap> sinks = new HashMap<String, Tap>();
  /** Trap taps keyed by name. */
  protected Map<String, Tap> traps = new HashMap<String, Tap>();
  /** Checkpoint taps keyed by name. */
  protected Map<String, Tap> checkpoints = new HashMap<String, Tap>();

  /** Artifact paths added via {@link #addToClassPath(String)}. */
  protected List<String> classPath = new ArrayList<String>();
  /** Pipe assembly tails added via {@link #addTail(Pipe)}. */
  protected List<Pipe> tails = new ArrayList<Pipe>();
  /** Registered planners added via {@link #addAssemblyPlanner(AssemblyPlanner)}. */
  protected List<AssemblyPlanner> assemblyPlanners = new ArrayList<AssemblyPlanner>();

  /** Descriptor key/value pairs; backed by a {@link LinkedHashMap} so insertion order is retained. */
  protected HashMap<String, String> flowDescriptor = new LinkedHashMap<String, String>();

  protected AssertionLevel assertionLevel;
  protected DebugLevel debugLevel;

  protected String runID;

  /**
   * Creates a new instance of a FlowDef.
   *
   * @return a FlowDef
   */
  public static FlowDef flowDef()
    {
    return new FlowDef();
    }

  /** Constructor FlowDef creates a new FlowDef instance. */
  public FlowDef()
    {
    }

  /**
   * Constructor FlowDef creates a new FlowDef from the given FlowDef, substituting the given tap maps.
   * <p>
   * The tap maps are stored as given. The class path, tails, assembly planners and flow descriptor
   * collections are shared by reference with the given {@code flowDef}; they are not copied, so changes made
   * through either instance are visible through the other.
   *
   * @param flowDef     of FlowDef, the instance to take all non-tap state from
   * @param sources     of {@code Map<String, Tap>}
   * @param sinks       of {@code Map<String, Tap>}
   * @param traps       of {@code Map<String, Tap>}
   * @param checkpoints of {@code Map<String, Tap>}
   */
  protected FlowDef( FlowDef flowDef, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints )
    {
    super( flowDef );

    this.sources = sources;
    this.sinks = sinks;
    this.traps = traps;
    this.checkpoints = checkpoints;

    this.classPath = flowDef.classPath;
    this.tails = flowDef.tails;
    this.assemblyPlanners = flowDef.assemblyPlanners;
    this.flowDescriptor = flowDef.flowDescriptor;
    this.assertionLevel = flowDef.assertionLevel;
    this.debugLevel = flowDef.debugLevel;
    this.runID = flowDef.runID;
    }

  /**
   * Method getAssemblyPlanners returns the current registered AssemblyPlanners.
   *
   * @return a List of AssemblyPlanner instances
   */
  public List<AssemblyPlanner> getAssemblyPlanners()
    {
    return assemblyPlanners;
    }

  /**
   * Method addAssemblyPlanner adds new AssemblyPlanner instances to be evaluated.
   * <p>
   * The descriptor entries returned by {@link AssemblyPlanner#getFlowDescriptor()} are merged into this
   * FlowDef's flowDescriptor via {@link #addDescriptions(Map)}.
   *
   * @param assemblyPlanner of type AssemblyPlanner
   * @return a FlowDef
   */
  public FlowDef addAssemblyPlanner( AssemblyPlanner assemblyPlanner )
    {
    assemblyPlanners.add( assemblyPlanner );
    addDescriptions( assemblyPlanner.getFlowDescriptor() );

    return this;
    }

  /**
   * Method getSources returns the sources of this FlowDef object.
   *
   * @return the sources (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getSources()
    {
    return sources;
    }

  /**
   * Method getSourcesCopy returns a copy of the sources Map.
   *
   * @return the sourcesCopy (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getSourcesCopy()
    {
    return new HashMap<String, Tap>( sources );
    }

  /**
   * Method getFlowDescriptor returns the flowDescriptor of this FlowDef.
   *
   * @return the flowDescriptor of this FlowDef object.
   */
  public HashMap<String, String> getFlowDescriptor()
    {
    return flowDescriptor;
    }

  /**
   * Method addSource adds a new named source {@link Tap} for use in the resulting {@link Flow}.
   *
   * @param name   of String
   * @param source of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a source with the given name was already added
   */
  public FlowDef addSource( String name, Tap source )
    {
    if( sources.containsKey( name ) )
      throw new IllegalArgumentException( "cannot add duplicate source: " + name );

    sources.put( name, source );
    return this;
    }

  /**
   * Method addSource adds a new source {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
   * <p>
   * If the given pipe is not a head pipe, it will be resolved. If more than one is found, an
   * {@link IllegalArgumentException} will be thrown.
   *
   * @param pipe   of Pipe
   * @param source of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if the pipe is null, does not resolve to exactly one head, or the
   *                                  resolved head name was already added as a source
   */
  public FlowDef addSource( Pipe pipe, Tap source )
    {
    if( pipe == null )
      throw new IllegalArgumentException( "pipe may not be null" );

    Pipe[] heads = pipe.getHeads();

    if( heads.length != 1 )
      throw new IllegalArgumentException( "pipe has too many heads, found: " + Arrays.toString( Pipe.names( heads ) ) );

    addSource( heads[ 0 ].getName(), source );
    return this;
    }

  /**
   * Method addSources adds a map of name and {@link Tap} pairs.
   * <p>
   * A null map is ignored.
   *
   * @param sources of {@code Map<String, Tap>}
   * @return FlowDef
   * @throws IllegalArgumentException if any name was already added as a source
   */
  public FlowDef addSources( Map<String, Tap> sources )
    {
    if( sources != null )
      {
      for( Map.Entry<String, Tap> entry : sources.entrySet() )
        addSource( entry.getKey(), entry.getValue() );
      }

    return this;
    }

  /**
   * Method addDescription adds a user readable description to the flowDescriptor.
   * <p>
   * This uses the {@link FlowDescriptors#DESCRIPTION} key.
   *
   * @param description The description as a String.
   * @return FlowDef
   */
  public FlowDef addDescription( String description )
    {
    addDescription( FlowDescriptors.DESCRIPTION, description );

    return this;
    }

  /**
   * Method addDescription adds a description to the flowDescriptor.
   * <p>
   * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
   * For known description types, see {@link FlowDescriptors}.
   * <p>
   * If an existing key exists, it will be appended to the original value using
   * {@link FlowDescriptors#VALUE_SEPARATOR}.
   * <p>
   * A value that {@link Util#isEmpty(String)} reports as empty is ignored.
   *
   * @param key   The key as a String.
   * @param value The value as a String.
   * @return FlowDef
   */
  public FlowDef addDescription( String key, String value )
    {
    if( Util.isEmpty( value ) ) // do nothing
      return this;

    if( flowDescriptor.containsKey( key ) )
      {
      String original = flowDescriptor.get( key );

      // only prepend the prior value when there is one, otherwise the new value simply replaces it
      if( !Util.isEmpty( original ) )
        value = original + FlowDescriptors.VALUE_SEPARATOR + value;
      }

    flowDescriptor.put( key, value );

    return this;
    }

  /**
   * Method addDescriptions adds all entries in the given map in order to the flowDescriptor. If the given Map has
   * an explicit order, it will be preserved.
   * <p>
   * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
   * For known description types, see {@link FlowDescriptors}.
   * <p>
   * A null map is ignored. Each entry is added via {@link #addDescription(String, String)}.
   *
   * @param descriptions The descriptions to be added to the map.
   * @return FlowDef
   */
  public FlowDef addDescriptions( Map<String, String> descriptions )
    {
    // tolerate a missing map the same way addSources/addSinks/addTraps/addCheckpoints do
    if( descriptions == null )
      return this;

    for( Map.Entry<String, String> entry : descriptions.entrySet() )
      addDescription( entry.getKey(), entry.getValue() );

    return this;
    }

  /**
   * Method getSinks returns the sinks of this FlowDef object.
   *
   * @return the sinks (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getSinks()
    {
    return sinks;
    }

  /**
   * Method getSinksCopy returns a copy of the sink Map.
   *
   * @return the sinksCopy (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getSinksCopy()
    {
    return new HashMap<String, Tap>( sinks );
    }

  /**
   * Method addSink adds a new named sink {@link Tap} for use in the resulting {@link Flow}.
   *
   * @param name of String
   * @param sink of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a sink with the given name was already added
   */
  public FlowDef addSink( String name, Tap sink )
    {
    if( sinks.containsKey( name ) )
      throw new IllegalArgumentException( "cannot add duplicate sink: " + name );

    sinks.put( name, sink );
    return this;
    }

  /**
   * Method addSink adds a new sink {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
   *
   * @param tail of Pipe
   * @param sink of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a sink with the tail's name was already added
   */
  public FlowDef addSink( Pipe tail, Tap sink )
    {
    addSink( tail.getName(), sink );
    return this;
    }

  /**
   * Method addTailSink adds the tail {@link Pipe} and sink {@link Tap} to this FlowDef.
   * <p>
   * This is a convenience method for adding both a tail and sink simultaneously. There isn't a similar method
   * for heads and sources as the head Pipe can always be derived.
   *
   * @param tail of Pipe
   * @param sink of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a sink with the tail's name was already added
   */
  public FlowDef addTailSink( Pipe tail, Tap sink )
    {
    addSink( tail.getName(), sink );
    addTail( tail );
    return this;
    }

  /**
   * Method addSinks adds a Map of the named and {@link Tap} pairs.
   * <p>
   * A null map is ignored.
   *
   * @param sinks of {@code Map<String, Tap>}
   * @return FlowDef
   * @throws IllegalArgumentException if any name was already added as a sink
   */
  public FlowDef addSinks( Map<String, Tap> sinks )
    {
    if( sinks != null )
      {
      for( Map.Entry<String, Tap> entry : sinks.entrySet() )
        addSink( entry.getKey(), entry.getValue() );
      }

    return this;
    }

  /**
   * Method getTraps returns the traps of this FlowDef object.
   *
   * @return the traps (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getTraps()
    {
    return traps;
    }

  /**
   * Method getTrapsCopy returns a copy of the trap Map.
   *
   * @return the trapsCopy (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getTrapsCopy()
    {
    return new HashMap<String, Tap>( traps );
    }

  /**
   * Method addTrap adds a new named trap {@link Tap} for use in the resulting {@link Flow}.
   *
   * @param name of String
   * @param trap of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a trap with the given name was already added
   */
  public FlowDef addTrap( String name, Tap trap )
    {
    if( traps.containsKey( name ) )
      throw new IllegalArgumentException( "cannot add duplicate trap: " + name );

    traps.put( name, trap );
    return this;
    }

  /**
   * Method addTrap adds a new trap {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
   *
   * @param pipe of Pipe
   * @param trap of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a trap with the pipe's name was already added
   */
  public FlowDef addTrap( Pipe pipe, Tap trap )
    {
    addTrap( pipe.getName(), trap );
    return this;
    }

  /**
   * Method addTraps adds a Map of the names and {@link Tap} pairs.
   * <p>
   * A null map is ignored.
   *
   * @param traps of {@code Map<String, Tap>}
   * @return FlowDef
   * @throws IllegalArgumentException if any name was already added as a trap
   */
  public FlowDef addTraps( Map<String, Tap> traps )
    {
    if( traps != null )
      {
      for( Map.Entry<String, Tap> entry : traps.entrySet() )
        addTrap( entry.getKey(), entry.getValue() );
      }

    return this;
    }

  /**
   * Method getCheckpoints returns the checkpoint taps of this FlowDef object.
   *
   * @return the checkpoints (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getCheckpoints()
    {
    return checkpoints;
    }

  /**
   * Method getCheckpointsCopy returns a copy of the checkpoint tap Map.
   *
   * @return the checkpointsCopy (type {@code Map<String, Tap>}) of this FlowDef object.
   */
  public Map<String, Tap> getCheckpointsCopy()
    {
    return new HashMap<String, Tap>( checkpoints );
    }

  /**
   * Method addCheckpoint adds a new named checkpoint {@link Tap} for use in the resulting {@link Flow}.
   *
   * @param name       of String
   * @param checkpoint of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a checkpoint with the given name was already added
   */
  public FlowDef addCheckpoint( String name, Tap checkpoint )
    {
    if( checkpoints.containsKey( name ) )
      throw new IllegalArgumentException( "cannot add duplicate checkpoint: " + name );

    checkpoints.put( name, checkpoint );
    return this;
    }

  /**
   * Method addCheckpoint adds a new checkpoint {@link Tap} named after the given {@link Checkpoint} for use in the resulting {@link Flow}.
   *
   * @param pipe       of Checkpoint
   * @param checkpoint of Tap
   * @return FlowDef
   * @throws IllegalArgumentException if a checkpoint with the pipe's name was already added
   */
  public FlowDef addCheckpoint( Checkpoint pipe, Tap checkpoint )
    {
    addCheckpoint( pipe.getName(), checkpoint );
    return this;
    }

  /**
   * Method addCheckpoints adds a Map of the names and {@link Tap} pairs.
   * <p>
   * A null map is ignored.
   *
   * @param checkpoints of {@code Map<String, Tap>}
   * @return FlowDef
   * @throws IllegalArgumentException if any name was already added as a checkpoint
   */
  public FlowDef addCheckpoints( Map<String, Tap> checkpoints )
    {
    if( checkpoints != null )
      {
      for( Map.Entry<String, Tap> entry : checkpoints.entrySet() )
        addCheckpoint( entry.getKey(), entry.getValue() );
      }

    return this;
    }

  /**
   * Method getTails returns all the current pipe assembly tails the FlowDef holds.
   *
   * @return the tails (type {@code List<Pipe>}) of this FlowDef object.
   */
  public List<Pipe> getTails()
    {
    return tails;
    }

  /**
   * Method getTailsArray returns all the current pipe assembly tails the FlowDef holds.
   *
   * @return the tailsArray (type {@code Pipe[]}) of this FlowDef object.
   */
  public Pipe[] getTailsArray()
    {
    return tails.toArray( new Pipe[ tails.size() ] );
    }

  /**
   * Method addTail adds a new {@link Pipe} to this FlowDef that represents a tail in a pipe assembly.
   * <p>
   * Be sure to add a sink tap that has the same name as this tail.
   * <p>
   * A null tail is ignored.
   *
   * @param tail of Pipe
   * @return FlowDef
   */
  public FlowDef addTail( Pipe tail )
    {
    if( tail != null )
      this.tails.add( tail );

    return this;
    }

  /**
   * Method addTails adds a Collection of tails.
   * <p>
   * A null collection is ignored, as are null elements within it.
   *
   * @param tails of {@code Collection<Pipe>}
   * @return FlowDef
   */
  public FlowDef addTails( Collection<Pipe> tails )
    {
    // addTail already skips null elements; skip a null container for the same reason
    if( tails == null )
      return this;

    for( Pipe tail : tails )
      addTail( tail );

    return this;
    }

  /**
   * Method addTails adds an array of tails.
   * <p>
   * A null array is ignored, as are null elements within it.
   *
   * @param tails of Pipe...
   * @return FlowDef
   */
  public FlowDef addTails( Pipe... tails )
    {
    // addTail already skips null elements; skip a null array for the same reason
    if( tails == null )
      return this;

    for( Pipe tail : tails )
      addTail( tail );

    return this;
    }

  /**
   * Method setAssertionLevel sets the {@link AssertionLevel} held by this FlowDef.
   *
   * @param assertionLevel of type AssertionLevel
   * @return FlowDef
   */
  public FlowDef setAssertionLevel( AssertionLevel assertionLevel )
    {
    this.assertionLevel = assertionLevel;

    return this;
    }

  /**
   * Method getAssertionLevel returns the {@link AssertionLevel} held by this FlowDef.
   *
   * @return the assertionLevel, or null if none was set
   */
  public AssertionLevel getAssertionLevel()
    {
    return assertionLevel;
    }

  /**
   * Method setDebugLevel sets the {@link DebugLevel} held by this FlowDef.
   *
   * @param debugLevel of type DebugLevel
   * @return FlowDef
   */
  public FlowDef setDebugLevel( DebugLevel debugLevel )
    {
    this.debugLevel = debugLevel;

    return this;
    }

  /**
   * Method getDebugLevel returns the {@link DebugLevel} held by this FlowDef.
   *
   * @return the debugLevel, or null if none was set
   */
  public DebugLevel getDebugLevel()
    {
    return debugLevel;
    }

  /**
   * Method setRunID sets the checkpoint run or execution ID to be used to find prior failed runs against
   * this runID.
   * <p>
   * When given, and a {@link Flow} fails to execute, a subsequent attempt to run the same Flow with the same
   * runID will allow the Flow instance to start where it left off.
   * <p>
   * Not all planners support this feature.
   * <p>
   * A Flow name is required when using a runID.
   * <p>
   * An empty String is ignored and leaves any previously set runID in place; a null value is stored as given.
   *
   * @param runID of type String
   * @return FlowDef
   */
  public FlowDef setRunID( String runID )
    {
    if( runID != null && runID.isEmpty() )
      return this;

    this.runID = runID;

    return this;
    }

  /**
   * Method getRunID returns the runID held by this FlowDef.
   *
   * @return the runID, or null if none was set
   */
  public String getRunID()
    {
    return runID;
    }

  /**
   * Method getClassPath returns the artifacts added via {@link #addToClassPath(String)}.
   *
   * @return the classPath (type {@code List<String>}) of this FlowDef object.
   */
  public List<String> getClassPath()
    {
    return classPath;
    }

  /**
   * Adds each given artifact to the classpath the assembly will execute under allowing
   * {@link cascading.pipe.Operator}s to dynamically load classes and resources from a {@link ClassLoader}.
   * <p>
   * A null or empty artifact is ignored.
   *
   * @param artifact a jar or other file String path
   * @return FlowDef
   */
  public FlowDef addToClassPath( String artifact )
    {
    if( artifact == null || artifact.isEmpty() )
      return this;

    classPath.add( artifact );

    return this;
    }
  }