001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez; 022 023import java.io.File; 024import java.io.IOException; 025import java.util.ArrayList; 026import java.util.Collection; 027import java.util.HashMap; 028import java.util.HashSet; 029import java.util.Iterator; 030import java.util.LinkedList; 031import java.util.List; 032import java.util.Map; 033import java.util.Set; 034 035import cascading.CascadingException; 036import cascading.flow.FlowElement; 037import cascading.flow.FlowElements; 038import cascading.flow.FlowException; 039import cascading.flow.FlowNode; 040import cascading.flow.FlowProcess; 041import cascading.flow.FlowRuntimeProps; 042import cascading.flow.hadoop.ConfigurationSetter; 043import cascading.flow.hadoop.util.HadoopUtil; 044import cascading.flow.planner.BaseFlowStep; 045import cascading.flow.planner.FlowStepJob; 046import cascading.flow.planner.graph.ElementGraph; 047import cascading.flow.planner.process.FlowNodeGraph; 048import cascading.flow.planner.process.ProcessEdge; 049import cascading.flow.stream.annotations.StreamMode; 050import cascading.flow.tez.planner.Hadoop2TezFlowStepJob; 051import cascading.flow.tez.util.TezUtil; 052import cascading.management.state.ClientState; 053import cascading.pipe.Boundary; 054import cascading.pipe.CoGroup; 055import cascading.pipe.Group; 056import cascading.pipe.GroupBy; 057import cascading.pipe.Merge; 058import cascading.property.AppProps; 059import cascading.tap.Tap; 060import cascading.tap.hadoop.Hfs; 061import cascading.tap.hadoop.PartitionTap; 062import cascading.tap.hadoop.util.Hadoop18TapUtil; 063import cascading.tuple.Fields; 064import cascading.tuple.hadoop.TupleSerialization; 065import cascading.tuple.hadoop.util.GroupingSortingComparator; 066import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator; 067import cascading.tuple.hadoop.util.ReverseTupleComparator; 068import cascading.tuple.hadoop.util.TupleComparator; 069import cascading.tuple.io.KeyTuple; 070import cascading.tuple.io.TuplePair; 071import cascading.tuple.io.ValueTuple; 072import cascading.tuple.tez.util.GroupingSortingPartitioner; 073import cascading.tuple.tez.util.TuplePartitioner; 074import cascading.util.Util; 075import cascading.util.Version; 076import org.apache.hadoop.conf.Configuration; 077import org.apache.hadoop.fs.FileSystem; 078import org.apache.hadoop.fs.Path; 079import org.apache.hadoop.mapred.JobConf; 080import org.apache.hadoop.mapreduce.JobContext; 081import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 082import org.apache.hadoop.yarn.api.records.LocalResource; 083import org.apache.hadoop.yarn.api.records.LocalResourceType; 084import org.apache.tez.common.TezUtils; 085import org.apache.tez.dag.api.DAG; 086import org.apache.tez.dag.api.DataSinkDescriptor; 087import org.apache.tez.dag.api.DataSourceDescriptor; 088import org.apache.tez.dag.api.Edge; 089import org.apache.tez.dag.api.EdgeProperty; 090import org.apache.tez.dag.api.GroupInputEdge; 091import org.apache.tez.dag.api.InputDescriptor; 092import org.apache.tez.dag.api.OutputDescriptor; 093import org.apache.tez.dag.api.ProcessorDescriptor; 094import org.apache.tez.dag.api.TezConfiguration; 095import org.apache.tez.dag.api.UserPayload; 096import org.apache.tez.dag.api.Vertex; 097import org.apache.tez.dag.api.VertexGroup; 098import org.apache.tez.mapreduce.input.MRInput; 099import org.apache.tez.mapreduce.output.MROutput; 100import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; 101import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput; 102import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; 103import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput; 104import org.apache.tez.runtime.library.input.UnorderedKVInput; 105import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; 106import org.apache.tez.runtime.library.output.UnorderedKVOutput; 107import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput; 108import org.slf4j.Logger; 109import org.slf4j.LoggerFactory; 110 111import static cascading.flow.hadoop.util.HadoopUtil.*; 112import static cascading.flow.tez.util.TezUtil.addToClassPath; 113import static cascading.tap.hadoop.DistCacheTap.CASCADING_LOCAL_RESOURCES; 114import static cascading.tap.hadoop.DistCacheTap.CASCADING_REMOTE_RESOURCES; 115import static cascading.util.Util.getFirst; 116import static java.util.Collections.singletonList; 117import static org.apache.hadoop.yarn.api.records.LocalResourceType.ARCHIVE; 118import static org.apache.hadoop.yarn.api.records.LocalResourceType.FILE; 119 120/** 121 * 122 */ 123public class Hadoop2TezFlowStep extends BaseFlowStep<TezConfiguration> 124 { 125 private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezFlowStep.class ); 126 127 private Map<String, LocalResource> allLocalResources = new HashMap<>(); 128 private Map<Path, Path> syncPaths = new HashMap<>(); 129 private Map<String, String> environment = new HashMap<>(); 130 131 public Hadoop2TezFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph ) 132 { 133 super( elementGraph, flowNodeGraph ); 134 } 135 136 @Override 137 public Map<Object, Object> getConfigAsProperties() 138 { 139 return HadoopUtil.createProperties( getConfig() ); 140 } 141 142 @Override 143 public TezConfiguration createInitializedConfig( FlowProcess<TezConfiguration> flowProcess, TezConfiguration parentConfig ) 144 { 145 TezConfiguration stepConf = parentConfig == null ? new TezConfiguration() : new TezConfiguration( parentConfig ); 146 147 TupleSerialization.setSerializations( stepConf ); 148 149 String versionString = Version.getRelease(); 150 151 if( versionString != null ) 152 stepConf.set( "cascading.version", versionString ); 153 154 stepConf.set( CASCADING_FLOW_STEP_ID, getID() ); 155 stepConf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) ); 156 157 String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath(); 158 List<String> classPath = ( (Hadoop2TezFlow) getFlow() ).getClassPath(); 159 160 // is updated in addToClassPath method 161 Map<String, LocalResource> dagResources = new HashMap<>(); 162 163 if( !classPath.isEmpty() ) 164 { 165 // jars in the root will be in the remote CLASSPATH, no need to add to the environment 166 Map<Path, Path> dagClassPath = addToClassPath( stepConf, flowStagingPath, null, classPath, FILE, dagResources, null ); 167 168 syncPaths.putAll( dagClassPath ); 169 } 170 171 String appJarPath = stepConf.get( AppProps.APP_JAR_PATH ); 172 173 if( appJarPath != null ) 174 { 175 // the PATTERN represents the insides of the app jar, those elements must be added to the remote CLASSPATH 176 List<String> classpath = singletonList( appJarPath ); 177 Map<Path, Path> pathMap = addToClassPath( stepConf, flowStagingPath, null, classpath, ARCHIVE, dagResources, environment ); 178 179 syncPaths.putAll( pathMap ); 180 181 // AM does not support environments like containers do, so the classpath has to be passed via configuration. 182 String fileName = new File( appJarPath ).getName(); 183 stepConf.set( TezConfiguration.TEZ_CLUSTER_ADDITIONAL_CLASSPATH_PREFIX, 184 "$PWD/" + fileName + "/:$PWD/" + fileName + "/classes/:$PWD/" + fileName + "/lib/*:" ); 185 } 186 187 allLocalResources.putAll( dagResources ); 188 189 initFromStepConfigDef( stepConf ); 190 191 return stepConf; 192 } 193 194 @Override 195 protected FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedStepConfig ) 196 { 197 DAG dag = createDAG( flowProcess, initializedStepConfig ); 198 199 return new Hadoop2TezFlowStepJob( clientState, this, initializedStepConfig, dag ); 200 } 201 202 private DAG createDAG( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig ) 203 { 204 FlowNodeGraph nodeGraph = getFlowNodeGraph(); 205 Map<FlowNode, Vertex> vertexMap = new HashMap<>(); 206 DAG dag = DAG.create( getStepDisplayName( initializedConfig.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) ); 207 208 dag.addTaskLocalFiles( allLocalResources ); 209 210 Iterator<FlowNode> iterator = nodeGraph.getOrderedTopologicalIterator(); // ordering of nodes for consistent remote debugging 211 212 while( iterator.hasNext() ) 213 { 214 FlowNode flowNode = iterator.next(); 215 216 Vertex vertex = createVertex( flowProcess, initializedConfig, flowNode ); 217 dag.addVertex( vertex ); 218 219 vertexMap.put( flowNode, vertex ); 220 } 221 222 LinkedList<ProcessEdge> processedEdges = new LinkedList<>(); 223 224 for( ProcessEdge processEdge : nodeGraph.edgeSet() ) 225 { 226 if( processedEdges.contains( processEdge ) ) 227 continue; 228 229 FlowNode edgeTargetFlowNode = nodeGraph.getEdgeTarget( processEdge ); 230 231 FlowElement flowElement = processEdge.getFlowElement(); 232 List<FlowNode> sourceNodes = nodeGraph.getElementSourceProcesses( flowElement ); 233 234 EdgeProperty edgeProperty = createEdgeProperty( initializedConfig, processEdge ); 235 236 Vertex targetVertex = vertexMap.get( edgeTargetFlowNode ); 237 238 if( sourceNodes.size() == 1 || flowElement instanceof CoGroup || flowElement instanceof Boundary ) // todo: create group vertices around incoming ordinal 239 { 240 FlowNode edgeSourceFlowNode = nodeGraph.getEdgeSource( processEdge ); 241 Vertex sourceVertex = vertexMap.get( edgeSourceFlowNode ); 242 243 LOG.debug( "adding edge between: {} and {}", sourceVertex, targetVertex ); 244 245 dag.addEdge( Edge.create( sourceVertex, targetVertex, edgeProperty ) ); 246 } 247 else if( flowElement instanceof GroupBy || flowElement instanceof Merge ) // merge - source nodes > 1 248 { 249 List<String> sourceVerticesIDs = new ArrayList<>(); 250 List<Vertex> sourceVertices = new ArrayList<>(); 251 252 for( FlowNode edgeSourceFlowNode : sourceNodes ) 253 { 254 sourceVerticesIDs.add( edgeSourceFlowNode.getID() ); 255 sourceVertices.add( vertexMap.get( edgeSourceFlowNode ) ); 256 processedEdges.add( nodeGraph.getEdge( edgeSourceFlowNode, edgeTargetFlowNode ) ); 257 } 258 259 VertexGroup vertexGroup = dag.createVertexGroup( edgeTargetFlowNode.getID(), sourceVertices.toArray( new Vertex[ sourceVertices.size() ] ) ); 260 261 String inputClassName = flowElement instanceof Group ? OrderedGroupedMergedKVInput.class.getName() : ConcatenatedMergedKeyValueInput.class.getName(); 262 263 InputDescriptor inputDescriptor = InputDescriptor.create( inputClassName ).setUserPayload( edgeProperty.getEdgeDestination().getUserPayload() ); 264 265 LOG.info( "adding grouped edge between: {} and {}", Util.join( sourceVerticesIDs, "," ), targetVertex.getName() ); 266 dag.addEdge( GroupInputEdge.create( vertexGroup, targetVertex, edgeProperty, inputDescriptor ) ); 267 } 268 else 269 { 270 throw new UnsupportedOperationException( "can't make edge for: " + flowElement ); 271 } 272 } 273 274 return dag; 275 } 276 277 private EdgeProperty createEdgeProperty( TezConfiguration config, ProcessEdge processEdge ) 278 { 279 FlowElement flowElement = processEdge.getFlowElement(); 280 281 EdgeValues edgeValues = new EdgeValues( new TezConfiguration( config ), processEdge ); 282 283 edgeValues.keyClassName = KeyTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS 284 edgeValues.valueClassName = ValueTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS 285 edgeValues.keyComparatorClassName = TupleComparator.class.getName(); 286 edgeValues.keyPartitionerClassName = TuplePartitioner.class.getName(); 287 edgeValues.outputClassName = null; 288 edgeValues.inputClassName = null; 289 edgeValues.movementType = null; 290 edgeValues.sourceType = null; 291 edgeValues.schedulingType = null; 292 293 if( flowElement instanceof Group ) 294 applyGroup( edgeValues ); 295 else if( ( flowElement instanceof Boundary || flowElement instanceof Merge ) && processEdge.getSinkAnnotations().contains( StreamMode.Accumulated ) ) 296 applyBoundaryMergeAccumulated( edgeValues ); 297 else if( flowElement instanceof Boundary || flowElement instanceof Merge ) 298 applyBoundaryMerge( edgeValues ); 299 else 300 throw new IllegalStateException( "unsupported flow element: " + flowElement.getClass().getCanonicalName() ); 301 302 applyEdgeAnnotations( processEdge, edgeValues ); 303 304 return createEdgeProperty( edgeValues ); 305 } 306 307 private void applyEdgeAnnotations( ProcessEdge processEdge, EdgeValues edgeValues ) 308 { 309 processEdge.addEdgeAnnotation( edgeValues.movementType ); 310 processEdge.addEdgeAnnotation( edgeValues.sourceType ); 311 processEdge.addEdgeAnnotation( edgeValues.schedulingType ); 312 } 313 314 private EdgeValues applyBoundaryMerge( EdgeValues edgeValues ) 315 { 316 // todo: support for one to one 317 edgeValues.outputClassName = UnorderedPartitionedKVOutput.class.getName(); 318 edgeValues.inputClassName = UnorderedKVInput.class.getName(); 319 320 edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER; 321 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 322 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 323 324 return edgeValues; 325 } 326 327 private EdgeValues applyBoundaryMergeAccumulated( EdgeValues edgeValues ) 328 { 329 edgeValues.outputClassName = UnorderedKVOutput.class.getName(); 330 edgeValues.inputClassName = UnorderedKVInput.class.getName(); 331 332 edgeValues.movementType = EdgeProperty.DataMovementType.BROADCAST; 333 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 334 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 335 336 return edgeValues; 337 } 338 339 private EdgeValues applyGroup( EdgeValues edgeValues ) 340 { 341 Group group = (Group) edgeValues.flowElement; 342 343 if( group.isSortReversed() ) 344 edgeValues.keyComparatorClassName = ReverseTupleComparator.class.getName(); 345 346 int ordinal = getFirst( edgeValues.ordinals ); 347 348 addComparators( edgeValues.config, "cascading.group.comparator", group.getKeySelectors(), edgeValues.getResolvedKeyFieldsMap().get( ordinal ) ); 349 350 if( !group.isGroupBy() ) 351 { 352 edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName(); 353 edgeValues.inputClassName = OrderedGroupedKVInput.class.getName(); 354 355 edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER; 356 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 357 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 358 } 359 else 360 { 361 addComparators( edgeValues.config, "cascading.sort.comparator", group.getSortingSelectors(), edgeValues.getResolvedSortFieldsMap().get( ordinal ) ); 362 363 edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName(); 364 edgeValues.inputClassName = OrderedGroupedKVInput.class.getName(); 365 366 edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER; 367 edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED; 368 edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL; 369 } 370 371 if( group.isSorted() ) 372 { 373 edgeValues.keyClassName = TuplePair.class.getName(); 374 edgeValues.keyPartitionerClassName = GroupingSortingPartitioner.class.getName(); 375 376 if( group.isSortReversed() ) 377 edgeValues.keyComparatorClassName = ReverseGroupingSortingComparator.class.getName(); 378 else 379 edgeValues.keyComparatorClassName = GroupingSortingComparator.class.getName(); 380 } 381 382 return edgeValues; 383 } 384 385 private EdgeProperty createEdgeProperty( EdgeValues edgeValues ) 386 { 387 TezConfiguration outputConfig = new TezConfiguration( edgeValues.getConfig() ); 388 outputConfig.set( "cascading.node.sink", FlowElements.id( edgeValues.getFlowElement() ) ); 389 outputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) ); 390 addFields( outputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() ); 391 addFields( outputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() ); 392 addFields( outputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() ); 393 394 UserPayload outputPayload = createIntermediatePayloadOutput( outputConfig, edgeValues ); 395 396 TezConfiguration inputConfig = new TezConfiguration( edgeValues.getConfig() ); 397 inputConfig.set( "cascading.node.source", FlowElements.id( edgeValues.getFlowElement() ) ); 398 inputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) ); 399 addFields( inputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() ); 400 addFields( inputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() ); 401 addFields( inputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() ); 402 403 UserPayload inputPayload = createIntermediatePayloadInput( inputConfig, edgeValues ); 404 405 return EdgeProperty.create( 406 edgeValues.getMovementType(), 407 edgeValues.getSourceType(), 408 edgeValues.getSchedulingType(), 409 OutputDescriptor.create( edgeValues.getOutputClassName() ).setUserPayload( outputPayload ), 410 InputDescriptor.create( edgeValues.getInputClassName() ).setUserPayload( inputPayload ) 411 ); 412 } 413 414 private UserPayload createIntermediatePayloadOutput( TezConfiguration config, EdgeValues edgeValues ) 415 { 416 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName ); 417 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName ); 418 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName ); 419 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName ); 420 421 setWorkingDirectory( config ); 422 423 return getPayload( config ); 424 } 425 426 private UserPayload createIntermediatePayloadInput( TezConfiguration config, EdgeValues edgeValues ) 427 { 428 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName ); 429 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName ); 430 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName ); 431 config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName ); 432 433 setWorkingDirectory( config ); 434 435 return getPayload( config ); 436 } 437 438 private static void setWorkingDirectory( Configuration conf ) 439 { 440 String name = conf.get( JobContext.WORKING_DIR ); 441 442 if( name != null ) 443 return; 444 445 try 446 { 447 Path dir = FileSystem.get( conf ).getWorkingDirectory(); 448 conf.set( JobContext.WORKING_DIR, dir.toString() ); 449 } 450 catch( IOException exception ) 451 { 452 throw new RuntimeException( exception ); 453 } 454 } 455 456 public Vertex createVertex( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig, FlowNode flowNode ) 457 { 458 JobConf conf = new JobConf( initializedConfig ); 459 460 addInputOutputMapping( conf, flowNode ); 461 462 conf.setBoolean( "mapred.used.genericoptionsparser", true ); 463 464 Map<String, LocalResource> taskLocalResources = new HashMap<>(); 465 466 Map<FlowElement, Configuration> sourceConfigs = initFromSources( flowNode, flowProcess, conf, taskLocalResources ); 467 Map<FlowElement, Configuration> sinkConfigs = initFromSinks( flowNode, flowProcess, conf ); 468 469 initFromTraps( flowNode, flowProcess, conf ); 470 471 initFromNodeConfigDef( flowNode, conf ); 472 473 // force step to local mode if any tap is local 474 setLocalMode( initializedConfig, conf, null ); 475 476 conf.set( "cascading.flow.node.num", Integer.toString( flowNode.getOrdinal() ) ); 477 478 HadoopUtil.setIsInflow( conf ); // must be called after all taps configurations have been retrieved 479 480 int parallelism = getParallelism( flowNode, conf ); 481 482 if( parallelism == 0 ) 483 throw new FlowException( getName(), "the default number of gather partitions must be set, see cascading.flow.FlowRuntimeProps" ); 484 485 flowNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( parallelism ) ); 486 487 Vertex vertex = newVertex( flowNode, conf, parallelism ); 488 489 if( !taskLocalResources.isEmpty() ) 490 vertex.addTaskLocalFiles( taskLocalResources ); 491 492 for( FlowElement flowElement : sourceConfigs.keySet() ) 493 { 494 if( !( flowElement instanceof Tap ) ) 495 continue; 496 497 Configuration sourceConf = sourceConfigs.get( flowElement ); 498 499 // not setting the new-api value could result in failures if not set by the Scheme 500 if( sourceConf.get( "mapred.mapper.new-api" ) == null ) 501 HadoopUtil.setNewApi( sourceConf, sourceConf.get( "mapred.input.format.class", sourceConf.get( "mapreduce.job.inputformat.class" ) ) ); 502 503 // unfortunately we cannot just load the input format and set it on the builder with also pulling all other 504 // values out of the configuration. 505 MRInput.MRInputConfigBuilder configBuilder = MRInput.createConfigBuilder( sourceConf, null ); 506 507 // the default in Tez is true, this overrides 508 if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null ) 509 configBuilder.groupSplits( conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, true ) ); 510 511 // grouping splits loses file name info, breaking partition tap default impl 512 if( flowElement instanceof PartitionTap ) // todo: generify 513 configBuilder.groupSplits( false ); 514 515 DataSourceDescriptor dataSourceDescriptor = configBuilder.build(); 516 517 vertex.addDataSource( FlowElements.id( flowElement ), dataSourceDescriptor ); 518 } 519 520 for( FlowElement flowElement : sinkConfigs.keySet() ) 521 { 522 if( !( flowElement instanceof Tap ) ) 523 continue; 524 525 Configuration sinkConf = sinkConfigs.get( flowElement ); 526 527 Class outputFormatClass; 528 String outputPath; 529 530 // we have to set sane defaults if not set by the tap 531 // typically the case of MultiSinkTap 532 String formatClassName = sinkConf.get( "mapred.output.format.class", sinkConf.get( "mapreduce.job.outputformat.class" ) ); 533 534 if( formatClassName == null ) 535 { 536 outputFormatClass = TextOutputFormat.class; // unused, use "new" api, its the default 537 outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused 538 } 539 else 540 { 541 outputFormatClass = Util.loadClass( formatClassName ); 542 outputPath = getOutputPath( sinkConf ); 543 } 544 545 if( outputPath == null && getOutputPath( sinkConf ) == null && isFileOutputFormat( outputFormatClass ) ) 546 outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused 547 548 MROutput.MROutputConfigBuilder configBuilder = MROutput.createConfigBuilder( sinkConf, outputFormatClass, outputPath ); 549 550 DataSinkDescriptor dataSinkDescriptor = configBuilder.build(); 551 552 vertex.addDataSink( FlowElements.id( flowElement ), dataSinkDescriptor ); 553 } 554 555 addRemoteDebug( flowNode, vertex ); 556 addRemoteProfiling( flowNode, vertex ); 557 558 if( vertex.getTaskLaunchCmdOpts() != null ) 559 flowNode.addProcessAnnotation( TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, vertex.getTaskLaunchCmdOpts() ); 560 561 return vertex; 562 } 563 564 protected String getOutputPath( Configuration sinkConf ) 565 { 566 return sinkConf.get( "mapred.output.dir", sinkConf.get( "mapreduce.output.fileoutputformat.outputdir" ) ); 567 } 568 569 protected boolean isFileOutputFormat( Class outputFormatClass ) 570 { 571 return org.apache.hadoop.mapred.FileOutputFormat.class.isAssignableFrom( outputFormatClass ) || 572 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom( outputFormatClass ); 573 } 574 575 protected int getParallelism( FlowNode flowNode, JobConf conf ) 576 { 577 // only count streamed taps, accumulated taps are always annotated 578 HashSet<Tap> sourceStreamedTaps = new HashSet<>( flowNode.getSourceTaps() ); 579 580 sourceStreamedTaps.removeAll( flowNode.getSourceElements( StreamMode.Accumulated ) ); 581 582 if( sourceStreamedTaps.size() != 0 ) 583 return -1; 584 585 int parallelism = Integer.MAX_VALUE; 586 587 for( Tap tap : flowNode.getSinkTaps() ) 588 { 589 int numSinkParts = tap.getScheme().getNumSinkParts(); 590 591 if( numSinkParts == 0 ) 592 continue; 593 594 if( parallelism != Integer.MAX_VALUE ) 595 LOG.info( "multiple sink taps in flow node declaring numSinkParts, choosing lowest value. see cascading.flow.FlowRuntimeProps for broader control." ); 596 597 parallelism = Math.min( parallelism, numSinkParts ); 598 } 599 600 if( parallelism != Integer.MAX_VALUE ) 601 return parallelism; 602 603 return conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 ); 604 } 605 606 private void addInputOutputMapping( JobConf conf, FlowNode flowNode ) 607 { 608 FlowNodeGraph flowNodeGraph = getFlowNodeGraph(); 609 Set<ProcessEdge> incomingEdges = flowNodeGraph.incomingEdgesOf( flowNode ); 610 611 for( ProcessEdge processEdge : incomingEdges ) 612 conf.set( "cascading.node.source." + processEdge.getFlowElementID(), processEdge.getSourceProcessID() ); 613 614 Set<ProcessEdge> outgoingEdges = flowNodeGraph.outgoingEdgesOf( flowNode ); 615 616 for( ProcessEdge processEdge : outgoingEdges ) 617 conf.set( "cascading.node.sink." + processEdge.getFlowElementID(), processEdge.getSinkProcessID() ); 618 } 619 620 protected Map<FlowElement, Configuration> initFromSources( FlowNode flowNode, FlowProcess<TezConfiguration> flowProcess, 621 Configuration conf, Map<String, LocalResource> taskLocalResources ) 622 { 623 Set<? extends FlowElement> accumulatedSources = flowNode.getSourceElements( StreamMode.Accumulated ); 624 625 for( FlowElement element : accumulatedSources ) 626 { 627 if( element instanceof Tap ) 628 { 629 JobConf current = new JobConf( conf ); 630 Tap tap = (Tap) element; 631 632 if( tap.getIdentifier() == null ) 633 throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() ); 634 635 tap.sourceConfInit( flowProcess, current ); 636 637 Collection<String> paths = current.getStringCollection( CASCADING_LOCAL_RESOURCES + Tap.id( tap ) ); 638 639 if( !paths.isEmpty() ) 640 { 641 String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath(); 642 String resourceSubPath = Tap.id( tap ); 643 Map<Path, Path> pathMap = TezUtil.addToClassPath( current, flowStagingPath, resourceSubPath, paths, LocalResourceType.FILE, taskLocalResources, null ); 644 645 current.setStrings( CASCADING_REMOTE_RESOURCES + Tap.id( tap ), taskLocalResources.keySet().toArray( new String[ taskLocalResources.size() ] ) ); 646 647 allLocalResources.putAll( taskLocalResources ); 648 syncPaths.putAll( pathMap ); 649 } 650 651 Map<String, String> map = flowProcess.diffConfigIntoMap( new TezConfiguration( conf ), new TezConfiguration( current ) ); 652 conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) ); 653 654 setLocalMode( conf, current, tap ); 655 } 656 } 657 658 Set<FlowElement> sources = new HashSet<>( flowNode.getSourceElements() ); 659 660 sources.removeAll( accumulatedSources ); 661 662 if( sources.isEmpty() ) 663 throw new IllegalStateException( "all sources marked as accumulated" ); 664 665 Map<FlowElement, Configuration> configs = new HashMap<>(); 666 667 for( FlowElement element : sources ) 668 { 669 JobConf current = new JobConf( conf ); 670 671 String id = FlowElements.id( element ); 672 673 current.set( "cascading.node.source", id ); 674 675 if( element instanceof Tap ) 676 { 677 Tap tap = (Tap) element; 678 679 if( tap.getIdentifier() == null ) 680 throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() ); 681 682 tap.sourceConfInit( flowProcess, current ); 683 684 setLocalMode( conf, current, tap ); 685 } 686 687 configs.put( element, current ); 688 } 689 690 return configs; 691 } 692 693 protected Map<FlowElement, Configuration> initFromSinks( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf ) 694 { 695 Set<FlowElement> sinks = flowNode.getSinkElements(); 696 Map<FlowElement, Configuration> configs = new HashMap<>(); 697 698 for( FlowElement element : sinks ) 699 { 700 JobConf current = new JobConf( conf ); 701 702 if( element instanceof Tap ) 703 { 704 Tap tap = (Tap) element; 705 706 if( tap.getIdentifier() == null ) 707 throw new IllegalStateException( "tap may not have null identifier: " + element.toString() ); 708 709 tap.sinkConfInit( flowProcess, current ); 710 711 setLocalMode( conf, current, tap ); 712 } 713 714 String id = FlowElements.id( element ); 715 716 current.set( "cascading.node.sink", id ); 717 718 configs.put( element, current ); 719 } 720 721 return configs; 722 } 723 724 private void initFromNodeConfigDef( FlowNode flowNode, Configuration conf ) 725 { 726 initConfFromNodeConfigDef( flowNode.getElementGraph(), new ConfigurationSetter( conf ) ); 727 } 728 729 private void initFromStepConfigDef( Configuration conf ) 730 { 731 initConfFromStepConfigDef( new ConfigurationSetter( conf ) ); 732 } 733 734 protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf ) 735 { 736 Map<String, Tap> traps = flowNode.getTrapMap(); 737 738 if( !traps.isEmpty() ) 739 { 740 JobConf trapConf = new JobConf( conf ); 741 742 for( Tap tap : traps.values() ) 743 { 744 tap.sinkConfInit( flowProcess, trapConf ); 745 setLocalMode( conf, trapConf, tap ); 746 } 747 } 748 } 749 750 private Vertex newVertex( FlowNode flowNode, Configuration conf, int parallelism ) 751 { 752 conf.set( FlowNode.CASCADING_FLOW_NODE, pack( flowNode, conf ) ); // todo: pack into payload directly 753 754 ProcessorDescriptor descriptor = ProcessorDescriptor.create( FlowProcessor.class.getName() ); 755 756 descriptor.setUserPayload( getPayload( conf ) ); 757 758 Vertex vertex = Vertex.create( flowNode.getID(), descriptor, parallelism ); 759 760 if( environment != null ) 761 vertex.setTaskEnvironment( environment ); 762 763 return vertex; 764 } 765 766 private UserPayload getPayload( Configuration conf ) 767 { 768 try 769 { 770 return TezUtils.createUserPayloadFromConf( conf ); 771 } 772 catch( IOException exception ) 773 { 774 throw new CascadingException( exception ); 775 } 776 } 777 778 private String pack( Object object, Configuration conf ) 779 { 780 try 781 { 782 return serializeBase64( object, conf, true ); 783 } 784 catch( IOException exception ) 785 { 786 throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception ); 787 } 788 } 789 790 @Override 791 public void clean( TezConfiguration config ) 792 { 793 for( Tap sink : getSinkTaps() ) 794 { 795 if( sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) ) 796 { 797 try 798 { 799 sink.deleteResource( config ); 800 } 801 catch( Exception exception ) 802 { 803 // sink all exceptions, don't fail app 804 logWarn( "unable to remove temporary file: " + sink, exception ); 805 } 806 } 807 else 808 { 809 cleanTapMetaData( config, sink ); 810 } 811 } 812 813 for( Tap tap : getTraps() ) 814 cleanTapMetaData( config, tap ); 815 } 816 817 private void cleanTapMetaData( TezConfiguration config, Tap tap ) 818 { 819 try 820 { 821 Hadoop18TapUtil.cleanupTapMetaData( config, tap ); 822 } 823 catch( IOException exception ) 824 { 825 // ignore exception 826 } 827 } 828 829 public void syncArtifacts() 830 { 831 // this may not be strictly necessary, but there is a condition where setting the access time 832 // fails, so there may be one were setting the modification time fails. if so, we can compensate. 833 Map<String, Long> timestamps = HadoopUtil.syncPaths( getConfig(), syncPaths, true ); 834 835 for( Map.Entry<String, Long> entry : timestamps.entrySet() ) 836 { 837 LocalResource localResource = allLocalResources.get( entry.getKey() ); 838 839 if( localResource != null ) 840 localResource.setTimestamp( entry.getValue() ); 841 } 842 } 843 844 private void setLocalMode( Configuration parent, JobConf current, Tap tap ) 845 { 846 // force step to local mode 847 if( !HadoopUtil.isLocal( current ) ) 848 return; 849 850 if( tap != null ) 851 logInfo( "tap forcing step to tez local mode: " + tap.getIdentifier() ); 852 853 HadoopUtil.setLocal( parent ); 854 } 855 856 private void addRemoteDebug( FlowNode flowNode, Vertex vertex ) 857 { 858 String value = System.getProperty( "test.debug.node", null ); 859 860 if( Util.isEmpty( value ) ) 861 return; 862 863 if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() ) 864 return; 865 866 LOG.warn( "remote debugging enabled with property: {}, on node: {}, with node id: {}", "test.debug.node", value, flowNode.getID() ); 867 868 String opts = vertex.getTaskLaunchCmdOpts(); 869 870 if( opts == null ) 871 opts = ""; 872 873 String address = System.getProperty( "test.debug.address", "localhost:5005" ).trim(); 874 875 opts += " -agentlib:jdwp=transport=dt_socket,server=n,address=" + address + ",suspend=y"; 876 877 vertex.setTaskLaunchCmdOpts( opts ); 878 } 879 880 private void addRemoteProfiling( FlowNode flowNode, Vertex vertex ) 881 { 882 String value = System.getProperty( "test.profile.node", null ); 883 884 if( Util.isEmpty( value ) ) 885 return; 886 887 if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() ) 888 return; 889 890 LOG.warn( "remote profiling enabled with property: {}, on node: {}, with node id: {}", "test.profile.node", value, flowNode.getID() ); 891 892 String opts = vertex.getTaskLaunchCmdOpts(); 893 894 if( opts == null ) 895 opts = ""; 896 897 String path = System.getProperty( "test.profile.path", "/tmp/jfr/" ); 898 899 if( !path.endsWith( "/" ) ) 900 path += "/"; 901 902 LOG.warn( "remote profiling property: {}, logging to path: {}", "test.profile.path", path ); 903 904 opts += String.format( " -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=%1$s%2$s,disk=true,repository=%1$s%2$s", path, flowNode.getID() ); 905 906 vertex.setTaskLaunchCmdOpts( opts ); 907 } 908 909 private int asInt( String value ) 910 { 911 try 912 { 913 return Integer.parseInt( value ); 914 } 915 catch( NumberFormatException exception ) 916 { 917 return -1; 918 } 919 } 920 921 public Map<String, LocalResource> getAllLocalResources() 922 { 923 return allLocalResources; 924 } 925 926 private static class EdgeValues 927 { 928 FlowElement flowElement; 929 TezConfiguration config; 930 Set<Integer> ordinals; 931 String keyClassName; 932 String valueClassName; 933 String keyComparatorClassName; 934 String keyPartitionerClassName; 935 String outputClassName; 936 String inputClassName; 937 EdgeProperty.DataMovementType movementType; 938 EdgeProperty.DataSourceType sourceType; 939 EdgeProperty.SchedulingType schedulingType; 940 941 Map<Integer, Fields> resolvedKeyFieldsMap; 942 Map<Integer, Fields> resolvedSortFieldsMap; 943 Map<Integer, Fields> resolvedValueFieldsMap; 944 945 private EdgeValues( TezConfiguration config, ProcessEdge processEdge ) 946 { 947 this.config = config; 948 this.flowElement = processEdge.getFlowElement(); 949 this.ordinals = processEdge.getSourceProvidedOrdinals(); 950 951 this.resolvedKeyFieldsMap = processEdge.getResolvedKeyFields(); 952 this.resolvedSortFieldsMap = processEdge.getResolvedSortFields(); 953 this.resolvedValueFieldsMap = processEdge.getResolvedValueFields(); 954 } 955 956 public FlowElement getFlowElement() 957 { 958 return flowElement; 959 } 960 961 public TezConfiguration getConfig() 962 { 963 return config; 964 } 965 966 public Set getOrdinals() 967 { 968 return ordinals; 969 } 970 971 public String getKeyClassName() 972 { 973 return keyClassName; 974 } 975 976 public String getValueClassName() 977 { 978 return valueClassName; 979 } 980 981 public String getKeyComparatorClassName() 982 { 983 return keyComparatorClassName; 984 } 985 986 public String getKeyPartitionerClassName() 987 { 988 return keyPartitionerClassName; 989 } 990 991 public String getOutputClassName() 992 { 993 return outputClassName; 994 } 995 996 public String getInputClassName() 997 { 998 return inputClassName; 999 } 1000 1001 public EdgeProperty.DataMovementType getMovementType() 1002 { 1003 return movementType; 1004 } 1005 1006 public EdgeProperty.DataSourceType getSourceType() 1007 { 1008 return sourceType; 1009 } 1010 1011 public EdgeProperty.SchedulingType getSchedulingType() 1012 { 1013 return schedulingType; 1014 } 1015 1016 public Map<Integer, Fields> getResolvedKeyFieldsMap() 1017 { 1018 return resolvedKeyFieldsMap; 1019 } 1020 1021 public Map<Integer, Fields> getResolvedSortFieldsMap() 1022 { 1023 return resolvedSortFieldsMap; 1024 } 1025 1026 public Map<Integer, Fields> getResolvedValueFieldsMap() 1027 { 1028 return resolvedValueFieldsMap; 1029 } 1030 } 1031 }