/*
 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.beans.ConstructorProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import cascading.flow.FlowStep;
import cascading.flow.hadoop.planner.HadoopStepGraph;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

/**
 * Class MapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via the {@link JobConf} object.
 * <p/>
 * Use this class to allow custom MapReduce jobs to participate in the {@link cascading.cascade.Cascade} scheduler. If
 * other Flow instances in the Cascade share resources with this Flow instance, all participants will be scheduled
 * in topological order according to their dependencies.
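 * <p/>
 * For example, a pre-configured job might be wrapped and scheduled alongside another Flow roughly as follows, where
 * the job name, paths, mapper and reducer classes, and the {@code otherFlow} instance are illustrative only:
 * <pre>{@code
 * JobConf jobConf = new JobConf();
 * jobConf.setJobName( "custom-mr-job" );
 * jobConf.setMapperClass( CustomMapper.class );   // hypothetical mapper
 * jobConf.setReducerClass( CustomReducer.class ); // hypothetical reducer
 * FileInputFormat.setInputPaths( jobConf, new Path( "some/input/path" ) );
 * FileOutputFormat.setOutputPath( jobConf, new Path( "some/output/path" ) );
 *
 * Flow mrFlow = new MapReduceFlow( "custom-mr-job", jobConf, true ); // delete the output path on init
 *
 * Cascade cascade = new CascadeConnector().connect( otherFlow, mrFlow );
 * cascade.complete();
 * }</pre>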
 * <p/>
 * Set the parameter {@code deleteSinkOnInit} to {@code true} if the output path set in the JobConf should be deleted
 * before the MapReduce job executes.
 * <p/>
 * MapReduceFlow assumes the underlying input and output paths are compatible with the {@link Hfs} Tap.
 * <p/>
 * If the configured JobConf instance uses identifiers other than Hadoop FS paths, override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to resolve the configured identifiers into
 * usable {@link Tap} instances. By default, {@code createTraps} returns an empty Map and can usually be left as is.
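 * <p/>
 * A minimal sketch of such an override in a MapReduceFlow subclass might look like the following, where
 * {@code CustomTap} and the {@code custom.input.uri} property are placeholders for whatever the job actually uses:
 * <pre>{@code
 * protected Map<String, Tap> createSources( JobConf jobConf )
 *   {
 *   Map<String, Tap> taps = new HashMap<String, Tap>();
 *
 *   String identifier = jobConf.get( "custom.input.uri" ); // placeholder property name
 *
 *   taps.put( identifier, new CustomTap( identifier ) ); // placeholder Tap implementation
 *
 *   return taps;
 *   }
 * }</pre>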
 */
public class MapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit */
  protected boolean deleteSinkOnInit = false;

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"jobConf"})
  public MapReduceFlow( JobConf jobConf )
    {
    this( jobConf.getJobName(), jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( jobConf.getJobName(), jobConf, deleteSinkOnInit );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name    of type String
   * @param jobConf of type JobConf
   */
  @ConstructorProperties({"name", "jobConf"})
  public MapReduceFlow( String name, JobConf jobConf )
    {
    this( name, jobConf, false );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit )
    {
    this( name, jobConf, deleteSinkOnInit, true );
    }

  /**
   * Constructor MapReduceFlow creates a new MapReduceFlow instance.
   *
   * @param name             of type String
   * @param jobConf          of type JobConf
   * @param deleteSinkOnInit of type boolean
   * @param stopJobsOnExit   of type boolean
   */
  @ConstructorProperties({"name", "jobConf", "deleteSinkOnInit", "stopJobsOnExit"})
  public MapReduceFlow( String name, JobConf jobConf, boolean deleteSinkOnInit, boolean stopJobsOnExit )
    {
    super( HadoopUtil.getPlatformInfo(), new Properties(), jobConf, name, null );
    this.deleteSinkOnInit = deleteSinkOnInit;
    this.stopJobsOnExit = stopJobsOnExit;

    setSources( createSources( jobConf ) );
    setSinks( createSinks( jobConf ) );
    setTraps( createTraps( jobConf ) );
    setFlowStepGraph( makeStepGraph( jobConf ) );
    initSteps();

    initializeNewJobsMap();
    }

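  /**
   * Method makeStepGraph wraps the given JobConf in a single {@link MapReduceFlowStep} bound to the first sink Tap.
   *
   * @param jobConf of type JobConf
   * @return FlowStepGraph
   */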
  private FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph<JobConf> flowStepGraph = new HadoopStepGraph();

    Tap sink = getSinksCollection().iterator().next();
    FlowStep<JobConf> step = new MapReduceFlowStep( getName(), sink.toString(), jobConf, sink );

    flowStepGraph.addVertex( step );

    return flowStepGraph;
    }

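  /**
   * Method createSources creates source {@link Tap} instances from the input paths set on the given JobConf via
   * {@link FileInputFormat}. Override this method if the configured job does not use Hadoop FS input paths.
   *
   * @param jobConf of type JobConf
   * @return Map of source Tap instances keyed by input path
   */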
  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    Path[] paths = FileInputFormat.getInputPaths( jobConf );

    Map<String, Tap> taps = new HashMap<String, Tap>();

    for( Path path : paths )
      taps.put( path.toString(), new Hfs( new NullScheme(), path.toString() ) );

    return taps;
    }

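  /**
   * Method createSinks creates the sink {@link Tap} instance from the output path set on the given JobConf via
   * {@link FileOutputFormat}, honoring {@code deleteSinkOnInit}. Override this method if the configured job does not
   * use a Hadoop FS output path.
   *
   * @param jobConf of type JobConf
   * @return Map of sink Tap instances keyed by output path
   */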
  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    Map<String, Tap> taps = new HashMap<String, Tap>();

    String path = FileOutputFormat.getOutputPath( jobConf ).toString();

    taps.put( path, new Hfs( new NullScheme(), path, deleteSinkOnInit ) );

    return taps;
    }

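  /**
   * Method createTraps returns an empty Map by default since a raw MapReduce job defines no trap Taps. Override this
   * method if trap Tap instances should be associated with the job.
   *
   * @param jobConf of type JobConf
   * @return Map of trap Tap instances, empty by default
   */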
  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    return new HashMap<String, Tap>();
    }
  }