001/*
002 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats;
022
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;
030
031/**
032 *
033 */
034public abstract class BaseCachedNodeStats<Config, JobStatus, Counters> extends FlowNodeStats
035  {
036  protected final Map<String, FlowSliceStats> sliceStatsMap = new LinkedHashMap<>();
037  protected CounterCache<Config, JobStatus, Counters> counterCache;
038
039  protected boolean allChildrenFinished;
040
041  /**
042   * Constructor BaseHadoopNodeStats creates a new BaseHadoopNodeStats instance.
043   *
044   * @param flowNode
045   * @param clientState
046   */
047  protected BaseCachedNodeStats( FlowNode flowNode, ClientState clientState )
048    {
049    super( flowNode, clientState );
050    }
051
052  @Override
053  public long getLastSuccessfulCounterFetchTime()
054    {
055    if( counterCache != null )
056      return counterCache.getLastSuccessfulFetch();
057
058    return -1;
059    }
060
061  public boolean isAllChildrenFinished()
062    {
063    return allChildrenFinished;
064    }
065
066  /**
067   * Method getCounterGroups returns all of the Hadoop counter groups.
068   *
069   * @return the counterGroups (type Collection<String>) of this HadoopStepStats object.
070   */
071  @Override
072  public Collection<String> getCounterGroups()
073    {
074    return counterCache.getCounterGroups();
075    }
076
077  /**
078   * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the give regex pattern.
079   *
080   * @param regex of String
081   * @return Collection<String>
082   */
083  @Override
084  public Collection<String> getCounterGroupsMatching( String regex )
085    {
086    return counterCache.getCounterGroupsMatching( regex );
087    }
088
089  /**
090   * Method getCountersFor returns the Hadoop counters for the given group.
091   *
092   * @param group of String
093   * @return Collection<String>
094   */
095  @Override
096  public Collection<String> getCountersFor( String group )
097    {
098    return counterCache.getCountersFor( group );
099    }
100
101  /**
102   * Method getCounterValue returns the Hadoop counter value for the given counter enum.
103   *
104   * @param counter of Enum
105   * @return long
106   */
107  @Override
108  public long getCounterValue( Enum counter )
109    {
110    return counterCache.getCounterValue( counter );
111    }
112
113  /**
114   * Method getCounterValue returns the Hadoop counter value for the given group and counter name.
115   *
116   * @param group   of String
117   * @param counter of String
118   * @return long
119   */
120  @Override
121  public long getCounterValue( String group, String counter )
122    {
123    return counterCache.getCounterValue( group, counter );
124    }
125
126  protected synchronized Counters cachedCounters( boolean force )
127    {
128    return counterCache.cachedCounters( force );
129    }
130
131  @Override
132  public Collection<FlowSliceStats> getChildren()
133    {
134    synchronized( sliceStatsMap )
135      {
136      return Collections.unmodifiableCollection( sliceStatsMap.values() );
137      }
138    }
139
140  @Override
141  public FlowSliceStats getChildWith( String id )
142    {
143    return sliceStatsMap.get( id );
144    }
145
146  @Override
147  public final void captureDetail( Type depth )
148    {
149    boolean finished = isFinished();
150
151    if( finished && hasCapturedFinalDetail() )
152      return;
153
154    synchronized( this )
155      {
156      if( !getType().isChild( depth ) || !isDetailStale() )
157        return;
158
159      boolean success = captureChildDetailInternal();
160
161      markDetailCaptured(); // always mark to prevent double calls
162
163      if( success )
164        logDebug( "captured remote node statistic details" );
165
166      hasCapturedFinalDetail = finished && success && allChildrenFinished;
167
168      if( allChildrenFinished )
169        logInfo( "all {} children are in finished state, have captured final details: {}", sliceStatsMap.size(), hasCapturedFinalDetail() );
170      }
171    }
172
173  /**
174   * Returns true if was able to capture/refresh the internal child stats cache.
175   *
176   * @return true if successful
177   */
178  protected abstract boolean captureChildDetailInternal();
179
180  /** Synchronized to prevent state changes mid record, #stop may be called out of band */
181  @Override
182  public synchronized void recordChildStats()
183    {
184    try
185      {
186      cachedCounters( true );
187      }
188    catch( Exception exception )
189      {
190      // do nothing
191      }
192
193    if( !clientState.isEnabled() )
194      return;
195
196    captureDetail( Type.ATTEMPT );
197
198    // FlowSliceStats are not full blown Stats types, but implementation specific
199    // so we can't call recordStats/recordChildStats
200    try
201      {
202      // must use the local ID as the stored id, not task id
203      for( FlowSliceStats value : sliceStatsMap.values() )
204        clientState.record( value.getID(), value );
205      }
206    catch( Exception exception )
207      {
208      logError( "unable to record node stats", exception );
209      }
210    }
211  }