001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.InetSocketAddress; 024import java.security.PrivilegedExceptionAction; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.ServiceLoader; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.apache.hadoop.classification.InterfaceAudience; 032import org.apache.hadoop.classification.InterfaceStability; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.io.Text; 037import org.apache.hadoop.mapred.JobConf; 038import org.apache.hadoop.mapreduce.protocol.ClientProtocol; 039import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; 040import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 041import org.apache.hadoop.mapreduce.util.ConfigUtil; 042import org.apache.hadoop.mapreduce.v2.LogParams; 043import org.apache.hadoop.security.UserGroupInformation; 044import org.apache.hadoop.security.token.SecretManager.InvalidToken; 045import org.apache.hadoop.security.token.Token; 046 047/** 048 * Provides a way to access information about the map/reduce cluster. 049 */ 050@InterfaceAudience.Public 051@InterfaceStability.Evolving 052public class Cluster { 053 054 @InterfaceStability.Evolving 055 public static enum JobTrackerStatus {INITIALIZING, RUNNING}; 056 057 private ClientProtocolProvider clientProtocolProvider; 058 private ClientProtocol client; 059 private UserGroupInformation ugi; 060 private Configuration conf; 061 private FileSystem fs = null; 062 private Path sysDir = null; 063 private Path stagingAreaDir = null; 064 private Path jobHistoryDir = null; 065 private static final Log LOG = LogFactory.getLog(Cluster.class); 066 067 private static ServiceLoader<ClientProtocolProvider> frameworkLoader = 068 ServiceLoader.load(ClientProtocolProvider.class); 069 private volatile List<ClientProtocolProvider> providerList = null; 070 071 private void initProviderList() { 072 if (providerList == null) { 073 synchronized (frameworkLoader) { 074 if (providerList == null) { 075 List<ClientProtocolProvider> localProviderList = 076 new ArrayList<ClientProtocolProvider>(); 077 for (ClientProtocolProvider provider : frameworkLoader) { 078 localProviderList.add(provider); 079 } 080 providerList = localProviderList; 081 } 082 } 083 } 084 } 085 086 static { 087 ConfigUtil.loadResources(); 088 } 089 090 public Cluster(Configuration conf) throws IOException { 091 this(null, conf); 092 } 093 094 public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 095 throws IOException { 096 this.conf = conf; 097 this.ugi = UserGroupInformation.getCurrentUser(); 098 initialize(jobTrackAddr, conf); 099 } 100 101 private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) 102 throws IOException { 103 104 initProviderList(); 105 for (ClientProtocolProvider provider : providerList) { 106 LOG.debug("Trying ClientProtocolProvider : " 107 + provider.getClass().getName()); 108 ClientProtocol clientProtocol = null; 109 try { 110 if (jobTrackAddr == null) { 111 clientProtocol = provider.create(conf); 112 } else { 113 clientProtocol = provider.create(jobTrackAddr, conf); 114 } 115 116 if (clientProtocol != null) { 117 clientProtocolProvider = provider; 118 client = clientProtocol; 119 LOG.debug("Picked " + provider.getClass().getName() 120 + " as the ClientProtocolProvider"); 121 break; 122 } else { 123 LOG.debug("Cannot pick " + provider.getClass().getName() 124 + " as the ClientProtocolProvider - returned null protocol"); 125 } 126 } catch (Exception e) { 127 LOG.info("Failed to use " + provider.getClass().getName() 128 + " due to error: ", e); 129 } 130 } 131 132 if (null == clientProtocolProvider || null == client) { 133 throw new IOException( 134 "Cannot initialize Cluster. Please check your configuration for " 135 + MRConfig.FRAMEWORK_NAME 136 + " and the correspond server addresses."); 137 } 138 } 139 140 ClientProtocol getClient() { 141 return client; 142 } 143 144 Configuration getConf() { 145 return conf; 146 } 147 148 /** 149 * Close the <code>Cluster</code>. 150 * @throws IOException 151 */ 152 public synchronized void close() throws IOException { 153 clientProtocolProvider.close(client); 154 } 155 156 private Job[] getJobs(JobStatus[] stats) throws IOException { 157 List<Job> jobs = new ArrayList<Job>(); 158 for (JobStatus stat : stats) { 159 jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile()))); 160 } 161 return jobs.toArray(new Job[0]); 162 } 163 164 /** 165 * Get the file system where job-specific files are stored 166 * 167 * @return object of FileSystem 168 * @throws IOException 169 * @throws InterruptedException 170 */ 171 public synchronized FileSystem getFileSystem() 172 throws IOException, InterruptedException { 173 if (this.fs == null) { 174 try { 175 this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { 176 public FileSystem run() throws IOException, InterruptedException { 177 final Path sysDir = new Path(client.getSystemDir()); 178 return sysDir.getFileSystem(getConf()); 179 } 180 }); 181 } catch (InterruptedException e) { 182 throw new RuntimeException(e); 183 } 184 } 185 return fs; 186 } 187 188 /** 189 * Get job corresponding to jobid. 190 * 191 * @param jobId 192 * @return object of {@link Job} 193 * @throws IOException 194 * @throws InterruptedException 195 */ 196 public Job getJob(JobID jobId) throws IOException, InterruptedException { 197 JobStatus status = client.getJobStatus(jobId); 198 if (status != null) { 199 final JobConf conf = new JobConf(); 200 final Path jobPath = new Path(client.getFilesystemName(), 201 status.getJobFile()); 202 final FileSystem fs = FileSystem.get(jobPath.toUri(), getConf()); 203 try { 204 conf.addResource(fs.open(jobPath), jobPath.toString()); 205 } catch (FileNotFoundException fnf) { 206 if (LOG.isWarnEnabled()) { 207 LOG.warn("Job conf missing on cluster", fnf); 208 } 209 } 210 return Job.getInstance(this, status, conf); 211 } 212 return null; 213 } 214 215 /** 216 * Get all the queues in cluster. 217 * 218 * @return array of {@link QueueInfo} 219 * @throws IOException 220 * @throws InterruptedException 221 */ 222 public QueueInfo[] getQueues() throws IOException, InterruptedException { 223 return client.getQueues(); 224 } 225 226 /** 227 * Get queue information for the specified name. 228 * 229 * @param name queuename 230 * @return object of {@link QueueInfo} 231 * @throws IOException 232 * @throws InterruptedException 233 */ 234 public QueueInfo getQueue(String name) 235 throws IOException, InterruptedException { 236 return client.getQueue(name); 237 } 238 239 /** 240 * Get log parameters for the specified jobID or taskAttemptID 241 * @param jobID the job id. 242 * @param taskAttemptID the task attempt id. Optional. 243 * @return the LogParams 244 * @throws IOException 245 * @throws InterruptedException 246 */ 247 public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID) 248 throws IOException, InterruptedException { 249 return client.getLogFileParams(jobID, taskAttemptID); 250 } 251 252 /** 253 * Get current cluster status. 254 * 255 * @return object of {@link ClusterMetrics} 256 * @throws IOException 257 * @throws InterruptedException 258 */ 259 public ClusterMetrics getClusterStatus() throws IOException, InterruptedException { 260 return client.getClusterMetrics(); 261 } 262 263 /** 264 * Get all active trackers in the cluster. 265 * 266 * @return array of {@link TaskTrackerInfo} 267 * @throws IOException 268 * @throws InterruptedException 269 */ 270 public TaskTrackerInfo[] getActiveTaskTrackers() 271 throws IOException, InterruptedException { 272 return client.getActiveTrackers(); 273 } 274 275 /** 276 * Get blacklisted trackers. 277 * 278 * @return array of {@link TaskTrackerInfo} 279 * @throws IOException 280 * @throws InterruptedException 281 */ 282 public TaskTrackerInfo[] getBlackListedTaskTrackers() 283 throws IOException, InterruptedException { 284 return client.getBlacklistedTrackers(); 285 } 286 287 /** 288 * Get all the jobs in cluster. 289 * 290 * @return array of {@link Job} 291 * @throws IOException 292 * @throws InterruptedException 293 * @deprecated Use {@link #getAllJobStatuses()} instead. 294 */ 295 @Deprecated 296 public Job[] getAllJobs() throws IOException, InterruptedException { 297 return getJobs(client.getAllJobs()); 298 } 299 300 /** 301 * Get job status for all jobs in the cluster. 302 * @return job status for all jobs in cluster 303 * @throws IOException 304 * @throws InterruptedException 305 */ 306 public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException { 307 return client.getAllJobs(); 308 } 309 310 /** 311 * Grab the jobtracker system directory path where 312 * job-specific files will be placed. 313 * 314 * @return the system directory where job-specific files are to be placed. 315 */ 316 public Path getSystemDir() throws IOException, InterruptedException { 317 if (sysDir == null) { 318 sysDir = new Path(client.getSystemDir()); 319 } 320 return sysDir; 321 } 322 323 /** 324 * Grab the jobtracker's view of the staging directory path where 325 * job-specific files will be placed. 326 * 327 * @return the staging directory where job-specific files are to be placed. 328 */ 329 public Path getStagingAreaDir() throws IOException, InterruptedException { 330 if (stagingAreaDir == null) { 331 stagingAreaDir = new Path(client.getStagingAreaDir()); 332 } 333 return stagingAreaDir; 334 } 335 336 /** 337 * Get the job history file path for a given job id. The job history file at 338 * this path may or may not be existing depending on the job completion state. 339 * The file is present only for the completed jobs. 340 * @param jobId the JobID of the job submitted by the current user. 341 * @return the file path of the job history file 342 * @throws IOException 343 * @throws InterruptedException 344 */ 345 public String getJobHistoryUrl(JobID jobId) throws IOException, 346 InterruptedException { 347 if (jobHistoryDir == null) { 348 jobHistoryDir = new Path(client.getJobHistoryDir()); 349 } 350 return new Path(jobHistoryDir, jobId.toString() + "_" 351 + ugi.getShortUserName()).toString(); 352 } 353 354 /** 355 * Gets the Queue ACLs for current user 356 * @return array of QueueAclsInfo object for current user. 357 * @throws IOException 358 */ 359 public QueueAclsInfo[] getQueueAclsForCurrentUser() 360 throws IOException, InterruptedException { 361 return client.getQueueAclsForCurrentUser(); 362 } 363 364 /** 365 * Gets the root level queues. 366 * @return array of JobQueueInfo object. 367 * @throws IOException 368 */ 369 public QueueInfo[] getRootQueues() throws IOException, InterruptedException { 370 return client.getRootQueues(); 371 } 372 373 /** 374 * Returns immediate children of queueName. 375 * @param queueName 376 * @return array of JobQueueInfo which are children of queueName 377 * @throws IOException 378 */ 379 public QueueInfo[] getChildQueues(String queueName) 380 throws IOException, InterruptedException { 381 return client.getChildQueues(queueName); 382 } 383 384 /** 385 * Get the JobTracker's status. 386 * 387 * @return {@link JobTrackerStatus} of the JobTracker 388 * @throws IOException 389 * @throws InterruptedException 390 */ 391 public JobTrackerStatus getJobTrackerStatus() throws IOException, 392 InterruptedException { 393 return client.getJobTrackerStatus(); 394 } 395 396 /** 397 * Get the tasktracker expiry interval for the cluster 398 * @return the expiry interval in msec 399 */ 400 public long getTaskTrackerExpiryInterval() throws IOException, 401 InterruptedException { 402 return client.getTaskTrackerExpiryInterval(); 403 } 404 405 /** 406 * Get a delegation token for the user from the JobTracker. 407 * @param renewer the user who can renew the token 408 * @return the new token 409 * @throws IOException 410 */ 411 public Token<DelegationTokenIdentifier> 412 getDelegationToken(Text renewer) throws IOException, InterruptedException{ 413 // client has already set the service 414 return client.getDelegationToken(renewer); 415 } 416 417 /** 418 * Renew a delegation token 419 * @param token the token to renew 420 * @return the new expiration time 421 * @throws InvalidToken 422 * @throws IOException 423 * @deprecated Use {@link Token#renew} instead 424 */ 425 public long renewDelegationToken(Token<DelegationTokenIdentifier> token 426 ) throws InvalidToken, IOException, 427 InterruptedException { 428 return token.renew(getConf()); 429 } 430 431 /** 432 * Cancel a delegation token from the JobTracker 433 * @param token the token to cancel 434 * @throws IOException 435 * @deprecated Use {@link Token#cancel} instead 436 */ 437 public void cancelDelegationToken(Token<DelegationTokenIdentifier> token 438 ) throws IOException, 439 InterruptedException { 440 token.cancel(getConf()); 441 } 442 443}