/*
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.CascadingException;
import cascading.flow.FlowStep;
import cascading.flow.planner.PlatformInfo;
import cascading.flow.planner.process.FlowStepGraph;
import cascading.scheme.NullScheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
/**
 * BaseMapReduceFlow is the base class for {@link HadoopFlow} implementations that wrap a
 * pre-configured MapReduce job, deriving Cascading source and sink {@link Tap} instances
 * from the input and output paths already registered on the given {@link JobConf}.
 */
public class BaseMapReduceFlow extends HadoopFlow
  {
  /** Field deleteSinkOnInit */
  protected boolean deleteSinkOnInit = false;

  protected BaseMapReduceFlow( PlatformInfo platformInfo, Map<Object, Object> properties, JobConf jobConf, String name, Map<String, String> flowDescriptor, boolean deleteSinkOnInit )
    {
    super( platformInfo, properties, jobConf, name, flowDescriptor );
    this.deleteSinkOnInit = deleteSinkOnInit;
    }

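  /**
   * Convenience constructor that initializes the flow with a new, empty {@link JobConf}.
   */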
  protected BaseMapReduceFlow( PlatformInfo platformInfo, Map<Object, Object> properties, String name, Map<String, String> flowDescriptor, boolean deleteSinkOnInit )
    {
    super( platformInfo, properties, new JobConf(), name, flowDescriptor );
    this.deleteSinkOnInit = deleteSinkOnInit;
    }

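  /**
   * Builds the step graph for this flow. Since the flow wraps exactly one MapReduce job, the
   * graph contains a single step bound to the first (and only expected) sink.
   */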
  protected FlowStepGraph makeStepGraph( JobConf jobConf )
    {
    FlowStepGraph flowStepGraph = new FlowStepGraph();

    // a wrapped job has a single output, so only the first sink is bound to the step
    Tap sink = getSinksCollection().iterator().next();
    FlowStep<JobConf> step = createFlowStep( jobConf, sink );

    flowStepGraph.addVertex( step );

    return flowStepGraph;
    }

  protected FlowStep<JobConf> createFlowStep( JobConf jobConf, Tap sink )
    {
    return new MapReduceFlowStep( this, sink.toString(), jobConf, sink );
    }

  protected Map<String, Tap> createSources( JobConf jobConf )
    {
    return fileInputToTaps( jobConf );
    }

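  /**
   * Derives source taps from the input paths registered on the job configuration, consulting the
   * old {@code mapred} API first and falling back to the new {@code mapreduce} API.
   * <p>
   * For example, with illustrative inputs {@code /logs/2020} and {@code /logs/2021}, the resulting
   * taps would be keyed {@code "2020"} and {@code "2021"}, per {@link #makeNameFromPath(Map, Path)}.
   */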
  protected Map<String, Tap> fileInputToTaps( JobConf jobConf )
    {
    Path[] paths = FileInputFormat.getInputPaths( jobConf );

    if( paths == null || paths.length == 0 )
      {
      // fall back to input paths registered via the new mapreduce API
      try
        {
        paths = org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getInputPaths( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<>();

    if( paths == null )
      return taps;

    for( Path path : paths )
      toSourceTap( jobConf, taps, path );

    return taps;
    }

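  /**
   * Registers a read-only ({@link SinkMode#KEEP}) tap for the given path under a unique name.
   * Note the return value is that of {@link Map#put(Object, Object)}, the previously mapped tap, if any.
   */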
  protected Tap toSourceTap( JobConf jobConf, Map<String, Tap> taps, Path path )
    {
    String name = makeNameFromPath( taps, path );

    return taps.put( name, createTap( jobConf, path, SinkMode.KEEP ) );
    }

  protected Map<String, Tap> createSinks( JobConf jobConf )
    {
    return fileOutputToTaps( jobConf );
    }

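  /**
   * Derives the sink tap from the output path registered on the job configuration, consulting the
   * old {@code mapred} API first and falling back to the new {@code mapreduce} API.
   */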
  protected Map<String, Tap> fileOutputToTaps( JobConf jobConf )
    {
    Path path = FileOutputFormat.getOutputPath( jobConf );

    if( path == null )
      {
      // fall back to the output path registered via the new mapreduce API
      try
        {
        path = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath( new Job( jobConf ) );
        }
      catch( IOException exception )
        {
        throw new CascadingException( exception );
        }
      }

    Map<String, Tap> taps = new HashMap<>();

    if( path != null )
      toSinkTap( jobConf, taps, path );

    return taps;
    }

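  /**
   * Registers a sink tap for the given path. When {@code deleteSinkOnInit} is set, the tap uses
   * {@link SinkMode#REPLACE} so existing output is deleted when the flow initializes; otherwise
   * {@link SinkMode#KEEP} is used.
   */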
  protected Tap toSinkTap( JobConf jobConf, Map<String, Tap> taps, Path path )
    {
    String name = makeNameFromPath( taps, path );

    SinkMode sinkMode = deleteSinkOnInit ? SinkMode.REPLACE : SinkMode.KEEP;

    return taps.put( name, createTap( jobConf, path, sinkMode ) );
    }

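  /**
   * Creates the backing {@link Hfs} tap. A {@link NullScheme} is used since the wrapped job,
   * not Cascading, performs the actual reading and writing.
   */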
  protected Tap createTap( JobConf jobConf, Path path, SinkMode sinkMode )
    {
    return new Hfs( new NullScheme(), path.toString(), sinkMode );
    }

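  /**
   * Derives a unique tap name from the given path, preferring the shortest possible name.
   * <p>
   * For example, an illustrative path {@code /logs/2020/part} first yields {@code "part"}; if that
   * name is already taken, the parent is prepended to give {@code "2020/part"}, and so on up the path.
   */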
  protected String makeNameFromPath( Map<String, Tap> taps, Path path )
    {
    Path parent = path.getParent();
    String name = path.getName();

    // prefer the least qualified name, prepending parent directories only on collision
    while( taps.containsKey( name ) )
      {
      name = new Path( parent.getName(), name ).toString();
      parent = parent.getParent();
      }

    return name;
    }

  protected Map<String, Tap> createTraps( JobConf jobConf )
    {
    // a wrapped MapReduce job cannot participate in Cascading failure traps
    return new HashMap<>();
    }
  }
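// A minimal usage sketch, assuming the MapReduceFlow subclass in this package, which exposes
// public constructors over this base class (names and paths below are illustrative):
//
//   JobConf jobConf = new JobConf();
//   jobConf.setJobName( "wordcount" );
//   FileInputFormat.setInputPaths( jobConf, new Path( "some/input/path" ) );
//   FileOutputFormat.setOutputPath( jobConf, new Path( "some/output/path" ) );
//   // ... set mapper, reducer, and key/value classes on jobConf ...
//
//   Flow flow = new MapReduceFlow( "wrapped job", jobConf, true ); // true == deleteSinkOnInit
//   flow.complete();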