001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.util;
022
023import java.beans.Expression;
024import java.io.BufferedReader;
025import java.io.File;
026import java.io.FileWriter;
027import java.io.IOException;
028import java.io.InputStreamReader;
029import java.io.Writer;
030import java.lang.reflect.Constructor;
031import java.lang.reflect.Field;
032import java.lang.reflect.Method;
033import java.lang.reflect.Type;
034import java.net.URL;
035import java.net.URLDecoder;
036import java.security.MessageDigest;
037import java.security.NoSuchAlgorithmException;
038import java.util.ArrayList;
039import java.util.Arrays;
040import java.util.Collection;
041import java.util.Collections;
042import java.util.Enumeration;
043import java.util.HashMap;
044import java.util.HashSet;
045import java.util.IdentityHashMap;
046import java.util.Iterator;
047import java.util.LinkedHashSet;
048import java.util.List;
049import java.util.Map;
050import java.util.Set;
051import java.util.TreeSet;
052import java.util.UUID;
053import java.util.concurrent.Callable;
054import java.util.concurrent.ExecutionException;
055import java.util.concurrent.ExecutorService;
056import java.util.concurrent.Executors;
057import java.util.concurrent.Future;
058import java.util.concurrent.TimeUnit;
059import java.util.concurrent.TimeoutException;
060import java.util.regex.Pattern;
061
062import cascading.CascadingException;
063import cascading.flow.FlowException;
064import cascading.tap.MultiSourceTap;
065import cascading.tap.Tap;
066import cascading.tuple.coerce.Coercions;
067import cascading.util.jgrapht.ComponentAttributeProvider;
068import cascading.util.jgrapht.DOTExporter;
069import cascading.util.jgrapht.EdgeNameProvider;
070import cascading.util.jgrapht.IntegerNameProvider;
071import cascading.util.jgrapht.VertexNameProvider;
072import org.jgrapht.DirectedGraph;
073import org.jgrapht.graph.SimpleDirectedGraph;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077/** Class Util provides reusable operations. */
078public class Util
079  {
  /**
   * On OS X only, and if the graphviz dot binary is installed, when true, dot will be invoked to convert the dot file
   * to a pdf document.
   */
  public static final String CONVERT_DOT_TO_PDF = "util.dot.to.pdf.enabled";
  /** NOTE(review): presumably the character length of the hex ids returned by the createID methods -- confirm. */
  public static int ID_LENGTH = 32;

  /** Logger shared by the static helpers of this class. */
  private static final Logger LOG = LoggerFactory.getLogger( Util.class );
  /** Upper case hexadecimal digits, indexed by nibble value. */
  private static final String HEXES = "0123456789ABCDEF";

  /** True if the "os.name" system property contains "mac os x", compared in lower case. */
  public static final boolean IS_OSX = System.getProperty( "os.name" ).toLowerCase().contains( "Mac OS X".toLowerCase() );
  /** True only on OS X, when the {@link #CONVERT_DOT_TO_PDF} system property is true and "which dot" exits with 0. */
  public static final boolean HAS_DOT_EXEC = IS_OSX && Boolean.getBoolean( CONVERT_DOT_TO_PDF ) && hasDOT();
092
093  public static <K, V> HashMap<K, V> createHashMap()
094    {
095    return new HashMap<K, V>();
096    }
097
098  public static <K, V> boolean reverseMap( Map<V, K> from, Map<K, V> to )
099    {
100    boolean dupes = false;
101
102    for( Map.Entry<V, K> entry : from.entrySet() )
103      dupes |= to.put( entry.getValue(), entry.getKey() ) != null;
104
105    return dupes;
106    }
107
108  public static <V> Set<V> createIdentitySet()
109    {
110    return Collections.<V>newSetFromMap( new IdentityHashMap() );
111    }
112
113  public static <V> Set<V> createIdentitySet( Collection<V> collection )
114    {
115    Set<V> identitySet = createIdentitySet();
116
117    if( collection != null )
118      identitySet.addAll( collection );
119
120    return identitySet;
121    }
122
123  public static <V> V getFirst( Collection<V> collection )
124    {
125    if( collection == null || collection.isEmpty() )
126      return null;
127
128    return collection.iterator().next();
129    }
130
131  public static <V> V getFirst( Iterator<V> iterator )
132    {
133    if( iterator == null || !iterator.hasNext() )
134      return null;
135
136    return iterator.next();
137    }
138
139  public static <V> V getLast( Iterator<V> iterator )
140    {
141    if( iterator == null || !iterator.hasNext() )
142      return null;
143
144    V v = iterator.next();
145
146    while( iterator.hasNext() )
147      v = iterator.next();
148
149    return v;
150    }
151
152  public static <N extends Number> N max( Collection<N> collection )
153    {
154    return new TreeSet<>( collection ).first();
155    }
156
157  public static <N extends Number> N min( Collection<N> collection )
158    {
159    return new TreeSet<>( collection ).last();
160    }
161
162  public static <T> Set<T> narrowSet( Class<T> type, Collection collection )
163    {
164    return narrowSet( type, collection.iterator() );
165    }
166
167  public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection )
168    {
169    return narrowIdentitySet( type, collection.iterator() );
170    }
171
172  public static <T> Set<T> narrowSet( Class<T> type, Collection collection, boolean include )
173    {
174    return narrowSet( type, collection.iterator(), include );
175    }
176
177  public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection, boolean include )
178    {
179    return narrowIdentitySet( type, collection.iterator(), include );
180    }
181
182  public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator )
183    {
184    return narrowSet( type, iterator, true );
185    }
186
187  public static <T> Set<T> narrowIdentitySet( Class<T> type, Iterator iterator )
188    {
189    return narrowIdentitySet( type, iterator, true );
190    }
191
192  public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator, boolean include )
193    {
194    return narrowSetInternal( type, iterator, include, new HashSet<T>() );
195    }
196
197  public static <T> Set<T> narrowIdentitySet( Class<T> type, Iterator iterator, boolean include )
198    {
199    return narrowSetInternal( type, iterator, include, Util.<T>createIdentitySet() );
200    }
201
202  private static <T> Set<T> narrowSetInternal( Class<T> type, Iterator iterator, boolean include, Set<T> set )
203    {
204    while( iterator.hasNext() )
205      {
206      Object o = iterator.next();
207
208      if( type.isInstance( o ) == include )
209        set.add( (T) o );
210      }
211
212    return set;
213    }
214
215  public static <T> boolean contains( Class<T> type, Collection collection )
216    {
217    return contains( type, collection.iterator() );
218    }
219
220  public static <T> boolean contains( Class<T> type, Iterator iterator )
221    {
222    while( iterator.hasNext() )
223      {
224      Object o = iterator.next();
225
226      if( type.isInstance( o ) )
227        return true;
228      }
229
230    return false;
231    }
232
233  public static <T> Set<T> differenceIdentity( Set<T> lhs, Set<T> rhs )
234    {
235    Set<T> diff = createIdentitySet( lhs );
236
237    diff.removeAll( rhs );
238
239    return diff;
240    }
241
242  public static synchronized String createUniqueIDWhichStartsWithAChar()
243    {
244    String value;
245
246    do
247      {
248      value = createUniqueID();
249      }
250    while( Character.isDigit( value.charAt( 0 ) ) );
251
252    return value;
253    }
254
255  public static synchronized String createUniqueID()
256    {
257    // creates a cryptographically secure random value
258    String value = UUID.randomUUID().toString();
259    return value.toUpperCase().replaceAll( "-", "" );
260    }
261
262  public static String createID( String rawID )
263    {
264    return createID( rawID.getBytes() );
265    }
266
267  /**
268   * Method CreateID returns a HEX hash of the given bytes with length 32 characters long.
269   *
270   * @param bytes the bytes
271   * @return string
272   */
273  public static String createID( byte[] bytes )
274    {
275    try
276      {
277      return getHex( MessageDigest.getInstance( "MD5" ).digest( bytes ) );
278      }
279    catch( NoSuchAlgorithmException exception )
280      {
281      throw new RuntimeException( "unable to digest string" );
282      }
283    }
284
285  public static String getHex( byte[] bytes )
286    {
287    if( bytes == null )
288      return null;
289
290    final StringBuilder hex = new StringBuilder( 2 * bytes.length );
291
292    for( final byte b : bytes )
293      hex.append( HEXES.charAt( ( b & 0xF0 ) >> 4 ) ).append( HEXES.charAt( b & 0x0F ) );
294
295    return hex.toString();
296    }
297
298  public static byte[] longToByteArray( long value )
299    {
300    return new byte[]{
301      (byte) ( value >> 56 ),
302      (byte) ( value >> 48 ),
303      (byte) ( value >> 40 ),
304      (byte) ( value >> 32 ),
305      (byte) ( value >> 24 ),
306      (byte) ( value >> 16 ),
307      (byte) ( value >> 8 ),
308      (byte) value
309    };
310    }
311
312  public static byte[] intToByteArray( int value )
313    {
314    return new byte[]{
315      (byte) ( value >> 24 ),
316      (byte) ( value >> 16 ),
317      (byte) ( value >> 8 ),
318      (byte) value
319    };
320    }
321
322  public static <T> T[] copy( T[] source )
323    {
324    if( source == null )
325      return null;
326
327    return Arrays.copyOf( source, source.length );
328    }
329
330  public static String unique( String value, String delim )
331    {
332    String[] split = value.split( delim );
333
334    Set<String> values = new LinkedHashSet<String>();
335
336    Collections.addAll( values, split );
337
338    return join( values, delim );
339    }
340
341  /**
342   * This method joins the values in the given list with the delim String value.
343   *
344   * @param list
345   * @param delim
346   * @return String
347   */
348  public static String join( int[] list, String delim )
349    {
350    return join( list, delim, false );
351    }
352
353  public static String join( int[] list, String delim, boolean printNull )
354    {
355    StringBuffer buffer = new StringBuffer();
356    int count = 0;
357
358    for( Object s : list )
359      {
360      if( count != 0 )
361        buffer.append( delim );
362
363      if( printNull || s != null )
364        buffer.append( s );
365
366      count++;
367      }
368
369    return buffer.toString();
370    }
371
372  public static String join( String delim, String... strings )
373    {
374    return join( delim, false, strings );
375    }
376
377  public static String join( String delim, boolean printNull, String... strings )
378    {
379    return join( strings, delim, printNull );
380    }
381
382  /**
383   * This method joins the values in the given list with the delim String value.
384   *
385   * @param list
386   * @param delim
387   * @return a String
388   */
389  public static String join( Object[] list, String delim )
390    {
391    return join( list, delim, false );
392    }
393
394  public static String join( Object[] list, String delim, boolean printNull )
395    {
396    return join( list, delim, printNull, 0 );
397    }
398
399  public static String join( Object[] list, String delim, boolean printNull, int beginAt )
400    {
401    return join( list, delim, printNull, beginAt, list.length - beginAt );
402    }
403
404  public static String join( Object[] list, String delim, boolean printNull, int beginAt, int length )
405    {
406    StringBuffer buffer = new StringBuffer();
407    int count = 0;
408
409    for( int i = beginAt; i < beginAt + length; i++ )
410      {
411      Object s = list[ i ];
412      if( count != 0 )
413        buffer.append( delim );
414
415      if( printNull || s != null )
416        buffer.append( s );
417
418      count++;
419      }
420
421    return buffer.toString();
422    }
423
424  public static String join( Iterable iterable, String delim, boolean printNull )
425    {
426    int count = 0;
427
428    StringBuilder buffer = new StringBuilder();
429
430    for( Object s : iterable )
431      {
432      if( count != 0 )
433        buffer.append( delim );
434
435      if( printNull || s != null )
436        buffer.append( s );
437
438      count++;
439      }
440
441    return buffer.toString();
442    }
443
444  /**
445   * This method joins each value in the collection with a tab character as the delimiter.
446   *
447   * @param collection
448   * @return a String
449   */
450  public static String join( Collection collection )
451    {
452    return join( collection, "\t" );
453    }
454
455  /**
456   * This method joins each valuein the collection with the given delimiter.
457   *
458   * @param collection
459   * @param delim
460   * @return a String
461   */
462  public static String join( Collection collection, String delim )
463    {
464    return join( collection, delim, false );
465    }
466
467  public static String join( Collection collection, String delim, boolean printNull )
468    {
469    StringBuffer buffer = new StringBuffer();
470
471    join( buffer, collection, delim, printNull );
472
473    return buffer.toString();
474    }
475
476  /**
477   * This method joins each value in the collection with the given delimiter. All results are appended to the
478   * given {@link StringBuffer} instance.
479   *
480   * @param buffer
481   * @param collection
482   * @param delim
483   */
484  public static void join( StringBuffer buffer, Collection collection, String delim )
485    {
486    join( buffer, collection, delim, false );
487    }
488
489  public static void join( StringBuffer buffer, Collection collection, String delim, boolean printNull )
490    {
491    int count = 0;
492
493    for( Object s : collection )
494      {
495      if( count != 0 )
496        buffer.append( delim );
497
498      if( printNull || s != null )
499        buffer.append( s );
500
501      count++;
502      }
503    }
504
505  public static <T> List<T> split( Class<T> type, String values )
506    {
507    return split( type, ",", values );
508    }
509
510  public static <T> List<T> split( Class<T> type, String delim, String values )
511    {
512    List<T> results = new ArrayList<>();
513
514    if( values == null )
515      return results;
516
517    String[] split = values.split( delim );
518
519    for( String value : split )
520      results.add( Coercions.<T>coerce( value, type ) );
521
522    return results;
523    }
524
525  public static String[] removeNulls( String... strings )
526    {
527    List<String> list = new ArrayList<String>();
528
529    for( String string : strings )
530      {
531      if( string != null )
532        list.add( string );
533      }
534
535    return list.toArray( new String[ list.size() ] );
536    }
537
538  public static Collection<String> quote( Collection<String> collection, String quote )
539    {
540    List<String> list = new ArrayList<String>();
541
542    for( String string : collection )
543      list.add( quote + string + quote );
544
545    return list;
546    }
547
548  public static String print( Collection collection, String delim )
549    {
550    StringBuffer buffer = new StringBuffer();
551
552    print( buffer, collection, delim );
553
554    return buffer.toString();
555    }
556
557  public static void print( StringBuffer buffer, Collection collection, String delim )
558    {
559    int count = 0;
560
561    for( Object s : collection )
562      {
563      if( count != 0 )
564        buffer.append( delim );
565
566      buffer.append( "[" );
567      buffer.append( s );
568      buffer.append( "]" );
569
570      count++;
571      }
572    }
573
574  /**
575   * This method attempts to remove any username and password from the given url String.
576   *
577   * @param url
578   * @return a String
579   */
580  public static String sanitizeUrl( String url )
581    {
582    if( url == null )
583      return null;
584
585    return url.replaceAll( "(?<=//).*:.*@", "" );
586    }
587
588  /**
589   * This method attempts to remove duplicate consecutive forward slashes from the given url.
590   *
591   * @param url
592   * @return a String
593   */
594  public static String normalizeUrl( String url )
595    {
596    if( url == null )
597      return null;
598
599    return url.replaceAll( "([^:]/)/{2,}", "$1/" );
600    }
601
602  /**
603   * This method returns the {@link Object#toString()} of the given object, or an empty String if the object
604   * is null.
605   *
606   * @param object
607   * @return a String
608   */
609  public static String toNull( Object object )
610    {
611    if( object == null )
612      return "";
613
614    return object.toString();
615    }
616
617  /**
618   * This method truncates the given String value to the given size, but appends an ellipse ("...") if the
619   * String is larger than maxSize.
620   *
621   * @param string
622   * @param maxSize
623   * @return a String
624   */
625  public static String truncate( String string, int maxSize )
626    {
627    string = toNull( string );
628
629    if( string.length() <= maxSize )
630      return string;
631
632    return String.format( "%s...", string.subSequence( 0, maxSize - 3 ) );
633    }
634
635  public static void printGraph( String filename, SimpleDirectedGraph graph )
636    {
637    try
638      {
639      new File( filename ).getParentFile().mkdirs();
640      Writer writer = new FileWriter( filename );
641
642      try
643        {
644        printGraph( writer, graph );
645        }
646      finally
647        {
648        writer.close();
649        }
650      }
651    catch( IOException exception )
652      {
653      LOG.error( "failed printing graph to {}, with exception: {}", filename, exception );
654      }
655    }
656
657  @SuppressWarnings({"unchecked"})
658  private static void printGraph( Writer writer, SimpleDirectedGraph graph )
659    {
660    DOTExporter dot = new DOTExporter( new IntegerNameProvider(), new VertexNameProvider()
661      {
662      public String getVertexName( Object object )
663        {
664        if( object == null )
665          return "none";
666
667        return object.toString().replaceAll( "\"", "\'" );
668        }
669      }, new EdgeNameProvider<Object>()
670      {
671      public String getEdgeName( Object object )
672        {
673        if( object == null )
674          return "none";
675
676        return object.toString().replaceAll( "\"", "\'" );
677        }
678      }
679    );
680
681    dot.export( writer, graph );
682    }
683
684  /**
685   * This method removes all nulls from the given List.
686   *
687   * @param list
688   */
689  @SuppressWarnings({"StatementWithEmptyBody"})
690  public static void removeAllNulls( List list )
691    {
692    while( list.remove( null ) )
693      ;
694    }
695
696  public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider )
697    {
698    new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider ).export( writer, graph );
699    }
700
701  public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider,
702                               ComponentAttributeProvider vertexAttributeProvider, ComponentAttributeProvider edgeAttributeProvider )
703    {
704    new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider, vertexAttributeProvider, edgeAttributeProvider ).export( writer, graph );
705    }
706
707  public static boolean isEmpty( String string )
708    {
709    return string == null || string.isEmpty();
710    }
711
712  private static String[] findSplitName( String path )
713    {
714    String separator = "/";
715
716    if( path.lastIndexOf( "/" ) < path.lastIndexOf( "\\" ) )
717      separator = "\\\\";
718
719    String[] split = path.split( separator );
720
721    path = split[ split.length - 1 ];
722
723    path = path.substring( 0, path.lastIndexOf( '.' ) ); // remove .jar
724
725    return path.split( "-(?=\\d)", 2 );
726    }
727
728  public static String findVersion( String path )
729    {
730    if( path == null || path.isEmpty() )
731      return null;
732
733    String[] split = findSplitName( path );
734
735    if( split.length == 2 )
736      return split[ 1 ];
737
738    return null;
739    }
740
741  public static String findName( String path )
742    {
743    if( path == null || path.isEmpty() )
744      return null;
745
746    String[] split = findSplitName( path );
747
748    if( split.length == 0 )
749      return null;
750
751    return split[ 0 ];
752    }
753
754  public static long getSourceModified( Object confCopy, Iterator<Tap> values, long sinkModified ) throws IOException
755    {
756    long sourceModified = 0;
757
758    while( values.hasNext() )
759      {
760      Tap source = values.next();
761
762      if( source instanceof MultiSourceTap )
763        return getSourceModified( confCopy, ( (MultiSourceTap) source ).getChildTaps(), sinkModified );
764
765      sourceModified = source.getModifiedTime( confCopy );
766
767      // source modified returns zero if does not exist
768      // this should minimize number of times we touch any file meta-data server
769      if( sourceModified == 0 && !source.resourceExists( confCopy ) )
770        throw new FlowException( "source does not exist: " + source );
771
772      if( sinkModified < sourceModified )
773        return sourceModified;
774      }
775
776    return sourceModified;
777    }
778
779  public static long getSinkModified( Object config, Collection<Tap> sinks ) throws IOException
780    {
781    long sinkModified = Long.MAX_VALUE;
782
783    for( Tap sink : sinks )
784      {
785      if( sink.isReplace() || sink.isUpdate() )
786        sinkModified = -1L;
787      else
788        {
789        if( !sink.resourceExists( config ) )
790          sinkModified = 0L;
791        else
792          sinkModified = Math.min( sinkModified, sink.getModifiedTime( config ) ); // return youngest mod date
793        }
794      }
795    return sinkModified;
796    }
797
798  public static String getTypeName( Type type )
799    {
800    if( type == null )
801      return null;
802
803    return type instanceof Class ? ( (Class) type ).getCanonicalName() : type.toString();
804    }
805
806  public static String getSimpleTypeName( Type type )
807    {
808    if( type == null )
809      return null;
810
811    return type instanceof Class ? ( (Class) type ).getSimpleName() : type.toString();
812    }
813
814  public static String[] typeNames( Type[] types )
815    {
816    String[] names = new String[ types.length ];
817
818    for( int i = 0; i < types.length; i++ )
819      names[ i ] = getTypeName( types[ i ] );
820
821    return names;
822    }
823
824  public static String[] simpleTypeNames( Type[] types )
825    {
826    String[] names = new String[ types.length ];
827
828    for( int i = 0; i < types.length; i++ )
829      names[ i ] = getSimpleTypeName( types[ i ] );
830
831    return names;
832    }
833
834  public static boolean containsNull( Object[] values )
835    {
836    for( Object value : values )
837      {
838      if( value == null )
839        return true;
840      }
841
842    return false;
843    }
844
845  public static void safeSleep( long durationMillis )
846    {
847    try
848      {
849      Thread.sleep( durationMillis );
850      }
851    catch( InterruptedException exception )
852      {
853      // do nothing
854      }
855    }
856
857  public static void writePDF( String path )
858    {
859    if( !HAS_DOT_EXEC )
860      return;
861
862    // dot *.dot -Tpdf -O -Nshape=box
863    File file = new File( path );
864    execProcess( file.getParentFile(), "dot", file.getName(), "-Tpdf", "-O" );
865    }
866
867  static boolean hasDOT()
868    {
869    return execProcess( null, "which", "dot" ) == 0;
870    }
871
872  public static int execProcess( File parentFile, String... command )
873    {
874    try
875      {
876      String commandLine = join( command, " " );
877
878      LOG.debug( "command: {}", commandLine );
879
880      Process process = Runtime.getRuntime().exec( commandLine, null, parentFile );
881
882      int result = process.waitFor();
883
884      BufferedReader reader = new BufferedReader( new InputStreamReader( process.getInputStream() ) );
885
886      String line = reader.readLine();
887
888      while( line != null )
889        {
890        LOG.warn( "{} stdout returned: {}", command[ 0 ], line );
891        line = reader.readLine();
892        }
893
894      reader = new BufferedReader( new InputStreamReader( process.getErrorStream() ) );
895
896      line = reader.readLine();
897
898      while( line != null )
899        {
900        LOG.warn( "{} stderr returned: {}", command[ 0 ], line );
901        line = reader.readLine();
902        }
903
904      return result;
905      }
906    catch( IOException exception )
907      {
908      LOG.warn( "unable to exec " + command[ 0 ], exception );
909      }
910    catch( InterruptedException exception )
911      {
912      LOG.warn( "interrupted exec " + command[ 0 ], exception );
913      }
914
915    return Integer.MIN_VALUE;
916    }
917
918  public static String formatDurationFromMillis( long duration )
919    {
920    if( duration / 1000 / 60 / 60 / 24 > 0.0 )
921      return formatDurationDHMSms( duration );
922    if( duration / 1000 / 60 / 60 > 0.0 )
923      return formatDurationHMSms( duration );
924    else
925      return formatDurationMSms( duration );
926    }
927
928  public static String formatDurationMSms( long duration )
929    {
930    long ms = duration % 1000;
931    long durationSeconds = duration / 1000;
932    long seconds = durationSeconds % 60;
933    long minutes = durationSeconds / 60;
934
935    return String.format( "%02d:%02d.%03d", minutes, seconds, ms );
936    }
937
938  public static String formatDurationHMSms( long duration )
939    {
940    long ms = duration % 1000;
941    long durationSeconds = duration / 1000;
942    long seconds = durationSeconds % 60;
943    long minutes = ( durationSeconds / 60 ) % 60;
944    long hours = durationSeconds / 60 / 60;
945
946    return String.format( "%02d:%02d:%02d.%03d", hours, minutes, seconds, ms );
947    }
948
949  public static String formatDurationDHMSms( long duration )
950    {
951    long ms = duration % 1000;
952    long durationSeconds = duration / 1000;
953    long seconds = durationSeconds % 60;
954    long minutes = ( durationSeconds / 60 ) % 60;
955    long hours = ( durationSeconds / 60 / 60 ) % 24;
956    long days = durationSeconds / 60 / 60 / 24;
957
958    return String.format( "%02d:%02d:%02d:%02d.%03d", days, hours, minutes, seconds, ms );
959    }
960
961  /**
962   * Converts a given comma separated String of Exception names into a List of classes.
963   * ClassNotFound exceptions are ignored if no warningMessage is given, otherwise logged as a warning.
964   *
965   * @param classNames A comma separated String of Exception names.
966   * @return List of Exception classes.
967   */
968  public static Set<Class<? extends Exception>> asClasses( String classNames, String warningMessage )
969    {
970    Set<Class<? extends Exception>> exceptionClasses = new HashSet<Class<? extends Exception>>();
971    String[] split = classNames.split( "," );
972
973    // possibly user provided type, load from context
974    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
975
976    for( String className : split )
977      {
978      if( className != null )
979        className = className.trim();
980
981      if( isEmpty( className ) )
982        continue;
983
984      try
985        {
986        Class<? extends Exception> exceptionClass = contextClassLoader.loadClass( className ).asSubclass( Exception.class );
987
988        exceptionClasses.add( exceptionClass );
989        }
990      catch( ClassNotFoundException exception )
991        {
992        if( !Util.isEmpty( warningMessage ) )
993          LOG.warn( "{}: {}", warningMessage, className );
994        }
995      }
996
997    return exceptionClasses;
998    }
999
1000  public static Boolean submitWithTimeout( Callable<Boolean> task, int timeout, TimeUnit timeUnit ) throws Exception
1001    {
1002    ExecutorService executor = Executors.newFixedThreadPool( 1 );
1003
1004    Future<Boolean> future = executor.submit( task );
1005
1006    executor.shutdown();
1007
1008    try
1009      {
1010      return future.get( timeout, timeUnit );
1011      }
1012    catch( TimeoutException exception )
1013      {
1014      future.cancel( true );
1015      }
1016    catch( ExecutionException exception )
1017      {
1018      Throwable cause = exception.getCause();
1019
1020      if( cause instanceof RuntimeException )
1021        throw (RuntimeException) cause;
1022
1023      throw (Exception) cause;
1024      }
1025
1026    return null;
1027    }
1028
  /**
   * Callback handed to the retry method: it supplies the operation to attempt and decides which failures end
   * the retrying immediately.
   *
   * @param <T> the type of value the operation returns
   */
  public interface RetryOperator<T>
    {
    /** Performs the operation once, returning its result or throwing an Exception if the attempt failed. */
    T operate() throws Exception;

    /** Returns true if the given Exception should be rethrown right away instead of attempting again. */
    boolean rethrow( Exception exception );
    }
