001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.applications.distributedshell; 020 021import java.io.BufferedReader; 022import java.io.DataInputStream; 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.IOException; 026import java.io.StringReader; 027import java.lang.reflect.UndeclaredThrowableException; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.nio.ByteBuffer; 031import java.security.PrivilegedExceptionAction; 032import java.util.ArrayList; 033import java.util.Collections; 034import java.util.HashMap; 035import java.util.HashSet; 036import java.util.Iterator; 037import java.util.List; 038import java.util.Map; 039import java.util.Set; 040import java.util.Vector; 041import java.util.concurrent.ConcurrentHashMap; 042import java.util.concurrent.ConcurrentMap; 043import java.util.concurrent.atomic.AtomicInteger; 044 045import org.apache.commons.cli.CommandLine; 046import org.apache.commons.cli.GnuParser; 047import org.apache.commons.cli.HelpFormatter; 048import org.apache.commons.cli.Options; 049import org.apache.commons.cli.ParseException; 050import org.apache.commons.logging.Log; 
051import org.apache.commons.logging.LogFactory; 052import org.apache.hadoop.classification.InterfaceAudience; 053import org.apache.hadoop.classification.InterfaceAudience.Private; 054import org.apache.hadoop.classification.InterfaceStability; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.FileSystem; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.io.DataOutputBuffer; 059import org.apache.hadoop.io.IOUtils; 060import org.apache.hadoop.net.NetUtils; 061import org.apache.hadoop.security.Credentials; 062import org.apache.hadoop.security.UserGroupInformation; 063import org.apache.hadoop.security.token.Token; 064import org.apache.hadoop.util.ExitUtil; 065import org.apache.hadoop.util.Shell; 066import org.apache.hadoop.yarn.api.ApplicationConstants; 067import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; 068import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; 069import org.apache.hadoop.yarn.api.ContainerManagementProtocol; 070import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; 071import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; 072import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; 073import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; 074import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 075import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 076import org.apache.hadoop.yarn.api.records.Container; 077import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 078import org.apache.hadoop.yarn.api.records.ContainerId; 079import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; 080import org.apache.hadoop.yarn.api.records.ContainerRetryContext; 081import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; 082import org.apache.hadoop.yarn.api.records.ContainerState; 083import org.apache.hadoop.yarn.api.records.ContainerStatus; 084import 
org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 085import org.apache.hadoop.yarn.api.records.LocalResource; 086import org.apache.hadoop.yarn.api.records.LocalResourceType; 087import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 088import org.apache.hadoop.yarn.api.records.NodeReport; 089import org.apache.hadoop.yarn.api.records.Priority; 090import org.apache.hadoop.yarn.api.records.Resource; 091import org.apache.hadoop.yarn.api.records.ResourceRequest; 092import org.apache.hadoop.yarn.api.records.URL; 093import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 094import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId; 095import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; 096import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; 097import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; 098import org.apache.hadoop.yarn.client.api.TimelineClient; 099import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; 100import org.apache.hadoop.yarn.client.api.async.NMClientAsync; 101import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; 102import org.apache.hadoop.yarn.conf.YarnConfiguration; 103import org.apache.hadoop.yarn.exceptions.YarnException; 104import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; 105import org.apache.hadoop.yarn.util.ConverterUtils; 106import org.apache.hadoop.yarn.util.timeline.TimelineUtils; 107import org.apache.log4j.LogManager; 108 109import com.google.common.annotations.VisibleForTesting; 110import com.sun.jersey.api.client.ClientHandlerException; 111 112/** 113 * An ApplicationMaster for executing shell commands on a set of launched 114 * containers using the YARN framework. 115 * 116 * <p> 117 * This class is meant to act as an example on how to write yarn-based 118 * application masters. 
119 * </p> 120 * 121 * <p> 122 * The ApplicationMaster is started on a container by the 123 * <code>ResourceManager</code>'s launcher. The first thing that the 124 * <code>ApplicationMaster</code> needs to do is to connect and register itself 125 * with the <code>ResourceManager</code>. The registration sets up information 126 * within the <code>ResourceManager</code> regarding what host:port the 127 * ApplicationMaster is listening on to provide any form of functionality to a 128 * client as well as a tracking url that a client can use to keep track of 129 * status/job history if needed. However, in the distributedshell, trackingurl 130 * and appMasterHost:appMasterRpcPort are not supported. 131 * </p> 132 * 133 * <p> 134 * The <code>ApplicationMaster</code> needs to send a heartbeat to the 135 * <code>ResourceManager</code> at regular intervals to inform the 136 * <code>ResourceManager</code> that it is up and alive. The 137 * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the 138 * <code>ApplicationMaster</code> acts as a heartbeat. 139 * 140 * <p> 141 * For the actual handling of the job, the <code>ApplicationMaster</code> has to 142 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the 143 * required no. of containers using {@link ResourceRequest} with the necessary 144 * resource specifications such as node location, computational 145 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code> 146 * responds with an {@link AllocateResponse} that informs the 147 * <code>ApplicationMaster</code> of the set of newly allocated containers, 148 * completed containers as well as current state of available resources. 
149 * </p> 150 * 151 * <p> 152 * For each allocated container, the <code>ApplicationMaster</code> can then set 153 * up the necessary launch context via {@link ContainerLaunchContext} to specify 154 * the allocated container id, local resources required by the executable, the 155 * environment to be setup for the executable, commands to execute, etc. and 156 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to 157 * launch and execute the defined commands on the given allocated container. 158 * </p> 159 * 160 * <p> 161 * The <code>ApplicationMaster</code> can monitor the launched container by 162 * either querying the <code>ResourceManager</code> using 163 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via 164 * the {@link ContainerManagementProtocol} by querying for the status of the allocated 165 * container's {@link ContainerId}. 166 * 167 * <p> 168 * After the job has been completed, the <code>ApplicationMaster</code> has to 169 * send a {@link FinishApplicationMasterRequest} to the 170 * <code>ResourceManager</code> to inform it that the 171 * <code>ApplicationMaster</code> has been completed. 172 */ 173@InterfaceAudience.Public 174@InterfaceStability.Unstable 175public class ApplicationMaster { 176 177 private static final Log LOG = LogFactory.getLog(ApplicationMaster.class); 178 179 @VisibleForTesting 180 @Private 181 public static enum DSEvent { 182 DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END 183 } 184 185 @VisibleForTesting 186 @Private 187 public static enum DSEntity { 188 DS_APP_ATTEMPT, DS_CONTAINER 189 } 190 191 private static final String YARN_SHELL_ID = "YARN_SHELL_ID"; 192 193 // Configuration 194 private Configuration conf; 195 196 // Handle to communicate with the Resource Manager 197 @SuppressWarnings("rawtypes") 198 private AMRMClientAsync amRMClient; 199 200 // In both secure and non-secure modes, this points to the job-submitter. 
201 @VisibleForTesting 202 UserGroupInformation appSubmitterUgi; 203 204 // Handle to communicate with the Node Manager 205 private NMClientAsync nmClientAsync; 206 // Listen to process the response from the Node Manager 207 private NMCallbackHandler containerListener; 208 209 // Application Attempt Id ( combination of attemptId and fail count ) 210 @VisibleForTesting 211 protected ApplicationAttemptId appAttemptID; 212 213 // TODO 214 // For status update for clients - yet to be implemented 215 // Hostname of the container 216 private String appMasterHostname = ""; 217 // Port on which the app master listens for status updates from clients 218 private int appMasterRpcPort = -1; 219 // Tracking url to which app master publishes info for clients to monitor 220 private String appMasterTrackingUrl = ""; 221 222 // App Master configuration 223 // No. of containers to run shell command on 224 @VisibleForTesting 225 protected int numTotalContainers = 1; 226 // Memory to request for the container on which the shell command will run 227 private long containerMemory = 10; 228 // VirtualCores to request for the container on which the shell command will run 229 private int containerVirtualCores = 1; 230 // Priority of the request 231 private int requestPriority; 232 233 // Counter for completed containers ( complete denotes successful or failed ) 234 private AtomicInteger numCompletedContainers = new AtomicInteger(); 235 // Allocated container count so that we know how many containers has the RM 236 // allocated to us 237 @VisibleForTesting 238 protected AtomicInteger numAllocatedContainers = new AtomicInteger(); 239 // Count of failed containers 240 private AtomicInteger numFailedContainers = new AtomicInteger(); 241 // Count of containers already requested from the RM 242 // Needed as once requested, we should not request for containers again. 243 // Only request for more if the original requirement changes. 
244 @VisibleForTesting 245 protected AtomicInteger numRequestedContainers = new AtomicInteger(); 246 247 // Shell command to be executed 248 private String shellCommand = ""; 249 // Args to be passed to the shell command 250 private String shellArgs = ""; 251 // Env variables to be setup for the shell command 252 private Map<String, String> shellEnv = new HashMap<String, String>(); 253 254 // Location of shell script ( obtained from info set in env ) 255 // Shell script path in fs 256 private String scriptPath = ""; 257 // Timestamp needed for creating a local resource 258 private long shellScriptPathTimestamp = 0; 259 // File length needed for local resource 260 private long shellScriptPathLen = 0; 261 262 // Container retry options 263 private ContainerRetryPolicy containerRetryPolicy = 264 ContainerRetryPolicy.NEVER_RETRY; 265 private Set<Integer> containerRetryErrorCodes = null; 266 private int containerMaxRetries = 0; 267 private int containrRetryInterval = 0; 268 269 // Timeline domain ID 270 private String domainId = null; 271 272 // Hardcoded path to shell script in launch container's local env 273 private static final String EXEC_SHELL_STRING_PATH = Client.SCRIPT_PATH 274 + ".sh"; 275 private static final String EXEC_BAT_SCRIPT_STRING_PATH = Client.SCRIPT_PATH 276 + ".bat"; 277 278 // Hardcoded path to custom log_properties 279 private static final String log4jPath = "log4j.properties"; 280 281 private static final String shellCommandPath = "shellCommands"; 282 private static final String shellArgsPath = "shellArgs"; 283 284 private volatile boolean done; 285 286 private ByteBuffer allTokens; 287 288 // Launch threads 289 private List<Thread> launchThreads = new ArrayList<Thread>(); 290 291 // Timeline Client 292 @VisibleForTesting 293 TimelineClient timelineClient; 294 static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS"; 295 static final String APPID_TIMELINE_FILTER_NAME = "appId"; 296 static final String USER_TIMELINE_FILTER_NAME = "user"; 297 
private final String linux_bash_command = "bash";
private final String windows_command = "cmd /c";

private int yarnShellIdCounter = 1;

// Containers known to this attempt. run() seeds the set with the containers
// reported from previous attempts; completion callbacks ignore any container
// id that is not in it.
@VisibleForTesting
protected final Set<ContainerId> launchedContainers =
    Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());

