001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.stream.element;
022
023import cascading.flow.FlowProcess;
024import cascading.flow.hadoop.HadoopCoGroupClosure;
025import cascading.flow.hadoop.HadoopFlowProcess;
026import cascading.flow.hadoop.stream.HadoopGroupGate;
027import cascading.flow.stream.duct.Duct;
028import cascading.flow.stream.graph.IORole;
029import cascading.flow.stream.graph.StreamGraph;
030import cascading.pipe.CoGroup;
031import cascading.tuple.Tuple;
032import cascading.tuple.io.IndexTuple;
033import cascading.tuple.io.KeyIndexTuple;
034import cascading.tuple.io.ValueIndexTuple;
035import org.apache.hadoop.mapred.OutputCollector;
036
037/**
038 *
039 */
040public class HadoopCoGroupGate extends HadoopGroupGate
041  {
042  IndexTuple keyTuple = new KeyIndexTuple();
043  IndexTuple valueTuple = new ValueIndexTuple();
044
045  public HadoopCoGroupGate( FlowProcess flowProcess, CoGroup coGroup, IORole role )
046    {
047    super( flowProcess, coGroup, role );
048    }
049
050  @Override
051  public void bind( StreamGraph streamGraph )
052    {
053    super.bind( streamGraph );
054    }
055
056  @Override
057  protected HadoopCoGroupClosure createClosure()
058    {
059    return new HadoopCoGroupClosure( flowProcess, splice.getNumSelfJoins(), keyFields, valuesFields );
060    }
061
062  @Override
063  protected void wrapGroupingAndCollect( Duct previous, Tuple valuesTuple, Tuple groupKey ) throws java.io.IOException
064    {
065    Integer ordinal = ordinalMap.get( previous );
066
067    keyTuple.setIndex( ordinal );
068    keyTuple.setTuple( groupKey );
069
070    valueTuple.setIndex( ordinal );
071    valueTuple.setTuple( valuesTuple );
072
073    collector.collect( keyTuple, valueTuple );
074    }
075
076  @Override
077  protected Tuple unwrapGrouping( Tuple key )
078    {
079    return ( (IndexTuple) key ).getTuple();
080    }
081
082  protected OutputCollector createOutputCollector()
083    {
084    return ( (HadoopFlowProcess) flowProcess ).getOutputCollector();
085    }
086  }