001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.stream.graph;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030
031import cascading.flow.FlowElement;
032import cascading.flow.FlowElements;
033import cascading.flow.FlowException;
034import cascading.flow.FlowNode;
035import cascading.flow.FlowProcess;
036import cascading.flow.Flows;
037import cascading.flow.hadoop.stream.HadoopMemoryJoinGate;
038import cascading.flow.hadoop.util.HadoopUtil;
039import cascading.flow.stream.annotations.StreamMode;
040import cascading.flow.stream.duct.Duct;
041import cascading.flow.stream.duct.Gate;
042import cascading.flow.stream.element.InputSource;
043import cascading.flow.stream.element.MemoryHashJoinGate;
044import cascading.flow.stream.element.SinkStage;
045import cascading.flow.stream.element.SourceStage;
046import cascading.flow.stream.graph.IORole;
047import cascading.flow.stream.graph.NodeStreamGraph;
048import cascading.flow.tez.Hadoop2TezFlowProcess;
049import cascading.flow.tez.stream.element.TezBoundaryStage;
050import cascading.flow.tez.stream.element.TezCoGroupGate;
051import cascading.flow.tez.stream.element.TezGroupByGate;
052import cascading.flow.tez.stream.element.TezMergeGate;
053import cascading.flow.tez.stream.element.TezSinkStage;
054import cascading.flow.tez.stream.element.TezSourceStage;
055import cascading.flow.tez.util.TezUtil;
056import cascading.pipe.Boundary;
057import cascading.pipe.CoGroup;
058import cascading.pipe.Group;
059import cascading.pipe.GroupBy;
060import cascading.pipe.HashJoin;
061import cascading.pipe.Merge;
062import cascading.pipe.Pipe;
063import cascading.tap.Tap;
064import cascading.util.SetMultiMap;
065import cascading.util.SortedListMultiMap;
066import cascading.util.Util;
067import org.apache.hadoop.conf.Configuration;
068import org.apache.tez.dag.api.TezConfiguration;
069import org.apache.tez.runtime.api.LogicalInput;
070import org.apache.tez.runtime.api.LogicalOutput;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import static cascading.flow.tez.util.TezUtil.*;
075
076/**
077 *
078 */
079public class Hadoop2TezStreamGraph extends NodeStreamGraph
080  {
081  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezStreamGraph.class );
082
083  private InputSource streamedHead;
084  private Map<String, LogicalInput> inputMap;
085  private Map<String, LogicalOutput> outputMap;
086  private Map<LogicalInput, Configuration> inputConfigMap = new HashMap<>();
087  private Map<LogicalOutput, Configuration> outputConfigMap = new HashMap<>();
088  private SetMultiMap<String, LogicalInput> inputMultiMap;
089  private SetMultiMap<String, LogicalOutput> outputMultiMap;
090
091  public Hadoop2TezStreamGraph( Hadoop2TezFlowProcess currentProcess, FlowNode flowNode, Map<String, LogicalInput> inputMap, Map<String, LogicalOutput> outputMap )
092    {
093    super( currentProcess, flowNode );
094    this.inputMap = inputMap;
095    this.outputMap = outputMap;
096
097    buildGraph();
098
099    setTraps();
100    setScopes();
101
102    printGraph( node.getID(), node.getName(), flowProcess.getCurrentSliceNum() );
103    bind();
104    }
105
106  public InputSource getStreamedHead()
107    {
108    return streamedHead;
109    }
110
111  protected void buildGraph()
112    {
113    inputMultiMap = new SetMultiMap<>();
114
115    for( Map.Entry<String, LogicalInput> entry : inputMap.entrySet() )
116      {
117      Configuration inputConfiguration = getInputConfiguration( entry.getValue() );
118      inputConfigMap.put( entry.getValue(), inputConfiguration );
119
120      inputMultiMap.addAll( getEdgeSourceID( entry.getValue(), inputConfiguration ), entry.getValue() );
121      }
122
123    outputMultiMap = new SetMultiMap<>();
124
125    for( Map.Entry<String, LogicalOutput> entry : outputMap.entrySet() )
126      {
127      Configuration outputConfiguration = getOutputConfiguration( entry.getValue() );
128      outputConfigMap.put( entry.getValue(), outputConfiguration );
129
130      outputMultiMap.addAll( TezUtil.getEdgeSinkID( entry.getValue(), outputConfiguration ), entry.getValue() );
131      }
132
133    // this made the assumption we can have a physical and logical input per vertex. seems we can't
134    if( inputMultiMap.getKeys().size() == 1 )
135      {
136      streamedSource = Flows.getFlowElementForID( node.getSourceElements(), Util.getFirst( inputMultiMap.getKeys() ) );
137      }
138    else
139      {
140      Set<FlowElement> sourceElements = new HashSet<>( node.getSourceElements() );
141      Set<? extends FlowElement> accumulated = node.getSourceElements( StreamMode.Accumulated );
142
143      sourceElements.removeAll( accumulated );
144
145      if( sourceElements.size() != 1 )
146        throw new IllegalStateException( "too many input source keys, got: " + Util.join( sourceElements, ", " ) );
147
148      streamedSource = Util.getFirst( sourceElements );
149      }
150
151    LOG.info( "using streamed source: " + streamedSource );
152
153    streamedHead = handleHead( streamedSource, flowProcess );
154
155    Set<FlowElement> accumulated = new HashSet<>( node.getSourceElements() );
156
157    accumulated.remove( streamedSource );
158
159    Hadoop2TezFlowProcess tezProcess = (Hadoop2TezFlowProcess) flowProcess;
160    TezConfiguration conf = tezProcess.getConfiguration();
161
162    for( FlowElement flowElement : accumulated )
163      {
164      LOG.info( "using accumulated source: " + flowElement );
165
166      if( flowElement instanceof Tap )
167        {
168        Tap source = (Tap) flowElement;
169
170        // allows client side config to be used cluster side
171        String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( source ) );
172
173        if( property == null )
174          throw new IllegalStateException( "accumulated source conf property missing for: " + source.getIdentifier() );
175
176        conf = getSourceConf( tezProcess, conf, property );
177        }
178      else
179        {
180        conf = (TezConfiguration) inputConfigMap.get( FlowElements.id( flowElement ) );
181        }
182
183      FlowProcess flowProcess = conf == null ? tezProcess : new Hadoop2TezFlowProcess( tezProcess, conf );
184
185      handleHead( flowElement, flowProcess );
186      }
187    }
188
189  private TezConfiguration getSourceConf( FlowProcess<TezConfiguration> flowProcess, TezConfiguration conf, String property )
190    {
191    Map<String, String> priorConf;
192
193    try
194      {
195      priorConf = (Map<String, String>) HadoopUtil.deserializeBase64( property, conf, HashMap.class, true );
196      }
197    catch( IOException exception )
198      {
199      throw new FlowException( "unable to deserialize properties", exception );
200      }
201
202    return flowProcess.mergeMapIntoConfig( conf, priorConf );
203    }
204
205  private InputSource handleHead( FlowElement source, FlowProcess flowProcess )
206    {
207    Duct sourceDuct;
208
209    if( source instanceof Tap )
210      sourceDuct = createSourceStage( (Tap) source, flowProcess );
211    else if( source instanceof Merge )
212      sourceDuct = createMergeStage( (Merge) source, IORole.source );
213    else if( source instanceof Boundary )
214      sourceDuct = createBoundaryStage( (Boundary) source, IORole.source );
215    else if( ( (Group) source ).isGroupBy() )
216      sourceDuct = createGroupByGate( (GroupBy) source, IORole.source );
217    else
218      sourceDuct = createCoGroupGate( (CoGroup) source, IORole.source );
219
220    addHead( sourceDuct );
221
222    handleDuct( source, sourceDuct );
223
224    return (InputSource) sourceDuct;
225    }
226
227  protected SourceStage createSourceStage( Tap source, FlowProcess flowProcess )
228    {
229    String id = Tap.id( source );
230    LogicalInput logicalInput = inputMap.get( id );
231
232    if( logicalInput == null )
233      logicalInput = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + id ) );
234
235    if( logicalInput == null )
236      return new SourceStage( flowProcess, source );
237
238    return new TezSourceStage( flowProcess, source, logicalInput );
239    }
240
241  @Override
242  protected SinkStage createSinkStage( Tap sink )
243    {
244    String id = Tap.id( sink );
245    LogicalOutput logicalOutput = outputMap.get( id );
246
247    if( logicalOutput == null )
248      logicalOutput = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + id ) );
249
250    if( logicalOutput == null )
251      throw new IllegalStateException( "could not find output for: " + sink );
252
253    return new TezSinkStage( flowProcess, sink, logicalOutput );
254    }
255
256  @Override
257  protected Duct createMergeStage( Merge element, IORole role )
258    {
259    if( role == IORole.pass )
260      return super.createMergeStage( element, IORole.pass );
261    else if( role == IORole.sink )
262      return createSinkMergeGate( element );
263    else if( role == IORole.source )
264      return createSourceMergeGate( element );
265    else
266      throw new UnsupportedOperationException( "both role not supported with merge" );
267    }
268
269  private Duct createSourceMergeGate( Merge element )
270    {
271    return new TezMergeGate( flowProcess, element, IORole.source, createInputMap( element ) );
272    }
273
274  private Duct createSinkMergeGate( Merge element )
275    {
276    return new TezMergeGate( flowProcess, element, IORole.sink, findLogicalOutputs( element ) );
277    }
278
279  @Override
280  protected Duct createBoundaryStage( Boundary element, IORole role )
281    {
282    if( role == IORole.pass )
283      return super.createBoundaryStage( element, IORole.pass );
284    else if( role == IORole.sink )
285      return createSinkBoundaryStage( element );
286    else if( role == IORole.source )
287      return createSourceBoundaryStage( element );
288    else
289      throw new UnsupportedOperationException( "both role not supported with boundary" );
290    }
291
292  private Duct createSourceBoundaryStage( Boundary element )
293    {
294    return new TezBoundaryStage( flowProcess, element, IORole.source, findLogicalInput( element ) );
295    }
296
297  private Duct createSinkBoundaryStage( Boundary element )
298    {
299    return new TezBoundaryStage( flowProcess, element, IORole.sink, findLogicalOutputs( element ) );
300    }
301
302  @Override
303  protected Gate createGroupByGate( GroupBy element, IORole role )
304    {
305    if( role == IORole.sink )
306      return createSinkGroupByGate( element );
307    else
308      return createSourceGroupByGate( element );
309    }
310
311  @Override
312  protected Gate createCoGroupGate( CoGroup element, IORole role )
313    {
314    if( role == IORole.sink )
315      return createSinkCoGroupByGate( element );
316    else
317      return createSourceCoGroupByGate( element );
318    }
319
320  private Gate createSinkCoGroupByGate( CoGroup element )
321    {
322    return new TezCoGroupGate( flowProcess, element, IORole.sink, findLogicalOutput( element ) );
323    }
324
325  private Gate createSourceCoGroupByGate( CoGroup element )
326    {
327    return new TezCoGroupGate( flowProcess, element, IORole.source, createInputMap( element ) );
328    }
329
330  protected Gate createSinkGroupByGate( GroupBy element )
331    {
332    return new TezGroupByGate( flowProcess, element, IORole.sink, findLogicalOutput( element ) );
333    }
334
335  protected Gate createSourceGroupByGate( GroupBy element )
336    {
337    return new TezGroupByGate( flowProcess, element, IORole.source, createInputMap( element ) );
338    }
339
340  private LogicalOutput findLogicalOutput( Pipe element )
341    {
342    String id = Pipe.id( element );
343    LogicalOutput logicalOutput = outputMap.get( id );
344
345    if( logicalOutput == null )
346      logicalOutput = outputMap.get( flowProcess.getStringProperty( "cascading.node.sink." + id ) );
347
348    if( logicalOutput == null )
349      throw new IllegalStateException( "could not find output for: " + element );
350
351    return logicalOutput;
352    }
353
354  private Collection<LogicalOutput> findLogicalOutputs( Pipe element )
355    {
356    String id = Pipe.id( element );
357
358    return outputMultiMap.getValues( id );
359    }
360
361  private LogicalInput findLogicalInput( Pipe element )
362    {
363    String id = Pipe.id( element );
364    LogicalInput logicalInput = inputMap.get( id );
365
366    if( logicalInput == null )
367      logicalInput = inputMap.get( flowProcess.getStringProperty( "cascading.node.source." + id ) );
368
369    if( logicalInput == null )
370      throw new IllegalStateException( "could not find input for: " + element );
371
372    return logicalInput;
373    }
374
375  /**
376   * Maps each input to an ordinal on the flowelement. an input may be bound to multiple ordinals.
377   *
378   * @param element
379   */
380  private SortedListMultiMap<Integer, LogicalInput> createInputMap( FlowElement element )
381    {
382    String id = FlowElements.id( element );
383    SortedListMultiMap<Integer, LogicalInput> ordinalMap = new SortedListMultiMap<>();
384
385    for( LogicalInput logicalInput : inputMap.values() )
386      {
387      Configuration configuration = inputConfigMap.get( logicalInput );
388
389      String foundID = configuration.get( "cascading.node.source" );
390
391      if( Util.isEmpty( foundID ) )
392        throw new IllegalStateException( "cascading.node.source property not set on source LogicalInput" );
393
394      if( !foundID.equals( id ) )
395        continue;
396
397      String values = configuration.get( "cascading.node.ordinals", "" );
398      List<Integer> ordinals = Util.split( Integer.class, ",", values );
399
400      for( Integer ordinal : ordinals )
401        ordinalMap.put( ordinal, logicalInput );
402      }
403
404    return ordinalMap;
405    }
406
407  @Override
408  protected MemoryHashJoinGate createNonBlockingJoinGate( HashJoin join )
409    {
410    return new HadoopMemoryJoinGate( flowProcess, join ); // does not use a latch
411    }
412  }