/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.planner.HadoopStepGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

/**
 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via the {@link JobConf} object.
 * <p/>
 * Use this class to allow custom MapReduce jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * according to their dependencies (topologically).
 * <p/>
 * Set the parameter {@code deleteSinkOnInit} to {@code true} if the output path configured on the jobConf should be
 * deleted before the MapReduce job executes.
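 * <p/>
 * For example, a pre-configured {@code JobConf} can be wrapped and scheduled alongside other Flows. The snippet
 * below is a minimal sketch; the input/output paths, the {@code WordCountMapper} and {@code WordCountReducer}
 * classes, and {@code otherFlow} are hypothetical placeholders:
 * <pre>{@code
 * JobConf jobConf = new JobConf();
 * jobConf.setJobName( "wordcount" );
 * jobConf.setMapperClass( WordCountMapper.class );   // hypothetical mapper
 * jobConf.setReducerClass( WordCountReducer.class ); // hypothetical reducer
 *
 * FileInputFormat.setInputPaths( jobConf, new Path( "input/path" ) );
 * FileOutputFormat.setOutputPath( jobConf, new Path( "output/path" ) );
 *
 * // deleteSinkOnInit == true, the output path is removed before the job runs
 * Flow flow = new MapReduceFlow( "wordcount", jobConf, true );
 *
 * new CascadeConnector().connect( flow, otherFlow ).complete();
 * }</pre>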
 * <p/>
 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
 * <p/>
 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, you should override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to resolve the configured identifiers into
 * usable {@link Tap} instances. By default, createTraps returns an empty collection and can usually be left as is.
 * <p/>
 * MapReduceFlow supports both org.apache.hadoop.mapred.* and org.apache.hadoop.mapreduce.* API jobs.
 */
public class MapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit */
  protected boolean deleteSinkOnInit = false;

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties( {"jobConf"} )
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties( {"jobConf", "deleteSinkOnInit"} )
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties( {"name", "jobConf"} )
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties( {"name", "jobConf", "deleteSinkOnInit"} )
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( name, jobConf, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   * @param stopJobsOnExit   of type boolean
   */
  @ConstructorProperties( {"name", "jobConf", "deleteSinkOnInit", "stopJobsOnExit"} )
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo(), new Properties(), jobConf, name, null );
    this.deleteSinkOnInit = deleteSinkOnInit;
    this.stopJobsOnExit = stopJobsOnExit;

    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );
    initSteps();

    initializeNewJobsMap();
    }

  private FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph<JobConf> flowStepGraph = new HadoopStepGraph();

    // the pre-configured job becomes a single step in the step graph, keyed to the first sink
    Tap sink = getSinksCollection().iterator().next();
    FlowStep<JobConf> step = new MapReduceFlowStep( getName(), sink.toString(), jobConf, sink );

    flowStepGraph.addVertex( step );

    return flowStepGraph;
    }
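
  /**
   * Method createSources resolves the input paths configured on the given jobConf into source {@link Tap} instances,
   * consulting both the mapred and mapreduce APIs. Override this method if the job reads from something other than
   * Hadoop FS paths. A minimal sketch of such an override, where the {@code my.custom.input} property and the
   * {@code CustomTap} type are hypothetical:
   * <pre>{@code
   * protected Map<String, Tap> createSources( JobConf jobConf )
   *   {
   *   Map<String, Tap> taps = new HashMap<String, Tap>();
   *
   *   String identifier = jobConf.get( "my.custom.input" ); // hypothetical property
   *   taps.put( identifier, new CustomTap( identifier ) );  // hypothetical Tap subclass
   *
   *   return taps;
   *   }
   * }</pre>
   *
   * @param jobConf of type JobConf
   * @return a Map of source Taps keyed by input path
   */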
  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    Path[] paths = FileInputFormat.getInputPaths( jobConf );

    if( paths.length == 0 )
      {
      // no mapred API input paths were set, fall back to the mapreduce API
      try
        {
        paths = org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getInputPaths( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<String, Tap>();

    for( Path path : paths )
      taps.put( path.toString(), new Hfs( new NullScheme(), path.toString() ) );

    return taps;
    }

  /**
   * Method createSinks resolves the output path configured on the given jobConf into the single sink {@link Tap},
   * consulting both the mapred and mapreduce APIs. Override this method if the job writes to something other than
   * a Hadoop FS path.
   *
   * @param jobConf of type JobConf
   * @return a Map of sink Taps keyed by output path
   */
  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    Map<String, Tap> taps = new HashMap<String, Tap>();

    Path path = FileOutputFormat.getOutputPath( jobConf );

    if( path == null )
      {
      // no mapred API output path was set, fall back to the mapreduce API
      try
        {
        path = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    // an output path is assumed to be configured via one of the two APIs
    taps.put( path.toString(), new Hfs( new NullScheme(), path.toString(), deleteSinkOnInit ) );

    return taps;
    }

  /**
   * Method createTraps returns an empty Map by default, since a pre-configured MapReduce job has no notion of
   * Cascading trap Taps. Override this method to supply traps if needed.
   *
   * @param jobConf of type JobConf
   * @return an empty Map
   */
  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    return new HashMap<String, Tap>();
    }
  }