001/* 002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved. 003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 004 * 005 * Project and contact information: http://www.cascading.org/ 006 * 007 * This file is part of the Cascading project. 008 * 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 */ 021 022package cascading.util; 023 024import java.beans.Expression; 025import java.io.BufferedReader; 026import java.io.File; 027import java.io.FileWriter; 028import java.io.IOException; 029import java.io.InputStreamReader; 030import java.io.Writer; 031import java.lang.reflect.Constructor; 032import java.lang.reflect.Field; 033import java.lang.reflect.Method; 034import java.lang.reflect.Type; 035import java.net.URL; 036import java.net.URLDecoder; 037import java.security.MessageDigest; 038import java.security.NoSuchAlgorithmException; 039import java.util.ArrayList; 040import java.util.Arrays; 041import java.util.Collection; 042import java.util.Collections; 043import java.util.Enumeration; 044import java.util.HashMap; 045import java.util.HashSet; 046import java.util.IdentityHashMap; 047import java.util.Iterator; 048import java.util.LinkedHashSet; 049import java.util.List; 050import java.util.Map; 051import java.util.Set; 052import java.util.TreeSet; 053import java.util.UUID; 054import java.util.concurrent.Callable; 055import java.util.concurrent.ExecutionException; 056import 
java.util.concurrent.ExecutorService; 057import java.util.concurrent.Executors; 058import java.util.concurrent.Future; 059import java.util.concurrent.TimeUnit; 060import java.util.concurrent.TimeoutException; 061import java.util.regex.Pattern; 062 063import cascading.CascadingException; 064import cascading.flow.FlowException; 065import cascading.tap.MultiSourceTap; 066import cascading.tap.Tap; 067import cascading.tuple.coerce.Coercions; 068import cascading.util.jgrapht.ComponentAttributeProvider; 069import cascading.util.jgrapht.DOTExporter; 070import cascading.util.jgrapht.EdgeNameProvider; 071import cascading.util.jgrapht.IntegerNameProvider; 072import cascading.util.jgrapht.VertexNameProvider; 073import org.jgrapht.DirectedGraph; 074import org.slf4j.Logger; 075import org.slf4j.LoggerFactory; 076 077/** Class Util provides reusable operations. */ 078public class Util 079 { 080 /** 081 * On OS X only, and if the graphviz dot binary is installed, when true, dot will be invoked to convert the dot file 082 * to a pdf document. 
083 */ 084 public static final String CONVERT_DOT_TO_PDF = "util.dot.to.pdf.enabled"; 085 public static int ID_LENGTH = 32; 086 087 private static final Logger LOG = LoggerFactory.getLogger( Util.class ); 088 private static final String HEXES = "0123456789ABCDEF"; 089 090 public static final boolean IS_OSX = System.getProperty( "os.name" ).toLowerCase().contains( "Mac OS X".toLowerCase() ); 091 public static final boolean HAS_DOT_EXEC = IS_OSX && Boolean.getBoolean( CONVERT_DOT_TO_PDF ) && hasDOT(); 092 093 public static <K, V> HashMap<K, V> createHashMap() 094 { 095 return new HashMap<K, V>(); 096 } 097 098 public static <K, V> boolean reverseMap( Map<V, K> from, Map<K, V> to ) 099 { 100 boolean dupes = false; 101 102 for( Map.Entry<V, K> entry : from.entrySet() ) 103 dupes |= to.put( entry.getValue(), entry.getKey() ) != null; 104 105 return dupes; 106 } 107 108 public static <V> Set<V> createIdentitySet() 109 { 110 return Collections.<V>newSetFromMap( new IdentityHashMap() ); 111 } 112 113 public static <V> Set<V> createIdentitySet( Collection<V> collection ) 114 { 115 Set<V> identitySet = createIdentitySet(); 116 117 if( collection != null ) 118 identitySet.addAll( collection ); 119 120 return identitySet; 121 } 122 123 public static <V> V getFirst( Collection<V> collection ) 124 { 125 if( collection == null || collection.isEmpty() ) 126 return null; 127 128 return collection.iterator().next(); 129 } 130 131 public static <V> V getFirst( Iterator<V> iterator ) 132 { 133 if( iterator == null || !iterator.hasNext() ) 134 return null; 135 136 return iterator.next(); 137 } 138 139 public static <V> V getLast( Iterator<V> iterator ) 140 { 141 if( iterator == null || !iterator.hasNext() ) 142 return null; 143 144 V v = iterator.next(); 145 146 while( iterator.hasNext() ) 147 v = iterator.next(); 148 149 return v; 150 } 151 152 public static <T> List<T> asList( T t, T[] ts ) 153 { 154 List<T> list = new ArrayList<>( 1 + ts.length ); 155 156 list.add( t ); 157 
Collections.addAll( list, ts ); 158 159 return list; 160 } 161 162 public static <N extends Number> N max( Collection<N> collection ) 163 { 164 return new TreeSet<>( collection ).first(); 165 } 166 167 public static <N extends Number> N min( Collection<N> collection ) 168 { 169 return new TreeSet<>( collection ).last(); 170 } 171 172 public static <T> Set<T> narrowSet( Class<T> type, Collection collection ) 173 { 174 return narrowSet( type, collection.iterator() ); 175 } 176 177 public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection ) 178 { 179 return narrowIdentitySet( type, collection.iterator() ); 180 } 181 182 public static <T> Set<T> narrowSet( Class<T> type, Collection collection, boolean include ) 183 { 184 return narrowSet( type, collection.iterator(), include ); 185 } 186 187 public static <T> Set<T> narrowIdentitySet( Class<T> type, Collection collection, boolean include ) 188 { 189 return narrowIdentitySet( type, collection.iterator(), include ); 190 } 191 192 public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator ) 193 { 194 return narrowSet( type, iterator, true ); 195 } 196 197 public static <T> Set<T> narrowIdentitySet( Class<T> type, Iterator iterator ) 198 { 199 return narrowIdentitySet( type, iterator, true ); 200 } 201 202 public static <T> Set<T> narrowSet( Class<T> type, Iterator iterator, boolean include ) 203 { 204 return narrowSetInternal( type, iterator, include, new HashSet<T>() ); 205 } 206 207 public static <T> Set<T> narrowIdentitySet( Class<T> type, Iterator iterator, boolean include ) 208 { 209 return narrowSetInternal( type, iterator, include, Util.<T>createIdentitySet() ); 210 } 211 212 private static <T> Set<T> narrowSetInternal( Class<T> type, Iterator iterator, boolean include, Set<T> set ) 213 { 214 while( iterator.hasNext() ) 215 { 216 Object o = iterator.next(); 217 218 if( type.isInstance( o ) == include ) 219 set.add( (T) o ); 220 } 221 222 return set; 223 } 224 225 public static <T> 
boolean contains( Class<T> type, Collection collection ) 226 { 227 return contains( type, collection.iterator() ); 228 } 229 230 public static <T> boolean contains( Class<T> type, Iterator iterator ) 231 { 232 while( iterator.hasNext() ) 233 { 234 Object o = iterator.next(); 235 236 if( type.isInstance( o ) ) 237 return true; 238 } 239 240 return false; 241 } 242 243 public static <T> Set<T> differenceIdentity( Set<T> lhs, Set<T> rhs ) 244 { 245 Set<T> diff = createIdentitySet( lhs ); 246 247 diff.removeAll( rhs ); 248 249 return diff; 250 } 251 252 public static synchronized String createUniqueIDWhichStartsWithAChar() 253 { 254 String value; 255 256 do 257 { 258 value = createUniqueID(); 259 } 260 while( Character.isDigit( value.charAt( 0 ) ) ); 261 262 return value; 263 } 264 265 public static synchronized String createUniqueID() 266 { 267 // creates a cryptographically secure random value 268 String value = UUID.randomUUID().toString(); 269 return value.toUpperCase().replaceAll( "-", "" ); 270 } 271 272 public static String createID( String rawID ) 273 { 274 return createID( rawID.getBytes() ); 275 } 276 277 /** 278 * Method CreateID returns a HEX hash of the given bytes with length 32 characters long. 
279 * 280 * @param bytes the bytes 281 * @return string 282 */ 283 public static String createID( byte[] bytes ) 284 { 285 try 286 { 287 return getHex( MessageDigest.getInstance( "MD5" ).digest( bytes ) ); 288 } 289 catch( NoSuchAlgorithmException exception ) 290 { 291 throw new RuntimeException( "unable to digest string" ); 292 } 293 } 294 295 public static String getHex( byte[] bytes ) 296 { 297 if( bytes == null ) 298 return null; 299 300 final StringBuilder hex = new StringBuilder( 2 * bytes.length ); 301 302 for( final byte b : bytes ) 303 hex.append( HEXES.charAt( ( b & 0xF0 ) >> 4 ) ).append( HEXES.charAt( b & 0x0F ) ); 304 305 return hex.toString(); 306 } 307 308 public static byte[] longToByteArray( long value ) 309 { 310 return new byte[]{ 311 (byte) ( value >> 56 ), 312 (byte) ( value >> 48 ), 313 (byte) ( value >> 40 ), 314 (byte) ( value >> 32 ), 315 (byte) ( value >> 24 ), 316 (byte) ( value >> 16 ), 317 (byte) ( value >> 8 ), 318 (byte) value 319 }; 320 } 321 322 public static byte[] intToByteArray( int value ) 323 { 324 return new byte[]{ 325 (byte) ( value >> 24 ), 326 (byte) ( value >> 16 ), 327 (byte) ( value >> 8 ), 328 (byte) value 329 }; 330 } 331 332 public static <T> T[] copy( T[] source ) 333 { 334 if( source == null ) 335 return null; 336 337 return Arrays.copyOf( source, source.length ); 338 } 339 340 public static String unique( String value, String delim ) 341 { 342 String[] split = value.split( delim ); 343 344 Set<String> values = new LinkedHashSet<String>(); 345 346 Collections.addAll( values, split ); 347 348 return join( values, delim ); 349 } 350 351 /** 352 * This method joins the values in the given list with the delim String value. 
353 * 354 * @param list 355 * @param delim 356 * @return String 357 */ 358 public static String join( int[] list, String delim ) 359 { 360 return join( list, delim, false ); 361 } 362 363 public static String join( int[] list, String delim, boolean printNull ) 364 { 365 StringBuffer buffer = new StringBuffer(); 366 int count = 0; 367 368 for( Object s : list ) 369 { 370 if( count != 0 ) 371 buffer.append( delim ); 372 373 if( printNull || s != null ) 374 buffer.append( s ); 375 376 count++; 377 } 378 379 return buffer.toString(); 380 } 381 382 public static String join( String delim, String... strings ) 383 { 384 return join( delim, false, strings ); 385 } 386 387 public static String join( String delim, boolean printNull, String... strings ) 388 { 389 return join( strings, delim, printNull ); 390 } 391 392 /** 393 * This method joins the values in the given list with the delim String value. 394 * 395 * @param list 396 * @param delim 397 * @return a String 398 */ 399 public static String join( Object[] list, String delim ) 400 { 401 return join( list, delim, false ); 402 } 403 404 public static String join( Object[] list, String delim, boolean printNull ) 405 { 406 return join( list, delim, printNull, 0 ); 407 } 408 409 public static String join( Object[] list, String delim, boolean printNull, int beginAt ) 410 { 411 return join( list, delim, printNull, beginAt, list.length - beginAt ); 412 } 413 414 public static String join( Object[] list, String delim, boolean printNull, int beginAt, int length ) 415 { 416 StringBuffer buffer = new StringBuffer(); 417 int count = 0; 418 419 for( int i = beginAt; i < beginAt + length; i++ ) 420 { 421 Object s = list[ i ]; 422 if( count != 0 ) 423 buffer.append( delim ); 424 425 if( printNull || s != null ) 426 buffer.append( s ); 427 428 count++; 429 } 430 431 return buffer.toString(); 432 } 433 434 public static String join( Iterable iterable, String delim, boolean printNull ) 435 { 436 int count = 0; 437 438 StringBuilder 
buffer = new StringBuilder(); 439 440 for( Object s : iterable ) 441 { 442 if( count != 0 ) 443 buffer.append( delim ); 444 445 if( printNull || s != null ) 446 buffer.append( s ); 447 448 count++; 449 } 450 451 return buffer.toString(); 452 } 453 454 /** 455 * This method joins each value in the collection with a tab character as the delimiter. 456 * 457 * @param collection 458 * @return a String 459 */ 460 public static String join( Collection collection ) 461 { 462 return join( collection, "\t" ); 463 } 464 465 /** 466 * This method joins each valuein the collection with the given delimiter. 467 * 468 * @param collection 469 * @param delim 470 * @return a String 471 */ 472 public static String join( Collection collection, String delim ) 473 { 474 return join( collection, delim, false ); 475 } 476 477 public static String join( Collection collection, String delim, boolean printNull ) 478 { 479 StringBuffer buffer = new StringBuffer(); 480 481 join( buffer, collection, delim, printNull ); 482 483 return buffer.toString(); 484 } 485 486 /** 487 * This method joins each value in the collection with the given delimiter. All results are appended to the 488 * given {@link StringBuffer} instance. 
489 * 490 * @param buffer 491 * @param collection 492 * @param delim 493 */ 494 public static void join( StringBuffer buffer, Collection collection, String delim ) 495 { 496 join( buffer, collection, delim, false ); 497 } 498 499 public static void join( StringBuffer buffer, Collection collection, String delim, boolean printNull ) 500 { 501 int count = 0; 502 503 for( Object s : collection ) 504 { 505 if( count != 0 ) 506 buffer.append( delim ); 507 508 if( printNull || s != null ) 509 buffer.append( s ); 510 511 count++; 512 } 513 } 514 515 public static <T> List<T> split( Class<T> type, String values ) 516 { 517 return split( type, ",", values ); 518 } 519 520 public static <T> List<T> split( Class<T> type, String delim, String values ) 521 { 522 List<T> results = new ArrayList<>(); 523 524 if( values == null ) 525 return results; 526 527 String[] split = values.split( delim ); 528 529 for( String value : split ) 530 results.add( Coercions.<T>coerce( value, type ) ); 531 532 return results; 533 } 534 535 public static String[] removeNulls( String... 
strings ) 536 { 537 List<String> list = new ArrayList<String>(); 538 539 for( String string : strings ) 540 { 541 if( string != null ) 542 list.add( string ); 543 } 544 545 return list.toArray( new String[ list.size() ] ); 546 } 547 548 public static Collection<String> quote( Collection<String> collection, String quote ) 549 { 550 List<String> list = new ArrayList<String>(); 551 552 for( String string : collection ) 553 list.add( quote + string + quote ); 554 555 return list; 556 } 557 558 public static String print( Collection collection, String delim ) 559 { 560 StringBuffer buffer = new StringBuffer(); 561 562 print( buffer, collection, delim ); 563 564 return buffer.toString(); 565 } 566 567 public static void print( StringBuffer buffer, Collection collection, String delim ) 568 { 569 int count = 0; 570 571 for( Object s : collection ) 572 { 573 if( count != 0 ) 574 buffer.append( delim ); 575 576 buffer.append( "[" ); 577 buffer.append( s ); 578 buffer.append( "]" ); 579 580 count++; 581 } 582 } 583 584 /** 585 * This method attempts to remove any username and password from the given url String. 586 * 587 * @param url 588 * @return a String 589 */ 590 public static String sanitizeUrl( String url ) 591 { 592 if( url == null ) 593 return null; 594 595 return url.replaceAll( "(?<=//).*:.*@", "" ); 596 } 597 598 /** 599 * This method attempts to remove duplicate consecutive forward slashes from the given url. 600 * 601 * @param url 602 * @return a String 603 */ 604 public static String normalizeUrl( String url ) 605 { 606 if( url == null ) 607 return null; 608 609 return url.replaceAll( "([^:]/)/{2,}", "$1/" ); 610 } 611 612 /** 613 * This method returns the {@link Object#toString()} of the given object, or an empty String if the object 614 * is null. 
615 * 616 * @param object 617 * @return a String 618 */ 619 public static String toNull( Object object ) 620 { 621 if( object == null ) 622 return ""; 623 624 return object.toString(); 625 } 626 627 /** 628 * This method truncates the given String value to the given size, but appends an ellipse ("...") if the 629 * String is larger than maxSize. 630 * 631 * @param string 632 * @param maxSize 633 * @return a String 634 */ 635 public static String truncate( String string, int maxSize ) 636 { 637 string = toNull( string ); 638 639 if( string.length() <= maxSize ) 640 return string; 641 642 return String.format( "%s...", string.subSequence( 0, maxSize - 3 ) ); 643 } 644 645 public static void printGraph( String filename, DirectedGraph graph ) 646 { 647 try 648 { 649 new File( filename ).getParentFile().mkdirs(); 650 Writer writer = new FileWriter( filename ); 651 652 try 653 { 654 printGraph( writer, graph ); 655 } 656 finally 657 { 658 writer.close(); 659 } 660 } 661 catch( IOException exception ) 662 { 663 LOG.error( "failed printing graph to {}, with exception: {}", filename, exception ); 664 } 665 } 666 667 @SuppressWarnings({"unchecked"}) 668 private static void printGraph( Writer writer, DirectedGraph graph ) 669 { 670 DOTExporter dot = new DOTExporter( new IntegerNameProvider(), new VertexNameProvider() 671 { 672 public String getVertexName( Object object ) 673 { 674 if( object == null ) 675 return "none"; 676 677 return object.toString().replaceAll( "\"", "\'" ); 678 } 679 }, new EdgeNameProvider<Object>() 680 { 681 public String getEdgeName( Object object ) 682 { 683 if( object == null ) 684 return "none"; 685 686 return object.toString().replaceAll( "\"", "\'" ); 687 } 688 } 689 ); 690 691 dot.export( writer, graph ); 692 } 693 694 /** 695 * This method removes all nulls from the given List. 
696 * 697 * @param list 698 */ 699 @SuppressWarnings({"StatementWithEmptyBody"}) 700 public static void removeAllNulls( List list ) 701 { 702 while( list.remove( null ) ) 703 ; 704 } 705 706 public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider ) 707 { 708 new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider ).export( writer, graph ); 709 } 710 711 public static void writeDOT( Writer writer, DirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider, 712 ComponentAttributeProvider vertexAttributeProvider, ComponentAttributeProvider edgeAttributeProvider ) 713 { 714 new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider, vertexAttributeProvider, edgeAttributeProvider ).export( writer, graph ); 715 } 716 717 public static boolean isEmpty( String string ) 718 { 719 return string == null || string.isEmpty(); 720 } 721 722 private static String[] findSplitName( String path ) 723 { 724 String separator = "/"; 725 726 if( path.lastIndexOf( "/" ) < path.lastIndexOf( "\\" ) ) 727 separator = "\\\\"; 728 729 String[] split = path.split( separator ); 730 731 path = split[ split.length - 1 ]; 732 733 path = path.substring( 0, path.lastIndexOf( '.' 
) ); // remove .jar 734 735 return path.split( "-(?=\\d)", 2 ); 736 } 737 738 public static String findVersion( String path ) 739 { 740 if( path == null || path.isEmpty() ) 741 return null; 742 743 String[] split = findSplitName( path ); 744 745 if( split.length == 2 ) 746 return split[ 1 ]; 747 748 return null; 749 } 750 751 public static String findName( String path ) 752 { 753 if( path == null || path.isEmpty() ) 754 return null; 755 756 String[] split = findSplitName( path ); 757 758 if( split.length == 0 ) 759 return null; 760 761 return split[ 0 ]; 762 } 763 764 public static long getSourceModified( Object confCopy, Iterator<Tap> values, long sinkModified ) throws IOException 765 { 766 long sourceModified = 0; 767 768 while( values.hasNext() ) 769 { 770 Tap source = values.next(); 771 772 if( source instanceof MultiSourceTap ) 773 return getSourceModified( confCopy, ( (MultiSourceTap) source ).getChildTaps(), sinkModified ); 774 775 sourceModified = source.getModifiedTime( confCopy ); 776 777 // source modified returns zero if does not exist 778 // this should minimize number of times we touch any file meta-data server 779 if( sourceModified == 0 && !source.resourceExists( confCopy ) ) 780 throw new FlowException( "source does not exist: " + source ); 781 782 if( sinkModified < sourceModified ) 783 return sourceModified; 784 } 785 786 return sourceModified; 787 } 788 789 public static long getSinkModified( Object config, Collection<Tap> sinks ) throws IOException 790 { 791 long sinkModified = Long.MAX_VALUE; 792 793 for( Tap sink : sinks ) 794 { 795 if( sink.isReplace() || sink.isUpdate() ) 796 sinkModified = -1L; 797 else 798 { 799 if( !sink.resourceExists( config ) ) 800 sinkModified = 0L; 801 else 802 sinkModified = Math.min( sinkModified, sink.getModifiedTime( config ) ); // return youngest mod date 803 } 804 } 805 806 return sinkModified; 807 } 808 809 public static String getTypeName( Type type ) 810 { 811 if( type == null ) 812 return null; 813 814 
return type instanceof Class ? ( (Class) type ).getCanonicalName() : type.toString(); 815 } 816 817 public static String getSimpleTypeName( Type type ) 818 { 819 if( type == null ) 820 return null; 821 822 return type instanceof Class ? ( (Class) type ).getSimpleName() : type.toString(); 823 } 824 825 public static String[] typeNames( Type[] types ) 826 { 827 String[] names = new String[ types.length ]; 828 829 for( int i = 0; i < types.length; i++ ) 830 names[ i ] = getTypeName( types[ i ] ); 831 832 return names; 833 } 834 835 public static String[] simpleTypeNames( Type[] types ) 836 { 837 String[] names = new String[ types.length ]; 838 839 for( int i = 0; i < types.length; i++ ) 840 names[ i ] = getSimpleTypeName( types[ i ] ); 841 842 return names; 843 } 844 845 public static boolean containsNull( Object[] values ) 846 { 847 for( Object value : values ) 848 { 849 if( value == null ) 850 return true; 851 } 852 853 return false; 854 } 855 856 public static void safeSleep( long durationMillis ) 857 { 858 try 859 { 860 Thread.sleep( durationMillis ); 861 } 862 catch( InterruptedException exception ) 863 { 864 // do nothing 865 } 866 } 867 868 public static void writePDF( String path ) 869 { 870 if( !HAS_DOT_EXEC ) 871 return; 872 873 // dot *.dot -Tpdf -O -Nshape=box 874 File file = new File( path ); 875 execProcess( file.getParentFile(), "dot", file.getName(), "-Tpdf", "-O" ); 876 } 877 878 static boolean hasDOT() 879 { 880 return execProcess( null, "which", "dot" ) == 0; 881 } 882 883 public static int execProcess( File parentFile, String... 
command ) 884 { 885 try 886 { 887 String commandLine = join( command, " " ); 888 889 LOG.debug( "command: {}", commandLine ); 890 891 Process process = Runtime.getRuntime().exec( commandLine, null, parentFile ); 892 893 int result = process.waitFor(); 894 895 BufferedReader reader = new BufferedReader( new InputStreamReader( process.getInputStream() ) ); 896 897 String line = reader.readLine(); 898 899 while( line != null ) 900 { 901 LOG.warn( "{} stdout returned: {}", command[ 0 ], line ); 902 line = reader.readLine(); 903 } 904 905 reader = new BufferedReader( new InputStreamReader( process.getErrorStream() ) ); 906 907 line = reader.readLine(); 908 909 while( line != null ) 910 { 911 LOG.warn( "{} stderr returned: {}", command[ 0 ], line ); 912 line = reader.readLine(); 913 } 914 915 return result; 916 } 917 catch( IOException exception ) 918 { 919 LOG.warn( "unable to exec " + command[ 0 ], exception ); 920 } 921 catch( InterruptedException exception ) 922 { 923 LOG.warn( "interrupted exec " + command[ 0 ], exception ); 924 } 925 926 return Integer.MIN_VALUE; 927 } 928 929 public static String formatDurationFromMillis( long duration ) 930 { 931 if( duration / 1000 / 60 / 60 / 24 > 0.0 ) 932 return formatDurationDHMSms( duration ); 933 if( duration / 1000 / 60 / 60 > 0.0 ) 934 return formatDurationHMSms( duration ); 935 else 936 return formatDurationMSms( duration ); 937 } 938 939 public static String formatDurationMSms( long duration ) 940 { 941 long ms = duration % 1000; 942 long durationSeconds = duration / 1000; 943 long seconds = durationSeconds % 60; 944 long minutes = durationSeconds / 60; 945 946 return String.format( "%02d:%02d.%03d", minutes, seconds, ms ); 947 } 948 949 public static String formatDurationHMSms( long duration ) 950 { 951 long ms = duration % 1000; 952 long durationSeconds = duration / 1000; 953 long seconds = durationSeconds % 60; 954 long minutes = ( durationSeconds / 60 ) % 60; 955 long hours = durationSeconds / 60 / 60; 956 957 
return String.format( "%02d:%02d:%02d.%03d", hours, minutes, seconds, ms ); 958 } 959 960 public static String formatDurationDHMSms( long duration ) 961 { 962 long ms = duration % 1000; 963 long durationSeconds = duration / 1000; 964 long seconds = durationSeconds % 60; 965 long minutes = ( durationSeconds / 60 ) % 60; 966 long hours = ( durationSeconds / 60 / 60 ) % 24; 967 long days = durationSeconds / 60 / 60 / 24; 968 969 return String.format( "%02d:%02d:%02d:%02d.%03d", days, hours, minutes, seconds, ms ); 970 } 971 972 /** 973 * Converts a given comma separated String of Exception names into a List of classes. 974 * ClassNotFound exceptions are ignored if no warningMessage is given, otherwise logged as a warning. 975 * 976 * @param classNames A comma separated String of Exception names. 977 * @return List of Exception classes. 978 */ 979 public static Set<Class<? extends Exception>> asClasses( String classNames, String warningMessage ) 980 { 981 Set<Class<? extends Exception>> exceptionClasses = new HashSet<Class<? extends Exception>>(); 982 String[] split = classNames.split( "," ); 983 984 // possibly user provided type, load from context 985 ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); 986 987 for( String className : split ) 988 { 989 if( className != null ) 990 className = className.trim(); 991 992 if( isEmpty( className ) ) 993 continue; 994 995 try 996 { 997 Class<? 
extends Exception> exceptionClass = contextClassLoader.loadClass( className ).asSubclass( Exception.class ); 998 999 exceptionClasses.add( exceptionClass ); 1000 } 1001 catch( ClassNotFoundException exception ) 1002 { 1003 if( !Util.isEmpty( warningMessage ) ) 1004 LOG.warn( "{}: {}", warningMessage, className ); 1005 } 1006 } 1007 1008 return exceptionClasses; 1009 } 1010 1011 public static Boolean submitWithTimeout( Callable<Boolean> task, int timeout, TimeUnit timeUnit ) throws Exception 1012 { 1013 ExecutorService executor = Executors.newFixedThreadPool( 1 ); 1014 1015 Future<Boolean> future = executor.submit( task ); 1016 1017 executor.shutdown(); 1018 1019 try 1020 { 1021 return future.get( timeout, timeUnit ); 1022 } 1023 catch( TimeoutException exception ) 1024 { 1025 future.cancel( true ); 1026 } 1027 catch( ExecutionException exception ) 1028 { 1029 Throwable cause = exception.getCause(); 1030 1031 if( cause instanceof RuntimeException ) 1032 throw (RuntimeException) cause; 1033 1034 throw (Exception) cause; 1035 } 1036 1037 return null; 1038 } 1039 1040 public interface RetryOperator<T> 1041 { 1042 T operate() throws Exception; 1043 1044 boolean rethrow( Exception exception ); 1045 } 1046 1047 public static <T> T retry( Logger logger, int retries, int secondsDelay, String message, RetryOperator<T> operator ) throws Exception 1048 { 1049 Exception saved = null; 1050 1051 for( int i = 0; i < retries; i++ ) 1052 { 1053 try 1054 { 1055 return operator.operate(); 1056 } 1057 catch( Exception exception ) 1058 { 1059 if( operator.rethrow( exception ) ) 1060 { 1061 logger.warn( message + ", but not retrying", exception ); 1062 1063 throw exception; 1064 } 1065 1066 saved = exception; 1067 1068 logger.warn( message + ", attempt: " + ( i + 1 ), exception ); 1069 1070 try 1071 { 1072 Thread.sleep( secondsDelay * 1000 ); 1073 } 1074 catch( InterruptedException exception1 ) 1075 { 1076 // do nothing 1077 } 1078 } 1079 } 1080 1081 logger.warn( message + ", done 
retrying after attempts: " + retries, saved ); 1082 1083 throw saved; 1084 } 1085 1086 public static Object createProtectedObject( Class type, Object[] parameters, Class[] parameterTypes ) 1087 { 1088 try 1089 { 1090 Constructor constructor = type.getDeclaredConstructor( parameterTypes ); 1091 1092 constructor.setAccessible( true ); 1093 1094 return constructor.newInstance( parameters ); 1095 } 1096 catch( Exception exception ) 1097 { 1098 LOG.error( "unable to instantiate type: {}, with exception: {}", type.getName(), exception ); 1099 1100 throw new FlowException( "unable to instantiate type: " + type.getName(), exception ); 1101 } 1102 } 1103 1104 public static boolean hasClass( String typeString ) 1105 { 1106 try 1107 { 1108 Util.class.getClassLoader().loadClass( typeString ); 1109 1110 return true; 1111 } 1112 catch( ClassNotFoundException exception ) 1113 { 1114 return false; 1115 } 1116 } 1117 1118 public static <T> T newInstance( String className, Object... parameters ) 1119 { 1120 try 1121 { 1122 Class<T> type = (Class<T>) Util.class.getClassLoader().loadClass( className ); 1123 1124 return newInstance( type, parameters ); 1125 } 1126 catch( ClassNotFoundException exception ) 1127 { 1128 throw new CascadingException( "unable to load class: " + className, exception ); 1129 } 1130 } 1131 1132 public static <T> T newInstance( Class<T> target, Object... parameters ) 1133 { 1134 // using Expression makes sure that constructors using sub-types properly work, otherwise we get a 1135 // NoSuchMethodException. 
1136 Expression expr = new Expression( target, "new", parameters ); 1137 1138 try 1139 { 1140 return (T) expr.getValue(); 1141 } 1142 catch( Exception exception ) 1143 { 1144 throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception ); 1145 } 1146 } 1147 1148 public static Object invokeStaticMethod( String typeString, String methodName, Object[] parameters, Class[] parameterTypes ) 1149 { 1150 Class type = loadClass( typeString ); 1151 1152 return invokeStaticMethod( type, methodName, parameters, parameterTypes ); 1153 } 1154 1155 public static Class<?> loadClass( String typeString ) 1156 { 1157 try 1158 { 1159 return Thread.currentThread().getContextClassLoader().loadClass( typeString ); 1160 } 1161 catch( ClassNotFoundException exception ) 1162 { 1163 throw new CascadingException( "unable to load class: " + typeString, exception ); 1164 } 1165 } 1166 1167 public static Class<?> loadClassSafe( String typeString ) 1168 { 1169 try 1170 { 1171 return Thread.currentThread().getContextClassLoader().loadClass( typeString ); 1172 } 1173 catch( ClassNotFoundException exception ) 1174 { 1175 return null; 1176 } 1177 } 1178 1179 public static Object invokeStaticMethod( Class type, String methodName, Object[] parameters, Class[] parameterTypes ) 1180 { 1181 try 1182 { 1183 Method method = type.getDeclaredMethod( methodName, parameterTypes ); 1184 1185 method.setAccessible( true ); 1186 1187 return method.invoke( null, parameters ); 1188 } 1189 catch( Exception exception ) 1190 { 1191 throw new CascadingException( "unable to invoke static method: " + type.getName() + "." 
+ methodName, exception ); 1192 } 1193 } 1194 1195 public static boolean hasInstanceMethod( Object target, String methodName, Class[] parameterTypes ) 1196 { 1197 try 1198 { 1199 return target.getClass().getMethod( methodName, parameterTypes ) != null; 1200 } 1201 catch( NoSuchMethodException exception ) 1202 { 1203 return false; 1204 } 1205 } 1206 1207 public static Object invokeInstanceMethodSafe( Object target, String methodName, Object[] parameters, Class[] parameterTypes ) 1208 { 1209 try 1210 { 1211 return invokeInstanceMethod( target, methodName, parameters, parameterTypes ); 1212 } 1213 catch( Exception exception ) 1214 { 1215 return null; 1216 } 1217 } 1218 1219 public static Object invokeInstanceMethod( Object target, String methodName, Object[] parameters, Class[] parameterTypes ) 1220 { 1221 try 1222 { 1223 Method method = target.getClass().getMethod( methodName, parameterTypes ); 1224 1225 method.setAccessible( true ); 1226 1227 return method.invoke( target, parameters ); 1228 } 1229 catch( Exception exception ) 1230 { 1231 throw new CascadingException( "unable to invoke instance method: " + target.getClass().getName() + "." 
+ methodName, exception ); 1232 } 1233 } 1234 1235 public static <R> R returnInstanceFieldIfExistsSafe( Object target, String fieldName ) 1236 { 1237 try 1238 { 1239 return returnInstanceFieldIfExists( target, fieldName ); 1240 } 1241 catch( Exception exception ) 1242 { 1243 // do nothing 1244 return null; 1245 } 1246 } 1247 1248 public static Object invokeConstructor( String className, Object[] parameters, Class[] parameterTypes ) 1249 { 1250 try 1251 { 1252 Class type = Util.class.getClassLoader().loadClass( className ); 1253 1254 return invokeConstructor( type, parameters, parameterTypes ); 1255 } 1256 catch( ClassNotFoundException exception ) 1257 { 1258 throw new CascadingException( "unable to load class: " + className, exception ); 1259 } 1260 } 1261 1262 public static <T> T invokeConstructor( Class<T> target, Object[] parameters, Class[] parameterTypes ) 1263 { 1264 try 1265 { 1266 Constructor<T> constructor = target.getConstructor( parameterTypes ); 1267 1268 constructor.setAccessible( true ); 1269 1270 return constructor.newInstance( parameters ); 1271 } 1272 catch( Exception exception ) 1273 { 1274 throw new CascadingException( "unable to create new instance: " + target.getName() + "(" + Arrays.toString( parameters ) + ")", exception ); 1275 } 1276 } 1277 1278 public static <R> R returnInstanceFieldIfExists( Object target, String fieldName ) 1279 { 1280 try 1281 { 1282 Class<?> type = target.getClass(); 1283 Field field = getDeclaredField( fieldName, type ); 1284 1285 field.setAccessible( true ); 1286 1287 return (R) field.get( target ); 1288 } 1289 catch( Exception exception ) 1290 { 1291 throw new CascadingException( "unable to get instance field: " + target.getClass().getName() + "." 
+ fieldName, exception ); 1292 } 1293 } 1294 1295 public static <R> boolean setInstanceFieldIfExistsSafe( Object target, String fieldName, R value ) 1296 { 1297 try 1298 { 1299 setInstanceFieldIfExists( target, fieldName, value ); 1300 } 1301 catch( Exception exception ) 1302 { 1303 return false; 1304 } 1305 1306 return true; 1307 } 1308 1309 public static <R> void setInstanceFieldIfExists( Object target, String fieldName, R value ) 1310 { 1311 try 1312 { 1313 Class<?> type = target.getClass(); 1314 Field field = getDeclaredField( fieldName, type ); 1315 1316 field.setAccessible( true ); 1317 1318 field.set( target, value ); 1319 } 1320 catch( Exception exception ) 1321 { 1322 throw new CascadingException( "unable to set instance field: " + target.getClass().getName() + "." + fieldName, exception ); 1323 } 1324 } 1325 1326 private static Field getDeclaredField( String fieldName, Class<?> type ) 1327 { 1328 if( type == Object.class ) 1329 { 1330 if( LOG.isDebugEnabled() ) 1331 LOG.debug( "did not find {} field on {}", fieldName, type.getName() ); 1332 1333 return null; 1334 } 1335 1336 try 1337 { 1338 return type.getDeclaredField( fieldName ); 1339 } 1340 catch( NoSuchFieldException exception ) 1341 { 1342 return getDeclaredField( fieldName, type.getSuperclass() ); 1343 } 1344 } 1345 1346 public static String makePath( String prefix, String name ) 1347 { 1348 if( name == null || name.isEmpty() ) 1349 throw new IllegalArgumentException( "name may not be null or empty " ); 1350 1351 if( prefix == null || prefix.isEmpty() ) 1352 prefix = Long.toString( (long) ( Math.random() * 10000000000L ) ); 1353 1354 name = cleansePathName( name.substring( 0, name.length() < 25 ? 
name.length() : 25 ) ); 1355 1356 return prefix + "/" + name + "/"; 1357 } 1358 1359 public static String cleansePathName( String name ) 1360 { 1361 return name.replaceAll( "\\s+|\\*|\\+|/+", "_" ); 1362 } 1363 1364 public static Class findMainClass( Class defaultType, String packageExclude ) 1365 { 1366 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); 1367 1368 for( StackTraceElement stackTraceElement : stackTrace ) 1369 { 1370 if( stackTraceElement.getMethodName().equals( "main" ) && !stackTraceElement.getClassName().startsWith( packageExclude ) ) 1371 { 1372 try 1373 { 1374 LOG.info( "resolving application jar from found main method on: {}", stackTraceElement.getClassName() ); 1375 1376 return Thread.currentThread().getContextClassLoader().loadClass( stackTraceElement.getClassName() ); 1377 } 1378 catch( ClassNotFoundException exception ) 1379 { 1380 LOG.warn( "unable to load class while discovering application jar: {}", stackTraceElement.getClassName(), exception ); 1381 } 1382 } 1383 } 1384 1385 LOG.info( "using default application jar, may cause class not found exceptions on the cluster" ); 1386 1387 return defaultType; 1388 } 1389 1390 public static String findContainingJar( Class<?> type ) 1391 { 1392 ClassLoader classLoader = type.getClassLoader(); 1393 1394 String classFile = type.getName().replaceAll( "\\.", "/" ) + ".class"; 1395 1396 try 1397 { 1398 for( Enumeration<URL> iterator = classLoader.getResources( classFile ); iterator.hasMoreElements(); ) 1399 { 1400 URL url = iterator.nextElement(); 1401 1402 if( !"jar".equals( url.getProtocol() ) ) 1403 continue; 1404 1405 String path = url.getPath(); 1406 1407 if( path.startsWith( "file:" ) ) 1408 path = path.substring( "file:".length() ); 1409 1410 path = URLDecoder.decode( path, "UTF-8" ); 1411 1412 return path.replaceAll( "!.*$", "" ); 1413 } 1414 } 1415 catch( IOException exception ) 1416 { 1417 throw new CascadingException( exception ); 1418 } 1419 1420 return null; 1421 } 
/** Matches a single whitespace character; compiled once so it is not rebuilt on every call. */
private static final Pattern CONTAINS_WHITESPACE = Pattern.compile( "\\s" );

