001/*
002 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
003 *
004 * Project and contact information: http://www.cascading.org/
005 *
006 * This file is part of the Cascading project.
007 *
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package cascading.stats.tez.util;
022
023import java.io.IOException;
024import java.util.EnumSet;
025import java.util.Set;
026
027import cascading.CascadingException;
028import cascading.flow.hadoop.util.HadoopUtil;
029import cascading.flow.planner.PlatformInfo;
030import cascading.util.Util;
031import org.apache.hadoop.yarn.api.records.ApplicationId;
032import org.apache.hadoop.yarn.api.records.ApplicationReport;
033import org.apache.hadoop.yarn.conf.YarnConfiguration;
034import org.apache.hadoop.yarn.exceptions.YarnException;
035import org.apache.tez.client.FrameworkClient;
036import org.apache.tez.client.TezClient;
037import org.apache.tez.dag.api.DAG;
038import org.apache.tez.dag.api.TezConfiguration;
039import org.apache.tez.dag.api.TezException;
040import org.apache.tez.dag.api.client.DAGClient;
041import org.apache.tez.dag.api.client.DAGStatus;
042import org.apache.tez.dag.api.client.StatusGetOpts;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 *
048 */
049public class TezStatsUtil
050  {
051  private static final Logger LOG = LoggerFactory.getLogger( TezStatsUtil.class );
052
053  public static final Set<StatusGetOpts> STATUS_GET_COUNTERS = EnumSet.of( StatusGetOpts.GET_COUNTERS );
054
055  static Class<DAGClient> timelineClientClass = null;
056
057  public static final String TIMELINE_CLIENT_CLASS = "cascading.stats.tez.util.TezTimelineClient";
058
059  public static String getPlatformVersion()
060    {
061    PlatformInfo tez = HadoopUtil.getPlatformInfo( DAG.class, null, "Tez" );
062
063    if( tez == null || tez.version == null )
064      return "unknown";
065
066    return tez.version;
067    }
068
069  private static boolean loadClass()
070    {
071    if( timelineClientClass != null )
072      return true;
073
074    try
075      {
076      timelineClientClass = (Class<DAGClient>) Thread.currentThread().getContextClassLoader().loadClass( TIMELINE_CLIENT_CLASS );
077
078      return true;
079      }
080    catch( ClassNotFoundException exception )
081      {
082      LOG.error( "'" + YarnConfiguration.TIMELINE_SERVICE_ENABLED + "' is enabled, yet unable to load Tez YARN timeline client class: {}, ensure these dependencies are in your local CLASSPATH: tez-yarn-timeline-history, org.apache.tez:tez-yarn-timeline-history or org.apache.tez:tez-yarn-timeline-history-with-acls", TIMELINE_CLIENT_CLASS, exception );
083      }
084
085    return false;
086    }
087
088  public static DAGStatus getDagStatusWithCounters( DAGClient dagClient )
089    {
090    if( dagClient == null )
091      return null;
092
093    try
094      {
095      return dagClient.getDAGStatus( STATUS_GET_COUNTERS );
096      }
097    catch( IOException | TezException exception )
098      {
099      throw new CascadingException( "unable to get counters from dag client", exception );
100      }
101    }
102
103  /**
104   * Only called if the service is enabled
105   *
106   * @param dagClient
107   * @return
108   */
109  public static DAGClient createTimelineClient( DAGClient dagClient )
110    {
111    if( dagClient == null )
112      return null;
113
114    if( !loadClass() )
115      return null;
116
117    // TezTimelineClient( ApplicationId appId, String dagId, TezConfiguration conf, FrameworkClient frameworkClient, DAGClient dagClient )
118
119    Class[] types = new Class[]{
120      ApplicationId.class,
121      String.class,
122      TezConfiguration.class,
123      FrameworkClient.class,
124      DAGClient.class
125    };
126
127    ApplicationId appId = Util.returnInstanceFieldIfExistsSafe( dagClient, "appId" );
128    String dagId = Util.returnInstanceFieldIfExistsSafe( dagClient, "dagId" );
129    TezConfiguration conf = Util.returnInstanceFieldIfExistsSafe( dagClient, "conf" );
130    FrameworkClient frameworkClient = Util.returnInstanceFieldIfExistsSafe( dagClient, "frameworkClient" );
131
132    Object[] parameters = new Object[]{
133      appId,
134      dagId,
135      conf,
136      frameworkClient,
137      dagClient
138    };
139
140    try
141      {
142      return Util.invokeConstructor( timelineClientClass, parameters, types );
143      }
144    catch( CascadingException exception )
145      {
146      Throwable cause = exception.getCause();
147
148      if( cause instanceof ReflectiveOperationException && cause.getCause() instanceof TezException )
149        LOG.warn( "unable to construct timeline server client", cause.getCause() );
150      else if( cause instanceof ReflectiveOperationException && cause.getCause() instanceof NoSuchMethodError )
151        LOG.warn( "unable to construct timeline server client, check for compatible Tez version, current: {}", getPlatformVersion(), cause.getCause() );
152      else
153        LOG.warn( "unable to construct timeline server client", exception );
154      }
155
156    return null;
157    }
158
159  public static String getTrackingURL( TezClient tezClient, DAGClient dagClient )
160    {
161    if( tezClient == null || dagClient == null )
162      return null;
163
164    try
165      {
166      ApplicationId applicationId = tezClient.getAppMasterApplicationId();
167      FrameworkClient frameworkClient = getFrameworkClient( dagClient );
168
169      if( frameworkClient == null )
170        {
171        LOG.info( "unable to get framework client" );
172        return null;
173        }
174
175      ApplicationReport report = frameworkClient.getApplicationReport( applicationId );
176
177      if( report != null )
178        return report.getTrackingUrl();
179
180      }
181    catch( YarnException | IOException exception )
182      {
183      LOG.info( "unable to get tracking url" );
184      LOG.debug( "exception retrieving application report", exception );
185      }
186
187    return null;
188    }
189
190  private static FrameworkClient getFrameworkClient( DAGClient dagClient )
191    {
192    if( dagClient instanceof TezTimelineClient )
193      return ( (TezTimelineClient) dagClient ).getFrameworkClient();
194
195    return Util.returnInstanceFieldIfExistsSafe( dagClient, "frameworkClient" );
196    }
197  }