/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez.stream.element;

import java.io.IOException;

import cascading.flow.FlowProcess;
import cascading.flow.stream.element.SinkStage;
import cascading.flow.tez.Hadoop2TezFlowProcess;
import cascading.tap.Tap;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SinkStage} that writes to a Tez {@link MROutput}.
 * <p>
 * {@link #prepare()} starts the output and wraps it in an {@code OldOutputCollector}, which is what
 * {@link #getOutput()} hands back. {@link #cleanup()} commits the output when the output reports that
 * a commit is required.
 */
public class TezSinkStage extends SinkStage
  {
  private static final Logger LOG = LoggerFactory.getLogger( TezSinkStage.class );

  /** The Tez output this stage writes to; never null. */
  private final MROutput logicalOutput;
  /** Collector wrapping {@link #logicalOutput}; created in {@link #prepare()}. */
  private OldOutputCollector collector;

  /**
   * Creates a new sink stage bound to the given Tez output.
   *
   * @param flowProcess   the current flow process, passed to the super class
   * @param sink          the sink Tap, passed to the super class
   * @param logicalOutput the Tez output to write to; it is cast to {@link MROutput}
   * @throws IllegalArgumentException if {@code logicalOutput} is null
   * @throws ClassCastException       if {@code logicalOutput} is not an {@link MROutput}
   */
  public TezSinkStage( FlowProcess flowProcess, Tap sink, LogicalOutput logicalOutput )
    {
    super( flowProcess, sink );

    if( logicalOutput == null )
      throw new IllegalArgumentException( "output must not be null" );

    this.logicalOutput = (MROutput) logicalOutput;
    }

  /**
   * Starts the underlying output, creates the collector around it, then delegates to the super class.
   */
  @Override
  public void prepare()
    {
    LOG.info( "calling {}#start() on: {}", logicalOutput.getClass().getSimpleName(), getSink() );

    logicalOutput.start();

    collector = new OldOutputCollector( logicalOutput );

    super.prepare();
    }

  /**
   * Runs the super class cleanup and then, regardless of its outcome, commits the output if the
   * output reports that a commit is required.
   * <p>
   * Any exception raised while checking for or performing the commit is logged at WARN level and
   * not propagated.
   */
  @Override
  public void cleanup()
    {
    try
      {
      super.cleanup();
      }
    finally
      {
      try
        {
        if( logicalOutput.isCommitRequired() )
          commit( logicalOutput );
        }
      catch( Exception exception )
        {
        LOG.warn( "exception on output close", exception );
        }
      }
    }

  /**
   * Returns the collector created in {@link #prepare()}; null if {@link #prepare()} has not run.
   */
  @Override
  protected Object getOutput()
    {
    return collector;
    }

  /**
   * Polls the task context until it reports that this task may commit, then commits the output.
   * <p>
   * An {@link IOException} from the {@code canCommit} call is logged and the call retried; the third
   * such failure is rethrown. If the commit itself fails the output is aborted and the failure is
   * rethrown.
   * <p>
   * Interruption does not stop the wait, but it is no longer discarded: if the thread is interrupted
   * while waiting, its interrupt status is restored before this method exits, normally or
   * exceptionally.
   *
   * @param output the output to commit
   * @throws IOException if asking for permission to commit fails three times, or the commit fails
   */
  private void commit( MROutput output ) throws IOException
    {
    // set when the wait is interrupted, so the status can be restored on the way out
    boolean interrupted = false;

    try
      {
      int retries = 3;

      while( true )
        {
        // This will loop till the AM asks for the task to be killed. As
        // against, the AM sending a signal to the task to kill itself
        // gracefully.
        try
          {
          if( ( (Hadoop2TezFlowProcess) flowProcess ).getContext().canCommit() )
            break;

          Thread.sleep( 100 );
          }
        catch( InterruptedException exception )
          {
          // keep waiting, but remember the interrupt; Thread.sleep() has cleared the flag, and it
          // is deliberately not re-set here so the next sleep() does not fail immediately
          interrupted = true;
          }
        catch( IOException exception )
          {
          LOG.warn( "failure sending canCommit", exception );

          if( --retries == 0 )
            throw exception;
          }
        }

      // task can commit now
      try
        {
        output.commit();
        }
      catch( IOException exception )
        {
        LOG.warn( "failure committing", exception );

        // if it couldn't commit successfully then delete the output
        discardOutput( output );

        throw exception;
        }
      }
    finally
      {
      // restored only after the commit attempt so the commit runs with the flag cleared, as before
      if( interrupted )
        Thread.currentThread().interrupt();
      }
    }

  /**
   * Aborts the given output, logging rather than propagating any {@link IOException}.
   *
   * @param output the output to abort
   */
  private void discardOutput( MROutput output )
    {
    try
      {
      output.abort();
      }
    catch( IOException exception )
      {
      LOG.warn( "failure cleaning up", exception );
      }
    }
  }