/*
 * Copyright (c) 2007-2016 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;

/**
 * Class BaseCachedNodeStats is an abstract {@link FlowNodeStats} that answers all counter queries by
 * delegating to a {@link CounterCache}, and that keeps its child {@link FlowSliceStats} in an
 * insertion ordered map keyed by id.
 * <p>
 * This class never assigns {@link #counterCache}; it must be set (presumably by the sub-class) before
 * any of the counter accessors are called. Only {@link #getLastSuccessfulCounterFetchTime()} tolerates
 * a {@code null} cache.
 * <p>
 * Access to {@link #sliceStatsMap} from this class is guarded by synchronizing on the map instance itself.
 *
 * @param <Config>    first type parameter handed to the {@link CounterCache}
 * @param <JobStatus> second type parameter handed to the {@link CounterCache}
 * @param <Counters>  type returned by {@link #cachedCounters(boolean)}
 */
public abstract class BaseCachedNodeStats<Config, JobStatus, Counters> extends FlowNodeStats
  {
  /** Child slice stats keyed by id, in insertion order. Synchronize on this map when reading or writing it. */
  protected final Map<String, FlowSliceStats> sliceStatsMap = new LinkedHashMap<>();
  /** Source of all counter values; not assigned by this class. */
  protected CounterCache<Config, JobStatus, Counters> counterCache;

  /** Participates in deciding whether final detail has been captured, see {@link #captureDetail(Type)}; not assigned by this class. */
  protected boolean allChildrenFinished;

  /**
   * Constructor BaseCachedNodeStats creates a new BaseCachedNodeStats instance.
   *
   * @param flowNode    of FlowNode
   * @param clientState of ClientState
   */
  protected BaseCachedNodeStats( FlowNode flowNode, ClientState clientState )
    {
    super( flowNode, clientState );
    }

  /**
   * Method getLastSuccessfulCounterFetchTime returns the value reported by the counter cache for
   * its last successful fetch.
   *
   * @return the cache's last successful fetch value, or -1 if no counter cache has been set
   */
  @Override
  public long getLastSuccessfulCounterFetchTime()
    {
    if( counterCache != null )
      return counterCache.getLastSuccessfulFetch();

    return -1;
    }

  /**
   * Method isAllChildrenFinished returns the current value of the {@link #allChildrenFinished} flag.
   *
   * @return boolean
   */
  public boolean isAllChildrenFinished()
    {
    return allChildrenFinished;
    }

  /**
   * Method getCounterGroups returns all of the counter groups known to the counter cache.
   *
   * @return the counterGroups (type {@code Collection<String>}) of this BaseCachedNodeStats object.
   */
  @Override
  public Collection<String> getCounterGroups()
    {
    return counterCache.getCounterGroups();
    }

  /**
   * Method getCounterGroupsMatching returns all the counter groups that match the given regex pattern.
   *
   * @param regex of String
   * @return {@code Collection<String>}
   */
  @Override
  public Collection<String> getCounterGroupsMatching( String regex )
    {
    return counterCache.getCounterGroupsMatching( regex );
    }

  /**
   * Method getCountersFor returns the counters for the given group.
   *
   * @param group of String
   * @return {@code Collection<String>}
   */
  @Override
  public Collection<String> getCountersFor( String group )
    {
    return counterCache.getCountersFor( group );
    }

  /**
   * Method getCounterValue returns the counter value for the given counter enum.
   *
   * @param counter of Enum
   * @return long
   */
  @Override
  public long getCounterValue( Enum counter )
    {
    return counterCache.getCounterValue( counter );
    }

  /**
   * Method getCounterValue returns the counter value for the given group and counter name.
   *
   * @param group   of String
   * @param counter of String
   * @return long
   */
  @Override
  public long getCounterValue( String group, String counter )
    {
    return counterCache.getCounterValue( group, counter );
    }

  /**
   * Method cachedCounters returns the counters held by the counter cache.
   *
   * @param force passed through to the counter cache, presumably requesting a refresh
   * @return Counters
   */
  protected synchronized Counters cachedCounters( boolean force )
    {
    return counterCache.cachedCounters( force );
    }

  /**
   * Method getChildren returns an unmodifiable snapshot of the current child slice stats, in insertion order.
   * <p>
   * A copy is made while holding the map lock so that callers can iterate the result safely even if
   * children are added concurrently; later additions are not reflected in a previously returned collection.
   *
   * @return {@code Collection<FlowSliceStats>}
   */
  @Override
  public Collection<FlowSliceStats> getChildren()
    {
    synchronized( sliceStatsMap )
      {
      // returning the live values() view would let callers iterate it outside of this lock
      return Collections.unmodifiableCollection( new ArrayList<>( sliceStatsMap.values() ) );
      }
    }

  /**
   * Method getChildWith returns the child slice stats stored under the given id.
   *
   * @param id of String
   * @return the matching FlowSliceStats, or {@code null} if no child is stored under the given id
   */
  @Override
  public FlowSliceStats getChildWith( String id )
    {
    // same lock as #getChildren so a lookup never overlaps a structural change to the map
    synchronized( sliceStatsMap )
      {
      return sliceStatsMap.get( id );
      }
    }

  /**
   * Method captureDetail refreshes the child detail via {@link #captureChildDetailInternal()}.
   * <p>
   * Returns immediately if this node is finished and final detail has already been captured. Otherwise,
   * while holding this instance's monitor, nothing is captured unless {@code getType().isChild( depth )}
   * is true and the current detail is stale.
   *
   * @param depth of Type
   */
  @Override
  public final void captureDetail( Type depth )
    {
    // sampled before taking the lock, the same value is used below to decide if the capture was final
    boolean finished = isFinished();

    if( finished && hasCapturedFinalDetail() )
      return;

    synchronized( this )
      {
      if( !getType().isChild( depth ) || !isDetailStale() )
        return;

      boolean success = captureChildDetailInternal();

      markDetailCaptured(); // always mark to prevent double calls

      if( success )
        logDebug( "captured remote node statistic details" );

      hasCapturedFinalDetail = finished && success && allChildrenFinished;

      if( allChildrenFinished )
        logInfo( "all {} children are in finished state, have captured final details: {}", sliceStatsMap.size(), hasCapturedFinalDetail() );
      }
    }

  /**
   * Returns true if was able to capture/refresh the internal child stats cache.
   * <p>
   * Called from {@link #captureDetail(Type)} while this instance's monitor is held.
   *
   * @return true if successful
   */
  protected abstract boolean captureChildDetailInternal();

  /**
   * Method recordChildStats forces a counter refresh, captures detail down to {@code Type.ATTEMPT}, and
   * records every child slice stats instance with the client state.
   * <p>
   * If the client state is not enabled, only the counter refresh is attempted.
   * <p>
   * Synchronized to prevent state changes mid record, #stop may be called out of band.
   */
  @Override
  public synchronized void recordChildStats()
    {
    try
      {
      cachedCounters( true );
      }
    catch( Exception ignored )
      {
      // best effort, a failed counter refresh must not prevent the child stats below from being recorded
      }

    if( !clientState.isEnabled() )
      return;

    captureDetail( Type.ATTEMPT );

    // FlowSliceStats are not full blown Stats types, but implementation specific
    // so we can't call recordStats/recordChildStats
    try
      {
      // must use the local ID as the stored id, not task id
      for( FlowSliceStats value : sliceStatsMap.values() )
        clientState.record( value.getID(), value );
      }
    catch( Exception exception )
      {
      logError( "unable to record node stats", exception );
      }
    }
  }