001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.planner; 022 023 import java.io.IOException; 024 import java.util.ArrayList; 025 import java.util.List; 026 import java.util.concurrent.Callable; 027 import java.util.concurrent.CountDownLatch; 028 029 import cascading.flow.Flow; 030 import cascading.flow.FlowException; 031 import cascading.flow.FlowStep; 032 import cascading.flow.FlowStepStrategy; 033 import cascading.management.state.ClientState; 034 import cascading.stats.FlowStats; 035 import cascading.stats.FlowStepStats; 036 import cascading.util.Util; 037 import org.slf4j.Logger; 038 import org.slf4j.LoggerFactory; 039 040 /** 041 * 042 */ 043 public abstract class FlowStepJob<Config> implements Callable<Throwable> 044 { 045 private static final Logger LOG = LoggerFactory.getLogger( FlowStepJob.class ); 046 047 /** Field stepName */ 048 protected final String stepName; 049 /** Field pollingInterval */ 050 protected long pollingInterval = 1000; 051 /** Field recordStatsInterval */ 052 protected long statsStoreInterval = 60 * 1000; 053 /** Field predecessors */ 054 protected List<FlowStepJob<Config>> predecessors; 055 /** Field latch */ 056 private final CountDownLatch latch = new CountDownLatch( 1 ); 057 /** Field stop */ 058 private boolean stop = false; 059 /** Field flowStep */ 060 protected final BaseFlowStep<Config> flowStep; 061 /** Field stepStats */ 062 protected FlowStepStats flowStepStats; 063 /** Field throwable */ 064 protected Throwable throwable; 065 066 public FlowStepJob( ClientState clientState, BaseFlowStep flowStep, long pollingInterval, long statsStoreInterval ) 067 { 068 this.flowStep = flowStep; 069 this.stepName = flowStep.getName(); 070 this.pollingInterval = pollingInterval; 071 this.statsStoreInterval = statsStoreInterval; 072 this.flowStepStats = createStepStats( clientState ); 073 074 this.flowStepStats.prepare(); 075 this.flowStepStats.markPending(); 076 } 077 078 public abstract Config getConfig(); 079 080 protected abstract FlowStepStats createStepStats( ClientState clientState ); 081 082 public synchronized void stop() 083 { 084 if( flowStep.isInfoEnabled() ) 085 flowStep.logInfo( "stopping: " + stepName ); 086 087 stop = true; 088 089 // allow pending -> stopped transition 090 // never want a hanging pending state 091 if( !flowStepStats.isFinished() ) 092 flowStepStats.markStopped(); 093 094 try 095 { 096 internalBlockOnStop(); 097 } 098 catch( IOException exception ) 099 { 100 flowStep.logWarn( "unable to kill job: " + stepName, exception ); 101 } 102 finally 103 { 104 // call rollback after the job has been stopped, only if it was stopped 105 if( flowStepStats.isStopped() ) 106 { 107 flowStep.rollbackSinks(); 108 flowStep.fireOnStopping(); 109 } 110 111 flowStepStats.cleanup(); 112 } 113 } 114 115 protected abstract void internalBlockOnStop() throws IOException; 116 117 public void setPredecessors( List<FlowStepJob<Config>> predecessors ) 118 { 119 this.predecessors = predecessors; 120 } 121 122 public Throwable call() 123 { 124 start(); 125 126 return throwable; 127 } 128 129 protected void start() 130 { 131 try 132 { 133 if( isSkipFlowStep() ) 134 { 135 markSkipped(); 136 137 if( flowStep.isInfoEnabled() ) 138 flowStep.logInfo( "skipping step: " + stepName ); 139 140 return; 141 } 142 143 flowStepStats.markStarted(); 144 145 blockOnPredecessors(); 146 147 applyFlowStepConfStrategy(); 148 149 blockOnJob(); 150 } 151 catch( Throwable throwable ) 152 { 153 dumpDebugInfo(); 154 this.throwable = throwable; 155 flowStep.fireOnThrowable( throwable ); 156 } 157 finally 158 { 159 latch.countDown(); 160 flowStepStats.cleanup(); 161 } 162 } 163 164 private void applyFlowStepConfStrategy() 165 { 166 FlowStepStrategy flowStepStrategy = flowStep.getFlow().getFlowStepStrategy(); 167 168 if( flowStepStrategy == null ) 169 return; 170 171 List<FlowStep> predecessorSteps = new ArrayList<FlowStep>(); 172 173 for( FlowStepJob predecessor : predecessors ) 174 predecessorSteps.add( predecessor.flowStep ); 175 176 flowStepStrategy.apply( flowStep.getFlow(), predecessorSteps, flowStep ); 177 } 178 179 protected boolean isSkipFlowStep() throws IOException 180 { 181 // if runID is not set, never skip a step 182 if( flowStep.getFlow().getRunID() == null ) 183 return false; 184 185 return flowStep.allSourcesExist() && !flowStep.areSourcesNewer( flowStep.getSinkModified() ); 186 } 187 188 protected void blockOnJob() throws IOException 189 { 190 if( stop ) // true if a predecessor failed 191 return; 192 193 if( flowStep.isInfoEnabled() ) 194 flowStep.logInfo( "starting step: " + stepName ); 195 196 internalNonBlockingStart(); 197 198 markSubmitted(); 199 flowStep.fireOnStarting(); 200 201 blockTillCompleteOrStopped(); 202 203 if( !stop && !internalNonBlockingIsSuccessful() ) 204 { 205 if( !flowStepStats.isFinished() ) 206 { 207 flowStep.rollbackSinks(); 208 flowStepStats.markFailed( getThrowable() ); 209 flowStep.fireOnThrowable( getThrowable() ); 210 } 211 212 dumpDebugInfo(); 213 214 // if available, rethrow the unrecoverable error 215 if( getThrowable() instanceof OutOfMemoryError ) 216 throw ( (OutOfMemoryError) getThrowable() ); 217 218 if( !isRemoteExecution() ) 219 throwable = new FlowException( "local step failed", getThrowable() ); 220 else 221 throwable = new FlowException( "step failed: " + stepName + ", with job id: " + internalJobId() + ", please see cluster logs for failure messages" ); 222 } 223 else 224 { 225 if( internalNonBlockingIsSuccessful() && !flowStepStats.isFinished() ) 226 { 227 throwable = flowStep.commitSinks(); 228 229 if( throwable != null ) 230 { 231 flowStepStats.markFailed( throwable ); 232 flowStep.fireOnThrowable( throwable ); 233 } 234 else 235 { 236 flowStepStats.markSuccessful(); 237 flowStep.fireOnCompleted(); 238 } 239 } 240 } 241 242 flowStepStats.recordChildStats(); 243 } 244 245 protected abstract boolean isRemoteExecution(); 246 247 protected abstract String internalJobId(); 248 249 protected abstract boolean internalNonBlockingIsSuccessful() throws IOException; 250 251 protected abstract Throwable getThrowable(); 252 253 protected abstract void internalNonBlockingStart() throws IOException; 254 255 protected void blockTillCompleteOrStopped() throws IOException 256 { 257 int iterations = (int) Math.floor( statsStoreInterval / pollingInterval ); 258 int count = 0; 259 260 while( true ) 261 { 262 if( flowStepStats.isSubmitted() && isStarted() ) 263 { 264 markRunning(); 265 flowStep.fireOnRunning(); 266 } 267 268 if( stop || internalNonBlockingIsComplete() ) 269 break; 270 271 sleepForPollingInterval(); 272 273 if( iterations == count++ ) 274 { 275 count = 0; 276 flowStepStats.recordStats(); 277 flowStepStats.recordChildStats(); 278 } 279 } 280 } 281 282 private synchronized void markSubmitted() 283 { 284 if( flowStepStats.isStarted() ) 285 flowStepStats.markSubmitted(); 286 287 Flow flow = flowStep.getFlow(); 288 289 if( flow == null ) 290 { 291 LOG.warn( "no parent flow set" ); 292 return; 293 } 294 295 FlowStats flowStats = flow.getFlowStats(); 296 297 synchronized( flowStats ) 298 { 299 if( flowStats.isStarted() ) 300 flowStats.markSubmitted(); 301 } 302 } 303 304 private synchronized void markRunning() 305 { 306 flowStepStats.markRunning(); 307 308 markFlowRunning(); 309 } 310 311 private synchronized void markSkipped() 312 { 313 flowStepStats.markSkipped(); 314 315 markFlowRunning(); 316 } 317 318 private synchronized void markFlowRunning() 319 { 320 Flow flow = flowStep.getFlow(); 321 322 if( flow == null ) 323 { 324 LOG.warn( "no parent flow set" ); 325 return; 326 } 327 328 FlowStats flowStats = flow.getFlowStats(); 329 330 synchronized( flowStats ) 331 { 332 if( flowStats.isStarted() || flowStats.isSubmitted() ) 333 flowStats.markRunning(); 334 } 335 } 336 337 protected abstract boolean internalNonBlockingIsComplete() throws IOException; 338 339 protected void sleepForPollingInterval() 340 { 341 Util.safeSleep( pollingInterval ); 342 } 343 344 protected void blockOnPredecessors() 345 { 346 for( FlowStepJob predecessor : predecessors ) 347 { 348 if( !predecessor.isSuccessful() ) 349 { 350 flowStep.logWarn( "abandoning step: " + stepName + ", predecessor failed: " + predecessor.stepName ); 351 352 stop(); 353 } 354 } 355 } 356 357 protected abstract void dumpDebugInfo(); 358 359 /** 360 * Method isSuccessful returns true if this step completed successfully or was skipped. 361 * 362 * @return the successful (type boolean) of this FlowStepJob object. 363 */ 364 public boolean isSuccessful() 365 { 366 try 367 { 368 latch.await(); // freed after step completes in #start() 369 370 return flowStepStats.isSuccessful() || flowStepStats.isSkipped(); 371 } 372 catch( InterruptedException exception ) 373 { 374 flowStep.logWarn( "latch interrupted", exception ); 375 376 return false; 377 } 378 catch( NullPointerException exception ) 379 { 380 throw new FlowException( "Hadoop is not keeping a large enough job history, please increase the \'mapred.jobtracker.completeuserjobs.maximum\' property", exception ); 381 } 382 } 383 384 /** 385 * Method wasStarted returns true if this job was started 386 * 387 * @return boolean 388 */ 389 public boolean isStarted() 390 { 391 return internalIsStarted(); 392 } 393 394 protected abstract boolean internalIsStarted(); 395 396 /** 397 * Method getStepStats returns the stepStats of this FlowStepJob object. 398 * 399 * @return the stepStats (type StepStats) of this FlowStepJob object. 400 */ 401 public FlowStepStats getStepStats() 402 { 403 return flowStepStats; 404 } 405 }