001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.util;
023
024import java.beans.Expression;
025import java.io.BufferedReader;
026import java.io.File;
027import java.io.FileWriter;
028import java.io.IOException;
029import java.io.InputStreamReader;
030import java.io.Writer;
031import java.lang.reflect.Constructor;
032import java.lang.reflect.Field;
033import java.lang.reflect.Method;
034import java.lang.reflect.Type;
035import java.net.URL;
036import java.net.URLDecoder;
037import java.security.MessageDigest;
038import java.security.NoSuchAlgorithmException;
039import java.util.ArrayList;
040import java.util.Arrays;
041import java.util.Collection;
042import java.util.Collections;
043import java.util.Enumeration;
044import java.util.HashMap;
045import java.util.HashSet;
046import java.util.IdentityHashMap;
047import java.util.Iterator;
048import java.util.LinkedHashSet;
049import java.util.List;
050import java.util.Map;
051import java.util.Set;
052import java.util.TreeSet;
053import java.util.UUID;
054import java.util.concurrent.Callable;
055import java.util.concurrent.ExecutionException;
056import java.util.concurrent.ExecutorService;
057import java.util.concurrent.Executors;
058import java.util.concurrent.Future;
059import java.util.concurrent.TimeUnit;
060import java.util.concurrent.TimeoutException;
061import java.util.regex.Pattern;
062
063import cascading.CascadingException;
064import cascading.flow.FlowException;
065import cascading.tap.MultiSourceTap;
066import cascading.tap.Tap;
067import cascading.tuple.coerce.Coercions;
068import cascading.util.jgrapht.ComponentAttributeProvider;
069import cascading.util.jgrapht.DOTExporter;
070import cascading.util.jgrapht.EdgeNameProvider;
071import cascading.util.jgrapht.IntegerNameProvider;
072import cascading.util.jgrapht.VertexNameProvider;
073import org.jgrapht.DirectedGraph;
074import org.slf4j.Logger;
075import org.slf4j.LoggerFactory;
076
077/** Class Util provides reusable operations. */
078public class Util
079  {
080  /**
081   * On OS X only, and if the graphviz dot binary is installed, when true, dot will be invoked to convert the dot file
082   * to a pdf document.
083   */
084  public static final String CONVERT_DOT_TO_PDF = "util.dot.to.pdf.enabled";
085  public static int ID_LENGTH = 32;
086
087  private static final Logger LOG = LoggerFactory.getLogger( Util.class );
088  private static final String HEXES = "0123456789ABCDEF";
089
090  public static final boolean IS_OSX = System.getProperty( "os.name" ).toLowerCase().contains( "Mac OS X".toLowerCase() );
091  public static final boolean HAS_DOT_EXEC = IS_OSX && Boolean.getBoolean( CONVERT_DOT_TO_PDF ) && hasDOT();
092
093  public static <K, V> HashMap<K, V> createHashMap()
094    {
095    return new HashMap<K, V>();
096    }
097
098  public static <K, V> boolean reverseMap( Map<V, K> from, Map<K, V> to )
099    {
100    boolean dupes = false;
101
102    for( Map.Entry<V, K> entry : from.entrySet() )
103      dupes |= to.put( entry.getValue(), entry.getKey() ) != null;
104
105    return dupes;
106    }
107
108  public static <V> Set<V> createIdentitySet()
109    {
110    return Collections.<V>newSetFromMap( new IdentityHashMap() );
111    }
112
113  public static <V> Set<V> createIdentitySet( Collection<V> collection )
114    {
115    Set<V> identitySet = createIdentitySet();
116
117    if( collection != null )
118      identitySet.addAll( collection );
119
120    return identitySet;
121    }
122
123  public static <V> V getFirst( Collection<V> collection )
124    {
125    if( collection == null || collection.isEmpty() )
126      return null;
127
128    return collection.iterator().next();
129    }
130
131  public static <V> V getFirst( Iterator<V> iterator )
132    {
133    if( iterator == null || !iterator.hasNext() )
134      return null;
135
136    return iterator.next();
137    }
138
139  public static <V> V getLast( Iterator<V> iterator )
140    {
141    if( iterator == null || !iterator.hasNext() )
142      return null;
143
144    V v = iterator.next();
145
146    while( iterator.hasNext() )
147      v = iterator.next();
148
149    return v;
150    }
151
152  public static <T> List<T> asList( T t, T[] ts )
153    {
154    List<T> list = new ArrayList<>( 1 + ts.length );
155
156    list.add( t );
157    Collections.addAll( list, ts );
158
159    return list;
160    }
161
162  public static <N extends Number> N max( Collection<N> collection )
163    {
164    return new TreeSet<>( collection ).first();
165    }
166
167  public static <N extends Number> N min( Collection<N> collection )
168    {
169    return new TreeSet<>( collection ).last();
170    }
171
172  public static <T> Set<T> narrowSet( Class<T> type, Collection collection )
173    {
174    return narrowSet( type, collection.iterator() );
175    }
176
177  public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection )
178    {
179    return narrowIdentitySet( type, collection.iterator() );
180    }
181
182  public static <T> Set<T> narrowSet( Class<T> type, Collection collection, boolean include )
183    {
184    return narrowSet( type, collection.iterator(), include );
185    }
186
187  public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection, boolean include )
188    {
189    return narrowIdentitySet( type, collection.iterator(), include );
190    }
191
192  public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator )
193    {
194    return narrowSet( type, iterator, true );
195    }
196
197  public static <T> Set<T> narrowIdentitySet( Class<T> type, Iterator iterator )
198    {
199    return narrowIdentitySet( type, iterator, true );
200    }
201
202  public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator, boolean include )
203    {
204    return narrowSetInternal( type, iterator, include, new HashSet<T>() );
205    }
206
207  public static <T> Set<T> narrowIdentitySet( Class<T> type, Iterator iterator, boolean include )
208    {
209    return narrowSetInternal( type, iterator, include, Util.<T>createIdentitySet() );
210    }
211
212  private static <T> Set<T> narrowSetInternal( Class<T> type, Iterator iterator, boolean include, Set<T> set )
213    {
214    while( iterator.hasNext() )
215      {
216      Object o = iterator.next();
217
218      if( type.isInstance( o ) == include )
219        set.add( (T) o );
220      }
221
222    return set;
223    }
224
225  public static <T> boolean contains( Class<T> type, Collection collection )
226    {
227    return contains( type, collection.iterator() );
228    }
229
230  public static <T> boolean contains( Class<T> type, Iterator iterator )
231    {
232    while( iterator.hasNext() )
233      {
234      Object o = iterator.next();
235
236      if( type.isInstance( o ) )
237        return true;
238      }
239
240    return false;
241    }
242
243  public static <T> Set<T> differenceIdentity( Set<T> lhs, Set<T> rhs )
244    {
245    Set<T> diff = createIdentitySet( lhs );
246
247    diff.removeAll( rhs );
248
249    return diff;
250    }
251
252  public static synchronized String createUniqueIDWhichStartsWithAChar()
253    {
254    String value;
255
256    do
257      {
258      value = createUniqueID();
259      }
260    while( Character.isDigit( value.charAt( 0 ) ) );
261
262    return value;
263    }
264
265  public static synchronized String createUniqueID()
266    {
267    // creates a cryptographically secure random value
268    String value = UUID.randomUUID().toString();
269    return value.toUpperCase().replaceAll( "-", "" );
270    }
271
272  public static String createID( String rawID )
273    {
274    return createID( rawID.getBytes() );
275    }
276
277  /**
278   * Method CreateID returns a HEX hash of the given bytes with length 32 characters long.
279   *
280   * @param bytes the bytes
281   * @return string
282   */
283  public static String createID( byte[] bytes )
284    {
285    try
286      {
287      return getHex( MessageDigest.getInstance( "MD5" ).digest( bytes ) );
288      }
289    catch( NoSuchAlgorithmException exception )
290      {
291      throw new RuntimeException( "unable to digest string" );
292      }
293    }
294
295  public static String getHex( byte[] bytes )
296    {
297    if( bytes == null )
298      return null;
299
300    final StringBuilder hex = new StringBuilder( 2 * bytes.length );
301
302    for( final byte b : bytes )
303      hex.append( HEXES.charAt( ( b & 0xF0 ) >> 4 ) ).append( HEXES.charAt( b & 0x0F ) );
304
305    return hex.toString();
306    }
307
308  public static byte[] longToByteArray( long value )
309    {
310    return new byte[]{
311      (byte) ( value >> 56 ),
312      (byte) ( value >> 48 ),
313      (byte) ( value >> 40 ),
314      (byte) ( value >> 32 ),
315      (byte) ( value >> 24 ),
316      (byte) ( value >> 16 ),
317      (byte) ( value >> 8 ),
318      (byte) value
319    };
320    }
321
322  public static byte[] intToByteArray( int value )
323    {
324    return new byte[]{
325      (byte) ( value >> 24 ),
326      (byte) ( value >> 16 ),
327      (byte) ( value >> 8 ),
328      (byte) value
329    };
330    }
331
332  public static <T> T[] copy( T[] source )
333    {
334    if( source == null )
335      return null;
336
337    return Arrays.copyOf( source, source.length );
338    }
339
340  public static String unique( String value, String delim )
341    {
342    String[] split = value.split( delim );
343
344    Set<String> values = new LinkedHashSet<String>();
345
346    Collections.addAll( values, split );
347
348    return join( values, delim );
349    }
350
351  /**
352   * This method joins the values in the given list with the delim String value.
353   *
354   * @param list
355   * @param delim
356   * @return String
357   */
358  public static String join( int[] list, String delim )
359    {
360    return join( list, delim, false );
361    }
362
363  public static String join( int[] list, String delim, boolean printNull )
364    {
365    StringBuffer buffer = new StringBuffer();
366    int count = 0;
367
368    for( Object s : list )
369      {
370      if( count != 0 )
371        buffer.append( delim );
372
373      if( printNull || s != null )
374        buffer.append( s );
375
376      count++;
377      }
378
379    return buffer.toString();
380    }
381
382  public static String join( String delim, String... strings )
383    {
384    return join( delim, false, strings );
385    }
386
387  public static String join( String delim, boolean printNull, String... strings )
388    {
389    return join( strings, delim, printNull );
390    }
391
392  /**
393   * This method joins the values in the given list with the delim String value.
394   *
395   * @param list
396   * @param delim
397   * @return a String
398   */
399  public static String join( Object[] list, String delim )
400    {
401    return join( list, delim, false );
402    }
403
404  public static String join( Object[] list, String delim, boolean printNull )
405    {
406    return join( list, delim, printNull, 0 );
407    }
408
409  public static String join( Object[] list, String delim, boolean printNull, int beginAt )
410    {
411    return join( list, delim, printNull, beginAt, list.length - beginAt );
412    }
413
414  public static String join( Object[] list, String delim, boolean printNull, int beginAt, int length )
415    {
416    StringBuffer buffer = new StringBuffer();
417    int count = 0;
418
419    for( int i = beginAt; i < beginAt + length; i++ )
420      {
421      Object s = list[ i ];
422      if( count != 0 )
423        buffer.append( delim );
424
425      if( printNull || s != null )
426        buffer.append( s );
427
428      count++;
429      }
430
431    return buffer.toString();
432    }
433
434  public static String join( Iterable iterable, String delim, boolean printNull )
435    {
436    int count = 0;
437
438    StringBuilder buffer = new StringBuilder();
439
440    for( Object s : iterable )
441      {
442      if( count != 0 )
443        buffer.append( delim );
444
445      if( printNull || s != null )
446        buffer.append( s );
447
448      count++;
449      }
450
451    return buffer.toString();
452    }
453
454  /**
455   * This method joins each value in the collection with a tab character as the delimiter.
456   *
457   * @param collection
458   * @return a String
459   */
460  public static String join( Collection collection )
461    {
462    return join( collection, "\t" );
463    }
464
465  /**
466   * This method joins each valuein the collection with the given delimiter.
467   *
468   * @param collection
469   * @param delim
470   * @return a String
471   */
472  public static String join( Collection collection, String delim )
473    {
474    return join( collection, delim, false );
475    }
476
477  public static String join( Collection collection, String delim, boolean printNull )
478    {
479    StringBuffer buffer = new StringBuffer();
480
481    join( buffer, collection, delim, printNull );
482
483    return buffer.toString();
484    }
485
486  /**
487   * This method joins each value in the collection with the given delimiter. All results are appended to the
488   * given {@link StringBuffer} instance.
489   *
490   * @param buffer
491   * @param collection
492   * @param delim
493   */
494  public static void join( StringBuffer buffer, Collection collection, String delim )
495    {
496    join( buffer, collection, delim, false );
497    }
498
499  public static void join( StringBuffer buffer, Collection collection, String delim, boolean printNull )
500    {
501    int count = 0;
502
503    for( Object s : collection )
504      {
505      if( count != 0 )
506        buffer.append( delim );
507
508      if( printNull || s != null )
509        buffer.append( s );
510
511      count++;
512      }
513    }
514
515  public static <T> List<T> split( Class<T> type, String values )
516    {
517    return split( type, ",", values );
518    }
519
520  public static <T> List<T> split( Class<T> type, String delim, String values )
521    {
522    List<T> results = new ArrayList<>();
523
524    if( values == null )
525      return results;
526
527    String[] split = values.split( delim );
528
529    for( String value : split )
530      results.add( Coercions.<T>coerce( value, type ) );
531
532    return results;
533    }
534
535  public static String[] removeNulls( String... strings )
536    {
537    List<String> list = new ArrayList<String>();
538
539    for( String string : strings )
540      {
541      if( string != null )
542        list.add( string );
543      }
544
545    return list.toArray( new String[ list.size() ] );
546    }
547
548  public static Collection<String> quote( Collection<String> collection, String quote )
549    {
550    List<String> list = new ArrayList<String>();
551
552    for( String string : collection )
553      list.add( quote + string + quote );
554
555    return list;
556    }
557
558  public static String print( Collection collection, String delim )
559    {
560    StringBuffer buffer = new StringBuffer();
561
562    print( buffer, collection, delim );
563
564    return buffer.toString();
565    }
566
567  public static void print( StringBuffer buffer, Collection collection, String delim )
568    {
569    int count = 0;
570
571    for( Object s : collection )
572      {
573      if( count != 0 )
574        buffer.append( delim );
575
576      buffer.append( "[" );
577      buffer.append( s );
578      buffer.append( "]" );
579
580      count++;
581      }
582    }
583
584  /**
585   * This method attempts to remove any username and password from the given url String.
586   *
587   * @param url
588   * @return a String
589   */
590  public static String sanitizeUrl( String url )
591    {
592    if( url == null )
593      return null;
594
595    return url.replaceAll( "(?<=//).*:.*@", "" );
596    }
597
598  /**
599   * This method attempts to remove duplicate consecutive forward slashes from the given url.
600   *
601   * @param url
602   * @return a String
603   */
604  public static String normalizeUrl( String url )
605    {
606    if( url == null )
607      return null;
608
609    return url.replaceAll( "([^:]/)/{2,}", "$1/" );
610    }
611
612  /**
613   * This method returns the {@link Object#toString()} of the given object, or an empty String if the object
614   * is null.
615   *
616   * @param object
617   * @return a String
618   */
619  public static String toNull( Object object )
620    {
621    if( object == null )
622      return "";
623
624    return object.toString();
625    }
626
627  /**
628   * This method truncates the given String value to the given size, but appends an ellipse ("...") if the
629   * String is larger than maxSize.
630   *
631   * @param string
632   * @param maxSize
633   * @return a String
634   */
635  public static String truncate( String string, int maxSize )
636    {
637    string = toNull( string );
638
639    if( string.length() <= maxSize )
640      return string;
641
642    return String.format( "%s...", string.subSequence( 0, maxSize - 3 ) );
643    }
644
645  public static void printGraph( String filename, DirectedGraph graph )
646    {
647    try
648      {
649      new File( filename ).getParentFile().mkdirs();
650      Writer writer = new FileWriter( filename );
651
652      try
653        {
654        printGraph( writer, graph );
655        }
656      finally
657        {
658        writer.close();
659        }
660      }
661    catch( IOException exception )
662      {
663      LOG.error( "failed printing graph to {}, with exception: {}", filename, exception );
664      }
665    }
666
667  @SuppressWarnings({"unchecked"})
668  private static void printGraph( Writer writer, DirectedGraph graph )
669    {
670    DOTExporter dot = new DOTExporter( new IntegerNameProvider(), new VertexNameProvider()
671      {
672      public String getVertexName( Object object )
673        {
674        if( object == null )
675          return "none";
676
677        return object.toString().replaceAll( "\"", "\'" );
678        }
679      }, new EdgeNameProvider<Object>()
680      {
681      public String getEdgeName( Object object )
682        {
683        if( object == null )
684          return "none";
685
686        return object.toString().replaceAll( "\"", "\'" );
687        }
688      }
689    );
690
691    dot.export( writer, graph );
692    }
693
694  /**
695   * This method removes all nulls from the given List.
696   *
697   * @param list
698   */
699  @SuppressWarnings({"StatementWithEmptyBody"})
700  public static void removeAllNulls( List list )
701    {
702    while( list.remove( null ) )
703      ;
704    }
705
706  public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider )
707    {
708    new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider ).export( writer, graph );
709    }
710
711  public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider,
712                               ComponentAttributeProvider vertexAttributeProvider, ComponentAttributeProvider edgeAttributeProvider )
713    {
714    new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider, vertexAttributeProvider, edgeAttributeProvider ).export( writer, graph );
715    }
716
717  public static boolean isEmpty( String string )
718    {
719    return string == null || string.isEmpty();
720    }
721
722  private static String[] findSplitName( String path )
723    {
724    String separator = "/";
725
726    if( path.lastIndexOf( "/" ) < path.lastIndexOf( "\\" ) )
727      separator = "\\\\";
728
729    String[] split = path.split( separator );
730
731    path = split[ split.length - 1 ];
732
733    path = path.substring( 0, path.lastIndexOf( '.' ) ); // remove .jar
734
735    return path.split( "-(?=\\d)", 2 );
736    }
737
738  public static String findVersion( String path )
739    {
740    if( path == null || path.isEmpty() )
741      return null;
742
743    String[] split = findSplitName( path );
744
745    if( split.length == 2 )
746      return split[ 1 ];
747
748    return null;
749    }
750
751  public static String findName( String path )
752    {
753    if( path == null || path.isEmpty() )
754      return null;
755
756    String[] split = findSplitName( path );
757
758    if( split.length == 0 )
759      return null;
760
761    return split[ 0 ];
762    }
763
764  public static long getSourceModified( Object confCopy, Iterator<Tap> values, long sinkModified ) throws IOException
765    {
766    long sourceModified = 0;
767
768    while( values.hasNext() )
769      {
770      Tap source = values.next();
771
772      if( source instanceof MultiSourceTap )
773        return getSourceModified( confCopy, ( (MultiSourceTap) source ).getChildTaps(), sinkModified );
774
775      sourceModified = source.getModifiedTime( confCopy );
776
777      // source modified returns zero if does not exist
778      // this should minimize number of times we touch any file meta-data server
779      if( sourceModified == 0 && !source.resourceExists( confCopy ) )
780        throw new FlowException( "source does not exist: " + source );
781
782      if( sinkModified < sourceModified )
783        return sourceModified;
784      }
785
786    return sourceModified;
787    }
788
789  public static long getSinkModified( Object config, Collection<Tap> sinks ) throws IOException
790    {
791    long sinkModified = Long.MAX_VALUE;
792
793    for( Tap sink : sinks )
794      {
795      if( sink.isReplace() || sink.isUpdate() )
796        sinkModified = -1L;
797      else
798        {
799        if( !sink.resourceExists( config ) )
800          sinkModified = 0L;
801        else
802          sinkModified = Math.min( sinkModified, sink.getModifiedTime( config ) ); // return youngest mod date
803        }
804      }
805
806    return sinkModified;
807    }
808
809  public static String getTypeName( Type type )
810    {
811    if( type == null )
812      return null;
813
814    return type instanceof Class ? ( (Class) type ).getCanonicalName() : type.toString();
815    }
816
817  public static String getSimpleTypeName( Type type )
818    {
819    if( type == null )
820      return null;
821
822    return type instanceof Class ? ( (Class) type ).getSimpleName() : type.toString();
823    }
824
825  public static String[] typeNames( Type[] types )
826    {
827    String[] names = new String[ types.length ];
828
829    for( int i = 0; i < types.length; i++ )
830      names[ i ] = getTypeName( types[ i ] );
831
832    return names;
833    }
834
835  public static String[] simpleTypeNames( Type[] types )
836    {
837    String[] names = new String[ types.length ];
838
839    for( int i = 0; i < types.length; i++ )
840      names[ i ] = getSimpleTypeName( types[ i ] );
841
842    return names;
843    }
844
845  public static boolean containsNull( Object[] values )
846    {
847    for( Object value : values )
848      {
849      if( value == null )
850        return true;
851      }
852
853    return false;
854    }
855
856  public static void safeSleep( long durationMillis )
857    {
858    try
859      {
860      Thread.sleep( durationMillis );
861      }
862    catch( InterruptedException exception )
863      {
864      // do nothing
865      }
866    }
867
868  public static void writePDF( String path )
869    {
870    if( !HAS_DOT_EXEC )
871      return;
872
873    // dot *.dot -Tpdf -O -Nshape=box
874    File file = new File( path );
875    execProcess( file.getParentFile(), "dot", file.getName(), "-Tpdf", "-O" );
876    }
877
878  static boolean hasDOT()
879    {
880    return execProcess( null, "which", "dot" ) == 0;
881    }
882
883  public static int execProcess( File parentFile, String... command )
884    {
885    try
886      {
887      String commandLine = join( command, " " );
888
889      LOG.debug( "command: {}", commandLine );
890
891      Process process = Runtime.getRuntime().exec( commandLine, null, parentFile );
892
893      int result = process.waitFor();
894
895      BufferedReader reader = new BufferedReader( new InputStreamReader( process.getInputStream() ) );
896
897      String line = reader.readLine();
898
899      while( line != null )
900        {
901        LOG.warn( "{} stdout returned: {}", command[ 0 ], line );
902        line = reader.readLine();
903        }
904
905      reader = new BufferedReader( new InputStreamReader( process.getErrorStream() ) );
906
907      line = reader.readLine();
908
909      while( line != null )
910        {
911        LOG.warn( "{} stderr returned: {}", command[ 0 ], line );
912        line = reader.readLine();
913        }
914
915      return result;
916      }
917    catch( IOException exception )
918      {
919      LOG.warn( "unable to exec " + command[ 0 ], exception );
920      }
921    catch( InterruptedException exception )
922      {
923      LOG.warn( "interrupted exec " + command[ 0 ], exception );
924      }
925
926    return Integer.MIN_VALUE;
927    }
928
929  public static String formatDurationFromMillis( long duration )
930    {
931    if( duration / 1000 / 60 / 60 / 24 > 0.0 )
932      return formatDurationDHMSms( duration );
933    if( duration / 1000 / 60 / 60 > 0.0 )
934      return formatDurationHMSms( duration );
935    else
936      return formatDurationMSms( duration );
937    }
938
939  public static String formatDurationMSms( long duration )
940    {
941    long ms = duration % 1000;
942    long durationSeconds = duration / 1000;
943    long seconds = durationSeconds % 60;
944    long minutes = durationSeconds / 60;
945
946    return String.format( "%02d:%02d.%03d", minutes, seconds, ms );
947    }
948
949  public static String formatDurationHMSms( long duration )
950    {
951    long ms = duration % 1000;
952    long durationSeconds = duration / 1000;
953    long seconds = durationSeconds % 60;
954    long minutes = ( durationSeconds / 60 ) % 60;
955    long hours = durationSeconds / 60 / 60;
956
957    return String.format( "%02d:%02d:%02d.%03d", hours, minutes, seconds, ms );
958    }
959
960  public static String formatDurationDHMSms( long duration )
961    {
962    long ms = duration % 1000;
963    long durationSeconds = duration / 1000;
964    long seconds = durationSeconds % 60;
965    long minutes = ( durationSeconds / 60 ) % 60;
966    long hours = ( durationSeconds / 60 / 60 ) % 24;
967    long days = durationSeconds / 60 / 60 / 24;
968
969    return String.format( "%02d:%02d:%02d:%02d.%03d", days, hours, minutes, seconds, ms );
970    }
971
972  /**
973   * Converts a given comma separated String of Exception names into a List of classes.
974   * ClassNotFound exceptions are ignored if no warningMessage is given, otherwise logged as a warning.
975   *
976   * @param classNames A comma separated String of Exception names.
977   * @return List of Exception classes.
978   */
979  public static Set<Class<? extends Exception>> asClasses( String classNames, String warningMessage )
980    {
981    Set<Class<? extends Exception>> exceptionClasses = new HashSet<Class<? extends Exception>>();
982    String[] split = classNames.split( "," );
983
984    // possibly user provided type, load from context
985    ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
986
987    for( String className : split )
988      {
989      if( className != null )
990        className = className.trim();
991
992      if( isEmpty( className ) )
993        continue;
994
995      try
996        {
997        Class<? extends Exception> exceptionClass = contextClassLoader.loadClass( className ).asSubclass( Exception.class );
998
999        exceptionClasses.add( exceptionClass );
1000        }
1001      catch( ClassNotFoundException exception )
1002        {
1003        if( !Util.isEmpty( warningMessage ) )
1004          LOG.warn( "{}: {}", warningMessage, className );
1005        }
1006      }
1007
1008    return exceptionClasses;
1009    }
1010
1011  public static Boolean submitWithTimeout( Callable<Boolean> task, int timeout, TimeUnit timeUnit ) throws Exception
1012    {
1013    ExecutorService executor = Executors.newFixedThreadPool( 1 );
1014
1015    Future<Boolean> future = executor.submit( task );
1016
1017    executor.shutdown();
1018
1019    try
1020      {
1021      return future.get( timeout, timeUnit );
1022      }
1023    catch( TimeoutException exception )
1024      {
1025      future.cancel( true );
1026      }
1027    catch( ExecutionException exception )
1028      {
1029      Throwable cause = exception.getCause();
1030
1031      if( cause instanceof RuntimeException )
1032        throw (RuntimeException) cause;
1033
1034      throw (Exception) cause;
1035      }
1036
1037    return null;
1038    }
1039
1040  public interface RetryOperator<T>
1041    {
1042    T operate() throws Exception;
1043
1044    boolean rethrow( Exception exception );
1045    }
1046
1047  public static <T> T retry( Logger logger, int retries, int secondsDelay, String message, RetryOperator<T> operator ) throws Exception
1048    {
1049    Exception saved = null;
1050
1051    for( int i = 0; i < retries; i++ )
1052      {
1053      try
1054        {
1055        return operator.operate();
1056        }
1057      catch( Exception exception )
1058        {
1059        if( operator.rethrow( exception ) )
1060          {
1061          logger.warn( message + ", but not retrying", exception );
1062
1063          throw exception;
1064          }
1065
1066        saved = exception;
1067
1068        logger.warn( message + ", attempt: " + ( i + 1 ), exception );
1069
1070        try
1071          {
1072          Thread.sleep( secondsDelay * 1000 );
1073          }
1074        catch( InterruptedException exception1 )
1075          {
1076          // do nothing
1077          }
1078        }
1079      }
1080
1081    logger.warn( message + ", done retrying after attempts: " + retries, saved );
1082
1083    throw saved;
1084    }
1085
1086  public static Object createProtectedObject( Class type, Object[] parameters, Class[] parameterTypes )
1087    {
1088    try
1089      {
1090      Constructor constructor = type.getDeclaredConstructor( parameterTypes );
1091
1092      constructor.setAccessible( true );
1093
1094      return constructor.newInstance( parameters );
1095      }
1096    catch( Exception exception )
1097      {
1098      LOG.error( "unable to instantiate type: {}, with exception: {}", type.getName(), exception );
1099
1100      throw new FlowException( "unable to instantiate type: " + type.getName(), exception );
1101      }
1102    }
1103
1104  public static boolean hasClass( String typeString )
1105    {
1106    try
1107      {
1108      Util.class.getClassLoader().loadClass( typeString );
1109
1110      return true;
1111      }
1112    catch( ClassNotFoundException exception )
1113      {
1114      return false;
1115      }
1116    }
1117
1118  public static <T> T newInstance( String className, Object... parameters )
1119    {
1120    try
1121      {
1122      Class<T> type = (Class<T>) Util.class.getClassLoader().loadClass( className );
1123
1124      return newInstance( type, parameters );
1125      }
1126    catch( ClassNotFoundException exception )
1127      {
1128      throw new CascadingException( "unable to load class: " + className, exception );
1129      }
1130    }
1131
1132  public static <T> T newInstance( Class<T> target, Object... parameters )
1133    {
1134    // using Expression makes sure that constructors using sub-types properly work, otherwise we get a
1135    // NoSuchMethodException.
1136    Expression expr = new Expression( target, "new", parameters );
1137
1138    try
1139      {
1140      return (T) expr.getValue();
1141      }
1142    catch( Exception exception )
1143      {
1144      throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception );
1145      }
1146    }
1147
1148  public static Object invokeStaticMethod( String typeString, String methodName, Object[] parameters, Class[] parameterTypes )
1149    {
1150    Class type = loadClass( typeString );
1151
1152    return invokeStaticMethod( type, methodName, parameters, parameterTypes );
1153    }
1154
1155  public static Class<?> loadClass( String typeString )
1156    {
1157    try
1158      {
1159      return Thread.currentThread().getContextClassLoader().loadClass( typeString );
1160      }
1161    catch( ClassNotFoundException exception )
1162      {
1163      throw new CascadingException( "unable to load class: " + typeString, exception );
1164      }
1165    }
1166
1167  public static Class<?> loadClassSafe( String typeString )
1168    {
1169    try
1170      {
1171      return Thread.currentThread().getContextClassLoader().loadClass( typeString );
1172      }
1173    catch( ClassNotFoundException exception )
1174      {
1175      return null;
1176      }
1177    }
1178
1179  public static Object invokeStaticMethod( Class type, String methodName, Object[] parameters, Class[] parameterTypes )
1180    {
1181    try
1182      {
1183      Method method = type.getDeclaredMethod( methodName, parameterTypes );
1184
1185      method.setAccessible( true );
1186
1187      return method.invoke( null, parameters );
1188      }
1189    catch( Exception exception )
1190      {
1191      throw new CascadingException( "unable to invoke static method: " + type.getName() + "." + methodName, exception );
1192      }
1193    }
1194
1195  public static boolean hasInstanceMethod( Object target, String methodName, Class[] parameterTypes )
1196    {
1197    try
1198      {
1199      return target.getClass().getMethod( methodName, parameterTypes ) != null;
1200      }
1201    catch( NoSuchMethodException exception )
1202      {
1203      return false;
1204      }
1205    }
1206
1207  public static Object invokeInstanceMethodSafe( Object target, String methodName, Object[] parameters, Class[] parameterTypes )
1208    {
1209    try
1210      {
1211      return invokeInstanceMethod( target, methodName, parameters, parameterTypes );
1212      }
1213    catch( Exception exception )
1214      {
1215      return null;
1216      }
1217    }
1218
1219  public static Object invokeInstanceMethod( Object target, String methodName, Object[] parameters, Class[] parameterTypes )
1220    {
1221    try
1222      {
1223      Method method = target.getClass().getMethod( methodName, parameterTypes );
1224
1225      method.setAccessible( true );
1226
1227      return method.invoke( target, parameters );
1228      }
1229    catch( Exception exception )
1230      {
1231      throw new CascadingException( "unable to invoke instance method: " + target.getClass().getName() + "." + methodName, exception );
1232      }
1233    }
1234
1235  public static <R> R returnInstanceFieldIfExistsSafe( Object target, String fieldName )
1236    {
1237    try
1238      {
1239      return returnInstanceFieldIfExists( target, fieldName );
1240      }
1241    catch( Exception exception )
1242      {
1243      // do nothing
1244      return null;
1245      }
1246    }
1247
1248  public static Object invokeConstructor( String className, Object[] parameters, Class[] parameterTypes )
1249    {
1250    try
1251      {
1252      Class type = Util.class.getClassLoader().loadClass( className );
1253
1254      return invokeConstructor( type, parameters, parameterTypes );
1255      }
1256    catch( ClassNotFoundException exception )
1257      {
1258      throw new CascadingException( "unable to load class: " + className, exception );
1259      }
1260    }
1261
1262  public static <T> T invokeConstructor( Class<T> target, Object[] parameters, Class[] parameterTypes )
1263    {
1264    try
1265      {
1266      Constructor<T> constructor = target.getConstructor( parameterTypes );
1267
1268      constructor.setAccessible( true );
1269
1270      return constructor.newInstance( parameters );
1271      }
1272    catch( Exception exception )
1273      {
1274      throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception );
1275      }
1276    }
1277
1278  public static <R> R returnInstanceFieldIfExists( Object target, String fieldName )
1279    {
1280    try
1281      {
1282      Class<?> type = target.getClass();
1283      Field field = getDeclaredField( fieldName, type );
1284
1285      field.setAccessible( true );
1286
1287      return (R) field.get( target );
1288      }
1289    catch( Exception exception )
1290      {
1291      throw new CascadingException( "unable to get instance field: " + target.getClass().getName() + "." + fieldName, exception );
1292      }
1293    }
1294
1295  public static <R> boolean setInstanceFieldIfExistsSafe( Object target, String fieldName, R value )
1296    {
1297    try
1298      {
1299      setInstanceFieldIfExists( target, fieldName, value );
1300      }
1301    catch( Exception exception )
1302      {
1303      return false;
1304      }
1305
1306    return true;
1307    }
1308
1309  public static <R> void setInstanceFieldIfExists( Object target, String fieldName, R value )
1310    {
1311    try
1312      {
1313      Class<?> type = target.getClass();
1314      Field field = getDeclaredField( fieldName, type );
1315
1316      field.setAccessible( true );
1317
1318      field.set( target, value );
1319      }
1320    catch( Exception exception )
1321      {
1322      throw new CascadingException( "unable to set instance field: " + target.getClass().getName() + "." + fieldName, exception );
1323      }
1324    }
1325
1326  private static Field getDeclaredField( String fieldName, Class<?> type )
1327    {
1328    if( type == Object.class )
1329      {
1330      if( LOG.isDebugEnabled() )
1331        LOG.debug( "did not find {} field on {}", fieldName, type.getName() );
1332
1333      return null;
1334      }
1335
1336    try
1337      {
1338      return type.getDeclaredField( fieldName );
1339      }
1340    catch( NoSuchFieldException exception )
1341      {
1342      return getDeclaredField( fieldName, type.getSuperclass() );
1343      }
1344    }
1345
1346  public static String makePath( String prefix, String name )
1347    {
1348    if( name == null || name.isEmpty() )
1349      throw new IllegalArgumentException( "name may not be null or empty " );
1350
1351    if( prefix == null || prefix.isEmpty() )
1352      prefix = Long.toString( (long) ( Math.random() * 10000000000L ) );
1353
1354    name = cleansePathName( name.substring( 0, name.length() < 25 ? name.length() : 25 ) );
1355
1356    return prefix + "/" + name + "/";
1357    }
1358
1359  public static String cleansePathName( String name )
1360    {
1361    return name.replaceAll( "\\s+|\\*|\\+|/+", "_" );
1362    }
1363
1364  public static Class findMainClass( Class defaultType, String packageExclude )
1365    {
1366    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
1367
1368    for( StackTraceElement stackTraceElement : stackTrace )
1369      {
1370      if( stackTraceElement.getMethodName().equals( "main" ) && !stackTraceElement.getClassName().startsWith( packageExclude ) )
1371        {
1372        try
1373          {
1374          LOG.info( "resolving application jar from found main method on: {}", stackTraceElement.getClassName() );
1375
1376          return Thread.currentThread().getContextClassLoader().loadClass( stackTraceElement.getClassName() );
1377          }
1378        catch( ClassNotFoundException exception )
1379          {
1380          LOG.warn( "unable to load class while discovering application jar: {}", stackTraceElement.getClassName(), exception );
1381          }
1382        }
1383      }
1384
1385    LOG.info( "using default application jar, may cause class not found exceptions on the cluster" );
1386
1387    return defaultType;
1388    }
1389
1390  public static String findContainingJar( Class<?> type )
1391    {
1392    ClassLoader classLoader = type.getClassLoader();
1393
1394    String classFile = type.getName().replaceAll( "\\.", "/" ) + ".class";
1395
1396    try
1397      {
1398      for( Enumeration<URL> iterator = classLoader.getResources( classFile ); iterator.hasMoreElements(); )
1399        {
1400        URL url = iterator.nextElement();
1401
1402        if( !"jar".equals( url.getProtocol() ) )
1403          continue;
1404
1405        String path = url.getPath();
1406
1407        if( path.startsWith( "file:" ) )
1408          path = path.substring( "file:".length() );
1409
1410        path = URLDecoder.decode( path, "UTF-8" );
1411
1412        return path.replaceAll( "!.*$", "" );
1413        }
1414      }
1415    catch( IOException exception )
1416      {
1417      throw new CascadingException( exception );
1418      }
1419
1420    return null;
1421    }
1422
1423  public static boolean containsWhitespace( String string )
1424    {
1425    return Pattern.compile( "\\s" ).matcher( string ).find();
1426    }
1427
1428  public static String parseHostname( String uri )
1429    {
1430    if( isEmpty( uri ) )
1431      return null;
1432
1433    String[] parts = uri.split( "://", 2 );
1434    String result;
1435
1436    // missing protocol
1437    result = parts[ parts.length - 1 ];
1438
1439    // user:pass@hostname:port/stuff
1440    parts = result.split( "/", 2 );
1441    result = parts[ 0 ];
1442
1443    // user:pass@hostname:port
1444    parts = result.split( "@", 2 );
1445    result = parts[ parts.length - 1 ];
1446
1447    // hostname:port
1448    parts = result.split( ":", 2 );
1449    result = parts[ 0 ];
1450
1451    return result;
1452    }
1453  }