/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Class HadoopFlowProcess is an implementation of {@link FlowProcess} for Hadoop. Use this class to get direct
 * access to the Hadoop JobConf and Reporter interfaces.
 * <p/>
 * Be warned that coupling to this implementation will cause custom {@link cascading.operation.Operation}s to
 * fail if they are executed on a system other than Hadoop.
 *
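 * A minimal usage sketch; the counter names below are illustrative, not part of this API:
 * <pre>{@code
 * JobConf conf = new JobConf();
 * HadoopFlowProcess process = new HadoopFlowProcess( conf );
 *
 * process.increment( "my-group", "records-seen", 1 ); // routes to the current Reporter
 * process.keepAlive(); // forwards to Reporter#progress() to prevent task timeout
 * }</pre>
 *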
 * @see cascading.flow.FlowSession
 * @see JobConf
 * @see Reporter
 */
public class HadoopFlowProcess extends FlowProcess<JobConf> implements MapRed
  {
  /** Field jobConf */
  final JobConf jobConf;
  /** Field isMapper */
  private final boolean isMapper;
  /** Field reporter */
  Reporter reporter = Reporter.NULL;
  /** Field outputCollector */
  private OutputCollector outputCollector;

  /** Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance backed by a default {@link JobConf}. */
  public HadoopFlowProcess()
    {
    this.jobConf = new JobConf();
    this.isMapper = true;
    }

  /** Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance from the given {@link Configuration}. */
  public HadoopFlowProcess( Configuration jobConf )
    {
    this( new JobConf( jobConf ) );
    }

  /** Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance using the given {@link JobConf}. */
  public HadoopFlowProcess( JobConf jobConf )
    {
    this.jobConf = jobConf;
    this.isMapper = true;
    }

  /** Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance with the given {@link FlowSession} and {@link JobConf}. */
  public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf )
    {
    super( flowSession );
    this.jobConf = jobConf;
    this.isMapper = true;
    }

  /**
   * Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance.
   *
   * @param flowSession of type FlowSession
   * @param jobConf     of type JobConf
   * @param isMapper    of type boolean, true if this process is a mapper, false if it is a reducer
   */
  public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf, boolean isMapper )
    {
    super( flowSession );
    this.jobConf = jobConf;
    this.isMapper = isMapper;
    }

  /** Constructor HadoopFlowProcess creates a copy of the given HadoopFlowProcess instance, bound to the given JobConf. */
  public HadoopFlowProcess( HadoopFlowProcess flowProcess, JobConf jobConf )
    {
    super( flowProcess );
    this.jobConf = jobConf;
    this.isMapper = flowProcess.isMapper();
    this.reporter = flowProcess.getReporter();
    }

  @Override
  public FlowProcess copyWith( JobConf jobConf )
    {
    return new HadoopFlowProcess( this, jobConf );
    }

  /**
   * Method getJobConf returns the jobConf of this HadoopFlowProcess object.
   *
   * @return the jobConf (type JobConf) of this HadoopFlowProcess object.
   */
  public JobConf getJobConf()
    {
    return jobConf;
    }

  @Override
  public JobConf getConfig()
    {
    return jobConf;
    }

  @Override
  public JobConf getConfigCopy()
    {
    return HadoopUtil.copyJobConf( jobConf );
    }

  /**
   * Method isMapper returns true if this part of the FlowProcess is a MapReduce mapper. If false, it is a reducer.
   *
   * @return boolean
   */
  public boolean isMapper()
    {
    return isMapper;
    }

  /** Method getCurrentNumMappers returns the number of map tasks configured for the current job. */
  public int getCurrentNumMappers()
    {
    return getJobConf().getNumMapTasks();
    }

  /** Method getCurrentNumReducers returns the number of reduce tasks configured for the current job. */
  public int getCurrentNumReducers()
    {
    return getJobConf().getNumReduceTasks();
    }

  /**
   * Method getCurrentSliceNum returns the slice number of this task. Slice 0 is the first slice.
   *
   * @return int
   */
  @Override
  public int getCurrentSliceNum()
    {
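    // "mapred.task.partition" is set by the Hadoop framework to this task's index within
    // its map or reduce phase; outside a running task it is unset, so 0 is returned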
    return getJobConf().getInt( "mapred.task.partition", 0 );
    }

  @Override
  public int getNumProcessSlices()
    {
    if( isMapper() )
      return getCurrentNumMappers();
    else
      return getCurrentNumReducers();
    }

  /**
   * Method setReporter sets the reporter of this HadoopFlowProcess object. A null value is
   * replaced with {@link Reporter#NULL} so callers never see a null reporter.
   *
   * @param reporter the reporter of this HadoopFlowProcess object.
   */
  public void setReporter( Reporter reporter )
    {
    if( reporter == null )
      this.reporter = Reporter.NULL;
    else
      this.reporter = reporter;
    }

  @Override
  public Reporter getReporter()
    {
    return reporter;
    }

  /** Method setOutputCollector sets the Hadoop OutputCollector used by this HadoopFlowProcess object. */
  public void setOutputCollector( OutputCollector outputCollector )
    {
    this.outputCollector = outputCollector;
    }

  /** Method getOutputCollector returns the Hadoop OutputCollector of this HadoopFlowProcess object. */
  public OutputCollector getOutputCollector()
    {
    return outputCollector;
    }

  @Override
  public Object getProperty( String key )
    {
    return jobConf.get( key );
    }

  @Override
  public Collection<String> getPropertyKeys()
    {
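    // a JobConf is Iterable over all of its resolved key/value entries, including loaded defaults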
    Set<String> keys = new HashSet<String>();

    for( Map.Entry<String, String> entry : jobConf )
      keys.add( entry.getKey() );

    return Collections.unmodifiableSet( keys );
    }

  @Override
  public Object newInstance( String className )
    {
    if( className == null || className.isEmpty() )
      return null;

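    // ReflectionUtils instantiates the class and, if it implements Configurable or
    // JobConfigurable, hands it the current JobConf before returning it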
    try
      {
      Class type = HadoopFlowProcess.class.getClassLoader().loadClass( className );

      return ReflectionUtils.newInstance( type, jobConf );
      }
    catch( ClassNotFoundException exception )
      {
      throw new CascadingException( "unable to load class: " + className, exception );
      }
    }

  @Override
  public void keepAlive()
    {
    getReporter().progress();
    }

  @Override
  public void increment( Enum counter, long amount )
    {
    getReporter().incrCounter( counter, amount );
    }

  @Override
  public void increment( String group, String counter, long amount )
    {
    getReporter().incrCounter( group, counter, amount );
    }

  @Override
  public long getCounterValue( Enum counter )
    {
    return getReporter().getCounter( counter ).getValue();
    }

  @Override
  public long getCounterValue( String group, String counter )
    {
    return getReporter().getCounter( group, counter ).getValue();
    }

  @Override
  public void setStatus( String status )
    {
    getReporter().setStatus( status );
    }

  @Override
  public boolean isCounterStatusInitialized()
    {
    return getReporter() != null;
    }

  @Override
  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
    {
    return tap.openForRead( this );
    }

  @Override
  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
    {
    return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
    }

  @Override
  public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
    {
    JobConf jobConf = HadoopUtil.copyJobConf( getJobConf() );

    int stepNum = jobConf.getInt( "cascading.flow.step.num", 0 );
    String partname;

    if( jobConf.getBoolean( "mapred.task.is.map", true ) )
      partname = String.format( "-m-%05d-", stepNum );
    else
      partname = String.format( "-r-%05d-", stepNum );

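    // note the %05d embedded above is the flow step number, not the task id; the template
    // set below is resolved later by the tap's collector, which fills the remaining slots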
    jobConf.set( "cascading.tapcollector.partname", "%s%spart" + partname + "%05d" );

    return trap.openForWrite( new HadoopFlowProcess( this, jobConf ), null ); // do not honor sinkmode as this may be opened across tasks
    }

  @Override
  public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
    {
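    // adapts the Hadoop OutputCollector to the TupleEntryCollector API: field 0 of each
    // collected tuple is emitted as the key, field 1 as the value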
    return new TupleEntryCollector( Fields.size( 2 ) )
      {
      @Override
      protected void collect( TupleEntry tupleEntry )
        {
        try
          {
          getOutputCollector().collect( tupleEntry.getObject( 0 ), tupleEntry.getObject( 1 ) );
          }
        catch( IOException exception )
          {
          throw new CascadingException( "failed collecting key and value", exception );
          }
        }
      };
    }

  @Override
  public <C> C copyConfig( C config )
    {
    return HadoopUtil.copyJobConf( config );
    }

  @Override
  public <C> Map<String, String> diffConfigIntoMap( C defaultConfig, C updatedConfig )
    {
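    // intended to capture only the entries of updatedConfig that differ from defaultConfig,
    // so just the delta needs to be serialized and shipped with the job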
    return HadoopUtil.getConfig( (Configuration) defaultConfig, (Configuration) updatedConfig );
    }

  @Override
  public JobConf mergeMapIntoConfig( JobConf defaultConfig, Map<String, String> map )
    {
    return HadoopUtil.mergeConf( defaultConfig, map, false );
    }
  }