001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.local;
022
023import java.util.HashMap;
024import java.util.Map;
025import java.util.Properties;
026import java.util.Set;
027
028import cascading.flow.FlowProcess;
029import cascading.flow.local.planner.LocalFlowStepJob;
030import cascading.flow.planner.BaseFlowStep;
031import cascading.flow.planner.FlowStepJob;
032import cascading.flow.planner.graph.ElementGraph;
033import cascading.flow.planner.process.FlowNodeGraph;
034import cascading.management.state.ClientState;
035import cascading.property.ConfigDef;
036import cascading.tap.Tap;
037import cascading.util.Util;
038
039/** Class LocalFlowStep is the local mode implementation of {@link cascading.flow.FlowStep}. */
040public class LocalFlowStep extends BaseFlowStep<Properties>
041  {
042  /** Map of Properties modified by each Tap's sourceConfInit/sinkConfInit */
043  private final Map<Tap, Properties> tapProperties = new HashMap<Tap, Properties>();
044
045  public LocalFlowStep( ElementGraph elementGraph, FlowNodeGraph flowNodeGraph )
046    {
047    super( elementGraph, flowNodeGraph );
048    }
049
050  @Override
051  public Map<Object, Object> getConfigAsProperties()
052    {
053    return getConfig();
054    }
055
056  @Override
057  public Properties createInitializedConfig( FlowProcess<Properties> flowProcess, Properties parentConfig )
058    {
059    Properties currentProperties = parentConfig == null ? new Properties() : new Properties( parentConfig );
060
061    initTaps( flowProcess, currentProperties, getSourceTaps(), false );
062    initTaps( flowProcess, currentProperties, getSinkTaps(), true );
063    initTaps( flowProcess, currentProperties, getTraps(), true );
064
065    initFromStepConfigDef( currentProperties );
066    initFromNodeConfigDef( currentProperties );
067
068    return currentProperties;
069    }
070
071  protected void initTaps( FlowProcess<Properties> flowProcess, Properties conf, Set<Tap> taps, boolean isSink )
072    {
073    if( !taps.isEmpty() )
074      {
075      for( Tap tap : taps )
076        {
077        Properties confCopy = flowProcess.copyConfig( conf );
078        tapProperties.put( tap, confCopy ); // todo: store the diff, not the copy
079
080        if( isSink )
081          tap.sinkConfInit( flowProcess, confCopy );
082        else
083          tap.sourceConfInit( flowProcess, confCopy );
084        }
085      }
086    }
087
088  private void initFromNodeConfigDef( final Properties properties )
089    {
090    initConfFromNodeConfigDef( Util.getFirst( getFlowNodeGraph().vertexSet() ).getElementGraph(), getSetterFor( properties ) );
091    }
092
093  private void initFromStepConfigDef( final Properties properties )
094    {
095    initConfFromStepConfigDef( getSetterFor( properties ) );
096    }
097
098  private ConfigDef.Setter getSetterFor( final Properties properties )
099    {
100    return new ConfigDef.Setter()
101    {
102    @Override
103    public String set( String key, String value )
104      {
105      String oldValue = get( key );
106
107      properties.setProperty( key, value );
108
109      return oldValue;
110      }
111
112    @Override
113    public String update( String key, String value )
114      {
115      String oldValue = get( key );
116
117      if( oldValue == null )
118        properties.setProperty( key, value );
119      else if( !oldValue.contains( value ) )
120        properties.setProperty( key, oldValue + "," + value );
121
122      return oldValue;
123      }
124
125    @Override
126    public String get( String key )
127      {
128      String value = properties.getProperty( key );
129
130      if( value == null || value.isEmpty() )
131        return null;
132
133      return value;
134      }
135    };
136    }
137
138  @Override
139  public void clean( Properties config )
140    {
141    }
142
143  @Override
144  protected FlowStepJob<Properties> createFlowStepJob( ClientState clientState, FlowProcess<Properties> flowProcess, Properties initializedStepConfig )
145    {
146    // localize a flow process
147    flowProcess = flowProcess.copyWith( initializedStepConfig );
148
149    return new LocalFlowStepJob( clientState, (LocalFlowProcess) flowProcess, this );
150    }
151
152  public Map<Tap, Properties> getPropertiesMap()
153    {
154    return tapProperties;
155    }
156  }