001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.planner.rule; 023 024import java.util.HashMap; 025import java.util.LinkedList; 026import java.util.Map; 027import java.util.Set; 028import java.util.TreeSet; 029 030import cascading.flow.planner.iso.transformer.ElementFactory; 031import cascading.flow.planner.rule.util.RuleLogUtil; 032 033/** 034 * RuleRegistry contains all planner rules for a given platform. Where a rule is of type 035 * {@link cascading.flow.planner.rule.Rule} and allows for either asserts, transforms, and partitioning 036 * of pipe assembly process graphs. 037 * <p/> 038 * Process graphs are initially a standard Pipe assembly and connected Taps. And depending on the underlying platform 039 * are partitioned into units of execution along the process hierarchy of 040 * {@link cascading.flow.planner.rule.ProcessLevel#Assembly}, {@link cascading.flow.planner.rule.ProcessLevel#Step}, 041 * {@link cascading.flow.planner.rule.ProcessLevel#Node}, and {@link cascading.flow.planner.rule.ProcessLevel#Pipeline}. 042 * <p/> 043 * Where the Assembly is the user created pipe assembly and taps. The Steps are physical jobs executed by a platform. 044 * Nodes are internal elements of work within a job (a Mapper or Reducer in the case of MapReduce). And Pipelines, 045 * which are non-blocking streamed paths within a node. These can be optional. 046 * <p/> 047 * Rules rely on a 'language' for sub-graph pattern matching and can be applied against a given process type 048 * (Assembly, or Step, etc), to test its correctness, or make changes to the graph. Or they can be used to partition a 049 * large graph into a smaller graph, converting an Assembly into Steps. 050 * <p/> 051 * Debugging rule sets can be done by enabling system properties, see {@link cascading.flow.planner.FlowPlanner}. 052 * <p/> 053 * The {@link cascading.flow.planner.rule.RuleExec} class is responsible for executing on a given rule set. 054 */ 055public class RuleRegistry 056 { 057 private Map<String, ElementFactory> factories = new HashMap<>(); 058 private Map<PlanPhase, LinkedList<Rule>> rules = new HashMap<>(); 059 private String[] logLevels; 060 061 { 062 for( PlanPhase phase : PlanPhase.values() ) 063 rules.put( phase, new LinkedList<Rule>() ); 064 } 065 066 private boolean resolveElementsEnabled = true; 067 068 /** 069 * Method enableDebugLogging forces log4j to emit DEBUG level stats for the planner classes. 070 * <p/> 071 * Use {@link #restoreLogging()} to resume normal logging levels. 072 * <p/> 073 * For planner tracing, see {@link cascading.flow.planner.FlowPlanner} properties. 074 */ 075 public void enableDebugLogging() 076 { 077 logLevels = RuleLogUtil.enableDebugLogging(); 078 } 079 080 public void restoreLogging() 081 { 082 if( logLevels != null ) 083 RuleLogUtil.restoreLogging( logLevels ); 084 } 085 086 /** 087 * Adds the named elementFactory if it does not already exist. 088 * 089 * @param name 090 * @param elementFactory 091 * @return true if the factory was added 092 */ 093 public boolean addDefaultElementFactory( String name, ElementFactory elementFactory ) 094 { 095 if( hasElementFactory( name ) ) 096 return false; 097 098 factories.put( name, elementFactory ); 099 100 return true; 101 } 102 103 public ElementFactory addElementFactory( String name, ElementFactory elementFactory ) 104 { 105 return factories.put( name, elementFactory ); 106 } 107 108 public ElementFactory getElementFactory( String factoryName ) 109 { 110 return factories.get( factoryName ); 111 } 112 113 public boolean hasElementFactory( String factoryName ) 114 { 115 return factories.containsKey( factoryName ); 116 } 117 118 public LinkedList<Rule> getRulesFor( PlanPhase phase ) 119 { 120 return rules.get( phase ); 121 } 122 123 public boolean addRule( Rule rule ) 124 { 125 if( rule.getRulePhase() == null ) 126 throw new IllegalArgumentException( "rule must have a rule phase: " + rule.getRuleName() ); 127 128 return rules.get( rule.getRulePhase() ).add( rule ); 129 } 130 131 public boolean hasRule( String ruleName ) 132 { 133 for( Map.Entry<PlanPhase, LinkedList<Rule>> entry : rules.entrySet() ) 134 { 135 for( Rule rule : entry.getValue() ) 136 { 137 if( rule.getRuleName().equals( ruleName ) ) 138 return true; 139 } 140 } 141 142 return false; 143 } 144 145 public void setResolveElementsEnabled( boolean resolveElementsEnabled ) 146 { 147 this.resolveElementsEnabled = resolveElementsEnabled; 148 } 149 150 public boolean enabledResolveElements() 151 { 152 return resolveElementsEnabled; 153 } 154 155 public Set<ProcessLevel> getProcessLevels() 156 { 157 Set<ProcessLevel> processLevels = new TreeSet<>(); 158 159 for( PlanPhase planPhase : rules.keySet() ) 160 { 161 // only phases that rules were registered for 162 if( !rules.get( planPhase ).isEmpty() ) 163 processLevels.add( planPhase.getLevel() ); 164 } 165 166 return processLevels; 167 } 168 169 public String getName() 170 { 171 return getClass().getSimpleName(); 172 } 173 174 @Override 175 public boolean equals( Object object ) 176 { 177 if( this == object ) 178 return true; 179 180 if( object == null || getClass() != object.getClass() ) 181 return false; 182 183 RuleRegistry that = (RuleRegistry) object; 184 185 if( resolveElementsEnabled != that.resolveElementsEnabled ) 186 return false; 187 188 if( factories != null ? !factories.equals( that.factories ) : that.factories != null ) 189 return false; 190 191 if( rules != null ? !rules.equals( that.rules ) : that.rules != null ) 192 return false; 193 194 return true; 195 } 196 197 @Override 198 public int hashCode() 199 { 200 int result = factories != null ? factories.hashCode() : 0; 201 result = 31 * result + ( rules != null ? rules.hashCode() : 0 ); 202 result = 31 * result + ( resolveElementsEnabled ? 1 : 0 ); 203 return result; 204 } 205 }