001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow;
022    
023    import java.util.Map;
024    import java.util.Properties;
025    
026    import cascading.property.Props;
027    
028    /**
029     * Class FlowProps is a fluent helper class for setting {@link Flow} specific properties through
030     * a {@link FlowConnector}.
031     *
032     * @see cascading.property.AppProps
033     * @see cascading.cascade.CascadeProps
034     * @see FlowConnectorProps
035     */
036    public class FlowProps extends Props
037      {
038      public static final String DEFAULT_ELEMENT_COMPARATOR = "cascading.flow.tuple.element.comparator";
039      public static final String PRESERVE_TEMPORARY_FILES = "cascading.flow.preservetemporaryfiles";
040      public static final String JOB_POLLING_INTERVAL = "cascading.flow.job.pollinginterval";
041      public static final String MAX_CONCURRENT_STEPS = "cascading.flow.maxconcurrentsteps";
042      public static final String STOP_JOBS_ON_EXIT = "cascading.flow.stopjobsonexit"; // create a stop flows on exit for AppConfig
043    
044      String defaultTupleElementComparator = null;
045      boolean preserveTemporaryFiles = false;
046      int jobPollingInterval = 5000;
047      int maxConcurrentSteps = 0;
048      boolean stopJobsOnExit = true;
049    
050      /**
051       * Sets a default {@link java.util.Comparator} to be used if no Comparator can be found for the class via the
052       * {@link cascading.tuple.Comparison} interface.
053       * <p/>
054       * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the
055       * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)}
056       * will be called.
057       * <p/>
058       * In local mode, only the default constructor will be called for the comparator.
059       *
060       * @param properties
061       * @param className
062       */
063      public static void setDefaultTupleElementComparator( Map<Object, Object> properties, String className )
064        {
065        if( className != null )
066          properties.put( DEFAULT_ELEMENT_COMPARATOR, className );
067        }
068    
069      /**
070       * Property preserveTemporaryFiles forces the Flow instance to keep any temporary intermediate data sets. Useful
071       * for debugging. Defaults to {@code false}.
072       *
073       * @param properties             of type Map
074       * @param preserveTemporaryFiles of type boolean
075       */
076      public static void setPreserveTemporaryFiles( Map<Object, Object> properties, boolean preserveTemporaryFiles )
077        {
078        properties.put( PRESERVE_TEMPORARY_FILES, Boolean.toString( preserveTemporaryFiles ) );
079        }
080    
081      /**
082       * Property jobPollingInterval will set the time to wait between polling the remote server for the status of a job.
083       * The default value is 5000 msec (5 seconds).
084       *
085       * @param properties of type Map
086       * @param interval   of type long
087       */
088      public static void setJobPollingInterval( Map<Object, Object> properties, long interval )
089        {
090        properties.put( JOB_POLLING_INTERVAL, Long.toString( interval ) );
091        }
092    
093      /**
094       * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently.
095       * <p/>
096       * By default a Flow will attempt to run all give steps at the same time. But there are occasions
097       * where limiting the number of steps helps manages resources.
098       *
099       * @param properties         of type Map<Object, Object>
100       * @param numConcurrentSteps of type int
101       */
102      public static void setMaxConcurrentSteps( Map<Object, Object> properties, int numConcurrentSteps )
103        {
104        properties.put( MAX_CONCURRENT_STEPS, Integer.toString( numConcurrentSteps ) );
105        }
106    
107      /**
108       * Property stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the
109       * underlying computing system supports it. Defaults to {@code true}.
110       *
111       * @param properties     of type Map
112       * @param stopJobsOnExit of type boolean
113       */
114      public static void setStopJobsOnExit( Map<Object, Object> properties, boolean stopJobsOnExit )
115        {
116        properties.put( STOP_JOBS_ON_EXIT, Boolean.toString( stopJobsOnExit ) );
117        }
118    
119    
120      public FlowProps()
121        {
122        }
123    
124      public String getDefaultTupleElementComparator()
125        {
126        return defaultTupleElementComparator;
127        }
128    
129      public FlowProps setDefaultTupleElementComparator( String defaultTupleElementComparator )
130        {
131        this.defaultTupleElementComparator = defaultTupleElementComparator;
132    
133        return this;
134        }
135    
136      public boolean isPreserveTemporaryFiles()
137        {
138        return preserveTemporaryFiles;
139        }
140    
141      public FlowProps setPreserveTemporaryFiles( boolean preserveTemporaryFiles )
142        {
143        this.preserveTemporaryFiles = preserveTemporaryFiles;
144    
145        return this;
146        }
147    
148      public int getJobPollingInterval()
149        {
150        return jobPollingInterval;
151        }
152    
153      public FlowProps setJobPollingInterval( int jobPollingInterval )
154        {
155        this.jobPollingInterval = jobPollingInterval;
156    
157        return this;
158        }
159    
160      public int getMaxConcurrentSteps()
161        {
162        return maxConcurrentSteps;
163        }
164    
165      public FlowProps setMaxConcurrentSteps( int maxConcurrentSteps )
166        {
167        this.maxConcurrentSteps = maxConcurrentSteps;
168    
169        return this;
170        }
171    
172      public boolean isStopJobsOnExit()
173        {
174        return stopJobsOnExit;
175        }
176    
177      public FlowProps setStopJobsOnExit( boolean stopJobsOnExit )
178        {
179        this.stopJobsOnExit = stopJobsOnExit;
180    
181        return this;
182        }
183    
184      @Override
185      protected void addPropertiesTo( Properties properties )
186        {
187        setDefaultTupleElementComparator( properties, defaultTupleElementComparator );
188        setPreserveTemporaryFiles( properties, preserveTemporaryFiles );
189        setJobPollingInterval( properties, jobPollingInterval );
190        setMaxConcurrentSteps( properties, maxConcurrentSteps );
191        setStopJobsOnExit( properties, stopJobsOnExit );
192        }
193      }