001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.hadoop.planner; 022 023 import java.io.IOException; 024 025 import cascading.flow.hadoop.HadoopFlowStep; 026 import cascading.flow.planner.BaseFlowStep; 027 import cascading.flow.planner.FlowStepJob; 028 import cascading.management.state.ClientState; 029 import cascading.stats.FlowStepStats; 030 import cascading.stats.hadoop.BaseHadoopStepStats; 031 import cascading.stats.hadoop.HadoopStepStats; 032 import org.apache.hadoop.mapred.JobClient; 033 import org.apache.hadoop.mapred.JobConf; 034 import org.apache.hadoop.mapred.JobStatus; 035 import org.apache.hadoop.mapred.RunningJob; 036 import org.apache.hadoop.mapred.TaskCompletionEvent; 037 038 import static cascading.flow.FlowProps.JOB_POLLING_INTERVAL; 039 import static cascading.stats.CascadingStats.STATS_STORE_INTERVAL; 040 041 /** 042 * 043 */ 044 public class HadoopFlowStepJob extends FlowStepJob<JobConf> 045 { 046 /** static field to capture errors in hadoop local mode */ 047 private static Throwable localError; 048 /** Field currentConf */ 049 private final JobConf currentConf; 050 /** Field jobClient */ 051 private JobClient jobClient; 052 /** Field runningJob */ 053 private RunningJob runningJob; 054 055 private static long getStoreInterval( JobConf jobConf ) 056 { 057 return jobConf.getLong( STATS_STORE_INTERVAL, 60 * 1000 ); 058 } 059 060 public static long getJobPollingInterval( JobConf jobConf ) 061 { 062 return jobConf.getLong( JOB_POLLING_INTERVAL, 5000 ); 063 } 064 065 public HadoopFlowStepJob( ClientState clientState, BaseFlowStep flowStep, JobConf currentConf ) 066 { 067 super( clientState, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ) ); 068 this.currentConf = currentConf; 069 070 if( flowStep.isDebugEnabled() ) 071 flowStep.logDebug( "using polling interval: " + pollingInterval ); 072 } 073 074 @Override 075 public JobConf getConfig() 076 { 077 return currentConf; 078 } 079 080 @Override 081 protected FlowStepStats createStepStats( ClientState clientState ) 082 { 083 return new HadoopStepStats( flowStep, clientState ) 084 { 085 @Override 086 public JobClient getJobClient() 087 { 088 return jobClient; 089 } 090 091 @Override 092 public RunningJob getRunningJob() 093 { 094 return runningJob; 095 } 096 }; 097 } 098 099 protected void internalBlockOnStop() throws IOException 100 { 101 if( runningJob != null && !runningJob.isComplete() ) 102 runningJob.killJob(); 103 } 104 105 protected void internalNonBlockingStart() throws IOException 106 { 107 jobClient = new JobClient( currentConf ); 108 runningJob = jobClient.submitJob( currentConf ); 109 110 flowStep.logInfo( "submitted hadoop job: " + runningJob.getID() ); 111 112 if( runningJob.getTrackingURL() != null ) 113 flowStep.logInfo( "tracking url: " + runningJob.getTrackingURL() ); 114 } 115 116 protected boolean internalNonBlockingIsSuccessful() throws IOException 117 { 118 return runningJob != null && runningJob.isSuccessful(); 119 } 120 121 @Override 122 protected boolean isRemoteExecution() 123 { 124 return !( (HadoopFlowStep) flowStep ).isHadoopLocalMode( getConfig() ); 125 } 126 127 @Override 128 protected Throwable getThrowable() 129 { 130 return localError; 131 } 132 133 protected String internalJobId() 134 { 135 return runningJob.getJobID(); 136 } 137 138 protected boolean internalNonBlockingIsComplete() throws IOException 139 { 140 return runningJob.isComplete(); 141 } 142 143 protected void dumpDebugInfo() 144 { 145 try 146 { 147 if( runningJob == null ) 148 return; 149 150 flowStep.logWarn( "hadoop job " + runningJob.getID() + " state at " + JobStatus.getJobRunState( runningJob.getJobState() ) ); 151 flowStep.logWarn( "failure info: " + runningJob.getFailureInfo() ); 152 153 TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( 0 ); 154 flowStep.logWarn( "task completion events identify failed tasks" ); 155 flowStep.logWarn( "task completion events count: " + events.length ); 156 157 for( TaskCompletionEvent event : events ) 158 flowStep.logWarn( "event = " + event ); 159 } 160 catch( IOException exception ) 161 { 162 flowStep.logError( "failed reading task completion events", exception ); 163 } 164 } 165 166 protected boolean internalIsStarted() 167 { 168 if( runningJob == null ) 169 return false; 170 171 try 172 { 173 return runningJob.mapProgress() > 0; 174 } 175 catch( IOException exception ) 176 { 177 flowStep.logWarn( "unable to test for map progress", exception ); 178 return false; 179 } 180 } 181 182 /** 183 * Internal method to report errors that happen on hadoop local mode. Hadoops local 184 * JobRunner does not give access to TaskReports, but we want to be able to capture 185 * the exception and not just print it to stderr. FlowMapper and FlowReducer use this method. 186 * 187 * @param throwable the throwable to be reported. 188 */ 189 public static void reportLocalError( Throwable throwable ) 190 { 191 localError = throwable; 192 } 193 }