/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.stream;

import java.util.Iterator;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.SliceCounters;
import cascading.flow.hadoop.HadoopGroupByClosure;
import cascading.flow.stream.duct.Duct;
import cascading.flow.stream.duct.DuctException;
import cascading.flow.stream.element.GroupingSpliceGate;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.StreamGraph;
import cascading.pipe.Splice;
import cascading.pipe.joiner.BufferJoin;
import cascading.tap.hadoop.util.MeasuredOutputCollector;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.io.TuplePair;
import org.apache.hadoop.mapred.OutputCollector;

/**
 * HadoopGroupGate is the base class for the Hadoop specific gates that implement a {@link Splice}
 * (a grouping or cogrouping) across a map/reduce boundary.
 * <p>
 * Acting as a sink, typically on the map side, {@link #receive(Duct, TupleEntry)} builds the grouping
 * key, optional secondary sort key, and values tuples for each incoming entry and emits them through the
 * {@link OutputCollector} supplied by the concrete subclass. Acting as a source, typically on the reduce
 * side, {@link #accept(Tuple, Iterator[])} resets the {@link HadoopGroupByClosure} with the grouped key
 * and per-ordinal value iterators, applies the configured {@link cascading.pipe.joiner.Joiner}, and passes
 * the resulting grouping to the next duct.
 */
public abstract class HadoopGroupGate extends GroupingSpliceGate
  {
  protected HadoopGroupByClosure closure;
  protected OutputCollector collector;

  private final boolean isBufferJoin;

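  /**
   * Constructor HadoopGroupGate creates a new gate for the given {@link Splice} and {@link IORole}, noting
   * up front whether the splice joins via a {@link BufferJoin}, since a Buffer consumes the joiner closure
   * directly rather than a joined tuple stream.
   */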
  public HadoopGroupGate( FlowProcess flowProcess, Splice splice, IORole role )
    {
    super( flowProcess, splice, role );

    isBufferJoin = splice.getJoiner() instanceof BufferJoin;
    }

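  /**
   * Binds this gate into the given {@link StreamGraph}. A non-sink gate resolves its downstream duct, while a
   * sink gate instead captures the ordinal map of its incoming ducts, presumably for the lookup performed in
   * {@link #receive(Duct, TupleEntry)}, since its output leaves through the Hadoop collector rather than a
   * downstream duct.
   */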
  @Override
  public void bind( StreamGraph streamGraph )
    {
    if( role != IORole.sink )
      next = getNextFor( streamGraph );

    if( role == IORole.sink )
      setOrdinalMap( streamGraph );
    }

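  /**
   * Prepares this gate for use. Unless acting purely as a source, the subclass supplied {@link OutputCollector}
   * is wrapped in a {@link MeasuredOutputCollector} so writes are timed under {@link SliceCounters#Write_Duration}.
   * Unless acting purely as a sink, the group-by closure is created. When the splice declares no join fields,
   * as a {@link BufferJoin} does, the closure is also handed to the grouping so a Buffer may consume it directly.
   */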
  @Override
  public void prepare()
    {
    if( role != IORole.source )
      collector = new MeasuredOutputCollector( flowProcess, SliceCounters.Write_Duration, createOutputCollector() );

    if( role != IORole.sink )
      closure = createClosure();

    if( grouping != null && splice.getJoinDeclaredFields() != null && splice.getJoinDeclaredFields().isNone() )
      grouping.joinerClosure = closure;
    }

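  /**
   * Returns the platform specific {@link OutputCollector} grouping keys and values should be written to, for
   * example the collector supplied by the current Hadoop task. Called once from {@link #prepare()}.
   */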
  protected abstract OutputCollector createOutputCollector();

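  /**
   * Propagates start only when a downstream duct exists, that is, when this gate is not acting as a sink.
   */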
  @Override
  public void start( Duct previous )
    {
    if( next != null )
      super.start( previous );
    }

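  /**
   * Receives a {@link TupleEntry} from an upstream duct, builds the grouping key, optional secondary sort key,
   * and values tuples using the builders registered for that duct's ordinal, then hands them to
   * {@link #wrapGroupingAndCollect(Duct, Tuple, Tuple)} to be written out. Out of memory errors are re-thrown
   * with a hint to increase task memory, all other failures are passed to the inherited exception handling.
   */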
  @Override
  public void receive( Duct previous, TupleEntry incomingEntry ) // todo: receive should receive the edge or ordinal so no lookup
    {
    Integer pos = ordinalMap.get( previous ); // todo: when posMap size == 1, pos is always zero -- optimize #get() out

    Tuple groupTuple = keyBuilder[ pos ].makeResult( incomingEntry.getTuple(), null );
    Tuple sortTuple = sortFields == null ? null : sortBuilder[ pos ].makeResult( incomingEntry.getTuple(), null );
    Tuple valuesTuple = valuesBuilder[ pos ].makeResult( incomingEntry.getTuple(), null );

    Tuple groupKey = sortTuple == null ? groupTuple : new TuplePair( groupTuple, sortTuple );

    try
      {
      wrapGroupingAndCollect( previous, valuesTuple, groupKey );
      flowProcess.increment( SliceCounters.Tuples_Written, 1 );
      }
    catch( OutOfMemoryError error )
      {
      handleReThrowableException( "out of memory, try increasing task memory allocation", error );
      }
    catch( CascadingException exception )
      {
      handleException( exception, incomingEntry );
      }
    catch( Throwable throwable )
      {
      handleException( new DuctException( "internal error: " + incomingEntry.getTuple().print(), throwable ), incomingEntry );
      }
    }

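  /**
   * Propagates completion only when a downstream duct exists; a sink gate ends its branch at the collector.
   */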
  @Override
  public void complete( Duct previous )
    {
    if( next != null )
      super.complete( previous );
    }

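  /**
   * Accepts a grouping key and its per-ordinal value iterators, typically invoked once per key on the reduce
   * side. The key is first unwrapped via {@link #unwrapGrouping(Tuple)}, for example to drop a secondary sort
   * component added on the map side, the closure is reset, and either the joiner's result iterator or, for a
   * {@link BufferJoin}, the raw value iterators are handed to the downstream duct.
   */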
  public void accept( Tuple key, Iterator<Tuple>[] values )
    {
    key = unwrapGrouping( key );

    closure.reset( key, values );

    // Buffer is using JoinerClosure directly
    if( !isBufferJoin )
      tupleEntryIterator.reset( splice.getJoiner().getIterator( closure ) );
    else
      tupleEntryIterator.reset( values );

    keyEntry.setTuple( closure.getGroupTuple( key ) );

    next.receive( this, grouping );
    }

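  /**
   * Creates the {@link HadoopGroupByClosure} (or a subclass) that presents the grouped key and values to the
   * joiner. Called once from {@link #prepare()} when this gate is not acting as a sink.
   */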
  protected abstract HadoopGroupByClosure createClosure();

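  /**
   * Expected to wrap the given group key and values tuple in the platform specific key/value types and write
   * them to the collector prepared by {@link #prepare()}, propagating any {@link java.io.IOException} the
   * underlying collector throws.
   */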
  protected abstract void wrapGroupingAndCollect( Duct previous, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException;

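  /**
   * Expected to return the plain grouping {@link Tuple} for the given key, for example extracting the grouping
   * half of a {@link TuplePair} when a secondary sort key was attached on the map side.
   */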
  protected abstract Tuple unwrapGrouping( Tuple key );
  }