/**
 * Tests whether the given string contains at least one whitespace character as defined by the
 * regular expression class {@code \s}.
 *
 * @param string the value to inspect
 * @return true if any whitespace character is present
 */
public static boolean containsWhitespace( String string )
  {
  return CONTAINS_WHITESPACE.matcher( string ).find();
  }

/**
 * Extracts the host name from a uri-like string of the general form
 * {@code protocol://user:pass@hostname:port/stuff}, where every part other than the host name is
 * optional.
 * <p>
 * A bracketed IPv6 literal such as {@code [::1]} is returned including its brackets instead of
 * being cut at the first colon inside the address.
 *
 * @param uri the value to parse
 * @return the host name, or {@code null} if {@code isEmpty( uri )} holds; the result may be an
 * empty string when the value carries no authority, e.g. {@code file:///path}
 */
public static String parseHostname( String uri )
  {
  if( isEmpty( uri ) )
    return null;

  String result = uri;

  // protocol://user:pass@hostname:port/stuff, the protocol may be missing
  int protocolEnd = result.indexOf( "://" );

  if( protocolEnd != -1 )
    result = result.substring( protocolEnd + "://".length() );

  // user:pass@hostname:port/stuff
  int pathStart = result.indexOf( '/' );

  if( pathStart != -1 )
    result = result.substring( 0, pathStart );

  // user:pass@hostname:port
  int userInfoEnd = result.indexOf( '@' );

  if( userInfoEnd != -1 )
    result = result.substring( userInfoEnd + 1 );

  // [ipv6]:port, the colons inside the brackets belong to the address and not to the port
  if( result.startsWith( "[" ) )
    {
    int literalEnd = result.indexOf( ']' );

    if( literalEnd != -1 )
      return result.substring( 0, literalEnd + 1 );
    }

  // hostname:port
  int portStart = result.indexOf( ':' );

  if( portStart != -1 )
    result = result.substring( 0, portStart );

  return result;
  }
}