001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.process;
022
023import java.io.FileWriter;
024import java.io.IOException;
025import java.io.Writer;
026import java.util.ArrayList;
027import java.util.Collection;
028import java.util.Comparator;
029import java.util.HashMap;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.List;
033import java.util.Map;
034import java.util.PriorityQueue;
035import java.util.Set;
036
037import cascading.flow.FlowElement;
038import cascading.flow.planner.Scope;
039import cascading.flow.planner.graph.AnnotatedGraph;
040import cascading.flow.planner.graph.ElementGraph;
041import cascading.flow.planner.graph.ElementGraphs;
042import cascading.flow.planner.graph.Extent;
043import cascading.pipe.Group;
044import cascading.tap.Tap;
045import cascading.util.EnumMultiMap;
046import cascading.util.Util;
047import cascading.util.jgrapht.IntegerNameProvider;
048import cascading.util.jgrapht.VertexNameProvider;
049import org.jgrapht.graph.SimpleDirectedGraph;
050import org.jgrapht.traverse.TopologicalOrderIterator;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import static cascading.util.Util.createIdentitySet;
055
056/**
057 *
058 */
059public abstract class BaseProcessGraph<Process extends ProcessModel> implements ProcessGraph<Process>
060  {
061  /** Field LOG */
062  private static final Logger LOG = LoggerFactory.getLogger( BaseProcessGraph.class );
063
064  final SimpleDirectedGraph<Process, ProcessEdge> graph;
065
066  protected Set<FlowElement> sourceElements = createIdentitySet();
067  protected Set<FlowElement> sinkElements = createIdentitySet();
068  private Set<Tap> sourceTaps;
069  private Set<Tap> sinkTaps;
070  protected Map<String, Tap> trapsMap = new HashMap<>();
071
072  public BaseProcessGraph()
073    {
074    graph = new SimpleDirectedGraph( ProcessEdge.class );
075    }
076
077  @Override
078  public boolean addVertex( Process process )
079    {
080    sourceElements.addAll( process.getSourceElements() );
081    sinkElements.addAll( process.getSinkElements() );
082    trapsMap.putAll( process.getTrapMap() );
083
084    return graph.addVertex( process );
085    }
086
087  protected void bindEdges()
088    {
089    for( Process sinkProcess : vertexSet() )
090      {
091      for( Process sourceProcess : vertexSet() )
092        {
093        if( sourceProcess == sinkProcess )
094          continue;
095
096        // outer edge sources and sinks to this graph
097        sourceElements.removeAll( sinkProcess.getSinkElements() );
098        sinkElements.removeAll( sourceProcess.getSourceElements() );
099        }
100      }
101
102    for( Process sinkProcess : vertexSet() )
103      {
104      for( Process sourceProcess : vertexSet() )
105        {
106        if( sourceProcess == sinkProcess )
107          continue;
108
109        for( Object intermediate : sourceProcess.getSinkElements() )
110          {
111          if( sinkProcess.getSourceElements().contains( intermediate ) )
112            addEdge( sourceProcess, sinkProcess, new ProcessEdge<>( sourceProcess, (FlowElement) intermediate, sinkProcess ) );
113          }
114        }
115      }
116    }
117
118  @Override
119  public Set<FlowElement> getSourceElements()
120    {
121    return sourceElements;
122    }
123
124  @Override
125  public Set<FlowElement> getSinkElements()
126    {
127    return sinkElements;
128    }
129
130  @Override
131  public Set<Tap> getSourceTaps()
132    {
133    if( sourceTaps != null )
134      return sourceTaps;
135
136    sourceTaps = Util.narrowSet( Tap.class, getSourceElements() );
137
138    return sourceTaps;
139    }
140
141  @Override
142  public Set<Tap> getSinkTaps()
143    {
144    if( sinkTaps != null )
145      return sinkTaps;
146
147    sinkTaps = Util.narrowSet( Tap.class, getSinkElements() );
148
149    return sinkTaps;
150    }
151
152  @Override
153  public Map<String, Tap> getTrapsMap()
154    {
155    return trapsMap;
156    }
157
158  @Override
159  public Iterator<Process> getTopologicalIterator()
160    {
161    return getOrderedTopologicalIterator( new Comparator<Process>()
162    {
163    @Override
164    public int compare( Process lhs, Process rhs )
165      {
166      return Integer.valueOf( lhs.getSubmitPriority() ).compareTo( rhs.getSubmitPriority() );
167      }
168    } );
169    }
170
171  @Override
172  public Iterator<Process> getOrdinalTopologicalIterator()
173    {
174    return getOrderedTopologicalIterator( new Comparator<Process>()
175    {
176    @Override
177    public int compare( Process lhs, Process rhs )
178      {
179      return Integer.valueOf( lhs.getOrdinal() ).compareTo( rhs.getOrdinal() );
180      }
181    } );
182    }
183
184  @Override
185  public Iterator<Process> getOrderedTopologicalIterator( Comparator<Process> comparator )
186    {
187    return new TopologicalOrderIterator<>( graph, new PriorityQueue<>( 10, comparator ) );
188    }
189
190  @Override
191  public List<ElementGraph> getElementGraphs( FlowElement flowElement )
192    {
193    List<Process> elementProcesses = getElementProcesses( flowElement );
194
195    List<ElementGraph> elementGraphs = new ArrayList<>();
196
197    for( Process elementProcess : elementProcesses )
198      elementGraphs.add( elementProcess.getElementGraph() );
199
200    return elementGraphs;
201    }
202
203  @Override
204  public List<Process> getElementProcesses( FlowElement flowElement )
205    {
206    List<Process> processes = new ArrayList<>();
207
208    for( Process process : vertexSet() )
209      {
210      if( process.getElementGraph().vertexSet().contains( flowElement ) )
211        processes.add( process );
212      }
213
214    return processes;
215    }
216
217  @Override
218  public List<ElementGraph> getElementGraphs( Scope scope )
219    {
220    List<Process> elementProcesses = getElementProcesses( scope );
221
222    List<ElementGraph> elementGraphs = new ArrayList<>();
223
224    for( Process elementProcess : elementProcesses )
225      elementGraphs.add( elementProcess.getElementGraph() );
226
227    return elementGraphs;
228    }
229
230  @Override
231  public List<Process> getElementProcesses( Scope scope )
232    {
233    List<Process> processes = new ArrayList<>();
234
235    for( Process process : vertexSet() )
236      {
237      if( process.getElementGraph().edgeSet().contains( scope ) )
238        processes.add( process );
239      }
240
241    return processes;
242    }
243
244  @Override
245  public List<Process> getElementSourceProcesses( FlowElement flowElement )
246    {
247    List<Process> sources = new ArrayList<>();
248
249    for( Process process : vertexSet() )
250      {
251      if( process.getSinkElements().contains( flowElement ) )
252        sources.add( process );
253      }
254
255    return sources;
256    }
257
258  @Override
259  public List<Process> getElementSinkProcesses( FlowElement flowElement )
260    {
261    List<Process> sinks = new ArrayList<>();
262
263    for( Process process : vertexSet() )
264      {
265      if( process.getSourceElements().contains( flowElement ) )
266        sinks.add( process );
267      }
268
269    return sinks;
270    }
271
272  @Override
273  public Set<FlowElement> getAllSourceElements()
274    {
275    Set<FlowElement> results = createIdentitySet();
276
277    for( Process process : vertexSet() )
278      results.addAll( process.getSourceElements() );
279
280    return results;
281    }
282
283  @Override
284  public Set<FlowElement> getAllSinkElements()
285    {
286    Set<FlowElement> results = createIdentitySet();
287
288    for( Process process : vertexSet() )
289      results.addAll( process.getSinkElements() );
290
291    return results;
292    }
293
294  public EnumMultiMap<FlowElement> getAnnotations()
295    {
296    EnumMultiMap<FlowElement> annotations = new EnumMultiMap<>();
297
298    for( Process process : vertexSet() )
299      {
300      ElementGraph elementGraph = process.getElementGraph();
301
302      if( elementGraph instanceof AnnotatedGraph )
303        annotations.addAll( ( (AnnotatedGraph) elementGraph ).getAnnotations() );
304      }
305
306    return annotations;
307    }
308
309  /**
310   * All elements, from the given ElementGraph, that belong to two or more processes, that are not sink or source elements that
311   * connect processes.
312   *
313   * @return Set
314   */
315  @Override
316  public Set<FlowElement> getDuplicatedElements( ElementGraph elementGraph )
317    {
318    Set<FlowElement> results = createIdentitySet();
319
320    for( FlowElement flowElement : elementGraph.vertexSet() )
321      {
322      if( getElementProcesses( flowElement ).size() > 1 )
323        results.add( flowElement );
324      }
325
326    results.remove( Extent.head );
327    results.remove( Extent.tail );
328    results.removeAll( getAllSourceElements() );
329    results.removeAll( getAllSinkElements() );
330
331    return results;
332    }
333
334  @Override
335  public Set<ElementGraph> getIdentityElementGraphs()
336    {
337    Set<ElementGraph> results = createIdentitySet();
338
339    for( Process process : getIdentityProcesses() )
340      results.add( process.getElementGraph() );
341
342    return results;
343    }
344
345  /**
346   * Returns a set of processes that perform no internal operations.
347   * <p/>
348   * for example if a FlowNode only has a Merge source and a GroupBy sink.
349   *
350   * @return
351   */
352  @Override
353  public Set<Process> getIdentityProcesses()
354    {
355    Set<Process> results = new HashSet<>();
356
357    for( Process process : vertexSet() )
358      {
359      if( ProcessModels.isIdentity( process ) )
360        results.add( process );
361      }
362
363    return results;
364    }
365
366  /**
367   * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging.
368   *
369   * @param filename of type String
370   */
371  @Override
372  public void writeDOT( String filename )
373    {
374    printProcessGraph( filename );
375    }
376
377  protected void printProcessGraph( String filename )
378    {
379    try
380      {
381      Writer writer = new FileWriter( filename );
382
383      Util.writeDOT( writer, graph, new IntegerNameProvider<Process>(), new VertexNameProvider<Process>()
384      {
385      public String getVertexName( Process process )
386        {
387        String name = "[" + process.getName() + "]";
388
389        String sourceName = "";
390        Set<Tap> sources = process.getSourceTaps();
391        for( Tap source : sources )
392          sourceName += "\\nsrc:[" + source.getIdentifier() + "]";
393
394        if( sourceName.length() != 0 )
395          name += sourceName;
396
397        Collection<Group> groups = process.getGroups();
398
399        for( Group group : groups )
400          {
401          String groupName = group.getName();
402
403          if( groupName.length() != 0 )
404            name += "\\ngrp:" + groupName;
405          }
406
407        Set<Tap> sinks = process.getSinkTaps();
408        String sinkName = "";
409        for( Tap sink : sinks )
410          sinkName = "\\nsnk:[" + sink.getIdentifier() + "]";
411
412        if( sinkName.length() != 0 )
413          name += sinkName;
414
415        return name.replaceAll( "\"", "\'" );
416        }
417      }, null );
418
419      writer.close();
420      }
421    catch( IOException exception )
422      {
423      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
424      }
425    }
426
427  @Override
428  public void writeDOTNested( String filename, ElementGraph graph )
429    {
430    ElementGraphs.printProcessGraph( filename, graph, this );
431    }
432
433  public boolean containsEdge( Process sourceVertex, Process targetVertex )
434    {
435    return graph.containsEdge( sourceVertex, targetVertex );
436    }
437
438  public boolean removeAllEdges( Collection<? extends ProcessEdge> edges )
439    {
440    return graph.removeAllEdges( edges );
441    }
442
443  public Set<ProcessEdge> removeAllEdges( Process sourceVertex, Process targetVertex )
444    {
445    return graph.removeAllEdges( sourceVertex, targetVertex );
446    }
447
448  public boolean removeAllVertices( Collection<? extends Process> vertices )
449    {
450    return graph.removeAllVertices( vertices );
451    }
452
453  public Set<ProcessEdge> getAllEdges( Process sourceVertex, Process targetVertex )
454    {
455    return graph.getAllEdges( sourceVertex, targetVertex );
456    }
457
458  public ProcessEdge getEdge( Process sourceVertex, Process targetVertex )
459    {
460    return graph.getEdge( sourceVertex, targetVertex );
461    }
462
463  public ProcessEdge addEdge( Process sourceVertex, Process targetVertex )
464    {
465    return graph.addEdge( sourceVertex, targetVertex );
466    }
467
468  public boolean addEdge( Process sourceVertex, Process targetVertex, ProcessEdge processEdge )
469    {
470    return graph.addEdge( sourceVertex, targetVertex, processEdge );
471    }
472
473  public Process getEdgeSource( ProcessEdge processEdge )
474    {
475    return graph.getEdgeSource( processEdge );
476    }
477
478  public Process getEdgeTarget( ProcessEdge processEdge )
479    {
480    return graph.getEdgeTarget( processEdge );
481    }
482
483  public boolean containsEdge( ProcessEdge processEdge )
484    {
485    return graph.containsEdge( processEdge );
486    }
487
488  public boolean containsVertex( Process process )
489    {
490    return graph.containsVertex( process );
491    }
492
493  public Set<ProcessEdge> edgeSet()
494    {
495    return graph.edgeSet();
496    }
497
498  public Set<ProcessEdge> edgesOf( Process vertex )
499    {
500    return graph.edgesOf( vertex );
501    }
502
503  public int inDegreeOf( Process vertex )
504    {
505    return graph.inDegreeOf( vertex );
506    }
507
508  public Set<ProcessEdge> incomingEdgesOf( Process vertex )
509    {
510    return graph.incomingEdgesOf( vertex );
511    }
512
513  public int outDegreeOf( Process vertex )
514    {
515    return graph.outDegreeOf( vertex );
516    }
517
518  public Set<ProcessEdge> outgoingEdgesOf( Process vertex )
519    {
520    return graph.outgoingEdgesOf( vertex );
521    }
522
523  public ProcessEdge removeEdge( Process sourceVertex, Process targetVertex )
524    {
525    return graph.removeEdge( sourceVertex, targetVertex );
526    }
527
528  public boolean removeEdge( ProcessEdge processEdge )
529    {
530    return graph.removeEdge( processEdge );
531    }
532
533  public boolean removeVertex( Process process )
534    {
535    return graph.removeVertex( process );
536    }
537
538  public Set<Process> vertexSet()
539    {
540    return graph.vertexSet();
541    }
542
543  public double getEdgeWeight( ProcessEdge processEdge )
544    {
545    return graph.getEdgeWeight( processEdge );
546    }
547  }