/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.tez;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowElement;
import cascading.flow.FlowElements;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.ConfigurationSetter;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.ProcessEdge;
import cascading.flow.stream.annotations.StreamMode;
import cascading.flow.tez.planner.Hadoop2TezFlowStepJob;
import cascading.flow.tez.util.TezUtil;
import cascading.management.state.ClientState;
import cascading.pipe.Boundary;
import cascading.pipe.CoGroup;
import cascading.pipe.Group;
import cascading.pipe.GroupBy;
import cascading.pipe.Merge;
import cascading.property.AppProps;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.PartitionTap;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tuple.Fields;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.TuplePair;
import cascading.tuple.io.ValueTuple;
import cascading.tuple.tez.util.GroupingSortingPartitioner;
import cascading.tuple.tez.util.TuplePartitioner;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
import org.apache.tez.runtime.library.output.UnorderedKVOutput;
import org.apache.tez.runtime.library.output.UnorderedPartitionedKVOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.flow.hadoop.util.HadoopUtil.*;
import static cascading.flow.tez.util.TezUtil.addToClassPath;
import static cascading.tap.hadoop.DistCacheTap.CASCADING_LOCAL_RESOURCES;
import static cascading.tap.hadoop.DistCacheTap.CASCADING_REMOTE_RESOURCES;
import static cascading.util.Util.getFirst;
import static java.util.Collections.singletonList;
import static org.apache.hadoop.yarn.api.records.LocalResourceType.ARCHIVE;
import static org.apache.hadoop.yarn.api.records.LocalResourceType.FILE;

/**
 * Class Hadoop2TezFlowStep is the Apache Tez specific {@link BaseFlowStep} implementation, responsible for
 * translating a step's element and node graphs into a Tez {@link DAG} and handing it to a
 * {@link Hadoop2TezFlowStepJob} for submission.
 */
public class Hadoop2TezFlowStep extends BaseFlowStep<TezConfiguration>
  {
  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezFlowStep.class );

  private Map<String, LocalResource> allLocalResources = new HashMap<>();
  private Map<Path, Path> syncPaths = new HashMap<>();
  private Map<String, String> environment = new HashMap<>();

  public Hadoop2TezFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph )
    {
    super( elementGraph, flowNodeGraph );
    }

  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

  @Override
  public TezConfiguration createInitializedConfig( FlowProcess<TezConfiguration> flowProcess, TezConfiguration parentConfig )
    {
    TezConfiguration stepConf = parentConfig == null ? new TezConfiguration() : new TezConfiguration( parentConfig );

    TupleSerialization.setSerializations( stepConf );

    String versionString = Version.getRelease();

    if( versionString != null )
      stepConf.set( "cascading.version", versionString );

    stepConf.set( CASCADING_FLOW_STEP_ID, getID() );
    stepConf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) );

    String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath();
    List<String> classPath = ( (Hadoop2TezFlow) getFlow() ).getClassPath();

    // populated by the addToClassPath calls below
    Map<String, LocalResource> dagResources = new HashMap<>();

    if( !classPath.isEmpty() )
      {
      // jars in the root will be on the remote CLASSPATH, no need to add them to the environment
      Map<Path, Path> dagClassPath = addToClassPath( stepConf, flowStagingPath, null, classPath, FILE, dagResources, null );

      syncPaths.putAll( dagClassPath );
      }

    String appJarPath = stepConf.get( AppProps.APP_JAR_PATH );

    if( appJarPath != null )
      {
      // the app jar is added as an ARCHIVE; its unpacked contents must be added to the remote CLASSPATH
      List<String> classpath = singletonList( appJarPath );
      Map<Path, Path> pathMap = addToClassPath( stepConf, flowStagingPath, null, classpath, ARCHIVE, dagResources, environment );

      syncPaths.putAll( pathMap );

      // the AM does not support setting the environment the way containers do, so the classpath has to be passed via configuration
      String fileName = new File( appJarPath ).getName();
      stepConf.set( TezConfiguration.TEZ_CLUSTER_ADDITIONAL_CLASSPATH_PREFIX,
        "$PWD/" + fileName + "/:$PWD/" + fileName + "/classes/:$PWD/" + fileName + "/lib/*:" );
      }

    allLocalResources.putAll( dagResources );

    initFromStepConfigDef( stepConf );

    return stepConf;
    }

  @Override
  protected FlowStepJob createFlowStepJob( ClientState clientState, FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedStepConfig )
    {
    DAG dag = createDAG( flowProcess, initializedStepConfig );

    return new Hadoop2TezFlowStepJob( clientState, this, initializedStepConfig, dag );
    }

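  /**
   * Builds the Tez {@link DAG} for this step: one {@link Vertex} per {@link FlowNode}, a plain
   * {@link Edge} for single-source connections (and CoGroup/Boundary targets), and a
   * {@link GroupInputEdge} over a {@link VertexGroup} when multiple source nodes feed a GroupBy or Merge.
   */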
  private DAG createDAG( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig )
    {
    FlowNodeGraph nodeGraph = getFlowNodeGraph();
    Map<FlowNode, Vertex> vertexMap = new HashMap<>();
    DAG dag = DAG.create( getStepDisplayName( initializedConfig.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) );

    dag.addTaskLocalFiles( allLocalResources );

    Iterator<FlowNode> iterator = nodeGraph.getOrderedTopologicalIterator(); // ordering of nodes for consistent remote debugging

    while( iterator.hasNext() )
      {
      FlowNode flowNode = iterator.next();

      Vertex vertex = createVertex( flowProcess, initializedConfig, flowNode );
      dag.addVertex( vertex );

      vertexMap.put( flowNode, vertex );
      }

    LinkedList<ProcessEdge> processedEdges = new LinkedList<>();

    for( ProcessEdge processEdge : nodeGraph.edgeSet() )
      {
      if( processedEdges.contains( processEdge ) )
        continue;

      FlowNode edgeTargetFlowNode = nodeGraph.getEdgeTarget( processEdge );

      FlowElement flowElement = processEdge.getFlowElement();
      List<FlowNode> sourceNodes = nodeGraph.getElementSourceProcesses( flowElement );

      EdgeProperty edgeProperty = createEdgeProperty( initializedConfig, processEdge );

      Vertex targetVertex = vertexMap.get( edgeTargetFlowNode );

      if( sourceNodes.size() == 1 || flowElement instanceof CoGroup || flowElement instanceof Boundary ) // todo: create group vertices around incoming ordinal
        {
        FlowNode edgeSourceFlowNode = nodeGraph.getEdgeSource( processEdge );
        Vertex sourceVertex = vertexMap.get( edgeSourceFlowNode );

        LOG.debug( "adding edge between: {} and {}", sourceVertex, targetVertex );

        dag.addEdge( Edge.create( sourceVertex, targetVertex, edgeProperty ) );
        }
      else if( flowElement instanceof GroupBy || flowElement instanceof Merge ) // merge - source nodes > 1
        {
        List<String> sourceVerticesIDs = new ArrayList<>();
        List<Vertex> sourceVertices = new ArrayList<>();

        for( FlowNode edgeSourceFlowNode : sourceNodes )
          {
          sourceVerticesIDs.add( edgeSourceFlowNode.getID() );
          sourceVertices.add( vertexMap.get( edgeSourceFlowNode ) );
          processedEdges.add( nodeGraph.getEdge( edgeSourceFlowNode, edgeTargetFlowNode ) );
          }

        VertexGroup vertexGroup = dag.createVertexGroup( edgeTargetFlowNode.getID(), sourceVertices.toArray( new Vertex[ sourceVertices.size() ] ) );

        String inputClassName = flowElement instanceof Group ? OrderedGroupedMergedKVInput.class.getName() : ConcatenatedMergedKeyValueInput.class.getName();

        InputDescriptor inputDescriptor = InputDescriptor.create( inputClassName ).setUserPayload( edgeProperty.getEdgeDestination().getUserPayload() );

        LOG.info( "adding grouped edge between: {} and {}", Util.join( sourceVerticesIDs, "," ), targetVertex.getName() );
        dag.addEdge( GroupInputEdge.create( vertexGroup, targetVertex, edgeProperty, inputDescriptor ) );
        }
      else
        {
        throw new UnsupportedOperationException( "can't make edge for: " + flowElement );
        }
      }

    return dag;
    }

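  /**
   * Derives the Tez {@link EdgeProperty} for a {@link ProcessEdge} by selecting the key/value,
   * comparator and partitioner classes, then applying Group, Boundary/Merge, or accumulated
   * Boundary/Merge semantics to choose the input/output classes and data movement type.
   */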
  private EdgeProperty createEdgeProperty( TezConfiguration config, ProcessEdge processEdge )
    {
    FlowElement flowElement = processEdge.getFlowElement();

    EdgeValues edgeValues = new EdgeValues( new TezConfiguration( config ), processEdge );

    edgeValues.keyClassName = KeyTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS
    edgeValues.valueClassName = ValueTuple.class.getName(); // TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS
    edgeValues.keyComparatorClassName = TupleComparator.class.getName();
    edgeValues.keyPartitionerClassName = TuplePartitioner.class.getName();
    edgeValues.outputClassName = null;
    edgeValues.inputClassName = null;
    edgeValues.movementType = null;
    edgeValues.sourceType = null;
    edgeValues.schedulingType = null;

    if( flowElement instanceof Group )
      applyGroup( edgeValues );
    else if( ( flowElement instanceof Boundary || flowElement instanceof Merge ) && processEdge.getSinkAnnotations().contains( StreamMode.Accumulated ) )
      applyBoundaryMergeAccumulated( edgeValues );
    else if( flowElement instanceof Boundary || flowElement instanceof Merge )
      applyBoundaryMerge( edgeValues );
    else
      throw new IllegalStateException( "unsupported flow element: " + flowElement.getClass().getCanonicalName() );

    applyEdgeAnnotations( processEdge, edgeValues );

    return createEdgeProperty( edgeValues );
    }

  private void applyEdgeAnnotations( ProcessEdge processEdge, EdgeValues edgeValues )
    {
    processEdge.addEdgeAnnotation( edgeValues.movementType );
    processEdge.addEdgeAnnotation( edgeValues.sourceType );
    processEdge.addEdgeAnnotation( edgeValues.schedulingType );
    }

  private EdgeValues applyBoundaryMerge( EdgeValues edgeValues )
    {
    // todo: support for one to one
    edgeValues.outputClassName = UnorderedPartitionedKVOutput.class.getName();
    edgeValues.inputClassName = UnorderedKVInput.class.getName();

    edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
    edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
    edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;

    return edgeValues;
    }

  private EdgeValues applyBoundaryMergeAccumulated( EdgeValues edgeValues )
    {
    edgeValues.outputClassName = UnorderedKVOutput.class.getName();
    edgeValues.inputClassName = UnorderedKVInput.class.getName();

    edgeValues.movementType = EdgeProperty.DataMovementType.BROADCAST;
    edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
    edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;

    return edgeValues;
    }

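  /**
   * Configures an ordered, partitioned (scatter-gather) edge for a {@link Group}, registering grouping
   * and, for GroupBy, sorting comparators, and switching to {@link TuplePair} keys with the
   * grouping/sorting partitioner and comparator when secondary sorting is requested.
   */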
  private EdgeValues applyGroup( EdgeValues edgeValues )
    {
    Group group = (Group) edgeValues.flowElement;

    if( group.isSortReversed() )
      edgeValues.keyComparatorClassName = ReverseTupleComparator.class.getName();

    int ordinal = getFirst( edgeValues.ordinals );

    addComparators( edgeValues.config, "cascading.group.comparator", group.getKeySelectors(), edgeValues.getResolvedKeyFieldsMap().get( ordinal ) );

    if( !group.isGroupBy() )
      {
      edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName();
      edgeValues.inputClassName = OrderedGroupedKVInput.class.getName();

      edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
      edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
      edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
      }
    else
      {
      addComparators( edgeValues.config, "cascading.sort.comparator", group.getSortingSelectors(), edgeValues.getResolvedSortFieldsMap().get( ordinal ) );

      edgeValues.outputClassName = OrderedPartitionedKVOutput.class.getName();
      edgeValues.inputClassName = OrderedGroupedKVInput.class.getName();

      edgeValues.movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
      edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
      edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
      }

    if( group.isSorted() )
      {
      edgeValues.keyClassName = TuplePair.class.getName();
      edgeValues.keyPartitionerClassName = GroupingSortingPartitioner.class.getName();

      if( group.isSortReversed() )
        edgeValues.keyComparatorClassName = ReverseGroupingSortingComparator.class.getName();
      else
        edgeValues.keyComparatorClassName = GroupingSortingComparator.class.getName();
      }

    return edgeValues;
    }

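  /**
   * Materializes the {@link EdgeProperty} by serializing separate source (output) and target (input)
   * configurations, including the resolved key/sort/value fields, into Tez {@link UserPayload}
   * instances for the edge's {@link OutputDescriptor} and {@link InputDescriptor}.
   */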
  private EdgeProperty createEdgeProperty( EdgeValues edgeValues )
    {
    TezConfiguration outputConfig = new TezConfiguration( edgeValues.getConfig() );
    outputConfig.set( "cascading.node.sink", FlowElements.id( edgeValues.getFlowElement() ) );
    outputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) );
    addFields( outputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() );
    addFields( outputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() );
    addFields( outputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() );

    UserPayload outputPayload = createIntermediatePayloadOutput( outputConfig, edgeValues );

    TezConfiguration inputConfig = new TezConfiguration( edgeValues.getConfig() );
    inputConfig.set( "cascading.node.source", FlowElements.id( edgeValues.getFlowElement() ) );
    inputConfig.set( "cascading.node.ordinals", Util.join( edgeValues.getOrdinals(), "," ) );
    addFields( inputConfig, "cascading.node.key.fields", edgeValues.getResolvedKeyFieldsMap() );
    addFields( inputConfig, "cascading.node.sort.fields", edgeValues.getResolvedSortFieldsMap() );
    addFields( inputConfig, "cascading.node.value.fields", edgeValues.getResolvedValueFieldsMap() );

    UserPayload inputPayload = createIntermediatePayloadInput( inputConfig, edgeValues );

    return EdgeProperty.create(
      edgeValues.getMovementType(),
      edgeValues.getSourceType(),
      edgeValues.getSchedulingType(),
      OutputDescriptor.create( edgeValues.getOutputClassName() ).setUserPayload( outputPayload ),
      InputDescriptor.create( edgeValues.getInputClassName() ).setUserPayload( inputPayload )
    );
    }

  private UserPayload createIntermediatePayloadOutput( TezConfiguration config, EdgeValues edgeValues )
    {
    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName );
    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName );
    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName );
    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName );

    setWorkingDirectory( config );

    return getPayload( config );
    }

  private UserPayload createIntermediatePayloadInput( TezConfiguration config, EdgeValues edgeValues )
    {
    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, edgeValues.keyClassName );
    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, edgeValues.valueClassName );
    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS, edgeValues.keyComparatorClassName );
    config.set( TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, edgeValues.keyPartitionerClassName );

    setWorkingDirectory( config );

    return getPayload( config );
    }

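  /**
   * Ensures {@link JobContext#WORKING_DIR} is set on the given configuration, defaulting to the
   * current FileSystem working directory.
   */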
  private static void setWorkingDirectory( Configuration conf )
    {
    String name = conf.get( JobContext.WORKING_DIR );

    if( name != null )
      return;

    try
      {
      Path dir = FileSystem.get( conf ).getWorkingDirectory();
      conf.set( JobContext.WORKING_DIR, dir.toString() );
      }
    catch( IOException exception )
      {
      throw new RuntimeException( exception );
      }
    }

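  /**
   * Creates the Tez {@link Vertex} for a {@link FlowNode}: initializes source, sink and trap
   * configurations, resolves parallelism, registers {@link MRInput} data sources and {@link MROutput}
   * data sinks for the node's taps, and applies any remote debug/profiling launch options.
   */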
  public Vertex createVertex( FlowProcess<TezConfiguration> flowProcess, TezConfiguration initializedConfig, FlowNode flowNode )
    {
    JobConf conf = new JobConf( initializedConfig );

    addInputOutputMapping( conf, flowNode );

    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    Map<String, LocalResource> taskLocalResources = new HashMap<>();

    Map<FlowElement, Configuration> sourceConfigs = initFromSources( flowNode, flowProcess, conf, taskLocalResources );
    Map<FlowElement, Configuration> sinkConfigs = initFromSinks( flowNode, flowProcess, conf );

    initFromTraps( flowNode, flowProcess, conf );

    initFromNodeConfigDef( flowNode, conf );

    // force step to local mode if any tap is local
    setLocalMode( initializedConfig, conf, null );

    conf.set( "cascading.flow.node.num", Integer.toString( flowNode.getOrdinal() ) );

    HadoopUtil.setIsInflow( conf ); // must be called after all tap configurations have been retrieved

    int parallelism = getParallelism( flowNode, conf );

    if( parallelism == 0 )
      throw new FlowException( getName(), "the default number of gather partitions must be set, see cascading.flow.FlowRuntimeProps" );

    flowNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( parallelism ) );

    Vertex vertex = newVertex( flowNode, conf, parallelism );

    if( !taskLocalResources.isEmpty() )
      vertex.addTaskLocalFiles( taskLocalResources );

    for( FlowElement flowElement : sourceConfigs.keySet() )
      {
      if( !( flowElement instanceof Tap ) )
        continue;

      Configuration sourceConf = sourceConfigs.get( flowElement );

      // if the Scheme did not set the new-api flag, derive it from the input format class to prevent failures
      if( sourceConf.get( "mapred.mapper.new-api" ) == null )
        HadoopUtil.setNewApi( sourceConf, sourceConf.get( "mapred.input.format.class", sourceConf.get( "mapreduce.job.inputformat.class" ) ) );

      // unfortunately we cannot just load the input format and set it on the builder without also pulling all other
      // values out of the configuration
      MRInput.MRInputConfigBuilder configBuilder = MRInput.createConfigBuilder( sourceConf, null );

      // the default in Tez is true; this overrides it when COMBINE_SPLITS is set
      if( conf.get( FlowRuntimeProps.COMBINE_SPLITS ) != null )
        configBuilder.groupSplits( conf.getBoolean( FlowRuntimeProps.COMBINE_SPLITS, true ) );

      // grouping splits loses file name info, breaking the PartitionTap default implementation
      if( flowElement instanceof PartitionTap ) // todo: generify
        configBuilder.groupSplits( false );

      DataSourceDescriptor dataSourceDescriptor = configBuilder.build();

      vertex.addDataSource( FlowElements.id( flowElement ), dataSourceDescriptor );
      }

    for( FlowElement flowElement : sinkConfigs.keySet() )
      {
      if( !( flowElement instanceof Tap ) )
        continue;

      Configuration sinkConf = sinkConfigs.get( flowElement );

      Class outputFormatClass;
      String outputPath;

      // we have to set sane defaults if not set by the tap,
      // typically the case with MultiSinkTap
      String formatClassName = sinkConf.get( "mapred.output.format.class", sinkConf.get( "mapreduce.job.outputformat.class" ) );

      if( formatClassName == null )
        {
        outputFormatClass = TextOutputFormat.class; // unused; the "new" api TextOutputFormat is the default
        outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused
        }
      else
        {
        outputFormatClass = Util.loadClass( formatClassName );
        outputPath = getOutputPath( sinkConf );
        }

      if( outputPath == null && getOutputPath( sinkConf ) == null && isFileOutputFormat( outputFormatClass ) )
        outputPath = Hfs.getTempPath( sinkConf ).toString(); // unused

      MROutput.MROutputConfigBuilder configBuilder = MROutput.createConfigBuilder( sinkConf, outputFormatClass, outputPath );

      DataSinkDescriptor dataSinkDescriptor = configBuilder.build();

      vertex.addDataSink( FlowElements.id( flowElement ), dataSinkDescriptor );
      }

    addRemoteDebug( flowNode, vertex );
    addRemoteProfiling( flowNode, vertex );

    if( vertex.getTaskLaunchCmdOpts() != null )
      flowNode.addProcessAnnotation( TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, vertex.getTaskLaunchCmdOpts() );

    return vertex;
    }

  protected String getOutputPath( Configuration sinkConf )
    {
    return sinkConf.get( "mapred.output.dir", sinkConf.get( "mapreduce.output.fileoutputformat.outputdir" ) );
    }

  protected boolean isFileOutputFormat( Class outputFormatClass )
    {
    return org.apache.hadoop.mapred.FileOutputFormat.class.isAssignableFrom( outputFormatClass ) ||
      org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom( outputFormatClass );
    }

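  /**
   * Returns the gather parallelism for a node: -1 when any streamed source taps remain, otherwise the
   * smallest declared numSinkParts across the node's sink taps, falling back to the configured
   * {@link FlowRuntimeProps#GATHER_PARTITIONS} value (0 if unset).
   */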
  protected int getParallelism( FlowNode flowNode, JobConf conf )
    {
    // only count streamed taps, accumulated taps are always annotated
    HashSet<Tap> sourceStreamedTaps = new HashSet<>( flowNode.getSourceTaps() );

    sourceStreamedTaps.removeAll( flowNode.getSourceElements( StreamMode.Accumulated ) );

    if( sourceStreamedTaps.size() != 0 )
      return -1;

    int parallelism = Integer.MAX_VALUE;

    for( Tap tap : flowNode.getSinkTaps() )
      {
      int numSinkParts = tap.getScheme().getNumSinkParts();

      if( numSinkParts == 0 )
        continue;

      if( parallelism != Integer.MAX_VALUE )
        LOG.info( "multiple sink taps in flow node declaring numSinkParts, choosing lowest value. see cascading.flow.FlowRuntimeProps for broader control." );

      parallelism = Math.min( parallelism, numSinkParts );
      }

    if( parallelism != Integer.MAX_VALUE )
      return parallelism;

    return conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 );
    }

  private void addInputOutputMapping( JobConf conf, FlowNode flowNode )
    {
    FlowNodeGraph flowNodeGraph = getFlowNodeGraph();
    Set<ProcessEdge> incomingEdges = flowNodeGraph.incomingEdgesOf( flowNode );

    for( ProcessEdge processEdge : incomingEdges )
      conf.set( "cascading.node.source." + processEdge.getFlowElementID(), processEdge.getSourceProcessID() );

    Set<ProcessEdge> outgoingEdges = flowNodeGraph.outgoingEdgesOf( flowNode );

    for( ProcessEdge processEdge : outgoingEdges )
      conf.set( "cascading.node.sink." + processEdge.getFlowElementID(), processEdge.getSinkProcessID() );
    }

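  /**
   * Initializes per-source configurations for the node. Accumulated taps are configured into the parent
   * conf (with any declared local resources staged onto the task classpath); streamed sources each
   * receive their own child configuration keyed by element id.
   */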
  protected Map<FlowElement, Configuration> initFromSources( FlowNode flowNode, FlowProcess<TezConfiguration> flowProcess,
                                                             Configuration conf, Map<String, LocalResource> taskLocalResources )
    {
    Set<? extends FlowElement> accumulatedSources = flowNode.getSourceElements( StreamMode.Accumulated );

    for( FlowElement element : accumulatedSources )
      {
      if( element instanceof Tap )
        {
        JobConf current = new JobConf( conf );
        Tap tap = (Tap) element;

        if( tap.getIdentifier() == null )
          throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

        tap.sourceConfInit( flowProcess, current );

        Collection<String> paths = current.getStringCollection( CASCADING_LOCAL_RESOURCES + Tap.id( tap ) );

        if( !paths.isEmpty() )
          {
          String flowStagingPath = ( (Hadoop2TezFlow) getFlow() ).getFlowStagingPath();
          String resourceSubPath = Tap.id( tap );
          Map<Path, Path> pathMap = TezUtil.addToClassPath( current, flowStagingPath, resourceSubPath, paths, LocalResourceType.FILE, taskLocalResources, null );

          current.setStrings( CASCADING_REMOTE_RESOURCES + Tap.id( tap ), taskLocalResources.keySet().toArray( new String[ taskLocalResources.size() ] ) );

          allLocalResources.putAll( taskLocalResources );
          syncPaths.putAll( pathMap );
          }

        Map<String, String> map = flowProcess.diffConfigIntoMap( new TezConfiguration( conf ), new TezConfiguration( current ) );
        conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

        setLocalMode( conf, current, tap );
        }
      }

    Set<FlowElement> sources = new HashSet<>( flowNode.getSourceElements() );

    sources.removeAll( accumulatedSources );

    if( sources.isEmpty() )
      throw new IllegalStateException( "all sources marked as accumulated" );

    Map<FlowElement, Configuration> configs = new HashMap<>();

    for( FlowElement element : sources )
      {
      JobConf current = new JobConf( conf );

      String id = FlowElements.id( element );

      current.set( "cascading.node.source", id );

      if( element instanceof Tap )
        {
        Tap tap = (Tap) element;

        if( tap.getIdentifier() == null )
          throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

        tap.sourceConfInit( flowProcess, current );

        setLocalMode( conf, current, tap );
        }

      configs.put( element, current );
      }

    return configs;
    }

  protected Map<FlowElement, Configuration> initFromSinks( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
    {
    Set<FlowElement> sinks = flowNode.getSinkElements();
    Map<FlowElement, Configuration> configs = new HashMap<>();

    for( FlowElement element : sinks )
      {
      JobConf current = new JobConf( conf );

      if( element instanceof Tap )
        {
        Tap tap = (Tap) element;

        if( tap.getIdentifier() == null )
          throw new IllegalStateException( "tap may not have null identifier: " + element.toString() );

        tap.sinkConfInit( flowProcess, current );

        setLocalMode( conf, current, tap );
        }

      String id = FlowElements.id( element );

      current.set( "cascading.node.sink", id );

      configs.put( element, current );
      }

    return configs;
    }

  private void initFromNodeConfigDef( FlowNode flowNode, Configuration conf )
    {
    initConfFromNodeConfigDef( flowNode.getElementGraph(), new ConfigurationSetter( conf ) );
    }

  private void initFromStepConfigDef( Configuration conf )
    {
    initConfFromStepConfigDef( new ConfigurationSetter( conf ) );
    }

  protected void initFromTraps( FlowNode flowNode, FlowProcess<? extends Configuration> flowProcess, Configuration conf )
    {
    Map<String, Tap> traps = flowNode.getTrapMap();

    if( !traps.isEmpty() )
      {
      JobConf trapConf = new JobConf( conf );

      for( Tap tap : traps.values() )
        {
        tap.sinkConfInit( flowProcess, trapConf );
        setLocalMode( conf, trapConf, tap );
        }
      }
    }

  private Vertex newVertex( FlowNode flowNode, Configuration conf, int parallelism )
    {
    conf.set( FlowNode.CASCADING_FLOW_NODE, pack( flowNode, conf ) ); // todo: pack into payload directly

    ProcessorDescriptor descriptor = ProcessorDescriptor.create( FlowProcessor.class.getName() );

    descriptor.setUserPayload( getPayload( conf ) );

    Vertex vertex = Vertex.create( flowNode.getID(), descriptor, parallelism );

    if( environment != null )
      vertex.setTaskEnvironment( environment );

    return vertex;
    }

  private UserPayload getPayload( Configuration conf )
    {
    try
      {
      return TezUtils.createUserPayloadFromConf( conf );
      }
    catch( IOException exception )
      {
      throw new CascadingException( exception );
      }
    }

  private String pack( Object object, Configuration conf )
    {
    try
      {
      return serializeBase64( object, conf, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
      }
    }

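  /**
   * Removes temporary sink resources after a successful run (or when no runID is set) and cleans tap
   * meta data for the remaining sinks and all traps.
   */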
  @Override
  public void clean( TezConfiguration config )
    {
    for( Tap sink : getSinkTaps() )
      {
      if( sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
        {
        try
          {
          sink.deleteResource( config );
          }
        catch( Exception exception )
          {
          // swallow all exceptions, don't fail the app
          logWarn( "unable to remove temporary file: " + sink, exception );
          }
        }
      else
        {
        cleanTapMetaData( config, sink );
        }
      }

    for( Tap tap : getTraps() )
      cleanTapMetaData( config, tap );
    }

  private void cleanTapMetaData( TezConfiguration config, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( config, tap );
      }
    catch( IOException exception )
      {
      // ignore exception
      }
    }

  public void syncArtifacts()
    {
    // this may not be strictly necessary, but there is a condition where setting the access time
    // fails, so there may be one where setting the modification time fails. if so, we can compensate.
    Map<String, Long> timestamps = HadoopUtil.syncPaths( getConfig(), syncPaths, true );

    for( Map.Entry<String, Long> entry : timestamps.entrySet() )
      {
      LocalResource localResource = allLocalResources.get( entry.getKey() );

      if( localResource != null )
        localResource.setTimestamp( entry.getValue() );
      }
    }

  private void setLocalMode( Configuration parent, JobConf current, Tap tap )
    {
    // force step to local mode
    if( !HadoopUtil.isLocal( current ) )
      return;

    if( tap != null )
      logInfo( "tap forcing step to tez local mode: " + tap.getIdentifier() );

    HadoopUtil.setLocal( parent );
    }

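  /**
   * Appends a JDWP agent to the vertex task launch options when the "test.debug.node" system property
   * names this node (by source element name or ordinal), connecting back to "test.debug.address"
   * (default localhost:5005).
   */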
  private void addRemoteDebug( FlowNode flowNode, Vertex vertex )
    {
    String value = System.getProperty( "test.debug.node", null );

    if( Util.isEmpty( value ) )
      return;

    if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() )
      return;

    LOG.warn( "remote debugging enabled with property: {}, on node: {}, with node id: {}", "test.debug.node", value, flowNode.getID() );

    String opts = vertex.getTaskLaunchCmdOpts();

    if( opts == null )
      opts = "";

    String address = System.getProperty( "test.debug.address", "localhost:5005" ).trim();

    opts += " -agentlib:jdwp=transport=dt_socket,server=n,address=" + address + ",suspend=y";

    vertex.setTaskLaunchCmdOpts( opts );
    }

  private void addRemoteProfiling( FlowNode flowNode, Vertex vertex )
    {
    String value = System.getProperty( "test.profile.node", null );

    if( Util.isEmpty( value ) )
      return;

    if( !flowNode.getSourceElementNames().contains( value ) && asInt( value ) != flowNode.getOrdinal() )
      return;

    LOG.warn( "remote profiling enabled with property: {}, on node: {}, with node id: {}", "test.profile.node", value, flowNode.getID() );

    String opts = vertex.getTaskLaunchCmdOpts();

    if( opts == null )
      opts = "";

    String path = System.getProperty( "test.profile.path", "/tmp/jfr/" );

    if( !path.endsWith( "/" ) )
      path += "/";

    LOG.warn( "remote profiling property: {}, logging to path: {}", "test.profile.path", path );

    opts += String.format( " -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=%1$s%2$s,disk=true,repository=%1$s%2$s", path, flowNode.getID() );

    vertex.setTaskLaunchCmdOpts( opts );
    }

  private int asInt( String value )
    {
    try
      {
      return Integer.parseInt( value );
      }
    catch( NumberFormatException exception )
      {
      return -1;
      }
    }

  public Map<String, LocalResource> getAllLocalResources()
    {
    return allLocalResources;
    }

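  /**
   * Simple value holder for the settings used to construct a single Tez {@link EdgeProperty}: the
   * originating flow element, per-edge configuration, ordinals, resolved fields, and the chosen
   * key/value/comparator/partitioner, input/output, and data movement classes.
   */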
  private static class EdgeValues
    {
    FlowElement flowElement;
    TezConfiguration config;
    Set<Integer> ordinals;
    String keyClassName;
    String valueClassName;
    String keyComparatorClassName;
    String keyPartitionerClassName;
    String outputClassName;
    String inputClassName;
    EdgeProperty.DataMovementType movementType;
    EdgeProperty.DataSourceType sourceType;
    EdgeProperty.SchedulingType schedulingType;

    Map<Integer, Fields> resolvedKeyFieldsMap;
    Map<Integer, Fields> resolvedSortFieldsMap;
    Map<Integer, Fields> resolvedValueFieldsMap;

    private EdgeValues( TezConfiguration config, ProcessEdge processEdge )
      {
      this.config = config;
      this.flowElement = processEdge.getFlowElement();
      this.ordinals = processEdge.getSourceProvidedOrdinals();

      this.resolvedKeyFieldsMap = processEdge.getResolvedKeyFields();
      this.resolvedSortFieldsMap = processEdge.getResolvedSortFields();
      this.resolvedValueFieldsMap = processEdge.getResolvedValueFields();
      }

    public FlowElement getFlowElement()
      {
      return flowElement;
      }

    public TezConfiguration getConfig()
      {
      return config;
      }

    public Set<Integer> getOrdinals()
      {
      return ordinals;
      }

    public String getKeyClassName()
      {
      return keyClassName;
      }

    public String getValueClassName()
      {
      return valueClassName;
      }

    public String getKeyComparatorClassName()
      {
      return keyComparatorClassName;
      }

    public String getKeyPartitionerClassName()
      {
      return keyPartitionerClassName;
      }

    public String getOutputClassName()
      {
      return outputClassName;
      }

    public String getInputClassName()
      {
      return inputClassName;
      }

    public EdgeProperty.DataMovementType getMovementType()
      {
      return movementType;
      }

    public EdgeProperty.DataSourceType getSourceType()
      {
      return sourceType;
      }

    public EdgeProperty.SchedulingType getSchedulingType()
      {
      return schedulingType;
      }

    public Map<Integer, Fields> getResolvedKeyFieldsMap()
      {
      return resolvedKeyFieldsMap;
      }

    public Map<Integer, Fields> getResolvedSortFieldsMap()
      {
      return resolvedSortFieldsMap;
      }

    public Map<Integer, Fields> getResolvedValueFieldsMap()
      {
      return resolvedValueFieldsMap;
      }
    }
  }