/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez.stream.element;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.hadoop.HadoopCoGroupClosure;
import cascading.flow.hadoop.util.TimedIterator;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.graph.IORole;
import cascading.flow.tez.TezCoGroupClosure;
import cascading.pipe.CoGroup;
import cascading.tuple.Tuple;
import cascading.tuple.io.TuplePair;
import cascading.util.SortedListMultiMap;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TezGroupGate} specialization for {@link CoGroup} pipes.
 * <p>
 * When built over logical inputs, {@link #reduce()} obtains a {@link KeyValuesReader} for every input,
 * collects the current key of each reader into a multi-map ordered by {@code getKeyComparator()}, and
 * repeatedly polls the first entry of that map. Each polled key is passed on via {@code accept} together
 * with one {@link TimedIterator} per input ordinal.
 */
public class TezCoGroupGate extends TezGroupGate
{
  private static final Logger LOG = LoggerFactory.getLogger( TezCoGroupGate.class );

  /**
   * One timed iterator per input ordinal. Only assigned by the logical-inputs constructor;
   * it stays {@code null} when the gate is created over a logical output.
   */
  protected TimedIterator<Tuple>[] timedIterators;

  /**
   * Creates a gate over the given logical output.
   *
   * @param flowProcess   the current flow process
   * @param coGroup       the CoGroup pipe this gate represents
   * @param role          the IO role of this gate
   * @param logicalOutput the Tez output handed to the parent gate
   */
  public TezCoGroupGate( FlowProcess flowProcess, CoGroup coGroup, IORole role, LogicalOutput logicalOutput )
  {
    super( flowProcess, coGroup, role, logicalOutput );
  }

  /**
   * Creates a gate over the given logical inputs and allocates one {@link TimedIterator} per key
   * (ordinal) found in {@code logicalInputs}.
   *
   * @param flowProcess   the current flow process
   * @param coGroup       the CoGroup pipe this gate represents
   * @param role          the IO role of this gate
   * @param logicalInputs the Tez inputs, keyed by ordinal
   */
  public TezCoGroupGate( FlowProcess flowProcess, CoGroup coGroup, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
  {
    super( flowProcess, coGroup, role, logicalInputs );

    int ordinalCount = logicalInputs.getKeys().size();

    this.timedIterators = new TimedIterator[ ordinalCount ];

    // each iterator is created with the read duration / tuples read counters and its own ordinal
    for( int ordinal = 0; ordinal < ordinalCount; ordinal++ )
      this.timedIterators[ ordinal ] = new TimedIterator<>( flowProcess, SliceCounters.Read_Duration, SliceCounters.Tuples_Read, ordinal );
  }

  /**
   * Drives the co-group: starts this gate, walks every grouping key produced by
   * {@link #forwardToNext}, and completes this gate.
   * <p>
   * For each key the per-ordinal value iterables are loaded into {@link #timedIterators} before
   * {@code accept} is invoked with the key and the iterator array.
   *
   * @return {@code null} when the loop finishes normally, otherwise the throwable that was caught
   * @throws Exception declared by the overridden method; the body itself catches every throwable
   */
  @Override
  protected Throwable reduce() throws Exception
  {
    try
    {
      start( this );

      SortedListMultiMap<Integer, KeyValuesReader> readers = getKeyValuesReaders();
      SortedListMultiMap<Tuple, Iterable<Tuple>> pending = getSortedMultiMap( readers.getKeys().size() );

      // a null 'current' argument asks forwardToNext to advance the readers of every ordinal
      Map.Entry<Tuple, List<Iterable<Tuple>>> entry = forwardToNext( readers, pending, null );

      while( entry != null )
      {
        List<Iterable<Tuple>> values = entry.getValue();
        int ordinal = 0;

        for( TimedIterator<Tuple> timedIterator : timedIterators )
          timedIterator.reset( values.get( ordinal++ ) );

        accept( entry.getKey(), timedIterators );

        // only the ordinals that contributed values to this key are advanced
        entry = forwardToNext( readers, pending, values );
      }

      complete( this );

      return null;
    }
    catch( Throwable throwable )
    {
      boolean outOfMemory = throwable instanceof OutOfMemoryError;

      // out of memory errors are returned without being logged here
      if( !outOfMemory )
        LOG.error( "caught throwable", throwable );

      return throwable;
    }
  }

  /**
   * Builds a multi-map from ordinal to the {@link KeyValuesReader} instances obtained from each
   * logical input registered under that ordinal.
   *
   * @return the readers, keyed by the same ordinals as {@code logicalInputs}
   * @throws Exception if a logical input fails to provide its reader
   */
  private SortedListMultiMap<Integer, KeyValuesReader> getKeyValuesReaders() throws Exception
  {
    SortedListMultiMap<Integer, KeyValuesReader> result = new SortedListMultiMap<>();

    for( Map.Entry<Integer, List<LogicalInput>> inputsForOrdinal : logicalInputs.getEntries() )
    {
      Integer ordinal = inputsForOrdinal.getKey();

      for( LogicalInput input : inputsForOrdinal.getValue() )
        result.put( ordinal, (KeyValuesReader) input.getReader() );
    }

    return result;
  }

  /**
   * Advances the readers that need refilling, registers their current key and values in
   * {@code iterables}, and polls the first entry from {@code iterables}.
   * <p>
   * When {@code current} is {@code null} every ordinal is advanced. Otherwise an ordinal is advanced
   * only if {@code current} holds a non-null iterable at that position; ordinals with a {@code null}
   * slot are skipped.
   * <p>
   * Failures are routed to the inherited handlers: {@link OutOfMemoryError} to
   * {@code handleReThrowableException}, {@link CascadingException} to {@code handleException}, and any
   * other throwable to {@code handleException} wrapped in a {@link DuctException}. If a handler
   * returns normally, the first entry is still polled.
   *
   * @param readers   the readers per ordinal
   * @param iterables the map of pending keys to their per-ordinal value iterables
   * @param current   the value list of the entry just processed, or {@code null} on the first call
   * @return the result of {@code iterables.pollFirstEntry()}; {@link #reduce()} stops on {@code null}
   */
  private Map.Entry<Tuple, List<Iterable<Tuple>>> forwardToNext( SortedListMultiMap<Integer, KeyValuesReader> readers, SortedListMultiMap<Tuple, Iterable<Tuple>> iterables, List<Iterable<Tuple>> current )
  {
    try
    {
      boolean firstPass = current == null;
      int ordinalCount;

      if( firstPass )
        ordinalCount = readers.getKeys().size();
      else
        ordinalCount = current.size();

      for( int ordinal = 0; ordinal < ordinalCount; ordinal++ )
      {
        boolean needsAdvance = firstPass || current.get( ordinal ) != null;

        if( !needsAdvance )
          continue;

        for( KeyValuesReader reader : readers.getValues( ordinal ) )
        {
          if( reader.next() )
          {
            Tuple groupingKey = (Tuple) reader.getCurrentKey();

            // for a sorted splice the key arrives as a pair, only the left hand side is kept
            if( splice.isSorted() )
              groupingKey = ( (TuplePair) groupingKey ).getLhs();

            groupingKey = getDelegatedTuple( groupingKey ); // applies hasher

            Iterable<Tuple> groupedValues = (Iterable) reader.getCurrentValues();

            iterables.set( groupingKey, ordinal, groupedValues );
          }
        }
      }
    }
    catch( OutOfMemoryError error )
    {
      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
    }
    catch( CascadingException exception )
    {
      handleException( exception, null );
    }
    catch( Throwable throwable )
    {
      handleException( new DuctException( "internal error", throwable ), null );
    }

    return iterables.pollFirstEntry();
  }

  /**
   * Creates the multi-map used to line up pending keys. It is constructed with
   * {@code getKeyComparator()} and {@code length}, and every collection it creates is pre-filled with
   * {@code length} {@code null} elements.
   *
   * @param length the number of ordinals, i.e. the number of null slots each new collection receives
   * @return a new, empty multi-map
   */
  private SortedListMultiMap<Tuple, Iterable<Tuple>> getSortedMultiMap( final int length )
  {
    return new SortedListMultiMap<Tuple, Iterable<Tuple>>( getKeyComparator(), length )
    {
      // array of 'length' nulls, copied into every newly created collection
      Iterable<Tuple>[] emptySlots = new Iterable[ length ];

      @Override
      protected List createCollection()
      {
        List<Iterable<Tuple>> slots = super.createCollection();

        Collections.addAll( slots, emptySlots ); // init with nulls

        return slots;
      }
    };
  }

  /**
   * Creates the closure for this gate.
   *
   * @return a new {@link TezCoGroupClosure} built from the flow process, the splice's number of self
   * joins, and the key and values fields
   */
  @Override
  protected HadoopCoGroupClosure createClosure()
  {
    return new TezCoGroupClosure( flowProcess, splice.getNumSelfJoins(), keyFields, valuesFields );
  }

  /**
   * Returns the given key unchanged.
   *
   * @param key the grouping key
   * @return {@code key} itself
   */
  @Override
  protected Tuple unwrapGrouping( Tuple key )
  {
    return key;
  }
}