/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.management.state.ClientState;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Tuple;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.IndexTuple;
import cascading.tuple.io.TuplePair;
import cascading.util.Util;
import cascading.util.Version;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;

import static cascading.flow.hadoop.util.HadoopUtil.addComparators;
import static cascading.flow.hadoop.util.HadoopUtil.pack;

/**
 * Class HadoopFlowStep is the Hadoop MapReduce specific {@link BaseFlowStep} implementation,
 * responsible for configuring, submitting, and cleaning up after a single MapReduce job.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf>
  {
  protected HadoopFlowStep( String name, int ordinal )
    {
    super( name, ordinal );
    }

  public HadoopFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph )
    {
    super( elementGraph, flowNodeGraph );
    }

  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return HadoopUtil.createProperties( getConfig() );
    }
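  /**
   * Method createInitializedConfig copies the given parent JobConf, then initializes it for this
   * step: sources, sink, and traps are bound, serializations registered, and the partitioner,
   * comparator, and map output key/value classes set to match the grouping semantics of the step.
   */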
  public JobConf createInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
    {
    JobConf conf = parentConfig == null ? new JobConf() : HadoopUtil.copyJobConf( parentConfig );

    // disable warning
    conf.setBoolean( "mapred.used.genericoptionsparser", true );

    conf.setJobName( getStepDisplayName( conf.getInt( "cascading.display.id.truncate", Util.ID_LENGTH ) ) );

    conf.setOutputKeyClass( Tuple.class );
    conf.setOutputValueClass( Tuple.class );

    conf.setMapRunnerClass( FlowMapper.class );
    conf.setReducerClass( FlowReducer.class );

    // set for use by the shuffling phase
    TupleSerialization.setSerializations( conf );

    initFromSources( flowProcess, conf );

    initFromSink( flowProcess, conf );

    initFromTraps( flowProcess, conf );

    initFromStepConfigDef( conf );

    int numSinkParts = getSink().getScheme().getNumSinkParts();

    if( numSinkParts != 0 )
      {
      // if no reducer, set num map tasks to control parts
      if( getGroup() != null )
        conf.setNumReduceTasks( numSinkParts );
      else
        conf.setNumMapTasks( numSinkParts );
      }
    else if( getGroup() != null )
      {
      int gatherPartitions = conf.getNumReduceTasks();

      if( gatherPartitions == 0 )
        gatherPartitions = conf.getInt( FlowRuntimeProps.GATHER_PARTITIONS, 0 );

      if( gatherPartitions == 0 )
        throw new FlowException( getName(), "a default number of gather partitions must be set, see FlowRuntimeProps" );

      conf.setNumReduceTasks( gatherPartitions );
      }

    conf.setOutputKeyComparatorClass( TupleComparator.class );

    if( getGroup() == null )
      {
      conf.setNumReduceTasks( 0 ); // disable reducers
      }
    else
      {
      // must set map output defaults when performing a reduce
      conf.setMapOutputKeyClass( Tuple.class );
      conf.setMapOutputValueClass( Tuple.class );
      conf.setPartitionerClass( GroupingPartitioner.class );

      // handles the case where the group-by sort should be reversed
      if( getGroup().isSortReversed() )
        conf.setOutputKeyComparatorClass( ReverseTupleComparator.class );

      addComparators( conf, "cascading.group.comparator", getGroup().getKeySelectors(), this, getGroup() );

      if( getGroup().isGroupBy() )
        addComparators( conf, "cascading.sort.comparator", getGroup().getSortingSelectors(), this, getGroup() );

      if( !getGroup().isGroupBy() )
        {
        conf.setPartitionerClass( CoGroupingPartitioner.class );
        conf.setMapOutputKeyClass( IndexTuple.class ); // allows groups to be sorted by index
        conf.setMapOutputValueClass( IndexTuple.class );
        conf.setOutputKeyComparatorClass( IndexTupleCoGroupingComparator.class ); // sorts by group, then by index
        conf.setOutputValueGroupingComparator( CoGroupingComparator.class );
        }

      if( getGroup().isSorted() )
        {
        conf.setPartitionerClass( GroupingSortingPartitioner.class );
        conf.setMapOutputKeyClass( TuplePair.class );

        if( getGroup().isSortReversed() )
          conf.setOutputKeyComparatorClass( ReverseGroupingSortingComparator.class );
        else
          conf.setOutputKeyComparatorClass( GroupingSortingComparator.class );

        // no need to supply a reverse comparator, only equality is checked
        conf.setOutputValueGroupingComparator( GroupingComparator.class );
        }
      }

    // perform last so init above will pass to tasks
    String versionString = Version.getRelease();

    if( versionString != null )
      conf.set( "cascading.version", versionString );
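    // make the step id and ordinal available to the map and reduce tasks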
    conf.set( CASCADING_FLOW_STEP_ID, getID() );
    conf.set( "cascading.flow.step.num", Integer.toString( getOrdinal() ) );

    HadoopUtil.setIsInflow( conf );

    Iterator<FlowNode> iterator = getFlowNodeGraph().getTopologicalIterator();

    FlowNode mapperNode = iterator.next();
    FlowNode reducerNode = iterator.hasNext() ? iterator.next() : null;

    if( reducerNode != null )
      reducerNode.addProcessAnnotation( FlowRuntimeProps.GATHER_PARTITIONS, Integer.toString( conf.getNumReduceTasks() ) );

    String mapState = pack( mapperNode, conf );
    String reduceState = pack( reducerNode, conf );

    // hadoop 20.2 doesn't like dist cache when using local mode
    int maxSize = Short.MAX_VALUE;

    int length = mapState.length() + reduceState.length();

    if( isHadoopLocalMode( conf ) || length < maxSize ) // seems safe
      {
      conf.set( "cascading.flow.step.node.map", mapState );

      if( !Util.isEmpty( reduceState ) )
        conf.set( "cascading.flow.step.node.reduce", reduceState );
      }
    else
      {
      conf.set( "cascading.flow.step.node.map.path", HadoopMRUtil.writeStateToDistCache( conf, getID(), "map", mapState ) );

      if( !Util.isEmpty( reduceState ) )
        conf.set( "cascading.flow.step.node.reduce.path", HadoopMRUtil.writeStateToDistCache( conf, getID(), "reduce", reduceState ) );
      }

    return conf;
    }

  public boolean isHadoopLocalMode( JobConf conf )
    {
    return HadoopUtil.isLocal( conf );
    }
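  /**
   * Method createFlowStepJob wraps this step in a {@link HadoopFlowStepJob} that submits and
   * monitors the underlying MapReduce job using the given client state and initialized JobConf.
   */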
  protected FlowStepJob<JobConf> createFlowStepJob( ClientState clientState, FlowProcess<JobConf> flowProcess, JobConf initializedStepConfig )
    {
    try
      {
      return new HadoopFlowStepJob( clientState, this, initializedStepConfig );
      }
    catch( NoClassDefFoundError error )
      {
      PlatformInfo platformInfo = HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" );
      String message = "unable to load platform specific class, please verify Hadoop cluster version: '%s', matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1";

      logError( String.format( message, platformInfo.toString() ), error );

      throw error;
      }
    }

  /**
   * Method clean removes any temporary files used by this FlowStep instance. It will log any IOExceptions thrown.
   *
   * @param config of type JobConf
   */
  public void clean( JobConf config )
    {
    String stepStatePath = config.get( "cascading.flow.step.path" );

    if( stepStatePath != null )
      {
      try
        {
        HadoopUtil.removeStateFromDistCache( config, stepStatePath );
        }
      catch( IOException exception )
        {
        logWarn( "unable to remove step state file: " + stepStatePath, exception );
        }
      }

    if( tempSink != null )
      {
      try
        {
        tempSink.deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + tempSink, exception );
        }
      }

    if( getSink().isTemporary() && ( getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null ) )
      {
      try
        {
        getSink().deleteResource( config );
        }
      catch( Exception exception )
        {
        // sink all exceptions, don't fail app
        logWarn( "unable to remove temporary file: " + getSink(), exception );
        }
      }
    else
      {
      cleanTapMetaData( config, getSink() );
      }

    for( Tap tap : getTraps() )
      cleanTapMetaData( config, tap );
    }

  private void cleanTapMetaData( JobConf jobConf, Tap tap )
    {
    try
      {
      Hadoop18TapUtil.cleanupTapMetaData( jobConf, tap );
      }
    catch( IOException exception )
      {
      // ignore exception
      }
    }

  private void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf, Map<String, Tap> traps )
    {
    if( !traps.isEmpty() )
      {
      JobConf trapConf = HadoopUtil.copyJobConf( conf );

      for( Tap tap : traps.values() )
        tap.sinkConfInit( flowProcess, trapConf );
      }
    }
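  /**
   * Method initFromSources initializes the JobConf from each source Tap. Streamed sources are
   * each initialized into their own config copy and combined via {@link MultiInputFormat}, while
   * accumulated sources have their config diffs packed into the conf for cluster side lookup.
   */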
  protected void initFromSources( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // handles case where same tap is used on multiple branches
    // we do not want to init the same tap multiple times
    Set<Tap> uniqueSources = getUniqueStreamedSources();

    JobConf[] streamedJobs = new JobConf[ uniqueSources.size() ];
    int i = 0;

    for( Tap tap : uniqueSources )
      {
      if( tap.getIdentifier() == null )
        throw new IllegalStateException( "tap may not have null identifier: " + tap.toString() );

      streamedJobs[ i ] = flowProcess.copyConfig( conf );

      streamedJobs[ i ].set( "cascading.step.source", Tap.id( tap ) );

      tap.sourceConfInit( flowProcess, streamedJobs[ i ] );

      i++;
      }

    Set<Tap> accumulatedSources = getAllAccumulatedSources();

    for( Tap tap : accumulatedSources )
      {
      JobConf accumulatedJob = flowProcess.copyConfig( conf );

      tap.sourceConfInit( flowProcess, accumulatedJob );

      Map<String, String> map = flowProcess.diffConfigIntoMap( conf, accumulatedJob );
      conf.set( "cascading.node.accumulated.source.conf." + Tap.id( tap ), pack( map, conf ) );

      try
        {
        if( DistributedCache.getCacheFiles( accumulatedJob ) != null )
          DistributedCache.setCacheFiles( DistributedCache.getCacheFiles( accumulatedJob ), conf );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    MultiInputFormat.addInputFormat( conf, streamedJobs ); // must come last
    }

  private void initFromStepConfigDef( final JobConf conf )
    {
    initConfFromStepConfigDef( new ConfigurationSetter( conf ) );
    }

  /**
   * Sources are specific to a step, so remove all known accumulated sources, if any.
   *
   * @return the Set of streamed source Tap instances unique to this step
   */
  private Set<Tap> getUniqueStreamedSources()
    {
    Set<Tap> allAccumulatedSources = getAllAccumulatedSources();

    HashSet<Tap> set = new HashSet<>( sources.keySet() );

    set.removeAll( allAccumulatedSources );

    return set;
    }

  protected void initFromSink( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    // init sink first so tempSink can take precedence
    if( getSink() != null )
      getSink().sinkConfInit( flowProcess, conf );

    Class<? extends OutputFormat> outputFormat = conf.getClass( "mapred.output.format.class", null, OutputFormat.class );
    boolean isFileOutputFormat = false;

    if( outputFormat != null )
      isFileOutputFormat = FileOutputFormat.class.isAssignableFrom( outputFormat );

    Path outputPath = FileOutputFormat.getOutputPath( conf );

    // if no output path is set, we need to substitute an alternative if the OutputFormat is file based
    // PartitionTap won't set the output, but will set an OutputFormat
    // MultiSinkTap won't set the output or set the OutputFormat
    // Non file based OutputFormats don't have an output path, but do have an OutputFormat set (JDBCTap..)
    if( outputPath == null && ( isFileOutputFormat || outputFormat == null ) )
      tempSink = new TempHfs( conf, "tmp:/" + new Path( getSink().getIdentifier() ).toUri().getPath(), true );

    // tempSink exists because sink is writeDirect
    if( tempSink != null )
      tempSink.sinkConfInit( flowProcess, conf );
    }

  protected void initFromTraps( FlowProcess<JobConf> flowProcess, JobConf conf )
    {
    initFromTraps( flowProcess, conf, getTrapMap() );
    }
  }