001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.flow.stream.element; 022 023import java.util.Collection; 024import java.util.Collections; 025import java.util.HashSet; 026import java.util.Set; 027 028import cascading.flow.FlowProcess; 029import cascading.flow.FlowProcessWrapper; 030import cascading.property.ConfigDef; 031 032/** 033 * 034 */ 035public class ElementFlowProcess extends FlowProcessWrapper 036 { 037 private final ConfigDef configDef; 038 private final ConfigDef.Getter getter; 039 040 public ElementFlowProcess( FlowProcess flowProcess, ConfigDef configDef ) 041 { 042 super( flowProcess ); 043 044 this.configDef = configDef; 045 this.getter = new ConfigDef.Getter() 046 { 047 @Override 048 public String update( String key, String value ) 049 { 050 String result = get( key ); 051 052 if( result == null ) 053 return value; 054 055 if( result.contains( value ) ) 056 return result; 057 058 return result + "," + value; 059 } 060 061 @Override 062 public String get( String key ) 063 { 064 Object value = getDelegate().getProperty( key ); 065 066 if( value == null ) 067 return null; 068 069 return value.toString(); 070 } 071 }; 072 } 073 074 @Override 075 public Object getProperty( String key ) 076 { 077 return configDef.apply( key, getter ); 078 } 079 080 @Override 081 public Collection<String> getPropertyKeys() 082 { 083 Set<String> keys = new HashSet<String>(); 084 085 keys.addAll( getDelegate().getPropertyKeys() ); 086 keys.addAll( configDef.getAllKeys() ); 087 088 return Collections.unmodifiableSet( keys ); 089 } 090 }