001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.platform.hadoop2;
022
023import java.io.File;
024import java.io.IOException;
025import java.util.Map;
026
027import cascading.flow.FlowConnector;
028import cascading.flow.FlowProcess;
029import cascading.flow.FlowProps;
030import cascading.flow.FlowSession;
031import cascading.flow.hadoop.HadoopFlowProcess;
032import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
033import cascading.flow.hadoop2.Hadoop2MR1Planner;
034import cascading.platform.hadoop.BaseHadoopPlatform;
035import cascading.util.Util;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.hdfs.MiniDFSCluster;
038import org.apache.hadoop.mapred.JobConf;
039import org.apache.hadoop.mapred.MiniMRClientCluster;
040import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Class Hadoop2Platform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
046 * so that all *PlatformTest classes can be tested against Apache Hadoop 2.x.
047 */
048public class Hadoop2MR1Platform extends BaseHadoopPlatform<JobConf>
049  {
050  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2MR1Platform.class );
051  private transient static MiniDFSCluster dfs;
052  private transient static MiniMRClientCluster mr;
053
054  public Hadoop2MR1Platform()
055    {
056    }
057
058  @Override
059  public String getName()
060    {
061    return "hadoop2-mr1";
062    }
063
064  @Override
065  public FlowConnector getFlowConnector( Map<Object, Object> properties )
066    {
067    return new Hadoop2MR1FlowConnector( properties );
068    }
069
070  @Override
071  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
072    {
073    properties.put( "mapreduce.job.maps", Integer.toString( numMapTasks ) );
074    }
075
076  @Override
077  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
078    {
079    properties.put( "mapreduce.job.reduces", Integer.toString( numReduceTasks ) );
080    }
081
082  @Override
083  public Integer getNumMapTasks( Map<Object, Object> properties )
084    {
085    if( properties.get( "mapreduce.job.maps" ) == null )
086      return null;
087
088    return Integer.parseInt( properties.get( "mapreduce.job.maps" ).toString() );
089    }
090
091  @Override
092  public Integer getNumReduceTasks( Map<Object, Object> properties )
093    {
094    if( properties.get( "mapreduce.job.reduces" ) == null )
095      return null;
096
097    return Integer.parseInt( properties.get( "mapreduce.job.reduces" ).toString() );
098    }
099
100  public JobConf getConfiguration()
101    {
102    return new JobConf( configuration );
103    }
104
105  @Override
106  public FlowProcess getFlowProcess()
107    {
108    return new HadoopFlowProcess( FlowSession.NULL, getConfiguration(), true );
109    }
110
111  @Override
112  public synchronized void setUp() throws IOException
113    {
114    if( configuration != null )
115      return;
116
117    if( !isUseCluster() )
118      {
119      LOG.info( "not using cluster" );
120      configuration = new JobConf();
121
122      // enforce settings to make local mode behave the same across distributions
123      configuration.set( "fs.defaultFS", "file:///" );
124      configuration.set( "mapreduce.framework.name", "local" );
125      configuration.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/" + "build/tmp/cascading/staging" );
126
127      String stagingDir = configuration.get( "mapreduce.jobtracker.staging.root.dir" );
128
129      if( Util.isEmpty( stagingDir ) )
130        configuration.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/build/tmp/cascading/staging" );
131
132      fileSys = FileSystem.get( configuration );
133      }
134    else
135      {
136      LOG.info( "using cluster" );
137
138      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
139        System.setProperty( "hadoop.log.dir", "build/test/log" );
140
141      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
142        System.setProperty( "hadoop.tmp.dir", "build/test/tmp" );
143
144      new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // ignored
145
146      JobConf conf = new JobConf();
147
148      if( !Util.isEmpty( System.getProperty( "mapred.jar" ) ) )
149        {
150        LOG.info( "using a remote cluster with jar: {}", System.getProperty( "mapred.jar" ) );
151        configuration = conf;
152
153        ( (JobConf) configuration ).setJar( System.getProperty( "mapred.jar" ) );
154
155        if( !Util.isEmpty( System.getProperty( "fs.default.name" ) ) )
156          {
157          LOG.info( "using {}={}", "fs.default.name", System.getProperty( "fs.default.name" ) );
158          configuration.set( "fs.default.name", System.getProperty( "fs.default.name" ) );
159          }
160
161        if( !Util.isEmpty( System.getProperty( "mapred.job.tracker" ) ) )
162          {
163          LOG.info( "using {}={}", "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
164          configuration.set( "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
165          }
166
167        if( !Util.isEmpty( System.getProperty( "fs.defaultFS" ) ) )
168          {
169          LOG.info( "using {}={}", "fs.defaultFS", System.getProperty( "fs.defaultFS" ) );
170          configuration.set( "fs.defaultFS", System.getProperty( "fs.defaultFS" ) );
171          }
172
173        if( !Util.isEmpty( System.getProperty( "yarn.resourcemanager.address" ) ) )
174          {
175          LOG.info( "using {}={}", "yarn.resourcemanager.address", System.getProperty( "yarn.resourcemanager.address" ) );
176          configuration.set( "yarn.resourcemanager.address", System.getProperty( "yarn.resourcemanager.address" ) );
177          }
178
179        if( !Util.isEmpty( System.getProperty( "mapreduce.jobhistory.address" ) ) )
180          {
181          LOG.info( "using {}={}", "mapreduce.jobhistory.address", System.getProperty( "mapreduce.jobhistory.address" ) );
182          configuration.set( "mapreduce.jobhistory.address", System.getProperty( "mapreduce.jobhistory.address" ) );
183          }
184
185        configuration.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
186        configuration.set( "mapreduce.framework.name", "yarn" );
187
188        fileSys = FileSystem.get( configuration );
189        }
190      else
191        {
192        conf.setBoolean( "yarn.is.minicluster", true );
193//      conf.setInt( "yarn.nodemanager.delete.debug-delay-sec", -1 );
194//      conf.set( "yarn.scheduler.capacity.root.queues", "default" );
195//      conf.set( "yarn.scheduler.capacity.root.default.capacity", "100" );
196        // disable blacklisting hosts not to fail localhost during unit tests
197        conf.setBoolean( "yarn.app.mapreduce.am.job.node-blacklisting.enable", false );
198
199        dfs = new MiniDFSCluster( conf, 4, true, null );
200        fileSys = dfs.getFileSystem();
201
202        FileSystem.setDefaultUri( conf, fileSys.getUri() );
203
204        mr = MiniMRClientClusterFactory.create( this.getClass(), 4, conf );
205
206        configuration = mr.getConfig();
207        }
208
209      configuration.set( "mapred.child.java.opts", "-Xmx512m" );
210      configuration.setInt( "mapreduce.job.jvm.numtasks", -1 );
211      configuration.setInt( "mapreduce.client.completion.pollinterval", 50 );
212      configuration.setInt( "mapreduce.client.progressmonitor.pollinterval", 50 );
213      configuration.setBoolean( "mapreduce.map.speculative", false );
214      configuration.setBoolean( "mapreduce.reduce.speculative", false );
215      }
216
217    configuration.setInt( "mapreduce.job.maps", numMappers );
218    configuration.setInt( "mapreduce.job.reduces", numReducers );
219
220    Map<Object, Object> globalProperties = getGlobalProperties();
221
222    if( logger != null )
223      globalProperties.put( "log4j.logger", logger );
224
225    FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests
226
227    Hadoop2MR1Planner.copyProperties( configuration, globalProperties ); // copy any external properties
228
229    Hadoop2MR1Planner.copyConfiguration( properties, configuration ); // put all properties on the jobconf
230    }
231  }