001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.stream.element;
022
023import cascading.flow.FlowProcess;
024import cascading.flow.SliceCounters;
025import cascading.flow.hadoop.HadoopGroupByClosure;
026import cascading.flow.hadoop.util.TimedIterator;
027import cascading.flow.stream.graph.IORole;
028import cascading.flow.tez.TezGroupByClosure;
029import cascading.flow.tez.util.SecondarySortKeyValuesReader;
030import cascading.pipe.GroupBy;
031import cascading.tuple.Tuple;
032import cascading.tuple.io.TuplePair;
033import cascading.util.SortedListMultiMap;
034import cascading.util.Util;
035import org.apache.tez.runtime.api.LogicalInput;
036import org.apache.tez.runtime.api.LogicalOutput;
037import org.apache.tez.runtime.library.api.KeyValuesReader;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 *
043 */
044public class TezGroupByGate extends TezGroupGate
045  {
046  private static final Logger LOG = LoggerFactory.getLogger( TezGroupByGate.class );
047
048  protected TimedIterator[] timedIterators;
049
050  public TezGroupByGate( FlowProcess flowProcess, GroupBy groupBy, IORole role, LogicalOutput logicalOutput )
051    {
052    super( flowProcess, groupBy, role, logicalOutput );
053    }
054
055  public TezGroupByGate( FlowProcess flowProcess, GroupBy groupBy, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
056    {
057    super( flowProcess, groupBy, role, logicalInputs );
058
059    this.timedIterators = TimedIterator.iterators( new TimedIterator<>( flowProcess, SliceCounters.Read_Duration, SliceCounters.Tuples_Read ) );
060    }
061
062  protected Throwable reduce() throws Exception
063    {
064    try
065      {
066      start( this );
067
068      // if multiple ordinals, an input could be duplicated if sourcing multiple paths
069      LogicalInput logicalInput = Util.getFirst( logicalInputs.getValues() );
070
071      KeyValuesReader reader = (KeyValuesReader) logicalInput.getReader();
072
073      if( sortFields != null )
074        reader = new SecondarySortKeyValuesReader( reader, groupComparators[ 0 ] );
075
076      while( reader.next() )
077        {
078        Tuple currentKey = (Tuple) reader.getCurrentKey(); // if secondary sorting, is a TuplePair
079        Iterable currentValues = reader.getCurrentValues();
080
081        timedIterators[ 0 ].reset( currentValues );
082
083        accept( currentKey, timedIterators ); // will unwrap the TuplePair
084        }
085
086      complete( this );
087      }
088    catch( Throwable throwable )
089      {
090      if( !( throwable instanceof OutOfMemoryError ) )
091        LOG.error( "caught throwable", throwable );
092
093      return throwable;
094      }
095
096    return null;
097    }
098
099  @Override
100  protected HadoopGroupByClosure createClosure()
101    {
102    return new TezGroupByClosure( flowProcess, keyFields, valuesFields );
103    }
104
105  @Override
106  protected Tuple unwrapGrouping( Tuple key )
107    {
108    // copying the lhs key during secondary sorting prevents the key from advancing at the end of the
109    // aggregation iterator
110    return sortFields == null ? key : new Tuple( ( (TuplePair) key ).getLhs() );
111    }
112  }