001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.hadoop;
022
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import cascading.stats.CascadingStats;
import cascading.stats.FlowSliceStats;
import cascading.stats.ProvidesCounters;
import cascading.util.Util;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;

import static cascading.stats.CascadingStats.Status.*;
038
039/** Class HadoopTaskStats tracks individual task stats. */
040public class HadoopSliceStats extends FlowSliceStats<HadoopSliceStats.Kind> implements ProvidesCounters
041  {
042  public static class HadoopAttempt extends FlowSliceAttempt
043    {
044    private final TaskCompletionEvent event;
045
046    public HadoopAttempt( TaskCompletionEvent event )
047      {
048      this.event = event;
049      }
050
051    @Override
052    public String getProcessAttemptID()
053      {
054      return event.getTaskAttemptId().toString();
055      }
056
057    @Override
058    public int getEventId()
059      {
060      return event.getEventId();
061      }
062
063    @Override
064    public int getProcessDuration()
065      {
066      return event.getTaskRunTime();
067      }
068
069    @Override
070    public String getProcessStatus()
071      {
072      return event.getStatus().toString();
073      }
074
075    @Override
076    public String getStatusURL()
077      {
078      return event.getTaskTrackerHttp();
079      }
080
081    @Override
082    public CascadingStats.Status getStatus()
083      {
084      CascadingStats.Status status = null;
085
086      switch( event.getStatus() )
087        {
088        case FAILED:
089          status = FAILED;
090          break;
091        case KILLED:
092          status = STOPPED;
093          break;
094        case SUCCEEDED:
095          status = SUCCESSFUL;
096          break;
097        case OBSOLETE:
098          status = SKIPPED;
099          break;
100        case TIPFAILED:
101          status = FAILED;
102          break;
103        }
104      return status;
105      }
106
107    @Override
108    public String getProcessHostname()
109      {
110      return Util.parseHostname( event.getTaskTrackerHttp() );
111      }
112    }
113
114  public enum Kind
115    {
116      SETUP, MAPPER, REDUCER, CLEANUP
117    }
118
119  private String id;
120  private CascadingStats.Status parentStatus;
121  private Kind kind;
122  private TaskReport taskReport;
123  private Map<String, Map<String, Long>> counters;
124  private long lastFetch = -1;
125
126  private Map<Integer, FlowSliceAttempt> attempts = new HashMap<>();
127
128  HadoopSliceStats( String id, CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch )
129    {
130    this.id = id;
131    this.parentStatus = parentStatus;
132    this.kind = kind;
133    this.taskReport = taskReport;
134    this.lastFetch = lastFetch;
135    }
136
137  public void update( CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch )
138    {
139    this.parentStatus = parentStatus;
140    this.kind = kind;
141    this.taskReport = taskReport;
142    this.lastFetch = lastFetch;
143    this.counters = null; // force recalc of counters
144    }
145
146  @Override
147  public String getID()
148    {
149    return id;
150    }
151
152  @Override
153  public Kind getKind()
154    {
155    return kind;
156    }
157
158  @Override
159  public String getProcessSliceID()
160    {
161    return taskReport.getTaskID().toString();
162    }
163
164  public int getTaskIDNum()
165    {
166    return taskReport.getTaskID().getId();
167    }
168
169  @Override
170  public String getProcessNodeID()
171    {
172    return null; // map and reduce side of the job don't have ids, like a tez vertex does
173    }
174
175  @Override
176  public String getProcessStepID()
177    {
178    return taskReport.getTaskID().getJobID().toString();
179    }
180
181  protected TaskReport getTaskReport()
182    {
183    return taskReport;
184    }
185
186  public float getProgress()
187    {
188    return taskReport.getProgress();
189    }
190
191  @Override
192  public String getProcessStatus()
193    {
194    return taskReport.getState();
195    }
196
197  @Override
198  public float getProcessProgress()
199    {
200    return taskReport.getProgress();
201    }
202
203  @Override
204  public long getProcessStartTime()
205    {
206    return taskReport.getStartTime();
207    }
208
209  @Override
210  public long getProcessFinishTime()
211    {
212    return taskReport.getFinishTime();
213    }
214
215  public CascadingStats.Status getParentStatus()
216    {
217    return parentStatus;
218    }
219
220  @Override
221  public CascadingStats.Status getStatus()
222    {
223    CascadingStats.Status status = null;
224
225    switch( taskReport.getCurrentStatus() )
226      {
227      case PENDING:
228        status = PENDING;
229        break;
230      case RUNNING:
231        status = RUNNING;
232        break;
233      case COMPLETE:
234        status = SUCCESSFUL;
235        break;
236      case KILLED:
237        status = STOPPED;
238        break;
239      case FAILED:
240        status = FAILED;
241        break;
242      }
243
244    return status;
245    }
246
247  @Override
248  public String[] getDiagnostics()
249    {
250    return taskReport.getDiagnostics();
251    }
252
253  @Override
254  public Map<String, Map<String, Long>> getCounters()
255    {
256    if( counters == null )
257      setCounters( taskReport );
258
259    return counters;
260    }
261
262  @Override
263  public Map<Integer, FlowSliceAttempt> getAttempts()
264    {
265    return attempts;
266    }
267
268  private void setCounters( TaskReport taskReport )
269    {
270    this.counters = new HashMap<>();
271
272    Counters hadoopCounters = taskReport.getTaskCounters();
273
274    for( CounterGroup group : hadoopCounters )
275      {
276      Map<String, Long> values = new HashMap<String, Long>();
277
278      this.counters.put( group.getName(), values );
279
280      for( Counter counter : group )
281        values.put( counter.getName(), counter.getValue() );
282      }
283    }
284
285  public void setLastFetch( long lastFetch )
286    {
287    this.lastFetch = lastFetch;
288    }
289
290  @Override
291  public long getLastSuccessfulCounterFetchTime()
292    {
293    return lastFetch;
294    }
295
296  @Override
297  public Collection<String> getCounterGroups()
298    {
299    return getCounters().keySet();
300    }
301
302  @Override
303  public Collection<String> getCountersFor( String group )
304    {
305    return getCounters().get( group ).keySet();
306    }
307
308  @Override
309  public Collection<String> getCountersFor( Class<? extends Enum> group )
310    {
311    return getCountersFor( group.getDeclaringClass().getName() );
312    }
313
314  @Override
315  public long getCounterValue( Enum counter )
316    {
317    return getCounterValue( counter.getDeclaringClass().getName(), counter.name() );
318    }
319
320  @Override
321  public long getCounterValue( String group, String name )
322    {
323    if( getCounters() == null || getCounters().get( group ) == null )
324      return 0;
325
326    Long value = getCounters().get( group ).get( name );
327
328    if( value == null )
329      return 0;
330
331    return value;
332    }
333
334  public void addAttempt( TaskCompletionEvent event )
335    {
336    attempts.put( event.getEventId(), new HadoopAttempt( event ) );
337    }
338
339  @Override
340  public String toString()
341    {
342    final StringBuilder sb = new StringBuilder();
343    sb.append( "HadoopSliceStats" );
344    sb.append( "{id='" ).append( id ).append( '\'' );
345    sb.append( ", kind=" ).append( kind );
346    sb.append( '}' );
347    return sb.toString();
348    }
349  }