001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.io.IOException;
024    import java.util.ArrayList;
025    import java.util.List;
026    import java.util.concurrent.Callable;
027    import java.util.concurrent.CountDownLatch;
028    
029    import cascading.flow.Flow;
030    import cascading.flow.FlowException;
031    import cascading.flow.FlowStep;
032    import cascading.flow.FlowStepStrategy;
033    import cascading.management.state.ClientState;
034    import cascading.stats.FlowStats;
035    import cascading.stats.FlowStepStats;
036    import cascading.util.Util;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
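/**
 * Class FlowStepJob is the base class for jobs that execute a single flow step.
 * <p/>
 * An instance is a {@link Callable} that, when called, optionally skips the step, waits for all
 * predecessor jobs to finish, applies any {@link FlowStepStrategy}, starts the underlying job and
 * polls it until it completes or {@link #stop()} is called. Any failure is captured and returned
 * from {@link #call()} instead of being thrown.
 */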
043    public abstract class FlowStepJob<Config> implements Callable<Throwable>
044      {
045      private static final Logger LOG = LoggerFactory.getLogger( FlowStepJob.class );
046    
047      /** Field stepName */
048      protected final String stepName;
049      /** Field pollingInterval */
050      protected long pollingInterval = 1000;
051      /** Field recordStatsInterval */
052      protected long statsStoreInterval = 60 * 1000;
053      /** Field predecessors */
054      protected List<FlowStepJob<Config>> predecessors;
055      /** Field latch */
056      private final CountDownLatch latch = new CountDownLatch( 1 );
057      /** Field stop */
058      private boolean stop = false;
059      /** Field flowStep */
060      protected final BaseFlowStep<Config> flowStep;
061      /** Field stepStats */
062      protected FlowStepStats flowStepStats;
063      /** Field throwable */
064      protected Throwable throwable;
065    
066      public FlowStepJob( ClientState clientState, BaseFlowStep flowStep, long pollingInterval, long statsStoreInterval )
067        {
068        this.flowStep = flowStep;
069        this.stepName = flowStep.getName();
070        this.pollingInterval = pollingInterval;
071        this.statsStoreInterval = statsStoreInterval;
072        this.flowStepStats = createStepStats( clientState );
073    
074        this.flowStepStats.prepare();
075        this.flowStepStats.markPending();
076        }
077    
078      public abstract Config getConfig();
079    
080      protected abstract FlowStepStats createStepStats( ClientState clientState );
081    
082      public synchronized void stop()
083        {
084        if( flowStep.isInfoEnabled() )
085          flowStep.logInfo( "stopping: " + stepName );
086    
087        stop = true;
088    
089        // allow pending -> stopped transition
090        // never want a hanging pending state
091        if( !flowStepStats.isFinished() )
092          flowStepStats.markStopped();
093    
094        try
095          {
096          internalBlockOnStop();
097          }
098        catch( IOException exception )
099          {
100          flowStep.logWarn( "unable to kill job: " + stepName, exception );
101          }
102        finally
103          {
104          // call rollback after the job has been stopped, only if it was stopped
105          if( flowStepStats.isStopped() )
106            {
107            flowStep.rollbackSinks();
108            flowStep.fireOnStopping();
109            }
110    
111          flowStepStats.cleanup();
112          }
113        }
114    
115      protected abstract void internalBlockOnStop() throws IOException;
116    
117      public void setPredecessors( List<FlowStepJob<Config>> predecessors )
118        {
119        this.predecessors = predecessors;
120        }
121    
122      public Throwable call()
123        {
124        start();
125    
126        return throwable;
127        }
128    
129      protected void start()
130        {
131        try
132          {
133          if( isSkipFlowStep() )
134            {
135            markSkipped();
136    
137            if( flowStep.isInfoEnabled() && flowStepStats.isSkipped() )
138              flowStep.logInfo( "skipping step: " + stepName );
139    
140            return;
141            }
142    
143          synchronized( this ) // backport from 3.0
144            {
145            if( stop )
146              {
147              if( flowStep.isInfoEnabled() )
148                flowStep.logInfo( "stop called before start: " + stepName );
149              return;
150              }
151    
152            if( !markStarted() )
153              return;
154            }
155    
156          blockOnPredecessors();
157    
158          applyFlowStepConfStrategy();
159    
160          blockOnJob();
161          }
162        catch( Throwable throwable )
163          {
164          dumpDebugInfo();
165          this.throwable = throwable;
166          flowStep.fireOnThrowable( throwable );
167          }
168        finally
169          {
170          latch.countDown();
171          flowStepStats.cleanup();
172          }
173        }
174    
175      private synchronized boolean markStarted()
176        {
177        if( flowStepStats.isFinished() ) // if stopped, return
178          return false;
179    
180        flowStepStats.markStarted();
181    
182        return true;
183        }
184    
185      private void applyFlowStepConfStrategy()
186        {
187        FlowStepStrategy flowStepStrategy = flowStep.getFlow().getFlowStepStrategy();
188    
189        if( flowStepStrategy == null )
190          return;
191    
192        List<FlowStep> predecessorSteps = new ArrayList<FlowStep>();
193    
194        for( FlowStepJob predecessor : predecessors )
195          predecessorSteps.add( predecessor.flowStep );
196    
197        flowStepStrategy.apply( flowStep.getFlow(), predecessorSteps, flowStep );
198        }
199    
200      protected boolean isSkipFlowStep() throws IOException
201        {
202        // if runID is not set, never skip a step
203        if( flowStep.getFlow().getRunID() == null )
204          return false;
205    
206        return flowStep.allSourcesExist() && !flowStep.areSourcesNewer( flowStep.getSinkModified() );
207        }
208    
209      protected void blockOnJob() throws IOException
210        {
211        if( stop ) // true if a predecessor failed
212          return;
213    
214        if( flowStep.isInfoEnabled() )
215          flowStep.logInfo( "starting step: " + stepName );
216    
217        internalNonBlockingStart();
218    
219        markSubmitted();
220        flowStep.fireOnStarting();
221    
222        blockTillCompleteOrStopped();
223    
224        if( !stop && !internalNonBlockingIsSuccessful() )
225          {
226          if( !flowStepStats.isFinished() )
227            {
228            flowStep.rollbackSinks();
229            flowStepStats.markFailed( getThrowable() );
230            flowStep.fireOnThrowable( getThrowable() );
231            }
232    
233          dumpDebugInfo();
234    
235          // if available, rethrow the unrecoverable error
236          if( getThrowable() instanceof OutOfMemoryError )
237            throw ( (OutOfMemoryError) getThrowable() );
238    
239          if( !isRemoteExecution() )
240            throwable = new FlowException( "local step failed", getThrowable() );
241          else
242            throwable = new FlowException( "step failed: " + stepName + ", with job id: " + internalJobId() + ", please see cluster logs for failure messages" );
243          }
244        else
245          {
246          if( internalNonBlockingIsSuccessful() && !flowStepStats.isFinished() )
247            {
248            throwable = flowStep.commitSinks();
249    
250            if( throwable != null )
251              {
252              flowStepStats.markFailed( throwable );
253              flowStep.fireOnThrowable( throwable );
254              }
255            else
256              {
257              flowStepStats.markSuccessful();
258              flowStep.fireOnCompleted();
259              }
260            }
261          }
262    
263        flowStepStats.recordChildStats();
264        }
265    
266      protected abstract boolean isRemoteExecution();
267    
268      protected abstract String internalJobId();
269    
270      protected abstract boolean internalNonBlockingIsSuccessful() throws IOException;
271    
272      protected abstract Throwable getThrowable();
273    
274      protected abstract void internalNonBlockingStart() throws IOException;
275    
276      protected void blockTillCompleteOrStopped() throws IOException
277        {
278        int iterations = (int) Math.floor( statsStoreInterval / pollingInterval );
279        int count = 0;
280    
281        while( true )
282          {
283          if( flowStepStats.isSubmitted() && isStarted() )
284            {
285            markRunning();
286            flowStep.fireOnRunning();
287            }
288    
289          if( stop || internalNonBlockingIsComplete() )
290            break;
291    
292          sleepForPollingInterval();
293    
294          if( iterations == count++ )
295            {
296            count = 0;
297            flowStepStats.recordStats();
298            flowStepStats.recordChildStats();
299            }
300          }
301        }
302    
303      private synchronized void markSubmitted()
304        {
305        if( flowStepStats.isStarted() )
306          flowStepStats.markSubmitted();
307    
308        Flow flow = flowStep.getFlow();
309    
310        if( flow == null )
311          {
312          LOG.warn( "no parent flow set" );
313          return;
314          }
315    
316        FlowStats flowStats = flow.getFlowStats();
317    
318        synchronized( flowStats )
319          {
320          if( flowStats.isStarted() )
321            flowStats.markSubmitted();
322          }
323        }
324    
325      private synchronized void markRunning()
326        {
327        flowStepStats.markRunning();
328    
329        markFlowRunning();
330        }
331    
332      private synchronized void markSkipped()
333        {
334        if( flowStepStats.isFinished() )
335          return;
336    
337        flowStepStats.markSkipped();
338    
339        markFlowRunning();
340        }
341    
342      private synchronized void markFlowRunning()
343        {
344        Flow flow = flowStep.getFlow();
345    
346        if( flow == null )
347          {
348          LOG.warn( "no parent flow set" );
349          return;
350          }
351    
352        FlowStats flowStats = flow.getFlowStats();
353    
354        synchronized( flowStats )
355          {
356          if( flowStats.isStarted() || flowStats.isSubmitted() )
357            flowStats.markRunning();
358          }
359        }
360    
361      protected abstract boolean internalNonBlockingIsComplete() throws IOException;
362    
363      protected void sleepForPollingInterval()
364        {
365        Util.safeSleep( pollingInterval );
366        }
367    
368      protected void blockOnPredecessors()
369        {
370        for( FlowStepJob predecessor : predecessors )
371          {
372          if( !predecessor.isSuccessful() )
373            {
374            flowStep.logWarn( "abandoning step: " + stepName + ", predecessor failed: " + predecessor.stepName );
375    
376            stop();
377            }
378          }
379        }
380    
381      protected abstract void dumpDebugInfo();
382    
383      /**
384       * Method isSuccessful returns true if this step completed successfully or was skipped.
385       *
386       * @return the successful (type boolean) of this FlowStepJob object.
387       */
388      public boolean isSuccessful()
389        {
390        try
391          {
392          latch.await(); // freed after step completes in #start()
393    
394          return flowStepStats.isSuccessful() || flowStepStats.isSkipped();
395          }
396        catch( InterruptedException exception )
397          {
398          flowStep.logWarn( "latch interrupted", exception );
399    
400          return false;
401          }
402        catch( NullPointerException exception )
403          {
404          throw new FlowException( "Hadoop is not keeping a large enough job history, please increase the \'mapred.jobtracker.completeuserjobs.maximum\' property", exception );
405          }
406        }
407    
408      /**
409       * Method wasStarted returns true if this job was started
410       *
411       * @return boolean
412       */
413      public boolean isStarted()
414        {
415        return internalIsStarted();
416        }
417    
418      protected abstract boolean internalIsStarted();
419    
420      /**
421       * Method getStepStats returns the stepStats of this FlowStepJob object.
422       *
423       * @return the stepStats (type StepStats) of this FlowStepJob object.
424       */
425      public FlowStepStats getStepStats()
426        {
427        return flowStepStats;
428        }
429      }