1035
1036  public static <T> T retry( Logger logger, int retries, int secondsDelay, String message, RetryOperator<T> operator ) throws Exception
1037    {
1038    Exception saved = null;
1039
1040    for( int i = 0; i < retries; i++ )
1041      {
1042      try
1043        {
1044        return operator.operate();
1045        }
1046      catch( Exception exception )
1047        {
1048        if( operator.rethrow( exception ) )
1049          {
1050          logger.warn( message + ", but not retrying", exception );
1051
1052          throw exception;
1053          }
1054
1055        saved = exception;
1056
1057        logger.warn( message + ", attempt: " + ( i + 1 ), exception );
1058
1059        try
1060          {
1061          Thread.sleep( secondsDelay * 1000 );
1062          }
1063        catch( InterruptedException exception1 )
1064          {
1065          // do nothing
1066          }
1067        }
1068      }
1069
1070    logger.warn( message + ", done retrying after attempts: " + retries, saved );
1071
1072    throw saved;
1073    }
1074
1075  public static Object createProtectedObject( Class type, Object[] parameters, Class[] parameterTypes )
1076    {
1077    try
1078      {
1079      Constructor constructor = type.getDeclaredConstructor( parameterTypes );
1080
1081      constructor.setAccessible( true );
1082
1083      return constructor.newInstance( parameters );
1084      }
1085    catch( Exception exception )
1086      {
1087      LOG.error( "unable to instantiate type: {}, with exception: {}", type.getName(), exception );
1088
1089      throw new FlowException( "unable to instantiate type: " + type.getName(), exception );
1090      }
1091    }
1092
1093  public static boolean hasClass( String typeString )
1094    {
1095    try
1096      {
1097      Util.class.getClassLoader().loadClass( typeString );
1098
1099      return true;
1100      }
1101    catch( ClassNotFoundException exception )
1102      {
1103      return false;
1104      }
1105    }
1106
1107  public static <T> T newInstance( String className, Object... parameters )
1108    {
1109    try
1110      {
1111      Class<T> type = (Class<T>) Util.class.getClassLoader().loadClass( className );
1112
1113      return newInstance( type, parameters );
1114      }
1115    catch( ClassNotFoundException exception )
1116      {
1117      throw new CascadingException( "unable to load class: " + className, exception );
1118      }
1119    }
1120
1121  public static <T> T newInstance( Class<T> target, Object... parameters )
1122    {
1123    // using Expression makes sure that constructors using sub-types properly work, otherwise we get a
1124    // NoSuchMethodException.
1125    Expression expr = new Expression( target, "new", parameters );
1126
1127    try
1128      {
1129      return (T) expr.getValue();
1130      }
1131    catch( Exception exception )
1132      {
1133      throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception );
1134      }
1135    }
1136
1137  public static Object invokeStaticMethod( String typeString, String methodName, Object[] parameters, Class[] parameterTypes )
1138    {
1139    Class type = loadClass( typeString );
1140
1141    return invokeStaticMethod( type, methodName, parameters, parameterTypes );
1142    }
1143
1144  public static Class<?> loadClass( String typeString )
1145    {
1146    try
1147      {
1148      return Thread.currentThread().getContextClassLoader().loadClass( typeString );
1149      }
1150    catch( ClassNotFoundException exception )
1151      {
1152      throw new CascadingException( "unable to load class: " + typeString, exception );
1153      }
1154    }
1155
1156  public static Class<?> loadClassSafe( String typeString )
1157    {
1158    try
1159      {
1160      return Thread.currentThread().getContextClassLoader().loadClass( typeString );
1161      }
1162    catch( ClassNotFoundException exception )
1163      {
1164      return null;
1165      }
1166    }
1167
1168  public static Object invokeStaticMethod( Class type, String methodName, Object[] parameters, Class[] parameterTypes )
1169    {
1170    try
1171      {
1172      Method method = type.getDeclaredMethod( methodName, parameterTypes );
1173
1174      method.setAccessible( true );
1175
1176      return method.invoke( null, parameters );
1177      }
1178    catch( Exception exception )
1179      {
1180      throw new CascadingException( "unable to invoke static method: " + type.getName() + "." + methodName, exception );
1181      }
1182    }
1183
1184  public static boolean hasInstanceMethod( Object target, String methodName, Class[] parameterTypes )
1185    {
1186    try
1187      {
1188      return target.getClass().getMethod( methodName, parameterTypes ) != null;
1189      }
1190    catch( NoSuchMethodException exception )
1191      {
1192      return false;
1193      }
1194    }
1195
1196  public static Object invokeInstanceMethodSafe( Object target, String methodName, Object[] parameters, Class[] parameterTypes )
1197    {
1198    try
1199      {
1200      return invokeInstanceMethod( target, methodName, parameters, parameterTypes );
1201      }
1202    catch( Exception exception )
1203      {
1204      return null;
1205      }
1206    }
1207
1208  public static Object invokeInstanceMethod( Object target, String methodName, Object[] parameters, Class[] parameterTypes )
1209    {
1210    try
1211      {
1212      Method method = target.getClass().getMethod( methodName, parameterTypes );
1213
1214      method.setAccessible( true );
1215
1216      return method.invoke( target, parameters );
1217      }
1218    catch( Exception exception )
1219      {
1220      throw new CascadingException( "unable to invoke instance method: " + target.getClass().getName() + "." + methodName, exception );
1221      }
1222    }
1223
1224  public static <R> R returnInstanceFieldIfExistsSafe( Object target, String fieldName )
1225    {
1226    try
1227      {
1228      return returnInstanceFieldIfExists( target, fieldName );
1229      }
1230    catch( Exception exception )
1231      {
1232      // do nothing
1233      return null;
1234      }
1235    }
1236
1237  public static Object invokeConstructor( String className, Object[] parameters, Class[] parameterTypes )
1238    {
1239    try
1240      {
1241      Class type = Util.class.getClassLoader().loadClass( className );
1242
1243      return invokeConstructor( type, parameters, parameterTypes );
1244      }
1245    catch( ClassNotFoundException exception )
1246      {
1247      throw new CascadingException( "unable to load class: " + className, exception );
1248      }
1249    }
1250
1251  public static <T> T invokeConstructor( Class<T> target, Object[] parameters, Class[] parameterTypes )
1252    {
1253    try
1254      {
1255      Constructor<T> constructor = target.getConstructor( parameterTypes );
1256
1257      constructor.setAccessible( true );
1258
1259      return constructor.newInstance( parameters );
1260      }
1261    catch( Exception exception )
1262      {
1263      throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception );
1264      }
1265    }
1266
1267  public static <R> R returnInstanceFieldIfExists( Object target, String fieldName )
1268    {
1269    try
1270      {
1271      Class<?> type = target.getClass();
1272      Field field = getDeclaredField( fieldName, type );
1273
1274      field.setAccessible( true );
1275
1276      return (R) field.get( target );
1277      }
1278    catch( Exception exception )
1279      {
1280      throw new CascadingException( "unable to get instance field: " + target.getClass().getName() + "." + fieldName, exception );
1281      }
1282    }
1283
1284  public static <R> boolean setInstanceFieldIfExistsSafe( Object target, String fieldName, R value )
1285    {
1286    try
1287      {
1288      setInstanceFieldIfExists( target, fieldName, value );
1289      }
1290    catch( Exception exception )
1291      {
1292      return false;
1293      }
1294
1295    return true;
1296    }
1297
1298  public static <R> void setInstanceFieldIfExists( Object target, String fieldName, R value )
1299    {
1300    try
1301      {
1302      Class<?> type = target.getClass();
1303      Field field = getDeclaredField( fieldName, type );
1304
1305      field.setAccessible( true );
1306
1307      field.set( target, value );
1308      }
1309    catch( Exception exception )
1310      {
1311      throw new CascadingException( "unable to set instance field: " + target.getClass().getName() + "." + fieldName, exception );
1312      }
1313    }
1314
1315  private static Field getDeclaredField( String fieldName, Class<?> type )
1316    {
1317    if( type == Object.class )
1318      {
1319      if( LOG.isDebugEnabled() )
1320        LOG.debug( "did not find {} field on {}", fieldName, type.getName() );
1321
1322      return null;
1323      }
1324
1325    try
1326      {
1327      return type.getDeclaredField( fieldName );
1328      }
1329    catch( NoSuchFieldException exception )
1330      {
1331      return getDeclaredField( fieldName, type.getSuperclass() );
1332      }
1333    }
1334
1335  public static String makePath( String prefix, String name )
1336    {
1337    if( name == null || name.isEmpty() )
1338      throw new IllegalArgumentException( "name may not be null or empty " );
1339
1340    if( prefix == null || prefix.isEmpty() )
1341      prefix = Long.toString( (long) ( Math.random() * 10000000000L ) );
1342
1343    name = cleansePathName( name.substring( 0, name.length() < 25 ? name.length() : 25 ) );
1344
1345    return prefix + "/" + name + "/";
1346    }
1347
1348  public static String cleansePathName( String name )
1349    {
1350    return name.replaceAll( "\\s+|\\*|\\+|/+", "_" );
1351    }
1352
1353  public static Class findMainClass( Class defaultType, String packageExclude )
1354    {
1355    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
1356
1357    for( StackTraceElement stackTraceElement : stackTrace )
1358      {
1359      if( stackTraceElement.getMethodName().equals( "main" ) && !stackTraceElement.getClassName().startsWith( packageExclude ) )
1360        {
1361        try
1362          {
1363          LOG.info( "resolving application jar from found main method on: {}", stackTraceElement.getClassName() );
1364
1365          return Thread.currentThread().getContextClassLoader().loadClass( stackTraceElement.getClassName() );
1366          }
1367        catch( ClassNotFoundException exception )
1368          {
1369          LOG.warn( "unable to load class while discovering application jar: {}", stackTraceElement.getClassName(), exception );
1370          }
1371        }
1372      }
1373
1374    LOG.info( "using default application jar, may cause class not found exceptions on the cluster" );
1375
1376    return defaultType;
1377    }
1378
1379  public static String findContainingJar( Class<?> type )
1380    {
1381    ClassLoader classLoader = type.getClassLoader();
1382
1383    String classFile = type.getName().replaceAll( "\\.", "/" ) + ".class";
1384
1385    try
1386      {
1387      for( Enumeration<URL> iterator = classLoader.getResources( classFile ); iterator.hasMoreElements(); )
1388        {
1389        URL url = iterator.nextElement();
1390
1391        if( !"jar".equals( url.getProtocol() ) )
1392          continue;
1393
1394        String path = url.getPath();
1395
1396        if( path.startsWith( "file:" ) )
1397          path = path.substring( "file:".length() );
1398
1399        path = URLDecoder.decode( path, "UTF-8" );
1400
1401        return path.replaceAll( "!.*$", "" );
1402        }
1403      }
1404    catch( IOException exception )
1405      {
1406      throw new CascadingException( exception );
1407      }
1408
1409    return null;
1410    }
1411
1412  public static boolean containsWhitespace( String string )
1413    {
1414    return Pattern.compile( "\\s" ).matcher( string ).find();
1415    }
1416
1417  public static String parseHostname( String uri )
1418    {
1419    if( isEmpty( uri ) )
1420      return null;
1421
1422    String[] parts = uri.split( "://", 2 );
1423    String result;
1424
1425    // missing protocol
1426    result = parts[ parts.length - 1 ];
1427
1428    // user:pass@hostname:port/stuff
1429    parts = result.split( "/", 2 );
1430    result = parts[ 0 ];
1431
1432    // user:pass@hostname:port
1433    parts = result.split( "@", 2 );
1434    result = parts[ parts.length - 1 ];
1435
1436    // hostname:port
1437    parts = result.split( ":", 2 );
1438    result = parts[ 0 ];
1439
1440    return result;
1441    }
1442  }