001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.flow.local;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.Map;
027import java.util.Properties;
028
029import cascading.CascadingException;
030import cascading.flow.FlowProcess;
031import cascading.flow.FlowSession;
032import cascading.stats.local.LocalStepStats;
033import cascading.tap.Tap;
034import cascading.tuple.TupleEntryCollector;
035import cascading.tuple.TupleEntryIterator;
036
037/** Class LocalFlowProcess is the local mode implementation of {@link FlowProcess}. */
038public class LocalFlowProcess extends FlowProcess<Properties>
039  {
040  private final Properties config;
041  private LocalStepStats stepStats;
042
043  public LocalFlowProcess()
044    {
045    config = new Properties();
046    }
047
048  public LocalFlowProcess( Properties config )
049    {
050    this.config = config;
051    }
052
053  public LocalFlowProcess( FlowSession flowSession, Properties config )
054    {
055    super( flowSession );
056    this.config = config;
057    }
058
059  public LocalFlowProcess( LocalFlowProcess flowProcess, Properties properties )
060    {
061    super( flowProcess );
062    this.config = properties;
063    this.stepStats = flowProcess.stepStats;
064    }
065
066  public void setStepStats( LocalStepStats stepStats )
067    {
068    this.stepStats = stepStats;
069    }
070
071  @Override
072  public int getNumProcessSlices()
073    {
074    return 1;
075    }
076
077  @Override
078  public int getCurrentSliceNum()
079    {
080    return 0;
081    }
082
083  @Override
084  public Object getProperty( String key )
085    {
086    return config.getProperty( key );
087    }
088
089  @Override
090  public Collection<String> getPropertyKeys()
091    {
092    return Collections.unmodifiableSet( config.stringPropertyNames() );
093    }
094
095  @Override
096  public Object newInstance( String className )
097    {
098    if( className == null || className.isEmpty() )
099      return null;
100
101    try
102      {
103      Class type = (Class) LocalFlowProcess.class.getClassLoader().loadClass( className.toString() );
104
105      return type.newInstance();
106      }
107    catch( ClassNotFoundException exception )
108      {
109      throw new CascadingException( "unable to load class: " + className.toString(), exception );
110      }
111    catch( InstantiationException exception )
112      {
113      throw new CascadingException( "unable to instantiate class: " + className.toString(), exception );
114      }
115    catch( IllegalAccessException exception )
116      {
117      throw new CascadingException( "unable to access class: " + className.toString(), exception );
118      }
119    }
120
121  @Override
122  public void keepAlive()
123    {
124    }
125
126  @Override
127  public void increment( Enum counter, long amount )
128    {
129    stepStats.increment( counter, amount );
130    }
131
132  @Override
133  public void increment( String group, String counter, long amount )
134    {
135    stepStats.increment( group, counter, amount );
136    }
137
138  @Override
139  public long getCounterValue( Enum counter )
140    {
141    return stepStats.getCounterValue( counter );
142    }
143
144  @Override
145  public long getCounterValue( String group, String counter )
146    {
147    return stepStats.getCounterValue( group, counter );
148    }
149
150  @Override
151  public void setStatus( String status )
152    {
153
154    }
155
156  @Override
157  public boolean isCounterStatusInitialized()
158    {
159    return true;
160    }
161
162  @Override
163  public TupleEntryIterator openTapForRead( Tap tap ) throws IOException
164    {
165    return tap.openForRead( this );
166    }
167
168  @Override
169  public TupleEntryCollector openTapForWrite( Tap tap ) throws IOException
170    {
171    return tap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
172    }
173
174  @Override
175  public TupleEntryCollector openTrapForWrite( Tap trap ) throws IOException
176    {
177    return trap.openForWrite( this, null ); // do not honor sinkmode as this may be opened across tasks
178    }
179
180  @Override
181  public TupleEntryCollector openSystemIntermediateForWrite() throws IOException
182    {
183    return null;
184    }
185
186  @Override
187  public FlowProcess copyWith( Properties object )
188    {
189    return new LocalFlowProcess( this, object );
190    }
191
192  @Override
193  public Properties getConfig()
194    {
195    return config;
196    }
197
198  @Override
199  public Properties getConfigCopy()
200    {
201    return new Properties( config );
202    }
203
204  @Override
205  public <C> C copyConfig( C config )
206    {
207    return (C) new Properties( (Properties) config );
208    }
209
210  @Override
211  public <C> Map<String, String> diffConfigIntoMap( C defaultConfig, C updatedConfig )
212    {
213    return null;
214    }
215
216  @Override
217  public Properties mergeMapIntoConfig( Properties defaultConfig, Map<String, String> map )
218    {
219    return null;
220    }
221  }