/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowSliceStats;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskReport;

import static cascading.util.Util.formatDurationFromMillis;

/**
 * Class HadoopNodeStats tracks task level slice statistics for a single {@link FlowNode} within
 * a Hadoop MapReduce job, where the tracked slices are either the map or the reduce tasks of the
 * job, depending on the given {@link HadoopSliceStats.Kind}. Instances are created and owned by
 * a parent {@link HadoopStepStats} instance.
 */
public class HadoopNodeStats extends BaseHadoopNodeStats<FlowNodeStats, Map<String, Map<String, Long>>>
  {
  private Map<TaskID, String> sliceIDCache = new HashMap<TaskID, String>( 4999 ); // caches generated slice ids, keyed by TaskID

  private HadoopStepStats parentStepStats;
  private HadoopSliceStats.Kind kind;

  /**
   * Constructor HadoopNodeStats creates a new HadoopNodeStats instance.
   *
   * @param parentStepStats the owning HadoopStepStats instance
   * @param configuration   the current Hadoop configuration
   * @param kind            the kind of task slices tracked, mapper or reducer
   * @param flowNode        the FlowNode to collect stats for
   * @param clientState     the current ClientState
   */
  protected HadoopNodeStats( final HadoopStepStats parentStepStats, Configuration configuration, HadoopSliceStats.Kind kind, FlowNode flowNode, ClientState clientState )
    {
    super( flowNode, clientState );
    this.parentStepStats = parentStepStats;
    this.kind = kind;

    this.counterCache = new HadoopNodeCounterCache( this, configuration );
    }

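  /**
   * Method getKind returns the name of the {@link HadoopSliceStats.Kind} tracked by this
   * instance, or null if no kind was given.
   */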
  @Override
  public String getKind()
    {
    if( kind == null )
      return null;

    return kind.name();
    }

  private Status getParentStatus()
    {
    return parentStepStats.getStatus();
    }

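  /**
   * Fetches the current task reports, map or reduce depending on this node's kind, and captures
   * them as slice stats. Returns true if all child slices have already finished or the reports
   * were captured successfully; returns false if the JobClient or RunningJob is unavailable, no
   * reports are available yet, or the fetch failed.
   */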
  @Override
  protected boolean captureChildDetailInternal()
    {
    if( allChildrenFinished )
      return true;

    JobClient jobClient = parentStepStats.getJobClient();
    RunningJob runningJob = parentStepStats.getJobStatusClient();

    if( jobClient == null || runningJob == null )
      return false;

    try
      {
      TaskReport[] taskReports; // todo: use Job task reports

      if( kind == HadoopSliceStats.Kind.MAPPER )
        taskReports = jobClient.getMapTaskReports( runningJob.getID() );
      else
        taskReports = jobClient.getReduceTaskReports( runningJob.getID() );

      if( taskReports.length == 0 )
        return false;

      addTaskStats( taskReports, false );

      return true;
      }
    catch( IOException exception )
      {
      logWarn( "unable to retrieve slice stats via task reports", exception );
      }

    return false;
    }

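  /**
   * Records a HadoopSliceStats instance for each given task report, adding new slices and
   * replacing previously captured ones, then marks allChildrenFinished true if reports were
   * fetched and every fetched slice reports a finished status.
   *
   * @param taskReports the task reports to capture
   * @param skipLast    true if the last report in the array should be ignored
   */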
  protected void addTaskStats( TaskReport[] taskReports, boolean skipLast )
    {
    logInfo( "retrieved task reports: {}", taskReports.length );

    long lastFetch = System.currentTimeMillis();
    boolean fetchedAreFinished = true;

    synchronized( sliceStatsMap )
      {
      int added = 0;
      int updated = 0;

      for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ )
        {
        TaskReport taskReport = taskReports[ i ];

        if( taskReport == null )
          {
          logWarn( "found empty task report" );
          continue;
          }

        String id = getSliceIDFor( taskReport.getTaskID() );
        HadoopSliceStats sliceStats = new HadoopSliceStats( id, getParentStatus(), kind, taskReport, lastFetch );

        if( sliceStatsMap.put( id, sliceStats ) != null )
          updated++;
        else
          added++;

        if( !sliceStats.getStatus().isFinished() )
          fetchedAreFinished = false;
        }

      int total = sliceStatsMap.size();
      String duration = formatDurationFromMillis( System.currentTimeMillis() - lastFetch );

      logInfo( "added: {}, updated: {} slices, with duration: {}, total tracked: {}", added, updated, duration, total );
      }

    allChildrenFinished = taskReports.length != 0 && fetchedAreFinished;
    }

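  /**
   * Binds a task completion event to the slice stats previously captured for its task, recording
   * the attempt. Events for untracked tasks, such as housekeeping tasks, are ignored.
   *
   * @param event the TaskCompletionEvent to record
   */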
  protected void addAttempt( TaskCompletionEvent event )
    {
    // the event could be a housekeeping task, which we are not tracking
    String sliceID = sliceIDCache.get( event.getTaskAttemptId().getTaskID() );

    if( sliceID == null )
      return;

    FlowSliceStats stats;

    synchronized( sliceStatsMap )
      {
      stats = sliceStatsMap.get( sliceID );
      }

    if( stats == null )
      return;

    ( (HadoopSliceStats) stats ).addAttempt( event );
    }

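  /**
   * Returns the unique slice id for the given TaskID, generating and caching a new id on
   * first sight.
   */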
  private String getSliceIDFor( TaskID taskID )
    {
    // uses the taskID instance as the key, since TaskID#toString is quite expensive
    String id = sliceIDCache.get( taskID );

    if( id == null )
      {
      id = Util.createUniqueID();
      sliceIDCache.put( taskID, id );
      }

    return id;
    }
  }