001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.tez;
022
023import java.io.IOException;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import javax.annotation.Nullable;
030
031import cascading.CascadingException;
032import cascading.flow.FlowNode;
033import cascading.flow.hadoop.util.HadoopUtil;
034import cascading.flow.stream.annotations.StreamMode;
035import cascading.management.state.ClientState;
036import cascading.property.PropertyUtil;
037import cascading.stats.FlowSliceStats;
038import cascading.stats.hadoop.BaseHadoopNodeStats;
039import cascading.stats.tez.util.TaskStatus;
040import cascading.stats.tez.util.TezStatsUtil;
041import cascading.stats.tez.util.TimelineClient;
042import cascading.tap.Tap;
043import cascading.util.Util;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.yarn.conf.YarnConfiguration;
046import org.apache.tez.common.counters.TezCounters;
047import org.apache.tez.dag.api.TezException;
048import org.apache.tez.dag.api.client.DAGClient;
049import org.apache.tez.dag.api.client.Progress;
050import org.apache.tez.dag.api.client.StatusGetOpts;
051import org.apache.tez.dag.api.client.VertexStatus;
052import org.apache.tez.dag.api.oldrecords.TaskState;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import static cascading.stats.tez.util.TezStatsUtil.STATUS_GET_COUNTERS;
057import static cascading.util.Util.formatDurationFromMillis;
058import static cascading.util.Util.isEmpty;
059
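/**
 * Class TezNodeStats tracks the task level statistics of a single {@link FlowNode} executed as a Tez vertex.
 * <p>
 * Vertex progress is read through the {@link DAGClient} of the parent step stats. When that client is a
 * {@link TimelineClient}, per task details are fetched from the timeline server, otherwise placeholder slice
 * stats are created from the vertex task counts.
 */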
063public class TezNodeStats extends BaseHadoopNodeStats<DAGClient, TezCounters>
064  {
065  private static final Logger LOG = LoggerFactory.getLogger( TezNodeStats.class );
066
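  /**
   * Property name for the fetch limit passed to the timeline server when retrieving the tasks of a vertex.
   * Defaults to {@link #DEFAULT_FETCH_LIMIT}, values less than 2 are replaced with 2.
   * May be set as a System property.
   */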
070  public static final String TIMELINE_FETCH_LIMIT = "cascading.stats.timeline.fetch.limit";
071  public static final int DEFAULT_FETCH_LIMIT = 500;
072
073  private static int fetchLimit = -1;
074
075  public enum Kind
076    {
077      SPLIT, PARTITIONED
078    }
079
080  private TezStepStats parentStepStats;
081  private Kind kind;
082
083  private String vertexID;
084  private int totalTaskCount;
085  private int succeededTaskCount;
086  private int failedTaskCount;
087  private int killedTaskCount;
088  private int runningTaskCount;
089
090  private static void setFetchLimit( Configuration configuration )
091    {
092    if( fetchLimit > -1 )
093      return;
094
095    fetchLimit = PropertyUtil.getIntProperty( HadoopUtil.createProperties( configuration ), TIMELINE_FETCH_LIMIT, DEFAULT_FETCH_LIMIT );
096
097    if( fetchLimit < 2 )
098      {
099      LOG.warn( "property: {}, was set to: {}, may not be less than 2, setting to 2", TIMELINE_FETCH_LIMIT, fetchLimit );
100      fetchLimit = 2;
101      }
102    }
103
104  protected TezNodeStats( final TezStepStats parentStepStats, FlowNode flowNode, ClientState clientState, Configuration configuration )
105    {
106    super( flowNode, clientState );
107
108    setFetchLimit( configuration );
109
110    this.parentStepStats = parentStepStats;
111    this.kind = getStreamedTaps( flowNode ).isEmpty() ? Kind.PARTITIONED : Kind.SPLIT;
112
113    this.counterCache = new TezCounterCache<DAGClient>( this, configuration )
114    {
115    @Override
116    protected DAGClient getJobStatusClient()
117      {
118      return parentStepStats.getJobStatusClient();
119      }
120
121    protected TezCounters getCounters( DAGClient dagClient ) throws IOException
122      {
123      VertexStatus vertexStatus = updateProgress( dagClient, STATUS_GET_COUNTERS );
124
125      if( vertexStatus == null )
126        return null;
127
128      TezCounters vertexCounters = vertexStatus.getVertexCounters();
129
130      if( vertexCounters == null )
131        logWarn( "could not retrieve vertex counters in stats status: {}, and vertex state: {}", getStatus(), vertexStatus.getState() );
132
133      return vertexCounters;
134      }
135    };
136    }
137
138  /**
139   * Current rule sets do not guarantee setting Streamed annotation, but do for Accumulated
140   */
141  private Set<Tap> getStreamedTaps( FlowNode flowNode )
142    {
143    Set<Tap> taps = new HashSet<>( flowNode.getSourceTaps() );
144
145    taps.remove( flowNode.getSourceElements( StreamMode.Accumulated ) );
146
147    return taps;
148    }
149
150  @Override
151  public String getKind()
152    {
153    if( kind == null )
154      return null;
155
156    return kind.name();
157    }
158
159  private String retrieveVertexID( DAGClient dagClient )
160    {
161    if( vertexID != null || !( dagClient instanceof TimelineClient ) )
162      return vertexID;
163
164    try
165      {
166      vertexID = ( (TimelineClient) dagClient ).getVertexID( getID() );
167      }
168    catch( IOException | CascadingException | TezException exception )
169      {
170      logWarn( "unable to get vertex id", exception );
171      }
172
173    return vertexID;
174    }
175
176  public int getTotalTaskCount()
177    {
178    return totalTaskCount;
179    }
180
181  public int getSucceededTaskCount()
182    {
183    return succeededTaskCount;
184    }
185
186  public int getFailedTaskCount()
187    {
188    return failedTaskCount;
189    }
190
191  public int getKilledTaskCount()
192    {
193    return killedTaskCount;
194    }
195
196  public int getRunningTaskCount()
197    {
198    return runningTaskCount;
199    }
200
201  @Override
202  protected boolean captureChildDetailInternal()
203    {
204    if( allChildrenFinished )
205      return true;
206
207    DAGClient dagClient = parentStepStats.getJobStatusClient();
208
209    if( dagClient == null )
210      return false;
211
212    // we cannot get task counters without the timeline server running
213    if( dagClient instanceof TimelineClient )
214      return withTimelineServer( (TimelineClient) dagClient );
215
216    // these are just placeholders without counters, otherwise the order would be reversed as a failover mechanism
217    return withoutTimelineServer( dagClient );
218    }
219
220  private boolean withTimelineServer( TimelineClient timelineClient )
221    {
222    updateProgress( (DAGClient) timelineClient, null ); // get latest task counts
223
224    if( getTotalTaskCount() == 0 ) // nothing to do - try again later
225      return false;
226
227    if( sliceStatsMap.size() == getTotalTaskCount() )
228      return updateAllTasks( timelineClient );
229
230    return fetchAllTasks( timelineClient );
231    }
232
233  private boolean updateAllTasks( TimelineClient timelineClient )
234    {
235    if( allChildrenFinished )
236      return true;
237
238    long startTime = System.currentTimeMillis();
239
240    int count = 0;
241
242    for( FlowSliceStats sliceStats : sliceStatsMap.values() )
243      {
244      if( sliceStats.getStatus().isFinished() )
245        continue;
246
247      TaskStatus taskStatus = getTaskStatusFor( timelineClient, sliceStats.getProcessSliceID() );
248
249      updateSliceWith( (TezSliceStats) sliceStats, taskStatus, System.currentTimeMillis() );
250
251      count++;
252      }
253
254    if( count == 0 )
255      allChildrenFinished = true;
256
257    logInfo( "updated {} slices in: {}", count, formatDurationFromMillis( System.currentTimeMillis() - startTime ) );
258
259    return sliceStatsMap.size() == getTotalTaskCount();
260    }
261
262  private boolean fetchAllTasks( TimelineClient timelineClient )
263    {
264    long startTime = System.currentTimeMillis();
265    String fromTaskId = null;
266    int startSize = sliceStatsMap.size();
267    int iteration = 0;
268    boolean continueIterating = true;
269    boolean retrievedAreFinished = true;
270
271    while( continueIterating && sliceStatsMap.size() != getTotalTaskCount() )
272      {
273      long lastFetch = System.currentTimeMillis();
274
275      // we will see the same tasks twice as we paginate
276      Iterator<TaskStatus> vertexChildren = getTaskStatusIterator( timelineClient, fromTaskId );
277
278      if( vertexChildren == null )
279        return false;
280
281      int added = 0;
282      int updated = 0;
283
284      while( vertexChildren.hasNext() )
285        {
286        TaskStatus taskStatus = vertexChildren.next();
287
288        fromTaskId = taskStatus.getTaskID();
289
290        TezSliceStats sliceStats = (TezSliceStats) sliceStatsMap.get( fromTaskId );
291
292        if( sliceStats == null )
293          {
294          added++;
295
296          sliceStats = new TezSliceStats( Util.createUniqueID(), kind, this.getStatus(), fromTaskId );
297
298          sliceStatsMap.put( sliceStats.getProcessSliceID(), sliceStats );
299          }
300        else
301          {
302          updated++;
303          }
304
305        updateSliceWith( sliceStats, taskStatus, lastFetch );
306
307        if( !sliceStats.getStatus().isFinished() )
308          retrievedAreFinished = false;
309        }
310
311      int retrieved = added + updated;
312
313      if( added == 0 && updated == 1 ) // if paginating, will have at least retrieved 1 task
314        continueIterating = false;
315      else
316        continueIterating = retrieved != 0;
317
318      if( continueIterating )
319        logInfo( "iteration retrieved: {}, added {}, updated {} slices in iteration: {}, fetch limit: {}", retrieved, added, updated, ++iteration, fetchLimit );
320      }
321
322    int total = sliceStatsMap.size();
323    int added = total - startSize;
324    int remaining = getTotalTaskCount() - total;
325    String duration = formatDurationFromMillis( System.currentTimeMillis() - startTime );
326
327    if( total == getTotalTaskCount() && retrievedAreFinished )
328      allChildrenFinished = true;
329
330    if( iteration == 0 && total == 0 )
331      logInfo( "no slices stats available yet, expecting: {}", remaining );
332    else
333      logInfo( "added {} slices, in iterations: {}, with duration: {}, total fetched: {}, remaining: {}", added, iteration, duration, total, remaining );
334
335    return total == getTotalTaskCount();
336    }
337
338  private void updateSliceWith( TezSliceStats sliceStats, TaskStatus taskStatus, long lastFetch )
339    {
340    if( taskStatus == null )
341      return;
342
343    sliceStats.setStatus( getStatusForTaskStatus( taskStatus.getStatus() ) ); // ignores nulls
344    sliceStats.setSubmitTime( taskStatus.getScheduledTime() );
345    sliceStats.setStartTime( taskStatus.getStartTime() );
346    sliceStats.setFinishTime( taskStatus.getEndTime() );
347    sliceStats.setDiagnostics( taskStatus.getDiagnostics() );
348    sliceStats.setSuccessfulAttemptID( taskStatus.getSuccessfulAttemptID() );
349
350    Map<String, Map<String, Long>> counters = taskStatus.getCounters();
351
352    sliceStats.setCounters( counters ); // ignores nulls
353
354    if( counters != null )
355      sliceStats.setLastFetch( lastFetch );
356    }
357
358  private TaskStatus getTaskStatusFor( TimelineClient timelineClient, String taskID )
359    {
360    try
361      {
362      return timelineClient.getVertexChild( taskID );
363      }
364    catch( TezException exception )
365      {
366      logWarn( "unable to get slice stat from timeline server for task id: {}", taskID, exception );
367      }
368
369    return null;
370    }
371
372  private Iterator<TaskStatus> getTaskStatusIterator( TimelineClient timelineClient, String startTaskID )
373    {
374    try
375      {
376      String vertexID = retrieveVertexID( (DAGClient) timelineClient );
377
378      if( vertexID == null )
379        {
380        logWarn( "unable to get slice stats from timeline server, did not retrieve valid vertex id for vertex name: {}", getID() );
381        return null;
382        }
383
384      return timelineClient.getVertexChildren( vertexID, fetchLimit, startTaskID );
385      }
386    catch( IOException | CascadingException | TezException exception )
387      {
388      logWarn( "unable to get slice stats from timeline server", exception );
389      }
390
391    return null;
392    }
393
394  private boolean withoutTimelineServer( DAGClient dagClient )
395    {
396    VertexStatus vertexStatus = updateProgress( dagClient, STATUS_GET_COUNTERS );
397
398    if( vertexStatus == null || getTotalTaskCount() == 0 )
399      return false;
400
401    int total = sliceStatsMap.size();
402
403    if( total == 0 ) // yet to be initialized
404      logWarn( "'{}' is disabled, or running an incompatible Tez version: {}, task level counters cannot be retrieved", YarnConfiguration.TIMELINE_SERVICE_ENABLED, TezStatsUtil.getPlatformVersion() );
405
406    for( int i = total; i < totalTaskCount; i++ )
407      {
408      TezSliceStats sliceStats = new TezSliceStats( Util.createUniqueID(), kind, this.getStatus(), null );
409
410      // we don't have the taskId, so we are using the id as the key
411      sliceStatsMap.put( sliceStats.getID(), sliceStats );
412      }
413
414    // a placeholder to simulate actual slice stats for now
415    Iterator<FlowSliceStats> iterator = sliceStatsMap.values().iterator();
416
417    for( int i = 0; i < runningTaskCount && iterator.hasNext(); i++ )
418      ( (TezSliceStats) iterator.next() ).setStatus( Status.RUNNING );
419
420    for( int i = 0; i < succeededTaskCount && iterator.hasNext(); i++ )
421      ( (TezSliceStats) iterator.next() ).setStatus( Status.SUCCESSFUL );
422
423    for( int i = 0; i < failedTaskCount && iterator.hasNext(); i++ )
424      ( (TezSliceStats) iterator.next() ).setStatus( Status.FAILED );
425
426    for( int i = 0; i < killedTaskCount && iterator.hasNext(); i++ )
427      ( (TezSliceStats) iterator.next() ).setStatus( Status.STOPPED );
428
429    List<String> diagnostics = vertexStatus.getDiagnostics();
430
431    for( String diagnostic : diagnostics )
432      logInfo( "vertex diagnostics: {}", diagnostic );
433
434    int finishedTaskCount = succeededTaskCount + failedTaskCount + killedTaskCount;
435
436    allChildrenFinished = totalTaskCount == finishedTaskCount;
437
438    return true;
439    }
440
441  private Status getStatusForTaskStatus( @Nullable String status )
442    {
443    if( isEmpty( status ) )
444      return null;
445
446    TaskState state = TaskState.valueOf( status );
447
448    switch( state )
449      {
450      case NEW:
451        return Status.PENDING;
452      case SCHEDULED:
453        return Status.SUBMITTED;
454      case RUNNING:
455        return Status.RUNNING;
456      case SUCCEEDED:
457        return Status.SUCCESSFUL;
458      case FAILED:
459        return Status.FAILED;
460      case KILLED:
461        return Status.STOPPED;
462      }
463
464    return null;
465    }
466
467  private VertexStatus updateProgress( DAGClient dagClient, Set<StatusGetOpts> statusGetOpts )
468    {
469    VertexStatus vertexStatus = null;
470
471    try
472      {
473      vertexStatus = dagClient.getVertexStatus( getID(), statusGetOpts );
474      }
475    catch( IOException | TezException exception )
476      {
477      logWarn( "unable to get vertex status for: {}", getID(), exception );
478      }
479
480    if( vertexStatus == null )
481      return null;
482
483    Progress progress = vertexStatus.getProgress();
484
485    totalTaskCount = progress.getTotalTaskCount();
486    runningTaskCount = progress.getRunningTaskCount();
487    succeededTaskCount = progress.getSucceededTaskCount();
488    failedTaskCount = progress.getFailedTaskCount();
489    killedTaskCount = progress.getKilledTaskCount();
490
491    return vertexStatus;
492    }
493  }