001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.hadoop;
022
023import java.io.IOException;
024
025import cascading.flow.FlowException;
026import cascading.flow.FlowNode;
027import cascading.flow.FlowStep;
028import cascading.flow.planner.BaseFlowStep;
029import cascading.management.state.ClientState;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.mapred.Counters;
032import org.apache.hadoop.mapred.JobClient;
033import org.apache.hadoop.mapred.JobConf;
034import org.apache.hadoop.mapred.RunningJob;
035import org.apache.hadoop.mapred.TaskCompletionEvent;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/** Class HadoopStepStats provides Hadoop specific statistics and methods to underlying Hadoop facilities. */
040public abstract class HadoopStepStats extends BaseHadoopStepStats<RunningJob, Counters>
041  {
042  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );
043
044  private HadoopNodeStats mapperNodeStats;
045  private HadoopNodeStats reducerNodeStats;
046
047  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
048    {
049    super( flowStep, clientState );
050
051    BaseFlowStep<JobConf> step = (BaseFlowStep<JobConf>) flowStep;
052
053    // don't rely on the iterator topological sort to identify mapper or reducer
054    for( FlowNode current : step.getFlowNodeGraph().vertexSet() )
055      {
056      if( step.getFlowNodeGraph().inDegreeOf( current ) == 0 )
057        {
058        if( mapperNodeStats != null )
059          throw new IllegalStateException( "mapper node already found" );
060
061        mapperNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.MAPPER, current, clientState );
062        addNodeStats( mapperNodeStats );
063        }
064      else
065        {
066        if( reducerNodeStats != null )
067          throw new IllegalStateException( "reducer node already found" );
068
069        reducerNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.REDUCER, current, clientState );
070        addNodeStats( reducerNodeStats );
071        }
072      }
073
074    if( mapperNodeStats == null )
075      throw new IllegalStateException( "mapper node not found" );
076
077    counterCache = new HadoopStepCounterCache( this, (Configuration) getConfig() )
078    {
079    @Override
080    protected RunningJob getJobStatusClient()
081      {
082      return HadoopStepStats.this.getJobStatusClient();
083      }
084    };
085    }
086
087  private Configuration getConfig()
088    {
089    return (Configuration) this.getFlowStep().getConfig();
090    }
091
092  /**
093   * Method getNumMapTasks returns the numMapTasks from the Hadoop job file.
094   *
095   * @return the numMapTasks (type int) of this HadoopStepStats object.
096   */
097  public int getNumMapTasks()
098    {
099    return mapperNodeStats.getChildren().size();
100    }
101
102  /**
103   * Method getNumReduceTasks returns the numReducerTasks from the Hadoop job file.
104   *
105   * @return the numReducerTasks (type int) of this HadoopStepStats object.
106   */
107  public int getNumReduceTasks()
108    {
109    return reducerNodeStats == null ? 0 : reducerNodeStats.getChildren().size();
110    }
111
112  /**
113   * Method getProcessStepID returns the Hadoop running job JobID.
114   *
115   * @return the jobID (type String) of this HadoopStepStats object.
116   */
117  @Override
118  public String getProcessStepID()
119    {
120    if( getJobStatusClient() == null )
121      return null;
122
123    return getJobStatusClient().getID().toString();
124    }
125
126  /**
127   * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job.
128   *
129   * @return the jobClient (type JobClient) of this HadoopStepStats object.
130   */
131  public abstract JobClient getJobClient();
132
133  /**
134   * Returns the underlying Map tasks progress percentage.
135   * <p/>
136   * This method is experimental.
137   *
138   * @return float
139   */
140  public float getMapProgress()
141    {
142    RunningJob runningJob = getJobStatusClient();
143
144    if( runningJob == null )
145      return 0;
146
147    try
148      {
149      return runningJob.mapProgress();
150      }
151    catch( IOException exception )
152      {
153      throw new FlowException( "unable to get progress" );
154      }
155    }
156
157  /**
158   * Returns the underlying Reduce tasks progress percentage.
159   * <p/>
160   * This method is experimental.
161   *
162   * @return float
163   */
164  public float getReduceProgress()
165    {
166    RunningJob runningJob = getJobStatusClient();
167
168    if( runningJob == null )
169      return 0;
170
171    try
172      {
173      return runningJob.reduceProgress();
174      }
175    catch( IOException exception )
176      {
177      throw new FlowException( "unable to get progress" );
178      }
179    }
180
181  public String getStatusURL()
182    {
183    RunningJob runningJob = getJobStatusClient();
184
185    if( runningJob == null )
186      return null;
187
188    return runningJob.getTrackingURL();
189    }
190
191  private boolean stepHasReducers()
192    {
193    return getFlowStep().getNumFlowNodes() > 1;
194    }
195
196  /** Method captureDetail captures statistics task details and completion events. */
197  @Override
198  public synchronized void captureDetail( Type depth )
199    {
200    if( !getType().isChild( depth ) || !isDetailStale() )
201      return;
202
203    JobClient jobClient = getJobClient();
204    RunningJob runningJob = getJobStatusClient();
205
206    if( jobClient == null || runningJob == null )
207      return;
208
209    try
210      {
211      mapperNodeStats.captureDetail( depth );
212
213      if( reducerNodeStats != null )
214        reducerNodeStats.captureDetail( depth );
215
216      int count = 0;
217
218      while( depth == Type.ATTEMPT )
219        {
220        TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( count );
221
222        if( events.length == 0 )
223          break;
224
225        addAttemptsToTaskStats( events );
226        count += events.length;
227        }
228
229      markDetailCaptured();
230      }
231    catch( IOException exception )
232      {
233      LOG.warn( "unable to get task stats", exception );
234      }
235    }
236
237  private void addAttemptsToTaskStats( TaskCompletionEvent[] events )
238    {
239    for( TaskCompletionEvent event : events )
240      {
241      if( event == null )
242        {
243        LOG.warn( "found empty completion event" );
244        continue;
245        }
246
247      if( event.isMapTask() )
248        mapperNodeStats.addAttempt( event );
249      else
250        reducerNodeStats.addAttempt( event );
251      }
252    }
253  }