001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.hadoop.stream; 022 023import java.util.Iterator; 024 025import cascading.CascadingException; 026import cascading.flow.FlowProcess; 027import cascading.flow.SliceCounters; 028import cascading.flow.hadoop.HadoopGroupByClosure; 029import cascading.flow.stream.duct.Duct; 030import cascading.flow.stream.duct.DuctException; 031import cascading.flow.stream.element.GroupingSpliceGate; 032import cascading.flow.stream.graph.IORole; 033import cascading.flow.stream.graph.StreamGraph; 034import cascading.pipe.Splice; 035import cascading.pipe.joiner.BufferJoin; 036import cascading.tap.hadoop.util.MeasuredOutputCollector; 037import cascading.tuple.Tuple; 038import cascading.tuple.TupleEntry; 039import cascading.tuple.io.TuplePair; 040import org.apache.hadoop.mapred.OutputCollector; 041 042/** 043 * 044 */ 045public abstract class HadoopGroupGate extends GroupingSpliceGate 046 { 047 protected HadoopGroupByClosure closure; 048 protected OutputCollector collector; 049 050 private final boolean isBufferJoin; 051 052 public HadoopGroupGate( FlowProcess flowProcess, Splice splice, IORole role ) 053 { 054 super( flowProcess, splice, role ); 055 056 isBufferJoin = splice.getJoiner() instanceof BufferJoin; 057 } 058 059 @Override 060 public void bind( StreamGraph streamGraph ) 061 { 062 if( role != IORole.sink ) 063 next = getNextFor( streamGraph ); 064 065 if( role == IORole.sink ) 066 setOrdinalMap( streamGraph ); 067 } 068 069 @Override 070 public void prepare() 071 { 072 if( role != IORole.source ) 073 collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() ); 074 075 if( role != IORole.sink ) 076 closure = createClosure(); 077 078 if( grouping != null && splice.getJoinDeclaredFields() != null && splice.getJoinDeclaredFields().isNone() ) 079 grouping.joinerClosure = closure; 080 } 081 082 protected abstract OutputCollector createOutputCollector(); 083 084 @Override 085 public void start( Duct previous ) 086 { 087 if( next != null ) 088 super.start( previous ); 089 } 090 091 public void receive( Duct previous, TupleEntry incomingEntry ) // todo: receive should receive the edge or ordinal so no lookup 092 { 093 Integer pos = ordinalMap.get( previous ); // todo: when posMap size == 1, pos is always zero -- optimize #get() out 094 095 Tuple groupTuple = keyBuilder[ pos ].makeResult( incomingEntry.getTuple(), null ); 096 Tuple sortTuple = sortFields == null ? null : sortBuilder[ pos ].makeResult( incomingEntry.getTuple(), null ); 097 Tuple valuesTuple = valuesBuilder[ pos ].makeResult( incomingEntry.getTuple(), null ); 098 099 Tuple groupKey = sortTuple == null ? groupTuple : new TuplePair( groupTuple, sortTuple ); 100 101 try 102 { 103 wrapGroupingAndCollect( previous, valuesTuple, groupKey ); 104 flowProcess.increment( SliceCounters.Tuples_Written, 1 ); 105 } 106 catch( OutOfMemoryError error ) 107 { 108 handleReThrowableException( "out of memory, try increasing task memory allocation", error ); 109 } 110 catch( CascadingException exception ) 111 { 112 handleException( exception, incomingEntry ); 113 } 114 catch( Throwable throwable ) 115 { 116 handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry ); 117 } 118 } 119 120 @Override 121 public void complete( Duct previous ) 122 { 123 if( next != null ) 124 super.complete( previous ); 125 } 126 127 public void accept( Tuple key, Iterator<Tuple>[] values ) 128 { 129 key = unwrapGrouping( key ); 130 131 closure.reset( key, values ); 132 133 // Buffer is using JoinerClosure directly 134 if( !isBufferJoin ) 135 tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) ); 136 else 137 tupleEntryIterator.reset( values ); 138 139 keyEntry.setTuple( closure.getGroupTuple( key ) ); 140 141 next.receive( this, grouping ); 142 } 143 144 protected abstract HadoopGroupByClosure createClosure(); 145 146 protected abstract void wrapGroupingAndCollect( Duct previous, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException; 147 148 protected abstract Tuple unwrapGrouping( Tuple key ); 149 }