/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

import static cascading.flow.planner.graph.ElementGraphs.asFlowElementGraph;
import static cascading.util.Util.asList;

/**
 * Class MultiMapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via one or more {@link JobConf} objects.
 * <p>
 * Use this class to group multiple JobConf instances together as a single Flow. MultiMapReduceFlow will automatically
 * topologically order the JobConf instances and schedule them on the cluster once {@link #start()} or {@link #complete()}
 * are called.
 * <p>
 * If you have a single JobConf instance, see {@link MapReduceFlow} as an alternative to this class.
 * <p>
 * This class will not delete any sinks before execution, it is up to the developer to make sure any intermediate and
 * sink paths be removed/deleted before calling {@link #start()} or {@link #complete()}, otherwise Hadoop will throw
 * an exception.
 * <p>
 * JobConf instances can be incrementally added at any point before the {@link #complete()} method is called. But they must
 * logically (topologically) come after any previously provided JobConf instances. In practice the Flow will fail if
 * the input source path is missing because a prior JobConf was not provided before the Flow was started.
 * <p>
 * The ordering is done by comparing the input and output paths of the given JobConf instances. By default, this class
 * only works with JobConf instances that read and write from the Hadoop FileSystem (HDFS) (any path that would work
 * with the {@link Hfs} Tap).
 * <p>
 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, you should override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to properly resolve the configured paths into
 * usable {@link Tap} instances. By default createTraps returns an empty collection and should probably be left alone.
 * <p>
 * MultiMapReduceFlow supports both org.apache.hadoop.mapred.* and org.apache.hadoop.mapreduce.* API Jobs.
 */
