001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez.planner; 022 023import java.io.File; 024import java.io.IOException; 025import java.util.EnumSet; 026import java.util.List; 027import java.util.Set; 028import java.util.concurrent.Callable; 029import java.util.concurrent.TimeUnit; 030 031import cascading.CascadingException; 032import cascading.flow.hadoop.util.HadoopUtil; 033import cascading.flow.planner.BaseFlowStep; 034import cascading.flow.planner.FlowStepJob; 035import cascading.flow.tez.Hadoop2TezFlow; 036import cascading.flow.tez.Hadoop2TezFlowStep; 037import cascading.management.state.ClientState; 038import cascading.stats.FlowNodeStats; 039import cascading.stats.FlowStepStats; 040import cascading.stats.tez.TezStepStats; 041import cascading.stats.tez.util.TezStatsUtil; 042import cascading.util.Util; 043import org.apache.hadoop.conf.Configuration; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.mapreduce.security.TokenCache; 047import org.apache.hadoop.security.Credentials; 048import org.apache.hadoop.yarn.conf.YarnConfiguration; 049import org.apache.tez.client.TezClient; 050import org.apache.tez.client.TezClientUtils; 051import org.apache.tez.dag.api.DAG; 052import org.apache.tez.dag.api.TezConfiguration; 053import org.apache.tez.dag.api.TezException; 054import org.apache.tez.dag.api.client.DAGClient; 055import org.apache.tez.dag.api.client.DAGStatus; 056import org.apache.tez.dag.api.client.StatusGetOpts; 057import org.apache.tez.dag.api.client.VertexStatus; 058 059import static cascading.flow.FlowProps.JOB_POLLING_INTERVAL; 060import static cascading.stats.CascadingStats.STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION; 061import static cascading.stats.CascadingStats.STATS_STORE_INTERVAL; 062 063/** 064 * 065 */ 066public class Hadoop2TezFlowStepJob extends FlowStepJob<TezConfiguration> 067 { 068 private static final Set<StatusGetOpts> STATUS_GET_OPTS = EnumSet.of( StatusGetOpts.GET_COUNTERS ); 069 070 private DAG dag; 071 072 private TezClient tezClient; 073 private DAGClient dagClient; 074 075 private String dagId; 076 077 private static long getStoreInterval( Configuration configuration ) 078 { 079 return configuration.getLong( STATS_STORE_INTERVAL, 60 * 1000 ); 080 } 081 082 private static long getChildDetailsBlockingDuration( Configuration configuration ) 083 { 084 return configuration.getLong( STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION, 60 * 1000 ); 085 } 086 087 public static long getJobPollingInterval( Configuration configuration ) 088 { 089 return configuration.getLong( JOB_POLLING_INTERVAL, 5000 ); 090 } 091 092 public Hadoop2TezFlowStepJob( ClientState clientState, BaseFlowStep<TezConfiguration> flowStep, TezConfiguration currentConf, DAG dag ) 093 { 094 super( clientState, currentConf, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ), getChildDetailsBlockingDuration( currentConf ) ); 095 this.dag = dag; 096 097 if( flowStep.isDebugEnabled() ) 098 flowStep.logDebug( "using polling interval: " + pollingInterval ); 099 } 100 101 @Override 102 protected FlowStepStats createStepStats( ClientState clientState ) 103 { 104 return new TezStepStats( flowStep, clientState ) 105 { 106 DAGClient timelineClient = null; 107 108 @Override 109 public DAGClient getJobStatusClient() 110 { 111 if( timelineClient != null ) 112 return timelineClient; 113 114 synchronized( this ) 115 { 116 if( isTimelineServiceEnabled( jobConfiguration ) ) 117 timelineClient = TezStatsUtil.createTimelineClient( dagClient ); // may return null 118 119 if( timelineClient == null ) 120 timelineClient = dagClient; 121 122 return timelineClient; 123 } 124 } 125 126 @Override 127 public String getProcessStatusURL() 128 { 129 return TezStatsUtil.getTrackingURL( tezClient, dagClient ); 130 } 131 132 @Override 133 public String getProcessStepID() 134 { 135 return dagId; 136 } 137 }; 138 } 139 140 protected void internalNonBlockingStart() throws IOException 141 { 142 try 143 { 144 if( !isTimelineServiceEnabled( jobConfiguration ) ) 145 flowStep.logWarn( "'" + YarnConfiguration.TIMELINE_SERVICE_ENABLED + "' is disabled, please enable to capture detailed metrics of completed flows, this may require starting the YARN timeline server daemon" ); 146 147 TezConfiguration workingConf = new TezConfiguration( jobConfiguration ); 148 149 // this could be problematic 150 flowStep.logInfo( "tez session mode enabled: " + workingConf.getBoolean( TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT ) ); 151 152 prepareEnsureStagingDir( workingConf ); 153 154 tezClient = TezClient.create( flowStep.getName(), workingConf, ( (Hadoop2TezFlowStep) flowStep ).getAllLocalResources(), null ); 155 156 tezClient.start(); 157 158 dagClient = tezClient.submitDAG( dag ); 159 160 dagId = Util.returnInstanceFieldIfExistsSafe( dagClient, "dagId" ); 161 162 flowStep.logInfo( "submitted tez dag to app master: {}, with dag id: {}", tezClient.getAppMasterApplicationId(), dagId ); 163 } 164 catch( TezException exception ) 165 { 166 this.throwable = exception; 167 throw new CascadingException( exception ); 168 } 169 } 170 171 private boolean isTimelineServiceEnabled( TezConfiguration workingConf ) 172 { 173 return workingConf.getBoolean( YarnConfiguration.TIMELINE_SERVICE_ENABLED, YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED ); 174 } 175 176 @Override 177 protected void updateNodeStatus( FlowNodeStats flowNodeStats ) 178 { 179 if( dagClient == null ) 180 return; 181 182 try 183 { 184 VertexStatus vertexStatus = dagClient.getVertexStatus( flowNodeStats.getID(), null ); // no counters 185 186 if( vertexStatus == null ) 187 return; 188 189 VertexStatus.State state = vertexStatus.getState(); 190 191 if( state == null ) 192 return; 193 194 List<String> diagnostics = null; 195 196 switch( state ) 197 { 198 case NEW: 199 break; 200 201 case INITIALIZING: 202 break; 203 204 case INITED: 205 break; 206 207 case RUNNING: 208 flowNodeStats.markRunning(); 209 break; 210 211 case SUCCEEDED: 212 if( !flowNodeStats.isRunning() ) 213 flowNodeStats.markRunning(); 214 215 flowNodeStats.markSuccessful(); 216 break; 217 218 case FAILED: 219 if( !flowNodeStats.isRunning() ) 220 flowNodeStats.markRunning(); 221 222 diagnostics = vertexStatus.getDiagnostics(); 223 224 if( diagnostics == null || diagnostics.isEmpty() ) 225 flowNodeStats.markFailed( throwable ); 226 else 227 flowNodeStats.markFailed( diagnostics.toArray( new String[ diagnostics.size() ] ) ); 228 229 break; 230 231 case KILLED: 232 if( !flowNodeStats.isRunning() ) 233 flowNodeStats.markRunning(); 234 235 flowNodeStats.markStopped(); 236 break; 237 238 case ERROR: 239 if( !flowNodeStats.isRunning() ) 240 flowNodeStats.markRunning(); 241 242 diagnostics = vertexStatus.getDiagnostics(); 243 244 if( diagnostics == null || diagnostics.isEmpty() ) 245 flowNodeStats.markFailed( throwable ); 246 else 247 flowNodeStats.markFailed( diagnostics.toArray( new String[ diagnostics.size() ] ) ); 248 249 break; 250 251 case TERMINATING: 252 break; 253 } 254 } 255 catch( IOException | TezException exception ) 256 { 257 flowStep.logError( "failed setting node status", throwable ); 258 } 259 } 260 261 private Path prepareEnsureStagingDir( TezConfiguration workingConf ) throws IOException 262 { 263 String stepStagingPath = createStepStagingPath(); 264 265 workingConf.set( TezConfiguration.TEZ_AM_STAGING_DIR, stepStagingPath ); 266 267 Path stagingDir = new Path( stepStagingPath ); 268 FileSystem fileSystem = FileSystem.get( workingConf ); 269 270 stagingDir = fileSystem.makeQualified( stagingDir ); 271 272 TokenCache.obtainTokensForNamenodes( new Credentials(), new Path[]{stagingDir}, workingConf ); 273 274 TezClientUtils.ensureStagingDirExists( workingConf, stagingDir ); 275 276 if( fileSystem.getScheme().startsWith( "file:/" ) ) 277 new File( stagingDir.toUri() ).mkdirs(); 278 279 return stagingDir; 280 } 281 282 String createStepStagingPath() 283 { 284 String result = ""; 285 286 if( HadoopUtil.isLocal( jobConfiguration ) ) 287 result = jobConfiguration.get( "hadoop.tmp.dir" ) + Path.SEPARATOR; 288 289 String flowStagingPath = ( (Hadoop2TezFlow) flowStep.getFlow() ).getFlowStagingPath(); 290 291 return result + flowStagingPath + Path.SEPARATOR + flowStep.getID(); 292 } 293 294 private DAGStatus.State getDagStatusState() 295 { 296 DAGStatus dagStatus = getDagStatus(); 297 298 if( dagStatus == null ) 299 { 300 flowStep.logWarn( "getDagStatus returned null" ); 301 302 return null; 303 } 304 305 DAGStatus.State state = dagStatus.getState(); 306 307 if( state == null ) 308 flowStep.logWarn( "dagStatus#getState returned null" ); 309 310 return state; 311 } 312 313 private boolean isDagStatusComplete() 314 { 315 DAGStatus dagStatus = getDagStatus(); 316 317 if( dagStatus == null ) 318 flowStep.logWarn( "getDagStatus returned null" ); 319 320 return dagStatus != null && dagStatus.isCompleted(); 321 } 322 323 private DAGStatus getDagStatus() 324 { 325 if( dagClient == null ) 326 return null; 327 328 try 329 { 330 return dagClient.getDAGStatus( null ); 331 } 332 catch( NullPointerException exception ) 333 { 334 flowStep.logWarn( "NPE thrown by getDAGStatus, known issue" ); 335 336 return null; 337 } 338 catch( IOException | TezException exception ) 339 { 340 throw new CascadingException( exception ); 341 } 342 } 343 344 private DAGStatus getDagStatusWithCounters() 345 { 346 if( dagClient == null ) 347 return null; 348 349 try 350 { 351 return dagClient.getDAGStatus( STATUS_GET_OPTS ); 352 } 353 catch( IOException | TezException exception ) 354 { 355 throw new CascadingException( "unable to get counters from dag client", exception ); 356 } 357 } 358 359 protected void internalBlockOnStop() throws IOException 360 { 361 if( isDagStatusComplete() ) 362 return; 363 364 try 365 { 366 if( dagClient != null ) 367 dagClient.tryKillDAG(); // sometimes throws an NPE 368 } 369 catch( Exception exception ) 370 { 371 flowStep.logWarn( "exception during attempt to kill dag", exception ); 372 } 373 374 stopDAGClient(); 375 stopTezClient(); 376 } 377 378 @Override 379 protected void internalCleanup() 380 { 381 stopDAGClient(); 382 stopTezClient(); 383 } 384 385 private void stopDAGClient() 386 { 387 try 388 { 389 if( dagClient != null ) 390 dagClient.close(); // may throw an NPE 391 } 392 catch( Exception exception ) 393 { 394 flowStep.logWarn( "exception during attempt to cleanup client", exception ); 395 } 396 } 397 398 private void stopTezClient() 399 { 400 try 401 { 402 if( tezClient == null ) 403 return; 404 405 if( isRemoteExecution() ) 406 { 407 tezClient.stop(); // will shutdown the session 408 return; 409 } 410 411 // the Tez LocalClient will frequently hang on #stop(), this causes tests to never complete 412 Boolean result = Util.submitWithTimeout( new Callable<Boolean>() 413 { 414 @Override 415 public Boolean call() throws Exception 416 { 417 tezClient.stop(); 418 return true; 419 } 420 }, 5, TimeUnit.MINUTES ); 421 422 if( result == null || !result ) 423 flowStep.logWarn( "tezClient#stop() timed out after 5 minutes, cancelling call, continuing" ); 424 } 425 catch( Exception exception ) 426 { 427 flowStep.logWarn( "exception during attempt to cleanup client", exception ); 428 } 429 } 430 431 protected boolean internalNonBlockingIsSuccessful() throws IOException 432 { 433 return isDagStatusComplete() && getDagStatusState() == DAGStatus.State.SUCCEEDED; 434 } 435 436 @Override 437 protected boolean isRemoteExecution() 438 { 439 return !HadoopUtil.isLocal( jobConfiguration ); 440 } 441 442 @Override 443 protected Throwable getThrowable() 444 { 445 return throwable; 446 } 447 448 protected String internalJobId() 449 { 450 return dagClient.getExecutionContext(); 451 } 452 453 protected boolean internalNonBlockingIsComplete() throws IOException 454 { 455 return isDagStatusComplete(); 456 } 457 458 protected void dumpDebugInfo() 459 { 460// try 461// { 462// if( dagStatus == null ) 463// return; 464 465// flowStep.logWarn( "hadoop job " + runningJob.getID() + " state at " + JobStatus.getJobRunState( runningJob.getJobState() ) ); 466// flowStep.logWarn( "failure info: " + runningJob.getFailureInfo() ); 467 468// TaskCompletionEvent[] events = runningJob.getTaskCompletionEvents( 0 ); 469// flowStep.logWarn( "task completion events identify failed tasks" ); 470// flowStep.logWarn( "task completion events count: " + events.length ); 471// 472// for( TaskCompletionEvent event : events ) 473// flowStep.logWarn( "event = " + event ); 474// } 475// catch( IOException exception ) 476// { 477// flowStep.logError( "failed reading task completion events", exception ); 478// } 479 } 480 481 protected boolean internalIsStartedRunning() 482 { 483 // this is an alternative, seems to be set in tests sooner 484 // but unsure if the tasks are actually engaged 485 return getDagStatusState() == DAGStatus.State.RUNNING || isDagStatusComplete(); 486/* 487 DAGStatus dagStatus = getDagStatus(); 488 489 if( dagStatus == null ) 490 return false; 491 492 Progress dagProgress = dagStatus.getDAGProgress(); 493 494 // not strictly true 495 if( dagProgress == null ) 496 return false; 497 498 // same as showing progress in map/reduce 499 int completed = dagProgress.getRunningTaskCount() 500 + dagProgress.getFailedTaskCount() 501 + dagProgress.getKilledTaskCount() 502 + dagProgress.getSucceededTaskCount(); 503 504 return completed > 0; 505*/ 506 } 507 }