/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import cascading.stats.CascadingStats;
import cascading.stats.FlowSliceStats;
import cascading.stats.ProvidesCounters;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;

import static cascading.stats.CascadingStats.Status.*;

/** Class HadoopSliceStats tracks individual task slice stats. */
public class HadoopSliceStats extends FlowSliceStats<HadoopSliceStats.Kind> implements ProvidesCounters
  {
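  /** Class HadoopAttempt wraps a Hadoop {@link TaskCompletionEvent} describing a single attempt of this task slice. */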
  public static class HadoopAttempt extends FlowSliceAttempt
    {
    private final TaskCompletionEvent event;

    public HadoopAttempt( TaskCompletionEvent event )
      {
      this.event = event;
      }

    @Override
    public String getProcessAttemptID()
      {
      return event.getTaskAttemptId().toString();
      }

    @Override
    public int getEventId()
      {
      return event.getEventId();
      }

    @Override
    public int getProcessDuration()
      {
      return event.getTaskRunTime();
      }

    @Override
    public String getProcessStatus()
      {
      return event.getStatus().toString();
      }

    @Override
    public String getStatusURL()
      {
      return event.getTaskTrackerHttp();
      }

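    /** Maps the Hadoop {@link TaskCompletionEvent.Status} of this attempt to a Cascading {@link CascadingStats.Status}. */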
    @Override
    public CascadingStats.Status getStatus()
      {
      CascadingStats.Status status = null;

      switch( event.getStatus() )
        {
        case FAILED:
          status = FAILED;
          break;
        case KILLED:
          status = STOPPED;
          break;
        case SUCCEEDED:
          status = SUCCESSFUL;
          break;
        case OBSOLETE:
          status = SKIPPED;
          break;
        case TIPFAILED:
          status = FAILED;
          break;
        }
      return status;
      }
    }

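  /** Enum Kind identifies which phase of the Hadoop job this task slice belongs to. */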
  public enum Kind
    {
      SETUP, MAPPER, REDUCER, CLEANUP
    }

  private String id;
  private CascadingStats.Status parentStatus;
  private Kind kind;
  private TaskReport taskReport;
  private Map<String, Map<String, Long>> counters;
  private long lastFetch = -1;

  private Map<Integer, FlowSliceAttempt> attempts = new HashMap<>();

  HadoopSliceStats( String id, CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch )
    {
    this.id = id;
    this.parentStatus = parentStatus;
    this.kind = kind;
    this.taskReport = taskReport;
    this.lastFetch = lastFetch;
    }

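  /** Updates this instance with the latest {@link TaskReport}, clearing any previously cached counter values. */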
  public void update( CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch )
    {
    this.parentStatus = parentStatus;
    this.kind = kind;
    this.taskReport = taskReport;
    this.lastFetch = lastFetch;
    this.counters = null; // force recalc of counters
    }

  @Override
  public String getID()
    {
    return id;
    }

  @Override
  public Kind getKind()
    {
    return kind;
    }

  @Override
  public String getProcessSliceID()
    {
    return taskReport.getTaskID().toString();
    }

  public int getTaskIDNum()
    {
    return taskReport.getTaskID().getId();
    }

  @Override
  public String getProcessStepID()
    {
    return taskReport.getTaskID().getJobID().toString();
    }

  protected TaskReport getTaskReport()
    {
    return taskReport;
    }

  public float getProgress()
    {
    return taskReport.getProgress();
    }

  @Override
  public String getProcessStatus()
    {
    return taskReport.getState();
    }

  @Override
  public float getProcessProgress()
    {
    return taskReport.getProgress();
    }

  @Override
  public long getProcessStartTime()
    {
    return taskReport.getStartTime();
    }

  @Override
  public long getProcessFinishTime()
    {
    return taskReport.getFinishTime();
    }

  public CascadingStats.Status getParentStatus()
    {
    return parentStatus;
    }

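  /** Maps the current status of the underlying {@link TaskReport} to a Cascading {@link CascadingStats.Status}. */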
  @Override
  public CascadingStats.Status getStatus()
    {
    CascadingStats.Status status = null;

    switch( taskReport.getCurrentStatus() )
      {
      case PENDING:
        status = PENDING;
        break;
      case RUNNING:
        status = RUNNING;
        break;
      case COMPLETE:
        status = SUCCESSFUL;
        break;
      case KILLED:
        status = STOPPED;
        break;
      case FAILED:
        status = FAILED;
        break;
      }

    return status;
    }

  @Override
  public String[] getDiagnostics()
    {
    return taskReport.getDiagnostics();
    }

  @Override
  public Map<String, Map<String, Long>> getCounters()
    {
    if( counters == null )
      setCounters( taskReport );

    return counters;
    }

  @Override
  public Map<Integer, FlowSliceAttempt> getAttempts()
    {
    return attempts;
    }

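  /** Copies the Hadoop {@link Counters} from the given {@link TaskReport} into a nested group name to counter name/value map. */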
  private void setCounters( TaskReport taskReport )
    {
    this.counters = new HashMap<>();

    Counters hadoopCounters = taskReport.getTaskCounters();

    for( CounterGroup group : hadoopCounters )
      {
      Map<String, Long> values = new HashMap<>();

      this.counters.put( group.getName(), values );

      for( Counter counter : group )
        values.put( counter.getName(), counter.getValue() );
      }
    }

  public void setLastFetch( long lastFetch )
    {
    this.lastFetch = lastFetch;
    }

  @Override
  public long getLastSuccessfulCounterFetchTime()
    {
    return lastFetch;
    }

  @Override
  public Collection<String> getCounterGroups()
    {
    return getCounters().keySet();
    }

  @Override
  public Collection<String> getCountersFor( String group )
    {
    return getCounters().get( group ).keySet();
    }

  @Override
  public Collection<String> getCountersFor( Class<? extends Enum> group )
    {
    return getCountersFor( group.getName() );
    }

  @Override
  public long getCounterValue( Enum counter )
    {
    return getCounterValue( counter.getDeclaringClass().getName(), counter.name() );
    }

  @Override
  public long getCounterValue( String group, String name )
    {
    if( getCounters() == null || getCounters().get( group ) == null )
      return 0;

    Long value = getCounters().get( group ).get( name );

    if( value == null )
      return 0;

    return value;
    }

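  /** Registers a task attempt completion event, keyed by its event id. */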
  public void addAttempt( TaskCompletionEvent event )
    {
    attempts.put( event.getEventId(), new HadoopAttempt( event ) );
    }

  @Override
  public String toString()
    {
    final StringBuilder sb = new StringBuilder();
    sb.append( "HadoopSliceStats" );
    sb.append( "{id='" ).append( id ).append( '\'' );
    sb.append( ", kind=" ).append( kind );
    sb.append( '}' );
    return sb.toString();
    }
  }