001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.hadoop.stream.graph; 023 024import java.io.IOException; 025import java.util.HashMap; 026import java.util.Map; 027import java.util.Set; 028 029import cascading.flow.FlowException; 030import cascading.flow.FlowNode; 031import cascading.flow.FlowProcess; 032import cascading.flow.hadoop.HadoopFlowProcess; 033import cascading.flow.hadoop.stream.HadoopMemoryJoinGate; 034import cascading.flow.hadoop.stream.element.HadoopCoGroupGate; 035import cascading.flow.hadoop.stream.element.HadoopGroupByGate; 036import cascading.flow.hadoop.stream.element.HadoopSinkStage; 037import cascading.flow.hadoop.util.HadoopUtil; 038import cascading.flow.planner.graph.ElementGraphs; 039import cascading.flow.stream.duct.Gate; 040import cascading.flow.stream.element.GroupingSpliceGate; 041import cascading.flow.stream.element.SinkStage; 042import cascading.flow.stream.element.SourceStage; 043import cascading.flow.stream.graph.IORole; 044import cascading.flow.stream.graph.NodeStreamGraph; 045import cascading.pipe.CoGroup; 046import cascading.pipe.GroupBy; 047import cascading.pipe.HashJoin; 048import 
cascading.tap.Tap;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

/**
 * HadoopMapStreamGraph assembles the map-side stream graph for a single {@link FlowNode}.
 * <p>
 * The {@code source} Tap passed to the constructor is the <em>streamed</em> input whose
 * records drive the map task; every other source Tap discovered in the element graph is
 * treated as an <em>accumulated</em> (side-loaded) input, whose serialized client-side
 * configuration is recovered from the current {@link JobConf}.
 */
public class HadoopMapStreamGraph extends NodeStreamGraph
  {
  /** The single Tap whose records are streamed through this map task. */
  private final Tap source;
  /** Head duct created for the streamed source; exposed for the mapper driver. */
  private SourceStage streamedHead;

  /**
   * Builds, binds, and (optionally) prints the graph. Ordering matters: traps and scopes
   * must be set after {@link #buildGraph()} and before {@link #bind()}.
   *
   * @param flowProcess the current Hadoop flow process
   * @param node        the flow node this graph executes
   * @param source      the Tap streamed through this map task
   */
  public HadoopMapStreamGraph( HadoopFlowProcess flowProcess, FlowNode node, Tap source )
    {
    super( flowProcess, node, source );
    this.source = source;

    buildGraph();

    setTraps();
    setScopes();

    printGraph( node.getID(), "map", flowProcess.getCurrentSliceNum() );

    bind();

    printBoundGraph( node.getID(), "map", flowProcess.getCurrentSliceNum() );
    }

  /** Returns the head duct of the streamed (non-accumulated) source. */
  public SourceStage getStreamedHead()
    {
    return streamedHead;
    }

  /**
   * Wires the streamed source first, then one head per accumulated tributary. Each
   * tributary gets a HadoopFlowProcess layered over a JobConf merged with the
   * tributary's client-side source configuration.
   */
  protected void buildGraph()
    {
    streamedHead = handleHead( this.source, flowProcess );

    Set<Tap> tributaries = ElementGraphs.findSources( elementGraph, Tap.class );

    tributaries.remove( this.source ); // we cannot stream and accumulate the same source

    // accumulated paths
    for( Tap tributary : tributaries )
      {
      // cast per iteration: flowProcess is replaced below, so later tributaries
      // wrap the process created for the previous one, chaining the merged confs
      final HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
      JobConf conf = hadoopProcess.getJobConf();

      // allows client side config to be used cluster side
      String property = conf.getRaw( "cascading.node.accumulated.source.conf." + Tap.id( tributary ) );

      if( property == null )
        throw new IllegalStateException( "accumulated source conf property missing for: " + tributary.getIdentifier() );

      conf = getSourceConf( hadoopProcess, conf, property );

      // the reporter isn't provided until after the #run method is called
      flowProcess = new HadoopFlowProcess( hadoopProcess, conf )
        {
        @Override
        public Reporter getReporter()
          {
          return hadoopProcess.getReporter();
          }
        };

      handleHead( tributary, flowProcess );
      }
    }

  /**
   * Decodes the Base64-serialized client-side source properties and merges them into
   * the given JobConf.
   *
   * @throws FlowException if the serialized properties cannot be deserialized
   */
  @SuppressWarnings("unchecked") // deserializeBase64 is declared to return the raw target type; a HashMap<String,String> was serialized client side
  private JobConf getSourceConf( HadoopFlowProcess flowProcess, JobConf conf, String property )
    {
    Map<String, String> priorConf;
    try
      {
      priorConf = (Map<String, String>) HadoopUtil.deserializeBase64( property, conf, HashMap.class, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to deserialize properties", exception );
      }

    return flowProcess.mergeMapIntoConfig( conf, priorConf );
    }

  /** Creates a SourceStage for the given Tap, registers it as a head, and wires its downstream ducts. */
  private SourceStage handleHead( Tap source, FlowProcess flowProcess )
    {
    SourceStage sourceDuct = new SourceStage( flowProcess, source );

    addHead( sourceDuct );

    handleDuct( source, sourceDuct );

    return sourceDuct;
    }

  @Override
  protected SinkStage createSinkStage( Tap element )
    {
    return new HadoopSinkStage( flowProcess, element );
    }

  @Override
  protected Gate createCoGroupGate( CoGroup element, IORole role )
    {
    // NOTE(review): the role parameter is deliberately ignored here — on the map side a
    // CoGroup always sinks into the shuffle — but confirm against other NodeStreamGraph subclasses
    return new HadoopCoGroupGate( flowProcess, element, IORole.sink );
    }

  @Override
  protected Gate createGroupByGate( GroupBy element, IORole role )
    {
    return new HadoopGroupByGate( flowProcess, element, role );
    }

  @Override
  protected GroupingSpliceGate createNonBlockingJoinGate( HashJoin join )
    {
    return new HadoopMemoryJoinGate( flowProcess, join ); // does not use a latch
    }
  }