/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Map;

import cascading.flow.BaseFlow;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.property.PropertyUtil;
import cascading.tap.hadoop.io.HttpFileSystem;
import cascading.util.ShutdownUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import riffle.process.ProcessConfiguration;

import static cascading.flow.FlowProps.MAX_CONCURRENT_STEPS;
import static cascading.flow.FlowProps.PRESERVE_TEMPORARY_FILES;

/**
 * Class HadoopFlow is the Apache Hadoop specific implementation of a {@link Flow}.
 * <p/>
 * HadoopFlow must be created through a {@link cascading.flow.FlowConnector} sub-class instance.
 * <p/>
 * If classpath paths are provided on the {@link FlowDef}, the Hadoop distributed cache mechanism will be used
 * to augment the remote classpath.
 * <p/>
 * Any path elements that are relative will be uploaded to HDFS, and the HDFS URI will be used on the JobConf. Note
 * all paths are added as "files" to the JobConf, not archives, so they aren't needlessly uncompressed cluster side.
 *
 * @see cascading.flow.FlowConnector
 */
public class HadoopFlow extends BaseFlow<JobConf>
  {
  /** Field hdfsShutdown */
  private static Thread hdfsShutdown = null;
  /** Field shutdownHook */
  private static ShutdownUtil.Hook shutdownHook;
  /** Field jobConf */
  private transient JobConf jobConf;
  /** Field preserveTemporaryFiles */
  private boolean preserveTemporaryFiles = false;
  /** Field syncPaths */
  private transient Map<Path, Path> syncPaths;

  protected HadoopFlow()
    {
    }
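  /*
   * Illustrative sketch only (not part of this class): per the class javadoc, a
   * HadoopFlow is obtained through a FlowConnector sub-class rather than via these
   * constructors. Assuming the HadoopFlowConnector from this package, and with
   * `assembly`, `sourceTap`, `sinkTap`, and `Main` as hypothetical placeholders:
   *
   *   Properties properties = new Properties();
   *   AppProps.setApplicationJarClass( properties, Main.class );
   *
   *   FlowDef flowDef = FlowDef.flowDef()
   *     .setName( "example" )
   *     .addSource( assembly, sourceTap )
   *     .addTailSink( assembly, sinkTap );
   *
   *   Flow flow = new HadoopFlowConnector( properties ).connect( flowDef );
   *   flow.complete(); // submits the Hadoop job(s) and blocks until completion
   */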
  /**
   * Returns property preserveTemporaryFiles.
   *
   * @param properties of type Map
   * @return a boolean
   */
  static boolean getPreserveTemporaryFiles( Map<Object, Object> properties )
    {
    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, PRESERVE_TEMPORARY_FILES, "false" ) );
    }

  static int getMaxConcurrentSteps( JobConf jobConf )
    {
    return jobConf.getInt( MAX_CONCURRENT_STEPS, 0 );
    }

  protected HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, String name, Map<String, String> flowDescriptor )
    {
    super( platformInfo, properties, jobConf, name, flowDescriptor );
    initFromProperties( properties );
    }

  public HadoopFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, FlowDef flowDef )
    {
    super( platformInfo, properties, jobConf, flowDef );

    initFromProperties( properties );
    }

  @Override
  protected void initFromProperties( Map<Object, Object> properties )
    {
    super.initFromProperties( properties );
    preserveTemporaryFiles = getPreserveTemporaryFiles( properties );
    }

  protected void initConfig( Map<Object, Object> properties, JobConf parentConfig )
    {
    if( properties != null )
      parentConfig = createConfig( properties, parentConfig );

    if( parentConfig == null ) // this is ok, getJobConf will pass a default parent in
      return;

    jobConf = HadoopUtil.copyJobConf( parentConfig ); // prevent local values from being shared
    jobConf.set( "fs.http.impl", HttpFileSystem.class.getName() );
    jobConf.set( "fs.https.impl", HttpFileSystem.class.getName() );

    syncPaths = HadoopUtil.addToClassPath( jobConf, getClassPath() );
    }

  @Override
  protected void setConfigProperty( JobConf config, Object key, Object value )
    {
    // don't let these objects pass, even though toString is called below.
    if( value instanceof Class || value instanceof JobConf )
      return;

    config.set( key.toString(), value.toString() );
    }

  @Override
  protected JobConf newConfig( JobConf defaultConfig )
    {
    return defaultConfig == null ? new JobConf() : HadoopUtil.copyJobConf( defaultConfig );
    }

  @ProcessConfiguration
  @Override
  public JobConf getConfig()
    {
    if( jobConf == null )
      initConfig( null, new JobConf() );

    return jobConf;
    }

  @Override
  public JobConf getConfigCopy()
    {
    return HadoopUtil.copyJobConf( getConfig() );
    }

  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

  /**
   * Method getProperty returns the value associated with the given key from the underlying properties system.
   *
   * @param key of type String
   * @return String
   */
  public String getProperty( String key )
    {
    return getConfig().get( key );
    }

  @Override
  public FlowProcess<JobConf> getFlowProcess()
    {
    return new HadoopFlowProcess( getFlowSession(), getConfig() );
    }

  /**
   * Method isPreserveTemporaryFiles returns false if temporary files will be cleaned when this Flow completes.
   *
   * @return the preserveTemporaryFiles (type boolean) of this Flow object.
   */
  public boolean isPreserveTemporaryFiles()
    {
    return preserveTemporaryFiles;
    }
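  /*
   * Illustrative sketch only: the two properties consumed above are plain string
   * entries in the properties map handed to the FlowConnector, keyed by the
   * cascading.flow.FlowProps constants statically imported at the top of this file:
   *
   *   Properties properties = new Properties();
   *   // keep intermediate temporary files for debugging; checked by internalClean() below
   *   properties.setProperty( FlowProps.PRESERVE_TEMPORARY_FILES, "true" );
   *   // cap on steps submitted in parallel; 0 is the default read by getMaxConcurrentSteps()
   *   properties.setProperty( FlowProps.MAX_CONCURRENT_STEPS, "4" );
   */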
  @Override
  protected void internalStart()
    {
    try
      {
      copyToDistributedCache();
      deleteSinksIfReplace();
      deleteTrapsIfReplace();
      deleteCheckpointsIfReplace();
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to delete sinks", exception );
      }

    registerHadoopShutdownHook( this );
    }

  private void copyToDistributedCache()
    {
    HadoopUtil.syncPaths( jobConf, syncPaths, true );
    }

  @Override
  public boolean stepsAreLocal()
    {
    return HadoopUtil.isLocal( getConfig() );
    }

  private void cleanTemporaryFiles( boolean stop )
    {
    if( stop ) // unstable to call fs operations during shutdown
      return;

    // use step config so cascading.flow.step.path property is properly used
    for( FlowStep<JobConf> step : getFlowSteps() )
      ( (BaseFlowStep<JobConf>) step ).clean();
    }

  private static synchronized void registerHadoopShutdownHook( Flow flow )
    {
    if( !flow.isStopJobsOnExit() )
      return;

    // guaranteed singleton here
    if( shutdownHook != null )
      return;

    getHdfsShutdownHook();

    shutdownHook = new ShutdownUtil.Hook()
      {
      @Override
      public Priority priority()
        {
        return Priority.LAST; // very last thing to happen
        }

      @Override
      public void execute()
        {
        callHdfsShutdownHook();
        }
      };

    ShutdownUtil.addHook( shutdownHook );
    }

  private static synchronized void callHdfsShutdownHook()
    {
    if( hdfsShutdown != null )
      hdfsShutdown.start();
    }

  private static synchronized void getHdfsShutdownHook()
    {
    if( hdfsShutdown == null )
      hdfsShutdown = HadoopUtil.getHDFSShutdownHook();
    }

  protected void internalClean( boolean stop )
    {
    if( !isPreserveTemporaryFiles() )
      cleanTemporaryFiles( stop );
    }

  protected void internalShutdown()
    {
    }

  protected int getMaxNumParallelSteps()
    {
    return stepsAreLocal() ? 1 : getMaxConcurrentSteps( getConfig() );
    }

  @Override
  protected long getTotalSliceCPUMilliSeconds()
    {
    // this is a hadoop2 MR specific counter/value
    long counterValue = flowStats.getCounterValue( "org.apache.hadoop.mapreduce.TaskCounter", "CPU_MILLISECONDS" );

    if( counterValue == 0 )
      return -1;

    return counterValue;
    }
  }
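/*
 * Illustrative sketch only (assumptions noted): getTotalSliceCPUMilliSeconds()
 * above reads the Hadoop 2 MR TaskCounter through the flow's stats object. The
 * same counter is reachable from client code after a run via the public stats
 * API, assuming Flow#getFlowStats() and the String-keyed getCounterValue overload
 * used in the method above:
 *
 *   flow.complete();
 *   long cpuMillis = flow.getFlowStats()
 *     .getCounterValue( "org.apache.hadoop.mapreduce.TaskCounter", "CPU_MILLISECONDS" );
 *   // mirrors the method above, where a raw value of 0 is reported as -1 (counter unavailable)
 */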