001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.tez.planner; 022 023import java.net.URI; 024import java.util.Map; 025import java.util.Properties; 026import java.util.Set; 027 028import cascading.flow.FlowConnector; 029import cascading.flow.FlowDef; 030import cascading.flow.FlowElement; 031import cascading.flow.FlowStep; 032import cascading.flow.hadoop.util.HadoopUtil; 033import cascading.flow.planner.BaseFlowStepFactory; 034import cascading.flow.planner.FlowPlanner; 035import cascading.flow.planner.PlannerInfo; 036import cascading.flow.planner.PlatformInfo; 037import cascading.flow.planner.graph.ElementGraph; 038import cascading.flow.planner.process.FlowNodeGraph; 039import cascading.flow.planner.process.FlowStepFactory; 040import cascading.flow.planner.rule.RuleRegistry; 041import cascading.flow.planner.rule.transformer.BoundaryElementFactory; 042import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory; 043import cascading.flow.tez.Hadoop2TezFlow; 044import cascading.flow.tez.Hadoop2TezFlowStep; 045import cascading.flow.tez.util.TezUtil; 046import cascading.pipe.Boundary; 047import cascading.property.AppProps; 048import cascading.tap.Tap; 049import cascading.tap.hadoop.Hfs; 050import cascading.tap.hadoop.util.TempHfs; 051import cascading.util.Util; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.tez.dag.api.DAG; 054import org.apache.tez.dag.api.TezConfiguration; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import static cascading.flow.tez.util.TezUtil.asJobConf; 059 060/** 061 */ 062public class Hadoop2TezPlanner extends FlowPlanner<Hadoop2TezFlow, TezConfiguration> 063 { 064 /** Field LOG */ 065 private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezPlanner.class ); 066 067 public static final String PLATFORM_NAME = "hadoop2-tez"; 068 069 /** Field defaultConfiguration */ 070 private TezConfiguration defaultConfiguration; 071 /** Field intermediateSchemeClass */ 072 private Class intermediateSchemeClass; 073 074 public static void copyConfiguration( Map<Object, Object> properties, Configuration configuration ) 075 { 076 for( Map.Entry<String, String> entry : configuration ) 077 properties.put( entry.getKey(), entry.getValue() ); 078 } 079 080 public static TezConfiguration createConfiguration( Map<Object, Object> properties ) 081 { 082 TezConfiguration conf = new TezConfiguration(); 083 084 copyProperties( conf, properties ); 085 086 return conf; 087 } 088 089 public static void copyProperties( Configuration jobConf, Map<Object, Object> properties ) 090 { 091 if( properties instanceof Properties ) 092 { 093 Properties props = (Properties) properties; 094 Set<String> keys = props.stringPropertyNames(); 095 096 for( String key : keys ) 097 jobConf.set( key, props.getProperty( key ) ); 098 } 099 else 100 { 101 for( Map.Entry<Object, Object> entry : properties.entrySet() ) 102 { 103 if( entry.getValue() != null ) 104 jobConf.set( entry.getKey().toString(), entry.getValue().toString() ); 105 } 106 } 107 } 108 109 @Override 110 public PlannerInfo getPlannerInfo( String registryName ) 111 { 112 return new PlannerInfo( getClass().getSimpleName(), PLATFORM_NAME, registryName ); 113 } 114 115 @Override 116 public TezConfiguration getDefaultConfig() 117 { 118 return defaultConfiguration; 119 } 120 121 @Override 122 public PlatformInfo getPlatformInfo() 123 { 124 return HadoopUtil.getPlatformInfo( DAG.class, null, "Tez" ); 125 } 126 127 @Override 128 public void initialize( FlowConnector flowConnector, Map<Object, Object> properties ) 129 { 130 super.initialize( flowConnector, properties ); 131 132 defaultConfiguration = TezUtil.createTezConf( properties, createConfiguration( properties ) ); 133 intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties ); 134 135 String applicationJarPath = AppProps.getApplicationJarPath( properties ); 136 137 if( applicationJarPath == null ) 138 { 139 Class type = AppProps.getApplicationJarClass( properties ); 140 141 if( type == null ) 142 type = HadoopUtil.findMainClass( Hadoop2TezPlanner.class ); 143 144 if( type != null ) 145 applicationJarPath = Util.findContainingJar( type ); 146 147 AppProps.setApplicationJarPath( properties, applicationJarPath ); 148 } 149 150 if( applicationJarPath != null ) 151 LOG.info( "using application jar: {}", applicationJarPath ); 152 else 153 LOG.info( "using application jar not provided, see cascading.property.AppProps for more information" ); 154 } 155 156 @Override 157 public void configRuleRegistryDefaults( RuleRegistry ruleRegistry ) 158 { 159 super.configRuleRegistryDefaults( ruleRegistry ); 160 161 ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.TEMP_TAP, new TempTapElementFactory() ); 162 ruleRegistry.addDefaultElementFactory( BoundaryElementFactory.BOUNDARY_PIPE, new IntermediateBoundaryElementFactory() ); 163 } 164 165 @Override 166 protected Hadoop2TezFlow createFlow( FlowDef flowDef ) 167 { 168 return new Hadoop2TezFlow( getPlatformInfo(), getDefaultProperties(), getDefaultConfig(), flowDef ); 169 } 170 171 @Override 172 public FlowStepFactory<TezConfiguration> getFlowStepFactory() 173 { 174 return new BaseFlowStepFactory<TezConfiguration>( getFlowNodeFactory() ) 175 { 176 @Override 177 public FlowStep<TezConfiguration> createFlowStep( ElementGraph stepElementGraph, FlowNodeGraph flowNodeGraph ) 178 { 179 return new Hadoop2TezFlowStep( stepElementGraph, flowNodeGraph ); 180 } 181 }; 182 } 183 184 public URI getDefaultURIScheme( Tap tap ) 185 { 186 return ( (Hfs) tap ).getDefaultFileSystemURIScheme( defaultConfiguration ); 187 } 188 189 public URI getURIScheme( Tap tap ) 190 { 191 return ( (Hfs) tap ).getURIScheme( defaultConfiguration ); 192 } 193 194 @Override 195 protected Tap makeTempTap( String prefix, String name ) 196 { 197 // must give Taps unique names 198 return new TempHfs( asJobConf( defaultConfiguration ), Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null ); 199 } 200 201 public class IntermediateBoundaryElementFactory extends BoundaryElementFactory 202 { 203 204 @Override 205 public FlowElement create( ElementGraph graph, FlowElement flowElement ) 206 { 207 return new Boundary(); 208 } 209 } 210 }