001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.hadoop; 022 023 import java.io.IOException; 024 import java.util.Map; 025 026 import cascading.flow.BaseFlow; 027 import cascading.flow.Flow; 028 import cascading.flow.FlowDef; 029 import cascading.flow.FlowException; 030 import cascading.flow.FlowProcess; 031 import cascading.flow.FlowStep; 032 import cascading.flow.hadoop.util.HadoopUtil; 033 import cascading.flow.planner.BaseFlowStep; 034 import cascading.flow.planner.PlatformInfo; 035 import cascading.property.PropertyUtil; 036 import cascading.tap.hadoop.io.HttpFileSystem; 037 import cascading.util.ShutdownUtil; 038 import org.apache.hadoop.fs.Path; 039 import org.apache.hadoop.mapred.JobConf; 040 041 import static cascading.flow.FlowProps.MAX_CONCURRENT_STEPS; 042 import static cascading.flow.FlowProps.PRESERVE_TEMPORARY_FILES; 043 044 /** 045 * Class HadoopFlow is the Apache Hadoop specific implementation of a {@link Flow}. 046 * <p/> 047 * HadoopFlow must be created through a {@link HadoopFlowConnector} instance. 048 * <p/> 049 * If classpath paths are provided on the {@link FlowDef}, the Hadoop distributed cache mechanism will be used 050 * to augment the remote classpath. 
 * <p/>
 * Any path elements that are relative will be uploaded to HDFS, and the HDFS URI will be used on the JobConf. Note
 * all paths are added as "files" to the JobConf, not archives, so they aren't needlessly uncompressed cluster side.
 *
 * @see HadoopFlowConnector
 */
