001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.management;
022
023import java.io.IOException;
024import java.io.InputStream;
025import java.net.URI;
026import java.net.URL;
027import java.net.URLClassLoader;
028import java.util.List;
029import java.util.Map;
030import java.util.Properties;
031
032import cascading.cascade.CascadeException;
033import cascading.management.state.ClientState;
034import cascading.property.PropertyUtil;
035import cascading.provider.CascadingService;
036import cascading.provider.ServiceLoader;
037import cascading.util.ShutdownUtil;
038import cascading.util.Util;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * Class CascadingServices is the root class for pluggable services Cascading can call out to for distributed
044 * monitoring and management systems.
045 * <p/>
 * By default all services will be loaded from the jar in which the {@code cascading/management/service.properties}
 * ({@link #DEFAULT_PROPERTIES}) resource is found. If the
 * property {@link #CONTAINER_ENABLED} value is {@code false}, a ClassLoader container will not be created.
049 * <p/>
 * For this to work, all service implementations and their dependencies must be archived into a single jar.
051 * <p/>
 * If any packages in the jar should be excluded, set a comma-delimited list of names via the {@link #CONTAINER_EXCLUDE}
 * property.
054 * <p/>
 * If the file {@code cascading-service.properties} ({@link CascadingServices#CASCADING_SERVICES}) is found in the
 * CLASSPATH, the {@code cascading.management.service.jar} property value will be used to locate the jar to search
 * for the {@code cascading/management/service.properties} resource.
058 *
059 * @see CascadingService
060 */
061public class CascadingServices
062  {
063  private static final Logger LOG = LoggerFactory.getLogger( CascadingServices.class );
064
065  public static final String CASCADING_SERVICES = "cascading-service.properties";
066  public static final String CASCADING_SERVICES_JAR = "cascading.management.service.jar";
067
068  public static final String DEFAULT_PROPERTIES = "cascading/management/service.properties";
069  public static final String CONTAINER_ENABLED = "cascading.management.container.enabled";
070  public static final String CONTAINER_EXCLUDE = "cascading.management.container.exclude";
071
072  static Properties defaultProperties;
073  static URL libraryURL;
074  static String[] exclusions;
075
076  Map<Object, Object> properties;
077
078  MetricsService metricsService;
079  DocumentService documentService;
080  boolean enableContainer;
081
082  static
083    {
084    ClassLoader classLoader = CascadingServices.class.getClassLoader();
085
086    // load all properties from cascading-services.properties
087    defaultProperties = loadProperties( new Properties(), CASCADING_SERVICES, classLoader );
088
089    libraryURL = getLibraryURL();
090
091    if( libraryURL != null )
092      classLoader = new URLClassLoader( new URL[]{libraryURL} );
093
094    // load additional, if only, properties from file in resource jar
095    defaultProperties = loadProperties( new Properties( defaultProperties ), DEFAULT_PROPERTIES, classLoader );
096
097    // if resources jar is on primary classpath, find the jar so we can isolate it when loading services
098    if( libraryURL == null )
099      libraryURL = parseLibraryURL( classLoader, DEFAULT_PROPERTIES );
100
101    exclusions = Util.removeNulls( defaultProperties.getProperty( CONTAINER_EXCLUDE, "" ).split( "," ) );
102    }
103
104  private static URL parseLibraryURL( ClassLoader classLoader, String resource )
105    {
106    URL url = classLoader.getResource( resource );
107
108    if( url != null )
109      {
110      try
111        {
112        String path = url.toURI().getSchemeSpecificPart();
113        int endIndex = path.lastIndexOf( '!' );
114
115        if( endIndex != -1 )
116          path = path.substring( 0, endIndex );
117
118        if( path.endsWith( ".jar" ) )
119          return new URL( path );
120        }
121      catch( Exception exception )
122        {
123        LOG.warn( "unable to parse resource library: {}", url, exception );
124        }
125      }
126
127    return null;
128    }
129
130  private static URL getLibraryURL()
131    {
132    String property = defaultProperties.getProperty( CASCADING_SERVICES_JAR );
133
134    if( property == null )
135      return null;
136
137    try
138      {
139      URI uri = URI.create( property );
140
141      if( !uri.isAbsolute() )
142        uri = new URI( "file", uri.getAuthority(), uri.getPath(), uri.getQuery(), uri.getFragment() );
143
144      return uri.toURL();
145      }
146    catch( Exception exception )
147      {
148      LOG.warn( "property: {}, has invalid URL value: {}", CASCADING_SERVICES_JAR, property );
149      }
150
151    return null;
152    }
153
154  private static Properties loadProperties( Properties properties, String resource, ClassLoader classLoader )
155    {
156    InputStream input = classLoader.getResourceAsStream( resource );
157
158    try
159      {
160      if( input != null )
161        {
162        URL url = parseLibraryURL( classLoader, resource );
163
164        if( url != null )
165          LOG.info( "loading properties: {}, from jar: {}", resource, url );
166        else
167          LOG.info( "loading properties: {}", resource );
168
169        properties.load( input );
170        }
171      }
172    catch( IOException exception )
173      {
174      LOG.warn( "unable to load properties from {}", resource, exception );
175      }
176
177    return properties;
178    }
179
180  private synchronized ServiceLoader getServiceUtil()
181    {
182    return ServiceLoader.getInstance( enableContainer ? libraryURL : null, exclusions );
183    }
184
185  public CascadingServices( Map<Object, Object> properties )
186    {
187    this.properties = PropertyUtil.createProperties( properties, defaultProperties );
188    this.enableContainer = PropertyUtil.getProperty( properties, CONTAINER_ENABLED, defaultProperties.getProperty( CONTAINER_ENABLED, "false" ) ).equalsIgnoreCase( "true" );
189    }
190
191  private Map<Object, Object> getProperties()
192    {
193    return properties;
194    }
195
196  public MetricsService getMetricsService()
197    {
198    if( metricsService == null )
199      metricsService = createMetricsService();
200
201    return metricsService;
202    }
203
204  public DocumentService getDocumentService()
205    {
206    if( documentService == null )
207      documentService = createDocumentService();
208
209    return documentService;
210    }
211
212  public ClientState createClientState( String id )
213    {
214    ClientState clientState = (ClientState) getServiceUtil().loadServiceFrom( defaultProperties, getProperties(), ClientState.STATE_SERVICE_CLASS_PROPERTY );
215
216    if( clientState != null )
217      {
218      clientState.initialize( this, id );
219
220      return clientState;
221      }
222
223    return ClientState.NULL;
224    }
225
226  protected MetricsService createMetricsService()
227    {
228    MetricsService service = (MetricsService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), MetricsService.METRICS_SERVICE_CLASS_PROPERTY );
229
230    if( service != null )
231      {
232      registerShutdownHook( service );
233
234      return service;
235      }
236
237    return new NullMetricsService();
238    }
239
240  protected DocumentService createDocumentService()
241    {
242    DocumentService service = (DocumentService) getServiceUtil().loadSingletonServiceFrom( defaultProperties, getProperties(), DocumentService.DOCUMENT_SERVICE_CLASS_PROPERTY );
243
244    if( service != null )
245      {
246      registerShutdownHook( service );
247
248      return service;
249      }
250
251    return new NullDocumentService();
252    }
253
254  private void registerShutdownHook( final CascadingService service )
255    {
256    if( service == null )
257      return;
258
259    ShutdownUtil.addHook( new ShutdownUtil.Hook()
260    {
261    @Override
262    public Priority priority()
263      {
264      return Priority.SERVICE_PROVIDER;
265      }
266
267    @Override
268    public void execute()
269      {
270      try
271        {
272        service.stopService();
273        }
274      catch( Throwable throwable )
275        {
276        // safe to throw exception here so message is logged
277        LOG.error( "failed stopping cascading service", throwable );
278        throw new CascadeException( "failed stopping cascading service", throwable );
279        }
280      }
281    } );
282    }
283
284  /** Class NullDocumentService provides a null implementation. */
285  public static class NullDocumentService implements DocumentService
286    {
287    @Override
288    public boolean isEnabled()
289      {
290      return false;
291      }
292
293    @Override
294    public void setProperties( Map<Object, Object> properties )
295      {
296      }
297
298    @Override
299    public void startService()
300      {
301      }
302
303    @Override
304    public void stopService()
305      {
306      }
307
308    @Override
309    public void put( String key, Object object )
310      {
311      }
312
313    @Override
314    public void put( String type, String key, Object object )
315      {
316      }
317
318    @Override
319    public Map get( String type, String key )
320      {
321      return null;
322      }
323
324    @Override
325    public boolean supportsFind()
326      {
327      return false;
328      }
329
330    @Override
331    public List<Map<String, Object>> find( String type, String[] query )
332      {
333      return null;
334      }
335    }
336
337  /** Class NullMetricsService provides a null implementation. */
338  public static class NullMetricsService implements MetricsService
339    {
340    @Override
341    public boolean isEnabled()
342      {
343      return false;
344      }
345
346    @Override
347    public void increment( String[] context, int amount )
348      {
349      }
350
351    @Override
352    public void set( String[] context, String value )
353      {
354      }
355
356    @Override
357    public void set( String[] context, int value )
358      {
359      }
360
361    @Override
362    public void set( String[] context, long value )
363      {
364      }
365
366    @Override
367    public String getString( String[] context )
368      {
369      return null;
370      }
371
372    @Override
373    public int getInt( String[] context )
374      {
375      return 0;
376      }
377
378    @Override
379    public long getLong( String[] context )
380      {
381      return 0;
382      }
383
384    @Override
385    public boolean compareSet( String[] context, String isValue, String toValue )
386      {
387      return true;
388      }
389
390    @Override
391    public boolean compareSet( String[] context, int isValue, int toValue )
392      {
393      return true;
394      }
395
396    @Override
397    public boolean compareSet( String[] context, long isValue, long toValue )
398      {
399      return true;
400      }
401
402    @Override
403    public void setProperties( Map<Object, Object> properties )
404      {
405      }
406
407    @Override
408    public void startService()
409      {
410      }
411
412    @Override
413    public void stopService()
414      {
415      }
416    }
417  }