public class MultiMapReduceFlow extends BaseMapReduceFlow
  {
  /** Field tapCache, Tap instances already created by {@link #createTap}, keyed by {@code path.toString()} */
  private final Map<String, Tap> tapCache = new HashMap<>();
  /** Field queuedSteps, steps added via {@link #addFlowStep} that are not yet merged into the step graph; guarded by {@link #lock} */
  private final List<MapReduceFlowStep> queuedSteps = new LinkedList<>();
  /** Field completeCalled, set once by {@link #notifyComplete()} and never reset */
  private volatile boolean completeCalled = false;
  /** Field lock, monitor used to guard {@link #queuedSteps} and to wait for / signal new steps or completion */
  private final Object lock = new Object();

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param name     of String
   * @param jobConf  of JobConf
   * @param jobConfs of JobConf...
   */
  public MultiMapReduceFlow( String name, JobConf jobConf, JobConf... jobConfs )
    {
    this( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), new Properties(), name );

    initializeFrom( asList( jobConf, jobConfs ) );
    }

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param properties of {@code Map<Object, Object>}
   * @param name       of String
   * @param jobConf    of JobConf
   * @param jobConfs   of JobConf...
   */
  public MultiMapReduceFlow( Map<Object, Object> properties, String name, JobConf jobConf, JobConf... jobConfs )
    {
    this( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, name, null );

    initializeFrom( asList( jobConf, jobConfs ) );
    }

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param properties     of {@code Map<Object, Object>}
   * @param name           of String
   * @param flowDescriptor of {@code Map<String, String>}
   * @param jobConf        of JobConf
   * @param jobConfs       of JobConf...
   */
  public MultiMapReduceFlow( Map<Object, Object> properties, String name, Map<String, String> flowDescriptor, JobConf jobConf, JobConf... jobConfs )
    {
    this( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, name, flowDescriptor );

    initializeFrom( asList( jobConf, jobConfs ) );
    }

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param properties     of {@code Map<Object, Object>}
   * @param name           of String
   * @param flowDescriptor of {@code Map<String, String>}
   * @param stopJobsOnExit of boolean, assigned to the inherited {@code stopJobsOnExit} field
   * @param jobConf        of JobConf
   * @param jobConfs       of JobConf...
   */
  public MultiMapReduceFlow( Map<Object, Object> properties, String name, Map<String, String> flowDescriptor, boolean stopJobsOnExit, JobConf jobConf, JobConf... jobConfs )
    {
    this( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, name, flowDescriptor );
    this.stopJobsOnExit = stopJobsOnExit;

    initializeFrom( asList( jobConf, jobConfs ) );
    }

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param platformInfo of PlatformInfo
   * @param properties   of {@code Map<Object, Object>}
   * @param name         of String
   */
  protected MultiMapReduceFlow( PlatformInfo platformInfo, Map<Object, Object> properties, String name )
    {
    this( platformInfo, properties, name, null );
    }

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param platformInfo   of PlatformInfo
   * @param properties     of {@code Map<Object, Object>}
   * @param name           of String
   * @param flowDescriptor of {@code Map<String, String>}
   */
  protected MultiMapReduceFlow( PlatformInfo platformInfo, Map<Object, Object> properties, String name, Map<String, String> flowDescriptor )
    {
    super( platformInfo, properties, name, flowDescriptor, false );
    }

  /**
   * Method initializeFrom creates one flow step per given JobConf, in the given order, and merges
   * them into this Flow via {@link #updateWithFlowSteps(Collection)}.
   *
   * @param jobConfs of {@code List<JobConf>}
   */
  protected void initializeFrom( List<JobConf> jobConfs )
    {
    List<MapReduceFlowStep> steps = new ArrayList<>();

    for( JobConf jobConf : jobConfs )
      steps.add( createMapReduceFlowStep( jobConf ) );

    updateWithFlowSteps( steps );
    }

  /**
   * Method createMapReduceFlowStep wraps the given JobConf in a new {@link MapReduceFlowStep} bound to this Flow.
   *
   * @param jobConf of JobConf
   * @return a new MapReduceFlowStep
   */
  protected MapReduceFlowStep createMapReduceFlowStep( JobConf jobConf )
    {
    return new MapReduceFlowStep( this, jobConf );
    }

  /**
   * Method notifyComplete marks this Flow as complete, after which {@link #attachFlowStep(JobConf)} will
   * reject new steps, and wakes any thread blocked in {@link #blockingContinuePollingSteps()}.
   */
  public void notifyComplete()
    {
    completeCalled = true;

    synchronized( lock )
      {
      // forces blockingContinuePollingSteps to stop blocking
      lock.notifyAll();
      }
    }

  /**
   * Method complete calls {@link #notifyComplete()} and then delegates to the superclass implementation.
   */
  @Override
  public void complete()
    {
    notifyComplete();

    super.complete();
    }

  /**
   * Method spawnSteps repeatedly waits for newly attached steps and hands them to the superclass
   * implementation, until polling reports there is nothing more to do, the inherited {@code stop}
   * flag is set, or the inherited {@code throwable} becomes non-null.
   *
   * @return {@code false} as soon as a call to {@code super.spawnSteps()} returns {@code false}, {@code true} otherwise
   * @throws InterruptedException as declared by the overridden method
   * @throws ExecutionException   as declared by the overridden method
   */
  @Override
  protected boolean spawnSteps() throws InterruptedException, ExecutionException
    {
    // continue to spawn jobs until no longer required
    while( !stop && throwable == null )
      {
      if( !blockingContinuePollingSteps() )
        return true;

      if( isInfoEnabled() )
        {
        logInfo( "updated" );

        for( Tap source : getSourcesCollection() )
          logInfo( " source: " + source );
        for( Tap sink : getSinksCollection() )
          logInfo( " sink: " + sink );
        }

      // will not return until all current steps are complete, or one failed
      if( !super.spawnSteps() )
        return false;
      }

    return true;
    }

  /**
   * Method blockingContinuePollingSteps blocks until at least one step has been queued or
   * {@link #notifyComplete()} has been called, then merges any queued steps into this Flow and
   * empties the queue.
   * <p>
   * The wait itself is not abandoned on interruption, but the interrupt is no longer discarded: if the
   * waiting thread is interrupted, its interrupt status is re-asserted before this method returns.
   *
   * @return {@code true} if {@code getEligibleJobsSize()} is non-zero after the merge, or if complete has not
   * yet been called; {@code false} otherwise
   */
  protected boolean blockingContinuePollingSteps()
    {
    boolean interrupted = false;

    try
      {
      synchronized( lock )
        {
        // block until queue has items, or complete is called
        while( queuedSteps.isEmpty() && !completeCalled )
          {
          try
            {
            lock.wait();
            }
          catch( InterruptedException exception )
            {
            // keep waiting for the exit condition, but remember the interrupt; re-interrupting
            // here would make the next wait() throw immediately and spin
            interrupted = true;
            }
          }

        updateWithFlowSteps( queuedSteps ).clear();
        }
      }
    finally
      {
      // restore the interrupt status once the monitor has been released
      if( interrupted )
        Thread.currentThread().interrupt();
      }

    if( getEligibleJobsSize() != 0 ) // new ones were added
      return true;

    return !completeCalled;
    }

  /**
   * Method createTap returns the cached Tap for the given path if one exists, otherwise delegates to the
   * superclass and caches the result. The cache key is {@code path.toString()}; the jobConf and sinkMode
   * of the first request for a path are the ones used to create the cached instance.
   *
   * @param jobConf  of JobConf
   * @param path     of Path
   * @param sinkMode of SinkMode
   * @return the cached or newly created Tap
   */
  @Override
  protected Tap createTap( JobConf jobConf, Path path, SinkMode sinkMode )
    {
    String key = path.toString();
    Tap tap = tapCache.get( key );

    if( tap == null )
      {
      tap = super.createTap( jobConf, path, sinkMode );
      tapCache.put( key, tap );
      }

    return tap;
    }

  /**
   * Method attachFlowStep creates a new flow step from the given JobConf and queues it for execution.
   *
   * @param jobConf of JobConf
   * @throws IllegalStateException if {@link #complete()} or {@link #notifyComplete()} has already been called
   */
  public void attachFlowStep( JobConf jobConf )
    {
    if( completeCalled )
      throw new IllegalStateException( "cannot attach new FlowStep after complete() has been called" );

    addFlowStep( createMapReduceFlowStep( jobConf ) );
    }

  /**
   * Method addFlowStep appends the given step to the queue and wakes any thread blocked in
   * {@link #blockingContinuePollingSteps()}.
   *
   * @param flowStep of MapReduceFlowStep
   */
  protected void addFlowStep( MapReduceFlowStep flowStep )
    {
    synchronized( lock )
      {
      queuedSteps.add( flowStep );
      lock.notifyAll();
      }
    }

  /**
   * Method getOrCreateFlowStepGraph returns the current FlowStepGraph, first creating and installing an
   * empty one if none has been set.
   *
   * @return the FlowStepGraph of this Flow, never {@code null}
   */
  protected FlowStepGraph getOrCreateFlowStepGraph()
    {
    FlowStepGraph flowStepGraph = getFlowStepGraph();

    if( flowStepGraph == null )
      {
      flowStepGraph = new FlowStepGraph();
      setFlowStepGraph( flowStepGraph );
      }

    return flowStepGraph;
    }

  /**
   * Method updateWithFlowSteps adds the given steps to the step graph and then refreshes the state derived
   * from it: the element graph, the source/sink/trap maps (with their listeners), the steps, the flow stats
   * (created only if absent), the jobs map and the child stats.
   * <p>
   * Does nothing if the given collection is empty.
   *
   * @param flowSteps of {@code Collection<MapReduceFlowStep>}
   * @return the same collection instance that was passed in
   */
  protected Collection<MapReduceFlowStep> updateWithFlowSteps( Collection<MapReduceFlowStep> flowSteps )
    {
    if( flowSteps.isEmpty() )
      return flowSteps;

    FlowStepGraph flowStepGraph = getOrCreateFlowStepGraph();

    updateFlowStepGraph( flowStepGraph, flowSteps );

    setFlowElementGraph( asFlowElementGraph( platformInfo, flowStepGraph ) );

    removeListeners( getSourcesCollection() );
    removeListeners( getSinksCollection() );
    removeListeners( getTrapsCollection() );

    // re-adds listeners
    setSources( flowStepGraph.getSourceTapsMap() );
    setSinks( flowStepGraph.getSinkTapsMap() );
    setTraps( flowStepGraph.getTrapsMap() );

    // this mirrors BaseFlow#initialize()

    initSteps();

    if( flowStats == null )
      flowStats = createPrepareFlowStats(); // must be last

    if( !isJobsMapInitialized() )
      initializeNewJobsMap();
    else
      updateJobsMap();

    initializeChildStats();

    return flowSteps;
    }

  /**
   * Method updateFlowStepGraph adds each given step to the graph as a vertex and then asks the graph to
   * bind its edges.
   *
   * @param flowStepGraph of FlowStepGraph
   * @param flowSteps     of {@code Collection<MapReduceFlowStep>}
   * @return the given flowStepGraph
   */
  protected FlowStepGraph updateFlowStepGraph( FlowStepGraph flowStepGraph, Collection<MapReduceFlowStep> flowSteps )
    {
    for( MapReduceFlowStep flowStep : flowSteps )
      flowStepGraph.addVertex( flowStep );

    flowStepGraph.bindEdges();

    return flowStepGraph;
    }
  }