001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.BufferedReader;
022import java.io.DataInputStream;
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.IOException;
026import java.io.StringReader;
027import java.lang.reflect.UndeclaredThrowableException;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.nio.ByteBuffer;
031import java.security.PrivilegedExceptionAction;
032import java.util.ArrayList;
033import java.util.Collections;
034import java.util.HashMap;
035import java.util.HashSet;
036import java.util.Iterator;
037import java.util.List;
038import java.util.Map;
039import java.util.Set;
040import java.util.Vector;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ConcurrentMap;
043import java.util.concurrent.atomic.AtomicInteger;
044
045import org.apache.commons.cli.CommandLine;
046import org.apache.commons.cli.GnuParser;
047import org.apache.commons.cli.HelpFormatter;
048import org.apache.commons.cli.Options;
049import org.apache.commons.cli.ParseException;
050import org.apache.commons.logging.Log;
051import org.apache.commons.logging.LogFactory;
052import org.apache.hadoop.classification.InterfaceAudience;
053import org.apache.hadoop.classification.InterfaceAudience.Private;
054import org.apache.hadoop.classification.InterfaceStability;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.io.DataOutputBuffer;
059import org.apache.hadoop.io.IOUtils;
060import org.apache.hadoop.net.NetUtils;
061import org.apache.hadoop.security.Credentials;
062import org.apache.hadoop.security.UserGroupInformation;
063import org.apache.hadoop.security.token.Token;
064import org.apache.hadoop.util.ExitUtil;
065import org.apache.hadoop.util.Shell;
066import org.apache.hadoop.yarn.api.ApplicationConstants;
067import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
068import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
069import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
070import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
071import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
072import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
073import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
074import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
075import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
076import org.apache.hadoop.yarn.api.records.Container;
077import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
078import org.apache.hadoop.yarn.api.records.ContainerId;
079import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
080import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
081import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
082import org.apache.hadoop.yarn.api.records.ContainerState;
083import org.apache.hadoop.yarn.api.records.ContainerStatus;
084import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
085import org.apache.hadoop.yarn.api.records.LocalResource;
086import org.apache.hadoop.yarn.api.records.LocalResourceType;
087import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
088import org.apache.hadoop.yarn.api.records.NodeReport;
089import org.apache.hadoop.yarn.api.records.Priority;
090import org.apache.hadoop.yarn.api.records.Resource;
091import org.apache.hadoop.yarn.api.records.ResourceRequest;
092import org.apache.hadoop.yarn.api.records.URL;
093import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
094import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
095import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
096import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
097import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
098import org.apache.hadoop.yarn.client.api.TimelineClient;
099import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
100import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
101import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
102import org.apache.hadoop.yarn.conf.YarnConfiguration;
103import org.apache.hadoop.yarn.exceptions.YarnException;
104import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
105import org.apache.hadoop.yarn.util.ConverterUtils;
106import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
107import org.apache.log4j.LogManager;
108
109import com.google.common.annotations.VisibleForTesting;
110import com.sun.jersey.api.client.ClientHandlerException;
111
112/**
113 * An ApplicationMaster for executing shell commands on a set of launched
114 * containers using the YARN framework.
115 * 
116 * <p>
117 * This class is meant to act as an example on how to write yarn-based
118 * application masters.
119 * </p>
120 * 
121 * <p>
122 * The ApplicationMaster is started on a container by the
123 * <code>ResourceManager</code>'s launcher. The first thing that the
124 * <code>ApplicationMaster</code> needs to do is to connect and register itself
125 * with the <code>ResourceManager</code>. The registration sets up information
126 * within the <code>ResourceManager</code> regarding what host:port the
127 * ApplicationMaster is listening on to provide any form of functionality to a
128 * client as well as a tracking url that a client can use to keep track of
 * status/job history if needed. However, in the distributed shell, the
 * tracking URL and appMasterHost:appMasterRpcPort are not supported.
131 * </p>
132 * 
133 * <p>
134 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
135 * <code>ResourceManager</code> at regular intervals to inform the
136 * <code>ResourceManager</code> that it is up and alive. The
 * {@link ApplicationMasterProtocol#allocate} call to the <code>ResourceManager</code>
 * from the <code>ApplicationMaster</code> acts as a heartbeat.
 * </p>
 * 
140 * <p>
141 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
142 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
 * required number of containers using {@link ResourceRequest} with the necessary
144 * resource specifications such as node location, computational
145 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
146 * responds with an {@link AllocateResponse} that informs the
147 * <code>ApplicationMaster</code> of the set of newly allocated containers,
148 * completed containers as well as current state of available resources.
149 * </p>
150 * 
151 * <p>
152 * For each allocated container, the <code>ApplicationMaster</code> can then set
153 * up the necessary launch context via {@link ContainerLaunchContext} to specify
154 * the allocated container id, local resources required by the executable, the
155 * environment to be setup for the executable, commands to execute, etc. and
156 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
157 * launch and execute the defined commands on the given allocated container.
158 * </p>
159 * 
160 * <p>
161 * The <code>ApplicationMaster</code> can monitor the launched container by
162 * either querying the <code>ResourceManager</code> using
163 * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
164 * the {@link ContainerManagementProtocol} by querying for the status of the allocated
 * container's {@link ContainerId}.
 * </p>
 *
167 * <p>
168 * After the job has been completed, the <code>ApplicationMaster</code> has to
169 * send a {@link FinishApplicationMasterRequest} to the
170 * <code>ResourceManager</code> to inform it that the
171 * <code>ApplicationMaster</code> has been completed.
172 */
173@InterfaceAudience.Public
174@InterfaceStability.Unstable
175public class ApplicationMaster {
176
  // Logger for this class; also used by its inner callback handlers.
  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);

  /**
   * Timeline event types published by the distributed shell. The
   * application-attempt start/end events are published from run() and
   * finish(); the container events are presumably published from the
   * container callbacks (usage outside this view).
   */
  @VisibleForTesting
  @Private
  public static enum DSEvent {
    DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
  }
  
  /**
   * Timeline entity types used by the distributed shell (presumably one for
   * the application attempt and one per container; usage outside this view).
   */
  @VisibleForTesting
  @Private
  public static enum DSEntity {
    DS_APP_ATTEMPT, DS_CONTAINER
  }
