001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.planner.rule; 022 023import java.util.HashMap; 024import java.util.LinkedList; 025import java.util.Map; 026import java.util.Set; 027import java.util.TreeSet; 028 029import cascading.flow.planner.iso.transformer.ElementFactory; 030import cascading.util.LogUtil; 031 032/** 033 * RuleRegistry contains all planner rules for a given platform. Where a rule is of type 034 * {@link cascading.flow.planner.rule.Rule} and allows for either asserts, transforms, and partitioning 035 * of pipe assembly process graphs. 036 * <p/> 037 * Process graphs are initially a standard Pipe assembly and connected Taps. And depending on the underlying platform 038 * are partitioned into units of execution along the process hierarchy of 039 * {@link cascading.flow.planner.rule.ProcessLevel#Assembly}, {@link cascading.flow.planner.rule.ProcessLevel#Step}, 040 * {@link cascading.flow.planner.rule.ProcessLevel#Node}, and {@link cascading.flow.planner.rule.ProcessLevel#Pipeline}. 041 * <p/> 042 * Where the Assembly is the user created pipe assembly and taps. The Steps are physical jobs executed by a platform. 043 * Nodes are internal elements of work within a job (a Mapper or Reducer in the case of MapReduce). And Pipelines, 044 * which are non-blocking streamed paths within a node. These can be optional. 045 * <p/> 046 * Rules rely on a 'language' for sub-graph pattern matching and can be applied against a given process type 047 * (Assembly, or Step, etc), to test its correctness, or make changes to the graph. Or they can be used to partition a 048 * large graph into a smaller graph, converting an Assembly into Steps. 049 * <p/> 050 * Debugging rule sets can be done by enabling system properties, see {@link cascading.flow.planner.FlowPlanner}. 051 * <p/> 052 * The {@link cascading.flow.planner.rule.RuleExec} class is responsible for executing on a given rule set. 053 */ 054public class RuleRegistry 055 { 056 private Map<String, ElementFactory> factories = new HashMap<>(); 057 private Map<PlanPhase, LinkedList<Rule>> rules = new HashMap<>(); 058 059 { 060 for( PlanPhase phase : PlanPhase.values() ) 061 rules.put( phase, new LinkedList<Rule>() ); 062 } 063 064 private boolean resolveElementsEnabled = true; 065 066 /** 067 * Method enableDebugLogging forces log4j to emit DEBUG level stats for the planner classes. 068 * <p/> 069 * For planner tracing, see {@link cascading.flow.planner.FlowPlanner} properties. 070 */ 071 public void enableDebugLogging() 072 { 073 LogUtil.setLog4jLevel( "cascading.flow.planner.rule", "DEBUG" ); 074 LogUtil.setLog4jLevel( "cascading.flow.planner.iso.transformer", "DEBUG" ); 075 LogUtil.setLog4jLevel( "cascading.flow.planner.iso.assertion", "DEBUG" ); 076 LogUtil.setLog4jLevel( "cascading.flow.planner.iso.subgraph", "DEBUG" ); 077 LogUtil.setLog4jLevel( "cascading.flow.planner.iso.finder", "DEBUG" ); 078// LogUtil.setLog4jLevel( "org.apache", "DEBUG" ); 079 } 080 081 /** 082 * Adds the named elementFactory if it does not already exist. 083 * 084 * @param name 085 * @param elementFactory 086 * @return true if the factory was added 087 */ 088 public boolean addDefaultElementFactory( String name, ElementFactory elementFactory ) 089 { 090 if( hasElementFactory( name ) ) 091 return false; 092 093 factories.put( name, elementFactory ); 094 095 return true; 096 } 097 098 public ElementFactory addElementFactory( String name, ElementFactory elementFactory ) 099 { 100 return factories.put( name, elementFactory ); 101 } 102 103 public ElementFactory getElementFactory( String factoryName ) 104 { 105 return factories.get( factoryName ); 106 } 107 108 public boolean hasElementFactory( String factoryName ) 109 { 110 return factories.containsKey( factoryName ); 111 } 112 113 public LinkedList<Rule> getRulesFor( PlanPhase phase ) 114 { 115 return rules.get( phase ); 116 } 117 118 public boolean addRule( Rule rule ) 119 { 120 if( rule.getRulePhase() == null ) 121 throw new IllegalArgumentException( "rule must have a rule phase: " + rule.getRuleName() ); 122 123 return rules.get( rule.getRulePhase() ).add( rule ); 124 } 125 126 public boolean hasRule( String ruleName ) 127 { 128 for( Map.Entry<PlanPhase, LinkedList<Rule>> entry : rules.entrySet() ) 129 { 130 for( Rule rule : entry.getValue() ) 131 { 132 if( rule.getRuleName().equals( ruleName ) ) 133 return true; 134 } 135 } 136 137 return false; 138 } 139 140 public void setResolveElementsEnabled( boolean resolveElementsEnabled ) 141 { 142 this.resolveElementsEnabled = resolveElementsEnabled; 143 } 144 145 public boolean enabledResolveElements() 146 { 147 return resolveElementsEnabled; 148 } 149 150 public Set<ProcessLevel> getProcessLevels() 151 { 152 Set<ProcessLevel> processLevels = new TreeSet<>(); 153 154 for( PlanPhase planPhase : rules.keySet() ) 155 { 156 // only phases that rules were registered for 157 if( !rules.get( planPhase ).isEmpty() ) 158 processLevels.add( planPhase.getLevel() ); 159 } 160 161 return processLevels; 162 } 163 164 public String getName() 165 { 166 return getClass().getSimpleName(); 167 } 168 169 @Override 170 public boolean equals( Object object ) 171 { 172 if( this == object ) 173 return true; 174 175 if( object == null || getClass() != object.getClass() ) 176 return false; 177 178 RuleRegistry that = (RuleRegistry) object; 179 180 if( resolveElementsEnabled != that.resolveElementsEnabled ) 181 return false; 182 183 if( factories != null ? !factories.equals( that.factories ) : that.factories != null ) 184 return false; 185 186 if( rules != null ? !rules.equals( that.rules ) : that.rules != null ) 187 return false; 188 189 return true; 190 } 191 192 @Override 193 public int hashCode() 194 { 195 int result = factories != null ? factories.hashCode() : 0; 196 result = 31 * result + ( rules != null ? rules.hashCode() : 0 ); 197 result = 31 * result + ( resolveElementsEnabled ? 1 : 0 ); 198 return result; 199 } 200 }