001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.local;
022
023import java.util.Collection;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.LinkedHashSet;
028import java.util.Map;
029import java.util.Properties;
030import java.util.Set;
031
032import cascading.flow.FlowStep;
033import cascading.management.state.ClientState;
034import cascading.stats.FlowStepStats;
035
036import static java.util.Collections.synchronizedMap;
037import static java.util.Collections.unmodifiableCollection;
038
039/**
040 *
041 */
042public class LocalStepStats extends FlowStepStats
043  {
044  final Map<String, Map<String, Long>> counters = synchronizedMap( new HashMap<String, Map<String, Long>>() );
045
046  /** Constructor CascadingStats creates a new CascadingStats instance. */
047  public LocalStepStats( FlowStep<Properties> flowStep, ClientState clientState )
048    {
049    super( flowStep, clientState );
050    }
051
052  @Override
053  public void recordChildStats()
054    {
055    }
056
057  @Override
058  public long getLastSuccessfulCounterFetchTime()
059    {
060    return System.currentTimeMillis();
061    }
062
063  @Override
064  public Collection<String> getCounterGroups()
065    {
066    synchronized( counters )
067      {
068      return unmodifiableCollection( new LinkedHashSet<String>( counters.keySet() ) );
069      }
070    }
071
072  @Override
073  public Collection<String> getCounterGroupsMatching( String regex )
074    {
075    Collection<String> counterGroups = getCounterGroups();
076
077    Set<String> results = new HashSet<String>();
078
079    synchronized( counters )
080      {
081      for( String group : counterGroups )
082        {
083        if( group.matches( regex ) )
084          results.add( group );
085        }
086      }
087
088    return unmodifiableCollection( results );
089    }
090
091  @Override
092  public Collection<String> getCountersFor( String group )
093    {
094    Map<String, Long> groupCollection = counters.get( group );
095
096    if( groupCollection == null )
097      return Collections.emptySet();
098
099    synchronized( groupCollection )
100      {
101      return unmodifiableCollection( new LinkedHashSet<String>( groupCollection.keySet() ) );
102      }
103    }
104
105  @Override
106  public long getCounterValue( Enum counter )
107    {
108    Map<String, Long> counterMap = counters.get( counter.getDeclaringClass().getName() );
109
110    String counterString = counter.toString();
111
112    // we don't remove counters, so safe
113    if( counterMap == null || !counterMap.containsKey( counterString ) )
114      return 0;
115
116    return counterMap.get( counterString );
117    }
118
119  @Override
120  public long getCounterValue( String group, String counter )
121    {
122    Map<String, Long> counterMap = counters.get( group );
123
124    // we don't remove counters, so safe
125    if( counterMap == null || !counterMap.containsKey( counter ) )
126      return 0;
127
128    return counterMap.get( counter );
129    }
130
131  public void increment( Enum counter, long amount )
132    {
133    increment( counter.getDeclaringClass().getName(), counter.toString(), amount );
134    }
135
136  public void increment( String group, String counter, long amount )
137    {
138    Map<String, Long> groupMap = getCreateCounter( group );
139
140    synchronized( groupMap )
141      {
142      Long value = groupMap.get( counter );
143
144      if( value == null )
145        value = 0L;
146
147      groupMap.put( counter, value + amount );
148      }
149    }
150
151  private Map<String, Long> getCreateCounter( String group )
152    {
153    synchronized( counters )
154      {
155      Map<String, Long> counterMap = counters.get( group );
156
157      if( counterMap == null )
158        {
159        counterMap = synchronizedMap( new HashMap<String, Long>() );
160        counters.put( group, counterMap );
161        }
162
163      return counterMap;
164      }
165    }
166
167  @Override
168  public void captureDetail( Type depth )
169    {
170    }
171
172  @Override
173  public String getProcessStepID()
174    {
175    return getID();
176    }
177
178  @Override
179  public Collection getChildren()
180    {
181    return Collections.emptyList();
182    }
183  }