/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowProcess;
import cascading.flow.FlowSession;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Class HadoopFlowProcess is an implementation of {@link FlowProcess} for Hadoop. Use this class to get direct
 * access to the Hadoop JobConf and Reporter interfaces.
 * <p/>
 * Be warned that coupling to this implementation will cause custom {@link cascading.operation.Operation}s to
 * fail if they are executed on a system other than Hadoop.
 *
 * @see cascading.flow.FlowSession
 * @see JobConf
 * @see Reporter
 */
public class HadoopFlowProcess extends FlowProcess<JobConf>
  {
  /** Field jobConf */
  final JobConf jobConf;
  /** Field isMapper */
  private final boolean isMapper;
  /** Field reporter */
  Reporter reporter = Reporter.NULL;
  /** Field outputCollector */
  private OutputCollector outputCollector;

  public HadoopFlowProcess()
    {
    this.jobConf = new JobConf();
    this.isMapper = true;
    }

  public HadoopFlowProcess( JobConf jobConf )
    {
    this.jobConf = jobConf;
    this.isMapper = true;
    }

  public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf )
    {
    super( flowSession );
    this.jobConf = jobConf;
    this.isMapper = true;
    }
  /**
   * Constructor HadoopFlowProcess creates a new HadoopFlowProcess instance.
   *
   * @param flowSession of type FlowSession
   * @param jobConf     of type JobConf
   * @param isMapper    of type boolean, true if this process is a MapReduce mapper
   */
  public HadoopFlowProcess( FlowSession flowSession, JobConf jobConf, boolean isMapper )
    {
    super( flowSession );
    this.jobConf = jobConf;
    this.isMapper = isMapper;
    }

  public HadoopFlowProcess( HadoopFlowProcess flowProcess, JobConf jobConf )
    {
    super( flowProcess.getCurrentSession() );
    this.jobConf = jobConf;
    this.isMapper = flowProcess.isMapper();
    this.reporter = flowProcess.getReporter();
    }

  @Override
  public FlowProcess copyWith( JobConf jobConf )
    {
    return new HadoopFlowProcess( this, jobConf );
    }

  /**
   * Method getJobConf returns the jobConf of this HadoopFlowProcess object.
   *
   * @return the jobConf (type JobConf) of this HadoopFlowProcess object.
   */
  public JobConf getJobConf()
    {
    return jobConf;
    }

  @Override
  public JobConf getConfigCopy()
    {
    return HadoopUtil.copyJobConf( jobConf );
    }

  /**
   * Method isMapper returns true if this part of the FlowProcess is a MapReduce mapper. If false, it is a reducer.
   *
   * @return boolean
   */
  public boolean isMapper()
    {
    return isMapper;
    }

  public int getCurrentNumMappers()
    {
    return getJobConf().getNumMapTasks();
    }

  public int getCurrentNumReducers()
    {
    return getJobConf().getNumReduceTasks();
    }

  /**
   * Method getCurrentSliceNum returns the slice number of the current task. Slice 0 is the first slice.
   *
   * @return int
   */
  @Override
  public int getCurrentSliceNum()
    {
    return getJobConf().getInt( "mapred.task.partition", 0 );
    }

  @Override
  public int getNumProcessSlices()
    {
    if( isMapper() )
      return getCurrentNumMappers();
    else
      return getCurrentNumReducers();
    }

  /**
   * Method setReporter sets the reporter of this HadoopFlowProcess object.
   *
   * @param reporter the reporter of this HadoopFlowProcess object.
   */
  public void setReporter( Reporter reporter )
    {
    if( reporter == null )
      this.reporter = Reporter.NULL;
    else
      this.reporter = reporter;
    }
  /**
   * Method getReporter returns the reporter of this HadoopFlowProcess object.
   *
   * @return the reporter (type Reporter) of this HadoopFlowProcess object.
   */
  public Reporter getReporter()
    {
    return reporter;
    }

  public void setOutputCollector( OutputCollector outputCollector )
    {
    this.outputCollector = outputCollector;
    }

  public OutputCollector getOutputCollector()
    {
    return outputCollector;
    }

  @Override
  public Object getProperty( String key )
    {
    return jobConf.get( key );
    }

  @Override
  public Collection<String> getPropertyKeys()
    {
    Set<String> keys = new HashSet<String>();

    for( Map.Entry<String, String> entry : jobConf )
      keys.add( entry.getKey() );

    return Collections.unmodifiableSet( keys );
    }

  @Override
  public Object newInstance( String className )
    {
    if( className == null || className.isEmpty() )
      return null;

    try
      {
      Class type = HadoopFlowProcess.class.getClassLoader().loadClass( className );

      return ReflectionUtils.newInstance( type, jobConf );
      }
    catch( ClassNotFoundException exception )
      {
      throw new CascadingException( "unable to load class: " + className, exception );
      }
    }

  @Override
  public void keepAlive()
    {
    getReporter().progress();
    }

  @Override
  public void increment( Enum counter, long amount )
    {
    getReporter().incrCounter( counter, amount );
    }

  @Override
  public void increment( String group, String counter, long amount )
    {
    getReporter().incrCounter( group, counter, amount );
    }

  @Override
  public void setStatus( String status )
    {
    getReporter().setStatus( status );
    }

  @Override
  public boolean isCounterStatusInitialized()
    {
    return getReporter() != null;
    }

  @Override
  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
    {
    return tap.openForRead( this );
    }

  @Override
  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
    {
    return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
    }
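  /**
   * Method openTrapForWrite opens the given trap Tap for writing. A copy of the current JobConf is given a
   * part name pattern embedding the current flow step number and whether this task is a map or a reduce
   * task, so trap files written concurrently by different steps and task types do not collide.
   */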
  @Override
  public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
    {
    JobConf jobConf = HadoopUtil.copyJobConf( getJobConf() );

    int stepNum = jobConf.getInt( "cascading.flow.step.num", 0 );
    String partname;

    if( jobConf.getBoolean( "mapred.task.is.map", true ) )
      partname = String.format( "-m-%05d-", stepNum );
    else
      partname = String.format( "-r-%05d-", stepNum );

    jobConf.set( "cascading.tapcollector.partname", "%s%spart" + partname + "%05d" );

    return trap.openForWrite( new HadoopFlowProcess( this, jobConf ), null ); // do not honor sinkmode as this may be opened across tasks
    }

  @Override
  public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
    {
    return new TupleEntryCollector( Fields.size( 2 ) )
      {
      @Override
      protected void collect( TupleEntry tupleEntry )
        {
        try
          {
          getOutputCollector().collect( tupleEntry.getObject( 0 ), tupleEntry.getObject( 1 ) );
          }
        catch( IOException exception )
          {
          throw new CascadingException( "failed collecting key and value", exception );
          }
        }
      };
    }

  @Override
  public JobConf copyConfig( JobConf jobConf )
    {
    return HadoopUtil.copyJobConf( jobConf );
    }

  @Override
  public Map<String, String> diffConfigIntoMap( JobConf defaultConfig, JobConf updatedConfig )
    {
    return HadoopUtil.getConfig( defaultConfig, updatedConfig );
    }

  @Override
  public JobConf mergeMapIntoConfig( JobConf defaultConfig, Map<String, String> map )
    {
    return HadoopUtil.mergeConf( defaultConfig, map, false );
    }
  }
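/*
 * A minimal usage sketch, not part of the original source: it shows how task-side code might wrap a
 * JobConf in a HadoopFlowProcess and drive the Reporter-backed calls. The class below is an illustrative
 * assumption, not a Cascading API; inside a real task the framework supplies the JobConf and Reporter.
 */
class HadoopFlowProcessUsageSketch
  {
  public static void main( String[] args )
    {
    // wrap a fresh JobConf; the reporter defaults to Reporter.NULL, so the counter and status
    // calls below are safe no-ops outside a running task
    HadoopFlowProcess flowProcess = new HadoopFlowProcess( new JobConf() );

    // properties resolve directly against the underlying JobConf
    Object partition = flowProcess.getProperty( "mapred.task.partition" );

    // counters, status, and keep-alive all delegate to the current Reporter
    flowProcess.increment( "example-group", "example-counter", 1 );
    flowProcess.setStatus( "processing" );
    flowProcess.keepAlive();
    }
  }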