001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.tez;
022
023import java.io.IOException;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import javax.annotation.Nullable;
030
031import cascading.CascadingException;
032import cascading.flow.FlowNode;
033import cascading.flow.hadoop.util.HadoopUtil;
034import cascading.flow.stream.annotations.StreamMode;
035import cascading.management.state.ClientState;
036import cascading.property.PropertyUtil;
037import cascading.stats.BaseCachedNodeStats;
038import cascading.stats.BaseCachedStepStats;
039import cascading.stats.FlowSliceStats;
040import cascading.stats.tez.util.TaskStatus;
041import cascading.stats.tez.util.TezStatsUtil;
042import cascading.stats.tez.util.TimelineClient;
043import cascading.tap.Tap;
044import cascading.util.Util;
045import org.apache.hadoop.conf.Configuration;
046import org.apache.hadoop.yarn.conf.YarnConfiguration;
047import org.apache.tez.common.counters.TezCounters;
048import org.apache.tez.dag.api.TezException;
049import org.apache.tez.dag.api.client.DAGClient;
050import org.apache.tez.dag.api.client.Progress;
051import org.apache.tez.dag.api.client.StatusGetOpts;
052import org.apache.tez.dag.api.client.VertexStatus;
053import org.apache.tez.dag.api.oldrecords.TaskState;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057import static cascading.stats.tez.util.TezStatsUtil.STATUS_GET_COUNTERS;
058import static cascading.util.Util.formatDurationFromMillis;
059import static cascading.util.Util.isEmpty;
060
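/**
 * Class TezNodeStats tracks the task counts, counters, and per-task slice statistics of a single
 * {@link FlowNode}, looked up on the {@link DAGClient} as the vertex named by this node's id.
 * <p>
 * When the DAGClient is a {@link TimelineClient}, individual task statuses and counters are fetched
 * from the timeline server. Otherwise placeholder slice stats, without counters, are created from
 * the vertex progress task counts.
 */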
064public class TezNodeStats extends BaseCachedNodeStats<Configuration, DAGClient, TezCounters>
065  {
066  private static final Logger LOG = LoggerFactory.getLogger( TezNodeStats.class );
067
068  /**
069   * Sets the fetch limit from the timeline server. May be set as a System property.
070   */
071  public static final String TIMELINE_FETCH_LIMIT = "cascading.stats.timeline.fetch.limit";
072  public static final int DEFAULT_FETCH_LIMIT = 500;
073
074  private static int fetchLimit = -1;
075
076  public enum Kind
077    {
078      SPLIT, PARTITIONED, UNKNOWN
079    }
080
081  private BaseCachedStepStats<Configuration, DAGClient, TezCounters> parentStepStats;
082  private Kind kind;
083
084  private String vertexID;
085  private int totalTaskCount;
086  private int succeededTaskCount;
087  private int failedTaskCount;
088  private int killedTaskCount;
089  private int runningTaskCount;
090
091  private static void setFetchLimit( Configuration configuration )
092    {
093    if( fetchLimit > -1 )
094      return;
095
096    fetchLimit = PropertyUtil.getIntProperty( HadoopUtil.createProperties( configuration ), TIMELINE_FETCH_LIMIT, DEFAULT_FETCH_LIMIT );
097
098    if( fetchLimit < 2 )
099      {
100      LOG.warn( "property: {}, was set to: {}, may not be less than 2, setting to 2", TIMELINE_FETCH_LIMIT, fetchLimit );
101      fetchLimit = 2;
102      }
103    }
104
105  protected TezNodeStats( final BaseCachedStepStats<Configuration, DAGClient, TezCounters> parentStepStats, FlowNode flowNode, ClientState clientState, Configuration configuration )
106    {
107    super( flowNode, clientState );
108
109    setFetchLimit( configuration );
110
111    this.parentStepStats = parentStepStats;
112    this.kind = getStreamedTaps( flowNode ).isEmpty() ? Kind.PARTITIONED : Kind.SPLIT;
113
114    this.counterCache = new TezCounterCache<DAGClient>( this, configuration )
115    {
116    @Override
117    protected DAGClient getJobStatusClient()
118      {
119      return parentStepStats.getJobStatusClient();
120      }
121
122    protected TezCounters getCounters( DAGClient dagClient ) throws IOException
123      {
124      VertexStatus vertexStatus = updateProgress( dagClient, STATUS_GET_COUNTERS );
125
126      if( vertexStatus == null )
127        return null;
128
129      TezCounters vertexCounters = vertexStatus.getVertexCounters();
130
131      if( vertexCounters == null )
132        logWarn( "could not retrieve vertex counters in stats status: {}, and vertex state: {}", getStatus(), vertexStatus.getState() );
133
134      return vertexCounters;
135      }
136    };
137    }
138
139  /**
140   * Current rule sets do not guarantee setting Streamed annotation, but do for Accumulated
141   */
142  private Set<Tap> getStreamedTaps( FlowNode flowNode )
143    {
144    Set<Tap> taps = new HashSet<>( flowNode.getSourceTaps() );
145
146    taps.remove( flowNode.getSourceElements( StreamMode.Accumulated ) );
147
148    return taps;
149    }
150
151  @Override
152  public String getKind()
153    {
154    if( kind == null )
155      return null;
156
157    return kind.name();
158    }
159
160  private String retrieveVertexID( DAGClient dagClient )
161    {
162    if( vertexID != null || !( dagClient instanceof TimelineClient ) )
163      return vertexID;
164
165    try
166      {
167      vertexID = ( (TimelineClient) dagClient ).getVertexID( getID() );
168      }
169    catch( IOException | CascadingException | TezException exception )
170      {
171      logWarn( "unable to get vertex id", exception );
172      }
173
174    return vertexID;
175    }
176
177  public int getTotalTaskCount()
178    {
179    return totalTaskCount;
180    }
181
182  public int getSucceededTaskCount()
183    {
184    return succeededTaskCount;
185    }
186
187  public int getFailedTaskCount()
188    {
189    return failedTaskCount;
190    }
191
192  public int getKilledTaskCount()
193    {
194    return killedTaskCount;
195    }
196
197  public int getRunningTaskCount()
198    {
199    return runningTaskCount;
200    }
201
202  @Override
203  protected boolean captureChildDetailInternal()
204    {
205    if( allChildrenFinished )
206      return true;
207
208    DAGClient dagClient = parentStepStats.getJobStatusClient();
209
210    if( dagClient == null )
211      return false;
212
213    // we cannot get task counters without the timeline server running
214    if( dagClient instanceof TimelineClient )
215      return withTimelineServer( (TimelineClient) dagClient );
216
217    // these are just placeholders without counters, otherwise the order would be reversed as a failover mechanism
218    return withoutTimelineServer( dagClient );
219    }
220
221  private boolean withTimelineServer( TimelineClient timelineClient )
222    {
223    updateProgress( (DAGClient) timelineClient, null ); // get latest task counts
224
225    if( getTotalTaskCount() == 0 ) // nothing to do - try again later
226      return false;
227
228    if( sliceStatsMap.size() == getTotalTaskCount() )
229      return updateAllTasks( timelineClient );
230
231    return fetchAllTasks( timelineClient );
232    }
233
234  private boolean updateAllTasks( TimelineClient timelineClient )
235    {
236    if( allChildrenFinished )
237      return true;
238
239    long startTime = System.currentTimeMillis();
240
241    int count = 0;
242
243    for( FlowSliceStats sliceStats : sliceStatsMap.values() )
244      {
245      if( sliceStats.getStatus().isFinished() )
246        continue;
247
248      TaskStatus taskStatus = getTaskStatusFor( timelineClient, sliceStats.getProcessSliceID() );
249
250      updateSliceWith( (TezSliceStats) sliceStats, taskStatus, System.currentTimeMillis() );
251
252      count++;
253      }
254
255    if( count == 0 )
256      allChildrenFinished = true;
257
258    logInfo( "updated {} slices in: {}", count, formatDurationFromMillis( System.currentTimeMillis() - startTime ) );
259
260    return sliceStatsMap.size() == getTotalTaskCount();
261    }
262
263  private boolean fetchAllTasks( TimelineClient timelineClient )
264    {
265    long startTime = System.currentTimeMillis();
266    String fromTaskId = null;
267    int startSize = sliceStatsMap.size();
268    int iteration = 0;
269    boolean continueIterating = true;
270    boolean retrievedAreFinished = true;
271
272    while( continueIterating && sliceStatsMap.size() != getTotalTaskCount() )
273      {
274      long lastFetch = System.currentTimeMillis();
275
276      // we will see the same tasks twice as we paginate
277      Iterator<TaskStatus> vertexChildren = getTaskStatusIterator( timelineClient, fromTaskId );
278
279      if( vertexChildren == null )
280        return false;
281
282      int added = 0;
283      int updated = 0;
284
285      while( vertexChildren.hasNext() )
286        {
287        TaskStatus taskStatus = vertexChildren.next();
288
289        fromTaskId = taskStatus.getTaskID();
290
291        TezSliceStats sliceStats = (TezSliceStats) sliceStatsMap.get( fromTaskId );
292
293        if( sliceStats == null )
294          {
295          added++;
296
297          sliceStats = new TezSliceStats( Util.createUniqueID(), kind, getStatus(), vertexID, fromTaskId );
298
299          sliceStatsMap.put( sliceStats.getProcessSliceID(), sliceStats );
300          }
301        else
302          {
303          updated++;
304          }
305
306        updateSliceWith( sliceStats, taskStatus, lastFetch );
307
308        if( !sliceStats.getStatus().isFinished() )
309          retrievedAreFinished = false;
310        }
311
312      int retrieved = added + updated;
313
314      if( added == 0 && updated == 1 ) // if paginating, will have at least retrieved 1 task
315        continueIterating = false;
316      else
317        continueIterating = retrieved != 0;
318
319      if( continueIterating )
320        logInfo( "iteration retrieved: {}, added {}, updated {} slices in iteration: {}, fetch limit: {}", retrieved, added, updated, ++iteration, fetchLimit );
321      }
322
323    int total = sliceStatsMap.size();
324    int added = total - startSize;
325    int remaining = getTotalTaskCount() - total;
326    String duration = formatDurationFromMillis( System.currentTimeMillis() - startTime );
327
328    if( total == getTotalTaskCount() && retrievedAreFinished )
329      allChildrenFinished = true;
330
331    if( iteration == 0 && total == 0 )
332      logInfo( "no slices stats available yet, expecting: {}", remaining );
333    else
334      logInfo( "added {} slices, in iterations: {}, with duration: {}, total fetched: {}, remaining: {}", added, iteration, duration, total, remaining );
335
336    return total == getTotalTaskCount();
337    }
338
339  private void updateSliceWith( TezSliceStats sliceStats, TaskStatus taskStatus, long lastFetch )
340    {
341    if( taskStatus == null )
342      return;
343
344    sliceStats.setStatus( getStatusForTaskStatus( taskStatus.getStatus() ) ); // ignores nulls
345    sliceStats.setSubmitTime( taskStatus.getScheduledTime() );
346    sliceStats.setStartTime( taskStatus.getStartTime() );
347    sliceStats.setFinishTime( taskStatus.getEndTime() );
348    sliceStats.setDiagnostics( taskStatus.getDiagnostics() );
349    sliceStats.setSuccessfulAttemptID( taskStatus.getSuccessfulAttemptID() );
350
351    Map<String, Map<String, Long>> counters = taskStatus.getCounters();
352
353    sliceStats.setCounters( counters ); // ignores nulls
354
355    if( counters != null )
356      sliceStats.setLastFetch( lastFetch );
357    }
358
359  private TaskStatus getTaskStatusFor( TimelineClient timelineClient, String taskID )
360    {
361    try
362      {
363      return timelineClient.getVertexChild( taskID );
364      }
365    catch( TezException exception )
366      {
367      logWarn( "unable to get slice stat from timeline server for task id: {}", taskID, exception );
368      }
369
370    return null;
371    }
372
373  private Iterator<TaskStatus> getTaskStatusIterator( TimelineClient timelineClient, String startTaskID )
374    {
375    try
376      {
377      String vertexID = retrieveVertexID( (DAGClient) timelineClient );
378
379      if( vertexID == null )
380        {
381        logWarn( "unable to get slice stats from timeline server, did not retrieve valid vertex id for vertex name: {}", getID() );
382        return null;
383        }
384
385      return timelineClient.getVertexChildren( vertexID, fetchLimit, startTaskID );
386      }
387    catch( IOException | CascadingException | TezException exception )
388      {
389      logWarn( "unable to get slice stats from timeline server", exception );
390      }
391
392    return null;
393    }
394
395  private boolean withoutTimelineServer( DAGClient dagClient )
396    {
397    VertexStatus vertexStatus = updateProgress( dagClient, STATUS_GET_COUNTERS );
398
399    if( vertexStatus == null || getTotalTaskCount() == 0 )
400      return false;
401
402    int total = sliceStatsMap.size();
403
404    if( total == 0 ) // yet to be initialized
405      logWarn( "'{}' is disabled, or running an incompatible Tez version: {}, task level counters cannot be retrieved", YarnConfiguration.TIMELINE_SERVICE_ENABLED, TezStatsUtil.getPlatformVersion() );
406
407    for( int i = total; i < totalTaskCount; i++ )
408      {
409      TezSliceStats sliceStats = new TezSliceStats( Util.createUniqueID(), kind, getStatus(), vertexID, null );
410
411      // we don't have the taskId, so we are using the id as the key
412      sliceStatsMap.put( sliceStats.getID(), sliceStats );
413      }
414
415    // a placeholder to simulate actual slice stats for now
416    Iterator<FlowSliceStats> iterator = sliceStatsMap.values().iterator();
417
418    for( int i = 0; i < runningTaskCount && iterator.hasNext(); i++ )
419      ( (TezSliceStats) iterator.next() ).setStatus( Status.RUNNING );
420
421    for( int i = 0; i < succeededTaskCount && iterator.hasNext(); i++ )
422      ( (TezSliceStats) iterator.next() ).setStatus( Status.SUCCESSFUL );
423
424    for( int i = 0; i < failedTaskCount && iterator.hasNext(); i++ )
425      ( (TezSliceStats) iterator.next() ).setStatus( Status.FAILED );
426
427    for( int i = 0; i < killedTaskCount && iterator.hasNext(); i++ )
428      ( (TezSliceStats) iterator.next() ).setStatus( Status.STOPPED );
429
430    List<String> diagnostics = vertexStatus.getDiagnostics();
431
432    for( String diagnostic : diagnostics )
433      logInfo( "vertex diagnostics: {}", diagnostic );
434
435    int finishedTaskCount = succeededTaskCount + failedTaskCount + killedTaskCount;
436
437    allChildrenFinished = totalTaskCount == finishedTaskCount;
438
439    return true;
440    }
441
442  private Status getStatusForTaskStatus( @Nullable String status )
443    {
444    if( isEmpty( status ) )
445      return null;
446
447    TaskState state = TaskState.valueOf( status );
448
449    switch( state )
450      {
451      case NEW:
452        return Status.PENDING;
453      case SCHEDULED:
454        return Status.SUBMITTED;
455      case RUNNING:
456        return Status.RUNNING;
457      case SUCCEEDED:
458        return Status.SUCCESSFUL;
459      case FAILED:
460        return Status.FAILED;
461      case KILLED:
462        return Status.STOPPED;
463      }
464
465    return null;
466    }
467
468  private VertexStatus updateProgress( DAGClient dagClient, Set<StatusGetOpts> statusGetOpts )
469    {
470    VertexStatus vertexStatus = null;
471
472    try
473      {
474      vertexStatus = dagClient.getVertexStatus( getID(), statusGetOpts );
475      }
476    catch( IOException | TezException exception )
477      {
478      logWarn( "unable to get vertex status for: {}", getID(), exception );
479      }
480
481    if( vertexStatus == null )
482      return null;
483
484    Progress progress = vertexStatus.getProgress();
485
486    totalTaskCount = progress.getTotalTaskCount();
487    runningTaskCount = progress.getRunningTaskCount();
488    succeededTaskCount = progress.getSucceededTaskCount();
489    failedTaskCount = progress.getFailedTaskCount();
490    killedTaskCount = progress.getKilledTaskCount();
491
492    return vertexStatus;
493    }
494  }