/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.util.Map;
import java.util.Properties;

import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.mapred.JobConf;

/**
 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via the {@link JobConf} object.
 * <p>
 * Use this class to allow custom MapReduce jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * according to their dependencies (topologically).
 * <p>
 * Set the parameter {@code deleteSinkOnInit} to {@code true} if the outputPath in the jobConf should be deleted before executing the MapReduce job.
 * <p>
 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
 * <p>
 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, you should override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to properly resolve the configured paths into
 * usable {@link Tap} instances. By default createTraps returns an empty collection and should probably be left alone.
 * <p>
 * MapReduceFlow supports both org.apache.hadoop.mapred.* and org.apache.hadoop.mapreduce.* API Jobs.
 */
public class MapReduceFlow extends BaseMapReduceFlow
  {
  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * The flow name defaults to the job name configured on the given JobConf, and the sink is not deleted on init.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"jobConf"})
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * The flow name defaults to the job name configured on the given JobConf.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * The sink is not deleted on init.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"name", "jobConf"})
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * Uses empty Properties, no flow descriptor, and {@code stopJobsOnExit} defaulting to {@code true}.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    // null flowDescriptor, stopJobsOnExit defaults to true
    this( new Properties(), name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * Uses no flow descriptor and {@code stopJobsOnExit} defaulting to {@code true}.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    // null flowDescriptor, stopJobsOnExit defaults to true
    this( properties, name, jobConf, null, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   * <p>
   * {@code stopJobsOnExit} defaults to {@code true}.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map&lt;String, String&gt;
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit )
    {
    this( properties, name, jobConf, flowDescriptor, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance. This is the canonical constructor all
   * other constructors delegate to.
   *
   * @param properties       of type Properties
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param flowDescriptor   of type Map&lt;String, String&gt;
   * @param deleteSinkOnInit of type boolean
   * @param stopJobsOnExit   of type boolean
   */
  @ConstructorProperties({"properties", "name", "jobConf", "flowDescriptor", "deleteSinkOnInit", "stopJobsOnExit"})
  public MapReduceFlow( Properties properties, String name, JobConf jobConf, Map<String, String> flowDescriptor, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, jobConf, name, flowDescriptor, deleteSinkOnInit );
    this.stopJobsOnExit = stopJobsOnExit;

    // NOTE: deliberately invokes an overridable method from the constructor so subclasses can
    // customize source/sink/trap resolution (see createSources/createSinks/createTraps)
    initializeFrom( jobConf ); // push off initialization allowing for overrides
    }

  /**
   * Method initializeFrom resolves the given JobConf into source, sink, and trap {@link Tap} instances,
   * builds the flow step graph, and completes flow initialization.
   * <p>
   * Called from the constructor after all constructor state is set; override the create* hooks rather than
   * this method to customize path resolution. The statement order below is significant.
   *
   * @param jobConf of type JobConf
   */
  protected void initializeFrom( JobConf jobConf )
    {
    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );

    // this mirrors BaseFlow#initialize()

    initSteps();

    this.flowStats = createPrepareFlowStats(); // must be last

    initializeNewJobsMap();

    initializeChildStats();
    }
  }