001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.util; 022 023import java.io.PrintWriter; 024import java.io.Writer; 025import java.util.Arrays; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032 033import cascading.flow.FlowElement; 034import cascading.flow.planner.Scope; 035import cascading.flow.planner.graph.ElementGraph; 036import cascading.flow.planner.graph.Extent; 037import cascading.flow.planner.process.ProcessGraph; 038import cascading.flow.planner.process.ProcessModel; 039import cascading.flow.planner.process.ProcessModels; 040import cascading.tap.Tap; 041import cascading.util.jgrapht.ComponentAttributeProvider; 042import cascading.util.jgrapht.EdgeNameProvider; 043import cascading.util.jgrapht.VertexNameProvider; 044 045/** 046 * This class is a derivative of the JGraphT DOTExporter, with numerous enhancements but with 047 * retained compatibility. 048 */ 049public class DOTProcessGraphWriter 050 { 051 public static final String INDENT = " "; 052 public static final String CONNECTOR = " -> "; 053 054 private VertexNameProvider<Pair<ElementGraph, FlowElement>> vertexIDProvider; 055 private VertexNameProvider<FlowElement> vertexLabelProvider; 056 private EdgeNameProvider<Scope> edgeLabelProvider; 057 private ComponentAttributeProvider<FlowElement> vertexAttributeProvider; 058 private ComponentAttributeProvider<Scope> edgeAttributeProvider; 059 private VertexNameProvider<ProcessModel> clusterIDProvider; 060 private VertexNameProvider<ProcessModel> clusterLabelProvider; 061 062 public DOTProcessGraphWriter( VertexNameProvider<Pair<ElementGraph, FlowElement>> vertexIDProvider, VertexNameProvider<FlowElement> vertexLabelProvider, 063 EdgeNameProvider<Scope> edgeLabelProvider, 064 ComponentAttributeProvider<FlowElement> vertexAttributeProvider, ComponentAttributeProvider<Scope> edgeAttributeProvider, 065 VertexNameProvider<ProcessModel> clusterIDProvider, VertexNameProvider<ProcessModel> clusterLabelProvider ) 066 { 067 this.vertexIDProvider = vertexIDProvider; 068 this.vertexLabelProvider = vertexLabelProvider; 069 this.edgeLabelProvider = edgeLabelProvider; 070 this.vertexAttributeProvider = vertexAttributeProvider; 071 this.edgeAttributeProvider = edgeAttributeProvider; 072 this.clusterIDProvider = clusterIDProvider; 073 this.clusterLabelProvider = clusterLabelProvider; 074 } 075 076 public void writeGraph( Writer writer, ElementGraph parentGraph, ProcessGraph<? extends ProcessModel> processGraph ) 077 { 078 PrintWriter out = new PrintWriter( writer ); 079 080 out.println( "digraph G {" ); 081 082 Set<FlowElement> spanElements = getSpanElements( processGraph ); 083 Set<FlowElement> identityElements = getIdentityElements( processGraph ); 084 Set<FlowElement> duplicatedElements = processGraph.getDuplicatedElements( parentGraph ); 085 086 writeVertexSet( null, parentGraph, parentGraph, out, spanElements, true, duplicatedElements, identityElements ); 087 writeEdgeSet( processGraph, parentGraph, parentGraph, out, spanElements, true, identityElements ); 088 089 Iterator<? extends ProcessModel> topologicalIterator = processGraph.getOrdinalTopologicalIterator(); 090 091 while( topologicalIterator.hasNext() ) 092 { 093 ProcessModel processModel = topologicalIterator.next(); 094 095 out.println(); 096 out.print( "subgraph cluster_" ); 097 out.print( clusterIDProvider.getVertexName( processModel ) ); 098 out.println( " {" ); 099 100 out.print( INDENT ); 101 out.print( "label = \"" ); 102 out.print( clusterLabelProvider.getVertexName( processModel ) ); 103 out.println( "\";" ); 104 out.println(); 105 106 writeVertexSet( processModel, parentGraph, processModel.getElementGraph(), out, spanElements, false, duplicatedElements, identityElements ); 107 writeEdgeSet( processGraph, parentGraph, processModel.getElementGraph(), out, spanElements, false, identityElements ); 108 109 out.println( "}" ); 110 } 111 112 out.println( "}" ); 113 114 out.flush(); 115 } 116 117 protected Set<FlowElement> getIdentityElements( ProcessGraph<? extends ProcessModel> processGraph ) 118 { 119 Set<FlowElement> candidates = new HashSet<>(); 120 121 // force identity nodes to be visualized 122 for( ElementGraph elementGraph : processGraph.getIdentityElementGraphs() ) 123 { 124 if( !Util.contains( Tap.class, elementGraph.vertexSet() ) ) 125 candidates.addAll( elementGraph.vertexSet() ); 126 } 127 128 candidates.remove( Extent.head ); 129 candidates.remove( Extent.tail ); 130 131 // many elements span, but are included into multiple nodes 132 // the first to include the element owns it, making other node representations mis-representative 133 // for now, lets force shared elements to span, but this hides the identity nodes outright 134 Set<FlowElement> identityElements = new HashSet<>(); 135 136 for( FlowElement identityElement : candidates ) 137 { 138 if( processGraph.getElementProcesses( identityElement ).size() == 1 ) 139 identityElements.add( identityElement ); 140 } 141 142 return identityElements; 143 } 144 145 protected Set<FlowElement> getSpanElements( ProcessGraph<? extends ProcessModel> processGraph ) 146 { 147 Set<FlowElement> spanElements = new HashSet<>(); 148 149 spanElements.add( Extent.head ); 150 spanElements.add( Extent.tail ); 151 spanElements.addAll( processGraph.getAllSourceElements() ); 152 spanElements.addAll( processGraph.getAllSinkElements() ); 153 154 // forces tap to be within a node 155 spanElements.removeAll( processGraph.getSourceTaps() ); 156 spanElements.removeAll( processGraph.getSinkTaps() ); 157 158 return spanElements; 159 } 160 161 /** 162 * if renderSpans == true, write edges if either side crosses a node boundary. 163 * if renderSpans == false, only write edged that are contained in a node 164 */ 165 protected void writeEdgeSet( ProcessGraph<? extends ProcessModel> processGraph, ElementGraph parentGraph, ElementGraph currentGraph, PrintWriter out, Set<FlowElement> spansClusters, boolean renderSpans, Set<FlowElement> identityElements ) 166 { 167 out.println(); 168 169 for( Scope scope : currentGraph.edgeSet() ) 170 { 171 FlowElement edgeSource = currentGraph.getEdgeSource( scope ); 172 FlowElement edgeTarget = currentGraph.getEdgeTarget( scope ); 173 174 boolean sourceSpans = spansClusters.contains( edgeSource ); 175 boolean targetSpans = spansClusters.contains( edgeTarget ); 176 boolean spans = sourceSpans || targetSpans; 177 178 boolean sourceIdentity = identityElements.contains( edgeSource ); 179 boolean targetIdentity = identityElements.contains( edgeTarget ); 180 181 if( sourceIdentity && targetIdentity ) 182 spans = false; 183 184 if( spans != renderSpans ) 185 continue; 186 187 List<ElementGraph> sourceGraphs = Arrays.asList( currentGraph ); 188 List<ElementGraph> targetGraphs = Arrays.asList( currentGraph ); 189 190 if( sourceIdentity && targetIdentity ) 191 { 192 sourceGraphs = Arrays.asList( parentGraph ); 193 targetGraphs = Arrays.asList( parentGraph ); 194 } 195 else if( sourceSpans && targetSpans ) 196 { 197 sourceGraphs = Arrays.asList( currentGraph ); 198 targetGraphs = Arrays.asList( currentGraph ); 199 } 200 else if( sourceSpans ) 201 { 202 sourceGraphs = Arrays.asList( parentGraph ); 203 targetGraphs = processGraph.getElementGraphs( edgeTarget ); 204 } 205 else if( targetSpans ) 206 { 207 sourceGraphs = processGraph.getElementGraphs( edgeSource ); 208 targetGraphs = Arrays.asList( parentGraph ); 209 } 210 211 for( ElementGraph sourceGraph : sourceGraphs ) 212 { 213 for( ElementGraph targetGraph : targetGraphs ) 214 writeEdge( out, scope, edgeSource, edgeTarget, sourceGraph, targetGraph ); 215 } 216 } 217 } 218 219 private void writeEdge( PrintWriter out, Scope scope, FlowElement edgeSource, FlowElement edgeTarget, ElementGraph sourceGraph, ElementGraph targetGraph ) 220 { 221 String source = getVertexID( sourceGraph, edgeSource ); 222 String target = getVertexID( targetGraph, edgeTarget ); 223 224 out.print( INDENT + source + CONNECTOR + target ); 225 226 String labelName = null; 227 228 if( edgeLabelProvider != null ) 229 labelName = edgeLabelProvider.getEdgeName( scope ); 230 231 Map<String, String> attributes = null; 232 233 if( edgeAttributeProvider != null ) 234 attributes = edgeAttributeProvider.getComponentAttributes( scope ); 235 236 renderAttributes( out, labelName, attributes ); 237 238 out.println( ";" ); 239 } 240 241 protected void writeVertexSet( ProcessModel processModel, ElementGraph parentGraph, ElementGraph currentGraph, PrintWriter out, Set<FlowElement> spansClusters, boolean onlySpans, Set<FlowElement> duplicatedElements, Set<FlowElement> identityElements ) 242 { 243 boolean isIdentityGraph = false; 244 245 if( processModel != null ) 246 isIdentityGraph = ProcessModels.isIdentity( processModel, Tap.class ); 247 248 for( FlowElement flowElement : currentGraph.vertexSet() ) 249 { 250 boolean spans = spansClusters.contains( flowElement ); 251 boolean isIdentity = identityElements.contains( flowElement ); 252 253 if( isIdentity && isIdentityGraph ) 254 continue; 255 256 if( isIdentity ) 257 spans = false; 258 259 if( spans != onlySpans ) 260 continue; 261 262 out.print( INDENT + getVertexID( isIdentity ? parentGraph : currentGraph, flowElement ) ); 263 264 String labelName = null; 265 266 if( vertexLabelProvider != null ) 267 labelName = vertexLabelProvider.getVertexName( flowElement ); 268 269 Map<String, String> attributes = new HashMap<>(); 270 271 if( duplicatedElements.contains( flowElement ) ) 272 attributes.put( "color", getHSBColorFor( flowElement ) ); 273 274 if( vertexAttributeProvider != null ) 275 attributes.putAll( vertexAttributeProvider.getComponentAttributes( flowElement ) ); 276 277 renderAttributes( out, labelName, attributes ); 278 279 out.println( ";" ); 280 } 281 } 282 283 private void renderAttributes( PrintWriter out, String labelName, Map<String, String> attributes ) 284 { 285 if( labelName == null && attributes == null ) 286 return; 287 288 out.print( " [ " ); 289 290 if( labelName == null ) 291 labelName = attributes.get( "label" ); 292 293 if( labelName != null ) 294 out.print( "label=\"" + labelName + "\" " ); 295 296 if( attributes != null ) 297 { 298 for( Map.Entry<String, String> entry : attributes.entrySet() ) 299 { 300 String name = entry.getKey(); 301 302 if( name.equals( "label" ) ) 303 continue; 304 305 out.print( name + "=\"" + entry.getValue() + "\" " ); 306 } 307 } 308 309 out.print( "]" ); 310 } 311 312 private String getVertexID( ElementGraph elementGraph, FlowElement flowElement ) 313 { 314 return vertexIDProvider.getVertexName( new Pair<>( elementGraph, flowElement ) ); 315 } 316 317 Map<FlowElement, String> colors = new HashMap<>(); 318 float hue = 0.3f; 319 320 // a rudimentary attempt to progress the colors so they can be differentiated 321 private String getHSBColorFor( FlowElement flowElement ) 322 { 323 if( colors.containsKey( flowElement ) ) 324 return colors.get( flowElement ); 325 326 String result = String.format( "%f,%f,%f", 1.0 - hue % 1.0, 1.0, 0.9 ); 327 328 colors.put( flowElement, result ); 329 330 hue += 0.075 + ( 0.025 * Math.floor( hue ) ); 331 332 return result; 333 } 334 }