/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowSliceStats;

/**
 * Class BaseHadoopNodeStats is the base {@link FlowNodeStats} implementation for Hadoop based nodes.
 * <p>
 * Counter lookups are delegated to a {@link CounterCache}, and child {@link FlowSliceStats} instances
 * are kept in an insertion ordered map keyed by their id.
 * <p>
 * All reads of {@link #sliceStatsMap} made by this class are performed while holding the map's own monitor.
 * NOTE(review): this only helps if subclasses that add or remove entries synchronize on the same
 * monitor -- confirm against the concrete implementations.
 *
 * @param <JobStatus> the platform specific job status type handed to the {@link CounterCache}
 * @param <Counters>  the platform specific counters type returned by {@link #cachedCounters(boolean)}
 */
public abstract class BaseHadoopNodeStats<JobStatus, Counters> extends FlowNodeStats
  {
  /** Child slice stats keyed by id, iterated in insertion order. Also used as the monitor guarding itself. */
  protected final Map<String, FlowSliceStats> sliceStatsMap = new LinkedHashMap<>();
  /**
   * Source of all counter values. Not assigned by this class, presumably set by the subclass constructor.
   * NOTE(review): only {@link #getLastSuccessfulCounterFetchTime()} tolerates a null value here, every
   * other counter accessor will throw a NullPointerException if this is never assigned.
   */
  protected CounterCache<JobStatus, Counters> counterCache;

  /** Not assigned by this class, expected to be maintained by subclasses while capturing child detail. */
  protected boolean allChildrenFinished;

  /**
   * Constructor BaseHadoopNodeStats creates a new BaseHadoopNodeStats instance.
   *
   * @param flowNode    the node these stats describe
   * @param clientState the client state child stats are recorded to
   */
  protected BaseHadoopNodeStats( FlowNode flowNode, ClientState clientState )
    {
    super( flowNode, clientState );
    }

  /**
   * Method getLastSuccessfulCounterFetchTime returns the last successful fetch time reported by the counter cache.
   *
   * @return the value reported by the counter cache, or -1 if no counter cache has been set
   */
  @Override
  public long getLastSuccessfulCounterFetchTime()
    {
    if( counterCache != null )
      return counterCache.getLastSuccessfulFetch();

    return -1;
    }

  /**
   * Method isAllChildrenFinished returns the current value of the {@link #allChildrenFinished} flag.
   *
   * @return true if all children have been flagged as finished
   */
  public boolean isAllChildrenFinished()
    {
    return allChildrenFinished;
    }

  /**
   * Method getCounterGroups returns all of the Hadoop counter groups.
   *
   * @return the counterGroups (type {@code Collection<String>}) of this BaseHadoopNodeStats object.
   */
  @Override
  public Collection<String> getCounterGroups()
    {
    return counterCache.getCounterGroups();
    }

  /**
   * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the given regex pattern.
   *
   * @param regex of String
   * @return {@code Collection<String>}
   */
  @Override
  public Collection<String> getCounterGroupsMatching( String regex )
    {
    return counterCache.getCounterGroupsMatching( regex );
    }

  /**
   * Method getCountersFor returns the Hadoop counters for the given group.
   *
   * @param group of String
   * @return {@code Collection<String>}
   */
  @Override
  public Collection<String> getCountersFor( String group )
    {
    return counterCache.getCountersFor( group );
    }

  /**
   * Method getCounterValue returns the Hadoop counter value for the given counter enum.
   *
   * @param counter of Enum
   * @return long
   */
  @Override
  public long getCounterValue( Enum counter )
    {
    return counterCache.getCounterValue( counter );
    }

  /**
   * Method getCounterValue returns the Hadoop counter value for the given group and counter name.
   *
   * @param group   of String
   * @param counter of String
   * @return long
   */
  @Override
  public long getCounterValue( String group, String counter )
    {
    return counterCache.getCounterValue( group, counter );
    }

  /**
   * Method cachedCounters returns the counters held by the counter cache.
   *
   * @param force passed through to the counter cache, presumably forces a refresh when true
   * @return the counters as returned by the counter cache
   */
  protected synchronized Counters cachedCounters( boolean force )
    {
    return counterCache.cachedCounters( force );
    }

  /**
   * Method getChildren returns an unmodifiable snapshot of the child slice stats, in insertion order.
   * <p>
   * The values are copied while holding the map monitor so the returned collection can be safely
   * iterated after the monitor is released. A live view of the map values would escape the lock and
   * expose callers to concurrent structural modification of the underlying map.
   *
   * @return a point-in-time copy of the children, never null
   */
  @Override
  public Collection<FlowSliceStats> getChildren()
    {
    synchronized( sliceStatsMap )
      {
      return Collections.unmodifiableCollection( new ArrayList<>( sliceStatsMap.values() ) );
      }
    }

  /**
   * Method getChildWith returns the child slice stats stored under the given id.
   *
   * @param id the key the child was stored under
   * @return the matching child, or null if there is no child with the given id
   */
  @Override
  public FlowSliceStats getChildWith( String id )
    {
    // take the same monitor as getChildren, the backing LinkedHashMap is not safe for unsynchronized reads
    synchronized( sliceStatsMap )
      {
      return sliceStatsMap.get( id );
      }
    }

  /**
   * Method captureDetail refreshes the child detail via {@link #captureChildDetailInternal()} when required.
   * <p>
   * Returns immediately if this node is finished and the final detail has already been captured. Otherwise,
   * while holding this instance's monitor, the capture is skipped unless the given depth is a child of this
   * stats type and the currently held detail is stale.
   *
   * @param depth the stats type depth detail is being requested for
   */
  @Override
  public final void captureDetail( Type depth )
    {
    // read once up front so the same value is used for the early exit and the final detail flag
    boolean finished = isFinished();

    if( finished && hasCapturedFinalDetail() )
      return;

    synchronized( this )
      {
      if( !getType().isChild( depth ) || !isDetailStale() )
        return;

      boolean success = captureChildDetailInternal();

      markDetailCaptured(); // always mark to prevent double calls

      if( success )
        logDebug( "captured remote node statistic details" );

      // final only if this node was already finished, the capture worked, and every child is done
      hasCapturedFinalDetail = finished && success && allChildrenFinished;

      if( allChildrenFinished )
        logInfo( "all {} children are in finished state, have captured final details: {}", sliceStatsMap.size(), hasCapturedFinalDetail() );
      }
    }

  /**
   * Returns true if was able to capture/refresh the internal child stats cache.
   *
   * @return true if successful
   */
  protected abstract boolean captureChildDetailInternal();

  /**
   * Method recordChildStats refreshes the cached counters, captures attempt level detail, and records every
   * child slice stats instance with the client state.
   * <p>
   * Synchronized to prevent state changes mid record, #stop may be called out of band.
   * <p>
   * Nothing beyond the counter refresh is done if the client state is not enabled.
   */
  @Override
  public synchronized void recordChildStats()
    {
    try
      {
      cachedCounters( true );
      }
    catch( Exception exception )
      {
      // best effort only, a failed counter refresh must not prevent recording child stats
      logDebug( "unable to refresh cached counters: " + exception );
      }

    if( !clientState.isEnabled() )
      return;

    captureDetail( Type.ATTEMPT );

    // FlowSliceStats are not full blown Stats types, but implementation specific
    // so we can't call recordStats/recordChildStats
    try
      {
      List<FlowSliceStats> snapshot;

      // copy under the map monitor, then record outside of it so the client state
      // is never invoked while the map lock is held
      synchronized( sliceStatsMap )
        {
        snapshot = new ArrayList<>( sliceStatsMap.values() );
        }

      // must use the local ID as the stored id, not task id
      for( FlowSliceStats value : snapshot )
        clientState.record( value.getID(), value );
      }
    catch( Exception exception )
      {
      logError( "unable to record node stats", exception );
      }
    }
  }