001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.stream.element;
022
023import java.io.IOException;
024
025import cascading.flow.FlowProcess;
026import cascading.flow.stream.element.SinkStage;
027import cascading.flow.tez.Hadoop2TezFlowProcess;
028import cascading.tap.Tap;
029import org.apache.tez.mapreduce.output.MROutput;
030import org.apache.tez.runtime.api.LogicalOutput;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 *
036 */
037public class TezSinkStage extends SinkStage
038  {
039  private static final Logger LOG = LoggerFactory.getLogger( TezSinkStage.class );
040
041  private final MROutput logicalOutput;
042  private OldOutputCollector collector;
043
044  public TezSinkStage( FlowProcess flowProcess, Tap sink, LogicalOutput logicalOutput )
045    {
046    super( flowProcess, sink );
047
048    if( logicalOutput == null )
049      throw new IllegalArgumentException( "output must not be null" );
050
051    this.logicalOutput = (MROutput) logicalOutput;
052    }
053
054  @Override
055  public void prepare()
056    {
057    LOG.info( "calling {}#start() on: {}", logicalOutput.getClass().getSimpleName(), getSink() );
058
059    logicalOutput.start();
060
061    collector = new OldOutputCollector( logicalOutput );
062
063    super.prepare();
064    }
065
066  @Override
067  public void cleanup()
068    {
069    try
070      {
071      super.cleanup();
072      }
073    finally
074      {
075      try
076        {
077        if( logicalOutput.isCommitRequired() )
078          commit( logicalOutput );
079        }
080      catch( Exception exception )
081        {
082        LOG.warn( "exception on output close", exception );
083        }
084      }
085    }
086
087  @Override
088  protected Object getOutput()
089    {
090    return collector;
091    }
092
093  private void commit( MROutput output ) throws IOException
094    {
095    int retries = 3;
096    while( true )
097      {
098      // This will loop till the AM asks for the task to be killed. As
099      // against, the AM sending a signal to the task to kill itself
100      // gracefully.
101      try
102        {
103        if( ( (Hadoop2TezFlowProcess) flowProcess ).getContext().canCommit() )
104          break;
105
106        Thread.sleep( 100 );
107        }
108      catch( InterruptedException exception )
109        {
110        //ignore
111        }
112      catch( IOException exception )
113        {
114        LOG.warn( "failure sending canCommit", exception );
115
116        if( --retries == 0 )
117          throw exception;
118        }
119      }
120
121    // task can Commit now
122    try
123      {
124      output.commit();
125      }
126    catch( IOException exception )
127      {
128      LOG.warn( "failure committing", exception );
129
130      //if it couldn't commit a successfully then delete the output
131      discardOutput( output );
132
133      throw exception;
134      }
135    }
136
137  private void discardOutput( MROutput output )
138    {
139    try
140      {
141      output.abort();
142      }
143    catch( IOException exception )
144      {
145      LOG.warn( "failure cleaning up", exception );
146      }
147    }
148  }