001/*
002 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.local;
023
024import java.io.File;
025import java.io.IOException;
026import java.net.MalformedURLException;
027import java.net.URL;
028import java.net.URLClassLoader;
029import java.util.Map;
030import java.util.Properties;
031
032import cascading.flow.BaseFlow;
033import cascading.flow.FlowDef;
034import cascading.flow.FlowException;
035import cascading.flow.FlowProcess;
036import cascading.flow.planner.PlatformInfo;
037import riffle.process.ProcessConfiguration;
038
039/**
040 * Class LocalFlow is the local mode specific implementation of a {@link cascading.flow.Flow}.
041 * <p/>
042 * LocalFlow must be created through a {@link LocalFlowConnector} instance.
043 * <p/>
044 * If classpath paths are provided on the {@link FlowDef}, the context classloader used to internally urn the current
045 * Flow will be swapped out with an URLClassLoader pointing to each element.
046 *
047 * @see LocalFlowConnector
048 */
049public class LocalFlow extends BaseFlow<Properties>
050  {
051  private Properties config;
052  private FlowProcess<Properties> flowProcess;
053
054  public LocalFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Properties config, FlowDef flowDef )
055    {
056    super( platformInfo, properties, config, flowDef );
057
058    initFromProperties( properties );
059    }
060
061  @Override
062  protected void initConfig( Map<Object, Object> properties, Properties parentConfig )
063    {
064    this.config = createConfig( properties, parentConfig );
065    this.flowProcess = new LocalFlowProcess( getFlowSession(), config );
066    }
067
068  @Override
069  protected void setConfigProperty( Properties properties, Object key, Object value )
070    {
071    properties.setProperty( key.toString(), value.toString() );
072    }
073
074  @Override
075  protected Properties newConfig( Properties defaultConfig )
076    {
077    return defaultConfig == null ? new Properties() : new Properties( defaultConfig );
078    }
079
080  @ProcessConfiguration
081  @Override
082  public Properties getConfig()
083    {
084    return config;
085    }
086
087  @Override
088  public Properties getConfigCopy()
089    {
090    return new Properties( config );
091    }
092
093  @Override
094  public Map<Object, Object> getConfigAsProperties()
095    {
096    return config;
097    }
098
099  @Override
100  public String getProperty( String key )
101    {
102    return config.getProperty( key );
103    }
104
105  @Override
106  public FlowProcess<Properties> getFlowProcess()
107    {
108    return flowProcess;
109    }
110
111  @Override
112  protected void internalStart()
113    {
114    try
115      {
116      deleteSinksIfReplace();
117      deleteTrapsIfReplace();
118      }
119    catch( IOException exception )
120      {
121      throw new FlowException( "unable to delete sinks", exception );
122      }
123    }
124
125  @Override
126  protected Thread createFlowThread( String threadName )
127    {
128    Thread flowThread = super.createFlowThread( threadName );
129
130    flowThread.setContextClassLoader( createClassPathClassloader( flowThread.getContextClassLoader() ) );
131
132    return flowThread;
133    }
134
135  private ClassLoader createClassPathClassloader( ClassLoader classLoader )
136    {
137    if( getClassPath() == null || getClassPath().isEmpty() )
138      return classLoader;
139
140    URL[] urls = new URL[ getClassPath().size() ];
141
142    for( int i = 0; i < getClassPath().size(); i++ )
143      {
144      String path = getClassPath().get( i );
145      File file = new File( path ).getAbsoluteFile();
146
147      if( !file.exists() )
148        throw new FlowException( "path does not exist: " + file );
149
150      try
151        {
152        urls[ i ] = file.toURI().toURL();
153        }
154      catch( MalformedURLException exception )
155        {
156        throw new FlowException( "bad path: " + file, exception );
157        }
158      }
159
160    return new URLClassLoader( urls, classLoader );
161    }
162
163  @Override
164  protected void internalClean( boolean stop )
165    {
166    }
167
168  @Override
169  public boolean stepsAreLocal()
170    {
171    return false;
172    }
173
174  @Override
175  protected int getMaxNumParallelSteps()
176    {
177    return 0;
178    }
179
180  @Override
181  protected void internalShutdown()
182    {
183    }
184  }