/*
 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.stream.graph;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.hadoop.stream.HadoopMemoryJoinGate;
import cascading.flow.hadoop.stream.element.HadoopCoGroupGate;
import cascading.flow.hadoop.stream.element.HadoopGroupByGate;
import cascading.flow.hadoop.stream.element.HadoopSinkStage;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.flow.stream.duct.Gate;
import cascading.flow.stream.element.GroupingSpliceGate;
import cascading.flow.stream.element.SinkStage;
import cascading.flow.stream.element.SourceStage;
import cascading.flow.stream.graph.IORole;
import cascading.flow.stream.graph.NodeStreamGraph;
import cascading.pipe.CoGroup;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.tap.Tap;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

/**
 * HadoopMapStreamGraph assembles the map-side stream graph for a single {@link FlowNode}, wiring the
 * streamed source {@link Tap} into a head {@link SourceStage} and registering every remaining source tap
 * as an accumulated branch (typically the non-streamed side of a {@link HashJoin}).
 */
public class HadoopMapStreamGraph extends NodeStreamGraph
  {
  private final Tap source;
  private SourceStage streamedHead;

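  /**
   * Constructs the map-side graph for the given node: builds the stream graph from the streamed source,
   * applies traps and scopes, and binds the ducts, printing the graph before and after binding.
   */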
  public HadoopMapStreamGraph( HadoopFlowProcess flowProcess, FlowNode node, Tap source )
    {
    super( flowProcess, node, source );
    this.source = source;

    buildGraph();

    setTraps();
    setScopes();

    printGraph( node.getID(), "map", flowProcess.getCurrentSliceNum() );

    bind();

    printBoundGraph( node.getID(), "map", flowProcess.getCurrentSliceNum() );
    }

  public SourceStage getStreamedHead()
    {
    return streamedHead;
    }

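  /**
   * Creates the head {@link SourceStage} for the streamed source, then registers every other source tap
   * found in the element graph as an accumulated head, restoring each tap's client-side configuration
   * from the job conf before building its branch.
   */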
  protected void buildGraph()
    {
    streamedHead = handleHead( this.source, flowProcess );

    Set<Tap> tributaries = ElementGraphs.findSources( elementGraph, Tap.class );

    tributaries.remove( this.source ); // we cannot stream and accumulate the same source

    // accumulated paths
    for( Object source : tributaries )
      {
      final HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
      JobConf conf = hadoopProcess.getJobConf();

      // allows client side config to be used cluster side
      String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( (Tap) source ) );

      if( property == null )
        throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() );

      conf = getSourceConf( hadoopProcess, conf, property );

      // the reporter isn't provided until after the #run method is called
      flowProcess = new HadoopFlowProcess( hadoopProcess, conf )
        {
        @Override
        public Reporter getReporter()
          {
          return hadoopProcess.getReporter();
          }
        };

      handleHead( (Tap) source, flowProcess );
      }
    }

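  /**
   * Deserializes the Base64-encoded, client-side source properties and merges them into the current JobConf.
   */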
  private JobConf getSourceConf( HadoopFlowProcess flowProcess, JobConf conf, String property )
    {
    Map<String, String> priorConf;
    try
      {
      priorConf = (Map<String, String>) HadoopUtil.deserializeBase64( property, conf, HashMap.class, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to deserialize properties", exception );
      }

    return flowProcess.mergeMapIntoConfig( conf, priorConf );
    }

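  /**
   * Creates a {@link SourceStage} for the given tap, registers it as a graph head, and builds the
   * downstream ducts from it.
   */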
  private SourceStage handleHead( Tap source, FlowProcess flowProcess )
    {
    SourceStage sourceDuct = new SourceStage( flowProcess, source );

    addHead( sourceDuct );

    handleDuct( source, sourceDuct );

    return sourceDuct;
    }

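  // the overrides below bind the Hadoop MapReduce specific stage and gate implementations into the generic stream graph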
  @Override
  protected SinkStage createSinkStage( Tap element )
    {
    return new HadoopSinkStage( flowProcess, element );
    }

  @Override
  protected Gate createCoGroupGate( CoGroup element, IORole role )
    {
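    // map-side co-group gates always emit to the intermediate output, so the sink role is used regardless of the requested role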
    return new HadoopCoGroupGate( flowProcess, element, IORole.sink );
    }

  @Override
  protected Gate createGroupByGate( GroupBy element, IORole role )
    {
    return new HadoopGroupByGate( flowProcess, element, role );
    }

  @Override
  protected GroupingSpliceGate createNonBlockingJoinGate( HashJoin join )
    {
    return new HadoopMemoryJoinGate( flowProcess, join ); // does not use a latch
    }
  }