001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.management;
022
023import java.io.IOException;
024import java.io.InputStream;
025import java.net.URI;
026import java.net.URL;
027import java.net.URLClassLoader;
028import java.util.List;
029import java.util.Map;
030import java.util.Properties;
031
032import cascading.cascade.CascadeException;
033import cascading.management.state.ClientState;
034import cascading.property.PropertyUtil;
035import cascading.provider.CascadingService;
036import cascading.provider.ServiceLoader;
037import cascading.util.ShutdownUtil;
038import cascading.util.Util;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * Class CascadingServices is the root class for pluggable services Cascading can call out to for distributed
044 * monitoring and management systems.
045 * <p/>
046 * Be default all services will be loaded from the jar {@code cascading/management/service.properties}
047 * ({@link #DEFAULT_PROPERTIES}) resource is found in. If the
048 * property {@link #CONTAINER_ENABLED} value is {@code false}, a ClassLoader container will not be created.
049 * <p/>
050 * For this to work, all service implementation and dependencies must be archived into a single jar.
051 * <p/>
052 * If any packages in the jar should be excluded, set a comma delimited list of names via the {@link #CONTAINER_EXCLUDE}
053 * property.
054 * <p/>
055 * If the file {@code cascading-service.properties} ({@link CascadingServices#CASCADING_SERVICES}) is found in the
056 * CLASSPATH, the {@code cascading.management.service.jar} property value will be used to search for
057 * {@code cascading/management/service.properties} resource.
058 *
059 * @see CascadingService
060 */
061public class CascadingServices
062  {
063  private static final Logger LOG = LoggerFactory.getLogger( CascadingServices.class );
064
065  public static final String CASCADING_SERVICES = "cascading-service.properties";
066  public static final String CASCADING_SERVICES_JAR = "cascading.management.service.jar";
067  public static final String CASCADING_SERVICES_JAR_DISABLE = "cascading.management.service.jar.disable";
068
069  public static final String DEFAULT_PROPERTIES = "cascading/management/service.properties";
070  public static final String CONTAINER_ENABLED = "cascading.management.container.enabled";
071  public static final String CONTAINER_EXCLUDE = "cascading.management.container.exclude";
072
073  static Properties defaultProperties;
074  static URL libraryURL;
075  static String[] exclusions;
076
077  Map<Object, Object> properties;
078
079  MetricsService metricsService;
080  DocumentService documentService;
081  boolean enableContainer;
082
083  static
084    {
085    ClassLoader classLoader = CascadingServices.class.getClassLoader();
086
087    // load all properties from cascading-service.properties
088    defaultProperties = loadProperties( new Properties(), CASCADING_SERVICES, classLoader );
089
090    libraryURL = getLibraryURL();
091
092    if( libraryURL != null )
093      classLoader = new URLClassLoader( new URL[]{libraryURL} );
094
095    // load additional, if only, properties from file in resource jar
096    defaultProperties = loadProperties( new Properties( defaultProperties ), DEFAULT_PROPERTIES, classLoader );
097
098    // if resources jar is on primary classpath, find the jar so we can isolate it when loading services
099    if( libraryURL == null )
100      libraryURL = parseLibraryURL( classLoader, DEFAULT_PROPERTIES );
101
102    exclusions = Util.removeNulls( defaultProperties.getProperty( CONTAINER_EXCLUDE, "" ).split( "," ) );
103    }
104
105  private static URL parseLibraryURL( ClassLoader classLoader, String resource )
106    {
107    URL url = classLoader.getResource( resource );
108
109    if( url != null )
110      {
111      try
112        {
113        String path = url.toURI().getSchemeSpecificPart();
114        int endIndex = path.lastIndexOf( '!' );
115
116        if( endIndex != -1 )
117          path = path.substring( 0, endIndex );
118
119        if( path.endsWith( ".jar" ) )
120          return new URL( path );
121        }
122      catch( Exception exception )
123        {
124        LOG.warn( "unable to parse resource library: {}", url, exception );
125        }
126      }
127
128    return null;
129    }
130
131  private static URL getLibraryURL()
132    {
133    String property = defaultProperties.getProperty( CASCADING_SERVICES_JAR );
134
135    if( property == null )
136      return null;
137
138    String disableJar = defaultProperties.getProperty( CASCADING_SERVICES_JAR_DISABLE, System.getProperty( CASCADING_SERVICES_JAR_DISABLE, "false" ) );
139
140    if( Boolean.valueOf( disableJar ) )
141      {
142      LOG.info( "property '{}' is set, ignoring services jar: {}", CASCADING_SERVICES_JAR_DISABLE, property );
143      return null;
144      }
145
146    try
147      {
148      URI uri = URI.create( property );
149
150      if( !uri.isAbsolute() )
151        uri = new URI( "file", uri.getAuthority(), uri.getPath(), uri.getQuery(), uri.getFragment() );
152
153      return uri.toURL();
154      }
155    catch( Exception exception )
156      {
157      LOG.warn( "property: {}, has invalid URL value: {}", CASCADING_SERVICES_JAR, property );
158      }
159
160    return null;
161    }
162
163  private static Properties loadProperties( Properties properties, String resource, ClassLoader classLoader )
164    {
165    InputStream input = classLoader.getResourceAsStream( resource );
166
167    try
168      {
169      if( input != null )
170        {
171        URL url = parseLibraryURL( classLoader, resource );
172
173        if( url != null )
174          LOG.info( "loading properties: {}, from jar: {}", resource, url );
175        else
176          LOG.info( "loading properties: {}, from CLASSPATH", resource );
177
178        properties.load( input );
179        }
180      }
181    catch( IOException exception )
182      {
183      LOG.warn( "unable to load properties from {}", resource, exception );
184      }
185
186    return properties;
187    }
188
189  private synchronized ServiceLoader getServiceUtil()
190    {
191    return ServiceLoader.getInstance( enableContainer ? libraryURL : null, exclusions );
192    }
193
194  public CascadingServices( Map<Object, Object> properties )
195    {
196    this.properties = PropertyUtil.createProperties( properties, defaultProperties );
197    this.enableContainer = PropertyUtil.getProperty( properties, CONTAINER_ENABLED, defaultProperties.getProperty( CONTAINER_ENABLED, "false" ) ).equalsIgnoreCase( "true" );
198    }
199
200  private Map<Object, Object> getProperties()
201    {
202    return properties;
203    }
204
205  public MetricsService getMetricsService()
206    {
207    if( metricsService == null )
208      metricsService = createMetricsService();
209
210    return metricsService;
211    }
212
213  public DocumentService getDocumentService()
214    {
215    if( documentService == null )
216      documentService = createDocumentService();
217
218    return documentService;
219    }
220
221  public ClientState createClientState( String id )
222    {
223    ClientState clientState = (ClientState) getServiceUtil().loadServiceFrom( defaultProperties, getProperties(), ClientState.STATE_SERVICE_CLASS_PROPERTY );
224
225    if( clientState != null )
226      {
227      clientState.initialize( this, id );
228
229      return clientState;
230      }
231
232    return ClientState.NULL;
233    }
234
235  protected MetricsService createMetricsService()
236    {
237    MetricsService service = (MetricsService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), MetricsService.METRICS_SERVICE_CLASS_PROPERTY );
238
239    if( service != null )
240      {
241      registerShutdownHook( service );
242
243      return service;
244      }
245
246    return new NullMetricsService();
247    }
248
249  protected DocumentService createDocumentService()
250    {
251    DocumentService service = (DocumentService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), DocumentService.DOCUMENT_SERVICE_CLASS_PROPERTY );
252
253    if( service != null )
254      {
255      registerShutdownHook( service );
256
257      return service;
258      }
259
260    return new NullDocumentService();
261    }
262
263  private void registerShutdownHook( final CascadingService service )
264    {
265    if( service == null )
266      return;
267
268    ShutdownUtil.addHook( new ShutdownUtil.Hook()
269    {
270    @Override
271    public Priority priority()
272      {
273      return Priority.SERVICE_PROVIDER;
274      }
275
276    @Override
277    public void execute()
278      {
279      try
280        {
281        service.stopService();
282        }
283      catch( Throwable throwable )
284        {
285        // safe to throw exception here so message is logged
286        LOG.error( "failed stopping cascading service", throwable );
287        throw new CascadeException( "failed stopping cascading service", throwable );
288        }
289      }
290    } );
291    }
292
293  /** Class NullDocumentService provides a null implementation. */
294  public static class NullDocumentService implements DocumentService
295    {
296    @Override
297    public boolean isEnabled()
298      {
299      return false;
300      }
301
302    @Override
303    public void setProperties( Map<Object, Object> properties )
304      {
305      }
306
307    @Override
308    public void startService()
309      {
310      }
311
312    @Override
313    public void stopService()
314      {
315      }
316
317    @Override
318    public void put( String key, Object object )
319      {
320      }
321
322    @Override
323    public void put( String type, String key, Object object )
324      {
325      }
326
327    @Override
328    public Map get( String type, String key )
329      {
330      return null;
331      }
332
333    @Override
334    public boolean supportsFind()
335      {
336      return false;
337      }
338
339    @Override
340    public List<Map<String, Object>> find( String type, String[] query )
341      {
342      return null;
343      }
344    }
345
346  /** Class NullMetricsService provides a null implementation. */
347  public static class NullMetricsService implements MetricsService
348    {
349    @Override
350    public boolean isEnabled()
351      {
352      return false;
353      }
354
355    @Override
356    public void increment( String[] context, int amount )
357      {
358      }
359
360    @Override
361    public void set( String[] context, String value )
362      {
363      }
364
365    @Override
366    public void set( String[] context, int value )
367      {
368      }
369
370    @Override
371    public void set( String[] context, long value )
372      {
373      }
374
375    @Override
376    public String getString( String[] context )
377      {
378      return null;
379      }
380
381    @Override
382    public int getInt( String[] context )
383      {
384      return 0;
385      }
386
387    @Override
388    public long getLong( String[] context )
389      {
390      return 0;
391      }
392
393    @Override
394    public boolean compareSet( String[] context, String isValue, String toValue )
395      {
396      return true;
397      }
398
399    @Override
400    public boolean compareSet( String[] context, int isValue, int toValue )
401      {
402      return true;
403      }
404
405    @Override
406    public boolean compareSet( String[] context, long isValue, long toValue )
407      {
408      return true;
409      }
410
411    @Override
412    public void setProperties( Map<Object, Object> properties )
413      {
414      }
415
416    @Override
417    public void startService()
418      {
419      }
420
421    @Override
422    public void stopService()
423      {
424      }
425    }
426  }