001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.stream.element;
022
023import java.io.IOException;
024import java.util.Collection;
025
026import cascading.CascadingException;
027import cascading.flow.FlowProcess;
028import cascading.flow.SliceCounters;
029import cascading.flow.planner.Scope;
030import cascading.flow.stream.duct.Duct;
031import cascading.flow.stream.duct.DuctException;
032import cascading.flow.stream.element.InputSource;
033import cascading.flow.stream.element.SpliceGate;
034import cascading.flow.stream.graph.IORole;
035import cascading.flow.stream.graph.StreamGraph;
036import cascading.pipe.Pipe;
037import cascading.pipe.Splice;
038import cascading.tap.hadoop.util.MeasuredOutputCollector;
039import cascading.tuple.Tuple;
040import cascading.tuple.TupleEntry;
041import cascading.tuple.io.KeyTuple;
042import cascading.tuple.io.ValueTuple;
043import cascading.tuple.util.Resettable1;
044import cascading.util.SortedListMultiMap;
045import cascading.util.Util;
046import org.apache.hadoop.mapred.OutputCollector;
047import org.apache.tez.runtime.api.LogicalInput;
048import org.apache.tez.runtime.api.LogicalOutput;
049import org.apache.tez.runtime.library.api.KeyValueReader;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053/**
054 *
055 */
056public class TezMergeGate extends SpliceGate<TupleEntry, TupleEntry> implements InputSource
057  {
058  private static final Logger LOG = LoggerFactory.getLogger( TezMergeGate.class );
059
060  protected Collection<LogicalOutput> logicalOutputs;
061  protected SortedListMultiMap<Integer, LogicalInput> logicalInputs;
062
063  private MeasuredOutputCollector collector;
064  private TupleEntry valueEntry;
065
066  private final Resettable1<Tuple> keyTuple = new KeyTuple();
067
068  public TezMergeGate( FlowProcess flowProcess, Splice splice, IORole role, Collection<LogicalOutput> logicalOutputs )
069    {
070    super( flowProcess, splice, role );
071
072    if( logicalOutputs == null || logicalOutputs.isEmpty() )
073      throw new IllegalArgumentException( "output must not be null or empty" );
074
075    this.logicalOutputs = logicalOutputs;
076    }
077
078  public TezMergeGate( FlowProcess flowProcess, Splice splice, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
079    {
080    super( flowProcess, splice, role );
081
082    if( logicalInputs == null || logicalInputs.getKeys().size() == 0 )
083      throw new IllegalArgumentException( "inputs must not be null or empty" );
084
085    if( logicalInputs.getValues().size() != 1 )
086      throw new IllegalArgumentException( "only supports a single input" );
087
088    this.logicalInputs = logicalInputs;
089    }
090
091  @Override
092  public void initialize()
093    {
094    super.initialize();
095
096    Scope outgoingScope = Util.getFirst( outgoingScopes );
097    valueEntry = new TupleEntry( outgoingScope.getOutValuesFields(), true );
098    }
099
100  @Override
101  public void bind( StreamGraph streamGraph )
102    {
103    if( role != IORole.sink )
104      next = getNextFor( streamGraph );
105    }
106
107  @Override
108  public void prepare()
109    {
110    try
111      {
112      if( logicalInputs != null )
113        {
114        for( LogicalInput logicalInput : logicalInputs.getValues() )
115          {
116          LOG.info( "calling {}#start() on: {} {}, for {} inputs", logicalInput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ), logicalInputs.getValues().size() );
117
118          logicalInput.start();
119          }
120        }
121
122      if( logicalOutputs != null )
123        {
124        for( LogicalOutput logicalOutput : logicalOutputs )
125          {
126          LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ) );
127
128          logicalOutput.start();
129          }
130        }
131      }
132    catch( Exception exception )
133      {
134      throw new CascadingException( "unable to start input/output", exception );
135      }
136
137    if( role != IORole.source )
138      collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() );
139
140    super.prepare();
141    }
142
143  @Override
144  public void start( Duct previous )
145    {
146    if( next != null )
147      super.start( previous );
148    }
149
150  @Override
151  public void receive( Duct previous, TupleEntry incomingEntry )
152    {
153    try
154      {
155      keyTuple.reset( incomingEntry.getTuple() );
156
157      collector.collect( keyTuple, ValueTuple.NULL );
158      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
159      }
160    catch( OutOfMemoryError error )
161      {
162      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
163      }
164    catch( CascadingException exception )
165      {
166      handleException( exception, incomingEntry );
167      }
168    catch( Throwable throwable )
169      {
170      handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry );
171      }
172    }
173
174  @Override
175  public void complete( Duct previous )
176    {
177    if( next != null )
178      super.complete( previous );
179    }
180
181  @Override
182  public void run( Object input ) throws Throwable
183    {
184    Throwable throwable = map();
185
186    if( throwable != null )
187      throw throwable;
188    }
189
190  protected Throwable map() throws Exception
191    {
192    Throwable localThrowable = null;
193
194    try
195      {
196      start( this );
197
198      // if multiple ordinals, an input could be duplicated if sourcing multiple paths
199      LogicalInput logicalInput = Util.getFirst( logicalInputs.getValues() );
200
201      KeyValueReader reader = (KeyValueReader) logicalInput.getReader();
202
203      while( reader.next() )
204        {
205        Tuple currentKey = (Tuple) reader.getCurrentKey();
206
207        valueEntry.setTuple( currentKey );
208        next.receive( this, valueEntry );
209        }
210
211      complete( this );
212      }
213    catch( Throwable throwable )
214      {
215      if( !( throwable instanceof OutOfMemoryError ) )
216        LOG.error( "caught throwable", throwable );
217
218      return throwable;
219      }
220
221    return localThrowable;
222    }
223
224  protected OutputCollector createOutputCollector()
225    {
226    if( logicalOutputs.size() == 1 )
227      return new OldOutputCollector( Util.getFirst( logicalOutputs ) );
228
229    final OutputCollector[] collectors = new OutputCollector[ logicalOutputs.size() ];
230
231    int count = 0;
232    for( LogicalOutput logicalOutput : logicalOutputs )
233      collectors[ count++ ] = new OldOutputCollector( logicalOutput );
234
235    return new OutputCollector()
236    {
237    @Override
238    public void collect( Object key, Object value ) throws IOException
239      {
240      for( OutputCollector outputCollector : collectors )
241        outputCollector.collect( key, value );
242      }
243    };
244    }
245  }