/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;
import cascading.stats.BaseCachedNodeStats;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowSliceStats;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskReport;

import static cascading.util.Util.formatDurationFromMillis;

/**
 * Class HadoopNodeStats provides Hadoop MapReduce specific statistics for a single
 * {@link FlowNode}, capturing slice (task) level details from either the map or
 * reduce side of the underlying job.
 */
public class HadoopNodeStats extends BaseCachedNodeStats<Configuration, FlowNodeStats, Map<String, Map<String, Long>>>
  {
  private Map<TaskID, String> sliceIDCache = new HashMap<>( 4999 ); // caches generated slice ids, keyed by TaskID

  private HadoopStepStats parentStepStats;
  private HadoopSliceStats.Kind kind;

  /**
   * Constructor HadoopNodeStats creates a new HadoopNodeStats instance.
   *
   * @param parentStepStats the parent HadoopStepStats this node belongs to
   * @param configuration   the current Hadoop Configuration
   * @param kind            the slice kind, either the map or reduce side of the job
   * @param flowNode        the FlowNode this instance tracks
   * @param clientState     the current ClientState
   */
  protected HadoopNodeStats( final HadoopStepStats parentStepStats, Configuration configuration, HadoopSliceStats.Kind kind, FlowNode flowNode, ClientState clientState )
    {
    super( flowNode, clientState );
    this.parentStepStats = parentStepStats;
    this.kind = kind;

    this.counterCache = new HadoopNodeCounterCache( this, configuration );
    }

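  /**
   * Returns the name of the {@link HadoopSliceStats.Kind} this node represents, the map
   * or reduce side of the job, or null if the kind is unset.
   */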
  @Override
  public String getKind()
    {
    if( kind == null )
      return null;

    return kind.name();
    }

  private Status getParentStatus()
    {
    return parentStepStats.getStatus();
    }

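  /**
   * Fetches the map or reduce task reports for the running job and converts them into
   * slice stats. Returns true if the details were captured, or if all children have
   * already finished; returns false if the job client is unavailable, no reports were
   * returned, or the fetch failed.
   */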
  @Override
  protected boolean captureChildDetailInternal()
    {
    if( allChildrenFinished )
      return true;

    JobClient jobClient = parentStepStats.getJobClient();
    RunningJob runningJob = parentStepStats.getJobStatusClient();

    if( jobClient == null || runningJob == null )
      return false;

    try
      {
      TaskReport[] taskReports; // todo: use Job task reports

      if( kind == HadoopSliceStats.Kind.MAPPER )
        taskReports = jobClient.getMapTaskReports( runningJob.getID() );
      else
        taskReports = jobClient.getReduceTaskReports( runningJob.getID() );

      if( taskReports.length == 0 )
        return false;

      addTaskStats( taskReports, false );

      return true;
      }
    catch( IOException exception )
      {
      logWarn( "unable to retrieve slice stats via task reports", exception );
      }

    return false;
    }

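  /**
   * Converts the given task reports into {@link HadoopSliceStats} instances and adds or
   * updates them in the slice stats map, optionally skipping the last report in the
   * array. Marks all children as finished once every fetched slice reports a finished
   * status.
   */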
  protected void addTaskStats( TaskReport[] taskReports, boolean skipLast )
    {
    logInfo( "retrieved task reports: {}", taskReports.length );

    long lastFetch = System.currentTimeMillis();
    boolean fetchedAreFinished = true;

    synchronized( sliceStatsMap )
      {
      int added = 0;
      int updated = 0;

      for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ )
        {
        TaskReport taskReport = taskReports[ i ];

        if( taskReport == null )
          {
          logWarn( "found empty task report" );
          continue;
          }

        String id = getSliceIDFor( taskReport.getTaskID() );
        HadoopSliceStats sliceStats = new HadoopSliceStats( id, getParentStatus(), kind, taskReport, lastFetch );

        if( sliceStatsMap.put( id, sliceStats ) != null )
          updated++;
        else
          added++;

        if( !sliceStats.getStatus().isFinished() )
          fetchedAreFinished = false;
        }

      int total = sliceStatsMap.size();
      String duration = formatDurationFromMillis( System.currentTimeMillis() - lastFetch );

      logInfo( "added: {}, updated: {} slices, with duration: {}, total fetched: {}", added, updated, duration, total );
      }

    allChildrenFinished = taskReports.length != 0 && fetchedAreFinished;
    }

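  /**
   * Adds the given {@link TaskCompletionEvent} to the slice stats of the task it
   * belongs to, silently ignoring events for tasks this node is not tracking.
   */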
  protected void addAttempt( TaskCompletionEvent event )
    {
    // the event could be a housekeeping task, which we are not tracking
    String sliceID = sliceIDCache.get( event.getTaskAttemptId().getTaskID() );

    if( sliceID == null )
      return;

    FlowSliceStats stats;

    synchronized( sliceStatsMap )
      {
      stats = sliceStatsMap.get( sliceID );
      }

    if( stats == null )
      return;

    ( (HadoopSliceStats) stats ).addAttempt( event );
    }

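  /**
   * Returns a stable unique slice id for the given {@link TaskID}, creating and caching
   * a new id on first request.
   */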
  private String getSliceIDFor( TaskID taskID )
    {
    // key on the TaskID instance itself, as calling TaskID#toString is expensive
    String id = sliceIDCache.get( taskID );

    if( id == null )
      {
      id = Util.createUniqueID();
      sliceIDCache.put( taskID, id );
      }

    return id;
    }
  }