001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapreduce.tools;
019
020import java.io.BufferedOutputStream;
021import java.io.File;
022import java.io.FileOutputStream;
023import java.io.IOException;
024import java.io.OutputStreamWriter;
025import java.io.PrintStream;
026import java.io.PrintWriter;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Set;
030import java.util.HashSet;
031import java.util.Arrays;
032
033import com.google.common.annotations.VisibleForTesting;
034import org.apache.commons.lang.StringUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.apache.hadoop.classification.InterfaceAudience;
038import org.apache.hadoop.classification.InterfaceStability;
039import org.apache.hadoop.classification.InterfaceAudience.Private;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.conf.Configured;
042import org.apache.hadoop.ipc.RemoteException;
043import org.apache.hadoop.mapred.JobConf;
044import org.apache.hadoop.mapred.TIPStatus;
045import org.apache.hadoop.mapreduce.Cluster;
046import org.apache.hadoop.mapreduce.Counters;
047import org.apache.hadoop.mapreduce.Job;
048import org.apache.hadoop.mapreduce.JobID;
049import org.apache.hadoop.mapreduce.JobPriority;
050import org.apache.hadoop.mapreduce.JobStatus;
051import org.apache.hadoop.mapreduce.MRJobConfig;
052import org.apache.hadoop.mapreduce.TaskAttemptID;
053import org.apache.hadoop.mapreduce.TaskCompletionEvent;
054import org.apache.hadoop.mapreduce.TaskReport;
055import org.apache.hadoop.mapreduce.TaskTrackerInfo;
056import org.apache.hadoop.mapreduce.TaskType;
057import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
058import org.apache.hadoop.mapreduce.v2.LogParams;
059import org.apache.hadoop.security.AccessControlException;
060import org.apache.hadoop.util.ExitUtil;
061import org.apache.hadoop.util.Tool;
062import org.apache.hadoop.util.ToolRunner;
063import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
064
065import com.google.common.base.Charsets;
066
/**
 * Interprets the MapReduce job command-line options and runs the requested
 * operation against the cluster.
 */
