001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.stats.tez; 022 023import java.io.IOException; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028import java.util.Set; 029import javax.annotation.Nullable; 030 031import cascading.CascadingException; 032import cascading.flow.FlowNode; 033import cascading.flow.hadoop.util.HadoopUtil; 034import cascading.flow.stream.annotations.StreamMode; 035import cascading.management.state.ClientState; 036import cascading.property.PropertyUtil; 037import cascading.stats.FlowSliceStats; 038import cascading.stats.hadoop.BaseHadoopNodeStats; 039import cascading.stats.tez.util.TaskStatus; 040import cascading.stats.tez.util.TezStatsUtil; 041import cascading.stats.tez.util.TimelineClient; 042import cascading.tap.Tap; 043import cascading.util.Util; 044import org.apache.hadoop.conf.Configuration; 045import org.apache.hadoop.yarn.conf.YarnConfiguration; 046import org.apache.tez.common.counters.TezCounters; 047import org.apache.tez.dag.api.TezException; 048import org.apache.tez.dag.api.client.DAGClient; 049import org.apache.tez.dag.api.client.Progress; 050import org.apache.tez.dag.api.client.StatusGetOpts; 051import org.apache.tez.dag.api.client.VertexStatus; 052import org.apache.tez.dag.api.oldrecords.TaskState; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056import static cascading.stats.tez.util.TezStatsUtil.STATUS_GET_COUNTERS; 057import static cascading.util.Util.formatDurationFromMillis; 058import static cascading.util.Util.isEmpty; 059 060/** 061 * 062 */ 063public class TezNodeStats extends BaseHadoopNodeStats<DAGClient, TezCounters> 064 { 065 private static final Logger LOG = LoggerFactory.getLogger( TezNodeStats.class ); 066 067 /** 068 * Sets the fetch limit from the timeline server. May be set as a System property. 069 */ 070 public static final String TIMELINE_FETCH_LIMIT = "cascading.stats.timeline.fetch.limit"; 071 public static final int DEFAULT_FETCH_LIMIT = 500; 072 073 private static int fetchLimit = -1; 074 075 public enum Kind 076 { 077 SPLIT, PARTITIONED 078 } 079 080 private TezStepStats parentStepStats; 081 private Kind kind; 082 083 private String vertexID; 084 private int totalTaskCount; 085 private int succeededTaskCount; 086 private int failedTaskCount; 087 private int killedTaskCount; 088 private int runningTaskCount; 089 090 private static void setFetchLimit( Configuration configuration ) 091 { 092 if( fetchLimit > -1 ) 093 return; 094 095 fetchLimit = PropertyUtil.getIntProperty( HadoopUtil.createProperties( configuration ), TIMELINE_FETCH_LIMIT, DEFAULT_FETCH_LIMIT ); 096 097 if( fetchLimit < 2 ) 098 { 099 LOG.warn( "property: {}, was set to: {}, may not be less than 2, setting to 2", TIMELINE_FETCH_LIMIT, fetchLimit ); 100 fetchLimit = 2; 101 } 102 } 103 104 protected TezNodeStats( final TezStepStats parentStepStats, FlowNode flowNode, ClientState clientState, Configuration configuration ) 105 { 106 super( flowNode, clientState ); 107 108 setFetchLimit( configuration ); 109 110 this.parentStepStats = parentStepStats; 111 this.kind = getStreamedTaps( flowNode ).isEmpty() ? Kind.PARTITIONED : Kind.SPLIT; 112 113 this.counterCache = new TezCounterCache<DAGClient>( this, configuration ) 114 { 115 @Override 116 protected DAGClient getJobStatusClient() 117 { 118 return parentStepStats.getJobStatusClient(); 119 } 120 121 protected TezCounters getCounters( DAGClient dagClient ) throws IOException 122 { 123 VertexStatus vertexStatus = updateProgress( dagClient, STATUS_GET_COUNTERS ); 124 125 if( vertexStatus == null ) 126 return null; 127 128 TezCounters vertexCounters = vertexStatus.getVertexCounters(); 129 130 if( vertexCounters == null ) 131 logWarn( "could not retrieve vertex counters in stats status: {}, and vertex state: {}", getStatus(), vertexStatus.getState() ); 132 133 return vertexCounters; 134 } 135 }; 136 } 137 138 /** 139 * Current rule sets do not guarantee setting Streamed annotation, but do for Accumulated 140 */ 141 private Set<Tap> getStreamedTaps( FlowNode flowNode ) 142 { 143 Set<Tap> taps = new HashSet<>( flowNode.getSourceTaps() ); 144 145 taps.remove( flowNode.getSourceElements( StreamMode.Accumulated ) ); 146 147 return taps; 148 } 149 150 @Override 151 public String getKind() 152 { 153 if( kind == null ) 154 return null; 155 156 return kind.name(); 157 } 158 159 private String retrieveVertexID( DAGClient dagClient ) 160 { 161 if( vertexID != null || !( dagClient instanceof TimelineClient ) ) 162 return vertexID; 163 164 try 165 { 166 vertexID = ( (TimelineClient) dagClient ).getVertexID( getID() ); 167 } 168 catch( IOException | CascadingException | TezException exception ) 169 { 170 logWarn( "unable to get vertex id", exception ); 171 } 172 173 return vertexID; 174 } 175 176 public int getTotalTaskCount() 177 { 178 return totalTaskCount; 179 } 180 181 public int getSucceededTaskCount() 182 { 183 return succeededTaskCount; 184 } 185 186 public int getFailedTaskCount() 187 { 188 return failedTaskCount; 189 } 190 191 public int getKilledTaskCount() 192 { 193 return killedTaskCount; 194 } 195 196 public int getRunningTaskCount() 197 { 198 return runningTaskCount; 199 } 200 201 @Override 202 protected boolean captureChildDetailInternal() 203 { 204 if( allChildrenFinished ) 205 return true; 206 207 DAGClient dagClient = parentStepStats.getJobStatusClient(); 208 209 if( dagClient == null ) 210 return false; 211 212 // we cannot get task counters without the timeline server running 213 if( dagClient instanceof TimelineClient ) 214 return withTimelineServer( (TimelineClient) dagClient ); 215 216 // these are just placeholders without counters, otherwise the order would be reversed as a failover mechanism 217 return withoutTimelineServer( dagClient ); 218 } 219 220 private boolean withTimelineServer( TimelineClient timelineClient ) 221 { 222 updateProgress( (DAGClient) timelineClient, null ); // get latest task counts 223 224 if( getTotalTaskCount() == 0 ) // nothing to do - try again later 225 return false; 226 227 if( sliceStatsMap.size() == getTotalTaskCount() ) 228 return updateAllTasks( timelineClient ); 229 230 return fetchAllTasks( timelineClient ); 231 } 232 233 private boolean updateAllTasks( TimelineClient timelineClient ) 234 { 235 if( allChildrenFinished ) 236 return true; 237 238 long startTime = System.currentTimeMillis(); 239 240 int count = 0; 241 242 for( FlowSliceStats sliceStats : sliceStatsMap.values() ) 243 { 244 if( sliceStats.getStatus().isFinished() ) 245 continue; 246 247 TaskStatus taskStatus = getTaskStatusFor( timelineClient, sliceStats.getProcessSliceID() ); 248 249 updateSliceWith( (TezSliceStats) sliceStats, taskStatus, System.currentTimeMillis() ); 250 251 count++; 252 } 253 254 if( count == 0 ) 255 allChildrenFinished = true; 256 257 logInfo( "updated {} slices in: {}", count, formatDurationFromMillis( System.currentTimeMillis() - startTime ) ); 258 259 return sliceStatsMap.size() == getTotalTaskCount(); 260 } 261 262 private boolean fetchAllTasks( TimelineClient timelineClient ) 263 { 264 long startTime = System.currentTimeMillis(); 265 String fromTaskId = null; 266 int startSize = sliceStatsMap.size(); 267 int iteration = 0; 268 boolean continueIterating = true; 269 boolean retrievedAreFinished = true; 270 271 while( continueIterating && sliceStatsMap.size() != getTotalTaskCount() ) 272 { 273 long lastFetch = System.currentTimeMillis(); 274 275 // we will see the same tasks twice as we paginate 276 Iterator<TaskStatus> vertexChildren = getTaskStatusIterator( timelineClient, fromTaskId ); 277 278 if( vertexChildren == null ) 279 return false; 280 281 int added = 0; 282 int updated = 0; 283 284 while( vertexChildren.hasNext() ) 285 { 286 TaskStatus taskStatus = vertexChildren.next(); 287 288 fromTaskId = taskStatus.getTaskID(); 289 290 TezSliceStats sliceStats = (TezSliceStats) sliceStatsMap.get( fromTaskId ); 291 292 if( sliceStats == null ) 293 { 294 added++; 295 296 sliceStats = new TezSliceStats( Util.createUniqueID(), kind, this.getStatus(), fromTaskId ); 297 298 sliceStatsMap.put( sliceStats.getProcessSliceID(), sliceStats ); 299 } 300 else 301 { 302 updated++; 303 } 304 305 updateSliceWith( sliceStats, taskStatus, lastFetch ); 306 307 if( !sliceStats.getStatus().isFinished() ) 308 retrievedAreFinished = false; 309 } 310 311 int retrieved = added + updated; 312 313 if( added == 0 && updated == 1 ) // if paginating, will have at least retrieved 1 task 314 continueIterating = false; 315 else 316 continueIterating = retrieved != 0; 317 318 if( continueIterating ) 319 logInfo( "iteration retrieved: {}, added {}, updated {} slices in iteration: {}, fetch limit: {}", retrieved, added, updated, ++iteration, fetchLimit ); 320 } 321 322 int total = sliceStatsMap.size(); 323 int added = total - startSize; 324 int remaining = getTotalTaskCount() - total; 325 String duration = formatDurationFromMillis( System.currentTimeMillis() - startTime ); 326 327 if( total == getTotalTaskCount() && retrievedAreFinished ) 328 allChildrenFinished = true; 329 330 if( iteration == 0 && total == 0 ) 331 logInfo( "no slices stats available yet, expecting: {}", remaining ); 332 else 333 logInfo( "added {} slices, in iterations: {}, with duration: {}, total fetched: {}, remaining: {}", added, iteration, duration, total, remaining ); 334 335 return total == getTotalTaskCount(); 336 } 337 338 private void updateSliceWith( TezSliceStats sliceStats, TaskStatus taskStatus, long lastFetch ) 339 { 340 if( taskStatus == null ) 341 return; 342 343 sliceStats.setStatus( getStatusForTaskStatus( taskStatus.getStatus() ) ); // ignores nulls 344 sliceStats.setSubmitTime( taskStatus.getScheduledTime() ); 345 sliceStats.setStartTime( taskStatus.getStartTime() ); 346 sliceStats.setFinishTime( taskStatus.getEndTime() ); 347 sliceStats.setDiagnostics( taskStatus.getDiagnostics() ); 348 sliceStats.setSuccessfulAttemptID( taskStatus.getSuccessfulAttemptID() ); 349 350 Map<String, Map<String, Long>> counters = taskStatus.getCounters(); 351 352 sliceStats.setCounters( counters ); // ignores nulls 353 354 if( counters != null ) 355 sliceStats.setLastFetch( lastFetch ); 356 } 357 358 private TaskStatus getTaskStatusFor( TimelineClient timelineClient, String taskID ) 359 { 360 try 361 { 362 return timelineClient.getVertexChild( taskID ); 363 } 364 catch( TezException exception ) 365 { 366 logWarn( "unable to get slice stat from timeline server for task id: {}", taskID, exception ); 367 } 368 369 return null; 370 } 371 372 private Iterator<TaskStatus> getTaskStatusIterator( TimelineClient timelineClient, String startTaskID ) 373 { 374 try 375 { 376 String vertexID = retrieveVertexID( (DAGClient) timelineClient ); 377 378 if( vertexID == null ) 379 { 380 logWarn( "unable to get slice stats from timeline server, did not retrieve valid vertex id for vertex name: {}", getID() ); 381 return null; 382 } 383 384 return timelineClient.getVertexChildren( vertexID, fetchLimit, startTaskID ); 385 } 386 catch( IOException | CascadingException | TezException exception ) 387 { 388 logWarn( "unable to get slice stats from timeline server", exception ); 389 } 390 391 return null; 392 } 393 394 private boolean withoutTimelineServer( DAGClient dagClient ) 395 { 396 VertexStatus vertexStatus = updateProgress( dagClient, STATUS_GET_COUNTERS ); 397 398 if( vertexStatus == null || getTotalTaskCount() == 0 ) 399 return false; 400 401 int total = sliceStatsMap.size(); 402 403 if( total == 0 ) // yet to be initialized 404 logWarn( "'{}' is disabled, or running an incompatible Tez version: {}, task level counters cannot be retrieved", YarnConfiguration.TIMELINE_SERVICE_ENABLED, TezStatsUtil.getPlatformVersion() ); 405 406 for( int i = total; i < totalTaskCount; i++ ) 407 { 408 TezSliceStats sliceStats = new TezSliceStats( Util.createUniqueID(), kind, this.getStatus(), null ); 409 410 // we don't have the taskId, so we are using the id as the key 411 sliceStatsMap.put( sliceStats.getID(), sliceStats ); 412 } 413 414 // a placeholder to simulate actual slice stats for now 415 Iterator<FlowSliceStats> iterator = sliceStatsMap.values().iterator(); 416 417 for( int i = 0; i < runningTaskCount && iterator.hasNext(); i++ ) 418 ( (TezSliceStats) iterator.next() ).setStatus( Status.RUNNING ); 419 420 for( int i = 0; i < succeededTaskCount && iterator.hasNext(); i++ ) 421 ( (TezSliceStats) iterator.next() ).setStatus( Status.SUCCESSFUL ); 422 423 for( int i = 0; i < failedTaskCount && iterator.hasNext(); i++ ) 424 ( (TezSliceStats) iterator.next() ).setStatus( Status.FAILED ); 425 426 for( int i = 0; i < killedTaskCount && iterator.hasNext(); i++ ) 427 ( (TezSliceStats) iterator.next() ).setStatus( Status.STOPPED ); 428 429 List<String> diagnostics = vertexStatus.getDiagnostics(); 430 431 for( String diagnostic : diagnostics ) 432 logInfo( "vertex diagnostics: {}", diagnostic ); 433 434 int finishedTaskCount = succeededTaskCount + failedTaskCount + killedTaskCount; 435 436 allChildrenFinished = totalTaskCount == finishedTaskCount; 437 438 return true; 439 } 440 441 private Status getStatusForTaskStatus( @Nullable String status ) 442 { 443 if( isEmpty( status ) ) 444 return null; 445 446 TaskState state = TaskState.valueOf( status ); 447 448 switch( state ) 449 { 450 case NEW: 451 return Status.PENDING; 452 case SCHEDULED: 453 return Status.SUBMITTED; 454 case RUNNING: 455 return Status.RUNNING; 456 case SUCCEEDED: 457 return Status.SUCCESSFUL; 458 case FAILED: 459 return Status.FAILED; 460 case KILLED: 461 return Status.STOPPED; 462 } 463 464 return null; 465 } 466 467 private VertexStatus updateProgress( DAGClient dagClient, Set<StatusGetOpts> statusGetOpts ) 468 { 469 VertexStatus vertexStatus = null; 470 471 try 472 { 473 vertexStatus = dagClient.getVertexStatus( getID(), statusGetOpts ); 474 } 475 catch( IOException | TezException exception ) 476 { 477 logWarn( "unable to get vertex status for: {}", getID(), exception ); 478 } 479 480 if( vertexStatus == null ) 481 return null; 482 483 Progress progress = vertexStatus.getProgress(); 484 485 totalTaskCount = progress.getTotalTaskCount(); 486 runningTaskCount = progress.getRunningTaskCount(); 487 succeededTaskCount = progress.getSucceededTaskCount(); 488 failedTaskCount = progress.getFailedTaskCount(); 489 killedTaskCount = progress.getKilledTaskCount(); 490 491 return vertexStatus; 492 } 493 }