/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.ProcessEdge;
import cascading.management.state.ClientState;
import cascading.pipe.CoGroup;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.KeyIndexTuple;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.TuplePair;
import cascading.tuple.io.ValueIndexTuple;
import cascading.tuple.io.ValueTuple;
import cascading.util.ProcessLogger;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;

import static cascading.flow.hadoop.util.HadoopUtil.*;

/**
 * Class HadoopFlowStep is the {@link BaseFlowStep} specialization whose configuration type is a
 * Hadoop {@link JobConf}.
 * <p>
 * It builds the JobConf for a single step (see {@link #createInitializedConfig(FlowProcess, JobConf)}),
 * creates the {@link HadoopFlowStepJob} that runs it, and removes temporary resources once the step
 * is done (see {@link #clean(JobConf)}).
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  /**
   * Constructor HadoopFlowStep creates a new instance by handing the name and ordinal to the super class.
   *
   * @param name    of type String
   * @param ordinal of type int
   */
  protected HadoopFlowStep( String name, int ordinal )
    {
    super( name, ordinal );
    }

  /**
   * Constructor HadoopFlowStep creates a new instance by handing both graphs to the super class.
   *
   * @param elementGraph  of type ElementGraph
   * @param flowNodeGraph of type FlowNodeGraph
   */
  public HadoopFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph )
    {
    super( elementGraph, flowNodeGraph );
    }

  /**
   * Method getConfigAsProperties returns the current step config converted by
   * {@link HadoopUtil#createProperties}.
   *
   * @return Map of config properties
   */
  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }

  /**
   * Method createInitializedConfig creates the JobConf used to run this step.
   * <p>
   * The new config is a copy of the given parent config (or a fresh JobConf if the parent is null). It is
   * then initialized, in order, from the step sources, sink, traps and step ConfigDef, after which the
   * number of tasks, the partitioner, the comparators and the serialized map/reduce node state are set.
   *
   * @param flowProcess  of type FlowProcess, passed to every Tap being initialized
   * @param parentConfig of type JobConf, may be null
   * @return the initialized JobConf
   * @throws FlowException if this step has a grouping and no number of gather partitions can be found
   */
  public JobConf createInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( KeyTuple.class );
    conf.setOutputValueClass( ValueTuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromStepConfigDef( conf );

    int numSinkParts = getSink().getScheme().getNumSinkParts();

    if( numSinkParts != 0 )
      {
      // if no reducer, set num map tasks to control parts
      if( getGroup() != null )
        conf.setNumReduceTasks( numSinkParts );
      else
        conf.setNumMapTasks( numSinkParts );
      }
    else if( getGroup() != null )
      {
      // prefer the reducer count already on the config, then fall back to the gather partitions property
      int gatherPartitions = conf.getNumReduceTasks();

      if( gatherPartitions == 0 )
        gatherPartitions = conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 );

      if( gatherPartitions == 0 )
        throw new FlowException( getName(), "a default number of gather partitions must be set, see FlowRuntimeProps" );

      conf.setNumReduceTasks( gatherPartitions );
      }

    conf.setOutputKeyComparatorClass( TupleComparator.class );

    ProcessEdge processEdge = Util.getFirst( getFlowNodeGraph().edgeSet() );

    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( KeyTuple.class );
      conf.setMapOutputValueClass( ValueTuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case the groupby sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      Integer ordinal = (Integer) Util.getFirst( processEdge.getSinkExpectedOrdinals() );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors(), (Fields) processEdge.getResolvedKeyFields().get( ordinal ) );

      if( getGroup().isGroupBy() )
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors(), (Fields) processEdge.getResolvedSortFields().get( ordinal ) );

      if( !getGroup().isGroupBy() )
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( KeyIndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( ValueIndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

      // applied last, so a sorted grouping overrides the partitioner, key class and comparators set above
      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // if we write type information into the stream, we can perform comparisons in indexed tuples
    // thus, if the edge is a CoGroup and the keys are not common types, force writing of type information
    // (the node field properties below are only set when the check passes)
    if( processEdge != null && ifCoGroupAndKeysHaveCommonTypes( this, processEdge.getFlowElement(), processEdge.getResolvedKeyFields() ) )
      {
      conf.set( "cascading.node.ordinals", Util.join( processEdge.getSinkExpectedOrdinals(), "," ) );
      addFields( conf, "cascading.node.key.fields", processEdge.getResolvedKeyFields() );
      addFields( conf, "cascading.node.sort.fields", processEdge.getResolvedSortFields() );
      addFields( conf, "cascading.node.value.fields", processEdge.getResolvedValueFields() );
      }

    // perform last so init above will pass to tasks
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );

    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) );

    HadoopUtil.setIsInflow( conf );

    Iterator<FlowNode> iterator = getFlowNodeGraph().getTopologicalIterator();

    // the first node is treated as the mapper, the second (if any) as the reducer
    FlowNode mapperNode = iterator.next();
    FlowNode reducerNode = iterator.hasNext() ? iterator.next() : null;

    if( reducerNode != null )
      reducerNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( conf.getNumReduceTasks() ) );

    String mapState = pack( mapperNode, conf );
    String reduceState = pack( reducerNode, conf );

    // hadoop 20.2 doesn't like dist cache when using local mode
    int maxSize = Short.MAX_VALUE;

    int length = mapState.length() + reduceState.length();

    if( isHadoopLocalMode( conf ) || length < maxSize ) // seems safe
      {
      conf.set( "cascading.flow.step.node.map", mapState );

      if( !Util.isEmpty( reduceState ) )
        conf.set( "cascading.flow.step.node.reduce", reduceState );
      }
    else
      {
      // state is too large for the config itself, store only the location returned by writeStateToDistCache
      conf.set( "cascading.flow.step.node.map.path", HadoopMRUtil.writeStateToDistCache( conf, getID(), "map", mapState ) );

      if( !Util.isEmpty( reduceState ) )
        conf.set( "cascading.flow.step.node.reduce.path", HadoopMRUtil.writeStateToDistCache( conf, getID(), "reduce", reduceState ) );
      }

    return conf;
    }

  /**
   * Returns false only if the given element is a {@link CoGroup} with two or more resolved key Fields
   * whose type classes differ from those of the first entry; a warning is logged in that case.
   * <p>
   * Returns true for any non-CoGroup element, and when the given map is null or has fewer than two entries.
   *
   * @param processLogger     receives the warning on a type mismatch
   * @param flowElement       the element on the edge being inspected
   * @param resolvedKeyFields resolved key Fields by ordinal, may be null
   * @return true if the key types do not conflict
   */
  private static boolean ifCoGroupAndKeysHaveCommonTypes( ProcessLogger processLogger, FlowElement flowElement, Map<Integer, Fields> resolvedKeyFields )
    {
    if( !( flowElement instanceof CoGroup ) )
      return true;

    if( resolvedKeyFields == null || resolvedKeyFields.size() < 2 )
      return true;

    Iterator<Map.Entry<Integer, Fields>> iterator = resolvedKeyFields.entrySet().iterator();

    // every remaining entry is compared against the first one
    Fields fields = iterator.next().getValue();

    while( iterator.hasNext() )
      {
      Fields next = iterator.next().getValue();

      if( !Arrays.equals( fields.getTypesClasses(), next.getTypesClasses() ) )
        {
        processLogger.logWarn( "unable to perform: {}, on mismatched join types and optimize serialization with type exclusion, fields: {} & {}", flowElement, fields, next );
        return false;
        }
      }

    return true;
    }

  /**
   * Method isHadoopLocalMode returns the result of {@link HadoopUtil#isLocal} for the given config.
   *
   * @param conf of type JobConf
   * @return boolean
   */
  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }

  /**
   * Method createFlowStepJob creates the {@link HadoopFlowStepJob} for this step.
   * <p>
   * A {@link NoClassDefFoundError} is logged along with the detected platform information and then rethrown.
   *
   * @param clientState           of type ClientState
   * @param flowProcess           of type FlowProcess, not used by this implementation
   * @param initializedStepConfig of type JobConf
   * @return the new FlowStepJob
   */
  protected FlowStepJob<JobConf> createFlowStepJob( ClientState clientState, FlowProcess<JobConf> flowProcess, JobConf initializedStepConfig )
    {
    try
      {
      return new HadoopFlowStepJob( clientState, this, initializedStepConfig );
      }
    catch( NoClassDefFoundError error )
      {
      PlatformInfo platformInfo = HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" );
      String message = "unable to load platform specific class, please verify Hadoop cluster version: '%s', matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1";

      logError( String.format( message, platformInfo.toString() ), error );

      throw error;
      }
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   * <p>
   * Step state is removed for the map and reduce state locations stored by
   * {@link #createInitializedConfig(FlowProcess, JobConf)}, and for the single
   * {@code cascading.flow.step.path} location should the given config carry one.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
    // not set anywhere in this class, still honored for configs that carry it
    removeStepState( config, "cascading.flow.step.path" );

    // set by createInitializedConfig when the packed node state was too large for the config
    removeStepState( config, "cascading.flow.step.node.map.path" );
    removeStepState( config, "cascading.flow.step.node.reduce.path" );

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    // safe way to handle zero sinks case
    for( Tap sink : getSinkTaps() )
      cleanIntermediateData( config, sink );

    for( Tap tap : getTraps() )
      cleanTapMetaData( config, tap );
    }

  /**
   * Removes the step state stored at the location held by the given property, if the property is set.
   * An IOException is logged as a warning and not propagated.
   *
   * @param config   of type JobConf
   * @param property name of the property holding the state location
   */
  private void removeStepState( JobConf config, String property )
    {
    String stepStatePath = config.get( property );

    if( stepStatePath == null )
      return;

    try
      {
      HadoopUtil.removeStateFromDistCache( config, stepStatePath );
      }
    catch( IOException exception )
      {
      logWarn( "unable to remove step state file: " + stepStatePath, exception );
      }
    }

  /**
   * Method cleanIntermediateData deletes the given sink if it is temporary and the flow either was
   * successful or has no run ID. In every other case only the tap meta-data is cleaned up.
   *
   * @param config of type JobConf
   * @param sink   of type Tap
   */
  protected void cleanIntermediateData( JobConf config, Tap sink )
    {
    if( sink.isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        sink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + sink, exception );
        }
      }
    else
      {
      cleanTapMetaData( config, sink );
      }
    }

  /**
   * Calls {@link Hadoop18TapUtil#cleanupTapMetaData} for the given tap, discarding any IOException.
   *
   * @param jobConf of type JobConf
   * @param tap     of type Tap
   */
  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // ignore exception
      }
    }

  /**
   * Calls sinkConfInit on every given trap against a copy of the given config. The copy is not read
   * again by this method, so the given config itself is left untouched.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   * @param traps       traps by name, must not be null
   */
  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( !traps.isEmpty() )
      {
      JobConf trapConf = HadoopUtil.copyJobConf( conf );

      for( Tap tap : traps.values() )
        tap.sinkConfInit( flowProcess, trapConf );
      }
    }

  /**
   * Method initFromSources initializes the given config from the sources of this step.
   * <p>
   * Every streamed source is initialized against its own copy of the config, and the copies are handed to
   * {@link MultiInputFormat#addInputFormat}. For every accumulated source, the properties it changed are
   * packed into a source specific property on the given config, and its distributed cache files, if any,
   * are set on the given config.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   * @throws IllegalStateException if a streamed source has a null identifier
   * @throws CascadingException    wrapping any IOException thrown while reading the cache files
   */
  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // handles case where same tap is used on multiple branches
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );

      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );

      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );

      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );

      tap.sourceConfInit( flowProcess, accumulatedJob );

      // only the difference made by the tap is stored, keyed by the tap id
      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

      try
        {
        if( DistributedCache.getCacheFiles( accumulatedJob ) != null )
          DistributedCache.setCacheFiles( DistributedCache.getCacheFiles( accumulatedJob ), conf );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); //must come last
    }

  /**
   * Applies the step ConfigDef to the given config through a {@link ConfigurationSetter}.
   *
   * @param conf of type JobConf
   */
  private void initFromStepConfigDef( final JobConf conf )
    {
    initConfFromStepConfigDef( new ConfigurationSetter( conf ) );
    }

  /**
   * sources are specific to step, remove all known accumulated sources, if any
   *
   * @return a new Set holding every source of this step that is not an accumulated source
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    Set<Tap> allAccumulatedSources = getAllAccumulatedSources();

    HashSet<Tap> set = new HashSet<>( sources.keySet() );

    set.removeAll( allAccumulatedSources );

    return set;
    }

  /**
   * Method initFromSink initializes the given config from the sink of this step.
   * <p>
   * If the sink leaves the config without an output path, and the configured OutputFormat is either
   * missing or file based, a {@link TempHfs} is created as the temporary sink and initialized after the
   * sink itself.
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   */
  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
    boolean isFileOutputFormat = false;

    if( outputFormat != null )
      isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );

    Path outputPath = FileOutputFormat.getOutputPath( conf );

    // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
    // PartitionTap won't set the output, but will set an OutputFormat
    // MultiSinkTap won't set the output or set the OutputFormat
    // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
    if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

  /**
   * Method initFromTraps initializes the traps returned by getTrapMap().
   *
   * @param flowProcess of type FlowProcess
   * @param conf        of type JobConf
   */
  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getTrapMap() );
    }
  }