001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.hadoop.stream; 022 023 import java.util.List; 024 025 import cascading.flow.FlowElement; 026 import cascading.flow.hadoop.HadoopFlowProcess; 027 import cascading.flow.hadoop.HadoopFlowStep; 028 import cascading.flow.stream.Duct; 029 import cascading.flow.stream.Gate; 030 import cascading.flow.stream.SinkStage; 031 import cascading.flow.stream.SpliceGate; 032 import cascading.flow.stream.StepStreamGraph; 033 import cascading.pipe.CoGroup; 034 import cascading.pipe.Group; 035 import cascading.pipe.GroupBy; 036 import cascading.pipe.HashJoin; 037 import cascading.tap.Tap; 038 039 /** 040 * 041 */ 042 public class HadoopReduceStreamGraph extends StepStreamGraph 043 { 044 public HadoopReduceStreamGraph( HadoopFlowProcess flowProcess, HadoopFlowStep step ) 045 { 046 super( flowProcess, step ); 047 048 buildGraph(); 049 050 setTraps(); 051 setScopes(); 052 053 printGraph( step.getID(), "reduce", flowProcess.getCurrentSliceNum() ); 054 055 bind(); 056 } 057 058 protected void buildGraph() 059 { 060 Group group = step.getGroup(); 061 062 Duct rhsDuct; 063 064 if( group.isGroupBy() ) 065 rhsDuct = new HadoopGroupByGate( flowProcess, (GroupBy) group, SpliceGate.Role.source ); 066 else 067 rhsDuct = new HadoopCoGroupGate( flowProcess, (CoGroup) group, SpliceGate.Role.source ); 068 069 addHead( rhsDuct ); 070 071 handleDuct( group, rhsDuct ); 072 } 073 074 @Override 075 protected SinkStage createSinkStage( Tap element ) 076 { 077 return new HadoopSinkStage( flowProcess, element ); 078 } 079 080 protected Gate createCoGroupGate( CoGroup element ) 081 { 082 throw new IllegalStateException( "should not happen" ); 083 } 084 085 protected Gate createGroupByGate( GroupBy element ) 086 { 087 throw new IllegalStateException( "should not happen" ); 088 } 089 090 @Override 091 protected Gate createHashJoinGate( HashJoin join ) 092 { 093 throw new IllegalStateException( "should not happen" ); 094 } 095 096 protected boolean stopOnElement( FlowElement lhsElement, List<FlowElement> successors ) 097 { 098 if( successors.isEmpty() ) 099 { 100 if( !( lhsElement instanceof Tap ) ) 101 throw new IllegalStateException( "expected a Tap instance" ); 102 103 return true; 104 } 105 106 return false; 107 } 108 }