001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.stream.element;
022
023import java.util.Collections;
024import java.util.List;
025import java.util.Map;
026
027import cascading.CascadingException;
028import cascading.flow.FlowProcess;
029import cascading.flow.SliceCounters;
030import cascading.flow.hadoop.HadoopCoGroupClosure;
031import cascading.flow.hadoop.util.TimedIterator;
032import cascading.flow.stream.duct.DuctException;
033import cascading.flow.stream.graph.IORole;
034import cascading.flow.tez.TezCoGroupClosure;
035import cascading.pipe.CoGroup;
036import cascading.tuple.Tuple;
037import cascading.tuple.io.TuplePair;
038import cascading.util.SortedListMultiMap;
039import org.apache.tez.runtime.api.LogicalInput;
040import org.apache.tez.runtime.api.LogicalOutput;
041import org.apache.tez.runtime.library.api.KeyValuesReader;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 *
047 */
048public class TezCoGroupGate extends TezGroupGate
049  {
050  private static final Logger LOG = LoggerFactory.getLogger( TezCoGroupGate.class );
051
052  protected TimedIterator<Tuple>[] timedIterators;
053
054  public TezCoGroupGate( FlowProcess flowProcess, CoGroup coGroup, IORole role, LogicalOutput logicalOutput )
055    {
056    super( flowProcess, coGroup, role, logicalOutput );
057    }
058
059  public TezCoGroupGate( FlowProcess flowProcess, CoGroup coGroup, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
060    {
061    super( flowProcess, coGroup, role, logicalInputs );
062
063    this.timedIterators = new TimedIterator[ logicalInputs.getKeys().size() ];
064
065    for( int i = 0; i < this.timedIterators.length; i++ )
066      this.timedIterators[ i ] = new TimedIterator<>( flowProcess, SliceCounters.Read_Duration, SliceCounters.Tuples_Read, i );
067    }
068
069  @Override
070  protected Throwable reduce() throws Exception
071    {
072    try
073      {
074      start( this );
075
076      SortedListMultiMap<Integer, KeyValuesReader> readers = getKeyValuesReaders();
077      SortedListMultiMap<Tuple, Iterable<Tuple>> iterables = getSortedMultiMap( readers.getKeys().size() );
078
079      Map.Entry<Tuple, List<Iterable<Tuple>>> current = forwardToNext( readers, iterables, null );
080      List<Iterable<Tuple>> currentValues;
081
082      while( current != null )
083        {
084        currentValues = current.getValue();
085
086        for( int i = 0; i < timedIterators.length; i++ )
087          timedIterators[ i ].reset( currentValues.get( i ) );
088
089        accept( current.getKey(), timedIterators );
090
091        current = forwardToNext( readers, iterables, currentValues );
092        }
093
094      complete( this );
095      }
096    catch( Throwable throwable )
097      {
098      if( !( throwable instanceof OutOfMemoryError ) )
099        LOG.error( "caught throwable", throwable );
100
101      return throwable;
102      }
103
104    return null;
105    }
106
107  private SortedListMultiMap<Integer, KeyValuesReader> getKeyValuesReaders() throws Exception
108    {
109    SortedListMultiMap<Integer, KeyValuesReader> readers = new SortedListMultiMap<>();
110
111    for( Map.Entry<Integer, List<LogicalInput>> entry : logicalInputs.getEntries() )
112      {
113      for( LogicalInput logicalInput : entry.getValue() )
114        readers.put( entry.getKey(), (KeyValuesReader) logicalInput.getReader() );
115      }
116
117    return readers;
118    }
119
120  private Map.Entry<Tuple, List<Iterable<Tuple>>> forwardToNext( SortedListMultiMap<Integer, KeyValuesReader> readers, SortedListMultiMap<Tuple, Iterable<Tuple>> iterables, List<Iterable<Tuple>> current )
121    {
122    try
123      {
124      int size = current == null ? readers.getKeys().size() : current.size();
125
126      for( int ordinal = 0; ordinal < size; ordinal++ )
127        {
128        if( current != null && current.get( ordinal ) == null )
129          continue;
130
131        for( KeyValuesReader reader : readers.getValues( ordinal ) )
132          {
133          if( !reader.next() )
134            continue;
135
136          Tuple currentKey = (Tuple) reader.getCurrentKey();
137
138          if( splice.isSorted() )
139            currentKey = ( (TuplePair) currentKey ).getLhs();
140
141          currentKey = getDelegatedTuple( currentKey ); // applies hasher
142
143          Iterable<Tuple> currentValues = (Iterable) reader.getCurrentValues();
144
145          iterables.set( currentKey, ordinal, currentValues );
146          }
147        }
148      }
149    catch( OutOfMemoryError error )
150      {
151      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
152      }
153    catch( CascadingException exception )
154      {
155      handleException( exception, null );
156      }
157    catch( Throwable throwable )
158      {
159      handleException( new DuctException( "internal error", throwable ), null );
160      }
161
162    return iterables.pollFirstEntry();
163    }
164
165  private SortedListMultiMap<Tuple, Iterable<Tuple>> getSortedMultiMap( final int length )
166    {
167    return new SortedListMultiMap<Tuple, Iterable<Tuple>>( getKeyComparator(), length )
168    {
169    Iterable<Tuple>[] array = new Iterable[ length ];
170
171    @Override
172    protected List createCollection()
173      {
174      List<Iterable<Tuple>> collection = super.createCollection();
175
176      Collections.addAll( collection, array ); // init with nulls
177
178      return collection;
179      }
180    };
181    }
182
183  @Override
184  protected HadoopCoGroupClosure createClosure()
185    {
186    return new TezCoGroupClosure( flowProcess, splice.getNumSelfJoins(), keyFields, valuesFields );
187    }
188
189  @Override
190  protected Tuple unwrapGrouping( Tuple key )
191    {
192    return key;
193    }
194
195  }