001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.util;
022
023import java.beans.Expression;
024import java.io.BufferedReader;
025import java.io.File;
026import java.io.FileWriter;
027import java.io.IOException;
028import java.io.InputStreamReader;
029import java.io.Writer;
030import java.lang.reflect.Constructor;
031import java.lang.reflect.Field;
032import java.lang.reflect.Method;
033import java.lang.reflect.Type;
034import java.net.URL;
035import java.net.URLDecoder;
036import java.security.MessageDigest;
037import java.security.NoSuchAlgorithmException;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.Collection;
041import java.util.Collections;
042import java.util.Enumeration;
043import java.util.HashMap;
044import java.util.HashSet;
045import java.util.IdentityHashMap;
046import java.util.Iterator;
047import java.util.LinkedHashSet;
048import java.util.List;
049import java.util.Map;
050import java.util.Set;
051import java.util.TreeSet;
052import java.util.UUID;
053import java.util.concurrent.Callable;
054import java.util.concurrent.ExecutionException;
055import java.util.concurrent.ExecutorService;
056import java.util.concurrent.Executors;
057import java.util.concurrent.Future;
058import java.util.concurrent.TimeUnit;
059import java.util.concurrent.TimeoutException;
060import java.util.regex.Pattern;
061
062import cascading.CascadingException;
063import cascading.flow.FlowException;
064import cascading.tap.MultiSourceTap;
065import cascading.tap.Tap;
066import cascading.tuple.coerce.Coercions;
067import cascading.util.jgrapht.ComponentAttributeProvider;
068import cascading.util.jgrapht.DOTExporter;
069import cascading.util.jgrapht.EdgeNameProvider;
070import cascading.util.jgrapht.IntegerNameProvider;
071import cascading.util.jgrapht.VertexNameProvider;
072import org.jgrapht.DirectedGraph;
073import org.jgrapht.graph.SimpleDirectedGraph;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077/** Class Util provides reusable operations. */
078public class Util
079  {
  /**
   * On OS X only, and if the graphviz dot binary is installed, when true, dot will be invoked to convert the dot file
   * to a pdf document.
   */
  public static final String CONVERT_DOT_TO_PDF = "util.dot.to.pdf.enabled";
  // NOTE(review): presumably the character length of the ids produced by createID/createUniqueID (both yield
  // 32 hex characters); not referenced in the visible part of this class -- confirm against callers
  public static int ID_LENGTH = 32;

  private static final Logger LOG = LoggerFactory.getLogger( Util.class );
  // lookup table used by getHex to translate a 4-bit value into its upper case hex digit
  private static final String HEXES = "0123456789ABCDEF";

  /** True when the "os.name" system property contains "mac os x", ignoring case. */
  public static final boolean IS_OSX = System.getProperty( "os.name" ).toLowerCase().contains( "Mac OS X".toLowerCase() );
  /**
   * True only when running on OS X, the {@link #CONVERT_DOT_TO_PDF} system property is set to true, and
   * "which dot" exits with zero. Declared after LOG since hasDOT() logs through execProcess.
   */
  public static final boolean HAS_DOT_EXEC = IS_OSX && Boolean.getBoolean( CONVERT_DOT_TO_PDF ) && hasDOT();
092
093  public static <K, V> HashMap<K, V> createHashMap()
094    {
095    return new HashMap<K, V>();
096    }
097
098  public static <K, V> boolean reverseMap( Map<V, K> from, Map<K, V> to )
099    {
100    boolean dupes = false;
101
102    for( Map.Entry<V, K> entry : from.entrySet() )
103      dupes |= to.put( entry.getValue(), entry.getKey() ) != null;
104
105    return dupes;
106    }
107
108  public static <V> Set<V> createIdentitySet()
109    {
110    return Collections.<V>newSetFromMap( new IdentityHashMap() );
111    }
112
113  public static <V> Set<V> createIdentitySet( Collection<V> collection )
114    {
115    Set<V> identitySet = createIdentitySet();
116
117    if( collection != null )
118      identitySet.addAll( collection );
119
120    return identitySet;
121    }
122
123  public static <V> V getFirst( Collection<V> collection )
124    {
125    if( collection == null || collection.isEmpty() )
126      return null;
127
128    return collection.iterator().next();
129    }
130
131  public static <N extends Number> N max( Collection<N> collection )
132    {
133    return new TreeSet<>( collection ).first();
134    }
135
136  public static <N extends Number> N min( Collection<N> collection )
137    {
138    return new TreeSet<>( collection ).last();
139    }
140
141  public static <T> Set<T> narrowSet( Class<T> type, Collection collection )
142    {
143    return narrowSet( type, collection.iterator() );
144    }
145
146  public static <T> Set<T> narrowSet( Class<T> type, Collection collection, boolean include )
147    {
148    return narrowSet( type, collection.iterator(), include );
149    }
150
151  public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator )
152    {
153    return narrowSet( type, iterator, true );
154    }
155
156  public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator, boolean include )
157    {
158    Set<T> set = new HashSet<>();
159
160    while( iterator.hasNext() )
161      {
162      Object o = iterator.next();
163
164      if( type.isInstance( o ) == include )
165        set.add( (T) o );
166      }
167
168    return set;
169    }
170
171  public static <T> boolean contains( Class<T> type, Collection collection )
172    {
173    return contains( type, collection.iterator() );
174    }
175
176  public static <T> boolean contains( Class<T> type, Iterator iterator )
177    {
178    while( iterator.hasNext() )
179      {
180      Object o = iterator.next();
181
182      if( type.isInstance( o ) )
183        return true;
184      }
185
186    return false;
187    }
188
189  public static <T> Set<T> differenceIdentity( Set<T> lhs, Set<T> rhs )
190    {
191    Set<T> diff = createIdentitySet( lhs );
192
193    diff.removeAll( rhs );
194
195    return diff;
196    }
197
198  public static synchronized String createUniqueIDWhichStartsWithAChar()
199    {
200    String value;
201
202    do
203      {
204      value = createUniqueID();
205      }
206    while( Character.isDigit( value.charAt( 0 ) ) );
207
208    return value;
209    }
210
211  public static synchronized String createUniqueID()
212    {
213    // creates a cryptographically secure random value
214    String value = UUID.randomUUID().toString();
215    return value.toUpperCase().replaceAll( "-", "" );
216    }
217
218  public static String createID( String rawID )
219    {
220    return createID( rawID.getBytes() );
221    }
222
223  /**
224   * Method CreateID returns a HEX hash of the given bytes with length 32 characters long.
225   *
226   * @param bytes the bytes
227   * @return string
228   */
229  public static String createID( byte[] bytes )
230    {
231    try
232      {
233      return getHex( MessageDigest.getInstance( "MD5" ).digest( bytes ) );
234      }
235    catch( NoSuchAlgorithmException exception )
236      {
237      throw new RuntimeException( "unable to digest string" );
238      }
239    }
240
241  public static String getHex( byte[] bytes )
242    {
243    if( bytes == null )
244      return null;
245
246    final StringBuilder hex = new StringBuilder( 2 * bytes.length );
247
248    for( final byte b : bytes )
249      hex.append( HEXES.charAt( ( b & 0xF0 ) >> 4 ) ).append( HEXES.charAt( b & 0x0F ) );
250
251    return hex.toString();
252    }
253
254  public static byte[] longToByteArray( long value )
255    {
256    return new byte[]{
257      (byte) ( value >> 56 ),
258      (byte) ( value >> 48 ),
259      (byte) ( value >> 40 ),
260      (byte) ( value >> 32 ),
261      (byte) ( value >> 24 ),
262      (byte) ( value >> 16 ),
263      (byte) ( value >> 8 ),
264      (byte) value
265    };
266    }
267
268  public static byte[] intToByteArray( int value )
269    {
270    return new byte[]{
271      (byte) ( value >> 24 ),
272      (byte) ( value >> 16 ),
273      (byte) ( value >> 8 ),
274      (byte) value
275    };
276    }
277
278  public static <T> T[] copy( T[] source )
279    {
280    if( source == null )
281      return null;
282
283    return Arrays.copyOf( source, source.length );
284    }
285
286  public static String unique( String value, String delim )
287    {
288    String[] split = value.split( delim );
289
290    Set<String> values = new LinkedHashSet<String>();
291
292    Collections.addAll( values, split );
293
294    return join( values, delim );
295    }
296
297  /**
298   * This method joins the values in the given list with the delim String value.
299   *
300   * @param list
301   * @param delim
302   * @return String
303   */
304  public static String join( int[] list, String delim )
305    {
306    return join( list, delim, false );
307    }
308
309  public static String join( int[] list, String delim, boolean printNull )
310    {
311    StringBuffer buffer = new StringBuffer();
312    int count = 0;
313
314    for( Object s : list )
315      {
316      if( count != 0 )
317        buffer.append( delim );
318
319      if( printNull || s != null )
320        buffer.append( s );
321
322      count++;
323      }
324
325    return buffer.toString();
326    }
327
328  public static String join( String delim, String... strings )
329    {
330    return join( delim, false, strings );
331    }
332
333  public static String join( String delim, boolean printNull, String... strings )
334    {
335    return join( strings, delim, printNull );
336    }
337
338  /**
339   * This method joins the values in the given list with the delim String value.
340   *
341   * @param list
342   * @param delim
343   * @return a String
344   */
345  public static String join( Object[] list, String delim )
346    {
347    return join( list, delim, false );
348    }
349
350  public static String join( Object[] list, String delim, boolean printNull )
351    {
352    return join( list, delim, printNull, 0 );
353    }
354
355  public static String join( Object[] list, String delim, boolean printNull, int beginAt )
356    {
357    return join( list, delim, printNull, beginAt, list.length - beginAt );
358    }
359
360  public static String join( Object[] list, String delim, boolean printNull, int beginAt, int length )
361    {
362    StringBuffer buffer = new StringBuffer();
363    int count = 0;
364
365    for( int i = beginAt; i < beginAt + length; i++ )
366      {
367      Object s = list[ i ];
368      if( count != 0 )
369        buffer.append( delim );
370
371      if( printNull || s != null )
372        buffer.append( s );
373
374      count++;
375      }
376
377    return buffer.toString();
378    }
379
380  public static String join( Iterable iterable, String delim, boolean printNull )
381    {
382    int count = 0;
383
384    StringBuilder buffer = new StringBuilder();
385
386    for( Object s : iterable )
387      {
388      if( count != 0 )
389        buffer.append( delim );
390
391      if( printNull || s != null )
392        buffer.append( s );
393
394      count++;
395      }
396
397    return buffer.toString();
398    }
399
400  /**
401   * This method joins each value in the collection with a tab character as the delimiter.
402   *
403   * @param collection
404   * @return a String
405   */
406  public static String join( Collection collection )
407    {
408    return join( collection, "\t" );
409    }
410
411  /**
412   * This method joins each valuein the collection with the given delimiter.
413   *
414   * @param collection
415   * @param delim
416   * @return a String
417   */
418  public static String join( Collection collection, String delim )
419    {
420    return join( collection, delim, false );
421    }
422
423  public static String join( Collection collection, String delim, boolean printNull )
424    {
425    StringBuffer buffer = new StringBuffer();
426
427    join( buffer, collection, delim, printNull );
428
429    return buffer.toString();
430    }
431
432  /**
433   * This method joins each value in the collection with the given delimiter. All results are appended to the
434   * given {@link StringBuffer} instance.
435   *
436   * @param buffer
437   * @param collection
438   * @param delim
439   */
440  public static void join( StringBuffer buffer, Collection collection, String delim )
441    {
442    join( buffer, collection, delim, false );
443    }
444
445  public static void join( StringBuffer buffer, Collection collection, String delim, boolean printNull )
446    {
447    int count = 0;
448
449    for( Object s : collection )
450      {
451      if( count != 0 )
452        buffer.append( delim );
453
454      if( printNull || s != null )
455        buffer.append( s );
456
457      count++;
458      }
459    }
460
461  public static <T> List<T> split( Class<T> type, String values )
462    {
463    return split( type, ",", values );
464    }
465
466  public static <T> List<T> split( Class<T> type, String delim, String values )
467    {
468    String[] split = values.split( delim );
469
470    List<T> results = new ArrayList<>();
471
472    for( String value : split )
473      results.add( Coercions.<T>coerce( value, type ) );
474
475    return results;
476    }
477
478  public static String[] removeNulls( String... strings )
479    {
480    List<String> list = new ArrayList<String>();
481
482    for( String string : strings )
483      {
484      if( string != null )
485        list.add( string );
486      }
487
488    return list.toArray( new String[ list.size() ] );
489    }
490
491  public static Collection<String> quote( Collection<String> collection, String quote )
492    {
493    List<String> list = new ArrayList<String>();
494
495    for( String string : collection )
496      list.add( quote + string + quote );
497
498    return list;
499    }
500
501  public static String print( Collection collection, String delim )
502    {
503    StringBuffer buffer = new StringBuffer();
504
505    print( buffer, collection, delim );
506
507    return buffer.toString();
508    }
509
510  public static void print( StringBuffer buffer, Collection collection, String delim )
511    {
512    int count = 0;
513
514    for( Object s : collection )
515      {
516      if( count != 0 )
517        buffer.append( delim );
518
519      buffer.append( "[" );
520      buffer.append( s );
521      buffer.append( "]" );
522
523      count++;
524      }
525    }
526
527  /**
528   * This method attempts to remove any username and password from the given url String.
529   *
530   * @param url
531   * @return a String
532   */
533  public static String sanitizeUrl( String url )
534    {
535    if( url == null )
536      return null;
537
538    return url.replaceAll( "(?<=//).*:.*@", "" );
539    }
540
541  /**
542   * This method attempts to remove duplicate consecutive forward slashes from the given url.
543   *
544   * @param url
545   * @return a String
546   */
547  public static String normalizeUrl( String url )
548    {
549    if( url == null )
550      return null;
551
552    return url.replaceAll( "([^:]/)/{2,}", "$1/" );
553    }
554
555  /**
556   * This method returns the {@link Object#toString()} of the given object, or an empty String if the object
557   * is null.
558   *
559   * @param object
560   * @return a String
561   */
562  public static String toNull( Object object )
563    {
564    if( object == null )
565      return "";
566
567    return object.toString();
568    }
569
570  /**
571   * This method truncates the given String value to the given size, but appends an ellipse ("...") if the
572   * String is larger than maxSize.
573   *
574   * @param string
575   * @param maxSize
576   * @return a String
577   */
578  public static String truncate( String string, int maxSize )
579    {
580    string = toNull( string );
581
582    if( string.length() <= maxSize )
583      return string;
584
585    return String.format( "%s...", string.subSequence( 0, maxSize - 3 ) );
586    }
587
588  public static void printGraph( String filename, SimpleDirectedGraph graph )
589    {
590    try
591      {
592      new File( filename ).getParentFile().mkdirs();
593      Writer writer = new FileWriter( filename );
594
595      try
596        {
597        printGraph( writer, graph );
598        }
599      finally
600        {
601        writer.close();
602        }
603      }
604    catch( IOException exception )
605      {
606      LOG.error( "failed printing graph to {}, with exception: {}", filename, exception );
607      }
608    }
609
610  @SuppressWarnings({"unchecked"})
611  private static void printGraph( Writer writer, SimpleDirectedGraph graph )
612    {
613    DOTExporter dot = new DOTExporter( new IntegerNameProvider(), new VertexNameProvider()
614    {
615    public String getVertexName( Object object )
616      {
617      if( object == null )
618        return "none";
619
620      return object.toString().replaceAll( "\"", "\'" );
621      }
622    }, new EdgeNameProvider<Object>()
623    {
624    public String getEdgeName( Object object )
625      {
626      if( object == null )
627        return "none";
628
629      return object.toString().replaceAll( "\"", "\'" );
630      }
631    }
632    );
633
634    dot.export( writer, graph );
635    }
636
637  /**
638   * This method removes all nulls from the given List.
639   *
640   * @param list
641   */
642  @SuppressWarnings({"StatementWithEmptyBody"})
643  public static void removeAllNulls( List list )
644    {
645    while( list.remove( null ) )
646      ;
647    }
648
649  public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider )
650    {
651    new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider ).export( writer, graph );
652    }
653
654  public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider,
655                               ComponentAttributeProvider vertexAttributeProvider, ComponentAttributeProvider edgeAttributeProvider )
656    {
657    new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider, vertexAttributeProvider, edgeAttributeProvider ).export( writer, graph );
658    }
659
660  public static boolean isEmpty( String string )
661    {
662    return string == null || string.isEmpty();
663    }
664
665  private static String[] findSplitName( String path )
666    {
667    String separator = "/";
668
669    if( path.lastIndexOf( "/" ) < path.lastIndexOf( "\\" ) )
670      separator = "\\\\";
671
672    String[] split = path.split( separator );
673
674    path = split[ split.length - 1 ];
675
676    path = path.substring( 0, path.lastIndexOf( '.' ) ); // remove .jar
677
678    return path.split( "-(?=\\d)", 2 );
679    }
680
681  public static String findVersion( String path )
682    {
683    if( path == null || path.isEmpty() )
684      return null;
685
686    String[] split = findSplitName( path );
687
688    if( split.length == 2 )
689      return split[ 1 ];
690
691    return null;
692    }
693
694  public static String findName( String path )
695    {
696    if( path == null || path.isEmpty() )
697      return null;
698
699    String[] split = findSplitName( path );
700
701    if( split.length == 0 )
702      return null;
703
704    return split[ 0 ];
705    }
706
707  public static long getSourceModified( Object confCopy, Iterator<Tap> values, long sinkModified ) throws IOException
708    {
709    long sourceModified = 0;
710
711    while( values.hasNext() )
712      {
713      Tap source = values.next();
714
715      if( source instanceof MultiSourceTap )
716        return getSourceModified( confCopy, ( (MultiSourceTap) source ).getChildTaps(), sinkModified );
717
718      sourceModified = source.getModifiedTime( confCopy );
719
720      // source modified returns zero if does not exist
721      // this should minimize number of times we touch any file meta-data server
722      if( sourceModified == 0 && !source.resourceExists( confCopy ) )
723        throw new FlowException( "source does not exist: " + source );
724
725      if( sinkModified < sourceModified )
726        return sourceModified;
727      }
728
729    return sourceModified;
730    }
731
732  public static long getSinkModified( Object config, Collection<Tap> sinks ) throws IOException
733    {
734    long sinkModified = Long.MAX_VALUE;
735
736    for( Tap sink : sinks )
737      {
738      if( sink.isReplace() || sink.isUpdate() )
739        sinkModified = -1L;
740      else
741        {
742        if( !sink.resourceExists( config ) )
743          sinkModified = 0L;
744        else
745          sinkModified = Math.min( sinkModified, sink.getModifiedTime( config ) ); // return youngest mod date
746        }
747      }
748    return sinkModified;
749    }
750
751  public static String getTypeName( Type type )
752    {
753    if( type == null )
754      return null;
755
756    return type instanceof Class ? ( (Class) type ).getCanonicalName() : type.toString();
757    }
758
759  public static String getSimpleTypeName( Type type )
760    {
761    if( type == null )
762      return null;
763
764    return type instanceof Class ? ( (Class) type ).getSimpleName() : type.toString();
765    }
766
767  public static String[] typeNames( Type[] types )
768    {
769    String[] names = new String[ types.length ];
770
771    for( int i = 0; i < types.length; i++ )
772      names[ i ] = getTypeName( types[ i ] );
773
774    return names;
775    }
776
777  public static String[] simpleTypeNames( Type[] types )
778    {
779    String[] names = new String[ types.length ];
780
781    for( int i = 0; i < types.length; i++ )
782      names[ i ] = getSimpleTypeName( types[ i ] );
783
784    return names;
785    }
786
787  public static boolean containsNull( Object[] values )
788    {
789    for( Object value : values )
790      {
791      if( value == null )
792        return true;
793      }
794
795    return false;
796    }
797
798  public static void safeSleep( long durationMillis )
799    {
800    try
801      {
802      Thread.sleep( durationMillis );
803      }
804    catch( InterruptedException exception )
805      {
806      // do nothing
807      }
808    }
809
810  public static void writePDF( String path )
811    {
812    if( !HAS_DOT_EXEC )
813      return;
814
815    // dot *.dot -Tpdf -O -Nshape=box
816    File file = new File( path );
817    execProcess( file.getParentFile(), "dot", file.getName(), "-Tpdf", "-O" );
818    }
819
820  static boolean hasDOT()
821    {
822    return execProcess( null, "which", "dot" ) == 0;
823    }
824
825  public static int execProcess( File parentFile, String... command )
826    {
827    try
828      {
829      String commandLine = join( command, " " );
830
831      LOG.debug( "command: {}", commandLine );
832
833      Process process = Runtime.getRuntime().exec( commandLine, null, parentFile );
834
835      int result = process.waitFor();
836
837      BufferedReader reader = new BufferedReader( new InputStreamReader( process.getInputStream() ) );
838
839      String line = reader.readLine();
840
841      while( line != null )
842        {
843        LOG.warn( "{} stdout returned: {}", command[ 0 ], line );
844        line = reader.readLine();
845        }
846
847      reader = new BufferedReader( new InputStreamReader( process.getErrorStream() ) );
848
849      line = reader.readLine();
850
851      while( line != null )
852        {
853        LOG.warn( "{} stderr returned: {}", command[ 0 ], line );
854        line = reader.readLine();
855        }
856
857      return result;
858      }
859    catch( IOException exception )
860      {
861      LOG.warn( "unable to exec " + command[ 0 ], exception );
862      }
863    catch( InterruptedException exception )
864      {
865      LOG.warn( "interrupted exec " + command[ 0 ], exception );
866      }
867
868    return Integer.MIN_VALUE;
869    }
870
871  public static String formatDurationFromMillis( long duration )
872    {
873    if( duration / 1000 / 60 / 60 / 24 > 0.0 )
874      return formatDurationDHMSms( duration );
875    if( duration / 1000 / 60 / 60 > 0.0 )
876      return formatDurationHMSms( duration );
877    else
878      return formatDurationMSms( duration );
879    }
880
881  public static String formatDurationMSms( long duration )
882    {
883    long ms = duration % 1000;
884    long durationSeconds = duration / 1000;
885    long seconds = durationSeconds % 60;
886    long minutes = durationSeconds / 60;
887
888    return String.format( "%02d:%02d.%03d", minutes, seconds, ms );
889    }
890
891  public static String formatDurationHMSms( long duration )
892    {
893    long ms = duration % 1000;
894    long durationSeconds = duration / 1000;
895    long seconds = durationSeconds % 60;
896    long minutes = ( durationSeconds / 60 ) % 60;
897    long hours = durationSeconds / 60 / 60;
898
899    return String.format( "%02d:%02d:%02d.%03d", hours, minutes, seconds, ms );
900    }
901
902  public static String formatDurationDHMSms( long duration )
903    {
904    long ms = duration % 1000;
905    long durationSeconds = duration / 1000;
906    long seconds = durationSeconds % 60;
907    long minutes = ( durationSeconds / 60 ) % 60;
908    long hours = ( durationSeconds / 60 / 60 ) % 24;
909    long days = durationSeconds / 60 / 60 / 24;
910
911    return String.format( "%02d:%02d:%02d:%02d.%03d", days, hours, minutes, seconds, ms );
912    }
913
914  /**
915   * Converts a given comma separated String of Exception names into a List of classes.
916   * ClassNotFound exceptions are ignored if no warningMessage is given, otherwise logged as a warning.
917   *
918   * @param classNames A comma separated String of Exception names.
919   * @return List of Exception classes.
920   */
921  public static Set<Class<? extends Exception>> asClasses( String classNames, String warningMessage )
922    {
923    Set<Class<? extends Exception>> exceptionClasses = new HashSet<Class<? extends Exception>>();
924    String[] split = classNames.split( "," );
925
926    // possibly user provided type, load from context
927    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
928
929    for( String className : split )
930      {
931      if( className != null )
932        className = className.trim();
933
934      if( isEmpty( className ) )
935        continue;
936
937      try
938        {
939        Class<? extends Exception> exceptionClass = contextClassLoader.loadClass( className ).asSubclass( Exception.class );
940
941        exceptionClasses.add( exceptionClass );
942        }
943      catch( ClassNotFoundException exception )
944        {
945        if( !Util.isEmpty( warningMessage ) )
946          LOG.warn( "{}: {}", warningMessage, className );
947        }
948      }
949
950    return exceptionClasses;
951    }
952
953  public static Boolean submitWithTimeout( Callable<Boolean> task, int timeout, TimeUnit timeUnit ) throws Exception
954    {
955    ExecutorService executor = Executors.newFixedThreadPool( 1 );
956
957    Future<Boolean> future = executor.submit( task );
958
959    executor.shutdown();
960
961    try
962      {
963      return future.get( timeout, timeUnit );
964      }
965    catch( TimeoutException exception )
966      {
967      future.cancel( true );
968      }
969    catch( ExecutionException exception )
970      {
971      Throwable cause = exception.getCause();
972
973      if( cause instanceof RuntimeException )
974        throw (RuntimeException) cause;
975
976      throw (Exception) cause;
977      }
978
979    return null;
980    }
981
  /**
   * Interface RetryOperator is the unit of work handed to
   * {@link #retry(Logger, int, int, String, RetryOperator)}.
   *
   * @param <T> the type of the result
   */
  public interface RetryOperator<T>
    {
    /**
     * Performs the operation, it is invoked again if it fails and {@link #rethrow(Exception)} returns false.
     *
     * @return the result of the operation
     * @throws Exception if the operation failed
     */
    T operate() throws Exception;

    /**
     * Decides whether the given failure of {@link #operate()} should be thrown immediately instead of
     * retrying the operation.
     *
     * @param exception the exception thrown by operate()
     * @return true to stop retrying and throw the exception
     */
    boolean rethrow( Exception exception );
    }
