/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.planner.BaseFlowNode;
import cascading.flow.planner.Scope;
import cascading.flow.planner.graph.ElementDirectedGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.ProcessEdge;
import cascading.pipe.Pipe;
import cascading.tap.Tap;
import org.apache.hadoop.mapred.JobConf;

/**
 * Class MapReduceFlowStep wraps a {@link JobConf} and allows it to be executed as a {@link cascading.flow.Flow}.
 * <p>
 * The wrapped job is described by placeholder {@link Pipe} instances rather than real operations: a single
 * {@link #MAP} pipe, followed by {@link #SHUFFLE} and {@link #REDUCE} pipes when
 * {@link HadoopMRUtil#hasReducer(JobConf)} reports that the job has a reducer. Source and sink {@link Tap}
 * instances are obtained from the owning {@link BaseMapReduceFlow}.
 */
public class MapReduceFlowStep extends HadoopFlowStep
  {
  /** Name of the placeholder pipe returned by {@link #createMapOperation()}. */
  public static final String MAP = "Map";
  /** Name of the placeholder pipe returned by {@link #createShuffleOperation()}. */
  public static final String SHUFFLE = "Shuffle";
  /** Name of the placeholder pipe returned by {@link #createReduceOperation()}. */
  public static final String REDUCE = "Reduce";

  /** Field jobConf */
  private final JobConf jobConf;

  /**
   * Creates a step named after the job name of the given {@link JobConf} and immediately configures it.
   *
   * @param flow    the owning flow
   * @param jobConf the job configuration this step wraps
   * @throws IllegalArgumentException if {@code flow} or {@code jobConf} is {@code null}
   */
  public MapReduceFlowStep( HadoopFlow flow, JobConf jobConf )
    {
    if( flow == null )
      throw new IllegalArgumentException( "flow may not be null" );

    // jobConf is dereferenced on the next statement; fail with a descriptive message instead of a bare NPE
    if( jobConf == null )
      throw new IllegalArgumentException( "jobConf may not be null" );

    setName( jobConf.getJobName() );
    setFlow( flow );

    this.jobConf = jobConf;

    configure(); // requires flow and jobConf
    }

  /**
   * Creates a step with an explicit name and registers the given sink under the name {@code "default"}.
   * <p>
   * Unlike the public constructor this one does not call {@code configure()} and does not inspect
   * {@code jobConf}; the reference is stored as given.
   *
   * @param flow     the owning flow
   * @param stepName the name to give this step
   * @param jobConf  the job configuration this step wraps
   * @param sink     the sink to register as {@code "default"}
   * @throws IllegalArgumentException if {@code flow} is {@code null}
   */
  protected MapReduceFlowStep( HadoopFlow flow, String stepName, JobConf jobConf, Tap sink )
    {
    if( flow == null )
      throw new IllegalArgumentException( "flow may not be null" );

    setName( stepName );
    setFlow( flow );

    this.jobConf = jobConf;

    addSink( "default", sink );
    }

  /**
   * Returns the wrapped job configuration.
   *
   * @return the {@link JobConf} handed to the constructor
   */
  protected JobConf getJobConf()
    {
    return jobConf;
    }

  /**
   * Returns the element graph for this step, building it from {@link #getFlowNodeGraph()} on first use
   * and reusing the stored instance afterwards.
   *
   * @return the step level element graph
   */
  @Override
  public ElementGraph getElementGraph()
    {
    if( elementGraph == null )
      elementGraph = createStepElementGraph( getFlowNodeGraph() );

    return elementGraph;
    }

  /**
   * Returns the node graph for this step, building it from the wrapped {@link JobConf} on first use
   * and reusing the stored instance afterwards.
   *
   * @return the node graph holding the map node and, when present, the reduce node
   */
  @Override
  public FlowNodeGraph getFlowNodeGraph()
    {
    if( flowNodeGraph == null )
      flowNodeGraph = createFlowNodeGraph( createNodeElementGraphs( jobConf ) );

    return flowNodeGraph;
    }

  /**
   * Returns the wrapped {@link JobConf} unchanged; both arguments are ignored.
   *
   * @param flowProcess  not used
   * @param parentConfig not used
   * @return the {@link JobConf} handed to the constructor
   */
  @Override
  public JobConf createInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    return jobConf;
    }

  /** Combines the element graphs of every node in the given node graph into one graph and binds its extents. */
  private ElementGraph createStepElementGraph( FlowNodeGraph nodeGraph )
    {
    return ElementGraphs.asElementDirectedGraph( nodeGraph.getElementGraphs() ).bindExtents();
    }

  /**
   * Builds the per-node element graphs for the given job.
   * <p>
   * The returned list always holds the map graph at index 0. When the job has a reducer a second graph is
   * appended at index 1. The owning flow is cast to {@link BaseMapReduceFlow} in order to obtain the source
   * and sink taps, so any other flow type fails here with a {@link ClassCastException}.
   */
  private List<ElementGraph> createNodeElementGraphs( JobConf jobConf )
    {
    BaseMapReduceFlow baseFlow = (BaseMapReduceFlow) getFlow();
    boolean hasReducer = HadoopMRUtil.hasReducer( jobConf );

    List<ElementGraph> result = new ArrayList<>();
    ElementGraph mapElementGraph = createElementDirectedGraph();
    ElementGraph tailElementGraph = mapElementGraph; // graph the sinks attach to, map-only unless a reducer exists

    Pipe headOperation = createMapOperation();
    Pipe tailOperation = headOperation; // pipe the sinks attach to

    mapElementGraph.addVertex( headOperation );

    result.add( mapElementGraph );

    ElementGraph reduceElementGraph = null;

    if( hasReducer )
      {
      // the shuffle pipe is a vertex of both graphs: it ends the map graph and starts the reduce graph
      Pipe shuffleOperation = createShuffleOperation();
      mapElementGraph.addVertex( shuffleOperation );

      mapElementGraph.addEdge( headOperation, shuffleOperation );

      reduceElementGraph = createElementDirectedGraph();

      reduceElementGraph.addVertex( shuffleOperation );
      Pipe reduceOperation = createReduceOperation();
      reduceElementGraph.addVertex( reduceOperation );

      reduceElementGraph.addEdge( shuffleOperation, reduceOperation );

      tailOperation = reduceOperation;
      tailElementGraph = reduceElementGraph;

      result.add( reduceElementGraph );
      }

    // sources always feed the map pipe; the map key becomes the name of the connecting scope
    Map<String, Tap> sources = baseFlow.createSources( jobConf );

    for( Map.Entry<String, Tap> entry : sources.entrySet() )
      {
      mapElementGraph.addVertex( entry.getValue() );
      mapElementGraph.addEdge( entry.getValue(), headOperation, new Scope( entry.getKey() ) );
      }

    // sinks hang off whichever pipe ended up last, in whichever graph contains it
    Map<String, Tap> sinks = baseFlow.createSinks( jobConf );

    for( Map.Entry<String, Tap> entry : sinks.entrySet() )
      {
      tailElementGraph.addVertex( entry.getValue() );
      tailElementGraph.addEdge( tailOperation, entry.getValue(), new Scope( entry.getKey() ) );
      }

    mapElementGraph.bindExtents();

    if( reduceElementGraph != null )
      reduceElementGraph.bindExtents();

    return result;
    }

  /**
   * Creates the empty graph used for each node; subclasses may override to supply a different instance.
   *
   * @return a new {@link ElementDirectedGraph}
   */
  protected ElementDirectedGraph createElementDirectedGraph()
    {
    return new ElementDirectedGraph();
    }

  /**
   * Creates the placeholder pipe standing in for the map side of the job.
   *
   * @return a new {@link Pipe} named {@link #MAP}
   */
  protected Pipe createMapOperation()
    {
    return new Pipe( MAP );
    }

  /**
   * Creates the placeholder pipe placed between the map and reduce pipes.
   *
   * @return a new {@link Pipe} named {@link #SHUFFLE}
   */
  protected Pipe createShuffleOperation()
    {
    return new Pipe( SHUFFLE );
    }

  /**
   * Creates the placeholder pipe standing in for the reduce side of the job.
   *
   * @return a new {@link Pipe} named {@link #REDUCE}
   */
  protected Pipe createReduceOperation()
    {
    return new Pipe( REDUCE );
    }

  /**
   * Wraps the given element graphs in {@link FlowNode} instances and links them into a node graph.
   * <p>
   * The first graph always becomes the node named {@code (1/n)} with ordinal 0, where {@code n} is the size
   * of the list. Only when the list holds exactly two graphs is the second one added, as node {@code (2/2)}
   * with ordinal 1, connected to the first by a {@link ProcessEdge}.
   *
   * @param elementGraphs the per-node element graphs, map graph first; must not be empty
   * @return the populated node graph
   */
  protected FlowNodeGraph createFlowNodeGraph( List<ElementGraph> elementGraphs )
    {
    int nodes = elementGraphs.size();

    // named 'result' so the inherited flowNodeGraph field is not shadowed
    FlowNodeGraph result = new FlowNodeGraph();

    FlowNode mapperNode = new BaseFlowNode( elementGraphs.get( 0 ), String.format( "(1/%s)", nodes ), 0 );
    result.addVertex( mapperNode );

    if( nodes == 2 )
      {
      FlowNode reducerNode = new BaseFlowNode( elementGraphs.get( 1 ), "(2/2)", 1 );
      result.addVertex( reducerNode );
      result.addEdge( mapperNode, reducerNode, new ProcessEdge( mapperNode, reducerNode ) );
      }

    return result;
    }
  }