001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.InetSocketAddress; 024import java.security.PrivilegedExceptionAction; 025import java.util.ArrayList; 026import java.util.List; 027import java.util.ServiceLoader; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031import org.apache.hadoop.classification.InterfaceAudience; 032import org.apache.hadoop.classification.InterfaceStability; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.fs.Path; 036import org.apache.hadoop.io.Text; 037import org.apache.hadoop.mapred.JobConf; 038import org.apache.hadoop.mapreduce.protocol.ClientProtocol; 039import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; 040import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; 041import org.apache.hadoop.mapreduce.util.ConfigUtil; 042import org.apache.hadoop.mapreduce.v2.LogParams; 043import org.apache.hadoop.security.UserGroupInformation; 044import org.apache.hadoop.security.token.SecretManager.InvalidToken; 045import org.apache.hadoop.security.token.Token; 046 047/** 048 * Provides a way to access information about the map/reduce cluster. 049 */ 050@InterfaceAudience.Public 051@InterfaceStability.Evolving 052public class Cluster { 053 054 @InterfaceStability.Evolving 055 public static enum JobTrackerStatus {INITIALIZING, RUNNING}; 056 057 private ClientProtocolProvider clientProtocolProvider; 058 private ClientProtocol client; 059 private UserGroupInformation ugi; 060 private Configuration conf; 061 private FileSystem fs = null; 062 private Path sysDir = null; 063 private Path stagingAreaDir = null; 064 private Path jobHistoryDir = null; 065 private static final Log LOG = LogFactory.getLog(Cluster.class); 066 067 private static ServiceLoader<ClientProtocolProvider> frameworkLoader = 068 ServiceLoader.load(ClientProtocolProvider.class); 069 070 static { 071 ConfigUtil.loadResources(); 072 } 073 074 public Cluster(Configuration conf) throws IOException { 075 this(null, conf); 076 } 077 078 public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 079 throws IOException { 080 this.conf = conf; 081 this.ugi = UserGroupInformation.getCurrentUser(); 082 initialize(jobTrackAddr, conf); 083 } 084 085 private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) 086 throws IOException { 087 088 synchronized (frameworkLoader) { 089 for (ClientProtocolProvider provider : frameworkLoader) { 090 LOG.debug("Trying ClientProtocolProvider : " 091 + provider.getClass().getName()); 092 ClientProtocol clientProtocol = null; 093 try { 094 if (jobTrackAddr == null) { 095 clientProtocol = provider.create(conf); 096 } else { 097 clientProtocol = provider.create(jobTrackAddr, conf); 098 } 099 100 if (clientProtocol != null) { 101 clientProtocolProvider = provider; 102 client = clientProtocol; 103 LOG.debug("Picked " + provider.getClass().getName() 104 + " as the ClientProtocolProvider"); 105 break; 106 } 107 else { 108 LOG.debug("Cannot pick " + provider.getClass().getName() 109 + " as the ClientProtocolProvider - returned null protocol"); 110 } 111 } 112 catch (Exception e) { 113 LOG.info("Failed to use " + provider.getClass().getName() 114 + " due to error: ", e); 115 } 116 } 117 } 118 119 if (null == clientProtocolProvider || null == client) { 120 throw new IOException( 121 "Cannot initialize Cluster. Please check your configuration for " 122 + MRConfig.FRAMEWORK_NAME 123 + " and the correspond server addresses."); 124 } 125 } 126 127 ClientProtocol getClient() { 128 return client; 129 } 130 131 Configuration getConf() { 132 return conf; 133 } 134 135 /** 136 * Close the <code>Cluster</code>. 137 * @throws IOException 138 */ 139 public synchronized void close() throws IOException { 140 clientProtocolProvider.close(client); 141 } 142 143 private Job[] getJobs(JobStatus[] stats) throws IOException { 144 List<Job> jobs = new ArrayList<Job>(); 145 for (JobStatus stat : stats) { 146 jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile()))); 147 } 148 return jobs.toArray(new Job[0]); 149 } 150 151 /** 152 * Get the file system where job-specific files are stored 153 * 154 * @return object of FileSystem 155 * @throws IOException 156 * @throws InterruptedException 157 */ 158 public synchronized FileSystem getFileSystem() 159 throws IOException, InterruptedException { 160 if (this.fs == null) { 161 try { 162 this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { 163 public FileSystem run() throws IOException, InterruptedException { 164 final Path sysDir = new Path(client.getSystemDir()); 165 return sysDir.getFileSystem(getConf()); 166 } 167 }); 168 } catch (InterruptedException e) { 169 throw new RuntimeException(e); 170 } 171 } 172 return fs; 173 } 174 175 /** 176 * Get job corresponding to jobid. 177 * 178 * @param jobId 179 * @return object of {@link Job} 180 * @throws IOException 181 * @throws InterruptedException 182 */ 183 public Job getJob(JobID jobId) throws IOException, InterruptedException { 184 JobStatus status = client.getJobStatus(jobId); 185 if (status != null) { 186 final JobConf conf = new JobConf(); 187 final Path jobPath = new Path(client.getFilesystemName(), 188 status.getJobFile()); 189 final FileSystem fs = FileSystem.get(jobPath.toUri(), getConf()); 190 try { 191 conf.addResource(fs.open(jobPath), jobPath.toString()); 192 } catch (FileNotFoundException fnf) { 193 if (LOG.isWarnEnabled()) { 194 LOG.warn("Job conf missing on cluster", fnf); 195 } 196 } 197 return Job.getInstance(this, status, conf); 198 } 199 return null; 200 } 201 202 /** 203 * Get all the queues in cluster. 204 * 205 * @return array of {@link QueueInfo} 206 * @throws IOException 207 * @throws InterruptedException 208 */ 209 public QueueInfo[] getQueues() throws IOException, InterruptedException { 210 return client.getQueues(); 211 } 212 213 /** 214 * Get queue information for the specified name. 215 * 216 * @param name queuename 217 * @return object of {@link QueueInfo} 218 * @throws IOException 219 * @throws InterruptedException 220 */ 221 public QueueInfo getQueue(String name) 222 throws IOException, InterruptedException { 223 return client.getQueue(name); 224 } 225 226 /** 227 * Get log parameters for the specified jobID or taskAttemptID 228 * @param jobID the job id. 229 * @param taskAttemptID the task attempt id. Optional. 230 * @return the LogParams 231 * @throws IOException 232 * @throws InterruptedException 233 */ 234 public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID) 235 throws IOException, InterruptedException { 236 return client.getLogFileParams(jobID, taskAttemptID); 237 } 238 239 /** 240 * Get current cluster status. 241 * 242 * @return object of {@link ClusterMetrics} 243 * @throws IOException 244 * @throws InterruptedException 245 */ 246 public ClusterMetrics getClusterStatus() throws IOException, InterruptedException { 247 return client.getClusterMetrics(); 248 } 249 250 /** 251 * Get all active trackers in the cluster. 252 * 253 * @return array of {@link TaskTrackerInfo} 254 * @throws IOException 255 * @throws InterruptedException 256 */ 257 public TaskTrackerInfo[] getActiveTaskTrackers() 258 throws IOException, InterruptedException { 259 return client.getActiveTrackers(); 260 } 261 262 /** 263 * Get blacklisted trackers. 264 * 265 * @return array of {@link TaskTrackerInfo} 266 * @throws IOException 267 * @throws InterruptedException 268 */ 269 public TaskTrackerInfo[] getBlackListedTaskTrackers() 270 throws IOException, InterruptedException { 271 return client.getBlacklistedTrackers(); 272 } 273 274 /** 275 * Get all the jobs in cluster. 276 * 277 * @return array of {@link Job} 278 * @throws IOException 279 * @throws InterruptedException 280 * @deprecated Use {@link #getAllJobStatuses()} instead. 281 */ 282 @Deprecated 283 public Job[] getAllJobs() throws IOException, InterruptedException { 284 return getJobs(client.getAllJobs()); 285 } 286 287 /** 288 * Get job status for all jobs in the cluster. 289 * @return job status for all jobs in cluster 290 * @throws IOException 291 * @throws InterruptedException 292 */ 293 public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException { 294 return client.getAllJobs(); 295 } 296 297 /** 298 * Grab the jobtracker system directory path where 299 * job-specific files will be placed. 300 * 301 * @return the system directory where job-specific files are to be placed. 302 */ 303 public Path getSystemDir() throws IOException, InterruptedException { 304 if (sysDir == null) { 305 sysDir = new Path(client.getSystemDir()); 306 } 307 return sysDir; 308 } 309 310 /** 311 * Grab the jobtracker's view of the staging directory path where 312 * job-specific files will be placed. 313 * 314 * @return the staging directory where job-specific files are to be placed. 315 */ 316 public Path getStagingAreaDir() throws IOException, InterruptedException { 317 if (stagingAreaDir == null) { 318 stagingAreaDir = new Path(client.getStagingAreaDir()); 319 } 320 return stagingAreaDir; 321 } 322 323 /** 324 * Get the job history file path for a given job id. The job history file at 325 * this path may or may not be existing depending on the job completion state. 326 * The file is present only for the completed jobs. 327 * @param jobId the JobID of the job submitted by the current user. 328 * @return the file path of the job history file 329 * @throws IOException 330 * @throws InterruptedException 331 */ 332 public String getJobHistoryUrl(JobID jobId) throws IOException, 333 InterruptedException { 334 if (jobHistoryDir == null) { 335 jobHistoryDir = new Path(client.getJobHistoryDir()); 336 } 337 return new Path(jobHistoryDir, jobId.toString() + "_" 338 + ugi.getShortUserName()).toString(); 339 } 340 341 /** 342 * Gets the Queue ACLs for current user 343 * @return array of QueueAclsInfo object for current user. 344 * @throws IOException 345 */ 346 public QueueAclsInfo[] getQueueAclsForCurrentUser() 347 throws IOException, InterruptedException { 348 return client.getQueueAclsForCurrentUser(); 349 } 350 351 /** 352 * Gets the root level queues. 353 * @return array of JobQueueInfo object. 354 * @throws IOException 355 */ 356 public QueueInfo[] getRootQueues() throws IOException, InterruptedException { 357 return client.getRootQueues(); 358 } 359 360 /** 361 * Returns immediate children of queueName. 362 * @param queueName 363 * @return array of JobQueueInfo which are children of queueName 364 * @throws IOException 365 */ 366 public QueueInfo[] getChildQueues(String queueName) 367 throws IOException, InterruptedException { 368 return client.getChildQueues(queueName); 369 } 370 371 /** 372 * Get the JobTracker's status. 373 * 374 * @return {@link JobTrackerStatus} of the JobTracker 375 * @throws IOException 376 * @throws InterruptedException 377 */ 378 public JobTrackerStatus getJobTrackerStatus() throws IOException, 379 InterruptedException { 380 return client.getJobTrackerStatus(); 381 } 382 383 /** 384 * Get the tasktracker expiry interval for the cluster 385 * @return the expiry interval in msec 386 */ 387 public long getTaskTrackerExpiryInterval() throws IOException, 388 InterruptedException { 389 return client.getTaskTrackerExpiryInterval(); 390 } 391 392 /** 393 * Get a delegation token for the user from the JobTracker. 394 * @param renewer the user who can renew the token 395 * @return the new token 396 * @throws IOException 397 */ 398 public Token<DelegationTokenIdentifier> 399 getDelegationToken(Text renewer) throws IOException, InterruptedException{ 400 // client has already set the service 401 return client.getDelegationToken(renewer); 402 } 403 404 /** 405 * Renew a delegation token 406 * @param token the token to renew 407 * @return the new expiration time 408 * @throws InvalidToken 409 * @throws IOException 410 * @deprecated Use {@link Token#renew} instead 411 */ 412 public long renewDelegationToken(Token<DelegationTokenIdentifier> token 413 ) throws InvalidToken, IOException, 414 InterruptedException { 415 return token.renew(getConf()); 416 } 417 418 /** 419 * Cancel a delegation token from the JobTracker 420 * @param token the token to cancel 421 * @throws IOException 422 * @deprecated Use {@link Token#cancel} instead 423 */ 424 public void cancelDelegationToken(Token<DelegationTokenIdentifier> token 425 ) throws IOException, 426 InterruptedException { 427 token.cancel(getConf()); 428 } 429 430}