001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.stream.element;
022
023import cascading.CascadingException;
024import cascading.flow.FlowProcess;
025import cascading.flow.hadoop.stream.HadoopGroupGate;
026import cascading.flow.stream.element.InputSource;
027import cascading.flow.stream.graph.IORole;
028import cascading.pipe.Pipe;
029import cascading.pipe.Splice;
030import cascading.util.SortedListMultiMap;
031import org.apache.hadoop.mapred.OutputCollector;
032import org.apache.tez.runtime.api.LogicalInput;
033import org.apache.tez.runtime.api.LogicalOutput;
034import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 *
040 */
041public abstract class TezGroupGate extends HadoopGroupGate implements InputSource
042  {
043  private static final Logger LOG = LoggerFactory.getLogger( TezGroupGate.class );
044
045  protected OrderedPartitionedKVOutput logicalOutput;
046  protected SortedListMultiMap<Integer, LogicalInput> logicalInputs;
047
048  public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, LogicalOutput logicalOutput )
049    {
050    super( flowProcess, splice, role );
051
052    if( logicalOutput == null )
053      throw new IllegalArgumentException( "output must not be null" );
054
055    this.logicalOutput = (OrderedPartitionedKVOutput) logicalOutput;
056    }
057
058  public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
059    {
060    super( flowProcess, splice, role );
061
062    if( logicalInputs == null || logicalInputs.getKeys().size() == 0 )
063      throw new IllegalArgumentException( "inputs must not be null or empty" );
064
065    this.logicalInputs = logicalInputs;
066    }
067
068  @Override
069  public void initialize()
070    {
071    super.initialize();
072
073    if( role == IORole.sink )
074      return;
075
076    initComparators();
077    }
078
079  @Override
080  public void prepare()
081    {
082    try
083      {
084      if( logicalInputs != null )
085        {
086        for( LogicalInput logicalInput : logicalInputs.getValues() )
087          {
088          LOG.info( "calling {}#start() on: {} {}, for {} inputs", logicalInput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ), logicalInputs.getValues().size() );
089
090          logicalInput.start();
091          }
092        }
093
094      if( logicalOutput != null )
095        {
096        LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ) );
097
098        logicalOutput.start();
099        }
100      }
101    catch( Exception exception )
102      {
103      throw new CascadingException( "unable to start input/output", exception );
104      }
105
106    super.prepare();
107    }
108
109  @Override
110  public void run( Object input ) throws Throwable
111    {
112    Throwable throwable = reduce();
113
114    if( throwable != null )
115      throw throwable;
116    }
117
118  protected abstract Throwable reduce() throws Exception;
119
120  @Override
121  protected OutputCollector createOutputCollector()
122    {
123    return new OldOutputCollector( logicalOutput );
124    }
125  }