001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.flow.planner;
022    
023    import java.io.IOException;
024    import java.util.ArrayList;
025    import java.util.List;
026    import java.util.concurrent.Callable;
027    import java.util.concurrent.CountDownLatch;
028    
029    import cascading.flow.Flow;
030    import cascading.flow.FlowException;
031    import cascading.flow.FlowStep;
032    import cascading.flow.FlowStepStrategy;
033    import cascading.management.state.ClientState;
034    import cascading.stats.FlowStats;
035    import cascading.stats.FlowStepStats;
036    import cascading.util.Util;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     *
042     */
043    public abstract class FlowStepJob<Config> implements Callable<Throwable>
044      {
045      private static final Logger LOG = LoggerFactory.getLogger( FlowStepJob.class );
046    
047      /** Field stepName */
048      protected final String stepName;
049      /** Field pollingInterval */
050      protected long pollingInterval = 1000;
051      /** Field recordStatsInterval */
052      protected long statsStoreInterval = 60 * 1000;
053      /** Field predecessors */
054      protected List<FlowStepJob<Config>> predecessors;
055      /** Field latch */
056      private final CountDownLatch latch = new CountDownLatch( 1 );
057      /** Field stop */
058      private boolean stop = false;
059      /** Field flowStep */
060      protected final BaseFlowStep<Config> flowStep;
061      /** Field stepStats */
062      protected FlowStepStats flowStepStats;
063      /** Field throwable */
064      protected Throwable throwable;
065    
066      public FlowStepJob( ClientState clientState, BaseFlowStep flowStep, long pollingInterval, long statsStoreInterval )
067        {
068        this.flowStep = flowStep;
069        this.stepName = flowStep.getName();
070        this.pollingInterval = pollingInterval;
071        this.statsStoreInterval = statsStoreInterval;
072        this.flowStepStats = createStepStats( clientState );
073    
074        this.flowStepStats.prepare();
075        this.flowStepStats.markPending();
076        }
077    
078      public abstract Config getConfig();
079    
080      protected abstract FlowStepStats createStepStats( ClientState clientState );
081    
082      public synchronized void stop()
083        {
084        if( flowStep.isInfoEnabled() )
085          flowStep.logInfo( "stopping: " + stepName );
086    
087        stop = true;
088    
089        // allow pending -> stopped transition
090        // never want a hanging pending state
091        if( !flowStepStats.isFinished() )
092          flowStepStats.markStopped();
093    
094        try
095          {
096          internalBlockOnStop();
097          }
098        catch( IOException exception )
099          {
100          flowStep.logWarn( "unable to kill job: " + stepName, exception );
101          }
102        finally
103          {
104          // call rollback after the job has been stopped, only if it was stopped
105          if( flowStepStats.isStopped() )
106            {
107            flowStep.rollbackSinks();
108            flowStep.fireOnStopping();
109            }
110    
111          flowStepStats.cleanup();
112          }
113        }
114    
115      protected abstract void internalBlockOnStop() throws IOException;
116    
117      public void setPredecessors( List<FlowStepJob<Config>> predecessors )
118        {
119        this.predecessors = predecessors;
120        }
121    
122      public Throwable call()
123        {
124        start();
125    
126        return throwable;
127        }
128    
129      protected void start()
130        {
131        try
132          {
133          if( isSkipFlowStep() )
134            {
135            markSkipped();
136    
137            if( flowStep.isInfoEnabled() )
138              flowStep.logInfo( "skipping step: " + stepName );
139    
140            return;
141            }
142    
143          flowStepStats.markStarted();
144    
145          blockOnPredecessors();
146    
147          applyFlowStepConfStrategy();
148    
149          blockOnJob();
150          }
151        catch( Throwable throwable )
152          {
153          dumpDebugInfo();
154          this.throwable = throwable;
155          flowStep.fireOnThrowable( throwable );
156          }
157        finally
158          {
159          latch.countDown();
160          flowStepStats.cleanup();
161          }
162        }
163    
164      private void applyFlowStepConfStrategy()
165        {
166        FlowStepStrategy flowStepStrategy = flowStep.getFlow().getFlowStepStrategy();
167    
168        if( flowStepStrategy == null )
169          return;
170    
171        List<FlowStep> predecessorSteps = new ArrayList<FlowStep>();
172    
173        for( FlowStepJob predecessor : predecessors )
174          predecessorSteps.add( predecessor.flowStep );
175    
176        flowStepStrategy.apply( flowStep.getFlow(), predecessorSteps, flowStep );
177        }
178    
179      protected boolean isSkipFlowStep() throws IOException
180        {
181        // if runID is not set, never skip a step
182        if( flowStep.getFlow().getRunID() == null )
183          return false;
184    
185        return flowStep.allSourcesExist() && !flowStep.areSourcesNewer( flowStep.getSinkModified() );
186        }
187    
188      protected void blockOnJob() throws IOException
189        {
190        if( stop ) // true if a predecessor failed
191          return;
192    
193        if( flowStep.isInfoEnabled() )
194          flowStep.logInfo( "starting step: " + stepName );
195    
196        internalNonBlockingStart();
197    
198        markSubmitted();
199        flowStep.fireOnStarting();
200    
201        blockTillCompleteOrStopped();
202    
203        if( !stop && !internalNonBlockingIsSuccessful() )
204          {
205          if( !flowStepStats.isFinished() )
206            {
207            flowStep.rollbackSinks();
208            flowStepStats.markFailed( getThrowable() );
209            flowStep.fireOnThrowable( getThrowable() );
210            }
211    
212          dumpDebugInfo();
213    
214          // if available, rethrow the unrecoverable error
215          if( getThrowable() instanceof OutOfMemoryError )
216            throw ( (OutOfMemoryError) getThrowable() );
217    
218          if( !isRemoteExecution() )
219            throwable = new FlowException( "local step failed", getThrowable() );
220          else
221            throwable = new FlowException( "step failed: " + stepName + ", with job id: " + internalJobId() + ", please see cluster logs for failure messages" );
222          }
223        else
224          {
225          if( internalNonBlockingIsSuccessful() && !flowStepStats.isFinished() )
226            {
227            throwable = flowStep.commitSinks();
228    
229            if( throwable != null )
230              {
231              flowStepStats.markFailed( throwable );
232              flowStep.fireOnThrowable( throwable );
233              }
234            else
235              {
236              flowStepStats.markSuccessful();
237              flowStep.fireOnCompleted();
238              }
239            }
240          }
241    
242        flowStepStats.recordChildStats();
243        }
244    
245      protected abstract boolean isRemoteExecution();
246    
247      protected abstract String internalJobId();
248    
249      protected abstract boolean internalNonBlockingIsSuccessful() throws IOException;
250    
251      protected abstract Throwable getThrowable();
252    
253      protected abstract void internalNonBlockingStart() throws IOException;
254    
255      protected void blockTillCompleteOrStopped() throws IOException
256        {
257        int iterations = (int) Math.floor( statsStoreInterval / pollingInterval );
258        int count = 0;
259    
260        while( true )
261          {
262          if( flowStepStats.isSubmitted() && isStarted() )
263            {
264            markRunning();
265            flowStep.fireOnRunning();
266            }
267    
268          if( stop || internalNonBlockingIsComplete() )
269            break;
270    
271          sleepForPollingInterval();
272    
273          if( iterations == count++ )
274            {
275            count = 0;
276            flowStepStats.recordStats();
277            flowStepStats.recordChildStats();
278            }
279          }
280        }
281    
282      private synchronized void markSubmitted()
283        {
284        if( flowStepStats.isStarted() )
285          flowStepStats.markSubmitted();
286    
287        Flow flow = flowStep.getFlow();
288    
289        if( flow == null )
290          {
291          LOG.warn( "no parent flow set" );
292          return;
293          }
294    
295        FlowStats flowStats = flow.getFlowStats();
296    
297        synchronized( flowStats )
298          {
299          if( flowStats.isStarted() )
300            flowStats.markSubmitted();
301          }
302        }
303    
304      private synchronized void markRunning()
305        {
306        flowStepStats.markRunning();
307    
308        markFlowRunning();
309        }
310    
311      private synchronized void markSkipped()
312        {
313        flowStepStats.markSkipped();
314    
315        markFlowRunning();
316        }
317    
318      private synchronized void markFlowRunning()
319        {
320        Flow flow = flowStep.getFlow();
321    
322        if( flow == null )
323          {
324          LOG.warn( "no parent flow set" );
325          return;
326          }
327    
328        FlowStats flowStats = flow.getFlowStats();
329    
330        synchronized( flowStats )
331          {
332          if( flowStats.isStarted() || flowStats.isSubmitted() )
333            flowStats.markRunning();
334          }
335        }
336    
337      protected abstract boolean internalNonBlockingIsComplete() throws IOException;
338    
339      protected void sleepForPollingInterval()
340        {
341        Util.safeSleep( pollingInterval );
342        }
343    
344      protected void blockOnPredecessors()
345        {
346        for( FlowStepJob predecessor : predecessors )
347          {
348          if( !predecessor.isSuccessful() )
349            {
350            flowStep.logWarn( "abandoning step: " + stepName + ", predecessor failed: " + predecessor.stepName );
351    
352            stop();
353            }
354          }
355        }
356    
357      protected abstract void dumpDebugInfo();
358    
359      /**
360       * Method isSuccessful returns true if this step completed successfully or was skipped.
361       *
362       * @return the successful (type boolean) of this FlowStepJob object.
363       */
364      public boolean isSuccessful()
365        {
366        try
367          {
368          latch.await(); // freed after step completes in #start()
369    
370          return flowStepStats.isSuccessful() || flowStepStats.isSkipped();
371          }
372        catch( InterruptedException exception )
373          {
374          flowStep.logWarn( "latch interrupted", exception );
375    
376          return false;
377          }
378        catch( NullPointerException exception )
379          {
380          throw new FlowException( "Hadoop is not keeping a large enough job history, please increase the \'mapred.jobtracker.completeuserjobs.maximum\' property", exception );
381          }
382        }
383    
384      /**
385       * Method wasStarted returns true if this job was started
386       *
387       * @return boolean
388       */
389      public boolean isStarted()
390        {
391        return internalIsStarted();
392        }
393    
394      protected abstract boolean internalIsStarted();
395    
396      /**
397       * Method getStepStats returns the stepStats of this FlowStepJob object.
398       *
399       * @return the stepStats (type StepStats) of this FlowStepJob object.
400       */
401      public FlowStepStats getStepStats()
402        {
403        return flowStepStats;
404        }
405      }