001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.hadoop.stream.graph; 022 023import java.io.IOException; 024import java.util.HashMap; 025import java.util.Map; 026import java.util.Set; 027 028import cascading.flow.FlowException; 029import cascading.flow.FlowNode; 030import cascading.flow.FlowProcess; 031import cascading.flow.hadoop.HadoopFlowProcess; 032import cascading.flow.hadoop.stream.HadoopMemoryJoinGate; 033import cascading.flow.hadoop.stream.element.HadoopCoGroupGate; 034import cascading.flow.hadoop.stream.element.HadoopGroupByGate; 035import cascading.flow.hadoop.stream.element.HadoopSinkStage; 036import cascading.flow.hadoop.util.HadoopUtil; 037import cascading.flow.planner.graph.ElementGraphs; 038import cascading.flow.stream.duct.Gate; 039import cascading.flow.stream.element.GroupingSpliceGate; 040import cascading.flow.stream.element.SinkStage; 041import cascading.flow.stream.element.SourceStage; 042import cascading.flow.stream.graph.IORole; 043import cascading.flow.stream.graph.NodeStreamGraph; 044import cascading.pipe.CoGroup; 045import cascading.pipe.GroupBy; 046import cascading.pipe.HashJoin; 047import cascading.tap.Tap; 048import org.apache.hadoop.mapred.JobConf; 049 050/** 051 * 052 */ 053public class HadoopMapStreamGraph extends NodeStreamGraph 054 { 055 private final Tap source; 056 private SourceStage streamedHead; 057 058 public HadoopMapStreamGraph( HadoopFlowProcess flowProcess, FlowNode node, Tap source ) 059 { 060 super( flowProcess, node, source ); 061 this.source = source; 062 063 buildGraph(); 064 065 setTraps(); 066 setScopes(); 067 068 printGraph( node.getID(), "map", flowProcess.getCurrentSliceNum() ); 069 bind(); 070 } 071 072 public SourceStage getStreamedHead() 073 { 074 return streamedHead; 075 } 076 077 protected void buildGraph() 078 { 079 streamedHead = handleHead( this.source, flowProcess ); 080 081 Set<Tap> tributaries = ElementGraphs.findSources( elementGraph, Tap.class ); 082 083 tributaries.remove( this.source ); // we cannot stream and accumulate the same source 084 085 // accumulated paths 086 for( Object source : tributaries ) 087 { 088 HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess; 089 JobConf conf = hadoopProcess.getJobConf(); 090 091 // allows client side config to be used cluster side 092 String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( (Tap) source ) ); 093 094 if( property == null ) 095 throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() ); 096 097 conf = getSourceConf( hadoopProcess, conf, property ); 098 flowProcess = new HadoopFlowProcess( hadoopProcess, conf ); 099 100 handleHead( (Tap) source, flowProcess ); 101 } 102 } 103 104 private JobConf getSourceConf( HadoopFlowProcess flowProcess, JobConf conf, String property ) 105 { 106 Map<String, String> priorConf; 107 try 108 { 109 priorConf = (Map<String, String>) HadoopUtil.deserializeBase64( property, conf, HashMap.class, true ); 110 } 111 catch( IOException exception ) 112 { 113 throw new FlowException( "unable to deserialize properties", exception ); 114 } 115 116 return flowProcess.mergeMapIntoConfig( conf, priorConf ); 117 } 118 119 private SourceStage handleHead( Tap source, FlowProcess flowProcess ) 120 { 121 SourceStage sourceDuct = new SourceStage( flowProcess, source ); 122 123 addHead( sourceDuct ); 124 125 handleDuct( source, sourceDuct ); 126 127 return sourceDuct; 128 } 129 130 @Override 131 protected SinkStage createSinkStage( Tap element ) 132 { 133 return new HadoopSinkStage( flowProcess, element ); 134 } 135 136 @Override 137 protected Gate createCoGroupGate( CoGroup element, IORole role ) 138 { 139 return new HadoopCoGroupGate( flowProcess, element, IORole.sink ); 140 } 141 142 @Override 143 protected Gate createGroupByGate( GroupBy element, IORole role ) 144 { 145 return new HadoopGroupByGate( flowProcess, element, role ); 146 } 147 148 @Override 149 protected GroupingSpliceGate createNonBlockingJoinGate( HashJoin join ) 150 { 151 return new HadoopMemoryJoinGate( flowProcess, join ); // does not use a latch 152 } 153 }