001 /* 002 * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021 package cascading.stats; 022 023 import java.io.Serializable; 024 import java.util.Collection; 025 import java.util.Map; 026 027 import cascading.flow.Flow; 028 import cascading.management.state.ClientState; 029 030 /** 031 * Class CascadingStats is the base class for all Cascading statistics gathering. It also reports the status of 032 * core elements that have state. 033 * <p/> 034 * There are eight states the stats object reports; PENDING, SKIPPED, STARTED, SUBMITTED, RUNNING, SUCCESSFUL, STOPPED, and FAILED. 035 * <ul> 036 * <li>{@code pending} - when the Flow or Cascade has yet to start.</li> 037 * <li>{@code skipped} - when the Flow was skipped by the parent Cascade.</li> 038 * <li>{@code started} - when {@link cascading.flow.Flow#start()} was called.</li> 039 * <li>{@code submitted} - when the Step was submitted to the underlying platform for work.</li> 040 * <li>{@code running} - when the Flow or Cascade is executing a workload.</li> 041 * <li>{@code stopped} - when the user calls {@link cascading.flow.Flow#stop()} on the Flow or Cascade.</li> 042 * <li>{@code failed} - when the Flow or Cascade threw an error and failed to finish the workload.</li> 043 * <li>{@code successful} - when the Flow or Cascade naturally completed its workload without failure.</li> 044 * </ul> 045 * <p/> 046 * CascadingStats also reports four unique timestamps. 047 * <ul> 048 * <li>{@code startTime} - when the {@code start()} method was called.</li> 049 * <li>{@code submitTime} - when the unit of work was actually submitted for execution. Not supported by all sub-classes.</li> 050 * <li>{@code runTime} - when the unit of work actually began to execute work. This value may be affected by any "polling interval" in place.</li> 051 * <li>{@code finishedTime} - when all work has completed successfully, failed, or stopped.</li> 052 * </ul> 053 * <p/> 054 * A unit of work is considered {@code finished} when the Flow or Cascade is no longer processing a workload and {@code successful}, 055 * {@code failed}, or {@code stopped} is true. 056 * 057 * @see CascadeStats 058 * @see FlowStats 059 * @see FlowStepStats 060 */ 061 public abstract class CascadingStats implements Serializable 062 { 063 public static final String STATS_STORE_INTERVAL = "cascading.stats.store.interval"; 064 065 /** 066 * Method setStatsStoreInterval sets the interval time between store operations against the underlying 067 * document storage services. This affects the rate at which metrics and status information is updated. 068 * 069 * @param properties of type Properties 070 * @param intervalMs milliseconds between storage calls 071 */ 072 public static void setStatsStoreInterval( Map<Object, Object> properties, long intervalMs ) 073 { 074 if( intervalMs <= 0 ) 075 throw new IllegalArgumentException( "interval must be greater than zero, got: " + intervalMs ); 076 077 properties.put( STATS_STORE_INTERVAL, Long.toString( intervalMs ) ); 078 } 079 080 public enum Status 081 { 082 PENDING, SKIPPED, STARTED, SUBMITTED, RUNNING, SUCCESSFUL, STOPPED, FAILED 083 } 084 085 /** Field name */ 086 final String name; 087 protected final ClientState clientState; 088 089 /** Field status */ 090 Status status = Status.PENDING; 091 092 /** Field pendingTime */ 093 long pendingTime; 094 /** Field startTime */ 095 long startTime; 096 /** Field submitTime */ 097 long submitTime; 098 /** Field runTime */ 099 long runTime; 100 /** Field finishedTime */ 101 long finishedTime; 102 /** Field throwable */ 103 Throwable throwable; 104 105 protected CascadingStats( String name, ClientState clientState ) 106 { 107 this.name = name; 108 this.clientState = clientState; 109 } 110 111 /** Method prepare initializes this instance. */ 112 public void prepare() 113 { 114 clientState.startService(); 115 } 116 117 /** Method cleanup destroys any resources allocated by this instance. */ 118 public void cleanup() 119 { 120 clientState.stopService(); 121 } 122 123 /** 124 * Method getID returns the ID of this CascadingStats object. 125 * 126 * @return the ID (type Object) of this CascadingStats object. 127 */ 128 public abstract String getID(); 129 130 /** 131 * Method getName returns the name of this CascadingStats object. 132 * 133 * @return the name (type String) of this CascadingStats object. 134 */ 135 public String getName() 136 { 137 return name; 138 } 139 140 /** 141 * Method getThrowable returns the throwable of this CascadingStats object. 142 * 143 * @return the throwable (type Throwable) of this CascadingStats object. 144 */ 145 public Throwable getThrowable() 146 { 147 return throwable; 148 } 149 150 /** 151 * Method isPending returns true if no work has been submitted. 152 * 153 * @return the pending (type boolean) of this CascadingStats object. 154 */ 155 public boolean isPending() 156 { 157 return status == Status.PENDING; 158 } 159 160 /** 161 * Method isSkipped returns true when the works was skipped. 162 * <p/> 163 * Flows are skipped if the appropriate {@link cascading.flow.FlowSkipStrategy#skipFlow(Flow)} 164 * returns {@code true}; 165 * 166 * @return the skipped (type boolean) of this CascadingStats object. 167 */ 168 public boolean isSkipped() 169 { 170 return status == Status.SKIPPED; 171 } 172 173 /** 174 * Method isStarted returns true when work has started. 175 * 176 * @return the started (type boolean) of this CascadingStats object. 177 */ 178 public boolean isStarted() 179 { 180 return status == Status.STARTED; 181 } 182 183 /** 184 * Method isSubmitted returns true if no work has started. 185 * 186 * @return the submitted (type boolean) of this CascadingStats object. 187 */ 188 public boolean isSubmitted() 189 { 190 return status == Status.SUBMITTED; 191 } 192 193 /** 194 * Method isRunning returns true when work has begun. 195 * 196 * @return the running (type boolean) of this CascadingStats object. 197 */ 198 public boolean isRunning() 199 { 200 return status == Status.RUNNING; 201 } 202 203 /** 204 * Method isEngaged returns true when there is work being executed, if 205 * {@link #isStarted()}, {@link #isSubmitted()}, or {@link #isRunning()} returns true; 206 * 207 * @return the engaged (type boolean) of this CascadingStats object. 208 */ 209 public boolean isEngaged() 210 { 211 return isStarted() || isSubmitted() || isRunning(); 212 } 213 214 /** 215 * Method isSuccessful returns true when work has completed successfully. 216 * 217 * @return the completed (type boolean) of this CascadingStats object. 218 */ 219 public boolean isSuccessful() 220 { 221 return status == Status.SUCCESSFUL; 222 } 223 224 /** 225 * Method isFailed returns true when the work ended with an error. 226 * 227 * @return the failed (type boolean) of this CascadingStats object. 228 */ 229 public boolean isFailed() 230 { 231 return status == Status.FAILED; 232 } 233 234 /** 235 * Method isStopped returns true when the user stopped the work. 236 * 237 * @return the stopped (type boolean) of this CascadingStats object. 238 */ 239 public boolean isStopped() 240 { 241 return status == Status.STOPPED; 242 } 243 244 /** 245 * Method isFinished returns true if the current status shows no work currently being executed, 246 * if {@link #isSkipped()}, {@link #isSuccessful()}, {@link #isFailed()}, or {@link #isStopped()} returns true. 247 * 248 * @return the finished (type boolean) of this CascadingStats object. 249 */ 250 public boolean isFinished() 251 { 252 return status == Status.SUCCESSFUL || status == Status.FAILED || status == Status.STOPPED || status == Status.SKIPPED; 253 } 254 255 /** 256 * Method getStatus returns the {@link Status} of this CascadingStats object. 257 * 258 * @return the status (type Status) of this CascadingStats object. 259 */ 260 public Status getStatus() 261 { 262 return status; 263 } 264 265 /** Method recordStats forces recording of current status information. */ 266 public void recordStats() 267 { 268 this.clientState.recordStats( this ); 269 } 270 271 public abstract void recordInfo(); 272 273 /** Method markPending sets the status to {@link Status#PENDING}. */ 274 public synchronized void markPending() 275 { 276 markPendingTime(); 277 recordStats(); 278 recordInfo(); 279 } 280 281 protected void markPendingTime() 282 { 283 if( pendingTime == 0 ) 284 pendingTime = System.currentTimeMillis(); 285 } 286 287 /** 288 * Method markStartedThenRunning consecutively marks the status as {@link Status#STARTED} then {@link Status#RUNNING} 289 * and forces the start and running time to be equals. 290 */ 291 public synchronized void markStartedThenRunning() 292 { 293 if( status != Status.PENDING ) 294 throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status ); 295 296 markStartToRunTime(); 297 markStarted(); 298 markRunning(); 299 } 300 301 protected void markStartToRunTime() 302 { 303 startTime = submitTime = runTime = System.currentTimeMillis(); 304 } 305 306 /** Method markStarted sets the status to {@link Status#STARTED}. */ 307 public synchronized void markStarted() 308 { 309 if( status != Status.PENDING ) 310 throw new IllegalStateException( "may not mark as " + Status.STARTED + ", is already " + status ); 311 312 status = Status.STARTED; 313 markStartTime(); 314 315 clientState.start( startTime ); 316 clientState.setStatus( status, startTime ); 317 recordStats(); 318 } 319 320 protected void markStartTime() 321 { 322 if( startTime == 0 ) 323 startTime = System.currentTimeMillis(); 324 } 325 326 /** Method markSubmitted sets the status to {@link Status#SUBMITTED}. */ 327 public synchronized void markSubmitted() 328 { 329 if( status == Status.SUBMITTED ) 330 return; 331 332 if( status != Status.STARTED ) 333 throw new IllegalStateException( "may not mark as " + Status.SUBMITTED + ", is already " + status ); 334 335 status = Status.SUBMITTED; 336 markSubmitTime(); 337 338 clientState.submit( submitTime ); 339 clientState.setStatus( status, submitTime ); 340 recordStats(); 341 recordInfo(); 342 } 343 344 protected void markSubmitTime() 345 { 346 if( submitTime == 0 ) 347 submitTime = System.currentTimeMillis(); 348 } 349 350 /** Method markRunning sets the status to {@link Status#RUNNING}. */ 351 public synchronized void markRunning() 352 { 353 if( status == Status.RUNNING ) 354 return; 355 356 if( status != Status.STARTED && status != Status.SUBMITTED ) 357 throw new IllegalStateException( "may not mark as " + Status.RUNNING + ", is already " + status ); 358 359 status = Status.RUNNING; 360 markRunTime(); 361 362 clientState.run( runTime ); 363 clientState.setStatus( status, runTime ); 364 recordStats(); 365 } 366 367 protected void markRunTime() 368 { 369 if( runTime == 0 ) 370 runTime = System.currentTimeMillis(); 371 } 372 373 /** Method markSuccessful sets the status to {@link Status#SUCCESSFUL}. */ 374 public synchronized void markSuccessful() 375 { 376 if( status != Status.RUNNING && status != Status.SUBMITTED ) 377 throw new IllegalStateException( "may not mark as " + Status.SUCCESSFUL + ", is already " + status ); 378 379 status = Status.SUCCESSFUL; 380 markFinishedTime(); 381 382 clientState.setStatus( status, finishedTime ); 383 clientState.stop( finishedTime ); 384 recordStats(); 385 recordInfo(); 386 } 387 388 private void markFinishedTime() 389 { 390 finishedTime = System.currentTimeMillis(); 391 } 392 393 /** 394 * Method markFailed sets the status to {@link Status#FAILED}. 395 * 396 * @param throwable of type Throwable 397 */ 398 public synchronized void markFailed( Throwable throwable ) 399 { 400 if( status != Status.STARTED && status != Status.RUNNING && status != Status.SUBMITTED ) 401 throw new IllegalStateException( "may not mark as " + Status.FAILED + ", is already " + status ); 402 403 status = Status.FAILED; 404 markFinishedTime(); 405 this.throwable = throwable; 406 407 clientState.setStatus( status, finishedTime ); 408 clientState.stop( finishedTime ); 409 recordStats(); 410 recordInfo(); 411 } 412 413 /** Method markStopped sets the status to {@link Status#STOPPED}. */ 414 public synchronized void markStopped() 415 { 416 if( status != Status.PENDING && status != Status.STARTED && status != Status.SUBMITTED && status != Status.RUNNING ) 417 throw new IllegalStateException( "may not mark as " + Status.STOPPED + ", is already " + status ); 418 419 status = Status.STOPPED; 420 markFinishedTime(); 421 422 clientState.setStatus( status, finishedTime ); 423 recordStats(); 424 recordInfo(); 425 clientState.stop( finishedTime ); 426 } 427 428 /** Method markSkipped sets the status to {@link Status#SKIPPED}. */ 429 public synchronized void markSkipped() 430 { 431 if( status != Status.PENDING ) 432 throw new IllegalStateException( "may not mark as " + Status.SKIPPED + ", is already " + status ); 433 434 status = Status.SKIPPED; 435 436 clientState.setStatus( status, System.currentTimeMillis() ); 437 recordStats(); 438 } 439 440 /** 441 * Method getPendingTime returns the pendingTime of this CascadingStats object. 442 * 443 * @return the pendingTime (type long) of this CascadingStats object. 444 */ 445 public long getPendingTime() 446 { 447 return pendingTime; 448 } 449 450 /** 451 * Method getStartTime returns the startTime of this CascadingStats object. 452 * 453 * @return the startTime (type long) of this CascadingStats object. 454 */ 455 public long getStartTime() 456 { 457 return startTime; 458 } 459 460 /** 461 * Method getSubmitTime returns the submitTime of this CascadingStats object. 462 * 463 * @return the submitTime (type long) of this CascadingStats object. 464 */ 465 public long getSubmitTime() 466 { 467 return submitTime; 468 } 469 470 /** 471 * Method getRunTime returns the runTime of this CascadingStats object. 472 * 473 * @return the runTime (type long) of this CascadingStats object. 474 */ 475 public long getRunTime() 476 { 477 return runTime; 478 } 479 480 /** 481 * Method getFinishedTime returns the finishedTime of this CascadingStats object. 482 * 483 * @return the finishedTime (type long) of this CascadingStats object. 484 */ 485 public long getFinishedTime() 486 { 487 return finishedTime; 488 } 489 490 /** 491 * Method getDuration returns the duration the work executed before being finished. 492 * <p/> 493 * This method will return zero until the work is finished. See {@link #getCurrentDuration()} 494 * if you wish to poll for the current duration value. 495 * <p/> 496 * Duration is calculated as {@code finishedTime - startTime}. 497 * 498 * @return the duration (type long) of this CascadingStats object. 499 */ 500 public long getDuration() 501 { 502 if( finishedTime != 0 ) 503 return finishedTime - startTime; 504 else 505 return 0; 506 } 507 508 /** 509 * Method getCurrentDuration returns the current duration of the current work whether or not 510 * the work is finished. When finished, the return value will be the same as {@link #getDuration()}. 511 * <p/> 512 * Duration is calculated as {@code finishedTime - startTime}. 513 * 514 * @return the currentDuration (type long) of this CascadingStats object. 515 */ 516 public long getCurrentDuration() 517 { 518 if( finishedTime != 0 ) 519 return finishedTime - startTime; 520 else 521 return System.currentTimeMillis() - startTime; 522 } 523 524 /** 525 * Method getCounterGroups returns all the available counter group names. 526 * 527 * @return the counterGroups (type Collection<String>) of this CascadingStats object. 528 */ 529 public abstract Collection<String> getCounterGroups(); 530 531 /** 532 * Method getCounterGroupsMatching returns all the available counter group names that match 533 * the given regular expression. 534 * 535 * @param regex of type String 536 * @return Collection<String> 537 */ 538 public abstract Collection<String> getCounterGroupsMatching( String regex ); 539 540 /** 541 * Method getCountersFor returns all the counter names for the give group name. 542 * 543 * @param group 544 * @return Collection<String> 545 */ 546 public abstract Collection<String> getCountersFor( String group ); 547 548 /** 549 * Method getCountersFor returns all the counter names for the counter Enum. 550 * 551 * @param group 552 * @return Collection<String> 553 */ 554 public Collection<String> getCountersFor( Class<? extends Enum> group ) 555 { 556 return getCountersFor( group.getName() ); 557 } 558 559 /** 560 * Method getCounter returns the current value for the given counter Enum. 561 * 562 * @param counter of type Enum 563 * @return the current counter value 564 */ 565 public abstract long getCounterValue( Enum counter ); 566 567 /** 568 * Method getCounter returns the current value for the given group and counter. 569 * 570 * @param group of type String 571 * @param counter of type String 572 * @return the current counter value 573 */ 574 public abstract long getCounterValue( String group, String counter ); 575 576 /** 577 * Method captureDetail will recursively capture details about nested systems. Use this method to persist 578 * statistics about a given Cascade, Flow, or FlowStep. 579 * <p/> 580 * Each CascadingStats object must be individually inspected for any system specific details. 581 */ 582 public abstract void captureDetail(); 583 584 /** 585 * Method getChildren returns any relevant child statistics instances. They may not be of type CascadingStats, but 586 * instead platform specific. 587 * 588 * @return a Collection of child statistics 589 */ 590 public abstract Collection getChildren(); 591 592 protected String getStatsString() 593 { 594 String string = "status=" + status + ", startTime=" + startTime; 595 596 if( finishedTime != 0 ) 597 string += ", duration=" + ( finishedTime - startTime ); 598 599 return string; 600 } 601 602 @Override 603 public String toString() 604 { 605 return "Cascading{" + getStatsString() + '}'; 606 } 607 }