001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.stream.element;
022
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.Set;
027
028import cascading.flow.FlowProcess;
029import cascading.flow.FlowProcessWrapper;
030import cascading.property.ConfigDef;
031
032/**
033 *
034 */
035public class ElementFlowProcess extends FlowProcessWrapper
036  {
037  private final ConfigDef configDef;
038  private final ConfigDef.Getter getter;
039
040  public ElementFlowProcess( FlowProcess flowProcess, ConfigDef configDef )
041    {
042    super( flowProcess );
043
044    this.configDef = configDef;
045    this.getter = new ConfigDef.Getter()
046    {
047    @Override
048    public String update( String key, String value )
049      {
050      String result = get( key );
051
052      if( result == null )
053        return value;
054
055      if( result.contains( value ) )
056        return result;
057
058      return result + "," + value;
059      }
060
061    @Override
062    public String get( String key )
063      {
064      Object value = getDelegate().getProperty( key );
065
066      if( value == null )
067        return null;
068
069      return value.toString();
070      }
071    };
072    }
073
074  @Override
075  public Object getProperty( String key )
076    {
077    return configDef.apply( key, getter );
078    }
079
080  @Override
081  public Collection<String> getPropertyKeys()
082    {
083    Set<String> keys = new HashSet<String>();
084
085    keys.addAll( getDelegate().getPropertyKeys() );
086    keys.addAll( configDef.getAllKeys() );
087
088    return Collections.unmodifiableSet( keys );
089    }
090  }