/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.ProcessEdge;
import cascading.management.state.ClientState;
import cascading.pipe.CoGroup;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.KeyIndexTuple;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.TuplePair;
import cascading.tuple.io.ValueIndexTuple;
import cascading.tuple.io.ValueTuple;
import cascading.util.ProcessLogger;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;

import static cascading.flow.hadoop.util.HadoopUtil.*;

/**
 * Class HadoopFlowStep is the {@link BaseFlowStep} implementation that is configured through a Hadoop
 * {@link JobConf}.
 * <p>
 * It builds the {@link JobConf} for a single step ({@link #createInitializedConfig(FlowProcess, JobConf)}),
 * creates the {@link HadoopFlowStepJob} that will run it, and removes temporary resources once the step is
 * no longer needed ({@link #clean(JobConf)}).
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  /**
   * Property naming a step state file. It is never set by this class, but any value found is still removed by
   * {@link #clean(JobConf)}.
   */
  private static final String STEP_STATE_PATH_PROPERTY = "cascading.flow.step.path";
  /** Property naming the map node state file when the state was too large to store in the JobConf itself. */
  private static final String MAP_STATE_PATH_PROPERTY = "cascading.flow.step.node.map.path";
  /** Property naming the reduce node state file when the state was too large to store in the JobConf itself. */
  private static final String REDUCE_STATE_PATH_PROPERTY = "cascading.flow.step.node.reduce.path";

  /** Constructor HadoopFlowStep creates a new, empty HadoopFlowStep instance. */
  protected HadoopFlowStep()
    {
    }

  /**
   * Constructor HadoopFlowStep creates a new HadoopFlowStep instance.
   *
   * @param name    of type String, handed to the parent constructor
   * @param ordinal of type int, handed to the parent constructor
   */
  protected HadoopFlowStep( String name, int ordinal )
    {
    super( name, ordinal );
    }

  /**
   * Constructor HadoopFlowStep creates a new HadoopFlowStep instance.
   *
   * @param elementGraph  of type ElementGraph, handed to the parent constructor
   * @param flowNodeGraph of type FlowNodeGraph, handed to the parent constructor
   */
  public HadoopFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph )
    {
    super( elementGraph, flowNodeGraph );
    }

  /**
   * Method getConfigAsProperties returns the current step configuration converted by
   * {@link HadoopUtil#createProperties}.
   *
   * @return a Map of the current configuration
   */
  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

  /**
   * Method createInitializedConfig creates and fully initializes the {@link JobConf} used to run this step.
   * <p>
   * The given parent configuration is copied, never modified. The copy is then initialized, in order, from the
   * step sources, sink, traps and step ConfigDef, followed by the task counts, the partitioner and comparators
   * required by the step grouping (if any), and finally the packed map and reduce node state.
   * <p>
   * If the packed node state is too large, and Hadoop is not in local mode, the state is written to the
   * distributed cache and only its location is stored in the returned configuration.
   *
   * @param flowProcess  of type FlowProcess, passed to every Tap being initialized
   * @param parentConfig of type JobConf, the configuration to inherit from, may be {@code null}
   * @return a new, initialized JobConf instance
   * @throws FlowException if the step has a grouping and no number of gather partitions (reducers) is set
   */
  public JobConf createInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( KeyTuple.class );
    conf.setOutputValueClass( ValueTuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromStepConfigDef( conf );

    int numSinkParts = getSink().getScheme().getNumSinkParts();

    if( numSinkParts != 0 )
      {
      // if no reducer, set num map tasks to control parts
      if( getGroup() != null )
        conf.setNumReduceTasks( numSinkParts );
      else
        conf.setNumMapTasks( numSinkParts );
      }
    else if( getGroup() != null )
      {
      // a value already present in the configuration wins over the gather partitions property
      int gatherPartitions = conf.getNumReduceTasks();

      if( gatherPartitions == 0 )
        gatherPartitions = conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 );

      if( gatherPartitions == 0 )
        throw new FlowException( getName(), "a default number of gather partitions must be set, see FlowRuntimeProps" );

      conf.setNumReduceTasks( gatherPartitions );
      }

    conf.setOutputKeyComparatorClass( TupleComparator.class );

    ProcessEdge processEdge = Util.getFirst( getFlowNodeGraph().edgeSet() );

    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( KeyTuple.class );
      conf.setMapOutputValueClass( ValueTuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case the groupby sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      Integer ordinal = (Integer) Util.getFirst( processEdge.getSinkExpectedOrdinals() );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors(), (Fields) processEdge.getResolvedKeyFields().get( ordinal ) );

      if( getGroup().isGroupBy() )
        {
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors(), (Fields) processEdge.getResolvedSortFields().get( ordinal ) );
        }
      else
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( KeyIndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( ValueIndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

      // applied last, a secondary sort overrides the partitioner, key class and comparators chosen above
      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // if we write type information into the stream, we can perform comparisons in indexed tuples
    // thus, if the edge is a CoGroup and the keys are not common types, force writing of type information
    // by not declaring the resolved node fields below
    if( processEdge != null && ifCoGroupAndKeysHaveCommonTypes( this, processEdge.getFlowElement(), processEdge.getResolvedKeyFields() ) )
      {
      conf.set( "cascading.node.ordinals", Util.join( processEdge.getSinkExpectedOrdinals(), "," ) );
      addFields( conf, "cascading.node.key.fields", processEdge.getResolvedKeyFields() );
      addFields( conf, "cascading.node.sort.fields", processEdge.getResolvedSortFields() );
      addFields( conf, "cascading.node.value.fields", processEdge.getResolvedValueFields() );
      }

    // perform last so init above will pass to tasks
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );

    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) );

    HadoopUtil.setIsInflow( conf );

    Iterator<FlowNode> iterator = getFlowNodeGraph().getTopologicalIterator();

    // the first node is the mapper, the second, if present, is the reducer
    FlowNode mapperNode = iterator.next();
    FlowNode reducerNode = iterator.hasNext() ? iterator.next() : null;

    if( reducerNode != null )
      reducerNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( conf.getNumReduceTasks() ) );

    String mapState = pack( mapperNode, conf );
    // reducerNode is null for a map only step, the code below relies on pack returning a non-null value for it
    String reduceState = pack( reducerNode, conf );

    // hadoop 20.2 doesn't like dist cache when using local mode
    int maxSize = Short.MAX_VALUE;

    int length = mapState.length() + reduceState.length();

    if( isHadoopLocalMode( conf ) || length < maxSize ) // seems safe
      {
      conf.set( "cascading.flow.step.node.map", mapState );

      if( !Util.isEmpty( reduceState ) )
        conf.set( "cascading.flow.step.node.reduce", reduceState );
      }
    else
      {
      // the files written here are removed again by clean( JobConf )
      conf.set( MAP_STATE_PATH_PROPERTY, HadoopMRUtil.writeStateToDistCache( conf, getID(), "map", mapState ) );

      if( !Util.isEmpty( reduceState ) )
        conf.set( REDUCE_STATE_PATH_PROPERTY, HadoopMRUtil.writeStateToDistCache( conf, getID(), "reduce", reduceState ) );
      }

    return conf;
    }

  /**
   * Method ifCoGroupAndKeysHaveCommonTypes tests whether the key fields arriving on an edge are known to share
   * the same types.
   * <p>
   * Returns {@code true} if the given element is not a {@link CoGroup}, if fewer than two key Fields are given,
   * or if the types of every key Fields instance equal the types of the first one. Otherwise a warning is
   * logged and {@code false} is returned.
   *
   * @param processLogger     of type ProcessLogger, receives the warning on a type mismatch
   * @param flowElement       of type FlowElement, the element the edge represents
   * @param resolvedKeyFields of type Map, the resolved key Fields by ordinal, may be {@code null}
   * @return false only if the element is a CoGroup with mismatched key types
   */
  private static boolean ifCoGroupAndKeysHaveCommonTypes( ProcessLogger processLogger, FlowElement flowElement, Map<Integer, Fields> resolvedKeyFields )
    {
    if( !( flowElement instanceof CoGroup ) )
      return true;

    if( resolvedKeyFields == null || resolvedKeyFields.size() < 2 )
      return true;

    Iterator<Fields> iterator = resolvedKeyFields.values().iterator();

    // every remaining key is compared against the first one
    Fields fields = iterator.next();

    while( iterator.hasNext() )
      {
      Fields next = iterator.next();

      if( !Arrays.equals( fields.getTypesClasses(), next.getTypesClasses() ) )
        {
        processLogger.logWarn( "unable to perform: {}, on mismatched join types and optimize serialization with type exclusion, fields: {} & {}", flowElement, fields, next );
        return false;
        }
      }

    return true;
    }

  /**
   * Method isHadoopLocalMode returns the result of {@link HadoopUtil#isLocal} for the given configuration.
   *
   * @param conf of type JobConf
   * @return boolean
   */
  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }

  /**
   * Method createFlowStepJob creates the {@link HadoopFlowStepJob} that will execute this step.
   * <p>
   * A {@link NoClassDefFoundError} raised while creating the job is logged along with the detected platform
   * information, then rethrown unchanged.
   *
   * @param clientState           of type ClientState
   * @param flowProcess           of type FlowProcess, not used by this implementation
   * @param initializedStepConfig of type JobConf, the configuration the job will be run with
   * @return a new FlowStepJob instance
   */
  protected FlowStepJob<JobConf> createFlowStepJob( ClientState clientState, FlowProcess<JobConf> flowProcess, JobConf initializedStepConfig )
    {
    try
      {
      return new HadoopFlowStepJob( clientState, this, initializedStepConfig );
      }
    catch( NoClassDefFoundError error )
      {
      PlatformInfo platformInfo = HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" );
      String message = "unable to load platform specific class, please verify Hadoop cluster version: '%s', matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1";

      logError( String.format( message, platformInfo.toString() ), error );

      throw error;
      }
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   * <p>
   * This covers the state files named by the given configuration, including the map and reduce node state files
   * written by {@link #createInitializedConfig(FlowProcess, JobConf)}, the temporary sink if one was created,
   * temporary sink data, and the meta-data of all remaining sinks and traps.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
    removeStateFile( config, config.get( STEP_STATE_PATH_PROPERTY ) );
    removeStateFile( config, config.get( MAP_STATE_PATH_PROPERTY ) );
    removeStateFile( config, config.get( REDUCE_STATE_PATH_PROPERTY ) );

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    // safe way to handle zero sinks case
    for( Tap sink : getSinkTaps() )
      cleanIntermediateData( config, sink );

    for( Tap tap : getTraps() )
      cleanTapMetaData( config, tap );
    }

  /**
   * Method removeStateFile removes the given state file, if any. An IOException is logged, never thrown.
   *
   * @param config    of type JobConf
   * @param statePath of type String, the location of the state file, ignored if {@code null}
   */
  private void removeStateFile( JobConf config, String statePath )
    {
    if( statePath == null )
      return;

    try
      {
      HadoopUtil.removeStateFromDistCache( config, statePath );
      }
    catch( IOException exception )
      {
      logWarn( "unable to remove step state file: " + statePath, exception );
      }
    }

  /**
   * Method cleanIntermediateData deletes the given sink if it is temporary and either the Flow was successful
   * or the Flow has no run id. In every other case only the tap meta-data is removed.
   * <p>
   * Failures are logged, never thrown.
   *
   * @param config of type JobConf
   * @param sink   of type Tap
   */
  protected void cleanIntermediateData( JobConf config, Tap sink )
    {
    if( sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        sink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + sink, exception );
        }
      }
    else
      {
      cleanTapMetaData( config, sink );
      }
    }

  /**
   * Method cleanTapMetaData removes the meta-data of the given tap via
   * {@link Hadoop18TapUtil#cleanupTapMetaData}. This is a best effort, an IOException is logged, never thrown.
   *
   * @param jobConf of type JobConf
   * @param tap     of type Tap
   */
  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // don't fail app, but leave a trace of what could not be removed
      logWarn( "unable to remove tap meta-data: " + tap, exception );
      }
    }

  /**
   * Method initFromTraps calls {@link Tap#sinkConfInit} on every given trap.
   * <p>
   * The traps are initialized against a copy of the given configuration; that copy is discarded, so the given
   * configuration itself is left unchanged.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   * @param traps       of type Map, the traps to initialize
   */
  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( !traps.isEmpty() )
      {
      JobConf trapConf = HadoopUtil.copyJobConf( conf );

      for( Tap tap : traps.values() )
        tap.sinkConfInit( flowProcess, trapConf );
      }
    }

  /**
   * Method initFromSources initializes the given configuration from all sources of this step.
   * <p>
   * Every streamed source is initialized against its own copy of the configuration, the copies are then
   * registered with {@link MultiInputFormat}. Every accumulated source is also initialized against a copy, the
   * properties it changed are packed into the given configuration under a key derived from the tap id, and any
   * distributed cache files it declared are set on the given configuration.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   * @throws IllegalStateException if a streamed source has a null identifier
   * @throws CascadingException    if the distributed cache files of an accumulated source cannot be read
   */
  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // handles case where same tap is used on multiple branches
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );

      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );

      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );

      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );

      tap.sourceConfInit( flowProcess, accumulatedJob );

      // only retain what the tap changed
      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

      try
        {
        if( DistributedCache.getCacheFiles( accumulatedJob ) != null )
          DistributedCache.setCacheFiles( DistributedCache.getCacheFiles( accumulatedJob ), conf );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); //must come last
    }

  /**
   * Method initFromStepConfigDef applies the step ConfigDef to the given configuration.
   *
   * @param conf of type JobConf
   */
  private void initFromStepConfigDef( final JobConf conf )
    {
    initConfFromStepConfigDef( new ConfigurationSetter( conf ) );
    }

  /**
   * sources are specific to step, remove all known accumulated sources, if any
   *
   * @return a new Set of all declared sources, less those that are only accumulated
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    // NOTE(review): the returned set is modified below, this assumes getAllAccumulatedSources returns a
    // set the caller is free to change
    Set<Tap> allAccumulatedSources = getAllAccumulatedSources();

    // if a source is dual accumulated and streamed, honor the streamed annotation
    allAccumulatedSources.removeAll( getAllStreamedSources() );

    // start with the full source declaration and remove undesired taps. the above methods are dependent on
    // annotations which may not exist, so we are safeguarding a declared tap is treated streamed by default
    Set<Tap> set = new HashSet<>( sources.keySet() );

    set.removeAll( allAccumulatedSources );

    return set;
    }

  /**
   * Method initFromSink initializes the given configuration from the step sink.
   * <p>
   * If no output path results, and the configured OutputFormat is either file based or missing, a
   * {@link TempHfs} is created as a temporary sink and also initialized against the given configuration.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   */
  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
    boolean isFileOutputFormat = false;

    if( outputFormat != null )
      isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );

    Path outputPath = FileOutputFormat.getOutputPath( conf );

    // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
    // PartitionTap won't set the output, but will set an OutputFormat
    // MultiSinkTap won't set the output or set the OutputFormat
    // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
    // NOTE(review): unlike the guard above, the sink is dereferenced here without a null test
    if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

  /**
   * Method initFromTraps initializes all traps of this step, see {@link #getTrapMap()}.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   */
  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getTrapMap() );
    }
  }