001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.hadoop.stream.graph;
022
023import java.io.IOException;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Set;
027
028import cascading.flow.FlowException;
029import cascading.flow.FlowNode;
030import cascading.flow.FlowProcess;
031import cascading.flow.hadoop.HadoopFlowProcess;
032import cascading.flow.hadoop.stream.HadoopMemoryJoinGate;
033import cascading.flow.hadoop.stream.element.HadoopCoGroupGate;
034import cascading.flow.hadoop.stream.element.HadoopGroupByGate;
035import cascading.flow.hadoop.stream.element.HadoopSinkStage;
036import cascading.flow.hadoop.util.HadoopUtil;
037import cascading.flow.planner.graph.ElementGraphs;
038import cascading.flow.stream.duct.Gate;
039import cascading.flow.stream.element.GroupingSpliceGate;
040import cascading.flow.stream.element.SinkStage;
041import cascading.flow.stream.element.SourceStage;
042import cascading.flow.stream.graph.IORole;
043import cascading.flow.stream.graph.NodeStreamGraph;
044import cascading.pipe.CoGroup;
045import cascading.pipe.GroupBy;
046import cascading.pipe.HashJoin;
047import cascading.tap.Tap;
048import org.apache.hadoop.mapred.JobConf;
049
050/**
051 *
052 */
053public class HadoopMapStreamGraph extends NodeStreamGraph
054  {
055  private final Tap source;
056  private SourceStage streamedHead;
057
058  public HadoopMapStreamGraph( HadoopFlowProcess flowProcess, FlowNode node, Tap source )
059    {
060    super( flowProcess, node, source );
061    this.source = source;
062
063    buildGraph();
064
065    setTraps();
066    setScopes();
067
068    printGraph( node.getID(), "map", flowProcess.getCurrentSliceNum() );
069    bind();
070    }
071
072  public SourceStage getStreamedHead()
073    {
074    return streamedHead;
075    }
076
077  protected void buildGraph()
078    {
079    streamedHead = handleHead( this.source, flowProcess );
080
081    Set<Tap> tributaries = ElementGraphs.findSources( elementGraph, Tap.class );
082
083    tributaries.remove( this.source ); // we cannot stream and accumulate the same source
084
085    // accumulated paths
086    for( Object source : tributaries )
087      {
088      HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
089      JobConf conf = hadoopProcess.getJobConf();
090
091      // allows client side config to be used cluster side
092      String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( (Tap) source ) );
093
094      if( property == null )
095        throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() );
096
097      conf = getSourceConf( hadoopProcess, conf, property );
098      flowProcess = new HadoopFlowProcess( hadoopProcess, conf );
099
100      handleHead( (Tap) source, flowProcess );
101      }
102    }
103
104  private JobConf getSourceConf( HadoopFlowProcess flowProcess, JobConf conf, String property )
105    {
106    Map<String, String> priorConf;
107    try
108      {
109      priorConf = (Map<String, String>) HadoopUtil.deserializeBase64( property, conf, HashMap.class, true );
110      }
111    catch( IOException exception )
112      {
113      throw new FlowException( "unable to deserialize properties", exception );
114      }
115
116    return flowProcess.mergeMapIntoConfig( conf, priorConf );
117    }
118
119  private SourceStage handleHead( Tap source, FlowProcess flowProcess )
120    {
121    SourceStage sourceDuct = new SourceStage( flowProcess, source );
122
123    addHead( sourceDuct );
124
125    handleDuct( source, sourceDuct );
126
127    return sourceDuct;
128    }
129
130  @Override
131  protected SinkStage createSinkStage( Tap element )
132    {
133    return new HadoopSinkStage( flowProcess, element );
134    }
135
136  @Override
137  protected Gate createCoGroupGate( CoGroup element, IORole role )
138    {
139    return new HadoopCoGroupGate( flowProcess, element, IORole.sink );
140    }
141
142  @Override
143  protected Gate createGroupByGate( GroupBy element, IORole role )
144    {
145    return new HadoopGroupByGate( flowProcess, element, role );
146    }
147
148  @Override
149  protected GroupingSpliceGate createNonBlockingJoinGate( HashJoin join )
150    {
151    return new HadoopMemoryJoinGate( flowProcess, join ); // does not use a latch
152    }
153  }