070@InterfaceAudience.Public
071@InterfaceStability.Stable
072public class CLI extends Configured implements Tool {
073  private static final Log LOG = LogFactory.getLog(CLI.class);
074  protected Cluster cluster;
075  private final Set<String> taskStates = new HashSet<String>(
076              Arrays.asList("pending", "running", "completed", "failed", "killed"));
077  private static final Set<String> taskTypes = new HashSet<String>(
078      Arrays.asList("MAP", "REDUCE"));
079  
080  public CLI() {
081  }
082  
083  public CLI(Configuration conf) {
084    setConf(conf);
085  }
086  
087  public int run(String[] argv) throws Exception {
088    int exitCode = -1;
089    if (argv.length < 1) {
090      displayUsage("");
091      return exitCode;
092    }    
093    // process arguments
094    String cmd = argv[0];
095    String submitJobFile = null;
096    String jobid = null;
097    String taskid = null;
098    String historyFileOrJobId = null;
099    String historyOutFile = null;
100    String historyOutFormat = HistoryViewer.HUMAN_FORMAT;
101    String counterGroupName = null;
102    String counterName = null;
103    JobPriority jp = null;
104    String taskType = null;
105    String taskState = null;
106    int fromEvent = 0;
107    int nEvents = 0;
108    int jpvalue = 0;
109    boolean getStatus = false;
110    boolean getCounter = false;
111    boolean killJob = false;
112    boolean listEvents = false;
113    boolean viewHistory = false;
114    boolean viewAllHistory = false;
115    boolean listJobs = false;
116    boolean listAllJobs = false;
117    boolean listActiveTrackers = false;
118    boolean listBlacklistedTrackers = false;
119    boolean displayTasks = false;
120    boolean killTask = false;
121    boolean failTask = false;
122    boolean setJobPriority = false;
123    boolean logs = false;
124
125    if ("-submit".equals(cmd)) {
126      if (argv.length != 2) {
127        displayUsage(cmd);
128        return exitCode;
129      }
130      submitJobFile = argv[1];
131    } else if ("-status".equals(cmd)) {
132      if (argv.length != 2) {
133        displayUsage(cmd);
134        return exitCode;
135      }
136      jobid = argv[1];
137      getStatus = true;
138    } else if("-counter".equals(cmd)) {
139      if (argv.length != 4) {
140        displayUsage(cmd);
141        return exitCode;
142      }
143      getCounter = true;
144      jobid = argv[1];
145      counterGroupName = argv[2];
146      counterName = argv[3];
147    } else if ("-kill".equals(cmd)) {
148      if (argv.length != 2) {
149        displayUsage(cmd);
150        return exitCode;
151      }
152      jobid = argv[1];
153      killJob = true;
154    } else if ("-set-priority".equals(cmd)) {
155      if (argv.length != 3) {
156        displayUsage(cmd);
157        return exitCode;
158      }
159      jobid = argv[1];
160      try {
161        jp = JobPriority.valueOf(argv[2]);
162      } catch (IllegalArgumentException iae) {
163        try {
164          jpvalue = Integer.parseInt(argv[2]);
165        } catch (NumberFormatException ne) {
166          LOG.info(ne);
167          displayUsage(cmd);
168          return exitCode;
169        }
170      }
171      setJobPriority = true; 
172    } else if ("-events".equals(cmd)) {
173      if (argv.length != 4) {
174        displayUsage(cmd);
175        return exitCode;
176      }
177      jobid = argv[1];
178      fromEvent = Integer.parseInt(argv[2]);
179      nEvents = Integer.parseInt(argv[3]);
180      listEvents = true;
181    } else if ("-history".equals(cmd)) {
182      viewHistory = true;
183      if (argv.length < 2 || argv.length > 7) {
184        displayUsage(cmd);
185        return exitCode;
186      }
187
188      // Some arguments are optional while others are not, and some require
189      // second arguments.  Due to this, the indexing can vary depending on
190      // what's specified and what's left out, as summarized in the below table:
191      // [all] <jobHistoryFile|jobId> [-outfile <file>] [-format <human|json>]
192      //   1                  2            3       4         5         6
193      //   1                  2            3       4
194      //   1                  2                              3         4
195      //   1                  2
196      //                      1            2       3         4         5
197      //                      1            2       3
198      //                      1                              2         3
199      //                      1
200
201      // "all" is optional, but comes first if specified
202      int index = 1;
203      if ("all".equals(argv[index])) {
204        index++;
205        viewAllHistory = true;
206        if (argv.length == 2) {
207          displayUsage(cmd);
208          return exitCode;
209        }
210      }
211      // Get the job history file or job id argument
212      historyFileOrJobId = argv[index++];
213      // "-outfile" is optional, but if specified requires a second argument
214      if (argv.length > index + 1 && "-outfile".equals(argv[index])) {
215        index++;
216        historyOutFile = argv[index++];
217      }
218      // "-format" is optional, but if specified required a second argument
219      if (argv.length > index + 1 && "-format".equals(argv[index])) {
220        index++;
221        historyOutFormat = argv[index++];
222      }
223      // Check for any extra arguments that don't belong here
224      if (argv.length > index) {
225        displayUsage(cmd);
226        return exitCode;
227      }
228    } else if ("-list".equals(cmd)) {
229      if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
230        displayUsage(cmd);
231        return exitCode;
232      }
233      if (argv.length == 2 && "all".equals(argv[1])) {
234        listAllJobs = true;
235      } else {
236        listJobs = true;
237      }
238    } else if("-kill-task".equals(cmd)) {
239      if (argv.length != 2) {
240        displayUsage(cmd);
241        return exitCode;
242      }
243      killTask = true;
244      taskid = argv[1];
245    } else if("-fail-task".equals(cmd)) {
246      if (argv.length != 2) {
247        displayUsage(cmd);
248        return exitCode;
249      }
250      failTask = true;
251      taskid = argv[1];
252    } else if ("-list-active-trackers".equals(cmd)) {
253      if (argv.length != 1) {
254        displayUsage(cmd);
255        return exitCode;
256      }
257      listActiveTrackers = true;
258    } else if ("-list-blacklisted-trackers".equals(cmd)) {
259      if (argv.length != 1) {
260        displayUsage(cmd);
261        return exitCode;
262      }
263      listBlacklistedTrackers = true;
264    } else if ("-list-attempt-ids".equals(cmd)) {
265      if (argv.length != 4) {
266        displayUsage(cmd);
267        return exitCode;
268      }
269      jobid = argv[1];
270      taskType = argv[2];
271      taskState = argv[3];
272      displayTasks = true;
273      if (!taskTypes.contains(
274          org.apache.hadoop.util.StringUtils.toUpperCase(taskType))) {
275        System.out.println("Error: Invalid task-type: " + taskType);
276        displayUsage(cmd);
277        return exitCode;
278      }
279      if (!taskStates.contains(
280          org.apache.hadoop.util.StringUtils.toLowerCase(taskState))) {
281        System.out.println("Error: Invalid task-state: " + taskState);
282        displayUsage(cmd);
283        return exitCode;
284      }
285    } else if ("-logs".equals(cmd)) {
286      if (argv.length == 2 || argv.length ==3) {
287        logs = true;
288        jobid = argv[1];
289        if (argv.length == 3) {
290          taskid = argv[2];
291        }  else {
292          taskid = null;
293        }
294      } else {
295        displayUsage(cmd);
296        return exitCode;
297      }
298    } else {
299      displayUsage(cmd);
300      return exitCode;
301    }
302
303    // initialize cluster
304    cluster = createCluster();
305        
306    // Submit the request
307    try {
308      if (submitJobFile != null) {
309        Job job = Job.getInstance(new JobConf(submitJobFile));
310        job.submit();
311        System.out.println("Created job " + job.getJobID());
312        exitCode = 0;
313      } else if (getStatus) {
314        Job job = getJob(JobID.forName(jobid));
315        if (job == null) {
316          System.out.println("Could not find job " + jobid);
317        } else {
318          Counters counters = job.getCounters();
319          System.out.println();
320          System.out.println(job);
321          if (counters != null) {
322            System.out.println(counters);
323          } else {
324            System.out.println("Counters not available. Job is retired.");
325          }
326          exitCode = 0;
327        }
328      } else if (getCounter) {
329        Job job = getJob(JobID.forName(jobid));
330        if (job == null) {
331          System.out.println("Could not find job " + jobid);
332        } else {
333          Counters counters = job.getCounters();
334          if (counters == null) {
335            System.out.println("Counters not available for retired job " + 
336            jobid);
337            exitCode = -1;
338          } else {
339            System.out.println(getCounter(counters,
340              counterGroupName, counterName));
341            exitCode = 0;
342          }
343        }
344      } else if (killJob) {
345        Job job = getJob(JobID.forName(jobid));
346        if (job == null) {
347          System.out.println("Could not find job " + jobid);
348        } else {
349          JobStatus jobStatus = job.getStatus();
350          if (jobStatus.getState() == JobStatus.State.FAILED) {
351            System.out.println("Could not mark the job " + jobid
352                + " as killed, as it has already failed.");
353            exitCode = -1;
354          } else if (jobStatus.getState() == JobStatus.State.KILLED) {
355            System.out
356                .println("The job " + jobid + " has already been killed.");
357            exitCode = -1;
358          } else if (jobStatus.getState() == JobStatus.State.SUCCEEDED) {
359            System.out.println("Could not kill the job " + jobid
360                + ", as it has already succeeded.");
361            exitCode = -1;
362          } else {
363            job.killJob();
364            System.out.println("Killed job " + jobid);
365            exitCode = 0;
366          }
367        }
368      } else if (setJobPriority) {
369        Job job = getJob(JobID.forName(jobid));
370        if (job == null) {
371          System.out.println("Could not find job " + jobid);
372        } else {
373          if (jp != null) {
374            job.setPriority(jp);
375          } else {
376            job.setPriorityAsInteger(jpvalue);
377          }
378          System.out.println("Changed job priority.");
379          exitCode = 0;
380        } 
381      } else if (viewHistory) {
382        // If it ends with .jhist, assume it's a jhist file; otherwise, assume
383        // it's a Job ID
384        if (historyFileOrJobId.endsWith(".jhist")) {
385          viewHistory(historyFileOrJobId, viewAllHistory, historyOutFile,
386              historyOutFormat);
387          exitCode = 0;
388        } else {
389          Job job = getJob(JobID.forName(historyFileOrJobId));
390          if (job == null) {
391            System.out.println("Could not find job " + jobid);
392          } else {
393            String historyUrl = job.getHistoryUrl();
394            if (historyUrl == null || historyUrl.isEmpty()) {
395              System.out.println("History file for job " + historyFileOrJobId +
396                  " is currently unavailable.");
397            } else {
398              viewHistory(historyUrl, viewAllHistory, historyOutFile,
399                  historyOutFormat);
400              exitCode = 0;
401            }
402          }
403        }
404      } else if (listEvents) {
405        listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents);
406        exitCode = 0;
407      } else if (listJobs) {
408        listJobs(cluster);
409        exitCode = 0;
410      } else if (listAllJobs) {
411        listAllJobs(cluster);
412        exitCode = 0;
413      } else if (listActiveTrackers) {
414        listActiveTrackers(cluster);
415        exitCode = 0;
416      } else if (listBlacklistedTrackers) {
417        listBlacklistedTrackers(cluster);
418        exitCode = 0;
419      } else if (displayTasks) {
420        displayTasks(getJob(JobID.forName(jobid)), taskType, taskState);
421        exitCode = 0;
422      } else if(killTask) {
423        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
424        Job job = getJob(taskID.getJobID());
425        if (job == null) {
426          System.out.println("Could not find job " + jobid);
427        } else if (job.killTask(taskID, false)) {
428          System.out.println("Killed task " + taskid);
429          exitCode = 0;
430        } else {
431          System.out.println("Could not kill task " + taskid);
432          exitCode = -1;
433        }
434      } else if(failTask) {
435        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
436        Job job = getJob(taskID.getJobID());
437        if (job == null) {
438            System.out.println("Could not find job " + jobid);
439        } else if(job.killTask(taskID, true)) {
440          System.out.println("Killed task " + taskID + " by failing it");
441          exitCode = 0;
442        } else {
443          System.out.println("Could not fail task " + taskid);
444          exitCode = -1;
445        }
446      } else if (logs) {
447        try {
448        JobID jobID = JobID.forName(jobid);
449        TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
450        LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
451        LogCLIHelpers logDumper = new LogCLIHelpers();
452        logDumper.setConf(getConf());
453        exitCode = logDumper.dumpAContainersLogs(logParams.getApplicationId(),
454            logParams.getContainerId(), logParams.getNodeId(),
455            logParams.getOwner());
456        } catch (IOException e) {
457          if (e instanceof RemoteException) {
458            throw e;
459          } 
460          System.out.println(e.getMessage());
461        }
462      }
463    } catch (RemoteException re) {
464      IOException unwrappedException = re.unwrapRemoteException();
465      if (unwrappedException instanceof AccessControlException) {
466        System.out.println(unwrappedException.getMessage());
467      } else {
468        throw re;
469      }
470    } finally {
471      cluster.close();
472    }
473    return exitCode;
474  }
475
476  Cluster createCluster() throws IOException {
477    return new Cluster(getConf());
478  }
479  
480  private String getJobPriorityNames() {
481    StringBuffer sb = new StringBuffer();
482    for (JobPriority p : JobPriority.values()) {
483      // UNDEFINED_PRIORITY need not to be displayed in usage
484      if (JobPriority.UNDEFINED_PRIORITY == p) {
485        continue;
486      }
487      sb.append(p.name()).append(" ");
488    }
489    return sb.substring(0, sb.length()-1);
490  }
491
492  private String getTaskTypes() {
493    return StringUtils.join(taskTypes, " ");
494  }
495  
496  /**
497   * Display usage of the command-line tool and terminate execution.
498   */
499  private void displayUsage(String cmd) {
500    String prefix = "Usage: job ";
501    String jobPriorityValues = getJobPriorityNames();
502    String taskStates = "pending, running, completed, failed, killed";
503    
504    if ("-submit".equals(cmd)) {
505      System.err.println(prefix + "[" + cmd + " <job-file>]");
506    } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
507      System.err.println(prefix + "[" + cmd + " <job-id>]");
508    } else if ("-counter".equals(cmd)) {
509      System.err.println(prefix + "[" + cmd + 
510        " <job-id> <group-name> <counter-name>]");
511    } else if ("-events".equals(cmd)) {
512      System.err.println(prefix + "[" + cmd + 
513        " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
514    } else if ("-history".equals(cmd)) {
515      System.err.println(prefix + "[" + cmd + " [all] <jobHistoryFile|jobId> " +
516          "[-outfile <file>] [-format <human|json>]]");
517    } else if ("-list".equals(cmd)) {
518      System.err.println(prefix + "[" + cmd + " [all]]");
519    } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
520      System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
521    } else if ("-set-priority".equals(cmd)) {
522      System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
523          "Valid values for priorities are: " 
524          + jobPriorityValues
525          + ". In addition to this, integers also can be used.");
526    } else if ("-list-active-trackers".equals(cmd)) {
527      System.err.println(prefix + "[" + cmd + "]");
528    } else if ("-list-blacklisted-trackers".equals(cmd)) {
529      System.err.println(prefix + "[" + cmd + "]");
530    } else if ("-list-attempt-ids".equals(cmd)) {
531      System.err.println(prefix + "[" + cmd + 
532          " <job-id> <task-type> <task-state>]. " +
533          "Valid values for <task-type> are " + getTaskTypes() + ". " +
534          "Valid values for <task-state> are " + taskStates);
535    } else if ("-logs".equals(cmd)) {
536      System.err.println(prefix + "[" + cmd +
537          " <job-id> <task-attempt-id>]. " +
538          " <task-attempt-id> is optional to get task attempt logs.");      
539    } else {
540      System.err.printf(prefix + "<command> <args>%n");
541      System.err.printf("\t[-submit <job-file>]%n");
542      System.err.printf("\t[-status <job-id>]%n");
543      System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
544      System.err.printf("\t[-kill <job-id>]%n");
545      System.err.printf("\t[-set-priority <job-id> <priority>]. " +
546          "Valid values for priorities are: " + jobPriorityValues +
547          ". In addition to this, integers also can be used." + "%n");
548      System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
549      System.err.printf("\t[-history [all] <jobHistoryFile|jobId> " +
550          "[-outfile <file>] [-format <human|json>]]%n");
551      System.err.printf("\t[-list [all]]%n");
552      System.err.printf("\t[-list-active-trackers]%n");
553      System.err.printf("\t[-list-blacklisted-trackers]%n");
554      System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
555        "<task-state>]. " +
556        "Valid values for <task-type> are " + getTaskTypes() + ". " +
557        "Valid values for <task-state> are " + taskStates);
558      System.err.printf("\t[-kill-task <task-attempt-id>]%n");
559      System.err.printf("\t[-fail-task <task-attempt-id>]%n");
560      System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n%n");
561      ToolRunner.printGenericCommandUsage(System.out);
562    }
563  }
564    
565  private void viewHistory(String historyFile, boolean all,
566      String historyOutFile, String format) throws IOException {
567    HistoryViewer historyViewer = new HistoryViewer(historyFile,
568        getConf(), all, format);
569    PrintStream ps = System.out;
570    if (historyOutFile != null) {
571      ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(
572          new File(historyOutFile))), true, "UTF-8");
573    }
574    historyViewer.print(ps);
575  }
576
577  protected long getCounter(Counters counters, String counterGroupName,
578      String counterName) throws IOException {
579    return counters.findCounter(counterGroupName, counterName).getValue();
580  }
581  
582  /**
583   * List the events for the given job
584   * @param jobId the job id for the job's events to list
585   * @throws IOException
586   */
587  private void listEvents(Job job, int fromEventId, int numEvents)
588      throws IOException, InterruptedException {
589    TaskCompletionEvent[] events = job.
590      getTaskCompletionEvents(fromEventId, numEvents);
591    System.out.println("Task completion events for " + job.getJobID());
592    System.out.println("Number of events (from " + fromEventId + ") are: " 
593      + events.length);
594    for(TaskCompletionEvent event: events) {
595      System.out.println(event.getStatus() + " " + 
596        event.getTaskAttemptId() + " " + 
597        getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
598    }
599  }
600
601  protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
602    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
603  }
604
605  @VisibleForTesting
606  Job getJob(JobID jobid) throws IOException, InterruptedException {
607
608    int maxRetry = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
609        MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
610    long retryInterval = getConf()
611        .getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
612            MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
613    Job job = cluster.getJob(jobid);
614
615    for (int i = 0; i < maxRetry; ++i) {
616      if (job != null) {
617        return job;
618      }
619      LOG.info("Could not obtain job info after " + String.valueOf(i + 1)
620          + " attempt(s). Sleeping for " + String.valueOf(retryInterval / 1000)
621          + " seconds and retrying.");
622      Thread.sleep(retryInterval);
623      job = cluster.getJob(jobid);
624    }
625    return job;
626  }
627  
628
629  /**
630   * Dump a list of currently running jobs
631   * @throws IOException
632   */
633  private void listJobs(Cluster cluster) 
634      throws IOException, InterruptedException {
635    List<JobStatus> runningJobs = new ArrayList<JobStatus>();
636    for (JobStatus job : cluster.getAllJobStatuses()) {
637      if (!job.isJobComplete()) {
638        runningJobs.add(job);
639      }
640    }
641    displayJobList(runningJobs.toArray(new JobStatus[0]));
642  }
643    
644  /**
645   * Dump a list of all jobs submitted.
646   * @throws IOException
647   */
648  private void listAllJobs(Cluster cluster) 
649      throws IOException, InterruptedException {
650    displayJobList(cluster.getAllJobStatuses());
651  }
652  
653  /**
654   * Display the list of active trackers
655   */
656  private void listActiveTrackers(Cluster cluster) 
657      throws IOException, InterruptedException {
658    TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
659    for (TaskTrackerInfo tracker : trackers) {
660      System.out.println(tracker.getTaskTrackerName());
661    }
662  }
663
664  /**
665   * Display the list of blacklisted trackers
666   */
667  private void listBlacklistedTrackers(Cluster cluster) 
668      throws IOException, InterruptedException {
669    TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
670    if (trackers.length > 0) {
671      System.out.println("BlackListedNode \t Reason");
672    }
673    for (TaskTrackerInfo tracker : trackers) {
674      System.out.println(tracker.getTaskTrackerName() + "\t" + 
675        tracker.getReasonForBlacklist());
676    }
677  }
678
679  private void printTaskAttempts(TaskReport report) {
680    if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
681      System.out.println(report.getSuccessfulTaskAttemptId());
682    } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
683      for (TaskAttemptID t : 
684        report.getRunningTaskAttemptIds()) {
685        System.out.println(t);
686      }
687    }
688  }
689
690  /**
691   * Display the information about a job's tasks, of a particular type and
692   * in a particular state
693   * 
694   * @param job the job
695   * @param type the type of the task (map/reduce/setup/cleanup)
696   * @param state the state of the task 
697   * (pending/running/completed/failed/killed)
698   * @throws IOException when there is an error communicating with the master
699   * @throws InterruptedException
700   * @throws IllegalArgumentException if an invalid type/state is passed
701   */
702  protected void displayTasks(Job job, String type, String state) 
703  throws IOException, InterruptedException {
704          
705    TaskReport[] reports=null;
706    reports = job.getTaskReports(TaskType.valueOf(
707        org.apache.hadoop.util.StringUtils.toUpperCase(type)));
708    for (TaskReport report : reports) {
709      TIPStatus status = report.getCurrentStatus();
710      if ((state.equalsIgnoreCase("pending") && status ==TIPStatus.PENDING) ||
711          (state.equalsIgnoreCase("running") && status ==TIPStatus.RUNNING) ||
712          (state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) ||
713          (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) ||
714          (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) {
715        printTaskAttempts(report);
716      }
717    }
718  }
719
720  public void displayJobList(JobStatus[] jobs) 
721      throws IOException, InterruptedException {
722    displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out,
723        Charsets.UTF_8)));
724  }
725
726  @Private
727  public static String headerPattern = "%23s\t%20s\t%10s\t%14s\t%12s\t%12s" +
728      "\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
729  @Private
730  public static String dataPattern   = "%23s\t%20s\t%10s\t%14d\t%12s\t%12s" +
731      "\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
732  private static String memPattern   = "%dM";
733  private static String UNAVAILABLE  = "N/A";
734
735  @Private
736  public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
737    writer.println("Total jobs:" + jobs.length);
738    writer.printf(headerPattern, "JobId", "JobName", "State", "StartTime",
739      "UserName", "Queue", "Priority", "UsedContainers",
740      "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
741    for (JobStatus job : jobs) {
742      int numUsedSlots = job.getNumUsedSlots();
743      int numReservedSlots = job.getNumReservedSlots();
744      int usedMem = job.getUsedMem();
745      int rsvdMem = job.getReservedMem();
746      int neededMem = job.getNeededMem();
747      int jobNameLength = job.getJobName().length();
748      writer.printf(dataPattern, job.getJobID().toString(),
749          job.getJobName().substring(0, jobNameLength > 20 ? 20 : jobNameLength),
750          job.getState(), job.getStartTime(), job.getUsername(),
751          job.getQueue(), job.getPriority().name(),
752          numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
753          numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
754          usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
755          rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
756          neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
757          job.getSchedulingInfo());
758    }
759    writer.flush();
760  }
761  
762  public static void main(String[] argv) throws Exception {
763    int res = ToolRunner.run(new CLI(), argv);
764    ExitUtil.terminate(res);
765  }
766}