/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import cascading.stats.CascadingStats;
import cascading.stats.FlowSliceStats;
import cascading.stats.ProvidesCounters;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskReport;

import static cascading.stats.CascadingStats.Status.*;
/** Class HadoopSliceStats tracks individual task stats, wrapping a Hadoop {@link TaskReport} and the completion events of its attempts. */
public class HadoopSliceStats extends FlowSliceStats<HadoopSliceStats.Kind> implements ProvidesCounters
  {
  private final CascadingStats.Status parentStatus;

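  /** Class HadoopAttempt wraps an individual {@link TaskCompletionEvent}, exposing the stats of a single task attempt. */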
  public static class HadoopAttempt extends FlowSliceAttempt
    {
    private final TaskCompletionEvent event;

    public HadoopAttempt( TaskCompletionEvent event )
      {
      this.event = event;
      }

    @Override
    public String getProcessAttemptID()
      {
      return event.getTaskAttemptId().toString();
      }

    @Override
    public int getEventId()
      {
      return event.getEventId();
      }

    @Override
    public int getProcessDuration()
      {
      return event.getTaskRunTime();
      }

    @Override
    public String getProcessStatus()
      {
      return event.getTaskStatus().toString();
      }

    @Override
    public String getStatusURL()
      {
      return event.getTaskTrackerHttp();
      }

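    /** Maps the Hadoop task attempt status onto its corresponding {@link CascadingStats.Status} value. */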
    @Override
    public CascadingStats.Status getStatus()
      {
      CascadingStats.Status status = null;

      switch( event.getTaskStatus() )
        {
        case FAILED:
          status = FAILED;
          break;
        case KILLED:
          status = STOPPED;
          break;
        case SUCCEEDED:
          status = SUCCESSFUL;
          break;
        case OBSOLETE:
          status = SKIPPED;
          break;
        case TIPFAILED:
          status = FAILED;
          break;
        }

      return status;
      }
    }

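  /** Enum Kind identifies which phase of the Hadoop job a slice belongs to: the setup task, a map task, a reduce task, or the cleanup task. */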
  public enum Kind
    {
      SETUP, MAPPER, REDUCER, CLEANUP
    }

  private String id;
  private Kind kind;
  private TaskReport taskReport;
  private Map<String, Map<String, Long>> counters; // counter group name -> counter name -> value
  private long lastFetch = -1; // time of the last successful counter fetch, -1 if never fetched

  private Map<Integer, FlowSliceAttempt> attempts = new HashMap<>(); // keyed by completion event id

  HadoopSliceStats( String id, CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch )
    {
    this.parentStatus = parentStatus;
    this.id = id;
    this.kind = kind;
    this.taskReport = taskReport;
    this.lastFetch = lastFetch;
    }

  @Override
  public String getID()
    {
    return id;
    }

  @Override
  public Kind getKind()
    {
    return kind;
    }

  @Override
  public String getProcessSliceID()
    {
    return taskReport.getTaskID().toString();
    }

  public int getTaskIDNum()
    {
    return taskReport.getTaskID().getId();
    }

  @Override
  public String getProcessStepID()
    {
    return taskReport.getTaskID().getJobID().toString();
    }

  protected TaskReport getTaskReport()
    {
    return taskReport;
    }

  @Override
  public String getProcessStatus()
    {
    return taskReport.getState();
    }

  @Override
  public float getProcessProgress()
    {
    return taskReport.getProgress();
    }

  @Override
  public long getProcessStartTime()
    {
    return taskReport.getStartTime();
    }

  @Override
  public long getProcessFinishTime()
    {
    return taskReport.getFinishTime();
    }

  public CascadingStats.Status getParentStatus()
    {
    return parentStatus;
    }

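  /** Maps the current {@link TaskReport} status onto its corresponding {@link CascadingStats.Status} value. */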
  @Override
  public CascadingStats.Status getStatus()
    {
    CascadingStats.Status status = null;

    switch( taskReport.getCurrentStatus() )
      {
      case PENDING:
        status = PENDING;
        break;
      case RUNNING:
        status = RUNNING;
        break;
      case COMPLETE:
        status = SUCCESSFUL;
        break;
      case KILLED:
        status = STOPPED;
        break;
      case FAILED:
        status = FAILED;
        break;
      }

    return status;
    }

  @Override
  public String[] getDiagnostics()
    {
    return taskReport.getDiagnostics();
    }

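  /** Returns the counter values for this slice, lazily copying them from the underlying {@link TaskReport} on first access. */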
  @Override
  public Map<String, Map<String, Long>> getCounters()
    {
    if( counters == null )
      setCounters( taskReport );

    return counters;
    }

  @Override
  public Map<Integer, FlowSliceAttempt> getAttempts()
    {
    return attempts;
    }

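  /** Copies the Hadoop {@link Counters} from the given {@link TaskReport} into a nested map of counter group name to counter name/value pairs. */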
  private void setCounters( TaskReport taskReport )
    {
    this.counters = new HashMap<>();

    Counters hadoopCounters = taskReport.getCounters();

    for( Counters.Group group : hadoopCounters )
      {
      Map<String, Long> values = new HashMap<>();

      this.counters.put( group.getName(), values );

      for( Counters.Counter counter : group )
        values.put( counter.getName(), counter.getCounter() );
      }
    }

  public void setLastFetch( long lastFetch )
    {
    this.lastFetch = lastFetch;
    }

  @Override
  public long getLastSuccessfulCounterFetchTime()
    {
    return lastFetch;
    }

  @Override
  public Collection<String> getCounterGroups()
    {
    return getCounters().keySet();
    }

  @Override
  public Collection<String> getCountersFor( String group )
    {
    return getCounters().get( group ).keySet();
    }

  @Override
  public Collection<String> getCountersFor( Class<? extends Enum> group )
    {
    // use the class name directly; Class#getDeclaringClass() returns the enclosing
    // class of a nested type, or null for a top-level enum
    return getCountersFor( group.getName() );
    }

  @Override
  public long getCounterValue( Enum counter )
    {
    return getCounterValue( counter.getDeclaringClass().getName(), counter.name() );
    }

  @Override
  public long getCounterValue( String group, String name )
    {
    if( getCounters() == null || getCounters().get( group ) == null )
      return 0;

    Long value = getCounters().get( group ).get( name );

    if( value == null )
      return 0;

    return value;
    }

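  /** Registers a completion event for a task attempt, keyed by its event id. */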
  public void addAttempt( TaskCompletionEvent event )
    {
    attempts.put( event.getEventId(), new HadoopAttempt( event ) );
    }

  @Override
  public String toString()
    {
    final StringBuilder sb = new StringBuilder();
    sb.append( "HadoopSliceStats" );
    sb.append( "{id='" ).append( id ).append( '\'' );
    sb.append( ", kind=" ).append( kind );
    sb.append( '}' );
    return sb.toString();
    }
  }