/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop2;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

import cascading.flow.hadoop.planner.HadoopPlanner;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.PlannerInfo;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hadoop2MR1Planner is the core Hadoop MapReduce planner used by default through the {@link cascading.flow.hadoop2.Hadoop2MR1FlowConnector}.
 * <p/>
 * Notes:
 * <p/>
 * <strong>Custom JobConf properties</strong><br/>
 * A custom Configuration instance can be passed to this planner by calling {@link #copyConfiguration(java.util.Map, org.apache.hadoop.conf.Configuration)}
 * on a map properties object before constructing a new {@link cascading.flow.hadoop2.Hadoop2MR1FlowConnector}.
 * <p/>
 * A better practice would be to set Hadoop properties directly on the map properties object handed to the FlowConnector.
 * All values in the map will be passed to a new default JobConf instance to be used as defaults for all resulting
 * Flow instances.
 * <p/>
 * For example, {@code properties.setProperty( "mapred.child.java.opts", "-Xmx512m" );} would convince Hadoop
 * to spawn all child JVMs with a 512MB heap.
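 * <p/>
 * As a minimal sketch of that approach (the property value shown is illustrative, not a tuning recommendation):
 * <pre>{@code
 * Properties properties = new Properties();
 * properties.setProperty( "mapred.child.java.opts", "-Xmx512m" );
 *
 * FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties );
 * }</pre>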
 */
public class Hadoop2MR1Planner extends HadoopPlanner
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2MR1Planner.class );

  public static final String PLATFORM_NAME = "hadoop2-mr1";

  /**
   * Method copyConfiguration adds the given Configuration values to the given properties object. Use this method to pass
   * custom default Hadoop properties to Hadoop.
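   * <p/>
   * A minimal sketch (the variable names are only illustrative), copying an existing Configuration into
   * a properties object before it is handed to a new {@link cascading.flow.hadoop2.Hadoop2MR1FlowConnector}:
   * <pre>{@code
   * Configuration configuration = new Configuration();
   * Properties properties = new Properties();
   *
   * Hadoop2MR1Planner.copyConfiguration( properties, configuration );
   * FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties );
   * }</pre>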
   *
   * @param properties    of type Map
   * @param configuration of type Configuration
   */
  public static void copyConfiguration( Map<Object, Object> properties, Configuration configuration )
    {
    for( Map.Entry<String, String> entry : configuration )
      properties.put( entry.getKey(), entry.getValue() );
    }

  /**
   * Method copyProperties adds the given Map values to the given Configuration object.
   *
   * @param configuration of type Configuration
   * @param properties    of type Map
   */
  public static void copyProperties( Configuration configuration, Map<Object, Object> properties )
    {
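    // A Properties instance may hold values in its defaults chain; stringPropertyNames() includes
    // those keys, while iterating entrySet() alone would miss them.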
    if( properties instanceof Properties )
      {
      Properties props = (Properties) properties;
      Set<String> keys = props.stringPropertyNames();

      for( String key : keys )
        configuration.set( key, props.getProperty( key ) );
      }
    else
      {
      for( Map.Entry<Object, Object> entry : properties.entrySet() )
        {
        if( entry.getValue() != null )
          configuration.set( entry.getKey().toString(), entry.getValue().toString() );
        }
      }
    }

  @Override
  public PlannerInfo getPlannerInfo( String registryName )
    {
    return new PlannerInfo( getClass().getSimpleName(), PLATFORM_NAME, registryName );
    }

  @Override
  protected void checkPlatform( Configuration conf )
    {
    if( !HadoopUtil.isYARN( conf ) )
      LOG.warn( "running Hadoop 2.x based flows on a Hadoop 1.x cluster may cause problems, please use the 'cascading-hadoop' dependencies" );
    }
  }