001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.hadoop.stream; 023 024import java.util.Iterator; 025 026import cascading.CascadingException; 027import cascading.flow.FlowProcess; 028import cascading.flow.SliceCounters; 029import cascading.flow.hadoop.HadoopGroupByClosure; 030import cascading.flow.stream.duct.Duct; 031import cascading.flow.stream.duct.DuctException; 032import cascading.flow.stream.element.GroupingSpliceGate; 033import cascading.flow.stream.graph.IORole; 034import cascading.flow.stream.graph.StreamGraph; 035import cascading.pipe.Splice; 036import cascading.pipe.joiner.BufferJoin; 037import cascading.tap.hadoop.util.MeasuredOutputCollector; 038import cascading.tuple.Tuple; 039import cascading.tuple.TupleEntry; 040import org.apache.hadoop.mapred.OutputCollector; 041 042/** 043 * 044 */ 045public abstract class HadoopGroupGate extends GroupingSpliceGate 046 { 047 protected HadoopGroupByClosure closure; 048 protected OutputCollector collector; 049 050 private final boolean isBufferJoin; 051 052 public HadoopGroupGate( FlowProcess flowProcess, Splice splice, IORole role ) 053 { 054 super( flowProcess, splice, role ); 055 056 isBufferJoin = splice.getJoiner() instanceof BufferJoin; 057 } 058 059 @Override 060 public void bind( StreamGraph streamGraph ) 061 { 062 if( role != IORole.sink ) 063 next = getNextFor( streamGraph ); 064 } 065 066 @Override 067 public void prepare() 068 { 069 if( role != IORole.source ) 070 collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() ); 071 072 if( role != IORole.sink ) 073 closure = createClosure(); 074 075 if( grouping != null && splice.getJoinDeclaredFields() != null && splice.getJoinDeclaredFields().isNone() ) 076 grouping.joinerClosure = closure; 077 } 078 079 protected abstract OutputCollector createOutputCollector(); 080 081 @Override 082 public void start( Duct previous ) 083 { 084 if( next != null ) 085 super.start( previous ); 086 } 087 088 // todo: receive should receive the edge or ordinal so no lookup 089 public void receive( Duct previous, int ordinal, TupleEntry incomingEntry ) 090 { 091 // create a view over the incoming tuple 092 Tuple groupTupleView = keyBuilder[ ordinal ].makeResult( incomingEntry.getTuple(), null ); 093 094 // reset keyTuple via groupTuple or groupSortTuple 095 if( sortFields == null ) 096 groupTuple.reset( groupTupleView ); 097 else 098 groupSortTuple.reset( groupTupleView, sortBuilder[ ordinal ].makeResult( incomingEntry.getTuple(), null ) ); 099 100 valueTuple.reset( valuesBuilder[ ordinal ].makeResult( incomingEntry.getTuple(), null ) ); 101 102 try 103 { 104 // keyTuple is a reference to either groupTuple or groupSortTuple 105 wrapGroupingAndCollect( previous, ordinal, (Tuple) valueTuple, keyTuple ); 106 flowProcess.increment( SliceCounters.Tuples_Written, 1 ); 107 } 108 catch( OutOfMemoryError error ) 109 { 110 handleReThrowableException( "out of memory, try increasing task memory allocation", error ); 111 } 112 catch( CascadingException exception ) 113 { 114 handleException( exception, incomingEntry ); 115 } 116 catch( Throwable throwable ) 117 { 118 handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry ); 119 } 120 } 121 122 @Override 123 public void complete( Duct previous ) 124 { 125 if( next != null ) 126 super.complete( previous ); 127 } 128 129 public void accept( Tuple key, Iterator<Tuple>[] values ) 130 { 131 key = unwrapGrouping( key ); 132 133 closure.reset( key, values ); 134 135 // Buffer is using JoinerClosure directly 136 if( !isBufferJoin ) 137 tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) ); 138 else 139 tupleEntryIterator.reset( values ); 140 141 keyEntry.setTuple( closure.getGroupTuple( key ) ); 142 143 next.receive( this, 0, grouping ); 144 } 145 146 protected abstract HadoopGroupByClosure createClosure(); 147 148 protected abstract void wrapGroupingAndCollect( Duct previous, int ordinal, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException; 149 150 protected abstract Tuple unwrapGrouping( Tuple key ); 151 }