001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.mapreduce.tools;
019
020import java.io.BufferedOutputStream;
021import java.io.File;
022import java.io.FileOutputStream;
023import java.io.IOException;
024import java.io.OutputStreamWriter;
025import java.io.PrintStream;
026import java.io.PrintWriter;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.Set;
030import java.util.HashSet;
031import java.util.Arrays;
032
033import com.google.common.annotations.VisibleForTesting;
034import org.apache.commons.lang.StringUtils;
035import org.apache.commons.logging.Log;
036import org.apache.commons.logging.LogFactory;
037import org.apache.hadoop.classification.InterfaceAudience;
038import org.apache.hadoop.classification.InterfaceStability;
039import org.apache.hadoop.classification.InterfaceAudience.Private;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.conf.Configured;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.ipc.RemoteException;
045import org.apache.hadoop.mapred.JobConf;
046import org.apache.hadoop.mapred.TIPStatus;
047import org.apache.hadoop.mapreduce.Cluster;
048import org.apache.hadoop.mapreduce.Counters;
049import org.apache.hadoop.mapreduce.Job;
050import org.apache.hadoop.mapreduce.JobID;
051import org.apache.hadoop.mapreduce.JobPriority;
052import org.apache.hadoop.mapreduce.JobStatus;
053import org.apache.hadoop.mapreduce.MRJobConfig;
054import org.apache.hadoop.mapreduce.TaskAttemptID;
055import org.apache.hadoop.mapreduce.TaskCompletionEvent;
056import org.apache.hadoop.mapreduce.TaskReport;
057import org.apache.hadoop.mapreduce.TaskTrackerInfo;
058import org.apache.hadoop.mapreduce.TaskType;
059import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
060import org.apache.hadoop.mapreduce.v2.LogParams;
061import org.apache.hadoop.security.AccessControlException;
062import org.apache.hadoop.util.ExitUtil;
063import org.apache.hadoop.util.Tool;
064import org.apache.hadoop.util.ToolRunner;
065import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
066
067import com.google.common.base.Charsets;
068
/**
 * Interprets the MapReduce command-line (CLI) options.
 */
