/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.util;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.pipe.Group;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.util.LogUtil;
import cascading.util.Util;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.invokeInstanceMethod;

/**
 * Utility methods for working with Hadoop {@link Configuration}/{@link JobConf} instances:
 * copying and merging configurations, Base64 serialization of objects into job configs,
 * platform detection, and staging classpath artifacts onto the cluster filesystem.
 */
public class HadoopUtil
  {
  /** Property set to {@code true} on a configuration while a Cascading flow is executing. */
  public static final String CASCADING_FLOW_EXECUTING = "cascading.flow.executing";

  private static final Logger LOG = LoggerFactory.getLogger( HadoopUtil.class );
  private static final String ENCODING = "US-ASCII"; // Base64 output is pure ASCII
  private static final Class<?> DEFAULT_OBJECT_SERIALIZER = JavaObjectSerializer.class;

  // cached on first lookup; see getPlatformInfo()
  private static PlatformInfo platformInfo;

  /** Marks the given configuration as executing inside a flow. */
  public static void setIsInflow( Configuration conf )
    {
    conf.setBoolean( CASCADING_FLOW_EXECUTING, true );
    }

  /** Returns true if the given configuration was marked via {@link #setIsInflow(Configuration)}. */
  public static boolean isInflow( Configuration conf )
    {
    return conf.getBoolean( CASCADING_FLOW_EXECUTING, false );
    }

  public static void initLog4j( JobConf configuration )
    {
    initLog4j( (Configuration) configuration );
    }

  /**
   * Applies any {@code log4j.logger} property (comma separated {@code name=level} pairs)
   * to the log4j runtime, if log4j is on the CLASSPATH.
   */
  public static void initLog4j( Configuration configuration )
    {
    String values = configuration.get( "log4j.logger", null );

    if( values == null || values.length() == 0 )
      return;

    if( !Util.hasClass( "org.apache.log4j.Logger" ) )
      {
      LOG.info( "org.apache.log4j.Logger is not in the current CLASSPATH, not setting log4j.logger properties" );
      return;
      }

    String[] elements = values.split( "," );

    for( String element : elements )
      LogUtil.setLog4jLevel( element.split( "=" ) );
    }

  // only place JobConf should ever be returned
  public static JobConf asJobConfInstance( Configuration configuration )
    {
    if( configuration instanceof JobConf )
      return (JobConf) configuration;

    return new JobConf( configuration );
    }

  public static <C> C copyJobConf( C parentJobConf )
    {
    return copyConfiguration( parentJobConf );
    }

  /**
   * Returns a copy of the given JobConf that does not share Credentials with its parent.
   *
   * @param parentJobConf the JobConf to copy, may not be null
   * @throws IllegalArgumentException if {@code parentJobConf} is null
   */
  public static JobConf copyJobConf( JobConf parentJobConf )
    {
    if( parentJobConf == null )
      throw new IllegalArgumentException( "parent may not be null" );

    // see https://github.com/Cascading/cascading/pull/21
    // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in
    // case those Credentials are mutated later on down the road (which they will be, during job submission, in
    // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing.
    final Configuration configurationCopy = new Configuration( parentJobConf );
    final JobConf jobConf = new JobConf( configurationCopy );

    jobConf.getCredentials().addAll( parentJobConf.getCredentials() );

    return jobConf;
    }

  /**
   * Creates a JobConf seeded from {@code defaultJobconf} (or empty if null), then overlays
   * the given properties. See {@link #copyConfiguration(Map, Configuration)}.
   */
  public static JobConf createJobConf( Map<Object, Object> properties, JobConf defaultJobconf )
    {
    JobConf jobConf = defaultJobconf == null ? new JobConf() : copyJobConf( defaultJobconf );

    if( properties == null )
      return jobConf;

    return copyConfiguration( properties, jobConf );
    }

  /**
   * Returns a copy of the given Configuration (or subclass), avoiding Credentials-sharing,
   * but copying any Credentials over via reflection when the type supports them.
   *
   * @throws IllegalArgumentException if {@code parent} is null or not a Configuration
   */
  public static <C> C copyConfiguration( C parent )
    {
    if( parent == null )
      throw new IllegalArgumentException( "parent may not be null" );

    if( !( parent instanceof Configuration ) )
      throw new IllegalArgumentException( "parent must be of type Configuration" );

    Configuration conf = (Configuration) parent;

    // see https://github.com/Cascading/cascading/pull/21
    // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in
    // case those Credentials are mutated later on down the road (which they will be, during job submission, in
    // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing.
    Configuration configurationCopy = new Configuration( conf );

    Configuration copiedConf = callCopyConstructor( parent.getClass(), configurationCopy );

    // getCredentials is not present on all supported Hadoop versions, hence reflection
    if( Util.hasInstanceMethod( parent, "getCredentials", null ) )
      {
      Object result = invokeInstanceMethod( parent, "getCredentials", null, null );
      Object credentials = invokeInstanceMethod( copiedConf, "getCredentials", null, null );

      invokeInstanceMethod( credentials, "addAll", new Object[]{result}, new Class[]{credentials.getClass()} );
      }

    return (C) copiedConf;
    }

  /**
   * Invokes {@code type}'s copy constructor taking the parent's concrete type.
   *
   * @throws CascadingException if no such constructor exists or it cannot be invoked
   */
  protected static <C extends Configuration> C callCopyConstructor( Class type, Configuration parent )
    {
    try
      {
      Constructor<C> constructor = type.getConstructor( parent.getClass() );

      return constructor.newInstance( parent );
      }
    catch( NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException exception )
      {
      // preserve the cause so callers can see why construction failed
      throw new CascadingException( "unable to create copy of: " + type, exception );
      }
    }

  /**
   * Copies String-able key/value pairs from the given map into the destination Configuration.
   * Null values, Class values, and JobConf values are skipped.
   */
  public static <C extends Configuration> C copyConfiguration( Map<Object, Object> srcProperties, C dstConfiguration )
    {
    Set<Object> keys = new HashSet<Object>( srcProperties.keySet() );

    // keys will only be grabbed if both key/value are String, so keep orig keys
    if( srcProperties instanceof Properties )
      keys.addAll( ( (Properties) srcProperties ).stringPropertyNames() );

    for( Object key : keys )
      {
      Object value = srcProperties.get( key );

      // fall back to the Properties default chain for String keys
      if( value == null && srcProperties instanceof Properties && key instanceof String )
        value = ( (Properties) srcProperties ).getProperty( (String) key );

      if( value == null ) // don't stuff null values
        continue;

      // don't let these objects pass, even though toString is called below.
      if( value instanceof Class || value instanceof JobConf )
        continue;

      dstConfiguration.set( key.toString(), value.toString() );
      }

    return dstConfiguration;
    }

  /** Returns all entries of the given configuration as a mutable Map; empty map if null. */
  public static Map<Object, Object> createProperties( Configuration jobConf )
    {
    Map<Object, Object> properties = new HashMap<Object, Object>();

    if( jobConf == null )
      return properties;

    for( Map.Entry<String, String> entry : jobConf )
      properties.put( entry.getKey(), entry.getValue() );

    return properties;
    }

  /**
   * Detaches and returns the HDFS client shutdown hook thread via reflection, or null
   * if it cannot be found or accessed.
   */
  public static Thread getHDFSShutdownHook()
    {
    try
      {
      // we must init the FS so the finalizer is registered
      FileSystem.getLocal( new JobConf() );

      Field field = FileSystem.class.getDeclaredField( "clientFinalizer" );
      field.setAccessible( true );

      Thread finalizer = (Thread) field.get( null );

      if( finalizer != null )
        Runtime.getRuntime().removeShutdownHook( finalizer );

      return finalizer;
      }
    catch( NoSuchFieldException | IllegalAccessException | IOException exception )
      {
      LOG.debug( "unable to find and remove client hdfs shutdown hook, received exception: {}", exception.getClass().getName() );

      return null;
      }
    }

  /** Base64-encodes the given bytes as a US-ASCII String. */
  public static String encodeBytes( byte[] bytes )
    {
    try
      {
      return new String( Base64.encodeBase64( bytes ), ENCODING );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

  /** Decodes a Base64 US-ASCII String back into bytes. Inverse of {@link #encodeBytes(byte[])}. */
  public static byte[] decodeBytes( String string )
    {
    try
      {
      byte[] bytes = string.getBytes( ENCODING );
      return Base64.decodeBase64( bytes );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

  /**
   * Instantiates the ObjectSerializer named by {@link ObjectSerializer#OBJECT_SERIALIZER_PROPERTY}
   * (or the default Java serializer), configures it if {@link Configurable}, and verifies it
   * accepts the given type.
   *
   * @throws ClassNotFoundException   if the configured serializer class cannot be loaded
   * @throws IllegalArgumentException if the serializer cannot be instantiated or rejects the type
   */
  public static <T> ObjectSerializer instantiateSerializer( Configuration conf, Class<T> type ) throws ClassNotFoundException
    {
    Class<ObjectSerializer> flowSerializerClass;

    String serializerClassName = conf.get( ObjectSerializer.OBJECT_SERIALIZER_PROPERTY );

    if( serializerClassName == null || serializerClassName.length() == 0 )
      flowSerializerClass = (Class<ObjectSerializer>) DEFAULT_OBJECT_SERIALIZER;
    else
      flowSerializerClass = (Class<ObjectSerializer>) Class.forName( serializerClassName );

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = flowSerializerClass.newInstance();

      if( objectSerializer instanceof Configurable )
        ( (Configurable) objectSerializer ).setConf( conf );
      }
    catch( Exception exception )
      {
      // chain the cause rather than printing the stack trace to stderr
      throw new IllegalArgumentException( "Unable to instantiate serializer \""
        + flowSerializerClass.getName()
        + "\" for class: "
        + type.getName(), exception );
      }

    if( !objectSerializer.accepts( type ) )
      throw new IllegalArgumentException( serializerClassName + " won't accept objects of class " + type.toString() );

    return objectSerializer;
    }

  /** Serializes the object into a compressed Base64 String. See {@link #serializeBase64(Object, Configuration, boolean)}. */
  public static <T> String serializeBase64( T object, Configuration conf ) throws IOException
    {
    return serializeBase64( object, conf, true );
    }

  /**
   * Serializes the object into a Base64 String using the configured ObjectSerializer.
   *
   * @throws IOException if the serializer class cannot be found or serialization fails
   */
  public static <T> String serializeBase64( T object, Configuration conf, boolean compress ) throws IOException
    {
    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, object.getClass() );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return encodeBytes( objectSerializer.serialize( object, compress ) );
    }

  /**
   * This method deserializes the Base64 encoded String into an Object instance.
   *
   * @param string the Base64 encoded representation, may be null or empty
   * @return an Object, or null if {@code string} is null or empty
   */
  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type ) throws IOException
    {
    return deserializeBase64( string, conf, type, true );
    }

  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type, boolean decompress ) throws IOException
    {
    if( string == null || string.length() == 0 )
      return null;

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, type );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return objectSerializer.deserialize( decodeBytes( string ), type, decompress );
    }

  public static Class findMainClass( Class defaultType )
    {
    return Util.findMainClass( defaultType, "org.apache.hadoop" );
    }

  /**
   * Returns the entries of {@code updatedConf} minus any entries whose values are
   * identical in {@code defaultConf}, to keep the resulting map small.
   */
  public static Map<String, String> getConfig( Configuration defaultConf, Configuration updatedConf )
    {
    Map<String, String> configs = new HashMap<String, String>();

    for( Map.Entry<String, String> entry : updatedConf )
      configs.put( entry.getKey(), entry.getValue() );

    for( Map.Entry<String, String> entry : defaultConf )
      {
      if( entry.getValue() == null )
        continue;

      String updatedValue = configs.get( entry.getKey() );

      // if the values are the same, lets purge from map to save space
      if( updatedValue != null && updatedValue.equals( entry.getValue() ) )
        configs.remove( entry.getKey() );
      }

    // working dirs are job specific, never carry them over
    configs.remove( "mapred.working.dir" );
    configs.remove( "mapreduce.job.working.dir" ); // hadoop2

    return configs;
    }

  /** Creates one JobConf per config map, each derived from the given base job. */
  public static JobConf[] getJobConfs( Configuration job, List<Map<String, String>> configs )
    {
    JobConf[] jobConfs = new JobConf[ configs.size() ];

    for( int i = 0; i < jobConfs.length; i++ )
      jobConfs[ i ] = (JobConf) mergeConf( job, configs.get( i ), false );

    return jobConfs;
    }

  /**
   * Merges the given key/values into the job configuration, either in place
   * ({@code directly == true}) or into a copy.
   */
  public static <J extends Configuration> J mergeConf( J job, Map<String, String> config, boolean directly )
    {
    Configuration currentConf = directly ? job : ( job instanceof JobConf ? copyJobConf( (JobConf) job ) : new Configuration( job ) );

    for( String key : config.keySet() )
      {
      LOG.debug( "merging key: {} value: {}", key, config.get( key ) );

      currentConf.set( key, config.get( key ) );
      }

    return (J) currentConf;
    }

  /** Returns a new JobConf containing all entries of {@code jobConf} except the given keys. */
  public static Configuration removePropertiesFrom( Configuration jobConf, String... keys )
    {
    Map<Object, Object> properties = createProperties( jobConf );

    for( String key : keys )
      properties.remove( key );

    return copyConfiguration( properties, new JobConf() );
    }

  public static boolean removeStateFromDistCache( Configuration conf, String path ) throws IOException
    {
    return new Hfs( new TextLine(), path ).deleteResource( conf );
    }

  /** Returns (and caches) the Hadoop platform info derived from the JobConf jar manifest. */
  public static PlatformInfo getPlatformInfo()
    {
    if( platformInfo == null )
      platformInfo = getPlatformInfoInternal( JobConf.class, "org/apache/hadoop", "Hadoop" );

    return platformInfo;
    }

  public static PlatformInfo getPlatformInfo( Class type, String attributePath, String platformName )
    {
    if( platformInfo == null )
      platformInfo = getPlatformInfoInternal( type, attributePath, platformName );

    return platformInfo;
    }

  /**
   * Derives vendor/version info from the manifest of the jar containing {@code type},
   * falling back to a version parsed from the jar path.
   */
  private static PlatformInfo getPlatformInfoInternal( Class type, String attributePath, String platformName )
    {
    URL url = type.getResource( type.getSimpleName() + ".class" );

    if( url == null || !url.toString().startsWith( "jar" ) )
      return new PlatformInfo( platformName, null, null );

    String path = url.toString();
    path = path.substring( 0, path.lastIndexOf( "!" ) + 1 );

    String manifestPath = path + "/META-INF/MANIFEST.MF";
    String parsedVersion = Util.findVersion( path.substring( 0, path.length() - 1 ) );

    Manifest manifest;

    try
      {
      manifest = new Manifest( new URL( manifestPath ).openStream() );
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to get manifest from {}: {}", manifestPath, exception.getMessage() );

      // report the requested platform name, not a hard-coded "Hadoop"
      return new PlatformInfo( platformName, null, parsedVersion );
      }

    Attributes attributes = manifest.getAttributes( attributePath );

    if( attributes == null )
      attributes = manifest.getMainAttributes();

    if( attributes == null )
      {
      LOG.debug( "unable to get Hadoop manifest attributes" );
      return new PlatformInfo( platformName, null, parsedVersion );
      }

    String vendor = attributes.getValue( "Implementation-Vendor" );
    String version = attributes.getValue( "Implementation-Version" );

    if( Util.isEmpty( version ) )
      version = parsedVersion;

    return new PlatformInfo( platformName, vendor, version );
    }

  /**
   * Add to class path.
   *
   * @param config    the config
   * @param classpath the classpath
   * @return the local-to-remote path pairs that are common to both filesystems, or null
   */
  public static Map<Path, Path> addToClassPath( Configuration config, List<String> classpath )
    {
    if( classpath == null )
      return null;

    // given to fully qualified
    Map<String, Path> localPaths = new HashMap<String, Path>();
    Map<String, Path> remotePaths = new HashMap<String, Path>();

    resolvePaths( config, classpath, null, null, localPaths, remotePaths );

    try
      {
      LocalFileSystem localFS = getLocalFS( config );

      for( String path : localPaths.keySet() )
        {
        // only add local if no remote
        if( remotePaths.containsKey( path ) )
          continue;

        Path artifact = localPaths.get( path );

        DistributedCache.addFileToClassPath( artifact.makeQualified( localFS ), config );
        }

      FileSystem defaultFS = getDefaultFS( config );

      for( String path : remotePaths.keySet() )
        {
        // always add remote
        Path artifact = remotePaths.get( path );

        DistributedCache.addFileToClassPath( artifact.makeQualified( defaultFS ), config );
        }
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to set distributed cache paths", exception );
      }

    return getCommonPaths( localPaths, remotePaths );
    }

  /**
   * Copies paths from one local path to a remote path. If syncTimes is true, both modification and access time are
   * changed to match the local 'from' path.
   * <p/>
   * Returns a map of file-name to remote modification times if the remote time is different than the local time.
   *
   * @param config      current hadoop configuration
   * @param commonPaths local-to-remote path pairs, may be null
   * @param syncTimes   whether to push local modification times onto the remote copies
   */
  public static Map<String, Long> syncPaths( Configuration config, Map<Path, Path> commonPaths, boolean syncTimes )
    {
    if( commonPaths == null )
      return Collections.emptyMap();

    Map<String, Long> timestampMap = new HashMap<>();

    Map<Path, Path> copyPaths = getCopyPaths( config, commonPaths ); // tests remote file existence or if stale

    LocalFileSystem localFS = getLocalFS( config );
    FileSystem remoteFS = getDefaultFS( config );

    for( Map.Entry<Path, Path> entry : copyPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        LOG.info( "copying from: {}, to: {}", localPath, remotePath );
        remoteFS.copyFromLocalFile( localPath, remotePath );

        if( !syncTimes )
          {
          timestampMap.put( remotePath.getName(), remoteFS.getFileStatus( remotePath ).getModificationTime() );
          continue;
          }
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to copy local: " + localPath + " to remote: " + remotePath, exception );
        }

      FileStatus localFileStatus = null;

      try
        {
        // sync the modified times so we can lazily upload jars to hdfs after job is started
        // otherwise modified time will be local to hdfs
        localFileStatus = localFS.getFileStatus( localPath );
        remoteFS.setTimes( remotePath, localFileStatus.getModificationTime(), -1 ); // don't set the access time
        }
      catch( IOException exception )
        {
        LOG.info( "unable to set local modification time on remote file: {}, 'dfs.namenode.accesstime.precision' may be set to 0 on HDFS.", remotePath );

        if( localFileStatus != null )
          timestampMap.put( remotePath.getName(), localFileStatus.getModificationTime() );
        }
      }

    return timestampMap;
    }

  /** Returns local-to-remote Path pairs for every name present in both maps. */
  public static Map<Path, Path> getCommonPaths( Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    Map<Path, Path> commonPaths = new HashMap<Path, Path>();

    for( Map.Entry<String, Path> entry : localPaths.entrySet() )
      {
      if( remotePaths.containsKey( entry.getKey() ) )
        commonPaths.put( entry.getValue(), remotePaths.get( entry.getKey() ) );
      }

    return commonPaths;
    }

  /** Returns the subset of common paths whose remote copy is missing or older than the local file. */
  private static Map<Path, Path> getCopyPaths( Configuration config, Map<Path, Path> commonPaths )
    {
    Map<Path, Path> copyPaths = new HashMap<Path, Path>();

    FileSystem remoteFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    for( Map.Entry<Path, Path> entry : commonPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        boolean localExists = localFS.exists( localPath );
        boolean remoteExist = remoteFS.exists( remotePath );

        if( localExists && !remoteExist )
          {
          copyPaths.put( localPath, remotePath );
          }
        else if( localExists )
          {
          long localModTime = localFS.getFileStatus( localPath ).getModificationTime();
          long remoteModTime = remoteFS.getFileStatus( remotePath ).getModificationTime();

          if( localModTime > remoteModTime )
            copyPaths.put( localPath, remotePath );
          }
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to get handle to underlying filesystem", exception );
        }
      }

    return copyPaths;
    }

  /**
   * Sorts the given classpath entries into local and remote artifacts, keyed by
   * (optionally sub-path prefixed) file name, qualifying each against the owning filesystem.
   *
   * @throws FlowException if any classpath entry does not exist
   */
  public static void resolvePaths( Configuration config, Collection<String> classpath, String remoteRoot, String resourceSubPath, Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    FileSystem defaultFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    Path remoteRootPath = new Path( remoteRoot == null ? "./.staging" : remoteRoot );

    if( resourceSubPath != null )
      remoteRootPath = new Path( remoteRootPath, resourceSubPath );

    remoteRootPath = defaultFS.makeQualified( remoteRootPath );

    boolean defaultIsLocal = defaultFS.equals( localFS );

    for( String stringPath : classpath )
      {
      Path path = new Path( stringPath );

      URI uri = path.toUri();

      if( uri.getScheme() == null && !defaultIsLocal ) // we want to sync
        {
        Path localPath = localFS.makeQualified( path );

        if( !exists( localFS, localPath ) )
          throw new FlowException( "path not found: " + localPath );

        String name = localPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        localPaths.put( name, localPath );
        remotePaths.put( name, defaultFS.makeQualified( new Path( remoteRootPath, path.getName() ) ) );
        }
      else if( localFS.equals( getFileSystem( config, path ) ) )
        {
        if( !exists( localFS, path ) )
          throw new FlowException( "path not found: " + path );

        Path localPath = localFS.makeQualified( path );

        String name = localPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        localPaths.put( name, localPath );
        }
      else
        {
        if( !exists( defaultFS, path ) )
          throw new FlowException( "path not found: " + path );

        Path defaultPath = defaultFS.makeQualified( path );

        String name = defaultPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        remotePaths.put( name, defaultPath );
        }
      }
    }

  private static boolean exists( FileSystem fileSystem, Path path )
    {
    try
      {
      return fileSystem.exists( path );
      }
    catch( IOException exception )
      {
      // preserve the cause so callers can see why the probe failed
      throw new FlowException( "could not test file exists: " + path, exception );
      }
    }

  private static FileSystem getFileSystem( Configuration config, Path path )
    {
    try
      {
      return path.getFileSystem( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  public static LocalFileSystem getLocalFS( Configuration config )
    {
    try
      {
      return FileSystem.getLocal( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  public static FileSystem getDefaultFS( Configuration config )
    {
    try
      {
      return FileSystem.get( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  /**
   * Returns true if the configuration denotes standalone/local mode, probing the YARN,
   * Tez, and hadoop 1.x properties in that order.
   */
  public static boolean isLocal( Configuration conf )
    {
    // hadoop 1.0 and 2.0 use different properties to define local mode: we check the new YARN
    // property first
    String frameworkName = conf.get( "mapreduce.framework.name" );

    // we are running on hadoop 2.0 (YARN)
    if( frameworkName != null )
      return frameworkName.equals( "local" );

    // for Tez
    String tezLocal = conf.get( "tez.local.mode" );

    if( tezLocal != null )
      return tezLocal.equals( "true" );

    // hadoop 1.0: use the old property to determine the local mode
    String hadoop1 = conf.get( "mapred.job.tracker" );

    if( hadoop1 == null )
      {
      LOG.warn( "could not successfully test if Hadoop based platform is in standalone/local mode, no valid properties set, returning false - tests for: mapreduce.framework.name, tez.local.mode, and mapred.job.tracker" );
      return false;
      }

    return hadoop1.equals( "local" );
    }

  public static boolean isYARN( Configuration conf )
    {
    return conf.get( "mapreduce.framework.name" ) != null;
    }

  /** Forces the configuration into local mode for hadoop 1.x, YARN, and Tez. */
  public static void setLocal( Configuration conf )
    {
    // set both properties to local
    conf.set( "mapred.job.tracker", "local" );

    // yarn
    conf.set( "mapreduce.framework.name", "local" );

    // tez
    conf.set( "tez.local.mode", "true" );
    conf.set( "tez.runtime.optimize.local.fetch", "true" );
    }

  /**
   * Sets {@code mapred.mapper.new-api} from the package of the given class name.
   *
   * @return false if {@code className} is null, true otherwise
   * @throws IllegalStateException if the package denotes neither the stable nor the new api
   */
  public static boolean setNewApi( Configuration conf, String className )
    {
    if( className == null ) // silently return and let the error be caught downstream
      return false;

    boolean isStable = className.startsWith( "org.apache.hadoop.mapred." );
    boolean isNew = className.startsWith( "org.apache.hadoop.mapreduce." );

    if( isStable )
      conf.setBoolean( "mapred.mapper.new-api", false );
    else if( isNew )
      conf.setBoolean( "mapred.mapper.new-api", true );
    else
      throw new IllegalStateException( "cannot determine if class denotes stable or new api, please set 'mapred.mapper.new-api' to the appropriate value" );

    return true;
    }

  /** Appends the given path (resolved against the working directory) to {@code mapred.input.dir}. */
  public static void addInputPath( Configuration conf, Path path )
    {
    Path workingDirectory = getWorkingDirectory( conf );
    path = new Path( workingDirectory, path );
    String dirStr = StringUtils.escapeString( path.toString() );
    String dirs = conf.get( "mapred.input.dir" );
    conf.set( "mapred.input.dir", dirs == null ? dirStr :
      dirs + StringUtils.COMMA_STR + dirStr );
    }

  /** Sets {@code mapred.output.dir} to the given path resolved against the working directory. */
  public static void setOutputPath( Configuration conf, Path path )
    {
    Path workingDirectory = getWorkingDirectory( conf );
    path = new Path( workingDirectory, path );
    conf.set( "mapred.output.dir", path.toString() );
    }

  private static Path getWorkingDirectory( Configuration conf )
    {
    String name = conf.get( "mapred.working.dir" );
    if( name != null )
      {
      return new Path( name );
      }
    else
      {
      try
        {
        // resolve and cache the filesystem's working directory
        Path dir = FileSystem.get( conf ).getWorkingDirectory();
        conf.set( "mapred.working.dir", dir.toString() );
        return dir;
        }
      catch( IOException e )
        {
        throw new RuntimeException( e );
        }
      }
    }

  public static Path getOutputPath( Configuration conf )
    {
    String name = conf.get( "mapred.output.dir" );
    return name == null ? null : new Path( name );
    }

  /**
   * Serializes the object into a compressed Base64 String, or returns the empty string for null.
   *
   * @throws FlowException if serialization fails
   */
  public static String pack( Object object, Configuration conf )
    {
    if( object == null )
      return "";

    try
      {
      return serializeBase64( object, conf, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
      }
    }

  /**
   * Stores grouping field comparators (or the resolved value field size) under the given
   * property so the serialized comparators are available step side.
   */
  public static void addComparators( Configuration conf, String property, Map<String, Fields> map, BaseFlowStep flowStep, Group group )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    Fields fields = fieldsIterator.next();

    if( fields.hasComparators() )
      {
      conf.set( property, pack( fields, conf ) );
      return;
      }

    // use resolved fields if there are no comparators.
    Set<Scope> previousScopes = flowStep.getPreviousScopes( group );

    fields = previousScopes.iterator().next().getOutValuesFields();

    if( fields.size() != 0 ) // allows fields.UNKNOWN to be used
      conf.setInt( property + ".size", fields.size() );
    }
  }