001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.hadoop;
022
023import java.io.IOException;
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.Map;
028import java.util.Set;
029
030import cascading.flow.hadoop.util.HadoopUtil;
031import cascading.stats.CascadingStats;
032import cascading.stats.FlowNodeStats;
033import cascading.stats.FlowSliceStats;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.mapreduce.Counter;
036import org.apache.hadoop.mapreduce.CounterGroup;
037import org.apache.hadoop.mapreduce.Counters;
038import org.apache.hadoop.mapreduce.TaskReport;
039
040/**
041 *
042 */
043public class HadoopNodeCounterCache extends HadoopCounterCache<FlowNodeStats, Map<String, Map<String, Long>>>
044  {
045  private FlowNodeStats flowNodeStats;
046
047  private Configuration configuration;
048
049  protected HadoopNodeCounterCache( FlowNodeStats flowNodeStats, Configuration configuration )
050    {
051    super( flowNodeStats, configuration );
052
053    this.flowNodeStats = flowNodeStats;
054    this.configuration = configuration;
055    }
056
057  @Override
058  protected FlowNodeStats getJobStatusClient()
059    {
060    return flowNodeStats;
061    }
062
063  @Override
064  protected boolean areCountersAvailable( FlowNodeStats runningJob )
065    {
066    return !HadoopUtil.isLocal( this.configuration );
067    }
068
069  protected Map<String, Map<String, Long>> getCounters( FlowNodeStats flowNodeStats ) throws IOException
070    {
071    // will use final or cached remote stats
072    flowNodeStats.captureDetail( CascadingStats.Type.SLICE );
073
074    Map<String, Map<String, Long>> allCounters = new HashMap<>();
075
076    Collection<FlowSliceStats> children = flowNodeStats.getChildren();
077
078    for( FlowSliceStats sliceStats : children )
079      {
080      TaskReport taskReport = ( (HadoopSliceStats) sliceStats ).getTaskReport();
081
082      Counters counters = taskReport.getTaskCounters();
083
084      for( CounterGroup group : counters )
085        {
086        Map<String, Long> values = allCounters.get( group.getName() );
087
088        if( values == null )
089          {
090          values = new HashMap<>();
091          allCounters.put( group.getName(), values );
092          }
093
094        for( Counter counter : group )
095          {
096          Long value = values.get( counter.getName() );
097
098          if( value == null )
099            value = 0L;
100
101          value += counter.getValue();
102
103          values.put( counter.getName(), value );
104          }
105        }
106      }
107
108    return allCounters;
109    }
110
111  protected Collection<String> getGroupNames( Map<String, Map<String, Long>> groups )
112    {
113    return groups.keySet();
114    }
115
116  protected Set<String> getCountersFor( Map<String, Map<String, Long>> counters, String group )
117    {
118    Set<String> results = new HashSet<>();
119
120    Map<String, Long> map = counters.get( group );
121
122    if( map != null )
123      results.addAll( map.keySet() );
124
125    return results;
126    }
127
128  protected long getCounterValue( Map<String, Map<String, Long>> counters, Enum counter )
129    {
130    return getCounterValue( counters, counter.getDeclaringClass().getName(), counter.name() );
131    }
132
133  protected long getCounterValue( Map<String, Map<String, Long>> counters, String groupName, String counterName )
134    {
135    Map<String, Long> counterGroup = counters.get( groupName );
136
137    if( counterGroup == null )
138      return 0;
139
140    Long counterValue = counterGroup.get( counterName );
141
142    if( counterValue == null )
143      return 0;
144
145    return counterValue;
146    }
147  }