072@InterfaceAudience.Public
073@InterfaceStability.Stable
074public class CLI extends Configured implements Tool {
075  private static final Log LOG = LogFactory.getLog(CLI.class);
076  protected Cluster cluster;
077  private final Set<String> taskStates = new HashSet<String>(
078              Arrays.asList("pending", "running", "completed", "failed", "killed"));
079  private static final Set<String> taskTypes = new HashSet<String>(
080      Arrays.asList("MAP", "REDUCE"));
081  
082  public CLI() {
083  }
084  
085  public CLI(Configuration conf) {
086    setConf(conf);
087  }
088  
089  public int run(String[] argv) throws Exception {
090    int exitCode = -1;
091    if (argv.length < 1) {
092      displayUsage("");
093      return exitCode;
094    }    
095    // process arguments
096    String cmd = argv[0];
097    String submitJobFile = null;
098    String jobid = null;
099    String taskid = null;
100    String historyFileOrJobId = null;
101    String historyOutFile = null;
102    String historyOutFormat = HistoryViewer.HUMAN_FORMAT;
103    String counterGroupName = null;
104    String counterName = null;
105    JobPriority jp = null;
106    String taskType = null;
107    String taskState = null;
108    int fromEvent = 0;
109    int nEvents = 0;
110    int jpvalue = 0;
111    String configOutFile = null;
112    boolean getStatus = false;
113    boolean getCounter = false;
114    boolean killJob = false;
115    boolean listEvents = false;
116    boolean viewHistory = false;
117    boolean viewAllHistory = false;
118    boolean listJobs = false;
119    boolean listAllJobs = false;
120    boolean listActiveTrackers = false;
121    boolean listBlacklistedTrackers = false;
122    boolean displayTasks = false;
123    boolean killTask = false;
124    boolean failTask = false;
125    boolean setJobPriority = false;
126    boolean logs = false;
127    boolean downloadConfig = false;
128
129    if ("-submit".equals(cmd)) {
130      if (argv.length != 2) {
131        displayUsage(cmd);
132        return exitCode;
133      }
134      submitJobFile = argv[1];
135    } else if ("-status".equals(cmd)) {
136      if (argv.length != 2) {
137        displayUsage(cmd);
138        return exitCode;
139      }
140      jobid = argv[1];
141      getStatus = true;
142    } else if("-counter".equals(cmd)) {
143      if (argv.length != 4) {
144        displayUsage(cmd);
145        return exitCode;
146      }
147      getCounter = true;
148      jobid = argv[1];
149      counterGroupName = argv[2];
150      counterName = argv[3];
151    } else if ("-kill".equals(cmd)) {
152      if (argv.length != 2) {
153        displayUsage(cmd);
154        return exitCode;
155      }
156      jobid = argv[1];
157      killJob = true;
158    } else if ("-set-priority".equals(cmd)) {
159      if (argv.length != 3) {
160        displayUsage(cmd);
161        return exitCode;
162      }
163      jobid = argv[1];
164      try {
165        jp = JobPriority.valueOf(argv[2]);
166      } catch (IllegalArgumentException iae) {
167        try {
168          jpvalue = Integer.parseInt(argv[2]);
169        } catch (NumberFormatException ne) {
170          LOG.info(ne);
171          displayUsage(cmd);
172          return exitCode;
173        }
174      }
175      setJobPriority = true; 
176    } else if ("-events".equals(cmd)) {
177      if (argv.length != 4) {
178        displayUsage(cmd);
179        return exitCode;
180      }
181      jobid = argv[1];
182      fromEvent = Integer.parseInt(argv[2]);
183      nEvents = Integer.parseInt(argv[3]);
184      listEvents = true;
185    } else if ("-history".equals(cmd)) {
186      viewHistory = true;
187      if (argv.length < 2 || argv.length > 7) {
188        displayUsage(cmd);
189        return exitCode;
190      }
191
192      // Some arguments are optional while others are not, and some require
193      // second arguments.  Due to this, the indexing can vary depending on
194      // what's specified and what's left out, as summarized in the below table:
195      // [all] <jobHistoryFile|jobId> [-outfile <file>] [-format <human|json>]
196      //   1                  2            3       4         5         6
197      //   1                  2            3       4
198      //   1                  2                              3         4
199      //   1                  2
200      //                      1            2       3         4         5
201      //                      1            2       3
202      //                      1                              2         3
203      //                      1
204
205      // "all" is optional, but comes first if specified
206      int index = 1;
207      if ("all".equals(argv[index])) {
208        index++;
209        viewAllHistory = true;
210        if (argv.length == 2) {
211          displayUsage(cmd);
212          return exitCode;
213        }
214      }
215      // Get the job history file or job id argument
216      historyFileOrJobId = argv[index++];
217      // "-outfile" is optional, but if specified requires a second argument
218      if (argv.length > index + 1 && "-outfile".equals(argv[index])) {
219        index++;
220        historyOutFile = argv[index++];
221      }
222      // "-format" is optional, but if specified required a second argument
223      if (argv.length > index + 1 && "-format".equals(argv[index])) {
224        index++;
225        historyOutFormat = argv[index++];
226      }
227      // Check for any extra arguments that don't belong here
228      if (argv.length > index) {
229        displayUsage(cmd);
230        return exitCode;
231      }
232    } else if ("-list".equals(cmd)) {
233      if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
234        displayUsage(cmd);
235        return exitCode;
236      }
237      if (argv.length == 2 && "all".equals(argv[1])) {
238        listAllJobs = true;
239      } else {
240        listJobs = true;
241      }
242    } else if("-kill-task".equals(cmd)) {
243      if (argv.length != 2) {
244        displayUsage(cmd);
245        return exitCode;
246      }
247      killTask = true;
248      taskid = argv[1];
249    } else if("-fail-task".equals(cmd)) {
250      if (argv.length != 2) {
251        displayUsage(cmd);
252        return exitCode;
253      }
254      failTask = true;
255      taskid = argv[1];
256    } else if ("-list-active-trackers".equals(cmd)) {
257      if (argv.length != 1) {
258        displayUsage(cmd);
259        return exitCode;
260      }
261      listActiveTrackers = true;
262    } else if ("-list-blacklisted-trackers".equals(cmd)) {
263      if (argv.length != 1) {
264        displayUsage(cmd);
265        return exitCode;
266      }
267      listBlacklistedTrackers = true;
268    } else if ("-list-attempt-ids".equals(cmd)) {
269      if (argv.length != 4) {
270        displayUsage(cmd);
271        return exitCode;
272      }
273      jobid = argv[1];
274      taskType = argv[2];
275      taskState = argv[3];
276      displayTasks = true;
277      if (!taskTypes.contains(
278          org.apache.hadoop.util.StringUtils.toUpperCase(taskType))) {
279        System.out.println("Error: Invalid task-type: " + taskType);
280        displayUsage(cmd);
281        return exitCode;
282      }
283      if (!taskStates.contains(
284          org.apache.hadoop.util.StringUtils.toLowerCase(taskState))) {
285        System.out.println("Error: Invalid task-state: " + taskState);
286        displayUsage(cmd);
287        return exitCode;
288      }
289    } else if ("-logs".equals(cmd)) {
290      if (argv.length == 2 || argv.length ==3) {
291        logs = true;
292        jobid = argv[1];
293        if (argv.length == 3) {
294          taskid = argv[2];
295        }  else {
296          taskid = null;
297        }
298      } else {
299        displayUsage(cmd);
300        return exitCode;
301      }
302    } else if ("-config".equals(cmd)) {
303      downloadConfig = true;
304      if (argv.length != 3) {
305        displayUsage(cmd);
306        return exitCode;
307      }
308      jobid = argv[1];
309      configOutFile = argv[2];
310    } else {
311      displayUsage(cmd);
312      return exitCode;
313    }
314
315    // initialize cluster
316    cluster = createCluster();
317        
318    // Submit the request
319    try {
320      if (submitJobFile != null) {
321        Job job = Job.getInstance(new JobConf(submitJobFile));
322        job.submit();
323        System.out.println("Created job " + job.getJobID());
324        exitCode = 0;
325      } else if (getStatus) {
326        Job job = getJob(JobID.forName(jobid));
327        if (job == null) {
328          System.out.println("Could not find job " + jobid);
329        } else {
330          Counters counters = job.getCounters();
331          System.out.println();
332          System.out.println(job);
333          if (counters != null) {
334            System.out.println(counters);
335          } else {
336            System.out.println("Counters not available. Job is retired.");
337          }
338          exitCode = 0;
339        }
340      } else if (getCounter) {
341        Job job = getJob(JobID.forName(jobid));
342        if (job == null) {
343          System.out.println("Could not find job " + jobid);
344        } else {
345          Counters counters = job.getCounters();
346          if (counters == null) {
347            System.out.println("Counters not available for retired job " + 
348            jobid);
349            exitCode = -1;
350          } else {
351            System.out.println(getCounter(counters,
352              counterGroupName, counterName));
353            exitCode = 0;
354          }
355        }
356      } else if (killJob) {
357        Job job = getJob(JobID.forName(jobid));
358        if (job == null) {
359          System.out.println("Could not find job " + jobid);
360        } else {
361          JobStatus jobStatus = job.getStatus();
362          if (jobStatus.getState() == JobStatus.State.FAILED) {
363            System.out.println("Could not mark the job " + jobid
364                + " as killed, as it has already failed.");
365            exitCode = -1;
366          } else if (jobStatus.getState() == JobStatus.State.KILLED) {
367            System.out
368                .println("The job " + jobid + " has already been killed.");
369            exitCode = -1;
370          } else if (jobStatus.getState() == JobStatus.State.SUCCEEDED) {
371            System.out.println("Could not kill the job " + jobid
372                + ", as it has already succeeded.");
373            exitCode = -1;
374          } else {
375            job.killJob();
376            System.out.println("Killed job " + jobid);
377            exitCode = 0;
378          }
379        }
380      } else if (setJobPriority) {
381        Job job = getJob(JobID.forName(jobid));
382        if (job == null) {
383          System.out.println("Could not find job " + jobid);
384        } else {
385          if (jp != null) {
386            job.setPriority(jp);
387          } else {
388            job.setPriorityAsInteger(jpvalue);
389          }
390          System.out.println("Changed job priority.");
391          exitCode = 0;
392        } 
393      } else if (viewHistory) {
394        // If it ends with .jhist, assume it's a jhist file; otherwise, assume
395        // it's a Job ID
396        if (historyFileOrJobId.endsWith(".jhist")) {
397          viewHistory(historyFileOrJobId, viewAllHistory, historyOutFile,
398              historyOutFormat);
399          exitCode = 0;
400        } else {
401          Job job = getJob(JobID.forName(historyFileOrJobId));
402          if (job == null) {
403            System.out.println("Could not find job " + jobid);
404          } else {
405            String historyUrl = job.getHistoryUrl();
406            if (historyUrl == null || historyUrl.isEmpty()) {
407              System.out.println("History file for job " + historyFileOrJobId +
408                  " is currently unavailable.");
409            } else {
410              viewHistory(historyUrl, viewAllHistory, historyOutFile,
411                  historyOutFormat);
412              exitCode = 0;
413            }
414          }
415        }
416      } else if (listEvents) {
417        Job job = getJob(JobID.forName(jobid));
418        if (job == null) {
419          System.out.println("Could not find job " + jobid);
420        } else {
421          listEvents(job, fromEvent, nEvents);
422          exitCode = 0;
423        }
424      } else if (listJobs) {
425        listJobs(cluster);
426        exitCode = 0;
427      } else if (listAllJobs) {
428        listAllJobs(cluster);
429        exitCode = 0;
430      } else if (listActiveTrackers) {
431        listActiveTrackers(cluster);
432        exitCode = 0;
433      } else if (listBlacklistedTrackers) {
434        listBlacklistedTrackers(cluster);
435        exitCode = 0;
436      } else if (displayTasks) {
437        Job job = getJob(JobID.forName(jobid));
438        if (job == null) {
439          System.out.println("Could not find job " + jobid);
440        } else {
441          displayTasks(getJob(JobID.forName(jobid)), taskType, taskState);
442          exitCode = 0;
443        }
444      } else if(killTask) {
445        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
446        Job job = getJob(taskID.getJobID());
447        if (job == null) {
448          System.out.println("Could not find job " + jobid);
449        } else if (job.killTask(taskID, false)) {
450          System.out.println("Killed task " + taskid);
451          exitCode = 0;
452        } else {
453          System.out.println("Could not kill task " + taskid);
454          exitCode = -1;
455        }
456      } else if(failTask) {
457        TaskAttemptID taskID = TaskAttemptID.forName(taskid);
458        Job job = getJob(taskID.getJobID());
459        if (job == null) {
460            System.out.println("Could not find job " + jobid);
461        } else if(job.killTask(taskID, true)) {
462          System.out.println("Killed task " + taskID + " by failing it");
463          exitCode = 0;
464        } else {
465          System.out.println("Could not fail task " + taskid);
466          exitCode = -1;
467        }
468      } else if (logs) {
469        JobID jobID = JobID.forName(jobid);
470        if (getJob(jobID) == null) {
471          System.out.println("Could not find job " + jobid);
472        } else {
473          try {
474            TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
475            LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
476            LogCLIHelpers logDumper = new LogCLIHelpers();
477            logDumper.setConf(getConf());
478            exitCode = logDumper.dumpAContainersLogs(
479                    logParams.getApplicationId(), logParams.getContainerId(),
480                    logParams.getNodeId(), logParams.getOwner());
481          } catch (IOException e) {
482            if (e instanceof RemoteException) {
483              throw e;
484            }
485            System.out.println(e.getMessage());
486          }
487        }
488      } else if (downloadConfig) {
489        Job job = getJob(JobID.forName(jobid));
490        if (job == null) {
491          System.out.println("Could not find job " + jobid);
492        } else {
493          String jobFile = job.getJobFile();
494          if (jobFile == null || jobFile.isEmpty()) {
495            System.out.println("Config file for job " + jobFile +
496                " could not be found.");
497          } else {
498            Path configPath = new Path(jobFile);
499            FileSystem fs = FileSystem.get(getConf());
500            fs.copyToLocalFile(configPath, new Path(configOutFile));
501            exitCode = 0;
502          }
503        }
504      }
505    } catch (RemoteException re) {
506      IOException unwrappedException = re.unwrapRemoteException();
507      if (unwrappedException instanceof AccessControlException) {
508        System.out.println(unwrappedException.getMessage());
509      } else {
510        throw re;
511      }
512    } finally {
513      cluster.close();
514    }
515    return exitCode;
516  }
517
518  Cluster createCluster() throws IOException {
519    return new Cluster(getConf());
520  }
521  
522  private String getJobPriorityNames() {
523    StringBuffer sb = new StringBuffer();
524    for (JobPriority p : JobPriority.values()) {
525      // UNDEFINED_PRIORITY need not to be displayed in usage
526      if (JobPriority.UNDEFINED_PRIORITY == p) {
527        continue;
528      }
529      sb.append(p.name()).append(" ");
530    }
531    return sb.substring(0, sb.length()-1);
532  }
533
534  private String getTaskTypes() {
535    return StringUtils.join(taskTypes, " ");
536  }
537  
538  /**
539   * Display usage of the command-line tool and terminate execution.
540   */
541  private void displayUsage(String cmd) {
542    String prefix = "Usage: job ";
543    String jobPriorityValues = getJobPriorityNames();
544    String taskStates = "pending, running, completed, failed, killed";
545    
546    if ("-submit".equals(cmd)) {
547      System.err.println(prefix + "[" + cmd + " <job-file>]");
548    } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
549      System.err.println(prefix + "[" + cmd + " <job-id>]");
550    } else if ("-counter".equals(cmd)) {
551      System.err.println(prefix + "[" + cmd + 
552        " <job-id> <group-name> <counter-name>]");
553    } else if ("-events".equals(cmd)) {
554      System.err.println(prefix + "[" + cmd + 
555        " <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
556    } else if ("-history".equals(cmd)) {
557      System.err.println(prefix + "[" + cmd + " [all] <jobHistoryFile|jobId> " +
558          "[-outfile <file>] [-format <human|json>]]");
559    } else if ("-list".equals(cmd)) {
560      System.err.println(prefix + "[" + cmd + " [all]]");
561    } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
562      System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
563    } else if ("-set-priority".equals(cmd)) {
564      System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
565          "Valid values for priorities are: " 
566          + jobPriorityValues
567          + ". In addition to this, integers also can be used.");
568    } else if ("-list-active-trackers".equals(cmd)) {
569      System.err.println(prefix + "[" + cmd + "]");
570    } else if ("-list-blacklisted-trackers".equals(cmd)) {
571      System.err.println(prefix + "[" + cmd + "]");
572    } else if ("-list-attempt-ids".equals(cmd)) {
573      System.err.println(prefix + "[" + cmd + 
574          " <job-id> <task-type> <task-state>]. " +
575          "Valid values for <task-type> are " + getTaskTypes() + ". " +
576          "Valid values for <task-state> are " + taskStates);
577    } else if ("-logs".equals(cmd)) {
578      System.err.println(prefix + "[" + cmd +
579          " <job-id> <task-attempt-id>]. " +
580          " <task-attempt-id> is optional to get task attempt logs.");
581    } else if ("-config".equals(cmd)) {
582      System.err.println(prefix + "[" + cmd + " <job-id> <file>]");
583    } else {
584      System.err.printf(prefix + "<command> <args>%n");
585      System.err.printf("\t[-submit <job-file>]%n");
586      System.err.printf("\t[-status <job-id>]%n");
587      System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
588      System.err.printf("\t[-kill <job-id>]%n");
589      System.err.printf("\t[-set-priority <job-id> <priority>]. " +
590          "Valid values for priorities are: " + jobPriorityValues +
591          ". In addition to this, integers also can be used." + "%n");
592      System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
593      System.err.printf("\t[-history [all] <jobHistoryFile|jobId> " +
594          "[-outfile <file>] [-format <human|json>]]%n");
595      System.err.printf("\t[-list [all]]%n");
596      System.err.printf("\t[-list-active-trackers]%n");
597      System.err.printf("\t[-list-blacklisted-trackers]%n");
598      System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
599        "<task-state>]. " +
600        "Valid values for <task-type> are " + getTaskTypes() + ". " +
601        "Valid values for <task-state> are " + taskStates);
602      System.err.printf("\t[-kill-task <task-attempt-id>]%n");
603      System.err.printf("\t[-fail-task <task-attempt-id>]%n");
604      System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n");
605      System.err.printf("\t[-config <job-id> <file>%n%n");
606      ToolRunner.printGenericCommandUsage(System.out);
607    }
608  }
609    
610  private void viewHistory(String historyFile, boolean all,
611      String historyOutFile, String format) throws IOException {
612    HistoryViewer historyViewer = new HistoryViewer(historyFile,
613        getConf(), all, format);
614    PrintStream ps = System.out;
615    if (historyOutFile != null) {
616      ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(
617          new File(historyOutFile))), true, "UTF-8");
618    }
619    historyViewer.print(ps);
620  }
621
622  protected long getCounter(Counters counters, String counterGroupName,
623      String counterName) throws IOException {
624    return counters.findCounter(counterGroupName, counterName).getValue();
625  }
626  
627  /**
628   * List the events for the given job
629   * @param jobId the job id for the job's events to list
630   * @throws IOException
631   */
632  private void listEvents(Job job, int fromEventId, int numEvents)
633      throws IOException, InterruptedException {
634    TaskCompletionEvent[] events = job.
635      getTaskCompletionEvents(fromEventId, numEvents);
636    System.out.println("Task completion events for " + job.getJobID());
637    System.out.println("Number of events (from " + fromEventId + ") are: " 
638      + events.length);
639    for(TaskCompletionEvent event: events) {
640      System.out.println(event.getStatus() + " " + 
641        event.getTaskAttemptId() + " " + 
642        getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
643    }
644  }
645
646  protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
647    return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); 
648  }
649
650  @VisibleForTesting
651  Job getJob(JobID jobid) throws IOException, InterruptedException {
652
653    int maxRetry = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
654        MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
655    long retryInterval = getConf()
656        .getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
657            MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
658    Job job = cluster.getJob(jobid);
659
660    for (int i = 0; i < maxRetry; ++i) {
661      if (job != null) {
662        return job;
663      }
664      LOG.info("Could not obtain job info after " + String.valueOf(i + 1)
665          + " attempt(s). Sleeping for " + String.valueOf(retryInterval / 1000)
666          + " seconds and retrying.");
667      Thread.sleep(retryInterval);
668      job = cluster.getJob(jobid);
669    }
670    return job;
671  }
672  
673
674  /**
675   * Dump a list of currently running jobs
676   * @throws IOException
677   */
678  private void listJobs(Cluster cluster) 
679      throws IOException, InterruptedException {
680    List<JobStatus> runningJobs = new ArrayList<JobStatus>();
681    for (JobStatus job : cluster.getAllJobStatuses()) {
682      if (!job.isJobComplete()) {
683        runningJobs.add(job);
684      }
685    }
686    displayJobList(runningJobs.toArray(new JobStatus[0]));
687  }
688    
689  /**
690   * Dump a list of all jobs submitted.
691   * @throws IOException
692   */
693  private void listAllJobs(Cluster cluster) 
694      throws IOException, InterruptedException {
695    displayJobList(cluster.getAllJobStatuses());
696  }
697  
698  /**
699   * Display the list of active trackers
700   */
701  private void listActiveTrackers(Cluster cluster) 
702      throws IOException, InterruptedException {
703    TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
704    for (TaskTrackerInfo tracker : trackers) {
705      System.out.println(tracker.getTaskTrackerName());
706    }
707  }
708
709  /**
710   * Display the list of blacklisted trackers
711   */
712  private void listBlacklistedTrackers(Cluster cluster) 
713      throws IOException, InterruptedException {
714    TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
715    if (trackers.length > 0) {
716      System.out.println("BlackListedNode \t Reason");
717    }
718    for (TaskTrackerInfo tracker : trackers) {
719      System.out.println(tracker.getTaskTrackerName() + "\t" + 
720        tracker.getReasonForBlacklist());
721    }
722  }
723
724  private void printTaskAttempts(TaskReport report) {
725    if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
726      System.out.println(report.getSuccessfulTaskAttemptId());
727    } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
728      for (TaskAttemptID t : 
729        report.getRunningTaskAttemptIds()) {
730        System.out.println(t);
731      }
732    }
733  }
734
735  /**
736   * Display the information about a job's tasks, of a particular type and
737   * in a particular state
738   * 
739   * @param job the job
740   * @param type the type of the task (map/reduce/setup/cleanup)
741   * @param state the state of the task 
742   * (pending/running/completed/failed/killed)
743   * @throws IOException when there is an error communicating with the master
744   * @throws InterruptedException
745   * @throws IllegalArgumentException if an invalid type/state is passed
746   */
747  protected void displayTasks(Job job, String type, String state) 
748  throws IOException, InterruptedException {
749          
750    TaskReport[] reports=null;
751    reports = job.getTaskReports(TaskType.valueOf(
752        org.apache.hadoop.util.StringUtils.toUpperCase(type)));
753    for (TaskReport report : reports) {
754      TIPStatus status = report.getCurrentStatus();
755      if ((state.equalsIgnoreCase("pending") && status ==TIPStatus.PENDING) ||
756          (state.equalsIgnoreCase("running") && status ==TIPStatus.RUNNING) ||
757          (state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) ||
758          (state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) ||
759          (state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) {
760        printTaskAttempts(report);
761      }
762    }
763  }
764
765  public void displayJobList(JobStatus[] jobs) 
766      throws IOException, InterruptedException {
767    displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out,
768        Charsets.UTF_8)));
769  }
770
771  @Private
772  public static String headerPattern = "%23s\t%20s\t%10s\t%14s\t%12s\t%12s" +
773      "\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
774  @Private
775  public static String dataPattern   = "%23s\t%20s\t%10s\t%14d\t%12s\t%12s" +
776      "\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
777  private static String memPattern   = "%dM";
778  private static String UNAVAILABLE  = "N/A";
779
780  @Private
781  public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
782    writer.println("Total jobs:" + jobs.length);
783    writer.printf(headerPattern, "JobId", "JobName", "State", "StartTime",
784      "UserName", "Queue", "Priority", "UsedContainers",
785      "RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
786    for (JobStatus job : jobs) {
787      int numUsedSlots = job.getNumUsedSlots();
788      int numReservedSlots = job.getNumReservedSlots();
789      long usedMem = job.getUsedMem();
790      long rsvdMem = job.getReservedMem();
791      long neededMem = job.getNeededMem();
792      int jobNameLength = job.getJobName().length();
793      writer.printf(dataPattern, job.getJobID().toString(),
794          job.getJobName().substring(0, jobNameLength > 20 ? 20 : jobNameLength),
795          job.getState(), job.getStartTime(), job.getUsername(),
796          job.getQueue(), job.getPriority().name(),
797          numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
798          numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
799          usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
800          rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
801          neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
802          job.getSchedulingInfo());
803    }
804    writer.flush();
805  }
806  
807  public static void main(String[] argv) throws Exception {
808    int res = ToolRunner.run(new CLI(), argv);
809    ExitUtil.terminate(res);
810  }
811}