public class HadoopFlow extends BaseFlow<JobConf>
  {
  /**
   * HDFS shutdown thread obtained from {@link HadoopUtil#getHDFSShutdownHook()}; started (not joined) by our
   * registered shutdown hook so the filesystem closes after all other hooks have run. Static — shared JVM-wide.
   */
  private static Thread hdfsShutdown = null;
  /** JVM-wide singleton hook registered with {@link ShutdownUtil}; non-null once any flow has registered it. */
  private static ShutdownUtil.Hook shutdownHook;
  /** The effective Hadoop configuration; built lazily by {@link #initConfig(Map, JobConf)} / {@link #getConfig()}. */
  private transient JobConf jobConf;
  /** When true, intermediate temporary files are left in place after the flow completes. */
  private boolean preserveTemporaryFiles = false;
  /**
   * Result of {@link HadoopUtil#addToClassPath(JobConf, java.util.List)} — presumably a mapping of local classpath
   * elements to their remote (HDFS) destinations, pushed in {@link #copyToDistributedCache()}. TODO confirm key/value
   * orientation against HadoopUtil.
   */
  private transient Map<Path, Path> syncPaths;

  /** Default constructor for subclasses/serialization; performs no initialization. */
  protected HadoopFlow()
    {
    }

  /**
   * Returns property preserveTemporaryFiles.
   *
   * @param properties of type Map, may be searched via {@link PropertyUtil}
   * @return a boolean, false when the {@code PRESERVE_TEMPORARY_FILES} property is absent
   */
  static boolean getPreserveTemporaryFiles( Map<Object, Object> properties )
    {
    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, PRESERVE_TEMPORARY_FILES, "false" ) );
    }

  /**
   * Returns the maximum number of steps allowed to run concurrently.
   *
   * @param jobConf the configuration to read from
   * @return the {@code MAX_CONCURRENT_STEPS} value, or 0 (unbounded) when unset
   */
  static int getMaxConcurrentSteps( JobConf jobConf )
    {
    return jobConf.getInt( MAX_CONCURRENT_STEPS, 0 );
    }

  /**
   * Constructor used when a flow is assembled from a name and descriptor rather than a {@link FlowDef}.
   * Delegates to {@link BaseFlow}, then applies flow-level properties.
   */
  protected HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, String name, Map<String, String> flowDescriptor )
    {
    super( platformInfo, properties, jobConf, name, flowDescriptor );
    initFromProperties( properties );
    }

  /**
   * Constructor used by {@link HadoopFlowConnector} when planning against a {@link FlowDef}.
   * Delegates to {@link BaseFlow}, then applies flow-level properties.
   */
  public HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, FlowDef flowDef )
    {
    super( platformInfo, properties, jobConf, flowDef );

    initFromProperties( properties );
    }

  /** Applies flow-level properties on top of {@link BaseFlow}'s handling; captures the preserve-temp-files flag. */
  @Override
  protected void initFromProperties( Map<Object, Object> properties )
    {
    super.initFromProperties( properties );
    preserveTemporaryFiles = getPreserveTemporaryFiles( properties );
    }

  /**
   * Builds this flow's private {@link JobConf} from the given parent, registers the http/https filesystem
   * implementations, and stages any configured classpath elements for later sync to the cluster.
   *
   * @param properties   optional properties merged into the parent config via {@code createConfig}; may be null
   * @param parentConfig the configuration to copy from; when null (and no properties), initialization is deferred
   */
  protected void initConfig( Map<Object, Object> properties, JobConf parentConfig )
    {
    if( properties != null )
      parentConfig = createConfig( properties, parentConfig );

    if( parentConfig == null ) // this is ok, getJobConf will pass a default parent in
      return;

    jobConf = HadoopUtil.copyJobConf( parentConfig ); // prevent local values from being shared
    jobConf.set( "fs.http.impl", HttpFileSystem.class.getName() );
    jobConf.set( "fs.https.impl", HttpFileSystem.class.getName() );

    syncPaths = HadoopUtil.addToClassPath( jobConf, getClassPath() );
    }

  /** Copies a single key/value onto the config, stringifying both; filters values that stringify uselessly. */
  @Override
  protected void setConfigProperty( JobConf config, Object key, Object value )
    {
    // don't let these objects pass, even though toString is called below.
    if( value instanceof Class || value instanceof JobConf )
      return;

    config.set( key.toString(), value.toString() );
    }

  /** Returns a fresh default {@link JobConf} when no default is given, otherwise a defensive copy of it. */
  @Override
  protected JobConf newConfig( JobConf defaultConfig )
    {
    return defaultConfig == null ? new JobConf() : HadoopUtil.copyJobConf( defaultConfig );
    }

  /**
   * Returns this flow's {@link JobConf}, lazily initializing it from a default parent on first access.
   * NOTE(review): this lazy init is not synchronized, unlike the static hook registration below — appears to assume
   * single-threaded first access; confirm against BaseFlow's lifecycle.
   */
  @Override
  public JobConf getConfig()
    {
    if( jobConf == null )
      initConfig( null, new JobConf() );

    return jobConf;
    }

  /** Returns a defensive copy of the config so callers cannot mutate this flow's state. */
  @Override
  public JobConf getConfigCopy()
    {
    return HadoopUtil.copyJobConf( getConfig() );
    }

  /** Returns the config flattened into a properties Map via {@link HadoopUtil#createProperties}. */
  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

  /**
   * Method getProperty returns the value associated with the given key from the underlying properties system.
   *
   * @param key of type String
   * @return String, or null when the key is unset in the underlying {@link JobConf}
   */
  public String getProperty( String key )
    {
    return getConfig().get( key );
    }

  /** Returns a new {@link HadoopFlowProcess} bound to this flow's session and config. */
  @Override
  public FlowProcess<JobConf> getFlowProcess()
    {
    return new HadoopFlowProcess( getFlowSession(), getConfig() );
    }

  /**
   * Method isPreserveTemporaryFiles returns false if temporary files will be cleaned when this Flow completes.
   *
   * @return the preserveTemporaryFiles (type boolean) of this Flow object.
   */
  public boolean isPreserveTemporaryFiles()
    {
    return preserveTemporaryFiles;
    }

  /**
   * Prepares the cluster side before steps run: syncs staged classpath files, applies any "replace" semantics to
   * sinks, traps, and checkpoints, then registers the JVM shutdown hook.
   * NOTE(review): the exception message mentions only sinks, but the try block also covers the distributed-cache
   * sync, traps, and checkpoints — any of those may be the actual failure.
   */
  @Override
  protected void internalStart()
    {
    try
      {
      copyToDistributedCache();
      deleteSinksIfReplace();
      deleteTrapsIfReplace();
      deleteCheckpointsIfReplace();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to delete sinks", exception );
      }

    registerHadoopShutdownHook( this );
    }

  /** Pushes the classpath elements staged by {@link #initConfig} to their remote locations. */
  private void copyToDistributedCache()
    {
    HadoopUtil.syncPaths( jobConf, syncPaths );
    }

  /** Returns true when the config targets Hadoop local mode, meaning steps run in-process. */
  @Override
  public boolean stepsAreLocal()
    {
    return HadoopUtil.isLocal( getConfig() );
    }

  /**
   * Deletes each step's intermediate data, unless we are inside JVM shutdown where filesystem calls are unreliable.
   *
   * @param stop true when invoked during a stop/shutdown path, in which case cleanup is skipped entirely
   */
  private void cleanTemporaryFiles( boolean stop )
    {
    if( stop ) // unstable to call fs operations during shutdown
      return;

    // use step config so cascading.flow.step.path property is properly used
    for( FlowStep<JobConf> step : getFlowSteps() )
      ( (BaseFlowStep<JobConf>) step ).clean();
    }

  /**
   * Registers a single JVM-wide shutdown hook (guarded by class-level synchronization) that fires the HDFS
   * shutdown thread last, after all other {@link ShutdownUtil} hooks. No-op when the flow opts out via
   * {@code isStopJobsOnExit()} or when a hook was already registered by an earlier flow.
   */
  private static synchronized void registerHadoopShutdownHook( Flow flow )
    {
    if( !flow.isStopJobsOnExit() )
      return;

    // guaranteed singleton here
    if( shutdownHook != null )
      return;

    getHdfsShutdownHook();

    shutdownHook = new ShutdownUtil.Hook()
      {
      @Override
      public Priority priority()
        {
        return Priority.LAST; // very last thing to happen
        }

      @Override
      public void execute()
        {
        callHdfsShutdownHook();
        }
      };

    ShutdownUtil.addHook( shutdownHook );
    }

  /**
   * Starts the previously captured HDFS shutdown thread, if any.
   * NOTE(review): the thread is started but never joined here — presumably the JVM waits on it as a shutdown
   * hook thread; confirm against HadoopUtil.getHDFSShutdownHook().
   */
  private synchronized static void callHdfsShutdownHook()
    {
    if( hdfsShutdown != null )
      hdfsShutdown.start();
    }

  /** Captures Hadoop's own HDFS shutdown thread exactly once, so we control when it runs. */
  private synchronized static void getHdfsShutdownHook()
    {
    if( hdfsShutdown == null )
      hdfsShutdown = HadoopUtil.getHDFSShutdownHook();
    }

  /** Cleans intermediate files after the flow finishes, honoring the preserve-temporary-files property. */
  protected void internalClean( boolean stop )
    {
    if( !isPreserveTemporaryFiles() )
      cleanTemporaryFiles( stop );
    }

  /** No per-platform shutdown work is required for Hadoop; cleanup is handled by the registered hooks. */
  protected void internalShutdown()
    {
    }

  /** Returns 1 in local mode (steps share one JVM), otherwise the configured concurrency limit (0 = unbounded). */
  protected int getMaxNumParallelSteps()
    {
    return stepsAreLocal() ? 1 : getMaxConcurrentSteps( getConfig() );
    }
  }