001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.flow.local; 022 023 import java.io.IOException; 024 import java.util.Collection; 025 import java.util.Collections; 026 import java.util.Map; 027 import java.util.Properties; 028 029 import cascading.CascadingException; 030 import cascading.flow.FlowProcess; 031 import cascading.flow.FlowSession; 032 import cascading.stats.local.LocalStepStats; 033 import cascading.tap.Tap; 034 import cascading.tuple.TupleEntryCollector; 035 import cascading.tuple.TupleEntryIterator; 036 037 /** Class LocalFlowProcess is the local mode implementation of {@link FlowProcess}. */ 038 public class LocalFlowProcess extends FlowProcess<Properties> 039 { 040 private final Properties config; 041 private LocalStepStats stepStats; 042 043 public LocalFlowProcess() 044 { 045 config = new Properties(); 046 } 047 048 public LocalFlowProcess( Properties config ) 049 { 050 this.config = config; 051 } 052 053 public LocalFlowProcess( FlowSession flowSession, Properties config ) 054 { 055 super( flowSession ); 056 this.config = config; 057 } 058 059 public LocalFlowProcess( LocalFlowProcess flowProcess, Properties properties ) 060 { 061 super( flowProcess.getCurrentSession() ); 062 this.config = properties; 063 this.stepStats = flowProcess.stepStats; 064 } 065 066 public void setStepStats( LocalStepStats stepStats ) 067 { 068 this.stepStats = stepStats; 069 } 070 071 @Override 072 public int getNumProcessSlices() 073 { 074 return 1; 075 } 076 077 @Override 078 public int getCurrentSliceNum() 079 { 080 return 0; 081 } 082 083 @Override 084 public Object getProperty( String key ) 085 { 086 return config.getProperty( key ); 087 } 088 089 @Override 090 public Collection<String> getPropertyKeys() 091 { 092 return Collections.unmodifiableSet( config.stringPropertyNames() ); 093 } 094 095 @Override 096 public Object newInstance( String className ) 097 { 098 if( className == null || className.isEmpty() ) 099 return null; 100 101 try 102 { 103 Class type = (Class) LocalFlowProcess.class.getClassLoader().loadClass( className.toString() ); 104 105 return type.newInstance(); 106 } 107 catch( ClassNotFoundException exception ) 108 { 109 throw new CascadingException( "unable to load class: " + className.toString(), exception ); 110 } 111 catch( InstantiationException exception ) 112 { 113 throw new CascadingException( "unable to instantiate class: " + className.toString(), exception ); 114 } 115 catch( IllegalAccessException exception ) 116 { 117 throw new CascadingException( "unable to access class: " + className.toString(), exception ); 118 } 119 } 120 121 @Override 122 public void keepAlive() 123 { 124 } 125 126 @Override 127 public void increment( Enum counter, long amount ) 128 { 129 stepStats.increment( counter, amount ); 130 } 131 132 @Override 133 public void increment( String group, String counter, long amount ) 134 { 135 stepStats.increment( group, counter, amount ); 136 } 137 138 @Override 139 public void setStatus( String status ) 140 { 141 142 } 143 144 @Override 145 public boolean isCounterStatusInitialized() 146 { 147 return true; 148 } 149 150 @Override 151 public TupleEntryIterator openTapForRead( Tap tap ) throws IOException 152 { 153 return tap.openForRead( this ); 154 } 155 156 @Override 157 public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException 158 { 159 return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks 160 } 161 162 @Override 163 public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException 164 { 165 return trap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks 166 } 167 168 @Override 169 public TupleEntryCollector openSystemIntermediateForWrite() throws IOException 170 { 171 return null; 172 } 173 174 @Override 175 public FlowProcess copyWith( Properties object ) 176 { 177 return new LocalFlowProcess( object ); 178 } 179 180 @Override 181 public Properties getConfigCopy() 182 { 183 return new Properties( config ); 184 } 185 186 @Override 187 public Properties copyConfig( Properties config ) 188 { 189 return new Properties( config ); 190 } 191 192 @Override 193 public Map<String, String> diffConfigIntoMap( Properties defaultConfig, Properties updatedConfig ) 194 { 195 return null; 196 } 197 198 @Override 199 public Properties mergeMapIntoConfig( Properties defaultConfig, Map<String, String> map ) 200 { 201 return null; 202 } 203 }