001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.List; 027import java.util.concurrent.Callable; 028import java.util.concurrent.CountDownLatch; 029 030import cascading.flow.Flow; 031import cascading.flow.FlowException; 032import cascading.flow.FlowStep; 033import cascading.flow.FlowStepStrategy; 034import cascading.management.state.ClientState; 035import cascading.stats.CascadingStats; 036import cascading.stats.FlowNodeStats; 037import cascading.stats.FlowStats; 038import cascading.stats.FlowStepStats; 039import cascading.util.Util; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import static cascading.util.Util.formatDurationFromMillis; 044 045/** 046 * 047 */ 048public abstract class FlowStepJob<Config> implements Callable<Throwable> 049 { 050 // most logs messages should be delegated to the FlowStep.log* methods 051 // non job related issues can use this logger 052 private static final Logger LOG = LoggerFactory.getLogger( FlowStepJob.class ); 053 054 /** Field stepName */ 055 protected final String stepName; 056 /** Field jobConfiguration */ 057 protected final Config jobConfiguration; 058 /** Field pollingInterval */ 059 protected long pollingInterval = 1000; 060 /** Field recordStatsInterval */ 061 protected long statsStoreInterval = 60 * 1000; 062 /** Field waitTillCompletedChildStatsDuration */ 063 protected long blockForCompletedChildDetailDuration = 60 * 1000; 064 /** Field predecessors */ 065 protected List<FlowStepJob<Config>> predecessors; 066 /** Field latch */ 067 private final CountDownLatch latch = new CountDownLatch( 1 ); 068 /** Field stop */ 069 private boolean stop = false; 070 /** Field flowStep */ 071 protected final BaseFlowStep<Config> flowStep; 072 /** Field stepStats */ 073 protected FlowStepStats flowStepStats; 074 /** Field throwable */ 075 protected Throwable throwable; 076 077 public FlowStepJob( ClientState clientState, Config jobConfiguration, BaseFlowStep<Config> flowStep, long pollingInterval, long statsStoreInterval, long blockForCompletedChildDetailDuration ) 078 { 079 this.jobConfiguration = jobConfiguration; 080 this.stepName = flowStep.getName(); 081 this.pollingInterval = pollingInterval; 082 this.statsStoreInterval = statsStoreInterval; 083 this.blockForCompletedChildDetailDuration = blockForCompletedChildDetailDuration; 084 this.flowStep = flowStep; 085 this.flowStepStats = createStepStats( clientState ); 086 087 this.flowStepStats.prepare(); 088 this.flowStepStats.markPending(); 089 090 for( FlowNodeStats flowNodeStats : this.flowStepStats.getFlowNodeStats() ) 091 { 092 flowNodeStats.prepare(); 093 flowNodeStats.markPending(); 094 } 095 } 096 097 public Config getConfig() 098 { 099 return jobConfiguration; 100 } 101 102 protected abstract FlowStepStats createStepStats( ClientState clientState ); 103 104 public synchronized void stop() 105 { 106 if( flowStep.isInfoEnabled() ) 107 flowStep.logInfo( "stopping: " + stepName ); 108 109 stop = true; 110 111 // allow pending -> stopped transition 112 // never want a hanging pending state 113 if( !flowStepStats.isFinished() ) 114 flowStepStats.markStopped(); 115 116 try 117 { 118 internalBlockOnStop(); 119 } 120 catch( IOException exception ) 121 { 122 flowStep.logWarn( "unable to kill job: " + stepName, exception ); 123 } 124 finally 125 { 126 // call rollback after the job has been stopped, only if it was stopped 127 if( flowStepStats.isStopped() ) 128 { 129 flowStep.rollbackSinks(); 130 flowStep.fireOnStopping(); 131 } 132 133 flowStepStats.cleanup(); 134 } 135 } 136 137 protected abstract void internalBlockOnStop() throws IOException; 138 139 public void setPredecessors( List<FlowStepJob<Config>> predecessors ) 140 { 141 this.predecessors = predecessors; 142 } 143 144 public Throwable call() 145 { 146 start(); 147 148 return throwable; 149 } 150 151 protected void start() 152 { 153 try 154 { 155 if( isSkipFlowStep() ) 156 { 157 markSkipped(); 158 159 if( flowStep.isInfoEnabled() && flowStepStats.isSkipped() ) 160 flowStep.logInfo( "skipping step: " + stepName ); 161 162 return; 163 } 164 165 synchronized( this ) // added in 3.0, jdk1.7 may have a aggravated 166 { 167 if( stop ) 168 { 169 if( flowStep.isInfoEnabled() ) 170 flowStep.logInfo( "stop called before start: " + stepName ); 171 172 return; 173 } 174 175 markStarted(); 176 } 177 178 blockOnPredecessors(); 179 180 prepareResources(); // throws Throwable to skip next steps 181 182 applyFlowStepConfStrategy(); // prepare resources beforehand 183 184 blockOnJob(); 185 } 186 catch( Throwable throwable ) 187 { 188 this.throwable = throwable; // store first, in case throwable leaks out of dumpDebugInfo 189 190 dumpDebugInfo(); 191 192 // in case isSkipFlowStep fails before the previous markStarted, we don't fail advancing to the failed state 193 if( flowStepStats.isPending() ) 194 markStarted(); 195 196 if( !flowStepStats.isFinished() ) 197 { 198 flowStepStats.markFailed( this.throwable ); 199 flowStep.fireOnThrowable( this.throwable ); 200 } 201 } 202 finally 203 { 204 latch.countDown(); 205 206 // lets free the latch and capture any failure info if exiting via a throwable 207 // remain in this thread to keep client side alive until shutdown 208 finalizeNodeSliceCapture(); 209 210 flowStepStats.cleanup(); 211 } 212 213 internalCleanup(); 214 } 215 216 private void prepareResources() throws Throwable 217 { 218 if( stop ) // true if a predecessor failed 219 return; 220 221 Throwable throwable = flowStep.prepareResources(); 222 223 if( throwable != null ) 224 throw throwable; 225 } 226 227 private synchronized boolean markStarted() 228 { 229 if( flowStepStats.isFinished() ) // if stopped, return 230 return false; 231 232 flowStepStats.markStarted(); 233 234 return true; 235 } 236 237 private void applyFlowStepConfStrategy() 238 { 239 FlowStepStrategy flowStepStrategy = flowStep.getFlow().getFlowStepStrategy(); 240 241 if( flowStepStrategy == null ) 242 return; 243 244 List<FlowStep> predecessorSteps = new ArrayList<FlowStep>(); 245 246 for( FlowStepJob predecessor : predecessors ) 247 predecessorSteps.add( predecessor.flowStep ); 248 249 flowStepStrategy.apply( flowStep.getFlow(), predecessorSteps, flowStep ); 250 } 251 252 protected boolean isSkipFlowStep() throws IOException 253 { 254 // if runID is not set, never skip a step 255 if( flowStep.getFlow().getRunID() == null ) 256 return false; 257 258 return flowStep.allSourcesExist() && !flowStep.areSourcesNewer( flowStep.getSinkModified() ); 259 } 260 261 protected void blockOnJob() throws IOException 262 { 263 if( stop ) // true if a predecessor failed 264 return; 265 266 if( flowStep.isInfoEnabled() ) 267 flowStep.logInfo( "starting step: " + stepName ); 268 269 internalNonBlockingStart(); 270 271 markSubmitted(); 272 flowStep.fireOnStarting(); 273 274 blockTillCompleteOrStopped(); 275 276 if( !stop && !internalNonBlockingIsSuccessful() ) 277 { 278 if( !flowStepStats.isFinished() ) 279 { 280 flowStep.rollbackSinks(); 281 flowStepStats.markFailed( getThrowable() ); 282 updateNodesStatus(); 283 flowStep.fireOnThrowable( getThrowable() ); 284 } 285 286 // if available, rethrow the unrecoverable error 287 if( getThrowable() instanceof OutOfMemoryError ) 288 throw (OutOfMemoryError) getThrowable(); 289 290 dumpDebugInfo(); 291 292 if( !isRemoteExecution() ) 293 this.throwable = new FlowException( "local step failed: " + stepName, getThrowable() ); 294 else 295 this.throwable = new FlowException( "step failed: " + stepName + ", step id: " + getStepStats().getID() + ", job id: " + internalJobId() + ", please see cluster logs for failure messages" ); 296 } 297 else 298 { 299 if( internalNonBlockingIsSuccessful() && !flowStepStats.isFinished() ) 300 { 301 this.throwable = flowStep.commitSinks(); 302 303 if( this.throwable != null ) 304 { 305 flowStepStats.markFailed( this.throwable ); 306 updateNodesStatus(); 307 flowStep.fireOnThrowable( this.throwable ); 308 } 309 else 310 { 311 flowStepStats.markSuccessful(); 312 updateNodesStatus(); 313 flowStep.fireOnCompleted(); 314 } 315 } 316 } 317 } 318 319 protected void finalizeNodeSliceCapture() 320 { 321 long startOfFinalPolling = System.currentTimeMillis(); 322 long lastLog = 0; 323 long retries = 0; 324 325 boolean allNodesFinished; 326 327 while( true ) 328 { 329 allNodesFinished = updateNodesStatus(); 330 331 flowStepStats.recordChildStats(); 332 333 if( allNodesFinished && flowStepStats.hasCapturedFinalDetail() ) 334 break; 335 336 if( ( System.currentTimeMillis() - startOfFinalPolling ) >= blockForCompletedChildDetailDuration ) 337 break; 338 339 if( System.currentTimeMillis() - lastLog > 1000 ) 340 { 341 if( !allNodesFinished ) 342 flowStep.logInfo( "did not capture all completed node details, will retry in {}, prior retries: {}", formatDurationFromMillis( pollingInterval ), retries ); 343 else 344 flowStep.logInfo( "did not capture all completed slice details, will retry in {}, prior retries: {}", formatDurationFromMillis( pollingInterval ), retries ); 345 346 lastLog = System.currentTimeMillis(); 347 } 348 349 retries++; 350 351 sleepForPollingInterval(); 352 } 353 354 if( !allNodesFinished ) 355 flowStep.logWarn( "unable to capture all completed node details or determine final state within configured duration: {}, configure property to increase wait duration: '{}'", formatDurationFromMillis( blockForCompletedChildDetailDuration ), CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION ); 356 357 if( !flowStepStats.hasCapturedFinalDetail() ) 358 flowStep.logWarn( "unable to capture all completed slice details within configured duration: {}, configure property to increase wait duration: '{}'", formatDurationFromMillis( blockForCompletedChildDetailDuration ), CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION ); 359 } 360 361 protected abstract boolean isRemoteExecution(); 362 363 protected abstract String internalJobId(); 364 365 protected abstract boolean internalNonBlockingIsSuccessful() throws IOException; 366 367 protected abstract Throwable getThrowable(); 368 369 protected abstract void internalNonBlockingStart() throws IOException; 370 371 protected void blockTillCompleteOrStopped() throws IOException 372 { 373 int iterations = (int) Math.floor( statsStoreInterval / pollingInterval ); 374 int count = 0; 375 376 while( true ) 377 { 378 // test stop last, internalIsStartedRunning may block causing a race condition 379 if( flowStepStats.isSubmitted() && internalIsStartedRunning() && !stop ) 380 { 381 markRunning(); 382 flowStep.fireOnRunning(); 383 } 384 385 if( flowStepStats.isRunning() ) 386 updateNodesStatus(); // records node stats on node status change, not slices 387 388 if( stop || internalNonBlockingIsComplete() ) 389 break; 390 391 if( iterations == count++ ) 392 { 393 count = 0; 394 flowStepStats.recordStats(); 395 flowStepStats.recordChildStats(); // records node and slice stats 396 } 397 398 sleepForPollingInterval(); 399 } 400 } 401 402 private synchronized void markSubmitted() 403 { 404 if( flowStepStats.isStarted() ) 405 { 406 flowStepStats.markSubmitted(); 407 408 Collection<FlowNodeStats> children = flowStepStats.getChildren(); 409 410 for( FlowNodeStats flowNodeStats : children ) 411 flowNodeStats.markStarted(); 412 } 413 414 Flow flow = flowStep.getFlow(); 415 416 if( flow == null ) 417 { 418 LOG.warn( "no parent flow set" ); 419 return; 420 } 421 422 FlowStats flowStats = flow.getFlowStats(); 423 424 synchronized( flowStats ) 425 { 426 if( flowStats.isStarted() ) 427 flowStats.markSubmitted(); 428 } 429 } 430 431 private synchronized void markSkipped() 432 { 433 if( flowStepStats.isFinished() ) 434 return; 435 436 try 437 { 438 flowStepStats.markSkipped(); 439 flowStep.fireOnCompleted(); 440 } 441 finally 442 { 443 markFlowRunning(); // move to running before marking failed 444 } 445 } 446 447 private synchronized void markRunning() 448 { 449 flowStepStats.markRunning(); 450 451 markFlowRunning(); 452 } 453 454 private synchronized void markFlowRunning() 455 { 456 Flow flow = flowStep.getFlow(); 457 458 if( flow == null ) 459 { 460 LOG.warn( "no parent flow set" ); 461 return; 462 } 463 464 FlowStats flowStats = flow.getFlowStats(); 465 466 synchronized( flowStats ) 467 { 468 if( flowStats.isStarted() || flowStats.isSubmitted() ) 469 flowStats.markRunning(); 470 } 471 } 472 473 private boolean updateNodesStatus() 474 { 475 boolean allFinished = true; 476 477 Collection<FlowNodeStats> children = flowStepStats.getFlowNodeStats(); 478 479 for( FlowNodeStats child : children ) 480 { 481 // child#markStarted is called above 482 if( child.isFinished() || child.isPending() ) 483 continue; 484 485 updateNodeStatus( child ); 486 487 allFinished &= child.isFinished(); 488 } 489 490 return allFinished; 491 } 492 493 protected abstract void updateNodeStatus( FlowNodeStats flowNodeStats ); 494 495 protected abstract boolean internalNonBlockingIsComplete() throws IOException; 496 497 protected void sleepForPollingInterval() 498 { 499 Util.safeSleep( pollingInterval ); 500 } 501 502 protected void blockOnPredecessors() 503 { 504 for( FlowStepJob predecessor : predecessors ) 505 { 506 if( !predecessor.isSuccessful() ) 507 { 508 flowStep.logWarn( "abandoning step: " + stepName + ", predecessor failed: " + predecessor.stepName ); 509 510 stop(); 511 } 512 } 513 } 514 515 protected abstract void dumpDebugInfo(); 516 517 /** 518 * Method isSuccessful returns true if this step completed successfully or was skipped. 519 * 520 * @return the successful (type boolean) of this FlowStepJob object. 521 */ 522 public boolean isSuccessful() 523 { 524 try 525 { 526 latch.await(); // freed after step completes in #start() 527 528 return flowStepStats.isSuccessful() || flowStepStats.isSkipped(); 529 } 530 catch( InterruptedException exception ) 531 { 532 flowStep.logWarn( "latch interrupted", exception ); 533 534 return false; 535 } 536 } 537 538 /** 539 * Method isStarted returns true if this underlying job has started running 540 * 541 * @return boolean 542 */ 543 public boolean isStarted() 544 { 545 return internalIsStartedRunning(); 546 } 547 548 protected abstract boolean internalIsStartedRunning(); 549 550 protected void internalCleanup() 551 { 552 // optional, safe to override 553 } 554 555 /** 556 * Method getStepStats returns the stepStats of this FlowStepJob object. 557 * 558 * @return the stepStats (type StepStats) of this FlowStepJob object. 559 */ 560 public FlowStepStats getStepStats() 561 { 562 return flowStepStats; 563 } 564 }