/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <code>JobClient</code> is the primary interface for the user-job to interact
 * with the cluster.
 *
 * <code>JobClient</code> provides facilities to submit jobs, track their
 * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
 * status information, etc.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setting up the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system
 *   directory on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the cluster and optionally monitoring
 *   its status.
 *   </li>
 * </ol>
 *
 * Normally the user creates the application, describes various facets of the
 * job via {@link JobConf}, and then uses the <code>JobClient</code> to submit
 * the job and monitor its progress.
 *
 * <p>Here is an example of how to use <code>JobClient</code>:</p>
 * <p><blockquote><pre>
 *     // Create a new JobConf
 *     JobConf job = new JobConf(new Configuration(), MyJob.class);
 *
 *     // Specify various job-specific parameters
 *     job.setJobName("myjob");
 *
 *     job.setInputPath(new Path("in"));
 *     job.setOutputPath(new Path("out"));
 *
 *     job.setMapperClass(MyJob.MyMapper.class);
 *     job.setReducerClass(MyJob.MyReducer.class);
 *
 *     // Submit the job, then poll for progress until the job is complete
 *     JobClient.runJob(job);
 * </pre></blockquote>
 *
 * <b id="JobControl">Job Control</b>
 *
 * <p>At times clients chain map-reduce jobs to accomplish complex tasks
 * which cannot be done via a single map-reduce job. This is fairly easy
 * since the output of a job typically goes to the distributed file-system,
 * and that output can be used as the input for the next job.</p>
 *
 * <p>However, this also means that the onus of ensuring jobs run to
 * completion (success/failure) lies squarely on the clients. In such
 * situations the various job-control options are (a polling sketch follows
 * the list):
 * <ol>
 *   <li>
 *   {@link #runJob(JobConf)} : submits the job and returns only after
 *   the job has completed.
 *   </li>
 *   <li>
 *   {@link #submitJob(JobConf)} : only submits the job; the client then
 *   polls the returned handle to the {@link RunningJob} to query status
 *   and make scheduling decisions.
 *   </li>
 *   <li>
 *   {@link JobConf#setJobEndNotificationURI(String)} : sets up a
 *   notification upon job-completion, thus avoiding polling.
 *   </li>
 * </ol>
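 *
 * <p>As an illustrative sketch of the second option (the <code>job</code>
 * variable is a placeholder, not part of this API), a client can submit the
 * job and poll the handle itself:</p>
 * <p><blockquote><pre>
 *     JobClient client = new JobClient(job);
 *     RunningJob running = client.submitJob(job);
 *     while (!running.isComplete()) {
 *       Thread.sleep(5000);   // back off between status calls
 *     }
 *     if (!running.isSuccessful()) {
 *       throw new IOException("Job failed: " + running.getFailureInfo());
 *     }
 * </pre></blockquote>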
 *
 * @see JobConf
 * @see ClusterStatus
 * @see Tool
 * @see DistributedCache
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI implements AutoCloseable {

  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
      "mapreduce.jobclient.retry.policy.enabled";
  @InterfaceAudience.Private
  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT =
      false;
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
      "mapreduce.jobclient.retry.policy.spec";
  @InterfaceAudience.Private
  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
      "10000,6,60000,10"; // t1,n1,t2,n2,...

  public enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;

  private int maxRetry = MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES;
  private long retryInterval =
      MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL;

  static {
    ConfigUtil.loadResources();
  }

  /**
   * A NetworkedJob is an implementation of RunningJob.  It holds
   * a JobProfile object to provide some info, and interacts with the
   * remote service to provide certain functionality.
   */
  static class NetworkedJob implements RunningJob {
    Job job;

    /**
     * We store a JobProfile and a timestamp for when we last
     * acquired the job profile.  If the job is null, then we cannot
     * perform any of the tasks.  The job might be null if the cluster
     * has completely forgotten about the job (e.g., 24 hours after the
     * job completes).
     */
    public NetworkedJob(JobStatus status, Cluster cluster) throws IOException {
      this(status, cluster, new JobConf(status.getJobFile()));
    }

    private NetworkedJob(JobStatus status, Cluster cluster, JobConf conf)
        throws IOException {
      this(Job.getInstance(cluster, status, conf));
    }

    public NetworkedJob(Job job) throws IOException {
      this.job = job;
    }

    public Configuration getConfiguration() {
      return job.getConfiguration();
    }

    /**
     * An identifier for the job.
     */
    public JobID getID() {
      return JobID.downgrade(job.getJobID());
    }

    /**
     * @deprecated This method is deprecated and will be removed. Applications
     * should rather use {@link #getID()}.
     */
    @Deprecated
    public String getJobID() {
      return getID().toString();
    }

    /**
     * The user-specified job name.
     */
    public String getJobName() {
      return job.getJobName();
    }

    /**
     * The name of the job file.
     */
    public String getJobFile() {
      return job.getJobFile();
    }

    /**
     * A URL where the job's status can be seen.
     */
    public String getTrackingURL() {
      return job.getTrackingURL();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of map work
     * completed.
     */
    public float mapProgress() throws IOException {
      return job.mapProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of reduce work
     * completed.
     */
    public float reduceProgress() throws IOException {
      return job.reduceProgress();
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of cleanup work
     * completed.
     */
    public float cleanupProgress() throws IOException {
      try {
        return job.cleanupProgress();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * A float between 0.0 and 1.0, indicating the fraction of setup work
     * completed.
     */
    public float setupProgress() throws IOException {
      return job.setupProgress();
    }

    /**
     * Returns immediately whether the whole job is done yet or not.
     */
    public synchronized boolean isComplete() throws IOException {
      return job.isComplete();
    }

    /**
     * True iff the job completed successfully.
     */
    public synchronized boolean isSuccessful() throws IOException {
      return job.isSuccessful();
    }

    /**
     * Blocks until the job is finished.
     */
    public void waitForCompletion() throws IOException {
      try {
        job.waitForCompletion(false);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      } catch (ClassNotFoundException ce) {
        throw new IOException(ce);
      }
    }

    /**
     * Tells the service to get the state of the current job.
     */
    public synchronized int getJobState() throws IOException {
      try {
        return job.getJobState().getValue();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Tells the service to terminate the current job.
     */
    public synchronized void killJob() throws IOException {
      job.killJob();
    }

    /**
     * Set the priority of the job.
     * @param priority new priority of the job.
     */
    public synchronized void setJobPriority(String priority)
        throws IOException {
      try {
        job.setPriority(
            org.apache.hadoop.mapreduce.JobPriority.valueOf(priority));
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Kill the indicated task attempt.
     * @param taskId the id of the task to kill.
     * @param shouldFail if true, the task is failed and added to the
     * failed-tasks list; otherwise it is just killed, without affecting the
     * job's failure status.
     */
    public synchronized void killTask(TaskAttemptID taskId,
        boolean shouldFail) throws IOException {
      if (shouldFail) {
        job.failTask(taskId);
      } else {
        job.killTask(taskId);
      }
    }

    /**
     * @deprecated Applications should rather use
     * {@link #killTask(TaskAttemptID, boolean)}.
     */
    @Deprecated
    public synchronized void killTask(String taskId, boolean shouldFail)
        throws IOException {
      killTask(TaskAttemptID.forName(taskId), shouldFail);
    }

    /**
     * Fetch task completion events from the cluster for this job.
     */
    public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
        int startFrom) throws IOException {
      try {
        org.apache.hadoop.mapreduce.TaskCompletionEvent[] events =
            job.getTaskCompletionEvents(startFrom, 10);
        TaskCompletionEvent[] ret = new TaskCompletionEvent[events.length];
        for (int i = 0; i < events.length; i++) {
          ret[i] = TaskCompletionEvent.downgrade(events[i]);
        }
        return ret;
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    /**
     * Dump stats to screen.
     */
    @Override
    public String toString() {
      return job.toString();
    }

    /**
     * Returns the counters for this job.
     */
    public Counters getCounters() throws IOException {
      Counters result = null;
      org.apache.hadoop.mapreduce.Counters temp = job.getCounters();
      if (temp != null) {
        result = Counters.downgrade(temp);
      }
      return result;
    }

    @Override
    public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
      try {
        return job.getTaskDiagnostics(id);
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public String getHistoryUrl() throws IOException {
      try {
        return job.getHistoryUrl();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    public boolean isRetired() throws IOException {
      try {
        return job.isRetired();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    boolean monitorAndPrintJob() throws IOException, InterruptedException {
      return job.monitorAndPrintJob();
    }

    @Override
    public String getFailureInfo() throws IOException {
      try {
        return job.getStatus().getFailureInfo();
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }

    @Override
    public JobStatus getJobStatus() throws IOException {
      try {
        return JobStatus.downgrade(job.getStatus());
      } catch (InterruptedException ie) {
        throw new IOException(ie);
      }
    }
  }
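
  // Illustrative sketch (not part of the public API surface): the service
  // returns task completion events in windows of at most 10 per call (see
  // NetworkedJob.getTaskCompletionEvents above), so callers typically page
  // through them by advancing the offset. The RunningJob handle "rj" below
  // is a placeholder:
  //
  //   int from = 0;
  //   TaskCompletionEvent[] events;
  //   do {
  //     events = rj.getTaskCompletionEvents(from);
  //     for (TaskCompletionEvent event : events) {
  //       System.out.println(event);
  //     }
  //     from += events.length;
  //   } while (events.length > 0);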

  /**
   * UGI of the client.  We store this UGI when the client is created and
   * then make sure that the same UGI is used to run the various protocols.
   */
  UserGroupInformation clientUgi;

  /**
   * Create a job client.
   */
  public JobClient() {
  }

  /**
   * Build a job client with the given {@link JobConf}, and connect to the
   * default cluster.
   *
   * @param conf the job configuration.
   * @throws IOException
   */
  public JobClient(JobConf conf) throws IOException {
    init(conf);
  }

  /**
   * Build a job client with the given {@link Configuration},
   * and connect to the default cluster.
   *
   * @param conf the configuration.
   * @throws IOException
   */
  public JobClient(Configuration conf) throws IOException {
    init(new JobConf(conf));
  }

  /**
   * Connect to the default cluster.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    setConf(conf);
    cluster = new Cluster(conf);
    clientUgi = UserGroupInformation.getCurrentUser();

    maxRetry = conf.getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
        MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);

    retryInterval =
        conf.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
            MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
  }

  /**
   * Build a job client, connect to the indicated job tracker.
   *
   * @param jobTrackAddr the job tracker to connect to.
   * @param conf configuration.
   */
  public JobClient(InetSocketAddress jobTrackAddr,
                   Configuration conf) throws IOException {
    cluster = new Cluster(jobTrackAddr, conf);
    clientUgi = UserGroupInformation.getCurrentUser();
  }

  /**
   * Close the <code>JobClient</code>.
   */
  @Override
  public synchronized void close() throws IOException {
    cluster.close();
  }
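
  // Since JobClient implements AutoCloseable, a short-lived client can be
  // managed with try-with-resources so the underlying Cluster connection is
  // released deterministically. A minimal sketch (the "conf" object is a
  // placeholder assumed to point at a reachable cluster):
  //
  //   try (JobClient client = new JobClient(new JobConf(conf))) {
  //     for (JobStatus status : client.jobsToComplete()) {
  //       System.out.println(status.getJobID());
  //     }
  //   }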

  /**
   * Get a filesystem handle.  We need this to prepare jobs
   * for submission to the MapReduce system.
   *
   * @return the filesystem handle.
   */
  public synchronized FileSystem getFs() throws IOException {
    try {
      return cluster.getFileSystem();
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a handle to the Cluster.
   */
  public Cluster getClusterHandle() {
    return cluster;
  }

  /**
   * Submit a job to the MR system.
   *
   * This returns a handle to the {@link RunningJob} which can be used to
   * track the running-job.
   *
   * @param jobFile the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track
   *         the running-job.
   * @throws FileNotFoundException
   * @throws InvalidJobConfException
   * @throws IOException
   */
  public RunningJob submitJob(String jobFile) throws FileNotFoundException,
                                                     InvalidJobConfException,
                                                     IOException {
    // Load in the submitted job details
    JobConf job = new JobConf(jobFile);
    return submitJob(job);
  }

  /**
   * Submit a job to the MR system.
   * This returns a handle to the {@link RunningJob} which can be used to
   * track the running-job.
   *
   * @param conf the job configuration.
   * @return a handle to the {@link RunningJob} which can be used to track
   *         the running-job.
   * @throws FileNotFoundException
   * @throws IOException
   */
  public RunningJob submitJob(final JobConf conf) throws FileNotFoundException,
                                                         IOException {
    return submitJobInternal(conf);
  }

  @InterfaceAudience.Private
  public RunningJob submitJobInternal(final JobConf conf)
      throws FileNotFoundException, IOException {
    try {
      conf.setBooleanIfUnset("mapred.mapper.new-api", false);
      conf.setBooleanIfUnset("mapred.reducer.new-api", false);
      Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
        @Override
        public Job run() throws IOException, ClassNotFoundException,
            InterruptedException {
          Job job = Job.getInstance(conf);
          job.submit();
          return job;
        }
      });

      Cluster prev = cluster;
      // update our Cluster instance with the one created by Job for submission
      // (we can't pass our Cluster instance to Job, since Job wraps the config
      // instance, and the two configs would then diverge)
      cluster = job.getCluster();

      // It is important to close the previous cluster instance
      // to clean up resources.
      if (prev != null) {
        prev.close();
      }
      return new NetworkedJob(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    }
  }

  private Job getJobUsingCluster(final JobID jobid) throws IOException,
      InterruptedException {
    return clientUgi.doAs(new PrivilegedExceptionAction<Job>() {
      public Job run() throws IOException, InterruptedException {
        return cluster.getJob(jobid);
      }
    });
  }

  protected RunningJob getJobInner(final JobID jobid) throws IOException {
    try {
      Job job = getJobUsingCluster(jobid);
      if (job != null) {
        JobStatus status = JobStatus.downgrade(job.getStatus());
        if (status != null) {
          return new NetworkedJob(status, cluster,
              new JobConf(job.getConfiguration()));
        }
      }
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    return null;
  }

  /**
   * Get a {@link RunningJob} object to track an ongoing job.  Returns
   * null if the id does not correspond to any known job.
   *
   * @param jobid the jobid of the job.
   * @return the {@link RunningJob} handle to track the job, null if the
   *         <code>jobid</code> doesn't correspond to any known job.
   * @throws IOException
   */
  public RunningJob getJob(final JobID jobid) throws IOException {
    for (int i = 0; i <= maxRetry; i++) {
      if (i > 0) {
        try {
          Thread.sleep(retryInterval);
        } catch (Exception e) { }
      }
      RunningJob job = getJobInner(jobid);
      if (job != null) {
        return job;
      }
    }
    return null;
  }

  /**
   * @deprecated Applications should rather use {@link #getJob(JobID)}.
   */
  @Deprecated
  public RunningJob getJob(String jobid) throws IOException {
    return getJob(JobID.forName(jobid));
  }

  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
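
  // getJob retries because a freshly submitted or just-retired job may not
  // yet be visible to the client. A sketch of tuning the retry behaviour
  // through the job configuration before the client is created (the values
  // are arbitrary examples, and the job ID below is a placeholder):
  //
  //   JobConf conf = new JobConf();
  //   conf.setInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, 5);
  //   conf.setLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL, 2000L);
  //   JobClient client = new JobClient(conf);
  //   RunningJob rj = client.getJob(JobID.forName("job_1234567890123_0001"));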

  /**
   * Get the information of the current state of the map tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the map tips.
   * @throws IOException
   */
  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.MAP);
  }

  private TaskReport[] getTaskReports(final JobID jobId, TaskType type)
      throws IOException {
    try {
      Job j = getJobUsingCluster(jobId);
      if (j == null) {
        return EMPTY_TASK_REPORTS;
      }
      return TaskReport.downgradeArray(j.getTaskReports(type));
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * @deprecated Applications should rather use
   * {@link #getMapTaskReports(JobID)}.
   */
  @Deprecated
  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
    return getMapTaskReports(JobID.forName(jobId));
  }

  /**
   * Get the information of the current state of the reduce tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the reduce tips.
   * @throws IOException
   */
  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.REDUCE);
  }

  /**
   * Get the information of the current state of the cleanup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the cleanup tips.
   * @throws IOException
   */
  public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_CLEANUP);
  }

  /**
   * Get the information of the current state of the setup tasks of a job.
   *
   * @param jobId the job to query.
   * @return the list of all of the setup tips.
   * @throws IOException
   */
  public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
    return getTaskReports(jobId, TaskType.JOB_SETUP);
  }

  /**
   * @deprecated Applications should rather use
   * {@link #getReduceTaskReports(JobID)}.
   */
  @Deprecated
  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
    return getReduceTaskReports(JobID.forName(jobId));
  }

  /**
   * Display the information about a job's tasks, of a particular type and
   * in a particular state.
   *
   * @param jobId the ID of the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task
   * (pending/running/completed/failed/killed)
   * @throws IOException when there is an error communicating with the master
   * @throws IllegalArgumentException if an invalid type/state is passed
   */
  public void displayTasks(final JobID jobId, String type, String state)
      throws IOException {
    try {
      Job job = getJobUsingCluster(jobId);
      super.displayTasks(job, type, state);
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
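
  // A minimal sketch of inspecting per-task progress through the report
  // APIs above (the client and jobId variables are placeholders; TaskReport
  // accessors such as getTaskID() and getProgress() are assumed from the
  // mapred API):
  //
  //   for (TaskReport report : client.getMapTaskReports(jobId)) {
  //     System.out.println(report.getTaskID() + " @ " + report.getProgress());
  //   }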

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(metrics.getTaskTrackerCount(),
              metrics.getBlackListedTaskTrackerCount(),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(),
              metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus(),
              metrics.getDecommissionedTaskTrackerCount(),
              metrics.getGrayListedTaskTrackerCount());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  private Collection<String> arrayToStringList(TaskTrackerInfo[] objs) {
    Collection<String> list = new ArrayList<String>();
    for (TaskTrackerInfo info : objs) {
      list.add(info.getTaskTrackerName());
    }
    return list;
  }

  private Collection<BlackListInfo> arrayToBlackListInfo(TaskTrackerInfo[] objs) {
    Collection<BlackListInfo> list = new ArrayList<BlackListInfo>();
    for (TaskTrackerInfo info : objs) {
      BlackListInfo binfo = new BlackListInfo();
      binfo.setTrackerName(info.getTaskTrackerName());
      binfo.setReasonForBlackListing(info.getReasonForBlacklist());
      binfo.setBlackListReport(info.getBlacklistReport());
      list.add(binfo);
    }
    return list;
  }

  /**
   * Get status information about the Map-Reduce cluster.
   *
   * @param detailed if true then get a detailed status including the
   *                 tracker names
   * @return the status information about the Map-Reduce cluster as an object
   *         of {@link ClusterStatus}.
   * @throws IOException
   */
  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<ClusterStatus>() {
        public ClusterStatus run() throws IOException, InterruptedException {
          ClusterMetrics metrics = cluster.getClusterStatus();
          return new ClusterStatus(
              arrayToStringList(cluster.getActiveTaskTrackers()),
              arrayToBlackListInfo(cluster.getBlackListedTaskTrackers()),
              cluster.getTaskTrackerExpiryInterval(),
              metrics.getOccupiedMapSlots(),
              metrics.getOccupiedReduceSlots(),
              metrics.getMapSlotCapacity(),
              metrics.getReduceSlotCapacity(),
              cluster.getJobTrackerStatus());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get the jobs that are not completed and not failed.
   *
   * @return array of {@link JobStatus} for the running/to-be-run jobs.
   * @throws IOException
   */
  public JobStatus[] jobsToComplete() throws IOException {
    List<JobStatus> stats = new ArrayList<JobStatus>();
    for (JobStatus stat : getAllJobs()) {
      if (!stat.isJobComplete()) {
        stats.add(stat);
      }
    }
    return stats.toArray(new JobStatus[0]);
  }
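
  // A hedged sketch of reading cluster capacity from the status object above
  // (the accessors on the mapred ClusterStatus, such as getMaxMapTasks() and
  // getMaxReduceTasks(), are assumed from that class; "client" is a
  // placeholder):
  //
  //   ClusterStatus status = client.getClusterStatus();
  //   System.out.println("map slots:    " + status.getMapTasks()
  //       + "/" + status.getMaxMapTasks());
  //   System.out.println("reduce slots: " + status.getReduceTasks()
  //       + "/" + status.getMaxReduceTasks());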

  /**
   * Get the jobs that are submitted.
   *
   * @return array of {@link JobStatus} for the submitted jobs.
   * @throws IOException
   */
  public JobStatus[] getAllJobs() throws IOException {
    try {
      org.apache.hadoop.mapreduce.JobStatus[] jobs =
          clientUgi.doAs(new PrivilegedExceptionAction<
              org.apache.hadoop.mapreduce.JobStatus[]>() {
            public org.apache.hadoop.mapreduce.JobStatus[] run()
                throws IOException, InterruptedException {
              return cluster.getAllJobStatuses();
            }
          });
      JobStatus[] stats = new JobStatus[jobs.length];
      for (int i = 0; i < jobs.length; i++) {
        stats[i] = JobStatus.downgrade(jobs[i]);
      }
      return stats;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Utility that submits a job, then polls for progress until the job is
   * complete.
   *
   * @param job the job configuration.
   * @throws IOException if the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    JobClient jc = new JobClient(job);
    RunningJob rj = jc.submitJob(job);
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return rj;
  }

  /**
   * Monitor a job and print status in real-time as progress is made and
   * tasks fail.
   * @param conf the job's configuration
   * @param job the job to track
   * @return true if the job succeeded
   * @throws IOException if communication to the JobTracker fails
   */
  public boolean monitorAndPrintJob(JobConf conf, RunningJob job)
      throws IOException, InterruptedException {
    return ((NetworkedJob) job).monitorAndPrintJob();
  }

  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  static Configuration getConfiguration(String jobTrackerSpec) {
    Configuration conf = new Configuration();
    if (jobTrackerSpec != null) {
      if (jobTrackerSpec.indexOf(":") >= 0) {
        conf.set("mapred.job.tracker", jobTrackerSpec);
      } else {
        String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
        URL validate = conf.getResource(classpathFile);
        if (validate == null) {
          throw new RuntimeException(classpathFile + " not found on CLASSPATH");
        }
        conf.addResource(classpathFile);
      }
    }
    return conf;
  }

  /**
   * Sets the output filter for tasks.  Only those tasks whose output matches
   * the filter are printed.
   * @param newValue task filter.
   */
  @Deprecated
  public void setTaskOutputFilter(TaskStatusFilter newValue) {
    this.taskOutputFilter = newValue;
  }

  /**
   * Get the task output filter out of the JobConf.
   *
   * @param job the JobConf to examine.
   * @return the filter level.
   */
  public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
    return TaskStatusFilter.valueOf(job.get("jobclient.output.filter",
        "FAILED"));
  }

  /**
   * Modify the JobConf to set the task output filter.
   *
   * @param job the JobConf to modify.
   * @param newValue the value to set.
   */
  public static void setTaskOutputFilter(JobConf job,
      TaskStatusFilter newValue) {
    job.set("jobclient.output.filter", newValue.toString());
  }
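
  // The filter above controls which tasks' output is echoed while a job is
  // monitored. A minimal sketch that asks for the output of all tasks, not
  // just the failed ones (the "job" JobConf is a placeholder):
  //
  //   JobClient.setTaskOutputFilter(job, TaskStatusFilter.ALL);
  //   JobClient.runJob(job);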

  /**
   * Returns the task output filter.
   * @return task filter.
   */
  @Deprecated
  public TaskStatusFilter getTaskOutputFilter() {
    return this.taskOutputFilter;
  }

  protected long getCounter(org.apache.hadoop.mapreduce.Counters cntrs,
      String counterGroupName, String counterName) throws IOException {
    Counters counters = Counters.downgrade(cntrs);
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * Get status information about the max available Maps in the cluster.
   *
   * @return the max available Maps in the cluster
   * @throws IOException
   */
  public int getDefaultMaps() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getMapSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get status information about the max available Reduces in the cluster.
   *
   * @return the max available Reduces in the cluster
   * @throws IOException
   */
  public int getDefaultReduces() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws IOException, InterruptedException {
          return cluster.getClusterStatus().getReduceSlotCapacity();
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Grab the jobtracker system directory path where job-specific files are
   * to be placed.
   *
   * @return the system directory where job-specific files are to be placed.
   */
  public Path getSystemDir() {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getSystemDir();
        }
      });
    } catch (IOException ioe) {
      return null;
    } catch (InterruptedException ie) {
      return null;
    }
  }

  /**
   * Checks if the job directory is clean and has all the required components
   * for (re)starting the job.
   */
  public static boolean isJobDirValid(Path jobDirPath, FileSystem fs)
      throws IOException {
    FileStatus[] contents = fs.listStatus(jobDirPath);
    int matchCount = 0;
    if (contents != null && contents.length >= 2) {
      for (FileStatus status : contents) {
        if ("job.xml".equals(status.getPath().getName())) {
          ++matchCount;
        }
        if ("job.split".equals(status.getPath().getName())) {
          ++matchCount;
        }
      }
      if (matchCount == 2) {
        return true;
      }
    }
    return false;
  }

  /**
   * Fetch the staging area directory for the application.
   *
   * @return path to staging area directory
   * @throws IOException
   */
  public Path getStagingAreaDir() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<Path>() {
        @Override
        public Path run() throws IOException, InterruptedException {
          return cluster.getStagingAreaDir();
        }
      });
    } catch (InterruptedException ie) {
      // throw RuntimeException instead for compatibility reasons
      throw new RuntimeException(ie);
    }
  }
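
  // isJobDirValid simply checks that both "job.xml" and "job.split" are
  // present in the given directory. A minimal usage sketch (the job
  // directory name is a placeholder for a real per-job directory under the
  // system dir):
  //
  //   FileSystem fs = client.getFs();
  //   Path jobDir = new Path(client.getSystemDir(), "job_1234567890123_0001");
  //   boolean restartable = JobClient.isJobDirValid(jobDir, fs);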

  private JobQueueInfo getJobQueueInfo(QueueInfo queue) {
    JobQueueInfo ret = new JobQueueInfo(queue);
    // make sure to convert any children
    if (queue.getQueueChildren().size() > 0) {
      List<JobQueueInfo> childQueues = new ArrayList<JobQueueInfo>(
          queue.getQueueChildren().size());
      for (QueueInfo child : queue.getQueueChildren()) {
        childQueues.add(getJobQueueInfo(child));
      }
      ret.setChildren(childQueues);
    }
    return ret;
  }

  private JobQueueInfo[] getJobQueueInfoArray(QueueInfo[] queues)
      throws IOException {
    JobQueueInfo[] ret = new JobQueueInfo[queues.length];
    for (int i = 0; i < queues.length; i++) {
      ret[i] = getJobQueueInfo(queues[i]);
    }
    return ret;
  }

  /**
   * Returns an array of queue information objects about the root-level
   * queues configured.
   *
   * @return the array of root-level JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getRootQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getRootQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Returns an array of queue information objects about the immediate
   * children of queue <code>queueName</code>.
   *
   * @param queueName the name of the parent queue.
   * @return the array of immediate-children JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getChildQueues(final String queueName)
      throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getChildQueues(queueName));
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Return an array of queue information objects about all the job queues
   * configured.
   *
   * @return array of JobQueueInfo objects
   * @throws IOException
   */
  public JobQueueInfo[] getQueues() throws IOException {
    try {
      return clientUgi.doAs(new PrivilegedExceptionAction<JobQueueInfo[]>() {
        public JobQueueInfo[] run() throws IOException, InterruptedException {
          return getJobQueueInfoArray(cluster.getQueues());
        }
      });
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets all the jobs which were added to a particular job queue.
   *
   * @param queueName name of the job queue
   * @return array of jobs present in the job queue
   * @throws IOException
   */
  public JobStatus[] getJobsFromQueue(final String queueName)
      throws IOException {
    try {
      QueueInfo queue = clientUgi.doAs(
          new PrivilegedExceptionAction<QueueInfo>() {
            @Override
            public QueueInfo run() throws IOException, InterruptedException {
              return cluster.getQueue(queueName);
            }
          });
      if (queue == null) {
        return null;
      }
      org.apache.hadoop.mapreduce.JobStatus[] stats =
          queue.getJobStatuses();
      JobStatus[] ret = new JobStatus[stats.length];
      for (int i = 0; i < stats.length; i++) {
        ret[i] = JobStatus.downgrade(stats[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }
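
  // Queues form a hierarchy, so a full listing can be produced by walking
  // down from the roots. A hedged sketch (JobQueueInfo.getChildren() and
  // getQueueName() are assumed from the mapred API; "client" is a
  // placeholder):
  //
  //   void printQueues(JobQueueInfo queue, String indent) {
  //     System.out.println(indent + queue.getQueueName());
  //     for (JobQueueInfo child : queue.getChildren()) {
  //       printQueues(child, indent + "  ");
  //     }
  //   }
  //   for (JobQueueInfo root : client.getRootQueues()) {
  //     printQueues(root, "");
  //   }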

  /**
   * Gets the queue information associated with a particular job queue.
   *
   * @param queueName name of the job queue.
   * @return queue information associated with the particular queue.
   * @throws IOException
   */
  public JobQueueInfo getQueueInfo(final String queueName) throws IOException {
    try {
      QueueInfo queueInfo = clientUgi.doAs(
          new PrivilegedExceptionAction<QueueInfo>() {
            public QueueInfo run() throws IOException, InterruptedException {
              return cluster.getQueue(queueName);
            }
          });
      if (queueInfo != null) {
        return new JobQueueInfo(queueInfo);
      }
      return null;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Gets the queue ACLs for the current user.
   * @return array of QueueAclsInfo objects for the current user.
   * @throws IOException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
    try {
      org.apache.hadoop.mapreduce.QueueAclsInfo[] acls =
          clientUgi.doAs(new PrivilegedExceptionAction
              <org.apache.hadoop.mapreduce.QueueAclsInfo[]>() {
            public org.apache.hadoop.mapreduce.QueueAclsInfo[] run()
                throws IOException, InterruptedException {
              return cluster.getQueueAclsForCurrentUser();
            }
          });
      QueueAclsInfo[] ret = new QueueAclsInfo[acls.length];
      for (int i = 0; i < acls.length; i++) {
        ret[i] = QueueAclsInfo.downgrade(acls[i]);
      }
      return ret;
    } catch (InterruptedException ie) {
      throw new IOException(ie);
    }
  }

  /**
   * Get a delegation token for the user from the JobTracker.
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(final Text renewer)
      throws IOException, InterruptedException {
    return clientUgi.doAs(
        new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
          public Token<DelegationTokenIdentifier> run() throws IOException,
              InterruptedException {
            return cluster.getDelegationToken(renewer);
          }
        });
  }

  /**
   * Renew a delegation token.
   * @param token the token to renew
   * @return the new expiration time
   * @throws InvalidToken
   * @throws IOException
   * @deprecated Use {@link Token#renew} instead
   */
  @Deprecated
  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    return token.renew(getConf());
  }

  /**
   * Cancel a delegation token from the JobTracker.
   * @param token the token to cancel
   * @throws IOException
   * @deprecated Use {@link Token#cancel} instead
   */
  @Deprecated
  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
      throws InvalidToken, IOException, InterruptedException {
    token.cancel(getConf());
  }

  /**
   * Runs the <code>JobClient</code> as a command-line tool.
   */
  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new JobClient(), argv);
    System.exit(res);
  }
}