001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.iso.transformer;
022
023import java.util.Collections;
024import java.util.Set;
025
026import cascading.flow.FlowElement;
027import cascading.flow.planner.PlannerContext;
028import cascading.flow.planner.graph.ElementGraph;
029import cascading.flow.planner.iso.expression.ExpressionGraph;
030import cascading.flow.planner.iso.finder.GraphFinder;
031import cascading.flow.planner.iso.finder.Match;
032import cascading.util.ProcessLogger;
033
034/**
035 *
036 */
037public abstract class RecursiveGraphTransformer<E extends ElementGraph> extends GraphTransformer<E, E>
038  {
039  /**
040   * Graphs must be transformed iteratively. In order to offset rules that may cause a loop, the recursion depth
041   * must be tracked.
042   * <p/>
043   * Some complex graphs may require a very deep search, so this value may need to be increased.
044   * <p/>
045   * During debugging, the depth may need to be shallow so that the trace logs can be written to disk before an
046   * OOME causes the planner to exit.
047   * <p/>
048   * This property may be either set at the planner ){@link cascading.flow.FlowConnector} or system level
049   * {@link System#getProperties()}.
050   */
051  public static final String TRANSFORM_RECURSION_DEPTH_MAX = "cascading.planner.transformer.recursion.depth.max";
052  public static final int DEFAULT_TRANSFORM_RECURSION_DEPTH_MAX = 1000;
053
054  private final GraphFinder finder;
055  private final ExpressionGraph expressionGraph;
056  private final boolean findAllPrimaries;
057
058  protected RecursiveGraphTransformer( ExpressionGraph expressionGraph )
059    {
060    this.expressionGraph = expressionGraph;
061    this.finder = new GraphFinder( expressionGraph );
062    this.findAllPrimaries = expressionGraph.supportsNonRecursiveMatch();
063    }
064
065  @Override
066  public Transformed<E> transform( PlannerContext plannerContext, E rootGraph )
067    {
068    int maxDepth = plannerContext.getIntProperty( TRANSFORM_RECURSION_DEPTH_MAX, DEFAULT_TRANSFORM_RECURSION_DEPTH_MAX );
069
070    Transformed<E> transformed = new Transformed<>( plannerContext, this, expressionGraph, rootGraph );
071
072    E result = transform( plannerContext.getLogger(), transformed, rootGraph, maxDepth, 0 );
073
074    transformed.setEndGraph( result );
075
076    return transformed;
077    }
078
079  protected E transform( ProcessLogger processLogger, Transformed<E> transformed, E graph, int maxDepth, int currentDepth )
080    {
081    if( currentDepth == maxDepth )
082      {
083      processLogger.logInfo( "!!! transform recursion ending, reached depth: {}", currentDepth );
084      return graph;
085      }
086
087    if( processLogger.isDebugEnabled() )
088      processLogger.logDebug( "preparing match within: {}", this.getClass().getSimpleName() );
089
090    ElementGraph prepared = prepareForMatch( processLogger, transformed, graph );
091
092    if( processLogger.isDebugEnabled() )
093      processLogger.logDebug( "completed match within: {}, with result: {}", this.getClass().getSimpleName(), prepared != null );
094
095    if( prepared == null )
096      return graph;
097
098    Set<FlowElement> exclusions = addExclusions( graph );
099
100    Match match;
101
102    if( processLogger.isDebugEnabled() )
103      processLogger.logDebug( "performing match within: {}, using recursion: {}", this.getClass().getSimpleName(), !findAllPrimaries );
104
105    // for trivial cases, disable recursion and capture all primaries initially
106    // if prepareForMatch returns a sub-graph, find all matches in the sub-graph, but we do not exit the recursion
107    if( findAllPrimaries )
108      match = finder.findAllMatches( transformed.getPlannerContext(), prepared, exclusions );
109    else
110      match = finder.findFirstMatch( transformed.getPlannerContext(), prepared, exclusions );
111
112    if( processLogger.isDebugEnabled() )
113      processLogger.logDebug( "completed match within: {}", this.getClass().getSimpleName() );
114
115    if( processLogger.isDebugEnabled() )
116      processLogger.logDebug( "performing transform in place within: {}", this.getClass().getSimpleName() );
117
118    boolean transformResult = transformGraphInPlaceUsing( transformed, graph, match );
119
120    if( processLogger.isDebugEnabled() )
121      processLogger.logDebug( "completed transform in place within: {}, with result: {}", this.getClass().getSimpleName(), transformResult );
122
123    if( !transformResult )
124      return graph;
125
126    transformed.addRecursionTransform( graph );
127
128    if( !requiresRecursiveSearch() )
129      return graph;
130
131    return transform( processLogger, transformed, graph, maxDepth, ++currentDepth );
132    }
133
134  /**
135   * By default, prepareForMatch returns the same graph, but sub-classes may return a sub-graph, one of many
136   * requiring sub-sequent matches.
137   * <p/>
138   * if we are searching the whole graph, there is no need to perform a recursion against the new transformed graph
139   */
140  protected boolean requiresRecursiveSearch()
141    {
142    return !findAllPrimaries;
143    }
144
145  protected Set<FlowElement> addExclusions( E graph )
146    {
147    return Collections.emptySet();
148    }
149
150  protected ElementGraph prepareForMatch( ProcessLogger processLogger, Transformed<E> transformed, E graph )
151    {
152    return graph;
153    }
154
155  protected abstract boolean transformGraphInPlaceUsing( Transformed<E> transformed, E graph, Match match );
156  }