/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.platform.hadoop2;

import java.io.File;
import java.io.IOException;
import java.util.Map;

import cascading.flow.FlowConnector;
import cascading.flow.FlowProps;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.flow.hadoop2.Hadoop2MR1Planner;
import cascading.platform.hadoop.BaseHadoopPlatform;
import cascading.util.Util;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class Hadoop2MR1Platform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
 * so that all *PlatformTest classes can be tested against Apache Hadoop 2.x.
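 * <p>
 * A minimal sketch of standalone use (hypothetical; tests normally receive the platform
 * through {@code PlatformTestCase} rather than constructing it directly):
 * <pre>{@code
 * Hadoop2MR1Platform platform = new Hadoop2MR1Platform();
 * platform.setUp(); // local mode, or a mini/remote cluster when isUseCluster() is true
 * FlowConnector connector = platform.getFlowConnector( new java.util.Properties() );
 * }</pre>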
 */
public class Hadoop2MR1Platform extends BaseHadoopPlatform
  {
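  // the mini clusters are held in static fields so one in-process DFS/MR cluster is
  // shared across every platform test in this JVM; setUp() below returns early once
  // jobConf has been initialized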
  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2MR1Platform.class );
  private static transient MiniDFSCluster dfs;
  private static transient MiniMRClientCluster mr;

  public Hadoop2MR1Platform()
    {
    }

  @Override
  public String getName()
    {
    return "hadoop2-mr1";
    }

  @Override
  public FlowConnector getFlowConnector( Map<Object, Object> properties )
    {
    return new Hadoop2MR1FlowConnector( properties );
    }

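  // the task-count accessors below use the Hadoop 2 property names; "mapreduce.job.maps"
  // and "mapreduce.job.reduces" supersede the Hadoop 1 keys "mapred.map.tasks" and
  // "mapred.reduce.tasks"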
  @Override
  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
    {
    properties.put( "mapreduce.job.maps", Integer.toString( numMapTasks ) );
    }

  @Override
  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
    {
    properties.put( "mapreduce.job.reduces", Integer.toString( numReduceTasks ) );
    }

  @Override
  public Integer getNumMapTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapreduce.job.maps" ) == null )
      return null;

    return Integer.parseInt( properties.get( "mapreduce.job.maps" ).toString() );
    }

  @Override
  public Integer getNumReduceTasks( Map<Object, Object> properties )
    {
    if( properties.get( "mapreduce.job.reduces" ) == null )
      return null;

    return Integer.parseInt( properties.get( "mapreduce.job.reduces" ).toString() );
    }

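  // lazily initializes the platform on first use: a purely local configuration, a remote
  // cluster (when -Dmapred.jar is set), or an in-process mini DFS/MR cluster, depending
  // on isUseCluster()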
  @Override
  public synchronized void setUp() throws IOException
    {
    if( jobConf != null )
      return;

    if( !isUseCluster() )
      {
      LOG.info( "not using cluster" );
      jobConf = new JobConf();

      // enforce settings to make local mode behave the same across distributions
      jobConf.set( "fs.defaultFS", "file:///" );
      jobConf.set( "mapreduce.framework.name", "local" );

      String stagingDir = jobConf.get( "mapreduce.jobtracker.staging.root.dir" );

      if( Util.isEmpty( stagingDir ) )
        jobConf.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/build/tmp/cascading/staging" );

      fileSys = FileSystem.get( jobConf );
      }
    else
      {
      LOG.info( "using cluster" );

      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
        System.setProperty( "hadoop.log.dir", "build/test/log" );

      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
        System.setProperty( "hadoop.tmp.dir", "build/test/tmp" );

      new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // return value intentionally ignored

      JobConf conf = new JobConf();

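      // a non-empty -Dmapred.jar signals an externally managed, remote cluster;
      // otherwise an in-process mini cluster is started below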
      if( !Util.isEmpty( System.getProperty( "mapred.jar" ) ) )
        {
        LOG.info( "using a remote cluster with jar: {}", System.getProperty( "mapred.jar" ) );
        jobConf = conf;

        ( (JobConf) jobConf ).setJar( System.getProperty( "mapred.jar" ) );

        if( !Util.isEmpty( System.getProperty( "fs.default.name" ) ) )
          {
          LOG.info( "using {}={}", "fs.default.name", System.getProperty( "fs.default.name" ) );
          jobConf.set( "fs.default.name", System.getProperty( "fs.default.name" ) );
          }

        if( !Util.isEmpty( System.getProperty( "fs.defaultFS" ) ) )
          {
          LOG.info( "using {}={}", "fs.defaultFS", System.getProperty( "fs.defaultFS" ) );
          jobConf.set( "fs.defaultFS", System.getProperty( "fs.defaultFS" ) );
          }

        if( !Util.isEmpty( System.getProperty( "yarn.resourcemanager.address" ) ) )
          {
          LOG.info( "using {}={}", "yarn.resourcemanager.address", System.getProperty( "yarn.resourcemanager.address" ) );
          jobConf.set( "yarn.resourcemanager.address", System.getProperty( "yarn.resourcemanager.address" ) );
          }

        if( !Util.isEmpty( System.getProperty( "mapreduce.jobhistory.address" ) ) )
          {
          LOG.info( "using {}={}", "mapreduce.jobhistory.address", System.getProperty( "mapreduce.jobhistory.address" ) );
          jobConf.set( "mapreduce.jobhistory.address", System.getProperty( "mapreduce.jobhistory.address" ) );
          }

        jobConf.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
        jobConf.set( "mapreduce.framework.name", "yarn" );
        fileSys = FileSystem.get( jobConf );
        }
      else
        {
        conf.setBoolean( "yarn.is.minicluster", true );
        // conf.setInt( "yarn.nodemanager.delete.debug-delay-sec", -1 );
        // conf.set( "yarn.scheduler.capacity.root.queues", "default" );
        // conf.set( "yarn.scheduler.capacity.root.default.capacity", "100" );
        // disable node blacklisting so localhost does not get blacklisted during unit tests
        conf.setBoolean( "yarn.app.mapreduce.am.job.node-blacklisting.enable", false );

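        // spin up an in-process HDFS with 4 datanodes, formatting the filesystem on startup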
        dfs = new MiniDFSCluster( conf, 4, true, null );
        fileSys = dfs.getFileSystem();

        FileSystem.setDefaultUri( conf, fileSys.getUri() );

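        // spin up an in-process MR cluster with 4 node managers, then adopt its configuration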
        mr = MiniMRClientClusterFactory.create( this.getClass(), 4, conf );

        jobConf = mr.getConfig();
        }

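      // settings shared by the remote and mini cluster paths: cap task heap, allow
      // unlimited JVM reuse (-1), poll job status every 50 ms, and disable speculative
      // execution so test runs stay deterministic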
      jobConf.set( "mapred.child.java.opts", "-Xmx512m" );
      jobConf.setInt( "mapreduce.job.jvm.numtasks", -1 );
      jobConf.setInt( "mapreduce.client.completion.pollinterval", 50 );
      jobConf.setInt( "mapreduce.client.progressmonitor.pollinterval", 50 );
      jobConf.setBoolean( "mapreduce.map.speculative", false );
      jobConf.setBoolean( "mapreduce.reduce.speculative", false );
      }

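    // apply the platform-wide task parallelism defaults and copy the global test
    // properties onto the job configuration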
    jobConf.setInt( "mapreduce.job.maps", numMapTasks );
    jobConf.setInt( "mapreduce.job.reduces", numReduceTasks );

    Map<Object, Object> globalProperties = getGlobalProperties();

    if( logger != null )
      globalProperties.put( "log4j.logger", logger );

    FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests

    Hadoop2MR1Planner.copyProperties( jobConf, globalProperties ); // copy any external properties

    Hadoop2MR1Planner.copyConfiguration( properties, jobConf ); // put all properties on the jobconf
    }
  }