001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.client;
020
021import java.io.EOFException;
022import java.io.IOException;
023import java.net.ConnectException;
024import java.net.InetSocketAddress;
025import java.net.NoRouteToHostException;
026import java.net.SocketException;
027import java.net.UnknownHostException;
028import java.security.PrivilegedAction;
029import java.util.HashMap;
030import java.util.Map;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.apache.hadoop.classification.InterfaceAudience;
036import org.apache.hadoop.classification.InterfaceAudience.Private;
037import org.apache.hadoop.classification.InterfaceStability;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.io.retry.RetryPolicies;
040import org.apache.hadoop.io.retry.RetryPolicy;
041import org.apache.hadoop.io.retry.RetryProxy;
042import org.apache.hadoop.ipc.RetriableException;
043import org.apache.hadoop.ipc.StandbyException;
044import org.apache.hadoop.net.ConnectTimeoutException;
045import org.apache.hadoop.security.UserGroupInformation;
046import org.apache.hadoop.util.ReflectionUtils;
047import org.apache.hadoop.yarn.conf.HAUtil;
048import org.apache.hadoop.yarn.conf.YarnConfiguration;
049import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
050import org.apache.hadoop.yarn.ipc.YarnRPC;
051
052import com.google.common.annotations.VisibleForTesting;
053
054@InterfaceAudience.Public
055@InterfaceStability.Evolving
056@SuppressWarnings("unchecked")
057public class RMProxy<T> {
058
059  private static final Log LOG = LogFactory.getLog(RMProxy.class);
060
061  protected RMProxy() {}
062
063  /**
064   * Verify the passed protocol is supported.
065   */
066  @Private
067  protected void checkAllowedProtocols(Class<?> protocol) {}
068
069  /**
070   * Get the ResourceManager address from the provided Configuration for the
071   * given protocol.
072   */
073  @Private
074  protected InetSocketAddress getRMAddress(
075      YarnConfiguration conf, Class<?> protocol) throws IOException {
076    throw new UnsupportedOperationException("This method should be invoked " +
077        "from an instance of ClientRMProxy or ServerRMProxy");
078  }
079
080  /**
081   * Currently, used by Client and AM only
082   * Create a proxy for the specified protocol. For non-HA,
083   * this is a direct connection to the ResourceManager address. When HA is
084   * enabled, the proxy handles the failover between the ResourceManagers as
085   * well.
086   */
087  @Private
088  protected static <T> T createRMProxy(final Configuration configuration,
089      final Class<T> protocol, RMProxy instance) throws IOException {
090    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
091        ? (YarnConfiguration) configuration
092        : new YarnConfiguration(configuration);
093    RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
094    return newProxyInstance(conf, protocol, instance, retryPolicy);
095  }
096
097  /**
098   * Currently, used by NodeManagers only.
099   * Create a proxy for the specified protocol. For non-HA,
100   * this is a direct connection to the ResourceManager address. When HA is
101   * enabled, the proxy handles the failover between the ResourceManagers as
102   * well.
103   */
104  @Private
105  protected static <T> T createRMProxy(final Configuration configuration,
106      final Class<T> protocol, RMProxy instance, final long retryTime,
107      final long retryInterval) throws IOException {
108    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
109        ? (YarnConfiguration) configuration
110        : new YarnConfiguration(configuration);
111    RetryPolicy retryPolicy = createRetryPolicy(conf, retryTime, retryInterval,
112        HAUtil.isHAEnabled(conf));
113    return newProxyInstance(conf, protocol, instance, retryPolicy);
114  }
115
116  private static <T> T newProxyInstance(final YarnConfiguration conf,
117      final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy)
118          throws IOException{
119    if (HAUtil.isHAEnabled(conf)) {
120      RMFailoverProxyProvider<T> provider =
121          instance.createRMFailoverProxyProvider(conf, protocol);
122      return (T) RetryProxy.create(protocol, provider, retryPolicy);
123    } else {
124      InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
125      LOG.info("Connecting to ResourceManager at " + rmAddress);
126      T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
127      return (T) RetryProxy.create(protocol, proxy, retryPolicy);
128    }
129  }
130
131  /**
132   * @deprecated
133   * This method is deprecated and is not used by YARN internally any more.
134   * To create a proxy to the RM, use ClientRMProxy#createRMProxy or
135   * ServerRMProxy#createRMProxy.
136   *
137   * Create a proxy to the ResourceManager at the specified address.
138   *
139   * @param conf Configuration to generate retry policy
140   * @param protocol Protocol for the proxy
141   * @param rmAddress Address of the ResourceManager
142   * @param <T> Type information of the proxy
143   * @return Proxy to the RM
144   * @throws IOException
145   */
146  @Deprecated
147  public static <T> T createRMProxy(final Configuration conf,
148      final Class<T> protocol, InetSocketAddress rmAddress) throws IOException {
149    RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf));
150    T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
151    LOG.info("Connecting to ResourceManager at " + rmAddress);
152    return (T) RetryProxy.create(protocol, proxy, retryPolicy);
153  }
154
155  /**
156   * Get a proxy to the RM at the specified address. To be used to create a
157   * RetryProxy.
158   */
159  @Private
160  static <T> T getProxy(final Configuration conf,
161      final Class<T> protocol, final InetSocketAddress rmAddress)
162      throws IOException {
163    return UserGroupInformation.getCurrentUser().doAs(
164      new PrivilegedAction<T>() {
165        @Override
166        public T run() {
167          return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf);
168        }
169      });
170  }
171
172  /**
173   * Helper method to create FailoverProxyProvider.
174   */
175  private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider(
176      Configuration conf, Class<T> protocol) {
177    Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass;
178    try {
179      defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>)
180          Class.forName(
181              YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER);
182    } catch (Exception e) {
183      throw new YarnRuntimeException("Invalid default failover provider class" +
184          YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e);
185    }
186
187    RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance(
188        conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
189            defaultProviderClass, RMFailoverProxyProvider.class), conf);
190    provider.init(conf, (RMProxy<T>) this, protocol);
191    return provider;
192  }
193
194  /**
195   * Fetch retry policy from Configuration
196   */
197  @Private
198  @VisibleForTesting
199  public static RetryPolicy createRetryPolicy(Configuration conf,
200      boolean isHAEnabled) {
201    long rmConnectWaitMS =
202        conf.getLong(
203            YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
204            YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS);
205    long rmConnectionRetryIntervalMS =
206        conf.getLong(
207            YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
208            YarnConfiguration
209                .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
210
211    return createRetryPolicy(conf, rmConnectWaitMS, rmConnectionRetryIntervalMS,
212        isHAEnabled);
213  }
214
215  /**
216   * Fetch retry policy from Configuration and create the
217   * retry policy with specified retryTime and retry interval.
218   */
219  protected static RetryPolicy createRetryPolicy(Configuration conf,
220      long retryTime, long retryInterval, boolean isHAEnabled) {
221    long rmConnectWaitMS = retryTime;
222    long rmConnectionRetryIntervalMS = retryInterval;
223
224    boolean waitForEver = (rmConnectWaitMS == -1);
225    if (!waitForEver) {
226      if (rmConnectWaitMS < 0) {
227        throw new YarnRuntimeException("Invalid Configuration. "
228            + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
229            + " can be -1, but can not be other negative numbers");
230      }
231
232      // try connect once
233      if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
234        LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS
235            + " is smaller than "
236            + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
237            + ". Only try connect once.");
238        rmConnectWaitMS = 0;
239      }
240    }
241
242    // Handle HA case first
243    if (isHAEnabled) {
244      final long failoverSleepBaseMs = conf.getLong(
245          YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
246          rmConnectionRetryIntervalMS);
247
248      final long failoverSleepMaxMs = conf.getLong(
249          YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS,
250          rmConnectionRetryIntervalMS);
251
252      int maxFailoverAttempts = conf.getInt(
253          YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1);
254
255      if (maxFailoverAttempts == -1) {
256        if (waitForEver) {
257          maxFailoverAttempts = Integer.MAX_VALUE;
258        } else {
259          maxFailoverAttempts = (int) (rmConnectWaitMS / failoverSleepBaseMs);
260        }
261      }
262
263      return RetryPolicies.failoverOnNetworkException(
264          RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
265          failoverSleepBaseMs, failoverSleepMaxMs);
266    }
267
268    if (rmConnectionRetryIntervalMS < 0) {
269      throw new YarnRuntimeException("Invalid Configuration. " +
270          YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS +
271          " should not be negative.");
272    }
273
274    RetryPolicy retryPolicy = null;
275    if (waitForEver) {
276      retryPolicy = RetryPolicies.retryForeverWithFixedSleep(
277          rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
278    } else {
279      retryPolicy =
280          RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS,
281              rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS);
282    }
283
284    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
285        new HashMap<Class<? extends Exception>, RetryPolicy>();
286
287    exceptionToPolicyMap.put(EOFException.class, retryPolicy);
288    exceptionToPolicyMap.put(ConnectException.class, retryPolicy);
289    exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy);
290    exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy);
291    exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
292    exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
293    exceptionToPolicyMap.put(SocketException.class, retryPolicy);
294    exceptionToPolicyMap.put(StandbyException.class, retryPolicy);
295    // YARN-4288: local IOException is also possible.
296    exceptionToPolicyMap.put(IOException.class, retryPolicy);
297    // Not retry on remote IO exception.
298    return RetryPolicies.retryOtherThanRemoteException(
299        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
300  }
301}