/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.util;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import cascading.flow.FlowException;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.Lfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility methods for Hadoop MapReduce flows: round-tripping oversized step state
 * through the {@link DistributedCache} and registering classpath artifacts with it.
 */
public class HadoopMRUtil
  {
  private static final Logger LOG = LoggerFactory.getLogger( HadoopMRUtil.class );

  /**
   * Writes the given step state to a temp file on the Hadoop FS and registers that
   * file with the distributed cache, for states too large to embed in the job conf.
   *
   * @param conf      the current job configuration
   * @param id        the step id, used to make the state filename unique
   * @param kind      a discriminator prefix for the state filename
   * @param stepState the serialized state to persist
   * @return the path the state was written to, or {@code null} if stepState is empty
   * @throws FlowException if the state cannot be written to the Hadoop FS
   */
  public static String writeStateToDistCache( JobConf conf, String id, String kind, String stepState )
    {
    if( Util.isEmpty( stepState ) )
      return null;

    LOG.info( "writing step state to dist cache, too large for job conf, size: {}", stepState.length() );

    String statePath = Hfs.getTempPath( conf ) + "/" + kind + "-state-" + id;

    Hfs temp = new Hfs( new TextLine(), statePath, SinkMode.REPLACE );

    TupleEntryCollector writer = null;

    try
      {
      writer = temp.openForWrite( new HadoopFlowProcess( conf ) );

      writer.add( new Tuple( stepState ) );
      }
    catch( IOException exception )
      {
      // chain the cause so the underlying FS failure is not lost (matches readStateFromDistCache)
      throw new FlowException( "unable to write step state to Hadoop FS: " + temp.getIdentifier(), exception );
      }
    finally
      {
      // close in finally so the collector is released even if add() fails
      if( writer != null )
        writer.close();
      }

    URI uri = new Path( statePath ).toUri();
    DistributedCache.addCacheFile( uri, conf );

    LOG.info( "using step state path: {}", uri );

    return statePath;
    }

  /**
   * Reads step state previously persisted via
   * {@link #writeStateToDistCache(JobConf, String, String, String)} from the
   * task-local distributed cache copy.
   *
   * @param jobConf the current job configuration
   * @param id      the step id the state was written under
   * @param kind    the discriminator prefix the state was written under
   * @return the persisted state string
   * @throws IOException   on local cache access failure
   * @throws FlowException if no matching state file is found or the file is empty
   */
  public static String readStateFromDistCache( JobConf jobConf, String id, String kind ) throws IOException
    {
    Path[] files = DistributedCache.getLocalCacheFiles( jobConf );

    // getLocalCacheFiles may return null when no files were localized; guard before iterating
    if( files == null )
      throw new FlowException( "unable to find step state from distributed cache" );

    Path stepStatePath = null;

    for( Path file : files )
      {
      if( !file.toString().contains( kind + "-state-" + id ) )
        continue;

      stepStatePath = file;
      break;
      }

    if( stepStatePath == null )
      throw new FlowException( "unable to find step state from distributed cache" );

    LOG.info( "reading step state from local path: {}", stepStatePath );

    // the cache copy lives on the task's local disk, so read it through the local FS tap
    Hfs temp = new Lfs( new TextLine( new Fields( "line" ) ), stepStatePath.toString() );

    TupleEntryIterator reader = null;

    try
      {
      reader = temp.openForRead( new HadoopFlowProcess( jobConf ) );

      if( !reader.hasNext() )
        throw new FlowException( "step state path is empty: " + temp.getIdentifier() );

      return reader.next().getString( 0 );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to find state path: " + temp.getIdentifier(), exception );
      }
    finally
      {
      try
        {
        if( reader != null )
          reader.close();
        }
      catch( IOException exception )
        {
        // best-effort close; the state was already returned or an exception is in flight
        LOG.warn( "error closing state path reader", exception );
        }
      }
    }

  /**
   * Add to class path.
   * <p>
   * Local artifacts are qualified against the local file system, remote artifacts
   * against the default file system; a remote entry shadows its local counterpart.
   *
   * @param config    the config
   * @param classpath the classpath elements to add, may be {@code null}
   * @return the common local-to-remote path mapping, or {@code null} if classpath is null
   * @throws FlowException if the distributed cache paths cannot be set
   */
  public static Map<Path, Path> addToClassPath( Configuration config, List<String> classpath )
    {
    if( classpath == null )
      return null;

    // given to fully qualified
    Map<String, Path> localPaths = new HashMap<String, Path>();
    Map<String, Path> remotePaths = new HashMap<String, Path>();

    HadoopUtil.resolvePaths( config, classpath, null, null, localPaths, remotePaths );

    try
      {
      LocalFileSystem localFS = HadoopUtil.getLocalFS( config );

      for( String path : localPaths.keySet() )
        {
        // only add local if no remote
        if( remotePaths.containsKey( path ) )
          continue;

        Path artifact = localPaths.get( path );

        DistributedCache.addFileToClassPath( artifact.makeQualified( localFS ), config );
        }

      FileSystem defaultFS = HadoopUtil.getDefaultFS( config );

      for( String path : remotePaths.keySet() )
        {
        // always add remote
        Path artifact = remotePaths.get( path );

        DistributedCache.addFileToClassPath( artifact.makeQualified( defaultFS ), config );
        }
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to set distributed cache paths", exception );
      }

    return HadoopUtil.getCommonPaths( localPaths, remotePaths );
    }

  /**
   * Returns true if the given job conf resolves a reducer class.
   *
   * @param jobConf the job configuration to inspect
   * @return true when {@link JobConf#getReducerClass()} is non-null
   */
  public static boolean hasReducer( JobConf jobConf )
    {
    return jobConf.getReducerClass() != null;
    }
  }