001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.BufferedReader; 022import java.io.DataInputStream; 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.io.StringReader; 027import java.lang.reflect.UndeclaredThrowableException; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.nio.ByteBuffer; 031import java.security.PrivilegedExceptionAction; 032import java.util.ArrayList; 033import java.util.Collections; 034import java.util.HashMap; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.Vector; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.atomic.AtomicInteger; 043 044import org.apache.commons.cli.CommandLine; 045import org.apache.commons.cli.GnuParser; 046import org.apache.commons.cli.HelpFormatter; 047import org.apache.commons.cli.Options; 048import org.apache.commons.cli.ParseException; 049import org.apache.commons.logging.Log; 050import 
org.apache.commons.logging.LogFactory; 051import org.apache.hadoop.classification.InterfaceAudience; 052import org.apache.hadoop.classification.InterfaceAudience.Private; 053import org.apache.hadoop.classification.InterfaceStability; 054import org.apache.hadoop.conf.Configuration; 055import org.apache.hadoop.fs.FileSystem; 056import org.apache.hadoop.fs.Path; 057import org.apache.hadoop.io.DataOutputBuffer; 058import org.apache.hadoop.io.IOUtils; 059import org.apache.hadoop.net.NetUtils; 060import org.apache.hadoop.security.Credentials; 061import org.apache.hadoop.security.UserGroupInformation; 062import org.apache.hadoop.security.token.Token; 063import org.apache.hadoop.util.ExitUtil; 064import org.apache.hadoop.util.Shell; 065import org.apache.hadoop.yarn.api.ApplicationConstants; 066import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 067import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 068import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 069import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 070import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 071import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 072import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 073import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 074import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 075import org.apache.hadoop.yarn.api.records.Container; 076import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 077import org.apache.hadoop.yarn.api.records.ContainerId; 078import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 079import org.apache.hadoop.yarn.api.records.ContainerState; 080import org.apache.hadoop.yarn.api.records.ContainerStatus; 081import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 082import org.apache.hadoop.yarn.api.records.LocalResource; 083import 
org.apache.hadoop.yarn.api.records.LocalResourceType; 084import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 085import org.apache.hadoop.yarn.api.records.NodeReport; 086import org.apache.hadoop.yarn.api.records.Priority; 087import org.apache.hadoop.yarn.api.records.Resource; 088import org.apache.hadoop.yarn.api.records.ResourceRequest; 089import org.apache.hadoop.yarn.api.records.URL; 090import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 091import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; 092import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; 093import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 094import org.apache.hadoop.yarn.client.api.TimelineClient; 095import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; 096import org.apache.hadoop.yarn.client.api.async.NMClientAsync; 097import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 098import org.apache.hadoop.yarn.conf.YarnConfiguration; 099import org.apache.hadoop.yarn.exceptions.YarnException; 100import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 101import org.apache.hadoop.yarn.util.ConverterUtils; 102import org.apache.log4j.LogManager; 103 104import com.google.common.annotations.VisibleForTesting; 105 106/** 107 * An ApplicationMaster for executing shell commands on a set of launched 108 * containers using the YARN framework. 109 * 110 * <p> 111 * This class is meant to act as an example on how to write yarn-based 112 * application masters. 113 * </p> 114 * 115 * <p> 116 * The ApplicationMaster is started on a container by the 117 * <code>ResourceManager</code>'s launcher. The first thing that the 118 * <code>ApplicationMaster</code> needs to do is to connect and register itself 119 * with the <code>ResourceManager</code>. 
The registration sets up information
 * within the <code>ResourceManager</code> regarding what host:port the
 * ApplicationMaster is listening on to provide any form of functionality to a
 * client as well as a tracking URL that a client can use to keep track of
 * status/job history if needed. However, the distributed shell supports
 * neither the tracking URL nor the appMasterHost:appMasterRpcPort endpoint.
 * </p>
 *
 * <p>
 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
 * <code>ResourceManager</code> at regular intervals to inform the
 * <code>ResourceManager</code> that it is up and alive. The
 * {@link ApplicationMasterProtocol#allocate} call to the <code>ResourceManager</code> from the
 * <code>ApplicationMaster</code> acts as a heartbeat.
 * </p>
 *
 * <p>
 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
 * required number of containers using {@link ResourceRequest} with the necessary
 * resource specifications such as node location and computational
 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
 * responds with an {@link AllocateResponse} that informs the
 * <code>ApplicationMaster</code> of the set of newly allocated containers,
 * completed containers, as well as the current state of available resources.
 * </p>
 *
 * <p>
 * For each allocated container, the <code>ApplicationMaster</code> can then set
 * up the necessary launch context via {@link ContainerLaunchContext} to specify
 * the allocated container id, local resources required by the executable, the
 * environment to be set up for the executable, commands to execute, etc., and
 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
 * launch and execute the defined commands on the given allocated container.
* </p>
 *
 * <p>
 * The <code>ApplicationMaster</code> can monitor the launched container by
 * either querying the <code>ResourceManager</code> using
 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
 * the {@link ContainerManagementProtocol} by querying for the status of the allocated
 * container's {@link ContainerId}.
 * </p>
 *
 * <p>
 * After the job has completed, the <code>ApplicationMaster</code> has to
 * send a {@link FinishApplicationMasterRequest} to the
 * <code>ResourceManager</code> to inform it that the
 * <code>ApplicationMaster</code> has completed.
 * </p>
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ApplicationMaster {

  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);

  /**
   * Timeline event types. DS_APP_ATTEMPT_START is published from run() and
   * DS_APP_ATTEMPT_END from finish(); the container events are presumably
   * used by the container start/end publishing helpers (not visible here).
   */
  @VisibleForTesting
  @Private
  public static enum DSEvent {
    DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
  }

  /**
   * Timeline entity types. NOTE(review): presumably used as the entity type
   * by the timeline publishing helpers, which are not visible here.
   */
  @VisibleForTesting
  @Private
  public static enum DSEntity {
    DS_APP_ATTEMPT, DS_CONTAINER
  }

  // NOTE(review): not referenced in the code visible here; presumably the
  // name of the environment variable that carries each container's shell id.
  private static final String YARN_SHELL_ID = "YARN_SHELL_ID";

  // Configuration; the constructor assigns a fresh YarnConfiguration.
  private Configuration conf;

  // Handle to communicate with the Resource Manager; created and started in
  // run().
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  // In both secure and non-secure modes, this points to the job-submitter.
  // run() builds it from the USER environment variable and the AM's
  // credentials after the AM->RM token has been removed from them.
  @VisibleForTesting
  UserGroupInformation appSubmitterUgi;

  // Handle to communicate with the Node Manager; created in run().
  private NMClientAsync nmClientAsync;
  // Listener that processes the responses from the Node Manager; created in
  // run() via createNMCallbackHandler().
  private NMCallbackHandler containerListener;

  // Application attempt ID. init() derives it from the CONTAINER_ID
  // environment variable, or from the app_attempt_id option when that
  // variable is absent.
  @VisibleForTesting
  protected ApplicationAttemptId appAttemptID;

  // TODO
  // For status update for clients - yet to be implemented
  // Hostname of the container; run() sets it from NetUtils.getHostname() and
  // sends it to the RM at registration.
  private String appMasterHostname = "";
  // Port on which the app master listens for status updates from clients;
  // -1 is what gets sent at registration.
  private int appMasterRpcPort = -1;
  // Tracking url to which app master publishes info for clients to monitor;
  // registered as an empty string.
  private String appMasterTrackingUrl = "";

  // App Master configuration
  // No. of containers to run shell command on ("num_containers", default 1).
  @VisibleForTesting
  protected int numTotalContainers = 1;
  // Memory (MB) to request for the container on which the shell command will
  // run; read from "container_memory" in init() and capped to the cluster
  // maximum in run().
  private int containerMemory = 10;
  // VirtualCores to request for the container on which the shell command will
  // run; read from "container_vcores" in init() and capped to the cluster
  // maximum in run().
  private int containerVirtualCores = 1;
  // Priority of the request ("priority" option, default 0).
  private int requestPriority;

  // Counter for completed containers (complete denotes successful or failed;
  // containers aborted by the framework are not counted).
  private AtomicInteger numCompletedContainers = new AtomicInteger();
  // Allocated container count, so that we know how many containers the RM has
  // allocated to us. Includes containers recovered from a previous attempt
  // and is decremented when a container is aborted by the framework.
  @VisibleForTesting
  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
  // Count of failed containers (non-zero, non-ABORTED exit status, or a
  // failure to start the container).
  private AtomicInteger numFailedContainers = new AtomicInteger();
  // Count of containers already requested from the RM, including containers
  // recovered from a previous attempt.
  // Needed as once requested, we should not request containers again.
  // Only request more when a container is aborted by the framework.
