001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.tez.util;
022
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import javax.annotation.Nullable;

import cascading.CascadingException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.common.ATSConstants;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGClientTimelineImpl;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.tez.common.ATSConstants.*;
import static org.apache.tez.dag.history.logging.EntityTypes.TEZ_TASK_ID;
049
050/**
051 *
052 */
053public class TezTimelineClient extends DAGClientTimelineImpl implements TimelineClient
054  {
055  private static final Logger LOG = LoggerFactory.getLogger( TezTimelineClient.class );
056
057  private static final String FILTER_BY_FIELDS = "primaryfilters,otherinfo";
058
059  private final String dagId;
060  private final FrameworkClient frameworkClient;
061  private final DAGClient dagClient;
062
063  public TezTimelineClient( ApplicationId appId, String dagId, TezConfiguration conf, FrameworkClient frameworkClient, DAGClient dagClient ) throws TezException
064    {
065    super( appId, dagId, conf, frameworkClient, 5000 );
066    this.dagId = dagId;
067    this.frameworkClient = frameworkClient;
068    this.dagClient = dagClient;
069    }
070
071  public DAGClient getDAGClient()
072    {
073    return dagClient;
074    }
075
076  public FrameworkClient getFrameworkClient()
077    {
078    return frameworkClient;
079    }
080
081  @Override
082  public DAGStatus getDAGStatus( @Nullable Set<StatusGetOpts> statusOptions ) throws IOException, TezException
083    {
084    return dagClient.getDAGStatus( statusOptions );
085    }
086
087  @Override
088  public VertexStatus getVertexStatus( String vertexName, Set<StatusGetOpts> statusOptions ) throws IOException, TezException
089    {
090    return dagClient.getVertexStatus( vertexName, statusOptions );
091    }
092
093  @Override
094  public String getVertexID( String vertexName ) throws IOException, TezException
095    {
096    // the filter 'vertexName' is in the 'otherinfo' field, so it must be requested, otherwise timeline server throws
097    // an NPE. to be safe, we include both fields in the result
098    String format = "%s/%s?primaryFilter=%s:%s&secondaryFilter=vertexName:%s&fields=%s";
099    String url = String.format( format, baseUri, TEZ_VERTEX_ID, TEZ_DAG_ID, dagId, vertexName, FILTER_BY_FIELDS );
100
101    JSONObject jsonRoot = getJsonRootEntity( url );
102    JSONArray entitiesNode = jsonRoot.optJSONArray( ENTITIES );
103
104    if( entitiesNode == null || entitiesNode.length() != 1 )
105      throw new CascadingException( "failed to get vertex status from timeline server" );
106
107    try
108      {
109      return getJsonObject( entitiesNode, 0 ).getString( ENTITY );
110      }
111    catch( JSONException exception )
112      {
113      throw new CascadingException( "unable to get vertex node", exception );
114      }
115    }
116
117  @Override
118  public Iterator<TaskStatus> getVertexChildren( String vertexID, int limit, String startTaskID ) throws IOException, TezException
119    {
120    if( vertexID == null )
121      throw new IllegalArgumentException( "vertexID is required" );
122
123    String format = "%s/%s?primaryFilter=%s:%s&fields=%s&limit=%s";
124    String url = String.format( format, baseUri, TEZ_TASK_ID, TEZ_VERTEX_ID, vertexID, FILTER_BY_FIELDS, limit );
125
126    if( startTaskID != null )
127      url = String.format( "%s&fromId=%s", url, startTaskID );
128
129    JSONObject jsonRoot = getJsonRootEntity( url );
130    final JSONArray entitiesNode = jsonRoot.optJSONArray( ATSConstants.ENTITIES );
131
132    if( entitiesNode == null )
133      throw new CascadingException( "failed to get vertex task statuses from timeline server" );
134
135    LOG.debug( "vertex: {}, retrieved {} tasks", vertexID, entitiesNode.length() );
136
137    return new Iterator<TaskStatus>()
138    {
139    int index = 0;
140
141    @Override
142    public boolean hasNext()
143      {
144      return entitiesNode.length() != index;
145      }
146
147    @Override
148    public TaskStatus next()
149      {
150      return parseTaskStatus( getJsonObject( entitiesNode, index++ ) );
151      }
152
153    @Override
154    public void remove()
155      {
156
157      }
158    };
159    }
160
161  @Override
162  public TaskStatus getVertexChild( String taskID ) throws TezException
163    {
164    String format = "%s/%s/%s?fields=%s";
165    String url = String.format( format, baseUri, TEZ_TASK_ID, taskID, FILTER_BY_FIELDS );
166
167    JSONObject jsonRoot = getJsonRootEntity( url );
168
169    if( jsonRoot == null )
170      throw new CascadingException( "failed to get vertex task status from timeline server, for id: " + taskID );
171
172    return parseTaskStatus( jsonRoot );
173    }
174
175  private TaskStatus parseTaskStatus( JSONObject jsonRoot )
176    {
177    try
178      {
179      String taskID = jsonRoot.optString( ATSConstants.ENTITY );
180      JSONObject otherInfoNode = jsonRoot.getJSONObject( ATSConstants.OTHER_INFO );
181      String status = otherInfoNode.optString( ATSConstants.STATUS );
182      long scheduledTime = otherInfoNode.optLong( ATSConstants.SCHEDULED_TIME, -1 );
183      long startTime = otherInfoNode.optLong( ATSConstants.START_TIME, -1 ); // actual attempt launch time
184      long endTime = otherInfoNode.optLong( ATSConstants.FINISH_TIME, -1 ); // endTime
185      String successfulAttemptID = otherInfoNode.optString( ATSConstants.SUCCESSFUL_ATTEMPT_ID );
186      String diagnostics = otherInfoNode.optString( ATSConstants.DIAGNOSTICS );
187
188      if( status.equals( "" ) )
189        return new TaskStatus( taskID );
190
191      JSONObject countersNode = otherInfoNode.optJSONObject( ATSConstants.COUNTERS );
192      Map<String, Map<String, Long>> counters = parseDagCounters( countersNode );
193
194      return new TaskStatus( taskID, status, scheduledTime, startTime, endTime, successfulAttemptID, counters, diagnostics );
195      }
196    catch( JSONException exception )
197      {
198      throw new CascadingException( exception );
199      }
200    }
201
202  private Map<String, Map<String, Long>> parseDagCounters( JSONObject countersNode ) throws JSONException
203    {
204    if( countersNode == null )
205      return null;
206
207    JSONArray counterGroupNodes = countersNode.optJSONArray( ATSConstants.COUNTER_GROUPS );
208
209    if( counterGroupNodes == null )
210      return null;
211
212    Map<String, Map<String, Long>> counters = new HashMap<>();
213    int numCounterGroups = counterGroupNodes.length();
214
215    for( int i = 0; i < numCounterGroups; i++ )
216      parseCounterGroup( counters, counterGroupNodes.optJSONObject( i ) );
217
218    return counters;
219    }
220
221  private void parseCounterGroup( Map<String, Map<String, Long>> counters, JSONObject counterGroupNode ) throws JSONException
222    {
223    if( counterGroupNode == null )
224      return;
225
226    final String groupName = counterGroupNode.optString( ATSConstants.COUNTER_GROUP_NAME );
227//    final String groupDisplayName = counterGroupNode.optString( ATSConstants.COUNTER_GROUP_DISPLAY_NAME );
228    final JSONArray counterNodes = counterGroupNode.optJSONArray( ATSConstants.COUNTERS );
229    final int numCounters = counterNodes.length();
230
231    Map<String, Long> values = new HashMap<>();
232
233    counters.put( groupName, values );
234
235    for( int i = 0; i < numCounters; i++ )
236      {
237      JSONObject counterNode = counterNodes.getJSONObject( i );
238      String counterName = counterNode.getString( ATSConstants.COUNTER_NAME );
239//      String counterDisplayName = counterNode.getString( ATSConstants.COUNTER_DISPLAY_NAME );
240      long counterValue = counterNode.getLong( ATSConstants.COUNTER_VALUE );
241
242      values.put( counterName, counterValue );
243      }
244    }
245
246  // remove is unsupported in jettison on hadoop 24
247  protected JSONObject getRemoveJsonObject( JSONArray entitiesNode, int index, boolean doRemove )
248    {
249    try
250      {
251      JSONObject jsonObject = entitiesNode.getJSONObject( index );
252
253      if( doRemove )
254        entitiesNode.remove( jsonObject );
255
256      return jsonObject;
257      }
258    catch( JSONException exception )
259      {
260      throw new CascadingException( exception );
261      }
262    }
263
264  protected JSONObject getJsonObject( JSONArray entitiesNode, int index )
265    {
266    try
267      {
268      return entitiesNode.getJSONObject( index );
269      }
270    catch( JSONException exception )
271      {
272      throw new CascadingException( exception );
273      }
274    }
275  }