/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce.tools;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.HashSet;
import java.util.Arrays;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;

import com.google.common.base.Charsets;

/**
 * Interprets the map reduce cli options.
 *
 * <p>The first argument selects a sub-command (for example {@code -status},
 * {@code -kill}, {@code -history}); the remaining arguments are validated
 * per sub-command before a {@link Cluster} connection is opened.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CLI extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(CLI.class);
  protected Cluster cluster;
  // Task states accepted by -list-attempt-ids (compared in lower case).
  private final Set<String> taskStates = new HashSet<String>(
      Arrays.asList("pending", "running", "completed", "failed", "killed"));
  // Task types accepted by -list-attempt-ids (compared in upper case).
  private static final Set<String> taskTypes = new HashSet<String>(
      Arrays.asList("MAP", "REDUCE"));

  public CLI() {
  }

  public CLI(Configuration conf) {
    setConf(conf);
  }

  /**
   * Parses the sub-command in {@code argv[0]} together with its arguments,
   * opens a cluster connection and executes the request.
   *
   * @param argv the command line; {@code argv[0]} is the sub-command
   * @return 0 on success and -1 on a usage error or a failed request; for
   *         {@code -logs} the value returned by
   *         {@code LogCLIHelpers.dumpAContainersLogs} is passed through
   * @throws Exception if the request fails with anything other than an
   *         access-control error reported by the remote side
   */
  @Override
  public int run(String[] argv) throws Exception {
    int exitCode = -1;
    if (argv.length < 1) {
      displayUsage("");
      return exitCode;
    }
    // process arguments
    String cmd = argv[0];
    String submitJobFile = null;
    String jobid = null;
    String taskid = null;
    String historyFileOrJobId = null;
    String historyOutFile = null;
    String historyOutFormat = HistoryViewer.HUMAN_FORMAT;
    String counterGroupName = null;
    String counterName = null;
    JobPriority jp = null;
    String taskType = null;
    String taskState = null;
    int fromEvent = 0;
    int nEvents = 0;
    int jpvalue = 0;
    boolean getStatus = false;
    boolean getCounter = false;
    boolean killJob = false;
    boolean listEvents = false;
    boolean viewHistory = false;
    boolean viewAllHistory = false;
    boolean listJobs = false;
    boolean listAllJobs = false;
    boolean listActiveTrackers = false;
    boolean listBlacklistedTrackers = false;
    boolean displayTasks = false;
    boolean killTask = false;
    boolean failTask = false;
    boolean setJobPriority = false;
    boolean logs = false;

    if ("-submit".equals(cmd)) {
      if (argv.length != 2) {
        displayUsage(cmd);
        return exitCode;
      }
      submitJobFile = argv[1];
    } else if ("-status".equals(cmd)) {
      if (argv.length != 2) {
        displayUsage(cmd);
        return exitCode;
      }
      jobid = argv[1];
      getStatus = true;
    } else if ("-counter".equals(cmd)) {
      if (argv.length != 4) {
        displayUsage(cmd);
        return exitCode;
      }
      getCounter = true;
      jobid = argv[1];
      counterGroupName = argv[2];
      counterName = argv[3];
    } else if ("-kill".equals(cmd)) {
      if (argv.length != 2) {
        displayUsage(cmd);
        return exitCode;
      }
      jobid = argv[1];
      killJob = true;
    } else if ("-set-priority".equals(cmd)) {
      if (argv.length != 3) {
        displayUsage(cmd);
        return exitCode;
      }
      jobid = argv[1];
      // The priority is either a JobPriority name or a plain integer.
      try {
        jp = JobPriority.valueOf(argv[2]);
      } catch (IllegalArgumentException iae) {
        try {
          jpvalue = Integer.parseInt(argv[2]);
        } catch (NumberFormatException ne) {
          LOG.info(ne);
          displayUsage(cmd);
          return exitCode;
        }
      }
      setJobPriority = true;
    } else if ("-events".equals(cmd)) {
      if (argv.length != 4) {
        displayUsage(cmd);
        return exitCode;
      }
      jobid = argv[1];
      // Treat non-numeric event arguments as a usage error, the same way
      // -set-priority treats an unparsable priority.
      try {
        fromEvent = Integer.parseInt(argv[2]);
        nEvents = Integer.parseInt(argv[3]);
      } catch (NumberFormatException nfe) {
        LOG.info(nfe);
        displayUsage(cmd);
        return exitCode;
      }
      listEvents = true;
    } else if ("-history".equals(cmd)) {
      viewHistory = true;
      if (argv.length < 2 || argv.length > 7) {
        displayUsage(cmd);
        return exitCode;
      }

      // Some arguments are optional while others are not, and some require
      // second arguments.  Due to this, the indexing can vary depending on
      // what's specified and what's left out, as summarized in the below table:
      // [all] <jobHistoryFile|jobId> [-outfile <file>] [-format <human|json>]
      //   1             2                3        4       5           6
      //   1             2                3        4
      //   1             2                                 3           4
      //   1             2
      //                 1                2        3       4           5
      //                 1                2        3
      //                 1                                 2           3
      //                 1

      // "all" is optional, but comes first if specified
      int index = 1;
      if ("all".equals(argv[index])) {
        index++;
        viewAllHistory = true;
        if (argv.length == 2) {
          displayUsage(cmd);
          return exitCode;
        }
      }
      // Get the job history file or job id argument
      historyFileOrJobId = argv[index++];
      // "-outfile" is optional, but if specified requires a second argument
      if (argv.length > index + 1 && "-outfile".equals(argv[index])) {
        index++;
        historyOutFile = argv[index++];
      }
      // "-format" is optional, but if specified required a second argument
      if (argv.length > index + 1 && "-format".equals(argv[index])) {
        index++;
        historyOutFormat = argv[index++];
      }
      // Check for any extra arguments that don't belong here
      if (argv.length > index) {
        displayUsage(cmd);
        return exitCode;
      }
    } else if ("-list".equals(cmd)) {
      if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
        displayUsage(cmd);
        return exitCode;
      }
      if (argv.length == 2 && "all".equals(argv[1])) {
        listAllJobs = true;
      } else {
        listJobs = true;
      }
    } else if ("-kill-task".equals(cmd)) {
      if (argv.length != 2) {
        displayUsage(cmd);
        return exitCode;
      }
      killTask = true;
      taskid = argv[1];
    } else if ("-fail-task".equals(cmd)) {
      if (argv.length != 2) {
        displayUsage(cmd);
        return exitCode;
      }
      failTask = true;
      taskid = argv[1];
    } else if ("-list-active-trackers".equals(cmd)) {
      if (argv.length != 1) {
        displayUsage(cmd);
        return exitCode;
      }
      listActiveTrackers = true;
    } else if ("-list-blacklisted-trackers".equals(cmd)) {
      if (argv.length != 1) {
        displayUsage(cmd);
        return exitCode;
      }
      listBlacklistedTrackers = true;
    } else if ("-list-attempt-ids".equals(cmd)) {
      if (argv.length != 4) {
        displayUsage(cmd);
        return exitCode;
      }
      jobid = argv[1];
      taskType = argv[2];
      taskState = argv[3];
      displayTasks = true;
      if (!taskTypes.contains(
          org.apache.hadoop.util.StringUtils.toUpperCase(taskType))) {
        System.out.println("Error: Invalid task-type: " + taskType);
        displayUsage(cmd);
        return exitCode;
      }
      if (!taskStates.contains(
          org.apache.hadoop.util.StringUtils.toLowerCase(taskState))) {
        System.out.println("Error: Invalid task-state: " + taskState);
        displayUsage(cmd);
        return exitCode;
      }
    } else if ("-logs".equals(cmd)) {
      if (argv.length == 2 || argv.length == 3) {
        logs = true;
        jobid = argv[1];
        if (argv.length == 3) {
          taskid = argv[2];
        } else {
          taskid = null;
        }
      } else {
        displayUsage(cmd);
        return exitCode;
      }
    } else {
      displayUsage(cmd);
      return exitCode;
    }

    // initialize cluster
    cluster = createCluster();

    // Submit the request
    try {
      if (submitJobFile != null) {
        Job job = Job.getInstance(new JobConf(submitJobFile));
        job.submit();
        System.out.println("Created job " + job.getJobID());
        exitCode = 0;
      } else if (getStatus) {
        Job job = getJob(JobID.forName(jobid));
        if (job == null) {
          System.out.println("Could not find job " + jobid);
        } else {
          Counters counters = job.getCounters();
          System.out.println();
          System.out.println(job);
          if (counters != null) {
            System.out.println(counters);
          } else {
            System.out.println("Counters not available. Job is retired.");
          }
          exitCode = 0;
        }
      } else if (getCounter) {
        Job job = getJob(JobID.forName(jobid));
        if (job == null) {
          System.out.println("Could not find job " + jobid);
        } else {
          Counters counters = job.getCounters();
          if (counters == null) {
            System.out.println("Counters not available for retired job " +
                jobid);
            exitCode = -1;
          } else {
            System.out.println(getCounter(counters,
                counterGroupName, counterName));
            exitCode = 0;
          }
        }
      } else if (killJob) {
        Job job = getJob(JobID.forName(jobid));
        if (job == null) {
          System.out.println("Could not find job " + jobid);
        } else {
          JobStatus jobStatus = job.getStatus();
          if (jobStatus.getState() == JobStatus.State.FAILED) {
            System.out.println("Could not mark the job " + jobid
                + " as killed, as it has already failed.");
            exitCode = -1;
          } else if (jobStatus.getState() == JobStatus.State.KILLED) {
            System.out
                .println("The job " + jobid + " has already been killed.");
            exitCode = -1;
          } else if (jobStatus.getState() == JobStatus.State.SUCCEEDED) {
            System.out.println("Could not kill the job " + jobid
                + ", as it has already succeeded.");
            exitCode = -1;
          } else {
            job.killJob();
            System.out.println("Killed job " + jobid);
            exitCode = 0;
          }
        }
      } else if (setJobPriority) {
        Job job = getJob(JobID.forName(jobid));
        if (job == null) {
          System.out.println("Could not find job " + jobid);
        } else {
          if (jp != null) {
            job.setPriority(jp);
          } else {
            job.setPriorityAsInteger(jpvalue);
          }
          System.out.println("Changed job priority.");
          exitCode = 0;
        }
      } else if (viewHistory) {
        // If it ends with .jhist, assume it's a jhist file; otherwise, assume
        // it's a Job ID
        if (historyFileOrJobId.endsWith(".jhist")) {
          viewHistory(historyFileOrJobId, viewAllHistory, historyOutFile,
              historyOutFormat);
          exitCode = 0;
        } else {
          Job job = getJob(JobID.forName(historyFileOrJobId));
          if (job == null) {
            // jobid is not set for -history; report the id that was looked up.
            System.out.println("Could not find job " + historyFileOrJobId);
          } else {
            String historyUrl = job.getHistoryUrl();
            if (historyUrl == null || historyUrl.isEmpty()) {
              System.out.println("History file for job " + historyFileOrJobId +
                  " is currently unavailable.");
            } else {
              viewHistory(historyUrl, viewAllHistory, historyOutFile,
                  historyOutFormat);
              exitCode = 0;
            }
          }
        }
      } else if (listEvents) {
        Job job = getJob(JobID.forName(jobid));
        if (job == null) {
          // getJob returns null when the job is still unknown after retrying.
          System.out.println("Could not find job " + jobid);
        } else {
          listEvents(job, fromEvent, nEvents);
          exitCode = 0;
        }
      } else if (listJobs) {
        listJobs(cluster);
        exitCode = 0;
      } else if (listAllJobs) {
        listAllJobs(cluster);
        exitCode = 0;
      } else if (listActiveTrackers) {
        listActiveTrackers(cluster);
        exitCode = 0;
      } else if (listBlacklistedTrackers) {
        listBlacklistedTrackers(cluster);
        exitCode = 0;
      } else if (displayTasks) {
        Job job = getJob(JobID.forName(jobid));
        if (job == null) {
          // getJob returns null when the job is still unknown after retrying.
          System.out.println("Could not find job " + jobid);
        } else {
          displayTasks(job, taskType, taskState);
          exitCode = 0;
        }
      } else if (killTask) {
        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
        Job job = getJob(taskID.getJobID());
        if (job == null) {
          // jobid is not set for -kill-task; use the id derived from the task.
          System.out.println("Could not find job " + taskID.getJobID());
        } else if (job.killTask(taskID, false)) {
          System.out.println("Killed task " + taskid);
          exitCode = 0;
        } else {
          System.out.println("Could not kill task " + taskid);
          exitCode = -1;
        }
      } else if (failTask) {
        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
        Job job = getJob(taskID.getJobID());
        if (job == null) {
          // jobid is not set for -fail-task; use the id derived from the task.
          System.out.println("Could not find job " + taskID.getJobID());
        } else if (job.killTask(taskID, true)) {
          System.out.println("Killed task " + taskID + " by failing it");
          exitCode = 0;
        } else {
          System.out.println("Could not fail task " + taskid);
          exitCode = -1;
        }
      } else if (logs) {
        try {
          JobID jobID = JobID.forName(jobid);
          TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
          LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
          LogCLIHelpers logDumper = new LogCLIHelpers();
          logDumper.setConf(getConf());
          exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
              logParams.getContainerId(), logParams.getNodeId(),
              logParams.getOwner());
        } catch (IOException e) {
          // Remote failures are handled by the outer catch; anything else is
          // reported on stdout and leaves the exit code at -1.
          if (e instanceof RemoteException) {
            throw e;
          }
          System.out.println(e.getMessage());
        }
      }
    } catch (RemoteException re) {
      IOException unwrappedException = re.unwrapRemoteException();
      if (unwrappedException instanceof AccessControlException) {
        System.out.println(unwrappedException.getMessage());
      } else {
        throw re;
      }
    } finally {
      cluster.close();
    }
    return exitCode;
  }

  /**
   * Creates the cluster handle used by {@link #run(String[])} from the
   * current configuration.
   */
  Cluster createCluster() throws IOException {
    return new Cluster(getConf());
  }

  /**
   * Returns the names of all job priorities except
   * {@code UNDEFINED_PRIORITY}, separated by single spaces.
   */
  private String getJobPriorityNames() {
    StringBuilder sb = new StringBuilder();
    for (JobPriority p : JobPriority.values()) {
      // UNDEFINED_PRIORITY need not to be displayed in usage
      if (JobPriority.UNDEFINED_PRIORITY == p) {
        continue;
      }
      sb.append(p.name()).append(" ");
    }
    // Drop the trailing separator.
    return sb.toString().trim();
  }

  /** Returns the accepted task types separated by single spaces. */
  private String getTaskTypes() {
    return StringUtils.join(taskTypes, " ");
  }

  /**
   * Display usage of the command-line tool. This method only prints; the
   * caller decides what to return.
   *
   * @param cmd the sub-command whose usage should be shown; any unrecognized
   *        value prints the full usage followed by the generic options
   */
  private void displayUsage(String cmd) {
    String prefix = "Usage: job ";
    String jobPriorityValues = getJobPriorityNames();
    String taskStateNames = "pending, running, completed, failed, killed";

    if ("-submit".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " <job-file>]");
    } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " <job-id>]");
    } else if ("-counter".equals(cmd)) {
      System.err.println(prefix + "[" + cmd +
          " <job-id> <group-name> <counter-name>]");
    } else if ("-events".equals(cmd)) {
      System.err.println(prefix + "[" + cmd +
          " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
    } else if ("-history".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " [all] <jobHistoryFile|jobId> " +
          "[-outfile <file>] [-format <human|json>]]");
    } else if ("-list".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " [all]]");
    } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
    } else if ("-set-priority".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
          "Valid values for priorities are: "
          + jobPriorityValues
          + ". In addition to this, integers also can be used.");
    } else if ("-list-active-trackers".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + "]");
    } else if ("-list-blacklisted-trackers".equals(cmd)) {
      System.err.println(prefix + "[" + cmd + "]");
    } else if ("-list-attempt-ids".equals(cmd)) {
      System.err.println(prefix + "[" + cmd +
          " <job-id> <task-type> <task-state>]. " +
          "Valid values for <task-type> are " + getTaskTypes() + ". " +
          "Valid values for <task-state> are " + taskStateNames);
    } else if ("-logs".equals(cmd)) {
      System.err.println(prefix + "[" + cmd +
          " <job-id> <task-attempt-id>]. " +
          " <task-attempt-id> is optional to get task attempt logs.");
    } else {
      System.err.printf(prefix + "<command> <args>%n");
      System.err.printf("\t[-submit <job-file>]%n");
      System.err.printf("\t[-status <job-id>]%n");
      System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
      System.err.printf("\t[-kill <job-id>]%n");
      System.err.printf("\t[-set-priority <job-id> <priority>]. " +
          "Valid values for priorities are: " + jobPriorityValues +
          ". In addition to this, integers also can be used." + "%n");
      System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
      System.err.printf("\t[-history [all] <jobHistoryFile|jobId> " +
          "[-outfile <file>] [-format <human|json>]]%n");
      System.err.printf("\t[-list [all]]%n");
      System.err.printf("\t[-list-active-trackers]%n");
      System.err.printf("\t[-list-blacklisted-trackers]%n");
      System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
          "<task-state>]. " +
          "Valid values for <task-type> are " + getTaskTypes() + ". " +
          "Valid values for <task-state> are " + taskStateNames);
      System.err.printf("\t[-kill-task <task-attempt-id>]%n");
      System.err.printf("\t[-fail-task <task-attempt-id>]%n");
      System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n%n");
      // NOTE: the generic options are written to stdout, unlike the lines
      // above; kept as-is because callers may capture that stream.
      ToolRunner.printGenericCommandUsage(System.out);
    }
  }

  /**
   * Prints a job history file through a {@link HistoryViewer}.
   *
   * @param historyFile the history file (or history URL) to read
   * @param all passed through to the viewer ("all" was given on the
   *        command line)
   * @param historyOutFile file to write to, or null to print to stdout
   * @param format output format passed through to the viewer
   * @throws IOException if the history cannot be read or the output file
   *         cannot be written
   */
  private void viewHistory(String historyFile, boolean all,
      String historyOutFile, String format) throws IOException {
    HistoryViewer historyViewer = new HistoryViewer(historyFile,
        getConf(), all, format);
    PrintStream ps = System.out;
    boolean ownsStream = false;
    if (historyOutFile != null) {
      ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(
          new File(historyOutFile))), true, "UTF-8");
      ownsStream = true;
    }
    try {
      historyViewer.print(ps);
    } finally {
      // Close only the file stream opened here (never System.out) so the
      // buffered output is flushed and the file handle is released.
      if (ownsStream) {
        ps.close();
      }
    }
  }

  /**
   * Looks up a single counter value.
   *
   * @param counters the counters to search
   * @param counterGroupName name of the counter group
   * @param counterName name of the counter within the group
   * @return the current value of the counter
   */
  protected long getCounter(Counters counters, String counterGroupName,
      String counterName) throws IOException {
    return counters.findCounter(counterGroupName, counterName).getValue();
  }

  /**
   * List the events for the given job
   * @param job the job whose task completion events are listed
   * @param fromEventId the first event to fetch
   * @param numEvents the maximum number of events to fetch
   * @throws IOException
   */
  private void listEvents(Job job, int fromEventId, int numEvents)
      throws IOException, InterruptedException {
    TaskCompletionEvent[] events = job.
        getTaskCompletionEvents(fromEventId, numEvents);
    System.out.println("Task completion events for " + job.getJobID());
    System.out.println("Number of events (from " + fromEventId + ") are: "
        + events.length);
    for (TaskCompletionEvent event : events) {
      System.out.println(event.getStatus() + " " +
          event.getTaskAttemptId() + " " +
          getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
    }
  }

  /**
   * Builds the plain-text task log URL for a task attempt.
   *
   * @param taskId the task attempt id
   * @param baseUrl the base HTTP URL the path is appended to
   */
  protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
  }

  /**
   * Fetches a job from the cluster, retrying while the lookup returns null.
   * The retry count and the sleep between attempts are read from
   * {@link MRJobConfig#MR_CLIENT_JOB_MAX_RETRIES} and
   * {@link MRJobConfig#MR_CLIENT_JOB_RETRY_INTERVAL}.
   *
   * @param jobid the id of the job to fetch
   * @return the job, or null if it is still not found after all retries
   */
  @VisibleForTesting
  Job getJob(JobID jobid) throws IOException, InterruptedException {

    int maxRetry = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
        MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
    long retryInterval = getConf()
        .getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
            MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
    Job job = cluster.getJob(jobid);

    for (int i = 0; i < maxRetry; ++i) {
      if (job != null) {
        return job;
      }
      LOG.info("Could not obtain job info after " + String.valueOf(i + 1)
          + " attempt(s). Sleeping for " + String.valueOf(retryInterval / 1000)
          + " seconds and retrying.");
      Thread.sleep(retryInterval);
      job = cluster.getJob(jobid);
    }
    return job;
  }


  /**
   * Dump a list of currently running jobs
   * @throws IOException
   */
  private void listJobs(Cluster cluster)
      throws IOException, InterruptedException {
    List<JobStatus> runningJobs = new ArrayList<JobStatus>();
    for (JobStatus job : cluster.getAllJobStatuses()) {
      if (!job.isJobComplete()) {
        runningJobs.add(job);
      }
    }
    displayJobList(runningJobs.toArray(new JobStatus[0]));
  }

  /**
   * Dump a list of all jobs submitted.
   * @throws IOException
   */
  private void listAllJobs(Cluster cluster)
      throws IOException, InterruptedException {
    displayJobList(cluster.getAllJobStatuses());
  }

  /**
   * Display the list of active trackers
   */
  private void listActiveTrackers(Cluster cluster)
      throws IOException, InterruptedException {
    TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
    for (TaskTrackerInfo tracker : trackers) {
      System.out.println(tracker.getTaskTrackerName());
    }
  }

  /**
   * Display the list of blacklisted trackers
   */
  private void listBlacklistedTrackers(Cluster cluster)
      throws IOException, InterruptedException {
    TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
    if (trackers.length > 0) {
      System.out.println("BlackListedNode \t Reason");
    }
    for (TaskTrackerInfo tracker : trackers) {
      System.out.println(tracker.getTaskTrackerName() + "\t" +
          tracker.getReasonForBlacklist());
    }
  }

  /**
   * Prints the attempt ids of a task report: the successful attempt for a
   * COMPLETE task, every running attempt for a RUNNING task, and nothing
   * for any other status.
   */
  private void printTaskAttempts(TaskReport report) {
    if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
      System.out.println(report.getSuccessfulTaskAttemptId());
    } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
      for (TaskAttemptID t :
          report.getRunningTaskAttemptIds()) {
        System.out.println(t);
      }
    }
  }

  /**
   * Display the information about a job's tasks, of a particular type and
   * in a particular state
   *
   * @param job the job
   * @param type the type of the task (map/reduce/setup/cleanup)
   * @param state the state of the task
   * (pending/running/completed/failed/killed)
   * @throws IOException when there is an error communicating with the master
   * @throws InterruptedException
   * @throws IllegalArgumentException if an invalid type/state is passed
   */
  protected void displayTasks(Job job, String type, String state)
      throws IOException, InterruptedException {

    TaskReport[] reports = job.getTaskReports(TaskType.valueOf(
        org.apache.hadoop.util.StringUtils.toUpperCase(type)));
    for (TaskReport report : reports) {
      TIPStatus status = report.getCurrentStatus();
      if ((state.equalsIgnoreCase("pending") && status == TIPStatus.PENDING) ||
          (state.equalsIgnoreCase("running") && status == TIPStatus.RUNNING) ||
          (state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) ||
          (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) ||
          (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) {
        printTaskAttempts(report);
      }
    }
  }

  /**
   * Prints the given jobs as a table on stdout, encoded as UTF-8.
   *
   * @param jobs the job statuses to print
   */
  public void displayJobList(JobStatus[] jobs)
      throws IOException, InterruptedException {
    displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out,
        Charsets.UTF_8)));
  }

  @Private
  public static String headerPattern = "%23s\t%20s\t%10s\t%14s\t%12s\t%12s" +
      "\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
  @Private
  public static String dataPattern = "%23s\t%20s\t%10s\t%14d\t%12s\t%12s" +
      "\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
  private static final String memPattern = "%dM";
  private static final String UNAVAILABLE = "N/A";

  /**
   * Prints the given jobs as a table: a total count, a header row and one
   * row per job. Job names are truncated to 20 characters and negative
   * slot/memory figures are shown as "N/A". The writer is flushed but not
   * closed.
   *
   * @param jobs the job statuses to print
   * @param writer the destination
   */
  @Private
  public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
    writer.println("Total jobs:" + jobs.length);
    writer.printf(headerPattern, "JobId", "JobName", "State", "StartTime",
        "UserName", "Queue", "Priority", "UsedContainers",
        "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
    for (JobStatus job : jobs) {
      int numUsedSlots = job.getNumUsedSlots();
      int numReservedSlots = job.getNumReservedSlots();
      int usedMem = job.getUsedMem();
      int rsvdMem = job.getReservedMem();
      int neededMem = job.getNeededMem();
      int jobNameLength = job.getJobName().length();
      writer.printf(dataPattern, job.getJobID().toString(),
          job.getJobName().substring(0, jobNameLength > 20 ? 20 : jobNameLength),
          job.getState(), job.getStartTime(), job.getUsername(),
          job.getQueue(), job.getPriority().name(),
          numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
          numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
          usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
          rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
          neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
          job.getSchedulingInfo());
    }
    writer.flush();
  }

  public static void main(String[] argv) throws Exception {
    int res = ToolRunner.run(new CLI(), argv);
    ExitUtil.terminate(res);
  }
}