001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.ServiceLoader;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.apache.hadoop.classification.InterfaceAudience;
032import org.apache.hadoop.classification.InterfaceStability;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.io.Text;
037import org.apache.hadoop.mapred.JobConf;
038import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
039import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
040import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
041import org.apache.hadoop.mapreduce.util.ConfigUtil;
042import org.apache.hadoop.mapreduce.v2.LogParams;
043import org.apache.hadoop.security.UserGroupInformation;
044import org.apache.hadoop.security.token.SecretManager.InvalidToken;
045import org.apache.hadoop.security.token.Token;
046
047/**
048 * Provides a way to access information about the map/reduce cluster.
049 */
050@InterfaceAudience.Public
051@InterfaceStability.Evolving
052public class Cluster {
053  
054  @InterfaceStability.Evolving
055  public static enum JobTrackerStatus {INITIALIZING, RUNNING};
056  
057  private ClientProtocolProvider clientProtocolProvider;
058  private ClientProtocol client;
059  private UserGroupInformation ugi;
060  private Configuration conf;
061  private FileSystem fs = null;
062  private Path sysDir = null;
063  private Path stagingAreaDir = null;
064  private Path jobHistoryDir = null;
065  private static final Log LOG = LogFactory.getLog(Cluster.class);
066
067  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
068      ServiceLoader.load(ClientProtocolProvider.class);
069  
070  static {
071    ConfigUtil.loadResources();
072  }
073  
074  public Cluster(Configuration conf) throws IOException {
075    this(null, conf);
076  }
077
078  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
079      throws IOException {
080    this.conf = conf;
081    this.ugi = UserGroupInformation.getCurrentUser();
082    initialize(jobTrackAddr, conf);
083  }
084  
085  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
086      throws IOException {
087
088    synchronized (frameworkLoader) {
089      for (ClientProtocolProvider provider : frameworkLoader) {
090        LOG.debug("Trying ClientProtocolProvider : "
091            + provider.getClass().getName());
092        ClientProtocol clientProtocol = null; 
093        try {
094          if (jobTrackAddr == null) {
095            clientProtocol = provider.create(conf);
096          } else {
097            clientProtocol = provider.create(jobTrackAddr, conf);
098          }
099
100          if (clientProtocol != null) {
101            clientProtocolProvider = provider;
102            client = clientProtocol;
103            LOG.debug("Picked " + provider.getClass().getName()
104                + " as the ClientProtocolProvider");
105            break;
106          }
107          else {
108            LOG.debug("Cannot pick " + provider.getClass().getName()
109                + " as the ClientProtocolProvider - returned null protocol");
110          }
111        } 
112        catch (Exception e) {
113          LOG.info("Failed to use " + provider.getClass().getName()
114              + " due to error: " + e.getMessage());
115        }
116      }
117    }
118
119    if (null == clientProtocolProvider || null == client) {
120      throw new IOException(
121          "Cannot initialize Cluster. Please check your configuration for "
122              + MRConfig.FRAMEWORK_NAME
123              + " and the correspond server addresses.");
124    }
125  }
126
127  ClientProtocol getClient() {
128    return client;
129  }
130  
131  Configuration getConf() {
132    return conf;
133  }
134  
135  /**
136   * Close the <code>Cluster</code>.
137   * @throws IOException
138   */
139  public synchronized void close() throws IOException {
140    clientProtocolProvider.close(client);
141  }
142
143  private Job[] getJobs(JobStatus[] stats) throws IOException {
144    List<Job> jobs = new ArrayList<Job>();
145    for (JobStatus stat : stats) {
146      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
147    }
148    return jobs.toArray(new Job[0]);
149  }
150
151  /**
152   * Get the file system where job-specific files are stored
153   * 
154   * @return object of FileSystem
155   * @throws IOException
156   * @throws InterruptedException
157   */
158  public synchronized FileSystem getFileSystem() 
159      throws IOException, InterruptedException {
160    if (this.fs == null) {
161      try {
162        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
163          public FileSystem run() throws IOException, InterruptedException {
164            final Path sysDir = new Path(client.getSystemDir());
165            return sysDir.getFileSystem(getConf());
166          }
167        });
168      } catch (InterruptedException e) {
169        throw new RuntimeException(e);
170      }
171    }
172    return fs;
173  }
174
175  /**
176   * Get job corresponding to jobid.
177   * 
178   * @param jobId
179   * @return object of {@link Job}
180   * @throws IOException
181   * @throws InterruptedException
182   */
183  public Job getJob(JobID jobId) throws IOException, InterruptedException {
184    JobStatus status = client.getJobStatus(jobId);
185    if (status != null) {
186      final JobConf conf = new JobConf();
187      final Path jobPath = new Path(client.getFilesystemName(),
188          status.getJobFile());
189      final FileSystem fs = FileSystem.get(jobPath.toUri(), getConf());
190      try {
191        conf.addResource(fs.open(jobPath), jobPath.toString());
192      } catch (FileNotFoundException fnf) {
193        if (LOG.isWarnEnabled()) {
194          LOG.warn("Job conf missing on cluster", fnf);
195        }
196      }
197      return Job.getInstance(this, status, conf);
198    }
199    return null;
200  }
201  
202  /**
203   * Get all the queues in cluster.
204   * 
205   * @return array of {@link QueueInfo}
206   * @throws IOException
207   * @throws InterruptedException
208   */
209  public QueueInfo[] getQueues() throws IOException, InterruptedException {
210    return client.getQueues();
211  }
212  
213  /**
214   * Get queue information for the specified name.
215   * 
216   * @param name queuename
217   * @return object of {@link QueueInfo}
218   * @throws IOException
219   * @throws InterruptedException
220   */
221  public QueueInfo getQueue(String name) 
222      throws IOException, InterruptedException {
223    return client.getQueue(name);
224  }
225
226  /**
227   * Get log parameters for the specified jobID or taskAttemptID
228   * @param jobID the job id.
229   * @param taskAttemptID the task attempt id. Optional.
230   * @return the LogParams
231   * @throws IOException
232   * @throws InterruptedException
233   */
234  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
235      throws IOException, InterruptedException {
236    return client.getLogFileParams(jobID, taskAttemptID);
237  }
238
239  /**
240   * Get current cluster status.
241   * 
242   * @return object of {@link ClusterMetrics}
243   * @throws IOException
244   * @throws InterruptedException
245   */
246  public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
247    return client.getClusterMetrics();
248  }
249  
250  /**
251   * Get all active trackers in the cluster.
252   * 
253   * @return array of {@link TaskTrackerInfo}
254   * @throws IOException
255   * @throws InterruptedException
256   */
257  public TaskTrackerInfo[] getActiveTaskTrackers() 
258      throws IOException, InterruptedException  {
259    return client.getActiveTrackers();
260  }
261  
262  /**
263   * Get blacklisted trackers.
264   * 
265   * @return array of {@link TaskTrackerInfo}
266   * @throws IOException
267   * @throws InterruptedException
268   */
269  public TaskTrackerInfo[] getBlackListedTaskTrackers() 
270      throws IOException, InterruptedException  {
271    return client.getBlacklistedTrackers();
272  }
273  
274  /**
275   * Get all the jobs in cluster.
276   * 
277   * @return array of {@link Job}
278   * @throws IOException
279   * @throws InterruptedException
280   * @deprecated Use {@link #getAllJobStatuses()} instead.
281   */
282  @Deprecated
283  public Job[] getAllJobs() throws IOException, InterruptedException {
284    return getJobs(client.getAllJobs());
285  }
286
287  /**
288   * Get job status for all jobs in the cluster.
289   * @return job status for all jobs in cluster
290   * @throws IOException
291   * @throws InterruptedException
292   */
293  public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
294    return client.getAllJobs();
295  }
296
297  /**
298   * Grab the jobtracker system directory path where 
299   * job-specific files will  be placed.
300   * 
301   * @return the system directory where job-specific files are to be placed.
302   */
303  public Path getSystemDir() throws IOException, InterruptedException {
304    if (sysDir == null) {
305      sysDir = new Path(client.getSystemDir());
306    }
307    return sysDir;
308  }
309  
310  /**
311   * Grab the jobtracker's view of the staging directory path where 
312   * job-specific files will  be placed.
313   * 
314   * @return the staging directory where job-specific files are to be placed.
315   */
316  public Path getStagingAreaDir() throws IOException, InterruptedException {
317    if (stagingAreaDir == null) {
318      stagingAreaDir = new Path(client.getStagingAreaDir());
319    }
320    return stagingAreaDir;
321  }
322
323  /**
324   * Get the job history file path for a given job id. The job history file at 
325   * this path may or may not be existing depending on the job completion state.
326   * The file is present only for the completed jobs.
327   * @param jobId the JobID of the job submitted by the current user.
328   * @return the file path of the job history file
329   * @throws IOException
330   * @throws InterruptedException
331   */
332  public String getJobHistoryUrl(JobID jobId) throws IOException, 
333    InterruptedException {
334    if (jobHistoryDir == null) {
335      jobHistoryDir = new Path(client.getJobHistoryDir());
336    }
337    return new Path(jobHistoryDir, jobId.toString() + "_"
338                    + ugi.getShortUserName()).toString();
339  }
340
341  /**
342   * Gets the Queue ACLs for current user
343   * @return array of QueueAclsInfo object for current user.
344   * @throws IOException
345   */
346  public QueueAclsInfo[] getQueueAclsForCurrentUser() 
347      throws IOException, InterruptedException  {
348    return client.getQueueAclsForCurrentUser();
349  }
350
351  /**
352   * Gets the root level queues.
353   * @return array of JobQueueInfo object.
354   * @throws IOException
355   */
356  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
357    return client.getRootQueues();
358  }
359  
360  /**
361   * Returns immediate children of queueName.
362   * @param queueName
363   * @return array of JobQueueInfo which are children of queueName
364   * @throws IOException
365   */
366  public QueueInfo[] getChildQueues(String queueName) 
367      throws IOException, InterruptedException {
368    return client.getChildQueues(queueName);
369  }
370  
371  /**
372   * Get the JobTracker's status.
373   * 
374   * @return {@link JobTrackerStatus} of the JobTracker
375   * @throws IOException
376   * @throws InterruptedException
377   */
378  public JobTrackerStatus getJobTrackerStatus() throws IOException,
379      InterruptedException {
380    return client.getJobTrackerStatus();
381  }
382  
383  /**
384   * Get the tasktracker expiry interval for the cluster
385   * @return the expiry interval in msec
386   */
387  public long getTaskTrackerExpiryInterval() throws IOException,
388      InterruptedException {
389    return client.getTaskTrackerExpiryInterval();
390  }
391
392  /**
393   * Get a delegation token for the user from the JobTracker.
394   * @param renewer the user who can renew the token
395   * @return the new token
396   * @throws IOException
397   */
398  public Token<DelegationTokenIdentifier> 
399      getDelegationToken(Text renewer) throws IOException, InterruptedException{
400    // client has already set the service
401    return client.getDelegationToken(renewer);
402  }
403
404  /**
405   * Renew a delegation token
406   * @param token the token to renew
407   * @return the new expiration time
408   * @throws InvalidToken
409   * @throws IOException
410   * @deprecated Use {@link Token#renew} instead
411   */
412  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
413                                   ) throws InvalidToken, IOException,
414                                            InterruptedException {
415    return token.renew(getConf());
416  }
417
418  /**
419   * Cancel a delegation token from the JobTracker
420   * @param token the token to cancel
421   * @throws IOException
422   * @deprecated Use {@link Token#cancel} instead
423   */
424  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
425                                    ) throws IOException,
426                                             InterruptedException {
427    token.cancel(getConf());
428  }
429
430}