/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;

import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.management.state.ClientState;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class HadoopStepStats provides Hadoop specific statistics and methods to underlying Hadoop facilities.
 * <p>
 * Each instance holds one mapper {@link HadoopNodeStats} and, optionally, one reducer
 * {@link HadoopNodeStats}; the reducer stats are {@code null} when the step's node graph
 * contains only a single node.
 */
public abstract class HadoopStepStats extends BaseHadoopStepStats<RunningJob, Counters>
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );

  /** Stats for the node with no incoming edges in the flow node graph; never null after construction. */
  private HadoopNodeStats mapperNodeStats;
  /** Stats for the node that has incoming edges; null when the step has no such node. */
  private HadoopNodeStats reducerNodeStats;

  /**
   * Creates the step stats and registers a node stats instance for every node in the step's flow node graph.
   *
   * @param flowStep    the step these stats describe; must be a {@link BaseFlowStep}
   * @param clientState the client state handed to the super class and to each node stats instance
   * @throws IllegalStateException if more than one node has no incoming edges, more than one node has
   *                               incoming edges, or no node without incoming edges exists
   */
  protected HadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
    {
    super( flowStep, clientState );

    BaseFlowStep<JobConf> step = (BaseFlowStep<JobConf>) flowStep;

    // don't rely on the iterator topological sort to identify mapper or reducer
    for( FlowNode current : step.getFlowNodeGraph().vertexSet() )
      {
      // a node with no incoming edges is treated as the mapper, any other node as the reducer
      if( step.getFlowNodeGraph().inDegreeOf( current ) == 0 )
        {
        if( mapperNodeStats != null )
          throw new IllegalStateException( "mapper node already found" );

        mapperNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.MAPPER, current, clientState );
        addNodeStats( mapperNodeStats );
        }
      else
        {
        if( reducerNodeStats != null )
          throw new IllegalStateException( "reducer node already found" );

        reducerNodeStats = new HadoopNodeStats( this, getConfig(), HadoopSliceStats.Kind.REDUCER, current, clientState );
        addNodeStats( reducerNodeStats );
        }
      }

    if( mapperNodeStats == null )
      throw new IllegalStateException( "mapper node not found" );

    // the cache resolves the job status client lazily through this instance
    counterCache = new HadoopStepCounterCache( this, getConfig() )
      {
      @Override
      protected RunningJob getJobStatusClient()
        {
        return HadoopStepStats.this.getJobStatusClient();
        }
      };
    }

  /**
   * Returns the flow step's configuration cast to a Hadoop {@link Configuration}.
   *
   * @return the step configuration
   */
  private Configuration getConfig()
    {
    return (Configuration) this.getFlowStep().getConfig();
    }

  /**
   * Method getNumMapTasks returns the number of children currently held by the mapper node stats.
   *
   * @return the numMapTasks (type int) of this HadoopStepStats object.
   */
  public int getNumMapTasks()
    {
    return mapperNodeStats.getChildren().size();
    }

  /**
   * Method getNumReduceTasks returns the number of children currently held by the reducer node stats,
   * or zero when this step has no reducer node.
   *
   * @return the numReducerTasks (type int) of this HadoopStepStats object.
   */
  public int getNumReduceTasks()
    {
    return reducerNodeStats == null ? 0 : reducerNodeStats.getChildren().size();
    }

  /**
   * Method getProcessStepID returns the Hadoop running job JobID.
   *
   * @return the jobID (type String) of this HadoopStepStats object, or null if no job status client is available.
   */
  @Override
  public String getProcessStepID()
    {
    // read the client once so the null check and the dereference see the same value
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return null;

    return runningJob.getID().toString();
    }

  /**
   * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job.
   *
   * @return the jobClient (type JobClient) of this HadoopStepStats object.
   */
  public abstract JobClient getJobClient();

  /**
   * Returns the underlying Map tasks progress percentage.
   * <p>
   * This method is experimental.
   *
   * @return the value reported by {@link RunningJob#mapProgress()}, or 0 if no job status client is available
   * @throws FlowException if the progress cannot be retrieved; the underlying {@link IOException} is the cause
   */
  public float getMapProgress()
    {
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.mapProgress();
      }
    catch( IOException exception )
      {
      // keep the cause so the original failure is not lost
      throw new FlowException( "unable to get progress", exception );
      }
    }

  /**
   * Returns the underlying Reduce tasks progress percentage.
   * <p>
   * This method is experimental.
   *
   * @return the value reported by {@link RunningJob#reduceProgress()}, or 0 if no job status client is available
   * @throws FlowException if the progress cannot be retrieved; the underlying {@link IOException} is the cause
   */
  public float getReduceProgress()
    {
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return 0;

    try
      {
      return runningJob.reduceProgress();
      }
    catch( IOException exception )
      {
      // keep the cause so the original failure is not lost
      throw new FlowException( "unable to get progress", exception );
      }
    }

  /**
   * Returns the tracking URL reported by the running job.
   *
   * @return the tracking URL, or null if no job status client is available
   */
  public String getStatusURL()
    {
    RunningJob runningJob = getJobStatusClient();

    if( runningJob == null )
      return null;

    return runningJob.getTrackingURL();
    }

  /**
   * Returns true if the flow step has more than one flow node.
   *
   * @return true when the step reports more than one flow node
   */
  private boolean stepHasReducers()
    {
    return getFlowStep().getNumFlowNodes() > 1;
    }

  /**
   * Method captureDetail captures statistics task details and completion events.
   * <p>
   * Nothing is captured when the given depth is not a child of this stats type, when the detail
   * is not stale, or when either the job client or the job status client is unavailable. Task
   * completion events are only fetched when {@code depth} is {@code Type.ATTEMPT}.
   * <p>
   * An {@link IOException} raised while capturing is logged at warn level and the detail is not
   * marked as captured.
   *
   * @param depth the level of detail to capture
   */
  @Override
  public synchronized void captureDetail( Type depth )
    {
    if( !getType().isChild( depth ) || !isDetailStale() )
      return;

    JobClient jobClient = getJobClient();
    RunningJob runningJob = getJobStatusClient();

    if( jobClient == null || runningJob == null )
      return;

    try
      {
      mapperNodeStats.captureDetail( depth );

      if( reducerNodeStats != null )
        reducerNodeStats.captureDetail( depth );

      if( depth == Type.ATTEMPT )
        captureAttempts( runningJob );

      markDetailCaptured();
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to get task stats", exception );
      }
    }

  /**
   * Pages through all task completion events of the given job, starting at offset zero, and hands
   * each page to {@link #addAttemptsToTaskStats(TaskCompletionEvent[])} until an empty page is returned.
   *
   * @param runningJob the job to read completion events from
   * @throws IOException if the events cannot be retrieved
   */
  private void captureAttempts( RunningJob runningJob ) throws IOException
    {
    int offset = 0;

    while( true )
      {
      TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( offset );

      if( events == null || events.length == 0 )
        return;

      addAttemptsToTaskStats( events );
      offset += events.length; // the next request starts after the events already seen
      }
    }

  /**
   * Routes each completion event to the mapper or reducer node stats depending on
   * {@link TaskCompletionEvent#isMapTask()}. Null events, and non-map events on a step
   * without reducer node stats, are logged at warn level and skipped.
   *
   * @param events the completion events to add
   */
  private void addAttemptsToTaskStats( TaskCompletionEvent[] events )
    {
    for( TaskCompletionEvent event : events )
      {
      if( event == null )
        {
        LOG.warn( "found empty completion event" );
        continue;
        }

      if( event.isMapTask() )
        mapperNodeStats.addAttempt( event );
      else if( reducerNodeStats != null )
        reducerNodeStats.addAttempt( event );
      else
        LOG.warn( "found non-map completion event, but step has no reducer node: {}", event ); // reducerNodeStats is optional
      }
    }
  }