001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.hadoop.stream.element;
023
024import cascading.flow.FlowProcess;
025import cascading.flow.hadoop.HadoopCoGroupClosure;
026import cascading.flow.hadoop.HadoopFlowProcess;
027import cascading.flow.hadoop.stream.HadoopGroupGate;
028import cascading.flow.stream.duct.Duct;
029import cascading.flow.stream.graph.IORole;
030import cascading.flow.stream.graph.StreamGraph;
031import cascading.pipe.CoGroup;
032import cascading.tuple.Tuple;
033import cascading.tuple.io.IndexTuple;
034import cascading.tuple.io.KeyIndexTuple;
035import cascading.tuple.io.ValueIndexTuple;
036import org.apache.hadoop.mapred.OutputCollector;
037
038/**
039 *
040 */
041public class HadoopCoGroupGate extends HadoopGroupGate
042  {
043  IndexTuple keyTuple = new KeyIndexTuple();
044  IndexTuple valueTuple = new ValueIndexTuple();
045
046  public HadoopCoGroupGate( FlowProcess flowProcess, CoGroup coGroup, IORole role )
047    {
048    super( flowProcess, coGroup, role );
049    }
050
051  @Override
052  public void bind( StreamGraph streamGraph )
053    {
054    super.bind( streamGraph );
055    }
056
057  @Override
058  protected HadoopCoGroupClosure createClosure()
059    {
060    return new HadoopCoGroupClosure( flowProcess, splice.getNumSelfJoins(), keyFields, valuesFields );
061    }
062
063  @Override
064  protected void wrapGroupingAndCollect( Duct previous, int ordinal, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException
065    {
066    keyTuple.setIndex( ordinal );
067    keyTuple.setTuple( groupKey );
068
069    valueTuple.setIndex( ordinal );
070    valueTuple.setTuple( valuesTuple );
071
072    collector.collect( keyTuple, valueTuple );
073    }
074
075  @Override
076  protected Tuple unwrapGrouping( Tuple key )
077    {
078    return ( (IndexTuple) key ).getTuple();
079    }
080
081  protected OutputCollector createOutputCollector()
082    {
083    return ( (HadoopFlowProcess) flowProcess ).getOutputCollector();
084    }
085  }