001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez.stream.graph; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030 031import cascading.flow.FlowElement; 032import cascading.flow.FlowElements; 033import cascading.flow.FlowException; 034import cascading.flow.FlowNode; 035import cascading.flow.FlowProcess; 036import cascading.flow.Flows; 037import cascading.flow.hadoop.stream.HadoopMemoryJoinGate; 038import cascading.flow.hadoop.util.HadoopUtil; 039import cascading.flow.stream.annotations.StreamMode; 040import cascading.flow.stream.duct.Duct; 041import cascading.flow.stream.duct.Gate; 042import cascading.flow.stream.element.InputSource; 043import cascading.flow.stream.element.MemoryHashJoinGate; 044import cascading.flow.stream.element.SinkStage; 045import cascading.flow.stream.element.SourceStage; 046import cascading.flow.stream.graph.IORole; 047import cascading.flow.stream.graph.NodeStreamGraph; 048import cascading.flow.tez.Hadoop2TezFlowProcess; 049import cascading.flow.tez.stream.element.TezBoundaryStage; 050import cascading.flow.tez.stream.element.TezCoGroupGate; 051import cascading.flow.tez.stream.element.TezGroupByGate; 052import cascading.flow.tez.stream.element.TezMergeGate; 053import cascading.flow.tez.stream.element.TezSinkStage; 054import cascading.flow.tez.stream.element.TezSourceStage; 055import cascading.flow.tez.util.TezUtil; 056import cascading.pipe.Boundary; 057import cascading.pipe.CoGroup; 058import cascading.pipe.Group; 059import cascading.pipe.GroupBy; 060import cascading.pipe.HashJoin; 061import cascading.pipe.Merge; 062import cascading.pipe.Pipe; 063import cascading.tap.Tap; 064import cascading.util.SetMultiMap; 065import cascading.util.SortedListMultiMap; 066import cascading.util.Util; 067import org.apache.hadoop.conf.Configuration; 068import org.apache.tez.dag.api.TezConfiguration; 069import org.apache.tez.runtime.api.LogicalInput; 070import org.apache.tez.runtime.api.LogicalOutput; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import static cascading.flow.tez.util.TezUtil.*; 075 076/** 077 * 078 */ 079public class Hadoop2TezStreamGraph extends NodeStreamGraph 080 { 081 private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezStreamGraph.class ); 082 083 private InputSource streamedHead; 084 private Map<String, LogicalInput> inputMap; 085 private Map<String, LogicalOutput> outputMap; 086 private Map<LogicalInput, Configuration> inputConfigMap = new HashMap<>(); 087 private Map<LogicalOutput, Configuration> outputConfigMap = new HashMap<>(); 088 private SetMultiMap<String, LogicalInput> inputMultiMap; 089 private SetMultiMap<String, LogicalOutput> outputMultiMap; 090 091 public Hadoop2TezStreamGraph( Hadoop2TezFlowProcess currentProcess, FlowNode flowNode, Map<String, LogicalInput> inputMap, Map<String, LogicalOutput> outputMap ) 092 { 093 super( currentProcess, flowNode ); 094 this.inputMap = inputMap; 095 this.outputMap = outputMap; 096 097 buildGraph(); 098 099 setTraps(); 100 setScopes(); 101 102 printGraph( node.getID(), node.getName(), flowProcess.getCurrentSliceNum() ); 103 bind(); 104 } 105 106 public InputSource getStreamedHead() 107 { 108 return streamedHead; 109 } 110 111 protected void buildGraph() 112 { 113 inputMultiMap = new SetMultiMap<>(); 114 115 for( Map.Entry<String, LogicalInput> entry : inputMap.entrySet() ) 116 { 117 Configuration inputConfiguration = getInputConfiguration( entry.getValue() ); 118 inputConfigMap.put( entry.getValue(), inputConfiguration ); 119 120 inputMultiMap.addAll( getEdgeSourceID( entry.getValue(), inputConfiguration ), entry.getValue() ); 121 } 122 123 outputMultiMap = new SetMultiMap<>(); 124 125 for( Map.Entry<String, LogicalOutput> entry : outputMap.entrySet() ) 126 { 127 Configuration outputConfiguration = getOutputConfiguration( entry.getValue() ); 128 outputConfigMap.put( entry.getValue(), outputConfiguration ); 129 130 outputMultiMap.addAll( TezUtil.getEdgeSinkID( entry.getValue(), outputConfiguration ), entry.getValue() ); 131 } 132 133 // this made the assumption we can have a physical and logical input per vertex. seems we can't 134 if( inputMultiMap.getKeys().size() == 1 ) 135 { 136 streamedSource = Flows.getFlowElementForID( node.getSourceElements(), Util.getFirst( inputMultiMap.getKeys() ) ); 137 } 138 else 139 { 140 Set<FlowElement> sourceElements = new HashSet<>( node.getSourceElements() ); 141 Set<? extends FlowElement> accumulated = node.getSourceElements( StreamMode.Accumulated ); 142 143 sourceElements.removeAll( accumulated ); 144 145 if( sourceElements.size() != 1 ) 146 throw new IllegalStateException( "too many input source keys, got: " + Util.join( sourceElements, ", " ) ); 147 148 streamedSource = Util.getFirst( sourceElements ); 149 } 150 151 LOG.info( "using streamed source: " + streamedSource ); 152 153 streamedHead = handleHead( streamedSource, flowProcess ); 154 155 Set<FlowElement> accumulated = new HashSet<>( node.getSourceElements() ); 156 157 accumulated.remove( streamedSource ); 158 159 Hadoop2TezFlowProcess tezProcess = (Hadoop2TezFlowProcess) flowProcess; 160 TezConfiguration conf = tezProcess.getConfiguration(); 161 162 for( FlowElement flowElement : accumulated ) 163 { 164 LOG.info( "using accumulated source: " + flowElement ); 165 166 if( flowElement instanceof Tap ) 167 { 168 Tap source = (Tap) flowElement; 169 170 // allows client side config to be used cluster side 171 String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( source ) ); 172 173 if( property == null ) 174 throw new IllegalStateException( "accumulated source conf property missing for: " + source.getIdentifier() ); 175 176 conf = getSourceConf( tezProcess, conf, property ); 177 } 178 else 179 { 180 conf = (TezConfiguration) inputConfigMap.get( FlowElements.id( flowElement ) ); 181 } 182 183 FlowProcess flowProcess = conf == null ? tezProcess : new Hadoop2TezFlowProcess( tezProcess, conf ); 184 185 handleHead( flowElement, flowProcess ); 186 } 187 } 188 189 private TezConfiguration getSourceConf( FlowProcess<TezConfiguration> flowProcess, TezConfiguration conf, String property ) 190 { 191 Map<String, String> priorConf; 192 193 try 194 { 195 priorConf = (Map<String, String>) HadoopUtil.deserializeBase64( property, conf, HashMap.class, true ); 196 } 197 catch( IOException exception ) 198 { 199 throw new FlowException( "unable to deserialize properties", exception ); 200 } 201 202 return flowProcess.mergeMapIntoConfig( conf, priorConf ); 203 } 204 205 private InputSource handleHead( FlowElement source, FlowProcess flowProcess ) 206 { 207 Duct sourceDuct; 208 209 if( source instanceof Tap ) 210 sourceDuct = createSourceStage( (Tap) source, flowProcess ); 211 else if( source instanceof Merge ) 212 sourceDuct = createMergeStage( (Merge) source, IORole.source ); 213 else if( source instanceof Boundary ) 214 sourceDuct = createBoundaryStage( (Boundary) source, IORole.source ); 215 else if( ( (Group) source ).isGroupBy() ) 216 sourceDuct = createGroupByGate( (GroupBy) source, IORole.source ); 217 else 218 sourceDuct = createCoGroupGate( (CoGroup) source, IORole.source ); 219 220 addHead( sourceDuct ); 221 222 handleDuct( source, sourceDuct ); 223 224 return (InputSource) sourceDuct; 225 } 226 227 protected SourceStage createSourceStage( Tap source, FlowProcess flowProcess ) 228 { 229 String id = Tap.id( source ); 230 LogicalInput logicalInput = inputMap.get( id ); 231 232 if( logicalInput == null ) 233 logicalInput = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + id ) ); 234 235 if( logicalInput == null ) 236 return new SourceStage( flowProcess, source ); 237 238 return new TezSourceStage( flowProcess, source, logicalInput ); 239 } 240 241 @Override 242 protected SinkStage createSinkStage( Tap sink ) 243 { 244 String id = Tap.id( sink ); 245 LogicalOutput logicalOutput = outputMap.get( id ); 246 247 if( logicalOutput == null ) 248 logicalOutput = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + id ) ); 249 250 if( logicalOutput == null ) 251 throw new IllegalStateException( "could not find output for: " + sink ); 252 253 return new TezSinkStage( flowProcess, sink, logicalOutput ); 254 } 255 256 @Override 257 protected Duct createMergeStage( Merge element, IORole role ) 258 { 259 if( role == IORole.pass ) 260 return super.createMergeStage( element, IORole.pass ); 261 else if( role == IORole.sink ) 262 return createSinkMergeGate( element ); 263 else if( role == IORole.source ) 264 return createSourceMergeGate( element ); 265 else 266 throw new UnsupportedOperationException( "both role not supported with merge" ); 267 } 268 269 private Duct createSourceMergeGate( Merge element ) 270 { 271 return new TezMergeGate( flowProcess, element, IORole.source, createInputMap( element ) ); 272 } 273 274 private Duct createSinkMergeGate( Merge element ) 275 { 276 return new TezMergeGate( flowProcess, element, IORole.sink, findLogicalOutputs( element ) ); 277 } 278 279 @Override 280 protected Duct createBoundaryStage( Boundary element, IORole role ) 281 { 282 if( role == IORole.pass ) 283 return super.createBoundaryStage( element, IORole.pass ); 284 else if( role == IORole.sink ) 285 return createSinkBoundaryStage( element ); 286 else if( role == IORole.source ) 287 return createSourceBoundaryStage( element ); 288 else 289 throw new UnsupportedOperationException( "both role not supported with boundary" ); 290 } 291 292 private Duct createSourceBoundaryStage( Boundary element ) 293 { 294 return new TezBoundaryStage( flowProcess, element, IORole.source, findLogicalInput( element ) ); 295 } 296 297 private Duct createSinkBoundaryStage( Boundary element ) 298 { 299 return new TezBoundaryStage( flowProcess, element, IORole.sink, findLogicalOutputs( element ) ); 300 } 301 302 @Override 303 protected Gate createGroupByGate( GroupBy element, IORole role ) 304 { 305 if( role == IORole.sink ) 306 return createSinkGroupByGate( element ); 307 else 308 return createSourceGroupByGate( element ); 309 } 310 311 @Override 312 protected Gate createCoGroupGate( CoGroup element, IORole role ) 313 { 314 if( role == IORole.sink ) 315 return createSinkCoGroupByGate( element ); 316 else 317 return createSourceCoGroupByGate( element ); 318 } 319 320 private Gate createSinkCoGroupByGate( CoGroup element ) 321 { 322 return new TezCoGroupGate( flowProcess, element, IORole.sink, findLogicalOutput( element ) ); 323 } 324 325 private Gate createSourceCoGroupByGate( CoGroup element ) 326 { 327 return new TezCoGroupGate( flowProcess, element, IORole.source, createInputMap( element ) ); 328 } 329 330 protected Gate createSinkGroupByGate( GroupBy element ) 331 { 332 return new TezGroupByGate( flowProcess, element, IORole.sink, findLogicalOutput( element ) ); 333 } 334 335 protected Gate createSourceGroupByGate( GroupBy element ) 336 { 337 return new TezGroupByGate( flowProcess, element, IORole.source, createInputMap( element ) ); 338 } 339 340 private LogicalOutput findLogicalOutput( Pipe element ) 341 { 342 String id = Pipe.id( element ); 343 LogicalOutput logicalOutput = outputMap.get( id ); 344 345 if( logicalOutput == null ) 346 logicalOutput = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + id ) ); 347 348 if( logicalOutput == null ) 349 throw new IllegalStateException( "could not find output for: " + element ); 350 351 return logicalOutput; 352 } 353 354 private Collection<LogicalOutput> findLogicalOutputs( Pipe element ) 355 { 356 String id = Pipe.id( element ); 357 358 return outputMultiMap.getValues( id ); 359 } 360 361 private LogicalInput findLogicalInput( Pipe element ) 362 { 363 String id = Pipe.id( element ); 364 LogicalInput logicalInput = inputMap.get( id ); 365 366 if( logicalInput == null ) 367 logicalInput = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + id ) ); 368 369 if( logicalInput == null ) 370 throw new IllegalStateException( "could not find input for: " + element ); 371 372 return logicalInput; 373 } 374 375 /** 376 * Maps each input to an ordinal on the flowelement. an input may be bound to multiple ordinals. 377 * 378 * @param element 379 */ 380 private SortedListMultiMap<Integer, LogicalInput> createInputMap( FlowElement element ) 381 { 382 String id = FlowElements.id( element ); 383 SortedListMultiMap<Integer, LogicalInput> ordinalMap = new SortedListMultiMap<>(); 384 385 for( LogicalInput logicalInput : inputMap.values() ) 386 { 387 Configuration configuration = inputConfigMap.get( logicalInput ); 388 389 String foundID = configuration.get( "cascading.node.source" ); 390 391 if( Util.isEmpty( foundID ) ) 392 throw new IllegalStateException( "cascading.node.source property not set on source LogicalInput" ); 393 394 if( !foundID.equals( id ) ) 395 continue; 396 397 String values = configuration.get( "cascading.node.ordinals", "" ); 398 List<Integer> ordinals = Util.split( Integer.class, ",", values ); 399 400 for( Integer ordinal : ordinals ) 401 ordinalMap.put( ordinal, logicalInput ); 402 } 403 404 return ordinalMap; 405 } 406 407 @Override 408 protected MemoryHashJoinGate createNonBlockingJoinGate( HashJoin join ) 409 { 410 return new HadoopMemoryJoinGate( flowProcess, join ); // does not use a latch 411 } 412 }