001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.hadoop.planner; 022 023 import java.net.URI; 024 import java.util.Collections; 025 import java.util.Comparator; 026 import java.util.HashMap; 027 import java.util.HashSet; 028 import java.util.Iterator; 029 import java.util.List; 030 import java.util.Map; 031 import java.util.Properties; 032 import java.util.Set; 033 import java.util.TreeSet; 034 035 import cascading.flow.FlowConnector; 036 import cascading.flow.FlowDef; 037 import cascading.flow.FlowElement; 038 import cascading.flow.hadoop.HadoopFlow; 039 import cascading.flow.hadoop.util.HadoopUtil; 040 import cascading.flow.planner.ElementGraph; 041 import cascading.flow.planner.ElementGraphs; 042 import cascading.flow.planner.FlowPlanner; 043 import cascading.flow.planner.FlowStepGraph; 044 import cascading.flow.planner.PlatformInfo; 045 import cascading.flow.planner.Scope; 046 import cascading.pipe.CoGroup; 047 import cascading.pipe.Every; 048 import cascading.pipe.Group; 049 import cascading.pipe.Pipe; 050 import cascading.property.AppProps; 051 import cascading.property.PropertyUtil; 052 import cascading.tap.Tap; 053 import cascading.tap.hadoop.Hfs; 054 import cascading.tap.hadoop.util.TempHfs; 055 import cascading.util.Util; 056 import org.apache.hadoop.mapred.JobConf; 057 import org.jgrapht.GraphPath; 058 import org.jgrapht.Graphs; 059 import org.slf4j.Logger; 060 import org.slf4j.LoggerFactory; 061 062 import static cascading.flow.planner.ElementGraphs.getAllShortestPathsBetween; 063 064 /** 065 * Class HadoopPlanner is the core Hadoop MapReduce planner. 066 * <p/> 067 * Notes: 068 * <p/> 069 * <strong>Custom JobConf properties</strong><br/> 070 * A custom JobConf instance can be passed to this planner by calling {@link #copyJobConf(java.util.Map, org.apache.hadoop.mapred.JobConf)} 071 * on a map properties object before constructing a new {@link cascading.flow.hadoop.HadoopFlowConnector}. 072 * <p/> 073 * A better practice would be to set Hadoop properties directly on the map properties object handed to the FlowConnector. 074 * All values in the map will be passed to a new default JobConf instance to be used as defaults for all resulting 075 * Flow instances. 076 * <p/> 077 * For example, {@code properties.set("mapred.child.java.opts","-Xmx512m");} would convince Hadoop 078 * to spawn all child jvms with a heap of 512MB. 079 */ 080 public class HadoopPlanner extends FlowPlanner<HadoopFlow, JobConf> 081 { 082 /** Field LOG */ 083 private static final Logger LOG = LoggerFactory.getLogger( HadoopPlanner.class ); 084 085 /** Field jobConf */ 086 private JobConf jobConf; 087 /** Field intermediateSchemeClass */ 088 private Class intermediateSchemeClass; 089 090 /** 091 * Method copyJobConf adds the given JobConf values to the given properties object. Use this method to pass 092 * custom default Hadoop JobConf properties to Hadoop. 093 * 094 * @param properties of type Map 095 * @param jobConf of type JobConf 096 */ 097 public static void copyJobConf( Map<Object, Object> properties, JobConf jobConf ) 098 { 099 for( Map.Entry<String, String> entry : jobConf ) 100 properties.put( entry.getKey(), entry.getValue() ); 101 } 102 103 /** 104 * Method createJobConf returns a new JobConf instance using the values in the given properties argument. 105 * 106 * @param properties of type Map 107 * @return a JobConf instance 108 */ 109 public static JobConf createJobConf( Map<Object, Object> properties ) 110 { 111 JobConf conf = new JobConf(); 112 113 copyProperties( conf, properties ); 114 115 return conf; 116 } 117 118 /** 119 * Method copyProperties adds the given Map values to the given JobConf object. 120 * 121 * @param jobConf of type JobConf 122 * @param properties of type Map 123 */ 124 public static void copyProperties( JobConf jobConf, Map<Object, Object> properties ) 125 { 126 if( properties instanceof Properties ) 127 { 128 Properties props = (Properties) properties; 129 Set<String> keys = props.stringPropertyNames(); 130 131 for( String key : keys ) 132 jobConf.set( key, props.getProperty( key ) ); 133 } 134 else 135 { 136 for( Map.Entry<Object, Object> entry : properties.entrySet() ) 137 { 138 if( entry.getValue() != null ) 139 jobConf.set( entry.getKey().toString(), entry.getValue().toString() ); 140 } 141 } 142 } 143 144 /** 145 * Method setNormalizeHeterogeneousSources adds the given doNormalize boolean to the given properties object. 146 * Use this method if additional jobs should be planned in to handle incompatible InputFormat classes. 147 * <p/> 148 * Normalization is off by default and should only be enabled by advanced users. Typically this will decrease 149 * application performance. 150 * 151 * @param properties of type Map 152 * @param doNormalize of type boolean 153 */ 154 @Deprecated 155 public static void setNormalizeHeterogeneousSources( Map<Object, Object> properties, boolean doNormalize ) 156 { 157 properties.put( "cascading.multimapreduceplanner.normalizesources", Boolean.toString( doNormalize ) ); 158 } 159 160 /** 161 * Method getNormalizeHeterogeneousSources returns if this planner will normalize heterogeneous input sources. 162 * 163 * @param properties of type Map 164 * @return a boolean 165 */ 166 @Deprecated 167 public static boolean getNormalizeHeterogeneousSources( Map<Object, Object> properties ) 168 { 169 return Boolean.parseBoolean( PropertyUtil.getProperty( properties, "cascading.multimapreduceplanner.normalizesources", "false" ) ); 170 } 171 172 /** 173 * Method setCollapseAdjacentTaps enables/disables an optimization that will identify if a sink tap and an intermediate tap 174 * are equivalent field wise, and discard the intermediate tap for the sink tap to minimize the number of MR jobs. 175 * <p/> 176 * Note that some Scheme types may lose type information if the planner cannot detect field types. This could result 177 * in type mismatch errors during joins. 178 * 179 * @param properties 180 * @param collapseAdjacent 181 */ 182 public static void setCollapseAdjacentTaps( Map<Object, Object> properties, boolean collapseAdjacent ) 183 { 184 properties.put( "cascading.multimapreduceplanner.collapseadjacentaps", Boolean.toString( collapseAdjacent ) ); 185 } 186 187 public static boolean getCollapseAdjacentTaps( Map<Object, Object> properties ) 188 { 189 return Boolean.parseBoolean( PropertyUtil.getProperty( properties, "cascading.multimapreduceplanner.collapseadjacentaps", "true" ) ); 190 } 191 192 @Override 193 public JobConf getConfig() 194 { 195 return jobConf; 196 } 197 198 @Override 199 public PlatformInfo getPlatformInfo() 200 { 201 return HadoopUtil.getPlatformInfo(); 202 } 203 204 @Override 205 public void initialize( FlowConnector flowConnector, Map<Object, Object> properties ) 206 { 207 super.initialize( flowConnector, properties ); 208 209 jobConf = HadoopUtil.createJobConf( properties, createJobConf( properties ) ); 210 checkPlatform( jobConf ); 211 intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties ); 212 213 Class type = AppProps.getApplicationJarClass( properties ); 214 if( jobConf.getJar() == null && type != null ) 215 jobConf.setJarByClass( type ); 216 217 String path = AppProps.getApplicationJarPath( properties ); 218 if( jobConf.getJar() == null && path != null ) 219 jobConf.setJar( path ); 220 221 if( jobConf.getJar() == null ) 222 jobConf.setJarByClass( HadoopUtil.findMainClass( HadoopPlanner.class ) ); 223 224 AppProps.setApplicationJarPath( properties, jobConf.getJar() ); 225 226 LOG.info( "using application jar: {}", jobConf.getJar() ); 227 } 228 229 protected void checkPlatform( JobConf jobConf ) 230 { 231 if( HadoopUtil.isYARN( jobConf ) ) 232 LOG.warn( "running YARN based flows on Hadoop 1.x may cause problems, please use the 'cascading-hadoop2-mr1' dependencies" ); 233 } 234 235 @Override 236 protected HadoopFlow createFlow( FlowDef flowDef ) 237 { 238 return new HadoopFlow( getPlatformInfo(), getProperties(), getConfig(), flowDef ); 239 } 240 241 @Override 242 public HadoopFlow buildFlow( FlowDef flowDef ) 243 { 244 ElementGraph elementGraph = null; 245 246 try 247 { 248 // generic 249 verifyAllTaps( flowDef ); 250 251 HadoopFlow flow = createFlow( flowDef ); 252 253 Pipe[] tails = resolveTails( flowDef, flow ); 254 255 verifyAssembly( flowDef, tails ); 256 257 elementGraph = createElementGraph( flowDef, tails ); 258 259 // rules 260 failOnLoneGroupAssertion( elementGraph ); 261 failOnMissingGroup( elementGraph ); 262 failOnMisusedBuffer( elementGraph ); 263 failOnGroupEverySplit( elementGraph ); 264 265 // m/r specific 266 handleWarnEquivalentPaths( elementGraph ); 267 handleSplit( elementGraph ); 268 handleJobPartitioning( elementGraph ); 269 handleJoins( elementGraph ); 270 handleNonSafeOperations( elementGraph ); 271 272 if( getNormalizeHeterogeneousSources( properties ) ) 273 handleHeterogeneousSources( elementGraph ); 274 275 // generic 276 elementGraph.removeUnnecessaryPipes(); // groups must be added before removing pipes 277 elementGraph.resolveFields(); 278 279 elementGraph = flow.updateSchemes( elementGraph ); 280 281 // m/r specific 282 if( getCollapseAdjacentTaps( properties ) ) 283 handleAdjacentTaps( elementGraph ); 284 285 FlowStepGraph flowStepGraph = new HadoopStepGraph( flowDef.getName(), elementGraph ); 286 287 flow.initialize( elementGraph, flowStepGraph ); 288 289 return flow; 290 } 291 catch( Exception exception ) 292 { 293 throw handleExceptionDuringPlanning( exception, elementGraph ); 294 } 295 } 296 297 private void handleWarnEquivalentPaths( ElementGraph elementGraph ) 298 { 299 List<CoGroup> coGroups = elementGraph.findAllCoGroups(); 300 301 for( CoGroup coGroup : coGroups ) 302 { 303 List<GraphPath<FlowElement, Scope>> graphPaths = elementGraph.getAllShortestPathsTo( coGroup ); 304 305 List<List<FlowElement>> paths = ElementGraphs.asPathList( graphPaths ); 306 307 if( !areEquivalentPaths( elementGraph, paths ) ) 308 continue; 309 310 LOG.warn( "found equivalent paths from: {} to: {}", paths.get( 0 ).get( 1 ), coGroup ); 311 312 // in order to remove dupe paths, we need to verify there isn't any branching 313 } 314 } 315 316 private boolean areEquivalentPaths( ElementGraph elementGraph, List<List<FlowElement>> paths ) 317 { 318 int length = sameLength( paths ); 319 320 if( length == -1 ) 321 return false; 322 323 Set<FlowElement> elements = new TreeSet<FlowElement>( new EquivalenceComparator( elementGraph ) ); 324 325 for( int i = 0; i < length; i++ ) 326 { 327 elements.clear(); 328 329 for( List<FlowElement> path : paths ) 330 elements.add( path.get( i ) ); 331 332 if( elements.size() != 1 ) 333 return false; 334 } 335 336 return true; 337 } 338 339 private class EquivalenceComparator implements Comparator<FlowElement> 340 { 341 private final ElementGraph elementGraph; 342 343 public EquivalenceComparator( ElementGraph elementGraph ) 344 { 345 this.elementGraph = elementGraph; 346 } 347 348 @Override 349 public int compare( FlowElement lhs, FlowElement rhs ) 350 { 351 boolean areEquivalent = lhs.isEquivalentTo( rhs ); 352 boolean sameIncoming = elementGraph.inDegreeOf( lhs ) == elementGraph.inDegreeOf( rhs ); 353 boolean sameOutgoing = elementGraph.outDegreeOf( lhs ) == elementGraph.outDegreeOf( rhs ); 354 355 if( areEquivalent && sameIncoming && sameOutgoing ) 356 return 0; 357 358 return System.identityHashCode( lhs ) - System.identityHashCode( rhs ); 359 } 360 } 361 362 private int sameLength( List<List<FlowElement>> paths ) 363 { 364 int lastSize = paths.get( 0 ).size(); 365 366 for( int i = 1; i < paths.size(); i++ ) 367 { 368 if( paths.get( i ).size() != lastSize ) 369 return -1; 370 } 371 372 return lastSize; 373 } 374 375 /** 376 * optimized for this case 377 * <pre> 378 * e - t e1 - e - t 379 * t - e1 - -- > t - 380 * e - t e1 - e - t 381 * </pre> 382 * <p/> 383 * this should run in two map/red jobs, not 3. needs to be a flag on e1 to prevent this 384 * <p/> 385 * <pre> 386 * g - t g - t 387 * g - e - --> g - e - t - 388 * g - t g - t 389 * </pre> 390 * <p/> 391 * <pre> 392 * - e - e e - e 393 * t - e1 - e2 - g --> t - e1 - e2 - t - - g 394 * - e - e e - e 395 * </pre> 396 * 397 * @param elementGraph 398 */ 399 private void handleSplit( ElementGraph elementGraph ) 400 { 401 // if there was a graph change, iterate paths again. 402 while( !internalSplit( elementGraph ) ) 403 ; 404 } 405 406 private boolean internalSplit( ElementGraph elementGraph ) 407 { 408 List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsBetweenExtents(); 409 410 for( GraphPath<FlowElement, Scope> path : paths ) 411 { 412 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); 413 Set<Pipe> tapInsertions = new HashSet<Pipe>(); 414 FlowElement lastInsertable = null; 415 416 for( int i = 0; i < flowElements.size(); i++ ) 417 { 418 FlowElement flowElement = flowElements.get( i ); 419 420 if( flowElement instanceof ElementGraph.Extent ) // is an extent: head or tail 421 continue; 422 423 // if Tap, Group, or Every - we insert the tap here 424 if( flowElement instanceof Tap || flowElement instanceof Group || flowElement instanceof Every ) 425 lastInsertable = flowElement; 426 427 // support splits on Pipe unless the previous is a Tap 428 if( flowElement.getClass() == Pipe.class && flowElements.get( i - 1 ) instanceof Tap ) 429 continue; 430 431 if( flowElement instanceof Tap ) 432 continue; 433 434 if( elementGraph.outDegreeOf( flowElement ) <= 1 ) 435 continue; 436 437 // we are at the root of a split here 438 439 // do any split paths converge on a single Group? 440 int maxPaths = elementGraph.getMaxNumPathsBetweenElementAndGroupingMergeJoin( flowElement ); 441 if( maxPaths <= 1 && lastInsertable instanceof Tap ) 442 continue; 443 444 tapInsertions.add( (Pipe) flowElement ); 445 } 446 447 for( Pipe pipe : tapInsertions ) 448 insertTempTapAfter( elementGraph, pipe ); 449 450 if( !tapInsertions.isEmpty() ) 451 return false; 452 } 453 454 return true; 455 } 456 457 /** 458 * will collapse adjacent and equivalent taps. 459 * equivalence is based on the tap adjacent taps using the same filesystem 460 * and the sink being symmetrical, and having the same fields as the temp tap. 461 * <p/> 462 * <p/> 463 * must be run after fields are resolved so temp taps have fully defined scheme instances. 464 * 465 * @param elementGraph 466 */ 467 private void handleAdjacentTaps( ElementGraph elementGraph ) 468 { 469 // if there was a graph change, iterate paths again. 470 while( !internalAdjacentTaps( elementGraph ) ) 471 ; 472 } 473 474 private boolean internalAdjacentTaps( ElementGraph elementGraph ) 475 { 476 List<Tap> taps = elementGraph.findAllTaps(); 477 478 for( Tap tap : taps ) 479 { 480 if( !( tap.isTemporary() ) ) 481 continue; 482 483 for( FlowElement successor : elementGraph.getAllSuccessors( tap ) ) 484 { 485 if( !( successor instanceof Hfs ) ) 486 continue; 487 488 Hfs successorTap = (Hfs) successor; 489 490 // does this scheme source what it sinks 491 if( !successorTap.getScheme().isSymmetrical() ) 492 continue; 493 494 URI tempURIScheme = getDefaultURIScheme( tap ); // temp uses default fs 495 URI successorURIScheme = getURIScheme( successorTap ); 496 497 if( !tempURIScheme.equals( successorURIScheme ) ) 498 continue; 499 500 // safe, both are symmetrical 501 // should be called after fields are resolved 502 if( !tap.getSourceFields().equals( successorTap.getSourceFields() ) ) 503 continue; 504 505 elementGraph.replaceElementWith( tap, successor ); 506 507 return false; 508 } 509 } 510 511 return true; 512 } 513 514 private URI getDefaultURIScheme( Tap tap ) 515 { 516 return ( (Hfs) tap ).getDefaultFileSystemURIScheme( jobConf ); 517 } 518 519 private URI getURIScheme( Tap tap ) 520 { 521 return ( (Hfs) tap ).getURIScheme( jobConf ); 522 } 523 524 private void handleHeterogeneousSources( ElementGraph elementGraph ) 525 { 526 while( !internalHeterogeneousSources( elementGraph ) ) 527 ; 528 } 529 530 private boolean internalHeterogeneousSources( ElementGraph elementGraph ) 531 { 532 // find all Groups 533 List<Group> groups = elementGraph.findAllMergeJoinGroups(); 534 535 // compare group sources 536 Map<Group, Set<Tap>> normalizeGroups = new HashMap<Group, Set<Tap>>(); 537 538 for( Group group : groups ) 539 { 540 Set<Tap> taps = new HashSet<Tap>(); 541 542 // iterate each shortest path to current group finding each tap sourcing the merge/join 543 for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( group ) ) 544 { 545 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is group 546 Collections.reverse( flowElements ); // first element is group 547 548 for( FlowElement previousElement : flowElements ) 549 { 550 if( previousElement instanceof Tap ) 551 { 552 taps.add( (Tap) previousElement ); 553 break; // stop finding taps in this path 554 } 555 } 556 } 557 558 if( taps.size() == 1 ) 559 continue; 560 561 Iterator<Tap> iterator = taps.iterator(); 562 Tap commonTap = iterator.next(); 563 564 while( iterator.hasNext() ) 565 { 566 Tap tap = iterator.next(); 567 568 // making assumption hadoop can handle multiple filesytems, but not multiple inputformats 569 // in the same job 570 // possibly could test for common input format 571 if( getSchemeClass( tap ) != getSchemeClass( commonTap ) ) 572 { 573 normalizeGroups.put( group, taps ); 574 break; 575 } 576 } 577 } 578 579 // if incompatible, insert Tap after its join/merge pipe 580 for( Group group : normalizeGroups.keySet() ) 581 { 582 Set<Tap> taps = normalizeGroups.get( group ); 583 584 for( Tap tap : taps ) 585 { 586 if( tap.isTemporary() || getSchemeClass( tap ).equals( intermediateSchemeClass ) ) // we normalize to TempHfs 587 continue; 588 589 // handle case where there is a split on a pipe between the tap and group 590 for( GraphPath<FlowElement, Scope> path : getAllShortestPathsBetween( elementGraph, tap, group ) ) 591 { 592 List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // shortest path tap -> group 593 Collections.reverse( flowElements ); // group -> tap 594 595 FlowElement flowElement = flowElements.get( 1 ); 596 597 if( flowElement instanceof Tap && ( (Tap) flowElement ).isTemporary() ) 598 continue; 599 600 LOG.warn( "inserting step to normalize incompatible sources: {}", tap ); 601 602 insertTempTapAfter( elementGraph, (Pipe) flowElement ); 603 604 return false; 605 } 606 } 607 } 608 609 return normalizeGroups.isEmpty(); 610 } 611 612 @Override 613 protected Tap makeTempTap( String prefix, String name ) 614 { 615 // must give Taps unique names 616 return new TempHfs( jobConf, Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null ); 617 } 618 619 private Class getSchemeClass( Tap tap ) 620 { 621 if( tap.isTemporary() ) 622 return ( (TempHfs) tap ).getSchemeClass(); 623 else 624 return tap.getScheme().getClass(); 625 } 626 }