001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.planner; 022 023 import java.io.IOException; 024 import java.util.ArrayList; 025 import java.util.List; 026 import java.util.concurrent.Callable; 027 import java.util.concurrent.CountDownLatch; 028 029 import cascading.flow.Flow; 030 import cascading.flow.FlowException; 031 import cascading.flow.FlowStep; 032 import cascading.flow.FlowStepStrategy; 033 import cascading.management.state.ClientState; 034 import cascading.stats.FlowStats; 035 import cascading.stats.FlowStepStats; 036 import cascading.util.Util; 037 import org.slf4j.Logger; 038 import org.slf4j.LoggerFactory; 039 040 /** 041 * 042 */ 043 public abstract class FlowStepJob<Config> implements Callable<Throwable> 044 { 045 private static final Logger LOG = LoggerFactory.getLogger( FlowStepJob.class ); 046 047 /** Field stepName */ 048 protected final String stepName; 049 /** Field pollingInterval */ 050 protected long pollingInterval = 1000; 051 /** Field recordStatsInterval */ 052 protected long statsStoreInterval = 60 * 1000; 053 /** Field predecessors */ 054 protected List<FlowStepJob<Config>> predecessors; 055 /** Field latch */ 056 private final CountDownLatch latch = new CountDownLatch( 1 ); 057 /** Field stop */ 058 private boolean stop = false; 059 /** Field flowStep */ 060 protected final BaseFlowStep<Config> flowStep; 061 /** Field stepStats */ 062 protected FlowStepStats flowStepStats; 063 /** Field throwable */ 064 protected Throwable throwable; 065 066 public FlowStepJob( ClientState clientState, BaseFlowStep flowStep, long pollingInterval, long statsStoreInterval ) 067 { 068 this.flowStep = flowStep; 069 this.stepName = flowStep.getName(); 070 this.pollingInterval = pollingInterval; 071 this.statsStoreInterval = statsStoreInterval; 072 this.flowStepStats = createStepStats( clientState ); 073 074 this.flowStepStats.prepare(); 075 this.flowStepStats.markPending(); 076 } 077 078 public abstract Config getConfig(); 079 080 protected abstract FlowStepStats createStepStats( ClientState clientState ); 081 082 public synchronized void stop() 083 { 084 if( flowStep.isInfoEnabled() ) 085 flowStep.logInfo( "stopping: " + stepName ); 086 087 stop = true; 088 089 // allow pending -> stopped transition 090 // never want a hanging pending state 091 if( !flowStepStats.isFinished() ) 092 flowStepStats.markStopped(); 093 094 try 095 { 096 internalBlockOnStop(); 097 } 098 catch( IOException exception ) 099 { 100 flowStep.logWarn( "unable to kill job: " + stepName, exception ); 101 } 102 finally 103 { 104 // call rollback after the job has been stopped, only if it was stopped 105 if( flowStepStats.isStopped() ) 106 { 107 flowStep.rollbackSinks(); 108 flowStep.fireOnStopping(); 109 } 110 111 flowStepStats.cleanup(); 112 } 113 } 114 115 protected abstract void internalBlockOnStop() throws IOException; 116 117 public void setPredecessors( List<FlowStepJob<Config>> predecessors ) 118 { 119 this.predecessors = predecessors; 120 } 121 122 public Throwable call() 123 { 124 start(); 125 126 return throwable; 127 } 128 129 protected void start() 130 { 131 try 132 { 133 if( isSkipFlowStep() ) 134 { 135 markSkipped(); 136 137 if( flowStep.isInfoEnabled() && flowStepStats.isSkipped() ) 138 flowStep.logInfo( "skipping step: " + stepName ); 139 140 return; 141 } 142 143 synchronized( this ) // backport from 3.0 144 { 145 if( stop ) 146 { 147 if( flowStep.isInfoEnabled() ) 148 flowStep.logInfo( "stop called before start: " + stepName ); 149 return; 150 } 151 152 if( !markStarted() ) 153 return; 154 } 155 156 blockOnPredecessors(); 157 158 applyFlowStepConfStrategy(); 159 160 blockOnJob(); 161 } 162 catch( Throwable throwable ) 163 { 164 dumpDebugInfo(); 165 this.throwable = throwable; 166 flowStep.fireOnThrowable( throwable ); 167 } 168 finally 169 { 170 latch.countDown(); 171 flowStepStats.cleanup(); 172 } 173 } 174 175 private synchronized boolean markStarted() 176 { 177 if( flowStepStats.isFinished() ) // if stopped, return 178 return false; 179 180 flowStepStats.markStarted(); 181 182 return true; 183 } 184 185 private void applyFlowStepConfStrategy() 186 { 187 FlowStepStrategy flowStepStrategy = flowStep.getFlow().getFlowStepStrategy(); 188 189 if( flowStepStrategy == null ) 190 return; 191 192 List<FlowStep> predecessorSteps = new ArrayList<FlowStep>(); 193 194 for( FlowStepJob predecessor : predecessors ) 195 predecessorSteps.add( predecessor.flowStep ); 196 197 flowStepStrategy.apply( flowStep.getFlow(), predecessorSteps, flowStep ); 198 } 199 200 protected boolean isSkipFlowStep() throws IOException 201 { 202 // if runID is not set, never skip a step 203 if( flowStep.getFlow().getRunID() == null ) 204 return false; 205 206 return flowStep.allSourcesExist() && !flowStep.areSourcesNewer( flowStep.getSinkModified() ); 207 } 208 209 protected void blockOnJob() throws IOException 210 { 211 if( stop ) // true if a predecessor failed 212 return; 213 214 if( flowStep.isInfoEnabled() ) 215 flowStep.logInfo( "starting step: " + stepName ); 216 217 internalNonBlockingStart(); 218 219 markSubmitted(); 220 flowStep.fireOnStarting(); 221 222 blockTillCompleteOrStopped(); 223 224 if( !stop && !internalNonBlockingIsSuccessful() ) 225 { 226 if( !flowStepStats.isFinished() ) 227 { 228 flowStep.rollbackSinks(); 229 flowStepStats.markFailed( getThrowable() ); 230 flowStep.fireOnThrowable( getThrowable() ); 231 } 232 233 dumpDebugInfo(); 234 235 // if available, rethrow the unrecoverable error 236 if( getThrowable() instanceof OutOfMemoryError ) 237 throw ( (OutOfMemoryError) getThrowable() ); 238 239 if( !isRemoteExecution() ) 240 throwable = new FlowException( "local step failed", getThrowable() ); 241 else 242 throwable = new FlowException( "step failed: " + stepName + ", with job id: " + internalJobId() + ", please see cluster logs for failure messages" ); 243 } 244 else 245 { 246 if( internalNonBlockingIsSuccessful() && !flowStepStats.isFinished() ) 247 { 248 throwable = flowStep.commitSinks(); 249 250 if( throwable != null ) 251 { 252 flowStepStats.markFailed( throwable ); 253 flowStep.fireOnThrowable( throwable ); 254 } 255 else 256 { 257 flowStepStats.markSuccessful(); 258 flowStep.fireOnCompleted(); 259 } 260 } 261 } 262 263 flowStepStats.recordChildStats(); 264 } 265 266 protected abstract boolean isRemoteExecution(); 267 268 protected abstract String internalJobId(); 269 270 protected abstract boolean internalNonBlockingIsSuccessful() throws IOException; 271 272 protected abstract Throwable getThrowable(); 273 274 protected abstract void internalNonBlockingStart() throws IOException; 275 276 protected void blockTillCompleteOrStopped() throws IOException 277 { 278 int iterations = (int) Math.floor( statsStoreInterval / pollingInterval ); 279 int count = 0; 280 281 while( true ) 282 { 283 if( flowStepStats.isSubmitted() && isStarted() ) 284 { 285 markRunning(); 286 flowStep.fireOnRunning(); 287 } 288 289 if( stop || internalNonBlockingIsComplete() ) 290 break; 291 292 sleepForPollingInterval(); 293 294 if( iterations == count++ ) 295 { 296 count = 0; 297 flowStepStats.recordStats(); 298 flowStepStats.recordChildStats(); 299 } 300 } 301 } 302 303 private synchronized void markSubmitted() 304 { 305 if( flowStepStats.isStarted() ) 306 flowStepStats.markSubmitted(); 307 308 Flow flow = flowStep.getFlow(); 309 310 if( flow == null ) 311 { 312 LOG.warn( "no parent flow set" ); 313 return; 314 } 315 316 FlowStats flowStats = flow.getFlowStats(); 317 318 synchronized( flowStats ) 319 { 320 if( flowStats.isStarted() ) 321 flowStats.markSubmitted(); 322 } 323 } 324 325 private synchronized void markRunning() 326 { 327 flowStepStats.markRunning(); 328 329 markFlowRunning(); 330 } 331 332 private synchronized void markSkipped() 333 { 334 if( flowStepStats.isFinished() ) 335 return; 336 337 flowStepStats.markSkipped(); 338 339 markFlowRunning(); 340 } 341 342 private synchronized void markFlowRunning() 343 { 344 Flow flow = flowStep.getFlow(); 345 346 if( flow == null ) 347 { 348 LOG.warn( "no parent flow set" ); 349 return; 350 } 351 352 FlowStats flowStats = flow.getFlowStats(); 353 354 synchronized( flowStats ) 355 { 356 if( flowStats.isStarted() || flowStats.isSubmitted() ) 357 flowStats.markRunning(); 358 } 359 } 360 361 protected abstract boolean internalNonBlockingIsComplete() throws IOException; 362 363 protected void sleepForPollingInterval() 364 { 365 Util.safeSleep( pollingInterval ); 366 } 367 368 protected void blockOnPredecessors() 369 { 370 for( FlowStepJob predecessor : predecessors ) 371 { 372 if( !predecessor.isSuccessful() ) 373 { 374 flowStep.logWarn( "abandoning step: " + stepName + ", predecessor failed: " + predecessor.stepName ); 375 376 stop(); 377 } 378 } 379 } 380 381 protected abstract void dumpDebugInfo(); 382 383 /** 384 * Method isSuccessful returns true if this step completed successfully or was skipped. 385 * 386 * @return the successful (type boolean) of this FlowStepJob object. 387 */ 388 public boolean isSuccessful() 389 { 390 try 391 { 392 latch.await(); // freed after step completes in #start() 393 394 return flowStepStats.isSuccessful() || flowStepStats.isSkipped(); 395 } 396 catch( InterruptedException exception ) 397 { 398 flowStep.logWarn( "latch interrupted", exception ); 399 400 return false; 401 } 402 catch( NullPointerException exception ) 403 { 404 throw new FlowException( "Hadoop is not keeping a large enough job history, please increase the \'mapred.jobtracker.completeuserjobs.maximum\' property", exception ); 405 } 406 } 407 408 /** 409 * Method wasStarted returns true if this job was started 410 * 411 * @return boolean 412 */ 413 public boolean isStarted() 414 { 415 return internalIsStarted(); 416 } 417 418 protected abstract boolean internalIsStarted(); 419 420 /** 421 * Method getStepStats returns the stepStats of this FlowStepJob object. 422 * 423 * @return the stepStats (type StepStats) of this FlowStepJob object. 424 */ 425 public FlowStepStats getStepStats() 426 { 427 return flowStepStats; 428 } 429 }