/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import cascading.stats.CascadingStats;
import cascading.stats.FlowSliceStats;
import cascading.stats.ProvidesCounters;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskReport;

import static cascading.stats.CascadingStats.Status.*;

/** Class HadoopSliceStats tracks individual task stats.
*/ 037public class HadoopSliceStats extends FlowSliceStats<HadoopSliceStats.Kind> implements ProvidesCounters 038 { 039 private final CascadingStats.Status parentStatus; 040 041 public static class HadoopAttempt extends FlowSliceAttempt 042 { 043 private final TaskCompletionEvent event; 044 045 public HadoopAttempt( TaskCompletionEvent event ) 046 { 047 this.event = event; 048 } 049 050 @Override 051 public String getProcessAttemptID() 052 { 053 return event.getTaskAttemptId().toString(); 054 } 055 056 @Override 057 public int getEventId() 058 { 059 return event.getEventId(); 060 } 061 062 @Override 063 public int getProcessDuration() 064 { 065 return event.getTaskRunTime(); 066 } 067 068 @Override 069 public String getProcessStatus() 070 { 071 return event.getTaskStatus().toString(); 072 } 073 074 @Override 075 public String getStatusURL() 076 { 077 return event.getTaskTrackerHttp(); 078 } 079 080 @Override 081 public CascadingStats.Status getStatus() 082 { 083 CascadingStats.Status status = null; 084 085 switch( event.getTaskStatus() ) 086 { 087 case FAILED: 088 status = FAILED; 089 break; 090 case KILLED: 091 status = STOPPED; 092 break; 093 case SUCCEEDED: 094 status = SUCCESSFUL; 095 break; 096 case OBSOLETE: 097 status = SKIPPED; 098 break; 099 case TIPFAILED: 100 status = FAILED; 101 break; 102 } 103 return status; 104 } 105 } 106 107 public enum Kind 108 { 109 SETUP, MAPPER, REDUCER, CLEANUP 110 } 111 112 private String id; 113 private Kind kind; 114 private TaskReport taskReport; 115 private Map<String, Map<String, Long>> counters; 116 private long lastFetch = -1; 117 118 private Map<Integer, FlowSliceAttempt> attempts = new HashMap<>(); 119 120 HadoopSliceStats( String id, CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch ) 121 { 122 this.parentStatus = parentStatus; 123 this.id = id; 124 this.kind = kind; 125 this.taskReport = taskReport; 126 this.lastFetch = lastFetch; 127 } 128 129 @Override 130 public String getID() 
131 { 132 return id; 133 } 134 135 @Override 136 public Kind getKind() 137 { 138 return kind; 139 } 140 141 @Override 142 public String getProcessSliceID() 143 { 144 return taskReport.getTaskID().toString(); 145 } 146 147 public int getTaskIDNum() 148 { 149 return taskReport.getTaskID().getId(); 150 } 151 152 @Override 153 public String getProcessStepID() 154 { 155 return taskReport.getTaskID().getJobID().toString(); 156 } 157 158 protected TaskReport getTaskReport() 159 { 160 return taskReport; 161 } 162 163 @Override 164 public String getProcessStatus() 165 { 166 return taskReport.getState(); 167 } 168 169 @Override 170 public float getProcessProgress() 171 { 172 return taskReport.getProgress(); 173 } 174 175 @Override 176 public long getProcessStartTime() 177 { 178 return taskReport.getStartTime(); 179 } 180 181 @Override 182 public long getProcessFinishTime() 183 { 184 return taskReport.getFinishTime(); 185 } 186 187 public CascadingStats.Status getParentStatus() 188 { 189 return parentStatus; 190 } 191 192 @Override 193 public CascadingStats.Status getStatus() 194 { 195 CascadingStats.Status status = null; 196 197 switch( taskReport.getCurrentStatus() ) 198 { 199 case PENDING: 200 status = PENDING; 201 break; 202 case RUNNING: 203 status = RUNNING; 204 break; 205 case COMPLETE: 206 status = SUCCESSFUL; 207 break; 208 case KILLED: 209 status = STOPPED; 210 break; 211 case FAILED: 212 status = FAILED; 213 break; 214 } 215 216 return status; 217 } 218 219 @Override 220 public String[] getDiagnostics() 221 { 222 return taskReport.getDiagnostics(); 223 } 224 225 @Override 226 public Map<String, Map<String, Long>> getCounters() 227 { 228 if( counters == null ) 229 setCounters( taskReport ); 230 231 return counters; 232 } 233 234 @Override 235 public Map<Integer, FlowSliceAttempt> getAttempts() 236 { 237 return attempts; 238 } 239 240 private void setCounters( TaskReport taskReport ) 241 { 242 this.counters = new HashMap<>(); 243 244 Counters hadoopCounters = 
taskReport.getCounters(); 245 246 for( Counters.Group group : hadoopCounters ) 247 { 248 Map<String, Long> values = new HashMap<String, Long>(); 249 250 this.counters.put( group.getName(), values ); 251 252 for( Counters.Counter counter : group ) 253 values.put( counter.getName(), counter.getCounter() ); 254 } 255 } 256 257 public void setLastFetch( long lastFetch ) 258 { 259 this.lastFetch = lastFetch; 260 } 261 262 @Override 263 public long getLastSuccessfulCounterFetchTime() 264 { 265 return lastFetch; 266 } 267 268 @Override 269 public Collection<String> getCounterGroups() 270 { 271 return getCounters().keySet(); 272 } 273 274 @Override 275 public Collection<String> getCountersFor( String group ) 276 { 277 return getCounters().get( group ).keySet(); 278 } 279 280 @Override 281 public Collection<String> getCountersFor( Class<? extends Enum> group ) 282 { 283 return getCountersFor( group.getDeclaringClass().getName() ); 284 } 285 286 @Override 287 public long getCounterValue( Enum counter ) 288 { 289 return getCounterValue( counter.getDeclaringClass().getName(), counter.name() ); 290 } 291 292 @Override 293 public long getCounterValue( String group, String name ) 294 { 295 if( getCounters() == null || getCounters().get( group ) == null ) 296 return 0; 297 298 Long value = getCounters().get( group ).get( name ); 299 300 if( value == null ) 301 return 0; 302 303 return value; 304 } 305 306 public void addAttempt( TaskCompletionEvent event ) 307 { 308 attempts.put( event.getEventId(), new HadoopAttempt( event ) ); 309 } 310 311 @Override 312 public String toString() 313 { 314 final StringBuilder sb = new StringBuilder(); 315 sb.append( "HadoopSliceStats" ); 316 sb.append( "{id='" ).append( id ).append( '\'' ); 317 sb.append( ", kind=" ).append( kind ); 318 sb.append( '}' ); 319 return sb.toString(); 320 } 321 }