001/* 002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.stats.hadoop; 022 023import java.io.IOException; 024import java.util.HashMap; 025import java.util.Map; 026 027import cascading.flow.FlowNode; 028import cascading.management.state.ClientState; 029import cascading.stats.BaseCachedNodeStats; 030import cascading.stats.CounterCache; 031import cascading.stats.FlowNodeStats; 032import cascading.stats.FlowSliceStats; 033import cascading.util.Util; 034import org.apache.hadoop.conf.Configuration; 035import org.apache.hadoop.mapred.RunningJob; 036import org.apache.hadoop.mapreduce.Job; 037import org.apache.hadoop.mapreduce.TaskCompletionEvent; 038import org.apache.hadoop.mapreduce.TaskID; 039import org.apache.hadoop.mapreduce.TaskReport; 040import org.apache.hadoop.mapreduce.TaskType; 041 042import static cascading.util.Util.formatDurationFromMillis; 043 044/** 045 * 046 */ 047public class HadoopNodeStats extends BaseCachedNodeStats<Configuration, FlowNodeStats, Map<String, Map<String, Long>>> 048 { 049 private Map<TaskID, String> sliceIDCache = new HashMap<TaskID, String>( 4999 ); // caching for ids 050 051 private HadoopStepStats parentStepStats; 052 private HadoopSliceStats.Kind kind; 053 054 /** 055 * Constructor CascadingStats creates a new CascadingStats instance. 056 * 057 * @param parentStepStats 058 * @param configuration 059 * @param kind 060 * @param flowNode 061 * @param clientState 062 */ 063 protected HadoopNodeStats( final HadoopStepStats parentStepStats, Configuration configuration, HadoopSliceStats.Kind kind, FlowNode flowNode, ClientState clientState ) 064 { 065 super( flowNode, clientState ); 066 this.parentStepStats = parentStepStats; 067 this.kind = kind; 068 069 this.counterCache = new HadoopNodeCounterCache( this, configuration ); 070 } 071 072 @Override 073 public String getKind() 074 { 075 if( kind == null ) 076 return null; 077 078 return kind.name(); 079 } 080 081 private Status getParentStatus() 082 { 083 return parentStepStats.getStatus(); 084 } 085 086 private RunningJob getJobStatusClient() 087 { 088 return parentStepStats.getJobStatusClient(); 089 } 090 091 /** 092 * Retrieves the TaskReports via the mapreduce API. 093 * 094 * @param kind The kind of TaskReport to retrieve. 095 * @return An array of TaskReports, but never <code>nul</code>. 096 * @throws IOException 097 */ 098 private TaskReport[] retrieveTaskReports( HadoopSliceStats.Kind kind ) throws IOException, InterruptedException 099 { 100 Job job = HadoopStepStats.getJob( getJobStatusClient() ); 101 102 if( job == null ) 103 return new TaskReport[ 0 ]; 104 105 switch( kind ) 106 { 107 case MAPPER: 108 return job.getTaskReports( TaskType.MAP ); 109 case REDUCER: 110 return job.getTaskReports( TaskType.REDUCE ); 111 case SETUP: 112 return job.getTaskReports( TaskType.JOB_SETUP ); 113 case CLEANUP: 114 return job.getTaskReports( TaskType.JOB_CLEANUP ); 115 default: 116 return new TaskReport[ 0 ]; 117 } 118 } 119 120 @Override 121 protected boolean captureChildDetailInternal() 122 { 123 if( allChildrenFinished ) 124 return true; 125 126 Job job = HadoopStepStats.getJob( getJobStatusClient() ); 127 128 if( job == null ) 129 return false; 130 131 try 132 { 133 TaskReport[] taskReports = retrieveTaskReports( kind ); 134 135 if( taskReports.length == 0 ) 136 return false; 137 138 addTaskStats( taskReports, false ); 139 140 return true; 141 } 142 catch( IOException exception ) 143 { 144 logWarn( "unable to retrieve slice stats via task reports", exception ); 145 } 146 catch( InterruptedException exception ) 147 { 148 logWarn( "retrieving task reports timed out, consider increasing timeout delay in CounterCache via: '{}', message: {}", CounterCache.COUNTER_TIMEOUT_PROPERTY, exception.getMessage() ); 149 } 150 151 return false; 152 } 153 154 protected void addTaskStats( TaskReport[] taskReports, boolean skipLast ) 155 { 156 logInfo( "retrieved task reports: {}", taskReports.length ); 157 158 long lastFetch = System.currentTimeMillis(); 159 boolean fetchedAreFinished = true; 160 161 synchronized( sliceStatsMap ) 162 { 163 int added = 0; 164 int updated = 0; 165 166 for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ ) 167 { 168 TaskReport taskReport = taskReports[ i ]; 169 170 if( taskReport == null ) 171 { 172 logWarn( "found empty task report" ); 173 continue; 174 } 175 176 String id = getSliceIDFor( taskReport.getTaskID() ); 177 HadoopSliceStats sliceStats = (HadoopSliceStats) sliceStatsMap.get( id ); 178 179 if( sliceStats != null ) 180 { 181 sliceStats.update( getParentStatus(), kind, taskReport, lastFetch ); 182 updated++; 183 } 184 else 185 { 186 sliceStats = new HadoopSliceStats( id, getParentStatus(), kind, taskReport, lastFetch ); 187 sliceStatsMap.put( id, sliceStats ); 188 added++; 189 } 190 191 if( !sliceStats.getStatus().isFinished() ) 192 fetchedAreFinished = false; 193 } 194 195 int total = sliceStatsMap.size(); 196 String duration = formatDurationFromMillis( System.currentTimeMillis() - lastFetch ); 197 198 logInfo( "added {}, updated: {} slices, with duration: {}, total fetched: {}", added, updated, duration, total ); 199 } 200 201 allChildrenFinished = taskReports.length != 0 && fetchedAreFinished; 202 } 203 204 protected void addAttempt( TaskCompletionEvent event ) 205 { 206 // the event could be a housekeeping task, which we are not tracking 207 String sliceID = sliceIDCache.get( event.getTaskAttemptId().getTaskID() ); 208 209 if( sliceID == null ) 210 return; 211 212 FlowSliceStats stats; 213 214 synchronized( sliceStatsMap ) 215 { 216 stats = sliceStatsMap.get( sliceID ); 217 } 218 219 if( stats == null ) 220 return; 221 222 ( (HadoopSliceStats) stats ).addAttempt( event ); 223 } 224 225 private String getSliceIDFor( TaskID taskID ) 226 { 227 // using taskID instance as #toString is quite painful 228 String id = sliceIDCache.get( taskID ); 229 230 if( id == null ) 231 { 232 id = Util.createUniqueID(); 233 sliceIDCache.put( taskID, id ); 234 } 235 236 return id; 237 } 238 }