001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.hadoop;
022
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import cascading.stats.CascadingStats;
import cascading.stats.FlowSliceStats;
import cascading.stats.ProvidesCounters;
import cascading.util.Util;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskReport;

import static cascading.stats.CascadingStats.Status.*;
036
037/** Class HadoopTaskStats tracks individual task stats. */
038public class HadoopSliceStats extends FlowSliceStats<HadoopSliceStats.Kind> implements ProvidesCounters
039  {
040  private final CascadingStats.Status parentStatus;
041
042  public static class HadoopAttempt extends FlowSliceAttempt
043    {
044    private final TaskCompletionEvent event;
045
046    public HadoopAttempt( TaskCompletionEvent event )
047      {
048      this.event = event;
049      }
050
051    @Override
052    public String getProcessAttemptID()
053      {
054      return event.getTaskAttemptId().toString();
055      }
056
057    @Override
058    public int getEventId()
059      {
060      return event.getEventId();
061      }
062
063    @Override
064    public int getProcessDuration()
065      {
066      return event.getTaskRunTime();
067      }
068
069    @Override
070    public String getProcessStatus()
071      {
072      return event.getTaskStatus().toString();
073      }
074
075    @Override
076    public String getStatusURL()
077      {
078      return event.getTaskTrackerHttp();
079      }
080
081    @Override
082    public CascadingStats.Status getStatus()
083      {
084      CascadingStats.Status status = null;
085
086      switch( event.getTaskStatus() )
087        {
088        case FAILED:
089          status = FAILED;
090          break;
091        case KILLED:
092          status = STOPPED;
093          break;
094        case SUCCEEDED:
095          status = SUCCESSFUL;
096          break;
097        case OBSOLETE:
098          status = SKIPPED;
099          break;
100        case TIPFAILED:
101          status = FAILED;
102          break;
103        }
104      return status;
105      }
106
107    @Override
108    public String getProcessHostname()
109      {
110      return Util.parseHostname( event.getTaskTrackerHttp() );
111      }
112    }
113
114  public enum Kind
115    {
116      SETUP, MAPPER, REDUCER, CLEANUP
117    }
118
119  private String id;
120  private Kind kind;
121  private TaskReport taskReport;
122  private Map<String, Map<String, Long>> counters;
123  private long lastFetch = -1;
124
125  private Map<Integer, FlowSliceAttempt> attempts = new HashMap<>();
126
127  HadoopSliceStats( String id, CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch )
128    {
129    this.parentStatus = parentStatus;
130    this.id = id;
131    this.kind = kind;
132    this.taskReport = taskReport;
133    this.lastFetch = lastFetch;
134    }
135
136  @Override
137  public String getID()
138    {
139    return id;
140    }
141
142  @Override
143  public Kind getKind()
144    {
145    return kind;
146    }
147
148  @Override
149  public String getProcessSliceID()
150    {
151    return taskReport.getTaskID().toString();
152    }
153
154  public int getTaskIDNum()
155    {
156    return taskReport.getTaskID().getId();
157    }
158
159  @Override
160  public String getProcessNodeID()
161    {
162    return null;
163    }
164
165  @Override
166  public String getProcessStepID()
167    {
168    return taskReport.getTaskID().getJobID().toString();
169    }
170
171  protected TaskReport getTaskReport()
172    {
173    return taskReport;
174    }
175
176  @Override
177  public String getProcessStatus()
178    {
179    return taskReport.getState();
180    }
181
182  @Override
183  public float getProcessProgress()
184    {
185    return taskReport.getProgress();
186    }
187
188  @Override
189  public long getProcessStartTime()
190    {
191    return taskReport.getStartTime();
192    }
193
194  @Override
195  public long getProcessFinishTime()
196    {
197    return taskReport.getFinishTime();
198    }
199
200  public CascadingStats.Status getParentStatus()
201    {
202    return parentStatus;
203    }
204
205  @Override
206  public CascadingStats.Status getStatus()
207    {
208    CascadingStats.Status status = null;
209
210    switch( taskReport.getCurrentStatus() )
211      {
212      case PENDING:
213        status = PENDING;
214        break;
215      case RUNNING:
216        status = RUNNING;
217        break;
218      case COMPLETE:
219        status = SUCCESSFUL;
220        break;
221      case KILLED:
222        status = STOPPED;
223        break;
224      case FAILED:
225        status = FAILED;
226        break;
227      }
228
229    return status;
230    }
231
232  @Override
233  public String[] getDiagnostics()
234    {
235    return taskReport.getDiagnostics();
236    }
237
238  @Override
239  public Map<String, Map<String, Long>> getCounters()
240    {
241    if( counters == null )
242      setCounters( taskReport );
243
244    return counters;
245    }
246
247  @Override
248  public Map<Integer, FlowSliceAttempt> getAttempts()
249    {
250    return attempts;
251    }
252
253  private void setCounters( TaskReport taskReport )
254    {
255    this.counters = new HashMap<>();
256
257    Counters hadoopCounters = taskReport.getCounters();
258
259    for( Counters.Group group : hadoopCounters )
260      {
261      Map<String, Long> values = new HashMap<String, Long>();
262
263      this.counters.put( group.getName(), values );
264
265      for( Counters.Counter counter : group )
266        values.put( counter.getName(), counter.getCounter() );
267      }
268    }
269
270  public void setLastFetch( long lastFetch )
271    {
272    this.lastFetch = lastFetch;
273    }
274
275  @Override
276  public long getLastSuccessfulCounterFetchTime()
277    {
278    return lastFetch;
279    }
280
281  @Override
282  public Collection<String> getCounterGroups()
283    {
284    return getCounters().keySet();
285    }
286
287  @Override
288  public Collection<String> getCountersFor( String group )
289    {
290    return getCounters().get( group ).keySet();
291    }
292
293  @Override
294  public Collection<String> getCountersFor( Class<? extends Enum> group )
295    {
296    return getCountersFor( group.getDeclaringClass().getName() );
297    }
298
299  @Override
300  public long getCounterValue( Enum counter )
301    {
302    return getCounterValue( counter.getDeclaringClass().getName(), counter.name() );
303    }
304
305  @Override
306  public long getCounterValue( String group, String name )
307    {
308    if( getCounters() == null || getCounters().get( group ) == null )
309      return 0;
310
311    Long value = getCounters().get( group ).get( name );
312
313    if( value == null )
314      return 0;
315
316    return value;
317    }
318
319  public void addAttempt( TaskCompletionEvent event )
320    {
321    attempts.put( event.getEventId(), new HadoopAttempt( event ) );
322    }
323
324  @Override
325  public String toString()
326    {
327    final StringBuilder sb = new StringBuilder();
328    sb.append( "HadoopSliceStats" );
329    sb.append( "{id='" ).append( id ).append( '\'' );
330    sb.append( ", kind=" ).append( kind );
331    sb.append( '}' );
332    return sb.toString();
333    }
334  }