001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner;
022
023import java.io.Serializable;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031
032import cascading.flow.FlowElement;
033import cascading.flow.FlowNode;
034import cascading.flow.FlowStep;
035import cascading.flow.planner.graph.AnnotatedGraph;
036import cascading.flow.planner.graph.ElementGraph;
037import cascading.flow.planner.graph.ElementGraphs;
038import cascading.flow.planner.graph.Extent;
039import cascading.flow.planner.graph.FlowElementGraph;
040import cascading.flow.stream.annotations.StreamMode;
041import cascading.pipe.Group;
042import cascading.pipe.Pipe;
043import cascading.tap.Tap;
044import cascading.util.ProcessLogger;
045import cascading.util.Util;
046
047import static cascading.util.Util.createIdentitySet;
048
049/**
050 *
051 */
052public class BaseFlowNode implements Serializable, FlowNode, ProcessLogger
053  {
054  private final String id;
055  private int ordinal;
056  private String name;
057  private Map<String, String> processAnnotations;
058
059  private transient FlowStep flowStep;
060
061  protected ElementGraph nodeSubGraph;
062  protected List<? extends ElementGraph> pipelineGraphs = Collections.emptyList();
063
064  private transient Set<FlowElement> sourceElements;
065  private transient Set<FlowElement> sinkElements;
066  private Map<String, Tap> trapMap;
067  private Set<Tap> sourceTaps;
068  private Set<Tap> sinkTaps;
069
070  private Map<Tap, Set<String>> reverseSourceTaps;
071  private Map<Tap, Set<String>> reverseSinkTaps;
072  private Map<FlowElement, ElementGraph> streamPipelineMap = Collections.emptyMap();
073
074  // for testing
075  public BaseFlowNode( String name, int ordinal )
076    {
077    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
078    setName( name );
079    this.ordinal = ordinal;
080    this.trapMap = Collections.emptyMap();
081    }
082
083  public BaseFlowNode( FlowElementGraph flowElementGraph, ElementGraph nodeSubGraph, List<? extends ElementGraph> pipelineGraphs )
084    {
085    this.id = Util.createUniqueIDWhichStartsWithAChar(); // timeline server cannot filter strings that start with a number
086    this.nodeSubGraph = nodeSubGraph;
087
088    if( pipelineGraphs != null )
089      this.pipelineGraphs = pipelineGraphs;
090
091    verifyPipelines();
092    createPipelineMap();
093
094    assignTrappableNames( flowElementGraph );
095    assignTraps( flowElementGraph.getTrapMap() );
096    }
097
098  public void setOrdinal( int ordinal )
099    {
100    this.ordinal = ordinal;
101    }
102
103  @Override
104  public int getOrdinal()
105    {
106    return ordinal;
107    }
108
109  @Override
110  public String getID()
111    {
112    return id;
113    }
114
115  public void setName( String name )
116    {
117    this.name = name;
118    }
119
120  @Override
121  public String getName()
122    {
123    return name;
124    }
125
126  @Override
127  public Map<String, String> getProcessAnnotations()
128    {
129    if( processAnnotations == null )
130      return Collections.emptyMap();
131
132    return Collections.unmodifiableMap( processAnnotations );
133    }
134
135  @Override
136  public void addProcessAnnotation( Enum annotation )
137    {
138    if( annotation == null )
139      return;
140
141    addProcessAnnotation( annotation.getDeclaringClass().getName(), annotation.name() );
142    }
143
144  @Override
145  public void addProcessAnnotation( String key, String value )
146    {
147    if( processAnnotations == null )
148      processAnnotations = new HashMap<>();
149
150    processAnnotations.put( key, value );
151    }
152
153  public void setFlowStep( FlowStep flowStep )
154    {
155    this.flowStep = flowStep;
156    }
157
158  @Override
159  public FlowStep getFlowStep()
160    {
161    return flowStep;
162    }
163
164  @Override
165  public ElementGraph getElementGraph()
166    {
167    return nodeSubGraph;
168    }
169
170  @Override
171  public Set<String> getSourceElementNames()
172    {
173    Set<String> results = new HashSet<>();
174
175    for( FlowElement flowElement : getSourceElements() )
176      {
177      if( flowElement instanceof Tap )
178        results.addAll( getSourceTapNames( (Tap) flowElement ) );
179      else
180        results.add( ( (Pipe) flowElement ).getName() );
181      }
182
183    return results;
184    }
185
186  public Set<FlowElement> getSourceElements()
187    {
188    if( sourceElements == null )
189      sourceElements = Collections.unmodifiableSet( ElementGraphs.findSources( nodeSubGraph, FlowElement.class ) );
190
191    return sourceElements;
192    }
193
194  @Override
195  public Set<? extends FlowElement> getSourceElements( Enum annotation )
196    {
197    Set<? extends FlowElement> annotated = getFlowElementsFor( annotation );
198    Set<FlowElement> sourceElements = getSourceElements();
199
200    Set<FlowElement> results = new HashSet<>();
201
202    for( FlowElement sourceElement : sourceElements )
203      {
204      if( annotated.contains( sourceElement ) )
205        results.add( sourceElement );
206      }
207
208    return results;
209    }
210
211  @Override
212  public Set<String> getSinkElementNames()
213    {
214    Set<String> results = new HashSet<>();
215
216    for( FlowElement flowElement : getSinkElements() )
217      {
218      if( flowElement instanceof Tap )
219        results.addAll( getSinkTapNames( (Tap) flowElement ) );
220      else
221        results.add( ( (Pipe) flowElement ).getName() );
222      }
223
224    return results;
225    }
226
227  @Override
228  public Set<FlowElement> getSinkElements()
229    {
230    if( sinkElements == null )
231      sinkElements = Collections.unmodifiableSet( ElementGraphs.findSinks( nodeSubGraph, FlowElement.class ) );
232
233    return sinkElements;
234    }
235
236  public Set<? extends FlowElement> getSinkElements( Enum annotation )
237    {
238    Set<? extends FlowElement> annotated = getFlowElementsFor( annotation );
239    Set<FlowElement> sinkElements = getSinkElements();
240
241    Set<FlowElement> results = new HashSet<>();
242
243    for( FlowElement sinkElement : sinkElements )
244      {
245      if( annotated.contains( sinkElement ) )
246        results.add( sinkElement );
247      }
248
249    return results;
250    }
251
252  @Override
253  public List<? extends ElementGraph> getPipelineGraphs()
254    {
255    return pipelineGraphs;
256    }
257
258  @Override
259  public ElementGraph getPipelineGraphFor( FlowElement streamedSource )
260    {
261    return streamPipelineMap.get( streamedSource );
262    }
263
264  @Override
265  public Collection<Group> getGroups()
266    {
267    return ElementGraphs.findAllGroups( nodeSubGraph );
268    }
269
270  @Override
271  public Set<Tap> getSourceTaps()
272    {
273    if( sourceTaps != null )
274      return sourceTaps;
275
276    sourceTaps = Collections.unmodifiableSet( Util.narrowSet( Tap.class, getSourceElements() ) );
277
278    return sourceTaps;
279    }
280
281  @Override
282  public Set<Tap> getSinkTaps()
283    {
284    if( sinkTaps != null )
285      return sinkTaps;
286
287    sinkTaps = Collections.unmodifiableSet( Util.narrowSet( Tap.class, getSinkElements() ) );
288
289    return sinkTaps;
290    }
291
292  @Override
293  public int getSubmitPriority()
294    {
295    return 0;
296    }
297
298  @Override
299  public Set<String> getSourceTapNames( Tap source )
300    {
301    return reverseSourceTaps.get( source );
302    }
303
304  @Override
305  public Set<String> getSinkTapNames( Tap sink )
306    {
307    return reverseSinkTaps.get( sink );
308    }
309
310  private void assignTrappableNames( FlowElementGraph flowElementGraph )
311    {
312    reverseSourceTaps = new HashMap<>();
313    reverseSinkTaps = new HashMap<>();
314
315    Set<Tap> sources = getSourceTaps();
316
317    for( Tap source : sources )
318      {
319      Set<Scope> scopes = flowElementGraph.outgoingEdgesOf( source );
320
321      for( Scope scope : scopes )
322        addSourceName( scope.getName(), source );
323      }
324
325    for( Map.Entry<String, Tap> entry : flowElementGraph.getSourceMap().entrySet() )
326      {
327      if( sources.contains( entry.getValue() ) )
328        addSourceName( entry.getKey(), entry.getValue() );
329      }
330
331    Set<Tap> sinks = getSinkTaps();
332
333    for( Tap sink : sinks )
334      {
335      Set<Scope> scopes = flowElementGraph.incomingEdgesOf( sink );
336
337      for( Scope scope : scopes )
338        addSinkName( scope.getName(), sink );
339      }
340
341    for( Map.Entry<String, Tap> entry : flowElementGraph.getSinkMap().entrySet() )
342      {
343      if( sinks.contains( entry.getValue() ) )
344        addSinkName( entry.getKey(), entry.getValue() );
345      }
346    }
347
348  private void addSourceName( String name, Tap source )
349    {
350    if( !reverseSourceTaps.containsKey( source ) )
351      reverseSourceTaps.put( source, new HashSet<String>() );
352
353    reverseSourceTaps.get( source ).add( name );
354    }
355
356  private void addSinkName( String name, Tap sink )
357    {
358    if( !reverseSinkTaps.containsKey( sink ) )
359      reverseSinkTaps.put( sink, new HashSet<String>() );
360
361    reverseSinkTaps.get( sink ).add( name );
362    }
363
364  @Override
365  public Map<String, Tap> getTrapMap()
366    {
367    return trapMap;
368    }
369
370  @Override
371  public Collection<? extends Tap> getTraps()
372    {
373    return getTrapMap().values();
374    }
375
376  private void assignTraps( Map<String, Tap> traps )
377    {
378    trapMap = new HashMap<>();
379
380    for( FlowElement flowElement : nodeSubGraph.vertexSet() )
381      {
382      Set<String> names = new HashSet<>();
383
384      if( flowElement instanceof Extent )
385        continue;
386
387      if( flowElement instanceof Pipe )
388        {
389        names.add( ( (Pipe) flowElement ).getName() );
390        }
391      else
392        {
393        Set<String> sourceTapNames = getSourceTapNames( (Tap) flowElement );
394
395        if( sourceTapNames != null )
396          names.addAll( sourceTapNames );
397
398        Set<String> sinkTapNames = getSinkTapNames( (Tap) flowElement );
399
400        if( sinkTapNames != null )
401          names.addAll( sinkTapNames );
402        }
403
404      for( String name : names )
405        {
406        if( traps.containsKey( name ) )
407          trapMap.put( name, traps.get( name ) );
408        }
409      }
410    }
411
412  private void verifyPipelines()
413    {
414    if( pipelineGraphs == null || pipelineGraphs.isEmpty() )
415      return;
416
417    Set<FlowElement> allElements = createIdentitySet( nodeSubGraph.vertexSet() );
418
419    for( ElementGraph pipelineGraph : pipelineGraphs )
420      allElements.removeAll( pipelineGraph.vertexSet() );
421
422    if( !allElements.isEmpty() )
423      throw new IllegalStateException( "union of pipeline graphs for flow node are missing elements: " + Util.join( allElements, ", " ) );
424    }
425
426  private void createPipelineMap()
427    {
428    if( pipelineGraphs == null || pipelineGraphs.isEmpty() )
429      return;
430
431    Map<FlowElement, ElementGraph> map = new HashMap<>( pipelineGraphs.size() );
432
433    for( ElementGraph pipelineGraph : pipelineGraphs )
434      {
435      if( !( pipelineGraph instanceof AnnotatedGraph ) )
436        throw new IllegalStateException( "pipeline graphs must be of type AnnotatedGraph, got: " + pipelineGraph.getClass().getName() );
437
438      Set<FlowElement> flowElements;
439
440      if( ( (AnnotatedGraph) pipelineGraph ).hasAnnotations() )
441        flowElements = ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( StreamMode.Streamed );
442      else
443        flowElements = ElementGraphs.findSources( pipelineGraph, FlowElement.class );
444
445      for( FlowElement flowElement : flowElements )
446        {
447        if( map.containsKey( flowElement ) )
448          throw new IllegalStateException( "duplicate streamable elements, found:  " + flowElement );
449
450        map.put( flowElement, pipelineGraph );
451        }
452      }
453
454    this.streamPipelineMap = map;
455    }
456
457  @Override
458  public Tap getTrap( String branchName )
459    {
460    return trapMap.get( branchName );
461    }
462
463  @Override
464  public Collection<? extends Scope> getPreviousScopes( FlowElement flowElement )
465    {
466    return nodeSubGraph.incomingEdgesOf( flowElement );
467    }
468
469  @Override
470  public Collection<? extends Scope> getNextScopes( FlowElement flowElement )
471    {
472    return nodeSubGraph.outgoingEdgesOf( flowElement );
473    }
474
475  @Override
476  public boolean equals( Object object )
477    {
478    if( this == object )
479      return true;
480
481    if( object == null || getClass() != object.getClass() )
482      return false;
483
484    BaseFlowNode flowNode = (BaseFlowNode) object;
485
486    if( id != null ? !id.equals( flowNode.id ) : flowNode.id != null )
487      return false;
488
489    return true;
490    }
491
492  @Override
493  public int hashCode()
494    {
495    return id != null ? id.hashCode() : 0;
496    }
497
498  @Override
499  public Set<? extends FlowElement> getFlowElementsFor( Enum annotation )
500    {
501    if( pipelineGraphs.isEmpty() )
502      return ( (AnnotatedGraph) getElementGraph() ).getAnnotations().getValues( annotation );
503
504    Set<FlowElement> results = createIdentitySet();
505
506    for( ElementGraph pipelineGraph : pipelineGraphs )
507      results.addAll( ( (AnnotatedGraph) pipelineGraph ).getAnnotations().getValues( annotation ) );
508
509    return results;
510    }
511
512  private ProcessLogger getLogger()
513    {
514    if( flowStep != null && flowStep instanceof ProcessLogger )
515      return (ProcessLogger) flowStep;
516
517    return ProcessLogger.NULL;
518    }
519
520  @Override
521  public boolean isInfoEnabled()
522    {
523    return getLogger().isInfoEnabled();
524    }
525
526  @Override
527  public boolean isDebugEnabled()
528    {
529    return getLogger().isDebugEnabled();
530    }
531
532  @Override
533  public void logInfo( String message, Object... arguments )
534    {
535    getLogger().logInfo( message, arguments );
536    }
537
538  @Override
539  public void logDebug( String message, Object... arguments )
540    {
541    getLogger().logDebug( message, arguments );
542    }
543
544  @Override
545  public void logWarn( String message )
546    {
547    getLogger().logWarn( message );
548    }
549
550  @Override
551  public void logWarn( String message, Object... arguments )
552    {
553    getLogger().logWarn( message, arguments );
554    }
555
556  @Override
557  public void logWarn( String message, Throwable throwable )
558    {
559    getLogger().logWarn( message, throwable );
560    }
561
562  @Override
563  public void logError( String message, Object... arguments )
564    {
565    getLogger().logError( message, arguments );
566    }
567
568  @Override
569  public void logError( String message, Throwable throwable )
570    {
571    getLogger().logError( message, throwable );
572    }
573  }