001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.util;
022    
023    import java.io.FileWriter;
024    import java.io.IOException;
025    import java.io.PrintStream;
026    import java.io.PrintWriter;
027    import java.io.StringWriter;
028    import java.io.Writer;
029    import java.lang.reflect.Constructor;
030    import java.lang.reflect.Method;
031    import java.lang.reflect.Type;
032    import java.security.MessageDigest;
033    import java.security.NoSuchAlgorithmException;
034    import java.util.ArrayList;
035    import java.util.Arrays;
036    import java.util.Collection;
037    import java.util.Collections;
038    import java.util.Iterator;
039    import java.util.LinkedHashSet;
040    import java.util.List;
041    import java.util.Set;
042    import java.util.UUID;
043    
044    import cascading.CascadingException;
045    import cascading.flow.FlowElement;
046    import cascading.flow.FlowException;
047    import cascading.flow.planner.Scope;
048    import cascading.operation.BaseOperation;
049    import cascading.operation.Operation;
050    import cascading.pipe.Pipe;
051    import cascading.scheme.Scheme;
052    import cascading.tap.MultiSourceTap;
053    import cascading.tap.Tap;
054    import org.jgrapht.ext.DOTExporter;
055    import org.jgrapht.ext.EdgeNameProvider;
056    import org.jgrapht.ext.IntegerNameProvider;
057    import org.jgrapht.ext.MatrixExporter;
058    import org.jgrapht.ext.VertexNameProvider;
059    import org.jgrapht.graph.SimpleDirectedGraph;
060    import org.slf4j.Logger;
061    import org.slf4j.LoggerFactory;
062    
063    /** Class Util provides reusable operations. */
064    public class Util
065      {
066      public static int ID_LENGTH = 32;
067    
068      private static final Logger LOG = LoggerFactory.getLogger( Util.class );
069      private static final String HEXES = "0123456789ABCDEF";
070    
071      public static synchronized String createUniqueID()
072        {
073        // creates a cryptographically secure random value
074        String value = UUID.randomUUID().toString();
075        return value.toUpperCase().replaceAll( "-", "" );
076        }
077    
078      public static String createID( String rawID )
079        {
080        return createID( rawID.getBytes() );
081        }
082    
083      /**
084       * Method CreateID returns a HEX hash of the given bytes with length 32 characters long.
085       *
086       * @param bytes the bytes
087       * @return string
088       */
089      public static String createID( byte[] bytes )
090        {
091        try
092          {
093          return getHex( MessageDigest.getInstance( "MD5" ).digest( bytes ) );
094          }
095        catch( NoSuchAlgorithmException exception )
096          {
097          throw new RuntimeException( "unable to digest string" );
098          }
099        }
100    
101      private static String getHex( byte[] bytes )
102        {
103        if( bytes == null )
104          return null;
105    
106        final StringBuilder hex = new StringBuilder( 2 * bytes.length );
107    
108        for( final byte b : bytes )
109          hex.append( HEXES.charAt( ( b & 0xF0 ) >> 4 ) ).append( HEXES.charAt( b & 0x0F ) );
110    
111        return hex.toString();
112        }
113    
114      public static <T> T[] copy( T[] source )
115        {
116        if( source == null )
117          return null;
118    
119        return Arrays.copyOf( source, source.length );
120        }
121    
122      public static String unique( String value, String delim )
123        {
124        String[] split = value.split( delim );
125    
126        Set<String> values = new LinkedHashSet<String>();
127    
128        Collections.addAll( values, split );
129    
130        return join( values, delim );
131        }
132    
133      /**
134       * This method joins the values in the given list with the delim String value.
135       *
136       * @param list
137       * @param delim
138       * @return String
139       */
140      public static String join( int[] list, String delim )
141        {
142        return join( list, delim, false );
143        }
144    
145      public static String join( int[] list, String delim, boolean printNull )
146        {
147        StringBuffer buffer = new StringBuffer();
148        int count = 0;
149    
150        for( Object s : list )
151          {
152          if( count != 0 )
153            buffer.append( delim );
154    
155          if( printNull || s != null )
156            buffer.append( s );
157    
158          count++;
159          }
160    
161        return buffer.toString();
162        }
163    
164      public static String join( String delim, String... strings )
165        {
166        return join( delim, false, strings );
167        }
168    
169      public static String join( String delim, boolean printNull, String... strings )
170        {
171        return join( strings, delim, printNull );
172        }
173    
174      /**
175       * This method joins the values in the given list with the delim String value.
176       *
177       * @param list
178       * @param delim
179       * @return a String
180       */
181      public static String join( Object[] list, String delim )
182        {
183        return join( list, delim, false );
184        }
185    
186      public static String join( Object[] list, String delim, boolean printNull )
187        {
188        return join( list, delim, printNull, 0 );
189        }
190    
191      public static String join( Object[] list, String delim, boolean printNull, int beginAt )
192        {
193        return join( list, delim, printNull, beginAt, list.length - beginAt );
194        }
195    
196      public static String join( Object[] list, String delim, boolean printNull, int beginAt, int length )
197        {
198        StringBuffer buffer = new StringBuffer();
199        int count = 0;
200    
201        for( int i = beginAt; i < beginAt + length; i++ )
202          {
203          Object s = list[ i ];
204          if( count != 0 )
205            buffer.append( delim );
206    
207          if( printNull || s != null )
208            buffer.append( s );
209    
210          count++;
211          }
212    
213        return buffer.toString();
214        }
215    
216      public static String join( Iterable iterable, String delim, boolean printNull )
217        {
218        int count = 0;
219    
220        StringBuilder buffer = new StringBuilder();
221    
222        for( Object s : iterable )
223          {
224          if( count != 0 )
225            buffer.append( delim );
226    
227          if( printNull || s != null )
228            buffer.append( s );
229    
230          count++;
231          }
232    
233        return buffer.toString();
234        }
235    
236      /**
237       * This method joins each value in the collection with a tab character as the delimiter.
238       *
239       * @param collection
240       * @return a String
241       */
242      public static String join( Collection collection )
243        {
244        return join( collection, "\t" );
245        }
246    
247      /**
248       * This method joins each valuein the collection with the given delimiter.
249       *
250       * @param collection
251       * @param delim
252       * @return a String
253       */
254      public static String join( Collection collection, String delim )
255        {
256        return join( collection, delim, false );
257        }
258    
259      public static String join( Collection collection, String delim, boolean printNull )
260        {
261        StringBuffer buffer = new StringBuffer();
262    
263        join( buffer, collection, delim, printNull );
264    
265        return buffer.toString();
266        }
267    
268      /**
269       * This method joins each value in the collection with the given delimiter. All results are appended to the
270       * given {@link StringBuffer} instance.
271       *
272       * @param buffer
273       * @param collection
274       * @param delim
275       */
276      public static void join( StringBuffer buffer, Collection collection, String delim )
277        {
278        join( buffer, collection, delim, false );
279        }
280    
281      public static void join( StringBuffer buffer, Collection collection, String delim, boolean printNull )
282        {
283        int count = 0;
284    
285        for( Object s : collection )
286          {
287          if( count != 0 )
288            buffer.append( delim );
289    
290          if( printNull || s != null )
291            buffer.append( s );
292    
293          count++;
294          }
295        }
296    
297      public static String[] removeNulls( String... strings )
298        {
299        List<String> list = new ArrayList<String>();
300    
301        for( String string : strings )
302          {
303          if( string != null )
304            list.add( string );
305          }
306    
307        return list.toArray( new String[ list.size() ] );
308        }
309    
310      public static Collection<String> quote( Collection<String> collection, String quote )
311        {
312        List<String> list = new ArrayList<String>();
313    
314        for( String string : collection )
315          list.add( quote + string + quote );
316    
317        return list;
318        }
319    
320      public static String print( Collection collection, String delim )
321        {
322        StringBuffer buffer = new StringBuffer();
323    
324        print( buffer, collection, delim );
325    
326        return buffer.toString();
327        }
328    
329      public static void print( StringBuffer buffer, Collection collection, String delim )
330        {
331        int count = 0;
332    
333        for( Object s : collection )
334          {
335          if( count != 0 )
336            buffer.append( delim );
337    
338          buffer.append( "[" );
339          buffer.append( s );
340          buffer.append( "]" );
341    
342          count++;
343          }
344        }
345    
346      /**
347       * This method attempts to remove any username and password from the given url String.
348       *
349       * @param url
350       * @return a String
351       */
352      public static String sanitizeUrl( String url )
353        {
354        if( url == null )
355          return null;
356    
357        return url.replaceAll( "(?<=//).*:.*@", "" );
358        }
359    
360      /**
361       * This method attempts to remove duplicate consecutive forward slashes from the given url.
362       *
363       * @param url
364       * @return a String
365       */
366      public static String normalizeUrl( String url )
367        {
368        if( url == null )
369          return null;
370    
371        return url.replaceAll( "([^:]/)/{2,}", "$1/" );
372        }
373    
374      /**
375       * This method returns the {@link Object#toString()} of the given object, or an empty String if the object
376       * is null.
377       *
378       * @param object
379       * @return a String
380       */
381      public static String toNull( Object object )
382        {
383        if( object == null )
384          return "";
385    
386        return object.toString();
387        }
388    
389      /**
390       * This method truncates the given String value to the given size, but appends an ellipse ("...") if the
391       * String is larger than maxSize.
392       *
393       * @param string
394       * @param maxSize
395       * @return a String
396       */
397      public static String truncate( String string, int maxSize )
398        {
399        string = toNull( string );
400    
401        if( string.length() <= maxSize )
402          return string;
403    
404        return String.format( "%s...", string.subSequence( 0, maxSize - 3 ) );
405        }
406    
407      public static String printGraph( SimpleDirectedGraph graph )
408        {
409        StringWriter writer = new StringWriter();
410    
411        printGraph( writer, graph );
412    
413        return writer.toString();
414        }
415    
416      public static void printGraph( PrintStream out, SimpleDirectedGraph graph )
417        {
418        PrintWriter printWriter = new PrintWriter( out );
419    
420        printGraph( printWriter, graph );
421        }
422    
423      public static void printGraph( String filename, SimpleDirectedGraph graph )
424        {
425        try
426          {
427          Writer writer = new FileWriter( filename );
428    
429          try
430            {
431            printGraph( writer, graph );
432            }
433          finally
434            {
435            writer.close();
436            }
437          }
438        catch( IOException exception )
439          {
440          LOG.error( "failed printing graph to {}, with exception: {}", filename, exception );
441          }
442        }
443    
444      @SuppressWarnings({"unchecked"})
445      private static void printGraph( Writer writer, SimpleDirectedGraph graph )
446        {
447        DOTExporter dot = new DOTExporter( new IntegerNameProvider(), new VertexNameProvider()
448        {
449        public String getVertexName( Object object )
450          {
451          if( object == null )
452            return "none";
453    
454          return object.toString().replaceAll( "\"", "\'" );
455          }
456        }, new EdgeNameProvider<Object>()
457        {
458        public String getEdgeName( Object object )
459          {
460          if( object == null )
461            return "none";
462    
463          return object.toString().replaceAll( "\"", "\'" );
464          }
465        }
466        );
467    
468        dot.export( writer, graph );
469        }
470    
471      public static void printMatrix( PrintStream out, SimpleDirectedGraph<FlowElement, Scope> graph )
472        {
473        new MatrixExporter().exportAdjacencyMatrix( new PrintWriter( out ), graph );
474        }
475    
476      /**
477       * This method removes all nulls from the given List.
478       *
479       * @param list
480       */
481      @SuppressWarnings({"StatementWithEmptyBody"})
482      public static void removeAllNulls( List list )
483        {
484        while( list.remove( null ) )
485          ;
486        }
487    
488      public static String formatTrace( Scheme scheme, String message )
489        {
490        if( scheme == null )
491          return message;
492    
493        String trace = scheme.getTrace();
494    
495        if( trace == null )
496          return message;
497    
498        return "[" + truncate( scheme.toString(), 25 ) + "][" + trace + "] " + message;
499        }
500    
501    
502      /**
503       * Method formatRawTrace does not include the pipe name
504       *
505       * @param pipe    of type Pipe
506       * @param message of type String
507       * @return String
508       */
509      public static String formatRawTrace( Pipe pipe, String message )
510        {
511        if( pipe == null )
512          return message;
513    
514        String trace = pipe.getTrace();
515    
516        if( trace == null )
517          return message;
518    
519        return "[" + trace + "] " + message;
520        }
521    
522      public static String formatTrace( Pipe pipe, String message )
523        {
524        if( pipe == null )
525          return message;
526    
527        String trace = pipe.getTrace();
528    
529        if( trace == null )
530          return message;
531    
532        return "[" + truncate( pipe.getName(), 25 ) + "][" + trace + "] " + message;
533        }
534    
535      public static String formatTrace( Tap tap, String message )
536        {
537        if( tap == null )
538          return message;
539    
540        String trace = tap.getTrace();
541    
542        if( trace == null )
543          return message;
544    
545        return "[" + truncate( tap.toString(), 25 ) + "][" + trace + "] " + message;
546        }
547    
548      public static String formatTrace( Operation operation, String message )
549        {
550        if( !( operation instanceof BaseOperation ) )
551          return message;
552    
553        String trace = ( (BaseOperation) operation ).getTrace();
554    
555        if( trace == null )
556          return message;
557    
558        return "[" + trace + "] " + message;
559        }
560    
561      public static String captureDebugTrace( Class type )
562        {
563        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
564        Package packageName = type.getPackage();
565        String typeName = type.getName();
566    
567        boolean skip = true;
568    
569        for( StackTraceElement stackTraceElement : stackTrace )
570          {
571          String className = stackTraceElement.getClassName();
572    
573          if( skip )
574            {
575            skip = !className.equals( typeName );
576            continue;
577            }
578          else
579            {
580            if( packageName != null && stackTraceElement.getClassName().equals( typeName ) )
581              {
582              continue;
583              }
584            }
585    
586          return stackTraceElement.toString();
587          }
588    
589        return null;
590        }
591    
592      public static void writeDOT( Writer writer, SimpleDirectedGraph graph, IntegerNameProvider vertexIdProvider, VertexNameProvider vertexNameProvider, EdgeNameProvider edgeNameProvider )
593        {
594        new DOTExporter( vertexIdProvider, vertexNameProvider, edgeNameProvider ).export( writer, graph );
595        }
596    
597      public static boolean isEmpty( String string )
598        {
599        return string == null || string.isEmpty();
600        }
601    
602      private static String[] findSplitName( String path )
603        {
604        String separator = "/";
605    
606        if( path.lastIndexOf( "/" ) < path.lastIndexOf( "\\" ) )
607          separator = "\\\\";
608    
609        String[] split = path.split( separator );
610    
611        path = split[ split.length - 1 ];
612    
613        path = path.substring( 0, path.lastIndexOf( '.' ) ); // remove .jar
614    
615        return path.split( "-(?=\\d)", 2 );
616        }
617    
618      public static String findVersion( String path )
619        {
620        if( path == null || path.isEmpty() )
621          return null;
622    
623        String[] split = findSplitName( path );
624    
625        if( split.length == 2 )
626          return split[ 1 ];
627    
628        return null;
629        }
630    
631      public static String findName( String path )
632        {
633        if( path == null || path.isEmpty() )
634          return null;
635    
636        String[] split = findSplitName( path );
637    
638        if( split.length == 0 )
639          return null;
640    
641        return split[ 0 ];
642        }
643    
644      public static long getSourceModified( Object confCopy, Iterator<Tap> values, long sinkModified ) throws IOException
645        {
646        long sourceModified = 0;
647    
648        while( values.hasNext() )
649          {
650          Tap source = values.next();
651    
652          if( source instanceof MultiSourceTap )
653            return getSourceModified( confCopy, ( (MultiSourceTap) source ).getChildTaps(), sinkModified );
654    
655          sourceModified = source.getModifiedTime( confCopy );
656    
657          // source modified returns zero if does not exist
658          // this should minimize number of times we touch any file meta-data server
659          if( sourceModified == 0 && !source.resourceExists( confCopy ) )
660            throw new FlowException( "source does not exist: " + source );
661    
662          if( sinkModified < sourceModified )
663            return sourceModified;
664          }
665    
666        return sourceModified;
667        }
668    
669      public static long getSinkModified( Object config, Collection<Tap> sinks ) throws IOException
670        {
671        long sinkModified = Long.MAX_VALUE;
672    
673        for( Tap sink : sinks )
674          {
675          if( sink.isReplace() || sink.isUpdate() )
676            sinkModified = -1L;
677          else
678            {
679            if( !sink.resourceExists( config ) )
680              sinkModified = 0L;
681            else
682              sinkModified = Math.min( sinkModified, sink.getModifiedTime( config ) ); // return youngest mod date
683            }
684          }
685        return sinkModified;
686        }
687    
688      public static String getTypeName( Type type )
689        {
690        if( type == null )
691          return null;
692    
693        return type instanceof Class ? ( (Class) type ).getCanonicalName() : type.toString();
694        }
695    
696      public static String getSimpleTypeName( Type type )
697        {
698        if( type == null )
699          return null;
700    
701        return type instanceof Class ? ( (Class) type ).getSimpleName() : type.toString();
702        }
703    
704      public static String[] typeNames( Type[] types )
705        {
706        String[] names = new String[ types.length ];
707    
708        for( int i = 0; i < types.length; i++ )
709          names[ i ] = getTypeName( types[ i ] );
710    
711        return names;
712        }
713    
714      public static String[] simpleTypeNames( Type[] types )
715        {
716        String[] names = new String[ types.length ];
717    
718        for( int i = 0; i < types.length; i++ )
719          names[ i ] = getSimpleTypeName( types[ i ] );
720    
721        return names;
722        }
723    
724      public static boolean containsNull( Object[] values )
725        {
726        for( Object value : values )
727          {
728          if( value == null )
729            return true;
730          }
731    
732        return false;
733        }
734    
735      public static void safeSleep( long durationMillis )
736        {
737        try
738          {
739          Thread.sleep( durationMillis );
740          }
741        catch( InterruptedException exception )
742          {
743          // do nothing
744          }
745        }
746    
747      public interface RetryOperator<T>
748        {
749        T operate() throws Exception;
750    
751        boolean rethrow( Exception exception );
752        }
753    
754      public static <T> T retry( Logger logger, int retries, int secondsDelay, String message, RetryOperator<T> operator ) throws Exception
755        {
756        Exception saved = null;
757    
758        for( int i = 0; i < retries; i++ )
759          {
760          try
761            {
762            return operator.operate();
763            }
764          catch( Exception exception )
765            {
766            if( operator.rethrow( exception ) )
767              {
768              logger.warn( message + ", but not retrying", exception );
769    
770              throw exception;
771              }
772    
773            saved = exception;
774    
775            logger.warn( message + ", attempt: " + ( i + 1 ), exception );
776    
777            try
778              {
779              Thread.sleep( secondsDelay * 1000 );
780              }
781            catch( InterruptedException exception1 )
782              {
783              // do nothing
784              }
785            }
786          }
787    
788        logger.warn( message + ", done retrying after attempts: " + retries, saved );
789    
790        throw saved;
791        }
792    
793      public static Object createProtectedObject( Class type, Object[] parameters, Class[] parameterTypes )
794        {
795        try
796          {
797          Constructor constructor = type.getDeclaredConstructor( parameterTypes );
798    
799          constructor.setAccessible( true );
800    
801          return constructor.newInstance( parameters );
802          }
803        catch( Exception exception )
804          {
805          LOG.error( "unable to instantiate type: {}, with exception: {}", type.getName(), exception );
806    
807          throw new FlowException( "unable to instantiate type: " + type.getName(), exception );
808          }
809        }
810    
811      public static boolean hasClass( String typeString )
812        {
813        try
814          {
815          Util.class.getClassLoader().loadClass( typeString );
816    
817          return true;
818          }
819        catch( ClassNotFoundException exception )
820          {
821          return false;
822          }
823        }
824    
825      public static Object invokeStaticMethod( String typeString, String methodName, Object[] parameters, Class[] parameterTypes )
826        {
827        try
828          {
829          Class type = Util.class.getClassLoader().loadClass( typeString );
830    
831          return invokeStaticMethod( type, methodName, parameters, parameterTypes );
832          }
833        catch( ClassNotFoundException exception )
834          {
835          throw new CascadingException( "unable to load class: " + typeString, exception );
836          }
837        }
838    
839      public static Object invokeStaticMethod( Class type, String methodName, Object[] parameters, Class[] parameterTypes )
840        {
841        try
842          {
843          Method method = type.getDeclaredMethod( methodName, parameterTypes );
844    
845          method.setAccessible( true );
846    
847          return method.invoke( null, parameters );
848          }
849        catch( Exception exception )
850          {
851          throw new CascadingException( "unable to invoke static method: " + type.getName() + "." + methodName, exception );
852          }
853        }
854    
855      public static Object invokeInstanceMethod( Object target, String methodName, Object[] parameters, Class[] parameterTypes )
856        {
857        try
858          {
859          Method method = target.getClass().getMethod( methodName, parameterTypes );
860    
861          method.setAccessible( true );
862    
863          return method.invoke( target, parameters );
864          }
865        catch( Exception exception )
866          {
867          throw new CascadingException( "unable to invoke instance method: " + target.getClass().getName() + "." + methodName, exception );
868          }
869        }
870    
871      @Deprecated
872      public static String makeTempPath( String name )
873        {
874        if( name == null || name.isEmpty() )
875          throw new IllegalArgumentException( "name may not be null or empty " );
876    
877        name = cleansePathName( name.substring( 0, name.length() < 25 ? name.length() : 25 ) );
878    
879        return name + "/" + (int) ( Math.random() * 100000 ) + "/";
880        }
881    
882      public static String makePath( String prefix, String name )
883        {
884        if( name == null || name.isEmpty() )
885          throw new IllegalArgumentException( "name may not be null or empty " );
886    
887        if( prefix == null || prefix.isEmpty() )
888          prefix = Long.toString( (long) ( Math.random() * 10000000000L ) );
889    
890        name = cleansePathName( name.substring( 0, name.length() < 25 ? name.length() : 25 ) );
891    
892        return prefix + "/" + name + "/";
893        }
894    
895      public static String cleansePathName( String name )
896        {
897        return name.replaceAll( "\\s+|\\*|\\+|/+", "_" );
898        }
899      }