/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.platform.hadoop;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import cascading.flow.FlowConnector;
import cascading.flow.FlowProps;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.flow.hadoop.planner.HadoopPlanner;
import cascading.util.Util;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class HadoopPlatform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
 * so that all *PlatformTest classes can be tested against Apache Hadoop.
 * <p/>
 * This platform works in three modes.
 * <p/>
 * Hadoop standalone mode is when Hadoop is NOT run as a cluster, and all
 * child tasks are in process and in memory of the "client" side code.
 * <p/>
 * Hadoop mini cluster mode where a cluster is created on demand using the Hadoop MiniDFSCluster and MiniMRCluster
 * utilities.
When a PlatformTestCase requests to use a cluster, this is the default cluster. All properties are
 * pulled from the current CLASSPATH via the JobConf.
 * <p/>
 * Lastly remote cluster mode is enabled when the System property "mapred.jar" is set. This is a Hadoop property
 * specifying the Hadoop "job jar" to be used cluster side. This MUST be the Cascading test suite and dependencies
 * packaged in a Hadoop compatible way. This is left to be implemented by the framework using this mode. Additionally
 * these properties may optionally be set if not already in the CLASSPATH: fs.default.name and mapred.job.tracker.
 */
public class HadoopPlatform extends BaseHadoopPlatform
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopPlatform.class );

  // Static so the mini clusters are created once and shared by every test in this JVM.
  // NOTE(review): 'transient' has no effect on a static field; presumably historical.
  public transient static MiniDFSCluster dfs;
  public transient static MiniMRCluster mr;

  public HadoopPlatform()
    {
    }

  /**
   * Returns a new {@link HadoopFlowConnector} configured with the given properties.
   *
   * @param properties flow configuration properties
   * @return a new HadoopFlowConnector
   */
  @Override
  public FlowConnector getFlowConnector( Map<Object, Object> properties )
    {
    return new HadoopFlowConnector( properties );
    }

  /**
   * Stores the requested number of map tasks into the properties map under "mapred.map.tasks".
   *
   * @param properties  properties map to mutate
   * @param numMapTasks number of map tasks to request
   */
  @Override
  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
    {
    properties.put( "mapred.map.tasks", Integer.toString( numMapTasks ) );
    }

  /**
   * Stores the requested number of reduce tasks into the properties map under "mapred.reduce.tasks".
   *
   * @param properties     properties map to mutate
   * @param numReduceTasks number of reduce tasks to request
   */
  @Override
  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
    {
    properties.put( "mapred.reduce.tasks", Integer.toString( numReduceTasks ) );
    }

  /**
   * Reads "mapred.map.tasks" back out of the properties map.
   *
   * @param properties properties map to read
   * @return the configured number of map tasks, or null if unset
   */
  @Override
  public Integer getNumMapTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapred.map.tasks" ) == null )
      return null;

    return Integer.parseInt( properties.get( "mapred.map.tasks" ).toString() );
    }

  /**
   * Reads "mapred.reduce.tasks" back out of the properties map.
   *
   * @param properties properties map to read
   * @return the configured number of reduce tasks, or null if unset
   */
  @Override
  public Integer getNumReduceTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapred.reduce.tasks" ) == null )
      return null;

    return Integer.parseInt( properties.get( "mapred.reduce.tasks" ).toString() );
    }

  /**
   * Initializes the platform in one of the three modes described on the class javadoc:
   * local/standalone, mini cluster, or remote cluster (when the "mapred.jar" System
   * property is set). Idempotent: a non-null {@code jobConf} (inherited from
   * BaseHadoopPlatform, as are {@code fileSys}, {@code numMapTasks}, {@code numReduceTasks},
   * {@code logger} and {@code properties}) marks the platform as already configured.
   *
   * @throws IOException if the file system or mini clusters cannot be started
   */
  @Override
  public synchronized void setUp() throws IOException
    {
    // already configured by an earlier call
    if( jobConf != null )
      return;

    if( !isUseCluster() )
      {
      LOG.info( "not using cluster" );
      jobConf = new JobConf();

      // enforce the local file system in local mode
      jobConf.set( "fs.default.name", "file:///" );
      jobConf.set( "mapred.job.tracker", "local" );

      String stagingDir = jobConf.get( "mapreduce.jobtracker.staging.root.dir" );

      // keep job staging under the build tree unless the environment already chose a location
      if( Util.isEmpty( stagingDir ) )
        jobConf.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/build/tmp/cascading/staging" );

      fileSys = FileSystem.get( jobConf );
      }
    else
      {
      LOG.info( "using cluster" );

      // set these System properties before constructing the JobConf below —
      // presumably Hadoop resolves them during configuration; confirm before reordering
      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
        System.setProperty( "hadoop.log.dir", "cascading-hadoop/build/test/log" );

      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
        System.setProperty( "hadoop.tmp.dir", "cascading-hadoop/build/test/tmp" );

      new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // ignored

      JobConf conf = new JobConf();

      if( !Util.isEmpty( System.getProperty( "mapred.jar" ) ) )
        {
        // remote cluster mode: the caller supplies the job jar and, optionally,
        // the file system and job tracker endpoints via System properties
        LOG.info( "using a remote cluster with jar: {}", System.getProperty( "mapred.jar" ) );
        jobConf = conf;

        ( (JobConf) jobConf ).setJar( System.getProperty( "mapred.jar" ) );

        if( !Util.isEmpty( System.getProperty( "fs.default.name" ) ) )
          {
          LOG.info( "using {}={}", "fs.default.name", System.getProperty( "fs.default.name" ) );
          jobConf.set( "fs.default.name", System.getProperty( "fs.default.name" ) );
          }

        if( !Util.isEmpty( System.getProperty( "mapred.job.tracker" ) ) )
          {
          LOG.info( "using {}={}", "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
          jobConf.set( "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
          }

        jobConf.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
        fileSys = FileSystem.get( jobConf );
        }
      else
        {
        // mini cluster mode: start an in-process DFS first, then an MR cluster on top of it
        dfs = new MiniDFSCluster( conf, 4, true, null );
        fileSys = dfs.getFileSystem();
        mr = new MiniMRCluster( 4, fileSys.getUri().toString(), 1, null, null, conf );

        jobConf = mr.createJobConf();
        }

      // jobConf.set( "mapred.map.max.attempts", "1" );
      // jobConf.set( "mapred.reduce.max.attempts", "1" );
      // tune the cluster-side settings for fast test turnaround
      jobConf.set( "mapred.child.java.opts", "-Xmx512m" );
      jobConf.setInt( "mapred.job.reuse.jvm.num.tasks", -1 );
      jobConf.setInt( "jobclient.completion.poll.interval", 50 );
      jobConf.setInt( "jobclient.progress.monitor.poll.interval", 50 );
      ( (JobConf) jobConf ).setMapSpeculativeExecution( false );
      ( (JobConf) jobConf ).setReduceSpeculativeExecution( false );
      }

    ( (JobConf) jobConf ).setNumMapTasks( numMapTasks );
    ( (JobConf) jobConf ).setNumReduceTasks( numReduceTasks );

    Map<Object, Object> globalProperties = getGlobalProperties();

    if( logger != null )
      globalProperties.put( "log4j.logger", logger );

    FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests

    HadoopPlanner.copyProperties( (JobConf) jobConf, globalProperties ); // copy any external properties

    HadoopPlanner.copyJobConf( properties, (JobConf) jobConf ); // put all properties on the jobconf
    }
  }