/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.management.state.ClientState;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * HadoopStepStats is a Hadoop 2 (YARN) specific sub-class for fetching TaskReports in an efficient way.
 */
public abstract class HadoopStepStats extends BaseHadoopStepStats
  {
  /** logger. */
  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );

  private Map<TaskID, String> idCache = new HashMap<TaskID, String>( 4999 ); // nearest prime to 5000, caches generated ids

  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
    {
    super( flowStep, clientState );
    }

  @Override
  protected void addTaskStats( Map<String, HadoopSliceStats> taskStats, HadoopSliceStats.Kind kind, boolean skipLast ) throws IOException
    {
    TaskReport[] taskReports = retrieveTaskReports( kind );

    for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ )
      {
      TaskReport taskReport = taskReports[ i ];

      if( taskReport == null )
        {
        LOG.warn( "found empty task report" );
        continue;
        }

      String id = getIDFor( taskReport.getTaskID() );
      taskStats.put( id, new HadoopSliceStats( id, getStatus(), kind, stepHasReducers(), taskReport ) );

      incrementKind( kind );
      }
    }

  /**
   * Retrieves the TaskReports via the mapreduce API.
   *
   * @param kind The kind of TaskReport to retrieve.
   * @return An array of TaskReports, but never <code>null</code>.
   * @throws IOException if the task reports cannot be retrieved.
   */
  private TaskReport[] retrieveTaskReports( HadoopSliceStats.Kind kind ) throws IOException
    {
    Job job = findJob();

    if( job == null )
      return new TaskReport[ 0 ];

    try
      {
      switch( kind )
        {
        case MAPPER:
          return job.getTaskReports( TaskType.MAP );
        case REDUCER:
          return job.getTaskReports( TaskType.REDUCE );
        case SETUP:
          return job.getTaskReports( TaskType.JOB_SETUP );
        case CLEANUP:
          return job.getTaskReports( TaskType.JOB_CLEANUP );
        default:
          return new TaskReport[ 0 ];
        }
      }
    catch( InterruptedException exception )
      {
      throw new CascadingException( exception );
      }
    }

  /**
   * Extracts the {@link org.apache.hadoop.mapreduce.Job} from the RunningJob via reflection.
   *
   * @return A {@link Job} instance, or <code>null</code> if it cannot be extracted.
   */
  private Job findJob()
    {
    // fetch the TaskReports via the mapreduce API to avoid memory pressure due to large array copies within the
    // mapred API in Hadoop 2.x
    RunningJob runningJob = getRunningJob();

    if( runningJob == null )
      return null;

    Job job = Util.returnInstanceFieldIfExistsSafe( runningJob, "job" );

    if( job == null )
      {
      LOG.warn( "unable to get underlying org.apache.hadoop.mapreduce.Job from org.apache.hadoop.mapred.RunningJob, task level counters will be unavailable" );
      return null;
      }

    return job;
    }

  @Override
  protected void addAttemptsToTaskStats( Map<String, HadoopSliceStats> taskStats, boolean captureAttempts )
    {
    Job job = findJob();

    if( job == null )
      return;

    int count = 0;

    while( captureAttempts )
      {
      try
        {
        TaskCompletionEvent[] events = job.getTaskCompletionEvents( count );

        if( events.length == 0 )
          break;

        for( TaskCompletionEvent event : events )
          {
          if( event == null )
            {
            LOG.warn( "found empty completion event" );
            continue;
            }

          // the lookup may miss if the event belongs to a housekeeping task, which we are not tracking
          HadoopSliceStats stats = taskStats.get( getIDFor( event.getTaskAttemptId().getTaskID() ) );

          if( stats != null )
            stats.addAttempt( event );
          }

        count += events.length;
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }
    }

  private String getIDFor( TaskID taskID )
    {
    // use the TaskID instance as the cache key, as calling #toString is quite costly
    String id = idCache.get( taskID );

    if( id == null )
      {
      id = Util.createUniqueID();
      idCache.put( taskID, id );
      }

    return id;
    }
  }