/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

/**
 * Class BaseMapReduceFlow is a base {@link HadoopFlow} that wraps a raw MapReduce {@link JobConf}
 * as a single-step flow, deriving source and sink {@link Tap} instances from the file input and
 * output paths configured on the job.
 * <p>
 * Both the {@code mapred} and {@code mapreduce} API path configurations are consulted, so jobs
 * written against either API are supported.
 */
public class BaseMapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit - when true, sink taps are created with {@link SinkMode#REPLACE}. */
  protected boolean deleteSinkOnInit = false;

  /**
   * Constructor BaseMapReduceFlow creates a new instance backed by the given JobConf.
   *
   * @param platformInfo     of type PlatformInfo
   * @param properties       of type Map
   * @param jobConf          the MapReduce job configuration to wrap
   * @param name             of type String
   * @param flowDescriptor   of type Map
   * @param deleteSinkOnInit when true, output paths are deleted before the flow runs
   */
  protected BaseMapReduceFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, String name, Map<String, String> flowDescriptor, boolean deleteSinkOnInit )
    {
    super( platformInfo, properties, jobConf, name, flowDescriptor );
    this.deleteSinkOnInit = deleteSinkOnInit;
    }

  /**
   * Constructor BaseMapReduceFlow creates a new instance with an empty JobConf.
   *
   * @param platformInfo     of type PlatformInfo
   * @param properties       of type Map
   * @param name             of type String
   * @param flowDescriptor   of type Map
   * @param deleteSinkOnInit when true, output paths are deleted before the flow runs
   */
  protected BaseMapReduceFlow( PlatformInfo platformInfo, Map<Object, Object> properties, String name, Map<String, String> flowDescriptor, boolean deleteSinkOnInit )
    {
    super( platformInfo, properties, new JobConf(), name, flowDescriptor );
    this.deleteSinkOnInit = deleteSinkOnInit;
    }

  /**
   * Builds a step graph containing a single flow step for the wrapped job.
   * <p>
   * Assumes exactly one sink has been resolved; the first sink in the collection is used.
   *
   * @param jobConf of type JobConf
   * @return FlowStepGraph with one vertex
   */
  protected FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph flowStepGraph = new FlowStepGraph();

    Tap sink = getSinksCollection().iterator().next();
    FlowStep<JobConf> step = createFlowStep( jobConf, sink );

    flowStepGraph.addVertex( step );

    return flowStepGraph;
    }

  /**
   * Factory for the single {@link MapReduceFlowStep}, named after the given sink.
   *
   * @param jobConf of type JobConf
   * @param sink    of type Tap
   * @return a new FlowStep wrapping the JobConf
   */
  protected FlowStep<JobConf> createFlowStep( JobConf jobConf, Tap sink )
    {
    return new MapReduceFlowStep( this, sink.toString(), jobConf, sink );
    }

  /**
   * Derives source taps from the input paths configured on the job.
   *
   * @param jobConf of type JobConf
   * @return Map of name to source Tap
   */
  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    return fileInputToTaps( jobConf );
    }

  /**
   * Resolves input paths via the old {@code mapred} API first, falling back to the new
   * {@code mapreduce} API, and wraps each path in a source Tap.
   *
   * @param jobConf of type JobConf
   * @return Map of name to source Tap, possibly empty, never null
   * @throws CascadingException if the new-API job cannot be instantiated
   */
  protected Map<String, Tap> fileInputToTaps( JobConf jobConf )
    {
    Path[] paths = FileInputFormat.getInputPaths( jobConf );

    if( paths == null || paths.length == 0 )
      {
      try
        {
        // the job may have been configured against the mapreduce (new) API
        paths = org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getInputPaths( Job.getInstance( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<>();

    if( paths == null )
      return taps;

    for( Path path : paths )
      toSourceTap( jobConf, taps, path );

    return taps;
    }

  /**
   * Creates a source Tap for the given path and registers it under a unique name.
   *
   * @param jobConf of type JobConf
   * @param taps    the map to register into
   * @param path    of type Path
   * @return the Tap previously registered under the chosen name, or null (per {@link Map#put})
   */
  protected Tap toSourceTap( JobConf jobConf, Map<String, Tap> taps, Path path )
    {
    String name = makeNameFromPath( taps, path );

    return taps.put( name, createTap( jobConf, path, SinkMode.KEEP ) );
    }

  /**
   * Derives sink taps from the output path configured on the job.
   *
   * @param jobConf of type JobConf
   * @return Map of name to sink Tap
   */
  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    return fileOutputToTaps( jobConf );
    }

  /**
   * Resolves the output path via the old {@code mapred} API first, falling back to the new
   * {@code mapreduce} API, and wraps it in a sink Tap if present.
   *
   * @param jobConf of type JobConf
   * @return Map of name to sink Tap, possibly empty, never null
   * @throws CascadingException if the new-API job cannot be instantiated
   */
  protected Map<String, Tap> fileOutputToTaps( JobConf jobConf )
    {
    Path path = FileOutputFormat.getOutputPath( jobConf );

    if( path == null )
      {
      try
        {
        // the job may have been configured against the mapreduce (new) API
        path = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath( Job.getInstance( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<>();

    if( path != null )
      toSinkTap( jobConf, taps, path );

    return taps;
    }

  /**
   * Creates a sink Tap for the given path and registers it under a unique name.
   * <p>
   * Honors {@link #deleteSinkOnInit} by selecting {@link SinkMode#REPLACE} when set.
   *
   * @param jobConf of type JobConf
   * @param taps    the map to register into
   * @param path    of type Path
   * @return the Tap previously registered under the chosen name, or null (per {@link Map#put})
   */
  protected Tap toSinkTap( JobConf jobConf, Map<String, Tap> taps, Path path )
    {
    String name = makeNameFromPath( taps, path );

    SinkMode sinkMode = deleteSinkOnInit ? SinkMode.REPLACE : SinkMode.KEEP;

    return taps.put( name, createTap( jobConf, path, sinkMode ) );
    }

  /**
   * Factory for a Tap instance over the given path. Defaults to an {@link Hfs} tap with a
   * {@link NullScheme}, since the wrapped job manages its own serialization.
   *
   * @param jobConf  of type JobConf
   * @param path     of type Path
   * @param sinkMode of type SinkMode
   * @return a new Tap
   */
  protected Tap createTap( JobConf jobConf, Path path, SinkMode sinkMode )
    {
    return new Hfs( new NullScheme(), path.toString(), sinkMode );
    }

  /**
   * Builds the shortest name for the path not already present in the given map, starting with the
   * last path element and prepending parent elements until the name is unique.
   *
   * @param taps the map whose keys must not be collided with
   * @param path of type Path
   * @return a name unique within the given map
   * @throws IllegalStateException if the path root is reached without finding a unique name
   */
  protected String makeNameFromPath( Map<String, Tap> taps, Path path )
    {
    Path parent = path.getParent();
    String name = path.getName();

    while( taps.containsKey( name ) )
      {
      // guard against walking past the path root, which would otherwise NPE on parent.getName()
      if( parent == null )
        throw new IllegalStateException( "unable to create unique name for path: " + path );

      name = new Path( parent.getName(), name ).toString();
      parent = parent.getParent();
      }

    return name;
    }

  /**
   * Raw MapReduce jobs have no trap taps; returns an empty map.
   *
   * @param jobConf of type JobConf
   * @return an empty mutable Map
   */
  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    return new HashMap<>();
    }
  }