001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.stats.hadoop; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.Collections; 026import java.util.HashSet; 027import java.util.Set; 028import java.util.concurrent.Callable; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.ExecutorService; 031import java.util.concurrent.Executors; 032import java.util.concurrent.Future; 033import java.util.concurrent.ThreadFactory; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.TimeoutException; 036 037import cascading.flow.FlowException; 038import cascading.stats.CascadingStats; 039import org.apache.hadoop.conf.Configuration; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import static cascading.util.Util.formatDurationFromMillis; 044import static java.lang.System.currentTimeMillis; 045 046/** 047 * 048 */ 049public abstract class CounterCache<JobStatus, Counters> 050 { 051 public static final String COUNTER_TIMEOUT_PROPERTY = "cascading.counter.timeout.seconds"; 052 public static final String COUNTER_FETCH_RETRIES_PROPERTY = "cascading.counter.fetch.retries"; 053 public static final String COUNTER_MAX_AGE_PROPERTY = "cascading.counter.age.max.seconds"; 054 public static final int DEFAULT_TIMEOUT_TIMEOUT_SEC = 0; // zero means making the call synchronously 055 public static final int DEFAULT_FETCH_RETRIES = 3; 056 public static final int DEFAULT_CACHED_AGE_MAX = 0; // rely on client interface caching 057 058 private static final Logger LOG = LoggerFactory.getLogger( CounterCache.class ); 059 060 // hardcoded at one thread to force serialization across all requesters in the jvm 061 // this likely prevents the deadlocks the futures are safeguards against 062 private static ExecutorService futuresPool = Executors.newSingleThreadExecutor( new ThreadFactory() 063 { 064 @Override 065 public Thread newThread( Runnable runnable ) 066 { 067 Thread thread = new Thread( runnable, "stats-counter-future" ); 068 069 thread.setDaemon( true ); 070 071 return thread; 072 } 073 } ); 074 075 private CascadingStats stats; 076 private boolean hasCapturedFinalCounters; 077 private boolean hasAvailableCounters = true; 078 private Counters cachedCounters = null; 079 private long lastFetch = -1; 080 private boolean warnedStale = false; 081 082 protected int maxFetchAttempts; 083 protected int fetchAttempts; 084 protected int timeout; 085 protected int maxAge; 086 087 protected CounterCache( CascadingStats stats, Configuration configuration ) 088 { 089 this.stats = stats; 090 this.timeout = configuration.getInt( COUNTER_TIMEOUT_PROPERTY, DEFAULT_TIMEOUT_TIMEOUT_SEC ); 091 this.maxFetchAttempts = configuration.getInt( COUNTER_FETCH_RETRIES_PROPERTY, DEFAULT_FETCH_RETRIES ); 092 this.maxAge = configuration.getInt( COUNTER_MAX_AGE_PROPERTY, DEFAULT_CACHED_AGE_MAX ); 093 } 094 095 public long getLastSuccessfulFetch() 096 { 097 return lastFetch; 098 } 099 100 protected abstract JobStatus getJobStatusClient(); 101 102 protected abstract boolean areCountersAvailable( JobStatus runningJob ); 103 104 protected abstract Counters getCounters( JobStatus runningJob ) throws IOException; 105 106 protected abstract Collection<String> getGroupNames( Counters counters ); 107 108 protected abstract Set<String> getCountersFor( Counters counters, String group ); 109 110 protected abstract long getCounterValue( Counters counters, Enum counter ); 111 112 protected abstract long getCounterValue( Counters counters, String group, String counter ); 113 114 public Collection<String> getCounterGroups() 115 { 116 Counters counters = cachedCounters(); 117 118 if( counters == null ) 119 return Collections.emptySet(); 120 121 return Collections.unmodifiableCollection( getGroupNames( counters ) ); 122 } 123 124 public Collection<String> getCounterGroupsMatching( String regex ) 125 { 126 Counters counters = cachedCounters(); 127 128 if( counters == null ) 129 return Collections.emptySet(); 130 131 Set<String> results = new HashSet<String>(); 132 133 for( String counter : getGroupNames( counters ) ) 134 { 135 if( counter.matches( regex ) ) 136 results.add( counter ); 137 } 138 139 return Collections.unmodifiableCollection( results ); 140 } 141 142 public Collection<String> getCountersFor( String group ) 143 { 144 Counters counters = cachedCounters(); 145 146 if( counters == null ) 147 return Collections.emptySet(); 148 149 Set<String> results = getCountersFor( counters, group ); 150 151 return Collections.unmodifiableCollection( results ); 152 } 153 154 public long getCounterValue( Enum counter ) 155 { 156 Counters counters = cachedCounters(); 157 158 if( counters == null ) 159 return 0; 160 161 return getCounterValue( counters, counter ); 162 } 163 164 public long getCounterValue( String group, String counter ) 165 { 166 Counters counters = cachedCounters(); 167 168 if( counters == null ) 169 return 0; 170 171 return getCounterValue( counters, group, counter ); 172 } 173 174 protected Counters cachedCounters() 175 { 176 return cachedCounters( false ); 177 } 178 179 protected synchronized Counters cachedCounters( boolean force ) 180 { 181 if( !hasAvailableCounters ) 182 return cachedCounters; 183 184 // no point in trying again 185 if( fetchAttempts >= maxFetchAttempts ) 186 { 187 if( !hasCapturedFinalCounters && !warnedStale ) 188 { 189 if( cachedCounters == null ) 190 LOG.warn( "no counters fetched, max num consecutive retries reached: {}, type: {}, status: {}", maxFetchAttempts, stats.getType(), stats.getStatus() ); 191 else 192 LOG.warn( "stale counters being returned, max num consecutive retries reached, age: {}, type: {}, status: {}", formatDurationFromMillis( currentTimeMillis() - lastFetch ), stats.getType(), stats.getStatus() ); 193 194 warnedStale = true; 195 } 196 197 return cachedCounters; 198 } 199 200 boolean isProcessFinished = stats.isFinished(); 201 202 // ignore force, no reason to refresh completed stats 203 if( isProcessFinished && hasCapturedFinalCounters ) 204 return cachedCounters; 205 206 // have not capturedFinalCounters - force it 207 if( !force && isProcessFinished ) 208 force = true; 209 210 int currentAge = (int) ( ( lastFetch - currentTimeMillis() ) / 1000 ); 211 212 boolean isStale = currentAge >= maxAge; 213 214 // if we have counters, aren't being forced to update, and values aren't considered stale, return them 215 if( cachedCounters != null && !force && !isStale ) 216 return cachedCounters; 217 218 JobStatus runningJob = getJobStatusClient(); 219 220 if( runningJob == null ) 221 return cachedCounters; 222 223 if( !areCountersAvailable( runningJob ) ) 224 { 225 hasAvailableCounters = false; 226 return cachedCounters; 227 } 228 229 boolean success = false; 230 231 try 232 { 233 Counters fetched = fetchCounters( runningJob ); 234 235 success = fetched != null; 236 237 if( success ) 238 { 239 cachedCounters = fetched; 240 lastFetch = currentTimeMillis(); 241 fetchAttempts = 0; // reset attempt counter, mitigates for transient non-consecutive failures 242 } 243 } 244 catch( InterruptedException exception ) 245 { 246 LOG.warn( "fetching counters was interrupted" ); 247 } 248 catch( ExecutionException exception ) 249 { 250 fetchAttempts++; 251 252 if( fetchAttempts >= maxFetchAttempts ) 253 LOG.error( "fetching counters failed, was final consecutive attempt: {}, type: {}, status: {}", fetchAttempts, stats.getType(), stats.getStatus(), exception.getCause() ); 254 else 255 LOG.warn( "fetching counters failed, consecutive attempts: {}, type: {}, status: {}, message: {}", fetchAttempts, stats.getType(), stats.getStatus(), exception.getCause().getMessage() ); 256 257 if( cachedCounters != null ) 258 { 259 LOG.error( "returning cached values" ); 260 261 return cachedCounters; 262 } 263 264 LOG.error( "unable to get remote counters, no cached values, rethrowing exception", exception.getCause() ); 265 266 if( exception.getCause() instanceof FlowException ) 267 throw (FlowException) exception.getCause(); 268 269 throw new FlowException( exception.getCause() ); 270 } 271 catch( TimeoutException exception ) 272 { 273 fetchAttempts++; 274 275 if( fetchAttempts >= maxFetchAttempts ) 276 LOG.warn( "fetching counters timed out after: {} seconds, was final consecutive attempt: {}, type: {}, status: {}", timeout, fetchAttempts, stats.getType(), stats.getStatus() ); 277 else 278 LOG.warn( "fetching counters timed out after: {} seconds, consecutive attempts: {}, type: {}, status: {}", timeout, fetchAttempts, stats.getType(), stats.getStatus() ); 279 } 280 281 hasCapturedFinalCounters = isProcessFinished && success; 282 283 return cachedCounters; 284 } 285 286 private Counters fetchCounters( JobStatus runningJob ) throws InterruptedException, ExecutionException, TimeoutException 287 { 288 // if timeout greater than zero, perform async call 289 if( timeout > 0 ) 290 return runFuture( runningJob ).get( timeout, TimeUnit.SECONDS ); 291 292 try 293 { 294 return getCounters( runningJob ); 295 } 296 catch( IOException exception ) 297 { 298 throw new FlowException( "unable to get remote counter values", exception ); 299 } 300 } 301 302 private Future<Counters> runFuture( final JobStatus jobStatus ) 303 { 304 Callable<Counters> task = new Callable<Counters>() 305 { 306 @Override 307 public Counters call() throws Exception 308 { 309 try 310 { 311 return getCounters( jobStatus ); 312 } 313 catch( IOException exception ) 314 { 315 throw new FlowException( "unable to get remote counter values", exception ); 316 } 317 } 318 }; 319 320 return futuresPool.submit( task ); 321 } 322 }