/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.Scope;
import cascading.property.ConfigDef;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.IndexTuple;
import cascading.tuple.io.TuplePair;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;

import static cascading.flow.hadoop.util.HadoopUtil.serializeBase64;
import static cascading.flow.hadoop.util.HadoopUtil.writeStateToDistCache;

/**
 * Class HadoopFlowStep is the {@link BaseFlowStep} implementation configured through a Hadoop {@link JobConf}.
 * <p/>
 * It builds the JobConf for the step from its sources, sink and traps ({@link #getInitializedConfig(FlowProcess, JobConf)}),
 * creates the {@link HadoopFlowStepJob} for it, and removes temporary resources and tap meta-data
 * once the step is done ({@link #clean(JobConf)}).
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  /** Field mapperTraps */
  private final Map<String, Tap> mapperTraps = new HashMap<String, Tap>();
  /** Field reducerTraps */
  private final Map<String, Tap> reducerTraps = new HashMap<String, Tap>();

  /**
   * Constructor HadoopFlowStep creates a new HadoopFlowStep instance.
   *
   * @param name    of type String
   * @param stepNum of type int
   */
  public HadoopFlowStep( String name, int stepNum )
    {
    super( name, stepNum );
    }

  /**
   * Method getInitializedConfig creates the JobConf used to run this step.
   * <p/>
   * The result is a copy of the given parent configuration (or a new JobConf if the parent is null) that has
   * been initialized from the step sources, sink, traps and process level ConfigDef, and that carries the
   * serialized state of this step, either inline or as a path written via the distributed cache helper.
   *
   * @param flowProcess  of type FlowProcess
   * @param parentConfig of type JobConf, may be null
   * @return the initialized JobConf
   */
  public JobConf getInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.step.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( Tuple.class );
    conf.setOutputValueClass( Tuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromProcessConfigDef( conf );

    int numSinkParts = getSink().getScheme().getNumSinkParts();

    if( numSinkParts != 0 )
      {
      // if no reducer, set num map tasks to control parts
      if( getGroup() != null )
        conf.setNumReduceTasks( numSinkParts );
      else
        conf.setNumMapTasks( numSinkParts );
      }

    conf.setOutputKeyComparatorClass( TupleComparator.class );

    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( Tuple.class );
      conf.setMapOutputValueClass( Tuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case the groupby sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors() );

      if( getGroup().isGroupBy() )
        {
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors() );
        }
      else
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( IndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( IndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

      // applied last so the sorted settings override the defaults chosen above
      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // perform last so init above will pass to tasks
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );

    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getStepNum() ) );

    String stepState = pack( this, conf );

    // hadoop 20.2 doesn't like dist cache when using local mode
    int maxSize = Short.MAX_VALUE;

    if( isHadoopLocalMode( conf ) || stepState.length() < maxSize ) // seems safe
      conf.set( "cascading.flow.step", stepState );
    else
      conf.set( "cascading.flow.step.path", writeStateToDistCache( conf, getID(), stepState ) );

    return conf;
    }

  /**
   * Method isHadoopLocalMode reports the result of {@link HadoopUtil#isLocal(JobConf)} for the given configuration.
   *
   * @param conf of type JobConf
   * @return boolean
   */
  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }

  /**
   * Serializes the given object to a Base64 String via {@link HadoopUtil#serializeBase64}.
   *
   * @param object the object to serialize
   * @param conf   the configuration handed to the serializer
   * @return the serialized form
   * @throws FlowException wrapping any IOException raised during serialization
   */
  private String pack( Object object, JobConf conf )
    {
    try
      {
      return serializeBase64( object, conf, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to pack object: " + object.getClass().getCanonicalName(), exception );
      }
    }

  /**
   * Method createFlowStepJob initializes the step configuration, stores it on this step, and returns a new
   * {@link HadoopFlowStepJob} for it.
   * <p/>
   * A NoClassDefFoundError is logged with the current platform info before being rethrown unchanged.
   *
   * @param flowProcess  of type FlowProcess
   * @param parentConfig of type JobConf
   * @return the new FlowStepJob
   */
  protected FlowStepJob<JobConf> createFlowStepJob( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    try
      {
      JobConf initializedConfig = getInitializedConfig( flowProcess, parentConfig );

      setConf( initializedConfig );

      return new HadoopFlowStepJob( createClientState( flowProcess ), this, initializedConfig );
      }
    catch( NoClassDefFoundError error )
      {
      PlatformInfo platformInfo = HadoopUtil.getPlatformInfo();
      String message = "unable to load platform specific class, please verify Hadoop cluster version: '%s', matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1";

      logError( String.format( message, platformInfo.toString() ), error );

      throw error;
      }
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
    String stepStatePath = config.get( "cascading.flow.step.path" );

    if( stepStatePath != null )
      {
      try
        {
        HadoopUtil.removeStateFromDistCache( config, stepStatePath );
        }
      catch( IOException exception )
        {
        logWarn( "unable to remove step state file: " + stepStatePath, exception );
        }
      }

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    if( getSink().isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        getSink().deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + getSink(), exception );
        }
      }
    else
      {
      cleanTapMetaData( config, getSink() );
      }

    for( Tap tap : getMapperTraps().values() )
      cleanTapMetaData( config, tap );

    for( Tap tap : getReducerTraps().values() )
      cleanTapMetaData( config, tap );
    }

  /**
   * Delegates to {@link Hadoop18TapUtil#cleanupTapMetaData} for the given tap. This is best-effort: an
   * IOException is logged as a warning and never propagated, so cleanup of remaining taps continues.
   *
   * @param jobConf the configuration to clean up with
   * @param tap     the tap whose meta-data should be removed
   */
  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // don't fail app, but leave a trace so left over meta-data can be diagnosed
      logWarn( "unable to clean up tap meta-data: " + tap, exception );
      }
    }

  /**
   * Publishes comparator information for the first Fields instance in the given map.
   * <p/>
   * If that Fields instance declares comparators, it is serialized into {@code property}. Otherwise the size
   * of the out values fields of the first previous scope of the group is stored under {@code property + ".size"},
   * unless that size is zero. Nothing is set if the map is empty.
   *
   * @param conf     the configuration to update
   * @param property the property name to store the comparator information under
   * @param map      the selectors to inspect, only the first value is used
   */
  private void addComparators( JobConf conf, String property, Map<String, Fields> map )
    {
    Iterator<Fields> fieldsIterator = map.values().iterator();

    if( !fieldsIterator.hasNext() )
      return;

    Fields fields = fieldsIterator.next();

    if( fields.hasComparators() )
      {
      conf.set( property, pack( fields, conf ) );
      return;
      }

    // use resolved fields if there are no comparators.
    Set<Scope> previousScopes = getPreviousScopes( getGroup() );

    fields = previousScopes.iterator().next().getOutValuesFields();

    if( fields.size() != 0 ) // allows fields.UNKNOWN to be used
      conf.setInt( property + ".size", fields.size() );
    }

  /**
   * Calls {@link Tap#sinkConfInit} on every given trap against a copy of the given configuration, the copy
   * is discarded afterwards so the given configuration itself is not modified by the traps.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        the configuration to copy
   * @param traps       the traps to initialize
   */
  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( traps.isEmpty() )
      return;

    JobConf trapConf = HadoopUtil.copyJobConf( conf );

    for( Tap tap : traps.values() )
      tap.sinkConfInit( flowProcess, trapConf );
    }

  /**
   * Method initFromSources initializes the given configuration from the step sources.
   * <p/>
   * Each unique streamed source is initialized against its own copy of the configuration, the copies are
   * registered with {@link MultiInputFormat}. Each accumulated source is initialized against a copy as well,
   * the resulting configuration difference is serialized into a per tap property, and any distributed cache
   * files set on the copy are set on the given configuration.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   * @throws IllegalStateException if a streamed source has a null identifier
   * @throws CascadingException    wrapping any IOException raised while reading the cache files
   */
  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // handles case where same tap is used on multiple branches
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );

      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );

      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );

      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );

      tap.sourceConfInit( flowProcess, accumulatedJob );

      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.step.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

      try
        {
        if( DistributedCache.getCacheFiles( accumulatedJob ) != null )
          DistributedCache.setCacheFiles( DistributedCache.getCacheFiles( accumulatedJob ), conf );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); //must come last
    }

  /**
   * Method getTapForID returns the first Tap in the given set whose {@link Tap#id(Tap)} equals the given id.
   *
   * @param taps of type Set
   * @param id   of type String
   * @return the matching Tap, or null if none matches
   */
  public Tap getTapForID( Set<Tap> taps, String id )
    {
    for( Tap tap : taps )
      {
      if( Tap.id( tap ).equals( id ) )
        return tap;
      }

    return null;
    }

  /**
   * Applies the process level ConfigDef to the given configuration through a {@link ConfigDef.Setter}.
   *
   * @param conf the configuration to update
   */
  private void initFromProcessConfigDef( final JobConf conf )
    {
    initConfFromProcessConfigDef( getSetterFor( conf ) );
    }

  /**
   * Creates a {@link ConfigDef.Setter} that reads from and writes to the given configuration.
   * <p/>
   * The setter treats a null or empty property value as not set. Its update operation appends the new value
   * to the current one with a comma delimiter, unless the new value is already an element of the current
   * comma delimited value.
   *
   * @param conf the configuration backing the setter
   * @return a new Setter
   */
  private ConfigDef.Setter getSetterFor( final JobConf conf )
    {
    return new ConfigDef.Setter()
      {
      @Override
      public String set( String key, String value )
        {
        String oldValue = get( key );

        conf.set( key, value );

        return oldValue;
        }

      @Override
      public String update( String key, String value )
        {
        String oldValue = get( key );

        if( oldValue == null )
          conf.set( key, value );
        else if( !isListMember( oldValue, value ) ) // a plain substring test would confuse 'foo' with 'foobar'
          conf.set( key, oldValue + "," + value );

        return oldValue;
        }

      @Override
      public String get( String key )
        {
        String value = conf.get( key );

        if( value == null || value.isEmpty() )
          return null;

        return value;
        }
      };
    }

  /**
   * Tests whether the given value occurs in the given comma delimited list on element boundaries.
   * <p/>
   * The value may itself be a comma delimited sequence, in which case it must occur as a contiguous run of
   * elements. No whitespace trimming is performed.
   *
   * @param list  the current comma delimited value
   * @param value the value to look for
   * @return true if value is bounded by the list start, the list end, or commas within list
   */
  private static boolean isListMember( String list, String value )
    {
    return ( "," + list + "," ).contains( "," + value + "," );
    }

  /**
   * sources are specific to step, remove all known accumulated sources, if any
   *
   * @return a new Set holding the source taps of this step minus all accumulated sources
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    HashSet<Tap> set = new HashSet<Tap>( sources.keySet() );

    set.removeAll( getAllAccumulatedSources() );

    return set;
    }

  /**
   * Method initFromSink initializes the given configuration from the step sink.
   * <p/>
   * If the sink leaves no output path set and the configured OutputFormat is either file based or missing,
   * a {@link TempHfs} is created as tempSink and initialized after the sink so its settings take precedence.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   */
  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
    boolean isFileOutputFormat = false;

    if( outputFormat != null )
      isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );

    Path outputPath = FileOutputFormat.getOutputPath( conf );

    // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
    // PartitionTap won't set the output, but will set an OutputFormat
    // MultiSinkTap won't set the output or set the OutputFormat
    // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
    if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

  /**
   * Method initFromTraps initializes the mapper traps, then the reducer traps, each against its own copy
   * of the given configuration.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   */
  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getMapperTraps() );
    initFromTraps( flowProcess, conf, getReducerTraps() );
    }

  /**
   * Method getTraps returns an unmodifiable Set holding all mapper and reducer traps.
   *
   * @return the traps of this step
   */
  @Override
  public Set<Tap> getTraps()
    {
    Set<Tap> set = new HashSet<Tap>();

    set.addAll( mapperTraps.values() );
    set.addAll( reducerTraps.values() );

    return Collections.unmodifiableSet( set );
    }

  /**
   * Method getTrap returns the mapper trap with the given name, falling back to the reducer trap with that name.
   *
   * @param name of type String
   * @return the Tap, or null if neither map holds the name
   */
  @Override
  public Tap getTrap( String name )
    {
    Tap trap = getMapperTrap( name );

    if( trap == null )
      trap = getReducerTrap( name );

    return trap;
    }

  /**
   * Method getMapperTraps returns the internal, mutable map of mapper traps keyed by name.
   *
   * @return the mapperTraps
   */
  public Map<String, Tap> getMapperTraps()
    {
    return mapperTraps;
    }

  /**
   * Method getReducerTraps returns the internal, mutable map of reducer traps keyed by name.
   *
   * @return the reducerTraps
   */
  public Map<String, Tap> getReducerTraps()
    {
    return reducerTraps;
    }

  /**
   * Method getMapperTrap returns the mapper trap registered under the given name.
   *
   * @param name of type String
   * @return the Tap, or null if there is none
   */
  public Tap getMapperTrap( String name )
    {
    return getMapperTraps().get( name );
    }

  /**
   * Method getReducerTrap returns the reducer trap registered under the given name.
   *
   * @param name of type String
   * @return the Tap, or null if there is none
   */
  public Tap getReducerTrap( String name )
    {
    return getReducerTraps().get( name );
    }
  }