001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.List;
027import java.util.concurrent.Callable;
028import java.util.concurrent.CountDownLatch;
029
030import cascading.flow.Flow;
031import cascading.flow.FlowException;
032import cascading.flow.FlowStep;
033import cascading.flow.FlowStepStrategy;
034import cascading.management.state.ClientState;
035import cascading.stats.CascadingStats;
036import cascading.stats.FlowNodeStats;
037import cascading.stats.FlowStats;
038import cascading.stats.FlowStepStats;
039import cascading.util.Util;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import static cascading.util.Util.formatDurationFromMillis;
044
045/**
046 *
047 */
048public abstract class FlowStepJob<Config> implements Callable<Throwable>
049  {
050  // most logs messages should be delegated to the FlowStep.log* methods
051  // non job related issues can use this logger
052  private static final Logger LOG = LoggerFactory.getLogger( FlowStepJob.class );
053
054  /** Field stepName */
055  protected final String stepName;
056  /** Field jobConfiguration */
057  protected final Config jobConfiguration;
058  /** Field pollingInterval */
059  protected long pollingInterval = 1000;
060  /** Field recordStatsInterval */
061  protected long statsStoreInterval = 60 * 1000;
062  /** Field waitTillCompletedChildStatsDuration */
063  protected long blockForCompletedChildDetailDuration = 60 * 1000;
064  /** Field predecessors */
065  protected List<FlowStepJob<Config>> predecessors;
066  /** Field latch */
067  private final CountDownLatch latch = new CountDownLatch( 1 );
068  /** Field stop */
069  private boolean stop = false;
070  /** Field flowStep */
071  protected final BaseFlowStep<Config> flowStep;
072  /** Field stepStats */
073  protected FlowStepStats flowStepStats;
074  /** Field throwable */
075  protected Throwable throwable;
076
077  public FlowStepJob( ClientState clientState, Config jobConfiguration, BaseFlowStep<Config> flowStep, long pollingInterval, long statsStoreInterval, long blockForCompletedChildDetailDuration )
078    {
079    this.jobConfiguration = jobConfiguration;
080    this.stepName = flowStep.getName();
081    this.pollingInterval = pollingInterval;
082    this.statsStoreInterval = statsStoreInterval;
083    this.blockForCompletedChildDetailDuration = blockForCompletedChildDetailDuration;
084    this.flowStep = flowStep;
085    this.flowStepStats = createStepStats( clientState );
086
087    this.flowStepStats.prepare();
088    this.flowStepStats.markPending();
089
090    for( FlowNodeStats flowNodeStats : this.flowStepStats.getFlowNodeStats() )
091      {
092      flowNodeStats.prepare();
093      flowNodeStats.markPending();
094      }
095    }
096
097  public Config getConfig()
098    {
099    return jobConfiguration;
100    }
101
102  protected abstract FlowStepStats createStepStats( ClientState clientState );
103
104  public synchronized void stop()
105    {
106    if( flowStep.isInfoEnabled() )
107      flowStep.logInfo( "stopping: " + stepName );
108
109    stop = true;
110
111    // allow pending -> stopped transition
112    // never want a hanging pending state
113    if( !flowStepStats.isFinished() )
114      flowStepStats.markStopped();
115
116    try
117      {
118      internalBlockOnStop();
119      }
120    catch( IOException exception )
121      {
122      flowStep.logWarn( "unable to kill job: " + stepName, exception );
123      }
124    finally
125      {
126      // call rollback after the job has been stopped, only if it was stopped
127      if( flowStepStats.isStopped() )
128        {
129        flowStep.rollbackSinks();
130        flowStep.fireOnStopping();
131        }
132
133      flowStepStats.cleanup();
134      }
135    }
136
137  protected abstract void internalBlockOnStop() throws IOException;
138
139  public void setPredecessors( List<FlowStepJob<Config>> predecessors )
140    {
141    this.predecessors = predecessors;
142    }
143
144  public Throwable call()
145    {
146    start();
147
148    return throwable;
149    }
150
151  protected void start()
152    {
153    try
154      {
155      if( isSkipFlowStep() )
156        {
157        markSkipped();
158
159        if( flowStep.isInfoEnabled() && flowStepStats.isSkipped() )
160          flowStep.logInfo( "skipping step: " + stepName );
161
162        return;
163        }
164
165      synchronized( this ) // added in 3.0, jdk1.7 may have a aggravated
166        {
167        if( stop )
168          {
169          if( flowStep.isInfoEnabled() )
170            flowStep.logInfo( "stop called before start: " + stepName );
171
172          return;
173          }
174
175        markStarted();
176        }
177
178      blockOnPredecessors();
179
180      prepareResources(); // throws Throwable to skip next steps
181
182      applyFlowStepConfStrategy(); // prepare resources beforehand
183
184      blockOnJob();
185      }
186    catch( Throwable throwable )
187      {
188      this.throwable = throwable; // store first, in case throwable leaks out of dumpDebugInfo
189
190      dumpDebugInfo();
191
192      // in case isSkipFlowStep fails before the previous markStarted, we don't fail advancing to the failed state
193      if( flowStepStats.isPending() )
194        markStarted();
195
196      if( !flowStepStats.isFinished() )
197        {
198        flowStepStats.markFailed( this.throwable );
199        flowStep.fireOnThrowable( this.throwable );
200        }
201      }
202    finally
203      {
204      latch.countDown();
205
206      // lets free the latch and capture any failure info if exiting via a throwable
207      // remain in this thread to keep client side alive until shutdown
208      finalizeNodeSliceCapture();
209
210      flowStepStats.cleanup();
211      }
212
213    internalCleanup();
214    }
215
216  private void prepareResources() throws Throwable
217    {
218    if( stop ) // true if a predecessor failed
219      return;
220
221    Throwable throwable = flowStep.prepareResources();
222
223    if( throwable != null )
224      throw throwable;
225    }
226
227  private synchronized boolean markStarted()
228    {
229    if( flowStepStats.isFinished() ) // if stopped, return
230      return false;
231
232    flowStepStats.markStarted();
233
234    return true;
235    }
236
237  private void applyFlowStepConfStrategy()
238    {
239    FlowStepStrategy flowStepStrategy = flowStep.getFlow().getFlowStepStrategy();
240
241    if( flowStepStrategy == null )
242      return;
243
244    List<FlowStep> predecessorSteps = new ArrayList<FlowStep>();
245
246    for( FlowStepJob predecessor : predecessors )
247      predecessorSteps.add( predecessor.flowStep );
248
249    flowStepStrategy.apply( flowStep.getFlow(), predecessorSteps, flowStep );
250    }
251
252  protected boolean isSkipFlowStep() throws IOException
253    {
254    // if runID is not set, never skip a step
255    if( flowStep.getFlow().getRunID() == null )
256      return false;
257
258    return flowStep.allSourcesExist() && !flowStep.areSourcesNewer( flowStep.getSinkModified() );
259    }
260
261  protected void blockOnJob() throws IOException
262    {
263    if( stop ) // true if a predecessor failed
264      return;
265
266    if( flowStep.isInfoEnabled() )
267      flowStep.logInfo( "starting step: " + stepName );
268
269    internalNonBlockingStart();
270
271    markSubmitted();
272    flowStep.fireOnStarting();
273
274    blockTillCompleteOrStopped();
275
276    if( !stop && !internalNonBlockingIsSuccessful() )
277      {
278      if( !flowStepStats.isFinished() )
279        {
280        flowStep.rollbackSinks();
281        flowStepStats.markFailed( getThrowable() );
282        updateNodesStatus();
283        flowStep.fireOnThrowable( getThrowable() );
284        }
285
286      // if available, rethrow the unrecoverable error
287      if( getThrowable() instanceof OutOfMemoryError )
288        throw (OutOfMemoryError) getThrowable();
289
290      dumpDebugInfo();
291
292      if( !isRemoteExecution() )
293        this.throwable = new FlowException( "local step failed: " + stepName, getThrowable() );
294      else
295        this.throwable = new FlowException( "step failed: " + stepName + ", step id: " + getStepStats().getID() + ", job id: " + internalJobId() + ", please see cluster logs for failure messages" );
296      }
297    else
298      {
299      if( internalNonBlockingIsSuccessful() && !flowStepStats.isFinished() )
300        {
301        this.throwable = flowStep.commitSinks();
302
303        if( this.throwable != null )
304          {
305          flowStepStats.markFailed( this.throwable );
306          updateNodesStatus();
307          flowStep.fireOnThrowable( this.throwable );
308          }
309        else
310          {
311          flowStepStats.markSuccessful();
312          updateNodesStatus();
313          flowStep.fireOnCompleted();
314          }
315        }
316      }
317    }
318
319  protected void finalizeNodeSliceCapture()
320    {
321    long startOfFinalPolling = System.currentTimeMillis();
322    long lastLog = 0;
323    long retries = 0;
324
325    boolean allNodesFinished;
326
327    while( true )
328      {
329      allNodesFinished = updateNodesStatus();
330
331      flowStepStats.recordChildStats();
332
333      if( allNodesFinished && flowStepStats.hasCapturedFinalDetail() )
334        break;
335
336      if( ( System.currentTimeMillis() - startOfFinalPolling ) >= blockForCompletedChildDetailDuration )
337        break;
338
339      if( System.currentTimeMillis() - lastLog > 1000 )
340        {
341        if( !allNodesFinished )
342          flowStep.logInfo( "did not capture all completed node details, will retry in {}, prior retries: {}", formatDurationFromMillis( pollingInterval ), retries );
343        else
344          flowStep.logInfo( "did not capture all completed slice details, will retry in {}, prior retries: {}", formatDurationFromMillis( pollingInterval ), retries );
345
346        lastLog = System.currentTimeMillis();
347        }
348
349      retries++;
350
351      sleepForPollingInterval();
352      }
353
354    if( !allNodesFinished )
355      flowStep.logWarn( "unable to capture all completed node details or determine final state within configured duration: {}, configure property to increase wait duration: '{}'", formatDurationFromMillis( blockForCompletedChildDetailDuration ), CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION );
356
357    if( !flowStepStats.hasCapturedFinalDetail() )
358      flowStep.logWarn( "unable to capture all completed slice details within configured duration: {}, configure property to increase wait duration: '{}'", formatDurationFromMillis( blockForCompletedChildDetailDuration ), CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION );
359    }
360
361  protected abstract boolean isRemoteExecution();
362
363  protected abstract String internalJobId();
364
365  protected abstract boolean internalNonBlockingIsSuccessful() throws IOException;
366
367  protected abstract Throwable getThrowable();
368
369  protected abstract void internalNonBlockingStart() throws IOException;
370
371  protected void blockTillCompleteOrStopped() throws IOException
372    {
373    int iterations = (int) Math.floor( statsStoreInterval / pollingInterval );
374    int count = 0;
375
376    while( true )
377      {
378      // test stop last, internalIsStartedRunning may block causing a race condition
379      if( flowStepStats.isSubmitted() && internalIsStartedRunning() && !stop )
380        {
381        markRunning();
382        flowStep.fireOnRunning();
383        }
384
385      if( flowStepStats.isRunning() )
386        updateNodesStatus(); // records node stats on node status change, not slices
387
388      if( stop || internalNonBlockingIsComplete() )
389        break;
390
391      if( iterations == count++ )
392        {
393        count = 0;
394        flowStepStats.recordStats();
395        flowStepStats.recordChildStats(); // records node and slice stats
396        }
397
398      sleepForPollingInterval();
399      }
400    }
401
402  private synchronized void markSubmitted()
403    {
404    if( flowStepStats.isStarted() )
405      {
406      flowStepStats.markSubmitted();
407
408      Collection<FlowNodeStats> children = flowStepStats.getChildren();
409
410      for( FlowNodeStats flowNodeStats : children )
411        flowNodeStats.markStarted();
412      }
413
414    Flow flow = flowStep.getFlow();
415
416    if( flow == null )
417      {
418      LOG.warn( "no parent flow set" );
419      return;
420      }
421
422    FlowStats flowStats = flow.getFlowStats();
423
424    synchronized( flowStats )
425      {
426      if( flowStats.isStarted() )
427        flowStats.markSubmitted();
428      }
429    }
430
431  private synchronized void markSkipped()
432    {
433    if( flowStepStats.isFinished() )
434      return;
435
436    try
437      {
438      flowStepStats.markSkipped();
439      flowStep.fireOnCompleted();
440      }
441    finally
442      {
443      markFlowRunning(); // move to running before marking failed
444      }
445    }
446
447  private synchronized void markRunning()
448    {
449    flowStepStats.markRunning();
450
451    markFlowRunning();
452    }
453
454  private synchronized void markFlowRunning()
455    {
456    Flow flow = flowStep.getFlow();
457
458    if( flow == null )
459      {
460      LOG.warn( "no parent flow set" );
461      return;
462      }
463
464    FlowStats flowStats = flow.getFlowStats();
465
466    synchronized( flowStats )
467      {
468      if( flowStats.isStarted() || flowStats.isSubmitted() )
469        flowStats.markRunning();
470      }
471    }
472
473  private boolean updateNodesStatus()
474    {
475    boolean allFinished = true;
476
477    Collection<FlowNodeStats> children = flowStepStats.getFlowNodeStats();
478
479    for( FlowNodeStats child : children )
480      {
481      // child#markStarted is called above
482      if( child.isFinished() || child.isPending() )
483        continue;
484
485      updateNodeStatus( child );
486
487      allFinished &= child.isFinished();
488      }
489
490    return allFinished;
491    }
492
493  protected abstract void updateNodeStatus( FlowNodeStats flowNodeStats );
494
495  protected abstract boolean internalNonBlockingIsComplete() throws IOException;
496
497  protected void sleepForPollingInterval()
498    {
499    Util.safeSleep( pollingInterval );
500    }
501
502  protected void blockOnPredecessors()
503    {
504    for( FlowStepJob predecessor : predecessors )
505      {
506      if( !predecessor.isSuccessful() )
507        {
508        flowStep.logWarn( "abandoning step: " + stepName + ", predecessor failed: " + predecessor.stepName );
509
510        stop();
511        }
512      }
513    }
514
515  protected abstract void dumpDebugInfo();
516
517  /**
518   * Method isSuccessful returns true if this step completed successfully or was skipped.
519   *
520   * @return the successful (type boolean) of this FlowStepJob object.
521   */
522  public boolean isSuccessful()
523    {
524    try
525      {
526      latch.await(); // freed after step completes in #start()
527
528      return flowStepStats.isSuccessful() || flowStepStats.isSkipped();
529      }
530    catch( InterruptedException exception )
531      {
532      flowStep.logWarn( "latch interrupted", exception );
533
534      return false;
535      }
536    }
537
538  /**
539   * Method isStarted returns true if this underlying job has started running
540   *
541   * @return boolean
542   */
543  public boolean isStarted()
544    {
545    return internalIsStartedRunning();
546    }
547
548  protected abstract boolean internalIsStartedRunning();
549
550  protected void internalCleanup()
551    {
552    // optional, safe to override
553    }
554
555  /**
556   * Method getStepStats returns the stepStats of this FlowStepJob object.
557   *
558   * @return the stepStats (type StepStats) of this FlowStepJob object.
559   */
560  public FlowStepStats getStepStats()
561    {
562    return flowStepStats;
563    }
564  }