/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

/**
 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via the {@link JobConf} object.
 * <p/>
 * Use this class to allow custom MapReduce jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * according to their dependencies (topologically).
 * <p/>
 * Set the parameter {@code deleteSinkOnInit} to {@code true} if the outputPath in the jobConf should be deleted before executing the MapReduce job.
 * <p/>
 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
 * <p/>
 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, you should override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to properly resolve the configured paths into
 * usable {@link Tap} instances. By default createTraps returns an empty collection and should probably be left alone.
 * <p/>
 * MapReduceFlow supports both org.apache.hadoop.mapred.* and org.apache.hadoop.mapreduce.* API Jobs.
 */
public class MapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit - when true, the resolved sink path is deleted before the job runs (SinkMode.REPLACE) */
  protected boolean deleteSinkOnInit = false;

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"jobConf"})
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"name", "jobConf"})
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( new Properties(), name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( properties, name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map<String, String>
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit )
    {
    this( properties, name, jobConf, flowDescriptor, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map<String, String>
   * @param deleteSinkOnInit of type boolean
   * @param stopJobsOnExit   of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit", "stopJobsOnExit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, jobConf, name, flowDescriptor );
    this.deleteSinkOnInit = deleteSinkOnInit;
    this.stopJobsOnExit = stopJobsOnExit;

    // resolve taps from the pre-configured JobConf, then wrap the job in a single-step graph
    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );
    initSteps();

    initializeNewJobsMap();
    }

  /**
   * Method makeStepGraph wraps the given JobConf in a single {@link MapReduceFlowStep} vertex.
   * <p/>
   * Assumes {@link #createSinks(JobConf)} produced at least one sink Tap.
   *
   * @param jobConf of type JobConf
   * @return FlowStepGraph
   */
  private FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph flowStepGraph = new FlowStepGraph();

    Tap sink = getSinksCollection().iterator().next();
    FlowStep<JobConf> step = new MapReduceFlowStep( getName(), sink.toString(), jobConf, sink );

    flowStepGraph.addVertex( step );

    return flowStepGraph;
    }

  /**
   * Method createSources resolves the configured input paths into {@link Hfs} Taps, keyed by path.
   * <p/>
   * Checks the mapred API first, falling back to the mapreduce API if no paths were found.
   *
   * @param jobConf of type JobConf
   * @return Map<String, Tap>
   */
  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    Path[] paths = FileInputFormat.getInputPaths( jobConf );

    if( paths.length == 0 )
      {
      try
        {
        // fall back to the new mapreduce API; Job.getInstance replaces the deprecated Job( Configuration ) constructor
        paths = org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getInputPaths( Job.getInstance( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<String, Tap>();

    for( Path path : paths )
      taps.put( path.toString(), new Hfs( new NullScheme(), path.toString() ) );

    return taps;
    }

  /**
   * Method createSinks resolves the configured output path into an {@link Hfs} Tap, keyed by path.
   * <p/>
   * Checks the mapred API first, falling back to the mapreduce API. Honors {@link #deleteSinkOnInit}
   * by selecting {@link SinkMode#REPLACE} over {@link SinkMode#KEEP}.
   *
   * @param jobConf of type JobConf
   * @return Map<String, Tap>
   * @throws CascadingException if no output path is configured on either API
   */
  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    Map<String, Tap> taps = new HashMap<String, Tap>();

    Path path = FileOutputFormat.getOutputPath( jobConf );

    if( path == null )
      {
      try
        {
        // fall back to the new mapreduce API; Job.getInstance replaces the deprecated Job( Configuration ) constructor
        path = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath( Job.getInstance( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    // fail fast with a clear message instead of a NullPointerException on path.toString() below
    if( path == null )
      throw new CascadingException( "unable to resolve an output path from the given JobConf, was FileOutputFormat#setOutputPath called?" );

    taps.put( path.toString(), new Hfs( new NullScheme(), path.toString(), deleteSinkOnInit ? SinkMode.REPLACE : SinkMode.KEEP ) );

    return taps;
    }

  /**
   * Method createTraps returns an empty trap collection; override to supply traps.
   *
   * @param jobConf of type JobConf
   * @return Map<String, Tap>
   */
  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    return new HashMap<String, Tap>();
    }
  }