001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.process;
022
023import java.io.FileWriter;
024import java.io.IOException;
025import java.io.Writer;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Comparator;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.PriorityQueue;
035import java.util.Set;
036
037import cascading.flow.FlowElement;
038import cascading.flow.planner.Scope;
039import cascading.flow.planner.graph.AnnotatedGraph;
040import cascading.flow.planner.graph.ElementGraph;
041import cascading.flow.planner.graph.ElementGraphs;
042import cascading.flow.planner.graph.Extent;
043import cascading.pipe.Group;
044import cascading.tap.Tap;
045import cascading.util.EnumMultiMap;
046import cascading.util.Util;
047import cascading.util.jgrapht.IntegerNameProvider;
048import cascading.util.jgrapht.VertexNameProvider;
049import org.jgrapht.graph.SimpleDirectedGraph;
050import org.jgrapht.traverse.TopologicalOrderIterator;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import static cascading.util.Util.createIdentitySet;
055
056/**
057 *
058 */
059public abstract class BaseProcessGraph<Process extends ProcessModel> implements ProcessGraph<Process>
060  {
061  /** Field LOG */
062  private static final Logger LOG = LoggerFactory.getLogger( BaseProcessGraph.class );
063
064  final SimpleDirectedGraph<Process, ProcessEdge> graph;
065
066  protected Set<FlowElement> sourceElements = createIdentitySet();
067  protected Set<FlowElement> sinkElements = createIdentitySet();
068  private Set<Tap> sourceTaps;
069  private Set<Tap> sinkTaps;
070  protected Map<String, Tap> trapsMap = new HashMap<>();
071
072  public BaseProcessGraph()
073    {
074    graph = new SimpleDirectedGraph( ProcessEdge.class );
075    }
076
077  @Override
078  public boolean addVertex( Process process )
079    {
080    sourceElements.addAll( process.getSourceElements() );
081    sinkElements.addAll( process.getSinkElements() );
082    trapsMap.putAll( process.getTrapMap() );
083
084    return graph.addVertex( process );
085    }
086
087  protected void bindEdges()
088    {
089    for( Process sinkProcess : vertexSet() )
090      {
091      for( Process sourceProcess : vertexSet() )
092        {
093        if( sourceProcess == sinkProcess )
094          continue;
095
096        // outer edge sources and sinks to this graph
097        sourceElements.removeAll( sinkProcess.getSinkElements() );
098        sinkElements.removeAll( sourceProcess.getSourceElements() );
099        }
100      }
101
102    for( Process sinkProcess : vertexSet() )
103      {
104      for( Process sourceProcess : vertexSet() )
105        {
106        if( sourceProcess == sinkProcess )
107          continue;
108
109        for( Object intermediate : sourceProcess.getSinkElements() )
110          {
111          if( sinkProcess.getSourceElements().contains( intermediate ) )
112            addEdge( sourceProcess, sinkProcess, new ProcessEdge<>( sourceProcess, (FlowElement) intermediate, sinkProcess ) );
113          }
114        }
115      }
116    }
117
118  @Override
119  public Set<FlowElement> getSourceElements()
120    {
121    return sourceElements;
122    }
123
124  @Override
125  public Set<FlowElement> getSinkElements()
126    {
127    return sinkElements;
128    }
129
130  @Override
131  public Set<Tap> getSourceTaps()
132    {
133    if( sourceTaps != null )
134      return sourceTaps;
135
136    sourceTaps = Util.narrowIdentitySet( Tap.class, getSourceElements() );
137
138    return sourceTaps;
139    }
140
141  @Override
142  public Map<String, Tap> getSourceTapsMap()
143    {
144    Map<String, Tap> result = new HashMap<>();
145    Set<Tap> sourceTaps = getSourceTaps();
146
147    for( Tap sourceTap : sourceTaps )
148      {
149      for( Process process : graph.vertexSet() )
150        {
151        if( !process.getSourceTaps().contains( sourceTap ) )
152          continue;
153
154        ElementGraph elementGraph = process.getElementGraph();
155
156        for( Scope scope : elementGraph.outgoingEdgesOf( sourceTap ) )
157          result.put( scope.getName(), sourceTap );
158        }
159      }
160
161    return result;
162    }
163
164  @Override
165  public Set<Tap> getSinkTaps()
166    {
167    if( sinkTaps != null )
168      return sinkTaps;
169
170    sinkTaps = Util.narrowIdentitySet( Tap.class, getSinkElements() );
171
172    return sinkTaps;
173    }
174
175  @Override
176  public Map<String, Tap> getSinkTapsMap()
177    {
178    Map<String, Tap> result = new HashMap<>();
179    Set<Tap> sinkTaps = getSinkTaps();
180
181    for( Tap sinkTap : sinkTaps )
182      {
183      for( Process process : graph.vertexSet() )
184        {
185        if( !process.getSinkTaps().contains( sinkTap ) )
186          continue;
187
188        ElementGraph elementGraph = process.getElementGraph();
189
190        for( Scope scope : elementGraph.incomingEdgesOf( sinkTap ) )
191          result.put( scope.getName(), sinkTap );
192        }
193      }
194
195    return result;
196    }
197
198  @Override
199  public Map<String, Tap> getTrapsMap()
200    {
201    return trapsMap;
202    }
203
204  @Override
205  public Iterator<Process> getTopologicalIterator()
206    {
207    return getOrderedTopologicalIterator( new Comparator<Process>()
208    {
209    @Override
210    public int compare( Process lhs, Process rhs )
211      {
212      return Integer.valueOf( lhs.getSubmitPriority() ).compareTo( rhs.getSubmitPriority() );
213      }
214    } );
215    }
216
217  @Override
218  public Iterator<Process> getOrdinalTopologicalIterator()
219    {
220    return getOrderedTopologicalIterator( new Comparator<Process>()
221    {
222    @Override
223    public int compare( Process lhs, Process rhs )
224      {
225      return Integer.valueOf( lhs.getOrdinal() ).compareTo( rhs.getOrdinal() );
226      }
227    } );
228    }
229
230  @Override
231  public Iterator<Process> getOrderedTopologicalIterator( Comparator<Process> comparator )
232    {
233    return new TopologicalOrderIterator<>( graph, new PriorityQueue<>( 10, comparator ) );
234    }
235
236  @Override
237  public List<ElementGraph> getElementGraphs( FlowElement flowElement )
238    {
239    List<Process> elementProcesses = getElementProcesses( flowElement );
240
241    List<ElementGraph> elementGraphs = new ArrayList<>();
242
243    for( Process elementProcess : elementProcesses )
244      elementGraphs.add( elementProcess.getElementGraph() );
245
246    return elementGraphs;
247    }
248
249  @Override
250  public List<Process> getElementProcesses( FlowElement flowElement )
251    {
252    List<Process> processes = new ArrayList<>();
253
254    for( Process process : vertexSet() )
255      {
256      if( process.getElementGraph().vertexSet().contains( flowElement ) )
257        processes.add( process );
258      }
259
260    return processes;
261    }
262
263  @Override
264  public List<ElementGraph> getElementGraphs( Scope scope )
265    {
266    List<Process> elementProcesses = getElementProcesses( scope );
267
268    List<ElementGraph> elementGraphs = new ArrayList<>();
269
270    for( Process elementProcess : elementProcesses )
271      elementGraphs.add( elementProcess.getElementGraph() );
272
273    return elementGraphs;
274    }
275
276  @Override
277  public List<Process> getElementProcesses( Scope scope )
278    {
279    List<Process> processes = new ArrayList<>();
280
281    for( Process process : vertexSet() )
282      {
283      if( process.getElementGraph().edgeSet().contains( scope ) )
284        processes.add( process );
285      }
286
287    return processes;
288    }
289
290  @Override
291  public List<Process> getElementSourceProcesses( FlowElement flowElement )
292    {
293    List<Process> sources = new ArrayList<>();
294
295    for( Process process : vertexSet() )
296      {
297      if( process.getSinkElements().contains( flowElement ) )
298        sources.add( process );
299      }
300
301    return sources;
302    }
303
304  @Override
305  public List<Process> getElementSinkProcesses( FlowElement flowElement )
306    {
307    List<Process> sinks = new ArrayList<>();
308
309    for( Process process : vertexSet() )
310      {
311      if( process.getSourceElements().contains( flowElement ) )
312        sinks.add( process );
313      }
314
315    return sinks;
316    }
317
318  @Override
319  public Set<FlowElement> getAllSourceElements()
320    {
321    Set<FlowElement> results = createIdentitySet();
322
323    for( Process process : vertexSet() )
324      results.addAll( process.getSourceElements() );
325
326    return results;
327    }
328
329  @Override
330  public Set<FlowElement> getAllSinkElements()
331    {
332    Set<FlowElement> results = createIdentitySet();
333
334    for( Process process : vertexSet() )
335      results.addAll( process.getSinkElements() );
336
337    return results;
338    }
339
340  public EnumMultiMap<FlowElement> getAnnotations()
341    {
342    EnumMultiMap<FlowElement> annotations = new EnumMultiMap<>();
343
344    for( Process process : vertexSet() )
345      {
346      ElementGraph elementGraph = process.getElementGraph();
347
348      if( elementGraph instanceof AnnotatedGraph )
349        annotations.addAll( ( (AnnotatedGraph) elementGraph ).getAnnotations() );
350      }
351
352    return annotations;
353    }
354
355  /**
356   * All elements, from the given ElementGraph, that belong to two or more processes, that are not sink or source elements that
357   * connect processes.
358   *
359   * @return Set
360   */
361  @Override
362  public Set<FlowElement> getDuplicatedElements( ElementGraph elementGraph )
363    {
364    Set<FlowElement> results = createIdentitySet();
365
366    for( FlowElement flowElement : elementGraph.vertexSet() )
367      {
368      if( getElementProcesses( flowElement ).size() > 1 )
369        results.add( flowElement );
370      }
371
372    results.remove( Extent.head );
373    results.remove( Extent.tail );
374    results.removeAll( getAllSourceElements() );
375    results.removeAll( getAllSinkElements() );
376
377    return results;
378    }
379
380  @Override
381  public Set<ElementGraph> getIdentityElementGraphs()
382    {
383    Set<ElementGraph> results = createIdentitySet();
384
385    for( Process process : getIdentityProcesses() )
386      results.add( process.getElementGraph() );
387
388    return results;
389    }
390
391  /**
392   * Returns a set of processes that perform no internal operations.
393   * <p/>
394   * for example if a FlowNode only has a Merge source and a GroupBy sink.
395   *
396   * @return
397   */
398  @Override
399  public Set<Process> getIdentityProcesses()
400    {
401    Set<Process> results = new HashSet<>();
402
403    for( Process process : vertexSet() )
404      {
405      if( ProcessModels.isIdentity( process ) )
406        results.add( process );
407      }
408
409    return results;
410    }
411
412  /**
413   * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
414   *
415   * @param filename of type String
416   */
417  @Override
418  public void writeDOT( String filename )
419    {
420    printProcessGraph( filename );
421    }
422
423  protected void printProcessGraph( String filename )
424    {
425    try
426      {
427      Writer writer = new FileWriter( filename );
428
429      Util.writeDOT( writer, graph, new IntegerNameProvider<Process>(), new VertexNameProvider<Process>()
430      {
431      public String getVertexName( Process process )
432        {
433        String name = "[" + process.getName() + "]";
434
435        String sourceName = "";
436        Set<Tap> sources = process.getSourceTaps();
437        for( Tap source : sources )
438          sourceName += "\\nsrc:[" + source.getIdentifier() + "]";
439
440        if( sourceName.length() != 0 )
441          name += sourceName;
442
443        Collection<Group> groups = process.getGroups();
444
445        for( Group group : groups )
446          {
447          String groupName = group.getName();
448
449          if( groupName.length() != 0 )
450            name += "\\ngrp:" + groupName;
451          }
452
453        Set<Tap> sinks = process.getSinkTaps();
454        String sinkName = "";
455        for( Tap sink : sinks )
456          sinkName = "\\nsnk:[" + sink.getIdentifier() + "]";
457
458        if( sinkName.length() != 0 )
459          name += sinkName;
460
461        return name.replaceAll( "\"", "\'" );
462        }
463      }, null );
464
465      writer.close();
466      }
467    catch( IOException exception )
468      {
469      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
470      }
471    }
472
473  @Override
474  public void writeDOTNested( String filename, ElementGraph graph )
475    {
476    ElementGraphs.printProcessGraph( filename, graph, this );
477    }
478
479  public boolean containsEdge( Process sourceVertex, Process targetVertex )
480    {
481    return graph.containsEdge( sourceVertex, targetVertex );
482    }
483
484  public boolean removeAllEdges( Collection<? extends ProcessEdge> edges )
485    {
486    return graph.removeAllEdges( edges );
487    }
488
489  public Set<ProcessEdge> removeAllEdges( Process sourceVertex, Process targetVertex )
490    {
491    return graph.removeAllEdges( sourceVertex, targetVertex );
492    }
493
494  public boolean removeAllVertices( Collection<? extends Process> vertices )
495    {
496    return graph.removeAllVertices( vertices );
497    }
498
499  public Set<ProcessEdge> getAllEdges( Process sourceVertex, Process targetVertex )
500    {
501    return graph.getAllEdges( sourceVertex, targetVertex );
502    }
503
504  public ProcessEdge getEdge( Process sourceVertex, Process targetVertex )
505    {
506    return graph.getEdge( sourceVertex, targetVertex );
507    }
508
509  public ProcessEdge addEdge( Process sourceVertex, Process targetVertex )
510    {
511    return graph.addEdge( sourceVertex, targetVertex );
512    }
513
514  public boolean addEdge( Process sourceVertex, Process targetVertex, ProcessEdge processEdge )
515    {
516    return graph.addEdge( sourceVertex, targetVertex, processEdge );
517    }
518
519  public Process getEdgeSource( ProcessEdge processEdge )
520    {
521    return graph.getEdgeSource( processEdge );
522    }
523
524  public Process getEdgeTarget( ProcessEdge processEdge )
525    {
526    return graph.getEdgeTarget( processEdge );
527    }
528
529  public boolean containsEdge( ProcessEdge processEdge )
530    {
531    return graph.containsEdge( processEdge );
532    }
533
534  public boolean containsVertex( Process process )
535    {
536    return graph.containsVertex( process );
537    }
538
539  public Set<ProcessEdge> edgeSet()
540    {
541    return graph.edgeSet();
542    }
543
544  public Set<ProcessEdge> edgesOf( Process vertex )
545    {
546    return graph.edgesOf( vertex );
547    }
548
549  public int inDegreeOf( Process vertex )
550    {
551    return graph.inDegreeOf( vertex );
552    }
553
554  public Set<ProcessEdge> incomingEdgesOf( Process vertex )
555    {
556    return graph.incomingEdgesOf( vertex );
557    }
558
559  public int outDegreeOf( Process vertex )
560    {
561    return graph.outDegreeOf( vertex );
562    }
563
564  public Set<ProcessEdge> outgoingEdgesOf( Process vertex )
565    {
566    return graph.outgoingEdgesOf( vertex );
567    }
568
569  public ProcessEdge removeEdge( Process sourceVertex, Process targetVertex )
570    {
571    return graph.removeEdge( sourceVertex, targetVertex );
572    }
573
574  public boolean removeEdge( ProcessEdge processEdge )
575    {
576    return graph.removeEdge( processEdge );
577    }
578
579  public boolean removeVertex( Process process )
580    {
581    return graph.removeVertex( process );
582    }
583
584  public Set<Process> vertexSet()
585    {
586    return graph.vertexSet();
587    }
588
589  public double getEdgeWeight( ProcessEdge processEdge )
590    {
591    return graph.getEdgeWeight( processEdge );
592    }
593  }