001/*
002 * Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats;
022
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.LinkedHashMap;
026import java.util.List;
027import java.util.Map;
028
029import cascading.flow.FlowStep;
030import cascading.management.state.ClientState;
031import cascading.util.ProcessLogger;
032
033/** Class FlowStepStats collects {@link cascading.flow.FlowStep} specific statistics. */
034public abstract class FlowStepStats extends CascadingStats<FlowNodeStats>
035  {
036  private final FlowStep flowStep;
037  private Map<String, FlowNodeStats> flowNodeStatsMap = new LinkedHashMap<>(); // topologically ordered
038
039  protected FlowStepStats( FlowStep flowStep, ClientState clientState )
040    {
041    super( flowStep.getName(), clientState );
042    this.flowStep = flowStep;
043    this.flowStep.setFlowStepStats( this );
044    }
045
046  @Override
047  protected ProcessLogger getProcessLogger()
048    {
049    if( flowStep != null && flowStep instanceof ProcessLogger )
050      return (ProcessLogger) flowStep;
051
052    return ProcessLogger.NULL;
053    }
054
055  @Override
056  public String getID()
057    {
058    return flowStep.getID();
059    }
060
061  @Override
062  public Type getType()
063    {
064    return Type.STEP;
065    }
066
067  public FlowStep getFlowStep()
068    {
069    return flowStep;
070    }
071
072  public void addNodeStats( FlowNodeStats flowNodeStats )
073    {
074    flowNodeStatsMap.put( flowNodeStats.getID(), flowNodeStats );
075    }
076
077  protected Map<String, FlowNodeStats> getFlowNodeStatsMap()
078    {
079    return flowNodeStatsMap;
080    }
081
082  public List<FlowNodeStats> getFlowNodeStats()
083    {
084    return new ArrayList<>( flowNodeStatsMap.values() );
085    }
086
087  public int getNodesCount()
088    {
089    return flowNodeStatsMap.size();
090    }
091
092  protected Collection<String> getFlowNodeIDs()
093    {
094    return flowNodeStatsMap.keySet();
095    }
096
097  @Override
098  public Collection<FlowNodeStats> getChildren()
099    {
100    return flowNodeStatsMap.values();
101    }
102
103  @Override
104  public FlowNodeStats getChildWith( String id )
105    {
106    return flowNodeStatsMap.get( id );
107    }
108
109  @Override
110  public synchronized void recordInfo()
111    {
112    clientState.recordFlowStep( flowStep );
113    }
114
115  @Override
116  public String toString()
117    {
118    return "Step{" + getStatsString() + '}';
119    }
120
121  public boolean hasCapturedFinalDetail()
122    {
123    Collection<FlowNodeStats> values = flowNodeStatsMap.values();
124
125    for( FlowNodeStats nodeStats : values )
126      {
127      if( !nodeStats.getChildren().isEmpty() && !nodeStats.hasCapturedFinalDetail() )
128        return false;
129      }
130
131    return true;
132    }
133
134  /**
135   * An internal method that will refresh current counters and, if a clientState client is enabled, the child details
136   * including node and slice statistics.
137   * <p/>
138   * All results will be then recorded by the clientState implementation.
139   * <p/>
140   * See {@link #captureDetail()} to force all statistics to be cached and locally accessible.
141   */
142  public abstract void recordChildStats();
143
144  /**
145   * Method getProcessStepID returns the ID representing the under platform process.
146   * <p/>
147   * A FlowStep represents a unit of work on a remote platform, in the case of MapReduce, a
148   * MapReduce job. The step ID would be the job id.
149   *
150   * @return a String or null if unavailable
151   */
152  public abstract String getProcessStepID();
153
154  public String getProcessStatusURL()
155    {
156    return null;
157    }
158  }