001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.tez.util;
022
023import java.io.File;
024import java.io.IOException;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Map;
029import java.util.Properties;
030import java.util.Set;
031
032import cascading.CascadingException;
033import cascading.flow.FlowException;
034import cascading.flow.hadoop.util.HadoopUtil;
035import cascading.tap.hadoop.io.MultiInputSplit;
036import cascading.util.Util;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.LocalFileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.mapred.JobConf;
043import org.apache.hadoop.mapred.JobContext;
044import org.apache.hadoop.mapred.TaskAttemptID;
045import org.apache.hadoop.security.UserGroupInformation;
046import org.apache.hadoop.yarn.api.records.LocalResource;
047import org.apache.hadoop.yarn.api.records.LocalResourceType;
048import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
049import org.apache.hadoop.yarn.api.records.URL;
050import org.apache.hadoop.yarn.util.ConverterUtils;
051import org.apache.tez.common.TezUtils;
052import org.apache.tez.dag.api.TezConfiguration;
053import org.apache.tez.mapreduce.input.MRInput;
054import org.apache.tez.mapreduce.lib.MRReader;
055import org.apache.tez.mapreduce.output.MROutput;
056import org.apache.tez.runtime.api.AbstractLogicalInput;
057import org.apache.tez.runtime.api.AbstractLogicalOutput;
058import org.apache.tez.runtime.api.LogicalInput;
059import org.apache.tez.runtime.api.LogicalOutput;
060import org.apache.tez.runtime.api.MergedLogicalInput;
061import org.apache.tez.runtime.api.ProcessorContext;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065import static cascading.flow.hadoop.util.HadoopUtil.getCommonPaths;
066import static org.apache.hadoop.yarn.api.ApplicationConstants.CLASS_PATH_SEPARATOR;
067import static org.apache.hadoop.yarn.api.ApplicationConstants.Environment.CLASSPATH;
068import static org.apache.hadoop.yarn.api.ApplicationConstants.Environment.PWD;
069import static org.apache.tez.common.TezUtils.createConfFromByteString;
070import static org.apache.tez.common.TezUtils.createConfFromUserPayload;
071import static org.apache.tez.mapreduce.hadoop.MRInputHelpers.parseMRInputPayload;
072
073/**
074 *
075 */
076public class TezUtil
077  {
078  private static final Logger LOG = LoggerFactory.getLogger( TezUtil.class );
079
080  /**
081   * Attempting to localize all new JobConf calls
082   *
083   * @param configuration
084   * @return
085   */
086  public static JobConf asJobConf( Configuration configuration )
087    {
088    return new JobConf( configuration );
089    }
090
091  public static TezConfiguration createTezConf( Map<Object, Object> properties, TezConfiguration defaultJobconf )
092    {
093    TezConfiguration jobConf = defaultJobconf == null ? new TezConfiguration() : new TezConfiguration( defaultJobconf );
094
095    if( properties == null )
096      return jobConf;
097
098    Set<Object> keys = new HashSet<Object>( properties.keySet() );
099
100    // keys will only be grabbed if both key/value are String, so keep orig keys
101    if( properties instanceof Properties )
102      keys.addAll( ( (Properties) properties ).stringPropertyNames() );
103
104    for( Object key : keys )
105      {
106      Object value = properties.get( key );
107
108      if( value == null && properties instanceof Properties && key instanceof String )
109        value = ( (Properties) properties ).getProperty( (String) key );
110
111      if( value == null ) // don't stuff null values
112        continue;
113
114      // don't let these objects pass, even though toString is called below.
115      if( value instanceof Class || value instanceof TezConfiguration )
116        continue;
117
118      jobConf.set( key.toString(), value.toString() );
119      }
120
121    return jobConf;
122    }
123
124  public static UserGroupInformation getCurrentUser()
125    {
126    try
127      {
128      return UserGroupInformation.getCurrentUser();
129      }
130    catch( IOException exception )
131      {
132      throw new CascadingException( "unable to get current user", exception );
133      }
134    }
135
136  public static String getEdgeSourceID( LogicalInput input, Configuration configuration )
137    {
138    String id = configuration.get( "cascading.node.source" );
139
140    if( id == null )
141      throw new IllegalStateException( "no source id found: " + input.getClass().getName() );
142
143    return id;
144    }
145
146  public static String getEdgeSinkID( LogicalOutput output, Configuration configuration )
147    {
148    String id = configuration.get( "cascading.node.sink" );
149
150    if( id == null )
151      throw new IllegalStateException( "no sink id found: " + output.getClass().getName() );
152
153    return id;
154    }
155
156  public static Configuration getInputConfiguration( LogicalInput input )
157    {
158    try
159      {
160      if( input instanceof MergedLogicalInput )
161        input = (LogicalInput) Util.getFirst( ( (MergedLogicalInput) input ).getInputs() );
162
163      if( input instanceof MRInput )
164        return createConfFromByteString( parseMRInputPayload( ( (MRInput) input ).getContext().getUserPayload() ).getConfigurationBytes() );
165
166      if( input instanceof AbstractLogicalInput )
167        return createConfFromUserPayload( ( (AbstractLogicalInput) input ).getContext().getUserPayload() );
168      }
169    catch( IOException exception )
170      {
171      throw new FlowException( "unable to unpack payload", exception );
172      }
173
174    throw new IllegalStateException( "unknown input type: " + input.getClass().getName() );
175    }
176
177  public static Configuration getOutputConfiguration( LogicalOutput output )
178    {
179    try
180      {
181      if( output instanceof MROutput )
182        return TezUtils.createConfFromUserPayload( ( (MROutput) output ).getContext().getUserPayload() );
183
184      if( output instanceof AbstractLogicalOutput )
185        return createConfFromUserPayload( ( (AbstractLogicalOutput) output ).getContext().getUserPayload() );
186      }
187    catch( IOException exception )
188      {
189      throw new FlowException( "unable to unpack payload", exception );
190      }
191
192    throw new IllegalStateException( "unknown input type: " + output.getClass().getName() );
193    }
194
195  public static void setSourcePathForSplit( MRInput input, MRReader reader, Configuration configuration )
196    {
197    Path path = null;
198
199    if( Util.returnInstanceFieldIfExistsSafe( input, "useNewApi" ) )
200      {
201      org.apache.hadoop.mapreduce.InputSplit newInputSplit = (org.apache.hadoop.mapreduce.InputSplit) reader.getSplit();
202
203      if( newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit )
204        path = ( (org.apache.hadoop.mapreduce.lib.input.FileSplit) newInputSplit ).getPath();
205      }
206    else
207      {
208      org.apache.hadoop.mapred.InputSplit oldInputSplit = (org.apache.hadoop.mapred.InputSplit) reader.getSplit();
209
210      if( oldInputSplit instanceof org.apache.hadoop.mapred.FileSplit )
211        path = ( (org.apache.hadoop.mapred.FileSplit) oldInputSplit ).getPath();
212      }
213
214    if( path != null )
215      configuration.set( MultiInputSplit.CASCADING_SOURCE_PATH, path.toString() );
216    }
217
218  public static Map<Path, Path> addToClassPath( Configuration config, String stagingRoot, String resourceSubPath, Collection<String> classpath,
219                                                LocalResourceType resourceType, Map<String, LocalResource> localResources,
220                                                Map<String, String> environment )
221    {
222    if( classpath == null )
223      return null;
224
225    // given to fully qualified
226    Map<String, Path> localPaths = new HashMap<>();
227    Map<String, Path> remotePaths = new HashMap<>();
228
229    HadoopUtil.resolvePaths( config, classpath, stagingRoot, resourceSubPath, localPaths, remotePaths );
230
231    try
232      {
233      LocalFileSystem localFS = HadoopUtil.getLocalFS( config );
234
235      for( String fileName : localPaths.keySet() )
236        {
237        Path artifact = localPaths.get( fileName );
238        Path remotePath = remotePaths.get( fileName );
239
240        if( remotePath == null )
241          remotePath = artifact;
242
243        addResource( localResources, environment, fileName, localFS.getFileStatus( artifact ), remotePath, resourceType );
244        }
245
246      FileSystem defaultFS = HadoopUtil.getDefaultFS( config );
247
248      for( String fileName : remotePaths.keySet() )
249        {
250        Path artifact = remotePaths.get( fileName );
251        Path localPath = localPaths.get( fileName );
252
253        if( localPath != null )
254          continue;
255
256        addResource( localResources, environment, fileName, defaultFS.getFileStatus( artifact ), artifact, resourceType );
257        }
258      }
259    catch( IOException exception )
260      {
261      throw new FlowException( "unable to set remote resource paths", exception );
262      }
263
264    return getCommonPaths( localPaths, remotePaths );
265    }
266
267  protected static void addResource( Map<String, LocalResource> localResources, Map<String, String> environment, String fileName, FileStatus stats, Path fullPath, LocalResourceType type ) throws IOException
268    {
269    if( localResources.containsKey( fileName ) )
270      throw new FlowException( "duplicate filename added to classpath resources: " + fileName );
271
272    URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath( fullPath );
273    long len = stats.getLen();
274    long modificationTime = stats.getModificationTime();
275
276    LocalResource resource = LocalResource.newInstance(
277      yarnUrlFromPath,
278      type,
279      LocalResourceVisibility.APPLICATION,
280      len,
281      modificationTime );
282
283    if( type == LocalResourceType.PATTERN )
284      {
285      // todo: parametrize this for dynamic inclusion below
286      String pattern = "(?:classes/|lib/).*";
287
288      resource.setPattern( pattern );
289
290      if( environment != null )
291        {
292        String current = "";
293
294        current += PWD.$$() + File.separator + fileName + File.separator + "*" + CLASS_PATH_SEPARATOR;
295        current += PWD.$$() + File.separator + fileName + File.separator + "lib" + File.separator + "*" + CLASS_PATH_SEPARATOR;
296        current += PWD.$$() + File.separator + fileName + File.separator + "classes" + File.separator + "*" + CLASS_PATH_SEPARATOR;
297
298        String classPath = environment.get( CLASSPATH.name() );
299
300        if( classPath == null )
301          classPath = "";
302        else if( !classPath.startsWith( CLASS_PATH_SEPARATOR ) )
303          classPath += CLASS_PATH_SEPARATOR;
304
305        classPath += current;
306
307        LOG.info( "adding to cluster side classpath: {} ", classPath );
308
309        environment.put( CLASSPATH.name(), classPath );
310        }
311      }
312
313    localResources.put( fileName, resource );
314    }
315
316  public static void setMRProperties( ProcessorContext context, Configuration config, boolean isMapperOutput )
317    {
318    TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
319      .createMockTaskAttemptID( context.getApplicationId().getClusterTimestamp(),
320        context.getTaskVertexIndex(), context.getApplicationId().getId(),
321        context.getTaskIndex(), context.getTaskAttemptNumber(), isMapperOutput );
322
323    config.set( JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString() );
324    config.set( JobContext.TASK_ID, taskAttemptId.getTaskID().toString() );
325    config.setBoolean( JobContext.TASK_ISMAP, isMapperOutput );
326    config.setInt( JobContext.TASK_PARTITION, taskAttemptId.getTaskID().getId() );
327    }
328  }