001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.rule;
022
023import java.io.PrintWriter;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.HashMap;
027import java.util.LinkedHashMap;
028import java.util.LinkedHashSet;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032
033import cascading.flow.planner.graph.ElementGraph;
034import cascading.flow.planner.graph.FlowElementGraph;
035import cascading.flow.planner.rule.util.ResultTree;
036import cascading.util.Util;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import static cascading.util.Util.formatDurationFromMillis;
041
042/**
043 *
044 */
045public class RuleResult
046  {
047  public static final int THRESHOLD_SECONDS = 10;
048
049  private static final Logger LOG = LoggerFactory.getLogger( RuleResult.class );
050
051  public enum ResultStatus
052    {
053      SUCCESS,
054      UNSUPPORTED,
055      ILLEGAL,
056      INTERRUPTED
057    }
058
059  private Map<ProcessLevel, Set<ElementGraph>> levelParents = new HashMap<>();
060  private ResultTree resultTree = new ResultTree();
061
062  private long duration = 0;
063  private Map<PlanPhase, Long> phaseDurations = new LinkedHashMap<>();
064  private Map<PlanPhase, Map<String, Long>> ruleDurations = new LinkedHashMap<>();
065
066  protected FlowElementGraph initialAssembly;
067  private RuleRegistry registry;
068  private Exception plannerException;
069
070  public RuleResult()
071    {
072    for( ProcessLevel level : ProcessLevel.values() )
073      levelParents.put( level, new LinkedHashSet<ElementGraph>() );
074    }
075
076  public RuleResult( RuleRegistry registry )
077    {
078    this();
079
080    this.registry = registry;
081    }
082
083  public RuleResult( FlowElementGraph initialAssembly )
084    {
085    this();
086
087    initResult( initialAssembly );
088    }
089
090  public RuleResult( RuleRegistry registry, FlowElementGraph initialAssembly )
091    {
092    this();
093    this.registry = registry;
094
095    initResult( initialAssembly );
096    }
097
098  public RuleRegistry getRegistry()
099    {
100    return registry;
101    }
102
103  public void setPlannerException( Exception plannerException )
104    {
105    this.plannerException = plannerException;
106    }
107
108  public Exception getPlannerException()
109    {
110    return plannerException;
111    }
112
113  public boolean hasPlannerException()
114    {
115    return plannerException != null;
116    }
117
118  public boolean isSuccess()
119    {
120    return !hasPlannerException();
121    }
122
123  public boolean isIllegal()
124    {
125    return !isSuccess() && !isUnsupported() && !isInterrupted();
126    }
127
128  public boolean isUnsupported()
129    {
130    return getPlannerException() instanceof UnsupportedPlanException;
131    }
132
133  public boolean isInterrupted()
134    {
135    return getPlannerException() instanceof InterruptedException;
136    }
137
138  public ResultStatus getResultStatus()
139    {
140    if( isSuccess() )
141      return ResultStatus.SUCCESS;
142
143    if( isUnsupported() )
144      return ResultStatus.UNSUPPORTED;
145
146    if( isInterrupted() )
147      return ResultStatus.INTERRUPTED;
148
149    return ResultStatus.ILLEGAL;
150    }
151
152  public void initResult( FlowElementGraph initialAssembly )
153    {
154    this.initialAssembly = initialAssembly;
155
156    setLevelResults( ProcessLevel.Assembly, initialAssembly, initialAssembly.copyElementGraph() );
157    }
158
159  public void setLevelResults( ProcessLevel level, Map<ElementGraph, List<? extends ElementGraph>> results )
160    {
161    for( Map.Entry<ElementGraph, List<? extends ElementGraph>> entry : results.entrySet() )
162      setLevelResults( level, entry.getKey(), entry.getValue() );
163    }
164
165  public void setLevelResults( ProcessLevel level, ElementGraph parent, ElementGraph child )
166    {
167    setLevelResults( level, parent, Collections.singletonList( child ) );
168    }
169
170  public void setLevelResults( ProcessLevel level, ElementGraph parent, List<? extends ElementGraph> elementGraphs )
171    {
172    levelParents.get( level ).add( parent );
173    resultTree.setChildren( parent, elementGraphs );
174    }
175
176  public Map<ElementGraph, List<? extends ElementGraph>> getLevelResults( ProcessLevel level )
177    {
178    Map<ElementGraph, List<? extends ElementGraph>> result = new HashMap<>();
179    Set<? extends ElementGraph> parents = levelParents.get( level );
180
181    if( !parents.isEmpty() )
182      {
183      for( ElementGraph parent : parents )
184        result.put( parent, resultTree.getChildren( parent ) );
185
186      return result;
187      }
188
189    // initialize the level
190    Set<ElementGraph> top = levelParents.get( ProcessLevel.parent( level ) );
191
192    for( ElementGraph parent : top )
193      {
194      List<? extends ElementGraph> children = resultTree.getChildren( parent );
195
196      for( ElementGraph child : children )
197        result.put( child, new ArrayList<ElementGraph>() );
198      }
199
200    return result;
201    }
202
203  public int[] getPathFor( ElementGraph parent, ElementGraph child )
204    {
205    return resultTree.getEdge( parent, child ).getOrdinals();
206    }
207
208  public int[] getPathFor( ElementGraph parent )
209    {
210    ResultTree.Path incomingEdge = resultTree.getIncomingEdge( parent );
211
212    if( incomingEdge == null )
213      return new int[ 0 ];
214
215    return incomingEdge.getOrdinals();
216    }
217
218  public FlowElementGraph getInitialAssembly()
219    {
220    return initialAssembly;
221    }
222
223  public FlowElementGraph getAssemblyGraph()
224    {
225    Map<ElementGraph, List<? extends ElementGraph>> results = getLevelResults( ProcessLevel.Assembly );
226
227    return (FlowElementGraph) Util.getFirst( results.get( getInitialAssembly() ) );
228    }
229
230  public Map<ElementGraph, List<? extends ElementGraph>> getAssemblyToStepGraphMap()
231    {
232    return getLevelResults( ProcessLevel.Step );
233    }
234
235  public Map<ElementGraph, List<? extends ElementGraph>> getStepToNodeGraphMap()
236    {
237    return getLevelResults( ProcessLevel.Node );
238    }
239
240  public Map<ElementGraph, List<? extends ElementGraph>> getNodeToPipelineGraphMap()
241    {
242    return getLevelResults( ProcessLevel.Pipeline );
243    }
244
245  public int getNumSteps()
246    {
247    return getStepToNodeGraphMap().size();
248    }
249
250  public int getNumNodes()
251    {
252    int nodes = 0;
253
254    for( List<? extends ElementGraph> nodesList : getStepToNodeGraphMap().values() )
255      nodes += nodesList.size();
256
257    return nodes;
258    }
259
260  public void setDuration( long begin, long end )
261    {
262    duration = end - begin;
263    }
264
265  public long getDuration()
266    {
267    return duration;
268    }
269
270  public void setPhaseDuration( PlanPhase phase, long begin, long end )
271    {
272    phaseDurations.put( phase, end - begin );
273    }
274
275  public void setRuleDuration( Rule rule, long begin, long end )
276    {
277    Map<String, Long> durations = ruleDurations.get( rule.getRulePhase() );
278
279    if( durations == null )
280      {
281      durations = new LinkedHashMap<>();
282      ruleDurations.put( rule.getRulePhase(), durations );
283      }
284
285    if( durations.containsKey( rule.getRuleName() ) )
286      throw new IllegalStateException( "duplicate rule found: " + rule.getRuleName() );
287
288    long duration = end - begin;
289
290    // print these as we go
291    if( duration > THRESHOLD_SECONDS * 1000 )
292      LOG.info( "rule: {}, took longer than {} seconds: {}", rule.getRuleName(), THRESHOLD_SECONDS, formatDurationFromMillis( duration ) );
293
294    durations.put( rule.getRuleName(), duration );
295    }
296
297  public void writeStats( PrintWriter writer )
298    {
299    writer.format( "duration\t%.03f\n", ( duration / 1000f ) );
300
301    writer.println();
302
303    for( PlanPhase phase : phaseDurations.keySet() )
304      {
305      long phaseDuration = phaseDurations.get( phase );
306
307      writer.format( "%s\t%.03f\n", phase, ( phaseDuration / 1000f ) );
308
309      Map<String, Long> rules = ruleDurations.get( phase );
310
311      writer.println( "=======================" );
312
313      if( rules != null )
314        {
315        for( String ruleName : rules.keySet() )
316          {
317          long ruleDuration = rules.get( ruleName );
318          writer.format( "%s\t%.03f\n", ruleName, ( ruleDuration / 1000f ) );
319          }
320        }
321
322      writer.println( "" );
323      }
324    }
325  }