001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.IOException;
022import java.nio.ByteBuffer;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027import java.util.Vector;
028
029import org.apache.commons.cli.CommandLine;
030import org.apache.commons.cli.GnuParser;
031import org.apache.commons.cli.HelpFormatter;
032import org.apache.commons.cli.Option;
033import org.apache.commons.cli.Options;
034import org.apache.commons.cli.ParseException;
035import org.apache.commons.io.IOUtils;
036import org.apache.commons.lang.StringUtils;
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.apache.hadoop.classification.InterfaceAudience;
040import org.apache.hadoop.classification.InterfaceStability;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FSDataOutputStream;
043import org.apache.hadoop.fs.FileStatus;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.fs.permission.FsPermission;
047import org.apache.hadoop.io.DataOutputBuffer;
048import org.apache.hadoop.security.Credentials;
049import org.apache.hadoop.security.UserGroupInformation;
050import org.apache.hadoop.security.token.Token;
051import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
052import org.apache.hadoop.yarn.api.ApplicationConstants;
053import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
054import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
055import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
056import org.apache.hadoop.yarn.api.records.ApplicationId;
057import org.apache.hadoop.yarn.api.records.ApplicationReport;
058import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
059import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
060import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
061import org.apache.hadoop.yarn.api.records.LocalResource;
062import org.apache.hadoop.yarn.api.records.LocalResourceType;
063import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
064import org.apache.hadoop.yarn.api.records.NodeReport;
065import org.apache.hadoop.yarn.api.records.NodeState;
066import org.apache.hadoop.yarn.api.records.Priority;
067import org.apache.hadoop.yarn.api.records.QueueACL;
068import org.apache.hadoop.yarn.api.records.QueueInfo;
069import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
070import org.apache.hadoop.yarn.api.records.Resource;
071import org.apache.hadoop.yarn.api.records.URL;
072import org.apache.hadoop.yarn.api.records.YarnApplicationState;
073import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
074import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
075import org.apache.hadoop.yarn.client.api.TimelineClient;
076import org.apache.hadoop.yarn.client.api.YarnClient;
077import org.apache.hadoop.yarn.client.api.YarnClientApplication;
078import org.apache.hadoop.yarn.client.util.YarnClientUtils;
079import org.apache.hadoop.yarn.conf.YarnConfiguration;
080import org.apache.hadoop.yarn.exceptions.YarnException;
081import org.apache.hadoop.yarn.util.ConverterUtils;
082import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
083
084/**
085 * Client for Distributed Shell application submission to YARN.
086 * 
087 * <p> The distributed shell client allows an application master to be launched that in turn would run 
088 * the provided shell command on a set of containers. </p>
089 * 
090 * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
091 * 
092 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 
093 * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} 
094 * provides a way for the client to get access to cluster information and to request for a
 * new {@link ApplicationId}. </p>
096 * 
097 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 
098 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 
099 * and application name, the priority assigned to the application and the queue
100 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
101 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 
102 * the {@link ApplicationMaster} is launched. </p>
103 * 
104 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 
105 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 
106 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 
 * {@link ApplicationMaster}. </p>
108 * 
109 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 
110 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 
111 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 
112 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
113 *
114 */
115@InterfaceAudience.Public
116@InterfaceStability.Unstable
117public class Client {
118
  private static final Log LOG = LogFactory.getLog(Client.class);
  
  // Configuration
  private Configuration conf;
  // Client-side handle used for all communication with the ResourceManager
  private YarnClient yarnClient;
  // Application master specific info to register a new Application with RM/ASM
  private String appName = "";
  // App master priority
  private int amPriority = 0;
  // Queue for App master
  private String amQueue = "";
  // Amt. of memory resource to request for to run the App Master
  private long amMemory = 10;
  // Amt. of virtual core resource to request for to run the App Master
  private int amVCores = 1;

  // Application master jar file
  private String appMasterJar = ""; 
  // Main class to invoke application master
  private final String appMasterMainClass;

  // Shell command to be executed 
  private String shellCommand = ""; 
  // Location of shell script 
  private String shellScriptPath = ""; 
  // Args to be passed to the shell command
  private String[] shellArgs = new String[] {};
  // Env variables to be setup for the shell command 
  private Map<String, String> shellEnv = new HashMap<String, String>();
  // Shell Command Container priority 
  private int shellCmdPriority = 0;

  // Amt of memory to request for container in which shell script will be executed
  private int containerMemory = 10; 
  // Amt. of virtual cores to request for container in which shell script will be executed
  private int containerVirtualCores = 1;
  // No. of containers in which the shell script needs to be executed
  private int numContainers = 1;
  // Node label expression constraining where containers may be allocated;
  // null means the queue's default node label expression is used
  private String nodeLabelExpression = null;

  // log4j.properties file 
  // if available, add to local resources and set into classpath 
  private String log4jPropFile = "";    

  // Start time for client
  private final long clientStartTime = System.currentTimeMillis();
  // Timeout threshold for client. Kill app after time interval expires.
  private long clientTimeout = 600000;

  // flag to indicate whether to keep containers across application attempts.
  private boolean keepContainers = false;

  // Window (in ms) outside of which attempt failures are not counted towards
  // the application's max attempt limit; -1 disables the feature.
  private long attemptFailuresValidityInterval = -1;

  // Raw "--container_retry_*" options, forwarded verbatim on the AM command line
  private Vector<CharSequence> containerRetryOptions = new Vector<>(5);

  // Debug flag
  boolean debugFlag = false;

  // Timeline domain ID
  private String domainId = null;

  // Flag to indicate whether to create the domain of the given ID
  private boolean toCreateDomain = false;

  // Timeline domain reader access control
  private String viewACLs = null;

  // Timeline domain writer access control
  private String modifyACLs = null;

  // Command line options
  private Options opts;

  // Name of the localized file holding the shell command
  private static final String shellCommandPath = "shellCommands";
  // Name of the localized file holding the shell args
  private static final String shellArgsPath = "shellArgs";
  // Destination name for the AM jar when copied to the staging filesystem
  private static final String appMasterJarPath = "AppMaster.jar";
  // Hardcoded path to custom log_properties
  private static final String log4jPath = "log4j.properties";

  // Destination name for the shell script on the staging filesystem
  public static final String SCRIPT_PATH = "ExecScript";
200
201  /**
202   * @param args Command line arguments 
203   */
204  public static void main(String[] args) {
205    boolean result = false;
206    try {
207      Client client = new Client();
208      LOG.info("Initializing Client");
209      try {
210        boolean doRun = client.init(args);
211        if (!doRun) {
212          System.exit(0);
213        }
214      } catch (IllegalArgumentException e) {
215        System.err.println(e.getLocalizedMessage());
216        client.printUsage();
217        System.exit(-1);
218      }
219      result = client.run();
220    } catch (Throwable t) {
221      LOG.fatal("Error running Client", t);
222      System.exit(1);
223    }
224    if (result) {
225      LOG.info("Application completed successfully");
226      System.exit(0);                   
227    } 
228    LOG.error("Application failed to complete successfully");
229    System.exit(2);
230  }
231
232  /**
233   */
234  public Client(Configuration conf) throws Exception  {
235    this(
236      "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
237      conf);
238  }
239
240  Client(String appMasterMainClass, Configuration conf) {
241    this.conf = conf;
242    this.appMasterMainClass = appMasterMainClass;
243    yarnClient = YarnClient.createYarnClient();
244    yarnClient.init(conf);
245    opts = new Options();
246    opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
247    opts.addOption("priority", true, "Application Priority. Default 0");
248    opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
249    opts.addOption("timeout", true, "Application timeout in milliseconds");
250    opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
251    opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
252    opts.addOption("jar", true, "Jar file containing the application master");
253    opts.addOption("shell_command", true, "Shell command to be executed by " +
254        "the Application Master. Can only specify either --shell_command " +
255        "or --shell_script");
256    opts.addOption("shell_script", true, "Location of the shell script to be " +
257        "executed. Can only specify either --shell_command or --shell_script");
258    opts.addOption("shell_args", true, "Command line args for the shell script." +
259        "Multiple args can be separated by empty space.");
260    opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
261    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
262    opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
263    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
264    opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
265    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
266    opts.addOption("log_properties", true, "log4j.properties file");
267    opts.addOption("keep_containers_across_application_attempts", false,
268      "Flag to indicate whether to keep containers across application attempts." +
269      " If the flag is true, running containers will not be killed when" +
270      " application attempt fails and these containers will be retrieved by" +
271      " the new application attempt ");
272    opts.addOption("attempt_failures_validity_interval", true,
273      "when attempt_failures_validity_interval in milliseconds is set to > 0," +
274      "the failure number will not take failures which happen out of " +
275      "the validityInterval into failure count. " +
276      "If failure count reaches to maxAppAttempts, " +
277      "the application will be failed.");
278    opts.addOption("debug", false, "Dump out debug information");
279    opts.addOption("domain", true, "ID of the timeline domain where the "
280        + "timeline entities will be put");
281    opts.addOption("view_acls", true, "Users and groups that allowed to "
282        + "view the timeline entities in the given domain");
283    opts.addOption("modify_acls", true, "Users and groups that allowed to "
284        + "modify the timeline entities in the given domain");
285    opts.addOption("create", false, "Flag to indicate whether to create the "
286        + "domain specified with -domain.");
287    opts.addOption("help", false, "Print usage");
288    opts.addOption("node_label_expression", true,
289        "Node label expression to determine the nodes"
290            + " where all the containers of this application"
291            + " will be allocated, \"\" means containers"
292            + " can be allocated anywhere, if you don't specify the option,"
293            + " default node_label_expression of queue will be used.");
294    opts.addOption("container_retry_policy", true,
295        "Retry policy when container fails to run, "
296            + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
297            + "2: RETRY_ON_SPECIFIC_ERROR_CODES");
298    opts.addOption("container_retry_error_codes", true,
299        "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
300            + "codes is specified with this option, "
301            + "e.g. --container_retry_error_codes 1,2,3");
302    opts.addOption("container_max_retries", true,
303        "If container could retry, it specifies max retires");
304    opts.addOption("container_retry_interval", true,
305        "Interval between each retry, unit is milliseconds");
306  }
307
308  /**
309   */
310  public Client() throws Exception  {
311    this(new YarnConfiguration());
312  }
313
314  /**
315   * Helper function to print out usage
316   */
317  private void printUsage() {
318    new HelpFormatter().printHelp("Client", opts);
319  }
320
321  /**
322   * Parse command line options
323   * @param args Parsed command line options 
324   * @return Whether the init was successful to run the client
325   * @throws ParseException
326   */
327  public boolean init(String[] args) throws ParseException {
328
329    CommandLine cliParser = new GnuParser().parse(opts, args);
330
331    if (args.length == 0) {
332      throw new IllegalArgumentException("No args specified for client to initialize");
333    }
334
335    if (cliParser.hasOption("log_properties")) {
336      String log4jPath = cliParser.getOptionValue("log_properties");
337      try {
338        Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath);
339      } catch (Exception e) {
340        LOG.warn("Can not set up custom log4j properties. " + e);
341      }
342    }
343
344    if (cliParser.hasOption("help")) {
345      printUsage();
346      return false;
347    }
348
349    if (cliParser.hasOption("debug")) {
350      debugFlag = true;
351
352    }
353
354    if (cliParser.hasOption("keep_containers_across_application_attempts")) {
355      LOG.info("keep_containers_across_application_attempts");
356      keepContainers = true;
357    }
358
359    appName = cliParser.getOptionValue("appname", "DistributedShell");
360    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
361    amQueue = cliParser.getOptionValue("queue", "default");
362    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));               
363    amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
364
365    if (amMemory < 0) {
366      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
367          + " Specified memory=" + amMemory);
368    }
369    if (amVCores < 0) {
370      throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
371          + " Specified virtual cores=" + amVCores);
372    }
373
374    if (!cliParser.hasOption("jar")) {
375      throw new IllegalArgumentException("No jar file specified for application master");
376    }           
377
378    appMasterJar = cliParser.getOptionValue("jar");
379
380    if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) {
381      throw new IllegalArgumentException(
382          "No shell command or shell script specified to be executed by application master");
383    } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) {
384      throw new IllegalArgumentException("Can not specify shell_command option " +
385          "and shell_script option at the same time");
386    } else if (cliParser.hasOption("shell_command")) {
387      shellCommand = cliParser.getOptionValue("shell_command");
388    } else {
389      shellScriptPath = cliParser.getOptionValue("shell_script");
390    }
391    if (cliParser.hasOption("shell_args")) {
392      shellArgs = cliParser.getOptionValues("shell_args");
393    }
394    if (cliParser.hasOption("shell_env")) { 
395      String envs[] = cliParser.getOptionValues("shell_env");
396      for (String env : envs) {
397        env = env.trim();
398        int index = env.indexOf('=');
399        if (index == -1) {
400          shellEnv.put(env, "");
401          continue;
402        }
403        String key = env.substring(0, index);
404        String val = "";
405        if (index < (env.length()-1)) {
406          val = env.substring(index+1);
407        }
408        shellEnv.put(key, val);
409      }
410    }
411    shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
412
413    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
414    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
415    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
416    
417
418    if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
419      throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
420          + " exiting."
421          + " Specified containerMemory=" + containerMemory
422          + ", containerVirtualCores=" + containerVirtualCores
423          + ", numContainer=" + numContainers);
424    }
425    
426    nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
427
428    clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
429
430    attemptFailuresValidityInterval =
431        Long.parseLong(cliParser.getOptionValue(
432          "attempt_failures_validity_interval", "-1"));
433
434    log4jPropFile = cliParser.getOptionValue("log_properties", "");
435
436    // Get timeline domain options
437    if (cliParser.hasOption("domain")) {
438      domainId = cliParser.getOptionValue("domain");
439      toCreateDomain = cliParser.hasOption("create");
440      if (cliParser.hasOption("view_acls")) {
441        viewACLs = cliParser.getOptionValue("view_acls");
442      }
443      if (cliParser.hasOption("modify_acls")) {
444        modifyACLs = cliParser.getOptionValue("modify_acls");
445      }
446    }
447
448    // Get container retry options
449    if (cliParser.hasOption("container_retry_policy")) {
450      containerRetryOptions.add("--container_retry_policy "
451          + cliParser.getOptionValue("container_retry_policy"));
452    }
453    if (cliParser.hasOption("container_retry_error_codes")) {
454      containerRetryOptions.add("--container_retry_error_codes "
455          + cliParser.getOptionValue("container_retry_error_codes"));
456    }
457    if (cliParser.hasOption("container_max_retries")) {
458      containerRetryOptions.add("--container_max_retries "
459          + cliParser.getOptionValue("container_max_retries"));
460    }
461    if (cliParser.hasOption("container_retry_interval")) {
462      containerRetryOptions.add("--container_retry_interval "
463          + cliParser.getOptionValue("container_retry_interval"));
464    }
465
466    return true;
467  }
468
469  /**
470   * Main run function for the client
471   * @return true if application completed successfully
472   * @throws IOException
473   * @throws YarnException
474   */
475  public boolean run() throws IOException, YarnException {
476
477    LOG.info("Running Client");
478    yarnClient.start();
479
480    YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
481    LOG.info("Got Cluster metric info from ASM" 
482        + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
483
484    List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
485        NodeState.RUNNING);
486    LOG.info("Got Cluster node info from ASM");
487    for (NodeReport node : clusterNodeReports) {
488      LOG.info("Got node report from ASM for"
489          + ", nodeId=" + node.getNodeId() 
490          + ", nodeAddress=" + node.getHttpAddress()
491          + ", nodeRackName=" + node.getRackName()
492          + ", nodeNumContainers=" + node.getNumContainers());
493    }
494
495    QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
496    LOG.info("Queue info"
497        + ", queueName=" + queueInfo.getQueueName()
498        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
499        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
500        + ", queueApplicationCount=" + queueInfo.getApplications().size()
501        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());               
502
503    List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
504    for (QueueUserACLInfo aclInfo : listAclInfo) {
505      for (QueueACL userAcl : aclInfo.getUserAcls()) {
506        LOG.info("User ACL Info for Queue"
507            + ", queueName=" + aclInfo.getQueueName()                   
508            + ", userAcl=" + userAcl.name());
509      }
510    }           
511
512    if (domainId != null && domainId.length() > 0 && toCreateDomain) {
513      prepareTimelineDomain();
514    }
515
516    // Get a new application id
517    YarnClientApplication app = yarnClient.createApplication();
518    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
519    // TODO get min/max resource capabilities from RM and change memory ask if needed
520    // If we do not have min/max, we may not be able to correctly request 
521    // the required resources from the RM for the app master
522    // Memory ask has to be a multiple of min and less than max. 
523    // Dump out information about cluster capability as seen by the resource manager
524    long maxMem = appResponse.getMaximumResourceCapability().getMemorySize();
525    LOG.info("Max mem capability of resources in this cluster " + maxMem);
526
527    // A resource ask cannot exceed the max. 
528    if (amMemory > maxMem) {
529      LOG.info("AM memory specified above max threshold of cluster. Using max value."
530          + ", specified=" + amMemory
531          + ", max=" + maxMem);
532      amMemory = maxMem;
533    }                           
534
535    int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
536    LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores);
537    
538    if (amVCores > maxVCores) {
539      LOG.info("AM virtual cores specified above max threshold of cluster. " 
540          + "Using max value." + ", specified=" + amVCores 
541          + ", max=" + maxVCores);
542      amVCores = maxVCores;
543    }
544    
545    // set the application name
546    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
547    ApplicationId appId = appContext.getApplicationId();
548
549    appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
550    appContext.setApplicationName(appName);
551
552    if (attemptFailuresValidityInterval >= 0) {
553      appContext
554        .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
555    }
556
557    // set local resources for the application master
558    // local files or archives as needed
559    // In this scenario, the jar file for the application master is part of the local resources                 
560    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
561
562    LOG.info("Copy App Master jar from local filesystem and add to local environment");
563    // Copy the application master jar to the filesystem 
564    // Create a local resource to point to the destination jar path 
565    FileSystem fs = FileSystem.get(conf);
566    addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
567        localResources, null);
568
569    // Set the log4j properties if needed 
570    if (!log4jPropFile.isEmpty()) {
571      addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
572          localResources, null);
573    }                   
574
575    // The shell script has to be made available on the final container(s)
576    // where it will be executed. 
577    // To do this, we need to first copy into the filesystem that is visible 
578    // to the yarn framework. 
579    // We do not need to set this as a local resource for the application 
580    // master as the application master does not need it.               
581    String hdfsShellScriptLocation = ""; 
582    long hdfsShellScriptLen = 0;
583    long hdfsShellScriptTimestamp = 0;
584    if (!shellScriptPath.isEmpty()) {
585      Path shellSrc = new Path(shellScriptPath);
586      String shellPathSuffix =
587          appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
588      Path shellDst =
589          new Path(fs.getHomeDirectory(), shellPathSuffix);
590      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
591      hdfsShellScriptLocation = shellDst.toUri().toString(); 
592      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
593      hdfsShellScriptLen = shellFileStatus.getLen();
594      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
595    }
596
597    if (!shellCommand.isEmpty()) {
598      addToLocalResources(fs, null, shellCommandPath, appId.toString(),
599          localResources, shellCommand);
600    }
601
602    if (shellArgs.length > 0) {
603      addToLocalResources(fs, null, shellArgsPath, appId.toString(),
604          localResources, StringUtils.join(shellArgs, " "));
605    }
606
607    // Set the necessary security tokens as needed
608    //amContainer.setContainerTokens(containerToken);
609
610    // Set the env variables to be setup in the env where the application master will be run
611    LOG.info("Set the environment for the application master");
612    Map<String, String> env = new HashMap<String, String>();
613
614    // put location of shell script into env
615    // using the env info, the application master will create the correct local resource for the 
616    // eventual containers that will be launched to execute the shell scripts
617    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
618    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
619    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
620    if (domainId != null && domainId.length() > 0) {
621      env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
622    }
623
624    // Add AppMaster.jar location to classpath          
625    // At some point we should not be required to add 
626    // the hadoop specific classpaths to the env. 
627    // It should be provided out of the box. 
628    // For now setting all required classpaths including
629    // the classpath to "." for the application jar
630    StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
631      .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
632    for (String c : conf.getStrings(
633        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
634        YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
635      classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
636      classPathEnv.append(c.trim());
637    }
638    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
639      "./log4j.properties");
640
641    // add the runtime classpath needed for tests to work
642    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
643      classPathEnv.append(':');
644      classPathEnv.append(System.getProperty("java.class.path"));
645    }
646
647    env.put("CLASSPATH", classPathEnv.toString());
648
649    // Set the necessary command to execute the application master 
650    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
651
652    // Set java executable command 
653    LOG.info("Setting up app master command");
654    vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
655    // Set Xmx based on am memory size
656    vargs.add("-Xmx" + amMemory + "m");
657    // Set class name 
658    vargs.add(appMasterMainClass);
659    // Set params for Application Master
660    vargs.add("--container_memory " + String.valueOf(containerMemory));
661    vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
662    vargs.add("--num_containers " + String.valueOf(numContainers));
663    if (null != nodeLabelExpression) {
664      appContext.setNodeLabelExpression(nodeLabelExpression);
665    }
666    vargs.add("--priority " + String.valueOf(shellCmdPriority));
667
668    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
669      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
670    }                   
671    if (debugFlag) {
672      vargs.add("--debug");
673    }
674
675    vargs.addAll(containerRetryOptions);
676
677    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
678    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
679
680    // Get final commmand
681    StringBuilder command = new StringBuilder();
682    for (CharSequence str : vargs) {
683      command.append(str).append(" ");
684    }
685
686    LOG.info("Completed setting up app master command " + command.toString());     
687    List<String> commands = new ArrayList<String>();
688    commands.add(command.toString());           
689
690    // Set up the container launch context for the application master
691    ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
692      localResources, env, commands, null, null, null);
693
694    // Set up resource type requirements
695    // For now, both memory and vcores are supported, so we set memory and 
696    // vcores requirements
697    Resource capability = Resource.newInstance(amMemory, amVCores);
698    appContext.setResource(capability);
699
700    // Service data is a binary blob that can be passed to the application
701    // Not needed in this scenario
702    // amContainer.setServiceData(serviceData);
703
704    // Setup security tokens
705    if (UserGroupInformation.isSecurityEnabled()) {
706      // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
707      Credentials credentials = new Credentials();
708      String tokenRenewer = YarnClientUtils.getRmPrincipal(conf);
709      if (tokenRenewer == null || tokenRenewer.length() == 0) {
710        throw new IOException(
711          "Can't get Master Kerberos principal for the RM to use as renewer");
712      }
713
714      // For now, only getting tokens for the default file-system.
715      final Token<?> tokens[] =
716          fs.addDelegationTokens(tokenRenewer, credentials);
717      if (tokens != null) {
718        for (Token<?> token : tokens) {
719          LOG.info("Got dt for " + fs.getUri() + "; " + token);
720        }
721      }
722      DataOutputBuffer dob = new DataOutputBuffer();
723      credentials.writeTokenStorageToStream(dob);
724      ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
725      amContainer.setTokens(fsTokens);
726    }
727
728    appContext.setAMContainerSpec(amContainer);
729
730    // Set the priority for the application master
731    // TODO - what is the range for priority? how to decide? 
732    Priority pri = Priority.newInstance(amPriority);
733    appContext.setPriority(pri);
734
735    // Set the queue to which this application is to be submitted in the RM
736    appContext.setQueue(amQueue);
737
738    // Submit the application to the applications manager
739    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
740    // Ignore the response as either a valid response object is returned on success 
741    // or an exception thrown to denote some form of a failure
742    LOG.info("Submitting application to ASM");
743
744    yarnClient.submitApplication(appContext);
745
746    // TODO
747    // Try submitting the same request again
748    // app submission failure?
749
750    // Monitor the application
751    return monitorApplication(appId);
752
753  }
754
755  /**
756   * Monitor the submitted application for completion. 
757   * Kill application if time expires. 
758   * @param appId Application Id of application to be monitored
759   * @return true if application completed successfully
760   * @throws YarnException
761   * @throws IOException
762   */
763  private boolean monitorApplication(ApplicationId appId)
764      throws YarnException, IOException {
765
766    while (true) {
767
768      // Check app status every 1 second.
769      try {
770        Thread.sleep(1000);
771      } catch (InterruptedException e) {
772        LOG.debug("Thread sleep in monitoring loop interrupted");
773      }
774
775      // Get application report for the appId we are interested in 
776      ApplicationReport report = yarnClient.getApplicationReport(appId);
777
778      LOG.info("Got application report from ASM for"
779          + ", appId=" + appId.getId()
780          + ", clientToAMToken=" + report.getClientToAMToken()
781          + ", appDiagnostics=" + report.getDiagnostics()
782          + ", appMasterHost=" + report.getHost()
783          + ", appQueue=" + report.getQueue()
784          + ", appMasterRpcPort=" + report.getRpcPort()
785          + ", appStartTime=" + report.getStartTime()
786          + ", yarnAppState=" + report.getYarnApplicationState().toString()
787          + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
788          + ", appTrackingUrl=" + report.getTrackingUrl()
789          + ", appUser=" + report.getUser());
790
791      YarnApplicationState state = report.getYarnApplicationState();
792      FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
793      if (YarnApplicationState.FINISHED == state) {
794        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
795          LOG.info("Application has completed successfully. Breaking monitoring loop");
796          return true;        
797        }
798        else {
799          LOG.info("Application did finished unsuccessfully."
800              + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
801              + ". Breaking monitoring loop");
802          return false;
803        }                         
804      }
805      else if (YarnApplicationState.KILLED == state     
806          || YarnApplicationState.FAILED == state) {
807        LOG.info("Application did not finish."
808            + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
809            + ". Breaking monitoring loop");
810        return false;
811      }                 
812
813      if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
814        LOG.info("Reached client specified timeout for application. Killing application");
815        forceKillApplication(appId);
816        return false;                           
817      }
818    }                   
819
820  }
821
822  /**
823   * Kill a submitted application by sending a call to the ASM
824   * @param appId Application Id to be killed. 
825   * @throws YarnException
826   * @throws IOException
827   */
828  private void forceKillApplication(ApplicationId appId)
829      throws YarnException, IOException {
830    // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 
831    // the same time. 
832    // If yes, can we kill a particular attempt only?
833
834    // Response can be ignored as it is non-null on success or 
835    // throws an exception in case of failures
836    yarnClient.killApplication(appId);  
837  }
838
839  private void addToLocalResources(FileSystem fs, String fileSrcPath,
840      String fileDstPath, String appId, Map<String, LocalResource> localResources,
841      String resources) throws IOException {
842    String suffix =
843        appName + "/" + appId + "/" + fileDstPath;
844    Path dst =
845        new Path(fs.getHomeDirectory(), suffix);
846    if (fileSrcPath == null) {
847      FSDataOutputStream ostream = null;
848      try {
849        ostream = FileSystem
850            .create(fs, dst, new FsPermission((short) 0710));
851        ostream.writeUTF(resources);
852      } finally {
853        IOUtils.closeQuietly(ostream);
854      }
855    } else {
856      fs.copyFromLocalFile(new Path(fileSrcPath), dst);
857    }
858    FileStatus scFileStatus = fs.getFileStatus(dst);
859    LocalResource scRsrc =
860        LocalResource.newInstance(
861            URL.fromURI(dst.toUri()),
862            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
863            scFileStatus.getLen(), scFileStatus.getModificationTime());
864    localResources.put(fileDstPath, scRsrc);
865  }
866
867  private void prepareTimelineDomain() {
868    TimelineClient timelineClient = null;
869    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
870        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
871      timelineClient = TimelineClient.createTimelineClient();
872      timelineClient.init(conf);
873      timelineClient.start();
874    } else {
875      LOG.warn("Cannot put the domain " + domainId +
876          " because the timeline service is not enabled");
877      return;
878    }
879    try {
880      //TODO: we need to check and combine the existing timeline domain ACLs,
881      //but let's do it once we have client java library to query domains.
882      TimelineDomain domain = new TimelineDomain();
883      domain.setId(domainId);
884      domain.setReaders(
885          viewACLs != null && viewACLs.length() > 0 ? viewACLs : " ");
886      domain.setWriters(
887          modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " ");
888      timelineClient.putDomain(domain);
889      LOG.info("Put the timeline domain: " +
890          TimelineUtils.dumpTimelineRecordtoJSON(domain));
891    } catch (Exception e) {
892      LOG.error("Error when putting the timeline domain", e);
893    } finally {
894      timelineClient.stop();
895    }
896  }
897}