001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.process; 022 023import java.io.FileWriter; 024import java.io.IOException; 025import java.io.Writer; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.Comparator; 029import java.util.HashMap; 030import java.util.HashSet; 031import java.util.Iterator; 032import java.util.List; 033import java.util.Map; 034import java.util.PriorityQueue; 035import java.util.Set; 036 037import cascading.flow.FlowElement; 038import cascading.flow.planner.Scope; 039import cascading.flow.planner.graph.AnnotatedGraph; 040import cascading.flow.planner.graph.ElementGraph; 041import cascading.flow.planner.graph.ElementGraphs; 042import cascading.flow.planner.graph.Extent; 043import cascading.pipe.Group; 044import cascading.tap.Tap; 045import cascading.util.EnumMultiMap; 046import cascading.util.Util; 047import cascading.util.jgrapht.IntegerNameProvider; 048import cascading.util.jgrapht.VertexNameProvider; 049import org.jgrapht.graph.SimpleDirectedGraph; 050import org.jgrapht.traverse.TopologicalOrderIterator; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054import static cascading.util.Util.createIdentitySet; 055 056/** 057 * 058 */ 059public abstract class BaseProcessGraph<Process extends ProcessModel> implements ProcessGraph<Process> 060 { 061 /** Field LOG */ 062 private static final Logger LOG = LoggerFactory.getLogger( BaseProcessGraph.class ); 063 064 final SimpleDirectedGraph<Process, ProcessEdge> graph; 065 066 protected Set<FlowElement> sourceElements = createIdentitySet(); 067 protected Set<FlowElement> sinkElements = createIdentitySet(); 068 private Set<Tap> sourceTaps; 069 private Set<Tap> sinkTaps; 070 protected Map<String, Tap> trapsMap = new HashMap<>(); 071 072 public BaseProcessGraph() 073 { 074 graph = new SimpleDirectedGraph( ProcessEdge.class ); 075 } 076 077 @Override 078 public boolean addVertex( Process process ) 079 { 080 sourceElements.addAll( process.getSourceElements() ); 081 sinkElements.addAll( process.getSinkElements() ); 082 trapsMap.putAll( process.getTrapMap() ); 083 084 return graph.addVertex( process ); 085 } 086 087 protected void bindEdges() 088 { 089 for( Process sinkProcess : vertexSet() ) 090 { 091 for( Process sourceProcess : vertexSet() ) 092 { 093 if( sourceProcess == sinkProcess ) 094 continue; 095 096 // outer edge sources and sinks to this graph 097 sourceElements.removeAll( sinkProcess.getSinkElements() ); 098 sinkElements.removeAll( sourceProcess.getSourceElements() ); 099 } 100 } 101 102 for( Process sinkProcess : vertexSet() ) 103 { 104 for( Process sourceProcess : vertexSet() ) 105 { 106 if( sourceProcess == sinkProcess ) 107 continue; 108 109 for( Object intermediate : sourceProcess.getSinkElements() ) 110 { 111 if( sinkProcess.getSourceElements().contains( intermediate ) ) 112 addEdge( sourceProcess, sinkProcess, new ProcessEdge<>( sourceProcess, (FlowElement) intermediate, sinkProcess ) ); 113 } 114 } 115 } 116 } 117 118 @Override 119 public Set<FlowElement> getSourceElements() 120 { 121 return sourceElements; 122 } 123 124 @Override 125 public Set<FlowElement> getSinkElements() 126 { 127 return sinkElements; 128 } 129 130 @Override 131 public Set<Tap> getSourceTaps() 132 { 133 if( sourceTaps != null ) 134 return sourceTaps; 135 136 sourceTaps = Util.narrowSet( Tap.class, getSourceElements() ); 137 138 return sourceTaps; 139 } 140 141 @Override 142 public Set<Tap> getSinkTaps() 143 { 144 if( sinkTaps != null ) 145 return sinkTaps; 146 147 sinkTaps = Util.narrowSet( Tap.class, getSinkElements() ); 148 149 return sinkTaps; 150 } 151 152 @Override 153 public Map<String, Tap> getTrapsMap() 154 { 155 return trapsMap; 156 } 157 158 @Override 159 public Iterator<Process> getTopologicalIterator() 160 { 161 return getOrderedTopologicalIterator( new Comparator<Process>() 162 { 163 @Override 164 public int compare( Process lhs, Process rhs ) 165 { 166 return Integer.valueOf( lhs.getSubmitPriority() ).compareTo( rhs.getSubmitPriority() ); 167 } 168 } ); 169 } 170 171 @Override 172 public Iterator<Process> getOrdinalTopologicalIterator() 173 { 174 return getOrderedTopologicalIterator( new Comparator<Process>() 175 { 176 @Override 177 public int compare( Process lhs, Process rhs ) 178 { 179 return Integer.valueOf( lhs.getOrdinal() ).compareTo( rhs.getOrdinal() ); 180 } 181 } ); 182 } 183 184 @Override 185 public Iterator<Process> getOrderedTopologicalIterator( Comparator<Process> comparator ) 186 { 187 return new TopologicalOrderIterator<>( graph, new PriorityQueue<>( 10, comparator ) ); 188 } 189 190 @Override 191 public List<ElementGraph> getElementGraphs( FlowElement flowElement ) 192 { 193 List<Process> elementProcesses = getElementProcesses( flowElement ); 194 195 List<ElementGraph> elementGraphs = new ArrayList<>(); 196 197 for( Process elementProcess : elementProcesses ) 198 elementGraphs.add( elementProcess.getElementGraph() ); 199 200 return elementGraphs; 201 } 202 203 @Override 204 public List<Process> getElementProcesses( FlowElement flowElement ) 205 { 206 List<Process> processes = new ArrayList<>(); 207 208 for( Process process : vertexSet() ) 209 { 210 if( process.getElementGraph().vertexSet().contains( flowElement ) ) 211 processes.add( process ); 212 } 213 214 return processes; 215 } 216 217 @Override 218 public List<ElementGraph> getElementGraphs( Scope scope ) 219 { 220 List<Process> elementProcesses = getElementProcesses( scope ); 221 222 List<ElementGraph> elementGraphs = new ArrayList<>(); 223 224 for( Process elementProcess : elementProcesses ) 225 elementGraphs.add( elementProcess.getElementGraph() ); 226 227 return elementGraphs; 228 } 229 230 @Override 231 public List<Process> getElementProcesses( Scope scope ) 232 { 233 List<Process> processes = new ArrayList<>(); 234 235 for( Process process : vertexSet() ) 236 { 237 if( process.getElementGraph().edgeSet().contains( scope ) ) 238 processes.add( process ); 239 } 240 241 return processes; 242 } 243 244 @Override 245 public List<Process> getElementSourceProcesses( FlowElement flowElement ) 246 { 247 List<Process> sources = new ArrayList<>(); 248 249 for( Process process : vertexSet() ) 250 { 251 if( process.getSinkElements().contains( flowElement ) ) 252 sources.add( process ); 253 } 254 255 return sources; 256 } 257 258 @Override 259 public List<Process> getElementSinkProcesses( FlowElement flowElement ) 260 { 261 List<Process> sinks = new ArrayList<>(); 262 263 for( Process process : vertexSet() ) 264 { 265 if( process.getSourceElements().contains( flowElement ) ) 266 sinks.add( process ); 267 } 268 269 return sinks; 270 } 271 272 @Override 273 public Set<FlowElement> getAllSourceElements() 274 { 275 Set<FlowElement> results = createIdentitySet(); 276 277 for( Process process : vertexSet() ) 278 results.addAll( process.getSourceElements() ); 279 280 return results; 281 } 282 283 @Override 284 public Set<FlowElement> getAllSinkElements() 285 { 286 Set<FlowElement> results = createIdentitySet(); 287 288 for( Process process : vertexSet() ) 289 results.addAll( process.getSinkElements() ); 290 291 return results; 292 } 293 294 public EnumMultiMap<FlowElement> getAnnotations() 295 { 296 EnumMultiMap<FlowElement> annotations = new EnumMultiMap<>(); 297 298 for( Process process : vertexSet() ) 299 { 300 ElementGraph elementGraph = process.getElementGraph(); 301 302 if( elementGraph instanceof AnnotatedGraph ) 303 annotations.addAll( ( (AnnotatedGraph) elementGraph ).getAnnotations() ); 304 } 305 306 return annotations; 307 } 308 309 /** 310 * All elements, from the given ElementGraph, that belong to two or more processes, that are not sink or source elements that 311 * connect processes. 312 * 313 * @return Set 314 */ 315 @Override 316 public Set<FlowElement> getDuplicatedElements( ElementGraph elementGraph ) 317 { 318 Set<FlowElement> results = createIdentitySet(); 319 320 for( FlowElement flowElement : elementGraph.vertexSet() ) 321 { 322 if( getElementProcesses( flowElement ).size() > 1 ) 323 results.add( flowElement ); 324 } 325 326 results.remove( Extent.head ); 327 results.remove( Extent.tail ); 328 results.removeAll( getAllSourceElements() ); 329 results.removeAll( getAllSinkElements() ); 330 331 return results; 332 } 333 334 @Override 335 public Set<ElementGraph> getIdentityElementGraphs() 336 { 337 Set<ElementGraph> results = createIdentitySet(); 338 339 for( Process process : getIdentityProcesses() ) 340 results.add( process.getElementGraph() ); 341 342 return results; 343 } 344 345 /** 346 * Returns a set of processes that perform no internal operations. 347 * <p/> 348 * for example if a FlowNode only has a Merge source and a GroupBy sink. 349 * 350 * @return 351 */ 352 @Override 353 public Set<Process> getIdentityProcesses() 354 { 355 Set<Process> results = new HashSet<>(); 356 357 for( Process process : vertexSet() ) 358 { 359 if( ProcessModels.isIdentity( process ) ) 360 results.add( process ); 361 } 362 363 return results; 364 } 365 366 /** 367 * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging. 368 * 369 * @param filename of type String 370 */ 371 @Override 372 public void writeDOT( String filename ) 373 { 374 printProcessGraph( filename ); 375 } 376 377 protected void printProcessGraph( String filename ) 378 { 379 try 380 { 381 Writer writer = new FileWriter( filename ); 382 383 Util.writeDOT( writer, graph, new IntegerNameProvider<Process>(), new VertexNameProvider<Process>() 384 { 385 public String getVertexName( Process process ) 386 { 387 String name = "[" + process.getName() + "]"; 388 389 String sourceName = ""; 390 Set<Tap> sources = process.getSourceTaps(); 391 for( Tap source : sources ) 392 sourceName += "\\nsrc:[" + source.getIdentifier() + "]"; 393 394 if( sourceName.length() != 0 ) 395 name += sourceName; 396 397 Collection<Group> groups = process.getGroups(); 398 399 for( Group group : groups ) 400 { 401 String groupName = group.getName(); 402 403 if( groupName.length() != 0 ) 404 name += "\\ngrp:" + groupName; 405 } 406 407 Set<Tap> sinks = process.getSinkTaps(); 408 String sinkName = ""; 409 for( Tap sink : sinks ) 410 sinkName = "\\nsnk:[" + sink.getIdentifier() + "]"; 411 412 if( sinkName.length() != 0 ) 413 name += sinkName; 414 415 return name.replaceAll( "\"", "\'" ); 416 } 417 }, null ); 418 419 writer.close(); 420 } 421 catch( IOException exception ) 422 { 423 LOG.error( "failed printing graph to: {}, with exception: {}", filename, exception ); 424 } 425 } 426 427 @Override 428 public void writeDOTNested( String filename, ElementGraph graph ) 429 { 430 ElementGraphs.printProcessGraph( filename, graph, this ); 431 } 432 433 public boolean containsEdge( Process sourceVertex, Process targetVertex ) 434 { 435 return graph.containsEdge( sourceVertex, targetVertex ); 436 } 437 438 public boolean removeAllEdges( Collection<? extends ProcessEdge> edges ) 439 { 440 return graph.removeAllEdges( edges ); 441 } 442 443 public Set<ProcessEdge> removeAllEdges( Process sourceVertex, Process targetVertex ) 444 { 445 return graph.removeAllEdges( sourceVertex, targetVertex ); 446 } 447 448 public boolean removeAllVertices( Collection<? extends Process> vertices ) 449 { 450 return graph.removeAllVertices( vertices ); 451 } 452 453 public Set<ProcessEdge> getAllEdges( Process sourceVertex, Process targetVertex ) 454 { 455 return graph.getAllEdges( sourceVertex, targetVertex ); 456 } 457 458 public ProcessEdge getEdge( Process sourceVertex, Process targetVertex ) 459 { 460 return graph.getEdge( sourceVertex, targetVertex ); 461 } 462 463 public ProcessEdge addEdge( Process sourceVertex, Process targetVertex ) 464 { 465 return graph.addEdge( sourceVertex, targetVertex ); 466 } 467 468 public boolean addEdge( Process sourceVertex, Process targetVertex, ProcessEdge processEdge ) 469 { 470 return graph.addEdge( sourceVertex, targetVertex, processEdge ); 471 } 472 473 public Process getEdgeSource( ProcessEdge processEdge ) 474 { 475 return graph.getEdgeSource( processEdge ); 476 } 477 478 public Process getEdgeTarget( ProcessEdge processEdge ) 479 { 480 return graph.getEdgeTarget( processEdge ); 481 } 482 483 public boolean containsEdge( ProcessEdge processEdge ) 484 { 485 return graph.containsEdge( processEdge ); 486 } 487 488 public boolean containsVertex( Process process ) 489 { 490 return graph.containsVertex( process ); 491 } 492 493 public Set<ProcessEdge> edgeSet() 494 { 495 return graph.edgeSet(); 496 } 497 498 public Set<ProcessEdge> edgesOf( Process vertex ) 499 { 500 return graph.edgesOf( vertex ); 501 } 502 503 public int inDegreeOf( Process vertex ) 504 { 505 return graph.inDegreeOf( vertex ); 506 } 507 508 public Set<ProcessEdge> incomingEdgesOf( Process vertex ) 509 { 510 return graph.incomingEdgesOf( vertex ); 511 } 512 513 public int outDegreeOf( Process vertex ) 514 { 515 return graph.outDegreeOf( vertex ); 516 } 517 518 public Set<ProcessEdge> outgoingEdgesOf( Process vertex ) 519 { 520 return graph.outgoingEdgesOf( vertex ); 521 } 522 523 public ProcessEdge removeEdge( Process sourceVertex, Process targetVertex ) 524 { 525 return graph.removeEdge( sourceVertex, targetVertex ); 526 } 527 528 public boolean removeEdge( ProcessEdge processEdge ) 529 { 530 return graph.removeEdge( processEdge ); 531 } 532 533 public boolean removeVertex( Process process ) 534 { 535 return graph.removeVertex( process ); 536 } 537 538 public Set<Process> vertexSet() 539 { 540 return graph.vertexSet(); 541 } 542 543 public double getEdgeWeight( ProcessEdge processEdge ) 544 { 545 return graph.getEdgeWeight( processEdge ); 546 } 547 }