001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.stream.element;
022
023import java.io.IOException;
024
025import cascading.flow.FlowProcess;
026import cascading.flow.FlowProcessWrapper;
027import cascading.flow.stream.element.SinkStage;
028import cascading.flow.tez.Hadoop2TezFlowProcess;
029import cascading.tap.Tap;
030import org.apache.tez.mapreduce.output.MROutput;
031import org.apache.tez.runtime.api.LogicalOutput;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 *
037 */
038public class TezSinkStage extends SinkStage
039  {
040  private static final Logger LOG = LoggerFactory.getLogger( TezSinkStage.class );
041
042  private final MROutput logicalOutput;
043  private OldOutputCollector collector;
044
045  public TezSinkStage( FlowProcess flowProcess, Tap sink, LogicalOutput logicalOutput )
046    {
047    super( flowProcess, sink );
048
049    if( logicalOutput == null )
050      throw new IllegalArgumentException( "output must not be null" );
051
052    this.logicalOutput = (MROutput) logicalOutput;
053    }
054
055  @Override
056  public void prepare()
057    {
058    LOG.info( "calling {}#start() on: {}", logicalOutput.getClass().getSimpleName(), getSink() );
059
060    logicalOutput.start();
061
062    collector = new OldOutputCollector( logicalOutput );
063
064    super.prepare();
065    }
066
067  @Override
068  public void cleanup()
069    {
070    try
071      {
072      super.cleanup();
073      }
074    finally
075      {
076      try
077        {
078        if( logicalOutput.isCommitRequired() )
079          commit( logicalOutput );
080        }
081      catch( Exception exception )
082        {
083        LOG.warn( "exception on output close", exception );
084        }
085      }
086    }
087
088  @Override
089  protected Object getOutput()
090    {
091    return collector;
092    }
093
094  private void commit( MROutput output ) throws IOException
095    {
096    int retries = 3;
097    while( true )
098      {
099      // This will loop till the AM asks for the task to be killed. As
100      // against, the AM sending a signal to the task to kill itself
101      // gracefully.
102      try
103        {
104        if( ( (Hadoop2TezFlowProcess) FlowProcessWrapper.undelegate( flowProcess ) ).getContext().canCommit() )
105          break;
106
107        Thread.sleep( 100 );
108        }
109      catch( InterruptedException exception )
110        {
111        //ignore
112        }
113      catch( IOException exception )
114        {
115        LOG.warn( "failure sending canCommit", exception );
116
117        if( --retries == 0 )
118          throw exception;
119        }
120      }
121
122    // task can Commit now
123    try
124      {
125      output.commit();
126      }
127    catch( IOException exception )
128      {
129      LOG.warn( "failure committing", exception );
130
131      //if it couldn't commit a successfully then delete the output
132      discardOutput( output );
133
134      throw exception;
135      }
136    }
137
138  private void discardOutput( MROutput output )
139    {
140    try
141      {
142      output.abort();
143      }
144    catch( IOException exception )
145      {
146      LOG.warn( "failure cleaning up", exception );
147      }
148    }
149  }