001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.BufferedReader; 022import java.io.DataInputStream; 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.io.StringReader; 027import java.lang.reflect.UndeclaredThrowableException; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.nio.ByteBuffer; 031import java.security.PrivilegedExceptionAction; 032import java.util.ArrayList; 033import java.util.Collections; 034import java.util.HashMap; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.Set; 039import java.util.Vector; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.ConcurrentMap; 042import java.util.concurrent.atomic.AtomicInteger; 043 044import org.apache.commons.cli.CommandLine; 045import org.apache.commons.cli.GnuParser; 046import org.apache.commons.cli.HelpFormatter; 047import org.apache.commons.cli.Options; 048import org.apache.commons.cli.ParseException; 049import org.apache.commons.logging.Log; 050import 
org.apache.commons.logging.LogFactory; 051import org.apache.hadoop.classification.InterfaceAudience; 052import org.apache.hadoop.classification.InterfaceAudience.Private; 053import org.apache.hadoop.classification.InterfaceStability; 054import org.apache.hadoop.conf.Configuration; 055import org.apache.hadoop.fs.FileSystem; 056import org.apache.hadoop.fs.Path; 057import org.apache.hadoop.io.DataOutputBuffer; 058import org.apache.hadoop.io.IOUtils; 059import org.apache.hadoop.net.NetUtils; 060import org.apache.hadoop.security.Credentials; 061import org.apache.hadoop.security.UserGroupInformation; 062import org.apache.hadoop.security.token.Token; 063import org.apache.hadoop.util.ExitUtil; 064import org.apache.hadoop.util.Shell; 065import org.apache.hadoop.yarn.api.ApplicationConstants; 066import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 067import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 068import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 069import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 070import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 071import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 072import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 073import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 074import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 075import org.apache.hadoop.yarn.api.records.Container; 076import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 077import org.apache.hadoop.yarn.api.records.ContainerId; 078import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 079import org.apache.hadoop.yarn.api.records.ContainerState; 080import org.apache.hadoop.yarn.api.records.ContainerStatus; 081import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 082import org.apache.hadoop.yarn.api.records.LocalResource; 083import 
org.apache.hadoop.yarn.api.records.LocalResourceType; 084import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 085import org.apache.hadoop.yarn.api.records.NodeReport; 086import org.apache.hadoop.yarn.api.records.Priority; 087import org.apache.hadoop.yarn.api.records.Resource; 088import org.apache.hadoop.yarn.api.records.ResourceRequest; 089import org.apache.hadoop.yarn.api.records.URL; 090import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 091import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; 092import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; 093import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 094import org.apache.hadoop.yarn.client.api.TimelineClient; 095import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; 096import org.apache.hadoop.yarn.client.api.async.NMClientAsync; 097import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 098import org.apache.hadoop.yarn.conf.YarnConfiguration; 099import org.apache.hadoop.yarn.exceptions.YarnException; 100import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 101import org.apache.hadoop.yarn.util.ConverterUtils; 102import org.apache.log4j.LogManager; 103 104import com.google.common.annotations.VisibleForTesting; 105 106/** 107 * An ApplicationMaster for executing shell commands on a set of launched 108 * containers using the YARN framework. 109 * 110 * <p> 111 * This class is meant to act as an example on how to write yarn-based 112 * application masters. 113 * </p> 114 * 115 * <p> 116 * The ApplicationMaster is started on a container by the 117 * <code>ResourceManager</code>'s launcher. The first thing that the 118 * <code>ApplicationMaster</code> needs to do is to connect and register itself 119 * with the <code>ResourceManager</code>. 
The registration sets up information 120 * within the <code>ResourceManager</code> regarding what host:port the 121 * ApplicationMaster is listening on to provide any form of functionality to a 122 * client as well as a tracking url that a client can use to keep track of 123 * status/job history if needed. However, in the distributedshell, trackingurl 124 * and appMasterHost:appMasterRpcPort are not supported. 125 * </p> 126 * 127 * <p> 128 * The <code>ApplicationMaster</code> needs to send a heartbeat to the 129 * <code>ResourceManager</code> at regular intervals to inform the 130 * <code>ResourceManager</code> that it is up and alive. The 131 * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the 132 * <code>ApplicationMaster</code> acts as a heartbeat. 133 * 134 * <p> 135 * For the actual handling of the job, the <code>ApplicationMaster</code> has to 136 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the 137 * required no. of containers using {@link ResourceRequest} with the necessary 138 * resource specifications such as node location, computational 139 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code> 140 * responds with an {@link AllocateResponse} that informs the 141 * <code>ApplicationMaster</code> of the set of newly allocated containers, 142 * completed containers as well as current state of available resources. 143 * </p> 144 * 145 * <p> 146 * For each allocated container, the <code>ApplicationMaster</code> can then set 147 * up the necessary launch context via {@link ContainerLaunchContext} to specify 148 * the allocated container id, local resources required by the executable, the 149 * environment to be setup for the executable, commands to execute, etc. and 150 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to 151 * launch and execute the defined commands on the given allocated container. 
 * </p>
 *
 * <p>
 * The <code>ApplicationMaster</code> can monitor the launched container by
 * either querying the <code>ResourceManager</code> using
 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
 * the {@link ContainerManagementProtocol} by querying for the status of the allocated
 * container's {@link ContainerId}.
 * </p>
 *
 * <p>
 * After the job has been completed, the <code>ApplicationMaster</code> has to
 * send a {@link FinishApplicationMasterRequest} to the
 * <code>ResourceManager</code> to inform it that the
 * <code>ApplicationMaster</code> has been completed.
 * </p>
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ApplicationMaster {

  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);

  /**
   * Timeline event types for this application master. run() publishes
   * DS_APP_ATTEMPT_START and finish() publishes DS_APP_ATTEMPT_END.
   */
  @VisibleForTesting
  @Private
  public static enum DSEvent {
    DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
  }

  /**
   * Timeline entity types (presumably one per app attempt and one per
   * container; TODO confirm against the publish helpers).
   */
  @VisibleForTesting
  @Private
  public static enum DSEntity {
    DS_APP_ATTEMPT, DS_CONTAINER
  }

  // NOTE(review): not referenced in this part of the file; presumably the
  // environment key under which a container's shell id is exported.
  private static final String YARN_SHELL_ID = "YARN_SHELL_ID";

  // Configuration
  private Configuration conf;

  // Handle to communicate with the Resource Manager
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  // In both secure and non-secure modes, this points to the job-submitter.
  @VisibleForTesting
  UserGroupInformation appSubmitterUgi;

  // Handle to communicate with the Node Manager
  private NMClientAsync nmClientAsync;
  // Listen to process the response from the Node Manager
  private NMCallbackHandler containerListener;

  // Application Attempt Id ( combination of attemptId and fail count )
  @VisibleForTesting
  protected ApplicationAttemptId appAttemptID;

  // TODO
  // For status update for clients - yet to be implemented
  // Hostname of the container
  private String appMasterHostname = "";
  // Port on which the app master listens for status updates from clients
  private int appMasterRpcPort = -1;
  // Tracking url to which app master publishes info for clients to monitor
  private String appMasterTrackingUrl = "";

  // App Master configuration
  // No. of containers to run shell command on
  @VisibleForTesting
  protected int numTotalContainers = 1;
  // Memory to request for the container on which the shell command will run
  private int containerMemory = 10;
  // VirtualCores to request for the container on which the shell command will run
  private int containerVirtualCores = 1;
  // Priority of the request
  private int requestPriority;

  // Counter for completed containers ( complete denotes successful or failed )
  private AtomicInteger numCompletedContainers = new AtomicInteger();
  // Allocated container count so that we know how many containers has the RM
  // allocated to us
  @VisibleForTesting
  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
  // Count of failed containers
  private AtomicInteger numFailedContainers = new AtomicInteger();
  // Count of containers already requested from the RM
  // Needed as once requested, we should not request for containers again.
  // Only request for more if the original requirement changes.
  @VisibleForTesting
  protected AtomicInteger numRequestedContainers = new AtomicInteger();

  // Shell command to be executed
  private String shellCommand = "";
  // Args to be passed to the shell command
  private String shellArgs = "";
  // Env variables to be setup for the shell command
  private Map<String, String> shellEnv = new HashMap<String, String>();

  // Location of shell script ( obtained from info set in env )
  // Shell script path in fs
  private String scriptPath = "";
  // Timestamp needed for creating a local resource
  private long shellScriptPathTimestamp = 0;
  // File length needed for local resource
  private long shellScriptPathLen = 0;

  // Timeline domain ID
  private String domainId = null;

  // Hardcoded path to shell script in launch container's local env
  private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
  private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
      + ".bat";

  // Hardcoded path to custom log_properties
  private static final String log4jPath = "log4j.properties";

  // Names of the files from which init() reads the shell command and its args.
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";

  // Set by RMCallbackHandler (all containers completed, shutdown requested,
  // or error) and polled by finish(); volatile so the write is visible to
  // the polling thread.
  private volatile boolean done;

  // Serialized credentials captured in run(); presumably attached to the
  // launch context of each container (TODO confirm in LaunchContainerRunnable).
  private ByteBuffer allTokens;

  // Launch threads: one per allocated container, joined in finish().
  private List<Thread> launchThreads = new ArrayList<Thread>();

  // Timeline Client (null when the timeline service is disabled)
  @VisibleForTesting
  TimelineClient timelineClient;

  // Command prefixes for the container command line; not referenced in
  // this part of the file.
  private final String linux_bash_command = "bash";
  private final String windows_command = "cmd /c";

  // Next shell id handed out to a newly allocated container.
  private int yarnShellIdCounter = 1;

  // Containers owned by this attempt: those allocated to it plus those
  // recovered from previous attempts at registration. Completion events for
  // any other container are ignored.
  @VisibleForTesting
  protected final Set<ContainerId> launchedContainers =
      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());

  /**
   * Entry point of the application master process.
   *
   * <p>Exits with status 0 on success or when init() returns false (for
   * example when only help was requested), with status 2 when finish()
   * reports failure, and via {@code ExitUtil.terminate(1, t)} when any
   * Throwable escapes initialization or execution.</p>
   *
   * @param args Command line args
   */
  public static void main(String[] args) {
    boolean result = false;
    try {
      ApplicationMaster appMaster = new ApplicationMaster();
      LOG.info("Initializing ApplicationMaster");
      boolean doRun = appMaster.init(args);
      if (!doRun) {
        System.exit(0);
      }
      appMaster.run();
      result = appMaster.finish();
    } catch (Throwable t) {
      LOG.fatal("Error running ApplicationMaster", t);
      // Flush log4j appenders before the JVM goes away.
      LogManager.shutdown();
      ExitUtil.terminate(1, t);
    }
    if (result) {
      LOG.info("Application Master completed successfully. exiting");
      System.exit(0);
    } else {
      LOG.info("Application Master failed. exiting");
      System.exit(2);
    }
  }

  /**
   * Dump out contents of $CWD and the environment to stdout for debugging.
   *
   * <p>Every environment variable is both logged and printed. The directory
   * listing uses {@code cmd /c dir} on Windows and {@code ls -al} elsewhere.</p>
   */
  private void dumpOutDebugInfo() {

    LOG.info("Dump debug output");
    Map<String, String> envs = System.getenv();
    for (Map.Entry<String, String> env : envs.entrySet()) {
      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
      System.out.println("System env: key=" + env.getKey() + ", val="
          + env.getValue());
    }

    BufferedReader buf = null;
    try {
      String lines = Shell.WINDOWS ?
Shell.execCommand("cmd", "/c", "dir") : 334 Shell.execCommand("ls", "-al"); 335 buf = new BufferedReader(new StringReader(lines)); 336 String line = ""; 337 while ((line = buf.readLine()) != null) { 338 LOG.info("System CWD content: " + line); 339 System.out.println("System CWD content: " + line); 340 } 341 } catch (IOException e) { 342 e.printStackTrace(); 343 } finally { 344 IOUtils.cleanup(LOG, buf); 345 } 346 } 347 348 public ApplicationMaster() { 349 // Set up the configuration 350 conf = new YarnConfiguration(); 351 } 352 353 /** 354 * Parse command line options 355 * 356 * @param args Command line args 357 * @return Whether init successful and run should be invoked 358 * @throws ParseException 359 * @throws IOException 360 */ 361 public boolean init(String[] args) throws ParseException, IOException { 362 Options opts = new Options(); 363 opts.addOption("app_attempt_id", true, 364 "App Attempt ID. Not to be used unless for testing purposes"); 365 opts.addOption("shell_env", true, 366 "Environment for shell script. Specified as env_key=env_val pairs"); 367 opts.addOption("container_memory", true, 368 "Amount of memory in MB to be requested to run the shell command"); 369 opts.addOption("container_vcores", true, 370 "Amount of virtual cores to be requested to run the shell command"); 371 opts.addOption("num_containers", true, 372 "No. of containers on which the shell command needs to be executed"); 373 opts.addOption("priority", true, "Application Priority. 
Default 0"); 374 opts.addOption("debug", false, "Dump out debug information"); 375 376 opts.addOption("help", false, "Print usage"); 377 CommandLine cliParser = new GnuParser().parse(opts, args); 378 379 if (args.length == 0) { 380 printUsage(opts); 381 throw new IllegalArgumentException( 382 "No args specified for application master to initialize"); 383 } 384 385 //Check whether customer log4j.properties file exists 386 if (fileExist(log4jPath)) { 387 try { 388 Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class, 389 log4jPath); 390 } catch (Exception e) { 391 LOG.warn("Can not set up custom log4j properties. " + e); 392 } 393 } 394 395 if (cliParser.hasOption("help")) { 396 printUsage(opts); 397 return false; 398 } 399 400 if (cliParser.hasOption("debug")) { 401 dumpOutDebugInfo(); 402 } 403 404 Map<String, String> envs = System.getenv(); 405 406 if (!envs.containsKey(Environment.CONTAINER_ID.name())) { 407 if (cliParser.hasOption("app_attempt_id")) { 408 String appIdStr = cliParser.getOptionValue("app_attempt_id", ""); 409 appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr); 410 } else { 411 throw new IllegalArgumentException( 412 "Application Attempt Id not set in the environment"); 413 } 414 } else { 415 ContainerId containerId = ConverterUtils.toContainerId(envs 416 .get(Environment.CONTAINER_ID.name())); 417 appAttemptID = containerId.getApplicationAttemptId(); 418 } 419 420 if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { 421 throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV 422 + " not set in the environment"); 423 } 424 if (!envs.containsKey(Environment.NM_HOST.name())) { 425 throw new RuntimeException(Environment.NM_HOST.name() 426 + " not set in the environment"); 427 } 428 if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) { 429 throw new RuntimeException(Environment.NM_HTTP_PORT 430 + " not set in the environment"); 431 } 432 if (!envs.containsKey(Environment.NM_PORT.name())) { 433 
throw new RuntimeException(Environment.NM_PORT.name() 434 + " not set in the environment"); 435 } 436 437 LOG.info("Application master for app" + ", appId=" 438 + appAttemptID.getApplicationId().getId() + ", clustertimestamp=" 439 + appAttemptID.getApplicationId().getClusterTimestamp() 440 + ", attemptId=" + appAttemptID.getAttemptId()); 441 442 if (!fileExist(shellCommandPath) 443 && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) { 444 throw new IllegalArgumentException( 445 "No shell command or shell script specified to be executed by application master"); 446 } 447 448 if (fileExist(shellCommandPath)) { 449 shellCommand = readContent(shellCommandPath); 450 } 451 452 if (fileExist(shellArgsPath)) { 453 shellArgs = readContent(shellArgsPath); 454 } 455 456 if (cliParser.hasOption("shell_env")) { 457 String shellEnvs[] = cliParser.getOptionValues("shell_env"); 458 for (String env : shellEnvs) { 459 env = env.trim(); 460 int index = env.indexOf('='); 461 if (index == -1) { 462 shellEnv.put(env, ""); 463 continue; 464 } 465 String key = env.substring(0, index); 466 String val = ""; 467 if (index < (env.length() - 1)) { 468 val = env.substring(index + 1); 469 } 470 shellEnv.put(key, val); 471 } 472 } 473 474 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) { 475 scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION); 476 477 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) { 478 shellScriptPathTimestamp = Long.parseLong(envs 479 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)); 480 } 481 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) { 482 shellScriptPathLen = Long.parseLong(envs 483 .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)); 484 } 485 if (!scriptPath.isEmpty() 486 && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) { 487 LOG.error("Illegal values in env for shell script path" + ", path=" 488 + scriptPath + ", len=" + shellScriptPathLen + ", timestamp=" 489 + 
shellScriptPathTimestamp); 490 throw new IllegalArgumentException( 491 "Illegal values in env for shell script path"); 492 } 493 } 494 495 if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) { 496 domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN); 497 } 498 499 containerMemory = Integer.parseInt(cliParser.getOptionValue( 500 "container_memory", "10")); 501 containerVirtualCores = Integer.parseInt(cliParser.getOptionValue( 502 "container_vcores", "1")); 503 numTotalContainers = Integer.parseInt(cliParser.getOptionValue( 504 "num_containers", "1")); 505 if (numTotalContainers == 0) { 506 throw new IllegalArgumentException( 507 "Cannot run distributed shell with no containers"); 508 } 509 requestPriority = Integer.parseInt(cliParser 510 .getOptionValue("priority", "0")); 511 return true; 512 } 513 514 /** 515 * Helper function to print usage 516 * 517 * @param opts Parsed command line options 518 */ 519 private void printUsage(Options opts) { 520 new HelpFormatter().printHelp("ApplicationMaster", opts); 521 } 522 523 /** 524 * Main run function for the application master 525 * 526 * @throws YarnException 527 * @throws IOException 528 */ 529 @SuppressWarnings({ "unchecked" }) 530 public void run() throws YarnException, IOException, InterruptedException { 531 LOG.info("Starting ApplicationMaster"); 532 533 // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class 534 // are marked as LimitedPrivate 535 Credentials credentials = 536 UserGroupInformation.getCurrentUser().getCredentials(); 537 DataOutputBuffer dob = new DataOutputBuffer(); 538 credentials.writeTokenStorageToStream(dob); 539 // Now remove the AM->RM token so that containers cannot access it. 
540 Iterator<Token<?>> iter = credentials.getAllTokens().iterator(); 541 LOG.info("Executing with tokens:"); 542 while (iter.hasNext()) { 543 Token<?> token = iter.next(); 544 LOG.info(token); 545 if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { 546 iter.remove(); 547 } 548 } 549 allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); 550 551 // Create appSubmitterUgi and add original tokens to it 552 String appSubmitterUserName = 553 System.getenv(ApplicationConstants.Environment.USER.name()); 554 appSubmitterUgi = 555 UserGroupInformation.createRemoteUser(appSubmitterUserName); 556 appSubmitterUgi.addCredentials(credentials); 557 558 559 AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); 560 amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); 561 amRMClient.init(conf); 562 amRMClient.start(); 563 564 containerListener = createNMCallbackHandler(); 565 nmClientAsync = new NMClientAsyncImpl(containerListener); 566 nmClientAsync.init(conf); 567 nmClientAsync.start(); 568 569 startTimelineClient(conf); 570 if(timelineClient != null) { 571 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 572 DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi); 573 } 574 575 // Setup local RPC Server to accept status requests directly from clients 576 // TODO need to setup a protocol for client to be able to communicate to 577 // the RPC server 578 // TODO use the rpc port info to register with the RM for the client to 579 // send requests to this app master 580 581 // Register self with ResourceManager 582 // This will start heartbeating to the RM 583 appMasterHostname = NetUtils.getHostname(); 584 RegisterApplicationMasterResponse response = amRMClient 585 .registerApplicationMaster(appMasterHostname, appMasterRpcPort, 586 appMasterTrackingUrl); 587 // Dump out information about cluster capability as seen by the 588 // resource manager 589 int maxMem = 
response.getMaximumResourceCapability().getMemory(); 590 LOG.info("Max mem capability of resources in this cluster " + maxMem); 591 592 int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); 593 LOG.info("Max vcores capability of resources in this cluster " + maxVCores); 594 595 // A resource ask cannot exceed the max. 596 if (containerMemory > maxMem) { 597 LOG.info("Container memory specified above max threshold of cluster." 598 + " Using max value." + ", specified=" + containerMemory + ", max=" 599 + maxMem); 600 containerMemory = maxMem; 601 } 602 603 if (containerVirtualCores > maxVCores) { 604 LOG.info("Container virtual cores specified above max threshold of cluster." 605 + " Using max value." + ", specified=" + containerVirtualCores + ", max=" 606 + maxVCores); 607 containerVirtualCores = maxVCores; 608 } 609 610 List<Container> previousAMRunningContainers = 611 response.getContainersFromPreviousAttempts(); 612 LOG.info(appAttemptID + " received " + previousAMRunningContainers.size() 613 + " previous attempts' running containers on AM registration."); 614 for(Container container: previousAMRunningContainers) { 615 launchedContainers.add(container.getId()); 616 } 617 numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); 618 619 620 int numTotalContainersToRequest = 621 numTotalContainers - previousAMRunningContainers.size(); 622 // Setup ask for containers from RM 623 // Send request for containers to RM 624 // Until we get our fully allocated quota, we keep on polling RM for 625 // containers 626 // Keep looping until all the containers are launched and shell script 627 // executed on them ( regardless of success/failure). 
628 for (int i = 0; i < numTotalContainersToRequest; ++i) { 629 ContainerRequest containerAsk = setupContainerAskForRM(); 630 amRMClient.addContainerRequest(containerAsk); 631 } 632 numRequestedContainers.set(numTotalContainers); 633 } 634 635 @VisibleForTesting 636 void startTimelineClient(final Configuration conf) 637 throws YarnException, IOException, InterruptedException { 638 try { 639 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 640 @Override 641 public Void run() throws Exception { 642 if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, 643 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { 644 // Creating the Timeline Client 645 timelineClient = TimelineClient.createTimelineClient(); 646 timelineClient.init(conf); 647 timelineClient.start(); 648 } else { 649 timelineClient = null; 650 LOG.warn("Timeline service is not enabled"); 651 } 652 return null; 653 } 654 }); 655 } catch (UndeclaredThrowableException e) { 656 throw new YarnException(e.getCause()); 657 } 658 } 659 660 @VisibleForTesting 661 NMCallbackHandler createNMCallbackHandler() { 662 return new NMCallbackHandler(this); 663 } 664 665 @VisibleForTesting 666 protected boolean finish() { 667 // wait for completion. 
668 while (!done 669 && (numCompletedContainers.get() != numTotalContainers)) { 670 try { 671 Thread.sleep(200); 672 } catch (InterruptedException ex) {} 673 } 674 675 if(timelineClient != null) { 676 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 677 DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); 678 } 679 680 // Join all launched threads 681 // needed for when we time out 682 // and we need to release containers 683 for (Thread launchThread : launchThreads) { 684 try { 685 launchThread.join(10000); 686 } catch (InterruptedException e) { 687 LOG.info("Exception thrown in thread join: " + e.getMessage()); 688 e.printStackTrace(); 689 } 690 } 691 692 // When the application completes, it should stop all running containers 693 LOG.info("Application completed. Stopping running containers"); 694 nmClientAsync.stop(); 695 696 // When the application completes, it should send a finish application 697 // signal to the RM 698 LOG.info("Application completed. Signalling finish to RM"); 699 700 FinalApplicationStatus appStatus; 701 String appMessage = null; 702 boolean success = true; 703 if (numFailedContainers.get() == 0 && 704 numCompletedContainers.get() == numTotalContainers) { 705 appStatus = FinalApplicationStatus.SUCCEEDED; 706 } else { 707 appStatus = FinalApplicationStatus.FAILED; 708 appMessage = "Diagnostics." 
+ ", total=" + numTotalContainers 709 + ", completed=" + numCompletedContainers.get() + ", allocated=" 710 + numAllocatedContainers.get() + ", failed=" 711 + numFailedContainers.get(); 712 LOG.info(appMessage); 713 success = false; 714 } 715 try { 716 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); 717 } catch (YarnException ex) { 718 LOG.error("Failed to unregister application", ex); 719 } catch (IOException e) { 720 LOG.error("Failed to unregister application", e); 721 } 722 723 amRMClient.stop(); 724 725 // Stop Timeline Client 726 if(timelineClient != null) { 727 timelineClient.stop(); 728 } 729 730 return success; 731 } 732 733 @VisibleForTesting 734 class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { 735 @SuppressWarnings("unchecked") 736 @Override 737 public void onContainersCompleted(List<ContainerStatus> completedContainers) { 738 LOG.info("Got response from RM for container ask, completedCnt=" 739 + completedContainers.size()); 740 for (ContainerStatus containerStatus : completedContainers) { 741 LOG.info(appAttemptID + " got container status for containerID=" 742 + containerStatus.getContainerId() + ", state=" 743 + containerStatus.getState() + ", exitStatus=" 744 + containerStatus.getExitStatus() + ", diagnostics=" 745 + containerStatus.getDiagnostics()); 746 747 // non complete containers should not be here 748 assert (containerStatus.getState() == ContainerState.COMPLETE); 749 // ignore containers we know nothing about - probably from a previous 750 // attempt 751 if (!launchedContainers.contains(containerStatus.getContainerId())) { 752 LOG.info("Ignoring completed status of " 753 + containerStatus.getContainerId() 754 + "; unknown container(probably launched by previous attempt)"); 755 continue; 756 } 757 758 // increment counters for completed/failed containers 759 int exitStatus = containerStatus.getExitStatus(); 760 if (0 != exitStatus) { 761 // container failed 762 if (ContainerExitStatus.ABORTED != 
exitStatus) { 763 // shell script failed 764 // counts as completed 765 numCompletedContainers.incrementAndGet(); 766 numFailedContainers.incrementAndGet(); 767 } else { 768 // container was killed by framework, possibly preempted 769 // we should re-try as the container was lost for some reason 770 numAllocatedContainers.decrementAndGet(); 771 numRequestedContainers.decrementAndGet(); 772 // we do not need to release the container as it would be done 773 // by the RM 774 } 775 } else { 776 // nothing to do 777 // container completed successfully 778 numCompletedContainers.incrementAndGet(); 779 LOG.info("Container completed successfully." + ", containerId=" 780 + containerStatus.getContainerId()); 781 } 782 if(timelineClient != null) { 783 publishContainerEndEvent( 784 timelineClient, containerStatus, domainId, appSubmitterUgi); 785 } 786 } 787 788 // ask for more containers if any failed 789 int askCount = numTotalContainers - numRequestedContainers.get(); 790 numRequestedContainers.addAndGet(askCount); 791 792 if (askCount > 0) { 793 for (int i = 0; i < askCount; ++i) { 794 ContainerRequest containerAsk = setupContainerAskForRM(); 795 amRMClient.addContainerRequest(containerAsk); 796 } 797 } 798 799 if (numCompletedContainers.get() == numTotalContainers) { 800 done = true; 801 } 802 } 803 804 @Override 805 public void onContainersAllocated(List<Container> allocatedContainers) { 806 LOG.info("Got response from RM for container ask, allocatedCnt=" 807 + allocatedContainers.size()); 808 numAllocatedContainers.addAndGet(allocatedContainers.size()); 809 for (Container allocatedContainer : allocatedContainers) { 810 String yarnShellId = Integer.toString(yarnShellIdCounter); 811 yarnShellIdCounter++; 812 LOG.info("Launching shell command on a new container." 
813 + ", containerId=" + allocatedContainer.getId() 814 + ", yarnShellId=" + yarnShellId 815 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 816 + ":" + allocatedContainer.getNodeId().getPort() 817 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 818 + ", containerResourceMemory" 819 + allocatedContainer.getResource().getMemory() 820 + ", containerResourceVirtualCores" 821 + allocatedContainer.getResource().getVirtualCores()); 822 // + ", containerToken" 823 // +allocatedContainer.getContainerToken().getIdentifier().toString()); 824 825 Thread launchThread = createLaunchContainerThread(allocatedContainer, 826 yarnShellId); 827 828 // launch and start the container on a separate thread to keep 829 // the main thread unblocked 830 // as all containers may not be allocated at one go. 831 launchThreads.add(launchThread); 832 launchedContainers.add(allocatedContainer.getId()); 833 launchThread.start(); 834 } 835 } 836 837 @Override 838 public void onShutdownRequest() { 839 done = true; 840 } 841 842 @Override 843 public void onNodesUpdated(List<NodeReport> updatedNodes) {} 844 845 @Override 846 public float getProgress() { 847 // set progress to deliver to RM on next heartbeat 848 float progress = (float) numCompletedContainers.get() 849 / numTotalContainers; 850 return progress; 851 } 852 853 @Override 854 public void onError(Throwable e) { 855 done = true; 856 amRMClient.stop(); 857 } 858 } 859 860 @VisibleForTesting 861 static class NMCallbackHandler 862 implements NMClientAsync.CallbackHandler { 863 864 private ConcurrentMap<ContainerId, Container> containers = 865 new ConcurrentHashMap<ContainerId, Container>(); 866 private final ApplicationMaster applicationMaster; 867 868 public NMCallbackHandler(ApplicationMaster applicationMaster) { 869 this.applicationMaster = applicationMaster; 870 } 871 872 public void addContainer(ContainerId containerId, Container container) { 873 containers.putIfAbsent(containerId, container); 874 } 875 
876 @Override 877 public void onContainerStopped(ContainerId containerId) { 878 if (LOG.isDebugEnabled()) { 879 LOG.debug("Succeeded to stop Container " + containerId); 880 } 881 containers.remove(containerId); 882 } 883 884 @Override 885 public void onContainerStatusReceived(ContainerId containerId, 886 ContainerStatus containerStatus) { 887 if (LOG.isDebugEnabled()) { 888 LOG.debug("Container Status: id=" + containerId + ", status=" + 889 containerStatus); 890 } 891 } 892 893 @Override 894 public void onContainerStarted(ContainerId containerId, 895 Map<String, ByteBuffer> allServiceResponse) { 896 if (LOG.isDebugEnabled()) { 897 LOG.debug("Succeeded to start Container " + containerId); 898 } 899 Container container = containers.get(containerId); 900 if (container != null) { 901 applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); 902 } 903 if(applicationMaster.timelineClient != null) { 904 ApplicationMaster.publishContainerStartEvent( 905 applicationMaster.timelineClient, container, 906 applicationMaster.domainId, applicationMaster.appSubmitterUgi); 907 } 908 } 909 910 @Override 911 public void onStartContainerError(ContainerId containerId, Throwable t) { 912 LOG.error("Failed to start Container " + containerId); 913 containers.remove(containerId); 914 applicationMaster.numCompletedContainers.incrementAndGet(); 915 applicationMaster.numFailedContainers.incrementAndGet(); 916 } 917 918 @Override 919 public void onGetContainerStatusError( 920 ContainerId containerId, Throwable t) { 921 LOG.error("Failed to query the status of Container " + containerId); 922 } 923 924 @Override 925 public void onStopContainerError(ContainerId containerId, Throwable t) { 926 LOG.error("Failed to stop Container " + containerId); 927 containers.remove(containerId); 928 } 929 } 930 931 /** 932 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container 933 * that will execute the shell command. 
934 */ 935 private class LaunchContainerRunnable implements Runnable { 936 937 // Allocated container 938 private Container container; 939 private String shellId; 940 941 NMCallbackHandler containerListener; 942 943 /** 944 * @param lcontainer Allocated container 945 * @param containerListener Callback handler of the container 946 */ 947 public LaunchContainerRunnable(Container lcontainer, 948 NMCallbackHandler containerListener, String shellId) { 949 this.container = lcontainer; 950 this.containerListener = containerListener; 951 this.shellId = shellId; 952 } 953 954 @Override 955 /** 956 * Connects to CM, sets up container launch context 957 * for shell command and eventually dispatches the container 958 * start request to the CM. 959 */ 960 public void run() { 961 LOG.info("Setting up container launch container for containerid=" 962 + container.getId() + " with shellid=" + shellId); 963 964 // Set the local resources 965 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 966 967 // The container for the eventual shell commands needs its own local 968 // resources too. 969 // In this scenario, if a shell script is specified, we need to have it 970 // copied and made available to the container. 971 if (!scriptPath.isEmpty()) { 972 Path renamedScriptPath = null; 973 if (Shell.WINDOWS) { 974 renamedScriptPath = new Path(scriptPath + ".bat"); 975 } else { 976 renamedScriptPath = new Path(scriptPath + ".sh"); 977 } 978 979 try { 980 // rename the script file based on the underlying OS syntax. 981 renameScriptFile(renamedScriptPath); 982 } catch (Exception e) { 983 LOG.error( 984 "Not able to add suffix (.bat/.sh) to the shell script filename", 985 e); 986 // We know we cannot continue launching the container 987 // so we should release it. 
988 numCompletedContainers.incrementAndGet(); 989 numFailedContainers.incrementAndGet(); 990 return; 991 } 992 993 URL yarnUrl = null; 994 try { 995 yarnUrl = ConverterUtils.getYarnUrlFromURI( 996 new URI(renamedScriptPath.toString())); 997 } catch (URISyntaxException e) { 998 LOG.error("Error when trying to use shell script path specified" 999 + " in env, path=" + renamedScriptPath, e); 1000 // A failure scenario on bad input such as invalid shell script path 1001 // We know we cannot continue launching the container 1002 // so we should release it. 1003 // TODO 1004 numCompletedContainers.incrementAndGet(); 1005 numFailedContainers.incrementAndGet(); 1006 return; 1007 } 1008 LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, 1009 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 1010 shellScriptPathLen, shellScriptPathTimestamp); 1011 localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath : 1012 ExecShellStringPath, shellRsrc); 1013 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; 1014 } 1015 1016 // Set the necessary command to execute on the allocated container 1017 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 1018 1019 // Set executable command 1020 vargs.add(shellCommand); 1021 // Set shell script path 1022 if (!scriptPath.isEmpty()) { 1023 vargs.add(Shell.WINDOWS ? 
ExecBatScripStringtPath 1024 : ExecShellStringPath); 1025 } 1026 1027 // Set args for the shell command if any 1028 vargs.add(shellArgs); 1029 // Add log redirect params 1030 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 1031 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 1032 1033 // Get final commmand 1034 StringBuilder command = new StringBuilder(); 1035 for (CharSequence str : vargs) { 1036 command.append(str).append(" "); 1037 } 1038 1039 List<String> commands = new ArrayList<String>(); 1040 commands.add(command.toString()); 1041 1042 // Set up ContainerLaunchContext, setting local resource, environment, 1043 // command and token for constructor. 1044 1045 // Note for tokens: Set up tokens for the container too. Today, for normal 1046 // shell commands, the container in distribute-shell doesn't need any 1047 // tokens. We are populating them mainly for NodeManagers to be able to 1048 // download anyfiles in the distributed file-system. The tokens are 1049 // otherwise also useful in cases, for e.g., when one is running a 1050 // "hadoop dfs" command inside the distributed shell. 
1051 Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv); 1052 myShellEnv.put(YARN_SHELL_ID, shellId); 1053 ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( 1054 localResources, myShellEnv, commands, null, allTokens.duplicate(), 1055 null); 1056 containerListener.addContainer(container.getId(), container); 1057 nmClientAsync.startContainerAsync(container, ctx); 1058 } 1059 } 1060 1061 private void renameScriptFile(final Path renamedScriptPath) 1062 throws IOException, InterruptedException { 1063 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 1064 @Override 1065 public Void run() throws IOException { 1066 FileSystem fs = renamedScriptPath.getFileSystem(conf); 1067 fs.rename(new Path(scriptPath), renamedScriptPath); 1068 return null; 1069 } 1070 }); 1071 LOG.info("User " + appSubmitterUgi.getUserName() 1072 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath); 1073 } 1074 1075 /** 1076 * Setup the request that will be sent to the RM for the container ask. 1077 * 1078 * @return the setup ResourceRequest to be sent to RM 1079 */ 1080 private ContainerRequest setupContainerAskForRM() { 1081 // setup requirements for hosts 1082 // using * as any host will do for the distributed shell app 1083 // set the priority for the request 1084 // TODO - what is the range for priority? how to decide? 
1085 Priority pri = Priority.newInstance(requestPriority); 1086 1087 // Set up resource type requirements 1088 // For now, memory and CPU are supported so we set memory and cpu requirements 1089 Resource capability = Resource.newInstance(containerMemory, 1090 containerVirtualCores); 1091 1092 ContainerRequest request = new ContainerRequest(capability, null, null, 1093 pri); 1094 LOG.info("Requested container ask: " + request.toString()); 1095 return request; 1096 } 1097 1098 private boolean fileExist(String filePath) { 1099 return new File(filePath).exists(); 1100 } 1101 1102 private String readContent(String filePath) throws IOException { 1103 DataInputStream ds = null; 1104 try { 1105 ds = new DataInputStream(new FileInputStream(filePath)); 1106 return ds.readUTF(); 1107 } finally { 1108 org.apache.commons.io.IOUtils.closeQuietly(ds); 1109 } 1110 } 1111 1112 private static void publishContainerStartEvent( 1113 final TimelineClient timelineClient, Container container, String domainId, 1114 UserGroupInformation ugi) { 1115 final TimelineEntity entity = new TimelineEntity(); 1116 entity.setEntityId(container.getId().toString()); 1117 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1118 entity.setDomainId(domainId); 1119 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1120 TimelineEvent event = new TimelineEvent(); 1121 event.setTimestamp(System.currentTimeMillis()); 1122 event.setEventType(DSEvent.DS_CONTAINER_START.toString()); 1123 event.addEventInfo("Node", container.getNodeId().toString()); 1124 event.addEventInfo("Resources", container.getResource().toString()); 1125 entity.addEvent(event); 1126 1127 try { 1128 ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() { 1129 @Override 1130 public TimelinePutResponse run() throws Exception { 1131 return timelineClient.putEntities(entity); 1132 } 1133 }); 1134 } catch (Exception e) { 1135 LOG.error("Container start event could not be published for " 1136 + container.getId().toString(), 1137 e 
instanceof UndeclaredThrowableException ? e.getCause() : e); 1138 } 1139 } 1140 1141 private static void publishContainerEndEvent( 1142 final TimelineClient timelineClient, ContainerStatus container, 1143 String domainId, UserGroupInformation ugi) { 1144 final TimelineEntity entity = new TimelineEntity(); 1145 entity.setEntityId(container.getContainerId().toString()); 1146 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1147 entity.setDomainId(domainId); 1148 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1149 TimelineEvent event = new TimelineEvent(); 1150 event.setTimestamp(System.currentTimeMillis()); 1151 event.setEventType(DSEvent.DS_CONTAINER_END.toString()); 1152 event.addEventInfo("State", container.getState().name()); 1153 event.addEventInfo("Exit Status", container.getExitStatus()); 1154 entity.addEvent(event); 1155 try { 1156 timelineClient.putEntities(entity); 1157 } catch (YarnException | IOException e) { 1158 LOG.error("Container end event could not be published for " 1159 + container.getContainerId().toString(), e); 1160 } 1161 } 1162 1163 private static void publishApplicationAttemptEvent( 1164 final TimelineClient timelineClient, String appAttemptId, 1165 DSEvent appEvent, String domainId, UserGroupInformation ugi) { 1166 final TimelineEntity entity = new TimelineEntity(); 1167 entity.setEntityId(appAttemptId); 1168 entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); 1169 entity.setDomainId(domainId); 1170 entity.addPrimaryFilter("user", ugi.getShortUserName()); 1171 TimelineEvent event = new TimelineEvent(); 1172 event.setEventType(appEvent.toString()); 1173 event.setTimestamp(System.currentTimeMillis()); 1174 entity.addEvent(event); 1175 try { 1176 timelineClient.putEntities(entity); 1177 } catch (YarnException | IOException e) { 1178 LOG.error("App Attempt " 1179 + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? 
"start" : "end") 1180 + " event could not be published for " 1181 + appAttemptId.toString(), e); 1182 } 1183 } 1184 1185 RMCallbackHandler getRMCallbackHandler() { 1186 return new RMCallbackHandler(); 1187 } 1188 1189 @VisibleForTesting 1190 void setAmRMClient(AMRMClientAsync client) { 1191 this.amRMClient = client; 1192 } 1193 1194 @VisibleForTesting 1195 int getNumCompletedContainers() { 1196 return numCompletedContainers.get(); 1197 } 1198 1199 @VisibleForTesting 1200 boolean getDone() { 1201 return done; 1202 } 1203 1204 @VisibleForTesting 1205 Thread createLaunchContainerThread(Container allocatedContainer, 1206 String shellId) { 1207 LaunchContainerRunnable runnableLaunchContainer = 1208 new LaunchContainerRunnable(allocatedContainer, containerListener, 1209 shellId); 1210 return new Thread(runnableLaunchContainer); 1211 } 1212}