/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.planner;

import java.io.IOException;

import cascading.flow.FlowException;
import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.management.state.ClientState;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowStepStats;
import cascading.stats.hadoop.HadoopStepStats;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;

import static cascading.flow.FlowProps.JOB_POLLING_INTERVAL;
import static cascading.stats.CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION;
import static cascading.stats.CascadingStats.STATS_STORE_INTERVAL;

/**
 * Class HadoopFlowStepJob submits a single Hadoop MapReduce job via the {@code mapred}
 * {@link JobClient} API and tracks its progress, translating Hadoop job/task state into
 * Cascading {@link FlowNodeStats} and {@link FlowStepStats} updates.
 */
public class HadoopFlowStepJob extends FlowStepJob<JobConf>
  {
  /** static field to capture errors in hadoop local mode */
  private static Throwable localError;
  /** Field jobClient */
  protected JobClient jobClient;
  /** Field runningJob */
  protected RunningJob runningJob;

  private static long getStoreInterval( JobConf jobConf )
    {
    // default: persist stats every 60 seconds
    return jobConf.getLong( STATS_STORE_INTERVAL, 60 * 1000 );
    }

  private static long getChildDetailsBlockingDuration( JobConf jobConf )
    {
    // default: block up to 60 seconds while gathering child task details
    return jobConf.getLong( STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION, 60 * 1000 );
    }

  public static long getJobPollingInterval( JobConf jobConf )
    {
    // default: poll the job tracker every 5 seconds
    return jobConf.getLong( JOB_POLLING_INTERVAL, 5000 );
    }

  public HadoopFlowStepJob( ClientState clientState, BaseFlowStep<JobConf> flowStep, JobConf currentConf )
    {
    super( clientState, currentConf, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ), getChildDetailsBlockingDuration( currentConf ) );

    if( flowStep.isDebugEnabled() )
      flowStep.logDebug( "using polling interval: " + pollingInterval );
    }

  @Override
  protected FlowStepStats createStepStats( ClientState clientState )
    {
    // the anonymous subclass exposes this job's client handles to the stats layer
    return new HadoopStepStats( flowStep, clientState )
      {
      @Override
      public JobClient getJobClient()
        {
        return jobClient;
        }

      @Override
      public RunningJob getJobStatusClient()
        {
        return runningJob;
        }
      };
    }

  protected void internalBlockOnStop() throws IOException
    {
    // only kill a job that was actually submitted and is still running
    if( runningJob != null && !runningJob.isComplete() )
      runningJob.killJob();
    }

  protected void internalNonBlockingStart() throws IOException
    {
    jobClient = new JobClient( jobConfiguration );
    runningJob = internalNonBlockingSubmit();

    flowStep.logInfo( "submitted hadoop job: " + runningJob.getID() );

    if( runningJob.getTrackingURL() != null )
      flowStep.logInfo( "tracking url: " + runningJob.getTrackingURL() );
    }

  protected RunningJob internalNonBlockingSubmit() throws IOException
    {
    return jobClient.submitJob( jobConfiguration );
    }

  @Override
  protected void updateNodeStatus( FlowNodeStats flowNodeStats )
    {
    try
      {
      if( runningJob == null || flowNodeStats.isFinished() )
        return;

      // ordinal 0 is the mapper node, any other ordinal is the reducer node
      boolean isMapper = flowNodeStats.getOrdinal() == 0;
      Integer jobState = getJobStateSafe();

      if( jobState == null ) // if call throws an NPE internally
        return;

      if( JobStatus.FAILED == jobState )
        {
        flowNodeStats.markFailed();
        return;
        }

      if( JobStatus.KILLED == jobState )
        {
        flowNodeStats.markStopped();
        return;
        }

      float progress;

      if( isMapper )
        progress = runningJob.mapProgress();
      else
        progress = runningJob.reduceProgress();

      if( progress == 0.0F ) // not yet running, is only started
        return;

      if( progress != 1.0F )
        {
        flowNodeStats.markRunning();
        return;
        }

      flowNodeStats.markRunning();

      // mappers are complete once reducers have begun consuming their output
      if( isMapper && runningJob.reduceProgress() > 0.0F )
        {
        flowNodeStats.markSuccessful();
        return;
        }

      if( JobStatus.SUCCEEDED == jobState )
        flowNodeStats.markSuccessful();
      }
    catch( IOException exception )
      {
      // fix: was logging an undefined 'throwable' variable, which does not compile
      flowStep.logError( "failed setting node status", exception );
      }
    }

  private Integer getJobStateSafe() throws IOException
    {
    try
      {
      return runningJob.getJobState();
      }
    catch( NullPointerException exception ) // this happens
      {
      return null;
      }
    }

  @Override
  public boolean isSuccessful()
    {
    try
      {
      return super.isSuccessful();
      }
    catch( NullPointerException exception )
      {
      // the job tracker purged this job's history before we could query it
      throw new FlowException( "Hadoop is not keeping a large enough job history, please increase the \'mapred.jobtracker.completeuserjobs.maximum\' property", exception );
      }
    }

  protected boolean internalNonBlockingIsSuccessful() throws IOException
    {
    return runningJob != null && runningJob.isSuccessful();
    }

  @Override
  protected boolean isRemoteExecution()
    {
    return !( (HadoopFlowStep) flowStep ).isHadoopLocalMode( getConfig() );
    }

  @Override
  protected Throwable getThrowable()
    {
    return localError;
    }

  protected String internalJobId()
    {
    return runningJob.getJobID();
    }

  protected boolean internalNonBlockingIsComplete() throws IOException
    {
    return runningJob.isComplete();
    }

  protected void dumpDebugInfo()
    {
    try
      {
      if( runningJob == null )
        return;

      Integer jobState = getJobStateSafe(); // if call throws an NPE internally

      if( jobState == null )
        return;

      flowStep.logWarn( "hadoop job " + runningJob.getID() + " state at " + JobStatus.getJobRunState( jobState ) );
      flowStep.logWarn( "failure info: " + runningJob.getFailureInfo() );

      TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( 0 );
      flowStep.logWarn( "task completion events identify failed tasks" );
      flowStep.logWarn( "task completion events count: " + events.length );

      for( TaskCompletionEvent event : events )
        flowStep.logWarn( "event = " + event );
      }
    catch( Throwable throwable )
      {
      // best effort diagnostics; never let debug dumping fail the step
      flowStep.logError( "failed reading task completion events", throwable );
      }
    }

  protected boolean internalIsStartedRunning()
    {
    if( runningJob == null )
      return false;

    try
      {
      // any map progress means at least one task has begun executing
      return runningJob.mapProgress() > 0;
      }
    catch( IOException exception )
      {
      flowStep.logWarn( "unable to test for map progress", exception );
      return false;
      }
    }

  /**
   * Internal method to report errors that happen on hadoop local mode. Hadoops local
   * JobRunner does not give access to TaskReports, but we want to be able to capture
   * the exception and not just print it to stderr. FlowMapper and FlowReducer use this method.
   *
   * @param throwable the throwable to be reported.
   */
  public static void reportLocalError( Throwable throwable )
    {
    localError = throwable;
    }
  }