/*
 * Copyright (c) 2016-2018 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.util;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.jar.Attributes;
import java.util.jar.Manifest;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.pipe.Group;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.util.LogUtil;
import cascading.util.Util;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.invokeInstanceMethod;

/**
 * HadoopUtil provides static helper methods for working with Hadoop {@link Configuration} and {@link JobConf}
 * instances, serializing objects into them, and synchronizing local and remote paths.
 */
public class HadoopUtil
  {
  public static final String CASCADING_FLOW_EXECUTING = "cascading.flow.executing";

  private static final Logger LOG = LoggerFactory.getLogger( HadoopUtil.class );
  private static final String ENCODING = "US-ASCII";
  private static final Class<?> DEFAULT_OBJECT_SERIALIZER = JavaObjectSerializer.class;

  private static PlatformInfo platformInfo;

  public static void setIsInflow( Configuration conf )
    {
    conf.setBoolean( CASCADING_FLOW_EXECUTING, true );
    }

  public static boolean isInflow( Configuration conf )
    {
    return conf.getBoolean( CASCADING_FLOW_EXECUTING, false );
    }

  public static void initLog4j( JobConf configuration )
    {
    initLog4j( (Configuration) configuration );
    }

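  /**
   * Reads the {@code log4j.logger} property and applies each {@code logger=level} pair, if Log4j is on the
   * current CLASSPATH.
   * <p>
   * A minimal sketch of the expected property format (the logger names here are hypothetical):
   * <pre>{@code
   * Configuration conf = new Configuration();
   * conf.set( "log4j.logger", "cascading=INFO,org.apache.hadoop=WARN" );
   * HadoopUtil.initLog4j( conf ); // sets each named logger to the requested level
   * }</pre>
   */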
  public static void initLog4j( Configuration configuration )
    {
    String values = configuration.get( "log4j.logger", null );

    if( values == null || values.length() == 0 )
      return;

    if( !Util.hasClass( "org.apache.log4j.Logger" ) )
      {
      LOG.info( "org.apache.log4j.Logger is not in the current CLASSPATH, not setting log4j.logger properties" );
      return;
      }

    String[] elements = values.split( "," );

    for( String element : elements )
      LogUtil.setLog4jLevel( element.split( "=" ) );
    }

  // the only place a JobConf instance should ever be returned from
  public static JobConf asJobConfInstance( Configuration configuration )
    {
    if( configuration instanceof JobConf )
      return (JobConf) configuration;

    return new JobConf( configuration );
    }

  public static <C> C copyJobConf( C parentJobConf )
    {
    return copyConfiguration( parentJobConf );
    }

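  /**
   * Creates a copy of the given JobConf without sharing the parent's Credentials instance, then copies the
   * credential entries over explicitly; see the comment below on why the JobConf(JobConf) constructor is avoided.
   * <p>
   * A minimal usage sketch:
   * <pre>{@code
   * JobConf parent = new JobConf();
   * JobConf copy = HadoopUtil.copyJobConf( parent );
   * // copy.getCredentials() is a distinct instance holding the same entries
   * }</pre>
   */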
  public static JobConf copyJobConf( JobConf parentJobConf )
    {
    if( parentJobConf == null )
      throw new IllegalArgumentException( "parent may not be null" );

    // see https://github.com/Cascading/cascading/pull/21
    // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in
    // case those Credentials are mutated later on down the road (which they will be, during job submission, in
    // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing.
    final Configuration configurationCopy = new Configuration( parentJobConf );
    final JobConf jobConf = new JobConf( configurationCopy );

    jobConf.getCredentials().addAll( parentJobConf.getCredentials() );

    return jobConf;
    }

  public static JobConf createJobConf( Map<Object, Object> properties, JobConf defaultJobConf )
    {
    JobConf jobConf = defaultJobConf == null ? new JobConf() : copyJobConf( defaultJobConf );

    if( properties == null )
      return jobConf;

    return copyConfiguration( properties, jobConf );
    }

  public static <C> C copyConfiguration( C parent )
    {
    if( parent == null )
      throw new IllegalArgumentException( "parent may not be null" );

    if( !( parent instanceof Configuration ) )
      throw new IllegalArgumentException( "parent must be of type Configuration" );

    Configuration conf = (Configuration) parent;

    // see https://github.com/Cascading/cascading/pull/21
    // The JobConf(JobConf) constructor causes derived JobConfs to share Credentials. We want to avoid this, in
    // case those Credentials are mutated later on down the road (which they will be, during job submission, in
    // separate threads!). Using the JobConf(Configuration) constructor avoids Credentials-sharing.
    Configuration configurationCopy = new Configuration( conf );

    Configuration copiedConf = callCopyConstructor( parent.getClass(), configurationCopy );

    if( Util.hasInstanceMethod( parent, "getCredentials", null ) )
      {
      Object result = invokeInstanceMethod( parent, "getCredentials", null, null );
      Object credentials = invokeInstanceMethod( copiedConf, "getCredentials", null, null );

      invokeInstanceMethod( credentials, "addAll", new Object[]{result}, new Class[]{credentials.getClass()} );
      }

    return (C) copiedConf;
    }

  protected static <C extends Configuration> C callCopyConstructor( Class type, Configuration parent )
    {
    try
      {
      Constructor<C> constructor = type.getConstructor( parent.getClass() );

      return constructor.newInstance( parent );
      }
    catch( NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException exception )
      {
      throw new CascadingException( "unable to create copy of: " + type, exception );
      }
    }

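  /**
   * Copies all String key/value pairs from the given Map into the given Configuration, skipping null values
   * and values of type Class or JobConf. When the source is a {@link Properties} instance, default values only
   * reachable via {@link Properties#getProperty(String)} are included as well.
   * <p>
   * A minimal usage sketch:
   * <pre>{@code
   * Properties properties = new Properties();
   * properties.setProperty( "mapreduce.job.reduces", "2" );
   *
   * JobConf jobConf = HadoopUtil.copyConfiguration( properties, new JobConf() );
   * // jobConf.get( "mapreduce.job.reduces" ) returns "2"
   * }</pre>
   */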
  public static <C extends Configuration> C copyConfiguration( Map<Object, Object> srcProperties, C dstConfiguration )
    {
    Set<Object> keys = new HashSet<Object>( srcProperties.keySet() );

    // stringPropertyNames() only returns keys whose key and value are both Strings, so keep the original keys as well
    if( srcProperties instanceof Properties )
      keys.addAll( ( (Properties) srcProperties ).stringPropertyNames() );

    for( Object key : keys )
      {
      Object value = srcProperties.get( key );

      if( value == null && srcProperties instanceof Properties && key instanceof String )
        value = ( (Properties) srcProperties ).getProperty( (String) key );

      if( value == null ) // don't stuff null values
        continue;

      // don't let these objects pass, even though toString is called below.
      if( value instanceof Class || value instanceof JobConf )
        continue;

      dstConfiguration.set( key.toString(), value.toString() );
      }

    return dstConfiguration;
    }

  public static Map<Object, Object> createProperties( Configuration jobConf )
    {
    Map<Object, Object> properties = new HashMap<Object, Object>();

    if( jobConf == null )
      return properties;

    for( Map.Entry<String, String> entry : jobConf )
      properties.put( entry.getKey(), entry.getValue() );

    return properties;
    }

  public static Thread getHDFSShutdownHook()
    {
    try
      {
      // we must init the FS so the finalizer is registered
      FileSystem.getLocal( new JobConf() );

      Field field = FileSystem.class.getDeclaredField( "clientFinalizer" );
      field.setAccessible( true );

      Thread finalizer = (Thread) field.get( null );

      if( finalizer != null )
        Runtime.getRuntime().removeShutdownHook( finalizer );

      return finalizer;
      }
    catch( NoSuchFieldException | IllegalAccessException | IOException exception )
      {
      LOG.debug( "unable to find and remove client hdfs shutdown hook, received exception: {}", exception.getClass().getName() );

      return null;
      }
    }

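  /**
   * Encodes the given bytes as a US-ASCII Base64 String, the inverse of {@link #decodeBytes(String)}.
   * <p>
   * A minimal round-trip sketch:
   * <pre>{@code
   * String encoded = HadoopUtil.encodeBytes( new byte[]{1, 2, 3} );
   * byte[] decoded = HadoopUtil.decodeBytes( encoded ); // yields {1, 2, 3}
   * }</pre>
   */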
  public static String encodeBytes( byte[] bytes )
    {
    try
      {
      return new String( Base64.encodeBase64( bytes ), ENCODING );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

  public static byte[] decodeBytes( String string )
    {
    try
      {
      byte[] bytes = string.getBytes( ENCODING );
      return Base64.decodeBase64( bytes );
      }
    catch( UnsupportedEncodingException exception )
      {
      throw new RuntimeException( exception );
      }
    }

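  /**
   * Instantiates the {@link ObjectSerializer} named by the {@link ObjectSerializer#OBJECT_SERIALIZER_PROPERTY}
   * property, or the default {@code JavaObjectSerializer} when the property is unset. Serializers implementing
   * {@link Configurable} receive the current Configuration.
   * <p>
   * A minimal sketch, assuming a hypothetical {@code com.example.MySerializer} implementing ObjectSerializer:
   * <pre>{@code
   * conf.set( ObjectSerializer.OBJECT_SERIALIZER_PROPERTY, "com.example.MySerializer" );
   * ObjectSerializer serializer = HadoopUtil.instantiateSerializer( conf, Fields.class );
   * }</pre>
   */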
  public static <T> ObjectSerializer instantiateSerializer( Configuration conf, Class<T> type ) throws ClassNotFoundException
    {
    Class<ObjectSerializer> flowSerializerClass;

    String serializerClassName = conf.get( ObjectSerializer.OBJECT_SERIALIZER_PROPERTY );

    if( serializerClassName == null || serializerClassName.length() == 0 )
      flowSerializerClass = (Class<ObjectSerializer>) DEFAULT_OBJECT_SERIALIZER;
    else
      flowSerializerClass = (Class<ObjectSerializer>) Class.forName( serializerClassName );

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = flowSerializerClass.newInstance();

      if( objectSerializer instanceof Configurable )
        ( (Configurable) objectSerializer ).setConf( conf );
      }
    catch( Exception exception )
      {
      throw new IllegalArgumentException( "Unable to instantiate serializer \""
        + flowSerializerClass.getName()
        + "\" for class: "
        + type.getName(), exception );
      }

    if( !objectSerializer.accepts( type ) )
      throw new IllegalArgumentException( flowSerializerClass.getName() + " won't accept objects of class " + type.toString() );

    return objectSerializer;
    }

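  /**
   * Serializes the given object with the configured {@link ObjectSerializer} and Base64 encodes the result,
   * compressing by default; the inverse of {@link #deserializeBase64(String, Configuration, Class)}.
   * <p>
   * A minimal round-trip sketch:
   * <pre>{@code
   * Configuration conf = new Configuration();
   * Fields fields = new Fields( "line" );
   *
   * String packed = HadoopUtil.serializeBase64( fields, conf );
   * Fields unpacked = HadoopUtil.deserializeBase64( packed, conf, Fields.class );
   * }</pre>
   */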
  public static <T> String serializeBase64( T object, Configuration conf ) throws IOException
    {
    return serializeBase64( object, conf, true );
    }

  public static <T> String serializeBase64( T object, Configuration conf, boolean compress ) throws IOException
    {
    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, object.getClass() );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return encodeBytes( objectSerializer.serialize( object, compress ) );
    }

  /**
   * Deserializes the given Base64 encoded String into an instance of the given type.
   *
   * @param string the Base64 encoded object
   * @param conf   the current Configuration
   * @param type   the expected type of the deserialized object
   * @return an instance of the given type, or null if the given string is null or empty
   */
  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type ) throws IOException
    {
    return deserializeBase64( string, conf, type, true );
    }

  public static <T> T deserializeBase64( String string, Configuration conf, Class<T> type, boolean decompress ) throws IOException
    {
    if( string == null || string.length() == 0 )
      return null;

    ObjectSerializer objectSerializer;

    try
      {
      objectSerializer = instantiateSerializer( conf, type );
      }
    catch( ClassNotFoundException exception )
      {
      throw new IOException( exception );
      }

    return objectSerializer.deserialize( decodeBytes( string ), type, decompress );
    }

  public static Class findMainClass( Class defaultType )
    {
    return Util.findMainClass( defaultType, "org.apache.hadoop" );
    }

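  /**
   * Returns the entries in updatedConf that differ from defaultConf, dropping entries with equal values and
   * the working directory keys so the resulting map stays small.
   * <p>
   * A minimal sketch:
   * <pre>{@code
   * Configuration defaultConf = new Configuration();
   * Configuration updatedConf = new Configuration( defaultConf );
   * updatedConf.set( "mapreduce.job.reduces", "4" );
   *
   * Map<String, String> diff = HadoopUtil.getConfig( defaultConf, updatedConf );
   * // diff contains only the keys whose values changed, here "mapreduce.job.reduces"
   * }</pre>
   */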
  public static Map<String, String> getConfig( Configuration defaultConf, Configuration updatedConf )
    {
    Map<String, String> configs = new HashMap<String, String>();

    for( Map.Entry<String, String> entry : updatedConf )
      configs.put( entry.getKey(), entry.getValue() );

    for( Map.Entry<String, String> entry : defaultConf )
      {
      if( entry.getValue() == null )
        continue;

      String updatedValue = configs.get( entry.getKey() );

      // if the values are the same, let's purge the entry from the map to save space
      if( updatedValue != null && updatedValue.equals( entry.getValue() ) )
        configs.remove( entry.getKey() );
      }

    configs.remove( "mapred.working.dir" );
    configs.remove( "mapreduce.job.working.dir" ); // hadoop2

    return configs;
    }

  public static JobConf[] getJobConfs( Configuration job, List<Map<String, String>> configs )
    {
    JobConf[] jobConfs = new JobConf[ configs.size() ];

    for( int i = 0; i < jobConfs.length; i++ )
      jobConfs[ i ] = (JobConf) mergeConf( job, configs.get( i ), false );

    return jobConfs;
    }

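  /**
   * Applies the given key/value map onto the job Configuration, either directly or onto a copy, returning the
   * updated instance.
   * <p>
   * A minimal sketch:
   * <pre>{@code
   * Map<String, String> overrides = Collections.singletonMap( "mapreduce.job.reduces", "2" );
   * JobConf merged = HadoopUtil.mergeConf( new JobConf(), overrides, false ); // false merges onto a copy
   * }</pre>
   */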
  public static <J extends Configuration> J mergeConf( J job, Map<String, String> config, boolean directly )
    {
    Configuration currentConf = directly ? job : ( job instanceof JobConf ? copyJobConf( (JobConf) job ) : new Configuration( job ) );

    for( String key : config.keySet() )
      {
      LOG.debug( "merging key: {} value: {}", key, config.get( key ) );

      currentConf.set( key, config.get( key ) );
      }

    return (J) currentConf;
    }

  public static Configuration removePropertiesFrom( Configuration jobConf, String... keys )
    {
    Map<Object, Object> properties = createProperties( jobConf );

    for( String key : keys )
      properties.remove( key );

    return copyConfiguration( properties, new JobConf() );
    }

  public static boolean removeStateFromDistCache( Configuration conf, String path ) throws IOException
    {
    return new Hfs( new TextLine(), path ).deleteResource( conf );
    }

  public static PlatformInfo getPlatformInfo()
    {
    if( platformInfo == null )
      platformInfo = getPlatformInfoInternal( JobConf.class, "org/apache/hadoop", "Hadoop" );

    return platformInfo;
    }

  public static PlatformInfo getPlatformInfo( Class type, String attributePath, String platformName )
    {
    if( platformInfo == null )
      platformInfo = getPlatformInfoInternal( type, attributePath, platformName );

    return platformInfo;
    }

  public static PlatformInfo createPlatformInfo( Class type, String attributePath, String platformName )
    {
    return getPlatformInfoInternal( type, attributePath, platformName );
    }

  private static PlatformInfo getPlatformInfoInternal( Class type, String attributePath, String platformName )
    {
    URL url = type.getResource( type.getSimpleName() + ".class" );

    if( url == null || !url.toString().startsWith( "jar" ) )
      return new PlatformInfo( platformName, null, null );

    String path = url.toString();
    path = path.substring( 0, path.lastIndexOf( "!" ) + 1 );

    String manifestPath = path + "/META-INF/MANIFEST.MF";
    String parsedVersion = Util.findVersion( path.substring( 0, path.length() - 1 ) );

    Manifest manifest;

    try
      {
      manifest = new Manifest( new URL( manifestPath ).openStream() );
      }
    catch( IOException exception )
      {
      LOG.warn( "unable to get manifest from {}: {}", manifestPath, exception.getMessage() );

      return new PlatformInfo( platformName, null, parsedVersion );
      }

    Attributes attributes = manifest.getAttributes( attributePath );

    if( attributes == null )
      attributes = manifest.getMainAttributes();

    if( attributes == null )
      {
      LOG.debug( "unable to get platform manifest attributes" );
      return new PlatformInfo( platformName, null, parsedVersion );
      }

    String vendor = attributes.getValue( "Implementation-Vendor" );
    String version = attributes.getValue( "Implementation-Version" );

    if( Util.isEmpty( version ) )
      version = parsedVersion;

    return new PlatformInfo( platformName, vendor, version );
    }

  /**
   * Copies paths from one local path to a remote path. If syncTimes is true, the remote modification time is
   * set to match the local 'from' path; the access time is left untouched.
   * <p>
   * Returns a map of file-name to remote modification times if the remote time is different than the local time.
   *
   * @param config      the current Configuration
   * @param commonPaths a map of local to remote paths to copy, see {@link #getCommonPaths(Map, Map)}
   * @param syncTimes   true if the remote modification times should be synced with the local paths
   */
  public static Map<String, Long> syncPaths( Configuration config, Map<Path, Path> commonPaths, boolean syncTimes )
    {
    if( commonPaths == null )
      return Collections.emptyMap();

    Map<String, Long> timestampMap = new HashMap<>();

    Map<Path, Path> copyPaths = getCopyPaths( config, commonPaths ); // tests whether the remote file is missing or stale

    LocalFileSystem localFS = getLocalFS( config );
    FileSystem remoteFS = getDefaultFS( config );

    for( Map.Entry<Path, Path> entry : copyPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        LOG.info( "copying from: {}, to: {}", localPath, remotePath );
        remoteFS.copyFromLocalFile( localPath, remotePath );

        if( !syncTimes )
          {
          timestampMap.put( remotePath.getName(), remoteFS.getFileStatus( remotePath ).getModificationTime() );
          continue;
          }
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to copy local: " + localPath + " to remote: " + remotePath, exception );
        }

      FileStatus localFileStatus = null;

      try
        {
        // sync the modification times so jars can be lazily uploaded to hdfs after the job has started,
        // otherwise the modification time would be local to hdfs
        localFileStatus = localFS.getFileStatus( localPath );
        remoteFS.setTimes( remotePath, localFileStatus.getModificationTime(), -1 ); // don't set the access time
        }
      catch( IOException exception )
        {
        LOG.info( "unable to set local modification time on remote file: {}, 'dfs.namenode.accesstime.precision' may be set to 0 on HDFS.", remotePath );

        if( localFileStatus != null )
          timestampMap.put( remotePath.getName(), localFileStatus.getModificationTime() );
        }
      }

    return timestampMap;
    }

  public static Map<Path, Path> getCommonPaths( Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    Map<Path, Path> commonPaths = new HashMap<Path, Path>();

    for( Map.Entry<String, Path> entry : localPaths.entrySet() )
      {
      if( remotePaths.containsKey( entry.getKey() ) )
        commonPaths.put( entry.getValue(), remotePaths.get( entry.getKey() ) );
      }

    return commonPaths;
    }

  private static Map<Path, Path> getCopyPaths( Configuration config, Map<Path, Path> commonPaths )
    {
    Map<Path, Path> copyPaths = new HashMap<Path, Path>();

    FileSystem remoteFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    for( Map.Entry<Path, Path> entry : commonPaths.entrySet() )
      {
      Path localPath = entry.getKey();
      Path remotePath = entry.getValue();

      try
        {
        boolean localExists = localFS.exists( localPath );
        boolean remoteExists = remoteFS.exists( remotePath );

        if( localExists && !remoteExists )
          {
          copyPaths.put( localPath, remotePath );
          }
        else if( localExists )
          {
          long localModTime = localFS.getFileStatus( localPath ).getModificationTime();
          long remoteModTime = remoteFS.getFileStatus( remotePath ).getModificationTime();

          if( localModTime > remoteModTime )
            copyPaths.put( localPath, remotePath );
          }
        }
      catch( IOException exception )
        {
        throw new FlowException( "unable to get handle to underlying filesystem", exception );
        }
      }

    return copyPaths;
    }

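  /**
   * Partitions the given classpath entries into local and remote path maps, qualifying each against the
   * appropriate filesystem; entries that must be shipped to the default (remote) filesystem appear in both maps
   * so they can later be copied by {@link #syncPaths(Configuration, Map, boolean)}.
   * <p>
   * A minimal sketch of the resolve/sync pipeline, assuming a file named "some.jar" exists locally:
   * <pre>{@code
   * Configuration conf = new Configuration();
   * Map<String, Path> localPaths = new HashMap<>();
   * Map<String, Path> remotePaths = new HashMap<>();
   *
   * HadoopUtil.resolvePaths( conf, Collections.singletonList( "some.jar" ), null, null, localPaths, remotePaths );
   *
   * Map<Path, Path> commonPaths = HadoopUtil.getCommonPaths( localPaths, remotePaths );
   * HadoopUtil.syncPaths( conf, commonPaths, true ); // copies missing or stale files to the remote filesystem
   * }</pre>
   */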
  public static void resolvePaths( Configuration config, Collection<String> classpath, String remoteRoot, String resourceSubPath, Map<String, Path> localPaths, Map<String, Path> remotePaths )
    {
    FileSystem defaultFS = getDefaultFS( config );
    FileSystem localFS = getLocalFS( config );

    Path remoteRootPath = new Path( remoteRoot == null ? "./.staging" : remoteRoot );

    if( resourceSubPath != null )
      remoteRootPath = new Path( remoteRootPath, resourceSubPath );

    remoteRootPath = defaultFS.makeQualified( remoteRootPath );

    boolean defaultIsLocal = defaultFS.equals( localFS );

    for( String stringPath : classpath )
      {
      Path path = new Path( stringPath );

      URI uri = path.toUri();

      if( uri.getScheme() == null && !defaultIsLocal ) // we want to sync
        {
        Path localPath = localFS.makeQualified( path );

        if( !exists( localFS, localPath ) )
          throw new FlowException( "path not found: " + localPath );

        String name = localPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        localPaths.put( name, localPath );
        remotePaths.put( name, defaultFS.makeQualified( new Path( remoteRootPath, path.getName() ) ) );
        }
      else if( localFS.equals( getFileSystem( config, path ) ) )
        {
        if( !exists( localFS, path ) )
          throw new FlowException( "path not found: " + path );

        Path localPath = localFS.makeQualified( path );

        String name = localPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        localPaths.put( name, localPath );
        }
      else
        {
        if( !exists( defaultFS, path ) )
          throw new FlowException( "path not found: " + path );

        Path defaultPath = defaultFS.makeQualified( path );

        String name = defaultPath.getName();

        if( resourceSubPath != null )
          name = resourceSubPath + "/" + name;

        remotePaths.put( name, defaultPath );
        }
      }
    }

  private static boolean exists( FileSystem fileSystem, Path path )
    {
    try
      {
      return fileSystem.exists( path );
      }
    catch( IOException exception )
      {
      throw new FlowException( "could not test if file exists: " + path, exception );
      }
    }

  private static FileSystem getFileSystem( Configuration config, Path path )
    {
    try
      {
      return path.getFileSystem( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  public static LocalFileSystem getLocalFS( Configuration config )
    {
    try
      {
      return FileSystem.getLocal( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

  public static FileSystem getDefaultFS( Configuration config )
    {
    try
      {
      return FileSystem.get( config );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to get handle to underlying filesystem", exception );
      }
    }

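  /**
   * Returns true if the configuration denotes standalone/local mode, checking, in order, the YARN
   * {@code mapreduce.framework.name}, Tez {@code tez.local.mode}, and Hadoop 1.x {@code mapred.job.tracker}
   * properties.
   * <p>
   * A minimal sketch:
   * <pre>{@code
   * Configuration conf = new Configuration();
   * HadoopUtil.setLocal( conf );
   * assert HadoopUtil.isLocal( conf ); // all local-mode properties are now set
   * }</pre>
   */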
  public static boolean isLocal( Configuration conf )
    {
    // hadoop 1.0 and 2.0 use different properties to define local mode: we check the new YARN
    // property first
    String frameworkName = conf.get( "mapreduce.framework.name" );

    // we are running on hadoop 2.0 (YARN)
    if( frameworkName != null )
      return frameworkName.equals( "local" );

    // for Tez
    String tezLocal = conf.get( "tez.local.mode" );

    if( tezLocal != null )
      return tezLocal.equals( "true" );

    // hadoop 1.0: use the old property to determine the local mode
    String hadoop1 = conf.get( "mapred.job.tracker" );

    if( hadoop1 == null )
      {
      LOG.warn( "could not successfully test if Hadoop based platform is in standalone/local mode, no valid properties set, returning false - tests for: mapreduce.framework.name, tez.local.mode, and mapred.job.tracker" );
      return false;
      }

    return hadoop1.equals( "local" );
    }

  public static boolean isYARN( Configuration conf )
    {
    return conf.get( "mapreduce.framework.name" ) != null;
    }

  public static void setLocal( Configuration conf )
    {
    // hadoop 1.x
    conf.set( "mapred.job.tracker", "local" );

    // yarn
    conf.set( "mapreduce.framework.name", "local" );

    // tez
    conf.set( "tez.local.mode", "true" );
    conf.set( "tez.runtime.optimize.local.fetch", "true" );
    }

  private static boolean interfaceAssignableFromClassName( Class<?> xface, String className )
    {
    if( className == null || xface == null )
      return false;

    try
      {
      return xface.isAssignableFrom( Class.forName( className ) );
      }
    catch( ClassNotFoundException exception )
      {
      return false; // let downstream figure it out
      }
    }

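  /**
   * Determines from the given input format class name whether the job uses the stable
   * {@code org.apache.hadoop.mapred} or new {@code org.apache.hadoop.mapreduce} API, and sets
   * {@code mapred.mapper.new-api} accordingly.
   * <p>
   * A minimal sketch:
   * <pre>{@code
   * Configuration conf = new Configuration();
   * HadoopUtil.setNewApi( conf, "org.apache.hadoop.mapred.TextInputFormat" );
   * // conf.getBoolean( "mapred.mapper.new-api", true ) now returns false
   * }</pre>
   */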
  public static boolean setNewApi( Configuration conf, String className )
    {
    if( className == null ) // silently return and let the error be caught downstream
      return false;

    boolean isStable = className.startsWith( "org.apache.hadoop.mapred." )
      || interfaceAssignableFromClassName( org.apache.hadoop.mapred.InputFormat.class, className );

    boolean isNew = className.startsWith( "org.apache.hadoop.mapreduce." )
      || interfaceAssignableFromClassName( org.apache.hadoop.mapreduce.InputFormat.class, className );

    if( isStable )
      conf.setBoolean( "mapred.mapper.new-api", false );
    else if( isNew )
      conf.setBoolean( "mapred.mapper.new-api", true );
    else
      throw new IllegalStateException( "cannot determine if class denotes stable or new api, please set 'mapred.mapper.new-api' to the appropriate value" );

    return true;
    }

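  /**
   * Appends the given paths to the {@code mapred.input.dir} property as a comma separated, escaped list,
   * resolving relative paths against the working directory.
   * <p>
   * A minimal sketch:
   * <pre>{@code
   * Configuration conf = new Configuration();
   * HadoopUtil.addInputPaths( conf, java.util.Arrays.asList( new Path( "input/a" ), new Path( "input/b" ) ) );
   * // conf.get( "mapred.input.dir" ) now lists both paths, comma separated and escaped
   * }</pre>
   */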
  public static void addInputPaths( Configuration conf, Iterable<Path> paths )
    {
    Path workingDirectory = getWorkingDirectory( conf );
    String dirs = conf.get( "mapred.input.dir" );
    StringBuilder buffer = new StringBuilder( dirs == null ? "" : dirs );

    for( Path path : paths )
      {
      if( !path.isAbsolute() )
        path = new Path( workingDirectory, path );

      String dirStr = StringUtils.escapeString( path.toString() );

      if( buffer.length() != 0 )
        buffer.append( ',' );

      buffer.append( dirStr );
      }

    conf.set( "mapred.input.dir", buffer.toString() );
    }

  public static void addInputPath( Configuration conf, Path path )
    {
    Path workingDirectory = getWorkingDirectory( conf );
    path = new Path( workingDirectory, path );
    String dirStr = StringUtils.escapeString( path.toString() );
    String dirs = conf.get( "mapred.input.dir" );
    conf.set( "mapred.input.dir", dirs == null ? dirStr :
      dirs + StringUtils.COMMA_STR + dirStr );
    }

  public static void setOutputPath( Configuration conf, Path path )
    {
    Path workingDirectory = getWorkingDirectory( conf );
    path = new Path( workingDirectory, path );
    conf.set( "mapred.output.dir", path.toString() );
    }

  private static Path getWorkingDirectory( Configuration conf )
    {
    String name = conf.get( "mapred.working.dir" );
    if( name != null )
      {
      return new Path( name );
      }
    else
      {
      try
        {
        Path dir = FileSystem.get( conf ).getWorkingDirectory();
        conf.set( "mapred.working.dir", dir.toString() );
        return dir;
        }
      catch( IOException e )
        {
        throw new RuntimeException( e );
        }
      }
    }

  public static Path getOutputPath( Configuration conf )
    {
    String name = conf.get( "mapred.output.dir" );
    return name == null ? null : new Path( name );
    }

  public static String pack( Object object, Configuration conf )
    {
    if( object == null )
      return "";

    try
      {
      return serializeBase64( object, conf, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
      }
    }

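  /**
   * Packs the given ordinal to Fields map into the Configuration under the given property, Base64 encoded;
   * the inverse of {@link #getFields(Configuration, String)}.
   * <p>
   * A minimal round-trip sketch (the property name here is hypothetical):
   * <pre>{@code
   * Configuration conf = new Configuration();
   * Map<Integer, Fields> fields = Collections.singletonMap( 0, new Fields( "line" ) );
   *
   * HadoopUtil.addFields( conf, "cascading.example.fields", fields );
   * Map<Integer, Fields> restored = HadoopUtil.getFields( conf, "cascading.example.fields" );
   * }</pre>
   */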
  public static void addFields( Configuration conf, String property, Map<Integer, Fields> fields )
    {
    if( fields == null || fields.isEmpty() )
      return;

    Map<String, Fields> toPack = new HashMap<>();

    for( Map.Entry<Integer, Fields> entry : fields.entrySet() )
      toPack.put( entry.getKey().toString(), entry.getValue() );

    conf.set( property, pack( toPack, conf ) );
    }

  public static Map<Integer, Fields> getFields( Configuration conf, String property ) throws IOException
    {
    String value = conf.getRaw( property );

    if( value == null || value.isEmpty() )
      return Collections.emptyMap();

    Map<String, Fields> map = deserializeBase64( value, conf, Map.class, true );
    Map<Integer, Fields> result = new HashMap<>();

    for( Map.Entry<String, Fields> entry : map.entrySet() )
      result.put( Integer.parseInt( entry.getKey() ), entry.getValue() );

    return result;
    }

  public static void addComparators( Configuration conf, String property, Map<String, Fields> map, BaseFlowStep flowStep, Group group )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    Fields fields = fieldsIterator.next();

    if( fields.hasComparators() )
      {
      conf.set( property, pack( fields, conf ) );
      return;
      }

    // use resolved fields if there are no comparators.
    Set<Scope> previousScopes = flowStep.getPreviousScopes( group );

    fields = previousScopes.iterator().next().getOutValuesFields();

    if( fields.size() != 0 ) // allows Fields.UNKNOWN to be used
      conf.setInt( property + ".size", fields.size() );
    }

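  /**
   * Packs the first Fields instance carrying comparators into the Configuration under the given property; when
   * none carries comparators, records the resolved field count under {@code property + ".size"} instead.
   * <p>
   * A minimal sketch, assuming {@code groupFields} and {@code resolvedFields} are in scope and the property
   * name is hypothetical:
   * <pre>{@code
   * Map<String, Fields> map = Collections.singletonMap( "lhs", groupFields );
   * HadoopUtil.addComparators( conf, "cascading.example.comparators", map, resolvedFields );
   * }</pre>
   */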
  public static void addComparators( Configuration conf, String property, Map<String, Fields> map, Fields resolvedFields )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    while( fieldsIterator.hasNext() )
      {
      Fields fields = fieldsIterator.next();

      if( fields.hasComparators() )
        {
        conf.set( property, pack( fields, conf ) );
        return;
        }
      }

    if( resolvedFields.size() != 0 ) // allows Fields.UNKNOWN to be used
      conf.setInt( property + ".size", resolvedFields.size() );
    }
  }