/*
 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
 *
 * Project and contact information: http://www.cascading.org/
 *
 * This file is part of the Cascading project.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cascading.stats.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowNode;
import cascading.management.state.ClientState;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowSliceStats;
import cascading.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskReport;

import static cascading.util.Util.formatDurationFromMillis;

/**
 * Class HadoopNodeStats tracks slice (task) level statistics for either the map or the reduce side
 * of a Hadoop job, using task reports retrieved through the parent step's {@link JobClient}.
 */
public class HadoopNodeStats extends BaseHadoopNodeStats<FlowNodeStats, Map<String, Map<String, Long>>>
  {
  private Map<TaskID, String> sliceIDCache = new HashMap<TaskID, String>( 4999 ); // caching for ids

  private HadoopStepStats parentStepStats;
  private HadoopSliceStats.Kind kind;

  /**
   * Constructor HadoopNodeStats creates a new HadoopNodeStats instance.
   *
   * @param parentStepStats of type HadoopStepStats
   * @param configuration   of type Configuration
   * @param kind            of type HadoopSliceStats.Kind
   * @param flowNode        of type FlowNode
   * @param clientState     of type ClientState
   */
  protected HadoopNodeStats( final HadoopStepStats parentStepStats, Configuration configuration, HadoopSliceStats.Kind kind, FlowNode flowNode, ClientState clientState )
    {
    super( flowNode, clientState );
    this.parentStepStats = parentStepStats;
    this.kind = kind;

    this.counterCache = new HadoopNodeCounterCache( this, configuration );
    }

  @Override
  public String getKind()
    {
    if( kind == null )
      return null;

    return kind.name();
    }

  private Status getParentStatus()
    {
    return parentStepStats.getStatus();
    }
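  /**
   * Fetches the current map or reduce task reports, depending on this node's kind, and captures them
   * as child slice stats. Returns true if all children have already finished or the reports were
   * captured; returns false if the job clients are unavailable, no reports were returned, or the
   * fetch failed.
   */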
  @Override
  protected boolean captureChildDetailInternal()
    {
    if( allChildrenFinished )
      return true;

    JobClient jobClient = parentStepStats.getJobClient();
    RunningJob runningJob = parentStepStats.getJobStatusClient();

    if( jobClient == null || runningJob == null )
      return false;

    try
      {
      TaskReport[] taskReports; // todo: use Job task reports

      if( kind == HadoopSliceStats.Kind.MAPPER )
        taskReports = jobClient.getMapTaskReports( runningJob.getID() );
      else
        taskReports = jobClient.getReduceTaskReports( runningJob.getID() );

      if( taskReports.length == 0 )
        return false;

      addTaskStats( taskReports, false );

      return true;
      }
    catch( IOException exception )
      {
      logWarn( "unable to retrieve slice stats via task reports", exception );
      }

    return false;
    }

  protected void addTaskStats( TaskReport[] taskReports, boolean skipLast )
    {
    logInfo( "retrieved task reports: {}", taskReports.length );

    long lastFetch = System.currentTimeMillis();
    boolean fetchedAreFinished = true;

    synchronized( sliceStatsMap )
      {
      int added = 0;
      int updated = 0;

      for( int i = 0; i < taskReports.length - ( skipLast ? 1 : 0 ); i++ )
        {
        TaskReport taskReport = taskReports[ i ];

        if( taskReport == null )
          {
          logWarn( "found null task report" );
          continue;
          }

        String id = getSliceIDFor( taskReport.getTaskID() );
        HadoopSliceStats sliceStats = new HadoopSliceStats( id, getParentStatus(), kind, taskReport, lastFetch );

        if( sliceStatsMap.put( id, sliceStats ) != null )
          updated++;
        else
          added++;

        if( !sliceStats.getStatus().isFinished() )
          fetchedAreFinished = false;
        }

      int total = sliceStatsMap.size();
      String duration = formatDurationFromMillis( System.currentTimeMillis() - lastFetch );

      logInfo( "added: {}, updated: {} slices, with duration: {}, total fetched: {}", added, updated, duration, total );
      }

    allChildrenFinished = taskReports.length != 0 && fetchedAreFinished;
    }
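  /**
   * Adds the given completion event as an attempt on the slice stats it belongs to. Events whose task
   * is not in the slice id cache, such as housekeeping tasks, or that have no captured slice stats,
   * are ignored.
   */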
  protected void addAttempt( TaskCompletionEvent event )
    {
    // the event could be a housekeeping task, which we are not tracking
    String sliceID = sliceIDCache.get( event.getTaskAttemptId().getTaskID() );

    if( sliceID == null )
      return;

    FlowSliceStats stats;

    synchronized( sliceStatsMap )
      {
      stats = sliceStatsMap.get( sliceID );
      }

    if( stats == null )
      return;

    ( (HadoopSliceStats) stats ).addAttempt( event );
    }

  private String getSliceIDFor( TaskID taskID )
    {
    // using taskID instance as #toString is quite painful
    String id = sliceIDCache.get( taskID );

    if( id == null )
      {
      id = Util.createUniqueID();
      sliceIDCache.put( taskID, id );
      }

    return id;
    }
  }