001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.ServiceLoader;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031import org.apache.hadoop.classification.InterfaceAudience;
032import org.apache.hadoop.classification.InterfaceStability;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.io.Text;
037import org.apache.hadoop.mapred.JobConf;
038import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
039import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
040import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
041import org.apache.hadoop.mapreduce.util.ConfigUtil;
042import org.apache.hadoop.mapreduce.v2.LogParams;
043import org.apache.hadoop.security.UserGroupInformation;
044import org.apache.hadoop.security.token.SecretManager.InvalidToken;
045import org.apache.hadoop.security.token.Token;
046
047/**
048 * Provides a way to access information about the map/reduce cluster.
049 */
050@InterfaceAudience.Public
051@InterfaceStability.Evolving
052public class Cluster {
053  
054  @InterfaceStability.Evolving
055  public static enum JobTrackerStatus {INITIALIZING, RUNNING};
056  
057  private ClientProtocolProvider clientProtocolProvider;
058  private ClientProtocol client;
059  private UserGroupInformation ugi;
060  private Configuration conf;
061  private FileSystem fs = null;
062  private Path sysDir = null;
063  private Path stagingAreaDir = null;
064  private Path jobHistoryDir = null;
065  private static final Log LOG = LogFactory.getLog(Cluster.class);
066
067  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
068      ServiceLoader.load(ClientProtocolProvider.class);
069  private volatile List<ClientProtocolProvider> providerList = null;
070
071  private void initProviderList() {
072    if (providerList == null) {
073      synchronized (frameworkLoader) {
074        if (providerList == null) {
075          List<ClientProtocolProvider> localProviderList =
076              new ArrayList<ClientProtocolProvider>();
077          for (ClientProtocolProvider provider : frameworkLoader) {
078            localProviderList.add(provider);
079          }
080          providerList = localProviderList;
081        }
082      }
083    }
084  }
085
086  static {
087    ConfigUtil.loadResources();
088  }
089  
090  public Cluster(Configuration conf) throws IOException {
091    this(null, conf);
092  }
093
094  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
095      throws IOException {
096    this.conf = conf;
097    this.ugi = UserGroupInformation.getCurrentUser();
098    initialize(jobTrackAddr, conf);
099  }
100  
101  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
102      throws IOException {
103
104    initProviderList();
105    final IOException initEx = new IOException(
106        "Cannot initialize Cluster. Please check your configuration for "
107            + MRConfig.FRAMEWORK_NAME
108            + " and the correspond server addresses.");
109    for (ClientProtocolProvider provider : providerList) {
110      LOG.debug("Trying ClientProtocolProvider : "
111          + provider.getClass().getName());
112      ClientProtocol clientProtocol = null;
113      try {
114        if (jobTrackAddr == null) {
115          clientProtocol = provider.create(conf);
116        } else {
117          clientProtocol = provider.create(jobTrackAddr, conf);
118        }
119
120        if (clientProtocol != null) {
121          clientProtocolProvider = provider;
122          client = clientProtocol;
123          LOG.debug("Picked " + provider.getClass().getName()
124              + " as the ClientProtocolProvider");
125          break;
126        } else {
127          LOG.debug("Cannot pick " + provider.getClass().getName()
128              + " as the ClientProtocolProvider - returned null protocol");
129        }
130      } catch (Exception e) {
131        final String errMsg = "Failed to use " + provider.getClass().getName()
132            + " due to error: ";
133        initEx.addSuppressed(new IOException(errMsg, e));
134        LOG.info(errMsg, e);
135      }
136    }
137
138    if (null == clientProtocolProvider || null == client) {
139      throw initEx;
140    }
141  }
142
143  ClientProtocol getClient() {
144    return client;
145  }
146  
147  Configuration getConf() {
148    return conf;
149  }
150  
151  /**
152   * Close the <code>Cluster</code>.
153   * @throws IOException
154   */
155  public synchronized void close() throws IOException {
156    clientProtocolProvider.close(client);
157  }
158
159  private Job[] getJobs(JobStatus[] stats) throws IOException {
160    List<Job> jobs = new ArrayList<Job>();
161    for (JobStatus stat : stats) {
162      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
163    }
164    return jobs.toArray(new Job[0]);
165  }
166
167  /**
168   * Get the file system where job-specific files are stored
169   * 
170   * @return object of FileSystem
171   * @throws IOException
172   * @throws InterruptedException
173   */
174  public synchronized FileSystem getFileSystem() 
175      throws IOException, InterruptedException {
176    if (this.fs == null) {
177      try {
178        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
179          public FileSystem run() throws IOException, InterruptedException {
180            final Path sysDir = new Path(client.getSystemDir());
181            return sysDir.getFileSystem(getConf());
182          }
183        });
184      } catch (InterruptedException e) {
185        throw new RuntimeException(e);
186      }
187    }
188    return fs;
189  }
190
191  /**
192   * Get job corresponding to jobid.
193   * 
194   * @param jobId
195   * @return object of {@link Job}
196   * @throws IOException
197   * @throws InterruptedException
198   */
199  public Job getJob(JobID jobId) throws IOException, InterruptedException {
200    JobStatus status = client.getJobStatus(jobId);
201    if (status != null) {
202      final JobConf conf = new JobConf();
203      final Path jobPath = new Path(client.getFilesystemName(),
204          status.getJobFile());
205      final FileSystem fs = FileSystem.get(jobPath.toUri(), getConf());
206      try {
207        conf.addResource(fs.open(jobPath), jobPath.toString());
208      } catch (FileNotFoundException fnf) {
209        if (LOG.isWarnEnabled()) {
210          LOG.warn("Job conf missing on cluster", fnf);
211        }
212      }
213      return Job.getInstance(this, status, conf);
214    }
215    return null;
216  }
217  
218  /**
219   * Get all the queues in cluster.
220   * 
221   * @return array of {@link QueueInfo}
222   * @throws IOException
223   * @throws InterruptedException
224   */
225  public QueueInfo[] getQueues() throws IOException, InterruptedException {
226    return client.getQueues();
227  }
228  
229  /**
230   * Get queue information for the specified name.
231   * 
232   * @param name queuename
233   * @return object of {@link QueueInfo}
234   * @throws IOException
235   * @throws InterruptedException
236   */
237  public QueueInfo getQueue(String name) 
238      throws IOException, InterruptedException {
239    return client.getQueue(name);
240  }
241
242  /**
243   * Get log parameters for the specified jobID or taskAttemptID
244   * @param jobID the job id.
245   * @param taskAttemptID the task attempt id. Optional.
246   * @return the LogParams
247   * @throws IOException
248   * @throws InterruptedException
249   */
250  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
251      throws IOException, InterruptedException {
252    return client.getLogFileParams(jobID, taskAttemptID);
253  }
254
255  /**
256   * Get current cluster status.
257   * 
258   * @return object of {@link ClusterMetrics}
259   * @throws IOException
260   * @throws InterruptedException
261   */
262  public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
263    return client.getClusterMetrics();
264  }
265  
266  /**
267   * Get all active trackers in the cluster.
268   * 
269   * @return array of {@link TaskTrackerInfo}
270   * @throws IOException
271   * @throws InterruptedException
272   */
273  public TaskTrackerInfo[] getActiveTaskTrackers() 
274      throws IOException, InterruptedException  {
275    return client.getActiveTrackers();
276  }
277  
278  /**
279   * Get blacklisted trackers.
280   * 
281   * @return array of {@link TaskTrackerInfo}
282   * @throws IOException
283   * @throws InterruptedException
284   */
285  public TaskTrackerInfo[] getBlackListedTaskTrackers() 
286      throws IOException, InterruptedException  {
287    return client.getBlacklistedTrackers();
288  }
289  
290  /**
291   * Get all the jobs in cluster.
292   * 
293   * @return array of {@link Job}
294   * @throws IOException
295   * @throws InterruptedException
296   * @deprecated Use {@link #getAllJobStatuses()} instead.
297   */
298  @Deprecated
299  public Job[] getAllJobs() throws IOException, InterruptedException {
300    return getJobs(client.getAllJobs());
301  }
302
303  /**
304   * Get job status for all jobs in the cluster.
305   * @return job status for all jobs in cluster
306   * @throws IOException
307   * @throws InterruptedException
308   */
309  public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
310    return client.getAllJobs();
311  }
312
313  /**
314   * Grab the jobtracker system directory path where 
315   * job-specific files will  be placed.
316   * 
317   * @return the system directory where job-specific files are to be placed.
318   */
319  public Path getSystemDir() throws IOException, InterruptedException {
320    if (sysDir == null) {
321      sysDir = new Path(client.getSystemDir());
322    }
323    return sysDir;
324  }
325  
326  /**
327   * Grab the jobtracker's view of the staging directory path where 
328   * job-specific files will  be placed.
329   * 
330   * @return the staging directory where job-specific files are to be placed.
331   */
332  public Path getStagingAreaDir() throws IOException, InterruptedException {
333    if (stagingAreaDir == null) {
334      stagingAreaDir = new Path(client.getStagingAreaDir());
335    }
336    return stagingAreaDir;
337  }
338
339  /**
340   * Get the job history file path for a given job id. The job history file at 
341   * this path may or may not be existing depending on the job completion state.
342   * The file is present only for the completed jobs.
343   * @param jobId the JobID of the job submitted by the current user.
344   * @return the file path of the job history file
345   * @throws IOException
346   * @throws InterruptedException
347   */
348  public String getJobHistoryUrl(JobID jobId) throws IOException, 
349    InterruptedException {
350    if (jobHistoryDir == null) {
351      jobHistoryDir = new Path(client.getJobHistoryDir());
352    }
353    return new Path(jobHistoryDir, jobId.toString() + "_"
354                    + ugi.getShortUserName()).toString();
355  }
356
357  /**
358   * Gets the Queue ACLs for current user
359   * @return array of QueueAclsInfo object for current user.
360   * @throws IOException
361   */
362  public QueueAclsInfo[] getQueueAclsForCurrentUser() 
363      throws IOException, InterruptedException  {
364    return client.getQueueAclsForCurrentUser();
365  }
366
367  /**
368   * Gets the root level queues.
369   * @return array of JobQueueInfo object.
370   * @throws IOException
371   */
372  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
373    return client.getRootQueues();
374  }
375  
376  /**
377   * Returns immediate children of queueName.
378   * @param queueName
379   * @return array of JobQueueInfo which are children of queueName
380   * @throws IOException
381   */
382  public QueueInfo[] getChildQueues(String queueName) 
383      throws IOException, InterruptedException {
384    return client.getChildQueues(queueName);
385  }
386  
387  /**
388   * Get the JobTracker's status.
389   * 
390   * @return {@link JobTrackerStatus} of the JobTracker
391   * @throws IOException
392   * @throws InterruptedException
393   */
394  public JobTrackerStatus getJobTrackerStatus() throws IOException,
395      InterruptedException {
396    return client.getJobTrackerStatus();
397  }
398  
399  /**
400   * Get the tasktracker expiry interval for the cluster
401   * @return the expiry interval in msec
402   */
403  public long getTaskTrackerExpiryInterval() throws IOException,
404      InterruptedException {
405    return client.getTaskTrackerExpiryInterval();
406  }
407
408  /**
409   * Get a delegation token for the user from the JobTracker.
410   * @param renewer the user who can renew the token
411   * @return the new token
412   * @throws IOException
413   */
414  public Token<DelegationTokenIdentifier> 
415      getDelegationToken(Text renewer) throws IOException, InterruptedException{
416    // client has already set the service
417    return client.getDelegationToken(renewer);
418  }
419
420  /**
421   * Renew a delegation token
422   * @param token the token to renew
423   * @return the new expiration time
424   * @throws InvalidToken
425   * @throws IOException
426   * @deprecated Use {@link Token#renew} instead
427   */
428  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
429                                   ) throws InvalidToken, IOException,
430                                            InterruptedException {
431    return token.renew(getConf());
432  }
433
434  /**
435   * Cancel a delegation token from the JobTracker
436   * @param token the token to cancel
437   * @throws IOException
438   * @deprecated Use {@link Token#cancel} instead
439   */
440  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
441                                    ) throws IOException,
442                                             InterruptedException {
443    token.cancel(getConf());
444  }
445
446}