001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.hadoop.stream.graph;
023
024import cascading.flow.FlowElement;
025import cascading.flow.FlowNode;
026import cascading.flow.hadoop.HadoopFlowProcess;
027import cascading.flow.hadoop.stream.element.HadoopCoGroupGate;
028import cascading.flow.hadoop.stream.element.HadoopGroupByGate;
029import cascading.flow.hadoop.stream.element.HadoopSinkStage;
030import cascading.flow.stream.duct.Duct;
031import cascading.flow.stream.duct.Gate;
032import cascading.flow.stream.element.SinkStage;
033import cascading.flow.stream.graph.IORole;
034import cascading.flow.stream.graph.NodeStreamGraph;
035import cascading.pipe.CoGroup;
036import cascading.pipe.Group;
037import cascading.pipe.GroupBy;
038import cascading.pipe.HashJoin;
039import cascading.tap.Tap;
040import cascading.util.Util;
041
042/**
043 *
044 */
045public class HadoopReduceStreamGraph extends NodeStreamGraph
046  {
047  public HadoopReduceStreamGraph( HadoopFlowProcess flowProcess, FlowNode node, FlowElement sourceElement )
048    {
049    super( flowProcess, node, sourceElement );
050
051    buildGraph();
052
053    setTraps();
054    setScopes();
055
056    printGraph( node.getID(), "reduce", flowProcess.getCurrentSliceNum() );
057
058    bind();
059
060    printBoundGraph( node.getID(), "reduce", flowProcess.getCurrentSliceNum() );
061    }
062
063  protected void buildGraph()
064    {
065    Group group = (Group) Util.getFirst( node.getSourceElements() );
066
067    Duct rhsDuct;
068
069    if( group.isGroupBy() )
070      rhsDuct = new HadoopGroupByGate( flowProcess, (GroupBy) group, IORole.source );
071    else
072      rhsDuct = new HadoopCoGroupGate( flowProcess, (CoGroup) group, IORole.source );
073
074    addHead( rhsDuct );
075
076    handleDuct( group, rhsDuct );
077    }
078
079  @Override
080  protected SinkStage createSinkStage( Tap element )
081    {
082    return new HadoopSinkStage( flowProcess, element );
083    }
084
085  protected Gate createCoGroupGate( CoGroup element, IORole role )
086    {
087    throw new IllegalStateException( "should not happen" );
088    }
089
090  @Override
091  protected Gate createGroupByGate( GroupBy element, IORole role )
092    {
093    throw new IllegalStateException( "should not happen" );
094    }
095
096  @Override
097  protected Gate createHashJoinGate( HashJoin join )
098    {
099    throw new IllegalStateException( "should not happen" );
100    }
101
102  }