001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.IOException; 022import java.nio.ByteBuffer; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.Vector; 028 029import org.apache.commons.cli.CommandLine; 030import org.apache.commons.cli.GnuParser; 031import org.apache.commons.cli.HelpFormatter; 032import org.apache.commons.cli.Option; 033import org.apache.commons.cli.Options; 034import org.apache.commons.cli.ParseException; 035import org.apache.commons.io.IOUtils; 036import org.apache.commons.lang.StringUtils; 037import org.apache.commons.logging.Log; 038import org.apache.commons.logging.LogFactory; 039import org.apache.hadoop.classification.InterfaceAudience; 040import org.apache.hadoop.classification.InterfaceStability; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FSDataOutputStream; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.fs.permission.FsPermission; 047import org.apache.hadoop.io.DataOutputBuffer; 048import org.apache.hadoop.security.Credentials; 049import org.apache.hadoop.security.UserGroupInformation; 050import org.apache.hadoop.security.token.Token; 051import org.apache.hadoop.yarn.api.ApplicationClientProtocol; 052import org.apache.hadoop.yarn.api.ApplicationConstants; 053import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 054import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; 055import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; 056import org.apache.hadoop.yarn.api.records.ApplicationId; 057import org.apache.hadoop.yarn.api.records.ApplicationReport; 058import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; 059import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 060import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 061import org.apache.hadoop.yarn.api.records.LocalResource; 062import org.apache.hadoop.yarn.api.records.LocalResourceType; 063import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 064import org.apache.hadoop.yarn.api.records.NodeReport; 065import org.apache.hadoop.yarn.api.records.NodeState; 066import org.apache.hadoop.yarn.api.records.Priority; 067import org.apache.hadoop.yarn.api.records.QueueACL; 068import org.apache.hadoop.yarn.api.records.QueueInfo; 069import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; 070import org.apache.hadoop.yarn.api.records.Resource; 071import org.apache.hadoop.yarn.api.records.URL; 072import org.apache.hadoop.yarn.api.records.YarnApplicationState; 073import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; 074import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; 075import org.apache.hadoop.yarn.client.api.TimelineClient; 076import org.apache.hadoop.yarn.client.api.YarnClient; 077import org.apache.hadoop.yarn.client.api.YarnClientApplication; 078import org.apache.hadoop.yarn.client.util.YarnClientUtils; 079import org.apache.hadoop.yarn.conf.YarnConfiguration; 080import org.apache.hadoop.yarn.exceptions.YarnException; 081import org.apache.hadoop.yarn.util.ConverterUtils; 082import org.apache.hadoop.yarn.util.timeline.TimelineUtils; 083 084/** 085 * Client for Distributed Shell application submission to YARN. 086 * 087 * <p> The distributed shell client allows an application master to be launched that in turn would run 088 * the provided shell command on a set of containers. </p> 089 * 090 * <p>This client is meant to act as an example on how to write yarn-based applications. </p> 091 * 092 * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 093 * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} 094 * provides a way for the client to get access to cluster information and to request for a 095 * new {@link ApplicationId}. <p> 096 * 097 * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 098 * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 099 * and application name, the priority assigned to the application and the queue 100 * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext} 101 * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 102 * the {@link ApplicationMaster} is launched. </p> 103 * 104 * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 105 * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 106 * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 107 * {@link ApplicationMaster}. <p> 108 * 109 * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 110 * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 111 * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 112 * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p> 113 * 114 */ 115@InterfaceAudience.Public 116@InterfaceStability.Unstable 117public class Client { 118 119 private static final Log LOG = LogFactory.getLog(Client.class); 120 121 // Configuration 122 private Configuration conf; 123 private YarnClient yarnClient; 124 // Application master specific info to register a new Application with RM/ASM 125 private String appName = ""; 126 // App master priority 127 private int amPriority = 0; 128 // Queue for App master 129 private String amQueue = ""; 130 // Amt. of memory resource to request for to run the App Master 131 private long amMemory = 10; 132 // Amt. of virtual core resource to request for to run the App Master 133 private int amVCores = 1; 134 135 // Application master jar file 136 private String appMasterJar = ""; 137 // Main class to invoke application master 138 private final String appMasterMainClass; 139 140 // Shell command to be executed 141 private String shellCommand = ""; 142 // Location of shell script 143 private String shellScriptPath = ""; 144 // Args to be passed to the shell command 145 private String[] shellArgs = new String[] {}; 146 // Env variables to be setup for the shell command 147 private Map<String, String> shellEnv = new HashMap<String, String>(); 148 // Shell Command Container priority 149 private int shellCmdPriority = 0; 150 151 // Amt of memory to request for container in which shell script will be executed 152 private int containerMemory = 10; 153 // Amt. of virtual cores to request for container in which shell script will be executed 154 private int containerVirtualCores = 1; 155 // No. of containers in which the shell script needs to be executed 156 private int numContainers = 1; 157 private String nodeLabelExpression = null; 158 159 // log4j.properties file 160 // if available, add to local resources and set into classpath 161 private String log4jPropFile = ""; 162 163 // Start time for client 164 private final long clientStartTime = System.currentTimeMillis(); 165 // Timeout threshold for client. Kill app after time interval expires. 166 private long clientTimeout = 600000; 167 168 // flag to indicate whether to keep containers across application attempts. 169 private boolean keepContainers = false; 170 171 private long attemptFailuresValidityInterval = -1; 172 173 private Vector<CharSequence> containerRetryOptions = new Vector<>(5); 174 175 // Debug flag 176 boolean debugFlag = false; 177 178 // Timeline domain ID 179 private String domainId = null; 180 181 // Flag to indicate whether to create the domain of the given ID 182 private boolean toCreateDomain = false; 183 184 // Timeline domain reader access control 185 private String viewACLs = null; 186 187 // Timeline domain writer access control 188 private String modifyACLs = null; 189 190 // Command line options 191 private Options opts; 192 193 private static final String shellCommandPath = "shellCommands"; 194 private static final String shellArgsPath = "shellArgs"; 195 private static final String appMasterJarPath = "AppMaster.jar"; 196 // Hardcoded path to custom log_properties 197 private static final String log4jPath = "log4j.properties"; 198 199 public static final String SCRIPT_PATH = "ExecScript"; 200 201 /** 202 * @param args Command line arguments 203 */ 204 public static void main(String[] args) { 205 boolean result = false; 206 try { 207 Client client = new Client(); 208 LOG.info("Initializing Client"); 209 try { 210 boolean doRun = client.init(args); 211 if (!doRun) { 212 System.exit(0); 213 } 214 } catch (IllegalArgumentException e) { 215 System.err.println(e.getLocalizedMessage()); 216 client.printUsage(); 217 System.exit(-1); 218 } 219 result = client.run(); 220 } catch (Throwable t) { 221 LOG.fatal("Error running Client", t); 222 System.exit(1); 223 } 224 if (result) { 225 LOG.info("Application completed successfully"); 226 System.exit(0); 227 } 228 LOG.error("Application failed to complete successfully"); 229 System.exit(2); 230 } 231 232 /** 233 */ 234 public Client(Configuration conf) throws Exception { 235 this( 236 "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster", 237 conf); 238 } 239 240 Client(String appMasterMainClass, Configuration conf) { 241 this.conf = conf; 242 this.appMasterMainClass = appMasterMainClass; 243 yarnClient = YarnClient.createYarnClient(); 244 yarnClient.init(conf); 245 opts = new Options(); 246 opts.addOption("appname", true, "Application Name. Default value - DistributedShell"); 247 opts.addOption("priority", true, "Application Priority. Default 0"); 248 opts.addOption("queue", true, "RM Queue in which this application is to be submitted"); 249 opts.addOption("timeout", true, "Application timeout in milliseconds"); 250 opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); 251 opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master"); 252 opts.addOption("jar", true, "Jar file containing the application master"); 253 opts.addOption("shell_command", true, "Shell command to be executed by " + 254 "the Application Master. Can only specify either --shell_command " + 255 "or --shell_script"); 256 opts.addOption("shell_script", true, "Location of the shell script to be " + 257 "executed. Can only specify either --shell_command or --shell_script"); 258 opts.addOption("shell_args", true, "Command line args for the shell script." + 259 "Multiple args can be separated by empty space."); 260 opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES); 261 opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs"); 262 opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); 263 opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); 264 opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command"); 265 opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); 266 opts.addOption("log_properties", true, "log4j.properties file"); 267 opts.addOption("keep_containers_across_application_attempts", false, 268 "Flag to indicate whether to keep containers across application attempts." + 269 " If the flag is true, running containers will not be killed when" + 270 " application attempt fails and these containers will be retrieved by" + 271 " the new application attempt "); 272 opts.addOption("attempt_failures_validity_interval", true, 273 "when attempt_failures_validity_interval in milliseconds is set to > 0," + 274 "the failure number will not take failures which happen out of " + 275 "the validityInterval into failure count. " + 276 "If failure count reaches to maxAppAttempts, " + 277 "the application will be failed."); 278 opts.addOption("debug", false, "Dump out debug information"); 279 opts.addOption("domain", true, "ID of the timeline domain where the " 280 + "timeline entities will be put"); 281 opts.addOption("view_acls", true, "Users and groups that allowed to " 282 + "view the timeline entities in the given domain"); 283 opts.addOption("modify_acls", true, "Users and groups that allowed to " 284 + "modify the timeline entities in the given domain"); 285 opts.addOption("create", false, "Flag to indicate whether to create the " 286 + "domain specified with -domain."); 287 opts.addOption("help", false, "Print usage"); 288 opts.addOption("node_label_expression", true, 289 "Node label expression to determine the nodes" 290 + " where all the containers of this application" 291 + " will be allocated, \"\" means containers" 292 + " can be allocated anywhere, if you don't specify the option," 293 + " default node_label_expression of queue will be used."); 294 opts.addOption("container_retry_policy", true, 295 "Retry policy when container fails to run, " 296 + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, " 297 + "2: RETRY_ON_SPECIFIC_ERROR_CODES"); 298 opts.addOption("container_retry_error_codes", true, 299 "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error " 300 + "codes is specified with this option, " 301 + "e.g. --container_retry_error_codes 1,2,3"); 302 opts.addOption("container_max_retries", true, 303 "If container could retry, it specifies max retires"); 304 opts.addOption("container_retry_interval", true, 305 "Interval between each retry, unit is milliseconds"); 306 } 307 308 /** 309 */ 310 public Client() throws Exception { 311 this(new YarnConfiguration()); 312 } 313 314 /** 315 * Helper function to print out usage 316 */ 317 private void printUsage() { 318 new HelpFormatter().printHelp("Client", opts); 319 } 320 321 /** 322 * Parse command line options 323 * @param args Parsed command line options 324 * @return Whether the init was successful to run the client 325 * @throws ParseException 326 */ 327 public boolean init(String[] args) throws ParseException { 328 329 CommandLine cliParser = new GnuParser().parse(opts, args); 330 331 if (args.length == 0) { 332 throw new IllegalArgumentException("No args specified for client to initialize"); 333 } 334 335 if (cliParser.hasOption("log_properties")) { 336 String log4jPath = cliParser.getOptionValue("log_properties"); 337 try { 338 Log4jPropertyHelper.updateLog4jConfiguration(Client.class, log4jPath); 339 } catch (Exception e) { 340 LOG.warn("Can not set up custom log4j properties. " + e); 341 } 342 } 343 344 if (cliParser.hasOption("help")) { 345 printUsage(); 346 return false; 347 } 348 349 if (cliParser.hasOption("debug")) { 350 debugFlag = true; 351 352 } 353 354 if (cliParser.hasOption("keep_containers_across_application_attempts")) { 355 LOG.info("keep_containers_across_application_attempts"); 356 keepContainers = true; 357 } 358 359 appName = cliParser.getOptionValue("appname", "DistributedShell"); 360 amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); 361 amQueue = cliParser.getOptionValue("queue", "default"); 362 amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10")); 363 amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1")); 364 365 if (amMemory < 0) { 366 throw new IllegalArgumentException("Invalid memory specified for application master, exiting." 367 + " Specified memory=" + amMemory); 368 } 369 if (amVCores < 0) { 370 throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting." 371 + " Specified virtual cores=" + amVCores); 372 } 373 374 if (!cliParser.hasOption("jar")) { 375 throw new IllegalArgumentException("No jar file specified for application master"); 376 } 377 378 appMasterJar = cliParser.getOptionValue("jar"); 379 380 if (!cliParser.hasOption("shell_command") && !cliParser.hasOption("shell_script")) { 381 throw new IllegalArgumentException( 382 "No shell command or shell script specified to be executed by application master"); 383 } else if (cliParser.hasOption("shell_command") && cliParser.hasOption("shell_script")) { 384 throw new IllegalArgumentException("Can not specify shell_command option " + 385 "and shell_script option at the same time"); 386 } else if (cliParser.hasOption("shell_command")) { 387 shellCommand = cliParser.getOptionValue("shell_command"); 388 } else { 389 shellScriptPath = cliParser.getOptionValue("shell_script"); 390 } 391 if (cliParser.hasOption("shell_args")) { 392 shellArgs = cliParser.getOptionValues("shell_args"); 393 } 394 if (cliParser.hasOption("shell_env")) { 395 String envs[] = cliParser.getOptionValues("shell_env"); 396 for (String env : envs) { 397 env = env.trim(); 398 int index = env.indexOf('='); 399 if (index == -1) { 400 shellEnv.put(env, ""); 401 continue; 402 } 403 String key = env.substring(0, index); 404 String val = ""; 405 if (index < (env.length()-1)) { 406 val = env.substring(index+1); 407 } 408 shellEnv.put(key, val); 409 } 410 } 411 shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0")); 412 413 containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); 414 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1")); 415 numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); 416 417 418 if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) { 419 throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified," 420 + " exiting." 421 + " Specified containerMemory=" + containerMemory 422 + ", containerVirtualCores=" + containerVirtualCores 423 + ", numContainer=" + numContainers); 424 } 425 426 nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null); 427 428 clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); 429 430 attemptFailuresValidityInterval = 431 Long.parseLong(cliParser.getOptionValue( 432 "attempt_failures_validity_interval", "-1")); 433 434 log4jPropFile = cliParser.getOptionValue("log_properties", ""); 435 436 // Get timeline domain options 437 if (cliParser.hasOption("domain")) { 438 domainId = cliParser.getOptionValue("domain"); 439 toCreateDomain = cliParser.hasOption("create"); 440 if (cliParser.hasOption("view_acls")) { 441 viewACLs = cliParser.getOptionValue("view_acls"); 442 } 443 if (cliParser.hasOption("modify_acls")) { 444 modifyACLs = cliParser.getOptionValue("modify_acls"); 445 } 446 } 447 448 // Get container retry options 449 if (cliParser.hasOption("container_retry_policy")) { 450 containerRetryOptions.add("--container_retry_policy " 451 + cliParser.getOptionValue("container_retry_policy")); 452 } 453 if (cliParser.hasOption("container_retry_error_codes")) { 454 containerRetryOptions.add("--container_retry_error_codes " 455 + cliParser.getOptionValue("container_retry_error_codes")); 456 } 457 if (cliParser.hasOption("container_max_retries")) { 458 containerRetryOptions.add("--container_max_retries " 459 + cliParser.getOptionValue("container_max_retries")); 460 } 461 if (cliParser.hasOption("container_retry_interval")) { 462 containerRetryOptions.add("--container_retry_interval " 463 + cliParser.getOptionValue("container_retry_interval")); 464 } 465 466 return true; 467 } 468 469 /** 470 * Main run function for the client 471 * @return true if application completed successfully 472 * @throws IOException 473 * @throws YarnException 474 */ 475 public boolean run() throws IOException, YarnException { 476 477 LOG.info("Running Client"); 478 yarnClient.start(); 479 480 YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); 481 LOG.info("Got Cluster metric info from ASM" 482 + ", numNodeManagers=" + clusterMetrics.getNumNodeManagers()); 483 484 List<NodeReport> clusterNodeReports = yarnClient.getNodeReports( 485 NodeState.RUNNING); 486 LOG.info("Got Cluster node info from ASM"); 487 for (NodeReport node : clusterNodeReports) { 488 LOG.info("Got node report from ASM for" 489 + ", nodeId=" + node.getNodeId() 490 + ", nodeAddress=" + node.getHttpAddress() 491 + ", nodeRackName=" + node.getRackName() 492 + ", nodeNumContainers=" + node.getNumContainers()); 493 } 494 495 QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue); 496 LOG.info("Queue info" 497 + ", queueName=" + queueInfo.getQueueName() 498 + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity() 499 + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity() 500 + ", queueApplicationCount=" + queueInfo.getApplications().size() 501 + ", queueChildQueueCount=" + queueInfo.getChildQueues().size()); 502 503 List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo(); 504 for (QueueUserACLInfo aclInfo : listAclInfo) { 505 for (QueueACL userAcl : aclInfo.getUserAcls()) { 506 LOG.info("User ACL Info for Queue" 507 + ", queueName=" + aclInfo.getQueueName() 508 + ", userAcl=" + userAcl.name()); 509 } 510 } 511 512 if (domainId != null && domainId.length() > 0 && toCreateDomain) { 513 prepareTimelineDomain(); 514 } 515 516 // Get a new application id 517 YarnClientApplication app = yarnClient.createApplication(); 518 GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); 519 // TODO get min/max resource capabilities from RM and change memory ask if needed 520 // If we do not have min/max, we may not be able to correctly request 521 // the required resources from the RM for the app master 522 // Memory ask has to be a multiple of min and less than max. 523 // Dump out information about cluster capability as seen by the resource manager 524 long maxMem = appResponse.getMaximumResourceCapability().getMemorySize(); 525 LOG.info("Max mem capability of resources in this cluster " + maxMem); 526 527 // A resource ask cannot exceed the max. 528 if (amMemory > maxMem) { 529 LOG.info("AM memory specified above max threshold of cluster. Using max value." 530 + ", specified=" + amMemory 531 + ", max=" + maxMem); 532 amMemory = maxMem; 533 } 534 535 int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores(); 536 LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores); 537 538 if (amVCores > maxVCores) { 539 LOG.info("AM virtual cores specified above max threshold of cluster. " 540 + "Using max value." + ", specified=" + amVCores 541 + ", max=" + maxVCores); 542 amVCores = maxVCores; 543 } 544 545 // set the application name 546 ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); 547 ApplicationId appId = appContext.getApplicationId(); 548 549 appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); 550 appContext.setApplicationName(appName); 551 552 if (attemptFailuresValidityInterval >= 0) { 553 appContext 554 .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); 555 } 556 557 // set local resources for the application master 558 // local files or archives as needed 559 // In this scenario, the jar file for the application master is part of the local resources 560 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 561 562 LOG.info("Copy App Master jar from local filesystem and add to local environment"); 563 // Copy the application master jar to the filesystem 564 // Create a local resource to point to the destination jar path 565 FileSystem fs = FileSystem.get(conf); 566 addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), 567 localResources, null); 568 569 // Set the log4j properties if needed 570 if (!log4jPropFile.isEmpty()) { 571 addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(), 572 localResources, null); 573 } 574 575 // The shell script has to be made available on the final container(s) 576 // where it will be executed. 577 // To do this, we need to first copy into the filesystem that is visible 578 // to the yarn framework. 579 // We do not need to set this as a local resource for the application 580 // master as the application master does not need it. 581 String hdfsShellScriptLocation = ""; 582 long hdfsShellScriptLen = 0; 583 long hdfsShellScriptTimestamp = 0; 584 if (!shellScriptPath.isEmpty()) { 585 Path shellSrc = new Path(shellScriptPath); 586 String shellPathSuffix = 587 appName + "/" + appId.toString() + "/" + SCRIPT_PATH; 588 Path shellDst = 589 new Path(fs.getHomeDirectory(), shellPathSuffix); 590 fs.copyFromLocalFile(false, true, shellSrc, shellDst); 591 hdfsShellScriptLocation = shellDst.toUri().toString(); 592 FileStatus shellFileStatus = fs.getFileStatus(shellDst); 593 hdfsShellScriptLen = shellFileStatus.getLen(); 594 hdfsShellScriptTimestamp = shellFileStatus.getModificationTime(); 595 } 596 597 if (!shellCommand.isEmpty()) { 598 addToLocalResources(fs, null, shellCommandPath, appId.toString(), 599 localResources, shellCommand); 600 } 601 602 if (shellArgs.length > 0) { 603 addToLocalResources(fs, null, shellArgsPath, appId.toString(), 604 localResources, StringUtils.join(shellArgs, " ")); 605 } 606 607 // Set the necessary security tokens as needed 608 //amContainer.setContainerTokens(containerToken); 609 610 // Set the env variables to be setup in the env where the application master will be run 611 LOG.info("Set the environment for the application master"); 612 Map<String, String> env = new HashMap<String, String>(); 613 614 // put location of shell script into env 615 // using the env info, the application master will create the correct local resource for the 616 // eventual containers that will be launched to execute the shell scripts 617 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); 618 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); 619 env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); 620 if (domainId != null && domainId.length() > 0) { 621 env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId); 622 } 623 624 // Add AppMaster.jar location to classpath 625 // At some point we should not be required to add 626 // the hadoop specific classpaths to the env. 627 // It should be provided out of the box. 628 // For now setting all required classpaths including 629 // the classpath to "." for the application jar 630 StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$()) 631 .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*"); 632 for (String c : conf.getStrings( 633 YarnConfiguration.YARN_APPLICATION_CLASSPATH, 634 YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) { 635 classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR); 636 classPathEnv.append(c.trim()); 637 } 638 classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append( 639 "./log4j.properties"); 640 641 // add the runtime classpath needed for tests to work 642 if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { 643 classPathEnv.append(':'); 644 classPathEnv.append(System.getProperty("java.class.path")); 645 } 646 647 env.put("CLASSPATH", classPathEnv.toString()); 648 649 // Set the necessary command to execute the application master 650 Vector<CharSequence> vargs = new Vector<CharSequence>(30); 651 652 // Set java executable command 653 LOG.info("Setting up app master command"); 654 vargs.add(Environment.JAVA_HOME.$$() + "/bin/java"); 655 // Set Xmx based on am memory size 656 vargs.add("-Xmx" + amMemory + "m"); 657 // Set class name 658 vargs.add(appMasterMainClass); 659 // Set params for Application Master 660 vargs.add("--container_memory " + String.valueOf(containerMemory)); 661 vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); 662 vargs.add("--num_containers " + String.valueOf(numContainers)); 663 if (null != nodeLabelExpression) { 664 appContext.setNodeLabelExpression(nodeLabelExpression); 665 } 666 vargs.add("--priority " + String.valueOf(shellCmdPriority)); 667 668 for (Map.Entry<String, String> entry : shellEnv.entrySet()) { 669 vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); 670 } 671 if (debugFlag) { 672 vargs.add("--debug"); 673 } 674 675 vargs.addAll(containerRetryOptions); 676 677 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); 678 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); 679 680 // Get final commmand 681 StringBuilder command = new StringBuilder(); 682 for (CharSequence str : vargs) { 683 command.append(str).append(" "); 684 } 685 686 LOG.info("Completed setting up app master command " + command.toString()); 687 List<String> commands = new ArrayList<String>(); 688 commands.add(command.toString()); 689 690 // Set up the container launch context for the application master 691 ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( 692 localResources, env, commands, null, null, null); 693 694 // Set up resource type requirements 695 // For now, both memory and vcores are supported, so we set memory and 696 // vcores requirements 697 Resource capability = Resource.newInstance(amMemory, amVCores); 698 appContext.setResource(capability); 699 700 // Service data is a binary blob that can be passed to the application 701 // Not needed in this scenario 702 // amContainer.setServiceData(serviceData); 703 704 // Setup security tokens 705 if (UserGroupInformation.isSecurityEnabled()) { 706 // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce 707 Credentials credentials = new Credentials(); 708 String tokenRenewer = YarnClientUtils.getRmPrincipal(conf); 709 if (tokenRenewer == null || tokenRenewer.length() == 0) { 710 throw new IOException( 711 "Can't get Master Kerberos principal for the RM to use as renewer"); 712 } 713 714 // For now, only getting tokens for the default file-system. 715 final Token<?> tokens[] = 716 fs.addDelegationTokens(tokenRenewer, credentials); 717 if (tokens != null) { 718 for (Token<?> token : tokens) { 719 LOG.info("Got dt for " + fs.getUri() + "; " + token); 720 } 721 } 722 DataOutputBuffer dob = new DataOutputBuffer(); 723 credentials.writeTokenStorageToStream(dob); 724 ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 725 amContainer.setTokens(fsTokens); 726 } 727 728 appContext.setAMContainerSpec(amContainer); 729 730 // Set the priority for the application master 731 // TODO - what is the range for priority? how to decide? 732 Priority pri = Priority.newInstance(amPriority); 733 appContext.setPriority(pri); 734 735 // Set the queue to which this application is to be submitted in the RM 736 appContext.setQueue(amQueue); 737 738 // Submit the application to the applications manager 739 // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); 740 // Ignore the response as either a valid response object is returned on success 741 // or an exception thrown to denote some form of a failure 742 LOG.info("Submitting application to ASM"); 743 744 yarnClient.submitApplication(appContext); 745 746 // TODO 747 // Try submitting the same request again 748 // app submission failure? 749 750 // Monitor the application 751 return monitorApplication(appId); 752 753 } 754 755 /** 756 * Monitor the submitted application for completion. 757 * Kill application if time expires. 758 * @param appId Application Id of application to be monitored 759 * @return true if application completed successfully 760 * @throws YarnException 761 * @throws IOException 762 */ 763 private boolean monitorApplication(ApplicationId appId) 764 throws YarnException, IOException { 765 766 while (true) { 767 768 // Check app status every 1 second. 769 try { 770 Thread.sleep(1000); 771 } catch (InterruptedException e) { 772 LOG.debug("Thread sleep in monitoring loop interrupted"); 773 } 774 775 // Get application report for the appId we are interested in 776 ApplicationReport report = yarnClient.getApplicationReport(appId); 777 778 LOG.info("Got application report from ASM for" 779 + ", appId=" + appId.getId() 780 + ", clientToAMToken=" + report.getClientToAMToken() 781 + ", appDiagnostics=" + report.getDiagnostics() 782 + ", appMasterHost=" + report.getHost() 783 + ", appQueue=" + report.getQueue() 784 + ", appMasterRpcPort=" + report.getRpcPort() 785 + ", appStartTime=" + report.getStartTime() 786 + ", yarnAppState=" + report.getYarnApplicationState().toString() 787 + ", distributedFinalState=" + report.getFinalApplicationStatus().toString() 788 + ", appTrackingUrl=" + report.getTrackingUrl() 789 + ", appUser=" + report.getUser()); 790 791 YarnApplicationState state = report.getYarnApplicationState(); 792 FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); 793 if (YarnApplicationState.FINISHED == state) { 794 if (FinalApplicationStatus.SUCCEEDED == dsStatus) { 795 LOG.info("Application has completed successfully. Breaking monitoring loop"); 796 return true; 797 } 798 else { 799 LOG.info("Application did finished unsuccessfully." 800 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() 801 + ". Breaking monitoring loop"); 802 return false; 803 } 804 } 805 else if (YarnApplicationState.KILLED == state 806 || YarnApplicationState.FAILED == state) { 807 LOG.info("Application did not finish." 808 + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() 809 + ". Breaking monitoring loop"); 810 return false; 811 } 812 813 if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { 814 LOG.info("Reached client specified timeout for application. Killing application"); 815 forceKillApplication(appId); 816 return false; 817 } 818 } 819 820 } 821 822 /** 823 * Kill a submitted application by sending a call to the ASM 824 * @param appId Application Id to be killed. 825 * @throws YarnException 826 * @throws IOException 827 */ 828 private void forceKillApplication(ApplicationId appId) 829 throws YarnException, IOException { 830 // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 831 // the same time. 832 // If yes, can we kill a particular attempt only? 833 834 // Response can be ignored as it is non-null on success or 835 // throws an exception in case of failures 836 yarnClient.killApplication(appId); 837 } 838 839 private void addToLocalResources(FileSystem fs, String fileSrcPath, 840 String fileDstPath, String appId, Map<String, LocalResource> localResources, 841 String resources) throws IOException { 842 String suffix = 843 appName + "/" + appId + "/" + fileDstPath; 844 Path dst = 845 new Path(fs.getHomeDirectory(), suffix); 846 if (fileSrcPath == null) { 847 FSDataOutputStream ostream = null; 848 try { 849 ostream = FileSystem 850 .create(fs, dst, new FsPermission((short) 0710)); 851 ostream.writeUTF(resources); 852 } finally { 853 IOUtils.closeQuietly(ostream); 854 } 855 } else { 856 fs.copyFromLocalFile(new Path(fileSrcPath), dst); 857 } 858 FileStatus scFileStatus = fs.getFileStatus(dst); 859 LocalResource scRsrc = 860 LocalResource.newInstance( 861 URL.fromURI(dst.toUri()), 862 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 863 scFileStatus.getLen(), scFileStatus.getModificationTime()); 864 localResources.put(fileDstPath, scRsrc); 865 } 866 867 private void prepareTimelineDomain() { 868 TimelineClient timelineClient = null; 869 if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, 870 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { 871 timelineClient = TimelineClient.createTimelineClient(); 872 timelineClient.init(conf); 873 timelineClient.start(); 874 } else { 875 LOG.warn("Cannot put the domain " + domainId + 876 " because the timeline service is not enabled"); 877 return; 878 } 879 try { 880 //TODO: we need to check and combine the existing timeline domain ACLs, 881 //but let's do it once we have client java library to query domains. 882 TimelineDomain domain = new TimelineDomain(); 883 domain.setId(domainId); 884 domain.setReaders( 885 viewACLs != null && viewACLs.length() > 0 ? viewACLs : " "); 886 domain.setWriters( 887 modifyACLs != null && modifyACLs.length() > 0 ? modifyACLs : " "); 888 timelineClient.putDomain(domain); 889 LOG.info("Put the timeline domain: " + 890 TimelineUtils.dumpTimelineRecordtoJSON(domain)); 891 } catch (Exception e) { 892 LOG.error("Error when putting the timeline domain", e); 893 } finally { 894 timelineClient.stop(); 895 } 896 } 897}