/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez.stream.element;

import java.io.IOException;
import java.util.Collection;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.planner.Scope;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.element.BoundaryStage;
import cascading.flow.stream.element.InputSource;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.StreamGraph;
import cascading.pipe.Boundary;
import cascading.pipe.Pipe;
import cascading.tap.hadoop.util.MeasuredOutputCollector;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.ValueTuple;
import cascading.tuple.util.Resettable1;
import cascading.util.Util;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BoundaryStage} implementation for the Tez platform that moves tuples across
 * a Tez edge.
 * <p/>
 * Depending on the {@link IORole} given at construction, an instance either writes each
 * incoming {@link TupleEntry} to one or more {@link LogicalOutput}s (sink side), or reads
 * key tuples from a single {@link LogicalInput} and forwards them downstream (source side).
 * Tuples cross the edge as a {@link KeyTuple} key paired with {@link ValueTuple#NULL}.
 */
public class TezBoundaryStage extends BoundaryStage<TupleEntry, TupleEntry> implements InputSource
  {
  private static final Logger LOG = LoggerFactory.getLogger( TezBoundaryStage.class );

  /** Outputs written to when acting as a sink; null when acting as a source. */
  protected Collection<LogicalOutput> logicalOutputs;
  /** Input read from when acting as a source; null when acting as a sink. */
  protected LogicalInput logicalInput;

  private MeasuredOutputCollector collector;
  private TupleEntry valueEntry;

  // reused wrapper so each collected key avoids allocating a new KeyTuple
  private final Resettable1<Tuple> keyTuple = new KeyTuple();

  /**
   * Constructs a sink-side stage that writes tuples to the given logical outputs.
   *
   * @param flowProcess    the current flow process
   * @param boundary       the boundary pipe this stage represents
   * @param role           the IO role of this stage
   * @param logicalOutputs the Tez outputs to write to, may not be null or empty
   */
  public TezBoundaryStage( FlowProcess flowProcess, Boundary boundary, IORole role, Collection<LogicalOutput> logicalOutputs )
    {
    super( flowProcess, boundary, role );

    if( logicalOutputs == null || logicalOutputs.isEmpty() )
      throw new IllegalArgumentException( "output must not be null or empty" );

    this.logicalOutputs = logicalOutputs;
    }

  /**
   * Constructs a source-side stage that reads tuples from the given logical input.
   *
   * @param flowProcess  the current flow process
   * @param boundary     the boundary pipe this stage represents
   * @param role         the IO role of this stage
   * @param logicalInput the Tez input to read from, may not be null
   */
  public TezBoundaryStage( FlowProcess flowProcess, Boundary boundary, IORole role, LogicalInput logicalInput )
    {
    super( flowProcess, boundary, role );

    if( logicalInput == null )
      throw new IllegalArgumentException( "input must not be null" );

    this.logicalInput = logicalInput;
    }

  @Override
  public void initialize()
    {
    super.initialize();

    // forwarded entries carry the pass-through fields of the first outgoing scope
    Scope outgoingScope = Util.getFirst( outgoingScopes );
    valueEntry = new TupleEntry( outgoingScope.getIncomingFunctionPassThroughFields(), true );
    }

  @Override
  public void bind( StreamGraph streamGraph )
    {
    // a pure sink has no downstream duct to bind to
    if( role != IORole.sink )
      next = getNextFor( streamGraph );
    }

  @Override
  public void prepare()
    {
    try
      {
      if( logicalInput != null )
        {
        LOG.info( "calling {}#start() on: {} {}", logicalInput.getClass().getSimpleName(), getBoundary(), Pipe.id( getBoundary() ) );

        logicalInput.start();
        }

      if( logicalOutputs != null )
        {
        for( LogicalOutput logicalOutput : logicalOutputs )
          {
          LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getBoundary(), Pipe.id( getBoundary() ) );

          logicalOutput.start();
          }
        }
      }
    catch( Exception exception )
      {
      throw new CascadingException( "unable to start input/output", exception );
      }

    // only non-source roles write, so only they need a collector
    if( role != IORole.source )
      collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() );

    super.prepare();
    }

  @Override
  public void start( Duct previous )
    {
    // a sink has no next duct to propagate start to
    if( next != null )
      super.start( previous );
    }

  @Override
  public void receive( Duct previous, TupleEntry incomingEntry )
    {
    try
      {
      Tuple tuple = incomingEntry.getTuple();

      // the tuple travels as the key; the value slot is intentionally empty
      keyTuple.reset( tuple );

      collector.collect( keyTuple, ValueTuple.NULL );
      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
      }
    catch( OutOfMemoryError error )
      {
      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
      }
    catch( CascadingException exception )
      {
      handleException( exception, incomingEntry );
      }
    catch( Throwable throwable )
      {
      handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry );
      }
    }

  @Override
  public void complete( Duct previous )
    {
    // a sink has no next duct to propagate completion to
    if( next != null )
      super.complete( previous );
    }

  /**
   * Drives the source side of the boundary, rethrowing any failure captured by {@link #map()}.
   *
   * @param input unused, the input is the {@link LogicalInput} given at construction
   * @throws Throwable if reading or forwarding tuples failed
   */
  @Override
  public void run( Object input ) throws Throwable
    {
    Throwable throwable = map();

    if( throwable != null )
      throw throwable;
    }

  /**
   * Reads every key tuple from the logical input and forwards it downstream.
   *
   * @return the throwable that aborted the read, or null on success
   * @throws Exception if the reader cannot be obtained
   */
  protected Throwable map() throws Exception
    {
    try
      {
      start( this );

      KeyValueReader reader = (KeyValueReader) logicalInput.getReader();

      while( reader.next() )
        {
        // only the key carries data across the boundary, values are always NULL
        Tuple currentKey = (Tuple) reader.getCurrentKey();

        valueEntry.setTuple( currentKey );
        next.receive( this, valueEntry );
        }

      complete( this );
      }
    catch( Throwable throwable )
      {
      // OOME is not logged here, the caller surfaces it with memory guidance
      if( !( throwable instanceof OutOfMemoryError ) )
        LOG.error( "caught throwable", throwable );

      return throwable;
      }

    return null;
    }

  /**
   * Creates a collector fanning writes out to every logical output, avoiding
   * the wrapper entirely when there is only a single output.
   *
   * @return an {@link OutputCollector} covering all logical outputs
   */
  protected OutputCollector createOutputCollector()
    {
    if( logicalOutputs.size() == 1 )
      return new OldOutputCollector( Util.getFirst( logicalOutputs ) );

    final OutputCollector[] collectors = new OutputCollector[ logicalOutputs.size() ];

    int count = 0;
    for( LogicalOutput logicalOutput : logicalOutputs )
      collectors[ count++ ] = new OldOutputCollector( logicalOutput );

    return new OutputCollector()
      {
      @Override
      public void collect( Object key, Object value ) throws IOException
        {
        for( OutputCollector outputCollector : collectors )
          outputCollector.collect( key, value );
        }
      };
    }
  }