001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.hadoop;
022
023import java.io.IOException;
024
025import cascading.flow.FlowException;
026import cascading.flow.FlowNode;
027import cascading.flow.FlowStep;
028import cascading.flow.planner.BaseFlowStep;
029import cascading.management.state.ClientState;
030import cascading.stats.BaseCachedStepStats;
031import cascading.util.Util;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.mapred.JobClient;
034import org.apache.hadoop.mapred.JobConf;
035import org.apache.hadoop.mapred.RunningJob;
036import org.apache.hadoop.mapreduce.Counters;
037import org.apache.hadoop.mapreduce.Job;
038import org.apache.hadoop.mapreduce.TaskCompletionEvent;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/** Class HadoopStepStats provides Hadoop specific statistics and methods to underlying Hadoop facilities. */
043public abstract class HadoopStepStats extends BaseCachedStepStats<Configuration, RunningJob, Counters>
044  {
045  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );
046
047  private HadoopNodeStats mapperNodeStats;
048  private HadoopNodeStats reducerNodeStats;
049
050  protected static Job getJob( RunningJob runningJob )
051    {
052    if( runningJob == null ) // if null, job hasn't been submitted
053      return null;
054
055    Job job = Util.returnInstanceFieldIfExistsSafe( runningJob, "job" );
056
057    if( job == null )
058      {
059      LOG.warn( "unable to get underlying org.apache.hadoop.mapreduce.Job from org.apache.hadoop.mapred.RunningJob, task level task counters will be unavailable" );
060      return null;
061      }
062
063    return job;
064    }
065
066  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
067    {
068    super( flowStep, clientState );
069
070    BaseFlowStep<JobConf> step = (BaseFlowStep<JobConf>) flowStep;
071
072    // don't rely on the iterator topological sort to identify mapper or reducer
073    for( FlowNode current : step.getFlowNodeGraph().vertexSet() )
074      {
075      if( step.getFlowNodeGraph().inDegreeOf( current ) == 0 )
076        {
077        if( mapperNodeStats != null )
078          throw new IllegalStateException( "mapper node already found" );
079
080        mapperNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.MAPPER, current, clientState );
081        addNodeStats( mapperNodeStats );
082        }
083      else
084        {
085        if( reducerNodeStats != null )
086          throw new IllegalStateException( "reducer node already found" );
087
088        reducerNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.REDUCER, current, clientState );
089        addNodeStats( reducerNodeStats );
090        }
091      }
092
093    if( mapperNodeStats == null )
094      throw new IllegalStateException( "mapper node not found" );
095
096    counterCache = new HadoopStepCounterCache( this, (Configuration) getConfig() )
097    {
098    @Override
099    protected RunningJob getJobStatusClient()
100      {
101      return HadoopStepStats.this.getJobStatusClient();
102      }
103    };
104    }
105
106  private Configuration getConfig()
107    {
108    return (Configuration) this.getFlowStep().getConfig();
109    }
110
111  /**
112   * Method getNumMapTasks returns the numMapTasks from the Hadoop job file.
113   *
114   * @return the numMapTasks (type int) of this HadoopStepStats object.
115   */
116  public int getNumMapTasks()
117    {
118    return mapperNodeStats.getChildren().size();
119    }
120
121  /**
122   * Method getNumReduceTasks returns the numReducerTasks from the Hadoop job file.
123   *
124   * @return the numReducerTasks (type int) of this HadoopStepStats object.
125   */
126  public int getNumReduceTasks()
127    {
128    return reducerNodeStats == null ? 0 : reducerNodeStats.getChildren().size();
129    }
130
131  @Override
132  public String getProcessStepID()
133    {
134    if( getJobStatusClient() == null )
135      return null;
136
137    return getJobStatusClient().getJobID().toString();
138    }
139
140  /**
141   * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job.
142   *
143   * @return the jobClient (type JobClient) of this HadoopStepStats object.
144   */
145  public abstract JobClient getJobClient();
146
147  /**
148   * Returns the underlying Map tasks progress percentage.
149   * <p/>
150   * This method is experimental.
151   *
152   * @return float
153   */
154  public float getMapProgress()
155    {
156    Job runningJob = getJob( getJobStatusClient() );
157
158    if( runningJob == null )
159      return 0;
160
161    try
162      {
163      return runningJob.mapProgress();
164      }
165    catch( IOException exception )
166      {
167      throw new FlowException( "unable to get progress" );
168      }
169    }
170
171  /**
172   * Returns the underlying Reduce tasks progress percentage.
173   * <p/>
174   * This method is experimental.
175   *
176   * @return float
177   */
178  public float getReduceProgress()
179    {
180    Job runningJob = getJob( getJobStatusClient() );
181
182    if( runningJob == null )
183      return 0;
184
185    try
186      {
187      return runningJob.reduceProgress();
188      }
189    catch( IOException exception )
190      {
191      throw new FlowException( "unable to get progress" );
192      }
193    }
194
195  @Override
196  public String getProcessStatusURL()
197    {
198    return getStatusURL();
199    }
200
201  /**
202   * @deprecated see {@link #getProcessStatusURL()}
203   */
204  @Deprecated
205  public String getStatusURL()
206    {
207    Job runningJob = getJob( getJobStatusClient() );
208
209    if( runningJob == null )
210      return null;
211
212    return runningJob.getTrackingURL();
213    }
214
215  /** Method captureDetail captures statistics task details and completion events. */
216  @Override
217  public synchronized void captureDetail( Type depth )
218    {
219    if( !getType().isChild( depth ) || !isDetailStale() )
220      return;
221
222    Job runningJob = getJob( getJobStatusClient() );
223
224    if( runningJob == null )
225      return;
226
227    try
228      {
229      mapperNodeStats.captureDetail( depth );
230
231      if( reducerNodeStats != null )
232        reducerNodeStats.captureDetail( depth );
233
234      int count = 0;
235
236      while( depth == Type.ATTEMPT )
237        {
238        TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( count );
239
240        if( events.length == 0 )
241          break;
242
243        addAttemptsToTaskStats( events );
244        count += events.length;
245        }
246
247      markDetailCaptured();
248      }
249    catch( IOException exception )
250      {
251      LOG.warn( "unable to get task stats", exception );
252      }
253    }
254
255  private void addAttemptsToTaskStats( TaskCompletionEvent[] events )
256    {
257    for( TaskCompletionEvent event : events )
258      {
259      if( event == null )
260        {
261        LOG.warn( "found empty completion event" );
262        continue;
263        }
264
265      if( event.isMapTask() )
266        mapperNodeStats.addAttempt( event );
267      else
268        reducerNodeStats.addAttempt( event );
269      }
270    }
271  }