001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow; 022 023 import java.util.Map; 024 import java.util.Properties; 025 026 import cascading.property.Props; 027 028 /** 029 * Class FlowProps is a fluent helper class for setting {@link Flow} specific properties through 030 * a {@link FlowConnector}. 031 * 032 * @see cascading.property.AppProps 033 * @see cascading.cascade.CascadeProps 034 * @see FlowConnectorProps 035 */ 036 public class FlowProps extends Props 037 { 038 public static final String DEFAULT_ELEMENT_COMPARATOR = "cascading.flow.tuple.element.comparator"; 039 public static final String PRESERVE_TEMPORARY_FILES = "cascading.flow.preservetemporaryfiles"; 040 public static final String JOB_POLLING_INTERVAL = "cascading.flow.job.pollinginterval"; 041 public static final String MAX_CONCURRENT_STEPS = "cascading.flow.maxconcurrentsteps"; 042 public static final String STOP_JOBS_ON_EXIT = "cascading.flow.stopjobsonexit"; // create a stop flows on exit for AppConfig 043 044 String defaultTupleElementComparator = null; 045 boolean preserveTemporaryFiles = false; 046 int jobPollingInterval = 5000; 047 int maxConcurrentSteps = 0; 048 boolean stopJobsOnExit = true; 049 050 /** 051 * Sets a default {@link java.util.Comparator} to be used if no Comparator can be found for the class via the 052 * {@link cascading.tuple.Comparison} interface. 053 * <p/> 054 * In the case of Hadoop, if the Comparator instance also implements {@link org.apache.hadoop.conf.Configurable}, the 055 * {@link org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)} 056 * will be called. 057 * <p/> 058 * In local mode, only the default constructor will be called for the comparator. 059 * 060 * @param properties 061 * @param className 062 */ 063 public static void setDefaultTupleElementComparator( Map<Object, Object> properties, String className ) 064 { 065 if( className != null ) 066 properties.put( DEFAULT_ELEMENT_COMPARATOR, className ); 067 } 068 069 /** 070 * Property preserveTemporaryFiles forces the Flow instance to keep any temporary intermediate data sets. Useful 071 * for debugging. Defaults to {@code false}. 072 * 073 * @param properties of type Map 074 * @param preserveTemporaryFiles of type boolean 075 */ 076 public static void setPreserveTemporaryFiles( Map<Object, Object> properties, boolean preserveTemporaryFiles ) 077 { 078 properties.put( PRESERVE_TEMPORARY_FILES, Boolean.toString( preserveTemporaryFiles ) ); 079 } 080 081 /** 082 * Property jobPollingInterval will set the time to wait between polling the remote server for the status of a job. 083 * The default value is 5000 msec (5 seconds). 084 * 085 * @param properties of type Map 086 * @param interval of type long 087 */ 088 public static void setJobPollingInterval( Map<Object, Object> properties, long interval ) 089 { 090 properties.put( JOB_POLLING_INTERVAL, Long.toString( interval ) ); 091 } 092 093 /** 094 * Method setMaxConcurrentSteps sets the maximum number of steps that a Flow can run concurrently. 095 * <p/> 096 * By default a Flow will attempt to run all give steps at the same time. But there are occasions 097 * where limiting the number of steps helps manages resources. 098 * 099 * @param properties of type Map<Object, Object> 100 * @param numConcurrentSteps of type int 101 */ 102 public static void setMaxConcurrentSteps( Map<Object, Object> properties, int numConcurrentSteps ) 103 { 104 properties.put( MAX_CONCURRENT_STEPS, Integer.toString( numConcurrentSteps ) ); 105 } 106 107 /** 108 * Property stopJobsOnExit will tell the Flow to add a JVM shutdown hook that will kill all running processes if the 109 * underlying computing system supports it. Defaults to {@code true}. 110 * 111 * @param properties of type Map 112 * @param stopJobsOnExit of type boolean 113 */ 114 public static void setStopJobsOnExit( Map<Object, Object> properties, boolean stopJobsOnExit ) 115 { 116 properties.put( STOP_JOBS_ON_EXIT, Boolean.toString( stopJobsOnExit ) ); 117 } 118 119 120 public FlowProps() 121 { 122 } 123 124 public String getDefaultTupleElementComparator() 125 { 126 return defaultTupleElementComparator; 127 } 128 129 public FlowProps setDefaultTupleElementComparator( String defaultTupleElementComparator ) 130 { 131 this.defaultTupleElementComparator = defaultTupleElementComparator; 132 133 return this; 134 } 135 136 public boolean isPreserveTemporaryFiles() 137 { 138 return preserveTemporaryFiles; 139 } 140 141 public FlowProps setPreserveTemporaryFiles( boolean preserveTemporaryFiles ) 142 { 143 this.preserveTemporaryFiles = preserveTemporaryFiles; 144 145 return this; 146 } 147 148 public int getJobPollingInterval() 149 { 150 return jobPollingInterval; 151 } 152 153 public FlowProps setJobPollingInterval( int jobPollingInterval ) 154 { 155 this.jobPollingInterval = jobPollingInterval; 156 157 return this; 158 } 159 160 public int getMaxConcurrentSteps() 161 { 162 return maxConcurrentSteps; 163 } 164 165 public FlowProps setMaxConcurrentSteps( int maxConcurrentSteps ) 166 { 167 this.maxConcurrentSteps = maxConcurrentSteps; 168 169 return this; 170 } 171 172 public boolean isStopJobsOnExit() 173 { 174 return stopJobsOnExit; 175 } 176 177 public FlowProps setStopJobsOnExit( boolean stopJobsOnExit ) 178 { 179 this.stopJobsOnExit = stopJobsOnExit; 180 181 return this; 182 } 183 184 @Override 185 protected void addPropertiesTo( Properties properties ) 186 { 187 setDefaultTupleElementComparator( properties, defaultTupleElementComparator ); 188 setPreserveTemporaryFiles( properties, preserveTemporaryFiles ); 189 setJobPollingInterval( properties, jobPollingInterval ); 190 setMaxConcurrentSteps( properties, maxConcurrentSteps ); 191 setStopJobsOnExit( properties, stopJobsOnExit ); 192 } 193 }