001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez.stream.element; 022 023import cascading.CascadingException; 024import cascading.flow.FlowProcess; 025import cascading.flow.hadoop.stream.HadoopGroupGate; 026import cascading.flow.stream.element.InputSource; 027import cascading.flow.stream.graph.IORole; 028import cascading.pipe.Pipe; 029import cascading.pipe.Splice; 030import cascading.util.SortedListMultiMap; 031import org.apache.hadoop.mapred.OutputCollector; 032import org.apache.tez.runtime.api.LogicalInput; 033import org.apache.tez.runtime.api.LogicalOutput; 034import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * 040 */ 041public abstract class TezGroupGate extends HadoopGroupGate implements InputSource 042 { 043 private static final Logger LOG = LoggerFactory.getLogger( TezGroupGate.class ); 044 045 protected OrderedPartitionedKVOutput logicalOutput; 046 protected SortedListMultiMap<Integer, LogicalInput> logicalInputs; 047 048 public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, LogicalOutput logicalOutput ) 049 { 050 super( flowProcess, splice, role ); 051 052 if( logicalOutput == null ) 053 throw new IllegalArgumentException( "output must not be null" ); 054 055 this.logicalOutput = (OrderedPartitionedKVOutput) logicalOutput; 056 } 057 058 public TezGroupGate( FlowProcess flowProcess, Splice splice, IORole role, SortedListMultiMap<Integer, LogicalInput> logicalInputs ) 059 { 060 super( flowProcess, splice, role ); 061 062 if( logicalInputs == null || logicalInputs.getKeys().size() == 0 ) 063 throw new IllegalArgumentException( "inputs must not be null or empty" ); 064 065 this.logicalInputs = logicalInputs; 066 } 067 068 @Override 069 public void initialize() 070 { 071 super.initialize(); 072 073 if( role == IORole.sink ) 074 return; 075 076 initComparators(); 077 } 078 079 @Override 080 public void prepare() 081 { 082 try 083 { 084 if( logicalInputs != null ) 085 { 086 for( LogicalInput logicalInput : logicalInputs.getValues() ) 087 { 088 LOG.info( "calling {}#start() on: {} {}, for {} inputs", logicalInput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ), logicalInputs.getValues().size() ); 089 090 logicalInput.start(); 091 } 092 } 093 094 if( logicalOutput != null ) 095 { 096 LOG.info( "calling {}#start() on: {} {}", logicalOutput.getClass().getSimpleName(), getSplice(), Pipe.id( getSplice() ) ); 097 098 logicalOutput.start(); 099 } 100 } 101 catch( Exception exception ) 102 { 103 throw new CascadingException( "unable to start input/output", exception ); 104 } 105 106 super.prepare(); 107 } 108 109 @Override 110 public void run( Object input ) throws Throwable 111 { 112 Throwable throwable = reduce(); 113 114 if( throwable != null ) 115 throw throwable; 116 } 117 118 protected abstract Throwable reduce() throws Exception; 119 120 @Override 121 protected OutputCollector createOutputCollector() 122 { 123 return new OldOutputCollector( logicalOutput ); 124 } 125 }