/**
 * Process entry point: initializes, runs and finishes the ApplicationMaster
 * and then exits the JVM.
 *
 * <p>
 * Exit codes: 0 when {@link #init} declines to run (for example on
 * {@code -help}) or when {@link #finish()} reports success, 2 when
 * {@link #finish()} reports failure, and 1 (through
 * {@link ExitUtil#terminate}) when any {@link Throwable} escapes.
 * </p>
 *
 * @param args Command line args
 */
public static void main(String[] args) {
  boolean result = false;
  try {
    ApplicationMaster appMaster = new ApplicationMaster();
    LOG.info("Initializing ApplicationMaster");
    boolean doRun = appMaster.init(args);
    if (!doRun) {
      System.exit(0);
    }
    appMaster.run();
    result = appMaster.finish();
  } catch (Throwable t) {
    LOG.fatal("Error running ApplicationMaster", t);
    // Flush and close the log appenders before the JVM is terminated
    LogManager.shutdown();
    ExitUtil.terminate(1, t);
  }
  if (result) {
    LOG.info("Application Master completed successfully. exiting");
    System.exit(0);
  } else {
    LOG.info("Application Master failed. exiting");
    System.exit(2);
  }
}

/**
 * Dump out contents of $CWD and the environment to stdout for debugging.
 * Every line is written both to the log and to stdout. A failure to list the
 * working directory is logged and otherwise ignored.
 */
