001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.hadoop;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.Collections;
026import java.util.HashSet;
027import java.util.Set;
028import java.util.concurrent.Callable;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.ExecutorService;
031import java.util.concurrent.Executors;
032import java.util.concurrent.Future;
033import java.util.concurrent.ThreadFactory;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036
037import cascading.flow.FlowException;
038import cascading.stats.CascadingStats;
039import org.apache.hadoop.conf.Configuration;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import static cascading.util.Util.formatDurationFromMillis;
044import static java.lang.System.currentTimeMillis;
045
046/**
047 *
048 */
049public abstract class CounterCache<JobStatus, Counters>
050  {
051  public static final String COUNTER_TIMEOUT_PROPERTY = "cascading.counter.timeout.seconds";
052  public static final String COUNTER_FETCH_RETRIES_PROPERTY = "cascading.counter.fetch.retries";
053  public static final String COUNTER_MAX_AGE_PROPERTY = "cascading.counter.age.max.seconds";
054  public static final int DEFAULT_TIMEOUT_TIMEOUT_SEC = 0; // zero means making the call synchronously
055  public static final int DEFAULT_FETCH_RETRIES = 3;
056  public static final int DEFAULT_CACHED_AGE_MAX = 0; // rely on client interface caching
057
058  private static final Logger LOG = LoggerFactory.getLogger( CounterCache.class );
059
060  // hardcoded at one thread to force serialization across all requesters in the jvm
061  // this likely prevents the deadlocks the futures are safeguards against
062  private static ExecutorService futuresPool = Executors.newSingleThreadExecutor( new ThreadFactory()
063  {
064  @Override
065  public Thread newThread( Runnable runnable )
066    {
067    Thread thread = new Thread( runnable, "stats-counter-future" );
068
069    thread.setDaemon( true );
070
071    return thread;
072    }
073  } );
074
075  private CascadingStats stats;
076  private boolean hasCapturedFinalCounters;
077  private boolean hasAvailableCounters = true;
078  private Counters cachedCounters = null;
079  private long lastFetch = -1;
080  private boolean warnedStale = false;
081
082  protected int maxFetchAttempts;
083  protected int fetchAttempts;
084  protected int timeout;
085  protected int maxAge;
086
087  protected CounterCache( CascadingStats stats, Configuration configuration )
088    {
089    this.stats = stats;
090    this.timeout = configuration.getInt( COUNTER_TIMEOUT_PROPERTY, DEFAULT_TIMEOUT_TIMEOUT_SEC );
091    this.maxFetchAttempts = configuration.getInt( COUNTER_FETCH_RETRIES_PROPERTY, DEFAULT_FETCH_RETRIES );
092    this.maxAge = configuration.getInt( COUNTER_MAX_AGE_PROPERTY, DEFAULT_CACHED_AGE_MAX );
093    }
094
095  public long getLastSuccessfulFetch()
096    {
097    return lastFetch;
098    }
099
100  protected abstract JobStatus getJobStatusClient();
101
102  protected abstract boolean areCountersAvailable( JobStatus runningJob );
103
104  protected abstract Counters getCounters( JobStatus runningJob ) throws IOException;
105
106  protected abstract Collection<String> getGroupNames( Counters counters );
107
108  protected abstract Set<String> getCountersFor( Counters counters, String group );
109
110  protected abstract long getCounterValue( Counters counters, Enum counter );
111
112  protected abstract long getCounterValue( Counters counters, String group, String counter );
113
114  public Collection<String> getCounterGroups()
115    {
116    Counters counters = cachedCounters();
117
118    if( counters == null )
119      return Collections.emptySet();
120
121    return Collections.unmodifiableCollection( getGroupNames( counters ) );
122    }
123
124  public Collection<String> getCounterGroupsMatching( String regex )
125    {
126    Counters counters = cachedCounters();
127
128    if( counters == null )
129      return Collections.emptySet();
130
131    Set<String> results = new HashSet<String>();
132
133    for( String counter : getGroupNames( counters ) )
134      {
135      if( counter.matches( regex ) )
136        results.add( counter );
137      }
138
139    return Collections.unmodifiableCollection( results );
140    }
141
142  public Collection<String> getCountersFor( String group )
143    {
144    Counters counters = cachedCounters();
145
146    if( counters == null )
147      return Collections.emptySet();
148
149    Set<String> results = getCountersFor( counters, group );
150
151    return Collections.unmodifiableCollection( results );
152    }
153
154  public long getCounterValue( Enum counter )
155    {
156    Counters counters = cachedCounters();
157
158    if( counters == null )
159      return 0;
160
161    return getCounterValue( counters, counter );
162    }
163
164  public long getCounterValue( String group, String counter )
165    {
166    Counters counters = cachedCounters();
167
168    if( counters == null )
169      return 0;
170
171    return getCounterValue( counters, group, counter );
172    }
173
174  protected Counters cachedCounters()
175    {
176    return cachedCounters( false );
177    }
178
179  protected synchronized Counters cachedCounters( boolean force )
180    {
181    if( !hasAvailableCounters )
182      return cachedCounters;
183
184    // no point in trying again
185    if( fetchAttempts >= maxFetchAttempts )
186      {
187      if( !hasCapturedFinalCounters && !warnedStale )
188        {
189        if( cachedCounters == null )
190          LOG.warn( "no counters fetched, max num consecutive retries reached: {}, type: {}, status: {}", maxFetchAttempts, stats.getType(), stats.getStatus() );
191        else
192          LOG.warn( "stale counters being returned, max num consecutive retries reached, age: {}, type: {}, status: {}", formatDurationFromMillis( currentTimeMillis() - lastFetch ), stats.getType(), stats.getStatus() );
193
194        warnedStale = true;
195        }
196
197      return cachedCounters;
198      }
199
200    boolean isProcessFinished = stats.isFinished();
201
202    // ignore force, no reason to refresh completed stats
203    if( isProcessFinished && hasCapturedFinalCounters )
204      return cachedCounters;
205
206    // have not capturedFinalCounters - force it
207    if( !force && isProcessFinished )
208      force = true;
209
210    int currentAge = (int) ( ( lastFetch - currentTimeMillis() ) / 1000 );
211
212    boolean isStale = currentAge >= maxAge;
213
214    // if we have counters, aren't being forced to update, and values aren't considered stale, return them
215    if( cachedCounters != null && !force && !isStale )
216      return cachedCounters;
217
218    JobStatus runningJob = getJobStatusClient();
219
220    if( runningJob == null )
221      return cachedCounters;
222
223    if( !areCountersAvailable( runningJob ) )
224      {
225      hasAvailableCounters = false;
226      return cachedCounters;
227      }
228
229    boolean success = false;
230
231    try
232      {
233      Counters fetched = fetchCounters( runningJob );
234
235      success = fetched != null;
236
237      if( success )
238        {
239        cachedCounters = fetched;
240        lastFetch = currentTimeMillis();
241        fetchAttempts = 0; // reset attempt counter, mitigates for transient non-consecutive failures
242        }
243      }
244    catch( InterruptedException exception )
245      {
246      LOG.warn( "fetching counters was interrupted" );
247      }
248    catch( ExecutionException exception )
249      {
250      fetchAttempts++;
251
252      if( fetchAttempts >= maxFetchAttempts )
253        LOG.error( "fetching counters failed, was final consecutive attempt: {}, type: {}, status: {}", fetchAttempts, stats.getType(), stats.getStatus(), exception.getCause() );
254      else
255        LOG.warn( "fetching counters failed, consecutive attempts: {}, type: {}, status: {}, message: {}", fetchAttempts, stats.getType(), stats.getStatus(), exception.getCause().getMessage() );
256
257      if( cachedCounters != null )
258        {
259        LOG.error( "returning cached values" );
260
261        return cachedCounters;
262        }
263
264      LOG.error( "unable to get remote counters, no cached values, rethrowing exception", exception.getCause() );
265
266      if( exception.getCause() instanceof FlowException )
267        throw (FlowException) exception.getCause();
268
269      throw new FlowException( exception.getCause() );
270      }
271    catch( TimeoutException exception )
272      {
273      fetchAttempts++;
274
275      if( fetchAttempts >= maxFetchAttempts )
276        LOG.warn( "fetching counters timed out after: {} seconds, was final consecutive attempt: {}, type: {}, status: {}", timeout, fetchAttempts, stats.getType(), stats.getStatus() );
277      else
278        LOG.warn( "fetching counters timed out after: {} seconds, consecutive attempts: {}, type: {}, status: {}", timeout, fetchAttempts, stats.getType(), stats.getStatus() );
279      }
280
281    hasCapturedFinalCounters = isProcessFinished && success;
282
283    return cachedCounters;
284    }
285
286  private Counters fetchCounters( JobStatus runningJob ) throws InterruptedException, ExecutionException, TimeoutException
287    {
288    // if timeout greater than zero, perform async call
289    if( timeout > 0 )
290      return runFuture( runningJob ).get( timeout, TimeUnit.SECONDS );
291
292    try
293      {
294      return getCounters( runningJob );
295      }
296    catch( IOException exception )
297      {
298      throw new FlowException( "unable to get remote counter values", exception );
299      }
300    }
301
302  private Future<Counters> runFuture( final JobStatus jobStatus )
303    {
304    Callable<Counters> task = new Callable<Counters>()
305    {
306    @Override
307    public Counters call() throws Exception
308      {
309      try
310        {
311        return getCounters( jobStatus );
312        }
313      catch( IOException exception )
314        {
315        throw new FlowException( "unable to get remote counter values", exception );
316        }
317      }
318    };
319
320    return futuresPool.submit( task );
321    }
322  }