001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.flow.local.stream.graph; 023 024import java.util.Properties; 025 026import cascading.flow.FlowElement; 027import cascading.flow.FlowNode; 028import cascading.flow.FlowProcess; 029import cascading.flow.local.LocalFlowProcess; 030import cascading.flow.local.LocalFlowStep; 031import cascading.flow.local.stream.duct.ParallelFork; 032import cascading.flow.local.stream.element.LocalGroupByGate; 033import cascading.flow.local.stream.element.SyncMergeStage; 034import cascading.flow.stream.duct.Duct; 035import cascading.flow.stream.duct.Gate; 036import cascading.flow.stream.element.MemoryCoGroupGate; 037import cascading.flow.stream.element.SinkStage; 038import cascading.flow.stream.element.SourceStage; 039import cascading.flow.stream.graph.IORole; 040import cascading.flow.stream.graph.NodeStreamGraph; 041import cascading.pipe.CoGroup; 042import cascading.pipe.GroupBy; 043import cascading.pipe.Merge; 044import cascading.property.PropertyUtil; 045import cascading.tap.Tap; 046 047/** 048 * 049 */ 050public class LocalStepStreamGraph extends NodeStreamGraph 051 { 052 private LocalFlowStep step; 053 054 public LocalStepStreamGraph( FlowProcess<Properties> flowProcess, LocalFlowStep step, FlowNode node ) 055 { 056 super( flowProcess, node ); 057 this.step = step; 058 059 buildGraph(); 060 setTraps(); 061 setScopes(); 062 063 printGraph( node.getID(), "local", 0 ); 064 065 bind(); 066 067 printBoundGraph( node.getID(), "local", 0 ); 068 } 069 070 protected void buildGraph() 071 { 072 for( Object rhsElement : node.getSourceTaps() ) 073 { 074 Duct rhsDuct = new SourceStage( tapFlowProcess( (Tap) rhsElement ), (Tap) rhsElement ); 075 076 addHead( rhsDuct ); 077 078 handleDuct( (FlowElement) rhsElement, rhsDuct ); 079 } 080 } 081 082 @Override 083 protected Duct createFork( Duct[] allNext ) 084 { 085 return new ParallelFork( allNext ); 086 } 087 088 protected Gate createCoGroupGate( CoGroup element, IORole role ) 089 { 090 return new MemoryCoGroupGate( flowProcess, element ); 091 } 092 093 protected Gate createGroupByGate( GroupBy element, IORole source ) 094 { 095 return new LocalGroupByGate( flowProcess, element ); 096 } 097 098 @Override 099 protected Duct createMergeStage( Merge merge, IORole both ) 100 { 101 return new SyncMergeStage( flowProcess, merge ); 102 } 103 104 @Override 105 protected SinkStage createSinkStage( Tap element ) 106 { 107 return new SinkStage( tapFlowProcess( element ), element ); 108 } 109 110 private LocalFlowProcess tapFlowProcess( Tap tap ) 111 { 112 Properties defaultProperties = ( (LocalFlowProcess) flowProcess ).getConfig(); 113 Properties tapProperties = step.getPropertiesMap().get( tap ); 114 115 tapProperties = PropertyUtil.createProperties( tapProperties, defaultProperties ); 116 117 return new LocalFlowProcess( (LocalFlowProcess) flowProcess, tapProperties ); 118 } 119 120 }