/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.property.ConfigDef;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.IndexTuple;
import cascading.tuple.io.TuplePair;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

import static cascading.flow.hadoop.util.HadoopUtil.serializeBase64;
import static cascading.flow.hadoop.util.HadoopUtil.writeStateToDistCache;

/**
 * Class HadoopFlowStep is the Hadoop MapReduce specific implementation of {@link BaseFlowStep}. It
 * translates a single flow step into an initialized {@link JobConf} and wraps its submission in a
 * {@link HadoopFlowStepJob}.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  /** Field mapperTraps */
  private final Map<String, Tap> mapperTraps = new HashMap<String, Tap>();
  /** Field reducerTraps */
  private final Map<String, Tap> reducerTraps = new HashMap<String, Tap>();

  public HadoopFlowStep( String name, int stepNum )
    {
    super( name, stepNum );
    }
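  /**
   * Method getInitializedConfig returns a fully initialized {@link JobConf} for this step, copying
   * the given parent config if provided, then configuring the sources, sink, traps, and the
   * partitioner and comparator classes required by the grouping, if any.
   *
   * @param flowProcess  of type FlowProcess&lt;JobConf&gt;
   * @param parentConfig of type JobConf, may be null
   * @return the initialized JobConf
   */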
  public JobConf getInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.step.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( Tuple.class );
    conf.setOutputValueClass( Tuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromProcessConfigDef( conf );

    if( getSink().getScheme().getNumSinkParts() != 0 )
      {
      // if no reducer, set num map tasks to control parts
      if( getGroup() != null )
        conf.setNumReduceTasks( getSink().getScheme().getNumSinkParts() );
      else
        conf.setNumMapTasks( getSink().getScheme().getNumSinkParts() );
      }

    conf.setOutputKeyComparatorClass( TupleComparator.class );

    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( Tuple.class );
      conf.setMapOutputValueClass( Tuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case the groupby sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors() );

      if( getGroup().isGroupBy() )
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors() );

      if( !getGroup().isGroupBy() )
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( IndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( IndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // perform last so init above will pass to tasks
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );

    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getStepNum() ) );

    String stepState = pack( this, conf );

    // hadoop 20.2 doesn't like dist cache when using local mode
    int maxSize = Short.MAX_VALUE;
    if( isHadoopLocalMode( conf ) || stepState.length() < maxSize ) // seems safe
      conf.set( "cascading.flow.step", stepState );
    else
      conf.set( "cascading.flow.step.path", writeStateToDistCache( conf, getID(), stepState ) );

    return conf;
    }
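  /**
   * Method isHadoopLocalMode returns true if the given config is running against the Hadoop
   * local job runner.
   *
   * @param conf of type JobConf
   * @return boolean
   */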
  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }

  private String pack( Object object, JobConf conf )
    {
    try
      {
      return serializeBase64( object, conf, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
      }
    }

  protected FlowStepJob<JobConf> createFlowStepJob( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    try
      {
      JobConf initializedConfig = getInitializedConfig( flowProcess, parentConfig );

      setConf( initializedConfig );

      return new HadoopFlowStepJob( createClientState( flowProcess ), this, initializedConfig );
      }
    catch( NoClassDefFoundError error )
      {
      PlatformInfo platformInfo = HadoopUtil.getPlatformInfo();
      String message = "unable to load platform specific class, please verify Hadoop cluster version: '%s', matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1";

      logError( String.format( message, platformInfo.toString() ), error );

      throw error;
      }
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
    String stepStatePath = config.get( "cascading.flow.step.path" );

    if( stepStatePath != null )
      {
      try
        {
        HadoopUtil.removeStateFromDistCache( config, stepStatePath );
        }
      catch( IOException exception )
        {
        logWarn( "unable to remove step state file: " + stepStatePath, exception );
        }
      }

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    if( getSink().isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        getSink().deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + getSink(), exception );
        }
      }
    else
      {
      cleanTapMetaData( config, getSink() );
      }

    for( Tap tap : getMapperTraps().values() )
      cleanTapMetaData( config, tap );

    for( Tap tap : getReducerTraps().values() )
      cleanTapMetaData( config, tap );
    }

  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // ignore exception
      }
    }
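  /**
   * Method addComparators serializes any custom {@link Fields} comparators into the config under
   * the given property. If no custom comparators are declared, only the resolved field size is
   * recorded so default comparators may be used.
   *
   * @param conf     of type JobConf
   * @param property of type String
   * @param map      of type Map&lt;String, Fields&gt;
   */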
  private void addComparators( JobConf conf, String property, Map<String, Fields> map )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    Fields fields = fieldsIterator.next();

    if( fields.hasComparators() )
      {
      conf.set( property, pack( fields, conf ) );
      return;
      }

    // use resolved fields if there are no comparators
    Set<Scope> previousScopes = getPreviousScopes( getGroup() );

    fields = previousScopes.iterator().next().getOutValuesFields();

    if( fields.size() != 0 ) // allows Fields.UNKNOWN to be used
      conf.setInt( property + ".size", fields.size() );
    }

  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( !traps.isEmpty() )
      {
      JobConf trapConf = HadoopUtil.copyJobConf( conf );

      for( Tap tap : traps.values() )
        tap.sinkConfInit( flowProcess, trapConf );
      }
    }

  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // handles case where same tap is used on multiple branches
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );

      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );

      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );

      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );

      tap.sourceConfInit( flowProcess, accumulatedJob );

      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.step.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

      try
        {
        if( DistributedCache.getCacheFiles( accumulatedJob ) != null )
          DistributedCache.setCacheFiles( DistributedCache.getCacheFiles( accumulatedJob ), conf );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); // must come last
    }

  public Tap getTapForID( Set<Tap> taps, String id )
    {
    for( Tap tap : taps )
      {
      if( Tap.id( tap ).equals( id ) )
        return tap;
      }

    return null;
    }

  private void initFromProcessConfigDef( final JobConf conf )
    {
    initConfFromProcessConfigDef( getSetterFor( conf ) );
    }

  private ConfigDef.Setter getSetterFor( final JobConf conf )
    {
    return new ConfigDef.Setter()
      {
      @Override
      public String set( String key, String value )
        {
        String oldValue = get( key );

        conf.set( key, value );

        return oldValue;
        }

      @Override
      public String update( String key, String value )
        {
        String oldValue = get( key );

        if( oldValue == null )
          conf.set( key, value );
        else if( !oldValue.contains( value ) )
          conf.set( key, oldValue + "," + value );

        return oldValue;
        }

      @Override
      public String get( String key )
        {
        String value = conf.get( key );

        if( value == null || value.isEmpty() )
          return null;

        return value;
        }
      };
    }

  /**
   * Method getUniqueStreamedSources returns the sources specific to this step, with all known
   * accumulated sources, if any, removed.
   *
   * @return the set of unique streamed source taps
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    HashSet<Tap> set = new HashSet<Tap>( sources.keySet() );

    set.removeAll( getAllAccumulatedSources() );

    return set;
    }
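  /**
   * Method initFromSink initializes the sink tap on the given config, creating an intermediate
   * {@link TempHfs} sink when the sink does not itself set an output path (a write-direct sink).
   *
   * @param flowProcess of type FlowProcess&lt;JobConf&gt;
   * @param conf        of type JobConf
   */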
  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    if( FileOutputFormat.getOutputPath( conf ) == null )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getMapperTraps() );
    initFromTraps( flowProcess, conf, getReducerTraps() );
    }

  @Override
  public Set<Tap> getTraps()
    {
    Set<Tap> set = new HashSet<Tap>();

    set.addAll( mapperTraps.values() );
    set.addAll( reducerTraps.values() );

    return Collections.unmodifiableSet( set );
    }

  @Override
  public Tap getTrap( String name )
    {
    Tap trap = getMapperTrap( name );

    if( trap == null )
      trap = getReducerTrap( name );

    return trap;
    }

  public Map<String, Tap> getMapperTraps()
    {
    return mapperTraps;
    }

  public Map<String, Tap> getReducerTraps()
    {
    return reducerTraps;
    }

  public Tap getMapperTrap( String name )
    {
    return getMapperTraps().get( name );
    }

  public Tap getReducerTrap( String name )
    {
    return getReducerTraps().get( name );
    }
  }