001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.util.ArrayList;
024    import java.util.Arrays;
025    import java.util.Collection;
026    import java.util.HashMap;
027    import java.util.LinkedHashMap;
028    import java.util.List;
029    import java.util.Map;
030    
031    import cascading.operation.AssertionLevel;
032    import cascading.operation.DebugLevel;
033    import cascading.pipe.Checkpoint;
034    import cascading.pipe.Pipe;
035    import cascading.property.UnitOfWorkDef;
036    import cascading.tap.Tap;
037    import cascading.util.Util;
038    
039    /**
040     * Class FlowDef is a fluent interface for defining a {@link Flow}.
041     * <p/>
042     * This allows for ad-hoc building of Flow data and meta-data like tags.
043     * <p/>
044     * Instead of calling one of the {@link FlowConnector} connect methods, {@link FlowConnector#connect(FlowDef)}
045     * can be called.
046     */
047    public class FlowDef extends UnitOfWorkDef<FlowDef>
048      {
049      protected Map<String, Tap> sources = new HashMap<String, Tap>();
050      protected Map<String, Tap> sinks = new HashMap<String, Tap>();
051      protected Map<String, Tap> traps = new HashMap<String, Tap>();
052      protected Map<String, Tap> checkpoints = new HashMap<String, Tap>();
053    
054      protected List<String> classPath = new ArrayList<String>();
055      protected List<Pipe> tails = new ArrayList<Pipe>();
056      protected List<AssemblyPlanner> assemblyPlanners = new ArrayList<AssemblyPlanner>();
057    
058      protected HashMap<String, String> flowDescriptor = new LinkedHashMap<String, String>();
059    
060      protected AssertionLevel assertionLevel;
061      protected DebugLevel debugLevel;
062    
063      protected String runID;
064    
065      /**
066       * Creates a new instance of a FlowDef.
067       *
068       * @return a FlowDef
069       */
070      public static FlowDef flowDef()
071        {
072        return new FlowDef();
073        }
074    
075      /** Constructor FlowDef creates a new FlowDef instance. */
076      public FlowDef()
077        {
078        }
079    
080      /**
081       * Method getAssemblyPlanners returns the current registered AssemblyPlanners.
082       *
083       * @return a List of AssemblyPlanner instances
084       */
085      public List<AssemblyPlanner> getAssemblyPlanners()
086        {
087        return assemblyPlanners;
088        }
089    
090      /**
091       * Method addAssemblyPlanner adds new AssemblyPlanner instances to be evaluated.
092       *
093       * @param assemblyPlanner of type AssemblyPlanner
094       * @return a FlowDef
095       */
096      public FlowDef addAssemblyPlanner( AssemblyPlanner assemblyPlanner )
097        {
098        assemblyPlanners.add( assemblyPlanner );
099        addDescriptions( assemblyPlanner.getFlowDescriptor() );
100    
101        return this;
102        }
103    
104      /**
105       * Method getSources returns the sources of this FlowDef object.
106       *
107       * @return the sources (type Map<String, Tap>) of this FlowDef object.
108       */
109      public Map<String, Tap> getSources()
110        {
111        return sources;
112        }
113    
114      /**
115       * Method getSourcesCopy returns a copy of the sources Map.
116       *
117       * @return the sourcesCopy (type Map<String, Tap>) of this FlowDef object.
118       */
119      public Map<String, Tap> getSourcesCopy()
120        {
121        return new HashMap<String, Tap>( sources );
122        }
123    
124      /**
125       * Method getFlowDescriptor returns the  flowDescriptor of this FlowDef.
126       *
127       * @return the flowDescriptor of this FlowDef object.
128       */
129      public HashMap<String, String> getFlowDescriptor()
130        {
131        return flowDescriptor;
132        }
133    
134      /**
135       * Method addSource adds a new named source {@link Tap} for use in the resulting {@link Flow}.
136       *
137       * @param name   of String
138       * @param source of Tap
139       * @return FlowDef
140       */
141      public FlowDef addSource( String name, Tap source )
142        {
143        if( sources.containsKey( name ) )
144          throw new IllegalArgumentException( "cannot add duplicate source: " + name );
145    
146        sources.put( name, source );
147        return this;
148        }
149    
150      /**
151       * Method addSource adds a new source {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
152       * <p/>
153       * If the given pipe is not a head pipe, it will be resolved. If more than one is found, an
154       * {@link IllegalArgumentException} will be thrown.
155       *
156       * @param pipe   of Pipe
157       * @param source of Tap
158       * @return FlowDef
159       */
160      public FlowDef addSource( Pipe pipe, Tap source )
161        {
162        if( pipe == null )
163          throw new IllegalArgumentException( "pipe may not be null" );
164    
165        Pipe[] heads = pipe.getHeads();
166    
167        if( heads.length != 1 )
168          throw new IllegalArgumentException( "pipe has too many heads, found: " + Arrays.toString( Pipe.names( heads ) ) );
169    
170        addSource( heads[ 0 ].getName(), source );
171        return this;
172        }
173    
174      /**
175       * Method addSources adds a map of name and {@link Tap} pairs.
176       *
177       * @param sources of Map<String, Tap>
178       * @return FlowDef
179       */
180      public FlowDef addSources( Map<String, Tap> sources )
181        {
182        if( sources != null )
183          {
184          for( Map.Entry<String, Tap> entry : sources.entrySet() )
185            addSource( entry.getKey(), entry.getValue() );
186          }
187    
188        return this;
189        }
190    
191      /**
192       * Method addDescription adds a user readable description to the flowDescriptor.
193       * <p/>
194       * This uses the {@link FlowDescriptors#DESCRIPTION} key.
195       */
196      public FlowDef addDescription( String description )
197        {
198        addDescription( FlowDescriptors.DESCRIPTION, description );
199    
200        return this;
201        }
202    
203      /**
204       * Method addDescription adds a description to the flowDescriptor.
205       * <p/>
206       * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
207       * For known description types, see {@link FlowDescriptors}.
208       * <p/>
209       * If an existing key exists, it will be appended to the original value using
210       * {@link FlowDescriptors#VALUE_SEPARATOR}.
211       *
212       * @param key   The key as a String.
213       * @param value The value as a String.
214       * @return FlowDef
215       */
216      public FlowDef addDescription( String key, String value )
217        {
218        if( Util.isEmpty( value ) ) // do nothing
219          return this;
220    
221        if( flowDescriptor.containsKey( key ) )
222          {
223          String original = flowDescriptor.get( key );
224    
225          if( !Util.isEmpty( original ) )
226            value = original + FlowDescriptors.VALUE_SEPARATOR + value;
227          }
228    
229        flowDescriptor.put( key, value );
230    
231        return this;
232        }
233    
234      /**
235       * Method addProperties adds all properties in the given map in order to the flowDescriptor. If the given Map has
236       * an explicit order, it will be preserved.
237       * <p/>
238       * Flow descriptions provide meta-data to monitoring systems describing the workload a given Flow represents.
239       * For known description types, see {@link FlowDescriptors}.
240       *
241       * @param descriptions The properties to be added to the map.
242       * @return FlowDef
243       */
244      public FlowDef addDescriptions( Map<String, String> descriptions )
245        {
246        for( Map.Entry<String, String> entry : descriptions.entrySet() )
247          addDescription( entry.getKey(), entry.getValue() );
248    
249        return this;
250        }
251    
252      /**
253       * Method getSinks returns the sinks of this FlowDef object.
254       *
255       * @return the sinks (type Map<String, Tap>) of this FlowDef object.
256       */
257      public Map<String, Tap> getSinks()
258        {
259        return sinks;
260        }
261    
262      /**
263       * Method getSinksCopy returns a copy of the sink Map.
264       *
265       * @return the sinksCopy (type Map<String, Tap>) of this FlowDef object.
266       */
267      public Map<String, Tap> getSinksCopy()
268        {
269        return new HashMap<String, Tap>( sinks );
270        }
271    
272      /**
273       * Method addSink adds a new named sink {@link Tap} for use in the resulting {@link Flow}.
274       *
275       * @param name of String
276       * @param sink of Tap
277       * @return FlowDef
278       */
279      public FlowDef addSink( String name, Tap sink )
280        {
281        if( sinks.containsKey( name ) )
282          throw new IllegalArgumentException( "cannot add duplicate sink: " + name );
283    
284        sinks.put( name, sink );
285        return this;
286        }
287    
288      /**
289       * Method addSink adds a new sink {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
290       *
291       * @param tail of Pipe
292       * @param sink of Tap
293       * @return FlowDef
294       */
295      public FlowDef addSink( Pipe tail, Tap sink )
296        {
297        addSink( tail.getName(), sink );
298        return this;
299        }
300    
301      /**
302       * Method addTailSink adds the tail {@link Pipe} and sink {@link Tap} to this FlowDef.
303       * <p/>
304       * This is a convenience method for adding both a tail and sink simultaneously. There isn't a similar method
305       * for heads and sources as the head Pipe can always be derived.
306       *
307       * @param tail of Pipe
308       * @param sink of Tap
309       * @return FlowDef
310       */
311      public FlowDef addTailSink( Pipe tail, Tap sink )
312        {
313        addSink( tail.getName(), sink );
314        addTail( tail );
315        return this;
316        }
317    
318      /**
319       * Method addSinks adds a Map of the named and {@link Tap} pairs.
320       *
321       * @param sinks of Map<String, Tap>
322       * @return FlowDef
323       */
324      public FlowDef addSinks( Map<String, Tap> sinks )
325        {
326        if( sinks != null )
327          {
328          for( Map.Entry<String, Tap> entry : sinks.entrySet() )
329            addSink( entry.getKey(), entry.getValue() );
330          }
331    
332        return this;
333        }
334    
335      /**
336       * Method getTraps returns the traps of this FlowDef object.
337       *
338       * @return the traps (type Map<String, Tap>) of this FlowDef object.
339       */
340      public Map<String, Tap> getTraps()
341        {
342        return traps;
343        }
344    
345      /**
346       * Method getTrapsCopy returns a copy of the trap Map.
347       *
348       * @return the trapsCopy (type Map<String, Tap>) of this FlowDef object.
349       */
350      public Map<String, Tap> getTrapsCopy()
351        {
352        return new HashMap<String, Tap>( traps );
353        }
354    
355      /**
356       * Method addTrap adds a new named trap {@link Tap} for use in the resulting {@link Flow}.
357       *
358       * @param name of String
359       * @param trap of Tap
360       * @return FlowDef
361       */
362      public FlowDef addTrap( String name, Tap trap )
363        {
364        if( traps.containsKey( name ) )
365          throw new IllegalArgumentException( "cannot add duplicate trap: " + name );
366    
367        traps.put( name, trap );
368        return this;
369        }
370    
371      /**
372       * Method addTrap adds a new trap {@link Tap} named after the given {@link Pipe} for use in the resulting {@link Flow}.
373       *
374       * @param pipe of Pipe
375       * @param trap of Tap
376       * @return FlowDef
377       */
378      public FlowDef addTrap( Pipe pipe, Tap trap )
379        {
380        addTrap( pipe.getName(), trap );
381        return this;
382        }
383    
384      /**
385       * Method addTraps adds a Map of the names and {@link Tap} pairs.
386       *
387       * @param traps of Map<String, Tap>
388       * @return FlowDef
389       */
390      public FlowDef addTraps( Map<String, Tap> traps )
391        {
392        if( traps != null )
393          {
394          for( Map.Entry<String, Tap> entry : traps.entrySet() )
395            addTrap( entry.getKey(), entry.getValue() );
396          }
397    
398        return this;
399        }
400    
401      /**
402       * Method getCheckpoints returns the checkpoint taps of this FlowDef object.
403       *
404       * @return the checkpoints (type Map<String, Tap>) of this FlowDef object.
405       */
406      public Map<String, Tap> getCheckpoints()
407        {
408        return checkpoints;
409        }
410    
411      /**
412       * Method getCheckpointsCopy returns a copy of the checkpoint tap Map.
413       *
414       * @return the checkpointsCopy (type Map<String, Tap>) of this FlowDef object.
415       */
416      public Map<String, Tap> getCheckpointsCopy()
417        {
418        return new HashMap<String, Tap>( checkpoints );
419        }
420    
421      /**
422       * Method addCheckpoint adds a new named checkpoint {@link Tap} for use in the resulting {@link Flow}.
423       *
424       * @param name       of String
425       * @param checkpoint of Tap
426       * @return FlowDef
427       */
428      public FlowDef addCheckpoint( String name, Tap checkpoint )
429        {
430        if( checkpoints.containsKey( name ) )
431          throw new IllegalArgumentException( "cannot add duplicate checkpoint: " + name );
432    
433        checkpoints.put( name, checkpoint );
434        return this;
435        }
436    
437      /**
438       * Method addCheckpoint adds a new checkpoint {@link Tap} named after the given {@link Checkpoint} for use in the resulting {@link Flow}.
439       *
440       * @param pipe       of Pipe
441       * @param checkpoint of Tap
442       * @return FlowDef
443       */
444      public FlowDef addCheckpoint( Checkpoint pipe, Tap checkpoint )
445        {
446        addCheckpoint( pipe.getName(), checkpoint );
447        return this;
448        }
449    
450      /**
451       * Method addCheckpoints adds a Map of the names and {@link Tap} pairs.
452       *
453       * @param checkpoints of Map<String, Tap>
454       * @return FlowDef
455       */
456      public FlowDef addCheckpoints( Map<String, Tap> checkpoints )
457        {
458        if( checkpoints != null )
459          {
460          for( Map.Entry<String, Tap> entry : checkpoints.entrySet() )
461            addCheckpoint( entry.getKey(), entry.getValue() );
462          }
463    
464        return this;
465        }
466    
467      /**
468       * Method getTails returns all the current pipe assembly tails the FlowDef holds.
469       *
470       * @return the tails (type List<Pipe>) of this FlowDef object.
471       */
472      public List<Pipe> getTails()
473        {
474        return tails;
475        }
476    
477      /**
478       * Method getTailsArray returns all the current pipe assembly tails the FlowDef holds.
479       *
480       * @return the tailsArray (type Pipe[]) of this FlowDef object.
481       */
482      public Pipe[] getTailsArray()
483        {
484        return tails.toArray( new Pipe[ tails.size() ] );
485        }
486    
487      /**
488       * Method addTail adds a new {@link Pipe} to this FlowDef that represents a tail in a pipe assembly.
489       * <p/>
490       * Be sure to add a sink tap that has the same name as this tail.
491       *
492       * @param tail of Pipe
493       * @return FlowDef
494       */
495      public FlowDef addTail( Pipe tail )
496        {
497        if( tail != null )
498          this.tails.add( tail );
499    
500        return this;
501        }
502    
503      /**
504       * Method addTails adds a Collection of tails.
505       *
506       * @param tails of Collection<Pipe>
507       * @return FlowDef
508       */
509      public FlowDef addTails( Collection<Pipe> tails )
510        {
511        for( Pipe tail : tails )
512          addTail( tail );
513    
514        return this;
515        }
516    
517      /**
518       * Method addTails adds an array of tails.
519       *
520       * @param tails of Pipe...
521       * @return FlowDef
522       */
523      public FlowDef addTails( Pipe... tails )
524        {
525        for( Pipe tail : tails )
526          addTail( tail );
527    
528        return this;
529        }
530    
531      public FlowDef setAssertionLevel( AssertionLevel assertionLevel )
532        {
533        this.assertionLevel = assertionLevel;
534    
535        return this;
536        }
537    
538      public AssertionLevel getAssertionLevel()
539        {
540        return assertionLevel;
541        }
542    
543      public FlowDef setDebugLevel( DebugLevel debugLevel )
544        {
545        this.debugLevel = debugLevel;
546    
547        return this;
548        }
549    
550      public DebugLevel getDebugLevel()
551        {
552        return debugLevel;
553        }
554    
555      /**
556       * Method setRunID sets the checkpoint run or execution ID to be used to find prior failed runs against
557       * this runID.
558       * <p/>
559       * When given, and a {@link Flow} fails to execute, a subsequent attempt to run the same Flow with the same
560       * runID will allow the Flow instance to start where it left off.
561       * <p/>
562       * Not all planners support this feature.
563       * <p/>
564       * A Flow name is required when using a runID.
565       *
566       * @param runID of type String
567       * @return FlowDef
568       */
569      public FlowDef setRunID( String runID )
570        {
571        if( runID != null && runID.isEmpty() )
572          return this;
573    
574        this.runID = runID;
575    
576        return this;
577        }
578    
579      public String getRunID()
580        {
581        return runID;
582        }
583    
584      public List<String> getClassPath()
585        {
586        return classPath;
587        }
588    
589      /**
590       * Adds each given artifact to the classpath the assembly will execute under allowing
591       * {@link cascading.pipe.Operator}s to dynamically load classes and resources from a {@link ClassLoader}.
592       *
593       * @param artifact a jar or other file String path
594       * @return FlowDef
595       */
596      public FlowDef addToClassPath( String artifact )
597        {
598        if( artifact == null || artifact.isEmpty() )
599          return this;
600    
601        classPath.add( artifact );
602    
603        return this;
604        }
605      }