001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.hadoop.stream.element;
023
024import cascading.flow.FlowProcess;
025import cascading.flow.hadoop.HadoopFlowProcess;
026import cascading.flow.hadoop.HadoopGroupByClosure;
027import cascading.flow.hadoop.stream.HadoopGroupGate;
028import cascading.flow.stream.duct.Duct;
029import cascading.flow.stream.graph.IORole;
030import cascading.pipe.GroupBy;
031import cascading.tuple.Tuple;
032import cascading.tuple.io.TuplePair;
033import org.apache.hadoop.mapred.OutputCollector;
034
035/**
036 *
037 */
038public class HadoopGroupByGate extends HadoopGroupGate
039  {
040  public HadoopGroupByGate( FlowProcess flowProcess, GroupBy groupBy, IORole role )
041    {
042    super( flowProcess, groupBy, role );
043    }
044
045  @Override
046  protected HadoopGroupByClosure createClosure()
047    {
048    // todo: collapse keyFields here if an array size > 1
049    return new HadoopGroupByClosure( flowProcess, keyFields, valuesFields );
050    }
051
052  @Override
053  protected void wrapGroupingAndCollect( Duct previous, int ordinal, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException
054    {
055    collector.collect( groupKey, valuesTuple );
056    }
057
058  @Override
059  protected Tuple unwrapGrouping( Tuple key )
060    {
061    return sortFields == null ? key : ( (TuplePair) key ).getLhs();
062    }
063
064  protected OutputCollector createOutputCollector()
065    {
066    return ( (HadoopFlowProcess) flowProcess ).getOutputCollector();
067    }
068  }