@VisibleForTesting
  protected AtomicInteger numRequestedContainers = new AtomicInteger();

  // Shell command to be executed; read in init() from the local
  // "shellCommands" file when that file exists.
  private String shellCommand = "";
  // Args to be passed to the shell command; read in init() from the local
  // "shellArgs" file when that file exists.
  private String shellArgs = "";
  // Env variables to be set up for the shell command, filled in init() from
  // the repeated "shell_env" option (key=value pairs).
  private Map<String, String> shellEnv = new HashMap<String, String>();

  // Location of shell script (obtained from info set in env by init()).
  // Shell script path in fs; empty when no script was supplied.
  private String scriptPath = "";
  // Timestamp needed for creating a local resource
  private long shellScriptPathTimestamp = 0;
  // File length needed for local resource
  private long shellScriptPathLen = 0;

  // Timeline domain ID; stays null unless provided through the environment.
  private String domainId = null;

  // Hardcoded path to shell script in launch container's local env
  private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
  private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
      + ".bat";

  // Hardcoded path to custom log4j properties; applied by init() if present.
  private static final String log4jPath = "log4j.properties";

  // Local file names from which init() reads the shell command and its args.
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";

  // Set by the RM callbacks (all containers completed, shutdown request, or
  // error) and polled by finish(); volatile so the polling thread sees it.
  private volatile boolean done;

  // Serialized credentials built in run(); presumably attached to each
  // container launch context by the launch code (not visible here).
  private ByteBuffer allTokens;

  // Launch threads, one per allocated container; joined in finish().
  private List<Thread> launchThreads = new ArrayList<Thread>();

  // Timeline Client; null when the timeline service is disabled.
  @VisibleForTesting
  TimelineClient timelineClient;

  // NOTE(review): not referenced in the code visible here; presumably used to
  // prefix the container command line depending on the OS.
  private final String linux_bash_command = "bash";
  private final String windows_command = "cmd /c";

  // Next shell id handed out by onContainersAllocated(). NOTE(review): a plain
  // int, so this assumes allocation callbacks never run concurrently.
  private int yarnShellIdCounter = 1;

  // Ids of containers this attempt knows about (recovered at registration or
  // allocated later); completion reports for any other container are ignored.
  @VisibleForTesting
  protected final Set<ContainerId> launchedContainers =
      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());

  /**
   * Entry point of the application master.
   *
   * <p>Exits the JVM with status 0 when {@code init} returns false (only the
   * {@code -help} case) or when {@code finish} reports success, with status 2
   * when {@code finish} reports failure, and with status 1 (via
   * {@code ExitUtil.terminate}) if anything throws.</p>
   *
   * @param args Command line args
   */
  public static void main(String[] args) {
    boolean result = false;
    try {
      ApplicationMaster appMaster = new ApplicationMaster();
      LOG.info("Initializing ApplicationMaster");
      boolean doRun = appMaster.init(args);
      if (!doRun) {
        System.exit(0);
      }
      appMaster.run();
      result = appMaster.finish();
    } catch (Throwable t) {
      LOG.fatal("Error running ApplicationMaster", t);
      // Flush log4j before the JVM is terminated.
      LogManager.shutdown();
      ExitUtil.terminate(1, t);
    }
    if (result) {
      LOG.info("Application Master completed successfully. exiting");
      System.exit(0);
    } else {
      LOG.info("Application Master failed. exiting");
      System.exit(2);
    }
  }

  /**
   * Dump out contents of $CWD and the environment to stdout for debugging;
   * every line is also written to the log.
   *
   * <p>NOTE(review): this prints every environment variable verbatim, which
   * may include sensitive values.</p>
   */
  private void dumpOutDebugInfo() {

    LOG.info("Dump debug output");
    Map<String, String> envs = System.getenv();
    for (Map.Entry<String, String> env : envs.entrySet()) {
      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
      System.out.println("System env: key=" + env.getKey() + ", val="
          + env.getValue());
    }

    // The directory-listing command depends on the platform.
    BufferedReader buf = null;
    try {
      String lines = Shell.WINDOWS ?
Shell.execCommand("cmd", "/c", "dir") : 334 Shell.execCommand("ls", "-al"); 335 buf = new BufferedReader(new StringReader(lines)); 336 String line = ""; 337 while ((line = buf.readLine()) != null) { 338 LOG.info("System CWD content: " + line); 339 System.out.println("System CWD content: " + line); 340 } 341 } catch (IOException e) { 342 e.printStackTrace(); 343 } finally { 344 IOUtils.cleanup(LOG, buf); 345 } 346 } 347 348 public ApplicationMaster() { 349 // Set up the configuration 350 conf = new YarnConfiguration(); 351 } 352 353 /** 354 * Parse command line options 355 * 356 * @param args Command line args 357 * @return Whether init successful and run should be invoked 358 * @throws ParseException 359 * @throws IOException 360 */ 361 public boolean init(String[] args) throws ParseException, IOException { 362 Options opts = new Options(); 363 opts.addOption("app_attempt_id", true, 364 "App Attempt ID. Not to be used unless for testing purposes"); 365 opts.addOption("shell_env", true, 366 "Environment for shell script. Specified as env_key=env_val pairs"); 367 opts.addOption("container_memory", true, 368 "Amount of memory in MB to be requested to run the shell command"); 369 opts.addOption("container_vcores", true, 370 "Amount of virtual cores to be requested to run the shell command"); 371 opts.addOption("num_containers", true, 372 "No. of containers on which the shell command needs to be executed"); 373 opts.addOption("priority", true, "Application Priority. 
Default 0"); 374 opts.addOption("debug", false, "Dump out debug information"); 375 376 opts.addOption("help", false, "Print usage"); 377 CommandLine cliParser = new GnuParser().parse(opts, args); 378 379 if (args.length == 0) { 380 printUsage(opts); 381 throw new IllegalArgumentException( 382 "No args specified for application master to initialize"); 383 } 384 385 //Check whether customer log4j.properties file exists 386 if (fileExist(log4jPath)) { 387 try { 388 Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, 389 log4jPath); 390 } catch (Exception e) { 391 LOG.warn("Can not set up custom log4j properties. " + e); 392 } 393 } 394 395 if (cliParser.hasOption("help")) { 396 printUsage(opts); 397 return false; 398 } 399 400 if (cliParser.hasOption("debug")) { 401 dumpOutDebugInfo(); 402 } 403 404 Map<String, String> envs = System.getenv(); 405 406 if (!envs.containsKey(Environment.CONTAINER_ID.name())) { 407 if (cliParser.hasOption("app_attempt_id")) { 408 String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); 409 appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); 410 } else { 411 throw new IllegalArgumentException( 412 "Application Attempt Id not set in the environment"); 413 } 414 } else { 415 ContainerId containerId = ConverterUtils.toContainerId(envs 416 .get(Environment.CONTAINER_ID.name())); 417 appAttemptID = containerId.getApplicationAttemptId(); 418 } 419 420 if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { 421 throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV 422 + " not set in the environment"); 423 } 424 if (!envs.containsKey(Environment.NM_HOST.name())) { 425 throw new RuntimeException(Environment.NM_HOST.name() 426 + " not set in the environment"); 427 } 428 if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { 429 throw new RuntimeException(Environment.NM_HTTP_PORT 430 + " not set in the environment"); 431 } 432 if (!envs.containsKey(Environment.NM_PORT.name())) { 433 
throw new RuntimeException(Environment.NM_PORT.name() 434 + " not set in the environment"); 435 } 436 437 LOG.info("Application master for app" + ", appId=" 438 + appAttemptID.getApplicationId().getId() + ", clustertimestamp=" 439 + appAttemptID.getApplicationId().getClusterTimestamp() 440 + ", attemptId=" + appAttemptID.getAttemptId()); 441 442 if (!fileExist(shellCommandPath) 443 && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { 444 throw new IllegalArgumentException( 445 "No shell command or shell script specified to be executed by application master"); 446 } 447 448 if (fileExist(shellCommandPath)) { 449 shellCommand = readContent(shellCommandPath); 450 } 451 452 if (fileExist(shellArgsPath)) { 453 shellArgs = readContent(shellArgsPath); 454 } 455 456 if (cliParser.hasOption("shell_env")) { 457 String shellEnvs[] = cliParser.getOptionValues("shell_env"); 458 for (String env : shellEnvs) { 459 env = env.trim(); 460 int index = env.indexOf('='); 461 if (index == -1) { 462 shellEnv.put(env, ""); 463 continue; 464 } 465 String key = env.substring(0, index); 466 String val = ""; 467 if (index < (env.length() - 1)) { 468 val = env.substring(index + 1); 469 } 470 shellEnv.put(key, val); 471 } 472 } 473 474 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { 475 scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); 476 477 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { 478 shellScriptPathTimestamp = Long.parseLong(envs 479 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); 480 } 481 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { 482 shellScriptPathLen = Long.parseLong(envs 483 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); 484 } 485 if (!scriptPath.isEmpty() 486 && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { 487 LOG.error("Illegal values in env for shell script path" + ", path=" 488 + scriptPath + ", len=" + shellScriptPathLen + ", timestamp=" 489 + 
shellScriptPathTimestamp); 490 throw new IllegalArgumentException( 491 "Illegal values in env for shell script path"); 492 } 493 } 494 495 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { 496 domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); 497 } 498 499 containerMemory = Integer.parseInt(cliParser.getOptionValue( 500 "container_memory", "10")); 501 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( 502 "container_vcores", "1")); 503 numTotalContainers = Integer.parseInt(cliParser.getOptionValue( 504 "num_containers", "1")); 505 if (numTotalContainers == 0) { 506 throw new IllegalArgumentException( 507 "Cannot run distributed shell with no containers"); 508 } 509 requestPriority = Integer.parseInt(cliParser 510 .getOptionValue("priority", "0")); 511 return true; 512 } 513 514 /** 515 * Helper function to print usage 516 * 517 * @param opts Parsed command line options 518 */ 519 private void printUsage(Options opts) { 520 new HelpFormatter().printHelp("ApplicationMaster", opts); 521 } 522 523 /** 524 * Main run function for the application master 525 * 526 * @throws YarnException 527 * @throws IOException 528 */ 529 @SuppressWarnings({ "unchecked" }) 530 public void run() throws YarnException, IOException, InterruptedException { 531 LOG.info("Starting ApplicationMaster"); 532 533 // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class 534 // are marked as LimitedPrivate 535 Credentials credentials = 536 UserGroupInformation.getCurrentUser().getCredentials(); 537 DataOutputBuffer dob = new DataOutputBuffer(); 538 credentials.writeTokenStorageToStream(dob); 539 // Now remove the AM->RM token so that containers cannot access it. 
540 Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); 541 LOG.info("Executing with tokens:"); 542 while (iter.hasNext()) { 543 Token<?> token = iter.next(); 544 LOG.info(token); 545 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { 546 iter.remove(); 547 } 548 } 549 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 550 551 // Create appSubmitterUgi and add original tokens to it 552 String appSubmitterUserName = 553 System.getenv(ApplicationConstants.Environment.USER.name()); 554 appSubmitterUgi = 555 UserGroupInformation.createRemoteUser(appSubmitterUserName); 556 appSubmitterUgi.addCredentials(credentials); 557 558 559 AMRMClientAsync.AbstractCallbackHandler allocListener = 560 new RMCallbackHandler(); 561 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); 562 amRMClient.init(conf); 563 amRMClient.start(); 564 565 containerListener = createNMCallbackHandler(); 566 nmClientAsync = new NMClientAsyncImpl(containerListener); 567 nmClientAsync.init(conf); 568 nmClientAsync.start(); 569 570 startTimelineClient(conf); 571 if(timelineClient != null) { 572 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 573 DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); 574 } 575 576 // Setup local RPC Server to accept status requests directly from clients 577 // TODO need to setup a protocol for client to be able to communicate to 578 // the RPC server 579 // TODO use the rpc port info to register with the RM for the client to 580 // send requests to this app master 581 582 // Register self with ResourceManager 583 // This will start heartbeating to the RM 584 appMasterHostname = NetUtils.getHostname(); 585 RegisterApplicationMasterResponse response = amRMClient 586 .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 587 appMasterTrackingUrl); 588 // Dump out information about cluster capability as seen by the 589 // resource manager 590 int maxMem = 
response.getMaximumResourceCapability().getMemory(); 591 LOG.info("Max mem capability of resources in this cluster " + maxMem); 592 593 int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); 594 LOG.info("Max vcores capability of resources in this cluster " + maxVCores); 595 596 // A resource ask cannot exceed the max. 597 if (containerMemory > maxMem) { 598 LOG.info("Container memory specified above max threshold of cluster." 599 + " Using max value." + ", specified=" + containerMemory + ", max=" 600 + maxMem); 601 containerMemory = maxMem; 602 } 603 604 if (containerVirtualCores > maxVCores) { 605 LOG.info("Container virtual cores specified above max threshold of cluster." 606 + " Using max value." + ", specified=" + containerVirtualCores + ", max=" 607 + maxVCores); 608 containerVirtualCores = maxVCores; 609 } 610 611 List<Container> previousAMRunningContainers = 612 response.getContainersFromPreviousAttempts(); 613 LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() 614 + " previous attempts' running containers on AM registration."); 615 for(Container container: previousAMRunningContainers) { 616 launchedContainers.add(container.getId()); 617 } 618 numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); 619 620 621 int numTotalContainersToRequest = 622 numTotalContainers - previousAMRunningContainers.size(); 623 // Setup ask for containers from RM 624 // Send request for containers to RM 625 // Until we get our fully allocated quota, we keep on polling RM for 626 // containers 627 // Keep looping until all the containers are launched and shell script 628 // executed on them ( regardless of success/failure). 
629 for (int i = 0; i < numTotalContainersToRequest; ++i) { 630 ContainerRequest containerAsk = setupContainerAskForRM(); 631 amRMClient.addContainerRequest(containerAsk); 632 } 633 numRequestedContainers.set(numTotalContainers); 634 } 635 636 @VisibleForTesting 637 void startTimelineClient(final Configuration conf) 638 throws YarnException, IOException, InterruptedException { 639 try { 640 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 641 @Override 642 public Void run() throws Exception { 643 if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, 644 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { 645 // Creating the Timeline Client 646 timelineClient = TimelineClient.createTimelineClient(); 647 timelineClient.init(conf); 648 timelineClient.start(); 649 } else { 650 timelineClient = null; 651 LOG.warn("Timeline service is not enabled"); 652 } 653 return null; 654 } 655 }); 656 } catch (UndeclaredThrowableException e) { 657 throw new YarnException(e.getCause()); 658 } 659 } 660 661 @VisibleForTesting 662 NMCallbackHandler createNMCallbackHandler() { 663 return new NMCallbackHandler(this); 664 } 665 666 @VisibleForTesting 667 protected boolean finish() { 668 // wait for completion. 
669 while (!done 670 && (numCompletedContainers.get() != numTotalContainers)) { 671 try { 672 Thread.sleep(200); 673 } catch (InterruptedException ex) {} 674 } 675 676 if(timelineClient != null) { 677 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 678 DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); 679 } 680 681 // Join all launched threads 682 // needed for when we time out 683 // and we need to release containers 684 for (Thread launchThread : launchThreads) { 685 try { 686 launchThread.join(10000); 687 } catch (InterruptedException e) { 688 LOG.info("Exception thrown in thread join: " + e.getMessage()); 689 e.printStackTrace(); 690 } 691 } 692 693 // When the application completes, it should stop all running containers 694 LOG.info("Application completed. Stopping running containers"); 695 nmClientAsync.stop(); 696 697 // When the application completes, it should send a finish application 698 // signal to the RM 699 LOG.info("Application completed. Signalling finish to RM"); 700 701 FinalApplicationStatus appStatus; 702 String appMessage = null; 703 boolean success = true; 704 if (numFailedContainers.get() == 0 && 705 numCompletedContainers.get() == numTotalContainers) { 706 appStatus = FinalApplicationStatus.SUCCEEDED; 707 } else { 708 appStatus = FinalApplicationStatus.FAILED; 709 appMessage = "Diagnostics." 
+ ", total=" + numTotalContainers 710 + ", completed=" + numCompletedContainers.get() + ", allocated=" 711 + numAllocatedContainers.get() + ", failed=" 712 + numFailedContainers.get(); 713 LOG.info(appMessage); 714 success = false; 715 } 716 try { 717 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); 718 } catch (YarnException ex) { 719 LOG.error("Failed to unregister application", ex); 720 } catch (IOException e) { 721 LOG.error("Failed to unregister application", e); 722 } 723 724 amRMClient.stop(); 725 726 // Stop Timeline Client 727 if(timelineClient != null) { 728 timelineClient.stop(); 729 } 730 731 return success; 732 } 733 734 @VisibleForTesting 735 class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler { 736 @SuppressWarnings("unchecked") 737 @Override 738 public void onContainersCompleted(List<ContainerStatus> completedContainers) { 739 LOG.info("Got response from RM for container ask, completedCnt=" 740 + completedContainers.size()); 741 for (ContainerStatus containerStatus : completedContainers) { 742 LOG.info(appAttemptID + " got container status for containerID=" 743 + containerStatus.getContainerId() + ", state=" 744 + containerStatus.getState() + ", exitStatus=" 745 + containerStatus.getExitStatus() + ", diagnostics=" 746 + containerStatus.getDiagnostics()); 747 748 // non complete containers should not be here 749 assert (containerStatus.getState() == ContainerState.COMPLETE); 750 // ignore containers we know nothing about - probably from a previous 751 // attempt 752 if (!launchedContainers.contains(containerStatus.getContainerId())) { 753 LOG.info("Ignoring completed status of " 754 + containerStatus.getContainerId() 755 + "; unknown container(probably launched by previous attempt)"); 756 continue; 757 } 758 759 // increment counters for completed/failed containers 760 int exitStatus = containerStatus.getExitStatus(); 761 if (0 != exitStatus) { 762 // container failed 763 if (ContainerExitStatus.ABORTED != 
exitStatus) { 764 // shell script failed 765 // counts as completed 766 numCompletedContainers.incrementAndGet(); 767 numFailedContainers.incrementAndGet(); 768 } else { 769 // container was killed by framework, possibly preempted 770 // we should re-try as the container was lost for some reason 771 numAllocatedContainers.decrementAndGet(); 772 numRequestedContainers.decrementAndGet(); 773 // we do not need to release the container as it would be done 774 // by the RM 775 } 776 } else { 777 // nothing to do 778 // container completed successfully 779 numCompletedContainers.incrementAndGet(); 780 LOG.info("Container completed successfully." + ", containerId=" 781 + containerStatus.getContainerId()); 782 } 783 if(timelineClient != null) { 784 publishContainerEndEvent( 785 timelineClient, containerStatus, domainId, appSubmitterUgi); 786 } 787 } 788 789 // ask for more containers if any failed 790 int askCount = numTotalContainers - numRequestedContainers.get(); 791 numRequestedContainers.addAndGet(askCount); 792 793 if (askCount > 0) { 794 for (int i = 0; i < askCount; ++i) { 795 ContainerRequest containerAsk = setupContainerAskForRM(); 796 amRMClient.addContainerRequest(containerAsk); 797 } 798 } 799 800 if (numCompletedContainers.get() == numTotalContainers) { 801 done = true; 802 } 803 } 804 805 @Override 806 public void onContainersAllocated(List<Container> allocatedContainers) { 807 LOG.info("Got response from RM for container ask, allocatedCnt=" 808 + allocatedContainers.size()); 809 numAllocatedContainers.addAndGet(allocatedContainers.size()); 810 for (Container allocatedContainer : allocatedContainers) { 811 String yarnShellId = Integer.toString(yarnShellIdCounter); 812 yarnShellIdCounter++; 813 LOG.info("Launching shell command on a new container." 
814 + ", containerId=" + allocatedContainer.getId() 815 + ", yarnShellId=" + yarnShellId 816 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 817 + ":" + allocatedContainer.getNodeId().getPort() 818 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 819 + ", containerResourceMemory" 820 + allocatedContainer.getResource().getMemory() 821 + ", containerResourceVirtualCores" 822 + allocatedContainer.getResource().getVirtualCores()); 823 // + ", containerToken" 824 // +allocatedContainer.getContainerToken().getIdentifier().toString()); 825 826 Thread launchThread = createLaunchContainerThread(allocatedContainer, 827 yarnShellId); 828 829 // launch and start the container on a separate thread to keep 830 // the main thread unblocked 831 // as all containers may not be allocated at one go. 832 launchThreads.add(launchThread); 833 launchedContainers.add(allocatedContainer.getId()); 834 launchThread.start(); 835 } 836 } 837 838 @Override 839 public void onContainersResourceChanged(List<Container> containers) {} 840 841 @Override 842 public void onShutdownRequest() { 843 done = true; 844 } 845 846 @Override 847 public void onNodesUpdated(List<NodeReport> updatedNodes) {} 848 849 @Override 850 public float getProgress() { 851 // set progress to deliver to RM on next heartbeat 852 float progress = (float) numCompletedContainers.get() 853 / numTotalContainers; 854 return progress; 855 } 856 857 @Override 858 public void onError(Throwable e) { 859 done = true; 860 amRMClient.stop(); 861 } 862 } 863 864 @VisibleForTesting 865 static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler { 866 867 private ConcurrentMap<ContainerId, Container> containers = 868 new ConcurrentHashMap<ContainerId, Container>(); 869 private final ApplicationMaster applicationMaster; 870 871 public NMCallbackHandler(ApplicationMaster applicationMaster) { 872 this.applicationMaster = applicationMaster; 873 } 874 875 public void addContainer(ContainerId 
containerId, Container container) { 876 containers.putIfAbsent(containerId, container); 877 } 878 879 @Override 880 public void onContainerStopped(ContainerId containerId) { 881 if (LOG.isDebugEnabled()) { 882 LOG.debug("Succeeded to stop Container " + containerId); 883 } 884 containers.remove(containerId); 885 } 886 887 @Override 888 public void onContainerStatusReceived(ContainerId containerId, 889 ContainerStatus containerStatus) { 890 if (LOG.isDebugEnabled()) { 891 LOG.debug("Container Status: id=" + containerId + ", status=" + 892 containerStatus); 893 } 894 } 895 896 @Override 897 public void onContainerStarted(ContainerId containerId, 898 Map<String, ByteBuffer> allServiceResponse) { 899 if (LOG.isDebugEnabled()) { 900 LOG.debug("Succeeded to start Container " + containerId); 901 } 902 Container container = containers.get(containerId); 903 if (container != null) { 904 applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); 905 } 906 if(applicationMaster.timelineClient != null) { 907 ApplicationMaster.publishContainerStartEvent( 908 applicationMaster.timelineClient, container, 909 applicationMaster.domainId, applicationMaster.appSubmitterUgi); 910 } 911 } 912 913 @Override 914 public void onContainerResourceIncreased( 915 ContainerId containerId, Resource resource) {} 916 917 @Override 918 public void onStartContainerError(ContainerId containerId, Throwable t) { 919 LOG.error("Failed to start Container " + containerId); 920 containers.remove(containerId); 921 applicationMaster.numCompletedContainers.incrementAndGet(); 922 applicationMaster.numFailedContainers.incrementAndGet(); 923 } 924 925 @Override 926 public void onGetContainerStatusError( 927 ContainerId containerId, Throwable t) { 928 LOG.error("Failed to query the status of Container " + containerId); 929 } 930 931 @Override 932 public void onStopContainerError(ContainerId containerId, Throwable t) { 933 LOG.error("Failed to stop Container " + containerId); 934 
containers.remove(containerId); 935 } 936 937 @Override 938 public void onIncreaseContainerResourceError( 939 ContainerId containerId, Throwable t) {} 940 941 } 942 943 /** 944 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container 945 * that will execute the shell command. 946 */ 947 private class LaunchContainerRunnable implements Runnable { 948 949 // Allocated container 950 private Container container; 951 private String shellId; 952 953 NMCallbackHandler containerListener; 954 955 /** 956 * @param lcontainer Allocated container 957 * @param containerListener Callback handler of the container 958 */ 959 public LaunchContainerRunnable(Container lcontainer, 960 NMCallbackHandler containerListener, String shellId) { 961 this.container = lcontainer; 962 this.containerListener = containerListener; 963 this.shellId = shellId; 964 } 965 966 @Override 967 /** 968 * Connects to CM, sets up container launch context 969 * for shell command and eventually dispatches the container 970 * start request to the CM. 971 */ 972 public void run() { 973 LOG.info("Setting up container launch container for containerid=" 974 + container.getId() + " with shellid=" + shellId); 975 976 // Set the local resources 977 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 978 979 // The container for the eventual shell commands needs its own local 980 // resources too. 981 // In this scenario, if a shell script is specified, we need to have it 982 // copied and made available to the container. 983 if (!scriptPath.isEmpty()) { 984 Path renamedScriptPath = null; 985 if (Shell.WINDOWS) { 986 renamedScriptPath = new Path(scriptPath + ".bat"); 987 } else { 988 renamedScriptPath = new Path(scriptPath + ".sh"); 989 } 990 991 try { 992 // rename the script file based on the underlying OS syntax. 
993 renameScriptFile(renamedScriptPath); 994 } catch (Exception e) { 995 LOG.error( 996 "Not able to add suffix (.bat/.sh) to the shell script filename", 997 e); 998 // We know we cannot continue launching the container 999 // so we should release it. 1000 numCompletedContainers.incrementAndGet(); 1001 numFailedContainers.incrementAndGet(); 1002 return; 1003 } 1004 1005 URL yarnUrl = null; 1006 try { 1007 yarnUrl = ConverterUtils.getYarnUrlFromURI( 1008 new URI(renamedScriptPath.toString())); 1009 } catch (URISyntaxException e) { 1010 LOG.error("Error when trying to use shell script path specified" 1011 + " in env, path=" + renamedScriptPath, e); 1012 // A failure scenario on bad input such as invalid shell script path 1013 // We know we cannot continue launching the container 1014 // so we should release it. 1015 // TODO 1016 numCompletedContainers.incrementAndGet(); 1017 numFailedContainers.incrementAndGet(); 1018 return; 1019 } 1020 LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, 1021 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 1022 shellScriptPathLen, shellScriptPathTimestamp); 1023 localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath : 1024 ExecShellStringPath, shellRsrc); 1025 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; 1026 } 1027 1028 // Set the necessary command to execute on the allocated container 1029 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 1030 1031 // Set executable command 1032 vargs.add(shellCommand); 1033 // Set shell script path 1034 if (!scriptPath.isEmpty()) { 1035 vargs.add(Shell.WINDOWS ? 
ExecBatScripStringtPath 1036 : ExecShellStringPath); 1037 } 1038 1039 // Set args for the shell command if any 1040 vargs.add(shellArgs); 1041 // Add log redirect params 1042 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 1043 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 1044 1045 // Get final commmand 1046 StringBuilder command = new StringBuilder(); 1047 for (CharSequence str : vargs) { 1048 command.append(str).append(" "); 1049 } 1050 1051 List<String> commands = new ArrayList<String>(); 1052 commands.add(command.toString()); 1053 1054 // Set up ContainerLaunchContext, setting local resource, environment, 1055 // command and token for constructor. 1056 1057 // Note for tokens: Set up tokens for the container too. Today, for normal 1058 // shell commands, the container in distribute-shell doesn't need any 1059 // tokens. We are populating them mainly for NodeManagers to be able to 1060 // download anyfiles in the distributed file-system. The tokens are 1061 // otherwise also useful in cases, for e.g., when one is running a 1062 // "hadoop dfs" command inside the distributed shell. 
1063 Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv); 1064 myShellEnv.put(YARN_SHELL_ID, shellId); 1065 ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( 1066 localResources, myShellEnv, commands, null, allTokens.duplicate(), 1067 null); 1068 containerListener.addContainer(container.getId(), container); 1069 nmClientAsync.startContainerAsync(container, ctx); 1070 } 1071 } 1072 1073 private void renameScriptFile(final Path renamedScriptPath) 1074 throws IOException, InterruptedException { 1075 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 1076 @Override 1077 public Void run() throws IOException { 1078 FileSystem fs = renamedScriptPath.getFileSystem(conf); 1079 fs.rename(new Path(scriptPath), renamedScriptPath); 1080 return null; 1081 } 1082 }); 1083 LOG.info("User " + appSubmitterUgi.getUserName() 1084 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath); 1085 } 1086 1087 /** 1088 * Setup the request that will be sent to the RM for the container ask. 1089 * 1090 * @return the setup ResourceRequest to be sent to RM 1091 */ 1092 private ContainerRequest setupContainerAskForRM() { 1093 // setup requirements for hosts 1094 // using * as any host will do for the distributed shell app 1095 // set the priority for the request 1096 // TODO - what is the range for priority? how to decide? 
1097 Priority pri = Priority.newInstance(requestPriority); 1098 1099 // Set up resource type requirements 1100 // For now, memory and CPU are supported so we set memory and cpu requirements 1101 Resource capability = Resource.newInstance(containerMemory, 1102 containerVirtualCores); 1103 1104 ContainerRequest request = new ContainerRequest(capability, null, null, 1105 pri); 1106 LOG.info("Requested container ask: " + request.toString()); 1107 return request; 1108 } 1109 1110 private boolean fileExist(String filePath) { 1111 return new File(filePath).exists(); 1112 } 1113 1114 private String readContent(String filePath) throws IOException { 1115 DataInputStream ds = null; 1116 try { 1117 ds = new DataInputStream(new FileInputStream(filePath)); 1118 return ds.readUTF(); 1119 } finally { 1120 org.apache.commons.io.IOUtils.closeQuietly(ds); 1121 } 1122 } 1123 1124 private static void publishContainerStartEvent( 1125 final TimelineClient timelineClient, Container container, String domainId, 1126 UserGroupInformation ugi) { 1127 final TimelineEntity entity = new TimelineEntity(); 1128 entity.setEntityId(container.getId().toString()); 1129 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1130 entity.setDomainId(domainId); 1131 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1132 TimelineEvent event = new TimelineEvent(); 1133 event.setTimestamp(System.currentTimeMillis()); 1134 event.setEventType(DSEvent.DS_CONTAINER_START.toString()); 1135 event.addEventInfo("Node", container.getNodeId().toString()); 1136 event.addEventInfo("Resources", container.getResource().toString()); 1137 entity.addEvent(event); 1138 1139 try { 1140 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() { 1141 @Override 1142 public TimelinePutResponse run() throws Exception { 1143 return processTimelineResponseErrors( 1144 timelineClient.putEntities(entity)); 1145 } 1146 }); 1147 } catch (Exception e) { 1148 LOG.error("Container start event could not be published for " 1149 + 
container.getId().toString(), 1150 e instanceof UndeclaredThrowableException ? e.getCause() : e); 1151 } 1152 } 1153 1154 private static void publishContainerEndEvent( 1155 final TimelineClient timelineClient, ContainerStatus container, 1156 String domainId, UserGroupInformation ugi) { 1157 final TimelineEntity entity = new TimelineEntity(); 1158 entity.setEntityId(container.getContainerId().toString()); 1159 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1160 entity.setDomainId(domainId); 1161 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1162 TimelineEvent event = new TimelineEvent(); 1163 event.setTimestamp(System.currentTimeMillis()); 1164 event.setEventType(DSEvent.DS_CONTAINER_END.toString()); 1165 event.addEventInfo("State", container.getState().name()); 1166 event.addEventInfo("Exit Status", container.getExitStatus()); 1167 entity.addEvent(event); 1168 try { 1169 TimelinePutResponse response = timelineClient.putEntities(entity); 1170 processTimelineResponseErrors(response); 1171 } catch (YarnException | IOException e) { 1172 LOG.error("Container end event could not be published for " 1173 + container.getContainerId().toString(), e); 1174 } 1175 } 1176 1177 private static void publishApplicationAttemptEvent( 1178 final TimelineClient timelineClient, String appAttemptId, 1179 DSEvent appEvent, String domainId, UserGroupInformation ugi) { 1180 final TimelineEntity entity = new TimelineEntity(); 1181 entity.setEntityId(appAttemptId); 1182 entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); 1183 entity.setDomainId(domainId); 1184 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1185 TimelineEvent event = new TimelineEvent(); 1186 event.setEventType(appEvent.toString()); 1187 event.setTimestamp(System.currentTimeMillis()); 1188 entity.addEvent(event); 1189 try { 1190 TimelinePutResponse response = timelineClient.putEntities(entity); 1191 processTimelineResponseErrors(response); 1192 } catch (YarnException | IOException e) { 1193 
LOG.error("App Attempt " 1194 + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") 1195 + " event could not be published for " 1196 + appAttemptId.toString(), e); 1197 } 1198 } 1199 1200 private static TimelinePutResponse processTimelineResponseErrors( 1201 TimelinePutResponse response) { 1202 List<TimelinePutResponse.TimelinePutError> errors = response.getErrors(); 1203 if (errors.size() == 0) { 1204 LOG.debug("Timeline entities are successfully put"); 1205 } else { 1206 for (TimelinePutResponse.TimelinePutError error : errors) { 1207 LOG.error( 1208 "Error when publishing entity [" + error.getEntityType() + "," 1209 + error.getEntityId() + "], server side error code: " 1210 + error.getErrorCode()); 1211 } 1212 } 1213 return response; 1214 } 1215 1216 RMCallbackHandler getRMCallbackHandler() { 1217 return new RMCallbackHandler(); 1218 } 1219 1220 @VisibleForTesting 1221 void setAmRMClient(AMRMClientAsync client) { 1222 this.amRMClient = client; 1223 } 1224 1225 @VisibleForTesting 1226 int getNumCompletedContainers() { 1227 return numCompletedContainers.get(); 1228 } 1229 1230 @VisibleForTesting 1231 boolean getDone() { 1232 return done; 1233 } 1234 1235 @VisibleForTesting 1236 Thread createLaunchContainerThread(Container allocatedContainer, 1237 String shellId) { 1238 LaunchContainerRunnable runnableLaunchContainer = 1239 new LaunchContainerRunnable(allocatedContainer, containerListener, 1240 shellId); 1241 return new Thread(runnableLaunchContainer); 1242 } 1243}