/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import cascading.stats.CascadingStats;
import cascading.stats.FlowSliceStats;
import cascading.stats.ProvidesCounters;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;

import static cascading.stats.CascadingStats.Status.*;

/** Class HadoopSliceStats tracks individual task (slice) stats. */
public class HadoopSliceStats extends FlowSliceStats<HadoopSliceStats.Kind> implements ProvidesCounters
  {
  public static class HadoopAttempt extends FlowSliceAttempt
    {
    private final TaskCompletionEvent event;

    public HadoopAttempt( TaskCompletionEvent event )
      {
      this.event = event;
      }

    @Override
    public String getProcessAttemptID()
      {
      return event.getTaskAttemptId().toString();
      }

    @Override
    public int getEventId()
      {
      return event.getEventId();
      }

    @Override
    public int getProcessDuration()
      {
      return event.getTaskRunTime();
      }

    @Override
    public String getProcessStatus()
      {
      return event.getStatus().toString();
      }

    @Override
    public String getStatusURL()
      {
      return event.getTaskTrackerHttp();
      }

    @Override
    public CascadingStats.Status getStatus()
      {
      // map the Hadoop task attempt status onto the corresponding Cascading status
      CascadingStats.Status status = null;

      switch( event.getStatus() )
        {
        case FAILED:
          status = FAILED;
          break;
        case KILLED:
          status = STOPPED;
          break;
        case SUCCEEDED:
          status = SUCCESSFUL;
          break;
        case OBSOLETE:
          status = SKIPPED;
          break;
        case TIPFAILED:
          status = FAILED;
          break;
        }

      return status;
      }
    }

  public enum Kind
    {
      SETUP, MAPPER, REDUCER, CLEANUP
    }

  private String id;
  private CascadingStats.Status parentStatus;
  private Kind kind;
  private TaskReport taskReport;
  private Map<String, Map<String, Long>> counters;
  private long lastFetch = -1;

  private Map<Integer, FlowSliceAttempt> attempts = new HashMap<>();

  HadoopSliceStats( String id, CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch )
    {
    this.id = id;
    this.parentStatus = parentStatus;
    this.kind = kind;
    this.taskReport = taskReport;
    this.lastFetch = lastFetch;
    }

  public void update( CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch )
    {
    this.parentStatus = parentStatus;
    this.kind = kind;
    this.taskReport = taskReport;
    this.lastFetch = lastFetch;
    this.counters = null; // force recalc of counters on next access
    }

  @Override
  public String getID()
    {
    return id;
    }

  @Override
  public Kind getKind()
    {
    return kind;
    }

  @Override
  public String getProcessSliceID()
    {
    return taskReport.getTaskID().toString();
    }

  public int getTaskIDNum()
    {
    return taskReport.getTaskID().getId();
    }

  @Override
  public String getProcessStepID()
    {
    return taskReport.getTaskID().getJobID().toString();
    }

  protected TaskReport getTaskReport()
    {
    return taskReport;
    }

  public float getProgress()
    {
    return taskReport.getProgress();
    }

  @Override
  public String getProcessStatus()
    {
    return taskReport.getState();
    }

  @Override
  public float getProcessProgress()
    {
    return taskReport.getProgress();
    }

  @Override
  public long getProcessStartTime()
    {
    return taskReport.getStartTime();
    }

  @Override
  public long getProcessFinishTime()
    {
    return taskReport.getFinishTime();
    }

  public CascadingStats.Status getParentStatus()
    {
    return parentStatus;
    }

  @Override
  public CascadingStats.Status getStatus()
    {
    // map the Hadoop task status onto the corresponding Cascading status
    CascadingStats.Status status = null;

    switch( taskReport.getCurrentStatus() )
      {
      case PENDING:
        status = PENDING;
        break;
      case RUNNING:
        status = RUNNING;
        break;
      case COMPLETE:
        status = SUCCESSFUL;
        break;
      case KILLED:
        status = STOPPED;
        break;
      case FAILED:
        status = FAILED;
        break;
      }

    return status;
    }

  @Override
  public String[] getDiagnostics()
    {
    return taskReport.getDiagnostics();
    }

  @Override
  public Map<String, Map<String, Long>> getCounters()
    {
    // lazily populate the counter map from the underlying TaskReport
    if( counters == null )
      setCounters( taskReport );

    return counters;
    }

  @Override
  public Map<Integer, FlowSliceAttempt> getAttempts()
    {
    return attempts;
    }

  private void setCounters( TaskReport taskReport )
    {
    // snapshot the Hadoop counters into a nested map, keyed by group name then counter name
    this.counters = new HashMap<>();

    Counters hadoopCounters = taskReport.getTaskCounters();

    for( CounterGroup group : hadoopCounters )
      {
      Map<String, Long> values = new HashMap<String, Long>();

      this.counters.put( group.getName(), values );

      for( Counter counter : group )
        values.put( counter.getName(), counter.getValue() );
      }
    }

  public void setLastFetch( long lastFetch )
    {
    this.lastFetch = lastFetch;
    }

  @Override
  public long getLastSuccessfulCounterFetchTime()
    {
    return lastFetch;
    }

  @Override
  public Collection<String> getCounterGroups()
    {
    return getCounters().keySet();
    }

  @Override
  public Collection<String> getCountersFor( String group )
    {
    return getCounters().get( group ).keySet();
    }

  @Override
  public Collection<String> getCountersFor( Class<? extends Enum> group )
    {
    return getCountersFor( group.getName() );
    }

  @Override
  public long getCounterValue( Enum counter )
    {
    return getCounterValue( counter.getDeclaringClass().getName(), counter.name() );
    }

  @Override
  public long getCounterValue( String group, String name )
    {
    // missing groups or counters are reported as zero
    if( getCounters() == null || getCounters().get( group ) == null )
      return 0;

    Long value = getCounters().get( group ).get( name );

    if( value == null )
      return 0;

    return value;
    }

  public void addAttempt( TaskCompletionEvent event )
    {
    attempts.put( event.getEventId(), new HadoopAttempt( event ) );
    }

  @Override
  public String toString()
    {
    final StringBuilder sb = new StringBuilder();
    sb.append( "HadoopSliceStats" );
    sb.append( "{id='" ).append( id ).append( '\'' );
    sb.append( ", kind=" ).append( kind );
    sb.append( '}' );
    return sb.toString();
    }
  }