988
989  public static <T> T retry( Logger logger, int retries, int secondsDelay, String message, RetryOperator<T> operator ) throws Exception
990    {
991    Exception saved = null;
992
993    for( int i = 0; i < retries; i++ )
994      {
995      try
996        {
997        return operator.operate();
998        }
999      catch( Exception exception )
1000        {
1001        if( operator.rethrow( exception ) )
1002          {
1003          logger.warn( message + ", but not retrying", exception );
1004
1005          throw exception;
1006          }
1007
1008        saved = exception;
1009
1010        logger.warn( message + ", attempt: " + ( i + 1 ), exception );
1011
1012        try
1013          {
1014          Thread.sleep( secondsDelay * 1000 );
1015          }
1016        catch( InterruptedException exception1 )
1017          {
1018          // do nothing
1019          }
1020        }
1021      }
1022
1023    logger.warn( message + ", done retrying after attempts: " + retries, saved );
1024
1025    throw saved;
1026    }
1027
1028  public static Object createProtectedObject( Class type, Object[] parameters, Class[] parameterTypes )
1029    {
1030    try
1031      {
1032      Constructor constructor = type.getDeclaredConstructor( parameterTypes );
1033
1034      constructor.setAccessible( true );
1035
1036      return constructor.newInstance( parameters );
1037      }
1038    catch( Exception exception )
1039      {
1040      LOG.error( "unable to instantiate type: {}, with exception: {}", type.getName(), exception );
1041
1042      throw new FlowException( "unable to instantiate type: " + type.getName(), exception );
1043      }
1044    }
1045
1046  public static boolean hasClass( String typeString )
1047    {
1048    try
1049      {
1050      Util.class.getClassLoader().loadClass( typeString );
1051
1052      return true;
1053      }
1054    catch( ClassNotFoundException exception )
1055      {
1056      return false;
1057      }
1058    }
1059
1060  public static <T> T newInstance( String className, Object... parameters )
1061    {
1062    try
1063      {
1064      Class<T> type = (Class<T>) Util.class.getClassLoader().loadClass( className );
1065
1066      return newInstance( type, parameters );
1067      }
1068    catch( ClassNotFoundException exception )
1069      {
1070      throw new CascadingException( "unable to load class: " + className, exception );
1071      }
1072    }
1073
1074  public static <T> T newInstance( Class<T> target, Object... parameters )
1075    {
1076    // using Expression makes sure that constructors using sub-types properly work, otherwise we get a
1077    // NoSuchMethodException.
1078    Expression expr = new Expression( target, "new", parameters );
1079
1080    try
1081      {
1082      return (T) expr.getValue();
1083      }
1084    catch( Exception exception )
1085      {
1086      throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception );
1087      }
1088    }
1089
1090  public static Object invokeStaticMethod( String typeString, String methodName, Object[] parameters, Class[] parameterTypes )
1091    {
1092    Class type = loadClass( typeString );
1093
1094    return invokeStaticMethod( type, methodName, parameters, parameterTypes );
1095    }
1096
1097  public static Class<?> loadClass( String typeString )
1098    {
1099    try
1100      {
1101      return Thread.currentThread().getContextClassLoader().loadClass( typeString );
1102      }
1103    catch( ClassNotFoundException exception )
1104      {
1105      throw new CascadingException( "unable to load class: " + typeString, exception );
1106      }
1107    }
1108
1109  public static Class<?> loadClassSafe( String typeString )
1110    {
1111    try
1112      {
1113      return Thread.currentThread().getContextClassLoader().loadClass( typeString );
1114      }
1115    catch( ClassNotFoundException exception )
1116      {
1117      return null;
1118      }
1119    }
1120
1121  public static Object invokeStaticMethod( Class type, String methodName, Object[] parameters, Class[] parameterTypes )
1122    {
1123    try
1124      {
1125      Method method = type.getDeclaredMethod( methodName, parameterTypes );
1126
1127      method.setAccessible( true );
1128
1129      return method.invoke( null, parameters );
1130      }
1131    catch( Exception exception )
1132      {
1133      throw new CascadingException( "unable to invoke static method: " + type.getName() + "." + methodName, exception );
1134      }
1135    }
1136
1137  public static boolean hasInstanceMethod( Object target, String methodName, Class[] parameterTypes )
1138    {
1139    try
1140      {
1141      return target.getClass().getMethod( methodName, parameterTypes ) != null;
1142      }
1143    catch( NoSuchMethodException exception )
1144      {
1145      return false;
1146      }
1147    }
1148
1149  public static Object invokeInstanceMethodSafe( Object target, String methodName, Object[] parameters, Class[] parameterTypes )
1150    {
1151    try
1152      {
1153      return invokeInstanceMethod( target, methodName, parameters, parameterTypes );
1154      }
1155    catch( Exception exception )
1156      {
1157      return null;
1158      }
1159    }
1160
1161  public static Object invokeInstanceMethod( Object target, String methodName, Object[] parameters, Class[] parameterTypes )
1162    {
1163    try
1164      {
1165      Method method = target.getClass().getMethod( methodName, parameterTypes );
1166
1167      method.setAccessible( true );
1168
1169      return method.invoke( target, parameters );
1170      }
1171    catch( Exception exception )
1172      {
1173      throw new CascadingException( "unable to invoke instance method: " + target.getClass().getName() + "." + methodName, exception );
1174      }
1175    }
1176
1177  public static <R> R returnInstanceFieldIfExistsSafe( Object target, String fieldName )
1178    {
1179    try
1180      {
1181      return returnInstanceFieldIfExists( target, fieldName );
1182      }
1183    catch( Exception exception )
1184      {
1185      // do nothing
1186      return null;
1187      }
1188    }
1189
1190  public static Object invokeConstructor( String className, Object[] parameters, Class[] parameterTypes )
1191    {
1192    try
1193      {
1194      Class type = Util.class.getClassLoader().loadClass( className );
1195
1196      return invokeConstructor( type, parameters, parameterTypes );
1197      }
1198    catch( ClassNotFoundException exception )
1199      {
1200      throw new CascadingException( "unable to load class: " + className, exception );
1201      }
1202    }
1203
1204  public static <T> T invokeConstructor( Class<T> target, Object[] parameters, Class[] parameterTypes )
1205    {
1206    try
1207      {
1208      Constructor<T> constructor = target.getConstructor( parameterTypes );
1209
1210      constructor.setAccessible( true );
1211
1212      return constructor.newInstance( parameters );
1213      }
1214    catch( Exception exception )
1215      {
1216      throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception );
1217      }
1218    }
1219
1220  public static <R> R returnInstanceFieldIfExists( Object target, String fieldName )
1221    {
1222    try
1223      {
1224      Class<?> type = target.getClass();
1225      Field field = getDeclaredField( fieldName, type );
1226
1227      field.setAccessible( true );
1228
1229      return (R) field.get( target );
1230      }
1231    catch( Exception exception )
1232      {
1233      throw new CascadingException( "unable to get instance field: " + target.getClass().getName() + "." + fieldName, exception );
1234      }
1235    }
1236
1237  public static <R> boolean setInstanceFieldIfExistsSafe( Object target, String fieldName, R value )
1238    {
1239    try
1240      {
1241      setInstanceFieldIfExists( target, fieldName, value );
1242      }
1243    catch( Exception exception )
1244      {
1245      return false;
1246      }
1247
1248    return true;
1249    }
1250
1251  public static <R> void setInstanceFieldIfExists( Object target, String fieldName, R value )
1252    {
1253    try
1254      {
1255      Class<?> type = target.getClass();
1256      Field field = getDeclaredField( fieldName, type );
1257
1258      field.setAccessible( true );
1259
1260      field.set( target, value );
1261      }
1262    catch( Exception exception )
1263      {
1264      throw new CascadingException( "unable to set instance field: " + target.getClass().getName() + "." + fieldName, exception );
1265      }
1266    }
1267
1268  private static Field getDeclaredField( String fieldName, Class<?> type )
1269    {
1270    if( type == Object.class )
1271      {
1272      if( LOG.isDebugEnabled() )
1273        LOG.debug( "did not find {} field on {}", fieldName, type.getName() );
1274
1275      return null;
1276      }
1277
1278    try
1279      {
1280      return type.getDeclaredField( fieldName );
1281      }
1282    catch( NoSuchFieldException exception )
1283      {
1284      return getDeclaredField( fieldName, type.getSuperclass() );
1285      }
1286    }
1287
1288  public static String makePath( String prefix, String name )
1289    {
1290    if( name == null || name.isEmpty() )
1291      throw new IllegalArgumentException( "name may not be null or empty " );
1292
1293    if( prefix == null || prefix.isEmpty() )
1294      prefix = Long.toString( (long) ( Math.random() * 10000000000L ) );
1295
1296    name = cleansePathName( name.substring( 0, name.length() < 25 ? name.length() : 25 ) );
1297
1298    return prefix + "/" + name + "/";
1299    }
1300
1301  public static String cleansePathName( String name )
1302    {
1303    return name.replaceAll( "\\s+|\\*|\\+|/+", "_" );
1304    }
1305
1306  public static Class findMainClass( Class defaultType, String packageExclude )
1307    {
1308    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
1309
1310    for( StackTraceElement stackTraceElement : stackTrace )
1311      {
1312      if( stackTraceElement.getMethodName().equals( "main" ) && !stackTraceElement.getClassName().startsWith( packageExclude ) )
1313        {
1314        try
1315          {
1316          LOG.info( "resolving application jar from found main method on: {}", stackTraceElement.getClassName() );
1317
1318          return Thread.currentThread().getContextClassLoader().loadClass( stackTraceElement.getClassName() );
1319          }
1320        catch( ClassNotFoundException exception )
1321          {
1322          LOG.warn( "unable to load class while discovering application jar: {}", stackTraceElement.getClassName(), exception );
1323          }
1324        }
1325      }
1326
1327    LOG.info( "using default application jar, may cause class not found exceptions on the cluster" );
1328
1329    return defaultType;
1330    }
1331
1332  public static String findContainingJar( Class<?> type )
1333    {
1334    ClassLoader classLoader = type.getClassLoader();
1335
1336    String classFile = type.getName().replaceAll( "\\.", "/" ) + ".class";
1337
1338    try
1339      {
1340      for( Enumeration<URL> iterator = classLoader.getResources( classFile ); iterator.hasMoreElements(); )
1341        {
1342        URL url = iterator.nextElement();
1343
1344        if( !"jar".equals( url.getProtocol() ) )
1345          continue;
1346
1347        String path = url.getPath();
1348
1349        if( path.startsWith( "file:" ) )
1350          path = path.substring( "file:".length() );
1351
1352        path = URLDecoder.decode( path, "UTF-8" );
1353
1354        return path.replaceAll( "!.*$", "" );
1355        }
1356      }
1357    catch( IOException exception )
1358      {
1359      throw new CascadingException( exception );
1360      }
1361
1362    return null;
1363    }
1364
1365  public static boolean containsWhitespace( String string )
1366    {
1367    return Pattern.compile( "\\s" ).matcher( string ).find();
1368    }
1369  }