/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
020
021package cascading.flow.hadoop.planner;
022
023import java.net.URI;
024import java.util.Map;
025import java.util.Properties;
026import java.util.Set;
027
028import cascading.flow.FlowConnector;
029import cascading.flow.FlowConnectorProps;
030import cascading.flow.FlowDef;
031import cascading.flow.FlowStep;
032import cascading.flow.hadoop.HadoopFlow;
033import cascading.flow.hadoop.HadoopFlowStep;
034import cascading.flow.hadoop.util.HadoopUtil;
035import cascading.flow.planner.BaseFlowStepFactory;
036import cascading.flow.planner.FlowPlanner;
037import cascading.flow.planner.PlannerInfo;
038import cascading.flow.planner.PlatformInfo;
039import cascading.flow.planner.graph.ElementGraph;
040import cascading.flow.planner.process.FlowNodeGraph;
041import cascading.flow.planner.process.FlowStepFactory;
042import cascading.flow.planner.rule.RuleRegistry;
043import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
044import cascading.property.AppProps;
045import cascading.property.PropertyUtil;
046import cascading.tap.Tap;
047import cascading.tap.hadoop.DistCacheTap;
048import cascading.tap.hadoop.Hfs;
049import cascading.tap.hadoop.util.TempHfs;
050import cascading.util.Util;
051import org.apache.hadoop.conf.Configuration;
052import org.apache.hadoop.mapred.JobConf;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * Class HadoopPlanner is the core Hadoop MapReduce planner used by default through a {@link cascading.flow.FlowConnector}
058 * sub-class.
059 * <p/>
060 * Notes:
061 * <p/>
062 * <strong>Custom JobConf properties</strong><br/>
063 * A custom JobConf instance can be passed to this planner by calling {@link #copyJobConf(java.util.Map, org.apache.hadoop.mapred.JobConf)}
064 * on a map properties object before constructing a new {@link cascading.flow.FlowConnector} sub-class.
065 * <p/>
066 * A better practice would be to set Hadoop properties directly on the map properties object handed to the FlowConnector.
067 * All values in the map will be passed to a new default JobConf instance to be used as defaults for all resulting
068 * Flow instances.
069 * <p/>
070 * For example, {@code properties.set("mapred.child.java.opts","-Xmx512m");} would convince Hadoop
071 * to spawn all child jvms with a heap of 512MB.
072 */
073public class HadoopPlanner extends FlowPlanner<HadoopFlow, JobConf>
074  {
075  /** Field LOG */
076  private static final Logger LOG = LoggerFactory.getLogger( HadoopPlanner.class );
077
078  public static final String PLATFORM_NAME = "hadoop";
079
080  /** Field jobConf */
081  private JobConf defaultJobConf;
082  /** Field intermediateSchemeClass */
083  private Class intermediateSchemeClass;
084
085  /**
086   * Method copyJobConf adds the given JobConf values to the given properties object. Use this method to pass
087   * custom default Hadoop JobConf properties to Hadoop.
088   *
089   * @param properties of type Map
090   * @param jobConf    of type JobConf
091   */
092  public static void copyJobConf( Map<Object, Object> properties, JobConf jobConf )
093    {
094    for( Map.Entry<String, String> entry : jobConf )
095      properties.put( entry.getKey(), entry.getValue() );
096    }
097
098  /**
099   * Method createJobConf returns a new JobConf instance using the values in the given properties argument.
100   *
101   * @param properties of type Map
102   * @return a JobConf instance
103   */
104  public static JobConf createJobConf( Map<Object, Object> properties )
105    {
106    JobConf conf = new JobConf();
107
108    copyProperties( conf, properties );
109
110    return conf;
111    }
112
113  /**
114   * Method copyProperties adds the given Map values to the given JobConf object.
115   *
116   * @param jobConf    of type JobConf
117   * @param properties of type Map
118   */
119  public static void copyProperties( JobConf jobConf, Map<Object, Object> properties )
120    {
121    if( properties instanceof Properties )
122      {
123      Properties props = (Properties) properties;
124      Set<String> keys = props.stringPropertyNames();
125
126      for( String key : keys )
127        jobConf.set( key, props.getProperty( key ) );
128      }
129    else
130      {
131      for( Map.Entry<Object, Object> entry : properties.entrySet() )
132        {
133        if( entry.getValue() != null )
134          jobConf.set( entry.getKey().toString(), entry.getValue().toString() );
135        }
136      }
137    }
138
139  @Override
140  public PlannerInfo getPlannerInfo( String registryName )
141    {
142    return new PlannerInfo( getClass().getSimpleName(), PLATFORM_NAME, registryName );
143    }
144
145  @Override
146  public JobConf getDefaultConfig()
147    {
148    return defaultJobConf;
149    }
150
151  @Override
152  public PlatformInfo getPlatformInfo()
153    {
154    return HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" );
155    }
156
157  @Override
158  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
159    {
160    super.initialize( flowConnector, properties );
161
162    defaultJobConf = HadoopUtil.createJobConf( properties, createJobConf( properties ) );
163    checkPlatform( defaultJobConf );
164    intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties );
165
166    Class type = AppProps.getApplicationJarClass( properties );
167    if( defaultJobConf.getJar() == null && type != null )
168      defaultJobConf.setJarByClass( type );
169
170    String path = AppProps.getApplicationJarPath( properties );
171    if( defaultJobConf.getJar() == null && path != null )
172      defaultJobConf.setJar( path );
173
174    if( defaultJobConf.getJar() == null )
175      defaultJobConf.setJarByClass( HadoopUtil.findMainClass( HadoopPlanner.class ) );
176
177    AppProps.setApplicationJarPath( properties, defaultJobConf.getJar() );
178
179    LOG.info( "using application jar: {}", defaultJobConf.getJar() );
180    }
181
182  @Override
183  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
184    {
185    super.configRuleRegistryDefaults( ruleRegistry );
186
187    ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.TEMP_TAP, new TempTapElementFactory() );
188
189    if( PropertyUtil.getBooleanProperty( getDefaultProperties(), FlowConnectorProps.ENABLE_DECORATE_ACCUMULATED_TAP, true ) )
190      ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.ACCUMULATED_TAP, new TempTapElementFactory( DistCacheTap.class.getName() ) );
191    }
192
193  protected void checkPlatform( Configuration conf )
194    {
195    if( HadoopUtil.isYARN( conf ) )
196      LOG.warn( "running YARN based flows on Hadoop 1.x may cause problems, please use the 'cascading-hadoop2-mr1' dependencies" );
197    }
198
199  @Override
200  protected HadoopFlow createFlow( FlowDef flowDef )
201    {
202    return new HadoopFlow( getPlatformInfo(), getDefaultProperties(), getDefaultConfig(), flowDef );
203    }
204
205  @Override
206  public FlowStepFactory<JobConf> getFlowStepFactory()
207    {
208    return new BaseFlowStepFactory<JobConf>( getFlowNodeFactory() )
209      {
210      @Override
211      public FlowStep<JobConf> createFlowStep( ElementGraph stepElementGraph, FlowNodeGraph flowNodeGraph )
212        {
213        return new HadoopFlowStep( stepElementGraph, flowNodeGraph );
214        }
215      };
216    }
217
218  public URI getDefaultURIScheme( Tap tap )
219    {
220    return ( (Hfs) tap ).getDefaultFileSystemURIScheme( defaultJobConf );
221    }
222
223  public URI getURIScheme( Tap tap )
224    {
225    return ( (Hfs) tap ).getURIScheme( defaultJobConf );
226    }
227
228  @Override
229  protected Tap makeTempTap( String prefix, String name )
230    {
231    // must give Taps unique names
232    return new TempHfs( defaultJobConf, Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null );
233    }
234  }