190
  // Name of the per-shell id. NOTE(review): presumably exported to each
  // launched container together with yarnShellIdCounter (usage outside this
  // view - confirm).
  private static final String YARN_SHELL_ID = "YARN_SHELL_ID";

  // YARN configuration; created in the constructor and handed to the RM, NM
  // and timeline clients.
  private Configuration conf;

  // Async handle to communicate with the Resource Manager; created and
  // registered in run(), unregistered and stopped in finish().
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  // In both secure and non-secure modes, this points to the job-submitter.
  // Built in run() from the USER environment variable; the timeline client
  // is started as this user.
  @VisibleForTesting
  UserGroupInformation appSubmitterUgi;

  // Async handle to communicate with the Node Manager; stopped in finish().
  private NMClientAsync nmClientAsync;
  // Listener that processes responses from the Node Manager.
  private NMCallbackHandler containerListener;

  // Application Attempt Id ( combination of attemptId and fail count ).
  // Derived from the container id in the environment, or taken from the
  // app_attempt_id option when testing.
  @VisibleForTesting
  protected ApplicationAttemptId appAttemptID;

  // TODO
  // For status update for clients - yet to be implemented
  // Hostname of the container (filled in by run() before RM registration)
  private String appMasterHostname = "";
  // Port on which the app master listens for status updates from clients
  // (stays -1 while the client RPC server is unimplemented; see TODO)
  private int appMasterRpcPort = -1;
  // Tracking url to which app master publishes info for clients to monitor
  private String appMasterTrackingUrl = "";

  // App Master configuration
  // No. of containers to run shell command on (num_containers option)
  @VisibleForTesting
  protected int numTotalContainers = 1;
  // Memory in MB to request for the container on which the shell command
  // will run; capped to the cluster maximum in run()
  private long containerMemory = 10;
  // VirtualCores to request for the container on which the shell command
  // will run; capped to the cluster maximum in run()
  private int containerVirtualCores = 1;
  // Priority of the request (priority option, default 0)
  private int requestPriority;

  // Counter for completed containers ( complete denotes successful or failed )
  private AtomicInteger numCompletedContainers = new AtomicInteger();
  // Allocated container count so that we know how many containers has the RM
  // allocated to us; includes containers recovered from previous attempts
  @VisibleForTesting
  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
  // Count of failed containers
  private AtomicInteger numFailedContainers = new AtomicInteger();
  // Count of containers already requested from the RM
  // Needed as once requested, we should not request for containers again.
  // Only request for more if the original requirement changes.
  @VisibleForTesting
  protected AtomicInteger numRequestedContainers = new AtomicInteger();

  // Shell command to be executed (contents of the shellCommands file)
  private String shellCommand = "";
  // Args to be passed to the shell command (contents of the shellArgs file)
  private String shellArgs = "";
  // Env variables to be setup for the shell command (shell_env options)
  private Map<String, String> shellEnv = new HashMap<String, String>();

  // Location of shell script ( obtained from info set in env )
  // Shell script path in fs
  private String scriptPath = "";
  // Timestamp needed for creating a local resource
  private long shellScriptPathTimestamp = 0;
  // File length needed for local resource
  private long shellScriptPathLen = 0;

  // Container retry options, parsed from the container_retry_* options
  private ContainerRetryPolicy containerRetryPolicy =
      ContainerRetryPolicy.NEVER_RETRY;
  // null unless the container_retry_error_codes option was given
  private Set<Integer> containerRetryErrorCodes = null;
  private int containerMaxRetries = 0;
  // Retry interval in milliseconds. NOTE(review): "containr" is a misspelling;
  // renaming requires updating every reference to this field.
  private int containrRetryInterval = 0;

  // Timeline domain ID (read from the environment in init(), if present)
  private String domainId = null;

  // Hardcoded path to shell script in launch container's local env
  private static final String EXEC_SHELL_STRING_PATH = Client.SCRIPT_PATH
      + ".sh";
  private static final String EXEC_BAT_SCRIPT_STRING_PATH = Client.SCRIPT_PATH
      + ".bat";

  // Hardcoded path to a custom log4j properties file; applied in init() if
  // the file exists
  private static final String log4jPath = "log4j.properties";

  // Files whose contents, when present, supply the shell command and its
  // arguments (read in init())
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";

  // When true, finish() stops waiting for containers to complete. volatile:
  // read by finish()'s polling loop. NOTE(review): presumably written from a
  // callback thread (writer outside this view).
  private volatile boolean done;

  // Serialized credentials built in run(); presumably attached to container
  // launch contexts (usage outside this view).
  private ByteBuffer allTokens;

  // Launch threads; finish() joins each one for up to 10 seconds
  private List<Thread> launchThreads = new ArrayList<Thread>();

  // Timeline Client; null when the timeline service is disabled
  @VisibleForTesting
  TimelineClient timelineClient;
  // Timeline entity-group id and filter names (usage outside this view)
  static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
  static final String APPID_TIMELINE_FILTER_NAME = "appId";
  static final String USER_TIMELINE_FILTER_NAME = "user";

  // Command prefixes for running the shell command on Linux and Windows
  // (presumably selected per platform; usage outside this view)
  private final String linux_bash_command = "bash";
  private final String windows_command = "cmd /c";

  // Presumably the next value handed out for YARN_SHELL_ID (usage outside
  // this view)
  private int yarnShellIdCounter = 1;

  // Containers this attempt tracks, including ones recovered from previous
  // attempts in run(); completion events for any other container are ignored.
  // Backed by a ConcurrentHashMap so it is safe to share across threads.
  @VisibleForTesting
  protected final Set<ContainerId> launchedContainers =
      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
