/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.hadoop.stream;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.stream.Gate;
import cascading.flow.stream.MemoryHashJoinGate;
import cascading.flow.stream.SinkStage;
import cascading.flow.stream.SourceStage;
import cascading.flow.stream.SpliceGate;
import cascading.flow.stream.StepStreamGraph;
import cascading.pipe.CoGroup;
import cascading.pipe.Group;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.tap.Tap;
import org.apache.hadoop.mapred.JobConf;

/**
 * A {@link StepStreamGraph} assembled around a single streamed source {@link Tap}.
 * <p>
 * Besides the streamed source, every other join tributary found between that source and the
 * step's group (or, when the step has no group, its sink) is added as an additional head whose
 * configuration is restored from a serialized property in the job configuration.
 */
public class HadoopMapStreamGraph extends StepStreamGraph
  {
  /** Prefix of the job property holding the serialized configuration of an accumulated source; the Tap id is appended. */
  private static final String ACCUMULATED_SOURCE_CONF_PREFIX = "cascading.step.accumulated.source.conf.";

  /** The Tap whose tuples are streamed through this graph. */
  private final Tap source;
  /** Head stage created for {@link #source}; assigned by {@link #buildGraph()}. */
  private SourceStage streamedHead;

  /**
   * Builds the graph, applies traps and scopes, prints the graph and finally binds it.
   *
   * @param flowProcess the current flow process
   * @param step        the flow step this graph is built for
   * @param source      the Tap to stream from
   */
  public HadoopMapStreamGraph( HadoopFlowProcess flowProcess, HadoopFlowStep step, Tap source )
    {
    super( flowProcess, step );

    this.source = source;

    buildGraph();

    setTraps();
    setScopes();

    printGraph( step.getID(), "map", flowProcess.getCurrentSliceNum() );

    bind();
    }

  /**
   * @return the head stage created for the streamed source
   */
  public SourceStage getStreamedHead()
    {
    return streamedHead;
    }

  /**
   * Registers the streamed source as the first head, then registers one further head per
   * remaining join tributary.
   */
  protected void buildGraph()
    {
    streamedHead = handleHead( this.source, flowProcess );

    // the graph ends at the step's group when there is one, otherwise at its sink
    FlowElement tail;

    if( step.getGroup() == null )
      tail = step.getSink();
    else
      tail = step.getGroup();

    Set<Tap> tributaries = step.getJoinTributariesBetween( this.source, tail );

    // the streamed source must not also be accumulated
    tributaries.remove( this.source );

    // accumulated paths
    for( Tap tributary : tributaries )
      {
      // NOTE(review): this assigns the inherited flowProcess field, so each later tributary is
      // derived from the previous tributary's process, and anything reading the field after
      // this loop sees the last one created -- confirm this is intended.
      flowProcess = createAccumulatedProcess( tributary );

      handleHead( tributary, flowProcess );
      }
    }

  /**
   * Creates a flow process for an accumulated source, derived from the current
   * {@code flowProcess} and the configuration serialized for the given Tap.
   *
   * @param tributary the accumulated source
   * @return a new process wrapping the current one with the merged configuration
   * @throws IllegalStateException if no serialized configuration property exists for the Tap
   */
  private HadoopFlowProcess createAccumulatedProcess( Tap tributary )
    {
    HadoopFlowProcess currentProcess = (HadoopFlowProcess) flowProcess;
    JobConf currentConf = currentProcess.getJobConf();

    // allows client side config to be used cluster side
    String serialized = currentConf.getRaw( ACCUMULATED_SOURCE_CONF_PREFIX + Tap.id( tributary ) );

    if( serialized == null )
      throw new IllegalStateException( "accumulated source conf property missing for: " + tributary.getIdentifier() );

    JobConf mergedConf = getSourceConf( currentProcess, currentConf, serialized );

    return new HadoopFlowProcess( currentProcess, mergedConf );
    }

  /**
   * Merges the properties serialized in {@code property} into the given configuration.
   *
   * @param flowProcess the process performing the merge
   * @param conf        the configuration to merge into; also handed to the deserializer
   * @param property    the Base64 serialized property map
   * @return the configuration returned by {@code mergeMapIntoConfig}
   * @throws FlowException if the property cannot be deserialized
   */
  private JobConf getSourceConf( HadoopFlowProcess flowProcess, JobConf conf, String property )
    {
    return flowProcess.mergeMapIntoConfig( conf, readSerializedProperties( property, conf ) );
    }

  /**
   * Deserializes a Base64 encoded property map, wrapping any IOException in a FlowException.
   */
  private static Map<String, String> readSerializedProperties( String encoded, JobConf conf )
    {
    try
      {
      return (Map<String, String>) HadoopUtil.deserializeBase64( encoded, conf, HashMap.class, true );
      }
    catch( IOException exception )
      {
      throw new FlowException( "unable to deserialize properties", exception );
      }
    }

  /**
   * Wraps the given Tap in a new source stage, adds it as a head and processes its duct.
   *
   * @return the newly created head stage
   */
  private SourceStage handleHead( Tap source, FlowProcess flowProcess )
    {
    SourceStage head = new SourceStage( flowProcess, source );

    addHead( head );
    handleDuct( source, head );

    return head;
    }

  /**
   * @return a {@link HadoopSinkStage} for the given Tap
   */
  @Override
  protected SinkStage createSinkStage( Tap element )
    {
    return new HadoopSinkStage( flowProcess, element );
    }

  /**
   * @return a {@link HadoopCoGroupGate} created with the sink role
   */
  protected Gate createCoGroupGate( CoGroup element )
    {
    return new HadoopCoGroupGate( flowProcess, element, SpliceGate.Role.sink );
    }

  /**
   * @return a {@link HadoopGroupByGate} created with the sink role
   */
  protected Gate createGroupByGate( GroupBy element )
    {
    return new HadoopGroupByGate( flowProcess, element, SpliceGate.Role.sink );
    }

  /**
   * @return a {@link HadoopMemoryJoinGate} for the given join
   */
  @Override
  protected MemoryHashJoinGate createNonBlockingJoinGate( HashJoin join )
    {
    // does not use a latch
    return new HadoopMemoryJoinGate( flowProcess, join );
    }

  /**
   * Decides whether graph construction stops at the given element.
   *
   * @param lhsElement the element being visited
   * @param successors the elements following {@code lhsElement}
   * @return true for any Group, or for a Tap without successors; false otherwise
   * @throws IllegalStateException if an element without successors is neither a Group nor a Tap
   */
  protected boolean stopOnElement( FlowElement lhsElement, List<FlowElement> successors )
    {
    if( lhsElement instanceof Group )
      return true;

    if( !successors.isEmpty() )
      return false;

    // nothing follows this element, so it must be a Tap
    if( lhsElement instanceof Tap )
      return true;

    throw new IllegalStateException( "expected a Tap instance" );
    }
  }