001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.hadoop.planner; 022 023import java.net.URI; 024import java.util.Map; 025import java.util.Properties; 026import java.util.Set; 027 028import cascading.flow.FlowConnector; 029import cascading.flow.FlowDef; 030import cascading.flow.FlowStep; 031import cascading.flow.hadoop.HadoopFlow; 032import cascading.flow.hadoop.HadoopFlowStep; 033import cascading.flow.hadoop.util.HadoopUtil; 034import cascading.flow.planner.FlowPlanner; 035import cascading.flow.planner.PlannerInfo; 036import cascading.flow.planner.PlatformInfo; 037import cascading.flow.planner.graph.ElementGraph; 038import cascading.flow.planner.process.FlowNodeGraph; 039import cascading.flow.planner.rule.RuleRegistry; 040import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory; 041import cascading.property.AppProps; 042import cascading.tap.Tap; 043import cascading.tap.hadoop.Hfs; 044import cascading.tap.hadoop.util.TempHfs; 045import cascading.util.Util; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.mapred.JobConf; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * Class HadoopPlanner is the core Hadoop MapReduce planner used by default through a {@link cascading.flow.FlowConnector} 053 * sub-class. 054 * <p/> 055 * Notes: 056 * <p/> 057 * <strong>Custom JobConf properties</strong><br/> 058 * A custom JobConf instance can be passed to this planner by calling {@link #copyJobConf(java.util.Map, org.apache.hadoop.mapred.JobConf)} 059 * on a map properties object before constructing a new {@link cascading.flow.FlowConnector} sub-class. 060 * <p/> 061 * A better practice would be to set Hadoop properties directly on the map properties object handed to the FlowConnector. 062 * All values in the map will be passed to a new default JobConf instance to be used as defaults for all resulting 063 * Flow instances. 064 * <p/> 065 * For example, {@code properties.set("mapred.child.java.opts","-Xmx512m");} would convince Hadoop 066 * to spawn all child jvms with a heap of 512MB. 067 */ 068public class HadoopPlanner extends FlowPlanner<HadoopFlow, JobConf> 069 { 070 /** Field LOG */ 071 private static final Logger LOG = LoggerFactory.getLogger( HadoopPlanner.class ); 072 073 public static final String PLATFORM_NAME = "hadoop"; 074 075 /** Field jobConf */ 076 private JobConf defaultJobConf; 077 /** Field intermediateSchemeClass */ 078 private Class intermediateSchemeClass; 079 080 /** 081 * Method copyJobConf adds the given JobConf values to the given properties object. Use this method to pass 082 * custom default Hadoop JobConf properties to Hadoop. 083 * 084 * @param properties of type Map 085 * @param jobConf of type JobConf 086 */ 087 public static void copyJobConf( Map<Object, Object> properties, JobConf jobConf ) 088 { 089 for( Map.Entry<String, String> entry : jobConf ) 090 properties.put( entry.getKey(), entry.getValue() ); 091 } 092 093 /** 094 * Method createJobConf returns a new JobConf instance using the values in the given properties argument. 095 * 096 * @param properties of type Map 097 * @return a JobConf instance 098 */ 099 public static JobConf createJobConf( Map<Object, Object> properties ) 100 { 101 JobConf conf = new JobConf(); 102 103 copyProperties( conf, properties ); 104 105 return conf; 106 } 107 108 /** 109 * Method copyProperties adds the given Map values to the given JobConf object. 110 * 111 * @param jobConf of type JobConf 112 * @param properties of type Map 113 */ 114 public static void copyProperties( JobConf jobConf, Map<Object, Object> properties ) 115 { 116 if( properties instanceof Properties ) 117 { 118 Properties props = (Properties) properties; 119 Set<String> keys = props.stringPropertyNames(); 120 121 for( String key : keys ) 122 jobConf.set( key, props.getProperty( key ) ); 123 } 124 else 125 { 126 for( Map.Entry<Object, Object> entry : properties.entrySet() ) 127 { 128 if( entry.getValue() != null ) 129 jobConf.set( entry.getKey().toString(), entry.getValue().toString() ); 130 } 131 } 132 } 133 134 @Override 135 public PlannerInfo getPlannerInfo( String registryName ) 136 { 137 return new PlannerInfo( getClass().getSimpleName(), PLATFORM_NAME, registryName ); 138 } 139 140 @Override 141 public JobConf getDefaultConfig() 142 { 143 return defaultJobConf; 144 } 145 146 @Override 147 public PlatformInfo getPlatformInfo() 148 { 149 return HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ); 150 } 151 152 @Override 153 public void initialize( FlowConnector flowConnector, Map<Object, Object> properties ) 154 { 155 super.initialize( flowConnector, properties ); 156 157 defaultJobConf = HadoopUtil.createJobConf( properties, createJobConf( properties ) ); 158 checkPlatform( defaultJobConf ); 159 intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties ); 160 161 Class type = AppProps.getApplicationJarClass( properties ); 162 if( defaultJobConf.getJar() == null && type != null ) 163 defaultJobConf.setJarByClass( type ); 164 165 String path = AppProps.getApplicationJarPath( properties ); 166 if( defaultJobConf.getJar() == null && path != null ) 167 defaultJobConf.setJar( path ); 168 169 if( defaultJobConf.getJar() == null ) 170 defaultJobConf.setJarByClass( HadoopUtil.findMainClass( HadoopPlanner.class ) ); 171 172 AppProps.setApplicationJarPath( properties, defaultJobConf.getJar() ); 173 174 LOG.info( "using application jar: {}", defaultJobConf.getJar() ); 175 } 176 177 @Override 178 public void configRuleRegistryDefaults( RuleRegistry ruleRegistry ) 179 { 180 super.configRuleRegistryDefaults( ruleRegistry ); 181 182 ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.TEMP_TAP, new TempTapElementFactory() ); 183 } 184 185 protected void checkPlatform( Configuration conf ) 186 { 187 if( HadoopUtil.isYARN( conf ) ) 188 LOG.warn( "running YARN based flows on Hadoop 1.x may cause problems, please use the 'cascading-hadoop2-mr1' dependencies" ); 189 } 190 191 @Override 192 protected HadoopFlow createFlow( FlowDef flowDef ) 193 { 194 return new HadoopFlow( getPlatformInfo(), getDefaultProperties(), getDefaultConfig(), flowDef ); 195 } 196 197 public FlowStep<JobConf> createFlowStep( ElementGraph stepElementGraph, FlowNodeGraph flowNodeGraph ) 198 { 199 return new HadoopFlowStep( stepElementGraph, flowNodeGraph ); 200 } 201 202 public URI getDefaultURIScheme( Tap tap ) 203 { 204 return ( (Hfs) tap ).getDefaultFileSystemURIScheme( defaultJobConf ); 205 } 206 207 public URI getURIScheme( Tap tap ) 208 { 209 return ( (Hfs) tap ).getURIScheme( defaultJobConf ); 210 } 211 212 @Override 213 protected Tap makeTempTap( String prefix, String name ) 214 { 215 // must give Taps unique names 216 return new TempHfs( defaultJobConf, Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null ); 217 } 218 }