001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.tez.stream.element;
023
024import java.io.IOException;
025import java.util.Collection;
026
027import cascading.CascadingException;
028import cascading.flow.FlowProcess;
029import cascading.flow.SliceCounters;
030import cascading.flow.planner.Scope;
031import cascading.flow.stream.duct.Duct;
032import cascading.flow.stream.duct.DuctException;
033import cascading.flow.stream.element.BoundaryStage;
034import cascading.flow.stream.element.InputSource;
035import cascading.flow.stream.graph.IORole;
036import cascading.flow.stream.graph.StreamGraph;
037import cascading.pipe.Boundary;
038import cascading.pipe.Pipe;
039import cascading.tap.hadoop.util.MeasuredOutputCollector;
040import cascading.tuple.Tuple;
041import cascading.tuple.TupleEntry;
042import cascading.tuple.io.KeyTuple;
043import cascading.tuple.io.ValueTuple;
044import cascading.tuple.util.Resettable1;
045import cascading.util.Util;
046import org.apache.hadoop.mapred.OutputCollector;
047import org.apache.tez.runtime.api.LogicalInput;
048import org.apache.tez.runtime.api.LogicalOutput;
049import org.apache.tez.runtime.library.api.KeyValueReader;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 *
055 */
056public class TezBoundaryStage extends BoundaryStage<TupleEntry, TupleEntry> implements InputSource
057  {
058  private static final Logger LOG = LoggerFactory.getLogger( TezBoundaryStage.class );
059
060  protected Collection<LogicalOutput> logicalOutputs;
061  protected LogicalInput logicalInput;
062
063  private MeasuredOutputCollector collector;
064  private TupleEntry valueEntry;
065
066  private final Resettable1<Tuple> keyTuple = new KeyTuple();
067
068  public TezBoundaryStage( FlowProcess flowProcess, Boundary boundary, IORole role, Collection<LogicalOutput> logicalOutputs )
069    {
070    super( flowProcess, boundary, role );
071
072    if( logicalOutputs == null || logicalOutputs.isEmpty() )
073      throw new IllegalArgumentException( "output must not be null or empty" );
074
075    this.logicalOutputs = logicalOutputs;
076    }
077
078  public TezBoundaryStage( FlowProcess flowProcess, Boundary boundary, IORole role, LogicalInput logicalInput )
079    {
080    super( flowProcess, boundary, role );
081
082    if( logicalInput == null )
083      throw new IllegalArgumentException( "inputs must not be null or empty" );
084
085    this.logicalInput = logicalInput;
086    }
087
088  @Override
089  public void initialize()
090    {
091    super.initialize();
092
093    Scope outgoingScope = Util.getFirst( outgoingScopes );
094    valueEntry = new TupleEntry( outgoingScope.getIncomingFunctionPassThroughFields(), true );
095    }
096
097  @Override
098  public void bind( StreamGraph streamGraph )
099    {
100    if( role != IORole.sink )
101      next = getNextFor( streamGraph );
102    }
103
104  @Override
105  public void prepare()
106    {
107    try
108      {
109      if( logicalInput != null )
110        {
111        LOG.info( "calling {}#start() on: {} {}", logicalInput.getClass().getSimpleName(), getBoundary(), Pipe.id( getBoundary() ) );
112
113        logicalInput.start();
114        }
115
116      if( logicalOutputs != null )
117        {
118        for( LogicalOutput logicalOutput : logicalOutputs )
119          {
120          LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getBoundary(), Pipe.id( getBoundary() ) );
121
122          logicalOutput.start();
123          }
124        }
125      }
126    catch( Exception exception )
127      {
128      throw new CascadingException( "unable to start input/output", exception );
129      }
130
131    if( role != IORole.source )
132      collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() );
133
134    super.prepare();
135    }
136
137  @Override
138  public void start( Duct previous )
139    {
140    if( next != null )
141      super.start( previous );
142    }
143
144  @Override
145  public void receive( Duct previous, int ordinal, TupleEntry incomingEntry )
146    {
147    try
148      {
149      Tuple tuple = incomingEntry.getTuple();
150
151      keyTuple.reset( tuple );
152
153      collector.collect( keyTuple, ValueTuple.NULL );
154      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
155      }
156    catch( OutOfMemoryError error )
157      {
158      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
159      }
160    catch( CascadingException exception )
161      {
162      handleException( exception, incomingEntry );
163      }
164    catch( Throwable throwable )
165      {
166      handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry );
167      }
168    }
169
170  @Override
171  public void complete( Duct previous )
172    {
173    if( next != null )
174      super.complete( previous );
175    }
176
177  @Override
178  public void run( Object input ) throws Throwable
179    {
180    Throwable throwable = map();
181
182    if( throwable != null )
183      throw throwable;
184    }
185
186  protected Throwable map() throws Exception
187    {
188    Throwable localThrowable = null;
189
190    try
191      {
192      start( this );
193
194      KeyValueReader reader = (KeyValueReader) logicalInput.getReader();
195
196      while( reader.next() )
197        {
198        Tuple currentKey = (Tuple) reader.getCurrentKey();
199
200        valueEntry.setTuple( currentKey );
201        next.receive( this, 0, valueEntry );
202        }
203
204      complete( this );
205      }
206    catch( Throwable throwable )
207      {
208      if( !( throwable instanceof OutOfMemoryError ) )
209        LOG.error( "caught throwable", throwable );
210
211      return throwable;
212      }
213
214    return localThrowable;
215    }
216
217  protected OutputCollector createOutputCollector()
218    {
219    if( logicalOutputs.size() == 1 )
220      return new OldOutputCollector( Util.getFirst( logicalOutputs ) );
221
222    final OutputCollector[] collectors = new OutputCollector[ logicalOutputs.size() ];
223
224    int count = 0;
225    for( LogicalOutput logicalOutput : logicalOutputs )
226      collectors[ count++ ] = new OldOutputCollector( logicalOutput );
227
228    return new OutputCollector()
229      {
230      @Override
231      public void collect( Object key, Object value ) throws IOException
232        {
233        for( OutputCollector outputCollector : collectors )
234          outputCollector.collect( key, value );
235        }
236      };
237    }
238  }