/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.planner;

import java.io.IOException;

import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.management.state.ClientState;
import cascading.stats.FlowStepStats;
import cascading.stats.hadoop.HadoopStepStats;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;

import static cascading.flow.FlowProps.JOB_POLLING_INTERVAL;
import static cascading.stats.CascadingStats.STATS_STORE_INTERVAL;

/**
 * Class HadoopFlowStepJob manages the submission and lifecycle of a single Hadoop
 * MapReduce job on behalf of a {@link cascading.flow.hadoop.HadoopFlowStep}. It
 * submits the job through a {@link JobClient}, polls the resulting
 * {@link RunningJob} for completion, and exposes step statistics.
 */
public class HadoopFlowStepJob extends FlowStepJob<JobConf>
  {
  /** Captures an error thrown in Hadoop local mode; volatile, as it is written by task code and read by the job monitoring thread. */
  private static volatile Throwable localError;
  /** The JobConf this step was planned against and will be submitted with. */
  private final JobConf currentConf;
  /** Client used to submit and query the underlying MapReduce job. */
  private JobClient jobClient;
  /** Handle to the submitted MapReduce job; remains null until the job is started. */
  private RunningJob runningJob;

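  /**
   * Returns the interval, in milliseconds, at which step stats are stored,
   * read from {@link cascading.stats.CascadingStats#STATS_STORE_INTERVAL}; defaults to one minute.
   */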
  private static long getStoreInterval( JobConf jobConf )
    {
    return jobConf.getLong( STATS_STORE_INTERVAL, 60 * 1000 );
    }

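  /**
   * Returns the interval, in milliseconds, at which the running job is polled for
   * completion, read from {@link cascading.flow.FlowProps#JOB_POLLING_INTERVAL}; defaults to five seconds.
   */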
  public static long getJobPollingInterval( JobConf jobConf )
    {
    return jobConf.getLong( JOB_POLLING_INTERVAL, 5000 );
    }

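  /**
   * Constructor HadoopFlowStepJob creates a new HadoopFlowStepJob instance for the given
   * flow step, reading the polling and stats store intervals from the given JobConf.
   */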
  public HadoopFlowStepJob( ClientState clientState, BaseFlowStep flowStep, JobConf currentConf )
    {
    super( clientState, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ) );
    this.currentConf = currentConf;

    if( flowStep.isDebugEnabled() )
      flowStep.logDebug( "using polling interval: " + pollingInterval );
    }

  @Override
  public JobConf getConfig()
    {
    return currentConf;
    }

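  /**
   * Returns a {@link HadoopStepStats} instance bound to this job's {@link JobClient}
   * and {@link RunningJob}, both of which remain null until the job is submitted.
   */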
  @Override
  protected FlowStepStats createStepStats( ClientState clientState )
    {
    return new HadoopStepStats( flowStep, clientState )
    {
    @Override
    public JobClient getJobClient()
      {
      return jobClient;
      }

    @Override
    public RunningJob getRunningJob()
      {
      return runningJob;
      }
    };
    }

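  /** Kills the underlying Hadoop job, if it was submitted and has not already completed. */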
  @Override
  protected void internalBlockOnStop() throws IOException
    {
    if( runningJob != null && !runningJob.isComplete() )
      runningJob.killJob();
    }

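  /**
   * Submits the job through a new {@link JobClient} and returns without blocking;
   * completion is subsequently observed through the returned {@link RunningJob}.
   */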
  @Override
  protected void internalNonBlockingStart() throws IOException
    {
    jobClient = new JobClient( currentConf );
    runningJob = jobClient.submitJob( currentConf );

    flowStep.logInfo( "submitted hadoop job: " + runningJob.getID() );

    if( runningJob.getTrackingURL() != null )
      flowStep.logInfo( "tracking url: " + runningJob.getTrackingURL() );
    }

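  /**
   * Returns true only if the job completed successfully; the null guard covers a
   * submission that failed before {@link #runningJob} was assigned.
   */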
  @Override
  protected boolean internalNonBlockingIsSuccessful() throws IOException
    {
    return runningJob != null && runningJob.isSuccessful();
    }

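  /** Returns false when the step runs with Hadoop's local job runner rather than on a cluster. */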
  @Override
  protected boolean isRemoteExecution()
    {
    return !( (HadoopFlowStep) flowStep ).isHadoopLocalMode( getConfig() );
    }

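  /** Returns the error, if any, captured via {@link #reportLocalError(Throwable)} while in Hadoop local mode. */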
  @Override
  protected Throwable getThrowable()
    {
    return localError;
    }

  @Override
  protected String internalJobId()
    {
    return runningJob.getJobID();
    }

  @Override
  protected boolean internalNonBlockingIsComplete() throws IOException
    {
    return runningJob.isComplete();
    }

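  /**
   * Logs the job run state, failure info, and task completion events to help
   * identify which tasks failed.
   */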
  @Override
  protected void dumpDebugInfo()
    {
    try
      {
      if( runningJob == null )
        return;

      flowStep.logWarn( "hadoop job " + runningJob.getID() + " state at " + JobStatus.getJobRunState( runningJob.getJobState() ) );
      flowStep.logWarn( "failure info: " + runningJob.getFailureInfo() );

      TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( 0 );
      flowStep.logWarn( "task completion events identify failed tasks" );
      flowStep.logWarn( "task completion events count: " + events.length );

      for( TaskCompletionEvent event : events )
        flowStep.logWarn( "event = " + event );
      }
    catch( IOException exception )
      {
      flowStep.logError( "failed reading task completion events", exception );
      }
    }

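  /** Considers the job started once any map progress is reported; returns false if progress cannot be read. */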
  @Override
  protected boolean internalIsStarted()
    {
    if( runningJob == null )
      return false;

    try
      {
      return runningJob.mapProgress() > 0;
      }
    catch( IOException exception )
      {
      flowStep.logWarn( "unable to test for map progress", exception );
      return false;
      }
    }

  /**
   * Internal method to report errors that happen in Hadoop local mode. Hadoop's local
   * JobRunner does not give access to TaskReports, but we want to be able to capture
   * the exception rather than just print it to stderr. FlowMapper and FlowReducer use this method.
   *
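   * A caller would typically capture a task-side failure along these lines (an
   * illustrative sketch; {@code runTask} is a hypothetical stand-in for the actual
   * task body):
   * <pre>{@code
   * try
   *   {
   *   runTask(); // task-side work that may throw
   *   }
   * catch( Throwable throwable )
   *   {
   *   // capture the cause so the monitoring job can surface it, not just stderr
   *   HadoopFlowStepJob.reportLocalError( throwable );
   *   throw new RuntimeException( throwable );
   *   }
   * }</pre>
   *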
   * @param throwable the throwable to be reported.
   */
  public static void reportLocalError( Throwable throwable )
    {
    localError = throwable;
    }
  }