/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.management.state.ClientState;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * HadoopStepStats is a Hadoop 2 (YARN) specific sub-class for fetching TaskReports in an efficient way.
 */
public abstract class HadoopStepStats extends BaseHadoopStepStats
  {
  /** logger. */
  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );

  private Map<TaskID, String> idCache = new HashMap<TaskID, String>( 4999 ); // nearest prime to 5000, caches generated ids

  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
    {
    super( flowStep, clientState );
    }

  @Override
  protected void addTaskStats( Map<String, HadoopSliceStats> taskStats, HadoopSliceStats.Kind kind, boolean skipLast ) throws IOException
    {
    TaskReport[] taskReports = retrieveTaskReports( kind );

    for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ )
      {
      TaskReport taskReport = taskReports[ i ];

      if( taskReport == null )
        {
        LOG.warn( "found empty task report" );
        continue;
        }

      String id = getIDFor( taskReport.getTaskID() );
      taskStats.put( id, new HadoopSliceStats( id, getStatus(), kind, stepHasReducers(), taskReport ) );

      incrementKind( kind );
      }
    }

  /**
   * Retrieves the TaskReports via the mapreduce API.
   *
   * @param kind The kind of TaskReport to retrieve.
   * @return An array of TaskReports, but never <code>null</code>.
   * @throws IOException if the task reports cannot be retrieved.
   */
  private TaskReport[] retrieveTaskReports( HadoopSliceStats.Kind kind ) throws IOException
    {
    Job job = findJob();

    if( job == null )
      return new TaskReport[ 0 ];

    try
      {
      switch( kind )
        {
        case MAPPER:
          return job.getTaskReports( TaskType.MAP );
        case REDUCER:
          return job.getTaskReports( TaskType.REDUCE );
        case SETUP:
          return job.getTaskReports( TaskType.JOB_SETUP );
        case CLEANUP:
          return job.getTaskReports( TaskType.JOB_CLEANUP );
        default:
          return new TaskReport[ 0 ];
        }
      }
    catch( InterruptedException exception )
      {
      throw new CascadingException( exception );
      }
    }

  /**
   * Extracts the {@link org.apache.hadoop.mapreduce.Job} from the RunningJob via reflection.
   *
   * @return A {@link Job} instance, or <code>null</code> if it cannot be extracted.
   */
  private Job findJob()
    {
    // fetch the TaskReports via the mapreduce API to avoid memory pressure due to large array copies within the
    // mapred API in Hadoop 2.x
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return null;

    Job job = Util.returnInstanceFieldIfExistsSafe( runningJob, "job" );

    if( job == null )
      {
      LOG.warn( "unable to get underlying org.apache.hadoop.mapreduce.Job from org.apache.hadoop.mapred.RunningJob, task level counters will be unavailable" );
      return null;
      }

    return job;
    }

  @Override
  protected void addAttemptsToTaskStats( Map<String, HadoopSliceStats> taskStats, boolean captureAttempts )
    {
    Job job = findJob();

    if( job == null )
      return;

    int count = 0;

    while( captureAttempts )
      {
      try
        {
        TaskCompletionEvent[] events = job.getTaskCompletionEvents( count );

        if( events.length == 0 )
          break;

        for( TaskCompletionEvent event : events )
          {
          if( event == null )
            {
            LOG.warn( "found empty completion event" );
            continue;
            }

          // the lookup may miss if the event belongs to a housekeeping task, which we are not tracking
          HadoopSliceStats stats = taskStats.get( getIDFor( event.getTaskAttemptId().getTaskID() ) );

          if( stats != null )
            stats.addAttempt( event );
          }

        count += events.length;
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }
    }

  private String getIDFor( TaskID taskID )
    {
    // use the TaskID instance as the cache key, as calling #toString is quite costly
    String id = idCache.get( taskID );

    if( id == null )
      {
      id = Util.createUniqueID();
      idCache.put( taskID, id );
      }

    return id;
    }
  }