001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez.stream.element; 022 023import cascading.CascadingException; 024import cascading.flow.FlowProcess; 025import cascading.flow.hadoop.stream.HadoopGroupGate; 026import cascading.flow.stream.duct.Duct; 027import cascading.flow.stream.element.InputSource; 028import cascading.flow.stream.graph.IORole; 029import cascading.pipe.Pipe; 030import cascading.pipe.Splice; 031import cascading.tuple.Tuple; 032import cascading.util.SortedListMultiMap; 033import org.apache.hadoop.mapred.OutputCollector; 034import org.apache.tez.runtime.api.LogicalInput; 035import org.apache.tez.runtime.api.LogicalOutput; 036import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * 042 */ 043public abstract class TezGroupGate extends HadoopGroupGate implements InputSource 044 { 045 private static final Logger LOG = LoggerFactory.getLogger( TezGroupGate.class ); 046 047 protected OrderedPartitionedKVOutput logicalOutput; 048 protected SortedListMultiMap<Integer, LogicalInput> logicalInputs; 049 050 public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, LogicalOutput logicalOutput ) 051 { 052 super( flowProcess, splice, role ); 053 054 if( logicalOutput == null ) 055 throw new IllegalArgumentException( "output must not be null" ); 056 057 this.logicalOutput = (OrderedPartitionedKVOutput) logicalOutput; 058 } 059 060 public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs ) 061 { 062 super( flowProcess, splice, role ); 063 064 if( logicalInputs == null || logicalInputs.getKeys().size() == 0 ) 065 throw new IllegalArgumentException( "inputs must not be null or empty" ); 066 067 this.logicalInputs = logicalInputs; 068 } 069 070 @Override 071 public void initialize() 072 { 073 super.initialize(); 074 075 if( role == IORole.sink ) 076 return; 077 078 initComparators(); 079 } 080 081 @Override 082 public void prepare() 083 { 084 try 085 { 086 if( logicalInputs != null ) 087 { 088 for( LogicalInput logicalInput : logicalInputs.getValues() ) 089 { 090 LOG.info( "calling {}#start() on: {} {}, for {} inputs", logicalInput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ), logicalInputs.getValues().size() ); 091 092 logicalInput.start(); 093 } 094 } 095 096 if( logicalOutput != null ) 097 { 098 LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ) ); 099 100 logicalOutput.start(); 101 } 102 } 103 catch( Exception exception ) 104 { 105 throw new CascadingException( "unable to start input/output", exception ); 106 } 107 108 super.prepare(); 109 } 110 111 @Override 112 public void run( Object input ) throws Throwable 113 { 114 Throwable throwable = reduce(); 115 116 if( throwable != null ) 117 throw throwable; 118 } 119 120 protected abstract Throwable reduce() throws Exception; 121 122 @Override 123 protected void wrapGroupingAndCollect( Duct previous, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException 124 { 125 collector.collect( groupKey, valuesTuple ); 126 } 127 128 @Override 129 protected OutputCollector createOutputCollector() 130 { 131 return new OldOutputCollector( logicalOutput ); 132 } 133 }