/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;

/**
 * Provides a way to access information about the map/reduce cluster.
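 *
 * <p>A minimal usage sketch (illustrative only; error handling is omitted,
 * and it assumes the configured framework can supply a working
 * {@link org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider}):
 * <pre>
 *   Configuration conf = new Configuration();
 *   Cluster cluster = new Cluster(conf);
 *   ClusterMetrics metrics = cluster.getClusterStatus();
 *   System.out.println("Running maps: " + metrics.getRunningMaps());
 *   cluster.close();
 * </pre>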
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Cluster {

  @InterfaceStability.Evolving
  public static enum JobTrackerStatus { INITIALIZING, RUNNING }

  private ClientProtocolProvider clientProtocolProvider;
  private ClientProtocol client;
  private UserGroupInformation ugi;
  private Configuration conf;
  private FileSystem fs = null;
  private Path sysDir = null;
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null;
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);
  private volatile List<ClientProtocolProvider> providerList = null;

  /**
   * Lazily materialize the list of {@link ClientProtocolProvider}s.
   * Iteration over the shared {@link ServiceLoader} is synchronized
   * because ServiceLoader itself is not thread-safe.
   */
  private void initProviderList() {
    if (providerList == null) {
      synchronized (frameworkLoader) {
        if (providerList == null) {
          List<ClientProtocolProvider> localProviderList =
              new ArrayList<ClientProtocolProvider>();
          for (ClientProtocolProvider provider : frameworkLoader) {
            localProviderList.add(provider);
          }
          providerList = localProviderList;
        }
      }
    }
  }

  static {
    // Ensure the MapReduce configuration resources are loaded.
    ConfigUtil.loadResources();
  }

  /**
   * Create a cluster client using the given configuration.
   * @param conf the configuration
   * @throws IOException if no {@link ClientProtocolProvider} can create a client
   */
  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  /**
   * Create a cluster client for the given job tracker address.
   * @param jobTrackAddr the job tracker address, or <code>null</code> to use
   *     the address from the configuration
   * @param conf the configuration
   * @throws IOException if no {@link ClientProtocolProvider} can create a client
   */
  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }

  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    initProviderList();
    final IOException initEx = new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the corresponding server addresses.");
    for (ClientProtocolProvider provider : providerList) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }

        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        final String errMsg = "Failed to use " + provider.getClass().getName()
            + " due to error: ";
        initEx.addSuppressed(new IOException(errMsg, e));
        LOG.info(errMsg, e);
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw initEx;
    }
  }

  ClientProtocol getClient() {
    return client;
  }

  Configuration getConf() {
    return conf;
  }

  /**
   * Close the <code>Cluster</code>.
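   *
   * <p>A typical (illustrative) pattern is to close the cluster in a
   * <code>finally</code> block once all queries against it are done:
   * <pre>
   *   Cluster cluster = new Cluster(conf);
   *   try {
   *     // ... query the cluster ...
   *   } finally {
   *     cluster.close();
   *   }
   * </pre>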
   * @throws IOException if the underlying client protocol cannot be closed
   */
  public synchronized void close() throws IOException {
    clientProtocolProvider.close(client);
  }

  private Job[] getJobs(JobStatus[] stats) throws IOException {
    List<Job> jobs = new ArrayList<Job>();
    for (JobStatus stat : stats) {
      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
    }
    return jobs.toArray(new Job[0]);
  }

  /**
   * Get the file system where job-specific files are stored.
   *
   * @return the {@link FileSystem} backing the cluster's system directory
   * @throws IOException if the file system cannot be determined
   * @throws InterruptedException if the operation is interrupted
   */
  public synchronized FileSystem getFileSystem()
      throws IOException, InterruptedException {
    if (this.fs == null) {
      try {
        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
          public FileSystem run() throws IOException, InterruptedException {
            final Path sysDir = new Path(client.getSystemDir());
            return sysDir.getFileSystem(getConf());
          }
        });
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
    return fs;
  }

  /**
   * Get the job corresponding to the given job id.
   *
   * @param jobId the id of the job
   * @return the {@link Job}, or <code>null</code> if the job is not known
   * @throws IOException if the job status or configuration cannot be read
   * @throws InterruptedException if the operation is interrupted
   */
  public Job getJob(JobID jobId) throws IOException, InterruptedException {
    JobStatus status = client.getJobStatus(jobId);
    if (status != null) {
      final JobConf conf = new JobConf();
      final Path jobPath = new Path(client.getFilesystemName(),
          status.getJobFile());
      final FileSystem fs = FileSystem.get(jobPath.toUri(), getConf());
      try {
        conf.addResource(fs.open(jobPath), jobPath.toString());
      } catch (FileNotFoundException fnf) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Job conf missing on cluster", fnf);
        }
      }
      return Job.getInstance(this, status, conf);
    }
    return null;
  }

  /**
   * Get all the queues in the cluster.
   *
   * @return array of {@link QueueInfo}
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
    return client.getQueues();
  }

  /**
   * Get queue information for the specified queue name.
   *
   * @param name the queue name
   * @return the {@link QueueInfo} for the queue
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public QueueInfo getQueue(String name)
      throws IOException, InterruptedException {
    return client.getQueue(name);
  }

  /**
   * Get log parameters for the specified jobID or taskAttemptID.
   * @param jobID the job id
   * @param taskAttemptID the task attempt id (optional)
   * @return the {@link LogParams}
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
      throws IOException, InterruptedException {
    return client.getLogFileParams(jobID, taskAttemptID);
  }

  /**
   * Get current cluster status.
   *
   * @return the {@link ClusterMetrics} for the cluster
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public ClusterMetrics getClusterStatus()
      throws IOException, InterruptedException {
    return client.getClusterMetrics();
  }

  /**
   * Get all active trackers in the cluster.
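   *
   * <p>For example (an illustrative sketch, assuming an initialized
   * <code>cluster</code>), printing the name of every active tracker:
   * <pre>
   *   for (TaskTrackerInfo tracker : cluster.getActiveTaskTrackers()) {
   *     System.out.println(tracker.getTaskTrackerName());
   *   }
   * </pre>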
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public TaskTrackerInfo[] getActiveTaskTrackers()
      throws IOException, InterruptedException {
    return client.getActiveTrackers();
  }

  /**
   * Get blacklisted trackers.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public TaskTrackerInfo[] getBlackListedTaskTrackers()
      throws IOException, InterruptedException {
    return client.getBlacklistedTrackers();
  }

  /**
   * Get all the jobs in the cluster.
   *
   * @return array of {@link Job}
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   * @deprecated Use {@link #getAllJobStatuses()} instead.
   */
  @Deprecated
  public Job[] getAllJobs() throws IOException, InterruptedException {
    return getJobs(client.getAllJobs());
  }

  /**
   * Get job status for all jobs in the cluster.
   * @return job status for all jobs in the cluster
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public JobStatus[] getAllJobStatuses()
      throws IOException, InterruptedException {
    return client.getAllJobs();
  }

  /**
   * Grab the jobtracker system directory path where
   * job-specific files will be placed.
   *
   * @return the system directory where job-specific files are to be placed
   */
  public Path getSystemDir() throws IOException, InterruptedException {
    if (sysDir == null) {
      sysDir = new Path(client.getSystemDir());
    }
    return sysDir;
  }

  /**
   * Grab the jobtracker's view of the staging directory path where
   * job-specific files will be placed.
   *
   * @return the staging directory where job-specific files are to be placed
   */
  public Path getStagingAreaDir() throws IOException, InterruptedException {
    if (stagingAreaDir == null) {
      stagingAreaDir = new Path(client.getStagingAreaDir());
    }
    return stagingAreaDir;
  }

  /**
   * Get the job history file path for a given job id. The job history file
   * at this path may or may not exist, depending on the job's state; it is
   * present only for completed jobs.
   * @param jobId the JobID of the job submitted by the current user
   * @return the file path of the job history file
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public String getJobHistoryUrl(JobID jobId)
      throws IOException, InterruptedException {
    if (jobHistoryDir == null) {
      jobHistoryDir = new Path(client.getJobHistoryDir());
    }
    return new Path(jobHistoryDir, jobId.toString() + "_"
        + ugi.getShortUserName()).toString();
  }

  /**
   * Gets the queue ACLs for the current user.
   * @return array of {@link QueueAclsInfo} for the current user
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser()
      throws IOException, InterruptedException {
    return client.getQueueAclsForCurrentUser();
  }

  /**
   * Gets the root-level queues.
   * @return array of {@link QueueInfo} for the root-level queues
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
    return client.getRootQueues();
  }

  /**
   * Returns the immediate children of the given queue.
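   *
   * <p>Combined with {@link #getRootQueues()}, this can be used to walk the
   * whole queue hierarchy; an illustrative (hypothetical) sketch:
   * <pre>
   *   void printQueues(Cluster cluster, QueueInfo queue) throws Exception {
   *     System.out.println(queue.getQueueName());
   *     for (QueueInfo child : cluster.getChildQueues(queue.getQueueName())) {
   *       printQueues(cluster, child);
   *     }
   *   }
   * </pre>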
   * @param queueName the name of the parent queue
   * @return array of {@link QueueInfo} which are children of queueName
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public QueueInfo[] getChildQueues(String queueName)
      throws IOException, InterruptedException {
    return client.getChildQueues(queueName);
  }

  /**
   * Get the JobTracker's status.
   *
   * @return {@link JobTrackerStatus} of the JobTracker
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public JobTrackerStatus getJobTrackerStatus()
      throws IOException, InterruptedException {
    return client.getJobTrackerStatus();
  }

  /**
   * Get the tasktracker expiry interval for the cluster.
   * @return the expiry interval in milliseconds
   * @throws IOException if communication with the cluster fails
   * @throws InterruptedException if the operation is interrupted
   */
  public long getTaskTrackerExpiryInterval()
      throws IOException, InterruptedException {
    return client.getTaskTrackerExpiryInterval();
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException if the token cannot be obtained
   * @throws InterruptedException if the operation is interrupted
   */
  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
      throws IOException, InterruptedException {
    // The client has already set the service on the token.
    return client.getDelegationToken(renewer);
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time
   * @throws InvalidToken if the token is invalid
   * @throws IOException if the token cannot be renewed
   * @deprecated Use {@link Token#renew} instead
   */
  @Deprecated
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException if the token cannot be cancelled
   * @deprecated Use {@link Token#cancel} instead
   */
  @Deprecated
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws IOException, InterruptedException {
    token.cancel(getConf());
  }

}