001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.stats; 022 023import java.io.Serializable; 024import java.util.Collection; 025import java.util.LinkedHashSet; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.atomic.AtomicLong; 029 030import cascading.flow.Flow; 031import cascading.management.state.ClientState; 032import cascading.util.ProcessLogger; 033 034/** 035 * Class CascadingStats is the base class for all Cascading statistics gathering. It also reports the status of 036 * core elements that have state. 037 * <p/> 038 * There are eight states the stats object reports; PENDING, SKIPPED, STARTED, SUBMITTED, RUNNING, SUCCESSFUL, STOPPED, and FAILED. 039 * <ul> 040 * <li>{@code pending} - when the Flow or Cascade has yet to start.</li> 041 * <li>{@code skipped} - when the Flow was skipped by the parent Cascade.</li> 042 * <li>{@code started} - when {@link cascading.flow.Flow#start()} was called.</li> 043 * <li>{@code submitted} - when the Step was submitted to the underlying platform for work.</li> 044 * <li>{@code running} - when the Flow or Cascade is executing a workload.</li> 045 * <li>{@code stopped} - when the user calls {@link cascading.flow.Flow#stop()} on the Flow or Cascade.</li> 046 * <li>{@code failed} - when the Flow or Cascade threw an error and failed to finish the workload.</li> 047 * <li>{@code successful} - when the Flow or Cascade naturally completed its workload without failure.</li> 048 * </ul> 049 * <p/> 050 * CascadingStats also reports four unique timestamps. 051 * <ul> 052 * <li>{@code startTime} - when the {@code start()} method was called.</li> 053 * <li>{@code submitTime} - when the unit of work was actually submitted for execution. Not supported by all sub-classes.</li> 054 * <li>{@code runTime} - when the unit of work actually began to execute work. This value may be affected by any "polling interval" in place.</li> 055 * <li>{@code finishedTime} - when all work has completed successfully, failed, or stopped.</li> 056 * </ul> 057 * <p/> 058 * A unit of work is considered {@code finished} when the Flow or Cascade is no longer processing a workload and {@code successful}, 059 * {@code skipped}, {@code failed}, or {@code stopped} is true. 060 * <p/> 061 * It is important to note all the timestamps are client side observations. Not values reported by the underlying 062 * platform. That said, the transitions are seen by polling the client interface to the underlying platform so are 063 * effected by the {@link cascading.flow.FlowProps#getJobPollingInterval()} value. 064 * 065 * @see CascadeStats 066 * @see FlowStats 067 * @see FlowStepStats 068 */ 069public abstract class CascadingStats<Child> implements ProvidesCounters, Serializable 070 { 071 public static final String STATS_STORE_INTERVAL = "cascading.stats.store.interval"; 072 public static final String STATS_COMPLETE_CHILD_DETAILS_BLOCK_DURATION = "cascading.stats.complete_child_details.block.duration"; 073 074 /** 075 * Method setStatsStoreInterval sets the interval time between store operations against the underlying 076 * document storage services. This affects the rate at which metrics and status information is updated. 077 * 078 * @param properties of type Properties 079 * @param intervalMs milliseconds between storage calls 080 */ 081 public static void setStatsStoreInterval( Map<Object, Object> properties, long intervalMs ) 082 { 083 if( intervalMs <= 0 ) 084 throw new IllegalArgumentException( "interval must be greater than zero, got: " + intervalMs ); 085 086 properties.put( STATS_STORE_INTERVAL, Long.toString( intervalMs ) ); 087 } 088 089 public enum Type 090 { 091 CASCADE, FLOW, STEP, NODE, SLICE, ATTEMPT; 092 093 public boolean isChild( Type type ) 094 { 095 return ordinal() < type.ordinal(); 096 } 097 } 098 099 public enum Status 100 { 101 PENDING( false ), SKIPPED( true ), STARTED( false ), SUBMITTED( false ), RUNNING( false ), SUCCESSFUL( true ), STOPPED( true ), FAILED( true ); 102 103 boolean isFinished = false; // is this a completed state 104 105 Status( boolean isFinished ) 106 { 107 this.isFinished = isFinished; 108 } 109 110 public boolean isFinished() 111 { 112 return isFinished; 113 } 114 } 115 116 private transient String prefixID; // cached sub-string 117 118 /** Field name */ 119 protected final String name; 120 protected final ClientState clientState; 121 122 /** Field status */ 123 protected Status status = Status.PENDING; 124 125 protected Set<StatsListener> listeners; 126 127 /** Field pendingTime */ 128 protected long pendingTime; 129 /** Field startTime */ 130 protected long startTime; 131 /** Field submitTime */ 132 protected long submitTime; 133 /** Field runTime */ 134 protected long runTime; 135 /** Field finishedTime */ 136 protected long finishedTime; 137 /** Field throwable */ 138 protected Throwable throwable; 139 /** Field throwableTrace */ 140 protected String[] throwableTrace; 141 142 protected AtomicLong lastCaptureDetail = new AtomicLong( 0 ); 143 144 protected CascadingStats( String name, ClientState clientState ) 145 { 146 this.name = name; 147 this.clientState = clientState; 148 } 149 150 /** Method prepare initializes this instance. */ 151 public void prepare() 152 { 153 clientState.startService(); 154 } 155 156 /** Method cleanup destroys any resources allocated by this instance. */ 157 public void cleanup() 158 { 159 clientState.stopService(); 160 } 161 162 /** 163 * Method getID returns the ID of this CascadingStats object. 164 * 165 * @return the ID (type Object) of this CascadingStats object. 166 */ 167 public abstract String getID(); 168 169 /** 170 * Method getName returns the name of this CascadingStats object. 171 * 172 * @return the name (type String) of this CascadingStats object. 173 */ 174 public String getName() 175 { 176 return name; 177 } 178 179 public abstract Type getType(); 180 181 /** 182 * Method getThrowable returns the throwable of this CascadingStats object. 183 * 184 * @return the throwable (type Throwable) of this CascadingStats object. 185 */ 186 public Throwable getThrowable() 187 { 188 return throwable; 189 } 190 191 /** 192 * Method getThrowableTrace returns the throwableTrace of this CascadingStats object. 193 * <p/> 194 * Will return null if not set. 195 * 196 * @return the throwableTrace (type String[]) of this CascadingStats object. 197 */ 198 public String[] getThrowableTrace() 199 { 200 return throwableTrace; 201 } 202 203 /** 204 * Method isPending returns true if no work has been submitted. 205 * 206 * @return the pending (type boolean) of this CascadingStats object. 207 */ 208 public boolean isPending() 209 { 210 return status == Status.PENDING; 211 } 212 213 /** 214 * Method isSkipped returns true when the works was skipped. 215 * <p/> 216 * Flows are skipped if the appropriate {@link cascading.flow.FlowSkipStrategy#skipFlow(Flow)} 217 * returns {@code true}; 218 * 219 * @return the skipped (type boolean) of this CascadingStats object. 220 */ 221 public boolean isSkipped() 222 { 223 return status == Status.SKIPPED; 224 } 225 226 /** 227 * Method isStarted returns true when work has started. 228 * 229 * @return the started (type boolean) of this CascadingStats object. 230 */ 231 public boolean isStarted() 232 { 233 return status == Status.STARTED; 234 } 235 236 /** 237 * Method isSubmitted returns true if no work has started. 238 * 239 * @return the submitted (type boolean) of this CascadingStats object. 240 */ 241 public boolean isSubmitted() 242 { 243 return status == Status.SUBMITTED; 244 } 245 246 /** 247 * Method isRunning returns true when work has begun. 248 * 249 * @return the running (type boolean) of this CascadingStats object. 250 */ 251 public boolean isRunning() 252 { 253 return status == Status.RUNNING; 254 } 255 256 /** 257 * Method isEngaged returns true when there is work being executed, if 258 * {@link #isStarted()}, {@link #isSubmitted()}, or {@link #isRunning()} returns true; 259 * 260 * @return the engaged (type boolean) of this CascadingStats object. 261 */ 262 public boolean isEngaged() 263 { 264 return isStarted() || isSubmitted() || isRunning(); 265 } 266 267 /** 268 * Method isSuccessful returns true when work has completed successfully. 269 * 270 * @return the completed (type boolean) of this CascadingStats object. 271 */ 272 public boolean isSuccessful() 273 { 274 return status == Status.SUCCESSFUL; 275 } 276 277 /** 278 * Method isFailed returns true when the work ended with an error. 279 * 280 * @return the failed (type boolean) of this CascadingStats object. 281 */ 282 public boolean isFailed() 283 { 284 return status == Status.FAILED; 285 } 286 287 /** 288 * Method isStopped returns true when the user stopped the work. 289 * 290 * @return the stopped (type boolean) of this CascadingStats object. 291 */ 292 public boolean isStopped() 293 { 294 return status == Status.STOPPED; 295 } 296 297 /** 298 * Method isFinished returns true if the current status shows no work currently being executed, 299 * if {@link #isSkipped()}, {@link #isSuccessful()}, {@link #isFailed()}, or {@link #isStopped()} returns true. 300 * 301 * @return the finished (type boolean) of this CascadingStats object. 302 */ 303 public boolean isFinished() 304 { 305 return status == Status.SUCCESSFUL || status == Status.FAILED || status == Status.STOPPED || status == Status.SKIPPED; 306 } 307 308 /** 309 * Method getStatus returns the {@link Status} of this CascadingStats object. 310 * 311 * @return the status (type Status) of this CascadingStats object. 312 */ 313 public Status getStatus() 314 { 315 return status; 316 } 317 318 /** Method recordStats forces recording of current status information. */ 319 public void recordStats() 320 { 321 clientState.recordStats( this ); 322 } 323 324 public abstract void recordInfo(); 325 326 /** Method markPending sets the status to {@link Status#PENDING}. */ 327 public synchronized void markPending() 328 { 329 markPendingTime(); 330 331 fireListeners( null, Status.PENDING ); 332 333 recordStats(); 334 recordInfo(); 335 } 336 337 protected void markPendingTime() 338 { 339 if( pendingTime == 0 ) 340 pendingTime = System.currentTimeMillis(); 341 } 342 343 /** 344 * Method markStartedThenRunning consecutively marks the status as {@link Status#STARTED} then {@link Status#RUNNING} 345 * and forces the start and running time to be equals. 346 */ 347 public synchronized void markStartedThenRunning() 348 { 349 if( status != Status.PENDING ) 350 throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status ); 351 352 markStartToRunTime(); 353 markStarted(); 354 markRunning(); 355 } 356 357 protected void markStartToRunTime() 358 { 359 startTime = submitTime = runTime = System.currentTimeMillis(); 360 } 361 362 /** Method markStarted sets the status to {@link Status#STARTED}. */ 363 public synchronized void markStarted() 364 { 365 if( status != Status.PENDING ) 366 throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status ); 367 368 Status priorStatus = status; 369 status = Status.STARTED; 370 markStartTime(); 371 372 fireListeners( priorStatus, status ); 373 374 clientState.start( startTime ); 375 clientState.setStatus( status, startTime ); 376 recordStats(); 377 recordInfo(); 378 } 379 380 protected void markStartTime() 381 { 382 if( startTime == 0 ) 383 startTime = System.currentTimeMillis(); 384 } 385 386 /** Method markSubmitted sets the status to {@link Status#SUBMITTED}. */ 387 public synchronized void markSubmitted() 388 { 389 if( status == Status.SUBMITTED ) 390 return; 391 392 if( status != Status.STARTED ) 393 throw new IllegalStateException( "may not mark as " + Status.SUBMITTED + ", is already " + status ); 394 395 Status priorStatus = status; 396 status = Status.SUBMITTED; 397 markSubmitTime(); 398 399 fireListeners( priorStatus, status ); 400 401 clientState.submit( submitTime ); 402 clientState.setStatus( status, submitTime ); 403 recordStats(); 404 recordInfo(); 405 } 406 407 protected void markSubmitTime() 408 { 409 if( submitTime == 0 ) 410 submitTime = System.currentTimeMillis(); 411 } 412 413 /** Method markRunning sets the status to {@link Status#RUNNING}. */ 414 public synchronized void markRunning() 415 { 416 if( status == Status.RUNNING ) 417 return; 418 419 if( status != Status.STARTED && status != Status.SUBMITTED ) 420 throw new IllegalStateException( "may not mark as " + Status.RUNNING + ", is already " + status ); 421 422 Status priorStatus = status; 423 status = Status.RUNNING; 424 markRunTime(); 425 426 fireListeners( priorStatus, status ); 427 428 clientState.run( runTime ); 429 clientState.setStatus( status, runTime ); 430 recordStats(); 431 recordInfo(); 432 } 433 434 protected void markRunTime() 435 { 436 if( runTime == 0 ) 437 runTime = System.currentTimeMillis(); 438 } 439 440 /** Method markSuccessful sets the status to {@link Status#SUCCESSFUL}. */ 441 public synchronized void markSuccessful() 442 { 443 if( status != Status.RUNNING && status != Status.SUBMITTED ) 444 throw new IllegalStateException( "may not mark as " + Status.SUCCESSFUL + ", is already " + status ); 445 446 Status priorStatus = status; 447 status = Status.SUCCESSFUL; 448 markFinishedTime(); 449 450 fireListeners( priorStatus, status ); 451 452 clientState.setStatus( status, finishedTime ); 453 clientState.stop( finishedTime ); 454 recordStats(); 455 recordInfo(); 456 } 457 458 protected void markFinishedTime() 459 { 460 finishedTime = System.currentTimeMillis(); 461 } 462 463 /** 464 * Method markFailed sets the status to {@link Status#FAILED}. 465 */ 466 public void markFailed() 467 { 468 markFailed( null, null ); 469 } 470 471 /** 472 * Method markFailed sets the status to {@link Status#FAILED}. 473 * 474 * @param throwable of type Throwable 475 */ 476 public synchronized void markFailed( Throwable throwable ) 477 { 478 markFailed( throwable, null ); 479 } 480 481 /** 482 * Method markFailed sets the status to {@link Status#FAILED}. 483 * 484 * @param throwableTrace of type String[] 485 */ 486 public synchronized void markFailed( String[] throwableTrace ) 487 { 488 markFailed( null, throwableTrace ); 489 } 490 491 protected synchronized void markFailed( Throwable throwable, String[] throwableTrace ) 492 { 493 if( status != Status.STARTED && status != Status.RUNNING && status != Status.SUBMITTED ) 494 throw new IllegalStateException( "may not mark as " + Status.FAILED + ", is already " + status ); 495 496 Status priorStatus = status; 497 status = Status.FAILED; 498 markFinishedTime(); 499 this.throwable = throwable; 500 this.throwableTrace = throwableTrace; 501 502 fireListeners( priorStatus, status ); 503 504 clientState.setStatus( status, finishedTime ); 505 clientState.stop( finishedTime ); 506 recordStats(); 507 recordInfo(); 508 } 509 510 /** Method markStopped sets the status to {@link Status#STOPPED}. */ 511 public synchronized void markStopped() 512 { 513 if( status != Status.PENDING && status != Status.STARTED && status != Status.SUBMITTED && status != Status.RUNNING ) 514 throw new IllegalStateException( "may not mark as " + Status.STOPPED + ", is already " + status ); 515 516 Status priorStatus = status; 517 status = Status.STOPPED; 518 markFinishedTime(); 519 520 fireListeners( priorStatus, status ); 521 522 clientState.setStatus( status, finishedTime ); 523 recordStats(); 524 recordInfo(); 525 clientState.stop( finishedTime ); 526 } 527 528 /** Method markSkipped sets the status to {@link Status#SKIPPED}. */ 529 public synchronized void markSkipped() 530 { 531 if( status != Status.PENDING ) 532 throw new IllegalStateException( "may not mark as " + Status.SKIPPED + ", is already " + status ); 533 534 Status priorStatus = status; 535 status = Status.SKIPPED; 536 537 fireListeners( priorStatus, status ); 538 539 clientState.setStatus( status, System.currentTimeMillis() ); 540 recordStats(); 541 recordInfo(); 542 } 543 544 /** 545 * Method getPendingTime returns the pendingTime of this CascadingStats object. 546 * 547 * @return the pendingTime (type long) of this CascadingStats object. 548 */ 549 public long getPendingTime() 550 { 551 return pendingTime; 552 } 553 554 /** 555 * Method getStartTime returns the startTime of this CascadingStats object. 556 * 557 * @return the startTime (type long) of this CascadingStats object. 558 */ 559 public long getStartTime() 560 { 561 return startTime; 562 } 563 564 /** 565 * Method getSubmitTime returns the submitTime of this CascadingStats object. 566 * 567 * @return the submitTime (type long) of this CascadingStats object. 568 */ 569 public long getSubmitTime() 570 { 571 return submitTime; 572 } 573 574 /** 575 * Method getRunTime returns the runTime of this CascadingStats object. 576 * 577 * @return the runTime (type long) of this CascadingStats object. 578 */ 579 public long getRunTime() 580 { 581 return runTime; 582 } 583 584 /** 585 * Method getFinishedTime returns the finishedTime of this CascadingStats object. 586 * 587 * @return the finishedTime (type long) of this CascadingStats object. 588 */ 589 public long getFinishedTime() 590 { 591 return finishedTime; 592 } 593 594 /** 595 * Method getDuration returns the duration the work executed before being finished. 596 * <p/> 597 * This method will return zero until the work is finished. See {@link #getCurrentDuration()} 598 * if you wish to poll for the current duration value. 599 * <p/> 600 * Duration is calculated as {@code finishedTime - startTime}. 601 * 602 * @return the duration (type long) of this CascadingStats object. 603 */ 604 public long getDuration() 605 { 606 if( finishedTime != 0 ) 607 return finishedTime - startTime; 608 else 609 return 0; 610 } 611 612 /** 613 * Method getCurrentDuration returns the current duration of the current work whether or not 614 * the work is finished. When finished, the return value will be the same as {@link #getDuration()}. 615 * <p/> 616 * Duration is calculated as {@code finishedTime - startTime}. 617 * 618 * @return the currentDuration (type long) of this CascadingStats object. 619 */ 620 public long getCurrentDuration() 621 { 622 if( finishedTime != 0 ) 623 return finishedTime - startTime; 624 else 625 return System.currentTimeMillis() - startTime; 626 } 627 628 @Override 629 public Collection<String> getCountersFor( Class<? extends Enum> group ) 630 { 631 return getCountersFor( group.getName() ); 632 } 633 634 /** 635 * Method getCounterGroupsMatching returns all the available counter group names that match 636 * the given regular expression. 637 * 638 * @param regex of type String 639 * @return Collection<String> 640 */ 641 public abstract Collection<String> getCounterGroupsMatching( String regex ); 642 643 /** 644 * Method captureDetail will recursively capture details about nested systems. Use this method to persist 645 * statistics about a given Cascade, Flow, FlowStep, or FlowNode. 646 * <p/> 647 * Each CascadingStats object must be individually inspected for any system specific details. 648 * <p/> 649 * Each call to this method will refresh the internal cache unless the current Stats object is marked finished. One 650 * additional refresh will happen after this instance is marked finished. 651 */ 652 public void captureDetail() 653 { 654 captureDetail( Type.ATTEMPT ); 655 } 656 657 public abstract void captureDetail( Type depth ); 658 659 /** 660 * For rate limiting access to the backend. 661 * <p/> 662 * Currently used at the Step and below. 663 */ 664 protected boolean isDetailStale() 665 { 666 return ( System.currentTimeMillis() - lastCaptureDetail.get() ) > 500; 667 } 668 669 protected void markDetailCaptured() 670 { 671 lastCaptureDetail.set( System.currentTimeMillis() ); 672 } 673 674 /** 675 * Method getChildren returns any relevant child statistics instances. They may not be of type CascadingStats, but 676 * instead platform specific. 677 * 678 * @return a Collection of child statistics 679 */ 680 public abstract Collection<Child> getChildren(); 681 682 /** 683 * Method getChildWith returns a child stats instance with the given ID value. 684 * 685 * @param id the id of a child instance 686 * @return the child stats instance or null if not found 687 */ 688 public abstract Child getChildWith( String id ); 689 690 public synchronized void addListener( StatsListener statsListener ) 691 { 692 if( listeners == null ) 693 listeners = new LinkedHashSet<>(); 694 695 listeners.add( statsListener ); 696 } 697 698 public synchronized boolean removeListener( StatsListener statsListener ) 699 { 700 return listeners != null && listeners.remove( statsListener ); 701 } 702 703 protected synchronized void fireListeners( CascadingStats.Status fromStatus, CascadingStats.Status toStatus ) 704 { 705 if( listeners == null ) 706 return; 707 708 for( StatsListener listener : listeners ) 709 { 710 try 711 { 712 listener.notify( this, fromStatus, toStatus ); 713 } 714 catch( Throwable throwable ) 715 { 716 logWarn( "error during listener notification, continuing with remaining listener notification", throwable ); 717 } 718 } 719 } 720 721 protected abstract ProcessLogger getProcessLogger(); 722 723 protected String getStatsString() 724 { 725 String string = "status=" + status + ", startTime=" + startTime; 726 727 if( finishedTime != 0 ) 728 string += ", duration=" + ( finishedTime - startTime ); 729 730 return string; 731 } 732 733 @Override 734 public String toString() 735 { 736 return "Cascading{" + getStatsString() + '}'; 737 } 738 739 protected void logInfo( String message, Object... arguments ) 740 { 741 getProcessLogger().logInfo( getPrefix() + message, arguments ); 742 } 743 744 protected void logDebug( String message, Object... arguments ) 745 { 746 getProcessLogger().logDebug( getPrefix() + message, arguments ); 747 } 748 749 protected void logWarn( String message, Object... arguments ) 750 { 751 getProcessLogger().logWarn( getPrefix() + message, arguments ); 752 } 753 754 protected void logError( String message, Object... arguments ) 755 { 756 getProcessLogger().logError( getPrefix() + message, arguments ); 757 } 758 759 protected void logError( String message, Throwable throwable ) 760 { 761 getProcessLogger().logError( getPrefix() + message, throwable ); 762 } 763 764 protected String getPrefix() 765 { 766 if( prefixID == null ) 767 prefixID = "[" + getType().name().toLowerCase() + ":" + getID().substring( 0, 5 ) + "] "; 768 769 return prefixID; 770 } 771 }