001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.hadoop;
022
023import java.util.Collection;
024import java.util.Map;
025
026import cascading.flow.FlowStep;
027import cascading.management.state.ClientState;
028import cascading.stats.FlowNodeStats;
029import cascading.stats.FlowStepStats;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 *
035 */
036public abstract class BaseHadoopStepStats<JobStatusClient, Counters> extends FlowStepStats
037  {
038  private static final Logger LOG = LoggerFactory.getLogger( BaseHadoopStepStats.class );
039
040  protected CounterCache<JobStatusClient, Counters> counterCache;
041
042  public BaseHadoopStepStats( FlowStep flowStep, ClientState clientState )
043    {
044    super( flowStep, clientState );
045    }
046
047  /**
048   * Method getRunningJob returns the Hadoop {@link org.apache.hadoop.mapred.RunningJob} managing this Hadoop job.
049   *
050   * @return the runningJob (type RunningJob) of this HadoopStepStats object.
051   */
052  public abstract JobStatusClient getJobStatusClient();
053
054  @Override
055  public long getLastSuccessfulCounterFetchTime()
056    {
057    if( counterCache != null )
058      return counterCache.getLastSuccessfulFetch();
059
060    return -1;
061    }
062
063  /**
064   * Method getCounterGroups returns all of the Hadoop counter groups.
065   *
066   * @return the counterGroups (type Collection<String>) of this HadoopStepStats object.
067   */
068  @Override
069  public Collection<String> getCounterGroups()
070    {
071    return counterCache.getCounterGroups();
072    }
073
074  /**
075   * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the give regex pattern.
076   *
077   * @param regex of String
078   * @return Collection<String>
079   */
080  @Override
081  public Collection<String> getCounterGroupsMatching( String regex )
082    {
083    return counterCache.getCounterGroupsMatching( regex );
084    }
085
086  /**
087   * Method getCountersFor returns the Hadoop counters for the given group.
088   *
089   * @param group of String
090   * @return Collection<String>
091   */
092  @Override
093  public Collection<String> getCountersFor( String group )
094    {
095    return counterCache.getCountersFor( group );
096    }
097
098  /**
099   * Method getCounterValue returns the Hadoop counter value for the given counter enum.
100   *
101   * @param counter of Enum
102   * @return long
103   */
104  @Override
105  public long getCounterValue( Enum counter )
106    {
107    return counterCache.getCounterValue( counter );
108    }
109
110  /**
111   * Method getCounterValue returns the Hadoop counter value for the given group and counter name.
112   *
113   * @param group   of String
114   * @param counter of String
115   * @return long
116   */
117  @Override
118  public long getCounterValue( String group, String counter )
119    {
120    return counterCache.getCounterValue( group, counter );
121    }
122
123  protected synchronized Counters cachedCounters( boolean force )
124    {
125    return counterCache.cachedCounters( force );
126    }
127
128  /** Synchronized to prevent state changes mid record, #stop may be called out of band */
129  @Override
130  public synchronized void recordChildStats()
131    {
132    try
133      {
134      cachedCounters( true );
135      }
136    catch( Exception exception )
137      {
138      // do nothing
139      }
140
141    if( !clientState.isEnabled() )
142      return;
143
144    captureDetail( Type.ATTEMPT );
145
146    try
147      {
148      for( Map.Entry<String, FlowNodeStats> entry : getFlowNodeStatsMap().entrySet() )
149        {
150        entry.getValue().recordStats();
151        entry.getValue().recordChildStats();
152        }
153      }
154    catch( Exception exception )
155      {
156      LOG.error( "unable to record node stats", exception );
157      }
158    }
159  }