001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.planner.rule;
023
024import java.util.HashMap;
025import java.util.LinkedList;
026import java.util.Map;
027import java.util.Set;
028import java.util.TreeSet;
029
030import cascading.flow.planner.iso.transformer.ElementFactory;
031import cascading.flow.planner.rule.util.RuleLogUtil;
032
033/**
034 * RuleRegistry contains all planner rules for a given platform. Where a rule is of type
035 * {@link cascading.flow.planner.rule.Rule} and allows for either asserts, transforms, and partitioning
036 * of pipe assembly process graphs.
037 * <p/>
038 * Process graphs are initially a standard Pipe assembly and connected Taps. And depending on the underlying platform
039 * are partitioned into units of execution along the process hierarchy of
040 * {@link cascading.flow.planner.rule.ProcessLevel#Assembly}, {@link cascading.flow.planner.rule.ProcessLevel#Step},
041 * {@link cascading.flow.planner.rule.ProcessLevel#Node}, and {@link cascading.flow.planner.rule.ProcessLevel#Pipeline}.
042 * <p/>
043 * Where the Assembly is the user created pipe assembly and taps. The Steps are physical jobs executed by a platform.
044 * Nodes are internal elements of work within a job (a Mapper or Reducer in the case of MapReduce). And Pipelines,
045 * which are non-blocking streamed paths within a node. These can be optional.
046 * <p/>
047 * Rules rely on a 'language' for sub-graph pattern matching and can be applied against a given process type
048 * (Assembly, or Step, etc), to test its correctness, or make changes to the graph. Or they can be used to partition a
049 * large graph into a smaller graph, converting an Assembly into Steps.
050 * <p/>
051 * Debugging rule sets can be done by enabling system properties, see {@link cascading.flow.planner.FlowPlanner}.
052 * <p/>
053 * The {@link cascading.flow.planner.rule.RuleExec} class is responsible for executing on a given rule set.
054 */
055public class RuleRegistry
056  {
057  private Map<String, ElementFactory> factories = new HashMap<>();
058  private Map<PlanPhase, LinkedList<Rule>> rules = new HashMap<>();
059  private String[] logLevels;
060
061  {
062  for( PlanPhase phase : PlanPhase.values() )
063    rules.put( phase, new LinkedList<Rule>() );
064  }
065
066  private boolean resolveElementsEnabled = true;
067
068  /**
069   * Method enableDebugLogging forces log4j to emit DEBUG level stats for the planner classes.
070   * <p/>
071   * Use {@link #restoreLogging()} to resume normal logging levels.
072   * <p/>
073   * For planner tracing, see {@link cascading.flow.planner.FlowPlanner} properties.
074   */
075  public void enableDebugLogging()
076    {
077    logLevels = RuleLogUtil.enableDebugLogging();
078    }
079
080  public void restoreLogging()
081    {
082    if( logLevels != null )
083      RuleLogUtil.restoreLogging( logLevels );
084    }
085
086  /**
087   * Adds the named elementFactory if it does not already exist.
088   *
089   * @param name
090   * @param elementFactory
091   * @return true if the factory was added
092   */
093  public boolean addDefaultElementFactory( String name, ElementFactory elementFactory )
094    {
095    if( hasElementFactory( name ) )
096      return false;
097
098    factories.put( name, elementFactory );
099
100    return true;
101    }
102
103  public ElementFactory addElementFactory( String name, ElementFactory elementFactory )
104    {
105    return factories.put( name, elementFactory );
106    }
107
108  public ElementFactory getElementFactory( String factoryName )
109    {
110    return factories.get( factoryName );
111    }
112
113  public boolean hasElementFactory( String factoryName )
114    {
115    return factories.containsKey( factoryName );
116    }
117
118  public LinkedList<Rule> getRulesFor( PlanPhase phase )
119    {
120    return rules.get( phase );
121    }
122
123  public boolean addRule( Rule rule )
124    {
125    if( rule.getRulePhase() == null )
126      throw new IllegalArgumentException( "rule must have a rule phase: " + rule.getRuleName() );
127
128    return rules.get( rule.getRulePhase() ).add( rule );
129    }
130
131  public boolean hasRule( String ruleName )
132    {
133    for( Map.Entry<PlanPhase, LinkedList<Rule>> entry : rules.entrySet() )
134      {
135      for( Rule rule : entry.getValue() )
136        {
137        if( rule.getRuleName().equals( ruleName ) )
138          return true;
139        }
140      }
141
142    return false;
143    }
144
145  public void setResolveElementsEnabled( boolean resolveElementsEnabled )
146    {
147    this.resolveElementsEnabled = resolveElementsEnabled;
148    }
149
150  public boolean enabledResolveElements()
151    {
152    return resolveElementsEnabled;
153    }
154
155  public Set<ProcessLevel> getProcessLevels()
156    {
157    Set<ProcessLevel> processLevels = new TreeSet<>();
158
159    for( PlanPhase planPhase : rules.keySet() )
160      {
161      // only phases that rules were registered for
162      if( !rules.get( planPhase ).isEmpty() )
163        processLevels.add( planPhase.getLevel() );
164      }
165
166    return processLevels;
167    }
168
169  public String getName()
170    {
171    return getClass().getSimpleName();
172    }
173
174  @Override
175  public boolean equals( Object object )
176    {
177    if( this == object )
178      return true;
179
180    if( object == null || getClass() != object.getClass() )
181      return false;
182
183    RuleRegistry that = (RuleRegistry) object;
184
185    if( resolveElementsEnabled != that.resolveElementsEnabled )
186      return false;
187
188    if( factories != null ? !factories.equals( that.factories ) : that.factories != null )
189      return false;
190
191    if( rules != null ? !rules.equals( that.rules ) : that.rules != null )
192      return false;
193
194    return true;
195    }
196
197  @Override
198  public int hashCode()
199    {
200    int result = factories != null ? factories.hashCode() : 0;
201    result = 31 * result + ( rules != null ? rules.hashCode() : 0 );
202    result = 31 * result + ( resolveElementsEnabled ? 1 : 0 );
203    return result;
204    }
205  }