001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner;
022
023import java.io.Serializable;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import cascading.flow.FlowElement;
033import cascading.flow.FlowNode;
034import cascading.flow.FlowStep;
035import cascading.flow.planner.graph.AnnotatedGraph;
036import cascading.flow.planner.graph.ElementGraph;
037import cascading.flow.planner.graph.ElementGraphs;
038import cascading.flow.planner.graph.Extent;
039import cascading.flow.planner.graph.FlowElementGraph;
040import cascading.flow.stream.annotations.StreamMode;
041import cascading.pipe.Group;
042import cascading.pipe.Pipe;
043import cascading.stats.FlowNodeStats;
044import cascading.tap.Tap;
045import cascading.util.ProcessLogger;
046import cascading.util.Util;
047
048import static cascading.util.Util.createIdentitySet;
049
050/**
051 *
052 */
053public class BaseFlowNode implements Serializable, FlowNode, ProcessLogger
054  {
055  private final String id;
056  private int ordinal;
057  private String name;
058  private Map<String, String> processAnnotations;
059
060  private transient FlowStep flowStep;
061
062  protected ElementGraph nodeSubGraph;
063  protected List<? extends ElementGraph> pipelineGraphs = Collections.emptyList();
064
065  private transient Set<FlowElement> sourceElements;
066  private transient Set<FlowElement> sinkElements;
067  private Map<String, Tap> trapMap = Collections.emptyMap();
068  protected Set<Tap> sourceTaps;
069  protected Set<Tap> sinkTaps;
070
071  private Map<Tap, Set<String>> reverseSourceTaps;
072  private Map<Tap, Set<String>> reverseSinkTaps;
073  private Map<FlowElement, ElementGraph> streamPipelineMap = Collections.emptyMap();
074
075  /** optional metadata about the FlowStep */
076  private Map<String, String> flowNodeDescriptor = Collections.emptyMap();
077
078  protected transient FlowNodeStats flowNodeStats;
079
080  public BaseFlowNode( String name, int ordinal )
081    {
082    this( name, ordinal, null );
083    }
084
085  public BaseFlowNode( String name, int ordinal, Map<String, String> flowNodeDescriptor )
086    {
087    this( null, name, ordinal, flowNodeDescriptor );
088    }
089
090  public BaseFlowNode( ElementGraph nodeSubGraph, String name, int ordinal )
091    {
092    this( nodeSubGraph, name, ordinal, null );
093    }
094
095  public BaseFlowNode( ElementGraph nodeSubGraph, String name, int ordinal, Map<String, String> flowNodeDescriptor )
096    {
097    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
098    this.nodeSubGraph = nodeSubGraph;
099    setName( name );
100    this.ordinal = ordinal;
101    this.trapMap = Collections.emptyMap();
102
103    setFlowNodeDescriptor( flowNodeDescriptor );
104    }
105
106  public BaseFlowNode( ElementGraph nodeSubGraph )
107    {
108    this( null, nodeSubGraph, null, null );
109    }
110
111  public BaseFlowNode( ElementGraph nodeSubGraph, Map<String, String> flowNodeDescriptor )
112    {
113    this( null, nodeSubGraph, flowNodeDescriptor );
114    }
115
116  public BaseFlowNode( ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs )
117    {
118    this( null, nodeSubGraph, pipelineGraphs, null );
119    }
120
121  public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, Map<String, String> flowNodeDescriptor )
122    {
123    this( flowElementGraph, nodeSubGraph, null, flowNodeDescriptor );
124    }
125
126  public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs )
127    {
128    this( flowElementGraph, nodeSubGraph, pipelineGraphs, null );
129    }
130
131  public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs, Map<String, String> flowNodeDescriptor )
132    {
133    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
134    this.nodeSubGraph = nodeSubGraph;
135
136    setPipelineGraphs( pipelineGraphs );
137
138    setFlowNodeDescriptor( flowNodeDescriptor );
139
140    verifyPipelines();
141    createPipelineMap();
142
143    if( flowElementGraph != null )
144      {
145      assignTrappableNames( flowElementGraph );
146      assignTraps( flowElementGraph.getTrapMap() );
147      }
148    }
149
150  public void setOrdinal( int ordinal )
151    {
152    this.ordinal = ordinal;
153    }
154
155  @Override
156  public int getOrdinal()
157    {
158    return ordinal;
159    }
160
161  @Override
162  public String getID()
163    {
164    return id;
165    }
166
167  public void setName( String name )
168    {
169    this.name = name;
170    }
171
172  @Override
173  public String getName()
174    {
175    return name;
176    }
177
178  @Override
179  public Map<String, String> getFlowNodeDescriptor()
180    {
181    return flowNodeDescriptor;
182    }
183
184  protected void setFlowNodeDescriptor( Map<String, String> flowNodeDescriptor )
185    {
186    if( flowNodeDescriptor != null )
187      this.flowNodeDescriptor = flowNodeDescriptor;
188    }
189
190  @Override
191  public Map<String, String> getProcessAnnotations()
192    {
193    if( processAnnotations == null )
194      return Collections.emptyMap();
195
196    return Collections.unmodifiableMap( processAnnotations );
197    }
198
199  @Override
200  public void addProcessAnnotation( Enum annotation )
201    {
202    if( annotation == null )
203      return;
204
205    addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
206    }
207
208  @Override
209  public void addProcessAnnotation( String key, String value )
210    {
211    if( processAnnotations == null )
212      processAnnotations = new HashMap<>();
213
214    processAnnotations.put( key, value );
215    }
216
217  public void setFlowNodeStats( FlowNodeStats flowNodeStats )
218    {
219    this.flowNodeStats = flowNodeStats;
220    }
221
222  @Override
223  public FlowNodeStats getFlowNodeStats()
224    {
225    return flowNodeStats;
226    }
227
228  public void setFlowStep( FlowStep flowStep )
229    {
230    this.flowStep = flowStep;
231    }
232
233  @Override
234  public FlowStep getFlowStep()
235    {
236    return flowStep;
237    }
238
239  @Override
240  public ElementGraph getElementGraph()
241    {
242    return nodeSubGraph;
243    }
244
245  @Override
246  public Set<String> getSourceElementNames()
247    {
248    Set<String> results = new HashSet<>();
249
250    for( FlowElement flowElement : getSourceElements() )
251      {
252      if( flowElement instanceof Tap )
253        results.addAll( getSourceTapNames( (Tap) flowElement ) );
254      else
255        results.add( ( (Pipe) flowElement ).getName() );
256      }
257
258    return results;
259    }
260
261  public Set<FlowElement> getSourceElements()
262    {
263    if( sourceElements == null )
264      sourceElements = Collections.unmodifiableSet( ElementGraphs.findSources( nodeSubGraph, FlowElement.class ) );
265
266    return sourceElements;
267    }
268
269  @Override
270  public Set<? extends FlowElement> getSourceElements( Enum annotation )
271    {
272    Set<? extends FlowElement> annotated = getFlowElementsFor( annotation );
273    Set<FlowElement> sourceElements = getSourceElements();
274
275    Set<FlowElement> results = new HashSet<>();
276
277    for( FlowElement sourceElement : sourceElements )
278      {
279      if( annotated.contains( sourceElement ) )
280        results.add( sourceElement );
281      }
282
283    return results;
284    }
285
286  @Override
287  public Set<String> getSinkElementNames()
288    {
289    Set<String> results = new HashSet<>();
290
291    for( FlowElement flowElement : getSinkElements() )
292      {
293      if( flowElement instanceof Tap )
294        results.addAll( getSinkTapNames( (Tap) flowElement ) );
295      else
296        results.add( ( (Pipe) flowElement ).getName() );
297      }
298
299    return results;
300    }
301
302  @Override
303  public Set<FlowElement> getSinkElements()
304    {
305    if( sinkElements == null )
306      sinkElements = Collections.unmodifiableSet( ElementGraphs.findSinks( nodeSubGraph, FlowElement.class ) );
307
308    return sinkElements;
309    }
310
311  public Set<? extends FlowElement> getSinkElements( Enum annotation )
312    {
313    Set<? extends FlowElement> annotated = getFlowElementsFor( annotation );
314    Set<FlowElement> sinkElements = getSinkElements();
315
316    Set<FlowElement> results = new HashSet<>();
317
318    for( FlowElement sinkElement : sinkElements )
319      {
320      if( annotated.contains( sinkElement ) )
321        results.add( sinkElement );
322      }
323
324    return results;
325    }
326
327  @Override
328  public List<? extends ElementGraph> getPipelineGraphs()
329    {
330    return pipelineGraphs;
331    }
332
333  protected void setPipelineGraphs( List<? extends ElementGraph> pipelineGraphs )
334    {
335    if( pipelineGraphs != null )
336      this.pipelineGraphs = pipelineGraphs;
337    }
338
339  @Override
340  public ElementGraph getPipelineGraphFor( FlowElement streamedSource )
341    {
342    return streamPipelineMap.get( streamedSource );
343    }
344
345  @Override
346  public Collection<Group> getGroups()
347    {
348    return ElementGraphs.findAllGroups( nodeSubGraph );
349    }
350
351  @Override
352  public Set<Tap> getSourceTaps()
353    {
354    if( sourceTaps != null )
355      return sourceTaps;
356
357    sourceTaps = Collections.unmodifiableSet( Util.narrowIdentitySet( Tap.class, getSourceElements() ) );
358
359    return sourceTaps;
360    }
361
362  @Override
363  public Set<Tap> getSinkTaps()
364    {
365    if( sinkTaps != null )
366      return sinkTaps;
367
368    sinkTaps = Collections.unmodifiableSet( Util.narrowIdentitySet( Tap.class, getSinkElements() ) );
369
370    return sinkTaps;
371    }
372
373  @Override
374  public int getSubmitPriority()
375    {
376    return 0;
377    }
378
379  @Override
380  public Set<String> getSourceTapNames( Tap source )
381    {
382    return reverseSourceTaps.get( source );
383    }
384
385  @Override
386  public Set<String> getSinkTapNames( Tap sink )
387    {
388    return reverseSinkTaps.get( sink );
389    }
390
391  private void assignTrappableNames( FlowElementGraph flowElementGraph )
392    {
393    if( flowElementGraph == null )
394      return;
395
396    reverseSourceTaps = new HashMap<>();
397    reverseSinkTaps = new HashMap<>();
398
399    Set<Tap> sources = getSourceTaps();
400
401    for( Tap source : sources )
402      {
403      Set<Scope> scopes = flowElementGraph.outgoingEdgesOf( source );
404
405      for( Scope scope : scopes )
406        addSourceName( scope.getName(), source );
407      }
408
409    for( Map.Entry<String, Tap> entry : flowElementGraph.getSourceMap().entrySet() )
410      {
411      if( sources.contains( entry.getValue() ) )
412        addSourceName( entry.getKey(), entry.getValue() );
413      }
414
415    Set<Tap> sinks = getSinkTaps();
416
417    for( Tap sink : sinks )
418      {
419      Set<Scope> scopes = flowElementGraph.incomingEdgesOf( sink );
420
421      for( Scope scope : scopes )
422        addSinkName( scope.getName(), sink );
423      }
424
425    for( Map.Entry<String, Tap> entry : flowElementGraph.getSinkMap().entrySet() )
426      {
427      if( sinks.contains( entry.getValue() ) )
428        addSinkName( entry.getKey(), entry.getValue() );
429      }
430    }
431
432  private void addSourceName( String name, Tap source )
433    {
434    if( !reverseSourceTaps.containsKey( source ) )
435      reverseSourceTaps.put( source, new HashSet<String>() );
436
437    reverseSourceTaps.get( source ).add( name );
438    }
439
440  private void addSinkName( String name, Tap sink )
441    {
442    if( !reverseSinkTaps.containsKey( sink ) )
443      reverseSinkTaps.put( sink, new HashSet<String>() );
444
445    reverseSinkTaps.get( sink ).add( name );
446    }
447
448  @Override
449  public Map<String, Tap> getTrapMap()
450    {
451    return trapMap;
452    }
453
454  @Override
455  public Collection<? extends Tap> getTraps()
456    {
457    return getTrapMap().values();
458    }
459
460  private void assignTraps( Map<String, Tap> traps )
461    {
462    trapMap = new HashMap<>();
463
464    for( FlowElement flowElement : nodeSubGraph.vertexSet() )
465      {
466      Set<String> names = new HashSet<>();
467
468      if( flowElement instanceof Extent )
469        continue;
470
471      if( flowElement instanceof Pipe )
472        {
473        names.add( ( (Pipe) flowElement ).getName() );
474        }
475      else
476        {
477        Set<String> sourceTapNames = getSourceTapNames( (Tap) flowElement );
478
479        if( sourceTapNames != null )
480          names.addAll( sourceTapNames );
481
482        Set<String> sinkTapNames = getSinkTapNames( (Tap) flowElement );
483
484        if( sinkTapNames != null )
485          names.addAll( sinkTapNames );
486        }
487
488      for( String name : names )
489        {
490        if( traps.containsKey( name ) )
491          trapMap.put( name, traps.get( name ) );
492        }
493      }
494    }
495
496  private void verifyPipelines()
497    {
498    if( pipelineGraphs == null || pipelineGraphs.isEmpty() )
499      return;
500
501    Set<FlowElement> allElements = createIdentitySet( nodeSubGraph.vertexSet() );
502
503    for( ElementGraph pipelineGraph : pipelineGraphs )
504      allElements.removeAll( pipelineGraph.vertexSet() );
505
506    if( !allElements.isEmpty() )
507      throw new IllegalStateException( "union of pipeline graphs for flow node are missing elements: " + Util.join( allElements, ", " ) );
508    }
509
510  private void createPipelineMap()
511    {
512    if( pipelineGraphs == null || pipelineGraphs.isEmpty() )
513      return;
514
515    Map<FlowElement, ElementGraph> map = new HashMap<>( pipelineGraphs.size() );
516
517    for( ElementGraph pipelineGraph : pipelineGraphs )
518      {
519      if( !( pipelineGraph instanceof AnnotatedGraph ) )
520        throw new IllegalStateException( "pipeline graphs must be of type AnnotatedGraph, got: " + pipelineGraph.getClass().getName() );
521
522      Set<FlowElement> flowElements;
523
524      if( ( (AnnotatedGraph) pipelineGraph ).hasAnnotations() )
525        flowElements = ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( StreamMode.Streamed );
526      else
527        flowElements = ElementGraphs.findSources( pipelineGraph, FlowElement.class );
528
529      for( FlowElement flowElement : flowElements )
530        {
531        if( map.containsKey( flowElement ) )
532          throw new IllegalStateException( "duplicate streamable elements, found:  " + flowElement );
533
534        map.put( flowElement, pipelineGraph );
535        }
536      }
537
538    this.streamPipelineMap = map;
539    }
540
541  @Override
542  public Tap getTrap( String branchName )
543    {
544    return trapMap.get( branchName );
545    }
546
547  @Override
548  public Collection<? extends Scope> getPreviousScopes( FlowElement flowElement )
549    {
550    return nodeSubGraph.incomingEdgesOf( flowElement );
551    }
552
553  @Override
554  public Collection<? extends Scope> getNextScopes( FlowElement flowElement )
555    {
556    return nodeSubGraph.outgoingEdgesOf( flowElement );
557    }
558
559  @Override
560  public boolean equals( Object object )
561    {
562    if( this == object )
563      return true;
564
565    if( object == null || getClass() != object.getClass() )
566      return false;
567
568    BaseFlowNode flowNode = (BaseFlowNode) object;
569
570    if( id != null ? !id.equals( flowNode.id ) : flowNode.id != null )
571      return false;
572
573    return true;
574    }
575
576  @Override
577  public int hashCode()
578    {
579    return id != null ? id.hashCode() : 0;
580    }
581
582  @Override
583  public Set<? extends FlowElement> getFlowElementsFor( Enum annotation )
584    {
585    if( pipelineGraphs.isEmpty() )
586      return ( (AnnotatedGraph) getElementGraph() ).getAnnotations().getValues( annotation );
587
588    Set<FlowElement> results = createIdentitySet();
589
590    for( ElementGraph pipelineGraph : pipelineGraphs )
591      results.addAll( ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( annotation ) );
592
593    return results;
594    }
595
596  private ProcessLogger getLogger()
597    {
598    if( flowStep != null && flowStep instanceof ProcessLogger )
599      return (ProcessLogger) flowStep;
600
601    return ProcessLogger.NULL;
602    }
603
604  @Override
605  public boolean isInfoEnabled()
606    {
607    return getLogger().isInfoEnabled();
608    }
609
610  @Override
611  public boolean isDebugEnabled()
612    {
613    return getLogger().isDebugEnabled();
614    }
615
616  @Override
617  public void logInfo( String message, Object... arguments )
618    {
619    getLogger().logInfo( message, arguments );
620    }
621
622  @Override
623  public void logDebug( String message, Object... arguments )
624    {
625    getLogger().logDebug( message, arguments );
626    }
627
628  @Override
629  public void logWarn( String message )
630    {
631    getLogger().logWarn( message );
632    }
633
634  @Override
635  public void logWarn( String message, Object... arguments )
636    {
637    getLogger().logWarn( message, arguments );
638    }
639
640  @Override
641  public void logWarn( String message, Throwable throwable )
642    {
643    getLogger().logWarn( message, throwable );
644    }
645
646  @Override
647  public void logError( String message, Object... arguments )
648    {
649    getLogger().logError( message, arguments );
650    }
651
652  @Override
653  public void logError( String message, Throwable throwable )
654    {
655    getLogger().logError( message, throwable );
656    }
657  }