001    /*
002     * Copyright (c) 2007-2014 Concurrent, Inc. All Rights Reserved.
003     *
004     * Project and contact information: http://www.cascading.org/
005     *
006     * This file is part of the Cascading project.
007     *
008     * Licensed under the Apache License, Version 2.0 (the "License");
009     * you may not use this file except in compliance with the License.
010     * You may obtain a copy of the License at
011     *
012     *     http://www.apache.org/licenses/LICENSE-2.0
013     *
014     * Unless required by applicable law or agreed to in writing, software
015     * distributed under the License is distributed on an "AS IS" BASIS,
016     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017     * See the License for the specific language governing permissions and
018     * limitations under the License.
019     */
020    
021    package cascading.stats.hadoop;
022    
023    import java.io.IOException;
024    import java.util.Collection;
025    import java.util.Collections;
026    import java.util.HashMap;
027    import java.util.HashSet;
028    import java.util.Map;
029    import java.util.Set;
030    import java.util.concurrent.Callable;
031    import java.util.concurrent.ExecutionException;
032    import java.util.concurrent.ExecutorService;
033    import java.util.concurrent.Executors;
034    import java.util.concurrent.Future;
035    import java.util.concurrent.ThreadFactory;
036    import java.util.concurrent.TimeUnit;
037    import java.util.concurrent.TimeoutException;
038    
039    import cascading.flow.FlowException;
040    import cascading.flow.FlowStep;
041    import cascading.management.state.ClientState;
042    import cascading.stats.FlowStepStats;
043    import org.apache.hadoop.mapred.Counters;
044    import org.apache.hadoop.mapred.JobClient;
045    import org.apache.hadoop.mapred.JobConf;
046    import org.apache.hadoop.mapred.RunningJob;
047    import org.slf4j.Logger;
048    import org.slf4j.LoggerFactory;
049    
050    /** Class BaseHadoopStepStats is a base class to Hadoop specific statistics and methods to underlying Hadoop facilities. */
051    public abstract class BaseHadoopStepStats extends FlowStepStats
052      {
053      public static final String COUNTER_TIMEOUT_PROPERTY = "cascading.step.counter.timeout";
054    
055      /** Field LOG */
056      private static final Logger LOG = LoggerFactory.getLogger( BaseHadoopStepStats.class );
057    
058      public static final int TIMEOUT_MAX = 3;
059    
060      /** Field numMapTasks */
061      int numMapTasks;
062      /** Field numReducerTasks */
063      int numReduceTasks;
064    
065      /** Fields counters */
066      private Counters cachedCounters = null;
067    
068      /** Fields timeouts */
069      private int timeouts;
070    
071      /** Field taskStats */
072      Map<String, HadoopSliceStats> taskStats = (Map<String, HadoopSliceStats>) Collections.EMPTY_MAP;
073    
  /**
   * Constructor BaseHadoopStepStats passes the given step and client state to the parent {@link FlowStepStats}.
   *
   * @param flowStep    the Hadoop flow step these stats report on
   * @param clientState the client state used to record stats; checked via isEnabled() before recording
   */
  protected BaseHadoopStepStats( FlowStep<JobConf> flowStep, ClientState clientState )
    {
    super( flowStep, clientState );
    }
078    
079      /**
080       * Method getTaskStats returns the taskStats of this HadoopStepStats object.
081       *
082       * @return the taskStats (type ArrayList<HadoopTaskStats>) of this HadoopStepStats object.
083       */
084      public Map<String, HadoopSliceStats> getTaskStats()
085        {
086        return taskStats;
087        }
088    
  /**
   * Method setTaskStats replaces the current task stats map.
   *
   * @param taskStats the new slice stats, keyed by id
   */
  protected void setTaskStats( Map<String, HadoopSliceStats> taskStats )
    {
    this.taskStats = taskStats;
    }
093    
094      /**
095       * Method getNumMapTasks returns the numMapTasks from the Hadoop job file.
096       *
097       * @return the numMapTasks (type int) of this HadoopStepStats object.
098       */
099      public int getNumMapTasks()
100        {
101        return numMapTasks;
102        }
103    
  /**
   * Method setNumMapTasks sets the number of map tasks observed for this step.
   *
   * @param numMapTasks the number of map tasks
   */
  void setNumMapTasks( int numMapTasks )
    {
    this.numMapTasks = numMapTasks;
    }
108    
109      /**
110       * Method getNumReduceTasks returns the numReducerTasks from the Hadoop job file.
111       *
112       * @return the numReducerTasks (type int) of this HadoopStepStats object.
113       */
114      public int getNumReduceTasks()
115        {
116        return numReduceTasks;
117        }
118    
  /**
   * Method setNumReduceTasks sets the number of reduce tasks observed for this step.
   *
   * @param numReduceTasks the number of reduce tasks
   */
  void setNumReduceTasks( int numReduceTasks )
    {
    this.numReduceTasks = numReduceTasks;
    }
123    
124      /**
125       * Method getJobID returns the Hadoop running job JobID.
126       *
127       * @return the jobID (type String) of this HadoopStepStats object.
128       */
129      public String getJobID()
130        {
131        if( getRunningJob() == null )
132          return null;
133    
134        return getRunningJob().getJobID();
135        }
136    
137      /**
138       * Method getJobClient returns the Hadoop {@link JobClient} managing this Hadoop job.
139       *
140       * @return the jobClient (type JobClient) of this HadoopStepStats object.
141       */
142      public abstract JobClient getJobClient();
143    
144      /**
145       * Method getRunningJob returns the Hadoop {@link RunningJob} managing this Hadoop job.
146       *
147       * @return the runningJob (type RunningJob) of this HadoopStepStats object.
148       */
149      public abstract RunningJob getRunningJob();
150    
151      /**
152       * Method getCounterGroups returns all of the Hadoop counter groups.
153       *
154       * @return the counterGroups (type Collection<String>) of this HadoopStepStats object.
155       */
156      @Override
157      public Collection<String> getCounterGroups()
158        {
159        Counters counters = cachedCounters();
160    
161        if( counters == null )
162          return Collections.emptySet();
163    
164        return Collections.unmodifiableCollection( counters.getGroupNames() );
165        }
166    
167      /**
168       * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the give regex pattern.
169       *
170       * @param regex of String
171       * @return Collection<String>
172       */
173      @Override
174      public Collection<String> getCounterGroupsMatching( String regex )
175        {
176        Counters counters = cachedCounters();
177    
178        if( counters == null )
179          return Collections.emptySet();
180    
181        Set<String> results = new HashSet<String>();
182    
183        for( String counter : counters.getGroupNames() )
184          {
185          if( counter.matches( regex ) )
186            results.add( counter );
187          }
188    
189        return Collections.unmodifiableCollection( results );
190        }
191    
192      /**
193       * Method getCountersFor returns the Hadoop counters for the given group.
194       *
195       * @param group of String
196       * @return Collection<String>
197       */
198      @Override
199      public Collection<String> getCountersFor( String group )
200        {
201        Counters counters = cachedCounters();
202    
203        if( counters == null )
204          return Collections.emptySet();
205    
206        Set<String> results = new HashSet<String>();
207    
208        for( Counters.Counter counter : counters.getGroup( group ) )
209          results.add( counter.getName() );
210    
211        return Collections.unmodifiableCollection( results );
212        }
213    
214      /**
215       * Method getCounterValue returns the Hadoop counter value for the given counter enum.
216       *
217       * @param counter of Enum
218       * @return long
219       */
220      @Override
221      public long getCounterValue( Enum counter )
222        {
223        Counters counters = cachedCounters();
224    
225        if( counters == null )
226          return 0;
227    
228        return counters.getCounter( counter );
229        }
230    
231      /**
232       * Method getCounterValue returns the Hadoop counter value for the given group and counter name.
233       *
234       * @param group   of String
235       * @param counter of String
236       * @return long
237       */
238      @Override
239      public long getCounterValue( String group, String counter )
240        {
241        Counters counters = cachedCounters();
242    
243        if( counters == null )
244          return 0;
245    
246        Counters.Group counterGroup = counters.getGroup( group );
247    
248        if( group == null )
249          return 0;
250    
251        // geCounter actually searches the display name, wtf
252        // in theory this is lazily created if does not exist, but don't rely on it
253        Counters.Counter counterValue = counterGroup.getCounterForName( counter );
254    
255        if( counter == null )
256          return 0;
257    
258        return counterValue.getValue();
259        }
260    
  /**
   * Method cachedCounters returns the current counters, fetching remotely only when not finished or timed out.
   *
   * @return the Counters instance, may be null if counters were never successfully fetched
   */
  protected Counters cachedCounters()
    {
    return cachedCounters( false );
    }
265    
266      protected synchronized Counters cachedCounters( boolean force )
267        {
268        if( !force && ( isFinished() || timeouts >= TIMEOUT_MAX ) )
269          return cachedCounters;
270    
271        RunningJob runningJob = getRunningJob();
272    
273        if( runningJob == null )
274          return cachedCounters;
275    
276        Future<Counters> future = runFuture( runningJob );
277    
278        int timeout = ( (JobConf) getFlowStep().getConfig() ).getInt( COUNTER_TIMEOUT_PROPERTY, 5 );
279    
280        try
281          {
282          Counters fetched = future.get( timeout, TimeUnit.SECONDS );
283    
284          if( fetched != null )
285            cachedCounters = fetched;
286          }
287        catch( InterruptedException exception )
288          {
289          LOG.warn( "fetching counters was interrupted" );
290          }
291        catch( ExecutionException exception )
292          {
293          if( cachedCounters != null )
294            {
295            LOG.error( "unable to get remote counters, returning cached values", exception.getCause() );
296    
297            return cachedCounters;
298            }
299    
300          LOG.error( "unable to get remote counters, no cached values, throwing exception", exception.getCause() );
301    
302          if( exception.getCause() instanceof FlowException )
303            throw (FlowException) exception.getCause();
304    
305          throw new FlowException( exception.getCause() );
306          }
307        catch( TimeoutException exception )
308          {
309          timeouts++;
310    
311          if( timeouts >= TIMEOUT_MAX )
312            LOG.warn( "fetching counters timed out after: {} seconds, final attempt: {}", timeout, timeouts );
313          else
314            LOG.warn( "fetching counters timed out after: {} seconds, attempts: {}", timeout, timeouts );
315          }
316    
317        return cachedCounters;
318        }
319    
  // hardcoded at one thread to force serialization across all requesters in the jvm
  // this likely prevents the deadlocks the futures are safeguards against
  private static ExecutorService futuresPool = Executors.newSingleThreadExecutor( new ThreadFactory()
  {
  @Override
  public Thread newThread( Runnable runnable )
    {
    Thread thread = new Thread( runnable, "stats-futures" );

    // daemon thread so a pending counter fetch cannot prevent jvm shutdown
    thread.setDaemon( true );

    return thread;
    }
  } );
334    
335      private Future<Counters> runFuture( final RunningJob runningJob )
336        {
337        Callable<Counters> task = new Callable<Counters>()
338        {
339        @Override
340        public Counters call() throws Exception
341          {
342          try
343            {
344            return runningJob.getCounters();
345            }
346          catch( IOException exception )
347            {
348            throw new FlowException( "unable to get remote counter values" );
349            }
350          }
351        };
352    
353        return futuresPool.submit( task );
354        }
355    
356      /**
357       * Returns the underlying Map tasks progress percentage.
358       * <p/>
359       * This method is experimental.
360       *
361       * @return float
362       */
363      public float getMapProgress()
364        {
365        RunningJob runningJob = getRunningJob();
366    
367        if( runningJob == null )
368          return 0;
369    
370        try
371          {
372          return runningJob.mapProgress();
373          }
374        catch( IOException exception )
375          {
376          throw new FlowException( "unable to get progress" );
377          }
378        }
379    
380      /**
381       * Returns the underlying Reduce tasks progress percentage.
382       * <p/>
383       * This method is experimental.
384       *
385       * @return float
386       */
387      public float getReduceProgress()
388        {
389        RunningJob runningJob = getRunningJob();
390    
391        if( runningJob == null )
392          return 0;
393    
394        try
395          {
396          return runningJob.reduceProgress();
397          }
398        catch( IOException exception )
399          {
400          throw new FlowException( "unable to get progress" );
401          }
402        }
403    
404      public String getStatusURL()
405        {
406        RunningJob runningJob = getRunningJob();
407    
408        if( runningJob == null )
409          return null;
410    
411        return runningJob.getTrackingURL();
412        }
413    
414      /**
415       * Method getChildren returns the children of this HadoopStepStats object.
416       *
417       * @return the children (type Collection) of this HadoopStepStats object.
418       */
419      @Override
420      public Collection getChildren()
421        {
422        return getTaskStats().values();
423        }
424    
425      public Set<String> getChildIDs()
426        {
427        return getTaskStats().keySet();
428        }
429    
430      /** Synchronized to prevent state changes mid record, #stop may be called out of band */
431      @Override
432      public synchronized void recordChildStats()
433        {
434        try
435          {
436          cachedCounters( true );
437          }
438        catch( Exception exception )
439          {
440          // do nothing
441          }
442    
443        // if null instance don't bother capturing detail
444        if( !clientState.isEnabled() )
445          return;
446    
447        captureDetail();
448    
449        try
450          {
451          for( String id : taskStats.keySet() )
452            clientState.record( id, taskStats.get( id ) );
453          }
454        catch( Exception exception )
455          {
456          LOG.error( "unable to record slice stats", exception );
457          }
458        }
459    
  /** Method captureDetail captures statistics task details and completion events. */
  @Override
  public synchronized void captureDetail()
    {
    // capture attempt level detail by default
    captureDetail( true );
    }
466    
467      public void captureDetail( boolean captureAttempts )
468        {
469        Map<String, HadoopSliceStats> newStats = new HashMap<String, HadoopSliceStats>();
470    
471        JobClient jobClient = getJobClient();
472        RunningJob runningJob = getRunningJob();
473    
474        if( jobClient == null || runningJob == null )
475          return;
476    
477        numMapTasks = 0;
478        numReduceTasks = 0;
479    
480        try
481          {
482          // cleanup/setup tasks have no useful info so far.
483    //      addTaskStats( newStats, HadoopSliceStats.Kind.SETUP, false );
484    //      addTaskStats( newStats, HadoopSliceStats.Kind.CLEANUP, false );
485          addTaskStats( newStats, HadoopSliceStats.Kind.MAPPER, false );
486          addTaskStats( newStats, HadoopSliceStats.Kind.REDUCER, false );
487    
488          addAttemptsToTaskStats( newStats, captureAttempts );
489    
490          setTaskStats( newStats );
491          }
492        catch( IOException exception )
493          {
494          LOG.warn( "unable to get task stats", exception );
495          }
496        }
497    
  /**
   * Method stepHasReducers returns true when the flow step declares grouping operations.
   *
   * @return boolean
   */
  boolean stepHasReducers()
    {
    return !getFlowStep().getGroups().isEmpty();
    }
502    
503      void incrementKind( HadoopSliceStats.Kind kind )
504        {
505        switch( kind )
506          {
507          case SETUP:
508            break;
509          case MAPPER:
510            numMapTasks++;
511            break;
512          case REDUCER:
513            numReduceTasks++;
514            break;
515          case CLEANUP:
516            break;
517          }
518        }
519    
  /**
   * Method addTaskStats populates the given map with slice stats of the given kind, keyed by id.
   *
   * @param taskStats map to populate
   * @param kind      the slice kind to collect
   * @param skipLast  when true, skip the last task -- exact semantics defined by the subclass, TODO confirm
   * @throws IOException when the underlying Hadoop facilities fail
   */
  protected abstract void addTaskStats( Map<String, HadoopSliceStats> taskStats, HadoopSliceStats.Kind kind, boolean skipLast ) throws IOException;

  /**
   * Method addAttemptsToTaskStats amends the given map with task attempt detail.
   *
   * @param taskStats       map to amend
   * @param captureAttempts when true, capture attempt level detail
   */
  protected abstract void addAttemptsToTaskStats( Map<String, HadoopSliceStats> taskStats, boolean captureAttempts );
523      }