001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.stream.element;
022
023import java.io.IOException;
024import java.util.Collection;
025
026import cascading.CascadingException;
027import cascading.flow.FlowProcess;
028import cascading.flow.SliceCounters;
029import cascading.flow.planner.Scope;
030import cascading.flow.stream.duct.Duct;
031import cascading.flow.stream.duct.DuctException;
032import cascading.flow.stream.element.BoundaryStage;
033import cascading.flow.stream.element.InputSource;
034import cascading.flow.stream.graph.IORole;
035import cascading.flow.stream.graph.StreamGraph;
036import cascading.pipe.Boundary;
037import cascading.pipe.Pipe;
038import cascading.tap.hadoop.util.MeasuredOutputCollector;
039import cascading.tuple.Tuple;
040import cascading.tuple.TupleEntry;
041import cascading.tuple.io.KeyTuple;
042import cascading.tuple.io.ValueTuple;
043import cascading.tuple.util.Resettable1;
044import cascading.util.Util;
045import org.apache.hadoop.mapred.OutputCollector;
046import org.apache.tez.runtime.api.LogicalInput;
047import org.apache.tez.runtime.api.LogicalOutput;
048import org.apache.tez.runtime.library.api.KeyValueReader;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 *
054 */
055public class TezBoundaryStage extends BoundaryStage<TupleEntry, TupleEntry> implements InputSource
056  {
057  private static final Logger LOG = LoggerFactory.getLogger( TezBoundaryStage.class );
058
059  protected Collection<LogicalOutput> logicalOutputs;
060  protected LogicalInput logicalInput;
061
062  private MeasuredOutputCollector collector;
063  private TupleEntry valueEntry;
064
065  private final Resettable1<Tuple> keyTuple = new KeyTuple();
066
067  public TezBoundaryStage( FlowProcess flowProcess, Boundary boundary, IORole role, Collection<LogicalOutput> logicalOutputs )
068    {
069    super( flowProcess, boundary, role );
070
071    if( logicalOutputs == null || logicalOutputs.isEmpty() )
072      throw new IllegalArgumentException( "output must not be null or empty" );
073
074    this.logicalOutputs = logicalOutputs;
075    }
076
077  public TezBoundaryStage( FlowProcess flowProcess, Boundary boundary, IORole role, LogicalInput logicalInput )
078    {
079    super( flowProcess, boundary, role );
080
081    if( logicalInput == null )
082      throw new IllegalArgumentException( "inputs must not be null or empty" );
083
084    this.logicalInput = logicalInput;
085    }
086
087  @Override
088  public void initialize()
089    {
090    super.initialize();
091
092    Scope outgoingScope = Util.getFirst( outgoingScopes );
093    valueEntry = new TupleEntry( outgoingScope.getIncomingFunctionPassThroughFields(), true );
094    }
095
096  @Override
097  public void bind( StreamGraph streamGraph )
098    {
099    if( role != IORole.sink )
100      next = getNextFor( streamGraph );
101    }
102
103  @Override
104  public void prepare()
105    {
106    try
107      {
108      if( logicalInput != null )
109        {
110        LOG.info( "calling {}#start() on: {} {}", logicalInput.getClass().getSimpleName(), getBoundary(), Pipe.id( getBoundary() ) );
111
112        logicalInput.start();
113        }
114
115      if( logicalOutputs != null )
116        {
117        for( LogicalOutput logicalOutput : logicalOutputs )
118          {
119          LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getBoundary(), Pipe.id( getBoundary() ) );
120
121          logicalOutput.start();
122          }
123        }
124      }
125    catch( Exception exception )
126      {
127      throw new CascadingException( "unable to start input/output", exception );
128      }
129
130    if( role != IORole.source )
131      collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() );
132
133    super.prepare();
134    }
135
136  @Override
137  public void start( Duct previous )
138    {
139    if( next != null )
140      super.start( previous );
141    }
142
143  @Override
144  public void receive( Duct previous, TupleEntry incomingEntry )
145    {
146    try
147      {
148      Tuple tuple = incomingEntry.getTuple();
149
150      keyTuple.reset( tuple );
151
152      collector.collect( keyTuple, ValueTuple.NULL );
153      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
154      }
155    catch( OutOfMemoryError error )
156      {
157      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
158      }
159    catch( CascadingException exception )
160      {
161      handleException( exception, incomingEntry );
162      }
163    catch( Throwable throwable )
164      {
165      handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry );
166      }
167    }
168
169  @Override
170  public void complete( Duct previous )
171    {
172    if( next != null )
173      super.complete( previous );
174    }
175
176  @Override
177  public void run( Object input ) throws Throwable
178    {
179    Throwable throwable = map();
180
181    if( throwable != null )
182      throw throwable;
183    }
184
185  protected Throwable map() throws Exception
186    {
187    Throwable localThrowable = null;
188
189    try
190      {
191      start( this );
192
193      KeyValueReader reader = (KeyValueReader) logicalInput.getReader();
194
195      while( reader.next() )
196        {
197        Tuple currentKey = (Tuple) reader.getCurrentKey();
198
199        valueEntry.setTuple( currentKey );
200        next.receive( this, valueEntry );
201        }
202
203      complete( this );
204      }
205    catch( Throwable throwable )
206      {
207      if( !( throwable instanceof OutOfMemoryError ) )
208        LOG.error( "caught throwable", throwable );
209
210      return throwable;
211      }
212
213    return localThrowable;
214    }
215
216  protected OutputCollector createOutputCollector()
217    {
218    if( logicalOutputs.size() == 1 )
219      return new OldOutputCollector( Util.getFirst( logicalOutputs ) );
220
221    final OutputCollector[] collectors = new OutputCollector[ logicalOutputs.size() ];
222
223    int count = 0;
224    for( LogicalOutput logicalOutput : logicalOutputs )
225      collectors[ count++ ] = new OldOutputCollector( logicalOutput );
226
227    return new OutputCollector()
228    {
229    @Override
230    public void collect( Object key, Object value ) throws IOException
231      {
232      for( OutputCollector outputCollector : collectors )
233        outputCollector.collect( key, value );
234      }
235    };
236    }
237  }