private void dumpOutDebugInfo() {

  LOG.info("Dump debug output");
  Map<String, String> envs = System.getenv();
  for (Map.Entry<String, String> env : envs.entrySet()) {
    LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
    System.out.println("System env: key=" + env.getKey() + ", val="
        + env.getValue());
  }

  try {
    String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
        Shell.execCommand("ls", "-al");
    try (BufferedReader buf = new BufferedReader(new StringReader(lines))) {
      String line;
      while ((line = buf.readLine()) != null) {
        LOG.info("System CWD content: " + line);
        System.out.println("System CWD content: " + line);
      }
    }
  } catch (IOException e) {
    // Debug aid only: report through the logger and carry on
    LOG.warn("Failed to list the contents of the working directory", e);
  }
}

/**
 * Creates an ApplicationMaster backed by a fresh {@link YarnConfiguration}.
 */
public ApplicationMaster() {
  // Set up the configuration
  conf = new YarnConfiguration();
}

/**
 * Parse command line options and the container environment, and populate
 * the fields that drive {@link #run()}.
 *
 * @param args Command line args
 * @return Whether init successful and run should be invoked; {@code false}
 *         when only usage was requested via {@code -help}
 * @throws ParseException if the command line cannot be parsed
 * @throws IOException declared for I/O failures while reading the shell
 *         command/argument files
 * @throws IllegalArgumentException if no args are given, the application
 *         attempt id cannot be determined, neither a shell command nor a
 *         shell script is specified, the script timestamp/length are
 *         invalid, the container count is not positive, or the retry policy
 *         index is out of range
 * @throws RuntimeException if a required environment variable is missing
 */
public boolean init(String[] args) throws ParseException, IOException {
  Options opts = new Options();
  opts.addOption("app_attempt_id", true,
      "App Attempt ID. Not to be used unless for testing purposes");
  opts.addOption("shell_env", true,
      "Environment for shell script. Specified as env_key=env_val pairs");
  opts.addOption("container_memory", true,
      "Amount of memory in MB to be requested to run the shell command");
  opts.addOption("container_vcores", true,
      "Amount of virtual cores to be requested to run the shell command");
  opts.addOption("num_containers", true,
      "No. of containers on which the shell command needs to be executed");
  opts.addOption("priority", true, "Application Priority. Default 0");
  opts.addOption("container_retry_policy", true,
      "Retry policy when container fails to run, "
          + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
          + "2: RETRY_ON_SPECIFIC_ERROR_CODES");
  opts.addOption("container_retry_error_codes", true,
      "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
          + "codes is specified with this option, "
          + "e.g. --container_retry_error_codes 1,2,3");
  opts.addOption("container_max_retries", true,
      "If container could retry, it specifies max retries");
  opts.addOption("container_retry_interval", true,
      "Interval between each retry, unit is milliseconds");
  opts.addOption("debug", false, "Dump out debug information");

  opts.addOption("help", false, "Print usage");
  CommandLine cliParser = new GnuParser().parse(opts, args);

  if (args.length == 0) {
    printUsage(opts);
    throw new IllegalArgumentException(
        "No args specified for application master to initialize");
  }

  // Check whether a custom log4j.properties file exists
  if (fileExist(log4jPath)) {
    try {
      Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
          log4jPath);
    } catch (Exception e) {
      LOG.warn("Can not set up custom log4j properties. " + e);
    }
  }

  if (cliParser.hasOption("help")) {
    printUsage(opts);
    return false;
  }

  if (cliParser.hasOption("debug")) {
    dumpOutDebugInfo();
  }

  Map<String, String> envs = System.getenv();

  // Prefer the container id from the environment; fall back to the
  // explicit option (testing only) when it is absent.
  if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
    if (cliParser.hasOption("app_attempt_id")) {
      String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
      appAttemptID = ApplicationAttemptId.fromString(appIdStr);
    } else {
      throw new IllegalArgumentException(
          "Application Attempt Id not set in the environment");
    }
  } else {
    ContainerId containerId = ContainerId.fromString(envs
        .get(Environment.CONTAINER_ID.name()));
    appAttemptID = containerId.getApplicationAttemptId();
  }

  if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
    throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
        + " not set in the environment");
  }
  if (!envs.containsKey(Environment.NM_HOST.name())) {
    throw new RuntimeException(Environment.NM_HOST.name()
        + " not set in the environment");
  }
  if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
    throw new RuntimeException(Environment.NM_HTTP_PORT.name()
        + " not set in the environment");
  }
  if (!envs.containsKey(Environment.NM_PORT.name())) {
    throw new RuntimeException(Environment.NM_PORT.name()
        + " not set in the environment");
  }

  LOG.info("Application master for app" + ", appId="
      + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
      + appAttemptID.getApplicationId().getClusterTimestamp()
      + ", attemptId=" + appAttemptID.getAttemptId());

  // The script location variable may be absent altogether, so it must be
  // null-checked before isEmpty() is consulted.
  String scriptLocation =
      envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
  if (!fileExist(shellCommandPath)
      && (scriptLocation == null || scriptLocation.isEmpty())) {
    throw new IllegalArgumentException(
        "No shell command or shell script specified to be executed by application master");
  }

  if (fileExist(shellCommandPath)) {
    shellCommand = readContent(shellCommandPath);
  }

  if (fileExist(shellArgsPath)) {
    shellArgs = readContent(shellArgsPath);
  }

  if (cliParser.hasOption("shell_env")) {
    String[] shellEnvs = cliParser.getOptionValues("shell_env");
    for (String env : shellEnvs) {
      env = env.trim();
      int index = env.indexOf('=');
      if (index == -1) {
        // "KEY" without '=' defines the variable with an empty value
        shellEnv.put(env, "");
        continue;
      }
      String key = env.substring(0, index);
      String val = "";
      if (index < (env.length() - 1)) {
        val = env.substring(index + 1);
      }
      shellEnv.put(key, val);
    }
  }

  if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
    scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);

    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
      shellScriptPathTimestamp = Long.parseLong(envs
          .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
    }
    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
      shellScriptPathLen = Long.parseLong(envs
          .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
    }
    if (!scriptPath.isEmpty()
        && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
      LOG.error("Illegal values in env for shell script path" + ", path="
          + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
          + shellScriptPathTimestamp);
      throw new IllegalArgumentException(
          "Illegal values in env for shell script path");
    }
  }

  if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
    domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
  }

  // containerMemory is a long, so parse it as one
  containerMemory = Long.parseLong(cliParser.getOptionValue(
      "container_memory", "10"));
  containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
      "container_vcores", "1"));
  numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
      "num_containers", "1"));
  // Zero or a negative count can never be satisfied
  if (numTotalContainers <= 0) {
    throw new IllegalArgumentException(
        "Cannot run distributed shell with no containers");
  }
  requestPriority = Integer.parseInt(cliParser
      .getOptionValue("priority", "0"));

  // The option is the documented numeric index of the policy; validate it
  // so that a bad value is reported as an argument error.
  int retryPolicyIndex = Integer.parseInt(cliParser.getOptionValue(
      "container_retry_policy", "0"));
  ContainerRetryPolicy[] retryPolicies = ContainerRetryPolicy.values();
  if (retryPolicyIndex < 0 || retryPolicyIndex >= retryPolicies.length) {
    throw new IllegalArgumentException("Invalid container_retry_policy "
        + retryPolicyIndex + ", expected a value between 0 and "
        + (retryPolicies.length - 1));
  }
  containerRetryPolicy = retryPolicies[retryPolicyIndex];
  if (cliParser.hasOption("container_retry_error_codes")) {
    containerRetryErrorCodes = new HashSet<>();
    for (String errorCode :
        cliParser.getOptionValue("container_retry_error_codes").split(",")) {
      // Tolerate whitespace around the separators, e.g. "1, 2, 3"
      containerRetryErrorCodes.add(Integer.parseInt(errorCode.trim()));
    }
  }
  containerMaxRetries = Integer.parseInt(
      cliParser.getOptionValue("container_max_retries", "0"));
  containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
      "container_retry_interval", "0"));
  return true;
}

