/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.util;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.pipe.Group;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.util.LogUtil;
import cascading.util.Util;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.invokeInstanceMethod;

/**
 * Utility methods for working with Hadoop {@link Configuration} and {@link JobConf} instances.
 */
public class HadoopUtil
  {
  public static final String CASCADING_FLOW_EXECUTING = "cascading.flow.executing";

  private static final Logger LOG = LoggerFactory.getLogger( HadoopUtil.class );
  private static final String ENCODING = "US-ASCII";
  private static final Class<?> DEFAULT_OBJECT_SERIALIZER = JavaObjectSerializer.class;

  private static PlatformInfo platformInfo;

  public static void setIsInflow( Configuration conf )
    {
    conf.setBoolean( CASCADING_FLOW_EXECUTING, true );
    }

  public static boolean isInflow( Configuration conf )
    {
    return conf.getBoolean( CASCADING_FLOW_EXECUTING, false );
    }

  public static void initLog4j( JobConf configuration )
    {
    initLog4j( (Configuration) configuration );
    }

  public static void initLog4j( Configuration configuration )
    {
    String values = configuration.get( "log4j.logger", null );

    if( values == null || values.length() == 0 )
      return;

    if( !Util.hasClass( "org.apache.log4j.Logger" ) )
      {
      LOG.info( "org.apache.log4j.Logger is not in the current CLASSPATH, not setting log4j.logger properties" );
      return;
      }

    String[] elements = values.split( "," );

    for( String element : elements )
      LogUtil.setLog4jLevel( element.split( "=" ) );
    }

  // only place JobConf should ever be returned
  public static JobConf asJobConfInstance( Configuration configuration )
    {
    if( configuration instanceof JobConf )
      return (JobConf) configuration;

    return new JobConf( configuration );
    }

  public static <C> C copyJobConf( C parentJobConf )
    {
    return copyConfiguration( parentJobConf );
    }

  public static JobConf copyJobConf( JobConf parentJobConf )
    {
    if( parentJobConf == null )
      throw new IllegalArgumentException( "parent may not be null" );

    // see https://github.com/Cascading/cascading/pull/21
    // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in
    // case those Credentials are mutated later on down the road (which they will be, during job submission, in
    // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing.
    final Configuration configurationCopy = new Configuration( parentJobConf );
    final JobConf jobConf = new JobConf( configurationCopy );

    jobConf.getCredentials().addAll( parentJobConf.getCredentials() );

    return jobConf;
    }

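  /**
   * Creates a new JobConf, copying the given default JobConf when one is provided, then overlaying the
   * given properties. A minimal usage sketch; the property value shown is illustrative only:
   * <pre>{@code
   * Properties properties = new Properties();
   * properties.setProperty( "mapreduce.framework.name", "local" );
   * JobConf jobConf = HadoopUtil.createJobConf( properties, new JobConf() );
   * }</pre>
   */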
  public static JobConf createJobConf( Map<Object, Object> properties, JobConf defaultJobconf )
    {
    JobConf jobConf = defaultJobconf == null ? new JobConf() : copyJobConf( defaultJobconf );

    if( properties == null )
      return jobConf;

    return copyConfiguration( properties, jobConf );
    }

  public static <C> C copyConfiguration( C parent )
    {
    if( parent == null )
      throw new IllegalArgumentException( "parent may not be null" );

    if( !( parent instanceof Configuration ) )
      throw new IllegalArgumentException( "parent must be of type Configuration" );

    Configuration conf = (Configuration) parent;

    // see https://github.com/Cascading/cascading/pull/21
    // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in
    // case those Credentials are mutated later on down the road (which they will be, during job submission, in
    // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing.
    Configuration configurationCopy = new Configuration( conf );

    Configuration copiedConf = callCopyConstructor( parent.getClass(), configurationCopy );

    if( Util.hasInstanceMethod( parent, "getCredentials", null ) )
      {
      Object result = invokeInstanceMethod( parent, "getCredentials", null, null );
      Object credentials = invokeInstanceMethod( copiedConf, "getCredentials", null, null );

      invokeInstanceMethod( credentials, "addAll", new Object[]{result}, new Class[]{credentials.getClass()} );
      }

    return (C) copiedConf;
    }

  protected static <C extends Configuration> C callCopyConstructor( Class type, Configuration parent )
    {
    try
      {
      Constructor<C> constructor = type.getConstructor( parent.getClass() );

      return constructor.newInstance( parent );
      }
    catch( NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException exception )
      {
      throw new CascadingException( "unable to create copy of: " + type );
      }
    }

  public static <C extends Configuration> C copyConfiguration( Map<Object, Object> srcProperties, C dstConfiguration )
    {
    Set<Object> keys = new HashSet<Object>( srcProperties.keySet() );

    // keys will only be grabbed if both key/value are String, so keep orig keys
    if( srcProperties instanceof Properties )
      keys.addAll( ( (Properties) srcProperties ).stringPropertyNames() );

    for( Object key : keys )
      {
      Object value = srcProperties.get( key );

      if( value == null && srcProperties instanceof Properties && key instanceof String )
        value = ( (Properties) srcProperties ).getProperty( (String) key );

      if( value == null ) // don't stuff null values
        continue;

      // don't let these objects pass, even though toString is called below.
      if( value instanceof Class || value instanceof JobConf )
        continue;

      dstConfiguration.set( key.toString(), value.toString() );
      }

    return dstConfiguration;
    }

  public static Map<Object, Object> createProperties( Configuration jobConf )
    {
    Map<Object, Object> properties = new HashMap<Object, Object>();

    if( jobConf == null )
      return properties;

    for( Map.Entry<String, String> entry : jobConf )
      properties.put( entry.getKey(), entry.getValue() );

    return properties;
    }

  public static Thread getHDFSShutdownHook()
    {
    Exception caughtException;

    try
      {
      // we must init the FS so the finalizer is registered
      FileSystem.getLocal( new JobConf() );

      Field field = FileSystem.class.getDeclaredField( "clientFinalizer" );
      field.setAccessible( true );

      Thread finalizer = (Thread) field.get( null );

      if( finalizer != null )
        Runtime.getRuntime().removeShutdownHook( finalizer );

      return finalizer;
      }
    catch( NoSuchFieldException exception )
      {
      caughtException = exception;
      }
    catch( IllegalAccessException exception )
      {
      caughtException = exception;
      }
    catch( IOException exception )
      {
      caughtException = exception;
      }

    LOG.debug( "unable to find and remove client hdfs shutdown hook, received exception: {}", caughtException.getClass().getName() );

    return null;
    }

  public static String encodeBytes( byte[] bytes )
    {
    try
      {
      return new String( Base64.encodeBase64( bytes ), ENCODING );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

  public static byte[] decodeBytes( String string )
    {
    try
      {
      byte[] bytes = string.getBytes( ENCODING );
      return Base64.decodeBase64( bytes );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

  public static <T> ObjectSerializer instantiateSerializer( Configuration conf, Class<T> type ) throws ClassNotFoundException
    {
    Class<ObjectSerializer> flowSerializerClass;

    String serializerClassName = conf.get( ObjectSerializer.OBJECT_SERIALIZER_PROPERTY );

    if( serializerClassName == null || serializerClassName.length() == 0 )
      flowSerializerClass = (Class<ObjectSerializer>) DEFAULT_OBJECT_SERIALIZER;
    else
      flowSerializerClass = (Class<ObjectSerializer>) Class.forName( serializerClassName );

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = flowSerializerClass.newInstance();

      if( objectSerializer instanceof Configurable )
        ( (Configurable) objectSerializer ).setConf( conf );
      }
    catch( Exception exception )
      {
      exception.printStackTrace();
      throw new IllegalArgumentException( "Unable to instantiate serializer \""
        + flowSerializerClass.getName()
        + "\" for class: "
        + type.getName() );
      }

    if( !objectSerializer.accepts( type ) )
      throw new IllegalArgumentException( serializerClassName + " won't accept objects of class " + type.toString() );

    return objectSerializer;
    }

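  /**
   * Serializes the given object into a Base64 encoded String, suitable for storing in a {@link Configuration}
   * property. A minimal round-trip sketch with {@link #deserializeBase64(String, Configuration, Class)};
   * the property name used here is illustrative only, and exception handling is omitted:
   * <pre>{@code
   * Fields fields = new Fields( "line" );
   * conf.set( "some.example.property", HadoopUtil.serializeBase64( fields, conf ) );
   * Fields restored = HadoopUtil.deserializeBase64( conf.getRaw( "some.example.property" ), conf, Fields.class );
   * }</pre>
   */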
  public static <T> String serializeBase64( T object, Configuration conf ) throws IOException
    {
    return serializeBase64( object, conf, true );
    }

  public static <T> String serializeBase64( T object, Configuration conf, boolean compress ) throws IOException
    {
    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, object.getClass() );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return encodeBytes( objectSerializer.serialize( object, compress ) );
    }

  /**
   * This method deserializes the Base64 encoded String into an Object instance.
   *
   * @param string
   * @return an Object
   */
  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type ) throws IOException
    {
    return deserializeBase64( string, conf, type, true );
    }

  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type, boolean decompress ) throws IOException
    {
    if( string == null || string.length() == 0 )
      return null;

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, type );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return objectSerializer.deserialize( decodeBytes( string ), type, decompress );
    }

  public static Class findMainClass( Class defaultType )
    {
    return Util.findMainClass( defaultType, "org.apache.hadoop" );
    }

  public static Map<String, String> getConfig( Configuration defaultConf, Configuration updatedConf )
    {
    Map<String, String> configs = new HashMap<String, String>();

    for( Map.Entry<String, String> entry : updatedConf )
      configs.put( entry.getKey(), entry.getValue() );

    for( Map.Entry<String, String> entry : defaultConf )
      {
      if( entry.getValue() == null )
        continue;

      String updatedValue = configs.get( entry.getKey() );

      // if both null, let's purge from map to save space
      if( updatedValue == null && entry.getValue() == null )
        configs.remove( entry.getKey() );

      // if the values are the same, let's also purge from map to save space
      if( updatedValue != null && updatedValue.equals( entry.getValue() ) )
        configs.remove( entry.getKey() );

      configs.remove( "mapred.working.dir" );
      configs.remove( "mapreduce.job.working.dir" ); // hadoop2
      }

    return configs;
    }

  public static JobConf[] getJobConfs( Configuration job, List<Map<String, String>> configs )
    {
    JobConf[] jobConfs = new JobConf[ configs.size() ];

    for( int i = 0; i < jobConfs.length; i++ )
      jobConfs[ i ] = (JobConf) mergeConf( job, configs.get( i ), false );

    return jobConfs;
    }

  public static <J extends Configuration> J mergeConf( J job, Map<String, String> config, boolean directly )
    {
    Configuration currentConf = directly ? job : ( job instanceof JobConf ? copyJobConf( (JobConf) job ) : new Configuration( job ) );

    for( String key : config.keySet() )
      {
      LOG.debug( "merging key: {} value: {}", key, config.get( key ) );

      currentConf.set( key, config.get( key ) );
      }

    return (J) currentConf;
    }

  public static Configuration removePropertiesFrom( Configuration jobConf, String... keys )
    {
    Map<Object, Object> properties = createProperties( jobConf );

    for( String key : keys )
      properties.remove( key );

    return copyConfiguration( properties, new JobConf() );
    }

  public static boolean removeStateFromDistCache( Configuration conf, String path ) throws IOException
    {
    return new Hfs( new TextLine(), path ).deleteResource( conf );
    }

  public static PlatformInfo getPlatformInfo()
    {
    if( platformInfo == null )
      platformInfo = getPlatformInfoInternal( JobConf.class, "org/apache/hadoop", "Hadoop" );

    return platformInfo;
    }

  public static PlatformInfo getPlatformInfo( Class type, String attributePath, String platformName )
    {
    if( platformInfo == null )
      platformInfo = getPlatformInfoInternal( type, attributePath, platformName );

    return platformInfo;
    }

  public static PlatformInfo createPlatformInfo( Class type, String attributePath, String platformName )
    {
    return getPlatformInfoInternal( type, attributePath, platformName );
    }

  private static PlatformInfo getPlatformInfoInternal( Class type, String attributePath, String platformName )
    {
    URL url = type.getResource( type.getSimpleName() + ".class" );

    if( url == null || !url.toString().startsWith( "jar" ) )
      return new PlatformInfo( platformName, null, null );

    String path = url.toString();
    path = path.substring( 0, path.lastIndexOf( "!" ) + 1 );

    String manifestPath = path + "/META-INF/MANIFEST.MF";
    String parsedVersion = Util.findVersion( path.substring( 0, path.length() - 1 ) );

    Manifest manifest;

    try
      {
      manifest = new Manifest( new URL( manifestPath ).openStream() );
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to get manifest from {}: {}", manifestPath, exception.getMessage() );

      return new PlatformInfo( platformName, null, parsedVersion );
      }

    Attributes attributes = manifest.getAttributes( attributePath );

    if( attributes == null )
      attributes = manifest.getMainAttributes();

    if( attributes == null )
      {
      LOG.debug( "unable to get platform manifest attributes" );
      return new PlatformInfo( platformName, null, parsedVersion );
      }

    String vendor = attributes.getValue( "Implementation-Vendor" );
    String version = attributes.getValue( "Implementation-Version" );

    if( Util.isEmpty( version ) )
      version = parsedVersion;

    return new PlatformInfo( platformName, vendor, version );
    }

  /**
   * Copies paths from one local path to a remote path. If syncTimes is true, the remote modification time is
   * updated to match the local 'from' path (the remote access time is left unchanged).
   * <p/>
   * Returns a map of file-name to remote modification times if the remote time is different than the local time.
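   * <p/>
   * A minimal usage sketch; {@code localPaths} and {@code remotePaths} would typically be populated via
   * {@link #resolvePaths}, and the variable names are illustrative only:
   * <pre>{@code
   * Map<Path, Path> commonPaths = HadoopUtil.getCommonPaths( localPaths, remotePaths );
   * Map<String, Long> remoteTimes = HadoopUtil.syncPaths( conf, commonPaths, true );
   * }</pre>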
   *
   * @param config      the current platform Configuration
   * @param commonPaths a Map of local Path to remote Path instances
   * @param syncTimes   true if the remote modification time should be set to match the local path
   */
  public static Map<String, Long> syncPaths( Configuration config, Map<Path, Path> commonPaths, boolean syncTimes )
    {
    if( commonPaths == null )
      return Collections.emptyMap();

    Map<String, Long> timestampMap = new HashMap<>();

    Map<Path, Path> copyPaths = getCopyPaths( config, commonPaths ); // tests remote file existence or if stale

    LocalFileSystem localFS = getLocalFS( config );
    FileSystem remoteFS = getDefaultFS( config );

    for( Map.Entry<Path, Path> entry : copyPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        LOG.info( "copying from: {}, to: {}", localPath, remotePath );
        remoteFS.copyFromLocalFile( localPath, remotePath );

        if( !syncTimes )
          {
          timestampMap.put( remotePath.getName(), remoteFS.getFileStatus( remotePath ).getModificationTime() );
          continue;
          }
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to copy local: " + localPath + " to remote: " + remotePath, exception );
        }

      FileStatus localFileStatus = null;

      try
        {
        // sync the modified times so we can lazily upload jars to hdfs after job is started
        // otherwise modified time will be local to hdfs
        localFileStatus = localFS.getFileStatus( localPath );
        remoteFS.setTimes( remotePath, localFileStatus.getModificationTime(), -1 ); // don't set the access time
        }
      catch( IOException exception )
        {
        LOG.info( "unable to set local modification time on remote file: {}, 'dfs.namenode.accesstime.precision' may be set to 0 on HDFS.", remotePath );

        if( localFileStatus != null )
          timestampMap.put( remotePath.getName(), localFileStatus.getModificationTime() );
        }
      }

    return timestampMap;
    }

  public static Map<Path, Path> getCommonPaths( Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    Map<Path, Path> commonPaths = new HashMap<Path, Path>();

    for( Map.Entry<String, Path> entry : localPaths.entrySet() )
      {
      if( remotePaths.containsKey( entry.getKey() ) )
        commonPaths.put( entry.getValue(), remotePaths.get( entry.getKey() ) );
      }

    return commonPaths;
    }

  private static Map<Path, Path> getCopyPaths( Configuration config, Map<Path, Path> commonPaths )
    {
    Map<Path, Path> copyPaths = new HashMap<Path, Path>();

    FileSystem remoteFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    for( Map.Entry<Path, Path> entry : commonPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        boolean localExists = localFS.exists( localPath );
        boolean remoteExist = remoteFS.exists( remotePath );

        if( localExists && !remoteExist )
          {
          copyPaths.put( localPath, remotePath );
          }
        else if( localExists )
          {
          long localModTime = localFS.getFileStatus( localPath ).getModificationTime();
          long remoteModTime = remoteFS.getFileStatus( remotePath ).getModificationTime();

          if( localModTime > remoteModTime )
            copyPaths.put( localPath, remotePath );
          }
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to get handle to underlying filesystem", exception );
        }
      }

    return copyPaths;
    }

  public static void resolvePaths( Configuration config, Collection<String> classpath, String remoteRoot, String resourceSubPath, Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    FileSystem defaultFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    Path remoteRootPath = new Path( remoteRoot == null ? "./.staging" : remoteRoot );

    if( resourceSubPath != null )
      remoteRootPath = new Path( remoteRootPath, resourceSubPath );

    remoteRootPath = defaultFS.makeQualified( remoteRootPath );

    boolean defaultIsLocal = defaultFS.equals( localFS );

    for( String stringPath : classpath )
      {
      Path path = new Path( stringPath );

      URI uri = path.toUri();

      if( uri.getScheme() == null && !defaultIsLocal ) // we want to sync
        {
        Path localPath = localFS.makeQualified( path );

        if( !exists( localFS, localPath ) )
          throw new FlowException( "path not found: " + localPath );

        String name = localPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        localPaths.put( name, localPath );
        remotePaths.put( name, defaultFS.makeQualified( new Path( remoteRootPath, path.getName() ) ) );
        }
      else if( localFS.equals( getFileSystem( config, path ) ) )
        {
        if( !exists( localFS, path ) )
          throw new FlowException( "path not found: " + path );

        Path localPath = localFS.makeQualified( path );

        String name = localPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        localPaths.put( name, localPath );
        }
      else
        {
        if( !exists( defaultFS, path ) )
          throw new FlowException( "path not found: " + path );

        Path defaultPath = defaultFS.makeQualified( path );

        String name = defaultPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        remotePaths.put( name, defaultPath );
        }
      }
    }

  private static boolean exists( FileSystem fileSystem, Path path )
    {
    try
      {
      return fileSystem.exists( path );
      }
    catch( IOException exception )
      {
      throw new FlowException( "could not test file exists: " + path );
      }
    }

  private static FileSystem getFileSystem( Configuration config, Path path )
    {
    try
      {
      return path.getFileSystem( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  public static LocalFileSystem getLocalFS( Configuration config )
    {
    try
      {
      return FileSystem.getLocal( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  public static FileSystem getDefaultFS( Configuration config )
    {
    try
      {
      return FileSystem.get( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

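  /**
   * Returns true if the given Configuration denotes a local/standalone Hadoop, Tez, or YARN mode. A minimal
   * sketch of typical use (illustrative only):
   * <pre>{@code
   * Configuration conf = new Configuration();
   * HadoopUtil.setLocal( conf );
   * assert HadoopUtil.isLocal( conf );
   * }</pre>
   */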
  public static boolean isLocal( Configuration conf )
    {
    // hadoop 1.0 and 2.0 use different properties to define local mode: we check the new YARN
    // property first
    String frameworkName = conf.get( "mapreduce.framework.name" );

    // we are running on hadoop 2.0 (YARN)
    if( frameworkName != null )
      return frameworkName.equals( "local" );

    // for Tez
    String tezLocal = conf.get( "tez.local.mode" );

    if( tezLocal != null )
      return tezLocal.equals( "true" );

    // hadoop 1.0: use the old property to determine the local mode
    String hadoop1 = conf.get( "mapred.job.tracker" );

    if( hadoop1 == null )
      {
      LOG.warn( "could not successfully test if Hadoop based platform is in standalone/local mode, no valid properties set, returning false - tests for: mapreduce.framework.name, tez.local.mode, and mapred.job.tracker" );
      return false;
      }

    return hadoop1.equals( "local" );
    }

  public static boolean isYARN( Configuration conf )
    {
    return conf.get( "mapreduce.framework.name" ) != null;
    }

  public static void setLocal( Configuration conf )
    {
    // set both properties to local
    conf.set( "mapred.job.tracker", "local" );

    // yarn
    conf.set( "mapreduce.framework.name", "local" );

    // tez
    conf.set( "tez.local.mode", "true" );
    conf.set( "tez.runtime.optimize.local.fetch", "true" );
    }

  private static boolean interfaceAssignableFromClassName( Class<?> xface, String className )
    {
    if( ( className == null ) || ( xface == null ) )
      return false;

    try
      {
      Class<?> klass = Class.forName( className );

      if( klass == null )
        return false;

      if( !xface.isAssignableFrom( klass ) )
        return false;

      return true;
      }
    catch( ClassNotFoundException cnfe )
      {
      return false; // let downstream figure it out
      }
    }

  public static boolean setNewApi( Configuration conf, String className )
    {
    if( className == null ) // silently return and let the error be caught downstream
      return false;

    boolean isStable = className.startsWith( "org.apache.hadoop.mapred." )
      || interfaceAssignableFromClassName( org.apache.hadoop.mapred.InputFormat.class, className );

    boolean isNew = className.startsWith( "org.apache.hadoop.mapreduce." )
      || interfaceAssignableFromClassName( org.apache.hadoop.mapreduce.InputFormat.class, className );

    if( isStable )
      conf.setBoolean( "mapred.mapper.new-api", false );
    else if( isNew )
      conf.setBoolean( "mapred.mapper.new-api", true );
    else
      throw new IllegalStateException( "cannot determine if class denotes stable or new api, please set 'mapred.mapper.new-api' to the appropriate value" );

    return true;
    }

  public static void addInputPaths( Configuration conf, Iterable<Path> paths )
    {
    Path workingDirectory = getWorkingDirectory( conf );
    String dirs = conf.get( "mapred.input.dir" );
    StringBuilder buffer = new StringBuilder( dirs == null ? "" : dirs );

    for( Path path : paths )
      {
      if( !path.isAbsolute() )
        path = new Path( workingDirectory, path );

      String dirStr = StringUtils.escapeString( path.toString() );

      if( buffer.length() != 0 )
        buffer.append( ',' );

      buffer.append( dirStr );
      }

    conf.set( "mapred.input.dir", buffer.toString() );
    }

  public static void addInputPath( Configuration conf, Path path )
    {
    Path workingDirectory = getWorkingDirectory( conf );
    path = new Path( workingDirectory, path );
    String dirStr = StringUtils.escapeString( path.toString() );
    String dirs = conf.get( "mapred.input.dir" );
    conf.set( "mapred.input.dir", dirs == null ? dirStr : dirs + StringUtils.COMMA_STR + dirStr );
    }

  public static void setOutputPath( Configuration conf, Path path )
    {
    Path workingDirectory = getWorkingDirectory( conf );
    path = new Path( workingDirectory, path );
    conf.set( "mapred.output.dir", path.toString() );
    }

  private static Path getWorkingDirectory( Configuration conf )
    {
    String name = conf.get( "mapred.working.dir" );

    if( name != null )
      {
      return new Path( name );
      }
    else
      {
      try
        {
        Path dir = FileSystem.get( conf ).getWorkingDirectory();
        conf.set( "mapred.working.dir", dir.toString() );
        return dir;
        }
      catch( IOException e )
        {
        throw new RuntimeException( e );
        }
      }
    }

  public static Path getOutputPath( Configuration conf )
    {
    String name = conf.get( "mapred.output.dir" );
    return name == null ? null : new Path( name );
    }

  public static String pack( Object object, Configuration conf )
    {
    if( object == null )
      return "";

    try
      {
      return serializeBase64( object, conf, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
      }
    }

  public static void addFields( Configuration conf, String property, Map<Integer, Fields> fields )
    {
    if( fields == null || fields.isEmpty() )
      return;

    Map<String, Fields> toPack = new HashMap<>();

    for( Map.Entry<Integer, Fields> entry : fields.entrySet() )
      toPack.put( entry.getKey().toString(), entry.getValue() );

    conf.set( property, pack( toPack, conf ) );
    }

  public static Map<Integer, Fields> getFields( Configuration conf, String property ) throws IOException
    {
    String value = conf.getRaw( property );

    if( value == null || value.isEmpty() )
      return Collections.emptyMap();

    Map<String, Fields> map = deserializeBase64( value, conf, Map.class, true );
    Map<Integer, Fields> result = new HashMap<>();

    for( Map.Entry<String, Fields> entry : map.entrySet() )
      result.put( Integer.parseInt( entry.getKey() ), entry.getValue() );

    return result;
    }

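  /**
   * If the first grouping Fields instance declares custom field comparators, it is packed into the given
   * property; otherwise the size of the resolved outgoing value Fields is recorded under {@code property + ".size"}.
   */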
  public static void addComparators( Configuration conf, String property, Map<String, Fields> map, BaseFlowStep flowStep, Group group )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    Fields fields = fieldsIterator.next();

    if( fields.hasComparators() )
      {
      conf.set( property, pack( fields, conf ) );
      return;
      }

    // use resolved fields if there are no comparators.
    Set<Scope> previousScopes = flowStep.getPreviousScopes( group );

    fields = previousScopes.iterator().next().getOutValuesFields();

    if( fields.size() != 0 ) // allows fields.UNKNOWN to be used
      conf.setInt( property + ".size", fields.size() );
    }

  public static void addComparators( Configuration conf, String property, Map<String, Fields> map, Fields resolvedFields )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    while( fieldsIterator.hasNext() )
      {
      Fields fields = fieldsIterator.next();

      if( fields.hasComparators() )
        {
        conf.set( property, pack( fields, conf ) );
        return;
        }
      }

    if( resolvedFields.size() != 0 ) // allows fields.UNKNOWN to be used
      conf.setInt( property + ".size", resolvedFields.size() );
    }
  }