001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.planner.graph; 023 024import java.util.Collection; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Map; 029import java.util.Set; 030 031import cascading.flow.FlowElement; 032import cascading.flow.FlowElements; 033import cascading.flow.planner.ElementGraphException; 034import cascading.flow.planner.PlatformInfo; 035import cascading.flow.planner.Scope; 036import cascading.pipe.Checkpoint; 037import cascading.pipe.Pipe; 038import cascading.pipe.Splice; 039import cascading.pipe.SubAssembly; 040import cascading.tap.Tap; 041import cascading.util.EnumMultiMap; 042import cascading.util.Util; 043import org.jgrapht.Graphs; 044import org.jgrapht.traverse.DepthFirstIterator; 045import org.jgrapht.traverse.TopologicalOrderIterator; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** Class ElementGraph represents the executable FlowElement graph. */ 050public class FlowElementGraph extends ElementMultiGraph implements AnnotatedGraph 051 { 052 /** Field LOG */ 053 private static final Logger LOG = LoggerFactory.getLogger( FlowElementGraph.class ); 054 055 /** Field resolved */ 056 private boolean resolved; 057 /** Field platformInfo */ 058 protected PlatformInfo platformInfo; 059 /** Field sources */ 060 protected Map<String, Tap> sources; 061 /** Field sinks */ 062 protected Map<String, Tap> sinks; 063 /** Field traps */ 064 protected Map<String, Tap> traps; 065 /** Field checkpoints */ 066 protected Map<String, Tap> checkpoints; 067 /** Field requireUniqueCheckpoints */ 068 private boolean requireUniqueCheckpoints; 069 070 // used for creating isolated test graphs 071 protected FlowElementGraph() 072 { 073 } 074 075 public FlowElementGraph( FlowElementGraph flowElementGraph ) 076 { 077 this(); 078 this.platformInfo = flowElementGraph.platformInfo; 079 this.sources = flowElementGraph.sources; 080 this.sinks = flowElementGraph.sinks; 081 this.traps = flowElementGraph.traps; 082 this.checkpoints = flowElementGraph.checkpoints; 083 this.requireUniqueCheckpoints = flowElementGraph.requireUniqueCheckpoints; 084 085 if( flowElementGraph.annotations != null ) 086 this.annotations = new EnumMultiMap<>( flowElementGraph.annotations ); 087 088 copyFrom( flowElementGraph ); 089 } 090 091 public FlowElementGraph( PlatformInfo platformInfo, ElementGraph elementGraph, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints ) 092 { 093 this(); 094 this.platformInfo = platformInfo; 095 096 if( elementGraph == null ) 097 elementGraph = BaseElementGraph.NULL; 098 099 if( sources == null || sources.isEmpty() ) 100 throw new IllegalArgumentException( "sources may not be null or empty" ); 101 102 if( sinks == null || sinks.isEmpty() ) 103 throw new IllegalArgumentException( "sinks may not be null or empty" ); 104 105 this.sources = new HashMap<>( sources ); 106 this.sinks = new HashMap<>( sinks ); 107 this.traps = new HashMap<>( traps == null ? Collections.<String, Tap>emptyMap() : traps ); 108 this.checkpoints = new HashMap<>( checkpoints == null ? Collections.<String, Tap>emptyMap() : checkpoints ); 109 110 EnumMultiMap<FlowElement> annotations = ElementGraphs.annotations( elementGraph ); 111 112 if( annotations != null ) 113 this.annotations = new EnumMultiMap<>( annotations ); 114 115 // prevents multiple edge from head and to tail extents 116 copyFrom( ElementGraphs.asExtentMaskedSubGraph( elementGraph ) ); 117 118 bindExtents(); 119 } 120 121 public FlowElementGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks ) 122 { 123 this( null, pipes, sources, sinks, Collections.<String, Tap>emptyMap(), Collections.<String, Tap>emptyMap(), false ); 124 } 125 126 /** 127 * Constructor ElementGraph creates a new ElementGraph instance. 128 * 129 * @param pipes of type Pipe[] 130 * @param sources of type Map<String, Tap> 131 * @param sinks of type Map<String, Tap> 132 */ 133 public FlowElementGraph( PlatformInfo platformInfo, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps, Map<String, Tap> checkpoints, boolean requireUniqueCheckpoints ) 134 { 135 this(); 136 this.platformInfo = platformInfo; 137 this.sources = sources; 138 this.sinks = sinks; 139 this.traps = traps; 140 this.checkpoints = checkpoints; 141 this.requireUniqueCheckpoints = requireUniqueCheckpoints; 142 143 assembleGraph( pipes, sources, sinks ); 144 145 verifyGraph(); 146 } 147 148 public Map<String, Tap> getSourceMap() 149 { 150 return sources; 151 } 152 153 public Map<String, Tap> getSinkMap() 154 { 155 return sinks; 156 } 157 158 public Map<String, Tap> getTrapMap() 159 { 160 return traps; 161 } 162 163 public Map<String, Tap> getCheckpointsMap() 164 { 165 return checkpoints; 166 } 167 168 public Collection<Tap> getSources() 169 { 170 return sources.values(); 171 } 172 173 public Collection<Tap> getSinks() 174 { 175 return sinks.values(); 176 } 177 178 public Collection<Tap> getTraps() 179 { 180 return traps.values(); 181 } 182 183 protected void initialize( Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails ) 184 { 185 this.sources = sources; 186 this.sinks = sinks; 187 this.traps = Util.createHashMap(); 188 189 assembleGraph( tails, sources, sinks ); 190 191 verifyGraph(); 192 } 193 194 private void assembleGraph( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks ) 195 { 196 HashMap<String, Tap> sourcesCopy = new HashMap<String, Tap>( sources ); 197 HashMap<String, Tap> sinksCopy = new HashMap<String, Tap>( sinks ); 198 199 for( Pipe pipe : pipes ) 200 makeGraph( pipe, sourcesCopy, sinksCopy ); 201 202 addExtents( sources, sinks ); 203 } 204 205 private void verifyGraph() 206 { 207 if( vertexSet().isEmpty() ) 208 return; 209 210 Set<String> checkpointNames = new HashSet<String>(); 211 212 // need to verify that only Extent instances are origins in this graph. Otherwise a Tap was not properly connected 213 TopologicalOrderIterator<FlowElement, Scope> iterator = getTopologicalIterator(); 214 215 FlowElement flowElement = null; 216 217 while( iterator.hasNext() ) 218 { 219 try 220 { 221 flowElement = iterator.next(); 222 } 223 catch( IllegalArgumentException exception ) 224 { 225 if( flowElement == null ) 226 throw new ElementGraphException( "unable to traverse to the first element" ); 227 228 throw new ElementGraphException( flowElement, "unable to traverse to the next element after " + flowElement ); 229 } 230 231 if( requireUniqueCheckpoints && flowElement instanceof Checkpoint ) 232 { 233 String name = ( (Checkpoint) flowElement ).getName(); 234 235 if( checkpointNames.contains( name ) ) 236 throw new ElementGraphException( (Pipe) flowElement, "may not have duplicate checkpoint names in assembly, found: " + name ); 237 238 checkpointNames.add( name ); 239 } 240 241 if( incomingEdgesOf( flowElement ).size() != 0 && outgoingEdgesOf( flowElement ).size() != 0 ) 242 continue; 243 244 if( flowElement instanceof Extent ) 245 continue; 246 247 if( flowElement instanceof Pipe ) 248 { 249 if( incomingEdgesOf( flowElement ).size() == 0 ) 250 throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to head Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" ); 251 else 252 throw new ElementGraphException( (Pipe) flowElement, "no Tap connected to tail Pipe: " + flowElement + ", possible ambiguous branching, try explicitly naming tails" ); 253 } 254 255 if( flowElement instanceof Tap ) 256 throw new ElementGraphException( (Tap) flowElement, "no Pipe connected to Tap: " + flowElement ); 257 else 258 throw new ElementGraphException( flowElement, "unknown element type: " + flowElement ); 259 } 260 } 261 262 protected FlowElementGraph shallowCopyElementGraph() 263 { 264 FlowElementGraph copy = new FlowElementGraph(); 265 Graphs.addGraph( copy.graph, this.graph ); 266 267 copy.traps = new HashMap<String, Tap>( this.traps ); 268 269 return copy; 270 } 271 272 public boolean isResolved() 273 { 274 return resolved; 275 } 276 277 public void setResolved( boolean resolved ) 278 { 279 this.resolved = resolved; 280 } 281 282 @Override 283 protected boolean allowMultipleExtentEdges() 284 { 285 return false; 286 } 287 288 /** 289 * created to support the ability to generate all paths between the head and tail of the process. 290 * 291 * @param sources 292 * @param sinks 293 */ 294 private void addExtents( Map<String, Tap> sources, Map<String, Tap> sinks ) 295 { 296 addVertex( Extent.head ); 297 298 for( String source : sources.keySet() ) 299 { 300 Scope scope = addEdge( Extent.head, sources.get( source ) ); 301 302 // edge may already exist, if so, above returns null 303 if( scope != null ) 304 scope.setName( source ); 305 } 306 307 addVertex( Extent.tail ); 308 309 for( String sink : sinks.keySet() ) 310 { 311 Scope scope; 312 313 try 314 { 315 scope = addEdge( sinks.get( sink ), Extent.tail ); 316 } 317 catch( IllegalArgumentException exception ) 318 { 319 throw new ElementGraphException( "missing pipe for sink tap: [" + sink + "]" ); 320 } 321 322 if( scope == null ) 323 throw new ElementGraphException( "cannot sink to the same path from multiple branches: [" + Util.join( sinks.values() ) + "]" ); 324 325 scope.setName( sink ); 326 } 327 } 328 329 /** 330 * Performs one rule check, verifies group does not join duplicate tap resources. 331 * <p/> 332 * Scopes are always named after the source side of the source -> target relationship 333 */ 334 private void makeGraph( Pipe current, Map<String, Tap> sources, Map<String, Tap> sinks ) 335 { 336 LOG.debug( "adding pipe: {}", current ); 337 338 if( current instanceof SubAssembly ) 339 { 340 for( Pipe pipe : SubAssembly.unwind( current.getPrevious() ) ) 341 makeGraph( pipe, sources, sinks ); 342 343 return; 344 } 345 346 if( containsVertex( current ) ) 347 return; 348 349 addVertex( current ); 350 351 Tap sink = sinks.remove( current.getName() ); 352 353 if( sink != null ) 354 { 355 LOG.debug( "adding sink: {}", sink ); 356 357 addVertex( sink ); 358 359 LOG.debug( "adding edge: {} -> {}", current, sink ); 360 361 addEdge( current, sink ).setName( current.getName() ); 362 } 363 364 // PipeAssemblies should always have a previous 365 if( SubAssembly.unwind( current.getPrevious() ).length == 0 ) 366 { 367 Tap source = sources.remove( current.getName() ); 368 369 if( source != null ) 370 { 371 LOG.debug( "adding source: {}", source ); 372 373 addVertex( source ); 374 375 LOG.debug( "adding edge: {} -> {}", source, current ); 376 377 Scope scope = addEdge( source, current ); 378 379 scope.setName( current.getName() ); 380 381 setOrdinal( source, current, scope ); 382 } 383 } 384 385 for( Pipe previous : SubAssembly.unwind( current.getPrevious() ) ) 386 { 387 makeGraph( previous, sources, sinks ); 388 389 LOG.debug( "adding edge: {} -> ", previous, current ); 390 391 if( getEdge( previous, current ) != null ) 392 throw new ElementGraphException( previous, "cannot distinguish pipe branches, give pipe unique name: " + previous ); 393 394 Scope scope = addEdge( previous, current ); 395 396 scope.setName( previous.getName() ); // name scope after previous pipe 397 398 setOrdinal( previous, current, scope ); 399 } 400 } 401 402 private void setOrdinal( FlowElement previous, Pipe current, Scope scope ) 403 { 404 if( current instanceof Splice ) 405 { 406 Splice splice = (Splice) current; 407 408 Integer ordinal; 409 410 if( previous instanceof Tap ) // revert to pipe name 411 ordinal = splice.getPipePos().get( scope.getName() ); 412 else // GroupBy allows for duplicate pipe names, this guarantees correct ordinality 413 ordinal = FlowElements.findOrdinal( splice, (Pipe) previous ); 414 415 scope.setOrdinal( ordinal ); 416 417 Set<Scope> scopes = new HashSet<>( incomingEdgesOf( current ) ); 418 419 scopes.remove( scope ); 420 421 for( Scope other : scopes ) 422 { 423 if( other.getOrdinal() == scope.getOrdinal() ) 424 throw new IllegalStateException( "duplicate ordinals" ); 425 } 426 427 if( splice.isJoin() && ordinal != 0 ) 428 scope.setNonBlocking( false ); 429 } 430 } 431 432 /** 433 * Method getTopologicalIterator returns the topologicalIterator of this ElementGraph object. 434 * 435 * @return the topologicalIterator (type TopologicalOrderIterator<FlowElement, Scope>) of this ElementGraph object. 436 */ 437 public TopologicalOrderIterator<FlowElement, Scope> getTopologicalIterator() 438 { 439 return new TopologicalOrderIterator<>( this.graph ); 440 } 441 442 /** 443 * Method getDepthFirstIterator returns the depthFirstIterator of this ElementGraph object. 444 * 445 * @return the depthFirstIterator (type DepthFirstIterator<FlowElement, Scope>) of this ElementGraph object. 446 */ 447 public DepthFirstIterator<FlowElement, Scope> getDepthFirstIterator() 448 { 449 return new DepthFirstIterator<>( this.graph, Extent.head ); 450 } 451 452 private BaseElementGraph copyWithTraps() 453 { 454 FlowElementGraph copy = shallowCopyElementGraph(); 455 456 copy.addTrapsToGraph(); 457 458 return copy; 459 } 460 461 private void addTrapsToGraph() 462 { 463 DepthFirstIterator<FlowElement, Scope> iterator = getDepthFirstIterator(); 464 465 while( iterator.hasNext() ) 466 { 467 FlowElement element = iterator.next(); 468 469 if( !( element instanceof Pipe ) ) 470 continue; 471 472 Pipe pipe = (Pipe) element; 473 Tap trap = traps.get( pipe.getName() ); 474 475 if( trap == null ) 476 continue; 477 478 addVertex( trap ); 479 480 if( LOG.isDebugEnabled() ) 481 LOG.debug( "adding trap edge: " + pipe + " -> " + trap ); 482 483 if( getEdge( pipe, trap ) != null ) 484 continue; 485 486 addEdge( pipe, trap ).setName( pipe.getName() ); // name scope after previous pipe 487 } 488 } 489 490 /** 491 * Method writeDOT writes this element graph to a DOT file for easy visualization and debugging. 492 * 493 * @param filename of type String 494 */ 495 @Override 496 public void writeDOT( String filename ) 497 { 498 boolean success = ElementGraphs.printElementGraph( filename, this.copyWithTraps(), platformInfo ); 499 500 if( success ) 501 Util.writePDF( filename ); 502 } 503 504 @Override 505 public ElementGraph copyElementGraph() 506 { 507 return new FlowElementGraph( this ); 508 } 509 }