/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowSliceStats;

/**
 * Class BaseHadoopNodeStats is the base class for Hadoop platform {@link FlowNodeStats} implementations,
 * caching remote counter values and the child {@link FlowSliceStats} for each slice.
 */
public abstract class BaseHadoopNodeStats<JobStatus, Counters> extends FlowNodeStats
  {
  protected final Map<String, FlowSliceStats> sliceStatsMap = new LinkedHashMap<>();
  protected CounterCache<JobStatus, Counters> counterCache;

  protected boolean allChildrenFinished;

  /**
   * Constructor BaseHadoopNodeStats creates a new BaseHadoopNodeStats instance.
   *
   * @param flowNode    of type FlowNode
   * @param clientState of type ClientState
   */
  protected BaseHadoopNodeStats( FlowNode flowNode, ClientState clientState )
    {
    super( flowNode, clientState );
    }

  @Override
  public long getLastSuccessfulCounterFetchTime()
    {
    if( counterCache != null )
      return counterCache.getLastSuccessfulFetch();

    return -1;
    }

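  /**
   * Method isAllChildrenFinished returns true if all child slices have been seen in a finished state.
   *
   * @return boolean
   */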
  public boolean isAllChildrenFinished()
    {
    return allChildrenFinished;
    }

  /**
   * Method getCounterGroups returns all of the Hadoop counter groups.
   *
   * @return the counterGroups (type Collection<String>) of this BaseHadoopNodeStats object.
   */
  @Override
  public Collection<String> getCounterGroups()
    {
    return counterCache.getCounterGroups();
    }

  /**
   * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the given regex pattern.
   *
   * @param regex of String
   * @return Collection<String>
   */
  @Override
  public Collection<String> getCounterGroupsMatching( String regex )
    {
    return counterCache.getCounterGroupsMatching( regex );
    }

  /**
   * Method getCountersFor returns the Hadoop counters for the given group.
   *
   * @param group of String
   * @return Collection<String>
   */
  @Override
  public Collection<String> getCountersFor( String group )
    {
    return counterCache.getCountersFor( group );
    }

  /**
   * Method getCounterValue returns the Hadoop counter value for the given counter enum.
   *
   * @param counter of Enum
   * @return long
   */
  @Override
  public long getCounterValue( Enum counter )
    {
    return counterCache.getCounterValue( counter );
    }

  /**
   * Method getCounterValue returns the Hadoop counter value for the given group and counter name.
   *
   * @param group   of String
   * @param counter of String
   * @return long
   */
  @Override
  public long getCounterValue( String group, String counter )
    {
    return counterCache.getCounterValue( group, counter );
    }

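  /**
   * Method cachedCounters delegates to the underlying CounterCache, where the force flag requests a refresh
   * of the cached Counters values.
   *
   * @param force of boolean
   * @return Counters
   */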
  protected synchronized Counters cachedCounters( boolean force )
    {
    return counterCache.cachedCounters( force );
    }

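  /**
   * Method getChildren returns an unmodifiable view of the child FlowSliceStats collected for this node.
   *
   * @return Collection<FlowSliceStats>
   */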
  @Override
  public Collection<FlowSliceStats> getChildren()
    {
    synchronized( sliceStatsMap )
      {
      return Collections.unmodifiableCollection( sliceStatsMap.values() );
      }
    }

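  /**
   * Method getChildWith returns the child FlowSliceStats with the given slice ID, or null if none exists.
   *
   * @param id of String
   * @return FlowSliceStats
   */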
  @Override
  public FlowSliceStats getChildWith( String id )
    {
    return sliceStatsMap.get( id );
    }

  @Override
  public final void captureDetail( Type depth )
    {
    boolean finished = isFinished();

    if( finished && hasCapturedFinalDetail() )
      return;

    synchronized( this )
      {
      if( !getType().isChild( depth ) || !isDetailStale() )
        return;

      boolean success = captureChildDetailInternal();

      markDetailCaptured(); // always mark to prevent double calls

      if( success )
        logDebug( "captured remote node statistic details" );

      hasCapturedFinalDetail = finished && success && allChildrenFinished;

      if( allChildrenFinished )
        logInfo( "all {} children are in finished state, have captured final details: {}", sliceStatsMap.size(), hasCapturedFinalDetail() );
      }
    }

  /**
   * Returns true if it was able to capture/refresh the internal child stats cache.
   *
   * @return true if successful
   */
  protected abstract boolean captureChildDetailInternal();

  /** Synchronized to prevent state changes mid-record; #stop may be called out of band. */
  @Override
  public synchronized void recordChildStats()
    {
    try
      {
      cachedCounters( true );
      }
    catch( Exception exception )
      {
      // do nothing
      }

    if( !clientState.isEnabled() )
      return;

    captureDetail( Type.ATTEMPT );

    // FlowSliceStats are not full-blown Stats types, but implementation specific,
    // so we can't call recordStats/recordChildStats
    try
      {
      // must use the local ID as the stored id, not the task id
      for( FlowSliceStats value : sliceStatsMap.values() )
        clientState.record( value.getID(), value );
      }
    catch( Exception exception )
      {
      logError( "unable to record node stats", exception );
      }
    }
  }