001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.stream.graph;
022
023import cascading.flow.FlowElement;
024import cascading.flow.FlowNode;
025import cascading.flow.hadoop.HadoopFlowProcess;
026import cascading.flow.hadoop.stream.element.HadoopCoGroupGate;
027import cascading.flow.hadoop.stream.element.HadoopGroupByGate;
028import cascading.flow.hadoop.stream.element.HadoopSinkStage;
029import cascading.flow.stream.duct.Duct;
030import cascading.flow.stream.duct.Gate;
031import cascading.flow.stream.element.SinkStage;
032import cascading.flow.stream.graph.IORole;
033import cascading.flow.stream.graph.NodeStreamGraph;
034import cascading.pipe.CoGroup;
035import cascading.pipe.Group;
036import cascading.pipe.GroupBy;
037import cascading.pipe.HashJoin;
038import cascading.tap.Tap;
039import cascading.util.Util;
040
041/**
042 *
043 */
044public class HadoopReduceStreamGraph extends NodeStreamGraph
045  {
046  public HadoopReduceStreamGraph( HadoopFlowProcess flowProcess, FlowNode node, FlowElement sourceElement )
047    {
048    super( flowProcess, node, sourceElement );
049
050    buildGraph();
051
052    setTraps();
053    setScopes();
054
055    printGraph( node.getID(), "reduce", flowProcess.getCurrentSliceNum() );
056
057    bind();
058    }
059
060  protected void buildGraph()
061    {
062    Group group = (Group) Util.getFirst( node.getSourceElements() );
063
064    Duct rhsDuct;
065
066    if( group.isGroupBy() )
067      rhsDuct = new HadoopGroupByGate( flowProcess, (GroupBy) group, IORole.source );
068    else
069      rhsDuct = new HadoopCoGroupGate( flowProcess, (CoGroup) group, IORole.source );
070
071    addHead( rhsDuct );
072
073    handleDuct( group, rhsDuct );
074    }
075
076  @Override
077  protected SinkStage createSinkStage( Tap element )
078    {
079    return new HadoopSinkStage( flowProcess, element );
080    }
081
082  protected Gate createCoGroupGate( CoGroup element, IORole role )
083    {
084    throw new IllegalStateException( "should not happen" );
085    }
086
087  @Override
088  protected Gate createGroupByGate( GroupBy element, IORole role )
089    {
090    throw new IllegalStateException( "should not happen" );
091    }
092
093  @Override
094  protected Gate createHashJoinGate( HashJoin join )
095    {
096    throw new IllegalStateException( "should not happen" );
097    }
098
099  }