/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowSliceStats;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;

import static cascading.util.Util.formatDurationFromMillis;

/**
 * Class HadoopNodeStats tracks the slice (task) level statistics of a single {@link FlowNode}
 * by fetching {@link TaskReport} instances through the job client of its parent {@link HadoopStepStats}.
 * <p/>
 * Each Hadoop {@link TaskID} is mapped to a generated slice id; that mapping and the inherited
 * {@code sliceStatsMap} are only read or written while holding the {@code sliceStatsMap} monitor.
 */
public class HadoopNodeStats extends BaseHadoopNodeStats<FlowNodeStats, Map<String, Map<String, Long>>>
  {
  /** Maps a Hadoop task id to the generated slice id; guarded by the {@code sliceStatsMap} monitor. */
  private final Map<TaskID, String> sliceIDCache = new HashMap<TaskID, String>( 4999 ); // caching for ids

  private final HadoopStepStats parentStepStats;
  private final HadoopSliceStats.Kind kind;

  /**
   * Constructor HadoopNodeStats creates a new HadoopNodeStats instance.
   *
   * @param parentStepStats the step stats this node belongs to, used to reach the job client and step status
   * @param configuration   the configuration handed to the {@link HadoopNodeCounterCache} created for this node
   * @param kind            the kind of task this node represents, may be {@code null}
   * @param flowNode        the flow node these stats describe
   * @param clientState     the client state passed to the super class
   */
  protected HadoopNodeStats( final HadoopStepStats parentStepStats, Configuration configuration, HadoopSliceStats.Kind kind, FlowNode flowNode, ClientState clientState )
    {
    super( flowNode, clientState );
    this.parentStepStats = parentStepStats;
    this.kind = kind;

    this.counterCache = new HadoopNodeCounterCache( this, configuration );
    }

  /**
   * Returns the name of the {@link HadoopSliceStats.Kind} of this node.
   *
   * @return the kind name, or {@code null} if no kind was given
   */
  @Override
  public String getKind()
    {
    return kind == null ? null : kind.name();
    }

  private Status getParentStatus()
    {
    return parentStepStats.getStatus();
    }

  private RunningJob getJobStatusClient()
    {
    return parentStepStats.getJobStatusClient();
    }

  /**
   * Retrieves the TaskReports via the mapreduce API.
   *
   * @param kind The kind of TaskReport to retrieve, may be <code>null</code>.
   * @return An array of TaskReports, but never <code>null</code>. The array is empty if no job is
   * available, if the kind is <code>null</code> or is not one of MAPPER, REDUCER, SETUP or CLEANUP.
   * @throws IOException          if the task reports cannot be fetched
   * @throws InterruptedException if the calling thread is interrupted while fetching
   */
  private TaskReport[] retrieveTaskReports( HadoopSliceStats.Kind kind ) throws IOException, InterruptedException
    {
    Job job = HadoopStepStats.getJob( getJobStatusClient() );

    // getKind() tolerates a null kind, so must this method; switching on null would throw an NPE
    if( job == null || kind == null )
      return new TaskReport[ 0 ];

    TaskReport[] reports;

    switch( kind )
      {
      case MAPPER:
        reports = job.getTaskReports( TaskType.MAP );
        break;
      case REDUCER:
        reports = job.getTaskReports( TaskType.REDUCE );
        break;
      case SETUP:
        reports = job.getTaskReports( TaskType.JOB_SETUP );
        break;
      case CLEANUP:
        reports = job.getTaskReports( TaskType.JOB_CLEANUP );
        break;
      default:
        reports = null;
        break;
      }

    // honor the never-null contract regardless of what the job client hands back
    return reports == null ? new TaskReport[ 0 ] : reports;
    }

  /**
   * Fetches the current task reports for this node's kind and folds them into the slice stats.
   *
   * @return {@code true} if all children were already known to be finished or if at least one task
   * report was retrieved and processed, {@code false} if no job or no reports were available or
   * the retrieval failed
   */
  @Override
  protected boolean captureChildDetailInternal()
    {
    if( allChildrenFinished )
      return true;

    Job job = HadoopStepStats.getJob( getJobStatusClient() );

    if( job == null )
      return false;

    try
      {
      TaskReport[] taskReports = retrieveTaskReports( kind );

      if( taskReports.length == 0 )
        return false;

      addTaskStats( taskReports, false );

      return true;
      }
    catch( IOException exception )
      {
      logWarn( "unable to retrieve slice stats via task reports", exception );
      }
    catch( InterruptedException exception )
      {
      // restore the interrupt status so code further up the stack can still observe the interruption
      Thread.currentThread().interrupt();

      logWarn( "retrieving task reports timed out, consider increasing timeout delay in CounterCache via: '{}', message: {}", CounterCache.COUNTER_TIMEOUT_PROPERTY, exception.getMessage() );
      }

    return false;
    }

  /**
   * Creates or updates a {@link HadoopSliceStats} entry for every given task report and records
   * whether all of the processed slices have finished.
   *
   * @param taskReports the task reports to process, {@code null} elements are skipped with a warning
   * @param skipLast    if {@code true} the last element of the array is not processed
   */
  protected void addTaskStats( TaskReport[] taskReports, boolean skipLast )
    {
    logInfo( "retrieved task reports: {}", taskReports.length );

    long lastFetch = System.currentTimeMillis();
    boolean fetchedAreFinished = true;
    int limit = taskReports.length - ( skipLast ? 1 : 0 );

    synchronized( sliceStatsMap )
      {
      int added = 0;
      int updated = 0;

      for( int i = 0; i < limit; i++ )
        {
        TaskReport taskReport = taskReports[ i ];

        if( taskReport == null )
          {
          logWarn( "found empty task report" );
          continue;
          }

        String id = getSliceIDFor( taskReport.getTaskID() );
        HadoopSliceStats sliceStats = (HadoopSliceStats) sliceStatsMap.get( id );

        if( sliceStats != null )
          {
          sliceStats.update( getParentStatus(), kind, taskReport, lastFetch );
          updated++;
          }
        else
          {
          sliceStats = new HadoopSliceStats( id, getParentStatus(), kind, taskReport, lastFetch );
          sliceStatsMap.put( id, sliceStats );
          added++;
          }

        if( !sliceStats.getStatus().isFinished() )
          fetchedAreFinished = false;
        }

      int total = sliceStatsMap.size();
      String duration = formatDurationFromMillis( System.currentTimeMillis() - lastFetch );

      logInfo( "added {}, updated: {} slices, with duration: {}, total fetched: {}", added, updated, duration, total );
      }

    // NOTE(review): with skipLast the final report is never inspected, yet it still counts towards
    // the non-empty check below -- confirm this is intended by the callers passing skipLast
    allChildrenFinished = taskReports.length != 0 && fetchedAreFinished;
    }

  /**
   * Hands the given completion event to the slice stats of the task it belongs to, if that task
   * is being tracked by this node.
   *
   * @param event the task completion event to record
   */
  protected void addAttempt( TaskCompletionEvent event )
    {
    TaskID taskID = event.getTaskAttemptId().getTaskID();
    FlowSliceStats stats;

    // sliceIDCache is written under this monitor by #addTaskStats, so it must be read under it too
    synchronized( sliceStatsMap )
      {
      // the event could be a housekeeping task, which we are not tracking
      String sliceID = sliceIDCache.get( taskID );

      if( sliceID == null )
        return;

      stats = sliceStatsMap.get( sliceID );
      }

    if( stats == null )
      return;

    ( (HadoopSliceStats) stats ).addAttempt( event );
    }

  /**
   * Returns the slice id for the given task id, generating and caching a new unique id on first use.
   * <p/>
   * Callers must hold the {@code sliceStatsMap} monitor.
   *
   * @param taskID the Hadoop task id
   * @return the slice id associated with the task id
   */
  private String getSliceIDFor( TaskID taskID )
    {
    // using taskID instance as #toString is quite painful
    String id = sliceIDCache.get( taskID );

    if( id == null )
      {
      id = Util.createUniqueID();
      sliceIDCache.put( taskID, id );
      }

    return id;
    }
  }