/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.management.state.ClientState;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Tuple;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.IndexTuple;
import cascading.tuple.io.TuplePair;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;

import static cascading.flow.hadoop.util.HadoopUtil.addComparators;
import static cascading.flow.hadoop.util.HadoopUtil.pack;

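/*
 * A minimal usage sketch (illustrative only: steps are created by the flow planner rather than
 * constructed directly, and the source/sink Tap and Pipe names below are hypothetical):
 *
 *   Properties properties = new Properties();
 *   AppProps.setApplicationJarClass( properties, Main.class );
 *
 *   FlowConnector connector = new HadoopFlowConnector( properties );
 *   Flow flow = connector.connect( "example", sourceTap, sinkTap, pipeAssembly );
 *
 *   flow.complete(); // each step in the resulting plan runs as one MapReduce job
 */
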
/**
 * Class HadoopFlowStep is the Hadoop MapReduce specific {@link BaseFlowStep} implementation. It initializes
 * the {@link JobConf} for a single MapReduce job, wiring up the mapper, reducer, partitioner, and comparator
 * classes required by the shuffle, and creates the {@link HadoopFlowStepJob} used to submit the job.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  protected HadoopFlowStep( String name, int ordinal )
    {
    super( name, ordinal );
    }

  public HadoopFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph )
    {
    super( elementGraph, flowNodeGraph );
    }

  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

  public JobConf createInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( Tuple.class );
    conf.setOutputValueClass( Tuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase; TupleSerialization registers itself with Hadoop's
    // "io.serializations" property so Tuple instances can be serialized between map and reduce tasks
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromStepConfigDef( conf );

    int numSinkParts = getSink().getScheme().getNumSinkParts();

    if( numSinkParts != 0 )
      {
      // when a reducer is present, the reduce task count controls the number of parts,
      // otherwise the map task count does
      if( getGroup() != null )
        conf.setNumReduceTasks( numSinkParts );
      else
        conf.setNumMapTasks( numSinkParts );
      }
    else if( getGroup() != null )
      {
      int gatherPartitions = conf.getNumReduceTasks();

      if( gatherPartitions == 0 )
        gatherPartitions = conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 );

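      // a flow-wide default may be supplied up front, e.g. (see FlowRuntimeProps):
      //   FlowRuntimeProps.flowRuntimeProps().setGatherPartitions( 4 ).buildProperties( properties )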
      if( gatherPartitions == 0 )
        throw new FlowException( getName(), "a default number of gather partitions must be set, see FlowRuntimeProps" );

      conf.setNumReduceTasks( gatherPartitions );
      }

    conf.setOutputKeyComparatorClass( TupleComparator.class );

    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( Tuple.class );
      conf.setMapOutputValueClass( Tuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case where the GroupBy sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors(), this, getGroup() );

      if( getGroup().isGroupBy() )
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors(), this, getGroup() );

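      // CoGroup case: e.g. joining two branches ships each key as IndexTuple( branchOrdinal, groupingTuple ),
      // so the reducer sees values co-grouped by key and ordered by originating branch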
      if( !getGroup().isGroupBy() )
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( IndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( IndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

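      // secondary sort case: e.g. GroupBy( pipe, groupFields, sortFields ) ships each key as
      // TuplePair( groupingTuple, sortingTuple ); partitioning and reduce-side grouping consider only
      // the grouping half, while the sort comparators below order by both halves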
      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // performed last so the initialization above is passed along to the tasks
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );

    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) );

    HadoopUtil.setIsInflow( conf );

    Iterator<FlowNode> iterator = getFlowNodeGraph().getTopologicalIterator();

    FlowNode mapperNode = iterator.next();
    FlowNode reducerNode = iterator.hasNext() ? iterator.next() : null;

    if( reducerNode != null )
      reducerNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( conf.getNumReduceTasks() ) );

    String mapState = pack( mapperNode, conf );
    String reduceState = pack( reducerNode, conf );

    // hadoop 20.2 doesn't like dist cache when using local mode
    int maxSize = Short.MAX_VALUE;
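    // JobConf entries ride inline in job.xml as strings, so serialized node graphs beyond roughly 32K
    // characters are spilled to the DistributedCache below; local mode always inlines, since the dist
    // cache is unreliable there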

    int length = mapState.length() + reduceState.length();

    if( isHadoopLocalMode( conf ) || length < maxSize ) // seems safe
      {
      conf.set( "cascading.flow.step.node.map", mapState );

      if( !Util.isEmpty( reduceState ) )
        conf.set( "cascading.flow.step.node.reduce", reduceState );
      }
    else
      {
      conf.set( "cascading.flow.step.node.map.path", HadoopMRUtil.writeStateToDistCache( conf, getID(), "map", mapState ) );

      if( !Util.isEmpty( reduceState ) )
        conf.set( "cascading.flow.step.node.reduce.path", HadoopMRUtil.writeStateToDistCache( conf, getID(), "reduce", reduceState ) );
      }

    return conf;
    }

  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }

  protected FlowStepJob<JobConf> createFlowStepJob( ClientState clientState, FlowProcess<JobConf> flowProcess, JobConf initializedStepConfig )
    {
    try
      {
      return new HadoopFlowStepJob( clientState, this, initializedStepConfig );
      }
    catch( NoClassDefFoundError error )
      {
      PlatformInfo platformInfo = HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" );
      String message = "unable to load platform specific class, please verify the Hadoop cluster version '%s' matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1";

      logError( String.format( message, platformInfo.toString() ), error );

      throw error;
      }
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
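    // present only when step state was written to the DistributedCache rather than inlined in the conf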
    String stepStatePath = config.get( "cascading.flow.step.path" );

    if( stepStatePath != null )
      {
      try
        {
        HadoopUtil.removeStateFromDistCache( config, stepStatePath );
        }
      catch( IOException exception )
        {
        logWarn( "unable to remove step state file: " + stepStatePath, exception );
        }
      }

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // swallow all exceptions, don't fail the app
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    if( getSink().isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        getSink().deleteResource( config );
        }
      catch( Exception exception )
        {
        // swallow all exceptions, don't fail the app
        logWarn( "unable to remove temporary file: " + getSink(), exception );
        }
      }
    else
      {
      cleanTapMetaData( config, getSink() );
      }

    for( Tap tap : getTraps() )
      cleanTapMetaData( config, tap );
    }

  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // ignore exception
      }
    }

  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( !traps.isEmpty() )
      {
      JobConf trapConf = HadoopUtil.copyJobConf( conf );

      for( Tap tap : traps.values() )
        tap.sinkConfInit( flowProcess, trapConf );
      }
    }

  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
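    // a streamed source feeds the mapper through the job's InputFormat; an accumulated source
    // (e.g. the smaller side of a HashJoin) is read fully by each task, so its conf is packed
    // per-tap below rather than added to the input format
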
    // handles case where same tap is used on multiple branches
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );

      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );

      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );

      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );

      tap.sourceConfInit( flowProcess, accumulatedJob );

      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

      try
        {
        if( DistributedCache.getCacheFiles( accumulatedJob ) != null )
          DistributedCache.setCacheFiles( DistributedCache.getCacheFiles( accumulatedJob ), conf );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); // must come last
    }

  private void initFromStepConfigDef( final JobConf conf )
    {
    initConfFromStepConfigDef( new ConfigurationSetter( conf ) );
    }

  /**
   * Sources are specific to this step; remove all known accumulated sources, if any.
   *
   * @return the set of streamed source Taps unique to this step
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    Set<Tap> allAccumulatedSources = getAllAccumulatedSources();

    HashSet<Tap> set = new HashSet<>( sources.keySet() );

    set.removeAll( allAccumulatedSources );

    return set;
    }

  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
    boolean isFileOutputFormat = false;

    if( outputFormat != null )
      isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );

    Path outputPath = FileOutputFormat.getOutputPath( conf );

    // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
    // PartitionTap won't set the output, but will set an OutputFormat
    // MultiSinkTap won't set the output or set the OutputFormat
    // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
    if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );
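      // the substituted TempHfs location is deleted again in clean( JobConf ) once the step completes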

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getTrapMap() );
    }
  }