001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.stream.graph; 022 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashSet; 026import java.util.IdentityHashMap; 027import java.util.Iterator; 028import java.util.LinkedList; 029import java.util.List; 030import java.util.ListIterator; 031import java.util.Map; 032import java.util.Set; 033 034import cascading.flow.stream.duct.CloseReducingDuct; 035import cascading.flow.stream.duct.Collapsing; 036import cascading.flow.stream.duct.Duct; 037import cascading.flow.stream.duct.DuctGraph; 038import cascading.flow.stream.duct.Fork; 039import cascading.flow.stream.duct.Gate; 040import cascading.flow.stream.duct.OpenDuct; 041import cascading.flow.stream.duct.OpenReducingDuct; 042import cascading.flow.stream.duct.OpenWindow; 043import cascading.flow.stream.duct.Reducing; 044import cascading.flow.stream.duct.Stage; 045import cascading.flow.stream.duct.Window; 046import cascading.util.Util; 047import org.jgrapht.DirectedGraph; 048import org.jgrapht.GraphPath; 049import org.jgrapht.Graphs; 050import org.jgrapht.alg.KShortestPaths; 051import org.jgrapht.traverse.TopologicalOrderIterator; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * Class StreamGraph is the operation pipeline used during processing. This an internal use only class. 057 * <p/> 058 * Under some circumstances it may make sense to see the actual graph plan. To do so, enable one or both dot file 059 * properties, {@link #ERROR_DOT_FILE_NAME} and {@link #DOT_FILE_PATH}. 060 */ 061public class StreamGraph 062 { 063 /** Property denoting the path and filename to write the failed stream graph dot file. */ 064 public final static String ERROR_DOT_FILE_NAME = "cascading.stream.error.dotfile"; 065 066 /** 067 * Property denoting the path to write all stream graph dot files. The filename will be generated 068 * based on platform properties. 069 */ 070 public final static String DOT_FILE_PATH = "cascading.stream.dotfile.path"; 071 072 private static final Logger LOG = LoggerFactory.getLogger( StreamGraph.class ); 073 074 private final Duct HEAD = new Extent( "head" ); 075 private final Duct TAIL = new Extent( "tail" ); 076 077 private final DuctGraph ductGraph = new DuctGraph(); 078 079 private class Extent extends Stage 080 { 081 final String name; 082 083 private Extent( String name ) 084 { 085 this.name = name; 086 } 087 088 @Override 089 public String toString() 090 { 091 return name; 092 } 093 } 094 095 public StreamGraph() 096 { 097 } 098 099 protected Object getProperty( String name ) 100 { 101 return null; 102 } 103 104 Duct getHEAD() 105 { 106 return HEAD; 107 } 108 109 Duct getTAIL() 110 { 111 return TAIL; 112 } 113 114 public void addHead( Duct head ) 115 { 116 addPath( getHEAD(), head ); 117 } 118 119 public void addTail( Duct tail ) 120 { 121 addPath( tail, getTAIL() ); 122 } 123 124 public void addPath( Duct lhs, Duct rhs ) 125 { 126 addPath( lhs, 0, rhs ); 127 } 128 129 public void addPath( Duct lhs, int ordinal, Duct rhs ) 130 { 131 if( lhs == null && rhs == null ) 132 throw new IllegalArgumentException( "both lhs and rhs may not be null" ); 133 134 if( lhs == getTAIL() ) 135 throw new IllegalStateException( "lhs may not be a TAIL" ); 136 137 if( rhs == getHEAD() ) 138 throw new IllegalStateException( "rhs may not be a HEAD" ); 139 140 if( lhs == null ) 141 lhs = getHEAD(); 142 143 if( rhs == null ) 144 rhs = getTAIL(); 145 146 try 147 { 148 ductGraph.addVertex( lhs ); 149 ductGraph.addVertex( rhs ); 150 ductGraph.addEdge( lhs, rhs, ductGraph.makeOrdinal( ordinal ) ); 151 } 152 catch( RuntimeException exception ) 153 { 154 LOG.error( "unable to add path", exception ); 155 printGraphError(); 156 throw exception; 157 } 158 } 159 160 public void bind() 161 { 162 Iterator<Duct> iterator = getTopologicalOrderIterator(); 163 164 // build the actual processing graph 165 while( iterator.hasNext() ) 166 iterator.next().bind( this ); 167 168 iterator = getReversedTopologicalOrderIterator(); 169 170 // initialize all the ducts 171 while( iterator.hasNext() ) 172 iterator.next().initialize(); 173 } 174 175 /** Calls prepare starting at the tail and working backwards */ 176 public void prepare() 177 { 178 TopologicalOrderIterator<Duct, Integer> iterator = getReversedTopologicalOrderIterator(); 179 180 while( iterator.hasNext() ) 181 iterator.next().prepare(); 182 } 183 184 /** Calls cleanup starting at the head and working forwards */ 185 public void cleanup() 186 { 187 TopologicalOrderIterator<Duct, Integer> iterator = getTopologicalOrderIterator(); 188 189 while( iterator.hasNext() ) 190 iterator.next().cleanup(); 191 } 192 193 public Collection<Duct> getHeads() 194 { 195 return Graphs.successorListOf( ductGraph, getHEAD() ); 196 } 197 198 public Collection<Duct> getTails() 199 { 200 return Graphs.predecessorListOf( ductGraph, getTAIL() ); 201 } 202 203 public Duct[] findAllNextFor( Duct current ) 204 { 205 LinkedList<Duct> successors = new LinkedList<Duct>( Graphs.successorListOf( ductGraph, current ) ); 206 ListIterator<Duct> iterator = successors.listIterator(); 207 208 while( iterator.hasNext() ) 209 { 210 Duct successor = iterator.next(); 211 212 if( successor == getHEAD() ) 213 throw new IllegalStateException( "HEAD may not be next" ); 214 215 if( successor == getTAIL() ) // tail is not included, its just a marker 216 iterator.remove(); 217 } 218 219 return successors.toArray( new Duct[ successors.size() ] ); 220 } 221 222 public Duct[] findAllPreviousFor( Duct current ) 223 { 224 LinkedList<Duct> predecessors = new LinkedList<Duct>( Graphs.predecessorListOf( ductGraph, current ) ); 225 ListIterator<Duct> iterator = predecessors.listIterator(); 226 227 while( iterator.hasNext() ) 228 { 229 Duct successor = iterator.next(); 230 231 if( successor == getTAIL() ) 232 throw new IllegalStateException( "TAIL may not be successor" ); 233 234 if( successor == getHEAD() ) // head is not included, its just a marker 235 iterator.remove(); 236 } 237 238 return predecessors.toArray( new Duct[ predecessors.size() ] ); 239 } 240 241 public Map<Duct, Integer> getOrdinalMap( Duct current ) 242 { 243 Duct[] allPrevious = findAllPreviousFor( current ); 244 245 Map<Duct, Integer> results = new IdentityHashMap<>(); 246 247 for( Duct previous : allPrevious ) 248 results.put( previous, ductGraph.getEdge( previous, current ).getOrdinal() ); 249 250 return results; 251 } 252 253 public Duct createNextFor( Duct current ) 254 { 255 if( current == getHEAD() || current == getTAIL() ) 256 return null; 257 258 Set<DuctGraph.Ordinal> edges = ductGraph.outgoingEdgesOf( current ); 259 260 if( edges.size() == 0 ) 261 throw new IllegalStateException( "ducts must have an outgoing edge, current: " + current ); 262 263 Duct next = ductGraph.getEdgeTarget( edges.iterator().next() ); 264 265 if( current instanceof Gate ) 266 { 267 if( !( current instanceof Window ) ) // just collapse - Merge 268 { 269 if( edges.size() > 1 ) 270 return createFork( findAllNextFor( current ) ); 271 272 return next; 273 } 274 275 if( next instanceof OpenWindow ) 276 return next; 277 278 if( edges.size() > 1 ) 279 return createOpenWindow( createFork( findAllNextFor( current ) ) ); 280 281 if( next instanceof Reducing ) 282 return createOpenReducingWindow( next ); 283 284 return createOpenWindow( next ); 285 } 286 287 if( current instanceof Reducing ) 288 { 289 if( next instanceof Reducing ) 290 return next; 291 292 if( edges.size() > 1 ) 293 return createCloseWindow( createFork( findAllNextFor( current ) ) ); 294 295 return createCloseWindow( next ); 296 } 297 298 if( edges.size() > 1 ) 299 return createFork( findAllNextFor( current ) ); 300 301 if( next == getTAIL() ) // tail is not included, its just a marker 302 throw new IllegalStateException( "tail ducts should not bind to next" ); 303 304 return next; 305 } 306 307 private Duct createCloseWindow( Duct next ) 308 { 309 return new CloseReducingDuct( next ); 310 } 311 312 protected Duct createOpenWindow( Duct next ) 313 { 314 return new OpenDuct( next ); 315 } 316 317 protected Duct createOpenReducingWindow( Duct next ) 318 { 319 return new OpenReducingDuct( next ); 320 } 321 322 protected Duct createFork( Duct[] allNext ) 323 { 324 return new Fork( allNext ); 325 } 326 327 /** 328 * Returns all free paths to the current duct, usually a GroupGate. 329 * <p/> 330 * Paths all unique paths are counted, minus any immediate prior GroupGates as they 331 * block incoming paths into a single path 332 * 333 * @param duct of type Duct 334 * @return an int 335 */ 336 public int countAllEventingPathsTo( Duct duct ) 337 { 338 // find all immediate prior groups/ collapsed 339 LinkedList<List<Duct>> allPaths = asPathList( allPathsBetweenInclusive( getHEAD(), duct ) ); 340 341 Set<Duct> nearestCollapsed = new HashSet<Duct>(); 342 343 for( List<Duct> path : allPaths ) 344 { 345 Collections.reverse( path ); 346 347 path.remove( 0 ); // remove the duct param 348 349 for( Duct element : path ) 350 { 351 if( !( element instanceof Collapsing ) ) 352 continue; 353 354 nearestCollapsed.add( element ); 355 break; 356 } 357 } 358 359 // find all paths 360 // remove all paths containing prior groups 361 LinkedList<List<Duct>> collapsedPaths = new LinkedList<List<Duct>>( allPaths ); 362 ListIterator<List<Duct>> iterator = collapsedPaths.listIterator(); 363 while( iterator.hasNext() ) 364 { 365 List<Duct> path = iterator.next(); 366 367 if( Collections.disjoint( path, nearestCollapsed ) ) 368 iterator.remove(); 369 } 370 371 int collapsedPathsCount = 0; 372 for( Duct collapsed : nearestCollapsed ) 373 { 374 LinkedList<List<Duct>> subPaths = asPathList( allPathsBetweenInclusive( collapsed, duct ) ); 375 for( List<Duct> subPath : subPaths ) 376 { 377 subPath.remove( 0 ); // remove collapsed duct 378 if( Collections.disjoint( subPath, nearestCollapsed ) ) 379 collapsedPathsCount += 1; 380 } 381 } 382 383 int nonCollapsedPathsCount = allPaths.size() - collapsedPaths.size(); 384 385 // incoming == paths + prior 386 return nonCollapsedPathsCount + collapsedPathsCount; 387 } 388 389 public int ordinalBetween( Duct lhs, Duct rhs ) 390 { 391 return ductGraph.getEdge( lhs, rhs ).getOrdinal(); 392 } 393 394 private List<GraphPath<Duct, DuctGraph.Ordinal>> allPathsBetweenInclusive( Duct from, Duct to ) 395 { 396 return new KShortestPaths<>( ductGraph, from, Integer.MAX_VALUE ).getPaths( to ); 397 } 398 399 public static LinkedList<List<Duct>> asPathList( List<GraphPath<Duct, DuctGraph.Ordinal>> paths ) 400 { 401 LinkedList<List<Duct>> results = new LinkedList<List<Duct>>(); 402 403 if( paths == null ) 404 return results; 405 406 for( GraphPath<Duct, DuctGraph.Ordinal> path : paths ) 407 results.add( Graphs.getPathVertexList( path ) ); 408 409 return results; 410 } 411 412 public TopologicalOrderIterator<Duct, Integer> getTopologicalOrderIterator() 413 { 414 try 415 { 416 return new TopologicalOrderIterator( ductGraph ); 417 } 418 catch( RuntimeException exception ) 419 { 420 LOG.error( "failed creating topological iterator", exception ); 421 printGraphError(); 422 423 throw exception; 424 } 425 } 426 427 public TopologicalOrderIterator<Duct, Integer> getReversedTopologicalOrderIterator() 428 { 429 try 430 { 431 return new TopologicalOrderIterator( getReversedGraph() ); 432 } 433 catch( RuntimeException exception ) 434 { 435 LOG.error( "failed creating reversed topological iterator", exception ); 436 printGraphError(); 437 438 throw exception; 439 } 440 } 441 442 public DirectedGraph getReversedGraph() 443 { 444 DuctGraph reversedGraph = new DuctGraph(); 445 446 Graphs.addGraphReversed( reversedGraph, ductGraph ); 447 448 return reversedGraph; 449 } 450 451 public Collection<Duct> getAllDucts() 452 { 453 return ductGraph.vertexSet(); 454 } 455 456 public void printGraphError() 457 { 458 String filename = (String) getProperty( ERROR_DOT_FILE_NAME ); 459 460 if( filename == null ) 461 return; 462 463 printGraph( filename ); 464 } 465 466 public void printGraph( String id, String classifier, int discriminator ) 467 { 468 String path = (String) getProperty( DOT_FILE_PATH ); 469 470 if( path == null ) 471 return; 472 473 classifier = Util.cleansePathName( classifier ); 474 475 path = String.format( "%s/streamgraph-%s-%s-%s.dot", path, id, classifier, discriminator ); 476 477 printGraph( path ); 478 } 479 480 public void printGraph( String filename ) 481 { 482 LOG.info( "writing stream graph to {}", filename ); 483 Util.printGraph( filename, ductGraph ); 484 } 485 }