/*
 * Copyright (c) 2016-2017 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.flow.local;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.Properties;

import cascading.flow.BaseFlow;
import cascading.flow.FlowDef;
import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.planner.PlatformInfo;
import riffle.process.ProcessConfiguration;

/**
 * Class LocalFlow is the local mode specific implementation of a {@link cascading.flow.Flow}.
 * <p>
 * LocalFlow must be created through a {@link LocalFlowConnector} instance.
 * <p>
 * If classpath paths are provided on the {@link FlowDef}, the context classloader used to internally run the current
 * Flow will be swapped out with a URLClassLoader pointing to each element.
 *
 * @see LocalFlowConnector
 */
public class LocalFlow extends BaseFlow<Properties>
  {
  /** Configuration for this flow; assigned in {@link #initConfig(Map, Properties)}. */
  private Properties config;
  /** Flow process wrapping the flow session and {@link #config}; assigned in {@link #initConfig(Map, Properties)}. */
  private FlowProcess<Properties> flowProcess;

  /**
   * Creates a new LocalFlow, delegating all arguments to the parent constructor and then
   * initializing this instance from the given properties.
   *
   * @param platformInfo the platform information handed to the parent class
   * @param properties   the properties handed to the parent class and to {@code initFromProperties}
   * @param config       the configuration handed to the parent class
   * @param flowDef      the flow definition handed to the parent class
   */
  public LocalFlow( PlatformInfo platformInfo, Map<Object, Object> properties, Properties config, FlowDef flowDef )
    {
    super( platformInfo, properties, config, flowDef );

    initFromProperties( properties );
    }

  /**
   * Builds the configuration via {@code createConfig} and creates the {@link LocalFlowProcess}
   * bound to the current flow session and that configuration.
   *
   * @param properties   the properties passed to {@code createConfig}
   * @param parentConfig the parent configuration passed to {@code createConfig}
   */
  @Override
  protected void initConfig( Map<Object, Object> properties, Properties parentConfig )
    {
    Properties created = createConfig( properties, parentConfig );

    this.config = created;
    this.flowProcess = new LocalFlowProcess( getFlowSession(), created );
    }

  /**
   * Stores the string forms of the given key and value in the given Properties instance.
   *
   * @param properties the instance to update
   * @param key        the key, converted with {@code toString()}
   * @param value      the value, converted with {@code toString()}
   */
  @Override
  protected void setConfigProperty( Properties properties, Object key, Object value )
    {
    String name = key.toString();
    String text = value.toString();

    properties.setProperty( name, text );
    }

  /**
   * Creates a new configuration instance.
   *
   * @param defaultConfig the defaults to layer the new instance over, may be {@code null}
   * @return an empty Properties if {@code defaultConfig} is {@code null}, otherwise a new
   * Properties using {@code defaultConfig} as its defaults
   */
  @Override
  protected Properties newConfig( Properties defaultConfig )
    {
    if( defaultConfig != null )
      return new Properties( defaultConfig );

    return new Properties();
    }

  /**
   * Returns the configuration instance held by this flow (not a copy).
   *
   * @return the current configuration
   */
  @ProcessConfiguration
  @Override
  public Properties getConfig()
    {
    return config;
    }

  /**
   * Returns a new Properties instance that uses the current configuration as its defaults.
   * <p>
   * No entries are copied; lookups through {@link Properties#getProperty(String)} on the returned
   * instance fall through to the configuration held by this flow.
   *
   * @return a new Properties layered over the current configuration
   */
  @Override
  public Properties getConfigCopy()
    {
    Properties layered = new Properties( config );

    return layered;
    }

  /**
   * Returns the configuration instance held by this flow, typed as a Map.
   *
   * @return the current configuration
   */
  @Override
  public Map<Object, Object> getConfigAsProperties()
    {
    return config;
    }

  /**
   * Looks up the given key in the current configuration.
   *
   * @param key the property name
   * @return the result of {@link Properties#getProperty(String)} on the current configuration
   */
  @Override
  public String getProperty( String key )
    {
    return config.getProperty( key );
    }

  /**
   * Returns the flow process created in {@link #initConfig(Map, Properties)}.
   *
   * @return the flow process
   */
  @Override
  public FlowProcess<Properties> getFlowProcess()
    {
    return flowProcess;
    }

  /**
   * Calls {@code deleteSinksIfReplace} and then {@code deleteTrapsIfReplace}.
   *
   * @throws FlowException wrapping any IOException raised by either call
   */
  @Override
  protected void internalStart()
    {
    try
      {
      deleteSinksIfReplace();
      deleteTrapsIfReplace();
      }
    catch( IOException ioException )
      {
      throw new FlowException( "unable to delete sinks", ioException );
      }
    }

  /**
   * Creates the flow thread through the parent class and replaces its context classloader with
   * the result of {@link #createClassPathClassloader(ClassLoader)}.
   *
   * @param threadName the name handed to the parent implementation
   * @return the thread created by the parent implementation
   */
  @Override
  protected Thread createFlowThread( String threadName )
    {
    Thread thread = super.createFlowThread( threadName );
    ClassLoader parentLoader = thread.getContextClassLoader();

    thread.setContextClassLoader( createClassPathClassloader( parentLoader ) );

    return thread;
    }

  /**
   * Returns a classloader exposing every element of the flow classpath, with the given
   * classloader as its parent.
   *
   * @param classLoader the parent classloader
   * @return {@code classLoader} itself when the classpath is {@code null} or empty, otherwise a
   * new URLClassLoader over the classpath elements
   * @throws FlowException if a classpath element does not exist or cannot be converted to a URL
   */
  private ClassLoader createClassPathClassloader( ClassLoader classLoader )
    {
    boolean hasClassPath = getClassPath() != null && !getClassPath().isEmpty();

    if( !hasClassPath )
      return classLoader;

    URL[] urls = new URL[ getClassPath().size() ];

    for( int index = 0; index < urls.length; index++ )
      urls[ index ] = toClassPathURL( getClassPath().get( index ) );

    return new URLClassLoader( urls, classLoader );
    }

  /**
   * Converts a single classpath element into a URL pointing at its absolute location.
   *
   * @param path the classpath element
   * @return the URL of the absolute form of {@code path}
   * @throws FlowException if the path does not exist or cannot be converted to a URL
   */
  private URL toClassPathURL( String path )
    {
    File file = new File( path ).getAbsoluteFile();

    if( !file.exists() )
      throw new FlowException( "path does not exist: " + file );

    try
      {
      return file.toURI().toURL();
      }
    catch( MalformedURLException urlException )
      {
      throw new FlowException( "bad path: " + file, urlException );
      }
    }

  /**
   * Does nothing in local mode.
   *
   * @param stop unused
   */
  @Override
  protected void internalClean( boolean stop )
    {
    // intentionally empty
    }

  /**
   * Always returns {@code false}.
   *
   * @return {@code false}
   */
  @Override
  public boolean stepsAreLocal()
    {
    return false;
    }

  /**
   * Always returns {@code 0}.
   * <p>
   * NOTE(review): presumably the parent class treats zero as "no limit" -- confirm against BaseFlow.
   *
   * @return {@code 0}
   */
  @Override
  protected int getMaxNumParallelSteps()
    {
    return 0;
    }

  /**
   * Does nothing in local mode.
   */
  @Override
  protected void internalShutdown()
    {
    // intentionally empty
    }
  }