001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.ServiceLoader;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.apache.hadoop.classification.InterfaceAudience;
032import org.apache.hadoop.classification.InterfaceStability;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.io.Text;
037import org.apache.hadoop.mapred.JobConf;
038import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
039import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
040import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
041import org.apache.hadoop.mapreduce.util.ConfigUtil;
042import org.apache.hadoop.mapreduce.v2.LogParams;
043import org.apache.hadoop.security.UserGroupInformation;
044import org.apache.hadoop.security.token.SecretManager.InvalidToken;
045import org.apache.hadoop.security.token.Token;
046
047/**
048 * Provides a way to access information about the map/reduce cluster.
049 */
050@InterfaceAudience.Public
051@InterfaceStability.Evolving
052public class Cluster {
053  
054  @InterfaceStability.Evolving
055  public static enum JobTrackerStatus {INITIALIZING, RUNNING};
056  
057  private ClientProtocolProvider clientProtocolProvider;
058  private ClientProtocol client;
059  private UserGroupInformation ugi;
060  private Configuration conf;
061  private FileSystem fs = null;
062  private Path sysDir = null;
063  private Path stagingAreaDir = null;
064  private Path jobHistoryDir = null;
065  private static final Log LOG = LogFactory.getLog(Cluster.class);
066
067  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
068      ServiceLoader.load(ClientProtocolProvider.class);
069  private volatile List<ClientProtocolProvider> providerList = null;
070
071  private void initProviderList() {
072    if (providerList == null) {
073      synchronized (frameworkLoader) {
074        if (providerList == null) {
075          List<ClientProtocolProvider> localProviderList =
076              new ArrayList<ClientProtocolProvider>();
077          for (ClientProtocolProvider provider : frameworkLoader) {
078            localProviderList.add(provider);
079          }
080          providerList = localProviderList;
081        }
082      }
083    }
084  }
085
086  static {
087    ConfigUtil.loadResources();
088  }
089  
090  public Cluster(Configuration conf) throws IOException {
091    this(null, conf);
092  }
093
094  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
095      throws IOException {
096    this.conf = conf;
097    this.ugi = UserGroupInformation.getCurrentUser();
098    initialize(jobTrackAddr, conf);
099  }
100  
101  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
102      throws IOException {
103
104    initProviderList();
105    for (ClientProtocolProvider provider : providerList) {
106      LOG.debug("Trying ClientProtocolProvider : "
107          + provider.getClass().getName());
108      ClientProtocol clientProtocol = null;
109      try {
110        if (jobTrackAddr == null) {
111          clientProtocol = provider.create(conf);
112        } else {
113          clientProtocol = provider.create(jobTrackAddr, conf);
114        }
115
116        if (clientProtocol != null) {
117          clientProtocolProvider = provider;
118          client = clientProtocol;
119          LOG.debug("Picked " + provider.getClass().getName()
120              + " as the ClientProtocolProvider");
121          break;
122        } else {
123          LOG.debug("Cannot pick " + provider.getClass().getName()
124              + " as the ClientProtocolProvider - returned null protocol");
125        }
126      } catch (Exception e) {
127        LOG.info("Failed to use " + provider.getClass().getName()
128            + " due to error: ", e);
129      }
130    }
131
132    if (null == clientProtocolProvider || null == client) {
133      throw new IOException(
134          "Cannot initialize Cluster. Please check your configuration for "
135              + MRConfig.FRAMEWORK_NAME
136              + " and the correspond server addresses.");
137    }
138  }
139
140  ClientProtocol getClient() {
141    return client;
142  }
143  
144  Configuration getConf() {
145    return conf;
146  }
147  
148  /**
149   * Close the <code>Cluster</code>.
150   * @throws IOException
151   */
152  public synchronized void close() throws IOException {
153    clientProtocolProvider.close(client);
154  }
155
156  private Job[] getJobs(JobStatus[] stats) throws IOException {
157    List<Job> jobs = new ArrayList<Job>();
158    for (JobStatus stat : stats) {
159      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
160    }
161    return jobs.toArray(new Job[0]);
162  }
163
164  /**
165   * Get the file system where job-specific files are stored
166   * 
167   * @return object of FileSystem
168   * @throws IOException
169   * @throws InterruptedException
170   */
171  public synchronized FileSystem getFileSystem() 
172      throws IOException, InterruptedException {
173    if (this.fs == null) {
174      try {
175        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
176          public FileSystem run() throws IOException, InterruptedException {
177            final Path sysDir = new Path(client.getSystemDir());
178            return sysDir.getFileSystem(getConf());
179          }
180        });
181      } catch (InterruptedException e) {
182        throw new RuntimeException(e);
183      }
184    }
185    return fs;
186  }
187
188  /**
189   * Get job corresponding to jobid.
190   * 
191   * @param jobId
192   * @return object of {@link Job}
193   * @throws IOException
194   * @throws InterruptedException
195   */
196  public Job getJob(JobID jobId) throws IOException, InterruptedException {
197    JobStatus status = client.getJobStatus(jobId);
198    if (status != null) {
199      final JobConf conf = new JobConf();
200      final Path jobPath = new Path(client.getFilesystemName(),
201          status.getJobFile());
202      final FileSystem fs = FileSystem.get(jobPath.toUri(), getConf());
203      try {
204        conf.addResource(fs.open(jobPath), jobPath.toString());
205      } catch (FileNotFoundException fnf) {
206        if (LOG.isWarnEnabled()) {
207          LOG.warn("Job conf missing on cluster", fnf);
208        }
209      }
210      return Job.getInstance(this, status, conf);
211    }
212    return null;
213  }
214  
215  /**
216   * Get all the queues in cluster.
217   * 
218   * @return array of {@link QueueInfo}
219   * @throws IOException
220   * @throws InterruptedException
221   */
222  public QueueInfo[] getQueues() throws IOException, InterruptedException {
223    return client.getQueues();
224  }
225  
226  /**
227   * Get queue information for the specified name.
228   * 
229   * @param name queuename
230   * @return object of {@link QueueInfo}
231   * @throws IOException
232   * @throws InterruptedException
233   */
234  public QueueInfo getQueue(String name) 
235      throws IOException, InterruptedException {
236    return client.getQueue(name);
237  }
238
239  /**
240   * Get log parameters for the specified jobID or taskAttemptID
241   * @param jobID the job id.
242   * @param taskAttemptID the task attempt id. Optional.
243   * @return the LogParams
244   * @throws IOException
245   * @throws InterruptedException
246   */
247  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
248      throws IOException, InterruptedException {
249    return client.getLogFileParams(jobID, taskAttemptID);
250  }
251
252  /**
253   * Get current cluster status.
254   * 
255   * @return object of {@link ClusterMetrics}
256   * @throws IOException
257   * @throws InterruptedException
258   */
259  public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
260    return client.getClusterMetrics();
261  }
262  
263  /**
264   * Get all active trackers in the cluster.
265   * 
266   * @return array of {@link TaskTrackerInfo}
267   * @throws IOException
268   * @throws InterruptedException
269   */
270  public TaskTrackerInfo[] getActiveTaskTrackers() 
271      throws IOException, InterruptedException  {
272    return client.getActiveTrackers();
273  }
274  
275  /**
276   * Get blacklisted trackers.
277   * 
278   * @return array of {@link TaskTrackerInfo}
279   * @throws IOException
280   * @throws InterruptedException
281   */
282  public TaskTrackerInfo[] getBlackListedTaskTrackers() 
283      throws IOException, InterruptedException  {
284    return client.getBlacklistedTrackers();
285  }
286  
287  /**
288   * Get all the jobs in cluster.
289   * 
290   * @return array of {@link Job}
291   * @throws IOException
292   * @throws InterruptedException
293   * @deprecated Use {@link #getAllJobStatuses()} instead.
294   */
295  @Deprecated
296  public Job[] getAllJobs() throws IOException, InterruptedException {
297    return getJobs(client.getAllJobs());
298  }
299
300  /**
301   * Get job status for all jobs in the cluster.
302   * @return job status for all jobs in cluster
303   * @throws IOException
304   * @throws InterruptedException
305   */
306  public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
307    return client.getAllJobs();
308  }
309
310  /**
311   * Grab the jobtracker system directory path where 
312   * job-specific files will  be placed.
313   * 
314   * @return the system directory where job-specific files are to be placed.
315   */
316  public Path getSystemDir() throws IOException, InterruptedException {
317    if (sysDir == null) {
318      sysDir = new Path(client.getSystemDir());
319    }
320    return sysDir;
321  }
322  
323  /**
324   * Grab the jobtracker's view of the staging directory path where 
325   * job-specific files will  be placed.
326   * 
327   * @return the staging directory where job-specific files are to be placed.
328   */
329  public Path getStagingAreaDir() throws IOException, InterruptedException {
330    if (stagingAreaDir == null) {
331      stagingAreaDir = new Path(client.getStagingAreaDir());
332    }
333    return stagingAreaDir;
334  }
335
336  /**
337   * Get the job history file path for a given job id. The job history file at 
338   * this path may or may not be existing depending on the job completion state.
339   * The file is present only for the completed jobs.
340   * @param jobId the JobID of the job submitted by the current user.
341   * @return the file path of the job history file
342   * @throws IOException
343   * @throws InterruptedException
344   */
345  public String getJobHistoryUrl(JobID jobId) throws IOException, 
346    InterruptedException {
347    if (jobHistoryDir == null) {
348      jobHistoryDir = new Path(client.getJobHistoryDir());
349    }
350    return new Path(jobHistoryDir, jobId.toString() + "_"
351                    + ugi.getShortUserName()).toString();
352  }
353
354  /**
355   * Gets the Queue ACLs for current user
356   * @return array of QueueAclsInfo object for current user.
357   * @throws IOException
358   */
359  public QueueAclsInfo[] getQueueAclsForCurrentUser() 
360      throws IOException, InterruptedException  {
361    return client.getQueueAclsForCurrentUser();
362  }
363
364  /**
365   * Gets the root level queues.
366   * @return array of JobQueueInfo object.
367   * @throws IOException
368   */
369  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
370    return client.getRootQueues();
371  }
372  
373  /**
374   * Returns immediate children of queueName.
375   * @param queueName
376   * @return array of JobQueueInfo which are children of queueName
377   * @throws IOException
378   */
379  public QueueInfo[] getChildQueues(String queueName) 
380      throws IOException, InterruptedException {
381    return client.getChildQueues(queueName);
382  }
383  
384  /**
385   * Get the JobTracker's status.
386   * 
387   * @return {@link JobTrackerStatus} of the JobTracker
388   * @throws IOException
389   * @throws InterruptedException
390   */
391  public JobTrackerStatus getJobTrackerStatus() throws IOException,
392      InterruptedException {
393    return client.getJobTrackerStatus();
394  }
395  
396  /**
397   * Get the tasktracker expiry interval for the cluster
398   * @return the expiry interval in msec
399   */
400  public long getTaskTrackerExpiryInterval() throws IOException,
401      InterruptedException {
402    return client.getTaskTrackerExpiryInterval();
403  }
404
405  /**
406   * Get a delegation token for the user from the JobTracker.
407   * @param renewer the user who can renew the token
408   * @return the new token
409   * @throws IOException
410   */
411  public Token<DelegationTokenIdentifier> 
412      getDelegationToken(Text renewer) throws IOException, InterruptedException{
413    // client has already set the service
414    return client.getDelegationToken(renewer);
415  }
416
417  /**
418   * Renew a delegation token
419   * @param token the token to renew
420   * @return the new expiration time
421   * @throws InvalidToken
422   * @throws IOException
423   * @deprecated Use {@link Token#renew} instead
424   */
425  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
426                                   ) throws InvalidToken, IOException,
427                                            InterruptedException {
428    return token.renew(getConf());
429  }
430
431  /**
432   * Cancel a delegation token from the JobTracker
433   * @param token the token to cancel
434   * @throws IOException
435   * @deprecated Use {@link Token#cancel} instead
436   */
437  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
438                                    ) throws IOException,
439                                             InterruptedException {
440    token.cancel(getConf());
441  }
442
443}