/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.planner;

import java.net.URI;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import cascading.flow.FlowConnector;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowDef;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.HadoopFlow;
import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStepFactory;
import cascading.flow.planner.FlowPlanner;
import cascading.flow.planner.PlannerInfo;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.FlowStepFactory;
import cascading.flow.planner.rule.RuleRegistry;
import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
import cascading.property.AppProps;
import cascading.property.PropertyUtil;
import cascading.tap.Tap;
import cascading.tap.hadoop.DistCacheTap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.util.TempHfs;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class HadoopPlanner is the core Hadoop MapReduce planner used by default through a {@link cascading.flow.FlowConnector}
 * sub-class.
 * <p/>
 * Notes:
 * <p/>
 * <strong>Custom JobConf properties</strong><br/>
 * A custom JobConf instance can be passed to this planner by calling {@link #copyJobConf(java.util.Map, org.apache.hadoop.mapred.JobConf)}
 * on a map properties object before constructing a new {@link cascading.flow.FlowConnector} sub-class.
 * <p/>
 * A better practice would be to set Hadoop properties directly on the map properties object handed to the FlowConnector.
 * All values in the map will be passed to a new default JobConf instance to be used as defaults for all resulting
 * Flow instances.
 * <p/>
 * For example, {@code properties.set("mapred.child.java.opts","-Xmx512m");} would convince Hadoop
 * to spawn all child jvms with a heap of 512MB.
 */
public class HadoopPlanner extends FlowPlanner<HadoopFlow, JobConf>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( HadoopPlanner.class );

  /** Platform name this planner registers under. */
  public static final String PLATFORM_NAME = "hadoop";

  /** Field jobConf - default configuration seeded from the FlowConnector properties in {@link #initialize}. */
  private JobConf defaultJobConf;
  /** Field intermediateSchemeClass - scheme used for intermediate (temp) taps between MapReduce jobs. */
  private Class intermediateSchemeClass;

  /**
   * Method copyJobConf adds the given JobConf values to the given properties object. Use this method to pass
   * custom default Hadoop JobConf properties to Hadoop.
   *
   * @param properties of type Map
   * @param jobConf    of type JobConf
   */
  public static void copyJobConf( Map<Object, Object> properties, JobConf jobConf )
    {
    // JobConf is Iterable over its key/value entries
    for( Map.Entry<String, String> entry : jobConf )
      properties.put( entry.getKey(), entry.getValue() );
    }

  /**
   * Method createJobConf returns a new JobConf instance using the values in the given properties argument.
   *
   * @param properties of type Map
   * @return a JobConf instance
   */
  public static JobConf createJobConf( Map<Object, Object> properties )
    {
    JobConf conf = new JobConf();

    copyProperties( conf, properties );

    return conf;
    }

  /**
   * Method copyProperties adds the given Map values to the given JobConf object.
   *
   * @param jobConf    of type JobConf
   * @param properties of type Map
   */
  public static void copyProperties( JobConf jobConf, Map<Object, Object> properties )
    {
    if( properties instanceof Properties )
      {
      Properties props = (Properties) properties;
      // stringPropertyNames() walks the defaults chain, which entrySet() would miss
      Set<String> keys = props.stringPropertyNames();

      for( String key : keys )
        jobConf.set( key, props.getProperty( key ) );
      }
    else
      {
      for( Map.Entry<Object, Object> entry : properties.entrySet() )
        {
        if( entry.getValue() != null )
          jobConf.set( entry.getKey().toString(), entry.getValue().toString() );
        }
      }
    }

  @Override
  public PlannerInfo getPlannerInfo( String registryName )
    {
    return new PlannerInfo( getClass().getSimpleName(), PLATFORM_NAME, registryName );
    }

  @Override
  public JobConf getDefaultConfig()
    {
    return defaultJobConf;
    }

  @Override
  public PlatformInfo getPlatformInfo()
    {
    return HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" );
    }

  @Override
  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
    {
    super.initialize( flowConnector, properties );

    defaultJobConf = HadoopUtil.createJobConf( properties, createJobConf( properties ) );
    checkPlatform( defaultJobConf );
    intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties );

    // resolve the application jar: explicit class, then explicit path, then the main class on the classpath
    Class type = AppProps.getApplicationJarClass( properties );
    if( defaultJobConf.getJar() == null && type != null )
      defaultJobConf.setJarByClass( type );

    String path = AppProps.getApplicationJarPath( properties );
    if( defaultJobConf.getJar() == null && path != null )
      defaultJobConf.setJar( path );

    if( defaultJobConf.getJar() == null )
      defaultJobConf.setJarByClass( HadoopUtil.findMainClass( HadoopPlanner.class ) );

    AppProps.setApplicationJarPath( properties, defaultJobConf.getJar() );

    LOG.info( "using application jar: {}", defaultJobConf.getJar() );
    }

  @Override
  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
    {
    super.configRuleRegistryDefaults( ruleRegistry );

    ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.TEMP_TAP, new TempTapElementFactory() );

    // optionally decorate accumulated taps with the distributed cache, enabled by default
    if( PropertyUtil.getBooleanProperty( getDefaultProperties(), FlowConnectorProps.ENABLE_DECORATE_ACCUMULATED_TAP, true ) )
      ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.ACCUMULATED_TAP, new TempTapElementFactory( DistCacheTap.class.getName() ) );
    }

  /**
   * Warn when a YARN cluster is detected - this planner targets Hadoop 1.x MR.
   *
   * @param conf of type Configuration
   */
  protected void checkPlatform( Configuration conf )
    {
    if( HadoopUtil.isYARN( conf ) )
      LOG.warn( "running YARN based flows on Hadoop 1.x may cause problems, please use the 'cascading-hadoop2-mr1' dependencies" );
    }

  @Override
  protected HadoopFlow createFlow( FlowDef flowDef )
    {
    return new HadoopFlow( getPlatformInfo(), getDefaultProperties(), getDefaultConfig(), flowDef );
    }

  @Override
  public FlowStepFactory<JobConf> getFlowStepFactory()
    {
    return new BaseFlowStepFactory<JobConf>( getFlowNodeFactory() )
      {
      @Override
      public FlowStep<JobConf> createFlowStep( ElementGraph stepElementGraph, FlowNodeGraph flowNodeGraph )
        {
        return new HadoopFlowStep( stepElementGraph, flowNodeGraph );
        }
      };
    }

  /**
   * Returns the default file system URI scheme for the given Hfs tap, per the planner's default JobConf.
   *
   * @param tap of type Tap, expected to be an {@link Hfs} instance
   * @return a URI instance
   */
  public URI getDefaultURIScheme( Tap tap )
    {
    return ( (Hfs) tap ).getDefaultFileSystemURIScheme( defaultJobConf );
    }

  /**
   * Returns the URI scheme for the given Hfs tap, per the planner's default JobConf.
   *
   * @param tap of type Tap, expected to be an {@link Hfs} instance
   * @return a URI instance
   */
  public URI getURIScheme( Tap tap )
    {
    return ( (Hfs) tap ).getURIScheme( defaultJobConf );
    }

  @Override
  protected Tap makeTempTap( String prefix, String name )
    {
    // must give Taps unique names
    return new TempHfs( defaultJobConf, Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null );
    }
  }