001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.mapreduce.tools; 019 020import java.io.BufferedOutputStream; 021import java.io.File; 022import java.io.FileOutputStream; 023import java.io.IOException; 024import java.io.OutputStreamWriter; 025import java.io.PrintStream; 026import java.io.PrintWriter; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.Set; 030import java.util.HashSet; 031import java.util.Arrays; 032 033import com.google.common.annotations.VisibleForTesting; 034import org.apache.commons.lang.StringUtils; 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.apache.hadoop.classification.InterfaceAudience; 038import org.apache.hadoop.classification.InterfaceStability; 039import org.apache.hadoop.classification.InterfaceAudience.Private; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.conf.Configured; 042import org.apache.hadoop.fs.FileSystem; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.ipc.RemoteException; 045import org.apache.hadoop.mapred.JobConf; 046import org.apache.hadoop.mapred.TIPStatus; 047import org.apache.hadoop.mapreduce.Cluster; 048import org.apache.hadoop.mapreduce.Counters; 049import org.apache.hadoop.mapreduce.Job; 050import org.apache.hadoop.mapreduce.JobID; 051import org.apache.hadoop.mapreduce.JobPriority; 052import org.apache.hadoop.mapreduce.JobStatus; 053import org.apache.hadoop.mapreduce.MRJobConfig; 054import org.apache.hadoop.mapreduce.TaskAttemptID; 055import org.apache.hadoop.mapreduce.TaskCompletionEvent; 056import org.apache.hadoop.mapreduce.TaskReport; 057import org.apache.hadoop.mapreduce.TaskTrackerInfo; 058import org.apache.hadoop.mapreduce.TaskType; 059import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer; 060import org.apache.hadoop.mapreduce.v2.LogParams; 061import org.apache.hadoop.security.AccessControlException; 062import org.apache.hadoop.util.ExitUtil; 063import org.apache.hadoop.util.Tool; 064import org.apache.hadoop.util.ToolRunner; 065import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers; 066 067import com.google.common.base.Charsets; 068 069/** 070 * Interprets the map reduce cli options 071 */ 072@InterfaceAudience.Public 073@InterfaceStability.Stable 074public class CLI extends Configured implements Tool { 075 private static final Log LOG = LogFactory.getLog(CLI.class); 076 protected Cluster cluster; 077 private final Set<String> taskStates = new HashSet<String>( 078 Arrays.asList("pending", "running", "completed", "failed", "killed")); 079 private static final Set<String> taskTypes = new HashSet<String>( 080 Arrays.asList("MAP", "REDUCE")); 081 082 public CLI() { 083 } 084 085 public CLI(Configuration conf) { 086 setConf(conf); 087 } 088 089 public int run(String[] argv) throws Exception { 090 int exitCode = -1; 091 if (argv.length < 1) { 092 displayUsage(""); 093 return exitCode; 094 } 095 // process arguments 096 String cmd = argv[0]; 097 String submitJobFile = null; 098 String jobid = null; 099 String taskid = null; 100 String historyFileOrJobId = null; 101 String historyOutFile = null; 102 String historyOutFormat = HistoryViewer.HUMAN_FORMAT; 103 String counterGroupName = null; 104 String counterName = null; 105 JobPriority jp = null; 106 String taskType = null; 107 String taskState = null; 108 int fromEvent = 0; 109 int nEvents = 0; 110 int jpvalue = 0; 111 String configOutFile = null; 112 boolean getStatus = false; 113 boolean getCounter = false; 114 boolean killJob = false; 115 boolean listEvents = false; 116 boolean viewHistory = false; 117 boolean viewAllHistory = false; 118 boolean listJobs = false; 119 boolean listAllJobs = false; 120 boolean listActiveTrackers = false; 121 boolean listBlacklistedTrackers = false; 122 boolean displayTasks = false; 123 boolean killTask = false; 124 boolean failTask = false; 125 boolean setJobPriority = false; 126 boolean logs = false; 127 boolean downloadConfig = false; 128 129 if ("-submit".equals(cmd)) { 130 if (argv.length != 2) { 131 displayUsage(cmd); 132 return exitCode; 133 } 134 submitJobFile = argv[1]; 135 } else if ("-status".equals(cmd)) { 136 if (argv.length != 2) { 137 displayUsage(cmd); 138 return exitCode; 139 } 140 jobid = argv[1]; 141 getStatus = true; 142 } else if("-counter".equals(cmd)) { 143 if (argv.length != 4) { 144 displayUsage(cmd); 145 return exitCode; 146 } 147 getCounter = true; 148 jobid = argv[1]; 149 counterGroupName = argv[2]; 150 counterName = argv[3]; 151 } else if ("-kill".equals(cmd)) { 152 if (argv.length != 2) { 153 displayUsage(cmd); 154 return exitCode; 155 } 156 jobid = argv[1]; 157 killJob = true; 158 } else if ("-set-priority".equals(cmd)) { 159 if (argv.length != 3) { 160 displayUsage(cmd); 161 return exitCode; 162 } 163 jobid = argv[1]; 164 try { 165 jp = JobPriority.valueOf(argv[2]); 166 } catch (IllegalArgumentException iae) { 167 try { 168 jpvalue = Integer.parseInt(argv[2]); 169 } catch (NumberFormatException ne) { 170 LOG.info(ne); 171 displayUsage(cmd); 172 return exitCode; 173 } 174 } 175 setJobPriority = true; 176 } else if ("-events".equals(cmd)) { 177 if (argv.length != 4) { 178 displayUsage(cmd); 179 return exitCode; 180 } 181 jobid = argv[1]; 182 fromEvent = Integer.parseInt(argv[2]); 183 nEvents = Integer.parseInt(argv[3]); 184 listEvents = true; 185 } else if ("-history".equals(cmd)) { 186 viewHistory = true; 187 if (argv.length < 2 || argv.length > 7) { 188 displayUsage(cmd); 189 return exitCode; 190 } 191 192 // Some arguments are optional while others are not, and some require 193 // second arguments. Due to this, the indexing can vary depending on 194 // what's specified and what's left out, as summarized in the below table: 195 // [all] <jobHistoryFile|jobId> [-outfile <file>] [-format <human|json>] 196 // 1 2 3 4 5 6 197 // 1 2 3 4 198 // 1 2 3 4 199 // 1 2 200 // 1 2 3 4 5 201 // 1 2 3 202 // 1 2 3 203 // 1 204 205 // "all" is optional, but comes first if specified 206 int index = 1; 207 if ("all".equals(argv[index])) { 208 index++; 209 viewAllHistory = true; 210 if (argv.length == 2) { 211 displayUsage(cmd); 212 return exitCode; 213 } 214 } 215 // Get the job history file or job id argument 216 historyFileOrJobId = argv[index++]; 217 // "-outfile" is optional, but if specified requires a second argument 218 if (argv.length > index + 1 && "-outfile".equals(argv[index])) { 219 index++; 220 historyOutFile = argv[index++]; 221 } 222 // "-format" is optional, but if specified required a second argument 223 if (argv.length > index + 1 && "-format".equals(argv[index])) { 224 index++; 225 historyOutFormat = argv[index++]; 226 } 227 // Check for any extra arguments that don't belong here 228 if (argv.length > index) { 229 displayUsage(cmd); 230 return exitCode; 231 } 232 } else if ("-list".equals(cmd)) { 233 if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) { 234 displayUsage(cmd); 235 return exitCode; 236 } 237 if (argv.length == 2 && "all".equals(argv[1])) { 238 listAllJobs = true; 239 } else { 240 listJobs = true; 241 } 242 } else if("-kill-task".equals(cmd)) { 243 if (argv.length != 2) { 244 displayUsage(cmd); 245 return exitCode; 246 } 247 killTask = true; 248 taskid = argv[1]; 249 } else if("-fail-task".equals(cmd)) { 250 if (argv.length != 2) { 251 displayUsage(cmd); 252 return exitCode; 253 } 254 failTask = true; 255 taskid = argv[1]; 256 } else if ("-list-active-trackers".equals(cmd)) { 257 if (argv.length != 1) { 258 displayUsage(cmd); 259 return exitCode; 260 } 261 listActiveTrackers = true; 262 } else if ("-list-blacklisted-trackers".equals(cmd)) { 263 if (argv.length != 1) { 264 displayUsage(cmd); 265 return exitCode; 266 } 267 listBlacklistedTrackers = true; 268 } else if ("-list-attempt-ids".equals(cmd)) { 269 if (argv.length != 4) { 270 displayUsage(cmd); 271 return exitCode; 272 } 273 jobid = argv[1]; 274 taskType = argv[2]; 275 taskState = argv[3]; 276 displayTasks = true; 277 if (!taskTypes.contains( 278 org.apache.hadoop.util.StringUtils.toUpperCase(taskType))) { 279 System.out.println("Error: Invalid task-type: " + taskType); 280 displayUsage(cmd); 281 return exitCode; 282 } 283 if (!taskStates.contains( 284 org.apache.hadoop.util.StringUtils.toLowerCase(taskState))) { 285 System.out.println("Error: Invalid task-state: " + taskState); 286 displayUsage(cmd); 287 return exitCode; 288 } 289 } else if ("-logs".equals(cmd)) { 290 if (argv.length == 2 || argv.length ==3) { 291 logs = true; 292 jobid = argv[1]; 293 if (argv.length == 3) { 294 taskid = argv[2]; 295 } else { 296 taskid = null; 297 } 298 } else { 299 displayUsage(cmd); 300 return exitCode; 301 } 302 } else if ("-config".equals(cmd)) { 303 downloadConfig = true; 304 if (argv.length != 3) { 305 displayUsage(cmd); 306 return exitCode; 307 } 308 jobid = argv[1]; 309 configOutFile = argv[2]; 310 } else { 311 displayUsage(cmd); 312 return exitCode; 313 } 314 315 // initialize cluster 316 cluster = createCluster(); 317 318 // Submit the request 319 try { 320 if (submitJobFile != null) { 321 Job job = Job.getInstance(new JobConf(submitJobFile)); 322 job.submit(); 323 System.out.println("Created job " + job.getJobID()); 324 exitCode = 0; 325 } else if (getStatus) { 326 Job job = getJob(JobID.forName(jobid)); 327 if (job == null) { 328 System.out.println("Could not find job " + jobid); 329 } else { 330 Counters counters = job.getCounters(); 331 System.out.println(); 332 System.out.println(job); 333 if (counters != null) { 334 System.out.println(counters); 335 } else { 336 System.out.println("Counters not available. Job is retired."); 337 } 338 exitCode = 0; 339 } 340 } else if (getCounter) { 341 Job job = getJob(JobID.forName(jobid)); 342 if (job == null) { 343 System.out.println("Could not find job " + jobid); 344 } else { 345 Counters counters = job.getCounters(); 346 if (counters == null) { 347 System.out.println("Counters not available for retired job " + 348 jobid); 349 exitCode = -1; 350 } else { 351 System.out.println(getCounter(counters, 352 counterGroupName, counterName)); 353 exitCode = 0; 354 } 355 } 356 } else if (killJob) { 357 Job job = getJob(JobID.forName(jobid)); 358 if (job == null) { 359 System.out.println("Could not find job " + jobid); 360 } else { 361 JobStatus jobStatus = job.getStatus(); 362 if (jobStatus.getState() == JobStatus.State.FAILED) { 363 System.out.println("Could not mark the job " + jobid 364 + " as killed, as it has already failed."); 365 exitCode = -1; 366 } else if (jobStatus.getState() == JobStatus.State.KILLED) { 367 System.out 368 .println("The job " + jobid + " has already been killed."); 369 exitCode = -1; 370 } else if (jobStatus.getState() == JobStatus.State.SUCCEEDED) { 371 System.out.println("Could not kill the job " + jobid 372 + ", as it has already succeeded."); 373 exitCode = -1; 374 } else { 375 job.killJob(); 376 System.out.println("Killed job " + jobid); 377 exitCode = 0; 378 } 379 } 380 } else if (setJobPriority) { 381 Job job = getJob(JobID.forName(jobid)); 382 if (job == null) { 383 System.out.println("Could not find job " + jobid); 384 } else { 385 if (jp != null) { 386 job.setPriority(jp); 387 } else { 388 job.setPriorityAsInteger(jpvalue); 389 } 390 System.out.println("Changed job priority."); 391 exitCode = 0; 392 } 393 } else if (viewHistory) { 394 // If it ends with .jhist, assume it's a jhist file; otherwise, assume 395 // it's a Job ID 396 if (historyFileOrJobId.endsWith(".jhist")) { 397 viewHistory(historyFileOrJobId, viewAllHistory, historyOutFile, 398 historyOutFormat); 399 exitCode = 0; 400 } else { 401 Job job = getJob(JobID.forName(historyFileOrJobId)); 402 if (job == null) { 403 System.out.println("Could not find job " + jobid); 404 } else { 405 String historyUrl = job.getHistoryUrl(); 406 if (historyUrl == null || historyUrl.isEmpty()) { 407 System.out.println("History file for job " + historyFileOrJobId + 408 " is currently unavailable."); 409 } else { 410 viewHistory(historyUrl, viewAllHistory, historyOutFile, 411 historyOutFormat); 412 exitCode = 0; 413 } 414 } 415 } 416 } else if (listEvents) { 417 Job job = getJob(JobID.forName(jobid)); 418 if (job == null) { 419 System.out.println("Could not find job " + jobid); 420 } else { 421 listEvents(job, fromEvent, nEvents); 422 exitCode = 0; 423 } 424 } else if (listJobs) { 425 listJobs(cluster); 426 exitCode = 0; 427 } else if (listAllJobs) { 428 listAllJobs(cluster); 429 exitCode = 0; 430 } else if (listActiveTrackers) { 431 listActiveTrackers(cluster); 432 exitCode = 0; 433 } else if (listBlacklistedTrackers) { 434 listBlacklistedTrackers(cluster); 435 exitCode = 0; 436 } else if (displayTasks) { 437 Job job = getJob(JobID.forName(jobid)); 438 if (job == null) { 439 System.out.println("Could not find job " + jobid); 440 } else { 441 displayTasks(getJob(JobID.forName(jobid)), taskType, taskState); 442 exitCode = 0; 443 } 444 } else if(killTask) { 445 TaskAttemptID taskID = TaskAttemptID.forName(taskid); 446 Job job = getJob(taskID.getJobID()); 447 if (job == null) { 448 System.out.println("Could not find job " + jobid); 449 } else if (job.killTask(taskID, false)) { 450 System.out.println("Killed task " + taskid); 451 exitCode = 0; 452 } else { 453 System.out.println("Could not kill task " + taskid); 454 exitCode = -1; 455 } 456 } else if(failTask) { 457 TaskAttemptID taskID = TaskAttemptID.forName(taskid); 458 Job job = getJob(taskID.getJobID()); 459 if (job == null) { 460 System.out.println("Could not find job " + jobid); 461 } else if(job.killTask(taskID, true)) { 462 System.out.println("Killed task " + taskID + " by failing it"); 463 exitCode = 0; 464 } else { 465 System.out.println("Could not fail task " + taskid); 466 exitCode = -1; 467 } 468 } else if (logs) { 469 JobID jobID = JobID.forName(jobid); 470 if (getJob(jobID) == null) { 471 System.out.println("Could not find job " + jobid); 472 } else { 473 try { 474 TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid); 475 LogParams logParams = cluster.getLogParams(jobID, taskAttemptID); 476 LogCLIHelpers logDumper = new LogCLIHelpers(); 477 logDumper.setConf(getConf()); 478 exitCode = logDumper.dumpAContainersLogs( 479 logParams.getApplicationId(), logParams.getContainerId(), 480 logParams.getNodeId(), logParams.getOwner()); 481 } catch (IOException e) { 482 if (e instanceof RemoteException) { 483 throw e; 484 } 485 System.out.println(e.getMessage()); 486 } 487 } 488 } else if (downloadConfig) { 489 Job job = getJob(JobID.forName(jobid)); 490 if (job == null) { 491 System.out.println("Could not find job " + jobid); 492 } else { 493 String jobFile = job.getJobFile(); 494 if (jobFile == null || jobFile.isEmpty()) { 495 System.out.println("Config file for job " + jobFile + 496 " could not be found."); 497 } else { 498 Path configPath = new Path(jobFile); 499 FileSystem fs = FileSystem.get(getConf()); 500 fs.copyToLocalFile(configPath, new Path(configOutFile)); 501 exitCode = 0; 502 } 503 } 504 } 505 } catch (RemoteException re) { 506 IOException unwrappedException = re.unwrapRemoteException(); 507 if (unwrappedException instanceof AccessControlException) { 508 System.out.println(unwrappedException.getMessage()); 509 } else { 510 throw re; 511 } 512 } finally { 513 cluster.close(); 514 } 515 return exitCode; 516 } 517 518 Cluster createCluster() throws IOException { 519 return new Cluster(getConf()); 520 } 521 522 private String getJobPriorityNames() { 523 StringBuffer sb = new StringBuffer(); 524 for (JobPriority p : JobPriority.values()) { 525 // UNDEFINED_PRIORITY need not to be displayed in usage 526 if (JobPriority.UNDEFINED_PRIORITY == p) { 527 continue; 528 } 529 sb.append(p.name()).append(" "); 530 } 531 return sb.substring(0, sb.length()-1); 532 } 533 534 private String getTaskTypes() { 535 return StringUtils.join(taskTypes, " "); 536 } 537 538 /** 539 * Display usage of the command-line tool and terminate execution. 540 */ 541 private void displayUsage(String cmd) { 542 String prefix = "Usage: job "; 543 String jobPriorityValues = getJobPriorityNames(); 544 String taskStates = "pending, running, completed, failed, killed"; 545 546 if ("-submit".equals(cmd)) { 547 System.err.println(prefix + "[" + cmd + " <job-file>]"); 548 } else if ("-status".equals(cmd) || "-kill".equals(cmd)) { 549 System.err.println(prefix + "[" + cmd + " <job-id>]"); 550 } else if ("-counter".equals(cmd)) { 551 System.err.println(prefix + "[" + cmd + 552 " <job-id> <group-name> <counter-name>]"); 553 } else if ("-events".equals(cmd)) { 554 System.err.println(prefix + "[" + cmd + 555 " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1."); 556 } else if ("-history".equals(cmd)) { 557 System.err.println(prefix + "[" + cmd + " [all] <jobHistoryFile|jobId> " + 558 "[-outfile <file>] [-format <human|json>]]"); 559 } else if ("-list".equals(cmd)) { 560 System.err.println(prefix + "[" + cmd + " [all]]"); 561 } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) { 562 System.err.println(prefix + "[" + cmd + " <task-attempt-id>]"); 563 } else if ("-set-priority".equals(cmd)) { 564 System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " + 565 "Valid values for priorities are: " 566 + jobPriorityValues 567 + ". In addition to this, integers also can be used."); 568 } else if ("-list-active-trackers".equals(cmd)) { 569 System.err.println(prefix + "[" + cmd + "]"); 570 } else if ("-list-blacklisted-trackers".equals(cmd)) { 571 System.err.println(prefix + "[" + cmd + "]"); 572 } else if ("-list-attempt-ids".equals(cmd)) { 573 System.err.println(prefix + "[" + cmd + 574 " <job-id> <task-type> <task-state>]. " + 575 "Valid values for <task-type> are " + getTaskTypes() + ". " + 576 "Valid values for <task-state> are " + taskStates); 577 } else if ("-logs".equals(cmd)) { 578 System.err.println(prefix + "[" + cmd + 579 " <job-id> <task-attempt-id>]. " + 580 " <task-attempt-id> is optional to get task attempt logs."); 581 } else if ("-config".equals(cmd)) { 582 System.err.println(prefix + "[" + cmd + " <job-id> <file>]"); 583 } else { 584 System.err.printf(prefix + "<command> <args>%n"); 585 System.err.printf("\t[-submit <job-file>]%n"); 586 System.err.printf("\t[-status <job-id>]%n"); 587 System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n"); 588 System.err.printf("\t[-kill <job-id>]%n"); 589 System.err.printf("\t[-set-priority <job-id> <priority>]. " + 590 "Valid values for priorities are: " + jobPriorityValues + 591 ". In addition to this, integers also can be used." + "%n"); 592 System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n"); 593 System.err.printf("\t[-history [all] <jobHistoryFile|jobId> " + 594 "[-outfile <file>] [-format <human|json>]]%n"); 595 System.err.printf("\t[-list [all]]%n"); 596 System.err.printf("\t[-list-active-trackers]%n"); 597 System.err.printf("\t[-list-blacklisted-trackers]%n"); 598 System.err.println("\t[-list-attempt-ids <job-id> <task-type> " + 599 "<task-state>]. " + 600 "Valid values for <task-type> are " + getTaskTypes() + ". " + 601 "Valid values for <task-state> are " + taskStates); 602 System.err.printf("\t[-kill-task <task-attempt-id>]%n"); 603 System.err.printf("\t[-fail-task <task-attempt-id>]%n"); 604 System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n"); 605 System.err.printf("\t[-config <job-id> <file>%n%n"); 606 ToolRunner.printGenericCommandUsage(System.out); 607 } 608 } 609 610 private void viewHistory(String historyFile, boolean all, 611 String historyOutFile, String format) throws IOException { 612 HistoryViewer historyViewer = new HistoryViewer(historyFile, 613 getConf(), all, format); 614 PrintStream ps = System.out; 615 if (historyOutFile != null) { 616 ps = new PrintStream(new BufferedOutputStream(new FileOutputStream( 617 new File(historyOutFile))), true, "UTF-8"); 618 } 619 historyViewer.print(ps); 620 } 621 622 protected long getCounter(Counters counters, String counterGroupName, 623 String counterName) throws IOException { 624 return counters.findCounter(counterGroupName, counterName).getValue(); 625 } 626 627 /** 628 * List the events for the given job 629 * @param jobId the job id for the job's events to list 630 * @throws IOException 631 */ 632 private void listEvents(Job job, int fromEventId, int numEvents) 633 throws IOException, InterruptedException { 634 TaskCompletionEvent[] events = job. 635 getTaskCompletionEvents(fromEventId, numEvents); 636 System.out.println("Task completion events for " + job.getJobID()); 637 System.out.println("Number of events (from " + fromEventId + ") are: " 638 + events.length); 639 for(TaskCompletionEvent event: events) { 640 System.out.println(event.getStatus() + " " + 641 event.getTaskAttemptId() + " " + 642 getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp())); 643 } 644 } 645 646 protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) { 647 return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 648 } 649 650 @VisibleForTesting 651 Job getJob(JobID jobid) throws IOException, InterruptedException { 652 653 int maxRetry = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES, 654 MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES); 655 long retryInterval = getConf() 656 .getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL, 657 MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL); 658 Job job = cluster.getJob(jobid); 659 660 for (int i = 0; i < maxRetry; ++i) { 661 if (job != null) { 662 return job; 663 } 664 LOG.info("Could not obtain job info after " + String.valueOf(i + 1) 665 + " attempt(s). Sleeping for " + String.valueOf(retryInterval / 1000) 666 + " seconds and retrying."); 667 Thread.sleep(retryInterval); 668 job = cluster.getJob(jobid); 669 } 670 return job; 671 } 672 673 674 /** 675 * Dump a list of currently running jobs 676 * @throws IOException 677 */ 678 private void listJobs(Cluster cluster) 679 throws IOException, InterruptedException { 680 List<JobStatus> runningJobs = new ArrayList<JobStatus>(); 681 for (JobStatus job : cluster.getAllJobStatuses()) { 682 if (!job.isJobComplete()) { 683 runningJobs.add(job); 684 } 685 } 686 displayJobList(runningJobs.toArray(new JobStatus[0])); 687 } 688 689 /** 690 * Dump a list of all jobs submitted. 691 * @throws IOException 692 */ 693 private void listAllJobs(Cluster cluster) 694 throws IOException, InterruptedException { 695 displayJobList(cluster.getAllJobStatuses()); 696 } 697 698 /** 699 * Display the list of active trackers 700 */ 701 private void listActiveTrackers(Cluster cluster) 702 throws IOException, InterruptedException { 703 TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers(); 704 for (TaskTrackerInfo tracker : trackers) { 705 System.out.println(tracker.getTaskTrackerName()); 706 } 707 } 708 709 /** 710 * Display the list of blacklisted trackers 711 */ 712 private void listBlacklistedTrackers(Cluster cluster) 713 throws IOException, InterruptedException { 714 TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers(); 715 if (trackers.length > 0) { 716 System.out.println("BlackListedNode \t Reason"); 717 } 718 for (TaskTrackerInfo tracker : trackers) { 719 System.out.println(tracker.getTaskTrackerName() + "\t" + 720 tracker.getReasonForBlacklist()); 721 } 722 } 723 724 private void printTaskAttempts(TaskReport report) { 725 if (report.getCurrentStatus() == TIPStatus.COMPLETE) { 726 System.out.println(report.getSuccessfulTaskAttemptId()); 727 } else if (report.getCurrentStatus() == TIPStatus.RUNNING) { 728 for (TaskAttemptID t : 729 report.getRunningTaskAttemptIds()) { 730 System.out.println(t); 731 } 732 } 733 } 734 735 /** 736 * Display the information about a job's tasks, of a particular type and 737 * in a particular state 738 * 739 * @param job the job 740 * @param type the type of the task (map/reduce/setup/cleanup) 741 * @param state the state of the task 742 * (pending/running/completed/failed/killed) 743 * @throws IOException when there is an error communicating with the master 744 * @throws InterruptedException 745 * @throws IllegalArgumentException if an invalid type/state is passed 746 */ 747 protected void displayTasks(Job job, String type, String state) 748 throws IOException, InterruptedException { 749 750 TaskReport[] reports=null; 751 reports = job.getTaskReports(TaskType.valueOf( 752 org.apache.hadoop.util.StringUtils.toUpperCase(type))); 753 for (TaskReport report : reports) { 754 TIPStatus status = report.getCurrentStatus(); 755 if ((state.equalsIgnoreCase("pending") && status ==TIPStatus.PENDING) || 756 (state.equalsIgnoreCase("running") && status ==TIPStatus.RUNNING) || 757 (state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) || 758 (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) || 759 (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) { 760 printTaskAttempts(report); 761 } 762 } 763 } 764 765 public void displayJobList(JobStatus[] jobs) 766 throws IOException, InterruptedException { 767 displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out, 768 Charsets.UTF_8))); 769 } 770 771 @Private 772 public static String headerPattern = "%23s\t%20s\t%10s\t%14s\t%12s\t%12s" + 773 "\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n"; 774 @Private 775 public static String dataPattern = "%23s\t%20s\t%10s\t%14d\t%12s\t%12s" + 776 "\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n"; 777 private static String memPattern = "%dM"; 778 private static String UNAVAILABLE = "N/A"; 779 780 @Private 781 public void displayJobList(JobStatus[] jobs, PrintWriter writer) { 782 writer.println("Total jobs:" + jobs.length); 783 writer.printf(headerPattern, "JobId", "JobName", "State", "StartTime", 784 "UserName", "Queue", "Priority", "UsedContainers", 785 "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info"); 786 for (JobStatus job : jobs) { 787 int numUsedSlots = job.getNumUsedSlots(); 788 int numReservedSlots = job.getNumReservedSlots(); 789 long usedMem = job.getUsedMem(); 790 long rsvdMem = job.getReservedMem(); 791 long neededMem = job.getNeededMem(); 792 int jobNameLength = job.getJobName().length(); 793 writer.printf(dataPattern, job.getJobID().toString(), 794 job.getJobName().substring(0, jobNameLength > 20 ? 20 : jobNameLength), 795 job.getState(), job.getStartTime(), job.getUsername(), 796 job.getQueue(), job.getPriority().name(), 797 numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots, 798 numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots, 799 usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem), 800 rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem), 801 neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem), 802 job.getSchedulingInfo()); 803 } 804 writer.flush(); 805 } 806 807 public static void main(String[] argv) throws Exception { 808 int res = ToolRunner.run(new CLI(), argv); 809 ExitUtil.terminate(res); 810 } 811}