/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import cascading.stats.CascadingStats;
import cascading.stats.FlowSliceStats;
import cascading.stats.ProvidesCounters;
import cascading.util.Util;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskReport;

import static cascading.stats.CascadingStats.Status.*;

/** Class HadoopTaskStats tracks individual task stats.
*/ 038public class HadoopSliceStats extends FlowSliceStats<HadoopSliceStats.Kind> implements ProvidesCounters 039 { 040 private final CascadingStats.Status parentStatus; 041 042 public static class HadoopAttempt extends FlowSliceAttempt 043 { 044 private final TaskCompletionEvent event; 045 046 public HadoopAttempt( TaskCompletionEvent event ) 047 { 048 this.event = event; 049 } 050 051 @Override 052 public String getProcessAttemptID() 053 { 054 return event.getTaskAttemptId().toString(); 055 } 056 057 @Override 058 public int getEventId() 059 { 060 return event.getEventId(); 061 } 062 063 @Override 064 public int getProcessDuration() 065 { 066 return event.getTaskRunTime(); 067 } 068 069 @Override 070 public String getProcessStatus() 071 { 072 return event.getTaskStatus().toString(); 073 } 074 075 @Override 076 public String getStatusURL() 077 { 078 return event.getTaskTrackerHttp(); 079 } 080 081 @Override 082 public CascadingStats.Status getStatus() 083 { 084 CascadingStats.Status status = null; 085 086 switch( event.getTaskStatus() ) 087 { 088 case FAILED: 089 status = FAILED; 090 break; 091 case KILLED: 092 status = STOPPED; 093 break; 094 case SUCCEEDED: 095 status = SUCCESSFUL; 096 break; 097 case OBSOLETE: 098 status = SKIPPED; 099 break; 100 case TIPFAILED: 101 status = FAILED; 102 break; 103 } 104 return status; 105 } 106 107 @Override 108 public String getProcessHostname() 109 { 110 return Util.parseHostname( event.getTaskTrackerHttp() ); 111 } 112 } 113 114 public enum Kind 115 { 116 SETUP, MAPPER, REDUCER, CLEANUP 117 } 118 119 private String id; 120 private Kind kind; 121 private TaskReport taskReport; 122 private Map<String, Map<String, Long>> counters; 123 private long lastFetch = -1; 124 125 private Map<Integer, FlowSliceAttempt> attempts = new HashMap<>(); 126 127 HadoopSliceStats( String id, CascadingStats.Status parentStatus, Kind kind, TaskReport taskReport, long lastFetch ) 128 { 129 this.parentStatus = parentStatus; 130 this.id = id; 131 
this.kind = kind; 132 this.taskReport = taskReport; 133 this.lastFetch = lastFetch; 134 } 135 136 @Override 137 public String getID() 138 { 139 return id; 140 } 141 142 @Override 143 public Kind getKind() 144 { 145 return kind; 146 } 147 148 @Override 149 public String getProcessSliceID() 150 { 151 return taskReport.getTaskID().toString(); 152 } 153 154 public int getTaskIDNum() 155 { 156 return taskReport.getTaskID().getId(); 157 } 158 159 @Override 160 public String getProcessNodeID() 161 { 162 return null; 163 } 164 165 @Override 166 public String getProcessStepID() 167 { 168 return taskReport.getTaskID().getJobID().toString(); 169 } 170 171 protected TaskReport getTaskReport() 172 { 173 return taskReport; 174 } 175 176 @Override 177 public String getProcessStatus() 178 { 179 return taskReport.getState(); 180 } 181 182 @Override 183 public float getProcessProgress() 184 { 185 return taskReport.getProgress(); 186 } 187 188 @Override 189 public long getProcessStartTime() 190 { 191 return taskReport.getStartTime(); 192 } 193 194 @Override 195 public long getProcessFinishTime() 196 { 197 return taskReport.getFinishTime(); 198 } 199 200 public CascadingStats.Status getParentStatus() 201 { 202 return parentStatus; 203 } 204 205 @Override 206 public CascadingStats.Status getStatus() 207 { 208 CascadingStats.Status status = null; 209 210 switch( taskReport.getCurrentStatus() ) 211 { 212 case PENDING: 213 status = PENDING; 214 break; 215 case RUNNING: 216 status = RUNNING; 217 break; 218 case COMPLETE: 219 status = SUCCESSFUL; 220 break; 221 case KILLED: 222 status = STOPPED; 223 break; 224 case FAILED: 225 status = FAILED; 226 break; 227 } 228 229 return status; 230 } 231 232 @Override 233 public String[] getDiagnostics() 234 { 235 return taskReport.getDiagnostics(); 236 } 237 238 @Override 239 public Map<String, Map<String, Long>> getCounters() 240 { 241 if( counters == null ) 242 setCounters( taskReport ); 243 244 return counters; 245 } 246 247 @Override 248 public 
Map<Integer, FlowSliceAttempt> getAttempts() 249 { 250 return attempts; 251 } 252 253 private void setCounters( TaskReport taskReport ) 254 { 255 this.counters = new HashMap<>(); 256 257 Counters hadoopCounters = taskReport.getCounters(); 258 259 for( Counters.Group group : hadoopCounters ) 260 { 261 Map<String, Long> values = new HashMap<String, Long>(); 262 263 this.counters.put( group.getName(), values ); 264 265 for( Counters.Counter counter : group ) 266 values.put( counter.getName(), counter.getCounter() ); 267 } 268 } 269 270 public void setLastFetch( long lastFetch ) 271 { 272 this.lastFetch = lastFetch; 273 } 274 275 @Override 276 public long getLastSuccessfulCounterFetchTime() 277 { 278 return lastFetch; 279 } 280 281 @Override 282 public Collection<String> getCounterGroups() 283 { 284 return getCounters().keySet(); 285 } 286 287 @Override 288 public Collection<String> getCountersFor( String group ) 289 { 290 return getCounters().get( group ).keySet(); 291 } 292 293 @Override 294 public Collection<String> getCountersFor( Class<? 
extends Enum> group ) 295 { 296 return getCountersFor( group.getDeclaringClass().getName() ); 297 } 298 299 @Override 300 public long getCounterValue( Enum counter ) 301 { 302 return getCounterValue( counter.getDeclaringClass().getName(), counter.name() ); 303 } 304 305 @Override 306 public long getCounterValue( String group, String name ) 307 { 308 if( getCounters() == null || getCounters().get( group ) == null ) 309 return 0; 310 311 Long value = getCounters().get( group ).get( name ); 312 313 if( value == null ) 314 return 0; 315 316 return value; 317 } 318 319 public void addAttempt( TaskCompletionEvent event ) 320 { 321 attempts.put( event.getEventId(), new HadoopAttempt( event ) ); 322 } 323 324 @Override 325 public String toString() 326 { 327 final StringBuilder sb = new StringBuilder(); 328 sb.append( "HadoopSliceStats" ); 329 sb.append( "{id='" ).append( id ).append( '\'' ); 330 sb.append( ", kind=" ).append( kind ); 331 sb.append( '}' ); 332 return sb.toString(); 333 } 334 }