/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;
import cascading.stats.BaseCachedNodeStats;
import cascading.stats.CounterCache;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowSliceStats;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;

import static cascading.util.Util.formatDurationFromMillis;

/**
 * Class HadoopNodeStats provides slice level statistics for a given {@link FlowNode} by retrieving
 * the TaskReports through the Hadoop MapReduce API and caching them as HadoopSliceStats instances.
 */
public class HadoopNodeStats extends BaseCachedNodeStats<Configuration, FlowNodeStats, Map<String, Map<String, Long>>>
  {
  private Map<TaskID, String> sliceIDCache = new HashMap<TaskID, String>( 4999 ); // caches TaskID to generated slice ID

  private HadoopStepStats parentStepStats;
  private HadoopSliceStats.Kind kind;

  /**
   * Constructor HadoopNodeStats creates a new HadoopNodeStats instance.
   *
   * @param parentStepStats of type HadoopStepStats
   * @param configuration   of type Configuration
   * @param kind            of type HadoopSliceStats.Kind
   * @param flowNode        of type FlowNode
   * @param clientState     of type ClientState
   */
  protected HadoopNodeStats( final HadoopStepStats parentStepStats, Configuration configuration, HadoopSliceStats.Kind kind, FlowNode flowNode, ClientState clientState )
    {
    super( flowNode, clientState );
    this.parentStepStats = parentStepStats;
    this.kind = kind;

    this.counterCache = new HadoopNodeCounterCache( this, configuration );
    }

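  /**
   * Method getKind returns the name of the slice kind this node represents, or null if none was given.
   */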
  @Override
  public String getKind()
    {
    if( kind == null )
      return null;

    return kind.name();
    }

  private Status getParentStatus()
    {
    return parentStepStats.getStatus();
    }

  private RunningJob getJobStatusClient()
    {
    return parentStepStats.getJobStatusClient();
    }

  /**
   * Retrieves the TaskReports via the MapReduce API.
   *
   * @param kind The kind of TaskReport to retrieve.
   * @return An array of TaskReports, but never <code>null</code>.
   * @throws IOException          if the reports cannot be retrieved
   * @throws InterruptedException if the retrieval is interrupted
   */
  private TaskReport[] retrieveTaskReports( HadoopSliceStats.Kind kind ) throws IOException, InterruptedException
    {
    Job job = HadoopStepStats.getJob( getJobStatusClient() );

    if( job == null )
      return new TaskReport[ 0 ];

    switch( kind )
      {
      case MAPPER:
        return job.getTaskReports( TaskType.MAP );
      case REDUCER:
        return job.getTaskReports( TaskType.REDUCE );
      case SETUP:
        return job.getTaskReports( TaskType.JOB_SETUP );
      case CLEANUP:
        return job.getTaskReports( TaskType.JOB_CLEANUP );
      default:
        return new TaskReport[ 0 ];
      }
    }

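  /**
   * Captures the child slice detail by polling the current task reports, returning true only if the
   * reports were successfully retrieved and applied, or if all children have already finished.
   */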
  @Override
  protected boolean captureChildDetailInternal()
    {
    if( allChildrenFinished )
      return true;

    Job job = HadoopStepStats.getJob( getJobStatusClient() );

    if( job == null )
      return false;

    try
      {
      TaskReport[] taskReports = retrieveTaskReports( kind );

      if( taskReports.length == 0 )
        return false;

      addTaskStats( taskReports, false );

      return true;
      }
    catch( IOException exception )
      {
      logWarn( "unable to retrieve slice stats via task reports", exception );
      }
    catch( InterruptedException exception )
      {
      logWarn( "retrieving task reports timed out, consider increasing timeout delay in CounterCache via: '{}', message: {}", CounterCache.COUNTER_TIMEOUT_PROPERTY, exception.getMessage() );
      }

    return false;
    }

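  /**
   * Adds a new HadoopSliceStats instance for each given TaskReport, or updates the existing
   * instance, then records whether all fetched children have finished.
   *
   * @param taskReports the task reports to apply
   * @param skipLast    if true, the trailing report is ignored
   */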
  protected void addTaskStats( TaskReport[] taskReports, boolean skipLast )
    {
    logInfo( "retrieved task reports: {}", taskReports.length );

    long lastFetch = System.currentTimeMillis();
    boolean fetchedAreFinished = true;

    synchronized( sliceStatsMap )
      {
      int added = 0;
      int updated = 0;

      for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ )
        {
        TaskReport taskReport = taskReports[ i ];

        if( taskReport == null )
          {
          logWarn( "found null task report" );
          continue;
          }

        String id = getSliceIDFor( taskReport.getTaskID() );
        HadoopSliceStats sliceStats = (HadoopSliceStats) sliceStatsMap.get( id );

        if( sliceStats != null )
          {
          sliceStats.update( getParentStatus(), kind, taskReport, lastFetch );
          updated++;
          }
        else
          {
          sliceStats = new HadoopSliceStats( id, getParentStatus(), kind, taskReport, lastFetch );
          sliceStatsMap.put( id, sliceStats );
          added++;
          }

        if( !sliceStats.getStatus().isFinished() )
          fetchedAreFinished = false;
        }

      int total = sliceStatsMap.size();
      String duration = formatDurationFromMillis( System.currentTimeMillis() - lastFetch );

      logInfo( "added: {}, updated: {} slices, with duration: {}, total cached: {}", added, updated, duration, total );
      }

    allChildrenFinished = taskReports.length != 0 && fetchedAreFinished;
    }

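  /**
   * Records the given task completion event against its owning slice, if the slice is currently tracked.
   *
   * @param event the completion event to record
   */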
  protected void addAttempt( TaskCompletionEvent event )
    {
    // the event could be a housekeeping task, which we are not tracking
    String sliceID = sliceIDCache.get( event.getTaskAttemptId().getTaskID() );

    if( sliceID == null )
      return;

    FlowSliceStats stats;

    synchronized( sliceStatsMap )
      {
      stats = sliceStatsMap.get( sliceID );
      }

    if( stats == null )
      return;

    ( (HadoopSliceStats) stats ).addAttempt( event );
    }

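  /**
   * Returns the unique slice ID for the given TaskID, creating and caching a new one if absent.
   */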
  private String getSliceIDFor( TaskID taskID )
    {
    // key on the TaskID instance directly, as TaskID#toString is quite expensive
    String id = sliceIDCache.get( taskID );

    if( id == null )
      {
      id = Util.createUniqueID();
      sliceIDCache.put( taskID, id );
      }

    return id;
    }
  }