/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.planner;

import java.net.URI;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.HadoopFlow;
import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.FlowPlanner;
import cascading.flow.planner.PlannerInfo;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.rule.RuleRegistry;
import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
import cascading.property.AppProps;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.util.TempHfs;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class HadoopPlanner is the core Hadoop MapReduce planner used by default through a {@link cascading.flow.FlowConnector}
 * sub-class.
 * <p/>
 * Notes:
 * <p/>
 * <strong>Custom JobConf properties</strong><br/>
 * A custom JobConf instance can be passed to this planner by calling {@link #copyJobConf(java.util.Map, org.apache.hadoop.mapred.JobConf)}
 * on a map properties object before constructing a new {@link cascading.flow.FlowConnector} sub-class.
 * <p/>
 * A better practice would be to set Hadoop properties directly on the map properties object handed to the FlowConnector.
 * All values in the map will be passed to a new default JobConf instance to be used as defaults for all resulting
 * Flow instances.
 * <p/>
 * For example, {@code properties.put( "mapred.child.java.opts", "-Xmx512m" );} instructs Hadoop
 * to spawn all child JVMs with a 512MB heap.
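 * <p/>
 * A minimal illustrative setup might look like the following, where {@code Main} is a placeholder for the
 * application's entry point class and {@link cascading.flow.hadoop.HadoopFlowConnector} is the Hadoop sub-class:
 * <pre>{@code
 * Properties properties = new Properties();
 * properties.put( "mapred.child.java.opts", "-Xmx512m" );
 * AppProps.setApplicationJarClass( properties, Main.class ); // Main is a placeholder class
 *
 * FlowConnector flowConnector = new HadoopFlowConnector( properties );
 * }</pre>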
 */
public class HadoopPlanner extends FlowPlanner<HadoopFlow, JobConf>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( HadoopPlanner.class );

  public static final String PLATFORM_NAME = "hadoop";

  /** Field defaultJobConf */
  private JobConf defaultJobConf;
  /** Field intermediateSchemeClass */
  private Class intermediateSchemeClass;

  /**
   * Method copyJobConf adds the given JobConf values to the given properties object. Use this method to pass
   * custom default Hadoop JobConf properties to Hadoop.
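   * <p/>
   * A minimal sketch of its use (property name and values illustrative only):
   * <pre>{@code
   * JobConf jobConf = new JobConf();
   * jobConf.set( "mapred.reduce.tasks", "8" );
   *
   * Properties properties = new Properties();
   * HadoopPlanner.copyJobConf( properties, jobConf );
   * }</pre>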
   *
   * @param properties of type Map
   * @param jobConf    of type JobConf
   */
  public static void copyJobConf( Map<Object, Object> properties, JobConf jobConf )
    {
    for( Map.Entry<String, String> entry : jobConf )
      properties.put( entry.getKey(), entry.getValue() );
    }

  /**
   * Method createJobConf returns a new JobConf instance using the values in the given properties argument.
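   * <p/>
   * A minimal sketch (property name illustrative only):
   * <pre>{@code
   * Properties properties = new Properties();
   * properties.put( "io.sort.mb", "256" );
   *
   * JobConf jobConf = HadoopPlanner.createJobConf( properties );
   * }</pre>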
   *
   * @param properties of type Map
   * @return a JobConf instance
   */
  public static JobConf createJobConf( Map<Object, Object> properties )
    {
    JobConf conf = new JobConf();

    copyProperties( conf, properties );

    return conf;
    }

  /**
   * Method copyProperties adds the given Map values to the given JobConf object.
   *
   * @param jobConf    of type JobConf
   * @param properties of type Map
   */
  public static void copyProperties( JobConf jobConf, Map<Object, Object> properties )
    {
    if( properties instanceof Properties )
      {
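      // stringPropertyNames() also resolves keys held in any chained default Properties,
      // which iterating the entry set alone would miss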
      Properties props = (Properties) properties;
      Set<String> keys = props.stringPropertyNames();

      for( String key : keys )
        jobConf.set( key, props.getProperty( key ) );
      }
    else
      {
      for( Map.Entry<Object, Object> entry : properties.entrySet() )
        {
        if( entry.getValue() != null )
          jobConf.set( entry.getKey().toString(), entry.getValue().toString() );
        }
      }
    }

  @Override
  public PlannerInfo getPlannerInfo( String registryName )
    {
    return new PlannerInfo( getClass().getSimpleName(), PLATFORM_NAME, registryName );
    }

  @Override
  public JobConf getDefaultConfig()
    {
    return defaultJobConf;
    }

  @Override
  public PlatformInfo getPlatformInfo()
    {
    return HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" );
    }

  @Override
  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
    {
    super.initialize( flowConnector, properties );

    defaultJobConf = HadoopUtil.createJobConf( properties, createJobConf( properties ) );
    checkPlatform( defaultJobConf );
    intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties );

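    // resolve the application jar in order of precedence: the configured application jar class,
    // an explicit application jar path, then the jar containing the detected main class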
    Class type = AppProps.getApplicationJarClass( properties );
    if( defaultJobConf.getJar() == null && type != null )
      defaultJobConf.setJarByClass( type );

    String path = AppProps.getApplicationJarPath( properties );
    if( defaultJobConf.getJar() == null && path != null )
      defaultJobConf.setJar( path );

    if( defaultJobConf.getJar() == null )
      defaultJobConf.setJarByClass( HadoopUtil.findMainClass( HadoopPlanner.class ) );

    AppProps.setApplicationJarPath( properties, defaultJobConf.getJar() );

    LOG.info( "using application jar: {}", defaultJobConf.getJar() );
    }

  @Override
  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
    {
    super.configRuleRegistryDefaults( ruleRegistry );

    ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.TEMP_TAP, new TempTapElementFactory() );
    }

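  /**
   * Method checkPlatform logs a warning if a YARN based Hadoop configuration is detected, as this planner
   * targets Hadoop 1.x MapReduce.
   *
   * @param conf of type Configuration
   */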
  protected void checkPlatform( Configuration conf )
    {
    if( HadoopUtil.isYARN( conf ) )
      LOG.warn( "running YARN based flows on Hadoop 1.x may cause problems, please use the 'cascading-hadoop2-mr1' dependencies" );
    }

  @Override
  protected HadoopFlow createFlow( FlowDef flowDef )
    {
    return new HadoopFlow( getPlatformInfo(), getDefaultProperties(), getDefaultConfig(), flowDef );
    }

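  /**
   * Method createFlowStep creates a new {@link HadoopFlowStep} instance for the given element and node graphs.
   *
   * @param stepElementGraph of type ElementGraph
   * @param flowNodeGraph    of type FlowNodeGraph
   * @return a FlowStep instance
   */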
  public FlowStep<JobConf> createFlowStep( ElementGraph stepElementGraph, FlowNodeGraph flowNodeGraph )
    {
    return new HadoopFlowStep( stepElementGraph, flowNodeGraph );
    }

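  /**
   * Method getDefaultURIScheme returns the default file system URI scheme of the given {@link Hfs} tap,
   * resolved against the default JobConf.
   *
   * @param tap of type Tap, expected to be an Hfs instance
   * @return a URI instance
   */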
  public URI getDefaultURIScheme( Tap tap )
    {
    return ( (Hfs) tap ).getDefaultFileSystemURIScheme( defaultJobConf );
    }

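  /**
   * Method getURIScheme returns the URI scheme of the given {@link Hfs} tap, resolved against the default JobConf.
   *
   * @param tap of type Tap, expected to be an Hfs instance
   * @return a URI instance
   */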
  public URI getURIScheme( Tap tap )
    {
    return ( (Hfs) tap ).getURIScheme( defaultJobConf );
    }

  @Override
  protected Tap makeTempTap( String prefix, String name )
    {
    // must give Taps unique names
    return new TempHfs( defaultJobConf, Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null );
    }
  }