001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.rule.util;
022
023import java.io.File;
024import java.io.IOException;
025import java.io.PrintWriter;
026import java.nio.file.FileSystems;
027import java.nio.file.Path;
028import java.util.Collections;
029import java.util.EnumMap;
030import java.util.Iterator;
031import java.util.LinkedHashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035
036import cascading.flow.Flow;
037import cascading.flow.FlowNode;
038import cascading.flow.FlowStep;
039import cascading.flow.Flows;
040import cascading.flow.planner.FlowPlanner;
041import cascading.flow.planner.PlannerContext;
042import cascading.flow.planner.graph.ElementGraph;
043import cascading.flow.planner.graph.FlowElementGraph;
044import cascading.flow.planner.iso.GraphResult;
045import cascading.flow.planner.iso.assertion.Asserted;
046import cascading.flow.planner.iso.subgraph.Partitions;
047import cascading.flow.planner.iso.transformer.Transformed;
048import cascading.flow.planner.process.FlowNodeGraph;
049import cascading.flow.planner.process.FlowStepGraph;
050import cascading.flow.planner.rule.PlanPhase;
051import cascading.flow.planner.rule.ProcessLevel;
052import cascading.flow.planner.rule.Rule;
053import cascading.flow.planner.rule.RuleResult;
054import cascading.property.AppProps;
055import cascading.util.ProcessLogger;
056import cascading.util.Util;
057import cascading.util.Version;
058
059import static cascading.flow.planner.graph.ElementGraphs.canonicalHash;
060import static cascading.property.PropertyUtil.getStringProperty;
061
062public class TraceWriter
063  {
064  public static final String GREEN = "0000000000000000000400000000000000000000000000000000000000000000";
065  public static final String ORANGE = "0000000000000000000E00000000000000000000000000000000000000000000";
066  public static final String RED = "0000000000000000000C00000000000000000000000000000000000000000000";
067  private String flowName;
068  private Map properties = Collections.emptyMap();
069  private ProcessLogger processLogger = ProcessLogger.NULL;
070
071  Map<ProcessLevel, Set<Rule>> counts = new EnumMap<>( ProcessLevel.class );
072
073  public TraceWriter()
074    {
075    }
076
077  public TraceWriter( Flow flow )
078    {
079    if( flow == null )
080      return;
081
082    this.properties = flow.getConfigAsProperties();
083    this.flowName = Flows.getNameOrID( flow );
084    this.processLogger = (ProcessLogger) flow;
085    }
086
087  protected Path getFullTransformTracePath( String registryName )
088    {
089    Path planTransformTracePath = getPlanTransformTracePath();
090
091    if( planTransformTracePath == null )
092      return null;
093
094    return planTransformTracePath.resolve( registryName );
095    }
096
097  public boolean isTransformTraceEnabled()
098    {
099    return !isTransformTraceDisabled();
100    }
101
102  public boolean isTransformTraceDisabled()
103    {
104    return getPlanTransformTracePath() == null;
105    }
106
107  public void writeTransformPlan( String registryName, PlanPhase phase, Rule rule, int[] ordinals, GraphResult graphResult )
108    {
109    if( isTransformTraceDisabled() )
110      return;
111
112    String ruleName = String.format( "%02d-%s-%04d", phase.ordinal(), phase, addRule( rule ) );
113
114    for( int i = 1; i < ordinals.length; i++ )
115      ruleName = String.format( "%s-%04d", ruleName, ordinals[ i ] );
116
117    ruleName = String.format( "%s-%s", ruleName, graphResult.getRuleName() );
118
119    Path path = getFullTransformTracePath( registryName ).resolve( ruleName );
120
121    graphResult.writeDOTs( path.toString() );
122
123    markResult( graphResult, path );
124    }
125
126  public void writeTransformPlan( String registryName, FlowElementGraph flowElementGraph, String name )
127    {
128    if( isTransformTraceDisabled() )
129      return;
130
131    if( flowElementGraph == null )
132      {
133      processLogger.logInfo( "cannot write phase assembly trace, flowElementGraph is null" );
134      return;
135      }
136
137    Path file = getFullTransformTracePath( registryName ).resolve( name ).normalize();
138
139    processLogger.logInfo( "writing phase assembly trace: {}, to: {}", name, file );
140
141    flowElementGraph.writeDOT( file.toString() );
142    }
143
144  public void writeTransformPlan( String registryName, List<? extends ElementGraph> flowElementGraphs, PlanPhase phase, String subName )
145    {
146    if( isTransformTraceDisabled() )
147      return;
148
149    if( flowElementGraphs == null || flowElementGraphs.isEmpty() )
150      {
151      processLogger.logInfo( "cannot write phase step trace, flowElementGraphs is empty" );
152      return;
153      }
154
155    for( int i = 0; i < flowElementGraphs.size(); i++ )
156      {
157      ElementGraph flowElementGraph = flowElementGraphs.get( i );
158      String name = String.format( "%02d-%s-%s-%04d.dot", phase.ordinal(), phase, subName, i );
159
160      Path file = getFullTransformTracePath( registryName ).resolve( name ).normalize();
161
162      processLogger.logInfo( "writing phase step trace: {}, to: {}", name, file );
163
164      flowElementGraph.writeDOT( file.toString() );
165      }
166    }
167
168  public void writeTransformPlan( String registryName, Map<ElementGraph, List<? extends ElementGraph>> parentGraphsMap, Map<ElementGraph, List<? extends ElementGraph>> subGraphsMap, PlanPhase phase, String subName )
169    {
170    if( isTransformTraceDisabled() )
171      return;
172
173    if( parentGraphsMap == null || parentGraphsMap.isEmpty() )
174      {
175      processLogger.logInfo( "cannot write phase node pipeline trace, parentGraphsMap is empty" );
176      return;
177      }
178
179    int stepCount = 0;
180    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : parentGraphsMap.entrySet() )
181      {
182      int nodeCount = 0;
183      for( ElementGraph elementGraph : entry.getValue() )
184        {
185        List<? extends ElementGraph> pipelineGraphs = subGraphsMap.get( elementGraph );
186
187        if( pipelineGraphs == null )
188          continue;
189
190        for( int i = 0; i < pipelineGraphs.size(); i++ )
191          {
192          ElementGraph flowElementGraph = pipelineGraphs.get( i );
193          String name = String.format( "%02d-%s-%s-%04d-%04d-%04d.dot", phase.ordinal(), phase, subName, stepCount, nodeCount, i );
194
195          Path file = getFullTransformTracePath( registryName ).resolve( name );
196
197          processLogger.logInfo( "writing phase node pipeline trace: {}, to: {}", name, file );
198
199          flowElementGraph.writeDOT( file.toString() );
200          }
201
202        nodeCount++;
203        }
204
205      stepCount++;
206      }
207    }
208
209  public void writeTransformPlan( String registryName, Map<ElementGraph, List<? extends ElementGraph>> subGraphsMap, PlanPhase phase, String subName )
210    {
211    if( isTransformTraceDisabled() )
212      return;
213
214    if( subGraphsMap == null || subGraphsMap.isEmpty() )
215      {
216      processLogger.logInfo( "cannot write phase node trace, subGraphs is empty" );
217      return;
218      }
219
220    int stepCount = 0;
221    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : subGraphsMap.entrySet() )
222      {
223      List<? extends ElementGraph> flowElementGraphs = entry.getValue();
224
225      for( int i = 0; i < flowElementGraphs.size(); i++ )
226        {
227        ElementGraph flowElementGraph = flowElementGraphs.get( i );
228        String name = String.format( "%02d-%s-%s-%04d-%04d.dot", phase.ordinal(), phase, subName, stepCount, i );
229
230        Path file = getFullTransformTracePath( registryName ).resolve( name );
231
232        processLogger.logInfo( "writing phase node trace: {}, to: {}", name, file );
233
234        flowElementGraph.writeDOT( file.toString() );
235        }
236
237      stepCount++;
238      }
239    }
240
241  protected Path getPlanTracePath()
242    {
243    return applyScope( getStringProperty( System.getProperties(), properties, FlowPlanner.TRACE_PLAN_PATH ) );
244    }
245
246  protected Path getPlanTransformTracePath()
247    {
248    return applyScope( getStringProperty( System.getProperties(), properties, FlowPlanner.TRACE_PLAN_TRANSFORM_PATH ) );
249    }
250
251  protected Path getPlanStatsPath()
252    {
253    return applyScope( getStringProperty( System.getProperties(), properties, FlowPlanner.TRACE_STATS_PATH ) );
254    }
255
256  private Path applyScope( String path )
257    {
258    if( path == null )
259      return null;
260
261    return FileSystems.getDefault().getPath( path, flowName );
262    }
263
264  public void writeTracePlan( String registryName, String fileName, ElementGraph elementGraph )
265    {
266    Path path = getPlanTracePath();
267
268    if( path == null )
269      return;
270
271    if( elementGraph == null )
272      {
273      processLogger.logInfo( "cannot write trace element plan, elementGraph is null" );
274      return;
275      }
276
277    if( registryName != null )
278      path = path.resolve( registryName );
279
280    Path filePath = path.resolve( String.format( "%s-%s.dot", fileName, canonicalHash( elementGraph ) ) );
281    File file = filePath.toFile();
282
283    processLogger.logInfo( "writing trace element plan: {}", file );
284
285    String filename = file.toString();
286
287    elementGraph.writeDOT( filename );
288    }
289
290  public void writeTracePlan( String registryName, String fileName, FlowStepGraph stepGraph )
291    {
292    Path path = getPlanTracePath();
293
294    if( path == null )
295      return;
296
297    if( stepGraph == null )
298      {
299      processLogger.logInfo( "cannot write step plan, stepGraph is null" );
300      return;
301      }
302
303    if( registryName != null )
304      path = path.resolve( registryName );
305
306    Path filePath = path.resolve( String.format( "%s.dot", fileName ) );
307    File file = filePath.toFile();
308
309    processLogger.logInfo( "writing trace step plan: {}", file );
310
311    stepGraph.writeDOT( file.toString() );
312    }
313
314  public void writeTracePlanSteps( String directoryName, FlowStepGraph stepGraph )
315    {
316    if( stepGraph == null )
317      {
318      processLogger.logInfo( "cannot write trace step plan, stepGraph is null" );
319      return;
320      }
321
322    Iterator<FlowStep> iterator = stepGraph.getTopologicalIterator();
323
324    while( iterator.hasNext() )
325      writePlan( iterator.next(), directoryName );
326    }
327
328  private void writePlan( FlowStep flowStep, String directoryName )
329    {
330    Path path = getPlanTracePath();
331
332    if( path == null )
333      return;
334
335    int stepOrdinal = flowStep.getOrdinal();
336    Path rootPath = path.resolve( directoryName );
337    ElementGraph stepSubGraph = flowStep.getElementGraph();
338    String stepGraphName = String.format( "%s/%04d-step-sub-graph-%s.dot", rootPath, stepOrdinal, canonicalHash( stepSubGraph ) );
339
340    stepSubGraph.writeDOT( stepGraphName );
341
342    FlowNodeGraph flowNodeGraph = flowStep.getFlowNodeGraph();
343
344    String stepNodeElementGraphName = String.format( "%s/%04d-step-node-sub-graph.dot", rootPath, stepOrdinal );
345
346    flowNodeGraph.writeDOTNested( stepNodeElementGraphName, stepSubGraph );
347
348    String stepNodeGraphName = String.format( "%s/%04d-step-node-graph.dot", rootPath, stepOrdinal );
349
350    flowNodeGraph.writeDOT( stepNodeGraphName );
351
352    Iterator<FlowNode> iterator = flowNodeGraph.getOrderedTopologicalIterator();
353
354    while( iterator.hasNext() )
355      {
356      FlowNode flowNode = iterator.next();
357      ElementGraph nodeGraph = flowNode.getElementGraph();
358      int nodeOrdinal = flowNode.getOrdinal();
359      String nodeGraphName = String.format( "%s/%04d-%04d-step-node-sub-graph-%s.dot", rootPath, stepOrdinal, nodeOrdinal, canonicalHash( nodeGraph ) );
360
361      nodeGraph.writeDOT( nodeGraphName );
362
363      List<? extends ElementGraph> pipelineGraphs = flowNode.getPipelineGraphs();
364
365      for( int j = 0; j < pipelineGraphs.size(); j++ )
366        {
367        ElementGraph pipelineGraph = pipelineGraphs.get( j );
368
369        String pipelineGraphName = String.format( "%s/%04d-%04d-%04d-step-node-pipeline-sub-graph.dot", rootPath, stepOrdinal, nodeOrdinal, j );
370
371        pipelineGraph.writeDOT( pipelineGraphName );
372        }
373      }
374    }
375
376  public void writeFinal( String fileName, RuleResult ruleResult )
377    {
378    Path path = getPlanTracePath();
379
380    if( path == null )
381      return;
382
383    Path filePath = path.resolve( String.format( "%s-%s.txt", fileName, ruleResult.getRegistry().getName() ) );
384    File file = filePath.toFile();
385
386    processLogger.logInfo( "writing final registry: {}", file );
387
388    try (PrintWriter writer = new PrintWriter( file ))
389      {
390      writer.println( "filename names winning rule registry" );
391      }
392    catch( IOException exception )
393      {
394      processLogger.logError( "could not write final registry", exception );
395      }
396    }
397
398  public void writeStats( PlannerContext plannerContext, RuleResult ruleResult )
399    {
400    Path path = getPlanStatsPath();
401
402    if( path == null )
403      return;
404
405    File file = path.resolve( String.format( "planner-stats-%s-%s.txt", ruleResult.getRegistry().getName(), ruleResult.getResultStatus() ) ).toFile();
406
407    processLogger.logInfo( "writing planner stats to: {}", file );
408
409    file.getParentFile().mkdirs();
410
411    try (PrintWriter writer = new PrintWriter( file ))
412      {
413      Flow flow = plannerContext.getFlow();
414
415      Map<Object, Object> configAsProperties = flow.getConfigAsProperties();
416
417      writer.format( "cascading version: %s, build: %s\n", emptyOrValue( Version.getReleaseFull() ), emptyOrValue( Version.getReleaseBuild() ) );
418      writer.format( "application id: %s\n", emptyOrValue( AppProps.getApplicationID( configAsProperties ) ) );
419      writer.format( "application name: %s\n", emptyOrValue( AppProps.getApplicationName( configAsProperties ) ) );
420      writer.format( "application version: %s\n", emptyOrValue( AppProps.getApplicationVersion( configAsProperties ) ) );
421      writer.format( "platform: %s\n", emptyOrValue( flow.getPlatformInfo() ) );
422      writer.format( "frameworks: %s\n", emptyOrValue( AppProps.getApplicationFrameworks( configAsProperties ) ) );
423
424      writer.println();
425
426      ruleResult.writeStats( writer );
427      }
428    catch( IOException exception )
429      {
430      processLogger.logError( "could not write stats", exception );
431      }
432    }
433
434  private static String emptyOrValue( Object value )
435    {
436    if( value == null )
437      return "";
438
439    if( Util.isEmpty( value.toString() ) )
440      return "";
441
442    return value.toString();
443    }
444
445  private void markResult( GraphResult graphResult, Path path )
446    {
447    if( graphResult instanceof Transformed )
448      markTransformed( (Transformed) graphResult, path );
449    else if( graphResult instanceof Asserted )
450      markAsserted( (Asserted) graphResult, path );
451    else if( graphResult instanceof Partitions )
452      markPartitioned( (Partitions) graphResult, path );
453    }
454
455  private void markPartitioned( Partitions partition, Path path )
456    {
457    String color = null;
458
459    if( partition.hasContractedMatches() )
460      color = ORANGE;
461
462    if( partition.hasSubGraphs() )
463      color = GREEN;
464
465    markFolder( path, color );
466    }
467
468  private void markAsserted( Asserted asserted, Path path )
469    {
470    if( asserted.getFirstAnchor() != null )
471      markFolder( path, RED );
472    }
473
474  private void markTransformed( Transformed transformed, Path path )
475    {
476    if( transformed.getEndGraph() != null && !transformed.getBeginGraph().equals( transformed.getEndGraph() ) )
477      markFolder( path, GREEN );
478    }
479
480  private void markFolder( Path path, String color )
481    {
482    if( !Util.IS_OSX )
483      return;
484
485    if( color == null )
486      return;
487
488    // xattr -wx com.apple.FinderInfo 0000000000000000000400000000000000000000000000000000000000000000 child-0-ContractedGraphTransformer
489
490    File file = path.toFile();
491
492    File parentFile = file.getParentFile();
493    String name = file.getName();
494
495    String[] command = {
496      "xattr",
497      "-wx",
498      "com.apple.FinderInfo",
499      color,
500      name
501    };
502
503    Util.execProcess( parentFile, command );
504    }
505
506  int addRule( Rule rule )
507    {
508    return addRule( rule.getRulePhase().getLevel(), rule );
509    }
510
511  int addRule( ProcessLevel level, Rule rule )
512    {
513    if( !counts.containsKey( level ) )
514      counts.put( level, new LinkedHashSet<Rule>() );
515
516    Set<Rule> rules = counts.get( level );
517
518    rules.add( rule );
519
520    return rules.size() - 1;
521    }
522  }