001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.planner;
022
023import java.net.URI;
024import java.util.Map;
025import java.util.Properties;
026import java.util.Set;
027
028import cascading.flow.FlowConnector;
029import cascading.flow.FlowDef;
030import cascading.flow.FlowElement;
031import cascading.flow.FlowStep;
032import cascading.flow.hadoop.util.HadoopUtil;
033import cascading.flow.planner.FlowPlanner;
034import cascading.flow.planner.PlannerInfo;
035import cascading.flow.planner.PlatformInfo;
036import cascading.flow.planner.graph.ElementGraph;
037import cascading.flow.planner.process.FlowNodeGraph;
038import cascading.flow.planner.rule.RuleRegistry;
039import cascading.flow.planner.rule.transformer.BoundaryElementFactory;
040import cascading.flow.planner.rule.transformer.IntermediateTapElementFactory;
041import cascading.flow.tez.Hadoop2TezFlow;
042import cascading.flow.tez.Hadoop2TezFlowStep;
043import cascading.flow.tez.util.TezUtil;
044import cascading.pipe.Boundary;
045import cascading.property.AppProps;
046import cascading.tap.Tap;
047import cascading.tap.hadoop.Hfs;
048import cascading.tap.hadoop.util.TempHfs;
049import cascading.util.Util;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.tez.dag.api.DAG;
052import org.apache.tez.dag.api.TezConfiguration;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056import static cascading.flow.tez.util.TezUtil.asJobConf;
057
058/**
059 */
060public class Hadoop2TezPlanner extends FlowPlanner<Hadoop2TezFlow, TezConfiguration>
061  {
062  /** Field LOG */
063  private static final Logger LOG = LoggerFactory.getLogger( Hadoop2TezPlanner.class );
064
065  public static final String PLATFORM_NAME = "hadoop2-tez";
066
067  /** Field defaultConfiguration */
068  private TezConfiguration defaultConfiguration;
069  /** Field intermediateSchemeClass */
070  private Class intermediateSchemeClass;
071
072  public static void copyConfiguration( Map<Object, Object> properties, Configuration configuration )
073    {
074    for( Map.Entry<String, String> entry : configuration )
075      properties.put( entry.getKey(), entry.getValue() );
076    }
077
078  public static TezConfiguration createConfiguration( Map<Object, Object> properties )
079    {
080    TezConfiguration conf = new TezConfiguration();
081
082    copyProperties( conf, properties );
083
084    return conf;
085    }
086
087  public static void copyProperties( Configuration jobConf, Map<Object, Object> properties )
088    {
089    if( properties instanceof Properties )
090      {
091      Properties props = (Properties) properties;
092      Set<String> keys = props.stringPropertyNames();
093
094      for( String key : keys )
095        jobConf.set( key, props.getProperty( key ) );
096      }
097    else
098      {
099      for( Map.Entry<Object, Object> entry : properties.entrySet() )
100        {
101        if( entry.getValue() != null )
102          jobConf.set( entry.getKey().toString(), entry.getValue().toString() );
103        }
104      }
105    }
106
107  @Override
108  public PlannerInfo getPlannerInfo( String registryName )
109    {
110    return new PlannerInfo( getClass().getSimpleName(), PLATFORM_NAME, registryName );
111    }
112
113  @Override
114  public TezConfiguration getDefaultConfig()
115    {
116    return defaultConfiguration;
117    }
118
119  @Override
120  public PlatformInfo getPlatformInfo()
121    {
122    return HadoopUtil.getPlatformInfo( DAG.class, null, "Tez" );
123    }
124
125  @Override
126  public void initialize( FlowConnector flowConnector, Map<Object, Object> properties )
127    {
128    super.initialize( flowConnector, properties );
129
130    defaultConfiguration = TezUtil.createTezConf( properties, createConfiguration( properties ) );
131    intermediateSchemeClass = flowConnector.getIntermediateSchemeClass( properties );
132
133    String applicationJarPath = AppProps.getApplicationJarPath( properties );
134
135    if( applicationJarPath == null )
136      {
137      Class type = AppProps.getApplicationJarClass( properties );
138
139      if( type == null )
140        type = HadoopUtil.findMainClass( Hadoop2TezPlanner.class );
141
142      if( type != null )
143        applicationJarPath = Util.findContainingJar( type );
144
145      AppProps.setApplicationJarPath( properties, applicationJarPath );
146      }
147
148    if( applicationJarPath != null )
149      LOG.info( "using application jar: {}", applicationJarPath );
150    else
151      LOG.info( "using application jar not provided, see cascading.property.AppProps for more information" );
152    }
153
154  @Override
155  public void configRuleRegistryDefaults( RuleRegistry ruleRegistry )
156    {
157    super.configRuleRegistryDefaults( ruleRegistry );
158
159    ruleRegistry.addDefaultElementFactory( IntermediateTapElementFactory.TEMP_TAP, new TempTapElementFactory() );
160    ruleRegistry.addDefaultElementFactory( BoundaryElementFactory.BOUNDARY_PIPE, new IntermediateBoundaryElementFactory() );
161    }
162
163  @Override
164  protected Hadoop2TezFlow createFlow( FlowDef flowDef )
165    {
166    return new Hadoop2TezFlow( getPlatformInfo(), getDefaultProperties(), getDefaultConfig(), flowDef );
167    }
168
169  public FlowStep<TezConfiguration> createFlowStep( ElementGraph stepElementGraph, FlowNodeGraph flowNodeGraph )
170    {
171    return new Hadoop2TezFlowStep( stepElementGraph, flowNodeGraph );
172    }
173
174  public URI getDefaultURIScheme( Tap tap )
175    {
176    return ( (Hfs) tap ).getDefaultFileSystemURIScheme( defaultConfiguration );
177    }
178
179  public URI getURIScheme( Tap tap )
180    {
181    return ( (Hfs) tap ).getURIScheme( defaultConfiguration );
182    }
183
184  @Override
185  protected Tap makeTempTap( String prefix, String name )
186    {
187    // must give Taps unique names
188    return new TempHfs( asJobConf( defaultConfiguration ), Util.makePath( prefix, name ), intermediateSchemeClass, prefix == null );
189    }
190
191  public class IntermediateBoundaryElementFactory extends BoundaryElementFactory
192    {
193
194    @Override
195    public FlowElement create( ElementGraph graph, FlowElement flowElement )
196      {
197      return new Boundary();
198      }
199    }
200  }