/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.planner;

import java.net.URI;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.FlowElement;
import cascading.flow.hadoop.HadoopFlow;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.ElementGraph;
import cascading.flow.planner.ElementGraphs;
import cascading.flow.planner.FlowPlanner;
import cascading.flow.planner.FlowStepGraph;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.pipe.CoGroup;
import cascading.pipe.Every;
import cascading.pipe.Group;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.property.PropertyUtil;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.util.TempHfs;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobConf;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.planner.ElementGraphs.getAllShortestPathsBetween;

/**
 * Class HadoopPlanner is the core Hadoop MapReduce planner.
 * <p/>
 * Notes:
 * <p/>
 * <strong>Custom JobConf properties</strong><br/>
 * A custom JobConf instance can be passed to this planner by calling {@link #copyJobConf(java.util.Map, org.apache.hadoop.mapred.JobConf)}
 * on a map properties object before constructing a new {@link cascading.flow.hadoop.HadoopFlowConnector}.
 * <p/>
 * A better practice would be to set Hadoop properties directly on the map properties object handed to the FlowConnector.
 * All values in the map will be passed to a new default JobConf instance to be used as defaults for all resulting
 * Flow instances.
 * <p/>
 * For example, {@code properties.set("mapred.child.java.opts","-Xmx512m");} would convince Hadoop
 * to spawn all child jvms with a heap of 512MB.
 */