/**
 * Helper function to print usage
 *
 * @param opts Parsed command line options
 */
private void printUsage(Options opts) {
  new HelpFormatter().printHelp("ApplicationMaster", opts);
}

/**
 * Main run function for the application master: starts the RM and NM
 * clients, registers with the ResourceManager and requests the containers
 * that are still missing. It returns once the requests have been added;
 * {@link #finish()} waits for the containers to complete.
 *
 * @throws YarnException
 * @throws IOException
 * @throws InterruptedException
 */
@SuppressWarnings({ "unchecked" })
public void run() throws YarnException, IOException, InterruptedException {
  LOG.info("Starting ApplicationMaster");

  // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
  // are marked as LimitedPrivate
  Credentials credentials =
      UserGroupInformation.getCurrentUser().getCredentials();
  DataOutputBuffer dob = new DataOutputBuffer();
  // Serialized before the loop below runs, so allTokens still holds the
  // complete token set.
  credentials.writeTokenStorageToStream(dob);
  // Now remove the AM->RM token so that containers cannot access it.
  Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
  LOG.info("Executing with tokens:");
  while (iter.hasNext()) {
    Token<?> token = iter.next();
    LOG.info(token);
    if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
      iter.remove();
    }
  }
  allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

  // Create appSubmitterUgi and add original tokens to it
  String appSubmitterUserName =
      System.getenv(ApplicationConstants.Environment.USER.name());
  appSubmitterUgi =
      UserGroupInformation.createRemoteUser(appSubmitterUserName);
  appSubmitterUgi.addCredentials(credentials);

  AMRMClientAsync.AbstractCallbackHandler allocListener =
      new RMCallbackHandler();
  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
  amRMClient.init(conf);
  amRMClient.start();

  containerListener = createNMCallbackHandler();
  nmClientAsync = new NMClientAsyncImpl(containerListener);
  nmClientAsync.init(conf);
  nmClientAsync.start();

  startTimelineClient(conf);
  if (timelineClient != null) {
    publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
        DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
  }

  // Setup local RPC Server to accept status requests directly from clients
  // TODO need to setup a protocol for client to be able to communicate to
  // the RPC server
  // TODO use the rpc port info to register with the RM for the client to
  // send requests to this app master

  // Register self with ResourceManager
  // This will start heartbeating to the RM
  appMasterHostname = NetUtils.getHostname();
  RegisterApplicationMasterResponse response = amRMClient
      .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
          appMasterTrackingUrl);
  // Dump out information about cluster capability as seen by the
  // resource manager
  long maxMem = response.getMaximumResourceCapability().getMemorySize();
  LOG.info("Max mem capability of resources in this cluster " + maxMem);

  int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
  LOG.info("Max vcores capability of resources in this cluster " + maxVCores);

  // A resource ask cannot exceed the max.
  if (containerMemory > maxMem) {
    LOG.info("Container memory specified above max threshold of cluster."
        + " Using max value." + ", specified=" + containerMemory + ", max="
        + maxMem);
    containerMemory = maxMem;
  }

  if (containerVirtualCores > maxVCores) {
    LOG.info("Container virtual cores specified above max threshold of cluster."
        + " Using max value." + ", specified=" + containerVirtualCores + ", max="
        + maxVCores);
    containerVirtualCores = maxVCores;
  }

  // Containers reported from previous attempts count as already allocated
  List<Container> previousAMRunningContainers =
      response.getContainersFromPreviousAttempts();
  LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
      + " previous attempts' running containers on AM registration.");
  for (Container container : previousAMRunningContainers) {
    launchedContainers.add(container.getId());
  }
  numAllocatedContainers.addAndGet(previousAMRunningContainers.size());

  int numTotalContainersToRequest =
      numTotalContainers - previousAMRunningContainers.size();
  // Setup ask for containers from RM
  // Send request for containers to RM
  // Until we get our fully allocated quota, we keep on polling RM for
  // containers
  // Keep looping until all the containers are launched and shell script
  // executed on them ( regardless of success/failure).
  for (int i = 0; i < numTotalContainersToRequest; ++i) {
    ContainerRequest containerAsk = setupContainerAskForRM();
    amRMClient.addContainerRequest(containerAsk);
  }
  numRequestedContainers.set(numTotalContainers);
}

