001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.stream.element;
022
023import cascading.flow.FlowProcess;
024import cascading.flow.hadoop.HadoopFlowProcess;
025import cascading.flow.hadoop.HadoopGroupByClosure;
026import cascading.flow.hadoop.stream.HadoopGroupGate;
027import cascading.flow.stream.duct.Duct;
028import cascading.flow.stream.graph.IORole;
029import cascading.pipe.GroupBy;
030import cascading.tuple.Tuple;
031import cascading.tuple.io.TuplePair;
032import org.apache.hadoop.mapred.OutputCollector;
033
034/**
035 *
036 */
037public class HadoopGroupByGate extends HadoopGroupGate
038  {
039  public HadoopGroupByGate( FlowProcess flowProcess, GroupBy groupBy, IORole role )
040    {
041    super( flowProcess, groupBy, role );
042    }
043
044  @Override
045  protected HadoopGroupByClosure createClosure()
046    {
047    // todo: collapse keyFields here if an array size > 1
048    return new HadoopGroupByClosure( flowProcess, keyFields, valuesFields );
049    }
050
051  @Override
052  protected void wrapGroupingAndCollect( Duct previous, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException
053    {
054    collector.collect( groupKey, valuesTuple );
055    }
056
057  @Override
058  protected Tuple unwrapGrouping( Tuple key )
059    {
060    return sortFields == null ? key : ( (TuplePair) key ).getLhs();
061    }
062
063  protected OutputCollector createOutputCollector()
064    {
065    return ( (HadoopFlowProcess) flowProcess ).getOutputCollector();
066    }
067  }