/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

import static cascading.flow.planner.graph.ElementGraphs.asFlowElementGraph;
import static cascading.util.Util.asList;

/**
 * Class MultiMapReduceFlow is a {@link cascading.flow.hadoop.HadoopFlow} subclass that supports custom MapReduce jobs
 * pre-configured via one or more {@link JobConf} objects.
 * <p>
 * Use this class to group multiple JobConf instances together as a single Flow. MultiMapReduceFlow will automatically
 * topologically order the JobConf instances and schedule them on the cluster once {@link #start()} or {@link #complete()}
 * is called.
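 * <p>
 * For example, a minimal sketch (the two JobConf instances and their input/output paths are assumed to be configured
 * elsewhere, with the second job reading the output path of the first):
 * <pre>{@code
 * JobConf firstJobConf = new JobConf();  // assumed: reads the raw input, writes an intermediate path
 * JobConf secondJobConf = new JobConf(); // assumed: reads the intermediate path, writes the final sink path
 *
 * MultiMapReduceFlow flow = new MultiMapReduceFlow( "two step flow", firstJobConf, secondJobConf );
 *
 * flow.complete(); // topologically orders the two jobs, submits them, and blocks until both finish
 * }</pre>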
 * <p>
 * If you have a single JobConf instance, see {@link MapReduceFlow} as an alternative to this class.
 * <p>
 * This class will not delete any sinks before execution; it is up to the developer to ensure that any intermediate and
 * sink paths are removed/deleted before calling {@link #start()} or {@link #complete()}, otherwise Hadoop will throw
 * an exception.
 * <p>
 * JobConf instances can be added incrementally at any point before the {@link #complete()} method is called, but they
 * must logically (topologically) come after any previously provided JobConf instances. In practice, the Flow will fail
 * if an input source path is missing because the JobConf that produces it was not provided before the Flow was started.
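 * <p>
 * A rough sketch of incremental use (the JobConf instances shown are assumed to be configured elsewhere, with
 * {@code secondJobConf} consuming output produced by {@code firstJobConf}):
 * <pre>{@code
 * MultiMapReduceFlow flow = new MultiMapReduceFlow( "incremental flow", firstJobConf );
 *
 * flow.start(); // begin executing the steps queued so far
 *
 * flow.attachFlowStep( secondJobConf ); // queue a downstream job while the flow is running
 *
 * flow.complete(); // signal that no more steps will be attached, then block until all steps finish
 * }</pre>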
 * <p>
 * The ordering is done by comparing the input and output paths of the given JobConf instances. By default, this class
 * only works with JobConf instances that read and write from the Hadoop FileSystem (HDFS) (any path that would work
 * with the {@link Hfs} Tap).
 * <p>
 * If the configured JobConf instance uses some other identifier instead of Hadoop FS paths, you should override the
 * {@link #createSources(org.apache.hadoop.mapred.JobConf)}, {@link #createSinks(org.apache.hadoop.mapred.JobConf)}, and
 * {@link #createTraps(org.apache.hadoop.mapred.JobConf)} methods to properly resolve the configured identifiers into
 * usable {@link Tap} instances. By default, createTraps returns an empty collection and should probably be left alone.
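 * For instance, a subclass might resolve such identifiers roughly as follows (a sketch only; the
 * {@code Map<String, Tap>} return type is assumed to mirror {@link MapReduceFlow}, and {@code resolveSourceTaps()} is a
 * hypothetical application-specific helper):
 * <pre>{@code
 * public class CustomIdentifierFlow extends MultiMapReduceFlow
 *   {
 *   public CustomIdentifierFlow( String name, JobConf jobConf, JobConf... jobConfs )
 *     {
 *     super( name, jobConf, jobConfs );
 *     }
 *
 *   protected Map<String, Tap> createSources( JobConf jobConf )
 *     {
 *     // translate the identifiers configured on the JobConf into Tap instances;
 *     // resolveSourceTaps() stands in for application-specific resolution logic
 *     return resolveSourceTaps( jobConf );
 *     }
 *   }
 * }</pre>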
 * <p>
 * MultiMapReduceFlow supports both org.apache.hadoop.mapred.* and org.apache.hadoop.mapreduce.* API jobs.
 */
public class MultiMapReduceFlow extends BaseMapReduceFlow
  {
  /** Field tapCache */
  private Map<String, Tap> tapCache = new HashMap<>();
  /** Field queuedSteps */
  private List<MapReduceFlowStep> queuedSteps = new LinkedList<>();
  /** Field completeCalled */
  private volatile boolean completeCalled = false;
  /** Field lock */
  private final Object lock = new Object();

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param name     of String
   * @param jobConf  of JobConf
   * @param jobConfs of JobConf...
   */
  public MultiMapReduceFlow( String name, JobConf jobConf, JobConf... jobConfs )
    {
    this( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), new Properties(), name );

    initializeFrom( asList( jobConf, jobConfs ) );
    }

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param properties of Map<Object, Object>
   * @param name       of String
   * @param jobConf    of JobConf
   * @param jobConfs   of JobConf...
   */
  public MultiMapReduceFlow( Map<Object, Object> properties, String name, JobConf jobConf, JobConf... jobConfs )
    {
    this( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, name, null );

    initializeFrom( asList( jobConf, jobConfs ) );
    }

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param properties     of Map<Object, Object>
   * @param name           of String
   * @param flowDescriptor of Map<String, String>
   * @param jobConf        of JobConf
   * @param jobConfs       of JobConf...
   */
  public MultiMapReduceFlow( Map<Object, Object> properties, String name, Map<String, String> flowDescriptor, JobConf jobConf, JobConf... jobConfs )
    {
    this( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, name, flowDescriptor );

    initializeFrom( asList( jobConf, jobConfs ) );
    }

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param properties     of Map<Object, Object>
   * @param name           of String
   * @param flowDescriptor of Map<String, String>
   * @param stopJobsOnExit of boolean
   * @param jobConf        of JobConf
   * @param jobConfs       of JobConf...
   */
  public MultiMapReduceFlow( Map<Object, Object> properties, String name, Map<String, String> flowDescriptor, boolean stopJobsOnExit, JobConf jobConf, JobConf... jobConfs )
    {
    this( HadoopUtil.getPlatformInfo( JobConf.class, "org/apache/hadoop", "Hadoop MR" ), properties, name, flowDescriptor );
    this.stopJobsOnExit = stopJobsOnExit;

    initializeFrom( asList( jobConf, jobConfs ) );
    }

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param platformInfo of PlatformInfo
   * @param properties   of Map<Object, Object>
   * @param name         of String
   */
  protected MultiMapReduceFlow( PlatformInfo platformInfo, Map<Object, Object> properties, String name )
    {
    this( platformInfo, properties, name, null );
    }

  /**
   * Constructor MultiMapReduceFlow creates a new MultiMapReduceFlow instance.
   *
   * @param platformInfo   of PlatformInfo
   * @param properties     of Map<Object, Object>
   * @param name           of String
   * @param flowDescriptor of Map<String, String>
   */
  protected MultiMapReduceFlow( PlatformInfo platformInfo, Map<Object, Object> properties, String name, Map<String, String> flowDescriptor )
    {
    super( platformInfo, properties, name, flowDescriptor, false );
    }

  protected void initializeFrom( List<JobConf> jobConfs )
    {
    List<MapReduceFlowStep> steps = new ArrayList<>();

    for( JobConf jobConf : jobConfs )
      steps.add( createMapReduceFlowStep( jobConf ) );

    updateWithFlowSteps( steps );
    }

  protected MapReduceFlowStep createMapReduceFlowStep( JobConf jobConf )
    {
    return new MapReduceFlowStep( this, jobConf );
    }

  /**
   * Method notifyComplete signals that no further JobConf instances will be attached, releasing any thread blocked
   * in {@link #blockingContinuePollingSteps()} so the Flow can finish once the remaining steps complete.
   */
  public void notifyComplete()
    {
    completeCalled = true;

    synchronized( lock )
      {
      // forces blockingContinuePollingSteps to stop blocking
      lock.notifyAll();
      }
    }

  @Override
  public void complete()
    {
    notifyComplete();

    super.complete();
    }

  @Override
  protected boolean spawnSteps() throws InterruptedException, ExecutionException
    {
    // continue to spawn jobs until no longer required
    while( !stop && throwable == null )
      {
      if( !blockingContinuePollingSteps() )
        return true;

      if( isInfoEnabled() )
        {
        logInfo( "updated" );

        for( Tap source : getSourcesCollection() )
          logInfo( " source: " + source );
        for( Tap sink : getSinksCollection() )
          logInfo( " sink: " + sink );
        }

      // will not return until all current steps are complete, or one failed
      if( !super.spawnSteps() )
        return false;
      }

    return true;
    }

  /**
   * Method blockingContinuePollingSteps blocks until new steps are queued or {@link #notifyComplete()} is called,
   * then moves any queued steps into the flow step graph.
   *
   * @return true if there are newly eligible steps or more steps may still be attached, otherwise false
   */
  protected boolean blockingContinuePollingSteps()
    {
    synchronized( lock )
      {
      // block until queue has items, or complete is called
      while( queuedSteps.isEmpty() && !completeCalled )
        {
        try
          {
          lock.wait();
          }
        catch( InterruptedException exception )
          {
          // do nothing
          }
        }

      // move any queued steps into the step graph, then empty the queue
      updateWithFlowSteps( queuedSteps ).clear();
      }

    if( getEligibleJobsSize() != 0 ) // new ones were added
      return true;

    return !completeCalled;
    }

  @Override
  protected Tap createTap( JobConf jobConf, Path path, SinkMode sinkMode )
    {
    // cache Tap instances by path so steps sharing a path share the same Tap instance
    Tap tap = tapCache.get( path.toString() );

    if( tap == null )
      {
      tap = super.createTap( jobConf, path, sinkMode );
      tapCache.put( path.toString(), tap );
      }

    return tap;
    }

  /**
   * Method attachFlowStep queues the given JobConf as an additional FlowStep. It may be called at any point before
   * {@link #complete()} is called, but the new step must topologically follow the steps already provided.
   *
   * @param jobConf of JobConf
   */
  public void attachFlowStep( JobConf jobConf )
    {
    if( completeCalled )
      throw new IllegalStateException( "cannot attach new FlowStep after complete() has been called" );

    addFlowStep( createMapReduceFlowStep( jobConf ) );
    }

  protected void addFlowStep( MapReduceFlowStep flowStep )
    {
    synchronized( lock )
      {
      queuedSteps.add( flowStep );
      lock.notifyAll();
      }
    }

  protected FlowStepGraph getOrCreateFlowStepGraph()
    {
    FlowStepGraph flowStepGraph = getFlowStepGraph();

    if( flowStepGraph == null )
      {
      flowStepGraph = new FlowStepGraph();
      setFlowStepGraph( flowStepGraph );
      }

    return flowStepGraph;
    }

  /**
   * Method updateWithFlowSteps adds the given steps to the flow step graph, rebuilds the element graph and the
   * source/sink/trap maps, and re-initializes step and stats bookkeeping, mirroring {@code BaseFlow#initialize()}.
   *
   * @param flowSteps the steps to add
   * @return the given flowSteps collection
   */
  protected Collection<MapReduceFlowStep> updateWithFlowSteps( Collection<MapReduceFlowStep> flowSteps )
    {
    if( flowSteps.isEmpty() )
      return flowSteps;

    FlowStepGraph flowStepGraph = getOrCreateFlowStepGraph();

    updateFlowStepGraph( flowStepGraph, flowSteps );

    setFlowElementGraph( asFlowElementGraph( platformInfo, flowStepGraph ) );

    removeListeners( getSourcesCollection() );
    removeListeners( getSinksCollection() );
    removeListeners( getTrapsCollection() );

    // re-adds listeners
    setSources( flowStepGraph.getSourceTapsMap() );
    setSinks( flowStepGraph.getSinkTapsMap() );
    setTraps( flowStepGraph.getTrapsMap() );

    // this mirrors BaseFlow#initialize()

    initSteps();

    if( flowStats == null )
      flowStats = createPrepareFlowStats(); // must be last

    if( !isJobsMapInitialized() )
      initializeNewJobsMap();
    else
      updateJobsMap();

    initializeChildStats();

    return flowSteps;
    }

  protected FlowStepGraph updateFlowStepGraph( FlowStepGraph flowStepGraph, Collection<MapReduceFlowStep> flowSteps )
    {
    for( MapReduceFlowStep flowStep : flowSteps )
      flowStepGraph.addVertex( flowStep );

    flowStepGraph.bindEdges();

    return flowStepGraph;
    }
  }