001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow;
022
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.LinkedHashMap;
028import java.util.List;
029import java.util.Map;
030
031import cascading.operation.AssertionLevel;
032import cascading.operation.DebugLevel;
033import cascading.pipe.Checkpoint;
034import cascading.pipe.Pipe;
035import cascading.property.UnitOfWorkDef;
036import cascading.tap.Tap;
037import cascading.util.Util;
038
039/**
040 * Class FlowDef is a fluent interface for defining a {@link Flow}.
041 * <p/>
042 * This allows for ad-hoc building of Flow data and meta-data, like tags.
043 * <p/>
044 * Instead of calling one of the {@link FlowConnector} connect methods, {@link FlowConnector#connect(FlowDef)}
045 * can be called.
046 */
047public class FlowDef extends UnitOfWorkDef<FlowDef>
048  {
049  protected Map<String, Tap> sources = new HashMap<String, Tap>();
050  protected Map<String, Tap> sinks = new HashMap<String, Tap>();
051  protected Map<String, Tap> traps = new HashMap<String, Tap>();
052  protected Map<String, Tap> checkpoints = new HashMap<String, Tap>();
053
054  protected List<String> classPath = new ArrayList<String>();
055  protected List<Pipe> tails = new ArrayList<Pipe>();
056  protected List<AssemblyPlanner> assemblyPlanners = new ArrayList<AssemblyPlanner>();
057
058  protected HashMap<String, String> flowDescriptor = new LinkedHashMap<String, String>();
059
060  protected AssertionLevel assertionLevel;
061  protected DebugLevel debugLevel;
062
063  protected String runID;
064
065  /**
066   * Creates a new instance of a FlowDef.
067   *
068   * @return a FlowDef
069   */
070  public static FlowDef flowDef()
071    {
072    return new FlowDef();
073    }
074
075  /** Constructor FlowDef creates a new FlowDef instance. */
076  public FlowDef()
077    {
078    }
079
080  protected FlowDef( FlowDef flowDef, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints )
081    {
082    super( flowDef );
083
084    this.sources = sources;
085    this.sinks = sinks;
086    this.traps = traps;
087    this.checkpoints = checkpoints;
088
089    this.classPath = flowDef.classPath;
090    this.tails = flowDef.tails;
091    this.assemblyPlanners = flowDef.assemblyPlanners;
092    this.flowDescriptor = flowDef.flowDescriptor;
093    this.assertionLevel = flowDef.assertionLevel;
094    this.debugLevel = flowDef.debugLevel;
095    this.runID = flowDef.runID;
096    }
097
098  /**
099   * Method getAssemblyPlanners returns the current registered AssemblyPlanners.
100   *
101   * @return a List of AssemblyPlanner instances
102   */
103  public List<AssemblyPlanner> getAssemblyPlanners()
104    {
105    return assemblyPlanners;
106    }
107
108  /**
109   * Method addAssemblyPlanner adds new AssemblyPlanner instances to be evaluated.
110   *
111   * @param assemblyPlanner of type AssemblyPlanner
112   * @return a FlowDef
113   */
114  public FlowDef addAssemblyPlanner( AssemblyPlanner assemblyPlanner )
115    {
116    assemblyPlanners.add( assemblyPlanner );
117    addDescriptions( assemblyPlanner.getFlowDescriptor() );
118
119    return this;
120    }
121
122  /**
123   * Method getSources returns the sources of this FlowDef object.
124   *
125   * @return the sources (type Map<String, Tap>) of this FlowDef object.
126   */
127  public Map<String, Tap> getSources()
128    {
129    return sources;
130    }
131
132  /**
133   * Method getSourcesCopy returns a copy of the sources Map.
134   *
135   * @return the sourcesCopy (type Map<String, Tap>) of this FlowDef object.
136   */
137  public Map<String, Tap> getSourcesCopy()
138    {
139    return new HashMap<String, Tap>( sources );
140    }
141
142  /**
143   * Method getFlowDescriptor returns the  flowDescriptor of this FlowDef.
144   *
145   * @return the flowDescriptor of this FlowDef object.
146   */
147  public HashMap<String, String> getFlowDescriptor()
148    {
149    return flowDescriptor;
150    }
151
152  /**
153   * Method addSource adds a new named source {@link Tap} for use in the resulting {@link Flow}.
154   *
155   * @param name   of String
156   * @param source of Tap
157   * @return FlowDef
158   */
159  public FlowDef addSource( String name, Tap source )
160    {
161    if( sources.containsKey( name ) )
162      throw new IllegalArgumentException( "cannot add duplicate source: " + name );
163
164    sources.put( name, source );
165    return this;
166    }
167
168  /**
169   * Method addSource adds a new source {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
170   * <p/>
171   * If the given pipe is not a head pipe, it will be resolved. If more than one is found, an
172   * {@link IllegalArgumentException} will be thrown.
173   *
174   * @param pipe   of Pipe
175   * @param source of Tap
176   * @return FlowDef
177   */
178  public FlowDef addSource( Pipe pipe, Tap source )
179    {
180    if( pipe == null )
181      throw new IllegalArgumentException( "pipe may not be null" );
182
183    Pipe[] heads = pipe.getHeads();
184
185    if( heads.length != 1 )
186      throw new IllegalArgumentException( "pipe has too many heads, found: " + Arrays.toString( Pipe.names( heads ) ) );
187
188    addSource( heads[ 0 ].getName(), source );
189    return this;
190    }
191
192  /**
193   * Method addSources adds a map of name and {@link Tap} pairs.
194   *
195   * @param sources of Map<String, Tap>
196   * @return FlowDef
197   */
198  public FlowDef addSources( Map<String, Tap> sources )
199    {
200    if( sources != null )
201      {
202      for( Map.Entry<String, Tap> entry : sources.entrySet() )
203        addSource( entry.getKey(), entry.getValue() );
204      }
205
206    return this;
207    }
208
209  /**
210   * Method addDescription adds a user readable description to the flowDescriptor.
211   * <p/>
212   * This uses the {@link FlowDescriptors#DESCRIPTION} key.
213   */
214  public FlowDef addDescription( String description )
215    {
216    addDescription( FlowDescriptors.DESCRIPTION, description );
217
218    return this;
219    }
220
221  /**
222   * Method addDescription adds a description to the flowDescriptor.
223   * <p/>
224   * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
225   * For known description types, see {@link FlowDescriptors}.
226   * <p/>
227   * If an existing key exists, it will be appended to the original value using
228   * {@link FlowDescriptors#VALUE_SEPARATOR}.
229   *
230   * @param key   The key as a String.
231   * @param value The value as a String.
232   * @return FlowDef
233   */
234  public FlowDef addDescription( String key, String value )
235    {
236    if( Util.isEmpty( value ) ) // do nothing
237      return this;
238
239    if( flowDescriptor.containsKey( key ) )
240      {
241      String original = flowDescriptor.get( key );
242
243      if( !Util.isEmpty( original ) )
244        value = original + FlowDescriptors.VALUE_SEPARATOR + value;
245      }
246
247    flowDescriptor.put( key, value );
248
249    return this;
250    }
251
252  /**
253   * Method addProperties adds all properties in the given map in order to the flowDescriptor. If the given Map has
254   * an explicit order, it will be preserved.
255   * <p/>
256   * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
257   * For known description types, see {@link FlowDescriptors}.
258   *
259   * @param descriptions The properties to be added to the map.
260   * @return FlowDef
261   */
262  public FlowDef addDescriptions( Map<String, String> descriptions )
263    {
264    for( Map.Entry<String, String> entry : descriptions.entrySet() )
265      addDescription( entry.getKey(), entry.getValue() );
266
267    return this;
268    }
269
270  /**
271   * Method getSinks returns the sinks of this FlowDef object.
272   *
273   * @return the sinks (type Map<String, Tap>) of this FlowDef object.
274   */
275  public Map<String, Tap> getSinks()
276    {
277    return sinks;
278    }
279
280  /**
281   * Method getSinksCopy returns a copy of the sink Map.
282   *
283   * @return the sinksCopy (type Map<String, Tap>) of this FlowDef object.
284   */
285  public Map<String, Tap> getSinksCopy()
286    {
287    return new HashMap<String, Tap>( sinks );
288    }
289
290  /**
291   * Method addSink adds a new named sink {@link Tap} for use in the resulting {@link Flow}.
292   *
293   * @param name of String
294   * @param sink of Tap
295   * @return FlowDef
296   */
297  public FlowDef addSink( String name, Tap sink )
298    {
299    if( sinks.containsKey( name ) )
300      throw new IllegalArgumentException( "cannot add duplicate sink: " + name );
301
302    sinks.put( name, sink );
303    return this;
304    }
305
306  /**
307   * Method addSink adds a new sink {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
308   *
309   * @param tail of Pipe
310   * @param sink of Tap
311   * @return FlowDef
312   */
313  public FlowDef addSink( Pipe tail, Tap sink )
314    {
315    addSink( tail.getName(), sink );
316    return this;
317    }
318
319  /**
320   * Method addTailSink adds the tail {@link Pipe} and sink {@link Tap} to this FlowDef.
321   * <p/>
322   * This is a convenience method for adding both a tail and sink simultaneously. There isn't a similar method
323   * for heads and sources as the head Pipe can always be derived.
324   *
325   * @param tail of Pipe
326   * @param sink of Tap
327   * @return FlowDef
328   */
329  public FlowDef addTailSink( Pipe tail, Tap sink )
330    {
331    addSink( tail.getName(), sink );
332    addTail( tail );
333    return this;
334    }
335
336  /**
337   * Method addSinks adds a Map of the named and {@link Tap} pairs.
338   *
339   * @param sinks of Map<String, Tap>
340   * @return FlowDef
341   */
342  public FlowDef addSinks( Map<String, Tap> sinks )
343    {
344    if( sinks != null )
345      {
346      for( Map.Entry<String, Tap> entry : sinks.entrySet() )
347        addSink( entry.getKey(), entry.getValue() );
348      }
349
350    return this;
351    }
352
353  /**
354   * Method getTraps returns the traps of this FlowDef object.
355   *
356   * @return the traps (type Map<String, Tap>) of this FlowDef object.
357   */
358  public Map<String, Tap> getTraps()
359    {
360    return traps;
361    }
362
363  /**
364   * Method getTrapsCopy returns a copy of the trap Map.
365   *
366   * @return the trapsCopy (type Map<String, Tap>) of this FlowDef object.
367   */
368  public Map<String, Tap> getTrapsCopy()
369    {
370    return new HashMap<String, Tap>( traps );
371    }
372
373  /**
374   * Method addTrap adds a new named trap {@link Tap} for use in the resulting {@link Flow}.
375   *
376   * @param name of String
377   * @param trap of Tap
378   * @return FlowDef
379   */
380  public FlowDef addTrap( String name, Tap trap )
381    {
382    if( traps.containsKey( name ) )
383      throw new IllegalArgumentException( "cannot add duplicate trap: " + name );
384
385    traps.put( name, trap );
386    return this;
387    }
388
389  /**
390   * Method addTrap adds a new trap {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
391   *
392   * @param pipe of Pipe
393   * @param trap of Tap
394   * @return FlowDef
395   */
396  public FlowDef addTrap( Pipe pipe, Tap trap )
397    {
398    addTrap( pipe.getName(), trap );
399    return this;
400    }
401
402  /**
403   * Method addTraps adds a Map of the names and {@link Tap} pairs.
404   *
405   * @param traps of Map<String, Tap>
406   * @return FlowDef
407   */
408  public FlowDef addTraps( Map<String, Tap> traps )
409    {
410    if( traps != null )
411      {
412      for( Map.Entry<String, Tap> entry : traps.entrySet() )
413        addTrap( entry.getKey(), entry.getValue() );
414      }
415
416    return this;
417    }
418
419  /**
420   * Method getCheckpoints returns the checkpoint taps of this FlowDef object.
421   *
422   * @return the checkpoints (type Map<String, Tap>) of this FlowDef object.
423   */
424  public Map<String, Tap> getCheckpoints()
425    {
426    return checkpoints;
427    }
428
429  /**
430   * Method getCheckpointsCopy returns a copy of the checkpoint tap Map.
431   *
432   * @return the checkpointsCopy (type Map<String, Tap>) of this FlowDef object.
433   */
434  public Map<String, Tap> getCheckpointsCopy()
435    {
436    return new HashMap<String, Tap>( checkpoints );
437    }
438
439  /**
440   * Method addCheckpoint adds a new named checkpoint {@link Tap} for use in the resulting {@link Flow}.
441   *
442   * @param name       of String
443   * @param checkpoint of Tap
444   * @return FlowDef
445   */
446  public FlowDef addCheckpoint( String name, Tap checkpoint )
447    {
448    if( checkpoints.containsKey( name ) )
449      throw new IllegalArgumentException( "cannot add duplicate checkpoint: " + name );
450
451    checkpoints.put( name, checkpoint );
452    return this;
453    }
454
455  /**
456   * Method addCheckpoint adds a new checkpoint {@link Tap} named after the given {@link Checkpoint} for use in the resulting {@link Flow}.
457   *
458   * @param pipe       of Pipe
459   * @param checkpoint of Tap
460   * @return FlowDef
461   */
462  public FlowDef addCheckpoint( Checkpoint pipe, Tap checkpoint )
463    {
464    addCheckpoint( pipe.getName(), checkpoint );
465    return this;
466    }
467
468  /**
469   * Method addCheckpoints adds a Map of the names and {@link Tap} pairs.
470   *
471   * @param checkpoints of Map<String, Tap>
472   * @return FlowDef
473   */
474  public FlowDef addCheckpoints( Map<String, Tap> checkpoints )
475    {
476    if( checkpoints != null )
477      {
478      for( Map.Entry<String, Tap> entry : checkpoints.entrySet() )
479        addCheckpoint( entry.getKey(), entry.getValue() );
480      }
481
482    return this;
483    }
484
485  /**
486   * Method getTails returns all the current pipe assembly tails the FlowDef holds.
487   *
488   * @return the tails (type List<Pipe>) of this FlowDef object.
489   */
490  public List<Pipe> getTails()
491    {
492    return tails;
493    }
494
495  /**
496   * Method getTailsArray returns all the current pipe assembly tails the FlowDef holds.
497   *
498   * @return the tailsArray (type Pipe[]) of this FlowDef object.
499   */
500  public Pipe[] getTailsArray()
501    {
502    return tails.toArray( new Pipe[ tails.size() ] );
503    }
504
505  /**
506   * Method addTail adds a new {@link Pipe} to this FlowDef that represents a tail in a pipe assembly.
507   * <p/>
508   * Be sure to add a sink tap that has the same name as this tail.
509   *
510   * @param tail of Pipe
511   * @return FlowDef
512   */
513  public FlowDef addTail( Pipe tail )
514    {
515    if( tail != null )
516      this.tails.add( tail );
517
518    return this;
519    }
520
521  /**
522   * Method addTails adds a Collection of tails.
523   *
524   * @param tails of Collection<Pipe>
525   * @return FlowDef
526   */
527  public FlowDef addTails( Collection<Pipe> tails )
528    {
529    for( Pipe tail : tails )
530      addTail( tail );
531
532    return this;
533    }
534
535  /**
536   * Method addTails adds an array of tails.
537   *
538   * @param tails of Pipe...
539   * @return FlowDef
540   */
541  public FlowDef addTails( Pipe... tails )
542    {
543    for( Pipe tail : tails )
544      addTail( tail );
545
546    return this;
547    }
548
549  public FlowDef setAssertionLevel( AssertionLevel assertionLevel )
550    {
551    this.assertionLevel = assertionLevel;
552
553    return this;
554    }
555
556  public AssertionLevel getAssertionLevel()
557    {
558    return assertionLevel;
559    }
560
561  public FlowDef setDebugLevel( DebugLevel debugLevel )
562    {
563    this.debugLevel = debugLevel;
564
565    return this;
566    }
567
568  public DebugLevel getDebugLevel()
569    {
570    return debugLevel;
571    }
572
573  /**
574   * Method setRunID sets the checkpoint run or execution ID to be used to find prior failed runs against
575   * this runID.
576   * <p/>
577   * When given, and a {@link Flow} fails to execute, a subsequent attempt to run the same Flow with the same
578   * runID will allow the Flow instance to start where it left off.
579   * <p/>
580   * Not all planners support this feature.
581   * <p/>
582   * A Flow name is required when using a runID.
583   *
584   * @param runID of type String
585   * @return FlowDef
586   */
587  public FlowDef setRunID( String runID )
588    {
589    if( runID != null && runID.isEmpty() )
590      return this;
591
592    this.runID = runID;
593
594    return this;
595    }
596
597  public String getRunID()
598    {
599    return runID;
600    }
601
602  public List<String> getClassPath()
603    {
604    return classPath;
605    }
606
607  /**
608   * Adds each given artifact to the classpath the assembly will execute under allowing
609   * {@link cascading.pipe.Operator}s to dynamically load classes and resources from a {@link ClassLoader}.
610   *
611   * @param artifact a jar or other file String path
612   * @return FlowDef
613   */
614  public FlowDef addToClassPath( String artifact )
615    {
616    if( artifact == null || artifact.isEmpty() )
617      return this;
618
619    classPath.add( artifact );
620
621    return this;
622    }
623  }