001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.platform.hadoop2;
022
023import java.io.File;
024import java.io.IOException;
025import java.util.Map;
026
027import cascading.flow.FlowConnector;
028import cascading.flow.FlowProcess;
029import cascading.flow.FlowProps;
030import cascading.flow.FlowSession;
031import cascading.flow.hadoop.HadoopFlowProcess;
032import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
033import cascading.flow.hadoop2.Hadoop2MR1Planner;
034import cascading.platform.hadoop.BaseHadoopPlatform;
035import cascading.util.Util;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.hdfs.MiniDFSCluster;
038import org.apache.hadoop.mapred.JobConf;
039import org.apache.hadoop.mapred.MiniMRClientCluster;
040import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
 * Class Hadoop2MR1Platform is automatically loaded and injected into a {@link cascading.PlatformTestCase} instance
046 * so that all *PlatformTest classes can be tested against Apache Hadoop 2.x.
047 */
048public class Hadoop2MR1Platform extends BaseHadoopPlatform<JobConf>
049  {
050  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2MR1Platform.class );
051  private transient static MiniDFSCluster dfs;
052  private transient static MiniMRClientCluster mr;
053
054  public Hadoop2MR1Platform()
055    {
056    }
057
058  @Override
059  public String getName()
060    {
061    return "hadoop2-mr1";
062    }
063
064  @Override
065  public FlowConnector getFlowConnector( Map<Object, Object> properties )
066    {
067    return new Hadoop2MR1FlowConnector( properties );
068    }
069
070  @Override
071  public void setNumMapTasks( Map<Object, Object> properties, int numMapTasks )
072    {
073    properties.put( "mapreduce.job.maps", Integer.toString( numMapTasks ) );
074    }
075
076  @Override
077  public void setNumReduceTasks( Map<Object, Object> properties, int numReduceTasks )
078    {
079    properties.put( "mapreduce.job.reduces", Integer.toString( numReduceTasks ) );
080    }
081
082  @Override
083  public Integer getNumMapTasks( Map<Object, Object> properties )
084    {
085    if( properties.get( "mapreduce.job.maps" ) == null )
086      return null;
087
088    return Integer.parseInt( properties.get( "mapreduce.job.maps" ).toString() );
089    }
090
091  @Override
092  public Integer getNumReduceTasks( Map<Object, Object> properties )
093    {
094    if( properties.get( "mapreduce.job.reduces" ) == null )
095      return null;
096
097    return Integer.parseInt( properties.get( "mapreduce.job.reduces" ).toString() );
098    }
099
100  public JobConf getConfiguration()
101    {
102    return new JobConf( configuration );
103    }
104
105  @Override
106  public FlowProcess getFlowProcess()
107    {
108    return new HadoopFlowProcess( FlowSession.NULL, getConfiguration(), true );
109    }
110
111  @Override
112  public synchronized void setUp() throws IOException
113    {
114    if( configuration != null )
115      return;
116
117    if( !isUseCluster() )
118      {
119      LOG.info( "not using cluster" );
120      configuration = new JobConf();
121
122      // enforce settings to make local mode behave the same across distributions
123      configuration.set( "fs.defaultFS", "file:///" );
124      configuration.set( "mapreduce.framework.name", "local" );
125      configuration.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/" + "build/tmp/cascading/staging" );
126
127      String stagingDir = configuration.get( "mapreduce.jobtracker.staging.root.dir" );
128
129      if( Util.isEmpty( stagingDir ) )
130        configuration.set( "mapreduce.jobtracker.staging.root.dir", System.getProperty( "user.dir" ) + "/build/tmp/cascading/staging" );
131
132      fileSys = FileSystem.get( configuration );
133      }
134    else
135      {
136      LOG.info( "using cluster" );
137
138      if( Util.isEmpty( System.getProperty( "hadoop.log.dir" ) ) )
139        System.setProperty( "hadoop.log.dir", "build/test/log" );
140
141      if( Util.isEmpty( System.getProperty( "hadoop.tmp.dir" ) ) )
142        System.setProperty( "hadoop.tmp.dir", "build/test/tmp" );
143
144      new File( System.getProperty( "hadoop.log.dir" ) ).mkdirs(); // ignored
145
146      JobConf conf = new JobConf();
147
148      if( getApplicationJar() != null )
149        {
150        LOG.info( "using a remote cluster with jar: {}", getApplicationJar() );
151
152        configuration = conf;
153
154        ( (JobConf) configuration ).setJar( getApplicationJar() );
155
156        if( !Util.isEmpty( System.getProperty( "fs.default.name" ) ) )
157          {
158          LOG.info( "using {}={}", "fs.default.name", System.getProperty( "fs.default.name" ) );
159          configuration.set( "fs.default.name", System.getProperty( "fs.default.name" ) );
160          }
161
162        if( !Util.isEmpty( System.getProperty( "mapred.job.tracker" ) ) )
163          {
164          LOG.info( "using {}={}", "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
165          configuration.set( "mapred.job.tracker", System.getProperty( "mapred.job.tracker" ) );
166          }
167
168        if( !Util.isEmpty( System.getProperty( "fs.defaultFS" ) ) )
169          {
170          LOG.info( "using {}={}", "fs.defaultFS", System.getProperty( "fs.defaultFS" ) );
171          configuration.set( "fs.defaultFS", System.getProperty( "fs.defaultFS" ) );
172          }
173
174        if( !Util.isEmpty( System.getProperty( "yarn.resourcemanager.address" ) ) )
175          {
176          LOG.info( "using {}={}", "yarn.resourcemanager.address", System.getProperty( "yarn.resourcemanager.address" ) );
177          configuration.set( "yarn.resourcemanager.address", System.getProperty( "yarn.resourcemanager.address" ) );
178          }
179
180        if( !Util.isEmpty( System.getProperty( "mapreduce.jobhistory.address" ) ) )
181          {
182          LOG.info( "using {}={}", "mapreduce.jobhistory.address", System.getProperty( "mapreduce.jobhistory.address" ) );
183          configuration.set( "mapreduce.jobhistory.address", System.getProperty( "mapreduce.jobhistory.address" ) );
184          }
185
186        configuration.set( "mapreduce.job.user.classpath.first", "true" ); // use test dependencies
187        configuration.set( "mapreduce.user.classpath.first", "true" ); // use test dependencies
188        configuration.set( "mapreduce.framework.name", "yarn" );
189
190        fileSys = FileSystem.get( configuration );
191        }
192      else
193        {
194        conf.setBoolean( "yarn.is.minicluster", true );
195//      conf.setInt( "yarn.nodemanager.delete.debug-delay-sec", -1 );
196//      conf.set( "yarn.scheduler.capacity.root.queues", "default" );
197//      conf.set( "yarn.scheduler.capacity.root.default.capacity", "100" );
198        // disable blacklisting hosts not to fail localhost during unit tests
199        conf.setBoolean( "yarn.app.mapreduce.am.job.node-blacklisting.enable", false );
200
201        dfs = new MiniDFSCluster( conf, 4, true, null );
202        fileSys = dfs.getFileSystem();
203
204        FileSystem.setDefaultUri( conf, fileSys.getUri() );
205
206        mr = MiniMRClientClusterFactory.create( this.getClass(), 4, conf );
207
208        configuration = mr.getConfig();
209        }
210
211      configuration.set( "mapred.child.java.opts", "-Xmx512m" );
212      configuration.setInt( "mapreduce.job.jvm.numtasks", -1 );
213      configuration.setInt( "mapreduce.client.completion.pollinterval", 50 );
214      configuration.setInt( "mapreduce.client.progressmonitor.pollinterval", 50 );
215      configuration.setBoolean( "mapreduce.map.speculative", false );
216      configuration.setBoolean( "mapreduce.reduce.speculative", false );
217      }
218
219    configuration.setInt( "mapreduce.job.maps", numMappers );
220    configuration.setInt( "mapreduce.job.reduces", numReducers );
221
222    Map<Object, Object> globalProperties = getGlobalProperties();
223
224    if( logger != null )
225      globalProperties.put( "log4j.logger", logger );
226
227    FlowProps.setJobPollingInterval( globalProperties, 10 ); // should speed up tests
228
229    Hadoop2MR1Planner.copyProperties( configuration, globalProperties ); // copy any external properties
230
231    Hadoop2MR1Planner.copyConfiguration( properties, configuration ); // put all properties on the jobconf
232    }
233  }