/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.planner.process;

import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.planner.Scope;
import cascading.flow.planner.graph.AnnotatedGraph;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.flow.planner.graph.Extent;
import cascading.pipe.Group;
import cascading.tap.Tap;
import cascading.util.EnumMultiMap;
import cascading.util.Util;
import cascading.util.jgrapht.IntegerNameProvider;
import cascading.util.jgrapht.VertexNameProvider;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static cascading.util.Util.createIdentitySet;

/**
 * Class BaseProcessGraph is a base {@link ProcessGraph} implementation backed by a
 * {@link SimpleDirectedGraph} whose vertices are {@link ProcessModel} instances and whose
 * edges are {@link ProcessEdge} instances.
 * <p>
 * Besides delegating the usual graph operations to the backing graph, it accumulates the
 * source elements, sink elements and traps of every process added via
 * {@link #addVertex(ProcessModel)}, and can connect processes that share an element via
 * {@link #bindEdges()}.
 */
public abstract class BaseProcessGraph<Process extends ProcessModel> implements ProcessGraph<Process>
  {
  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger( BaseProcessGraph.class );

  /** The backing directed graph of processes. */
  final SimpleDirectedGraph<Process, ProcessEdge> graph;

  /** Identity set of source elements collected from added processes, pruned by {@link #bindEdges()}. */
  protected Set<FlowElement> sourceElements = createIdentitySet();
  /** Identity set of sink elements collected from added processes, pruned by {@link #bindEdges()}. */
  protected Set<FlowElement> sinkElements = createIdentitySet();
  /** Cached result of {@link #getSourceTaps()}, computed on first call. */
  private Set<Tap> sourceTaps;
  /** Cached result of {@link #getSinkTaps()}, computed on first call. */
  private Set<Tap> sinkTaps;
  /** Traps collected from every added process, keyed as returned by each process's trap map. */
  protected Map<String, Tap> trapsMap = new HashMap<>();

  /** Constructor BaseProcessGraph creates an empty process graph. */
  public BaseProcessGraph()
    {
    graph = new SimpleDirectedGraph( ProcessEdge.class );
    }

  /**
   * Adds the given process as a vertex, first merging its source elements, sink elements and
   * traps into the graph level collections.
   *
   * @param process the process to add
   * @return the result of adding the vertex to the backing graph
   */
  @Override
  public boolean addVertex( Process process )
    {
    sourceElements.addAll( process.getSourceElements() );
    sinkElements.addAll( process.getSinkElements() );
    trapsMap.putAll( process.getTrapMap() );

    return graph.addVertex( process );
    }

  /**
   * Prunes the graph level source and sink element sets and adds an edge between every
   * ordered pair of distinct processes that share an element.
   * <p>
   * An edge is added from {@code sourceProcess} to {@code sinkProcess} for each sink element
   * of {@code sourceProcess} that is also a source element of {@code sinkProcess}.
   */
  protected void bindEdges()
    {
    // first pass: for every ordered pair of distinct processes, drop the sink elements of one and the
    // source elements of the other from the graph level sets
    for( Process sinkProcess : vertexSet() )
      {
      for( Process sourceProcess : vertexSet() )
        {
        if( sourceProcess == sinkProcess )
          continue;

        // outer edge sources and sinks to this graph
        sourceElements.removeAll( sinkProcess.getSinkElements() );
        sinkElements.removeAll( sourceProcess.getSourceElements() );
        }
      }

    // second pass: connect processes through the elements they have in common
    for( Process sinkProcess : vertexSet() )
      {
      for( Process sourceProcess : vertexSet() )
        {
        if( sourceProcess == sinkProcess )
          continue;

        for( Object intermediate : sourceProcess.getSinkElements() )
          {
          if( sinkProcess.getSourceElements().contains( intermediate ) )
            addEdge( sourceProcess, sinkProcess, new ProcessEdge<>( sourceProcess, (FlowElement) intermediate, sinkProcess ) );
          }
        }
      }
    }

  /**
   * Returns the graph level source elements.
   *
   * @return the live, mutable set held by this graph
   */
  @Override
  public Set<FlowElement> getSourceElements()
    {
    return sourceElements;
    }

  /**
   * Returns the graph level sink elements.
   *
   * @return the live, mutable set held by this graph
   */
  @Override
  public Set<FlowElement> getSinkElements()
    {
    return sinkElements;
    }

  /**
   * Returns the source elements narrowed to {@link Tap} instances.
   * <p>
   * The result is computed once and cached; later changes to the source elements are not
   * reflected in the returned set.
   *
   * @return the cached set of source taps
   */
  @Override
  public Set<Tap> getSourceTaps()
    {
    if( sourceTaps != null )
      return sourceTaps;

    sourceTaps = Util.narrowIdentitySet( Tap.class, getSourceElements() );

    return sourceTaps;
    }

  /**
   * Returns a new map of source taps keyed by the name of each {@link Scope} leaving the tap
   * in the element graph of any process that lists the tap among its source taps.
   *
   * @return a new map of scope name to source tap
   */
  @Override
  public Map<String, Tap> getSourceTapsMap()
    {
    Map<String, Tap> result = new HashMap<>();

    for( Tap sourceTap : getSourceTaps() )
      {
      for( Process process : graph.vertexSet() )
        {
        if( !process.getSourceTaps().contains( sourceTap ) )
          continue;

        ElementGraph elementGraph = process.getElementGraph();

        for( Scope scope : elementGraph.outgoingEdgesOf( sourceTap ) )
          result.put( scope.getName(), sourceTap );
        }
      }

    return result;
    }

  /**
   * Returns the sink elements narrowed to {@link Tap} instances.
   * <p>
   * The result is computed once and cached; later changes to the sink elements are not
   * reflected in the returned set.
   *
   * @return the cached set of sink taps
   */
  @Override
  public Set<Tap> getSinkTaps()
    {
    if( sinkTaps != null )
      return sinkTaps;

    sinkTaps = Util.narrowIdentitySet( Tap.class, getSinkElements() );

    return sinkTaps;
    }

  /**
   * Returns a new map of sink taps keyed by the name of each {@link Scope} entering the tap
   * in the element graph of any process that lists the tap among its sink taps.
   *
   * @return a new map of scope name to sink tap
   */
  @Override
  public Map<String, Tap> getSinkTapsMap()
    {
    Map<String, Tap> result = new HashMap<>();

    for( Tap sinkTap : getSinkTaps() )
      {
      for( Process process : graph.vertexSet() )
        {
        if( !process.getSinkTaps().contains( sinkTap ) )
          continue;

        ElementGraph elementGraph = process.getElementGraph();

        for( Scope scope : elementGraph.incomingEdgesOf( sinkTap ) )
          result.put( scope.getName(), sinkTap );
        }
      }

    return result;
    }

  /**
   * Returns the traps collected from all added processes.
   *
   * @return the live, mutable map held by this graph
   */
  @Override
  public Map<String, Tap> getTrapsMap()
    {
    return trapsMap;
    }

  /**
   * Returns a topological iterator that uses ascending submit priority as its ordering
   * comparator.
   *
   * @return an iterator over the processes of this graph
   */
  @Override
  public Iterator<Process> getTopologicalIterator()
    {
    return getOrderedTopologicalIterator( new Comparator<Process>()
      {
      @Override
      public int compare( Process lhs, Process rhs )
        {
        return Integer.compare( lhs.getSubmitPriority(), rhs.getSubmitPriority() );
        }
      } );
    }

  /**
   * Returns a topological iterator that uses ascending process ordinal as its ordering
   * comparator.
   *
   * @return an iterator over the processes of this graph
   */
  @Override
  public Iterator<Process> getOrdinalTopologicalIterator()
    {
    return getOrderedTopologicalIterator( new Comparator<Process>()
      {
      @Override
      public int compare( Process lhs, Process rhs )
        {
        return Integer.compare( lhs.getOrdinal(), rhs.getOrdinal() );
        }
      } );
    }

  /**
   * Returns a {@link TopologicalOrderIterator} over the backing graph whose internal queue is
   * a {@link PriorityQueue} ordered by the given comparator.
   *
   * @param comparator the ordering handed to the priority queue
   * @return an iterator over the processes of this graph
   */
  @Override
  public Iterator<Process> getOrderedTopologicalIterator( Comparator<Process> comparator )
    {
    return new TopologicalOrderIterator<>( graph, new PriorityQueue<>( 10, comparator ) );
    }

  /**
   * Returns the element graphs of all processes whose element graph contains the given element.
   *
   * @param flowElement the element to look for
   * @return a new list, empty if no process contains the element
   */
  @Override
  public List<ElementGraph> getElementGraphs( FlowElement flowElement )
    {
    List<ElementGraph> elementGraphs = new ArrayList<>();

    for( Process elementProcess : getElementProcesses( flowElement ) )
      elementGraphs.add( elementProcess.getElementGraph() );

    return elementGraphs;
    }

  /**
   * Returns all processes whose element graph has the given element as a vertex.
   *
   * @param flowElement the element to look for
   * @return a new list, empty if no process contains the element
   */
  @Override
  public List<Process> getElementProcesses( FlowElement flowElement )
    {
    List<Process> processes = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getElementGraph().vertexSet().contains( flowElement ) )
        processes.add( process );
      }

    return processes;
    }

  /**
   * Returns the element graphs of all processes whose element graph contains the given scope.
   *
   * @param scope the edge to look for
   * @return a new list, empty if no process contains the scope
   */
  @Override
  public List<ElementGraph> getElementGraphs( Scope scope )
    {
    List<ElementGraph> elementGraphs = new ArrayList<>();

    for( Process elementProcess : getElementProcesses( scope ) )
      elementGraphs.add( elementProcess.getElementGraph() );

    return elementGraphs;
    }

  /**
   * Returns all processes whose element graph has the given scope as an edge.
   *
   * @param scope the edge to look for
   * @return a new list, empty if no process contains the scope
   */
  @Override
  public List<Process> getElementProcesses( Scope scope )
    {
    List<Process> processes = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getElementGraph().edgeSet().contains( scope ) )
        processes.add( process );
      }

    return processes;
    }

  /**
   * Returns all processes that list the given element among their sink elements, that is, the
   * processes acting as a source for the element.
   *
   * @param flowElement the element to look for
   * @return a new list, empty if there is no such process
   */
  @Override
  public List<Process> getElementSourceProcesses( FlowElement flowElement )
    {
    List<Process> sources = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getSinkElements().contains( flowElement ) )
        sources.add( process );
      }

    return sources;
    }

  /**
   * Returns all processes that list the given element among their source elements, that is,
   * the processes acting as a sink for the element.
   *
   * @param flowElement the element to look for
   * @return a new list, empty if there is no such process
   */
  @Override
  public List<Process> getElementSinkProcesses( FlowElement flowElement )
    {
    List<Process> sinks = new ArrayList<>();

    for( Process process : vertexSet() )
      {
      if( process.getSourceElements().contains( flowElement ) )
        sinks.add( process );
      }

    return sinks;
    }

  /**
   * Returns the union of the source elements of every process in this graph.
   *
   * @return a new identity set
   */
  @Override
  public Set<FlowElement> getAllSourceElements()
    {
    Set<FlowElement> results = createIdentitySet();

    for( Process process : vertexSet() )
      results.addAll( process.getSourceElements() );

    return results;
    }

  /**
   * Returns the union of the sink elements of every process in this graph.
   *
   * @return a new identity set
   */
  @Override
  public Set<FlowElement> getAllSinkElements()
    {
    Set<FlowElement> results = createIdentitySet();

    for( Process process : vertexSet() )
      results.addAll( process.getSinkElements() );

    return results;
    }

  /**
   * Returns the annotations of every process element graph that is an {@link AnnotatedGraph},
   * merged into a single multi map.
   *
   * @return a new multi map
   */
  public EnumMultiMap<FlowElement> getAnnotations()
    {
    EnumMultiMap<FlowElement> annotations = new EnumMultiMap<>();

    for( Process process : vertexSet() )
      {
      ElementGraph elementGraph = process.getElementGraph();

      if( elementGraph instanceof AnnotatedGraph )
        annotations.addAll( ( (AnnotatedGraph) elementGraph ).getAnnotations() );
      }

    return annotations;
    }

  /**
   * All elements, from the given ElementGraph, that belong to two or more processes, that are not sink or source
   * elements that connect processes.
   *
   * @param elementGraph the graph whose vertices are tested
   * @return a new identity set, excluding {@link Extent#head} and {@link Extent#tail}
   */
  @Override
  public Set<FlowElement> getDuplicatedElements( ElementGraph elementGraph )
    {
    Set<FlowElement> results = createIdentitySet();

    for( FlowElement flowElement : elementGraph.vertexSet() )
      {
      if( getElementProcesses( flowElement ).size() > 1 )
        results.add( flowElement );
      }

    results.remove( Extent.head );
    results.remove( Extent.tail );
    results.removeAll( getAllSourceElements() );
    results.removeAll( getAllSinkElements() );

    return results;
    }

  /**
   * Returns the element graphs of the processes returned by {@link #getIdentityProcesses()}.
   *
   * @return a new identity set
   */
  @Override
  public Set<ElementGraph> getIdentityElementGraphs()
    {
    Set<ElementGraph> results = createIdentitySet();

    for( Process process : getIdentityProcesses() )
      results.add( process.getElementGraph() );

    return results;
    }

  /**
   * Returns a set of processes that perform no internal operations.
   * <p>
   * for example if a FlowNode only has a Merge source and a GroupBy sink.
   *
   * @return a new set of the processes for which {@link ProcessModels#isIdentity} is true
   */
  @Override
  public Set<Process> getIdentityProcesses()
    {
    Set<Process> results = new HashSet<>();

    for( Process process : vertexSet() )
      {
      if( ProcessModels.isIdentity( process ) )
        results.add( process );
      }

    return results;
    }

  /**
   * Method writeDOT writes this process graph to a DOT file for easy visualization and debugging.
   *
   * @param filename of type String
   */
  @Override
  public void writeDOT( String filename )
    {
    printProcessGraph( filename );
    }

  /**
   * Writes the backing graph to the given file in DOT format.
   * <p>
   * Each vertex is labelled with the process name followed by one line per source tap, per
   * non-empty group name and per sink tap. An {@link IOException} is logged, not rethrown.
   *
   * @param filename the file to write to
   */
  protected void printProcessGraph( String filename )
    {
    // try-with-resources so the writer is closed even when writing the graph fails
    try( Writer writer = new FileWriter( filename ) )
      {
      Util.writeDOT( writer, graph, new IntegerNameProvider<Process>(), new VertexNameProvider<Process>()
        {
        @Override
        public String getVertexName( Process process )
          {
          // "\\n" emits a literal backslash-n, the line break escape inside a DOT label
          StringBuilder name = new StringBuilder( "[" ).append( process.getName() ).append( "]" );

          for( Tap source : process.getSourceTaps() )
            name.append( "\\nsrc:[" ).append( source.getIdentifier() ).append( "]" );

          Collection<Group> groups = process.getGroups();

          for( Group group : groups )
            {
            String groupName = group.getName();

            if( groupName.length() != 0 )
              name.append( "\\ngrp:" ).append( groupName );
            }

          // every sink is appended; previously each sink overwrote the one before it
          for( Tap sink : process.getSinkTaps() )
            name.append( "\\nsnk:[" ).append( sink.getIdentifier() ).append( "]" );

          // double quotes would terminate the DOT label, so swap them for single quotes
          return name.toString().replace( '"', '\'' );
          }
        }, null );
      }
    catch( IOException exception )
      {
      LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception );
      }
    }

  /**
   * Writes the given element graph to a DOT file via
   * {@link ElementGraphs#printProcessGraph}, passing this process graph along.
   *
   * @param filename the file to write to
   * @param graph    the element graph to print
   */
  @Override
  public void writeDOTNested( String filename, ElementGraph graph )
    {
    ElementGraphs.printProcessGraph( filename, graph, this );
    }

  // the remaining methods delegate directly to the backing graph

  /** Delegates to the backing graph's {@code containsEdge( source, target )}. */
  public boolean containsEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.containsEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the backing graph's {@code removeAllEdges( edges )}. */
  public boolean removeAllEdges( Collection<? extends ProcessEdge> edges )
    {
    return graph.removeAllEdges( edges );
    }

  /** Delegates to the backing graph's {@code removeAllEdges( source, target )}. */
  public Set<ProcessEdge> removeAllEdges( Process sourceVertex, Process targetVertex )
    {
    return graph.removeAllEdges( sourceVertex, targetVertex );
    }

  /**
   * Delegates to the backing graph's {@code removeAllVertices( vertices )}; the collected
   * source elements, sink elements and traps are left untouched.
   */
  public boolean removeAllVertices( Collection<? extends Process> vertices )
    {
    return graph.removeAllVertices( vertices );
    }

  /** Delegates to the backing graph's {@code getAllEdges( source, target )}. */
  public Set<ProcessEdge> getAllEdges( Process sourceVertex, Process targetVertex )
    {
    return graph.getAllEdges( sourceVertex, targetVertex );
    }

  /** Delegates to the backing graph's {@code getEdge( source, target )}. */
  public ProcessEdge getEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.getEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the backing graph's {@code addEdge( source, target )}. */
  public ProcessEdge addEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.addEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the backing graph's {@code addEdge( source, target, edge )}. */
  public boolean addEdge( Process sourceVertex, Process targetVertex, ProcessEdge processEdge )
    {
    return graph.addEdge( sourceVertex, targetVertex, processEdge );
    }

  /** Delegates to the backing graph's {@code getEdgeSource( edge )}. */
  public Process getEdgeSource( ProcessEdge processEdge )
    {
    return graph.getEdgeSource( processEdge );
    }

  /** Delegates to the backing graph's {@code getEdgeTarget( edge )}. */
  public Process getEdgeTarget( ProcessEdge processEdge )
    {
    return graph.getEdgeTarget( processEdge );
    }

  /** Delegates to the backing graph's {@code containsEdge( edge )}. */
  public boolean containsEdge( ProcessEdge processEdge )
    {
    return graph.containsEdge( processEdge );
    }

  /** Delegates to the backing graph's {@code containsVertex( vertex )}. */
  public boolean containsVertex( Process process )
    {
    return graph.containsVertex( process );
    }

  /** Delegates to the backing graph's {@code edgeSet()}. */
  public Set<ProcessEdge> edgeSet()
    {
    return graph.edgeSet();
    }

  /** Delegates to the backing graph's {@code edgesOf( vertex )}. */
  public Set<ProcessEdge> edgesOf( Process vertex )
    {
    return graph.edgesOf( vertex );
    }

  /** Delegates to the backing graph's {@code inDegreeOf( vertex )}. */
  public int inDegreeOf( Process vertex )
    {
    return graph.inDegreeOf( vertex );
    }

  /** Delegates to the backing graph's {@code incomingEdgesOf( vertex )}. */
  public Set<ProcessEdge> incomingEdgesOf( Process vertex )
    {
    return graph.incomingEdgesOf( vertex );
    }

  /** Delegates to the backing graph's {@code outDegreeOf( vertex )}. */
  public int outDegreeOf( Process vertex )
    {
    return graph.outDegreeOf( vertex );
    }

  /** Delegates to the backing graph's {@code outgoingEdgesOf( vertex )}. */
  public Set<ProcessEdge> outgoingEdgesOf( Process vertex )
    {
    return graph.outgoingEdgesOf( vertex );
    }

  /** Delegates to the backing graph's {@code removeEdge( source, target )}. */
  public ProcessEdge removeEdge( Process sourceVertex, Process targetVertex )
    {
    return graph.removeEdge( sourceVertex, targetVertex );
    }

  /** Delegates to the backing graph's {@code removeEdge( edge )}. */
  public boolean removeEdge( ProcessEdge processEdge )
    {
    return graph.removeEdge( processEdge );
    }

  /**
   * Delegates to the backing graph's {@code removeVertex( vertex )}; the collected source
   * elements, sink elements and traps are left untouched.
   */
  public boolean removeVertex( Process process )
    {
    return graph.removeVertex( process );
    }

  /** Delegates to the backing graph's {@code vertexSet()}. */
  public Set<Process> vertexSet()
    {
    return graph.vertexSet();
    }

  /** Delegates to the backing graph's {@code getEdgeWeight( edge )}. */
  public double getEdgeWeight( ProcessEdge processEdge )
    {
    return graph.getEdgeWeight( processEdge );
    }
  }