/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import cascading.stats.CascadingStats;
import cascading.stats.FlowSliceStats;
import cascading.stats.ProvidesCounters;
import cascading.util.Util;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;

import static cascading.stats.CascadingStats.Status.*;

/**
 * Class HadoopSliceStats tracks individual task (slice) stats for a Hadoop MapReduce job,
 * wrapping a {@link TaskReport} and exposing its status, progress, counters, and attempts
 * through the {@link FlowSliceStats} contract.
 */
public class HadoopSliceStats extends FlowSliceStats<HadoopSliceStats.Kind> implements ProvidesCounters
  {
  /** Wraps a single {@link TaskCompletionEvent}, representing one attempt of this task. */
  public static class HadoopAttempt extends FlowSliceAttempt
    {
    private final TaskCompletionEvent event;

    public HadoopAttempt( TaskCompletionEvent event )
      {
      this.event = event;
      }

    @Override
    public String getProcessAttemptID()
      {
      return event.getTaskAttemptId().toString();
      }

    @Override
    public int getEventId()
      {
      return event.getEventId();
      }

    @Override
    public int getProcessDuration()
      {
      return event.getTaskRunTime();
      }

    @Override
    public String getProcessStatus()
      {
      return event.getStatus().toString();
      }

    @Override
    public String getStatusURL()
      {
      return event.getTaskTrackerHttp();
      }

    /**
     * Maps the Hadoop attempt status onto a Cascading {@link CascadingStats.Status}.
     * Returns null if Hadoop reports a status value not covered below.
     */
    @Override
    public CascadingStats.Status getStatus()
      {
      CascadingStats.Status status = null;

      switch( event.getStatus() )
        {
        case FAILED:
          status = FAILED;
          break;
        case KILLED:
          status = STOPPED;
          break;
        case SUCCEEDED:
          status = SUCCESSFUL;
          break;
        case OBSOLETE:
          status = SKIPPED;
          break;
        case TIPFAILED:
          status = FAILED;
          break;
        }

      return status;
      }

    @Override
    public String getProcessHostname()
      {
      return Util.parseHostname( event.getTaskTrackerHttp() );
      }
    }

  /** The phase of the MapReduce job this slice belongs to. */
  public enum Kind
    {
      SETUP, MAPPER, REDUCER, CLEANUP
    }

  private String id;
  private CascadingStats.Status parentStatus;
  private Kind kind;
  private TaskReport taskReport;
  // lazily built from taskReport; group name -> (counter name -> value)
  private Map<String, Map<String, Long>> counters;
  private long lastFetch = -1;

  private Map<Integer, FlowSliceAttempt> attempts = new HashMap<>();

  HadoopSliceStats( String id, CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch )
    {
    this.id = id;
    this.parentStatus = parentStatus;
    this.kind = kind;
    this.taskReport = taskReport;
    this.lastFetch = lastFetch;
    }

  /** Replaces the underlying report and invalidates the cached counter map. */
  public void update( CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch )
    {
    this.parentStatus = parentStatus;
    this.kind = kind;
    this.taskReport = taskReport;
    this.lastFetch = lastFetch;
    this.counters = null; // force recalc of counters
    }

  @Override
  public String getID()
    {
    return id;
    }

  @Override
  public Kind getKind()
    {
    return kind;
    }

  @Override
  public String getProcessSliceID()
    {
    return taskReport.getTaskID().toString();
    }

  public int getTaskIDNum()
    {
    return taskReport.getTaskID().getId();
    }

  @Override
  public String getProcessNodeID()
    {
    return null; // map and reduce side of the job don't have ids, like a tez vertex does
    }

  @Override
  public String getProcessStepID()
    {
    return taskReport.getTaskID().getJobID().toString();
    }

  protected TaskReport getTaskReport()
    {
    return taskReport;
    }

  public float getProgress()
    {
    return taskReport.getProgress();
    }

  @Override
  public String getProcessStatus()
    {
    return taskReport.getState();
    }

  @Override
  public float getProcessProgress()
    {
    return taskReport.getProgress();
    }

  @Override
  public long getProcessStartTime()
    {
    return taskReport.getStartTime();
    }

  @Override
  public long getProcessFinishTime()
    {
    return taskReport.getFinishTime();
    }

  public CascadingStats.Status getParentStatus()
    {
    return parentStatus;
    }

  /**
   * Maps the Hadoop task status onto a Cascading {@link CascadingStats.Status}.
   * Returns null if Hadoop reports a status value not covered below.
   */
  @Override
  public CascadingStats.Status getStatus()
    {
    CascadingStats.Status status = null;

    switch( taskReport.getCurrentStatus() )
      {
      case PENDING:
        status = PENDING;
        break;
      case RUNNING:
        status = RUNNING;
        break;
      case COMPLETE:
        status = SUCCESSFUL;
        break;
      case KILLED:
        status = STOPPED;
        break;
      case FAILED:
        status = FAILED;
        break;
      }

    return status;
    }

  @Override
  public String[] getDiagnostics()
    {
    return taskReport.getDiagnostics();
    }

  @Override
  public Map<String, Map<String, Long>> getCounters()
    {
    if( counters == null )
      setCounters( taskReport );

    return counters;
    }

  @Override
  public Map<Integer, FlowSliceAttempt> getAttempts()
    {
    return attempts;
    }

  private void setCounters( TaskReport taskReport )
    {
    this.counters = new HashMap<>();

    Counters hadoopCounters = taskReport.getTaskCounters();

    // counters may not have been fetched for this report; leave the map empty rather than NPE
    if( hadoopCounters == null )
      return;

    for( CounterGroup group : hadoopCounters )
      {
      Map<String, Long> values = new HashMap<>();

      this.counters.put( group.getName(), values );

      for( Counter counter : group )
        values.put( counter.getName(), counter.getValue() );
      }
    }

  public void setLastFetch( long lastFetch )
    {
    this.lastFetch = lastFetch;
    }

  @Override
  public long getLastSuccessfulCounterFetchTime()
    {
    return lastFetch;
    }

  @Override
  public Collection<String> getCounterGroups()
    {
    return getCounters().keySet();
    }

  @Override
  public Collection<String> getCountersFor( String group )
    {
    Map<String, Long> groupCounters = getCounters().get( group );

    // unknown group: return an empty collection, consistent with getCounterValue() returning 0
    if( groupCounters == null )
      return Collections.emptySet();

    return groupCounters.keySet();
    }

  @Override
  public Collection<String> getCountersFor( Class<? extends Enum> group )
    {
    // the group name is the enum class' own name; Class#getDeclaringClass() would return the
    // enclosing class (or null for a top-level enum), yielding the wrong group or an NPE
    return getCountersFor( group.getName() );
    }

  @Override
  public long getCounterValue( Enum counter )
    {
    // Enum#getDeclaringClass() is correct here: it resolves constant-specific subclasses
    // back to the declaring enum type
    return getCounterValue( counter.getDeclaringClass().getName(), counter.name() );
    }

  @Override
  public long getCounterValue( String group, String name )
    {
    if( getCounters() == null || getCounters().get( group ) == null )
      return 0;

    Long value = getCounters().get( group ).get( name );

    if( value == null )
      return 0;

    return value;
    }

  public void addAttempt( TaskCompletionEvent event )
    {
    attempts.put( event.getEventId(), new HadoopAttempt( event ) );
    }

  @Override
  public String toString()
    {
    final StringBuilder sb = new StringBuilder();
    sb.append( "HadoopSliceStats" );
    sb.append( "{id='" ).append( id ).append( '\'' );
    sb.append( ", kind=" ).append( kind );
    sb.append( '}' );
    return sb.toString();
    }
  }