001/* 002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.util; 022 023import java.beans.Expression; 024import java.io.BufferedReader; 025import java.io.File; 026import java.io.FileWriter; 027import java.io.IOException; 028import java.io.InputStreamReader; 029import java.io.Writer; 030import java.lang.reflect.Constructor; 031import java.lang.reflect.Field; 032import java.lang.reflect.Method; 033import java.lang.reflect.Type; 034import java.net.URL; 035import java.net.URLDecoder; 036import java.security.MessageDigest; 037import java.security.NoSuchAlgorithmException; 038import java.util.ArrayList; 039import java.util.Arrays; 040import java.util.Collection; 041import java.util.Collections; 042import java.util.Enumeration; 043import java.util.HashMap; 044import java.util.HashSet; 045import java.util.IdentityHashMap; 046import java.util.Iterator; 047import java.util.LinkedHashSet; 048import java.util.List; 049import java.util.Map; 050import java.util.Set; 051import java.util.TreeSet; 052import java.util.UUID; 053import java.util.concurrent.Callable; 054import java.util.concurrent.ExecutionException; 055import java.util.concurrent.ExecutorService; 056import java.util.concurrent.Executors; 057import 
java.util.concurrent.Future; 058import java.util.concurrent.TimeUnit; 059import java.util.concurrent.TimeoutException; 060import java.util.regex.Pattern; 061 062import cascading.CascadingException; 063import cascading.flow.FlowException; 064import cascading.tap.MultiSourceTap; 065import cascading.tap.Tap; 066import cascading.tuple.coerce.Coercions; 067import cascading.util.jgrapht.ComponentAttributeProvider; 068import cascading.util.jgrapht.DOTExporter; 069import cascading.util.jgrapht.EdgeNameProvider; 070import cascading.util.jgrapht.IntegerNameProvider; 071import cascading.util.jgrapht.VertexNameProvider; 072import org.jgrapht.DirectedGraph; 073import org.jgrapht.graph.SimpleDirectedGraph; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077/** Class Util provides reusable operations. */ 078public class Util 079 { 080 /** 081 * On OS X only, and if the graphviz dot binary is installed, when true, dot will be invoked to convert the dot file 082 * to a pdf document. 083 */ 084 public static final String CONVERT_DOT_TO_PDF = "util.dot.to.pdf.enabled"; 085 public static int ID_LENGTH = 32; 086 087 private static final Logger LOG = LoggerFactory.getLogger( Util.class ); 088 private static final String HEXES = "0123456789ABCDEF"; 089 090 public static final boolean IS_OSX = System.getProperty( "os.name" ).toLowerCase().contains( "Mac OS X".toLowerCase() ); 091 public static final boolean HAS_DOT_EXEC = IS_OSX && Boolean.getBoolean( CONVERT_DOT_TO_PDF ) && hasDOT(); 092 093 public static <K, V> HashMap<K, V> createHashMap() 094 { 095 return new HashMap<K, V>(); 096 } 097 098 public static <K, V> boolean reverseMap( Map<V, K> from, Map<K, V> to ) 099 { 100 boolean dupes = false; 101 102 for( Map.Entry<V, K> entry : from.entrySet() ) 103 dupes |= to.put( entry.getValue(), entry.getKey() ) != null; 104 105 return dupes; 106 } 107 108 public static <V> Set<V> createIdentitySet() 109 { 110 return Collections.<V>newSetFromMap( new IdentityHashMap() ); 111 } 
112 113 public static <V> Set<V> createIdentitySet( Collection<V> collection ) 114 { 115 Set<V> identitySet = createIdentitySet(); 116 117 if( collection != null ) 118 identitySet.addAll( collection ); 119 120 return identitySet; 121 } 122 123 public static <V> V getFirst( Collection<V> collection ) 124 { 125 if( collection == null || collection.isEmpty() ) 126 return null; 127 128 return collection.iterator().next(); 129 } 130 131 public static <V> V getFirst( Iterator<V> iterator ) 132 { 133 if( iterator == null || !iterator.hasNext() ) 134 return null; 135 136 return iterator.next(); 137 } 138 139 public static <V> V getLast( Iterator<V> iterator ) 140 { 141 if( iterator == null || !iterator.hasNext() ) 142 return null; 143 144 V v = iterator.next(); 145 146 while( iterator.hasNext() ) 147 v = iterator.next(); 148 149 return v; 150 } 151 152 public static <N extends Number> N max( Collection<N> collection ) 153 { 154 return new TreeSet<>( collection ).first(); 155 } 156 157 public static <N extends Number> N min( Collection<N> collection ) 158 { 159 return new TreeSet<>( collection ).last(); 160 } 161 162 public static <T> Set<T> narrowSet( Class<T> type, Collection collection ) 163 { 164 return narrowSet( type, collection.iterator() ); 165 } 166 167 public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection ) 168 { 169 return narrowIdentitySet( type, collection.iterator() ); 170 } 171 172 public static <T> Set<T> narrowSet( Class<T> type, Collection collection, boolean include ) 173 { 174 return narrowSet( type, collection.iterator(), include ); 175 } 176 177 public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection, boolean include ) 178 { 179 return narrowIdentitySet( type, collection.iterator(), include ); 180 } 181 182 public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator ) 183 { 184 return narrowSet( type, iterator, true ); 185 } 186 187 public static <T> Set<T> narrowIdentitySet( Class<T> type, 
Iterator iterator ) 188 { 189 return narrowIdentitySet( type, iterator, true ); 190 } 191 192 public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator, boolean include ) 193 { 194 return narrowSetInternal( type, iterator, include, new HashSet<T>() ); 195 } 196 197 public static <T> Set<T> narrowIdentitySet( Class<T> type, Iterator iterator, boolean include ) 198 { 199 return narrowSetInternal( type, iterator, include, Util.<T>createIdentitySet() ); 200 } 201 202 private static <T> Set<T> narrowSetInternal( Class<T> type, Iterator iterator, boolean include, Set<T> set ) 203 { 204 while( iterator.hasNext() ) 205 { 206 Object o = iterator.next(); 207 208 if( type.isInstance( o ) == include ) 209 set.add( (T) o ); 210 } 211 212 return set; 213 } 214 215 public static <T> boolean contains( Class<T> type, Collection collection ) 216 { 217 return contains( type, collection.iterator() ); 218 } 219 220 public static <T> boolean contains( Class<T> type, Iterator iterator ) 221 { 222 while( iterator.hasNext() ) 223 { 224 Object o = iterator.next(); 225 226 if( type.isInstance( o ) ) 227 return true; 228 } 229 230 return false; 231 } 232 233 public static <T> Set<T> differenceIdentity( Set<T> lhs, Set<T> rhs ) 234 { 235 Set<T> diff = createIdentitySet( lhs ); 236 237 diff.removeAll( rhs ); 238 239 return diff; 240 } 241 242 public static synchronized String createUniqueIDWhichStartsWithAChar() 243 { 244 String value; 245 246 do 247 { 248 value = createUniqueID(); 249 } 250 while( Character.isDigit( value.charAt( 0 ) ) ); 251 252 return value; 253 } 254 255 public static synchronized String createUniqueID() 256 { 257 // creates a cryptographically secure random value 258 String value = UUID.randomUUID().toString(); 259 return value.toUpperCase().replaceAll( "-", "" ); 260 } 261 262 public static String createID( String rawID ) 263 { 264 return createID( rawID.getBytes() ); 265 } 266 267 /** 268 * Method CreateID returns a HEX hash of the given bytes with length 32 
characters long. 269 * 270 * @param bytes the bytes 271 * @return string 272 */ 273 public static String createID( byte[] bytes ) 274 { 275 try 276 { 277 return getHex( MessageDigest.getInstance( "MD5" ).digest( bytes ) ); 278 } 279 catch( NoSuchAlgorithmException exception ) 280 { 281 throw new RuntimeException( "unable to digest string" ); 282 } 283 } 284 285 public static String getHex( byte[] bytes ) 286 { 287 if( bytes == null ) 288 return null; 289 290 final StringBuilder hex = new StringBuilder( 2 * bytes.length ); 291 292 for( final byte b : bytes ) 293 hex.append( HEXES.charAt( ( b & 0xF0 ) >> 4 ) ).append( HEXES.charAt( b & 0x0F ) ); 294 295 return hex.toString(); 296 } 297 298 public static byte[] longToByteArray( long value ) 299 { 300 return new byte[]{ 301 (byte) ( value >> 56 ), 302 (byte) ( value >> 48 ), 303 (byte) ( value >> 40 ), 304 (byte) ( value >> 32 ), 305 (byte) ( value >> 24 ), 306 (byte) ( value >> 16 ), 307 (byte) ( value >> 8 ), 308 (byte) value 309 }; 310 } 311 312 public static byte[] intToByteArray( int value ) 313 { 314 return new byte[]{ 315 (byte) ( value >> 24 ), 316 (byte) ( value >> 16 ), 317 (byte) ( value >> 8 ), 318 (byte) value 319 }; 320 } 321 322 public static <T> T[] copy( T[] source ) 323 { 324 if( source == null ) 325 return null; 326 327 return Arrays.copyOf( source, source.length ); 328 } 329 330 public static String unique( String value, String delim ) 331 { 332 String[] split = value.split( delim ); 333 334 Set<String> values = new LinkedHashSet<String>(); 335 336 Collections.addAll( values, split ); 337 338 return join( values, delim ); 339 } 340 341 /** 342 * This method joins the values in the given list with the delim String value. 
343 * 344 * @param list 345 * @param delim 346 * @return String 347 */ 348 public static String join( int[] list, String delim ) 349 { 350 return join( list, delim, false ); 351 } 352 353 public static String join( int[] list, String delim, boolean printNull ) 354 { 355 StringBuffer buffer = new StringBuffer(); 356 int count = 0; 357 358 for( Object s : list ) 359 { 360 if( count != 0 ) 361 buffer.append( delim ); 362 363 if( printNull || s != null ) 364 buffer.append( s ); 365 366 count++; 367 } 368 369 return buffer.toString(); 370 } 371 372 public static String join( String delim, String... strings ) 373 { 374 return join( delim, false, strings ); 375 } 376 377 public static String join( String delim, boolean printNull, String... strings ) 378 { 379 return join( strings, delim, printNull ); 380 } 381 382 /** 383 * This method joins the values in the given list with the delim String value. 384 * 385 * @param list 386 * @param delim 387 * @return a String 388 */ 389 public static String join( Object[] list, String delim ) 390 { 391 return join( list, delim, false ); 392 } 393 394 public static String join( Object[] list, String delim, boolean printNull ) 395 { 396 return join( list, delim, printNull, 0 ); 397 } 398 399 public static String join( Object[] list, String delim, boolean printNull, int beginAt ) 400 { 401 return join( list, delim, printNull, beginAt, list.length - beginAt ); 402 } 403 404 public static String join( Object[] list, String delim, boolean printNull, int beginAt, int length ) 405 { 406 StringBuffer buffer = new StringBuffer(); 407 int count = 0; 408 409 for( int i = beginAt; i < beginAt + length; i++ ) 410 { 411 Object s = list[ i ]; 412 if( count != 0 ) 413 buffer.append( delim ); 414 415 if( printNull || s != null ) 416 buffer.append( s ); 417 418 count++; 419 } 420 421 return buffer.toString(); 422 } 423 424 public static String join( Iterable iterable, String delim, boolean printNull ) 425 { 426 int count = 0; 427 428 StringBuilder 
buffer = new StringBuilder(); 429 430 for( Object s : iterable ) 431 { 432 if( count != 0 ) 433 buffer.append( delim ); 434 435 if( printNull || s != null ) 436 buffer.append( s ); 437 438 count++; 439 } 440 441 return buffer.toString(); 442 } 443 444 /** 445 * This method joins each value in the collection with a tab character as the delimiter. 446 * 447 * @param collection 448 * @return a String 449 */ 450 public static String join( Collection collection ) 451 { 452 return join( collection, "\t" ); 453 } 454 455 /** 456 * This method joins each valuein the collection with the given delimiter. 457 * 458 * @param collection 459 * @param delim 460 * @return a String 461 */ 462 public static String join( Collection collection, String delim ) 463 { 464 return join( collection, delim, false ); 465 } 466 467 public static String join( Collection collection, String delim, boolean printNull ) 468 { 469 StringBuffer buffer = new StringBuffer(); 470 471 join( buffer, collection, delim, printNull ); 472 473 return buffer.toString(); 474 } 475 476 /** 477 * This method joins each value in the collection with the given delimiter. All results are appended to the 478 * given {@link StringBuffer} instance. 
479 * 480 * @param buffer 481 * @param collection 482 * @param delim 483 */ 484 public static void join( StringBuffer buffer, Collection collection, String delim ) 485 { 486 join( buffer, collection, delim, false ); 487 } 488 489 public static void join( StringBuffer buffer, Collection collection, String delim, boolean printNull ) 490 { 491 int count = 0; 492 493 for( Object s : collection ) 494 { 495 if( count != 0 ) 496 buffer.append( delim ); 497 498 if( printNull || s != null ) 499 buffer.append( s ); 500 501 count++; 502 } 503 } 504 505 public static <T> List<T> split( Class<T> type, String values ) 506 { 507 return split( type, ",", values ); 508 } 509 510 public static <T> List<T> split( Class<T> type, String delim, String values ) 511 { 512 List<T> results = new ArrayList<>(); 513 514 if( values == null ) 515 return results; 516 517 String[] split = values.split( delim ); 518 519 for( String value : split ) 520 results.add( Coercions.<T>coerce( value, type ) ); 521 522 return results; 523 } 524 525 public static String[] removeNulls( String... 
strings ) 526 { 527 List<String> list = new ArrayList<String>(); 528 529 for( String string : strings ) 530 { 531 if( string != null ) 532 list.add( string ); 533 } 534 535 return list.toArray( new String[ list.size() ] ); 536 } 537 538 public static Collection<String> quote( Collection<String> collection, String quote ) 539 { 540 List<String> list = new ArrayList<String>(); 541 542 for( String string : collection ) 543 list.add( quote + string + quote ); 544 545 return list; 546 } 547 548 public static String print( Collection collection, String delim ) 549 { 550 StringBuffer buffer = new StringBuffer(); 551 552 print( buffer, collection, delim ); 553 554 return buffer.toString(); 555 } 556 557 public static void print( StringBuffer buffer, Collection collection, String delim ) 558 { 559 int count = 0; 560 561 for( Object s : collection ) 562 { 563 if( count != 0 ) 564 buffer.append( delim ); 565 566 buffer.append( "[" ); 567 buffer.append( s ); 568 buffer.append( "]" ); 569 570 count++; 571 } 572 } 573 574 /** 575 * This method attempts to remove any username and password from the given url String. 576 * 577 * @param url 578 * @return a String 579 */ 580 public static String sanitizeUrl( String url ) 581 { 582 if( url == null ) 583 return null; 584 585 return url.replaceAll( "(?<=//).*:.*@", "" ); 586 } 587 588 /** 589 * This method attempts to remove duplicate consecutive forward slashes from the given url. 590 * 591 * @param url 592 * @return a String 593 */ 594 public static String normalizeUrl( String url ) 595 { 596 if( url == null ) 597 return null; 598 599 return url.replaceAll( "([^:]/)/{2,}", "$1/" ); 600 } 601 602 /** 603 * This method returns the {@link Object#toString()} of the given object, or an empty String if the object 604 * is null. 
605 * 606 * @param object 607 * @return a String 608 */ 609 public static String toNull( Object object ) 610 { 611 if( object == null ) 612 return ""; 613 614 return object.toString(); 615 } 616 617 /** 618 * This method truncates the given String value to the given size, but appends an ellipse ("...") if the 619 * String is larger than maxSize. 620 * 621 * @param string 622 * @param maxSize 623 * @return a String 624 */ 625 public static String truncate( String string, int maxSize ) 626 { 627 string = toNull( string ); 628 629 if( string.length() <= maxSize ) 630 return string; 631 632 return String.format( "%s...", string.subSequence( 0, maxSize - 3 ) ); 633 } 634 635 public static void printGraph( String filename, SimpleDirectedGraph graph ) 636 { 637 try 638 { 639 new File( filename ).getParentFile().mkdirs(); 640 Writer writer = new FileWriter( filename ); 641 642 try 643 { 644 printGraph( writer, graph ); 645 } 646 finally 647 { 648 writer.close(); 649 } 650 } 651 catch( IOException exception ) 652 { 653 LOG.error( "failed printing graph to {}, with exception: {}", filename, exception ); 654 } 655 } 656 657 @SuppressWarnings({"unchecked"}) 658 private static void printGraph( Writer writer, SimpleDirectedGraph graph ) 659 { 660 DOTExporter dot = new DOTExporter( new IntegerNameProvider(), new VertexNameProvider() 661 { 662 public String getVertexName( Object object ) 663 { 664 if( object == null ) 665 return "none"; 666 667 return object.toString().replaceAll( "\"", "\'" ); 668 } 669 }, new EdgeNameProvider<Object>() 670 { 671 public String getEdgeName( Object object ) 672 { 673 if( object == null ) 674 return "none"; 675 676 return object.toString().replaceAll( "\"", "\'" ); 677 } 678 } 679 ); 680 681 dot.export( writer, graph ); 682 } 683 684 /** 685 * This method removes all nulls from the given List. 
686 * 687 * @param list 688 */ 689 @SuppressWarnings({"StatementWithEmptyBody"}) 690 public static void removeAllNulls( List list ) 691 { 692 while( list.remove( null ) ) 693 ; 694 } 695 696 public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider ) 697 { 698 new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider ).export( writer, graph ); 699 } 700 701 public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider, 702 ComponentAttributeProvider vertexAttributeProvider, ComponentAttributeProvider edgeAttributeProvider ) 703 { 704 new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider, vertexAttributeProvider, edgeAttributeProvider ).export( writer, graph ); 705 } 706 707 public static boolean isEmpty( String string ) 708 { 709 return string == null || string.isEmpty(); 710 } 711 712 private static String[] findSplitName( String path ) 713 { 714 String separator = "/"; 715 716 if( path.lastIndexOf( "/" ) < path.lastIndexOf( "\\" ) ) 717 separator = "\\\\"; 718 719 String[] split = path.split( separator ); 720 721 path = split[ split.length - 1 ]; 722 723 path = path.substring( 0, path.lastIndexOf( '.' 
) ); // remove .jar 724 725 return path.split( "-(?=\\d)", 2 ); 726 } 727 728 public static String findVersion( String path ) 729 { 730 if( path == null || path.isEmpty() ) 731 return null; 732 733 String[] split = findSplitName( path ); 734 735 if( split.length == 2 ) 736 return split[ 1 ]; 737 738 return null; 739 } 740 741 public static String findName( String path ) 742 { 743 if( path == null || path.isEmpty() ) 744 return null; 745 746 String[] split = findSplitName( path ); 747 748 if( split.length == 0 ) 749 return null; 750 751 return split[ 0 ]; 752 } 753 754 public static long getSourceModified( Object confCopy, Iterator<Tap> values, long sinkModified ) throws IOException 755 { 756 long sourceModified = 0; 757 758 while( values.hasNext() ) 759 { 760 Tap source = values.next(); 761 762 if( source instanceof MultiSourceTap ) 763 return getSourceModified( confCopy, ( (MultiSourceTap) source ).getChildTaps(), sinkModified ); 764 765 sourceModified = source.getModifiedTime( confCopy ); 766 767 // source modified returns zero if does not exist 768 // this should minimize number of times we touch any file meta-data server 769 if( sourceModified == 0 && !source.resourceExists( confCopy ) ) 770 throw new FlowException( "source does not exist: " + source ); 771 772 if( sinkModified < sourceModified ) 773 return sourceModified; 774 } 775 776 return sourceModified; 777 } 778 779 public static long getSinkModified( Object config, Collection<Tap> sinks ) throws IOException 780 { 781 long sinkModified = Long.MAX_VALUE; 782 783 for( Tap sink : sinks ) 784 { 785 if( sink.isReplace() || sink.isUpdate() ) 786 sinkModified = -1L; 787 else 788 { 789 if( !sink.resourceExists( config ) ) 790 sinkModified = 0L; 791 else 792 sinkModified = Math.min( sinkModified, sink.getModifiedTime( config ) ); // return youngest mod date 793 } 794 } 795 return sinkModified; 796 } 797 798 public static String getTypeName( Type type ) 799 { 800 if( type == null ) 801 return null; 802 803 return 
type instanceof Class ? ( (Class) type ).getCanonicalName() : type.toString(); 804 } 805 806 public static String getSimpleTypeName( Type type ) 807 { 808 if( type == null ) 809 return null; 810 811 return type instanceof Class ? ( (Class) type ).getSimpleName() : type.toString(); 812 } 813 814 public static String[] typeNames( Type[] types ) 815 { 816 String[] names = new String[ types.length ]; 817 818 for( int i = 0; i < types.length; i++ ) 819 names[ i ] = getTypeName( types[ i ] ); 820 821 return names; 822 } 823 824 public static String[] simpleTypeNames( Type[] types ) 825 { 826 String[] names = new String[ types.length ]; 827 828 for( int i = 0; i < types.length; i++ ) 829 names[ i ] = getSimpleTypeName( types[ i ] ); 830 831 return names; 832 } 833 834 public static boolean containsNull( Object[] values ) 835 { 836 for( Object value : values ) 837 { 838 if( value == null ) 839 return true; 840 } 841 842 return false; 843 } 844 845 public static void safeSleep( long durationMillis ) 846 { 847 try 848 { 849 Thread.sleep( durationMillis ); 850 } 851 catch( InterruptedException exception ) 852 { 853 // do nothing 854 } 855 } 856 857 public static void writePDF( String path ) 858 { 859 if( !HAS_DOT_EXEC ) 860 return; 861 862 // dot *.dot -Tpdf -O -Nshape=box 863 File file = new File( path ); 864 execProcess( file.getParentFile(), "dot", file.getName(), "-Tpdf", "-O" ); 865 } 866 867 static boolean hasDOT() 868 { 869 return execProcess( null, "which", "dot" ) == 0; 870 } 871 872 public static int execProcess( File parentFile, String... 
command ) 873 { 874 try 875 { 876 String commandLine = join( command, " " ); 877 878 LOG.debug( "command: {}", commandLine ); 879 880 Process process = Runtime.getRuntime().exec( commandLine, null, parentFile ); 881 882 int result = process.waitFor(); 883 884 BufferedReader reader = new BufferedReader( new InputStreamReader( process.getInputStream() ) ); 885 886 String line = reader.readLine(); 887 888 while( line != null ) 889 { 890 LOG.warn( "{} stdout returned: {}", command[ 0 ], line ); 891 line = reader.readLine(); 892 } 893 894 reader = new BufferedReader( new InputStreamReader( process.getErrorStream() ) ); 895 896 line = reader.readLine(); 897 898 while( line != null ) 899 { 900 LOG.warn( "{} stderr returned: {}", command[ 0 ], line ); 901 line = reader.readLine(); 902 } 903 904 return result; 905 } 906 catch( IOException exception ) 907 { 908 LOG.warn( "unable to exec " + command[ 0 ], exception ); 909 } 910 catch( InterruptedException exception ) 911 { 912 LOG.warn( "interrupted exec " + command[ 0 ], exception ); 913 } 914 915 return Integer.MIN_VALUE; 916 } 917 918 public static String formatDurationFromMillis( long duration ) 919 { 920 if( duration / 1000 / 60 / 60 / 24 > 0.0 ) 921 return formatDurationDHMSms( duration ); 922 if( duration / 1000 / 60 / 60 > 0.0 ) 923 return formatDurationHMSms( duration ); 924 else 925 return formatDurationMSms( duration ); 926 } 927 928 public static String formatDurationMSms( long duration ) 929 { 930 long ms = duration % 1000; 931 long durationSeconds = duration / 1000; 932 long seconds = durationSeconds % 60; 933 long minutes = durationSeconds / 60; 934 935 return String.format( "%02d:%02d.%03d", minutes, seconds, ms ); 936 } 937 938 public static String formatDurationHMSms( long duration ) 939 { 940 long ms = duration % 1000; 941 long durationSeconds = duration / 1000; 942 long seconds = durationSeconds % 60; 943 long minutes = ( durationSeconds / 60 ) % 60; 944 long hours = durationSeconds / 60 / 60; 945 946 
return String.format( "%02d:%02d:%02d.%03d", hours, minutes, seconds, ms ); 947 } 948 949 public static String formatDurationDHMSms( long duration ) 950 { 951 long ms = duration % 1000; 952 long durationSeconds = duration / 1000; 953 long seconds = durationSeconds % 60; 954 long minutes = ( durationSeconds / 60 ) % 60; 955 long hours = ( durationSeconds / 60 / 60 ) % 24; 956 long days = durationSeconds / 60 / 60 / 24; 957 958 return String.format( "%02d:%02d:%02d:%02d.%03d", days, hours, minutes, seconds, ms ); 959 } 960 961 /** 962 * Converts a given comma separated String of Exception names into a List of classes. 963 * ClassNotFound exceptions are ignored if no warningMessage is given, otherwise logged as a warning. 964 * 965 * @param classNames A comma separated String of Exception names. 966 * @return List of Exception classes. 967 */ 968 public static Set<Class<? extends Exception>> asClasses( String classNames, String warningMessage ) 969 { 970 Set<Class<? extends Exception>> exceptionClasses = new HashSet<Class<? extends Exception>>(); 971 String[] split = classNames.split( "," ); 972 973 // possibly user provided type, load from context 974 ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); 975 976 for( String className : split ) 977 { 978 if( className != null ) 979 className = className.trim(); 980 981 if( isEmpty( className ) ) 982 continue; 983 984 try 985 { 986 Class<? 
extends Exception> exceptionClass = contextClassLoader.loadClass( className ).asSubclass( Exception.class ); 987 988 exceptionClasses.add( exceptionClass ); 989 } 990 catch( ClassNotFoundException exception ) 991 { 992 if( !Util.isEmpty( warningMessage ) ) 993 LOG.warn( "{}: {}", warningMessage, className ); 994 } 995 } 996 997 return exceptionClasses; 998 } 999 1000 public static Boolean submitWithTimeout( Callable<Boolean> task, int timeout, TimeUnit timeUnit ) throws Exception 1001 { 1002 ExecutorService executor = Executors.newFixedThreadPool( 1 ); 1003 1004 Future<Boolean> future = executor.submit( task ); 1005 1006 executor.shutdown(); 1007 1008 try 1009 { 1010 return future.get( timeout, timeUnit ); 1011 } 1012 catch( TimeoutException exception ) 1013 { 1014 future.cancel( true ); 1015 } 1016 catch( ExecutionException exception ) 1017 { 1018 Throwable cause = exception.getCause(); 1019 1020 if( cause instanceof RuntimeException ) 1021 throw (RuntimeException) cause; 1022 1023 throw (Exception) cause; 1024 } 1025 1026 return null; 1027 } 1028 1029 public interface RetryOperator<T> 1030 { 1031 T operate() throws Exception; 1032 1033 boolean rethrow( Exception exception ); 1034 } 1035 1036 public static <T> T retry( Logger logger, int retries, int secondsDelay, String message, RetryOperator<T> operator ) throws Exception 1037 { 1038 Exception saved = null; 1039 1040 for( int i = 0; i < retries; i++ ) 1041 { 1042 try 1043 { 1044 return operator.operate(); 1045 } 1046 catch( Exception exception ) 1047 { 1048 if( operator.rethrow( exception ) ) 1049 { 1050 logger.warn( message + ", but not retrying", exception ); 1051 1052 throw exception; 1053 } 1054 1055 saved = exception; 1056 1057 logger.warn( message + ", attempt: " + ( i + 1 ), exception ); 1058 1059 try 1060 { 1061 Thread.sleep( secondsDelay * 1000 ); 1062 } 1063 catch( InterruptedException exception1 ) 1064 { 1065 // do nothing 1066 } 1067 } 1068 } 1069 1070 logger.warn( message + ", done retrying after 
attempts: " + retries, saved ); 1071 1072 throw saved; 1073 } 1074 1075 public static Object createProtectedObject( Class type, Object[] parameters, Class[] parameterTypes ) 1076 { 1077 try 1078 { 1079 Constructor constructor = type.getDeclaredConstructor( parameterTypes ); 1080 1081 constructor.setAccessible( true ); 1082 1083 return constructor.newInstance( parameters ); 1084 } 1085 catch( Exception exception ) 1086 { 1087 LOG.error( "unable to instantiate type: {}, with exception: {}", type.getName(), exception ); 1088 1089 throw new FlowException( "unable to instantiate type: " + type.getName(), exception ); 1090 } 1091 } 1092 1093 public static boolean hasClass( String typeString ) 1094 { 1095 try 1096 { 1097 Util.class.getClassLoader().loadClass( typeString ); 1098 1099 return true; 1100 } 1101 catch( ClassNotFoundException exception ) 1102 { 1103 return false; 1104 } 1105 } 1106 1107 public static <T> T newInstance( String className, Object... parameters ) 1108 { 1109 try 1110 { 1111 Class<T> type = (Class<T>) Util.class.getClassLoader().loadClass( className ); 1112 1113 return newInstance( type, parameters ); 1114 } 1115 catch( ClassNotFoundException exception ) 1116 { 1117 throw new CascadingException( "unable to load class: " + className, exception ); 1118 } 1119 } 1120 1121 public static <T> T newInstance( Class<T> target, Object... parameters ) 1122 { 1123 // using Expression makes sure that constructors using sub-types properly work, otherwise we get a 1124 // NoSuchMethodException. 
1125 Expression expr = new Expression( target, "new", parameters ); 1126 1127 try 1128 { 1129 return (T) expr.getValue(); 1130 } 1131 catch( Exception exception ) 1132 { 1133 throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception ); 1134 } 1135 } 1136 1137 public static Object invokeStaticMethod( String typeString, String methodName, Object[] parameters, Class[] parameterTypes ) 1138 { 1139 Class type = loadClass( typeString ); 1140 1141 return invokeStaticMethod( type, methodName, parameters, parameterTypes ); 1142 } 1143 1144 public static Class<?> loadClass( String typeString ) 1145 { 1146 try 1147 { 1148 return Thread.currentThread().getContextClassLoader().loadClass( typeString ); 1149 } 1150 catch( ClassNotFoundException exception ) 1151 { 1152 throw new CascadingException( "unable to load class: " + typeString, exception ); 1153 } 1154 } 1155 1156 public static Class<?> loadClassSafe( String typeString ) 1157 { 1158 try 1159 { 1160 return Thread.currentThread().getContextClassLoader().loadClass( typeString ); 1161 } 1162 catch( ClassNotFoundException exception ) 1163 { 1164 return null; 1165 } 1166 } 1167 1168 public static Object invokeStaticMethod( Class type, String methodName, Object[] parameters, Class[] parameterTypes ) 1169 { 1170 try 1171 { 1172 Method method = type.getDeclaredMethod( methodName, parameterTypes ); 1173 1174 method.setAccessible( true ); 1175 1176 return method.invoke( null, parameters ); 1177 } 1178 catch( Exception exception ) 1179 { 1180 throw new CascadingException( "unable to invoke static method: " + type.getName() + "." 
+ methodName, exception ); 1181 } 1182 } 1183 1184 public static boolean hasInstanceMethod( Object target, String methodName, Class[] parameterTypes ) 1185 { 1186 try 1187 { 1188 return target.getClass().getMethod( methodName, parameterTypes ) != null; 1189 } 1190 catch( NoSuchMethodException exception ) 1191 { 1192 return false; 1193 } 1194 } 1195 1196 public static Object invokeInstanceMethodSafe( Object target, String methodName, Object[] parameters, Class[] parameterTypes ) 1197 { 1198 try 1199 { 1200 return invokeInstanceMethod( target, methodName, parameters, parameterTypes ); 1201 } 1202 catch( Exception exception ) 1203 { 1204 return null; 1205 } 1206 } 1207 1208 public static Object invokeInstanceMethod( Object target, String methodName, Object[] parameters, Class[] parameterTypes ) 1209 { 1210 try 1211 { 1212 Method method = target.getClass().getMethod( methodName, parameterTypes ); 1213 1214 method.setAccessible( true ); 1215 1216 return method.invoke( target, parameters ); 1217 } 1218 catch( Exception exception ) 1219 { 1220 throw new CascadingException( "unable to invoke instance method: " + target.getClass().getName() + "." 
+ methodName, exception ); 1221 } 1222 } 1223 1224 public static <R> R returnInstanceFieldIfExistsSafe( Object target, String fieldName ) 1225 { 1226 try 1227 { 1228 return returnInstanceFieldIfExists( target, fieldName ); 1229 } 1230 catch( Exception exception ) 1231 { 1232 // do nothing 1233 return null; 1234 } 1235 } 1236 1237 public static Object invokeConstructor( String className, Object[] parameters, Class[] parameterTypes ) 1238 { 1239 try 1240 { 1241 Class type = Util.class.getClassLoader().loadClass( className ); 1242 1243 return invokeConstructor( type, parameters, parameterTypes ); 1244 } 1245 catch( ClassNotFoundException exception ) 1246 { 1247 throw new CascadingException( "unable to load class: " + className, exception ); 1248 } 1249 } 1250 1251 public static <T> T invokeConstructor( Class<T> target, Object[] parameters, Class[] parameterTypes ) 1252 { 1253 try 1254 { 1255 Constructor<T> constructor = target.getConstructor( parameterTypes ); 1256 1257 constructor.setAccessible( true ); 1258 1259 return constructor.newInstance( parameters ); 1260 } 1261 catch( Exception exception ) 1262 { 1263 throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception ); 1264 } 1265 } 1266 1267 public static <R> R returnInstanceFieldIfExists( Object target, String fieldName ) 1268 { 1269 try 1270 { 1271 Class<?> type = target.getClass(); 1272 Field field = getDeclaredField( fieldName, type ); 1273 1274 field.setAccessible( true ); 1275 1276 return (R) field.get( target ); 1277 } 1278 catch( Exception exception ) 1279 { 1280 throw new CascadingException( "unable to get instance field: " + target.getClass().getName() + "." 
+ fieldName, exception ); 1281 } 1282 } 1283 1284 public static <R> boolean setInstanceFieldIfExistsSafe( Object target, String fieldName, R value ) 1285 { 1286 try 1287 { 1288 setInstanceFieldIfExists( target, fieldName, value ); 1289 } 1290 catch( Exception exception ) 1291 { 1292 return false; 1293 } 1294 1295 return true; 1296 } 1297 1298 public static <R> void setInstanceFieldIfExists( Object target, String fieldName, R value ) 1299 { 1300 try 1301 { 1302 Class<?> type = target.getClass(); 1303 Field field = getDeclaredField( fieldName, type ); 1304 1305 field.setAccessible( true ); 1306 1307 field.set( target, value ); 1308 } 1309 catch( Exception exception ) 1310 { 1311 throw new CascadingException( "unable to set instance field: " + target.getClass().getName() + "." + fieldName, exception ); 1312 } 1313 } 1314 1315 private static Field getDeclaredField( String fieldName, Class<?> type ) 1316 { 1317 if( type == Object.class ) 1318 { 1319 if( LOG.isDebugEnabled() ) 1320 LOG.debug( "did not find {} field on {}", fieldName, type.getName() ); 1321 1322 return null; 1323 } 1324 1325 try 1326 { 1327 return type.getDeclaredField( fieldName ); 1328 } 1329 catch( NoSuchFieldException exception ) 1330 { 1331 return getDeclaredField( fieldName, type.getSuperclass() ); 1332 } 1333 } 1334 1335 public static String makePath( String prefix, String name ) 1336 { 1337 if( name == null || name.isEmpty() ) 1338 throw new IllegalArgumentException( "name may not be null or empty " ); 1339 1340 if( prefix == null || prefix.isEmpty() ) 1341 prefix = Long.toString( (long) ( Math.random() * 10000000000L ) ); 1342 1343 name = cleansePathName( name.substring( 0, name.length() < 25 ? 
name.length() : 25 ) ); 1344 1345 return prefix + "/" + name + "/"; 1346 } 1347 1348 public static String cleansePathName( String name ) 1349 { 1350 return name.replaceAll( "\\s+|\\*|\\+|/+", "_" ); 1351 } 1352 1353 public static Class findMainClass( Class defaultType, String packageExclude ) 1354 { 1355 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); 1356 1357 for( StackTraceElement stackTraceElement : stackTrace ) 1358 { 1359 if( stackTraceElement.getMethodName().equals( "main" ) && !stackTraceElement.getClassName().startsWith( packageExclude ) ) 1360 { 1361 try 1362 { 1363 LOG.info( "resolving application jar from found main method on: {}", stackTraceElement.getClassName() ); 1364 1365 return Thread.currentThread().getContextClassLoader().loadClass( stackTraceElement.getClassName() ); 1366 } 1367 catch( ClassNotFoundException exception ) 1368 { 1369 LOG.warn( "unable to load class while discovering application jar: {}", stackTraceElement.getClassName(), exception ); 1370 } 1371 } 1372 } 1373 1374 LOG.info( "using default application jar, may cause class not found exceptions on the cluster" ); 1375 1376 return defaultType; 1377 } 1378 1379 public static String findContainingJar( Class<?> type ) 1380 { 1381 ClassLoader classLoader = type.getClassLoader(); 1382 1383 String classFile = type.getName().replaceAll( "\\.", "/" ) + ".class"; 1384 1385 try 1386 { 1387 for( Enumeration<URL> iterator = classLoader.getResources( classFile ); iterator.hasMoreElements(); ) 1388 { 1389 URL url = iterator.nextElement(); 1390 1391 if( !"jar".equals( url.getProtocol() ) ) 1392 continue; 1393 1394 String path = url.getPath(); 1395 1396 if( path.startsWith( "file:" ) ) 1397 path = path.substring( "file:".length() ); 1398 1399 path = URLDecoder.decode( path, "UTF-8" ); 1400 1401 return path.replaceAll( "!.*$", "" ); 1402 } 1403 } 1404 catch( IOException exception ) 1405 { 1406 throw new CascadingException( exception ); 1407 } 1408 1409 return null; 1410 } 
/**
   * Tests whether the given string contains at least one whitespace character, that is a space,
   * tab, line feed, vertical tab, form feed or carriage return.
   *
   * @param string value to test, must not be null
   * @return true if any whitespace character is present
   */
  public static boolean containsWhitespace( String string )
    {
    // a plain scan over the same six characters the regex class "\\s" matches, which avoids
    // compiling a Pattern on every call
    for( int index = 0; index < string.length(); index++ )
      {
      switch( string.charAt( index ) )
        {
        case ' ':
        case '\t':
        case '\n':
        case 0x0B: // vertical tab
        case '\f':
        case '\r':
          return true;
        default:
          break;
        }
      }

    return false;
    }

  /**
   * Extracts the host name from a uri like string of the form
   * {@code protocol://user:pass@hostname:port/stuff}, where every part but the host name is
   * optional.
   * <p>
   * A bracketed IPv6 literal is returned with its brackets, for example {@code [::1]}.
   *
   * @param uri value to parse
   * @return the host name, or {@code null} if the given value is empty
   */
  public static String parseHostname( String uri )
    {
    if( isEmpty( uri ) )
      return null;

    // drop the protocol, if there is one
    String[] parts = uri.split( "://", 2 );
    String result = parts[ parts.length - 1 ];

    // user:pass@hostname:port/stuff
    parts = result.split( "/", 2 );
    result = parts[ 0 ];

    // user:pass@hostname:port
    // the last '@' separates the user info from the host, so a '@' inside the user info is skipped
    result = result.substring( result.lastIndexOf( '@' ) + 1 );

    // [ipv6]:port
    // the colons inside the brackets belong to the address, not to the port separator
    if( result.startsWith( "[" ) )
      {
      int closing = result.indexOf( ']' );

      if( closing != -1 )
        return result.substring( 0, closing + 1 );
      }

    // hostname:port
    parts = result.split( ":", 2 );

    return parts[ 0 ];
    }
  }