/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez.stream.element;

import java.io.IOException;
import java.util.Collection;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.planner.Scope;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.element.InputSource;
import cascading.flow.stream.element.SpliceGate;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.StreamGraph;
import cascading.pipe.Pipe;
import cascading.pipe.Splice;
import cascading.tap.hadoop.util.MeasuredOutputCollector;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.ValueTuple;
import cascading.tuple.util.Resettable1;
import cascading.util.SortedListMultiMap;
import cascading.util.Util;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SpliceGate} that either writes the tuples of a {@link Splice} to Tez {@link LogicalOutput}s
 * or reads them back from a single Tez {@link LogicalInput}, depending on which constructor was used.
 * <p>
 * With outputs, every tuple passed to {@link #receive(Duct, TupleEntry)} is collected as a key, paired
 * with {@link ValueTuple#NULL}, on each output. With an input, {@link #run(Object)} reads the keys from
 * the input's {@link KeyValueReader} and hands each one to the next duct.
 */
public class TezMergeGate extends SpliceGate<TupleEntry, TupleEntry> implements InputSource
  {
  private static final Logger LOG = LoggerFactory.getLogger( TezMergeGate.class );

  /** Outputs written to by {@link #receive(Duct, TupleEntry)}; only set by the outputs constructor. */
  protected Collection<LogicalOutput> logicalOutputs;
  /** Inputs read by {@link #map()}; only set by the inputs constructor. */
  protected SortedListMultiMap<Integer, LogicalInput> logicalInputs;

  /** Created in {@link #prepare()} unless the role is {@link IORole#source}. */
  private MeasuredOutputCollector collector;
  /** Reused entry that carries each key read in {@link #map()}; created in {@link #initialize()}. */
  private TupleEntry valueEntry;

  /** Reused key wrapper, reset with the incoming tuple on every {@link #receive(Duct, TupleEntry)}. */
  private final Resettable1<Tuple> keyTuple = new KeyTuple();

  /**
   * Creates a gate that writes to the given logical outputs.
   *
   * @param flowProcess    passed through to the super class
   * @param splice         passed through to the super class
   * @param role           passed through to the super class
   * @param logicalOutputs the outputs to write to
   * @throws IllegalArgumentException if {@code logicalOutputs} is null or empty
   */
  public TezMergeGate( FlowProcess flowProcess, Splice splice, IORole role, Collection<LogicalOutput> logicalOutputs )
    {
    super( flowProcess, splice, role );

    boolean hasOutputs = logicalOutputs != null && !logicalOutputs.isEmpty();

    if( !hasOutputs )
      throw new IllegalArgumentException( "output must not be null or empty" );

    this.logicalOutputs = logicalOutputs;
    }

  /**
   * Creates a gate that reads from the given logical inputs.
   *
   * @param flowProcess   passed through to the super class
   * @param splice        passed through to the super class
   * @param role          passed through to the super class
   * @param logicalInputs the inputs to read from, keyed by an integer
   * @throws IllegalArgumentException if {@code logicalInputs} is null, has no keys, or does not hold
   *                                  exactly one value
   */
  public TezMergeGate( FlowProcess flowProcess, Splice splice, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs )
    {
    super( flowProcess, splice, role );

    boolean hasInputs = logicalInputs != null && logicalInputs.getKeys().size() != 0;

    if( !hasInputs )
      throw new IllegalArgumentException( "inputs must not be null or empty" );

    boolean isSingleInput = logicalInputs.getValues().size() == 1;

    if( !isSingleInput )
      throw new IllegalArgumentException( "only supports a single input" );

    this.logicalInputs = logicalInputs;
    }

  /**
   * Runs the super class initialization, then creates the reusable value entry from the out values
   * fields of the first outgoing scope.
   */
  @Override
  public void initialize()
    {
    super.initialize();

    Scope firstScope = Util.getFirst( outgoingScopes );

    valueEntry = new TupleEntry( firstScope.getOutValuesFields(), true );
    }

  /**
   * Resolves the next duct from the stream graph, unless this gate has the sink role.
   *
   * @param streamGraph the graph used to look up the next duct
   */
  @Override
  public void bind( StreamGraph streamGraph )
    {
    if( role == IORole.sink )
      return;

    next = getNextFor( streamGraph );
    }

  /**
   * Starts every logical input and then every logical output that was supplied. Unless the role is
   * source, it then creates the measured collector, and finally delegates to the super class.
   *
   * @throws CascadingException if starting any input or output fails
   */
  @Override
  public void prepare()
    {
    try
      {
      if( logicalInputs != null )
        {
        for( LogicalInput input : logicalInputs.getValues() )
          {
          String inputType = input.getClass().getSimpleName();
          int inputCount = logicalInputs.getValues().size();

          LOG.info( "calling {}#start() on: {} {}, for {} inputs", inputType, getSplice(), Pipe.id( getSplice() ), inputCount );

          input.start();
          }
        }

      if( logicalOutputs != null )
        {
        for( LogicalOutput output : logicalOutputs )
          {
          String outputType = output.getClass().getSimpleName();

          LOG.info( "calling {}#start() on: {} {}", outputType, getSplice(), Pipe.id( getSplice() ) );

          output.start();
          }
        }
      }
    catch( Exception startFailure )
      {
      throw new CascadingException( "unable to start input/output", startFailure );
      }

    boolean writesOutput = role != IORole.source;

    if( writesOutput )
      collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() );

    super.prepare();
    }

  /**
   * Forwards the start notification to the super class only when a next duct has been bound.
   *
   * @param previous the duct issuing the notification
   */
  @Override
  public void start( Duct previous )
    {
    if( next == null )
      return;

    super.start( previous );
    }

  /**
   * Collects the incoming tuple as a key, paired with {@link ValueTuple#NULL}, and increments the
   * tuples written counter. Failures are routed to the inherited exception handlers.
   *
   * @param previous      the duct delivering the entry
   * @param incomingEntry the entry whose tuple is written
   */
  @Override
  public void receive( Duct previous, TupleEntry incomingEntry )
    {
    try
      {
      keyTuple.reset( incomingEntry.getTuple() );

      collector.collect( keyTuple, ValueTuple.NULL );
      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
      }
    catch( OutOfMemoryError memoryError )
      {
      handleReThrowableException( "out of memory, try increasing task memory allocation", memoryError );
      }
    catch( CascadingException cascadingFailure )
      {
      handleException( cascadingFailure, incomingEntry );
      }
    catch( Throwable failure )
      {
      // anything else is wrapped along with a printout of the offending tuple
      String message = "internal error: " + incomingEntry.getTuple().print();

      handleException( new DuctException( message, failure ), incomingEntry );
      }
    }

  /**
   * Forwards the complete notification to the super class only when a next duct has been bound.
   *
   * @param previous the duct issuing the notification
   */
  @Override
  public void complete( Duct previous )
    {
    if( next == null )
      return;

    super.complete( previous );
    }

  /**
   * Executes {@link #map()} and rethrows whatever throwable it returns.
   *
   * @param input not used by this implementation
   * @throws Throwable the throwable returned by {@link #map()}, if any
   */
  @Override
  public void run( Object input ) throws Throwable
    {
    Throwable failure = map();

    if( failure != null )
      throw failure;
    }

  /**
   * Reads every key from the first logical input and passes it to the next duct, bracketed by
   * {@link #start(Duct)} and {@link #complete(Duct)}.
   * <p>
   * A throwable raised along the way is returned rather than thrown. It is also logged, unless it is
   * an {@link OutOfMemoryError}.
   *
   * @return the throwable that interrupted processing, or {@code null} when all keys were forwarded
   * @throws Exception declared by the signature; the body itself catches every throwable
   */
  protected Throwable map() throws Exception
    {
    try
      {
      start( this );

      // if multiple ordinals, an input could be duplicated if sourcing multiple paths
      LogicalInput input = Util.getFirst( logicalInputs.getValues() );
      KeyValueReader keyReader = (KeyValueReader) input.getReader();

      while( keyReader.next() )
        {
        valueEntry.setTuple( (Tuple) keyReader.getCurrentKey() );
        next.receive( this, valueEntry );
        }

      complete( this );
      }
    catch( Throwable failure )
      {
      boolean isOutOfMemory = failure instanceof OutOfMemoryError;

      if( !isOutOfMemory )
        LOG.error( "caught throwable", failure );

      return failure;
      }

    return null;
    }

  /**
   * Builds the collector that {@link #prepare()} wraps. A single logical output gets its own
   * {@code OldOutputCollector}; several outputs get a collector that repeats every key/value pair
   * on each of them, in iteration order.
   *
   * @return a collector writing to all logical outputs
   */
  protected OutputCollector createOutputCollector()
    {
    int outputCount = logicalOutputs.size();

    if( outputCount == 1 )
      return new OldOutputCollector( Util.getFirst( logicalOutputs ) );

    final OutputCollector[] fanOut = new OutputCollector[ outputCount ];
    int index = 0;

    for( LogicalOutput output : logicalOutputs )
      {
      fanOut[ index ] = new OldOutputCollector( output );
      index++;
      }

    return new OutputCollector()
      {
      @Override
      public void collect( Object key, Object value ) throws IOException
        {
        for( int i = 0; i < fanOut.length; i++ )
          fanOut[ i ].collect( key, value );
        }
      };
    }
  }