/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.stream;

import java.util.Iterator;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.hadoop.HadoopGroupByClosure;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.element.GroupingSpliceGate;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.StreamGraph;
import cascading.pipe.Splice;
import cascading.pipe.joiner.BufferJoin;
import cascading.tap.hadoop.util.MeasuredOutputCollector;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.apache.hadoop.mapred.OutputCollector;

/**
 * Base gate bridging a Cascading stream graph and the Hadoop MapReduce shuffle for a
 * {@link Splice} (group-by / co-group).
 * <p>
 * Behavior depends on the {@link IORole} passed at construction:
 * <ul>
 * <li>as a {@code sink} (map side), {@link #receive(Duct, TupleEntry)} projects each incoming
 * tuple into grouping/sort/value views and emits it through a Hadoop {@link OutputCollector}
 * wrapped in a {@link MeasuredOutputCollector};</li>
 * <li>as a {@code source} (reduce side), {@link #accept(Tuple, Iterator[])} receives a grouping
 * key plus per-ordinal value iterators from the framework, applies the configured joiner, and
 * forwards the grouping downstream via {@code next.receive(...)}.</li>
 * </ul>
 * Concrete subclasses supply the actual collector, closure, and key wrapping strategy through
 * the abstract factory methods at the bottom of this class.
 */
public abstract class HadoopGroupGate extends GroupingSpliceGate
  {
  /** Reduce-side join closure over the current grouping key and its value iterators. */
  protected HadoopGroupByClosure closure;
  /** Map-side collector used to emit key/value pairs into the shuffle; null on the reduce side. */
  protected OutputCollector collector;

  /** True when the splice joiner is a {@link BufferJoin}, which consumes the JoinerClosure directly. */
  private final boolean isBufferJoin;

  /**
   * Constructor.
   *
   * @param flowProcess the current flow process
   * @param splice      the group-by or co-group pipe this gate implements
   * @param role        whether this gate acts as the shuffle sink, source, or both
   */
  public HadoopGroupGate( FlowProcess flowProcess, Splice splice, IORole role )
    {
    super( flowProcess, splice, role );

    isBufferJoin = splice.getJoiner() instanceof BufferJoin;
    }

  @Override
  public void bind( StreamGraph streamGraph )
    {
    // only a non-sink has a downstream duct to forward to
    if( role != IORole.sink )
      next = getNextFor( streamGraph );

    // sinks need the ordinal map so receive() can resolve which branch a tuple arrived on
    if( role == IORole.sink )
      setOrdinalMap( streamGraph );
    }

  @Override
  public void prepare()
    {
    // map side: wrap the collector so write durations are counted against SliceCounters
    if( role != IORole.source )
      collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() );

    // reduce side: build the closure the joiner will iterate over
    if( role != IORole.sink )
      closure = createClosure();

    // Fields.NONE declared join fields means the joiner resolves fields itself via the closure
    if( grouping != null && splice.getJoinDeclaredFields() != null && splice.getJoinDeclaredFields().isNone() )
      grouping.joinerClosure = closure;
    }

  /** Creates the underlying Hadoop collector this gate emits map-side output to. */
  protected abstract OutputCollector createOutputCollector();

  @Override
  public void start( Duct previous )
    {
    // a pure sink has no downstream duct, so there is nothing to propagate
    if( next != null )
      super.start( previous );
    }

  // todo: receive should receive the edge or ordinal so no lookup
  @Override
  public void receive( Duct previous, TupleEntry incomingEntry )
    {
    // resolve which inbound branch (ordinal) this tuple arrived on
    Integer pos = ordinalMap.get( previous );

    // create a view over the incoming tuple
    Tuple groupTupleView = keyBuilder[ pos ].makeResult( incomingEntry.getTuple(), null );

    // reset keyTuple via groupTuple or groupSortTuple
    if( sortFields == null )
      groupTuple.reset( groupTupleView );
    else
      groupSortTuple.reset( groupTupleView, sortBuilder[ pos ].makeResult( incomingEntry.getTuple(), null ) );

    valueTuple.reset( valuesBuilder[ pos ].makeResult( incomingEntry.getTuple(), null ) );

    try
      {
      // keyTuple is a reference to either groupTuple or groupSortTuple
      wrapGroupingAndCollect( previous, (Tuple) valueTuple, keyTuple );
      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
      }
    catch( OutOfMemoryError error )
      {
      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
      }
    catch( CascadingException exception )
      {
      handleException( exception, incomingEntry );
      }
    catch( Throwable throwable )
      {
      handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry );
      }
    }

  @Override
  public void complete( Duct previous )
    {
    // a pure sink has no downstream duct, so there is nothing to propagate
    if( next != null )
      super.complete( previous );
    }

  /**
   * Reduce-side entry point: called with a grouping key and one value iterator per inbound
   * ordinal, then forwards the joined grouping downstream.
   *
   * @param key    the (possibly wrapped) grouping key emitted by the map side
   * @param values per-ordinal iterators over the co-grouped value tuples
   */
  public void accept( Tuple key, Iterator<Tuple>[] values )
    {
    key = unwrapGrouping( key );

    closure.reset( key, values );

    // Buffer is using JoinerClosure directly
    if( !isBufferJoin )
      tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );
    else
      tupleEntryIterator.reset( values );

    keyEntry.setTuple( closure.getGroupTuple( key ) );

    next.receive( this, grouping );
    }

  /** Creates the closure presented to the joiner on the reduce side. */
  protected abstract HadoopGroupByClosure createClosure();

  /**
   * Wraps the grouping key as the subclass requires and writes the key/value pair to the
   * {@link #collector}.
   */
  protected abstract void wrapGroupingAndCollect( Duct previous, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException;

  /** Inverse of the wrapping applied by {@link #wrapGroupingAndCollect}; restores the raw grouping key. */
  protected abstract Tuple unwrapGrouping( Tuple key );
  }