/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.tez;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

import cascading.CascadingException;
import cascading.flow.FlowNode;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.stream.annotations.StreamMode;
import cascading.management.state.ClientState;
import cascading.property.PropertyUtil;
import cascading.stats.BaseCachedNodeStats;
import cascading.stats.BaseCachedStepStats;
import cascading.stats.FlowSliceStats;
import cascading.stats.tez.util.TaskStatus;
import cascading.stats.tez.util.TezStatsUtil;
import cascading.stats.tez.util.TimelineClient;
import cascading.tap.Tap;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.stats.tez.util.TezStatsUtil.STATUS_GET_COUNTERS;
import static cascading.util.Util.formatDurationFromMillis;
import static cascading.util.Util.isEmpty;

/**
 * Class TezNodeStats tracks the statistics of a single {@link FlowNode} when run on Apache Tez.
 * <p>
 * The node id ({@code getID()}) is used as the vertex name when asking the {@link DAGClient} for a
 * {@link VertexStatus}. Vertex level task counts are refreshed on every status request.
 * <p>
 * Task (slice) level details are only fetched when the {@link DAGClient} is also a {@link TimelineClient}.
 * Otherwise placeholder slices, without counters, are created to mirror the reported task counts.
 */
public class TezNodeStats extends BaseCachedNodeStats<Configuration, DAGClient, TezCounters>
  {
  private static final Logger LOG = LoggerFactory.getLogger( TezNodeStats.class );

  /**
   * Sets the fetch limit from the timeline server. May be set as a System property.
   * <p>
   * Values less than 2 are replaced with 2, see {@link #DEFAULT_FETCH_LIMIT} for the default.
   */
  public static final String TIMELINE_FETCH_LIMIT = "cascading.stats.timeline.fetch.limit";

  /** The fetch limit used when {@link #TIMELINE_FETCH_LIMIT} is not set. */
  public static final int DEFAULT_FETCH_LIMIT = 500;

  /** Shared across all instances, -1 until resolved by the first constructed instance. */
  private static int fetchLimit = -1;

  /**
   * Enum Kind classifies a node by how it receives its input, see the constructor for how it is chosen.
   */
  public enum Kind
    {
      SPLIT, PARTITIONED, UNKNOWN
    }

  private BaseCachedStepStats<Configuration, DAGClient, TezCounters> parentStepStats;
  private Kind kind;

  /** Lazily resolved from the timeline server, remains null if it could not be retrieved. */
  private String vertexID;
  private int totalTaskCount;
  private int succeededTaskCount;
  private int failedTaskCount;
  private int killedTaskCount;
  private int runningTaskCount;

  /**
   * Resolves the static fetch limit from the given configuration, if not already resolved.
   * <p>
   * A resolved value less than 2 is logged and replaced with 2.
   *
   * @param configuration the configuration to read {@link #TIMELINE_FETCH_LIMIT} from
   */
  private static void setFetchLimit( Configuration configuration )
    {
    if( fetchLimit > -1 ) // already resolved by a prior instance
      return;

    fetchLimit = PropertyUtil.getIntProperty( HadoopUtil.createProperties( configuration ), TIMELINE_FETCH_LIMIT, DEFAULT_FETCH_LIMIT );

    if( fetchLimit < 2 )
      {
      LOG.warn( "property: {}, was set to: {}, may not be less than 2, setting to 2", TIMELINE_FETCH_LIMIT, fetchLimit );
      fetchLimit = 2;
      }
    }

  /**
   * Constructor TezNodeStats creates a new TezNodeStats instance.
   * <p>
   * The node {@link Kind} is {@link Kind#PARTITIONED} if the node has no streamed source taps,
   * otherwise {@link Kind#SPLIT}.
   *
   * @param parentStepStats the parent step stats, provides the current {@link DAGClient}
   * @param flowNode        the node these stats describe
   * @param clientState     passed to the super class
   * @param configuration   used to resolve the fetch limit and to create the counter cache
   */
  protected TezNodeStats( final BaseCachedStepStats<Configuration, DAGClient, TezCounters> parentStepStats, FlowNode flowNode, ClientState clientState, Configuration configuration )
    {
    super( flowNode, clientState );

    setFetchLimit( configuration );

    this.parentStepStats = parentStepStats;
    this.kind = getStreamedTaps( flowNode ).isEmpty() ? Kind.PARTITIONED : Kind.SPLIT;

    this.counterCache = new TezCounterCache<DAGClient>( this, configuration )
      {
      @Override
      protected DAGClient getJobStatusClient()
        {
        return parentStepStats.getJobStatusClient();
        }

      protected TezCounters getCounters( DAGClient dagClient ) throws IOException
        {
        // requesting the counters also refreshes the task counts on the enclosing instance
        VertexStatus vertexStatus = updateProgress( dagClient, STATUS_GET_COUNTERS );

        if( vertexStatus == null )
          return null;

        TezCounters vertexCounters = vertexStatus.getVertexCounters();

        if( vertexCounters == null )
          logWarn( "could not retrieve vertex counters in stats status: {}, and vertex state: {}", getStatus(), vertexStatus.getState() );

        return vertexCounters;
        }
      };
    }

  /**
   * Returns the source taps of the given node that are not annotated as {@link StreamMode#Accumulated}.
   * <p>
   * Current rule sets do not guarantee setting Streamed annotation, but do for Accumulated, so the
   * streamed taps are derived by subtracting the accumulated sources from all source taps.
   *
   * @param flowNode the node to inspect
   * @return a new mutable Set, never null
   */
  private Set<Tap> getStreamedTaps( FlowNode flowNode )
    {
    Set<Tap> taps = new HashSet<>( flowNode.getSourceTaps() );

    // removeAll, not remove: remove( Object ) would look for the Set itself as a single element and never match
    taps.removeAll( flowNode.getSourceElements( StreamMode.Accumulated ) );

    return taps;
    }

  /**
   * Returns the name of the {@link Kind} of this node.
   *
   * @return the kind name, or null if no kind is set
   */
  @Override
  public String getKind()
    {
    if( kind == null )
      return null;

    return kind.name();
    }

  /**
   * Returns the vertex id for this node, asking the timeline server on first use.
   * <p>
   * A failure to retrieve the id is logged, and will be retried on the next call.
   *
   * @param dagClient the current client, only queried if it is a {@link TimelineClient}
   * @return the vertex id, or null if not yet known
   */
  private String retrieveVertexID( DAGClient dagClient )
    {
    if( vertexID != null || !( dagClient instanceof TimelineClient ) )
      return vertexID;

    try
      {
      vertexID = ( (TimelineClient) dagClient ).getVertexID( getID() );
      }
    catch( IOException | CascadingException | TezException exception )
      {
      logWarn( "unable to get vertex id", exception );
      }

    return vertexID;
    }

  /**
   * @return the total task count as of the last successful vertex status request
   */
  public int getTotalTaskCount()
    {
    return totalTaskCount;
    }

  /**
   * @return the succeeded task count as of the last successful vertex status request
   */
  public int getSucceededTaskCount()
    {
    return succeededTaskCount;
    }

  /**
   * @return the failed task count as of the last successful vertex status request
   */
  public int getFailedTaskCount()
    {
    return failedTaskCount;
    }

  /**
   * @return the killed task count as of the last successful vertex status request
   */
  public int getKilledTaskCount()
    {
    return killedTaskCount;
    }

  /**
   * @return the running task count as of the last successful vertex status request
   */
  public int getRunningTaskCount()
    {
    return runningTaskCount;
    }

  /**
   * Captures the slice (task) level details of this node.
   *
   * @return true if all children were already finished, otherwise the result of the timeline or
   * non-timeline capture; false if no {@link DAGClient} is available
   */
  @Override
  protected boolean captureChildDetailInternal()
    {
    if( allChildrenFinished )
      return true;

    DAGClient dagClient = parentStepStats.getJobStatusClient();

    if( dagClient == null )
      return false;

    // we cannot get task counters without the timeline server running
    if( dagClient instanceof TimelineClient )
      return withTimelineServer( (TimelineClient) dagClient );

    // these are just placeholders without counters, otherwise the order would be reversed as a failover mechanism
    return withoutTimelineServer( dagClient );
    }

  /**
   * Refreshes the task counts, then either updates the known slices or fetches the missing ones.
   *
   * @param timelineClient the timeline enabled client
   * @return false if no tasks are reported yet, otherwise the result of the update or fetch
   */
  private boolean withTimelineServer( TimelineClient timelineClient )
    {
    updateProgress( (DAGClient) timelineClient, null ); // get latest task counts

    if( getTotalTaskCount() == 0 ) // nothing to do - try again later
      return false;

    // every task already has a slice, only the unfinished ones need refreshing
    if( sliceStatsMap.size() == getTotalTaskCount() )
      return updateAllTasks( timelineClient );

    return fetchAllTasks( timelineClient );
    }

  /**
   * Refreshes every unfinished slice with an individual request per task.
   * <p>
   * If no unfinished slice is found, all children are marked as finished.
   *
   * @param timelineClient the timeline enabled client
   * @return true if all children were already finished, otherwise true if the number of slices
   * matches the total task count
   */
  private boolean updateAllTasks( TimelineClient timelineClient )
    {
    if( allChildrenFinished )
      return true;

    long startTime = System.currentTimeMillis();

    int count = 0;

    for( FlowSliceStats sliceStats : sliceStatsMap.values() )
      {
      if( sliceStats.getStatus().isFinished() )
        continue;

      TaskStatus taskStatus = getTaskStatusFor( timelineClient, sliceStats.getProcessSliceID() );

      updateSliceWith( (TezSliceStats) sliceStats, taskStatus, System.currentTimeMillis() );

      count++;
      }

    if( count == 0 ) // nothing left unfinished
      allChildrenFinished = true;

    logInfo( "updated {} slices in: {}", count, formatDurationFromMillis( System.currentTimeMillis() - startTime ) );

    return sliceStatsMap.size() == getTotalTaskCount();
    }

  /**
   * Pages through the tasks of this vertex, creating a slice for every unseen task and updating
   * the slices already known.
   * <p>
   * Paging continues from the last task id seen, and stops when the number of slices matches the
   * total task count, a page returns nothing, or a page returns only the single task the page was
   * started from. All children are marked as finished if every task has a slice and every slice
   * retrieved during this call was finished.
   *
   * @param timelineClient the timeline enabled client
   * @return true if the number of slices matches the total task count, false otherwise or if a
   * page could not be retrieved
   */
  private boolean fetchAllTasks( TimelineClient timelineClient )
    {
    long startTime = System.currentTimeMillis();
    String fromTaskId = null;
    int startSize = sliceStatsMap.size();
    int iteration = 0;
    boolean continueIterating = true;
    boolean retrievedAreFinished = true;

    while( continueIterating && sliceStatsMap.size() != getTotalTaskCount() )
      {
      long lastFetch = System.currentTimeMillis();

      // we will see the same tasks twice as we paginate
      Iterator<TaskStatus> vertexChildren = getTaskStatusIterator( timelineClient, fromTaskId );

      if( vertexChildren == null )
        return false;

      int added = 0;
      int updated = 0;

      while( vertexChildren.hasNext() )
        {
        TaskStatus taskStatus = vertexChildren.next();

        fromTaskId = taskStatus.getTaskID(); // the next page starts from the last task seen

        TezSliceStats sliceStats = (TezSliceStats) sliceStatsMap.get( fromTaskId );

        if( sliceStats == null )
          {
          added++;

          sliceStats = new TezSliceStats( Util.createUniqueID(), kind, getStatus(), vertexID, fromTaskId );

          sliceStatsMap.put( sliceStats.getProcessSliceID(), sliceStats );
          }
        else
          {
          updated++;
          }

        updateSliceWith( sliceStats, taskStatus, lastFetch );

        if( !sliceStats.getStatus().isFinished() )
          retrievedAreFinished = false;
        }

      int retrieved = added + updated;

      if( added == 0 && updated == 1 ) // if paginating, will have at least retrieved 1 task
        continueIterating = false;
      else
        continueIterating = retrieved != 0;

      if( continueIterating )
        logInfo( "iteration retrieved: {}, added {}, updated {} slices in iteration: {}, fetch limit: {}", retrieved, added, updated, ++iteration, fetchLimit );
      }

    int total = sliceStatsMap.size();
    int added = total - startSize;
    int remaining = getTotalTaskCount() - total;
    String duration = formatDurationFromMillis( System.currentTimeMillis() - startTime );

    if( total == getTotalTaskCount() && retrievedAreFinished )
      allChildrenFinished = true;

    if( iteration == 0 && total == 0 )
      logInfo( "no slices stats available yet, expecting: {}", remaining );
    else
      logInfo( "added {} slices, in iterations: {}, with duration: {}, total fetched: {}, remaining: {}", added, iteration, duration, total, remaining );

    return total == getTotalTaskCount();
    }

  /**
   * Copies the status, times, diagnostics, successful attempt id and counters of the given task
   * status onto the given slice.
   * <p>
   * The last fetch time is only updated if the task status carries counters.
   *
   * @param sliceStats the slice to update
   * @param taskStatus the retrieved task status, does nothing if null
   * @param lastFetch  the time, in milliseconds, the task status was requested
   */
  private void updateSliceWith( TezSliceStats sliceStats, TaskStatus taskStatus, long lastFetch )
    {
    if( taskStatus == null )
      return;

    sliceStats.setStatus( getStatusForTaskStatus( taskStatus.getStatus() ) ); // ignores nulls
    sliceStats.setSubmitTime( taskStatus.getScheduledTime() );
    sliceStats.setStartTime( taskStatus.getStartTime() );
    sliceStats.setFinishTime( taskStatus.getEndTime() );
    sliceStats.setDiagnostics( taskStatus.getDiagnostics() );
    sliceStats.setSuccessfulAttemptID( taskStatus.getSuccessfulAttemptID() );

    Map<String, Map<String, Long>> counters = taskStatus.getCounters();

    sliceStats.setCounters( counters ); // ignores nulls

    if( counters != null )
      sliceStats.setLastFetch( lastFetch );
    }

  /**
   * Requests the status of a single task from the timeline server.
   *
   * @param timelineClient the timeline enabled client
   * @param taskID         the id of the task to request
   * @return the task status, or null if the request failed (the failure is logged)
   */
  private TaskStatus getTaskStatusFor( TimelineClient timelineClient, String taskID )
    {
    try
      {
      return timelineClient.getVertexChild( taskID );
      }
    catch( TezException exception )
      {
      logWarn( "unable to get slice stat from timeline server for task id: {}", taskID, exception );
      }

    return null;
    }

  /**
   * Requests one page of task statuses for this vertex from the timeline server, limited to the
   * resolved fetch limit.
   *
   * @param timelineClient the timeline enabled client
   * @param startTaskID    the task id to start the page from, null on the first page
   * @return an Iterator over the page, or null if the vertex id is unknown or the request failed
   * (both are logged)
   */
  private Iterator<TaskStatus> getTaskStatusIterator( TimelineClient timelineClient, String startTaskID )
    {
    try
      {
      String vertexID = retrieveVertexID( (DAGClient) timelineClient );

      if( vertexID == null )
        {
        logWarn( "unable to get slice stats from timeline server, did not retrieve valid vertex id for vertex name: {}", getID() );
        return null;
        }

      return timelineClient.getVertexChildren( vertexID, fetchLimit, startTaskID );
      }
    catch( IOException | CascadingException | TezException exception )
      {
      logWarn( "unable to get slice stats from timeline server", exception );
      }

    return null;
    }

  /**
   * Creates placeholder slices, one per reported task, and assigns them statuses so the number of
   * slices per status mirrors the running, succeeded, failed and killed task counts.
   * <p>
   * The placeholders have no task id and no counters. All children are marked as finished once the
   * succeeded, failed and killed counts add up to the total task count.
   *
   * @param dagClient the current client
   * @return false if the vertex status is unavailable or no tasks are reported yet, true otherwise
   */
  private boolean withoutTimelineServer( DAGClient dagClient )
    {
    VertexStatus vertexStatus = updateProgress( dagClient, STATUS_GET_COUNTERS );

    if( vertexStatus == null || getTotalTaskCount() == 0 )
      return false;

    int total = sliceStatsMap.size();

    if( total == 0 ) // yet to be initialized
      logWarn( "'{}' is disabled, or running an incompatible Tez version: {}, task level counters cannot be retrieved", YarnConfiguration.TIMELINE_SERVICE_ENABLED, TezStatsUtil.getPlatformVersion() );

    // only create the placeholders not yet created by a prior call
    for( int i = total; i < totalTaskCount; i++ )
      {
      TezSliceStats sliceStats = new TezSliceStats( Util.createUniqueID(), kind, getStatus(), vertexID, null );

      // we don't have the taskId, so we are using the id as the key
      sliceStatsMap.put( sliceStats.getID(), sliceStats );
      }

    // a placeholder to simulate actual slice stats for now
    // the same iterator is shared by all loops below so each placeholder receives at most one status
    Iterator<FlowSliceStats> iterator = sliceStatsMap.values().iterator();

    for( int i = 0; i < runningTaskCount && iterator.hasNext(); i++ )
      ( (TezSliceStats) iterator.next() ).setStatus( Status.RUNNING );

    for( int i = 0; i < succeededTaskCount && iterator.hasNext(); i++ )
      ( (TezSliceStats) iterator.next() ).setStatus( Status.SUCCESSFUL );

    for( int i = 0; i < failedTaskCount && iterator.hasNext(); i++ )
      ( (TezSliceStats) iterator.next() ).setStatus( Status.FAILED );

    for( int i = 0; i < killedTaskCount && iterator.hasNext(); i++ )
      ( (TezSliceStats) iterator.next() ).setStatus( Status.STOPPED );

    List<String> diagnostics = vertexStatus.getDiagnostics();

    for( String diagnostic : diagnostics )
      logInfo( "vertex diagnostics: {}", diagnostic );

    int finishedTaskCount = succeededTaskCount + failedTaskCount + killedTaskCount;

    allChildrenFinished = totalTaskCount == finishedTaskCount;

    return true;
    }

  /**
   * Maps the name of a Tez {@link TaskState} to the equivalent stats Status.
   *
   * @param status the task state name, may be null or empty
   * @return the equivalent Status, or null if the given value is null, empty, not a known
   * {@link TaskState} name (which is logged), or has no mapping
   */
  private Status getStatusForTaskStatus( @Nullable String status )
    {
    if( isEmpty( status ) )
      return null;

    TaskState state;

    try
      {
      state = TaskState.valueOf( status );
      }
    catch( IllegalArgumentException exception )
      {
      // an unrecognized state name must not abort the capture of the remaining slices, callers ignore null
      logWarn( "unknown task state: {}, ignoring", status );

      return null;
      }

    switch( state )
      {
      case NEW:
        return Status.PENDING;
      case SCHEDULED:
        return Status.SUBMITTED;
      case RUNNING:
        return Status.RUNNING;
      case SUCCEEDED:
        return Status.SUCCESSFUL;
      case FAILED:
        return Status.FAILED;
      case KILLED:
        return Status.STOPPED;
      }

    return null;
    }

  /**
   * Requests the current status of this vertex and copies the reported task counts onto this
   * instance.
   * <p>
   * The task counts are left unchanged if the status could not be retrieved.
   *
   * @param dagClient     the current client
   * @param statusGetOpts the options to pass with the request, may be null
   * @return the vertex status, or null if unavailable or the request failed (the failure is logged)
   */
  private VertexStatus updateProgress( DAGClient dagClient, Set<StatusGetOpts> statusGetOpts )
    {
    VertexStatus vertexStatus = null;

    try
      {
      vertexStatus = dagClient.getVertexStatus( getID(), statusGetOpts );
      }
    catch( IOException | TezException exception )
      {
      logWarn( "unable to get vertex status for: {}", getID(), exception );
      }

    if( vertexStatus == null )
      return null;

    Progress progress = vertexStatus.getProgress();

    totalTaskCount = progress.getTotalTaskCount();
    runningTaskCount = progress.getRunningTaskCount();
    succeededTaskCount = progress.getSucceededTaskCount();
    failedTaskCount = progress.getFailedTaskCount();
    killedTaskCount = progress.getKilledTaskCount();

    return vertexStatus;
    }
  }