306
307  /**
308   * @param args Command line args
309   */
310  public static void main(String[] args) {
311    boolean result = false;
312    try {
313      ApplicationMaster appMaster = new ApplicationMaster();
314      LOG.info("Initializing ApplicationMaster");
315      boolean doRun = appMaster.init(args);
316      if (!doRun) {
317        System.exit(0);
318      }
319      appMaster.run();
320      result = appMaster.finish();
321    } catch (Throwable t) {
322      LOG.fatal("Error running ApplicationMaster", t);
323      LogManager.shutdown();
324      ExitUtil.terminate(1, t);
325    }
326    if (result) {
327      LOG.info("Application Master completed successfully. exiting");
328      System.exit(0);
329    } else {
330      LOG.info("Application Master failed. exiting");
331      System.exit(2);
332    }
333  }
334
335  /**
336   * Dump out contents of $CWD and the environment to stdout for debugging
337   */
338  private void dumpOutDebugInfo() {
339
340    LOG.info("Dump debug output");
341    Map<String, String> envs = System.getenv();
342    for (Map.Entry<String, String> env : envs.entrySet()) {
343      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
344      System.out.println("System env: key=" + env.getKey() + ", val="
345          + env.getValue());
346    }
347
348    BufferedReader buf = null;
349    try {
350      String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
351        Shell.execCommand("ls", "-al");
352      buf = new BufferedReader(new StringReader(lines));
353      String line = "";
354      while ((line = buf.readLine()) != null) {
355        LOG.info("System CWD content: " + line);
356        System.out.println("System CWD content: " + line);
357      }
358    } catch (IOException e) {
359      e.printStackTrace();
360    } finally {
361      IOUtils.cleanup(LOG, buf);
362    }
363  }
364
365  public ApplicationMaster() {
366    // Set up the configuration
367    conf = new YarnConfiguration();
368  }
369
370  /**
371   * Parse command line options
372   *
373   * @param args Command line args
374   * @return Whether init successful and run should be invoked
375   * @throws ParseException
376   * @throws IOException
377   */
378  public boolean init(String[] args) throws ParseException, IOException {
379    Options opts = new Options();
380    opts.addOption("app_attempt_id", true,
381        "App Attempt ID. Not to be used unless for testing purposes");
382    opts.addOption("shell_env", true,
383        "Environment for shell script. Specified as env_key=env_val pairs");
384    opts.addOption("container_memory", true,
385        "Amount of memory in MB to be requested to run the shell command");
386    opts.addOption("container_vcores", true,
387        "Amount of virtual cores to be requested to run the shell command");
388    opts.addOption("num_containers", true,
389        "No. of containers on which the shell command needs to be executed");
390    opts.addOption("priority", true, "Application Priority. Default 0");
391    opts.addOption("container_retry_policy", true,
392        "Retry policy when container fails to run, "
393            + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
394            + "2: RETRY_ON_SPECIFIC_ERROR_CODES");
395    opts.addOption("container_retry_error_codes", true,
396        "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
397            + "codes is specified with this option, "
398            + "e.g. --container_retry_error_codes 1,2,3");
399    opts.addOption("container_max_retries", true,
400        "If container could retry, it specifies max retires");
401    opts.addOption("container_retry_interval", true,
402        "Interval between each retry, unit is milliseconds");
403    opts.addOption("debug", false, "Dump out debug information");
404
405    opts.addOption("help", false, "Print usage");
406    CommandLine cliParser = new GnuParser().parse(opts, args);
407
408    if (args.length == 0) {
409      printUsage(opts);
410      throw new IllegalArgumentException(
411          "No args specified for application master to initialize");
412    }
413
414    //Check whether customer log4j.properties file exists
415    if (fileExist(log4jPath)) {
416      try {
417        Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
418            log4jPath);
419      } catch (Exception e) {
420        LOG.warn("Can not set up custom log4j properties. " + e);
421      }
422    }
423
424    if (cliParser.hasOption("help")) {
425      printUsage(opts);
426      return false;
427    }
428
429    if (cliParser.hasOption("debug")) {
430      dumpOutDebugInfo();
431    }
432
433    Map<String, String> envs = System.getenv();
434
435    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
436      if (cliParser.hasOption("app_attempt_id")) {
437        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
438        appAttemptID = ApplicationAttemptId.fromString(appIdStr);
439      } else {
440        throw new IllegalArgumentException(
441            "Application Attempt Id not set in the environment");
442      }
443    } else {
444      ContainerId containerId = ContainerId.fromString(envs
445          .get(Environment.CONTAINER_ID.name()));
446      appAttemptID = containerId.getApplicationAttemptId();
447    }
448
449    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
450      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
451          + " not set in the environment");
452    }
453    if (!envs.containsKey(Environment.NM_HOST.name())) {
454      throw new RuntimeException(Environment.NM_HOST.name()
455          + " not set in the environment");
456    }
457    if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
458      throw new RuntimeException(Environment.NM_HTTP_PORT
459          + " not set in the environment");
460    }
461    if (!envs.containsKey(Environment.NM_PORT.name())) {
462      throw new RuntimeException(Environment.NM_PORT.name()
463          + " not set in the environment");
464    }
465
466    LOG.info("Application master for app" + ", appId="
467        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
468        + appAttemptID.getApplicationId().getClusterTimestamp()
469        + ", attemptId=" + appAttemptID.getAttemptId());
470
471    if (!fileExist(shellCommandPath)
472        && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
473      throw new IllegalArgumentException(
474          "No shell command or shell script specified to be executed by application master");
475    }
476
477    if (fileExist(shellCommandPath)) {
478      shellCommand = readContent(shellCommandPath);
479    }
480
481    if (fileExist(shellArgsPath)) {
482      shellArgs = readContent(shellArgsPath);
483    }
484
485    if (cliParser.hasOption("shell_env")) {
486      String shellEnvs[] = cliParser.getOptionValues("shell_env");
487      for (String env : shellEnvs) {
488        env = env.trim();
489        int index = env.indexOf('=');
490        if (index == -1) {
491          shellEnv.put(env, "");
492          continue;
493        }
494        String key = env.substring(0, index);
495        String val = "";
496        if (index < (env.length() - 1)) {
497          val = env.substring(index + 1);
498        }
499        shellEnv.put(key, val);
500      }
501    }
502
503    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
504      scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
505
506      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
507        shellScriptPathTimestamp = Long.parseLong(envs
508            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
509      }
510      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
511        shellScriptPathLen = Long.parseLong(envs
512            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
513      }
514      if (!scriptPath.isEmpty()
515          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
516        LOG.error("Illegal values in env for shell script path" + ", path="
517            + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
518            + shellScriptPathTimestamp);
519        throw new IllegalArgumentException(
520            "Illegal values in env for shell script path");
521      }
522    }
523
524    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
525      domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
526    }
527
528    containerMemory = Integer.parseInt(cliParser.getOptionValue(
529        "container_memory", "10"));
530    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
531        "container_vcores", "1"));
532    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
533        "num_containers", "1"));
534    if (numTotalContainers == 0) {
535      throw new IllegalArgumentException(
536          "Cannot run distributed shell with no containers");
537    }
538    requestPriority = Integer.parseInt(cliParser
539        .getOptionValue("priority", "0"));
540
541    containerRetryPolicy = ContainerRetryPolicy.values()[
542        Integer.parseInt(cliParser.getOptionValue(
543            "container_retry_policy", "0"))];
544    if (cliParser.hasOption("container_retry_error_codes")) {
545      containerRetryErrorCodes = new HashSet<>();
546      for (String errorCode :
547          cliParser.getOptionValue("container_retry_error_codes").split(",")) {
548        containerRetryErrorCodes.add(Integer.parseInt(errorCode));
549      }
550    }
551    containerMaxRetries = Integer.parseInt(
552        cliParser.getOptionValue("container_max_retries", "0"));
553    containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
554        "container_retry_interval", "0"));
555    return true;
556  }
557
558  /**
559   * Helper function to print usage
560   *
561   * @param opts Parsed command line options
562   */
563  private void printUsage(Options opts) {
564    new HelpFormatter().printHelp("ApplicationMaster", opts);
565  }
566
567  /**
568   * Main run function for the application master
569   *
570   * @throws YarnException
571   * @throws IOException
572   */
573  @SuppressWarnings({ "unchecked" })
574  public void run() throws YarnException, IOException, InterruptedException {
575    LOG.info("Starting ApplicationMaster");
576
577    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
578    // are marked as LimitedPrivate
579    Credentials credentials =
580        UserGroupInformation.getCurrentUser().getCredentials();
581    DataOutputBuffer dob = new DataOutputBuffer();
582    credentials.writeTokenStorageToStream(dob);
583    // Now remove the AM->RM token so that containers cannot access it.
584    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
585    LOG.info("Executing with tokens:");
586    while (iter.hasNext()) {
587      Token<?> token = iter.next();
588      LOG.info(token);
589      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
590        iter.remove();
591      }
592    }
593    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
594
595    // Create appSubmitterUgi and add original tokens to it
596    String appSubmitterUserName =
597        System.getenv(ApplicationConstants.Environment.USER.name());
598    appSubmitterUgi =
599        UserGroupInformation.createRemoteUser(appSubmitterUserName);
600    appSubmitterUgi.addCredentials(credentials);
601
602
603    AMRMClientAsync.AbstractCallbackHandler allocListener =
604        new RMCallbackHandler();
605    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
606    amRMClient.init(conf);
607    amRMClient.start();
608
609    containerListener = createNMCallbackHandler();
610    nmClientAsync = new NMClientAsyncImpl(containerListener);
611    nmClientAsync.init(conf);
612    nmClientAsync.start();
613
614    startTimelineClient(conf);
615    if(timelineClient != null) {
616      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
617          DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
618    }
619
620    // Setup local RPC Server to accept status requests directly from clients
621    // TODO need to setup a protocol for client to be able to communicate to
622    // the RPC server
623    // TODO use the rpc port info to register with the RM for the client to
624    // send requests to this app master
625
626    // Register self with ResourceManager
627    // This will start heartbeating to the RM
628    appMasterHostname = NetUtils.getHostname();
629    RegisterApplicationMasterResponse response = amRMClient
630        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
631            appMasterTrackingUrl);
632    // Dump out information about cluster capability as seen by the
633    // resource manager
634    long maxMem = response.getMaximumResourceCapability().getMemorySize();
635    LOG.info("Max mem capability of resources in this cluster " + maxMem);
636    
637    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
638    LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
639
640    // A resource ask cannot exceed the max.
641    if (containerMemory > maxMem) {
642      LOG.info("Container memory specified above max threshold of cluster."
643          + " Using max value." + ", specified=" + containerMemory + ", max="
644          + maxMem);
645      containerMemory = maxMem;
646    }
647
648    if (containerVirtualCores > maxVCores) {
649      LOG.info("Container virtual cores specified above max threshold of cluster."
650          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
651          + maxVCores);
652      containerVirtualCores = maxVCores;
653    }
654
655    List<Container> previousAMRunningContainers =
656        response.getContainersFromPreviousAttempts();
657    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
658      + " previous attempts' running containers on AM registration.");
659    for(Container container: previousAMRunningContainers) {
660      launchedContainers.add(container.getId());
661    }
662    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
663
664
665    int numTotalContainersToRequest =
666        numTotalContainers - previousAMRunningContainers.size();
667    // Setup ask for containers from RM
668    // Send request for containers to RM
669    // Until we get our fully allocated quota, we keep on polling RM for
670    // containers
671    // Keep looping until all the containers are launched and shell script
672    // executed on them ( regardless of success/failure).
673    for (int i = 0; i < numTotalContainersToRequest; ++i) {
674      ContainerRequest containerAsk = setupContainerAskForRM();
675      amRMClient.addContainerRequest(containerAsk);
676    }
677    numRequestedContainers.set(numTotalContainers);
678  }
679
680  @VisibleForTesting
681  void startTimelineClient(final Configuration conf)
682      throws YarnException, IOException, InterruptedException {
683    try {
684      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
685        @Override
686        public Void run() throws Exception {
687          if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
688              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
689            // Creating the Timeline Client
690            timelineClient = TimelineClient.createTimelineClient();
691            timelineClient.init(conf);
692            timelineClient.start();
693          } else {
694            timelineClient = null;
695            LOG.warn("Timeline service is not enabled");
696          }
697          return null;
698        }
699      });
700    } catch (UndeclaredThrowableException e) {
701      throw new YarnException(e.getCause());
702    }
703  }
704
705  @VisibleForTesting
706  NMCallbackHandler createNMCallbackHandler() {
707    return new NMCallbackHandler(this);
708  }
709
710  @VisibleForTesting
711  protected boolean finish() {
712    // wait for completion.
713    while (!done
714        && (numCompletedContainers.get() != numTotalContainers)) {
715      try {
716        Thread.sleep(200);
717      } catch (InterruptedException ex) {}
718    }
719
720    if(timelineClient != null) {
721      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
722          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
723    }
724
725    // Join all launched threads
726    // needed for when we time out
727    // and we need to release containers
728    for (Thread launchThread : launchThreads) {
729      try {
730        launchThread.join(10000);
731      } catch (InterruptedException e) {
732        LOG.info("Exception thrown in thread join: " + e.getMessage());
733        e.printStackTrace();
734      }
735    }
736
737    // When the application completes, it should stop all running containers
738    LOG.info("Application completed. Stopping running containers");
739    nmClientAsync.stop();
740
741    // When the application completes, it should send a finish application
742    // signal to the RM
743    LOG.info("Application completed. Signalling finish to RM");
744
745    FinalApplicationStatus appStatus;
746    String appMessage = null;
747    boolean success = true;
748    if (numCompletedContainers.get() - numFailedContainers.get()
749        >= numTotalContainers) {
750      appStatus = FinalApplicationStatus.SUCCEEDED;
751    } else {
752      appStatus = FinalApplicationStatus.FAILED;
753      appMessage = "Diagnostics." + ", total=" + numTotalContainers
754          + ", completed=" + numCompletedContainers.get() + ", allocated="
755          + numAllocatedContainers.get() + ", failed="
756          + numFailedContainers.get();
757      LOG.info(appMessage);
758      success = false;
759    }
760    try {
761      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
762    } catch (YarnException ex) {
763      LOG.error("Failed to unregister application", ex);
764    } catch (IOException e) {
765      LOG.error("Failed to unregister application", e);
766    }
767    
768    amRMClient.stop();
769
770    // Stop Timeline Client
771    if(timelineClient != null) {
772      timelineClient.stop();
773    }
774
775    return success;
776  }
777
778  @VisibleForTesting
779  class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
780    @SuppressWarnings("unchecked")
781    @Override
782    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
783      LOG.info("Got response from RM for container ask, completedCnt="
784          + completedContainers.size());
785      for (ContainerStatus containerStatus : completedContainers) {
786        LOG.info(appAttemptID + " got container status for containerID="
787            + containerStatus.getContainerId() + ", state="
788            + containerStatus.getState() + ", exitStatus="
789            + containerStatus.getExitStatus() + ", diagnostics="
790            + containerStatus.getDiagnostics());
791
792        // non complete containers should not be here
793        assert (containerStatus.getState() == ContainerState.COMPLETE);
794        // ignore containers we know nothing about - probably from a previous
795        // attempt
796        if (!launchedContainers.contains(containerStatus.getContainerId())) {
797          LOG.info("Ignoring completed status of "
798              + containerStatus.getContainerId()
799              + "; unknown container(probably launched by previous attempt)");
800          continue;
801        }
802
803        // increment counters for completed/failed containers
804        int exitStatus = containerStatus.getExitStatus();
805        if (0 != exitStatus) {
806          // container failed
807          if (ContainerExitStatus.ABORTED != exitStatus) {
808            // shell script failed
809            // counts as completed
810            numCompletedContainers.incrementAndGet();
811            numFailedContainers.incrementAndGet();
812          } else {
813            // container was killed by framework, possibly preempted
814            // we should re-try as the container was lost for some reason
815            numAllocatedContainers.decrementAndGet();
816            numRequestedContainers.decrementAndGet();
817            // we do not need to release the container as it would be done
818            // by the RM
819          }
820        } else {
821          // nothing to do
822          // container completed successfully
823          numCompletedContainers.incrementAndGet();
824          LOG.info("Container completed successfully." + ", containerId="
825              + containerStatus.getContainerId());
826        }
827        if(timelineClient != null) {
828          publishContainerEndEvent(
829              timelineClient, containerStatus, domainId, appSubmitterUgi);
830        }
831      }
832      
833      // ask for more containers if any failed
834      int askCount = numTotalContainers - numRequestedContainers.get();
835      numRequestedContainers.addAndGet(askCount);
836
837      if (askCount > 0) {
838        for (int i = 0; i < askCount; ++i) {
839          ContainerRequest containerAsk = setupContainerAskForRM();
840          amRMClient.addContainerRequest(containerAsk);
841        }
842      }
843      
844      if (numCompletedContainers.get() == numTotalContainers) {
845        done = true;
846      }
847    }
848
849    @Override
850    public void onContainersAllocated(List<Container> allocatedContainers) {
851      LOG.info("Got response from RM for container ask, allocatedCnt="
852          + allocatedContainers.size());
853      numAllocatedContainers.addAndGet(allocatedContainers.size());
854      for (Container allocatedContainer : allocatedContainers) {
855        String yarnShellId = Integer.toString(yarnShellIdCounter);
856        yarnShellIdCounter++;
857        LOG.info("Launching shell command on a new container."
858            + ", containerId=" + allocatedContainer.getId()
859            + ", yarnShellId=" + yarnShellId
860            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
861            + ":" + allocatedContainer.getNodeId().getPort()
862            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
863            + ", containerResourceMemory"
864            + allocatedContainer.getResource().getMemorySize()
865            + ", containerResourceVirtualCores"
866            + allocatedContainer.getResource().getVirtualCores());
867        // + ", containerToken"
868        // +allocatedContainer.getContainerToken().getIdentifier().toString());
869
870        Thread launchThread = createLaunchContainerThread(allocatedContainer,
871            yarnShellId);
872
873        // launch and start the container on a separate thread to keep
874        // the main thread unblocked
875        // as all containers may not be allocated at one go.
876        launchThreads.add(launchThread);
877        launchedContainers.add(allocatedContainer.getId());
878        launchThread.start();
879      }
880    }
881
882    @Override
883    public void onContainersResourceChanged(List<Container> containers) {}
884
885    @Override
886    public void onShutdownRequest() {
887      done = true;
888    }
889
890    @Override
891    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
892
893    @Override
894    public float getProgress() {
895      // set progress to deliver to RM on next heartbeat
896      float progress = (float) numCompletedContainers.get()
897          / numTotalContainers;
898      return progress;
899    }
900
901    @Override
902    public void onError(Throwable e) {
903      done = true;
904      amRMClient.stop();
905    }
906  }
907
908  @VisibleForTesting
909  static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
910
911    private ConcurrentMap<ContainerId, Container> containers =
912        new ConcurrentHashMap<ContainerId, Container>();
913    private final ApplicationMaster applicationMaster;
914
915    public NMCallbackHandler(ApplicationMaster applicationMaster) {
916      this.applicationMaster = applicationMaster;
917    }
918
919    public void addContainer(ContainerId containerId, Container container) {
920      containers.putIfAbsent(containerId, container);
921    }
922
923    @Override
924    public void onContainerStopped(ContainerId containerId) {
925      if (LOG.isDebugEnabled()) {
926        LOG.debug("Succeeded to stop Container " + containerId);
927      }
928      containers.remove(containerId);
929    }
930
931    @Override
932    public void onContainerStatusReceived(ContainerId containerId,
933        ContainerStatus containerStatus) {
934      if (LOG.isDebugEnabled()) {
935        LOG.debug("Container Status: id=" + containerId + ", status=" +
936            containerStatus);
937      }
938    }
939
940    @Override
941    public void onContainerStarted(ContainerId containerId,
942        Map<String, ByteBuffer> allServiceResponse) {
943      if (LOG.isDebugEnabled()) {
944        LOG.debug("Succeeded to start Container " + containerId);
945      }
946      Container container = containers.get(containerId);
947      if (container != null) {
948        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
949      }
950      if(applicationMaster.timelineClient != null) {
951        applicationMaster.publishContainerStartEvent(
952            applicationMaster.timelineClient, container,
953            applicationMaster.domainId, applicationMaster.appSubmitterUgi);
954      }
955    }
956
957    @Override
958    public void onContainerResourceIncreased(
959        ContainerId containerId, Resource resource) {}
960
961    @Override
962    public void onStartContainerError(ContainerId containerId, Throwable t) {
963      LOG.error("Failed to start Container " + containerId);
964      containers.remove(containerId);
965      applicationMaster.numCompletedContainers.incrementAndGet();
966      applicationMaster.numFailedContainers.incrementAndGet();
967    }
968
969    @Override
970    public void onGetContainerStatusError(
971        ContainerId containerId, Throwable t) {
972      LOG.error("Failed to query the status of Container " + containerId);
973    }
974
975    @Override
976    public void onStopContainerError(ContainerId containerId, Throwable t) {
977      LOG.error("Failed to stop Container " + containerId);
978      containers.remove(containerId);
979    }
980
981    @Override
982    public void onIncreaseContainerResourceError(
983        ContainerId containerId, Throwable t) {}
984
985  }
986
987  /**
988   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
989   * that will execute the shell command.
990   */
991  private class LaunchContainerRunnable implements Runnable {
992
993    // Allocated container
994    private Container container;
995    private String shellId;
996
997    NMCallbackHandler containerListener;
998
999    /**
1000     * @param lcontainer Allocated container
1001     * @param containerListener Callback handler of the container
1002     */
1003    public LaunchContainerRunnable(Container lcontainer,
1004        NMCallbackHandler containerListener, String shellId) {
1005      this.container = lcontainer;
1006      this.containerListener = containerListener;
1007      this.shellId = shellId;
1008    }
1009
1010    @Override
1011    /**
1012     * Connects to CM, sets up container launch context 
1013     * for shell command and eventually dispatches the container 
1014     * start request to the CM. 
1015     */
1016    public void run() {
1017      LOG.info("Setting up container launch container for containerid="
1018          + container.getId() + " with shellid=" + shellId);
1019
1020      // Set the local resources
1021      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
1022
1023      // The container for the eventual shell commands needs its own local
1024      // resources too.
1025      // In this scenario, if a shell script is specified, we need to have it
1026      // copied and made available to the container.
1027      if (!scriptPath.isEmpty()) {
1028        Path renamedScriptPath = null;
1029        if (Shell.WINDOWS) {
1030          renamedScriptPath = new Path(scriptPath + ".bat");
1031        } else {
1032          renamedScriptPath = new Path(scriptPath + ".sh");
1033        }
1034
1035        try {
1036          // rename the script file based on the underlying OS syntax.
1037          renameScriptFile(renamedScriptPath);
1038        } catch (Exception e) {
1039          LOG.error(
1040              "Not able to add suffix (.bat/.sh) to the shell script filename",
1041              e);
1042          // We know we cannot continue launching the container
1043          // so we should release it.
1044          numCompletedContainers.incrementAndGet();
1045          numFailedContainers.incrementAndGet();
1046          return;
1047        }
1048
1049        URL yarnUrl = null;
1050        try {
1051          yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString()));
1052        } catch (URISyntaxException e) {
1053          LOG.error("Error when trying to use shell script path specified"
1054              + " in env, path=" + renamedScriptPath, e);
1055          // A failure scenario on bad input such as invalid shell script path
1056          // We know we cannot continue launching the container
1057          // so we should release it.
1058          // TODO
1059          numCompletedContainers.incrementAndGet();
1060          numFailedContainers.incrementAndGet();
1061          return;
1062        }
1063        LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
1064          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
1065          shellScriptPathLen, shellScriptPathTimestamp);
1066        localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH :
1067            EXEC_SHELL_STRING_PATH, shellRsrc);
1068        shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
1069      }
1070
1071      // Set the necessary command to execute on the allocated container
1072      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
1073
1074      // Set executable command
1075      vargs.add(shellCommand);
1076      // Set shell script path
1077      if (!scriptPath.isEmpty()) {
1078        vargs.add(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH
1079            : EXEC_SHELL_STRING_PATH);
1080      }
1081
1082      // Set args for the shell command if any
1083      vargs.add(shellArgs);
1084      // Add log redirect params
1085      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
1086      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
1087
1088      // Get final commmand
1089      StringBuilder command = new StringBuilder();
1090      for (CharSequence str : vargs) {
1091        command.append(str).append(" ");
1092      }
1093
1094      List<String> commands = new ArrayList<String>();
1095      commands.add(command.toString());
1096
1097      // Set up ContainerLaunchContext, setting local resource, environment,
1098      // command and token for constructor.
1099
1100      // Note for tokens: Set up tokens for the container too. Today, for normal
1101      // shell commands, the container in distribute-shell doesn't need any
1102      // tokens. We are populating them mainly for NodeManagers to be able to
1103      // download anyfiles in the distributed file-system. The tokens are
1104      // otherwise also useful in cases, for e.g., when one is running a
1105      // "hadoop dfs" command inside the distributed shell.
1106      Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
1107      myShellEnv.put(YARN_SHELL_ID, shellId);
1108      ContainerRetryContext containerRetryContext =
1109          ContainerRetryContext.newInstance(
1110              containerRetryPolicy, containerRetryErrorCodes,
1111              containerMaxRetries, containrRetryInterval);
1112      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
1113        localResources, myShellEnv, commands, null, allTokens.duplicate(),
1114          null, containerRetryContext);
1115      containerListener.addContainer(container.getId(), container);
1116      nmClientAsync.startContainerAsync(container, ctx);
1117    }
1118  }
1119
1120  private void renameScriptFile(final Path renamedScriptPath)
1121      throws IOException, InterruptedException {
1122    appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
1123      @Override
1124      public Void run() throws IOException {
1125        FileSystem fs = renamedScriptPath.getFileSystem(conf);
1126        fs.rename(new Path(scriptPath), renamedScriptPath);
1127        return null;
1128      }
1129    });
1130    LOG.info("User " + appSubmitterUgi.getUserName()
1131        + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1132  }
1133
1134  /**
1135   * Setup the request that will be sent to the RM for the container ask.
1136   *
1137   * @return the setup ResourceRequest to be sent to RM
1138   */
1139  private ContainerRequest setupContainerAskForRM() {
1140    // setup requirements for hosts
1141    // using * as any host will do for the distributed shell app
1142    // set the priority for the request
1143    // TODO - what is the range for priority? how to decide?
1144    Priority pri = Priority.newInstance(requestPriority);
1145
1146    // Set up resource type requirements
1147    // For now, memory and CPU are supported so we set memory and cpu requirements
1148    Resource capability = Resource.newInstance(containerMemory,
1149      containerVirtualCores);
1150
1151    ContainerRequest request = new ContainerRequest(capability, null, null,
1152        pri);
1153    LOG.info("Requested container ask: " + request.toString());
1154    return request;
1155  }
1156
1157  private boolean fileExist(String filePath) {
1158    return new File(filePath).exists();
1159  }
1160
1161  private String readContent(String filePath) throws IOException {
1162    DataInputStream ds = null;
1163    try {
1164      ds = new DataInputStream(new FileInputStream(filePath));
1165      return ds.readUTF();
1166    } finally {
1167      org.apache.commons.io.IOUtils.closeQuietly(ds);
1168    }
1169  }
1170
1171  private void publishContainerStartEvent(
1172      final TimelineClient timelineClient, final Container container,
1173      String domainId, UserGroupInformation ugi) {
1174    final TimelineEntity entity = new TimelineEntity();
1175    entity.setEntityId(container.getId().toString());
1176    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1177    entity.setDomainId(domainId);
1178    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1179    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME, container.getId()
1180        .getApplicationAttemptId().getApplicationId().toString());
1181    TimelineEvent event = new TimelineEvent();
1182    event.setTimestamp(System.currentTimeMillis());
1183    event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1184    event.addEventInfo("Node", container.getNodeId().toString());
1185    event.addEventInfo("Resources", container.getResource().toString());
1186    entity.addEvent(event);
1187
1188    try {
1189      processTimelineResponseErrors(
1190          putContainerEntity(timelineClient,
1191              container.getId().getApplicationAttemptId(),
1192              entity));
1193    } catch (YarnException | IOException | ClientHandlerException e) {
1194      LOG.error("Container start event could not be published for "
1195          + container.getId().toString(), e);
1196    }
1197  }
1198
1199  @VisibleForTesting
1200  void publishContainerEndEvent(
1201      final TimelineClient timelineClient, ContainerStatus container,
1202      String domainId, UserGroupInformation ugi) {
1203    final TimelineEntity entity = new TimelineEntity();
1204    entity.setEntityId(container.getContainerId().toString());
1205    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1206    entity.setDomainId(domainId);
1207    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1208    entity.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME,
1209        container.getContainerId().getApplicationAttemptId()
1210            .getApplicationId().toString());
1211    TimelineEvent event = new TimelineEvent();
1212    event.setTimestamp(System.currentTimeMillis());
1213    event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1214    event.addEventInfo("State", container.getState().name());
1215    event.addEventInfo("Exit Status", container.getExitStatus());
1216    entity.addEvent(event);
1217    try {
1218      processTimelineResponseErrors(
1219          putContainerEntity(timelineClient,
1220              container.getContainerId().getApplicationAttemptId(),
1221              entity));
1222    } catch (YarnException | IOException | ClientHandlerException e) {
1223      LOG.error("Container end event could not be published for "
1224          + container.getContainerId().toString(), e);
1225    }
1226  }
1227
1228  private TimelinePutResponse putContainerEntity(
1229      TimelineClient timelineClient, ApplicationAttemptId currAttemptId,
1230      TimelineEntity entity)
1231      throws YarnException, IOException {
1232    if (TimelineUtils.timelineServiceV1_5Enabled(conf)) {
1233      TimelineEntityGroupId groupId = TimelineEntityGroupId.newInstance(
1234          currAttemptId.getApplicationId(),
1235          CONTAINER_ENTITY_GROUP_ID);
1236      return timelineClient.putEntities(currAttemptId, groupId, entity);
1237    } else {
1238      return timelineClient.putEntities(entity);
1239    }
1240  }
1241
1242  private void publishApplicationAttemptEvent(
1243      final TimelineClient timelineClient, String appAttemptId,
1244      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
1245    final TimelineEntity entity = new TimelineEntity();
1246    entity.setEntityId(appAttemptId);
1247    entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1248    entity.setDomainId(domainId);
1249    entity.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, ugi.getShortUserName());
1250    TimelineEvent event = new TimelineEvent();
1251    event.setEventType(appEvent.toString());
1252    event.setTimestamp(System.currentTimeMillis());
1253    entity.addEvent(event);
1254    try {
1255      TimelinePutResponse response = timelineClient.putEntities(entity);
1256      processTimelineResponseErrors(response);
1257    } catch (YarnException | IOException | ClientHandlerException e) {
1258      LOG.error("App Attempt "
1259          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1260          + " event could not be published for "
1261          + appAttemptId.toString(), e);
1262    }
1263  }
1264
1265  private TimelinePutResponse processTimelineResponseErrors(
1266      TimelinePutResponse response) {
1267    List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
1268    if (errors.size() == 0) {
1269      LOG.debug("Timeline entities are successfully put");
1270    } else {
1271      for (TimelinePutResponse.TimelinePutError error : errors) {
1272        LOG.error(
1273            "Error when publishing entity [" + error.getEntityType() + ","
1274                + error.getEntityId() + "], server side error code: "
1275                + error.getErrorCode());
1276      }
1277    }
1278    return response;
1279  }
1280
1281  RMCallbackHandler getRMCallbackHandler() {
1282    return new RMCallbackHandler();
1283  }
1284
1285  @VisibleForTesting
1286  void setAmRMClient(AMRMClientAsync client) {
1287    this.amRMClient = client;
1288  }
1289
1290  @VisibleForTesting
1291  int getNumCompletedContainers() {
1292    return numCompletedContainers.get();
1293  }
1294
1295  @VisibleForTesting
1296  boolean getDone() {
1297    return done;
1298  }
1299
1300  @VisibleForTesting
1301  Thread createLaunchContainerThread(Container allocatedContainer,
1302      String shellId) {
1303    LaunchContainerRunnable runnableLaunchContainer =
1304        new LaunchContainerRunnable(allocatedContainer, containerListener,
1305            shellId);
1306    return new Thread(runnableLaunchContainer);
1307  }
1308}