001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.List; 027import java.util.concurrent.Callable; 028import java.util.concurrent.CountDownLatch; 029import java.util.concurrent.atomic.AtomicBoolean; 030 031import cascading.flow.Flow; 032import cascading.flow.FlowException; 033import cascading.flow.FlowStep; 034import cascading.flow.FlowStepStrategy; 035import cascading.management.state.ClientState; 036import cascading.stats.CascadingStats; 037import cascading.stats.FlowNodeStats; 038import cascading.stats.FlowStats; 039import cascading.stats.FlowStepStats; 040import cascading.util.Util; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import static cascading.util.Util.formatDurationFromMillis; 045 046/** 047 * 048 */ 049public abstract class FlowStepJob<Config> implements Callable<Throwable> 050 { 051 // most logs messages should be delegated to the FlowStep.log* methods 052 // non job related issues can use this logger 053 private static final Logger LOG = LoggerFactory.getLogger( FlowStepJob.class ); 054 055 /** Field stepName */ 056 protected final String stepName; 057 /** Field jobConfiguration */ 058 protected final Config jobConfiguration; 059 /** Field pollingInterval */ 060 protected long pollingInterval = 1000; 061 /** Field recordStatsInterval */ 062 protected long statsStoreInterval = 60 * 1000; 063 /** Field waitTillCompletedChildStatsDuration */ 064 protected long blockForCompletedChildDetailDuration = 60 * 1000; 065 /** Field predecessors */ 066 protected List<FlowStepJob<Config>> predecessors; 067 /** Field latch */ 068 private final CountDownLatch latch = new CountDownLatch( 1 ); 069 /** Field started */ 070 private AtomicBoolean callableStarted = new AtomicBoolean( false ); 071 /** Field stop */ 072 private volatile boolean stop = false; 073 /** Field flowStep */ 074 protected final BaseFlowStep<Config> flowStep; 075 /** Field stepStats */ 076 protected FlowStepStats flowStepStats; 077 /** Field throwable */ 078 protected Throwable throwable; 079 080 public FlowStepJob( ClientState clientState, Config jobConfiguration, BaseFlowStep<Config> flowStep, long pollingInterval, long statsStoreInterval, long blockForCompletedChildDetailDuration ) 081 { 082 this.jobConfiguration = jobConfiguration; 083 this.stepName = flowStep.getName(); 084 this.pollingInterval = pollingInterval; 085 this.statsStoreInterval = statsStoreInterval; 086 this.blockForCompletedChildDetailDuration = blockForCompletedChildDetailDuration; 087 this.flowStep = flowStep; 088 this.flowStepStats = createStepStats( clientState ); 089 090 this.flowStepStats.prepare(); 091 this.flowStepStats.markPending(); 092 093 for( FlowNodeStats flowNodeStats : this.flowStepStats.getFlowNodeStats() ) 094 { 095 flowNodeStats.prepare(); 096 flowNodeStats.markPending(); 097 } 098 } 099 100 public Config getConfig() 101 { 102 return jobConfiguration; 103 } 104 105 protected abstract FlowStepStats createStepStats( ClientState clientState ); 106 107 public synchronized void stop() 108 { 109 if( flowStep.isInfoEnabled() ) 110 flowStep.logInfo( "stopping: " + stepName ); 111 112 stop = true; 113 114 // allow pending -> stopped transition 115 // never want a hanging pending state 116 if( !flowStepStats.isFinished() ) 117 flowStepStats.markStopped(); 118 119 try 120 { 121 internalBlockOnStop(); 122 } 123 catch( IOException exception ) 124 { 125 flowStep.logWarn( "unable to kill job: " + stepName, exception ); 126 } 127 finally 128 { 129 // call rollback after the job has been stopped, only if it was stopped 130 if( flowStepStats.isStopped() ) 131 { 132 flowStep.rollbackSinks(); 133 flowStep.fireOnStopping(); 134 } 135 136 flowStepStats.cleanup(); 137 } 138 } 139 140 protected abstract void internalBlockOnStop() throws IOException; 141 142 public void setPredecessors( List<FlowStepJob<Config>> predecessors ) 143 { 144 this.predecessors = predecessors; 145 } 146 147 public Throwable call() 148 { 149 start(); 150 151 return throwable; 152 } 153 154 public boolean isCallableStarted() 155 { 156 return callableStarted.get(); 157 } 158 159 protected void start() 160 { 161 if( callableStarted.getAndSet( true ) ) 162 return; 163 164 try 165 { 166 if( isSkipFlowStep() ) 167 { 168 markSkipped(); 169 170 if( flowStep.isInfoEnabled() && flowStepStats.isSkipped() ) 171 flowStep.logInfo( "skipping step: " + stepName ); 172 173 return; 174 } 175 176 synchronized( this ) // added in 3.0, jdk1.7 may have a aggravated 177 { 178 if( stop ) 179 { 180 if( flowStep.isInfoEnabled() ) 181 flowStep.logInfo( "stop called before start: " + stepName ); 182 183 return; 184 } 185 186 markStarted(); 187 } 188 189 blockOnPredecessors(); 190 191 prepareResources(); // throws Throwable to skip next steps 192 193 applyFlowStepConfStrategy(); // prepare resources beforehand 194 195 blockOnJob(); 196 } 197 catch( Throwable throwable ) 198 { 199 this.throwable = throwable; // store first, in case throwable leaks out of dumpDebugInfo 200 201 dumpDebugInfo(); 202 203 // in case isSkipFlowStep fails before the previous markStarted, we don't fail advancing to the failed state 204 if( flowStepStats.isPending() ) 205 markStarted(); 206 207 if( !flowStepStats.isFinished() ) 208 { 209 flowStepStats.markFailed( this.throwable ); 210 flowStep.fireOnThrowable( this.throwable ); 211 } 212 } 213 finally 214 { 215 latch.countDown(); 216 217 // lets free the latch and capture any failure info if exiting via a throwable 218 // remain in this thread to keep client side alive until shutdown 219 finalizeNodeSliceCapture(); 220 221 flowStepStats.cleanup(); 222 } 223 224 internalCleanup(); 225 } 226 227 private void prepareResources() throws Throwable 228 { 229 if( stop ) // true if a predecessor failed 230 return; 231 232 Throwable throwable = flowStep.prepareResources(); 233 234 if( throwable != null ) 235 throw throwable; 236 } 237 238 private synchronized boolean markStarted() 239 { 240 if( flowStepStats.isFinished() ) // if stopped, return 241 return false; 242 243 flowStepStats.markStarted(); 244 245 return true; 246 } 247 248 private void applyFlowStepConfStrategy() 249 { 250 FlowStepStrategy flowStepStrategy = flowStep.getFlow().getFlowStepStrategy(); 251 252 if( flowStepStrategy == null ) 253 return; 254 255 List<FlowStep> predecessorSteps = new ArrayList<FlowStep>(); 256 257 for( FlowStepJob predecessor : predecessors ) 258 predecessorSteps.add( predecessor.flowStep ); 259 260 flowStepStrategy.apply( flowStep.getFlow(), predecessorSteps, flowStep ); 261 } 262 263 protected boolean isSkipFlowStep() throws IOException 264 { 265 // if runID is not set, never skip a step 266 if( flowStep.getFlow().getRunID() == null ) 267 return false; 268 269 return flowStep.allSourcesExist() && !flowStep.areSourcesNewer( flowStep.getSinkModified() ); 270 } 271 272 protected void blockOnJob() throws IOException 273 { 274 if( stop ) // true if a predecessor failed 275 return; 276 277 if( flowStep.isInfoEnabled() ) 278 flowStep.logInfo( "starting step: " + stepName ); 279 280 internalNonBlockingStart(); 281 282 markSubmitted(); 283 flowStep.fireOnStarting(); 284 285 blockTillCompleteOrStopped(); 286 287 if( !stop && !internalNonBlockingIsSuccessful() ) 288 { 289 if( !flowStepStats.isFinished() ) 290 { 291 flowStep.rollbackSinks(); 292 flowStepStats.markFailed( getThrowable() ); 293 updateNodesStatus(); 294 flowStep.fireOnThrowable( getThrowable() ); 295 } 296 297 // if available, rethrow the unrecoverable error 298 if( getThrowable() instanceof OutOfMemoryError ) 299 throw (OutOfMemoryError) getThrowable(); 300 301 dumpDebugInfo(); 302 303 if( !isRemoteExecution() ) 304 this.throwable = new FlowException( "local step failed: " + stepName, getThrowable() ); 305 else 306 this.throwable = new FlowException( "step failed: " + stepName + ", step id: " + getStepStats().getID() + ", job id: " + internalJobId() + ", please see cluster logs for failure messages" ); 307 } 308 else 309 { 310 if( internalNonBlockingIsSuccessful() && !flowStepStats.isFinished() ) 311 { 312 this.throwable = flowStep.commitSinks(); 313 314 if( this.throwable != null ) 315 { 316 flowStepStats.markFailed( this.throwable ); 317 updateNodesStatus(); 318 flowStep.fireOnThrowable( this.throwable ); 319 } 320 else 321 { 322 flowStepStats.markSuccessful(); 323 updateNodesStatus(); 324 flowStep.fireOnCompleted(); 325 } 326 } 327 } 328 } 329 330 protected void finalizeNodeSliceCapture() 331 { 332 long startOfFinalPolling = System.currentTimeMillis(); 333 long lastLog = 0; 334 long retries = 0; 335 336 boolean allNodesFinished; 337 338 while( true ) 339 { 340 allNodesFinished = updateNodesStatus(); 341 342 flowStepStats.recordChildStats(); 343 344 if( allNodesFinished && flowStepStats.hasCapturedFinalDetail() ) 345 break; 346 347 if( ( System.currentTimeMillis() - startOfFinalPolling ) >= blockForCompletedChildDetailDuration ) 348 break; 349 350 if( System.currentTimeMillis() - lastLog > 1000 ) 351 { 352 if( !allNodesFinished ) 353 flowStep.logInfo( "did not capture all completed node details, will retry in {}, prior retries: {}", formatDurationFromMillis( pollingInterval ), retries ); 354 else 355 flowStep.logInfo( "did not capture all completed slice details, will retry in {}, prior retries: {}", formatDurationFromMillis( pollingInterval ), retries ); 356 357 lastLog = System.currentTimeMillis(); 358 } 359 360 retries++; 361 362 sleepForPollingInterval(); 363 } 364 365 if( !allNodesFinished ) 366 flowStep.logWarn( "unable to capture all completed node details or determine final state within configured duration: {}, configure property to increase wait duration: '{}'", formatDurationFromMillis( blockForCompletedChildDetailDuration ), CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION ); 367 368 if( !flowStepStats.hasCapturedFinalDetail() ) 369 flowStep.logWarn( "unable to capture all completed slice details within configured duration: {}, configure property to increase wait duration: '{}'", formatDurationFromMillis( blockForCompletedChildDetailDuration ), CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION ); 370 } 371 372 protected abstract boolean isRemoteExecution(); 373 374 protected abstract String internalJobId(); 375 376 protected abstract boolean internalNonBlockingIsSuccessful() throws IOException; 377 378 protected abstract Throwable getThrowable(); 379 380 protected abstract void internalNonBlockingStart() throws IOException; 381 382 protected void blockTillCompleteOrStopped() throws IOException 383 { 384 int iterations = (int) Math.floor( statsStoreInterval / pollingInterval ); 385 int count = 0; 386 387 while( true ) 388 { 389 // test stop last, internalIsStartedRunning may block causing a race condition 390 if( flowStepStats.isSubmitted() && internalIsStartedRunning() && !stop ) 391 { 392 markRunning(); 393 flowStep.fireOnRunning(); 394 } 395 396 if( flowStepStats.isRunning() ) 397 updateNodesStatus(); // records node stats on node status change, not slices 398 399 if( stop || internalNonBlockingIsComplete() ) 400 break; 401 402 if( iterations == count++ ) 403 { 404 count = 0; 405 flowStepStats.recordStats(); 406 flowStepStats.recordChildStats(); // records node and slice stats 407 } 408 409 sleepForPollingInterval(); 410 } 411 } 412 413 private synchronized void markSubmitted() 414 { 415 if( flowStepStats.isStarted() ) 416 { 417 flowStepStats.markSubmitted(); 418 419 Collection<FlowNodeStats> children = flowStepStats.getChildren(); 420 421 for( FlowNodeStats flowNodeStats : children ) 422 flowNodeStats.markStarted(); 423 } 424 425 Flow flow = flowStep.getFlow(); 426 427 if( flow == null ) 428 { 429 LOG.warn( "no parent flow set" ); 430 return; 431 } 432 433 FlowStats flowStats = flow.getFlowStats(); 434 435 synchronized( flowStats ) 436 { 437 if( flowStats.isStarted() ) 438 flowStats.markSubmitted(); 439 } 440 } 441 442 private synchronized void markSkipped() 443 { 444 if( flowStepStats.isFinished() ) 445 return; 446 447 try 448 { 449 flowStepStats.markSkipped(); 450 flowStep.fireOnCompleted(); 451 } 452 finally 453 { 454 markFlowRunning(); // move to running before marking failed 455 } 456 } 457 458 private synchronized void markRunning() 459 { 460 flowStepStats.markRunning(); 461 462 markFlowRunning(); 463 } 464 465 private synchronized void markFlowRunning() 466 { 467 Flow flow = flowStep.getFlow(); 468 469 if( flow == null ) 470 { 471 LOG.warn( "no parent flow set" ); 472 return; 473 } 474 475 FlowStats flowStats = flow.getFlowStats(); 476 477 synchronized( flowStats ) 478 { 479 if( flowStats.isStarted() || flowStats.isSubmitted() ) 480 flowStats.markRunning(); 481 } 482 } 483 484 private boolean updateNodesStatus() 485 { 486 boolean allFinished = true; 487 488 Collection<FlowNodeStats> children = flowStepStats.getFlowNodeStats(); 489 490 for( FlowNodeStats child : children ) 491 { 492 // child#markStarted is called above 493 if( child.isFinished() || child.isPending() ) 494 continue; 495 496 updateNodeStatus( child ); 497 498 allFinished &= child.isFinished(); 499 } 500 501 return allFinished; 502 } 503 504 protected abstract void updateNodeStatus( FlowNodeStats flowNodeStats ); 505 506 protected abstract boolean internalNonBlockingIsComplete() throws IOException; 507 508 protected void sleepForPollingInterval() 509 { 510 Util.safeSleep( pollingInterval ); 511 } 512 513 protected void blockOnPredecessors() 514 { 515 for( FlowStepJob predecessor : predecessors ) 516 { 517 if( !predecessor.isSuccessful() ) 518 { 519 flowStep.logWarn( "abandoning step: " + stepName + ", predecessor failed: " + predecessor.stepName ); 520 521 stop(); 522 } 523 } 524 } 525 526 protected abstract void dumpDebugInfo(); 527 528 /** 529 * Method isSuccessful returns true if this step completed successfully or was skipped. 530 * 531 * @return the successful (type boolean) of this FlowStepJob object. 532 */ 533 public boolean isSuccessful() 534 { 535 try 536 { 537 latch.await(); // freed after step completes in #start() 538 539 return flowStepStats.isSuccessful() || flowStepStats.isSkipped(); 540 } 541 catch( InterruptedException exception ) 542 { 543 flowStep.logWarn( "latch interrupted", exception ); 544 545 return false; 546 } 547 } 548 549 /** 550 * Method isStarted returns true if this underlying job has started running 551 * 552 * @return boolean 553 */ 554 public boolean isStarted() 555 { 556 return internalIsStartedRunning(); 557 } 558 559 protected abstract boolean internalIsStartedRunning(); 560 561 protected void internalCleanup() 562 { 563 // optional, safe to override 564 } 565 566 /** 567 * Method getStepStats returns the stepStats of this FlowStepJob object. 568 * 569 * @return the stepStats (type StepStats) of this FlowStepJob object. 570 */ 571 public FlowStepStats getStepStats() 572 { 573 return flowStepStats; 574 } 575 }