001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez.stream.element; 022 023import java.io.IOException; 024 025import cascading.flow.FlowProcess; 026import cascading.flow.FlowProcessWrapper; 027import cascading.flow.stream.element.SinkStage; 028import cascading.flow.tez.Hadoop2TezFlowProcess; 029import cascading.tap.Tap; 030import org.apache.tez.mapreduce.output.MROutput; 031import org.apache.tez.runtime.api.LogicalOutput; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * 037 */ 038public class TezSinkStage extends SinkStage 039 { 040 private static final Logger LOG = LoggerFactory.getLogger( TezSinkStage.class ); 041 042 private final MROutput logicalOutput; 043 private OldOutputCollector collector; 044 045 public TezSinkStage( FlowProcess flowProcess, Tap sink, LogicalOutput logicalOutput ) 046 { 047 super( flowProcess, sink ); 048 049 if( logicalOutput == null ) 050 throw new IllegalArgumentException( "output must not be null" ); 051 052 this.logicalOutput = (MROutput) logicalOutput; 053 } 054 055 @Override 056 public void prepare() 057 { 058 LOG.info( "calling {}#start() on: {}", logicalOutput.getClass().getSimpleName(), getSink() ); 059 060 logicalOutput.start(); 061 062 collector = new OldOutputCollector( logicalOutput ); 063 064 super.prepare(); 065 } 066 067 @Override 068 public void cleanup() 069 { 070 try 071 { 072 super.cleanup(); 073 } 074 finally 075 { 076 try 077 { 078 if( logicalOutput.isCommitRequired() ) 079 commit( logicalOutput ); 080 } 081 catch( Exception exception ) 082 { 083 LOG.warn( "exception on output close", exception ); 084 } 085 } 086 } 087 088 @Override 089 protected Object getOutput() 090 { 091 return collector; 092 } 093 094 private void commit( MROutput output ) throws IOException 095 { 096 int retries = 3; 097 while( true ) 098 { 099 // This will loop till the AM asks for the task to be killed. As 100 // against, the AM sending a signal to the task to kill itself 101 // gracefully. 102 try 103 { 104 if( ( (Hadoop2TezFlowProcess) FlowProcessWrapper.undelegate( flowProcess ) ).getContext().canCommit() ) 105 break; 106 107 Thread.sleep( 100 ); 108 } 109 catch( InterruptedException exception ) 110 { 111 //ignore 112 } 113 catch( IOException exception ) 114 { 115 LOG.warn( "failure sending canCommit", exception ); 116 117 if( --retries == 0 ) 118 throw exception; 119 } 120 } 121 122 // task can Commit now 123 try 124 { 125 output.commit(); 126 } 127 catch( IOException exception ) 128 { 129 LOG.warn( "failure committing", exception ); 130 131 //if it couldn't commit a successfully then delete the output 132 discardOutput( output ); 133 134 throw exception; 135 } 136 } 137 138 private void discardOutput( MROutput output ) 139 { 140 try 141 { 142 output.abort(); 143 } 144 catch( IOException exception ) 145 { 146 LOG.warn( "failure cleaning up", exception ); 147 } 148 } 149 }