001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop;
022
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026
027import cascading.flow.FlowNode;
028import cascading.flow.FlowProcess;
029import cascading.flow.hadoop.util.HadoopMRUtil;
030import cascading.flow.planner.BaseFlowNode;
031import cascading.flow.planner.Scope;
032import cascading.flow.planner.graph.ElementDirectedGraph;
033import cascading.flow.planner.graph.ElementGraph;
034import cascading.flow.planner.graph.ElementGraphs;
035import cascading.flow.planner.process.FlowNodeGraph;
036import cascading.flow.planner.process.ProcessEdge;
037import cascading.pipe.Pipe;
038import cascading.tap.Tap;
039import org.apache.hadoop.mapred.JobConf;
040
041/** Class MapReduceFlowStep wraps a {@link JobConf} and allows it to be executed as a {@link cascading.flow.Flow}. */
042public class MapReduceFlowStep extends HadoopFlowStep
043  {
044  public static final String MAP = "Map";
045  public static final String SHUFFLE = "Shuffle";
046  public static final String REDUCE = "Reduce";
047
048  /** Field jobConf */
049  private final JobConf jobConf;
050
051  public MapReduceFlowStep( HadoopFlow flow, JobConf jobConf )
052    {
053    if( flow == null )
054      throw new IllegalArgumentException( "flow may not be null" );
055
056    setName( jobConf.getJobName() );
057    setFlow( flow );
058
059    this.jobConf = jobConf;
060
061    configure(); // requires flow and jobConf
062    }
063
064  protected MapReduceFlowStep( HadoopFlow flow, String stepName, JobConf jobConf, Tap sink )
065    {
066    if( flow == null )
067      throw new IllegalArgumentException( "flow may not be null" );
068
069    setName( stepName );
070    setFlow( flow );
071
072    this.jobConf = jobConf;
073
074    addSink( "default", sink );
075    }
076
077  protected JobConf getJobConf()
078    {
079    return jobConf;
080    }
081
082  @Override
083  public ElementGraph getElementGraph()
084    {
085    if( elementGraph == null )
086      elementGraph = createStepElementGraph( getFlowNodeGraph() );
087
088    return elementGraph;
089    }
090
091  @Override
092  public FlowNodeGraph getFlowNodeGraph()
093    {
094    if( flowNodeGraph == null )
095      flowNodeGraph = createFlowNodeGraph( createNodeElementGraphs( jobConf ) );
096
097    return flowNodeGraph;
098    }
099
100  @Override
101  public JobConf createInitializedConfig( FlowProcess<JobConf> flowProcess, JobConf parentConfig )
102    {
103    return jobConf;
104    }
105
106  private ElementGraph createStepElementGraph( FlowNodeGraph flowNodeGraph )
107    {
108    return ElementGraphs.asElementDirectedGraph( flowNodeGraph.getElementGraphs() ).bindExtents();
109    }
110
111  private List<ElementGraph> createNodeElementGraphs( JobConf jobConf )
112    {
113    BaseMapReduceFlow baseFlow = (BaseMapReduceFlow) getFlow();
114    boolean hasReducer = HadoopMRUtil.hasReducer( jobConf );
115
116    List<ElementGraph> result = new ArrayList<>();
117    ElementGraph mapElementGraph = createElementDirectedGraph();
118    ElementGraph tailElementGraph = mapElementGraph;
119
120    Pipe headOperation = createMapOperation();
121    Pipe tailOperation = headOperation;
122
123    mapElementGraph.addVertex( headOperation );
124
125    result.add( mapElementGraph );
126
127    ElementGraph reduceElementGraph = null;
128
129    if( hasReducer )
130      {
131      Pipe shuffleOperation = createShuffleOperation();
132      mapElementGraph.addVertex( shuffleOperation );
133
134      mapElementGraph.addEdge( headOperation, shuffleOperation );
135
136      reduceElementGraph = createElementDirectedGraph();
137
138      reduceElementGraph.addVertex( shuffleOperation );
139      Pipe reduceOperation = createReduceOperation();
140      reduceElementGraph.addVertex( reduceOperation );
141
142      reduceElementGraph.addEdge( shuffleOperation, reduceOperation );
143
144      tailOperation = reduceOperation;
145      tailElementGraph = reduceElementGraph;
146
147      result.add( reduceElementGraph );
148      }
149
150    Map<String, Tap> sources = baseFlow.createSources( jobConf );
151
152    for( Map.Entry<String, Tap> entry : sources.entrySet() )
153      {
154      mapElementGraph.addVertex( entry.getValue() );
155      mapElementGraph.addEdge( entry.getValue(), headOperation, new Scope( entry.getKey() ) );
156      }
157
158    Map<String, Tap> sinks = baseFlow.createSinks( jobConf );
159
160    for( Map.Entry<String, Tap> entry : sinks.entrySet() )
161      {
162      tailElementGraph.addVertex( entry.getValue() );
163      tailElementGraph.addEdge( tailOperation, entry.getValue(), new Scope( entry.getKey() ) );
164      }
165
166    mapElementGraph.bindExtents();
167
168    if( reduceElementGraph != null )
169      reduceElementGraph.bindExtents();
170
171    return result;
172    }
173
174  protected ElementDirectedGraph createElementDirectedGraph()
175    {
176    return new ElementDirectedGraph();
177    }
178
179  protected Pipe createMapOperation()
180    {
181    return new Pipe( MAP );
182    }
183
184  protected Pipe createShuffleOperation()
185    {
186    return new Pipe( SHUFFLE );
187    }
188
189  protected Pipe createReduceOperation()
190    {
191    return new Pipe( REDUCE );
192    }
193
194  protected FlowNodeGraph createFlowNodeGraph( List<ElementGraph> elementGraphs )
195    {
196    ElementGraph mapElementGraph = elementGraphs.get( 0 );
197    ElementGraph reduceElementGraph = elementGraphs.size() == 2 ? elementGraphs.get( 1 ) : null;
198
199    FlowNodeGraph flowNodeGraph = new FlowNodeGraph();
200    int nodes = elementGraphs.size();
201
202    FlowNode mapperNode = new BaseFlowNode( mapElementGraph, String.format( "(1/%s)", nodes ), 0 );
203    flowNodeGraph.addVertex( mapperNode );
204
205    if( nodes == 2 )
206      {
207      FlowNode reducerNode = new BaseFlowNode( reduceElementGraph, "(2/2)", 1 );
208      flowNodeGraph.addVertex( reducerNode );
209      flowNodeGraph.addEdge( mapperNode, reducerNode, new ProcessEdge( mapperNode, reducerNode ) );
210      }
211
212    return flowNodeGraph;
213    }
214  }