/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

import cascading.flow.FlowStep;
import cascading.flow.hadoop.planner.HadoopStepGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

/**
 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via the {@link JobConf} object.
 * <p/>
 * Use this class to allow custom MapReduce jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * according to their dependencies (topologically).
 * <p/>
 * Set the parameter {@code deleteSinkOnInit} to {@code true} if the output path in the {@code jobConf} should be deleted before executing the MapReduce job.
 * <p/>
 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
 * <p/>
 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, you should override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to properly resolve the configured paths into
 * usable {@link Tap} instances. By default, createTraps returns an empty collection and should probably be left alone.
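 * <p/>
 * For example, a pre-configured {@code JobConf} can be wrapped so it participates in a Cascade alongside other
 * Flow instances. This is only a minimal sketch: the mapper and reducer classes, the input and output paths, and
 * {@code otherFlow} below are illustrative placeholders, not part of this class.
 * <pre>{@code
 * JobConf jobConf = new JobConf();
 * jobConf.setJobName( "custom job" );
 * jobConf.setMapperClass( SomeMapper.class );   // placeholder mapper
 * jobConf.setReducerClass( SomeReducer.class ); // placeholder reducer
 *
 * FileInputFormat.setInputPaths( jobConf, new Path( "some/input/path" ) );
 * FileOutputFormat.setOutputPath( jobConf, new Path( "some/output/path" ) );
 *
 * // delete the configured output path before the wrapped job runs
 * MapReduceFlow mrFlow = new MapReduceFlow( "custom mr flow", jobConf, true );
 *
 * // schedule it with other Flow instances that share its taps
 * Cascade cascade = new CascadeConnector().connect( otherFlow, mrFlow );
 * cascade.complete();
 * }</pre>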
 */
public class MapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit */
  protected boolean deleteSinkOnInit = false;

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"jobConf"})
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"name", "jobConf"})
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( name, jobConf, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   * @param stopJobsOnExit   of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit", "stopJobsOnExit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo(), new Properties(), jobConf, name, null );
    this.deleteSinkOnInit = deleteSinkOnInit;
    this.stopJobsOnExit = stopJobsOnExit;

    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );
    initSteps();

    initializeNewJobsMap();
    }

  /** Wraps the given JobConf in a single {@link MapReduceFlowStep} keyed to the first sink Tap. */
  private FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph<JobConf> flowStepGraph = new HadoopStepGraph();

    Tap sink = getSinksCollection().iterator().next();
    FlowStep<JobConf> step = new MapReduceFlowStep( getName(), sink.toString(), jobConf, sink );

    flowStepGraph.addVertex( step );

    return flowStepGraph;
    }

  /** Creates one {@link Hfs} source Tap per input path configured on the JobConf. */
  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    Path[] paths = FileInputFormat.getInputPaths( jobConf );

    Map<String, Tap> taps = new HashMap<String, Tap>();

    for( Path path : paths )
      taps.put( path.toString(), new Hfs( new NullScheme(), path.toString() ) );

    return taps;
    }

  /** Creates a single {@link Hfs} sink Tap for the configured output path, honoring {@code deleteSinkOnInit}. */
  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    Map<String, Tap> taps = new HashMap<String, Tap>();

    String path = FileOutputFormat.getOutputPath( jobConf ).toString();

    taps.put( path, new Hfs( new NullScheme(), path, deleteSinkOnInit ) );

    return taps;
    }

  /** Returns an empty Map by default; override to supply trap Taps for the custom job. */
  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    return new HashMap<String, Tap>();
    }
  }