001 /* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.platform.hadoop2; 022 023 import java.io.File; 024 import java.io.IOException; 025 import java.util.Map; 026 027 import cascading.flow.FlowConnector; 028 import cascading.flow.FlowProps; 029 import cascading.flow.hadoop2.Hadoop2MR1FlowConnector; 030 import cascading.flow.hadoop2.Hadoop2MR1Planner; 031 import cascading.platform.hadoop.BaseHadoopPlatform; 032 import cascading.util.Util; 033 import org.apache.hadoop.fs.FileSystem; 034 import org.apache.hadoop.hdfs.MiniDFSCluster; 035 import org.apache.hadoop.mapred.JobConf; 036 import org.apache.hadoop.mapred.MiniMRClientCluster; 037 import org.apache.hadoop.mapred.MiniMRClientClusterFactory; 038 import org.slf4j.Logger; 039 import org.slf4j.LoggerFactory; 040 041 /** 042 * Class Hadoop2Platform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance 043 * so that all *PlatformTest classes can be tested against Apache Hadoop 2.x. 
 */
public class Hadoop2MR1Platform extends BaseHadoopPlatform
  {
  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2MR1Platform.class );

  // static so one DFS/MR mini-cluster pair is shared by every test class loaded in this JVM;
  // transient is redundant on statics but kept as-is
  private transient static MiniDFSCluster dfs;
  private transient static MiniMRClientCluster mr;

  public Hadoop2MR1Platform()
    {
    }

  /** Returns the platform identifier used by the test harness to select this platform. */
  @Override
  public String getName()
    {
    return "hadoop2-mr1";
    }

  /** Creates a Hadoop 2 MR1 flow connector configured with the given properties. */
  @Override
  public FlowConnector getFlowConnector( Map<Object, Object> properties )
    {
    return new Hadoop2MR1FlowConnector( properties );
    }

  /** Stores the requested map-task count under the Hadoop 2 property key {@code mapreduce.job.maps}. */
  @Override
  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
    {
    properties.put( "mapreduce.job.maps", Integer.toString( numMapTasks ) );
    }

  /** Stores the requested reduce-task count under the Hadoop 2 property key {@code mapreduce.job.reduces}. */
  @Override
  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
    {
    properties.put( "mapreduce.job.reduces", Integer.toString( numReduceTasks ) );
    }

  /** Returns the configured map-task count, or {@code null} when the property is unset. */
  @Override
  public Integer getNumMapTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapreduce.job.maps" ) == null )
      return null;

    return Integer.parseInt( properties.get( "mapreduce.job.maps" ).toString() );
    }

  /** Returns the configured reduce-task count, or {@code null} when the property is unset. */
  @Override
  public Integer getNumReduceTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapreduce.job.reduces" ) == null )
      return null;

    return Integer.parseInt( properties.get( "mapreduce.job.reduces" ).toString() );
    }

  /**
   * Initializes the Hadoop test environment exactly once per JVM.
   * <p/>
   * Three mutually exclusive modes, chosen in order:
   * <ol>
   * <li>local mode — {@code isUseCluster()} is false: an in-process {@link JobConf} with the local framework;</li>
   * <li>remote cluster — the {@code mapred.jar} system property is set: the existing cluster named by the
   * {@code fs.default.name}/{@code fs.defaultFS}/{@code yarn.resourcemanager.address}/{@code mapreduce.jobhistory.address}
   * system properties is used;</li>
   * <li>mini-cluster — otherwise: an in-JVM {@link MiniDFSCluster} and {@link MiniMRClientCluster} are started.</li>
   * </ol>
   * {@code synchronized} plus the {@code jobConf != null} guard makes repeated calls no-ops.
   * NOTE(review): {@code jobConf}, {@code fileSys}, {@code properties}, {@code logger}, {@code numMapTasks} and
   * {@code numReduceTasks} are presumably fields inherited from {@link BaseHadoopPlatform} — confirm against that class.
   *
   * @throws IOException if the filesystem or mini-cluster cannot be created
   */
  @Override
  public synchronized void setUp() throws IOException
    {
    // already initialized by an earlier test class in this JVM
    if( jobConf != null )
      return;

    if( !isUseCluster() )
      {
      LOG.info( "not using cluster" );
      jobConf = new JobConf();

      // enforce settings to make local mode behave the same across distributions
      jobConf.set( "fs.defaultFS", "file:///" );
      jobConf.set( "mapreduce.framework.name", "local" );

      String stagingDir = jobConf.get( "mapreduce.jobtracker.staging.root.dir" );

      // keep job staging under the build tree so test runs leave no stray files elsewhere
      if( Util.isEmpty( stagingDir ) )
        jobConf.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/build/tmp/cascading/staging" );

      fileSys = FileSystem.get( jobConf );
      }
    else
      {
      LOG.info( "using cluster" );

      // the Hadoop mini-cluster daemons read these system properties at startup
      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
        System.setProperty( "hadoop.log.dir", "build/test/log" );

      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
        System.setProperty( "hadoop.tmp.dir", "build/test/tmp" );

      new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // result deliberately ignored; dir may already exist

      JobConf conf = new JobConf();

      // a user-supplied mapred.jar selects an existing remote cluster instead of a mini-cluster
      if( !Util.isEmpty( System.getProperty( "mapred.jar" ) ) )
        {
        LOG.info( "using a remote cluster with jar: {}", System.getProperty( "mapred.jar" ) );
        jobConf = conf;

        ( (JobConf) jobConf ).setJar( System.getProperty( "mapred.jar" ) );

        // both the deprecated (fs.default.name) and current (fs.defaultFS) keys are honored
        if( !Util.isEmpty( System.getProperty( "fs.default.name" ) ) )
          {
          LOG.info( "using {}={}", "fs.default.name", System.getProperty( "fs.default.name" ) );
          jobConf.set( "fs.default.name", System.getProperty( "fs.default.name" ) );
          }

        if( !Util.isEmpty( System.getProperty( "fs.defaultFS" ) ) )
          {
          LOG.info( "using {}={}", "fs.defaultFS", System.getProperty( "fs.defaultFS" ) );
          jobConf.set( "fs.defaultFS", System.getProperty( "fs.defaultFS" ) );
          }

        if( !Util.isEmpty( System.getProperty( "yarn.resourcemanager.address" ) ) )
          {
          LOG.info( "using {}={}", "yarn.resourcemanager.address", System.getProperty( "yarn.resourcemanager.address" ) );
          jobConf.set( "yarn.resourcemanager.address", System.getProperty( "yarn.resourcemanager.address" ) );
          }

        if( !Util.isEmpty( System.getProperty( "mapreduce.jobhistory.address" ) ) )
          {
          LOG.info( "using {}={}", "mapreduce.jobhistory.address", System.getProperty( "mapreduce.jobhistory.address" ) );
          jobConf.set( "mapreduce.jobhistory.address", System.getProperty( "mapreduce.jobhistory.address" ) );
          }

        jobConf.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
        jobConf.set( "mapreduce.framework.name", "yarn" );
        fileSys = FileSystem.get( jobConf );
        }
      else
        {
        conf.setBoolean( "yarn.is.minicluster", true );
//      conf.setInt( "yarn.nodemanager.delete.debug-delay-sec", -1 );
//      conf.set( "yarn.scheduler.capacity.root.queues", "default" );
//      conf.set( "yarn.scheduler.capacity.root.default.capacity", "100" );
        // disable blacklisting hosts not to fail localhost during unit tests
        conf.setBoolean( "yarn.app.mapreduce.am.job.node-blacklisting.enable", false );

        // 4-datanode DFS must be up before MR so the MR cluster is configured against its URI
        dfs = new MiniDFSCluster( conf, 4, true, null );
        fileSys = dfs.getFileSystem();

        FileSystem.setDefaultUri( conf, fileSys.getUri() );

        mr = MiniMRClientClusterFactory.create( this.getClass(), 4, conf );

        jobConf = mr.getConfig();
        }

      // common cluster-mode tuning: small heap, no JVM reuse limit, fast polling, no speculation,
      // so test runs are deterministic and complete quickly
      jobConf.set( "mapred.child.java.opts", "-Xmx512m" );
      jobConf.setInt( "mapreduce.job.jvm.numtasks", -1 );
      jobConf.setInt( "mapreduce.client.completion.pollinterval", 50 );
      jobConf.setInt( "mapreduce.client.progressmonitor.pollinterval", 50 );
      jobConf.setBoolean( "mapreduce.map.speculative", false );
      jobConf.setBoolean( "mapreduce.reduce.speculative", false );
      }

    jobConf.setInt( "mapreduce.job.maps", numMapTasks );
    jobConf.setInt( "mapreduce.job.reduces", numReduceTasks );

    Map<Object, Object> globalProperties = getGlobalProperties();

    if( logger != null )
      globalProperties.put( "log4j.logger", logger );

    FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests

    Hadoop2MR1Planner.copyProperties( jobConf, globalProperties ); // copy any external properties

    Hadoop2MR1Planner.copyConfiguration( properties, jobConf ); // put all properties on the jobconf
    }
  }