001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.stream.element;
022
023import cascading.CascadingException;
024import cascading.flow.FlowProcess;
025import cascading.flow.hadoop.stream.HadoopGroupGate;
026import cascading.flow.stream.duct.Duct;
027import cascading.flow.stream.element.InputSource;
028import cascading.flow.stream.graph.IORole;
029import cascading.pipe.Pipe;
030import cascading.pipe.Splice;
031import cascading.tuple.Tuple;
032import cascading.util.SortedListMultiMap;
033import org.apache.hadoop.mapred.OutputCollector;
034import org.apache.tez.runtime.api.LogicalInput;
035import org.apache.tez.runtime.api.LogicalOutput;
036import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 *
042 */
043public abstract class TezGroupGate extends HadoopGroupGate implements InputSource
044  {
045  private static final Logger LOG = LoggerFactory.getLogger( TezGroupGate.class );
046
047  protected OrderedPartitionedKVOutput logicalOutput;
048  protected SortedListMultiMap<Integer, LogicalInput> logicalInputs;
049
050  public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, LogicalOutput logicalOutput )
051    {
052    super( flowProcess, splice, role );
053
054    if( logicalOutput == null )
055      throw new IllegalArgumentException( "output must not be null" );
056
057    this.logicalOutput = (OrderedPartitionedKVOutput) logicalOutput;
058    }
059
060  public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
061    {
062    super( flowProcess, splice, role );
063
064    if( logicalInputs == null || logicalInputs.getKeys().size() == 0 )
065      throw new IllegalArgumentException( "inputs must not be null or empty" );
066
067    this.logicalInputs = logicalInputs;
068    }
069
070  @Override
071  public void initialize()
072    {
073    super.initialize();
074
075    if( role == IORole.sink )
076      return;
077
078    initComparators();
079    }
080
081  @Override
082  public void prepare()
083    {
084    try
085      {
086      if( logicalInputs != null )
087        {
088        for( LogicalInput logicalInput : logicalInputs.getValues() )
089          {
090          LOG.info( "calling {}#start() on: {} {}, for {} inputs", logicalInput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ), logicalInputs.getValues().size() );
091
092          logicalInput.start();
093          }
094        }
095
096      if( logicalOutput != null )
097        {
098        LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ) );
099
100        logicalOutput.start();
101        }
102      }
103    catch( Exception exception )
104      {
105      throw new CascadingException( "unable to start input/output", exception );
106      }
107
108    super.prepare();
109    }
110
111  @Override
112  public void run( Object input ) throws Throwable
113    {
114    Throwable throwable = reduce();
115
116    if( throwable != null )
117      throw throwable;
118    }
119
120  protected abstract Throwable reduce() throws Exception;
121
122  @Override
123  protected void wrapGroupingAndCollect( Duct previous, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException
124    {
125    collector.collect( groupKey, valuesTuple );
126    }
127
128  @Override
129  protected OutputCollector createOutputCollector()
130    {
131    return new OldOutputCollector( logicalOutput );
132    }
133  }