001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner;
022
023import java.io.Serializable;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import cascading.flow.FlowElement;
033import cascading.flow.FlowNode;
034import cascading.flow.FlowStep;
035import cascading.flow.planner.graph.AnnotatedGraph;
036import cascading.flow.planner.graph.ElementGraph;
037import cascading.flow.planner.graph.ElementGraphs;
038import cascading.flow.planner.graph.Extent;
039import cascading.flow.planner.graph.FlowElementGraph;
040import cascading.flow.stream.annotations.StreamMode;
041import cascading.pipe.Group;
042import cascading.pipe.Pipe;
043import cascading.stats.FlowNodeStats;
044import cascading.tap.Tap;
045import cascading.util.ProcessLogger;
046import cascading.util.Util;
047
048import static cascading.util.Util.createIdentitySet;
049
050/**
051 *
052 */
053public class BaseFlowNode implements Serializable, FlowNode, ProcessLogger
054  {
055  private final String id;
056  private int ordinal;
057  private String name;
058  private Map<String, String> processAnnotations;
059
060  private transient FlowStep flowStep;
061
062  protected ElementGraph nodeSubGraph;
063  protected List<? extends ElementGraph> pipelineGraphs = Collections.emptyList();
064
065  private transient Set<FlowElement> sourceElements;
066  private transient Set<FlowElement> sinkElements;
067  private Map<String, Tap> trapMap = Collections.emptyMap();
068  protected Set<Tap> sourceTaps;
069  protected Set<Tap> sinkTaps;
070
071  private Map<Tap, Set<String>> reverseSourceTaps;
072  private Map<Tap, Set<String>> reverseSinkTaps;
073  private Map<FlowElement, ElementGraph> streamPipelineMap = Collections.emptyMap();
074
075  /** optional metadata about the FlowStep */
076  private Map<String, String> flowNodeDescriptor = Collections.emptyMap();
077
078  protected transient FlowNodeStats flowNodeStats;
079
080  public BaseFlowNode( String name, int ordinal )
081    {
082    this( name, ordinal, null );
083    }
084
085  public BaseFlowNode( String name, int ordinal, Map<String, String> flowNodeDescriptor )
086    {
087    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
088    setName( name );
089    this.ordinal = ordinal;
090    this.trapMap = Collections.emptyMap();
091
092    setFlowNodeDescriptor( flowNodeDescriptor );
093    }
094
095  public BaseFlowNode( ElementGraph nodeSubGraph )
096    {
097    this( null, nodeSubGraph, null, null );
098    }
099
100  public BaseFlowNode( ElementGraph nodeSubGraph, Map<String, String> flowNodeDescriptor )
101    {
102    this( null, nodeSubGraph, flowNodeDescriptor );
103    }
104
105  public BaseFlowNode( ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs )
106    {
107    this( null, nodeSubGraph, pipelineGraphs, null );
108    }
109
110  public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, Map<String, String> flowNodeDescriptor )
111    {
112    this( flowElementGraph, nodeSubGraph, null, flowNodeDescriptor );
113    }
114
115  public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs )
116    {
117    this( flowElementGraph, nodeSubGraph, pipelineGraphs, null );
118    }
119
120  public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs, Map<String, String> flowNodeDescriptor )
121    {
122    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
123    this.nodeSubGraph = nodeSubGraph;
124
125    setPipelineGraphs( pipelineGraphs );
126
127    setFlowNodeDescriptor( flowNodeDescriptor );
128
129    verifyPipelines();
130    createPipelineMap();
131
132    if( flowElementGraph != null )
133      {
134      assignTrappableNames( flowElementGraph );
135      assignTraps( flowElementGraph.getTrapMap() );
136      }
137    }
138
139  public void setOrdinal( int ordinal )
140    {
141    this.ordinal = ordinal;
142    }
143
144  @Override
145  public int getOrdinal()
146    {
147    return ordinal;
148    }
149
150  @Override
151  public String getID()
152    {
153    return id;
154    }
155
156  public void setName( String name )
157    {
158    this.name = name;
159    }
160
161  @Override
162  public String getName()
163    {
164    return name;
165    }
166
167  @Override
168  public Map<String, String> getFlowNodeDescriptor()
169    {
170    return flowNodeDescriptor;
171    }
172
173  protected void setFlowNodeDescriptor( Map<String, String> flowNodeDescriptor )
174    {
175    if( flowNodeDescriptor != null )
176      this.flowNodeDescriptor = flowNodeDescriptor;
177    }
178
179  @Override
180  public Map<String, String> getProcessAnnotations()
181    {
182    if( processAnnotations == null )
183      return Collections.emptyMap();
184
185    return Collections.unmodifiableMap( processAnnotations );
186    }
187
188  @Override
189  public void addProcessAnnotation( Enum annotation )
190    {
191    if( annotation == null )
192      return;
193
194    addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
195    }
196
197  @Override
198  public void addProcessAnnotation( String key, String value )
199    {
200    if( processAnnotations == null )
201      processAnnotations = new HashMap<>();
202
203    processAnnotations.put( key, value );
204    }
205
206  public void setFlowNodeStats( FlowNodeStats flowNodeStats )
207    {
208    this.flowNodeStats = flowNodeStats;
209    }
210
211  @Override
212  public FlowNodeStats getFlowNodeStats()
213    {
214    return flowNodeStats;
215    }
216
217  public void setFlowStep( FlowStep flowStep )
218    {
219    this.flowStep = flowStep;
220    }
221
222  @Override
223  public FlowStep getFlowStep()
224    {
225    return flowStep;
226    }
227
228  @Override
229  public ElementGraph getElementGraph()
230    {
231    return nodeSubGraph;
232    }
233
234  @Override
235  public Set<String> getSourceElementNames()
236    {
237    Set<String> results = new HashSet<>();
238
239    for( FlowElement flowElement : getSourceElements() )
240      {
241      if( flowElement instanceof Tap )
242        results.addAll( getSourceTapNames( (Tap) flowElement ) );
243      else
244        results.add( ( (Pipe) flowElement ).getName() );
245      }
246
247    return results;
248    }
249
250  public Set<FlowElement> getSourceElements()
251    {
252    if( sourceElements == null )
253      sourceElements = Collections.unmodifiableSet( ElementGraphs.findSources( nodeSubGraph, FlowElement.class ) );
254
255    return sourceElements;
256    }
257
258  @Override
259  public Set<? extends FlowElement> getSourceElements( Enum annotation )
260    {
261    Set<? extends FlowElement> annotated = getFlowElementsFor( annotation );
262    Set<FlowElement> sourceElements = getSourceElements();
263
264    Set<FlowElement> results = new HashSet<>();
265
266    for( FlowElement sourceElement : sourceElements )
267      {
268      if( annotated.contains( sourceElement ) )
269        results.add( sourceElement );
270      }
271
272    return results;
273    }
274
275  @Override
276  public Set<String> getSinkElementNames()
277    {
278    Set<String> results = new HashSet<>();
279
280    for( FlowElement flowElement : getSinkElements() )
281      {
282      if( flowElement instanceof Tap )
283        results.addAll( getSinkTapNames( (Tap) flowElement ) );
284      else
285        results.add( ( (Pipe) flowElement ).getName() );
286      }
287
288    return results;
289    }
290
291  @Override
292  public Set<FlowElement> getSinkElements()
293    {
294    if( sinkElements == null )
295      sinkElements = Collections.unmodifiableSet( ElementGraphs.findSinks( nodeSubGraph, FlowElement.class ) );
296
297    return sinkElements;
298    }
299
300  public Set<? extends FlowElement> getSinkElements( Enum annotation )
301    {
302    Set<? extends FlowElement> annotated = getFlowElementsFor( annotation );
303    Set<FlowElement> sinkElements = getSinkElements();
304
305    Set<FlowElement> results = new HashSet<>();
306
307    for( FlowElement sinkElement : sinkElements )
308      {
309      if( annotated.contains( sinkElement ) )
310        results.add( sinkElement );
311      }
312
313    return results;
314    }
315
316  @Override
317  public List<? extends ElementGraph> getPipelineGraphs()
318    {
319    return pipelineGraphs;
320    }
321
322  protected void setPipelineGraphs( List<? extends ElementGraph> pipelineGraphs )
323    {
324    if( pipelineGraphs != null )
325      this.pipelineGraphs = pipelineGraphs;
326    }
327
328  @Override
329  public ElementGraph getPipelineGraphFor( FlowElement streamedSource )
330    {
331    return streamPipelineMap.get( streamedSource );
332    }
333
334  @Override
335  public Collection<Group> getGroups()
336    {
337    return ElementGraphs.findAllGroups( nodeSubGraph );
338    }
339
340  @Override
341  public Set<Tap> getSourceTaps()
342    {
343    if( sourceTaps != null )
344      return sourceTaps;
345
346    sourceTaps = Collections.unmodifiableSet( Util.narrowIdentitySet( Tap.class, getSourceElements() ) );
347
348    return sourceTaps;
349    }
350
351  @Override
352  public Set<Tap> getSinkTaps()
353    {
354    if( sinkTaps != null )
355      return sinkTaps;
356
357    sinkTaps = Collections.unmodifiableSet( Util.narrowIdentitySet( Tap.class, getSinkElements() ) );
358
359    return sinkTaps;
360    }
361
362  @Override
363  public int getSubmitPriority()
364    {
365    return 0;
366    }
367
368  @Override
369  public Set<String> getSourceTapNames( Tap source )
370    {
371    return reverseSourceTaps.get( source );
372    }
373
374  @Override
375  public Set<String> getSinkTapNames( Tap sink )
376    {
377    return reverseSinkTaps.get( sink );
378    }
379
380  private void assignTrappableNames( FlowElementGraph flowElementGraph )
381    {
382    if( flowElementGraph == null )
383      return;
384
385    reverseSourceTaps = new HashMap<>();
386    reverseSinkTaps = new HashMap<>();
387
388    Set<Tap> sources = getSourceTaps();
389
390    for( Tap source : sources )
391      {
392      Set<Scope> scopes = flowElementGraph.outgoingEdgesOf( source );
393
394      for( Scope scope : scopes )
395        addSourceName( scope.getName(), source );
396      }
397
398    for( Map.Entry<String, Tap> entry : flowElementGraph.getSourceMap().entrySet() )
399      {
400      if( sources.contains( entry.getValue() ) )
401        addSourceName( entry.getKey(), entry.getValue() );
402      }
403
404    Set<Tap> sinks = getSinkTaps();
405
406    for( Tap sink : sinks )
407      {
408      Set<Scope> scopes = flowElementGraph.incomingEdgesOf( sink );
409
410      for( Scope scope : scopes )
411        addSinkName( scope.getName(), sink );
412      }
413
414    for( Map.Entry<String, Tap> entry : flowElementGraph.getSinkMap().entrySet() )
415      {
416      if( sinks.contains( entry.getValue() ) )
417        addSinkName( entry.getKey(), entry.getValue() );
418      }
419    }
420
421  private void addSourceName( String name, Tap source )
422    {
423    if( !reverseSourceTaps.containsKey( source ) )
424      reverseSourceTaps.put( source, new HashSet<String>() );
425
426    reverseSourceTaps.get( source ).add( name );
427    }
428
429  private void addSinkName( String name, Tap sink )
430    {
431    if( !reverseSinkTaps.containsKey( sink ) )
432      reverseSinkTaps.put( sink, new HashSet<String>() );
433
434    reverseSinkTaps.get( sink ).add( name );
435    }
436
437  @Override
438  public Map<String, Tap> getTrapMap()
439    {
440    return trapMap;
441    }
442
443  @Override
444  public Collection<? extends Tap> getTraps()
445    {
446    return getTrapMap().values();
447    }
448
449  private void assignTraps( Map<String, Tap> traps )
450    {
451    trapMap = new HashMap<>();
452
453    for( FlowElement flowElement : nodeSubGraph.vertexSet() )
454      {
455      Set<String> names = new HashSet<>();
456
457      if( flowElement instanceof Extent )
458        continue;
459
460      if( flowElement instanceof Pipe )
461        {
462        names.add( ( (Pipe) flowElement ).getName() );
463        }
464      else
465        {
466        Set<String> sourceTapNames = getSourceTapNames( (Tap) flowElement );
467
468        if( sourceTapNames != null )
469          names.addAll( sourceTapNames );
470
471        Set<String> sinkTapNames = getSinkTapNames( (Tap) flowElement );
472
473        if( sinkTapNames != null )
474          names.addAll( sinkTapNames );
475        }
476
477      for( String name : names )
478        {
479        if( traps.containsKey( name ) )
480          trapMap.put( name, traps.get( name ) );
481        }
482      }
483    }
484
485  private void verifyPipelines()
486    {
487    if( pipelineGraphs == null || pipelineGraphs.isEmpty() )
488      return;
489
490    Set<FlowElement> allElements = createIdentitySet( nodeSubGraph.vertexSet() );
491
492    for( ElementGraph pipelineGraph : pipelineGraphs )
493      allElements.removeAll( pipelineGraph.vertexSet() );
494
495    if( !allElements.isEmpty() )
496      throw new IllegalStateException( "union of pipeline graphs for flow node are missing elements: " + Util.join( allElements, ", " ) );
497    }
498
499  private void createPipelineMap()
500    {
501    if( pipelineGraphs == null || pipelineGraphs.isEmpty() )
502      return;
503
504    Map<FlowElement, ElementGraph> map = new HashMap<>( pipelineGraphs.size() );
505
506    for( ElementGraph pipelineGraph : pipelineGraphs )
507      {
508      if( !( pipelineGraph instanceof AnnotatedGraph ) )
509        throw new IllegalStateException( "pipeline graphs must be of type AnnotatedGraph, got: " + pipelineGraph.getClass().getName() );
510
511      Set<FlowElement> flowElements;
512
513      if( ( (AnnotatedGraph) pipelineGraph ).hasAnnotations() )
514        flowElements = ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( StreamMode.Streamed );
515      else
516        flowElements = ElementGraphs.findSources( pipelineGraph, FlowElement.class );
517
518      for( FlowElement flowElement : flowElements )
519        {
520        if( map.containsKey( flowElement ) )
521          throw new IllegalStateException( "duplicate streamable elements, found:  " + flowElement );
522
523        map.put( flowElement, pipelineGraph );
524        }
525      }
526
527    this.streamPipelineMap = map;
528    }
529
530  @Override
531  public Tap getTrap( String branchName )
532    {
533    return trapMap.get( branchName );
534    }
535
536  @Override
537  public Collection<? extends Scope> getPreviousScopes( FlowElement flowElement )
538    {
539    return nodeSubGraph.incomingEdgesOf( flowElement );
540    }
541
542  @Override
543  public Collection<? extends Scope> getNextScopes( FlowElement flowElement )
544    {
545    return nodeSubGraph.outgoingEdgesOf( flowElement );
546    }
547
548  @Override
549  public boolean equals( Object object )
550    {
551    if( this == object )
552      return true;
553
554    if( object == null || getClass() != object.getClass() )
555      return false;
556
557    BaseFlowNode flowNode = (BaseFlowNode) object;
558
559    if( id != null ? !id.equals( flowNode.id ) : flowNode.id != null )
560      return false;
561
562    return true;
563    }
564
565  @Override
566  public int hashCode()
567    {
568    return id != null ? id.hashCode() : 0;
569    }
570
571  @Override
572  public Set<? extends FlowElement> getFlowElementsFor( Enum annotation )
573    {
574    if( pipelineGraphs.isEmpty() )
575      return ( (AnnotatedGraph) getElementGraph() ).getAnnotations().getValues( annotation );
576
577    Set<FlowElement> results = createIdentitySet();
578
579    for( ElementGraph pipelineGraph : pipelineGraphs )
580      results.addAll( ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( annotation ) );
581
582    return results;
583    }
584
585  private ProcessLogger getLogger()
586    {
587    if( flowStep != null && flowStep instanceof ProcessLogger )
588      return (ProcessLogger) flowStep;
589
590    return ProcessLogger.NULL;
591    }
592
593  @Override
594  public boolean isInfoEnabled()
595    {
596    return getLogger().isInfoEnabled();
597    }
598
599  @Override
600  public boolean isDebugEnabled()
601    {
602    return getLogger().isDebugEnabled();
603    }
604
605  @Override
606  public void logInfo( String message, Object... arguments )
607    {
608    getLogger().logInfo( message, arguments );
609    }
610
611  @Override
612  public void logDebug( String message, Object... arguments )
613    {
614    getLogger().logDebug( message, arguments );
615    }
616
617  @Override
618  public void logWarn( String message )
619    {
620    getLogger().logWarn( message );
621    }
622
623  @Override
624  public void logWarn( String message, Object... arguments )
625    {
626    getLogger().logWarn( message, arguments );
627    }
628
629  @Override
630  public void logWarn( String message, Throwable throwable )
631    {
632    getLogger().logWarn( message, throwable );
633    }
634
635  @Override
636  public void logError( String message, Object... arguments )
637    {
638    getLogger().logError( message, arguments );
639    }
640
641  @Override
642  public void logError( String message, Throwable throwable )
643    {
644    getLogger().logError( message, throwable );
645    }
646  }