public class HadoopPlanner extends FlowPlanner<HadoopFlow, JobConf>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( HadoopPlanner.class );

  /** Field jobConf */
  private JobConf jobConf;
  /** Field intermediateSchemeClass */
  private Class intermediateSchemeClass;

  /**
   * Method copyJobConf adds the given JobConf values to the given properties object. Use this method to pass
   * custom default Hadoop JobConf properties to Hadoop.
   *
   * @param properties of type Map
   * @param jobConf    of type JobConf
   */
  public static void copyJobConf( Map<Object, Object> properties, JobConf jobConf )
    {
    for( Map.Entry<String, String> entry : jobConf )
      properties.put( entry.getKey(), entry.getValue() );
    }

  /**
   * Method createJobConf returns a new JobConf instance using the values in the given properties argument.
   *
   * @param properties of type Map
   * @return a JobConf instance
   */
  public static JobConf createJobConf( Map<Object, Object> properties )
    {
    JobConf conf = new JobConf();

    copyProperties( conf, properties );

    return conf;
    }

  /**
   * Method copyProperties adds the given Map values to the given JobConf object.
   * <p/>
   * When the given map is a {@link Properties} instance, every name returned by
   * {@link Properties#stringPropertyNames()} is copied. For any other Map, keys and values are copied
   * via {@code toString()} and entries with a {@code null} value are skipped.
   *
   * @param jobConf    of type JobConf
   * @param properties of type Map
   */
  public static void copyProperties( JobConf jobConf, Map<Object, Object> properties )
    {
    if( properties instanceof Properties )
      {
      Properties props = (Properties) properties;
      Set<String> keys = props.stringPropertyNames();

      for( String key : keys )
        jobConf.set( key, props.getProperty( key ) );
      }
    else
      {
      for( Map.Entry<Object, Object> entry : properties.entrySet() )
        {
        if( entry.getValue() != null )
          jobConf.set( entry.getKey().toString(), entry.getValue().toString() );
        }
      }
    }

  /**
   * Method setNormalizeHeterogeneousSources adds the given doNormalize boolean to the given properties object.
   * Use this method if additional jobs should be planned in to handle incompatible InputFormat classes.
   * <p/>
   * Normalization is off by default and should only be enabled by advanced users. Typically this will decrease
   * application performance.
   *
   * @param properties  of type Map
   * @param doNormalize of type boolean
   * @deprecated source normalization is a deprecated planner option
   */
  @Deprecated
  public static void setNormalizeHeterogeneousSources( Map<Object, Object> properties, boolean doNormalize )
    {
    properties.put( "cascading.multimapreduceplanner.normalizesources", Boolean.toString( doNormalize ) );
    }

  /**
   * Method getNormalizeHeterogeneousSources returns if this planner will normalize heterogeneous input sources.
   * The value defaults to {@code false} when the property is not set.
   *
   * @param properties of type Map
   * @return a boolean
   * @deprecated source normalization is a deprecated planner option
   */
  @Deprecated
  public static boolean getNormalizeHeterogeneousSources( Map<Object, Object> properties )
    {
    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, "cascading.multimapreduceplanner.normalizesources", "false" ) );
    }

  /**
   * Method setCollapseAdjacentTaps enables/disables an optimization that will identify if a sink tap and an intermediate tap
   * are equivalent field wise, and discard the intermediate tap for the sink tap to minimize the number of MR jobs.
   * <p/>
   * Note that some Scheme types may lose type information if the planner cannot detect field types. This could result
   * in type mismatch errors during joins.
   *
   * @param properties       of type Map
   * @param collapseAdjacent of type boolean
   */
  public static void setCollapseAdjacentTaps( Map<Object, Object> properties, boolean collapseAdjacent )
    {
    properties.put( "cascading.multimapreduceplanner.collapseadjacentaps", Boolean.toString( collapseAdjacent ) );
    }

  /**
   * Method getCollapseAdjacentTaps returns if this planner will collapse adjacent equivalent taps.
   * The value defaults to {@code true} when the property is not set.
   *
   * @param properties of type Map
   * @return a boolean
   */
  public static boolean getCollapseAdjacentTaps( Map<Object, Object> properties )
    {
    return Boolean.parseBoolean( PropertyUtil.getProperty( properties, "cascading.multimapreduceplanner.collapseadjacentaps", "true" ) );
    }

  @Override
  public JobConf getConfig()
    {
    return jobConf;
    }

  @Override
  public PlatformInfo getPlatformInfo()
    {
    return HadoopUtil.getPlatformInfo();
    }

  /**
   * Method initialize creates the default JobConf from the given properties and resolves the application jar.
   * <p/>
   * The jar is taken, in order, from the JobConf itself, the application jar class property, the application
   * jar path property, and finally the class returned by {@code HadoopUtil.findMainClass}. The resolved jar
   * is written back into the given properties.
   *
   * @param flowConnector of type FlowConnector
   * @param properties    of type Map
   */
  @Override
  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
    {
    super.initialize( flowConnector, properties );

    jobConf = HadoopUtil.createJobConf( properties, createJobConf( properties ) );
    intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties );

    Class type = AppProps.getApplicationJarClass( properties );
    if( jobConf.getJar() == null && type != null )
      jobConf.setJarByClass( type );

    String path = AppProps.getApplicationJarPath( properties );
    if( jobConf.getJar() == null && path != null )
      jobConf.setJar( path );

    if( jobConf.getJar() == null )
      jobConf.setJarByClass( HadoopUtil.findMainClass( HadoopPlanner.class ) );

    AppProps.setApplicationJarPath( properties, jobConf.getJar() );

    LOG.info( "using application jar: {}", jobConf.getJar() );
    }

  @Override
  protected HadoopFlow createFlow( FlowDef flowDef )
    {
    return new HadoopFlow( getPlatformInfo(), getProperties(), getConfig(), flowDef );
    }

  /**
   * Method buildFlow plans the given FlowDef into a HadoopFlow.
   * <p/>
   * Any Exception raised while planning is passed, along with the element graph built so far (possibly
   * {@code null}), to {@code handleExceptionDuringPlanning} and the result is thrown.
   *
   * @param flowDef of type FlowDef
   * @return a HadoopFlow instance
   */
  @Override
  public HadoopFlow buildFlow( FlowDef flowDef )
    {
    ElementGraph elementGraph = null;

    try
      {
      // generic
      verifyAllTaps( flowDef );

      HadoopFlow flow = createFlow( flowDef );

      Pipe[] tails = resolveTails( flowDef, flow );

      verifyAssembly( flowDef, tails );

      elementGraph = createElementGraph( flowDef, tails );

      // rules
      failOnLoneGroupAssertion( elementGraph );
      failOnMissingGroup( elementGraph );
      failOnMisusedBuffer( elementGraph );
      failOnGroupEverySplit( elementGraph );

      // m/r specific
      handleWarnEquivalentPaths( elementGraph );
      handleSplit( elementGraph );
      handleJobPartitioning( elementGraph );
      handleJoins( elementGraph );
      handleNonSafeOperations( elementGraph );

      if( getNormalizeHeterogeneousSources( properties ) )
        handleHeterogeneousSources( elementGraph );

      // generic
      elementGraph.removeUnnecessaryPipes(); // groups must be added before removing pipes
      elementGraph.resolveFields();

      elementGraph = flow.updateSchemes( elementGraph );

      // m/r specific
      if( getCollapseAdjacentTaps( properties ) )
        handleAdjacentTaps( elementGraph );

      FlowStepGraph flowStepGraph = new HadoopStepGraph( flowDef.getName(), elementGraph );

      flow.initialize( elementGraph, flowStepGraph );

      return flow;
      }
    catch( Exception exception )
      {
      throw handleExceptionDuringPlanning( exception, elementGraph );
      }
    }

  /**
   * Logs a warning for every CoGroup whose incoming shortest paths are all equivalent, element by element.
   * The graph itself is not modified.
   *
   * @param elementGraph of type ElementGraph
   */
  private void handleWarnEquivalentPaths( ElementGraph elementGraph )
    {
    List<CoGroup> coGroups = elementGraph.findAllCoGroups();

    for( CoGroup coGroup : coGroups )
      {
      List<GraphPath<FlowElement, Scope>> graphPaths = elementGraph.getAllShortestPathsTo( coGroup );

      List<List<FlowElement>> paths = ElementGraphs.asPathList( graphPaths );

      if( !areEquivalentPaths( elementGraph, paths ) )
        continue;

      LOG.warn( "found equivalent paths from: {} to: {}", paths.get( 0 ).get( 1 ), coGroup );

      // in order to remove dupe paths, we need to verify there isn't any branching
      }
    }

  /**
   * Returns true if all given paths have the same length and, at every position, the elements of all paths
   * collapse to a single entry under {@link EquivalenceComparator}.
   *
   * @param elementGraph of type ElementGraph
   * @param paths        the paths to compare
   * @return true if the paths are equivalent, false otherwise or if no paths are given
   */
  private boolean areEquivalentPaths( ElementGraph elementGraph, List<List<FlowElement>> paths )
    {
    int length = sameLength( paths );

    if( length == -1 )
      return false;

    Set<FlowElement> elements = new TreeSet<FlowElement>( new EquivalenceComparator( elementGraph ) );

    for( int i = 0; i < length; i++ )
      {
      elements.clear();

      for( List<FlowElement> path : paths )
        elements.add( path.get( i ) );

      if( elements.size() != 1 )
        return false;
      }

    return true;
    }

  /**
   * Comparator treating two elements as equal when they are equivalent and have the same in and out degree
   * in the given graph. All other elements are ordered by their identity hash code.
   * <p/>
   * Declared static as no state of the enclosing planner is used.
   */
  private static class EquivalenceComparator implements Comparator<FlowElement>
    {
    private final ElementGraph elementGraph;

    public EquivalenceComparator( ElementGraph elementGraph )
      {
      this.elementGraph = elementGraph;
      }

    @Override
    public int compare( FlowElement lhs, FlowElement rhs )
      {
      boolean areEquivalent = lhs.isEquivalentTo( rhs );
      boolean sameIncoming = elementGraph.inDegreeOf( lhs ) == elementGraph.inDegreeOf( rhs );
      boolean sameOutgoing = elementGraph.outDegreeOf( lhs ) == elementGraph.outDegreeOf( rhs );

      if( areEquivalent && sameIncoming && sameOutgoing )
        return 0;

      // compare explicitly, subtracting the hash codes may overflow and report the wrong sign
      int lhsHash = System.identityHashCode( lhs );
      int rhsHash = System.identityHashCode( rhs );

      return lhsHash < rhsHash ? -1 : ( lhsHash == rhsHash ? 0 : 1 );
      }
    }

  /**
   * Returns the common size of all given paths.
   *
   * @param paths the paths to inspect
   * @return the shared size, or -1 if the sizes differ or no paths are given
   */
  private int sameLength( List<List<FlowElement>> paths )
    {
    // nothing to compare, report as not the same length instead of failing on get( 0 )
    if( paths.isEmpty() )
      return -1;

    int lastSize = paths.get( 0 ).size();

    for( int i = 1; i < paths.size(); i++ )
      {
      if( paths.get( i ).size() != lastSize )
        return -1;
      }

    return lastSize;
    }

  /**
   * optimized for this case
   * <pre>
   *         e - t           e1 - e - t
   * t - e1 -       -- >  t -
   *         e - t           e1 - e - t
   * </pre>
   * <p/>
   * this should run in two map/red jobs, not 3. needs to be a flag on e1 to prevent this
   * <p/>
   * <pre>
   *        g - t                 g - t
   * g - e -       --> g - e - t -
   *        g - t                 g - t
   * </pre>
   * <p/>
   * <pre>
   *             - e - e                            e - e
   * t - e1 - e2         - g  --> t - e1 - e2 - t -       - g
   *             - e - e                            e - e
   * </pre>
   *
   * @param elementGraph of type ElementGraph
   */
  private void handleSplit( ElementGraph elementGraph )
    {
    // if there was a graph change, iterate paths again.
    while( !internalSplit( elementGraph ) )
      ;
    }

  /**
   * Performs one pass over all shortest paths between the graph extents, inserting temp taps after the
   * split elements found on the first path that requires any.
   *
   * @param elementGraph of type ElementGraph
   * @return false if the graph was changed and another pass is required, true otherwise
   */
  private boolean internalSplit( ElementGraph elementGraph )
    {
    List<GraphPath<FlowElement, Scope>> paths = elementGraph.getAllShortestPathsBetweenExtents();

    for( GraphPath<FlowElement, Scope> path : paths )
      {
      List<FlowElement> flowElements = Graphs.getPathVertexList( path );
      Set<Pipe> tapInsertions = new HashSet<Pipe>();
      FlowElement lastInsertable = null;

      for( int i = 0; i < flowElements.size(); i++ )
        {
        FlowElement flowElement = flowElements.get( i );

        if( flowElement instanceof ElementGraph.Extent ) // is an extent: head or tail
          continue;

        // if Tap, Group, or Every - we insert the tap here
        if( flowElement instanceof Tap || flowElement instanceof Group || flowElement instanceof Every )
          lastInsertable = flowElement;

        // support splits on Pipe unless the previous is a Tap
        if( flowElement.getClass() == Pipe.class && flowElements.get( i - 1 ) instanceof Tap )
          continue;

        if( flowElement instanceof Tap )
          continue;

        if( elementGraph.outDegreeOf( flowElement ) <= 1 )
          continue;

        // we are at the root of a split here

        // do any split paths converge on a single Group?
        int maxPaths = elementGraph.getMaxNumPathsBetweenElementAndGroupingMergeJoin( flowElement );
        if( maxPaths <= 1 && lastInsertable instanceof Tap )
          continue;

        tapInsertions.add( (Pipe) flowElement );
        }

      for( Pipe pipe : tapInsertions )
        insertTempTapAfter( elementGraph, pipe );

      // the paths were computed before the insertions, start over
      if( !tapInsertions.isEmpty() )
        return false;
      }

    return true;
    }

  /**
   * will collapse adjacent and equivalent taps.
   * equivalence is based on the adjacent taps using the same filesystem
   * and the sink being symmetrical, and having the same fields as the temp tap.
   * <p/>
   * <p/>
   * must be run after fields are resolved so temp taps have fully defined scheme instances.
   *
   * @param elementGraph of type ElementGraph
   */
  private void handleAdjacentTaps( ElementGraph elementGraph )
    {
    // if there was a graph change, iterate paths again.
    while( !internalAdjacentTaps( elementGraph ) )
      ;
    }

  /**
   * Performs one pass over all taps, replacing the first temporary tap found with an equivalent Hfs successor.
   *
   * @param elementGraph of type ElementGraph
   * @return false if a tap was replaced and another pass is required, true otherwise
   */
  private boolean internalAdjacentTaps( ElementGraph elementGraph )
    {
    List<Tap> taps = elementGraph.findAllTaps();

    for( Tap tap : taps )
      {
      if( !( tap.isTemporary() ) )
        continue;

      for( FlowElement successor : elementGraph.getAllSuccessors( tap ) )
        {
        if( !( successor instanceof Hfs ) )
          continue;

        Hfs successorTap = (Hfs) successor;

        // does this scheme source what it sinks
        if( !successorTap.getScheme().isSymmetrical() )
          continue;

        URI tempURIScheme = getDefaultURIScheme( tap ); // temp uses default fs
        URI successorURIScheme = getURIScheme( successorTap );

        if( !tempURIScheme.equals( successorURIScheme ) )
          continue;

        // safe, both are symmetrical
        // should be called after fields are resolved
        if( !tap.getSourceFields().equals( successorTap.getSourceFields() ) )
          continue;

        elementGraph.replaceElementWith( tap, successor );

        return false;
        }
      }

    return true;
    }

  /** Returns the default filesystem URI scheme of the given tap, which must be an Hfs instance. */
  private URI getDefaultURIScheme( Tap tap )
    {
    return ( (Hfs) tap ).getDefaultFileSystemURIScheme( jobConf );
    }

  /** Returns the URI scheme of the given tap, which must be an Hfs instance. */
  private URI getURIScheme( Tap tap )
    {
    return ( (Hfs) tap ).getURIScheme( jobConf );
    }

  /**
   * Inserts temp taps in front of merge/join groups that are sourced by taps with differing scheme classes.
   *
   * @param elementGraph of type ElementGraph
   */
  private void handleHeterogeneousSources( ElementGraph elementGraph )
    {
    while( !internalHeterogeneousSources( elementGraph ) )
      ;
    }

  /**
   * Performs one normalization pass, inserting at most one temp tap.
   *
   * @param elementGraph of type ElementGraph
   * @return false if a temp tap was inserted, otherwise whether no group required normalization
   */
  private boolean internalHeterogeneousSources( ElementGraph elementGraph )
    {
    // find all Groups
    List<Group> groups = elementGraph.findAllMergeJoinGroups();

    // compare group sources
    Map<Group, Set<Tap>> normalizeGroups = new HashMap<Group, Set<Tap>>();

    for( Group group : groups )
      {
      Set<Tap> taps = new HashSet<Tap>();

      // iterate each shortest path to current group finding each tap sourcing the merge/join
      for( GraphPath<FlowElement, Scope> path : elementGraph.getAllShortestPathsTo( group ) )
        {
        List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // last element is group
        Collections.reverse( flowElements ); // first element is group

        for( FlowElement previousElement : flowElements )
          {
          if( previousElement instanceof Tap )
            {
            taps.add( (Tap) previousElement );
            break; // stop finding taps in this path
            }
          }
        }

      // fewer than two taps cannot be heterogeneous, also guards iterator.next() below
      if( taps.size() <= 1 )
        continue;

      Iterator<Tap> iterator = taps.iterator();
      Tap commonTap = iterator.next();

      while( iterator.hasNext() )
        {
        Tap tap = iterator.next();

        // making assumption hadoop can handle multiple filesystems, but not multiple inputformats
        // in the same job
        // possibly could test for common input format
        if( getSchemeClass( tap ) != getSchemeClass( commonTap ) )
          {
          normalizeGroups.put( group, taps );
          break;
          }
        }
      }

    // if incompatible, insert Tap after its join/merge pipe
    for( Map.Entry<Group, Set<Tap>> entry : normalizeGroups.entrySet() )
      {
      Group group = entry.getKey();

      for( Tap tap : entry.getValue() )
        {
        if( tap.isTemporary() || getSchemeClass( tap ).equals( intermediateSchemeClass ) ) // we normalize to TempHfs
          continue;

        // handle case where there is a split on a pipe between the tap and group
        for( GraphPath<FlowElement, Scope> path : getAllShortestPathsBetween( elementGraph, tap, group ) )
          {
          List<FlowElement> flowElements = Graphs.getPathVertexList( path ); // shortest path tap -> group
          Collections.reverse( flowElements ); // group -> tap

          FlowElement flowElement = flowElements.get( 1 );

          if( flowElement instanceof Tap && ( (Tap) flowElement ).isTemporary() )
            continue;

          LOG.warn( "inserting step to normalize incompatible sources: {}", tap );

          insertTempTapAfter( elementGraph, (Pipe) flowElement );

          return false;
          }
        }
      }

    // NOTE(review): if this point is reached with a non-empty normalizeGroups, no insertion was made on this
    // pass yet false is returned and the caller iterates again - confirm this cannot repeat indefinitely
    return normalizeGroups.isEmpty();
    }

  /**
   * Creates a new TempHfs using the planner JobConf and the intermediate scheme class.
   *
   * @param prefix of type String, may be null
   * @param name   of type String
   * @return a new temporary Tap
   */
  @Override
  protected Tap makeTempTap( String prefix, String name )
    {
    // must give Taps unique names
    return new TempHfs( jobConf, Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null );
    }

  /** Returns the scheme class of the given tap, asking TempHfs for its scheme class if the tap is temporary. */
  private Class getSchemeClass( Tap tap )
    {
    if( tap.isTemporary() )
      return ( (TempHfs) tap ).getSchemeClass();
    else
      return tap.getScheme().getClass();
    }
  }