001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.planner;
022
023import java.io.IOException;
024
025import cascading.flow.FlowException;
026import cascading.flow.hadoop.HadoopFlowStep;
027import cascading.flow.planner.BaseFlowStep;
028import cascading.flow.planner.FlowStepJob;
029import cascading.management.state.ClientState;
030import cascading.stats.FlowNodeStats;
031import cascading.stats.FlowStepStats;
032import cascading.stats.hadoop.HadoopStepStats;
033import org.apache.hadoop.mapred.JobClient;
034import org.apache.hadoop.mapred.JobConf;
035import org.apache.hadoop.mapred.JobStatus;
036import org.apache.hadoop.mapred.RunningJob;
037import org.apache.hadoop.mapred.TaskCompletionEvent;
038
039import static cascading.flow.FlowProps.JOB_POLLING_INTERVAL;
040import static cascading.stats.CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION;
041import static cascading.stats.CascadingStats.STATS_STORE_INTERVAL;
042
043/**
044 *
045 */
046public class HadoopFlowStepJob extends FlowStepJob<JobConf>
047  {
048  /** static field to capture errors in hadoop local mode */
049  private static Throwable localError;
050  /** Field jobClient */
051  protected JobClient jobClient;
052  /** Field runningJob */
053  protected RunningJob runningJob;
054
055  private static long getStoreInterval( JobConf jobConf )
056    {
057    return jobConf.getLong( STATS_STORE_INTERVAL, 60 * 1000 );
058    }
059
060  private static long getChildDetailsBlockingDuration( JobConf jobConf )
061    {
062    return jobConf.getLong( STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION, 60 * 1000 );
063    }
064
065  public static long getJobPollingInterval( JobConf jobConf )
066    {
067    return jobConf.getLong( JOB_POLLING_INTERVAL, 5000 );
068    }
069
070  public HadoopFlowStepJob( ClientState clientState, BaseFlowStep<JobConf> flowStep, JobConf currentConf )
071    {
072    super( clientState, currentConf, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ), getChildDetailsBlockingDuration( currentConf ) );
073
074    if( flowStep.isDebugEnabled() )
075      flowStep.logDebug( "using polling interval: " + pollingInterval );
076    }
077
078  @Override
079  protected FlowStepStats createStepStats( ClientState clientState )
080    {
081    return new HadoopStepStats( flowStep, clientState )
082      {
083      @Override
084      public JobClient getJobClient()
085        {
086        return jobClient;
087        }
088
089      @Override
090      public RunningJob getJobStatusClient()
091        {
092        return runningJob;
093        }
094      };
095    }
096
097  protected void internalBlockOnStop() throws IOException
098    {
099    if( runningJob != null && !runningJob.isComplete() )
100      runningJob.killJob();
101    }
102
103  protected void internalNonBlockingStart() throws IOException
104    {
105    jobClient = new JobClient( jobConfiguration );
106    runningJob = internalNonBlockingSubmit();
107
108    flowStep.logInfo( "submitted hadoop job: " + runningJob.getID() );
109
110    if( runningJob.getTrackingURL() != null )
111      flowStep.logInfo( "tracking url: " + runningJob.getTrackingURL() );
112    }
113
114  protected RunningJob internalNonBlockingSubmit() throws IOException
115    {
116    return jobClient.submitJob( jobConfiguration );
117    }
118
119  @Override
120  protected void updateNodeStatus( FlowNodeStats flowNodeStats )
121    {
122    try
123      {
124      if( runningJob == null || flowNodeStats.isFinished() )
125        return;
126
127      boolean isMapper = flowNodeStats.getOrdinal() == 0;
128      Integer jobState = getJobStateSafe();
129
130      if( jobState == null ) // if call throws an NPE internally
131        return;
132
133      if( JobStatus.FAILED == jobState )
134        {
135        flowNodeStats.markFailed();
136        return;
137        }
138
139      if( JobStatus.KILLED == jobState )
140        {
141        flowNodeStats.markStopped();
142        return;
143        }
144
145      float progress;
146
147      if( isMapper )
148        progress = runningJob.mapProgress();
149      else
150        progress = runningJob.reduceProgress();
151
152      if( progress == 0.0F ) // not yet running, is only started
153        return;
154
155      if( progress != 1.0F )
156        {
157        flowNodeStats.markRunning();
158        return;
159        }
160
161      flowNodeStats.markRunning();
162
163      if( isMapper && runningJob.reduceProgress() > 0.0F )
164        {
165        flowNodeStats.markSuccessful();
166        return;
167        }
168
169      if( JobStatus.SUCCEEDED == jobState )
170        flowNodeStats.markSuccessful();
171      }
172    catch( IOException exception )
173      {
174      flowStep.logError( "failed setting node status", throwable );
175      }
176    }
177
178  private Integer getJobStateSafe() throws IOException
179    {
180    try
181      {
182      return runningJob.getJobState();
183      }
184    catch( NullPointerException exception ) // this happens
185      {
186      return null;
187      }
188    }
189
190  @Override
191  public boolean isSuccessful()
192    {
193    try
194      {
195      return super.isSuccessful();
196      }
197    catch( NullPointerException exception )
198      {
199      throw new FlowException( "Hadoop is not keeping a large enough job history, please increase the \'mapred.jobtracker.completeuserjobs.maximum\' property", exception );
200      }
201    }
202
203  protected boolean internalNonBlockingIsSuccessful() throws IOException
204    {
205    return runningJob != null && runningJob.isSuccessful();
206    }
207
208  @Override
209  protected boolean isRemoteExecution()
210    {
211    return !( (HadoopFlowStep) flowStep ).isHadoopLocalMode( getConfig() );
212    }
213
214  @Override
215  protected Throwable getThrowable()
216    {
217    return localError;
218    }
219
220  protected String internalJobId()
221    {
222    return runningJob.getJobID();
223    }
224
225  protected boolean internalNonBlockingIsComplete() throws IOException
226    {
227    return runningJob.isComplete();
228    }
229
230  protected void dumpDebugInfo()
231    {
232    try
233      {
234      if( runningJob == null )
235        return;
236
237      Integer jobState = getJobStateSafe();  // if call throws an NPE internally
238
239      if( jobState == null )
240        return;
241
242      flowStep.logWarn( "hadoop job " + runningJob.getID() + " state at " + JobStatus.getJobRunState( jobState ) );
243      flowStep.logWarn( "failure info: " + runningJob.getFailureInfo() );
244
245      TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( 0 );
246      flowStep.logWarn( "task completion events identify failed tasks" );
247      flowStep.logWarn( "task completion events count: " + events.length );
248
249      for( TaskCompletionEvent event : events )
250        flowStep.logWarn( "event = " + event );
251      }
252    catch( Throwable throwable )
253      {
254      flowStep.logError( "failed reading task completion events", throwable );
255      }
256    }
257
258  protected boolean internalIsStartedRunning()
259    {
260    if( runningJob == null )
261      return false;
262
263    try
264      {
265      return runningJob.mapProgress() > 0;
266      }
267    catch( IOException exception )
268      {
269      flowStep.logWarn( "unable to test for map progress", exception );
270      return false;
271      }
272    }
273
274  /**
275   * Internal method to report errors that happen on hadoop local mode. Hadoops local
276   * JobRunner does not give access to TaskReports, but we want to be able to capture
277   * the exception and not just print it to stderr. FlowMapper and FlowReducer use this method.
278   *
279   * @param throwable the throwable to be reported.
280   */
281  public static void reportLocalError( Throwable throwable )
282    {
283    localError = throwable;
284    }
285  }