/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.stream;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;

import cascading.util.Util;
import org.jgrapht.DirectedGraph;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
import org.jgrapht.alg.KShortestPaths;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class StreamGraph is the operation pipeline used during processing. This is an internal-use-only class.
 * <p>
 * Under some circumstances it may make sense to see the actual graph plan. To do so, enable one or both dot file
 * properties, {@link #ERROR_DOT_FILE_NAME} and {@link #DOT_FILE_PATH}.
 */
public class StreamGraph
  {
  /** Property denoting the path and filename to write the failed stream graph dot file. */
  public final static String ERROR_DOT_FILE_NAME = "cascading.stream.error.dotfile";

  /**
   * Property denoting the path to write all stream graph dot files. The filename will be generated
   * based on platform properties.
   */
  public final static String DOT_FILE_PATH = "cascading.stream.dotfile.path";

  private static final Logger LOG = LoggerFactory.getLogger( StreamGraph.class );

  /** Marker vertex placed before every head duct; never handed out as a real duct. */
  private final Duct HEAD = new Extent( "head" );
  /** Marker vertex placed after every tail duct; never handed out as a real duct. */
  private final Duct TAIL = new Extent( "tail" );

  private final DuctGraph graph = new DuctGraph();

  /**
   * Named marker stage used for the {@link #HEAD} and {@link #TAIL} vertices.
   * <p>
   * Declared static because it never touches the enclosing StreamGraph instance.
   */
  private static class Extent extends Stage
    {
    private final String name;

    private Extent( String name )
      {
      this.name = name;
      }

    @Override
    public String toString()
      {
      return name;
      }
    }

  public StreamGraph()
    {
    }

  /**
   * Looks up a configuration property. This base implementation always returns {@code null}, which
   * disables the dot file output in {@link #printGraphError()} and {@link #printGraph(String, String, int)}.
   *
   * @param name the property name
   * @return the property value, or {@code null} if not available
   */
  protected Object getProperty( String name )
    {
    return null;
    }

  Duct getHEAD()
    {
    return HEAD;
    }

  Duct getTAIL()
    {
    return TAIL;
    }

  /**
   * Registers the given duct as a head by adding a path from the HEAD marker to it.
   *
   * @param head the duct to attach after HEAD
   */
  public void addHead( Duct head )
    {
    addPath( getHEAD(), head );
    }

  /**
   * Registers the given duct as a tail by adding a path from it to the TAIL marker.
   *
   * @param tail the duct to attach before TAIL
   */
  public void addTail( Duct tail )
    {
    addPath( tail, getTAIL() );
    }

  /**
   * Adds a path between the two ducts using ordinal {@code 0}.
   *
   * @param lhs the source duct, {@code null} means HEAD
   * @param rhs the target duct, {@code null} means TAIL
   * @see #addPath(Duct, int, Duct)
   */
  public void addPath( Duct lhs, Duct rhs )
    {
    addPath( lhs, 0, rhs );
    }

  /**
   * Adds both ducts as vertices (if not already present) and connects them with an edge carrying the given ordinal.
   * <p>
   * If adding fails with a RuntimeException, the failure is logged, {@link #printGraphError()} is called
   * and the exception is rethrown.
   *
   * @param lhs     the source duct, {@code null} means HEAD
   * @param ordinal the ordinal to associate with the new edge
   * @param rhs     the target duct, {@code null} means TAIL
   * @throws IllegalArgumentException if both lhs and rhs are {@code null}
   * @throws IllegalStateException    if lhs is the TAIL marker or rhs is the HEAD marker
   */
  public void addPath( Duct lhs, int ordinal, Duct rhs )
    {
    if( lhs == null && rhs == null )
      throw new IllegalArgumentException( "both lhs and rhs may not be null" );

    if( lhs == getTAIL() )
      throw new IllegalStateException( "lhs may not be a TAIL" );

    if( rhs == getHEAD() )
      throw new IllegalStateException( "rhs may not be a HEAD" );

    if( lhs == null )
      lhs = getHEAD();

    if( rhs == null )
      rhs = getTAIL();

    try
      {
      graph.addVertex( lhs );
      graph.addVertex( rhs );
      graph.addEdge( lhs, rhs, graph.makeOrdinal( ordinal ) );
      }
    catch( RuntimeException exception )
      {
      LOG.error( "unable to add path", exception );
      printGraphError();
      throw exception;
      }
    }

  /**
   * Binds every duct to this graph in topological order, then initializes every duct
   * in reversed topological order.
   */
  public void bind()
    {
    // build the actual processing graph
    Iterator<Duct> iterator = getTopologicalOrderIterator();

    while( iterator.hasNext() )
      iterator.next().bind( this );

    // initialize all the ducts
    iterator = getReversedTopologicalOrderIterator();

    while( iterator.hasNext() )
      iterator.next().initialize();
    }

  /** Calls prepare starting at the tail and working backwards */
  public void prepare()
    {
    Iterator<Duct> iterator = getReversedTopologicalOrderIterator();

    while( iterator.hasNext() )
      iterator.next().prepare();
    }

  /** Calls cleanup starting at the head and working forwards */
  public void cleanup()
    {
    Iterator<Duct> iterator = getTopologicalOrderIterator();

    while( iterator.hasNext() )
      iterator.next().cleanup();
    }

  /**
   * Returns the direct successors of the HEAD marker.
   *
   * @return the head ducts
   */
  public Collection<Duct> getHeads()
    {
    return Graphs.successorListOf( graph, getHEAD() );
    }

  /**
   * Returns the direct predecessors of the TAIL marker.
   *
   * @return the tail ducts
   */
  public Collection<Duct> getTails()
    {
    return Graphs.predecessorListOf( graph, getTAIL() );
    }

  /**
   * Returns the direct successors of the given duct, excluding the TAIL marker.
   *
   * @param current the duct to find successors for
   * @return the successors, possibly empty
   * @throws IllegalStateException if HEAD is found as a successor
   */
  public Duct[] findAllNextFor( Duct current )
    {
    List<Duct> successors = new LinkedList<Duct>( Graphs.successorListOf( graph, current ) );
    Iterator<Duct> iterator = successors.iterator();

    while( iterator.hasNext() )
      {
      Duct successor = iterator.next();

      if( successor == getHEAD() )
        throw new IllegalStateException( "HEAD may not be next" );

      if( successor == getTAIL() ) // tail is not included, it's just a marker
        iterator.remove();
      }

    return successors.toArray( new Duct[ successors.size() ] );
    }

  /**
   * Returns the direct predecessors of the given duct, excluding the HEAD marker.
   *
   * @param current the duct to find predecessors for
   * @return the predecessors, possibly empty
   * @throws IllegalStateException if TAIL is found as a predecessor
   */
  public Duct[] findAllPreviousFor( Duct current )
    {
    List<Duct> predecessors = new LinkedList<Duct>( Graphs.predecessorListOf( graph, current ) );
    Iterator<Duct> iterator = predecessors.iterator();

    while( iterator.hasNext() )
      {
      Duct predecessor = iterator.next();

      if( predecessor == getTAIL() )
        throw new IllegalStateException( "TAIL may not be previous" );

      if( predecessor == getHEAD() ) // head is not included, it's just a marker
        iterator.remove();
      }

    return predecessors.toArray( new Duct[ predecessors.size() ] );
    }

  /**
   * Creates, or selects, the duct that the given duct should forward to.
   * <p>
   * The result depends on the type of the current duct and on its outgoing edges:
   * <ul>
   * <li>HEAD and TAIL markers have no next, {@code null} is returned</li>
   * <li>a {@link Gate} is followed by an open window, unless the next duct already is an {@link OpenWindow}</li>
   * <li>a {@link Reducing} duct is followed by a close window, unless the next duct is also {@link Reducing}</li>
   * <li>any other duct with more than one outgoing edge is followed by a fork over all its successors</li>
   * <li>otherwise the single successor is returned as is</li>
   * </ul>
   *
   * @param current the duct to create the next duct for
   * @return the next duct, or {@code null} if current is the HEAD or TAIL marker
   * @throws IllegalStateException if current has no outgoing edge, or if a plain duct would bind directly to TAIL
   */
  public Duct createNextFor( Duct current )
    {
    if( current == getHEAD() || current == getTAIL() )
      return null;

    Set<DuctGraph.Ordinal> edges = graph.outgoingEdgesOf( current );

    if( edges.isEmpty() )
      throw new IllegalStateException( "ducts must have an outgoing edge, current: " + current );

    boolean hasManyEdges = edges.size() > 1;
    Duct next = graph.getEdgeTarget( edges.iterator().next() );

    if( current instanceof Gate )
      {
      if( next instanceof OpenWindow )
        return next;

      if( hasManyEdges )
        return createOpenWindow( createFork( findAllNextFor( current ) ) );

      if( next instanceof Reducing )
        return createOpenReducingWindow( next );

      return createOpenWindow( next );
      }

    if( current instanceof Reducing )
      {
      if( next instanceof Reducing )
        return next;

      if( hasManyEdges )
        return createCloseWindow( createFork( findAllNextFor( current ) ) );

      return createCloseWindow( next );
      }

    if( hasManyEdges )
      return createFork( findAllNextFor( current ) );

    if( next == getTAIL() ) // tail is not included, it's just a marker
      throw new IllegalStateException( "tail ducts should not bind to next" );

    return next;
    }

  /** Wraps the given duct in a new {@link CloseReducingDuct}. */
  private Duct createCloseWindow( Duct next )
    {
    return new CloseReducingDuct( next );
    }

  /** Wraps the given duct in a new {@link OpenDuct}. */
  protected Duct createOpenWindow( Duct next )
    {
    return new OpenDuct( next );
    }

  /** Wraps the given duct in a new {@link OpenReducingDuct}. */
  protected Duct createOpenReducingWindow( Duct next )
    {
    return new OpenReducingDuct( next );
    }

  /** Creates a new {@link Fork} over all the given ducts. */
  protected Duct createFork( Duct[] allNext )
    {
    return new Fork( allNext );
    }

  /**
   * Returns all free paths to the current duct, usually a GroupGate.
   * <p>
   * All unique paths are counted, minus any immediate prior GroupGates as they
   * block incoming paths into a single path.
   *
   * @param duct of type Duct
   * @return an int
   */
  public int countAllEventingPathsTo( Duct duct )
    {
    LinkedList<List<Duct>> allPaths = asPathList( allPathsBetweenInclusive( getHEAD(), duct ) );

    // find all immediate prior groups/ collapsed
    Set<Duct> nearestCollapsed = new HashSet<Duct>();

    for( List<Duct> path : allPaths )
      {
      // walk backwards from the duct so the first Collapsing found is the nearest one
      Collections.reverse( path );

      path.remove( 0 ); // remove the duct param

      for( Duct element : path )
        {
        if( !( element instanceof Collapsing ) )
          continue;

        nearestCollapsed.add( element );
        break;
        }
      }

    // count the paths that do not pass through any of the nearest collapsed ducts
    int nonCollapsedPathsCount = 0;

    for( List<Duct> path : allPaths )
      {
      if( Collections.disjoint( path, nearestCollapsed ) )
        nonCollapsedPathsCount++;
      }

    // count the paths leaving each nearest collapsed duct that reach the duct
    // without crossing another nearest collapsed duct
    int collapsedPathsCount = 0;

    for( Duct collapsed : nearestCollapsed )
      {
      LinkedList<List<Duct>> subPaths = asPathList( allPathsBetweenInclusive( collapsed, duct ) );

      for( List<Duct> subPath : subPaths )
        {
        subPath.remove( 0 ); // remove collapsed duct

        if( Collections.disjoint( subPath, nearestCollapsed ) )
          collapsedPathsCount++;
        }
      }

    // incoming == paths + prior
    return nonCollapsedPathsCount + collapsedPathsCount;
    }

  /**
   * Returns the ordinal of the edge connecting the two ducts.
   *
   * @param lhs the source duct
   * @param rhs the target duct
   * @return the ordinal stored on the edge from lhs to rhs
   * @throws IllegalStateException if the graph has no edge from lhs to rhs
   */
  public int ordinalBetween( Duct lhs, Duct rhs )
    {
    DuctGraph.Ordinal edge = graph.getEdge( lhs, rhs );

    // fail with the offending ducts named instead of a bare NullPointerException
    if( edge == null )
      throw new IllegalStateException( "no edge found between lhs: " + lhs + " and rhs: " + rhs );

    return edge.ordinal;
    }

  /** Returns every path from {@code from} to {@code to} as computed by {@link KShortestPaths}, may be {@code null}. */
  private List<GraphPath<Duct, DuctGraph.Ordinal>> allPathsBetweenInclusive( Duct from, Duct to )
    {
    return new KShortestPaths<Duct, DuctGraph.Ordinal>( graph, from, Integer.MAX_VALUE ).getPaths( to );
    }

  /**
   * Converts the given graph paths into lists of the vertices they visit.
   *
   * @param paths the paths to convert, may be {@code null}
   * @return one vertex list per path, empty if paths is {@code null}
   */
  public static LinkedList<List<Duct>> asPathList( List<GraphPath<Duct, DuctGraph.Ordinal>> paths )
    {
    LinkedList<List<Duct>> results = new LinkedList<List<Duct>>();

    if( paths == null )
      return results;

    for( GraphPath<Duct, DuctGraph.Ordinal> path : paths )
      results.add( Graphs.getPathVertexList( path ) );

    return results;
    }

  /**
   * Returns an iterator over all ducts in topological order.
   * <p>
   * If the iterator cannot be created, the failure is logged, {@link #printGraphError()} is called
   * and the exception is rethrown.
   *
   * @return a new iterator over this graph
   */
  // the declared edge type (Integer) does not match the graph's DuctGraph.Ordinal edges, it is kept so
  // existing callers still compile, hence the raw construction
  @SuppressWarnings("unchecked")
  public TopologicalOrderIterator<Duct, Integer> getTopologicalOrderIterator()
    {
    try
      {
      return new TopologicalOrderIterator( graph );
      }
    catch( RuntimeException exception )
      {
      LOG.error( "failed creating topological iterator", exception );
      printGraphError();

      throw exception;
      }
    }

  /**
   * Returns an iterator over all ducts in topological order of the reversed graph.
   * <p>
   * If the iterator cannot be created, the failure is logged, {@link #printGraphError()} is called
   * and the exception is rethrown.
   *
   * @return a new iterator over a reversed copy of this graph
   */
  // see getTopologicalOrderIterator() for why the construction is raw
  @SuppressWarnings("unchecked")
  public TopologicalOrderIterator<Duct, Integer> getReversedTopologicalOrderIterator()
    {
    try
      {
      return new TopologicalOrderIterator( getReversedGraph() );
      }
    catch( RuntimeException exception )
      {
      LOG.error( "failed creating reversed topological iterator", exception );
      printGraphError();

      throw exception;
      }
    }

  /**
   * Builds a new graph holding the same vertices as this graph with every edge reversed.
   *
   * @return a new reversed graph, changes to it do not affect this StreamGraph
   */
  public DirectedGraph getReversedGraph()
    {
    DuctGraph reversedGraph = new DuctGraph();

    Graphs.addGraphReversed( reversedGraph, graph );

    return reversedGraph;
    }

  /**
   * Returns the vertex set of the underlying graph, including the HEAD and TAIL markers if present.
   *
   * @return all ducts in the graph
   */
  public Collection<Duct> getAllDucts()
    {
    return graph.vertexSet();
    }

  /**
   * Writes the graph to the file named by the {@link #ERROR_DOT_FILE_NAME} property.
   * Does nothing if the property is not set.
   */
  public void printGraphError()
    {
    String filename = (String) getProperty( ERROR_DOT_FILE_NAME );

    if( filename == null )
      return;

    printGraph( filename );
    }

  /**
   * Writes the graph to {@code streamgraph-<id>-<classifier>-<discriminator>.dot} under the directory
   * named by the {@link #DOT_FILE_PATH} property. Does nothing if the property is not set.
   *
   * @param id            first filename component
   * @param classifier    second filename component
   * @param discriminator third filename component
   */
  public void printGraph( String id, String classifier, int discriminator )
    {
    String path = (String) getProperty( DOT_FILE_PATH );

    if( path == null )
      return;

    path = String.format( "%s/streamgraph-%s-%s-%s.dot", path, id, classifier, discriminator );

    printGraph( path );
    }

  /**
   * Logs the target filename and writes the graph to it via {@link Util#printGraph}.
   *
   * @param filename the file to write the graph to
   */
  public void printGraph( String filename )
    {
    LOG.info( "writing stream graph to {}", filename );
    Util.printGraph( filename, graph );
    }
  }