001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.planner.rule;
022
import java.util.EnumMap;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

import cascading.flow.planner.iso.transformer.ElementFactory;
import cascading.util.LogUtil;
031
032/**
033 * RuleRegistry contains all planner rules for a given platform. Where a rule is of type
034 * {@link cascading.flow.planner.rule.Rule} and allows for either asserts, transforms, and partitioning
035 * of pipe assembly process graphs.
036 * <p/>
037 * Process graphs are initially a standard Pipe assembly and connected Taps. And depending on the underlying platform
038 * are partitioned into units of execution along the process hierarchy of
039 * {@link cascading.flow.planner.rule.ProcessLevel#Assembly}, {@link cascading.flow.planner.rule.ProcessLevel#Step},
040 * {@link cascading.flow.planner.rule.ProcessLevel#Node}, and {@link cascading.flow.planner.rule.ProcessLevel#Pipeline}.
041 * <p/>
042 * Where the Assembly is the user created pipe assembly and taps. The Steps are physical jobs executed by a platform.
043 * Nodes are internal elements of work within a job (a Mapper or Reducer in the case of MapReduce). And Pipelines,
044 * which are non-blocking streamed paths within a node. These can be optional.
045 * <p/>
046 * Rules rely on a 'language' for sub-graph pattern matching and can be applied against a given process type
047 * (Assembly, or Step, etc), to test its correctness, or make changes to the graph. Or they can be used to partition a
048 * large graph into a smaller graph, converting an Assembly into Steps.
049 * <p/>
050 * Debugging rule sets can be done by enabling system properties, see {@link cascading.flow.planner.FlowPlanner}.
051 * <p/>
052 * The {@link cascading.flow.planner.rule.RuleExec} class is responsible for executing on a given rule set.
053 */
054public class RuleRegistry
055  {
056  private Map<String, ElementFactory> factories = new HashMap<>();
057  private Map<PlanPhase, LinkedList<Rule>> rules = new HashMap<>();
058
059  {
060  for( PlanPhase phase : PlanPhase.values() )
061    rules.put( phase, new LinkedList<Rule>() );
062  }
063
064  private boolean resolveElementsEnabled = true;
065
066  /**
067   * Method enableDebugLogging forces log4j to emit DEBUG level stats for the planner classes.
068   * <p/>
069   * For planner tracing, see {@link cascading.flow.planner.FlowPlanner} properties.
070   */
071  public void enableDebugLogging()
072    {
073    LogUtil.setLog4jLevel( "cascading.flow.planner.rule", "DEBUG" );
074    LogUtil.setLog4jLevel( "cascading.flow.planner.iso.transformer", "DEBUG" );
075    LogUtil.setLog4jLevel( "cascading.flow.planner.iso.assertion", "DEBUG" );
076    LogUtil.setLog4jLevel( "cascading.flow.planner.iso.subgraph", "DEBUG" );
077    LogUtil.setLog4jLevel( "cascading.flow.planner.iso.finder", "DEBUG" );
078//    LogUtil.setLog4jLevel( "org.apache", "DEBUG" );
079    }
080
081  /**
082   * Adds the named elementFactory if it does not already exist.
083   *
084   * @param name
085   * @param elementFactory
086   * @return true if the factory was added
087   */
088  public boolean addDefaultElementFactory( String name, ElementFactory elementFactory )
089    {
090    if( hasElementFactory( name ) )
091      return false;
092
093    factories.put( name, elementFactory );
094
095    return true;
096    }
097
098  public ElementFactory addElementFactory( String name, ElementFactory elementFactory )
099    {
100    return factories.put( name, elementFactory );
101    }
102
103  public ElementFactory getElementFactory( String factoryName )
104    {
105    return factories.get( factoryName );
106    }
107
108  public boolean hasElementFactory( String factoryName )
109    {
110    return factories.containsKey( factoryName );
111    }
112
113  public LinkedList<Rule> getRulesFor( PlanPhase phase )
114    {
115    return rules.get( phase );
116    }
117
118  public boolean addRule( Rule rule )
119    {
120    if( rule.getRulePhase() == null )
121      throw new IllegalArgumentException( "rule must have a rule phase: " + rule.getRuleName() );
122
123    return rules.get( rule.getRulePhase() ).add( rule );
124    }
125
126  public boolean hasRule( String ruleName )
127    {
128    for( Map.Entry<PlanPhase, LinkedList<Rule>> entry : rules.entrySet() )
129      {
130      for( Rule rule : entry.getValue() )
131        {
132        if( rule.getRuleName().equals( ruleName ) )
133          return true;
134        }
135      }
136
137    return false;
138    }
139
140  public void setResolveElementsEnabled( boolean resolveElementsEnabled )
141    {
142    this.resolveElementsEnabled = resolveElementsEnabled;
143    }
144
145  public boolean enabledResolveElements()
146    {
147    return resolveElementsEnabled;
148    }
149
150  public Set<ProcessLevel> getProcessLevels()
151    {
152    Set<ProcessLevel> processLevels = new TreeSet<>();
153
154    for( PlanPhase planPhase : rules.keySet() )
155      {
156      // only phases that rules were registered for
157      if( !rules.get( planPhase ).isEmpty() )
158        processLevels.add( planPhase.getLevel() );
159      }
160
161    return processLevels;
162    }
163
164  public String getName()
165    {
166    return getClass().getSimpleName();
167    }
168
169  @Override
170  public boolean equals( Object object )
171    {
172    if( this == object )
173      return true;
174
175    if( object == null || getClass() != object.getClass() )
176      return false;
177
178    RuleRegistry that = (RuleRegistry) object;
179
180    if( resolveElementsEnabled != that.resolveElementsEnabled )
181      return false;
182
183    if( factories != null ? !factories.equals( that.factories ) : that.factories != null )
184      return false;
185
186    if( rules != null ? !rules.equals( that.rules ) : that.rules != null )
187      return false;
188
189    return true;
190    }
191
192  @Override
193  public int hashCode()
194    {
195    int result = factories != null ? factories.hashCode() : 0;
196    result = 31 * result + ( rules != null ? rules.hashCode() : 0 );
197    result = 31 * result + ( resolveElementsEnabled ? 1 : 0 );
198    return result;
199    }
200  }