001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats;
022
import java.io.Serializable;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import cascading.flow.Flow;
import cascading.management.state.ClientState;
import cascading.util.ProcessLogger;
033
034/**
035 * Class CascadingStats is the base class for all Cascading statistics gathering. It also reports the status of
036 * core elements that have state.
037 * <p/>
038 * There are eight states the stats object reports; PENDING, SKIPPED, STARTED, SUBMITTED, RUNNING, SUCCESSFUL, STOPPED, and FAILED.
039 * <ul>
040 * <li>{@code pending} - when the Flow or Cascade has yet to start.</li>
041 * <li>{@code skipped} - when the Flow was skipped by the parent Cascade.</li>
042 * <li>{@code started} - when {@link cascading.flow.Flow#start()} was called.</li>
043 * <li>{@code submitted} - when the Step was submitted to the underlying platform for work.</li>
044 * <li>{@code running} - when the Flow or Cascade is executing a workload.</li>
045 * <li>{@code stopped} - when the user calls {@link cascading.flow.Flow#stop()} on the Flow or Cascade.</li>
046 * <li>{@code failed} - when the Flow or Cascade threw an error and failed to finish the workload.</li>
047 * <li>{@code successful} - when the Flow or Cascade naturally completed its workload without failure.</li>
048 * </ul>
049 * <p/>
050 * CascadingStats also reports four unique timestamps.
051 * <ul>
052 * <li>{@code startTime} - when the {@code start()} method was called.</li>
053 * <li>{@code submitTime} - when the unit of work was actually submitted for execution. Not supported by all sub-classes.</li>
054 * <li>{@code runTime} - when the unit of work actually began to execute work. This value may be affected by any "polling interval" in place.</li>
055 * <li>{@code finishedTime} - when all work has completed successfully, failed, or stopped.</li>
056 * </ul>
057 * <p/>
058 * A unit of work is considered {@code finished} when the Flow or Cascade is no longer processing a workload and {@code successful},
059 * {@code skipped}, {@code failed}, or {@code stopped} is true.
060 * <p/>
061 * It is important to note all the timestamps are client side observations. Not values reported by the underlying
062 * platform. That said, the transitions are seen by polling the client interface to the underlying platform so are
063 * effected by the {@link cascading.flow.FlowProps#getJobPollingInterval()} value.
064 *
065 * @see CascadeStats
066 * @see FlowStats
067 * @see FlowStepStats
068 */
069public abstract class CascadingStats<Child> implements ProvidesCounters, Serializable
070  {
071  public static final String STATS_STORE_INTERVAL = "cascading.stats.store.interval";
072  public static final String STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION = "cascading.stats.complete_child_details.block.duration";
073
074  /**
075   * Method setStatsStoreInterval sets the interval time between store operations against the underlying
076   * document storage services. This affects the rate at which metrics and status information is updated.
077   *
078   * @param properties of type Properties
079   * @param intervalMs milliseconds between storage calls
080   */
081  public static void setStatsStoreInterval( Map<Object, Object> properties, long intervalMs )
082    {
083    if( intervalMs <= 0 )
084      throw new IllegalArgumentException( "interval must be greater than zero, got: " + intervalMs );
085
086    properties.put( STATS_STORE_INTERVAL, Long.toString( intervalMs ) );
087    }
088
089  public enum Type
090    {
091      CASCADE, FLOW, STEP, NODE, SLICE, ATTEMPT;
092
093    public boolean isChild( Type type )
094      {
095      return ordinal() < type.ordinal();
096      }
097    }
098
099  public enum Status
100    {
101      PENDING( false ), SKIPPED( true ), STARTED( false ), SUBMITTED( false ), RUNNING( false ), SUCCESSFUL( true ), STOPPED( true ), FAILED( true );
102
103    boolean isFinished = false; // is this a completed state
104
105    Status( boolean isFinished )
106      {
107      this.isFinished = isFinished;
108      }
109
110    public boolean isFinished()
111      {
112      return isFinished;
113      }
114    }
115
116  private transient String prefixID; // cached sub-string
117
118  /** Field name */
119  final String name;
120  protected final ClientState clientState;
121
122  /** Field status */
123  Status status = Status.PENDING;
124
125  Set<StatsListener> listeners;
126
127  /** Field pendingTime */
128  long pendingTime;
129  /** Field startTime */
130  long startTime;
131  /** Field submitTime */
132  long submitTime;
133  /** Field runTime */
134  long runTime;
135  /** Field finishedTime */
136  long finishedTime;
137  /** Field throwable */
138  Throwable throwable;
139  /** Field throwableTrace */
140  String[] throwableTrace;
141
142  AtomicLong lastCaptureDetail = new AtomicLong( 0 );
143
144  protected CascadingStats( String name, ClientState clientState )
145    {
146    this.name = name;
147    this.clientState = clientState;
148    }
149
150  /** Method prepare initializes this instance. */
151  public void prepare()
152    {
153    clientState.startService();
154    }
155
156  /** Method cleanup destroys any resources allocated by this instance. */
157  public void cleanup()
158    {
159    clientState.stopService();
160    }
161
162  /**
163   * Method getID returns the ID of this CascadingStats object.
164   *
165   * @return the ID (type Object) of this CascadingStats object.
166   */
167  public abstract String getID();
168
169  /**
170   * Method getName returns the name of this CascadingStats object.
171   *
172   * @return the name (type String) of this CascadingStats object.
173   */
174  public String getName()
175    {
176    return name;
177    }
178
179  public abstract Type getType();
180
181  /**
182   * Method getThrowable returns the throwable of this CascadingStats object.
183   *
184   * @return the throwable (type Throwable) of this CascadingStats object.
185   */
186  public Throwable getThrowable()
187    {
188    return throwable;
189    }
190
191  /**
192   * Method getThrowableTrace returns the throwableTrace of this CascadingStats object.
193   * <p/>
194   * Will return null if not set.
195   *
196   * @return the throwableTrace (type String[]) of this CascadingStats object.
197   */
198  public String[] getThrowableTrace()
199    {
200    return throwableTrace;
201    }
202
203  /**
204   * Method isPending returns true if no work has been submitted.
205   *
206   * @return the pending (type boolean) of this CascadingStats object.
207   */
208  public boolean isPending()
209    {
210    return status == Status.PENDING;
211    }
212
213  /**
214   * Method isSkipped returns true when the works was skipped.
215   * <p/>
216   * Flows are skipped if the appropriate {@link cascading.flow.FlowSkipStrategy#skipFlow(Flow)}
217   * returns {@code true};
218   *
219   * @return the skipped (type boolean) of this CascadingStats object.
220   */
221  public boolean isSkipped()
222    {
223    return status == Status.SKIPPED;
224    }
225
226  /**
227   * Method isStarted returns true when work has started.
228   *
229   * @return the started (type boolean) of this CascadingStats object.
230   */
231  public boolean isStarted()
232    {
233    return status == Status.STARTED;
234    }
235
236  /**
237   * Method isSubmitted returns true if no work has started.
238   *
239   * @return the submitted (type boolean) of this CascadingStats object.
240   */
241  public boolean isSubmitted()
242    {
243    return status == Status.SUBMITTED;
244    }
245
246  /**
247   * Method isRunning returns true when work has begun.
248   *
249   * @return the running (type boolean) of this CascadingStats object.
250   */
251  public boolean isRunning()
252    {
253    return status == Status.RUNNING;
254    }
255
256  /**
257   * Method isEngaged returns true when there is work being executed, if
258   * {@link #isStarted()}, {@link #isSubmitted()}, or {@link #isRunning()} returns true;
259   *
260   * @return the engaged (type boolean) of this CascadingStats object.
261   */
262  public boolean isEngaged()
263    {
264    return isStarted() || isSubmitted() || isRunning();
265    }
266
267  /**
268   * Method isSuccessful returns true when work has completed successfully.
269   *
270   * @return the completed (type boolean) of this CascadingStats object.
271   */
272  public boolean isSuccessful()
273    {
274    return status == Status.SUCCESSFUL;
275    }
276
277  /**
278   * Method isFailed returns true when the work ended with an error.
279   *
280   * @return the failed (type boolean) of this CascadingStats object.
281   */
282  public boolean isFailed()
283    {
284    return status == Status.FAILED;
285    }
286
287  /**
288   * Method isStopped returns true when the user stopped the work.
289   *
290   * @return the stopped (type boolean) of this CascadingStats object.
291   */
292  public boolean isStopped()
293    {
294    return status == Status.STOPPED;
295    }
296
297  /**
298   * Method isFinished returns true if the current status shows no work currently being executed,
299   * if {@link #isSkipped()}, {@link #isSuccessful()}, {@link #isFailed()}, or {@link #isStopped()} returns true.
300   *
301   * @return the finished (type boolean) of this CascadingStats object.
302   */
303  public boolean isFinished()
304    {
305    return status == Status.SUCCESSFUL || status == Status.FAILED || status == Status.STOPPED || status == Status.SKIPPED;
306    }
307
308  /**
309   * Method getStatus returns the {@link Status} of this CascadingStats object.
310   *
311   * @return the status (type Status) of this CascadingStats object.
312   */
313  public Status getStatus()
314    {
315    return status;
316    }
317
318  /** Method recordStats forces recording of current status information. */
319  public void recordStats()
320    {
321    clientState.recordStats( this );
322    }
323
324  public abstract void recordInfo();
325
326  /** Method markPending sets the status to {@link Status#PENDING}. */
327  public synchronized void markPending()
328    {
329    markPendingTime();
330
331    fireListeners( null, Status.PENDING );
332
333    recordStats();
334    recordInfo();
335    }
336
337  protected void markPendingTime()
338    {
339    if( pendingTime == 0 )
340      pendingTime = System.currentTimeMillis();
341    }
342
343  /**
344   * Method markStartedThenRunning consecutively marks the status as {@link Status#STARTED} then {@link Status#RUNNING}
345   * and forces the start and running time to be equals.
346   */
347  public synchronized void markStartedThenRunning()
348    {
349    if( status != Status.PENDING )
350      throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status );
351
352    markStartToRunTime();
353    markStarted();
354    markRunning();
355    }
356
357  protected void markStartToRunTime()
358    {
359    startTime = submitTime = runTime = System.currentTimeMillis();
360    }
361
362  /** Method markStarted sets the status to {@link Status#STARTED}. */
363  public synchronized void markStarted()
364    {
365    if( status != Status.PENDING )
366      throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status );
367
368    Status priorStatus = status;
369    status = Status.STARTED;
370    markStartTime();
371
372    fireListeners( priorStatus, status );
373
374    clientState.start( startTime );
375    clientState.setStatus( status, startTime );
376    recordStats();
377    recordInfo();
378    }
379
380  protected void markStartTime()
381    {
382    if( startTime == 0 )
383      startTime = System.currentTimeMillis();
384    }
385
386  /** Method markSubmitted sets the status to {@link Status#SUBMITTED}. */
387  public synchronized void markSubmitted()
388    {
389    if( status == Status.SUBMITTED )
390      return;
391
392    if( status != Status.STARTED )
393      throw new IllegalStateException( "may not mark as " + Status.SUBMITTED + ", is already " + status );
394
395    Status priorStatus = status;
396    status = Status.SUBMITTED;
397    markSubmitTime();
398
399    fireListeners( priorStatus, status );
400
401    clientState.submit( submitTime );
402    clientState.setStatus( status, submitTime );
403    recordStats();
404    recordInfo();
405    }
406
407  protected void markSubmitTime()
408    {
409    if( submitTime == 0 )
410      submitTime = System.currentTimeMillis();
411    }
412
413  /** Method markRunning sets the status to {@link Status#RUNNING}. */
414  public synchronized void markRunning()
415    {
416    if( status == Status.RUNNING )
417      return;
418
419    if( status != Status.STARTED && status != Status.SUBMITTED )
420      throw new IllegalStateException( "may not mark as " + Status.RUNNING + ", is already " + status );
421
422    Status priorStatus = status;
423    status = Status.RUNNING;
424    markRunTime();
425
426    fireListeners( priorStatus, status );
427
428    clientState.run( runTime );
429    clientState.setStatus( status, runTime );
430    recordStats();
431    recordInfo();
432    }
433
434  protected void markRunTime()
435    {
436    if( runTime == 0 )
437      runTime = System.currentTimeMillis();
438    }
439
440  /** Method markSuccessful sets the status to {@link Status#SUCCESSFUL}. */
441  public synchronized void markSuccessful()
442    {
443    if( status != Status.RUNNING && status != Status.SUBMITTED )
444      throw new IllegalStateException( "may not mark as " + Status.SUCCESSFUL + ", is already " + status );
445
446    Status priorStatus = status;
447    status = Status.SUCCESSFUL;
448    markFinishedTime();
449
450    fireListeners( priorStatus, status );
451
452    clientState.setStatus( status, finishedTime );
453    clientState.stop( finishedTime );
454    recordStats();
455    recordInfo();
456    }
457
458  private void markFinishedTime()
459    {
460    finishedTime = System.currentTimeMillis();
461    }
462
463  /**
464   * Method markFailed sets the status to {@link Status#FAILED}.
465   */
466  public void markFailed()
467    {
468    markFailed( null, null );
469    }
470
471  /**
472   * Method markFailed sets the status to {@link Status#FAILED}.
473   *
474   * @param throwable of type Throwable
475   */
476  public synchronized void markFailed( Throwable throwable )
477    {
478    markFailed( throwable, null );
479    }
480
481  /**
482   * Method markFailed sets the status to {@link Status#FAILED}.
483   *
484   * @param throwableTrace of type String[]
485   */
486  public synchronized void markFailed( String[] throwableTrace )
487    {
488    markFailed( null, throwableTrace );
489    }
490
491  protected synchronized void markFailed( Throwable throwable, String[] throwableTrace )
492    {
493    if( status != Status.STARTED && status != Status.RUNNING && status != Status.SUBMITTED )
494      throw new IllegalStateException( "may not mark as " + Status.FAILED + ", is already " + status );
495
496    Status priorStatus = status;
497    status = Status.FAILED;
498    markFinishedTime();
499    this.throwable = throwable;
500    this.throwableTrace = throwableTrace;
501
502    fireListeners( priorStatus, status );
503
504    clientState.setStatus( status, finishedTime );
505    clientState.stop( finishedTime );
506    recordStats();
507    recordInfo();
508    }
509
510  /** Method markStopped sets the status to {@link Status#STOPPED}. */
511  public synchronized void markStopped()
512    {
513    if( status != Status.PENDING && status != Status.STARTED && status != Status.SUBMITTED && status != Status.RUNNING )
514      throw new IllegalStateException( "may not mark as " + Status.STOPPED + ", is already " + status );
515
516    Status priorStatus = status;
517    status = Status.STOPPED;
518    markFinishedTime();
519
520    fireListeners( priorStatus, status );
521
522    clientState.setStatus( status, finishedTime );
523    recordStats();
524    recordInfo();
525    clientState.stop( finishedTime );
526    }
527
528  /** Method markSkipped sets the status to {@link Status#SKIPPED}. */
529  public synchronized void markSkipped()
530    {
531    if( status != Status.PENDING )
532      throw new IllegalStateException( "may not mark as " + Status.SKIPPED + ", is already " + status );
533
534    Status priorStatus = status;
535    status = Status.SKIPPED;
536
537    fireListeners( priorStatus, status );
538
539    clientState.setStatus( status, System.currentTimeMillis() );
540    recordStats();
541    recordInfo();
542    }
543
544  /**
545   * Method getPendingTime returns the pendingTime of this CascadingStats object.
546   *
547   * @return the pendingTime (type long) of this CascadingStats object.
548   */
549  public long getPendingTime()
550    {
551    return pendingTime;
552    }
553
554  /**
555   * Method getStartTime returns the startTime of this CascadingStats object.
556   *
557   * @return the startTime (type long) of this CascadingStats object.
558   */
559  public long getStartTime()
560    {
561    return startTime;
562    }
563
564  /**
565   * Method getSubmitTime returns the submitTime of this CascadingStats object.
566   *
567   * @return the submitTime (type long) of this CascadingStats object.
568   */
569  public long getSubmitTime()
570    {
571    return submitTime;
572    }
573
574  /**
575   * Method getRunTime returns the runTime of this CascadingStats object.
576   *
577   * @return the runTime (type long) of this CascadingStats object.
578   */
579  public long getRunTime()
580    {
581    return runTime;
582    }
583
584  /**
585   * Method getFinishedTime returns the finishedTime of this CascadingStats object.
586   *
587   * @return the finishedTime (type long) of this CascadingStats object.
588   */
589  public long getFinishedTime()
590    {
591    return finishedTime;
592    }
593
594  /**
595   * Method getDuration returns the duration the work executed before being finished.
596   * <p/>
597   * This method will return zero until the work is finished. See {@link #getCurrentDuration()}
598   * if you wish to poll for the current duration value.
599   * <p/>
600   * Duration is calculated as {@code finishedTime - startTime}.
601   *
602   * @return the duration (type long) of this CascadingStats object.
603   */
604  public long getDuration()
605    {
606    if( finishedTime != 0 )
607      return finishedTime - startTime;
608    else
609      return 0;
610    }
611
612  /**
613   * Method getCurrentDuration returns the current duration of the current work whether or not
614   * the work is finished. When finished, the return value will be the same as {@link #getDuration()}.
615   * <p/>
616   * Duration is calculated as {@code finishedTime - startTime}.
617   *
618   * @return the currentDuration (type long) of this CascadingStats object.
619   */
620  public long getCurrentDuration()
621    {
622    if( finishedTime != 0 )
623      return finishedTime - startTime;
624    else
625      return System.currentTimeMillis() - startTime;
626    }
627
628  @Override
629  public Collection<String> getCountersFor( Class<? extends Enum> group )
630    {
631    return getCountersFor( group.getName() );
632    }
633
634  /**
635   * Method getCounterGroupsMatching returns all the available counter group names that match
636   * the given regular expression.
637   *
638   * @param regex of type String
639   * @return Collection<String>
640   */
641  public abstract Collection<String> getCounterGroupsMatching( String regex );
642
643  /**
644   * Method captureDetail will recursively capture details about nested systems. Use this method to persist
645   * statistics about a given Cascade, Flow, FlowStep, or FlowNode.
646   * <p/>
647   * Each CascadingStats object must be individually inspected for any system specific details.
648   * <p/>
649   * Each call to this method will refresh the internal cache unless the current Stats object is marked finished. One
650   * additional refresh will happen after this instance is marked finished.
651   */
652  public void captureDetail()
653    {
654    captureDetail( Type.ATTEMPT );
655    }
656
657  public abstract void captureDetail( Type depth );
658
659  /**
660   * For rate limiting access to the backend.
661   * <p/>
662   * Currently used at the Step and below.
663   */
664  protected boolean isDetailStale()
665    {
666    return ( System.currentTimeMillis() - lastCaptureDetail.get() ) > 500;
667    }
668
669  protected void markDetailCaptured()
670    {
671    lastCaptureDetail.set( System.currentTimeMillis() );
672    }
673
674  /**
675   * Method getChildren returns any relevant child statistics instances. They may not be of type CascadingStats, but
676   * instead platform specific.
677   *
678   * @return a Collection of child statistics
679   */
680  public abstract Collection<Child> getChildren();
681
682  /**
683   * Method getChildWith returns a child stats instance with the given ID value.
684   *
685   * @param id the id of a child instance
686   * @return the child stats instance or null if not found
687   */
688  public abstract Child getChildWith( String id );
689
690  public synchronized void addListener( StatsListener statsListener )
691    {
692    if( listeners == null )
693      listeners = new LinkedHashSet<>();
694
695    listeners.add( statsListener );
696    }
697
698  public synchronized boolean removeListener( StatsListener statsListener )
699    {
700    return listeners != null && listeners.remove( statsListener );
701    }
702
703  protected synchronized void fireListeners( CascadingStats.Status fromStatus, CascadingStats.Status toStatus )
704    {
705    if( listeners == null )
706      return;
707
708    for( StatsListener listener : listeners )
709      {
710      try
711        {
712        listener.notify( this, fromStatus, toStatus );
713        }
714      catch( Throwable throwable )
715        {
716        logWarn( "error during listener notification, continuing with remaining listener notification", throwable );
717        }
718      }
719    }
720
721  protected abstract ProcessLogger getProcessLogger();
722
723  protected String getStatsString()
724    {
725    String string = "status=" + status + ", startTime=" + startTime;
726
727    if( finishedTime != 0 )
728      string += ", duration=" + ( finishedTime - startTime );
729
730    return string;
731    }
732
733  @Override
734  public String toString()
735    {
736    return "Cascading{" + getStatsString() + '}';
737    }
738
739  protected void logInfo( String message, Object... arguments )
740    {
741    getProcessLogger().logInfo( getPrefix() + message, arguments );
742    }
743
744  protected void logDebug( String message, Object... arguments )
745    {
746    getProcessLogger().logDebug( getPrefix() + message, arguments );
747    }
748
749  protected void logWarn( String message, Object... arguments )
750    {
751    getProcessLogger().logWarn( getPrefix() + message, arguments );
752    }
753
754  protected void logError( String message, Object... arguments )
755    {
756    getProcessLogger().logError( getPrefix() + message, arguments );
757    }
758
759  protected void logError( String message, Throwable throwable )
760    {
761    getProcessLogger().logError( getPrefix() + message, throwable );
762    }
763
764  private String getPrefix()
765    {
766    if( prefixID == null )
767      prefixID = "[" + getType().name().toLowerCase() + ":" + getID().substring( 0, 5 ) + "] ";
768
769    return prefixID;
770    }
771  }