001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.planner;
022
023import java.io.File;
024import java.io.IOException;
025import java.util.EnumSet;
026import java.util.List;
027import java.util.Set;
028import java.util.concurrent.Callable;
029import java.util.concurrent.TimeUnit;
030
031import cascading.CascadingException;
032import cascading.flow.hadoop.util.HadoopUtil;
033import cascading.flow.planner.BaseFlowStep;
034import cascading.flow.planner.FlowStepJob;
035import cascading.flow.tez.Hadoop2TezFlow;
036import cascading.flow.tez.Hadoop2TezFlowStep;
037import cascading.management.state.ClientState;
038import cascading.stats.FlowNodeStats;
039import cascading.stats.FlowStepStats;
040import cascading.stats.tez.TezStepStats;
041import cascading.stats.tez.util.TezStatsUtil;
042import cascading.util.Util;
043import org.apache.hadoop.conf.Configuration;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.mapreduce.security.TokenCache;
047import org.apache.hadoop.security.Credentials;
048import org.apache.hadoop.yarn.conf.YarnConfiguration;
049import org.apache.tez.client.TezClient;
050import org.apache.tez.client.TezClientUtils;
051import org.apache.tez.dag.api.DAG;
052import org.apache.tez.dag.api.TezConfiguration;
053import org.apache.tez.dag.api.TezException;
054import org.apache.tez.dag.api.client.DAGClient;
055import org.apache.tez.dag.api.client.DAGStatus;
056import org.apache.tez.dag.api.client.StatusGetOpts;
057import org.apache.tez.dag.api.client.VertexStatus;
058
059import static cascading.flow.FlowProps.JOB_POLLING_INTERVAL;
060import static cascading.stats.CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION;
061import static cascading.stats.CascadingStats.STATS_STORE_INTERVAL;
062
063/**
064 *
065 */
066public class Hadoop2TezFlowStepJob extends FlowStepJob<TezConfiguration>
067  {
068  private static final Set<StatusGetOpts> STATUS_GET_OPTS = EnumSet.of( StatusGetOpts.GET_COUNTERS );
069
070  private DAG dag;
071
072  private TezClient tezClient;
073  private DAGClient dagClient;
074
075  private String dagId;
076
077  private static long getStoreInterval( Configuration configuration )
078    {
079    return configuration.getLong( STATS_STORE_INTERVAL, 60 * 1000 );
080    }
081
082  private static long getChildDetailsBlockingDuration( Configuration configuration )
083    {
084    return configuration.getLong( STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION, 60 * 1000 );
085    }
086
087  public static long getJobPollingInterval( Configuration configuration )
088    {
089    return configuration.getLong( JOB_POLLING_INTERVAL, 5000 );
090    }
091
092  public Hadoop2TezFlowStepJob( ClientState clientState, BaseFlowStep<TezConfiguration> flowStep, TezConfiguration currentConf, DAG dag )
093    {
094    super( clientState, currentConf, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ), getChildDetailsBlockingDuration( currentConf ) );
095    this.dag = dag;
096
097    if( flowStep.isDebugEnabled() )
098      flowStep.logDebug( "using polling interval: " + pollingInterval );
099    }
100
101  @Override
102  protected FlowStepStats createStepStats( ClientState clientState )
103    {
104    return new TezStepStats( flowStep, clientState )
105    {
106    DAGClient timelineClient = null;
107
108    @Override
109    public DAGClient getJobStatusClient()
110      {
111      if( timelineClient != null )
112        return timelineClient;
113
114      synchronized( this )
115        {
116        if( isTimelineServiceEnabled( jobConfiguration ) )
117          timelineClient = TezStatsUtil.createTimelineClient( dagClient ); // may return null
118
119        if( timelineClient == null )
120          timelineClient = dagClient;
121
122        return timelineClient;
123        }
124      }
125
126    @Override
127    public String getProcessStatusURL()
128      {
129      return TezStatsUtil.getTrackingURL( tezClient, dagClient );
130      }
131
132    @Override
133    public String getProcessStepID()
134      {
135      return dagId;
136      }
137    };
138    }
139
140  protected void internalNonBlockingStart() throws IOException
141    {
142    try
143      {
144      if( !isTimelineServiceEnabled( jobConfiguration ) )
145        flowStep.logWarn( "'" + YarnConfiguration.TIMELINE_SERVICE_ENABLED + "' is disabled, please enable to capture detailed metrics of completed flows, this may require starting the YARN timeline server daemon" );
146
147      TezConfiguration workingConf = new TezConfiguration( jobConfiguration );
148
149      // this could be problematic
150      flowStep.logInfo( "tez session mode enabled: " + workingConf.getBoolean( TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT ) );
151
152      prepareEnsureStagingDir( workingConf );
153
154      tezClient = TezClient.create( flowStep.getName(), workingConf, ( (Hadoop2TezFlowStep) flowStep ).getAllLocalResources(), null );
155
156      tezClient.start();
157
158      dagClient = tezClient.submitDAG( dag );
159
160      dagId = Util.returnInstanceFieldIfExistsSafe( dagClient, "dagId" );
161
162      flowStep.logInfo( "submitted tez dag to app master: {}, with dag id: {}", tezClient.getAppMasterApplicationId(), dagId );
163      }
164    catch( TezException exception )
165      {
166      this.throwable = exception;
167      throw new CascadingException( exception );
168      }
169    }
170
171  private boolean isTimelineServiceEnabled( TezConfiguration workingConf )
172    {
173    return workingConf.getBoolean( YarnConfiguration.TIMELINE_SERVICE_ENABLED, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED );
174    }
175
176  @Override
177  protected void updateNodeStatus( FlowNodeStats flowNodeStats )
178    {
179    if( dagClient == null )
180      return;
181
182    try
183      {
184      VertexStatus vertexStatus = dagClient.getVertexStatus( flowNodeStats.getID(), null ); // no counters
185
186      if( vertexStatus == null )
187        return;
188
189      VertexStatus.State state = vertexStatus.getState();
190
191      if( state == null )
192        return;
193
194      List<String> diagnostics = null;
195
196      switch( state )
197        {
198        case NEW:
199          break;
200
201        case INITIALIZING:
202          break;
203
204        case INITED:
205          break;
206
207        case RUNNING:
208          flowNodeStats.markRunning();
209          break;
210
211        case SUCCEEDED:
212          if( !flowNodeStats.isRunning() )
213            flowNodeStats.markRunning();
214
215          flowNodeStats.markSuccessful();
216          break;
217
218        case FAILED:
219          if( !flowNodeStats.isRunning() )
220            flowNodeStats.markRunning();
221
222          diagnostics = vertexStatus.getDiagnostics();
223
224          if( diagnostics == null || diagnostics.isEmpty() )
225            flowNodeStats.markFailed( throwable );
226          else
227            flowNodeStats.markFailed( diagnostics.toArray( new String[ diagnostics.size() ] ) );
228
229          break;
230
231        case KILLED:
232          if( !flowNodeStats.isRunning() )
233            flowNodeStats.markRunning();
234
235          flowNodeStats.markStopped();
236          break;
237
238        case ERROR:
239          if( !flowNodeStats.isRunning() )
240            flowNodeStats.markRunning();
241
242          diagnostics = vertexStatus.getDiagnostics();
243
244          if( diagnostics == null || diagnostics.isEmpty() )
245            flowNodeStats.markFailed( throwable );
246          else
247            flowNodeStats.markFailed( diagnostics.toArray( new String[ diagnostics.size() ] ) );
248
249          break;
250
251        case TERMINATING:
252          break;
253        }
254      }
255    catch( IOException | TezException exception )
256      {
257      flowStep.logError( "failed setting node status", throwable );
258      }
259    }
260
261  private Path prepareEnsureStagingDir( TezConfiguration workingConf ) throws IOException
262    {
263    String stepStagingPath = createStepStagingPath();
264
265    workingConf.set( TezConfiguration.TEZ_AM_STAGING_DIR, stepStagingPath );
266
267    Path stagingDir = new Path( stepStagingPath );
268    FileSystem fileSystem = FileSystem.get( workingConf );
269
270    stagingDir = fileSystem.makeQualified( stagingDir );
271
272    TokenCache.obtainTokensForNamenodes( new Credentials(), new Path[]{stagingDir}, workingConf );
273
274    TezClientUtils.ensureStagingDirExists( workingConf, stagingDir );
275
276    if( fileSystem.getScheme().startsWith( "file:/" ) )
277      new File( stagingDir.toUri() ).mkdirs();
278
279    return stagingDir;
280    }
281
282  String createStepStagingPath()
283    {
284    String result = "";
285
286    if( HadoopUtil.isLocal( jobConfiguration ) )
287      result = jobConfiguration.get( "hadoop.tmp.dir" ) + Path.SEPARATOR;
288
289    String flowStagingPath = ( (Hadoop2TezFlow) flowStep.getFlow() ).getFlowStagingPath();
290
291    return result + flowStagingPath + Path.SEPARATOR + flowStep.getID();
292    }
293
294  private DAGStatus.State getDagStatusState()
295    {
296    DAGStatus dagStatus = getDagStatus();
297
298    if( dagStatus == null )
299      {
300      flowStep.logWarn( "getDagStatus returned null" );
301
302      return null;
303      }
304
305    DAGStatus.State state = dagStatus.getState();
306
307    if( state == null )
308      flowStep.logWarn( "dagStatus#getState returned null" );
309
310    return state;
311    }
312
313  private boolean isDagStatusComplete()
314    {
315    DAGStatus dagStatus = getDagStatus();
316
317    if( dagStatus == null )
318      flowStep.logWarn( "getDagStatus returned null" );
319
320    return dagStatus != null && dagStatus.isCompleted();
321    }
322
323  private DAGStatus getDagStatus()
324    {
325    if( dagClient == null )
326      return null;
327
328    try
329      {
330      return dagClient.getDAGStatus( null );
331      }
332    catch( NullPointerException exception )
333      {
334      flowStep.logWarn( "NPE thrown by getDAGStatus, known issue" );
335
336      return null;
337      }
338    catch( IOException | TezException exception )
339      {
340      throw new CascadingException( exception );
341      }
342    }
343
344  private DAGStatus getDagStatusWithCounters()
345    {
346    if( dagClient == null )
347      return null;
348
349    try
350      {
351      return dagClient.getDAGStatus( STATUS_GET_OPTS );
352      }
353    catch( IOException | TezException exception )
354      {
355      throw new CascadingException( "unable to get counters from dag client", exception );
356      }
357    }
358
359  protected void internalBlockOnStop() throws IOException
360    {
361    if( isDagStatusComplete() )
362      return;
363
364    try
365      {
366      if( dagClient != null )
367        dagClient.tryKillDAG(); // sometimes throws an NPE
368      }
369    catch( Exception exception )
370      {
371      flowStep.logWarn( "exception during attempt to kill dag", exception );
372      }
373
374    stopDAGClient();
375    stopTezClient();
376    }
377
378  @Override
379  protected void internalCleanup()
380    {
381    stopDAGClient();
382    stopTezClient();
383    }
384
385  private void stopDAGClient()
386    {
387    try
388      {
389      if( dagClient != null )
390        dagClient.close(); // may throw an NPE
391      }
392    catch( Exception exception )
393      {
394      flowStep.logWarn( "exception during attempt to cleanup client", exception );
395      }
396    }
397
398  private void stopTezClient()
399    {
400    try
401      {
402      if( tezClient == null )
403        return;
404
405      if( isRemoteExecution() )
406        {
407        tezClient.stop(); // will shutdown the session
408        return;
409        }
410
411      // the Tez LocalClient will frequently hang on #stop(), this causes tests to never complete
412      Boolean result = Util.submitWithTimeout( new Callable<Boolean>()
413      {
414      @Override
415      public Boolean call() throws Exception
416        {
417        tezClient.stop();
418        return true;
419        }
420      }, 5, TimeUnit.MINUTES );
421
422      if( result == null || !result )
423        flowStep.logWarn( "tezClient#stop() timed out after 5 minutes, cancelling call, continuing" );
424      }
425    catch( Exception exception )
426      {
427      flowStep.logWarn( "exception during attempt to cleanup client", exception );
428      }
429    }
430
431  protected boolean internalNonBlockingIsSuccessful() throws IOException
432    {
433    return isDagStatusComplete() && getDagStatusState() == DAGStatus.State.SUCCEEDED;
434    }
435
436  @Override
437  protected boolean isRemoteExecution()
438    {
439    return !HadoopUtil.isLocal( jobConfiguration );
440    }
441
442  @Override
443  protected Throwable getThrowable()
444    {
445    return throwable;
446    }
447
448  protected String internalJobId()
449    {
450    return dagClient.getExecutionContext();
451    }
452
453  protected boolean internalNonBlockingIsComplete() throws IOException
454    {
455    return isDagStatusComplete();
456    }
457
458  protected void dumpDebugInfo()
459    {
460//    try
461//      {
462//      if( dagStatus == null )
463//        return;
464
465//      flowStep.logWarn( "hadoop job " + runningJob.getID() + " state at " + JobStatus.getJobRunState( runningJob.getJobState() ) );
466//      flowStep.logWarn( "failure info: " + runningJob.getFailureInfo() );
467
468//      TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( 0 );
469//      flowStep.logWarn( "task completion events identify failed tasks" );
470//      flowStep.logWarn( "task completion events count: " + events.length );
471//
472//      for( TaskCompletionEvent event : events )
473//        flowStep.logWarn( "event = " + event );
474//      }
475//    catch( IOException exception )
476//      {
477//      flowStep.logError( "failed reading task completion events", exception );
478//      }
479    }
480
481  protected boolean internalIsStartedRunning()
482    {
483    // this is an alternative, seems to be set in tests sooner
484    // but unsure if the tasks are actually engaged
485    return getDagStatusState() == DAGStatus.State.RUNNING || isDagStatusComplete();
486/*
487    DAGStatus dagStatus = getDagStatus();
488
489    if( dagStatus == null )
490      return false;
491
492    Progress dagProgress = dagStatus.getDAGProgress();
493
494    // not strictly true
495    if( dagProgress == null )
496      return false;
497
498    // same as showing progress in map/reduce
499    int completed = dagProgress.getRunningTaskCount()
500      + dagProgress.getFailedTaskCount()
501      + dagProgress.getKilledTaskCount()
502      + dagProgress.getSucceededTaskCount();
503
504    return completed > 0;
505*/
506    }
507  }