/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.platform.hadoop2;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import cascading.flow.FlowConnector;
import cascading.flow.FlowProps;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.flow.hadoop2.Hadoop2MR1Planner;
import cascading.platform.hadoop.BaseHadoopPlatform;
import cascading.util.Util;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hadoop2MR1Platform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
 * so that all *PlatformTest classes can be tested against Apache Hadoop 2.x.
 * <p/>
 * Platform state ({@code jobConf}, {@code fileSys}, mini clusters) is initialized at most once per JVM
 * by {@link #setUp()}.
 */
public class Hadoop2MR1Platform extends BaseHadoopPlatform
  {
  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2MR1Platform.class );

  // shared mini-cluster instances, created once per JVM by setUp()
  // note: 'transient' removed from the original declarations -- it has no effect on static fields,
  // which are never part of serialized instance state
  private static MiniDFSCluster dfs;
  private static MiniMRClientCluster mr;

  public Hadoop2MR1Platform()
    {
    }

  /** Returns the platform name used to select *PlatformTest resources. */
  @Override
  public String getName()
    {
    return "hadoop2-mr1";
    }

  /** Returns a new {@link Hadoop2MR1FlowConnector} configured with the given properties. */
  @Override
  public FlowConnector getFlowConnector( Map<Object, Object> properties )
    {
    return new Hadoop2MR1FlowConnector( properties );
    }

  /** Sets the requested number of map tasks on the given properties. */
  @Override
  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
    {
    properties.put( "mapreduce.job.maps", Integer.toString( numMapTasks ) );
    }

  /** Sets the requested number of reduce tasks on the given properties. */
  @Override
  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
    {
    properties.put( "mapreduce.job.reduces", Integer.toString( numReduceTasks ) );
    }

  /** Returns the configured number of map tasks, or {@code null} when unset. */
  @Override
  public Integer getNumMapTasks( Map<Object, Object> properties )
    {
    return getIntegerProperty( properties, "mapreduce.job.maps" );
    }

  /** Returns the configured number of reduce tasks, or {@code null} when unset. */
  @Override
  public Integer getNumReduceTasks( Map<Object, Object> properties )
    {
    return getIntegerProperty( properties, "mapreduce.job.reduces" );
    }

  /**
   * Returns the named property parsed as an Integer, or {@code null} when absent.
   *
   * @throws NumberFormatException when the value is present but not a valid integer
   */
  private static Integer getIntegerProperty( Map<Object, Object> properties, String key )
    {
    Object value = properties.get( key );

    if( value == null )
      return null;

    return Integer.parseInt( value.toString() );
    }

  /**
   * Initializes the platform exactly once: a local in-process configuration, a remote cluster
   * (when the {@code mapred.jar} system property is set), or an embedded mini DFS/MR cluster.
   * Subsequent calls return immediately once {@code jobConf} is assigned.
   *
   * @throws IOException when the file system or mini clusters cannot be started
   */
  @Override
  public synchronized void setUp() throws IOException
    {
    if( jobConf != null )
      return;

    if( !isUseCluster() )
      {
      LOG.info( "not using cluster" );
      jobConf = new JobConf();

      // enforce settings to make local mode behave the same across distributions
      jobConf.set( "fs.defaultFS", "file:///" );
      jobConf.set( "mapreduce.framework.name", "local" );
      jobConf.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/" + "build/tmp/cascading/staging" );

      fileSys = FileSystem.get( jobConf );
      }
    else
      {
      LOG.info( "using cluster" );

      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
        System.setProperty( "hadoop.log.dir", "build/test/log" );

      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
        System.setProperty( "hadoop.tmp.dir", "build/test/tmp" );

      // mkdirs returns false both on failure and when the directory already exists; either way
      // the log dir may be usable, so only note it at debug level rather than ignore it silently
      if( !new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs() )
        LOG.debug( "did not create hadoop.log.dir: {}", System.getProperty( "hadoop.log.dir" ) );

      JobConf conf = new JobConf();

      if( !Util.isEmpty( System.getProperty( "mapred.jar" ) ) )
        {
        LOG.info( "using a remote cluster with jar: {}", System.getProperty( "mapred.jar" ) );
        jobConf = conf;

        ( (JobConf) jobConf ).setJar( System.getProperty( "mapred.jar" ) );

        // forward any cluster endpoints provided as system properties
        copySystemProperty( "fs.default.name" );
        copySystemProperty( "fs.defaultFS" );
        copySystemProperty( "yarn.resourcemanager.address" );
        copySystemProperty( "mapreduce.jobhistory.address" );

        jobConf.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
        jobConf.set( "mapreduce.framework.name", "yarn" );

        fileSys = FileSystem.get( jobConf );
        }
      else
        {
        conf.setBoolean( "yarn.is.minicluster", true );
//      conf.setInt( "yarn.nodemanager.delete.debug-delay-sec", -1 );
//      conf.set( "yarn.scheduler.capacity.root.queues", "default" );
//      conf.set( "yarn.scheduler.capacity.root.default.capacity", "100" );
        // disable blacklisting hosts not to fail localhost during unit tests
        conf.setBoolean( "yarn.app.mapreduce.am.job.node-blacklisting.enable", false );

        dfs = new MiniDFSCluster( conf, 4, true, null );
        fileSys = dfs.getFileSystem();

        FileSystem.setDefaultUri( conf, fileSys.getUri() );

        mr = MiniMRClientClusterFactory.create( this.getClass(), 4, conf );

        jobConf = mr.getConfig();
        }

      // settings that speed up and stabilize the cluster during tests
      jobConf.set( "mapred.child.java.opts", "-Xmx512m" );
      jobConf.setInt( "mapreduce.job.jvm.numtasks", -1 );
      jobConf.setInt( "mapreduce.client.completion.pollinterval", 50 );
      jobConf.setInt( "mapreduce.client.progressmonitor.pollinterval", 50 );
      jobConf.setBoolean( "mapreduce.map.speculative", false );
      jobConf.setBoolean( "mapreduce.reduce.speculative", false );
      }

    jobConf.setInt( "mapreduce.job.maps", numMapTasks );
    jobConf.setInt( "mapreduce.job.reduces", numReduceTasks );

    Map<Object, Object> globalProperties = getGlobalProperties();

    if( logger != null )
      globalProperties.put( "log4j.logger", logger );

    FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests

    Hadoop2MR1Planner.copyProperties( jobConf, globalProperties ); // copy any external properties

    Hadoop2MR1Planner.copyConfiguration( properties, jobConf ); // put all properties on the jobconf
    }

  /**
   * Copies the named system property onto {@code jobConf} when it is non-empty, logging the value used.
   *
   * @param key the system property / configuration key
   */
  private void copySystemProperty( String key )
    {
    String value = System.getProperty( key );

    if( Util.isEmpty( value ) )
      return;

    LOG.info( "using {}={}", key, value );
    jobConf.set( key, value );
    }
  }