/**
 * Creates and starts the timeline client as the application submitter when
 * the timeline service is enabled in {@code conf}; otherwise leaves
 * {@link #timelineClient} as {@code null}.
 *
 * @param conf configuration consulted for the timeline service setting and
 *        used to initialize the client
 * @throws YarnException wrapping the cause of an
 *         {@link UndeclaredThrowableException} raised by the privileged
 *         action
 * @throws IOException
 * @throws InterruptedException
 */
@VisibleForTesting
void startTimelineClient(final Configuration conf)
    throws YarnException, IOException, InterruptedException {
  try {
    appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
          // Creating the Timeline Client
          timelineClient = TimelineClient.createTimelineClient();
          timelineClient.init(conf);
          timelineClient.start();
        } else {
          timelineClient = null;
          LOG.warn("Timeline service is not enabled");
        }
        return null;
      }
    });
  } catch (UndeclaredThrowableException e) {
    throw new YarnException(e.getCause());
  }
}

@VisibleForTesting 706 NMCallbackHandler createNMCallbackHandler() { 707 return new NMCallbackHandler(this); 708 } 709 710 @VisibleForTesting 711 protected boolean finish() { 712 // wait for completion. 713 while (!done 714 && (numCompletedContainers.get() != numTotalContainers)) { 715 try { 716 Thread.sleep(200); 717 } catch (InterruptedException ex) {} 718 } 719 720 if(timelineClient != null) { 721 publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(), 722 DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi); 723 } 724 725 // Join all launched threads 726 // needed for when we time out 727 // and we need to release containers 728 for (Thread launchThread : launchThreads) { 729 try { 730 launchThread.join(10000); 731 } catch (InterruptedException e) { 732 LOG.info("Exception thrown in thread join: " + e.getMessage()); 733 e.printStackTrace(); 734 } 735 } 736 737 // When the application completes, it should stop all running containers 738 LOG.info("Application completed. Stopping running containers"); 739 nmClientAsync.stop(); 740 741 // When the application completes, it should send a finish application 742 // signal to the RM 743 LOG.info("Application completed. Signalling finish to RM"); 744 745 FinalApplicationStatus appStatus; 746 String appMessage = null; 747 boolean success = true; 748 if (numCompletedContainers.get() - numFailedContainers.get() 749 >= numTotalContainers) { 750 appStatus = FinalApplicationStatus.SUCCEEDED; 751 } else { 752 appStatus = FinalApplicationStatus.FAILED; 753 appMessage = "Diagnostics." 
+ ", total=" + numTotalContainers 754 + ", completed=" + numCompletedContainers.get() + ", allocated=" 755 + numAllocatedContainers.get() + ", failed=" 756 + numFailedContainers.get(); 757 LOG.info(appMessage); 758 success = false; 759 } 760 try { 761 amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); 762 } catch (YarnException ex) { 763 LOG.error("Failed to unregister application", ex); 764 } catch (IOException e) { 765 LOG.error("Failed to unregister application", e); 766 } 767 768 amRMClient.stop(); 769 770 // Stop Timeline Client 771 if(timelineClient != null) { 772 timelineClient.stop(); 773 } 774 775 return success; 776 } 777 778 @VisibleForTesting 779 class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler { 780 @SuppressWarnings("unchecked") 781 @Override 782 public void onContainersCompleted(List<ContainerStatus> completedContainers) { 783 LOG.info("Got response from RM for container ask, completedCnt=" 784 + completedContainers.size()); 785 for (ContainerStatus containerStatus : completedContainers) { 786 LOG.info(appAttemptID + " got container status for containerID=" 787 + containerStatus.getContainerId() + ", state=" 788 + containerStatus.getState() + ", exitStatus=" 789 + containerStatus.getExitStatus() + ", diagnostics=" 790 + containerStatus.getDiagnostics()); 791 792 // non complete containers should not be here 793 assert (containerStatus.getState() == ContainerState.COMPLETE); 794 // ignore containers we know nothing about - probably from a previous 795 // attempt 796 if (!launchedContainers.contains(containerStatus.getContainerId())) { 797 LOG.info("Ignoring completed status of " 798 + containerStatus.getContainerId() 799 + "; unknown container(probably launched by previous attempt)"); 800 continue; 801 } 802 803 // increment counters for completed/failed containers 804 int exitStatus = containerStatus.getExitStatus(); 805 if (0 != exitStatus) { 806 // container failed 807 if (ContainerExitStatus.ABORTED != 
exitStatus) { 808 // shell script failed 809 // counts as completed 810 numCompletedContainers.incrementAndGet(); 811 numFailedContainers.incrementAndGet(); 812 } else { 813 // container was killed by framework, possibly preempted 814 // we should re-try as the container was lost for some reason 815 numAllocatedContainers.decrementAndGet(); 816 numRequestedContainers.decrementAndGet(); 817 // we do not need to release the container as it would be done 818 // by the RM 819 } 820 } else { 821 // nothing to do 822 // container completed successfully 823 numCompletedContainers.incrementAndGet(); 824 LOG.info("Container completed successfully." + ", containerId=" 825 + containerStatus.getContainerId()); 826 } 827 if(timelineClient != null) { 828 publishContainerEndEvent( 829 timelineClient, containerStatus, domainId, appSubmitterUgi); 830 } 831 } 832 833 // ask for more containers if any failed 834 int askCount = numTotalContainers - numRequestedContainers.get(); 835 numRequestedContainers.addAndGet(askCount); 836 837 if (askCount > 0) { 838 for (int i = 0; i < askCount; ++i) { 839 ContainerRequest containerAsk = setupContainerAskForRM(); 840 amRMClient.addContainerRequest(containerAsk); 841 } 842 } 843 844 if (numCompletedContainers.get() == numTotalContainers) { 845 done = true; 846 } 847 } 848 849 @Override 850 public void onContainersAllocated(List<Container> allocatedContainers) { 851 LOG.info("Got response from RM for container ask, allocatedCnt=" 852 + allocatedContainers.size()); 853 numAllocatedContainers.addAndGet(allocatedContainers.size()); 854 for (Container allocatedContainer : allocatedContainers) { 855 String yarnShellId = Integer.toString(yarnShellIdCounter); 856 yarnShellIdCounter++; 857 LOG.info("Launching shell command on a new container." 
858 + ", containerId=" + allocatedContainer.getId() 859 + ", yarnShellId=" + yarnShellId 860 + ", containerNode=" + allocatedContainer.getNodeId().getHost() 861 + ":" + allocatedContainer.getNodeId().getPort() 862 + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress() 863 + ", containerResourceMemory" 864 + allocatedContainer.getResource().getMemorySize() 865 + ", containerResourceVirtualCores" 866 + allocatedContainer.getResource().getVirtualCores()); 867 // + ", containerToken" 868 // +allocatedContainer.getContainerToken().getIdentifier().toString()); 869 870 Thread launchThread = createLaunchContainerThread(allocatedContainer, 871 yarnShellId); 872 873 // launch and start the container on a separate thread to keep 874 // the main thread unblocked 875 // as all containers may not be allocated at one go. 876 launchThreads.add(launchThread); 877 launchedContainers.add(allocatedContainer.getId()); 878 launchThread.start(); 879 } 880 } 881 882 @Override 883 public void onContainersResourceChanged(List<Container> containers) {} 884 885 @Override 886 public void onShutdownRequest() { 887 done = true; 888 } 889 890 @Override 891 public void onNodesUpdated(List<NodeReport> updatedNodes) {} 892 893 @Override 894 public float getProgress() { 895 // set progress to deliver to RM on next heartbeat 896 float progress = (float) numCompletedContainers.get() 897 / numTotalContainers; 898 return progress; 899 } 900 901 @Override 902 public void onError(Throwable e) { 903 done = true; 904 amRMClient.stop(); 905 } 906 } 907 908 @VisibleForTesting 909 static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler { 910 911 private ConcurrentMap<ContainerId, Container> containers = 912 new ConcurrentHashMap<ContainerId, Container>(); 913 private final ApplicationMaster applicationMaster; 914 915 public NMCallbackHandler(ApplicationMaster applicationMaster) { 916 this.applicationMaster = applicationMaster; 917 } 918 919 public void addContainer(ContainerId 
containerId, Container container) { 920 containers.putIfAbsent(containerId, container); 921 } 922 923 @Override 924 public void onContainerStopped(ContainerId containerId) { 925 if (LOG.isDebugEnabled()) { 926 LOG.debug("Succeeded to stop Container " + containerId); 927 } 928 containers.remove(containerId); 929 } 930 931 @Override 932 public void onContainerStatusReceived(ContainerId containerId, 933 ContainerStatus containerStatus) { 934 if (LOG.isDebugEnabled()) { 935 LOG.debug("Container Status: id=" + containerId + ", status=" + 936 containerStatus); 937 } 938 } 939 940 @Override 941 public void onContainerStarted(ContainerId containerId, 942 Map<String, ByteBuffer> allServiceResponse) { 943 if (LOG.isDebugEnabled()) { 944 LOG.debug("Succeeded to start Container " + containerId); 945 } 946 Container container = containers.get(containerId); 947 if (container != null) { 948 applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId()); 949 } 950 if(applicationMaster.timelineClient != null) { 951 applicationMaster.publishContainerStartEvent( 952 applicationMaster.timelineClient, container, 953 applicationMaster.domainId, applicationMaster.appSubmitterUgi); 954 } 955 } 956 957 @Override 958 public void onContainerResourceIncreased( 959 ContainerId containerId, Resource resource) {} 960 961 @Override 962 public void onStartContainerError(ContainerId containerId, Throwable t) { 963 LOG.error("Failed to start Container " + containerId); 964 containers.remove(containerId); 965 applicationMaster.numCompletedContainers.incrementAndGet(); 966 applicationMaster.numFailedContainers.incrementAndGet(); 967 } 968 969 @Override 970 public void onGetContainerStatusError( 971 ContainerId containerId, Throwable t) { 972 LOG.error("Failed to query the status of Container " + containerId); 973 } 974 975 @Override 976 public void onStopContainerError(ContainerId containerId, Throwable t) { 977 LOG.error("Failed to stop Container " + containerId); 978 
containers.remove(containerId); 979 } 980 981 @Override 982 public void onIncreaseContainerResourceError( 983 ContainerId containerId, Throwable t) {} 984 985 } 986 987 /** 988 * Thread to connect to the {@link ContainerManagementProtocol} and launch the container 989 * that will execute the shell command. 990 */ 991 private class LaunchContainerRunnable implements Runnable { 992 993 // Allocated container 994 private Container container; 995 private String shellId; 996 997 NMCallbackHandler containerListener; 998 999 /** 1000 * @param lcontainer Allocated container 1001 * @param containerListener Callback handler of the container 1002 */ 1003 public LaunchContainerRunnable(Container lcontainer, 1004 NMCallbackHandler containerListener, String shellId) { 1005 this.container = lcontainer; 1006 this.containerListener = containerListener; 1007 this.shellId = shellId; 1008 } 1009 1010 @Override 1011 /** 1012 * Connects to CM, sets up container launch context 1013 * for shell command and eventually dispatches the container 1014 * start request to the CM. 1015 */ 1016 public void run() { 1017 LOG.info("Setting up container launch container for containerid=" 1018 + container.getId() + " with shellid=" + shellId); 1019 1020 // Set the local resources 1021 Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); 1022 1023 // The container for the eventual shell commands needs its own local 1024 // resources too. 1025 // In this scenario, if a shell script is specified, we need to have it 1026 // copied and made available to the container. 1027 if (!scriptPath.isEmpty()) { 1028 Path renamedScriptPath = null; 1029 if (Shell.WINDOWS) { 1030 renamedScriptPath = new Path(scriptPath + ".bat"); 1031 } else { 1032 renamedScriptPath = new Path(scriptPath + ".sh"); 1033 } 1034 1035 try { 1036 // rename the script file based on the underlying OS syntax. 
1037 renameScriptFile(renamedScriptPath); 1038 } catch (Exception e) { 1039 LOG.error( 1040 "Not able to add suffix (.bat/.sh) to the shell script filename", 1041 e); 1042 // We know we cannot continue launching the container 1043 // so we should release it. 1044 numCompletedContainers.incrementAndGet(); 1045 numFailedContainers.incrementAndGet(); 1046 return; 1047 } 1048 1049 URL yarnUrl = null; 1050 try { 1051 yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString())); 1052 } catch (URISyntaxException e) { 1053 LOG.error("Error when trying to use shell script path specified" 1054 + " in env, path=" + renamedScriptPath, e); 1055 // A failure scenario on bad input such as invalid shell script path 1056 // We know we cannot continue launching the container 1057 // so we should release it. 1058 // TODO 1059 numCompletedContainers.incrementAndGet(); 1060 numFailedContainers.incrementAndGet(); 1061 return; 1062 } 1063 LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, 1064 LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 1065 shellScriptPathLen, shellScriptPathTimestamp); 1066 localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH : 1067 EXEC_SHELL_STRING_PATH, shellRsrc); 1068 shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; 1069 } 1070 1071 // Set the necessary command to execute on the allocated container 1072 Vector<CharSequence> vargs = new Vector<CharSequence>(5); 1073 1074 // Set executable command 1075 vargs.add(shellCommand); 1076 // Set shell script path 1077 if (!scriptPath.isEmpty()) { 1078 vargs.add(Shell.WINDOWS ? 
EXEC_BAT_SCRIPT_STRING_PATH 1079 : EXEC_SHELL_STRING_PATH); 1080 } 1081 1082 // Set args for the shell command if any 1083 vargs.add(shellArgs); 1084 // Add log redirect params 1085 vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); 1086 vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); 1087 1088 // Get final commmand 1089 StringBuilder command = new StringBuilder(); 1090 for (CharSequence str : vargs) { 1091 command.append(str).append(" "); 1092 } 1093 1094 List<String> commands = new ArrayList<String>(); 1095 commands.add(command.toString()); 1096 1097 // Set up ContainerLaunchContext, setting local resource, environment, 1098 // command and token for constructor. 1099 1100 // Note for tokens: Set up tokens for the container too. Today, for normal 1101 // shell commands, the container in distribute-shell doesn't need any 1102 // tokens. We are populating them mainly for NodeManagers to be able to 1103 // download anyfiles in the distributed file-system. The tokens are 1104 // otherwise also useful in cases, for e.g., when one is running a 1105 // "hadoop dfs" command inside the distributed shell. 
1106 Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv); 1107 myShellEnv.put(YARN_SHELL_ID, shellId); 1108 ContainerRetryContext containerRetryContext = 1109 ContainerRetryContext.newInstance( 1110 containerRetryPolicy, containerRetryErrorCodes, 1111 containerMaxRetries, containrRetryInterval); 1112 ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( 1113 localResources, myShellEnv, commands, null, allTokens.duplicate(), 1114 null, containerRetryContext); 1115 containerListener.addContainer(container.getId(), container); 1116 nmClientAsync.startContainerAsync(container, ctx); 1117 } 1118 } 1119 1120 private void renameScriptFile(final Path renamedScriptPath) 1121 throws IOException, InterruptedException { 1122 appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() { 1123 @Override 1124 public Void run() throws IOException { 1125 FileSystem fs = renamedScriptPath.getFileSystem(conf); 1126 fs.rename(new Path(scriptPath), renamedScriptPath); 1127 return null; 1128 } 1129 }); 1130 LOG.info("User " + appSubmitterUgi.getUserName() 1131 + " added suffix(.sh/.bat) to script file as " + renamedScriptPath); 1132 } 1133 1134 /** 1135 * Setup the request that will be sent to the RM for the container ask. 1136 * 1137 * @return the setup ResourceRequest to be sent to RM 1138 */ 1139 private ContainerRequest setupContainerAskForRM() { 1140 // setup requirements for hosts 1141 // using * as any host will do for the distributed shell app 1142 // set the priority for the request 1143 // TODO - what is the range for priority? how to decide? 
1144 Priority pri = Priority.newInstance(requestPriority); 1145 1146 // Set up resource type requirements 1147 // For now, memory and CPU are supported so we set memory and cpu requirements 1148 Resource capability = Resource.newInstance(containerMemory, 1149 containerVirtualCores); 1150 1151 ContainerRequest request = new ContainerRequest(capability, null, null, 1152 pri); 1153 LOG.info("Requested container ask: " + request.toString()); 1154 return request; 1155 } 1156 1157 private boolean fileExist(String filePath) { 1158 return new File(filePath).exists(); 1159 } 1160 1161 private String readContent(String filePath) throws IOException { 1162 DataInputStream ds = null; 1163 try { 1164 ds = new DataInputStream(new FileInputStream(filePath)); 1165 return ds.readUTF(); 1166 } finally { 1167 org.apache.commons.io.IOUtils.closeQuietly(ds); 1168 } 1169 } 1170 1171 private void publishContainerStartEvent( 1172 final TimelineClient timelineClient, final Container container, 1173 String domainId, UserGroupInformation ugi) { 1174 final TimelineEntity entity = new TimelineEntity(); 1175 entity.setEntityId(container.getId().toString()); 1176 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1177 entity.setDomainId(domainId); 1178 entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); 1179 entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId() 1180 .getApplicationAttemptId().getApplicationId().toString()); 1181 TimelineEvent event = new TimelineEvent(); 1182 event.setTimestamp(System.currentTimeMillis()); 1183 event.setEventType(DSEvent.DS_CONTAINER_START.toString()); 1184 event.addEventInfo("Node", container.getNodeId().toString()); 1185 event.addEventInfo("Resources", container.getResource().toString()); 1186 entity.addEvent(event); 1187 1188 try { 1189 processTimelineResponseErrors( 1190 putContainerEntity(timelineClient, 1191 container.getId().getApplicationAttemptId(), 1192 entity)); 1193 } catch (YarnException | IOException | 
ClientHandlerException e) { 1194 LOG.error("Container start event could not be published for " 1195 + container.getId().toString(), e); 1196 } 1197 } 1198 1199 @VisibleForTesting 1200 void publishContainerEndEvent( 1201 final TimelineClient timelineClient, ContainerStatus container, 1202 String domainId, UserGroupInformation ugi) { 1203 final TimelineEntity entity = new TimelineEntity(); 1204 entity.setEntityId(container.getContainerId().toString()); 1205 entity.setEntityType(DSEntity.DS_CONTAINER.toString()); 1206 entity.setDomainId(domainId); 1207 entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); 1208 entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, 1209 container.getContainerId().getApplicationAttemptId() 1210 .getApplicationId().toString()); 1211 TimelineEvent event = new TimelineEvent(); 1212 event.setTimestamp(System.currentTimeMillis()); 1213 event.setEventType(DSEvent.DS_CONTAINER_END.toString()); 1214 event.addEventInfo("State", container.getState().name()); 1215 event.addEventInfo("Exit Status", container.getExitStatus()); 1216 entity.addEvent(event); 1217 try { 1218 processTimelineResponseErrors( 1219 putContainerEntity(timelineClient, 1220 container.getContainerId().getApplicationAttemptId(), 1221 entity)); 1222 } catch (YarnException | IOException | ClientHandlerException e) { 1223 LOG.error("Container end event could not be published for " 1224 + container.getContainerId().toString(), e); 1225 } 1226 } 1227 1228 private TimelinePutResponse putContainerEntity( 1229 TimelineClient timelineClient, ApplicationAttemptId currAttemptId, 1230 TimelineEntity entity) 1231 throws YarnException, IOException { 1232 if (TimelineUtils.timelineServiceV1_5Enabled(conf)) { 1233 TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance( 1234 currAttemptId.getApplicationId(), 1235 CONTAINER_ENTITY_GROUP_ID); 1236 return timelineClient.putEntities(currAttemptId, groupId, entity); 1237 } else { 1238 return 
timelineClient.putEntities(entity); 1239 } 1240 } 1241 1242 private void publishApplicationAttemptEvent( 1243 final TimelineClient timelineClient, String appAttemptId, 1244 DSEvent appEvent, String domainId, UserGroupInformation ugi) { 1245 final TimelineEntity entity = new TimelineEntity(); 1246 entity.setEntityId(appAttemptId); 1247 entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString()); 1248 entity.setDomainId(domainId); 1249 entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName()); 1250 TimelineEvent event = new TimelineEvent(); 1251 event.setEventType(appEvent.toString()); 1252 event.setTimestamp(System.currentTimeMillis()); 1253 entity.addEvent(event); 1254 try { 1255 TimelinePutResponse response = timelineClient.putEntities(entity); 1256 processTimelineResponseErrors(response); 1257 } catch (YarnException | IOException | ClientHandlerException e) { 1258 LOG.error("App Attempt " 1259 + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end") 1260 + " event could not be published for " 1261 + appAttemptId.toString(), e); 1262 } 1263 } 1264 1265 private TimelinePutResponse processTimelineResponseErrors( 1266 TimelinePutResponse response) { 1267 List<TimelinePutResponse.TimelinePutError> errors = response.getErrors(); 1268 if (errors.size() == 0) { 1269 LOG.debug("Timeline entities are successfully put"); 1270 } else { 1271 for (TimelinePutResponse.TimelinePutError error : errors) { 1272 LOG.error( 1273 "Error when publishing entity [" + error.getEntityType() + "," 1274 + error.getEntityId() + "], server side error code: " 1275 + error.getErrorCode()); 1276 } 1277 } 1278 return response; 1279 } 1280 1281 RMCallbackHandler getRMCallbackHandler() { 1282 return new RMCallbackHandler(); 1283 } 1284 1285 @VisibleForTesting 1286 void setAmRMClient(AMRMClientAsync client) { 1287 this.amRMClient = client; 1288 } 1289 1290 @VisibleForTesting 1291 int getNumCompletedContainers() { 1292 return numCompletedContainers.get(); 1293 } 1294 1295 
@VisibleForTesting 1296 boolean getDone() { 1297 return done; 1298 } 1299 1300 @VisibleForTesting 1301 Thread createLaunchContainerThread(Container allocatedContainer, 1302 String shellId) { 1303 LaunchContainerRunnable runnableLaunchContainer = 1304 new LaunchContainerRunnable(allocatedContainer, containerListener, 1305 shellId); 1306 return new Thread(runnableLaunchContainer); 1307 } 1308}