/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.platform.hadoop2;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import cascading.flow.FlowConnector;
import cascading.flow.FlowProps;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.flow.hadoop2.Hadoop2MR1Planner;
import cascading.platform.hadoop.BaseHadoopPlatform;
import cascading.util.Util;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hadoop2MR1Platform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
 * so that all *PlatformTest classes can be tested against Apache Hadoop 2.x.
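 * <p>
 * Depending on configuration, {@link #setUp()} runs jobs through the local job runner, attaches to an existing
 * remote cluster (when the {@code mapred.jar} system property is set), or boots in-process
 * {@link org.apache.hadoop.hdfs.MiniDFSCluster} and {@link org.apache.hadoop.mapred.MiniMRClientCluster} instances.
 * <p>
 * A minimal sketch of how a test typically reaches this platform (the test class and its body are hypothetical):
 * <pre>{@code
 * public class MyPlatformTest extends PlatformTestCase
 *   {
 *   public void testSomething() throws Exception
 *     {
 *     FlowConnector connector = getPlatform().getFlowConnector();
 *     // ... build and complete a Flow against the injected platform
 *     }
 *   }
 * }</pre>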
 */
public class Hadoop2MR1Platform extends BaseHadoopPlatform
  {
  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2MR1Platform.class );
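  // the mini clusters are static so they are created once per JVM and shared by every test case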
  private static transient MiniDFSCluster dfs;
  private static transient MiniMRClientCluster mr;

  public Hadoop2MR1Platform()
    {
    }

  @Override
  public String getName()
    {
    return "hadoop2-mr1";
    }

  @Override
  public FlowConnector getFlowConnector( Map<Object, Object> properties )
    {
    return new Hadoop2MR1FlowConnector( properties );
    }

  @Override
  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
    {
    properties.put( "mapreduce.job.maps", Integer.toString( numMapTasks ) );
    }

  @Override
  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
    {
    properties.put( "mapreduce.job.reduces", Integer.toString( numReduceTasks ) );
    }

  @Override
  public Integer getNumMapTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapreduce.job.maps" ) == null )
      return null;

    return Integer.parseInt( properties.get( "mapreduce.job.maps" ).toString() );
    }

  @Override
  public Integer getNumReduceTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapreduce.job.reduces" ) == null )
      return null;

    return Integer.parseInt( properties.get( "mapreduce.job.reduces" ).toString() );
    }

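  /**
   * Lazily initializes the shared Hadoop configuration and {@link FileSystem}: the local job runner when no
   * cluster is requested, a remote cluster when the {@code mapred.jar} system property is set, otherwise
   * in-process HDFS and MR mini clusters.
   */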
  @Override
  public synchronized void setUp() throws IOException
    {
    if( jobConf != null )
      return;

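    // no cluster: run everything through the local job runner against the local file system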
    if( !isUseCluster() )
      {
      LOG.info( "not using cluster" );
      jobConf = new JobConf();

      // enforce settings to make local mode behave the same across distributions
      jobConf.set( "fs.defaultFS", "file:///" );
      jobConf.set( "mapreduce.framework.name", "local" );
      jobConf.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/build/tmp/cascading/staging" );

      fileSys = FileSystem.get( jobConf );
      }
    else
      {
      LOG.info( "using cluster" );

      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
        System.setProperty( "hadoop.log.dir", "build/test/log" );

      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
        System.setProperty( "hadoop.tmp.dir", "build/test/tmp" );

      new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // return value ignored, the directory may already exist

      JobConf conf = new JobConf();

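      // a populated mapred.jar system property signals an existing remote cluster; otherwise boot in-process mini clusters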
      if( !Util.isEmpty( System.getProperty( "mapred.jar" ) ) )
        {
        LOG.info( "using a remote cluster with jar: {}", System.getProperty( "mapred.jar" ) );
        jobConf = conf;

        ( (JobConf) jobConf ).setJar( System.getProperty( "mapred.jar" ) );

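        // honor both the deprecated Hadoop 1.x key, fs.default.name, and its Hadoop 2.x replacement, fs.defaultFS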
        if( !Util.isEmpty( System.getProperty( "fs.default.name" ) ) )
          {
          LOG.info( "using {}={}", "fs.default.name", System.getProperty( "fs.default.name" ) );
          jobConf.set( "fs.default.name", System.getProperty( "fs.default.name" ) );
          }

        if( !Util.isEmpty( System.getProperty( "fs.defaultFS" ) ) )
          {
          LOG.info( "using {}={}", "fs.defaultFS", System.getProperty( "fs.defaultFS" ) );
          jobConf.set( "fs.defaultFS", System.getProperty( "fs.defaultFS" ) );
          }

        if( !Util.isEmpty( System.getProperty( "yarn.resourcemanager.address" ) ) )
          {
          LOG.info( "using {}={}", "yarn.resourcemanager.address", System.getProperty( "yarn.resourcemanager.address" ) );
          jobConf.set( "yarn.resourcemanager.address", System.getProperty( "yarn.resourcemanager.address" ) );
          }

        if( !Util.isEmpty( System.getProperty( "mapreduce.jobhistory.address" ) ) )
          {
          LOG.info( "using {}={}", "mapreduce.jobhistory.address", System.getProperty( "mapreduce.jobhistory.address" ) );
          jobConf.set( "mapreduce.jobhistory.address", System.getProperty( "mapreduce.jobhistory.address" ) );
          }

        jobConf.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
        jobConf.set( "mapreduce.framework.name", "yarn" );
        fileSys = FileSystem.get( jobConf );
        }
      else
        {
        conf.setBoolean( "yarn.is.minicluster", true );
        // conf.setInt( "yarn.nodemanager.delete.debug-delay-sec", -1 );
        // conf.set( "yarn.scheduler.capacity.root.queues", "default" );
        // conf.set( "yarn.scheduler.capacity.root.default.capacity", "100" );
        // disable node blacklisting so localhost is not failed out during unit tests
        conf.setBoolean( "yarn.app.mapreduce.am.job.node-blacklisting.enable", false );

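        // in-process HDFS: 4 data nodes, formatting the name node on startup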
        dfs = new MiniDFSCluster( conf, 4, true, null );
        fileSys = dfs.getFileSystem();

        FileSystem.setDefaultUri( conf, fileSys.getUri() );

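        // in-process MR: a YARN-backed mini cluster, sized here to 4 node managers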
        mr = MiniMRClientClusterFactory.create( this.getClass(), 4, conf );

        jobConf = mr.getConfig();
        }

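      // settings shared by the mini and remote cluster paths: modest task heap, unlimited JVM reuse (-1),
      // fast client polling, and no speculative execution so test output stays deterministic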
183          jobConf.set( "mapred.child.java.opts", "-Xmx512m" );
184          jobConf.setInt( "mapreduce.job.jvm.numtasks", -1 );
185          jobConf.setInt( "mapreduce.client.completion.pollinterval", 50 );
186          jobConf.setInt( "mapreduce.client.progressmonitor.pollinterval", 50 );
187          jobConf.setBoolean( "mapreduce.map.speculative", false );
188          jobConf.setBoolean( "mapreduce.reduce.speculative", false );
189          }
190    
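    // defaults for every submitted job; individual tests can override them through setNumMapTasks()/setNumReduceTasks()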
191        jobConf.setInt( "mapreduce.job.maps", numMapTasks );
192        jobConf.setInt( "mapreduce.job.reduces", numReduceTasks );
193    
194        Map<Object, Object> globalProperties = getGlobalProperties();
195    
196        if( logger != null )
197          globalProperties.put( "log4j.logger", logger );
198    
199        FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests
200    
201        Hadoop2MR1Planner.copyProperties( jobConf, globalProperties ); // copy any external properties
202    
203        Hadoop2MR1Planner.copyConfiguration( properties, jobConf ); // put all properties on the jobconf
204        }
205      }