001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.local; 022 023import java.util.HashMap; 024import java.util.Map; 025import java.util.Properties; 026import java.util.Set; 027 028import cascading.flow.FlowProcess; 029import cascading.flow.local.planner.LocalFlowStepJob; 030import cascading.flow.planner.BaseFlowStep; 031import cascading.flow.planner.FlowStepJob; 032import cascading.flow.planner.graph.ElementGraph; 033import cascading.flow.planner.process.FlowNodeGraph; 034import cascading.management.state.ClientState; 035import cascading.property.ConfigDef; 036import cascading.tap.Tap; 037import cascading.util.Util; 038 039/** Class LocalFlowStep is the local mode implementation of {@link cascading.flow.FlowStep}. */ 040public class LocalFlowStep extends BaseFlowStep<Properties> 041 { 042 /** Map of Properties modified by each Tap's sourceConfInit/sinkConfInit */ 043 private final Map<Tap, Properties> tapProperties = new HashMap<Tap, Properties>(); 044 045 public LocalFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph ) 046 { 047 super( elementGraph, flowNodeGraph ); 048 } 049 050 @Override 051 public Map<Object, Object> getConfigAsProperties() 052 { 053 return getConfig(); 054 } 055 056 @Override 057 public Properties createInitializedConfig( FlowProcess<Properties> flowProcess, Properties parentConfig ) 058 { 059 Properties currentProperties = parentConfig == null ? new Properties() : new Properties( parentConfig ); 060 061 initTaps( flowProcess, currentProperties, getSourceTaps(), false ); 062 initTaps( flowProcess, currentProperties, getSinkTaps(), true ); 063 initTaps( flowProcess, currentProperties, getTraps(), true ); 064 065 initFromStepConfigDef( currentProperties ); 066 initFromNodeConfigDef( currentProperties ); 067 068 return currentProperties; 069 } 070 071 protected void initTaps( FlowProcess<Properties> flowProcess, Properties conf, Set<Tap> taps, boolean isSink ) 072 { 073 if( !taps.isEmpty() ) 074 { 075 for( Tap tap : taps ) 076 { 077 Properties confCopy = flowProcess.copyConfig( conf ); 078 tapProperties.put( tap, confCopy ); // todo: store the diff, not the copy 079 080 if( isSink ) 081 tap.sinkConfInit( flowProcess, confCopy ); 082 else 083 tap.sourceConfInit( flowProcess, confCopy ); 084 } 085 } 086 } 087 088 private void initFromNodeConfigDef( final Properties properties ) 089 { 090 initConfFromNodeConfigDef( Util.getFirst( getFlowNodeGraph().vertexSet() ).getElementGraph(), getSetterFor( properties ) ); 091 } 092 093 private void initFromStepConfigDef( final Properties properties ) 094 { 095 initConfFromStepConfigDef( getSetterFor( properties ) ); 096 } 097 098 private ConfigDef.Setter getSetterFor( final Properties properties ) 099 { 100 return new ConfigDef.Setter() 101 { 102 @Override 103 public String set( String key, String value ) 104 { 105 String oldValue = get( key ); 106 107 properties.setProperty( key, value ); 108 109 return oldValue; 110 } 111 112 @Override 113 public String update( String key, String value ) 114 { 115 String oldValue = get( key ); 116 117 if( oldValue == null ) 118 properties.setProperty( key, value ); 119 else if( !oldValue.contains( value ) ) 120 properties.setProperty( key, oldValue + "," + value ); 121 122 return oldValue; 123 } 124 125 @Override 126 public String get( String key ) 127 { 128 String value = properties.getProperty( key ); 129 130 if( value == null || value.isEmpty() ) 131 return null; 132 133 return value; 134 } 135 }; 136 } 137 138 @Override 139 public void clean( Properties config ) 140 { 141 } 142 143 @Override 144 protected FlowStepJob<Properties> createFlowStepJob( ClientState clientState, FlowProcess<Properties> flowProcess, Properties initializedStepConfig ) 145 { 146 // localize a flow process 147 flowProcess = flowProcess.copyWith( initializedStepConfig ); 148 149 return new LocalFlowStepJob( clientState, (LocalFlowProcess) flowProcess, this ); 150 } 151 152 public Map<Tap, Properties> getPropertiesMap() 153 { 154 return tapProperties; 155 } 156 }