001/* 002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved. 003 * 004 * Project and contact information: http://www.cascading.org/ 005 * 006 * This file is part of the Cascading project. 007 * 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package cascading.stats.hadoop; 022 023import java.util.Collection; 024import java.util.Map; 025 026import cascading.flow.FlowStep; 027import cascading.management.state.ClientState; 028import cascading.stats.FlowNodeStats; 029import cascading.stats.FlowStepStats; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * 035 */ 036public abstract class BaseHadoopStepStats<JobStatusClient, Counters> extends FlowStepStats 037 { 038 private static final Logger LOG = LoggerFactory.getLogger( BaseHadoopStepStats.class ); 039 040 protected CounterCache<JobStatusClient, Counters> counterCache; 041 042 public BaseHadoopStepStats( FlowStep flowStep, ClientState clientState ) 043 { 044 super( flowStep, clientState ); 045 } 046 047 /** 048 * Method getRunningJob returns the Hadoop {@link org.apache.hadoop.mapred.RunningJob} managing this Hadoop job. 049 * 050 * @return the runningJob (type RunningJob) of this HadoopStepStats object. 051 */ 052 public abstract JobStatusClient getJobStatusClient(); 053 054 @Override 055 public long getLastSuccessfulCounterFetchTime() 056 { 057 if( counterCache != null ) 058 return counterCache.getLastSuccessfulFetch(); 059 060 return -1; 061 } 062 063 /** 064 * Method getCounterGroups returns all of the Hadoop counter groups. 065 * 066 * @return the counterGroups (type Collection<String>) of this HadoopStepStats object. 067 */ 068 @Override 069 public Collection<String> getCounterGroups() 070 { 071 return counterCache.getCounterGroups(); 072 } 073 074 /** 075 * Method getCounterGroupsMatching returns all the Hadoop counter groups that match the give regex pattern. 076 * 077 * @param regex of String 078 * @return Collection<String> 079 */ 080 @Override 081 public Collection<String> getCounterGroupsMatching( String regex ) 082 { 083 return counterCache.getCounterGroupsMatching( regex ); 084 } 085 086 /** 087 * Method getCountersFor returns the Hadoop counters for the given group. 088 * 089 * @param group of String 090 * @return Collection<String> 091 */ 092 @Override 093 public Collection<String> getCountersFor( String group ) 094 { 095 return counterCache.getCountersFor( group ); 096 } 097 098 /** 099 * Method getCounterValue returns the Hadoop counter value for the given counter enum. 100 * 101 * @param counter of Enum 102 * @return long 103 */ 104 @Override 105 public long getCounterValue( Enum counter ) 106 { 107 return counterCache.getCounterValue( counter ); 108 } 109 110 /** 111 * Method getCounterValue returns the Hadoop counter value for the given group and counter name. 112 * 113 * @param group of String 114 * @param counter of String 115 * @return long 116 */ 117 @Override 118 public long getCounterValue( String group, String counter ) 119 { 120 return counterCache.getCounterValue( group, counter ); 121 } 122 123 protected synchronized Counters cachedCounters( boolean force ) 124 { 125 return counterCache.cachedCounters( force ); 126 } 127 128 /** Synchronized to prevent state changes mid record, #stop may be called out of band */ 129 @Override 130 public synchronized void recordChildStats() 131 { 132 try 133 { 134 cachedCounters( true ); 135 } 136 catch( Exception exception ) 137 { 138 // do nothing 139 } 140 141 if( !clientState.isEnabled() ) 142 return; 143 144 captureDetail( Type.ATTEMPT ); 145 146 try 147 { 148 for( Map.Entry<String, FlowNodeStats> entry : getFlowNodeStatsMap().entrySet() ) 149 { 150 entry.getValue().recordStats(); 151 entry.getValue().recordChildStats(); 152 } 153 } 154 catch( Exception exception ) 155 { 156 LOG.error( "unable to record node stats", exception ); 157 } 158 } 159 }