001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.client; 020 021import java.io.EOFException; 022import java.io.IOException; 023import java.net.ConnectException; 024import java.net.InetSocketAddress; 025import java.net.NoRouteToHostException; 026import java.net.SocketException; 027import java.net.UnknownHostException; 028import java.security.PrivilegedAction; 029import java.util.HashMap; 030import java.util.Map; 031import java.util.concurrent.TimeUnit; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.apache.hadoop.classification.InterfaceAudience; 036import org.apache.hadoop.classification.InterfaceAudience.Private; 037import org.apache.hadoop.classification.InterfaceStability; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.io.retry.RetryPolicies; 040import org.apache.hadoop.io.retry.RetryPolicy; 041import org.apache.hadoop.io.retry.RetryProxy; 042import org.apache.hadoop.ipc.RetriableException; 043import org.apache.hadoop.ipc.StandbyException; 044import org.apache.hadoop.net.ConnectTimeoutException; 045import org.apache.hadoop.security.UserGroupInformation; 046import org.apache.hadoop.util.ReflectionUtils; 047import org.apache.hadoop.yarn.conf.HAUtil; 048import org.apache.hadoop.yarn.conf.YarnConfiguration; 049import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 050import org.apache.hadoop.yarn.ipc.YarnRPC; 051 052import com.google.common.annotations.VisibleForTesting; 053 054@InterfaceAudience.Public 055@InterfaceStability.Evolving 056@SuppressWarnings("unchecked") 057public class RMProxy<T> { 058 059 private static final Log LOG = LogFactory.getLog(RMProxy.class); 060 061 protected RMProxy() {} 062 063 /** 064 * Verify the passed protocol is supported. 065 */ 066 @Private 067 protected void checkAllowedProtocols(Class<?> protocol) {} 068 069 /** 070 * Get the ResourceManager address from the provided Configuration for the 071 * given protocol. 072 */ 073 @Private 074 protected InetSocketAddress getRMAddress( 075 YarnConfiguration conf, Class<?> protocol) throws IOException { 076 throw new UnsupportedOperationException("This method should be invoked " + 077 "from an instance of ClientRMProxy or ServerRMProxy"); 078 } 079 080 /** 081 * Currently, used by Client and AM only 082 * Create a proxy for the specified protocol. For non-HA, 083 * this is a direct connection to the ResourceManager address. When HA is 084 * enabled, the proxy handles the failover between the ResourceManagers as 085 * well. 086 */ 087 @Private 088 protected static <T> T createRMProxy(final Configuration configuration, 089 final Class<T> protocol, RMProxy instance) throws IOException { 090 YarnConfiguration conf = (configuration instanceof YarnConfiguration) 091 ? (YarnConfiguration) configuration 092 : new YarnConfiguration(configuration); 093 RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf)); 094 return newProxyInstance(conf, protocol, instance, retryPolicy); 095 } 096 097 /** 098 * Currently, used by NodeManagers only. 099 * Create a proxy for the specified protocol. For non-HA, 100 * this is a direct connection to the ResourceManager address. When HA is 101 * enabled, the proxy handles the failover between the ResourceManagers as 102 * well. 103 */ 104 @Private 105 protected static <T> T createRMProxy(final Configuration configuration, 106 final Class<T> protocol, RMProxy instance, final long retryTime, 107 final long retryInterval) throws IOException { 108 YarnConfiguration conf = (configuration instanceof YarnConfiguration) 109 ? (YarnConfiguration) configuration 110 : new YarnConfiguration(configuration); 111 RetryPolicy retryPolicy = createRetryPolicy(conf, retryTime, retryInterval, 112 HAUtil.isHAEnabled(conf)); 113 return newProxyInstance(conf, protocol, instance, retryPolicy); 114 } 115 116 private static <T> T newProxyInstance(final YarnConfiguration conf, 117 final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy) 118 throws IOException{ 119 if (HAUtil.isHAEnabled(conf)) { 120 RMFailoverProxyProvider<T> provider = 121 instance.createRMFailoverProxyProvider(conf, protocol); 122 return (T) RetryProxy.create(protocol, provider, retryPolicy); 123 } else { 124 InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol); 125 LOG.info("Connecting to ResourceManager at " + rmAddress); 126 T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress); 127 return (T) RetryProxy.create(protocol, proxy, retryPolicy); 128 } 129 } 130 131 /** 132 * @deprecated 133 * This method is deprecated and is not used by YARN internally any more. 134 * To create a proxy to the RM, use ClientRMProxy#createRMProxy or 135 * ServerRMProxy#createRMProxy. 136 * 137 * Create a proxy to the ResourceManager at the specified address. 138 * 139 * @param conf Configuration to generate retry policy 140 * @param protocol Protocol for the proxy 141 * @param rmAddress Address of the ResourceManager 142 * @param <T> Type information of the proxy 143 * @return Proxy to the RM 144 * @throws IOException 145 */ 146 @Deprecated 147 public static <T> T createRMProxy(final Configuration conf, 148 final Class<T> protocol, InetSocketAddress rmAddress) throws IOException { 149 RetryPolicy retryPolicy = createRetryPolicy(conf, HAUtil.isHAEnabled(conf)); 150 T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress); 151 LOG.info("Connecting to ResourceManager at " + rmAddress); 152 return (T) RetryProxy.create(protocol, proxy, retryPolicy); 153 } 154 155 /** 156 * Get a proxy to the RM at the specified address. To be used to create a 157 * RetryProxy. 158 */ 159 @Private 160 static <T> T getProxy(final Configuration conf, 161 final Class<T> protocol, final InetSocketAddress rmAddress) 162 throws IOException { 163 return UserGroupInformation.getCurrentUser().doAs( 164 new PrivilegedAction<T>() { 165 @Override 166 public T run() { 167 return (T) YarnRPC.create(conf).getProxy(protocol, rmAddress, conf); 168 } 169 }); 170 } 171 172 /** 173 * Helper method to create FailoverProxyProvider. 174 */ 175 private <T> RMFailoverProxyProvider<T> createRMFailoverProxyProvider( 176 Configuration conf, Class<T> protocol) { 177 Class<? extends RMFailoverProxyProvider<T>> defaultProviderClass; 178 try { 179 defaultProviderClass = (Class<? extends RMFailoverProxyProvider<T>>) 180 Class.forName( 181 YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER); 182 } catch (Exception e) { 183 throw new YarnRuntimeException("Invalid default failover provider class" + 184 YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER, e); 185 } 186 187 RMFailoverProxyProvider<T> provider = ReflectionUtils.newInstance( 188 conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER, 189 defaultProviderClass, RMFailoverProxyProvider.class), conf); 190 provider.init(conf, (RMProxy<T>) this, protocol); 191 return provider; 192 } 193 194 /** 195 * Fetch retry policy from Configuration 196 */ 197 @Private 198 @VisibleForTesting 199 public static RetryPolicy createRetryPolicy(Configuration conf, 200 boolean isHAEnabled) { 201 long rmConnectWaitMS = 202 conf.getLong( 203 YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 204 YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_MAX_WAIT_MS); 205 long rmConnectionRetryIntervalMS = 206 conf.getLong( 207 YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 208 YarnConfiguration 209 .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS); 210 211 return createRetryPolicy(conf, rmConnectWaitMS, rmConnectionRetryIntervalMS, 212 isHAEnabled); 213 } 214 215 /** 216 * Fetch retry policy from Configuration and create the 217 * retry policy with specified retryTime and retry interval. 218 */ 219 protected static RetryPolicy createRetryPolicy(Configuration conf, 220 long retryTime, long retryInterval, boolean isHAEnabled) { 221 long rmConnectWaitMS = retryTime; 222 long rmConnectionRetryIntervalMS = retryInterval; 223 224 boolean waitForEver = (rmConnectWaitMS == -1); 225 if (!waitForEver) { 226 if (rmConnectWaitMS < 0) { 227 throw new YarnRuntimeException("Invalid Configuration. " 228 + YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS 229 + " can be -1, but can not be other negative numbers"); 230 } 231 232 // try connect once 233 if (rmConnectWaitMS < rmConnectionRetryIntervalMS) { 234 LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS 235 + " is smaller than " 236 + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS 237 + ". Only try connect once."); 238 rmConnectWaitMS = 0; 239 } 240 } 241 242 // Handle HA case first 243 if (isHAEnabled) { 244 final long failoverSleepBaseMs = conf.getLong( 245 YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 246 rmConnectionRetryIntervalMS); 247 248 final long failoverSleepMaxMs = conf.getLong( 249 YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_MAX_MS, 250 rmConnectionRetryIntervalMS); 251 252 int maxFailoverAttempts = conf.getInt( 253 YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, -1); 254 255 if (maxFailoverAttempts == -1) { 256 if (waitForEver) { 257 maxFailoverAttempts = Integer.MAX_VALUE; 258 } else { 259 maxFailoverAttempts = (int) (rmConnectWaitMS / failoverSleepBaseMs); 260 } 261 } 262 263 return RetryPolicies.failoverOnNetworkException( 264 RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, 265 failoverSleepBaseMs, failoverSleepMaxMs); 266 } 267 268 if (rmConnectionRetryIntervalMS < 0) { 269 throw new YarnRuntimeException("Invalid Configuration. " + 270 YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS + 271 " should not be negative."); 272 } 273 274 RetryPolicy retryPolicy = null; 275 if (waitForEver) { 276 retryPolicy = RetryPolicies.retryForeverWithFixedSleep( 277 rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS); 278 } else { 279 retryPolicy = 280 RetryPolicies.retryUpToMaximumTimeWithFixedSleep(rmConnectWaitMS, 281 rmConnectionRetryIntervalMS, TimeUnit.MILLISECONDS); 282 } 283 284 Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = 285 new HashMap<Class<? extends Exception>, RetryPolicy>(); 286 287 exceptionToPolicyMap.put(EOFException.class, retryPolicy); 288 exceptionToPolicyMap.put(ConnectException.class, retryPolicy); 289 exceptionToPolicyMap.put(NoRouteToHostException.class, retryPolicy); 290 exceptionToPolicyMap.put(UnknownHostException.class, retryPolicy); 291 exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy); 292 exceptionToPolicyMap.put(RetriableException.class, retryPolicy); 293 exceptionToPolicyMap.put(SocketException.class, retryPolicy); 294 exceptionToPolicyMap.put(StandbyException.class, retryPolicy); 295 // YARN-4288: local IOException is also possible. 296 exceptionToPolicyMap.put(IOException.class, retryPolicy); 297 // Not retry on remote IO exception. 298 return RetryPolicies.retryOtherThanRemoteException( 299 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); 300 } 301}