001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.yarn.applications.distributedshell;
020
021import java.io.BufferedReader;
022import java.io.DataInputStream;
023import java.io.File;
024import java.io.FileInputStream;
025import java.io.IOException;
026import java.io.StringReader;
027import java.lang.reflect.UndeclaredThrowableException;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.nio.ByteBuffer;
031import java.security.PrivilegedExceptionAction;
032import java.util.ArrayList;
033import java.util.Collections;
034import java.util.HashMap;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.Set;
039import java.util.Vector;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.ConcurrentMap;
042import java.util.concurrent.atomic.AtomicInteger;
043
044import org.apache.commons.cli.CommandLine;
045import org.apache.commons.cli.GnuParser;
046import org.apache.commons.cli.HelpFormatter;
047import org.apache.commons.cli.Options;
048import org.apache.commons.cli.ParseException;
049import org.apache.commons.logging.Log;
050import org.apache.commons.logging.LogFactory;
051import org.apache.hadoop.classification.InterfaceAudience;
052import org.apache.hadoop.classification.InterfaceAudience.Private;
053import org.apache.hadoop.classification.InterfaceStability;
054import org.apache.hadoop.conf.Configuration;
055import org.apache.hadoop.fs.FileSystem;
056import org.apache.hadoop.fs.Path;
057import org.apache.hadoop.io.DataOutputBuffer;
058import org.apache.hadoop.io.IOUtils;
059import org.apache.hadoop.net.NetUtils;
060import org.apache.hadoop.security.Credentials;
061import org.apache.hadoop.security.UserGroupInformation;
062import org.apache.hadoop.security.token.Token;
063import org.apache.hadoop.util.ExitUtil;
064import org.apache.hadoop.util.Shell;
065import org.apache.hadoop.yarn.api.ApplicationConstants;
066import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
067import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
068import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
069import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
070import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
071import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
072import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
073import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
074import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
075import org.apache.hadoop.yarn.api.records.Container;
076import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
077import org.apache.hadoop.yarn.api.records.ContainerId;
078import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
079import org.apache.hadoop.yarn.api.records.ContainerState;
080import org.apache.hadoop.yarn.api.records.ContainerStatus;
081import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
082import org.apache.hadoop.yarn.api.records.LocalResource;
083import org.apache.hadoop.yarn.api.records.LocalResourceType;
084import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
085import org.apache.hadoop.yarn.api.records.NodeReport;
086import org.apache.hadoop.yarn.api.records.Priority;
087import org.apache.hadoop.yarn.api.records.Resource;
088import org.apache.hadoop.yarn.api.records.ResourceRequest;
089import org.apache.hadoop.yarn.api.records.URL;
090import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
091import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
092import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
093import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
094import org.apache.hadoop.yarn.client.api.TimelineClient;
095import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
096import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
097import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
098import org.apache.hadoop.yarn.conf.YarnConfiguration;
099import org.apache.hadoop.yarn.exceptions.YarnException;
100import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
101import org.apache.hadoop.yarn.util.ConverterUtils;
102import org.apache.log4j.LogManager;
103
104import com.google.common.annotations.VisibleForTesting;
105
106/**
107 * An ApplicationMaster for executing shell commands on a set of launched
108 * containers using the YARN framework.
109 * 
110 * <p>
111 * This class is meant to act as an example on how to write yarn-based
112 * application masters.
113 * </p>
114 * 
 * <p>
 * The ApplicationMaster is started on a container by the
 * <code>ResourceManager</code>'s launcher. The first thing that the
 * <code>ApplicationMaster</code> needs to do is to connect and register itself
 * with the <code>ResourceManager</code>. The registration sets up information
 * within the <code>ResourceManager</code> regarding what host:port the
 * ApplicationMaster is listening on to provide any form of functionality to a
 * client as well as a tracking URL that a client can use to keep track of
 * status/job history if needed. However, in the distributed shell, the
 * tracking URL and appMasterHost:appMasterRpcPort are not supported.
 * </p>
126 * 
 * <p>
 * The <code>ApplicationMaster</code> needs to send a heartbeat to the
 * <code>ResourceManager</code> at regular intervals to inform the
 * <code>ResourceManager</code> that it is up and alive. Each
 * {@link ApplicationMasterProtocol#allocate} call from the
 * <code>ApplicationMaster</code> to the <code>ResourceManager</code> acts as a
 * heartbeat.
 * </p>
 *
 * <p>
 * For the actual handling of the job, the <code>ApplicationMaster</code> has to
 * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
 * required number of containers using {@link ResourceRequest} with the necessary
 * resource specifications such as node location and computational
 * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
 * responds with an {@link AllocateResponse} that informs the
 * <code>ApplicationMaster</code> of the set of newly allocated containers,
 * completed containers as well as current state of available resources.
 * </p>
144 * 
145 * <p>
146 * For each allocated container, the <code>ApplicationMaster</code> can then set
147 * up the necessary launch context via {@link ContainerLaunchContext} to specify
148 * the allocated container id, local resources required by the executable, the
149 * environment to be setup for the executable, commands to execute, etc. and
150 * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
151 * launch and execute the defined commands on the given allocated container.
152 * </p>
153 * 
 * <p>
 * The <code>ApplicationMaster</code> can monitor the launched containers either
 * by querying the <code>ResourceManager</code> using
 * {@link ApplicationMasterProtocol#allocate} to get updates on completed
 * containers, or via the {@link ContainerManagementProtocol} by querying for
 * the status of the allocated container's {@link ContainerId}.
 * </p>
 *
 * <p>
 * After the job has been completed, the <code>ApplicationMaster</code> has to
 * send a {@link FinishApplicationMasterRequest} to the
 * <code>ResourceManager</code> to inform it that the
 * <code>ApplicationMaster</code> has completed.
 * </p>
166 */
167@InterfaceAudience.Public
168@InterfaceStability.Unstable
169public class ApplicationMaster {
170
171  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
172
173  @VisibleForTesting
174  @Private
175  public static enum DSEvent {
176    DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
177  }
178  
179  @VisibleForTesting
180  @Private
181  public static enum DSEntity {
182    DS_APP_ATTEMPT, DS_CONTAINER
183  }
184
  // Key used to tag launched shells with a sequential id (see
  // yarnShellIdCounter); where it is applied is outside this view — confirm.
  private static final String YARN_SHELL_ID = "YARN_SHELL_ID";

  // Configuration: a YarnConfiguration created in the constructor and passed
  // to the RM, NM and timeline clients.
  private Configuration conf;

  // Handle to communicate with the Resource Manager (raw type; created and
  // started in run()).
  @SuppressWarnings("rawtypes")
  private AMRMClientAsync amRMClient;

  // In both secure and non-secure modes, this points to the job-submitter.
  // Built in run() from the USER environment variable and given the AM's
  // credentials after the AM->RM token has been removed.
  @VisibleForTesting
  UserGroupInformation appSubmitterUgi;

  // Handle to communicate with the Node Manager
  private NMClientAsync nmClientAsync;
  // Listen to process the response from the Node Manager
  // (created through createNMCallbackHandler()).
  private NMCallbackHandler containerListener;

  // Application attempt id (application id plus attempt number), taken from
  // CONTAINER_ID or the -app_attempt_id option in init().
  @VisibleForTesting
  protected ApplicationAttemptId appAttemptID;

  // TODO
  // For status update for clients - yet to be implemented
  // Hostname of the container
  private String appMasterHostname = "";
  // Port on which the app master listens for status updates from clients
  // (-1 is sent on registration since no RPC server is implemented).
  private int appMasterRpcPort = -1;
  // Tracking url to which app master publishes info for clients to monitor
  // (empty: tracking is not supported).
  private String appMasterTrackingUrl = "";

  // App Master configuration
  // No. of containers to run shell command on (-num_containers).
  @VisibleForTesting
  protected int numTotalContainers = 1;
  // Memory (MB) to request for the container on which the shell command will
  // run; capped at the cluster maximum in run().
  private int containerMemory = 10;
  // VirtualCores to request for the container on which the shell command will
  // run; capped at the cluster maximum in run().
  private int containerVirtualCores = 1;
  // Priority of the request (-priority).
  private int requestPriority;

  // Counter for completed containers ( complete denotes successful or failed )
  private AtomicInteger numCompletedContainers = new AtomicInteger();
  // Allocated container count so that we know how many containers has the RM
  // allocated to us
  @VisibleForTesting
  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
  // Count of failed containers
  private AtomicInteger numFailedContainers = new AtomicInteger();
  // Count of containers already requested from the RM
  // Needed as once requested, we should not request for containers again.
  // Only request for more if the original requirement changes.
  // (Decremented for ABORTED containers so that they are requested again.)
  @VisibleForTesting
  protected AtomicInteger numRequestedContainers = new AtomicInteger();

  // Shell command to be executed
  private String shellCommand = "";
  // Args to be passed to the shell command
  private String shellArgs = "";
  // Env variables to be setup for the shell command (from -shell_env).
  private Map<String, String> shellEnv = new HashMap<String, String>();

  // Location of shell script ( obtained from info set in env )
  // Shell script path in fs
  private String scriptPath = "";
  // Timestamp needed for creating a local resource
  private long shellScriptPathTimestamp = 0;
  // File length needed for local resource
  private long shellScriptPathLen = 0;

  // Timeline domain ID (null unless set in the environment).
  private String domainId = null;

  // Hardcoded path to shell script in launch container's local env.
  // NOTE: the misspelled name of the .bat constant is kept because it may be
  // referenced elsewhere in this file.
  private static final String ExecShellStringPath = Client.SCRIPT_PATH + ".sh";
  private static final String ExecBatScripStringtPath = Client.SCRIPT_PATH
      + ".bat";

  // Hardcoded path to custom log_properties; applied in init() if present.
  private static final String log4jPath = "log4j.properties";

  // Local files from which init() reads the shell command and its arguments.
  private static final String shellCommandPath = "shellCommands";
  private static final String shellArgsPath = "shellArgs";

  // Set by the RM callback once every container has completed and polled by
  // finish(); volatile presumably because writer and reader run on different
  // threads.
  private volatile boolean done;

  // Serialized credentials prepared in run(); presumably attached to the
  // container launch contexts (usage is outside this view).
  private ByteBuffer allTokens;

  // Launch threads (each joined for up to 10s in finish()).
  private List<Thread> launchThreads = new ArrayList<Thread>();

  // Timeline Client (null when the timeline service is disabled).
  @VisibleForTesting
  TimelineClient timelineClient;

  // Command prefixes for running the shell command on Linux / Windows
  // (usage is outside this view).
  private final String linux_bash_command = "bash";
  private final String windows_command = "cmd /c";

  // Next shell id handed out in onContainersAllocated(); not synchronized.
  private int yarnShellIdCounter = 1;

  // Ids of containers belonging to this AM, including containers recovered
  // from previous attempts on registration; backed by a concurrent map.
  @VisibleForTesting
  protected final Set<ContainerId> launchedContainers =
      Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
289
290  /**
291   * @param args Command line args
292   */
293  public static void main(String[] args) {
294    boolean result = false;
295    try {
296      ApplicationMaster appMaster = new ApplicationMaster();
297      LOG.info("Initializing ApplicationMaster");
298      boolean doRun = appMaster.init(args);
299      if (!doRun) {
300        System.exit(0);
301      }
302      appMaster.run();
303      result = appMaster.finish();
304    } catch (Throwable t) {
305      LOG.fatal("Error running ApplicationMaster", t);
306      LogManager.shutdown();
307      ExitUtil.terminate(1, t);
308    }
309    if (result) {
310      LOG.info("Application Master completed successfully. exiting");
311      System.exit(0);
312    } else {
313      LOG.info("Application Master failed. exiting");
314      System.exit(2);
315    }
316  }
317
318  /**
319   * Dump out contents of $CWD and the environment to stdout for debugging
320   */
321  private void dumpOutDebugInfo() {
322
323    LOG.info("Dump debug output");
324    Map<String, String> envs = System.getenv();
325    for (Map.Entry<String, String> env : envs.entrySet()) {
326      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
327      System.out.println("System env: key=" + env.getKey() + ", val="
328          + env.getValue());
329    }
330
331    BufferedReader buf = null;
332    try {
333      String lines = Shell.WINDOWS ? Shell.execCommand("cmd", "/c", "dir") :
334        Shell.execCommand("ls", "-al");
335      buf = new BufferedReader(new StringReader(lines));
336      String line = "";
337      while ((line = buf.readLine()) != null) {
338        LOG.info("System CWD content: " + line);
339        System.out.println("System CWD content: " + line);
340      }
341    } catch (IOException e) {
342      e.printStackTrace();
343    } finally {
344      IOUtils.cleanup(LOG, buf);
345    }
346  }
347
348  public ApplicationMaster() {
349    // Set up the configuration
350    conf = new YarnConfiguration();
351  }
352
353  /**
354   * Parse command line options
355   *
356   * @param args Command line args
357   * @return Whether init successful and run should be invoked
358   * @throws ParseException
359   * @throws IOException
360   */
361  public boolean init(String[] args) throws ParseException, IOException {
362    Options opts = new Options();
363    opts.addOption("app_attempt_id", true,
364        "App Attempt ID. Not to be used unless for testing purposes");
365    opts.addOption("shell_env", true,
366        "Environment for shell script. Specified as env_key=env_val pairs");
367    opts.addOption("container_memory", true,
368        "Amount of memory in MB to be requested to run the shell command");
369    opts.addOption("container_vcores", true,
370        "Amount of virtual cores to be requested to run the shell command");
371    opts.addOption("num_containers", true,
372        "No. of containers on which the shell command needs to be executed");
373    opts.addOption("priority", true, "Application Priority. Default 0");
374    opts.addOption("debug", false, "Dump out debug information");
375
376    opts.addOption("help", false, "Print usage");
377    CommandLine cliParser = new GnuParser().parse(opts, args);
378
379    if (args.length == 0) {
380      printUsage(opts);
381      throw new IllegalArgumentException(
382          "No args specified for application master to initialize");
383    }
384
385    //Check whether customer log4j.properties file exists
386    if (fileExist(log4jPath)) {
387      try {
388        Log4jPropertyHelper.updateLog4jConfiguration(ApplicationMaster.class,
389            log4jPath);
390      } catch (Exception e) {
391        LOG.warn("Can not set up custom log4j properties. " + e);
392      }
393    }
394
395    if (cliParser.hasOption("help")) {
396      printUsage(opts);
397      return false;
398    }
399
400    if (cliParser.hasOption("debug")) {
401      dumpOutDebugInfo();
402    }
403
404    Map<String, String> envs = System.getenv();
405
406    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
407      if (cliParser.hasOption("app_attempt_id")) {
408        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
409        appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
410      } else {
411        throw new IllegalArgumentException(
412            "Application Attempt Id not set in the environment");
413      }
414    } else {
415      ContainerId containerId = ConverterUtils.toContainerId(envs
416          .get(Environment.CONTAINER_ID.name()));
417      appAttemptID = containerId.getApplicationAttemptId();
418    }
419
420    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
421      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
422          + " not set in the environment");
423    }
424    if (!envs.containsKey(Environment.NM_HOST.name())) {
425      throw new RuntimeException(Environment.NM_HOST.name()
426          + " not set in the environment");
427    }
428    if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
429      throw new RuntimeException(Environment.NM_HTTP_PORT
430          + " not set in the environment");
431    }
432    if (!envs.containsKey(Environment.NM_PORT.name())) {
433      throw new RuntimeException(Environment.NM_PORT.name()
434          + " not set in the environment");
435    }
436
437    LOG.info("Application master for app" + ", appId="
438        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
439        + appAttemptID.getApplicationId().getClusterTimestamp()
440        + ", attemptId=" + appAttemptID.getAttemptId());
441
442    if (!fileExist(shellCommandPath)
443        && envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION).isEmpty()) {
444      throw new IllegalArgumentException(
445          "No shell command or shell script specified to be executed by application master");
446    }
447
448    if (fileExist(shellCommandPath)) {
449      shellCommand = readContent(shellCommandPath);
450    }
451
452    if (fileExist(shellArgsPath)) {
453      shellArgs = readContent(shellArgsPath);
454    }
455
456    if (cliParser.hasOption("shell_env")) {
457      String shellEnvs[] = cliParser.getOptionValues("shell_env");
458      for (String env : shellEnvs) {
459        env = env.trim();
460        int index = env.indexOf('=');
461        if (index == -1) {
462          shellEnv.put(env, "");
463          continue;
464        }
465        String key = env.substring(0, index);
466        String val = "";
467        if (index < (env.length() - 1)) {
468          val = env.substring(index + 1);
469        }
470        shellEnv.put(key, val);
471      }
472    }
473
474    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
475      scriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
476
477      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
478        shellScriptPathTimestamp = Long.parseLong(envs
479            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
480      }
481      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
482        shellScriptPathLen = Long.parseLong(envs
483            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
484      }
485      if (!scriptPath.isEmpty()
486          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
487        LOG.error("Illegal values in env for shell script path" + ", path="
488            + scriptPath + ", len=" + shellScriptPathLen + ", timestamp="
489            + shellScriptPathTimestamp);
490        throw new IllegalArgumentException(
491            "Illegal values in env for shell script path");
492      }
493    }
494
495    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN)) {
496      domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
497    }
498
499    containerMemory = Integer.parseInt(cliParser.getOptionValue(
500        "container_memory", "10"));
501    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
502        "container_vcores", "1"));
503    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
504        "num_containers", "1"));
505    if (numTotalContainers == 0) {
506      throw new IllegalArgumentException(
507          "Cannot run distributed shell with no containers");
508    }
509    requestPriority = Integer.parseInt(cliParser
510        .getOptionValue("priority", "0"));
511    return true;
512  }
513
514  /**
515   * Helper function to print usage
516   *
517   * @param opts Parsed command line options
518   */
519  private void printUsage(Options opts) {
520    new HelpFormatter().printHelp("ApplicationMaster", opts);
521  }
522
523  /**
524   * Main run function for the application master
525   *
526   * @throws YarnException
527   * @throws IOException
528   */
529  @SuppressWarnings({ "unchecked" })
530  public void run() throws YarnException, IOException, InterruptedException {
531    LOG.info("Starting ApplicationMaster");
532
533    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
534    // are marked as LimitedPrivate
535    Credentials credentials =
536        UserGroupInformation.getCurrentUser().getCredentials();
537    DataOutputBuffer dob = new DataOutputBuffer();
538    credentials.writeTokenStorageToStream(dob);
539    // Now remove the AM->RM token so that containers cannot access it.
540    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
541    LOG.info("Executing with tokens:");
542    while (iter.hasNext()) {
543      Token<?> token = iter.next();
544      LOG.info(token);
545      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
546        iter.remove();
547      }
548    }
549    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
550
551    // Create appSubmitterUgi and add original tokens to it
552    String appSubmitterUserName =
553        System.getenv(ApplicationConstants.Environment.USER.name());
554    appSubmitterUgi =
555        UserGroupInformation.createRemoteUser(appSubmitterUserName);
556    appSubmitterUgi.addCredentials(credentials);
557
558
559    AMRMClientAsync.AbstractCallbackHandler allocListener =
560        new RMCallbackHandler();
561    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
562    amRMClient.init(conf);
563    amRMClient.start();
564
565    containerListener = createNMCallbackHandler();
566    nmClientAsync = new NMClientAsyncImpl(containerListener);
567    nmClientAsync.init(conf);
568    nmClientAsync.start();
569
570    startTimelineClient(conf);
571    if(timelineClient != null) {
572      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
573          DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
574    }
575
576    // Setup local RPC Server to accept status requests directly from clients
577    // TODO need to setup a protocol for client to be able to communicate to
578    // the RPC server
579    // TODO use the rpc port info to register with the RM for the client to
580    // send requests to this app master
581
582    // Register self with ResourceManager
583    // This will start heartbeating to the RM
584    appMasterHostname = NetUtils.getHostname();
585    RegisterApplicationMasterResponse response = amRMClient
586        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
587            appMasterTrackingUrl);
588    // Dump out information about cluster capability as seen by the
589    // resource manager
590    int maxMem = response.getMaximumResourceCapability().getMemory();
591    LOG.info("Max mem capability of resources in this cluster " + maxMem);
592    
593    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
594    LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
595
596    // A resource ask cannot exceed the max.
597    if (containerMemory > maxMem) {
598      LOG.info("Container memory specified above max threshold of cluster."
599          + " Using max value." + ", specified=" + containerMemory + ", max="
600          + maxMem);
601      containerMemory = maxMem;
602    }
603
604    if (containerVirtualCores > maxVCores) {
605      LOG.info("Container virtual cores specified above max threshold of cluster."
606          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
607          + maxVCores);
608      containerVirtualCores = maxVCores;
609    }
610
611    List<Container> previousAMRunningContainers =
612        response.getContainersFromPreviousAttempts();
613    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
614      + " previous attempts' running containers on AM registration.");
615    for(Container container: previousAMRunningContainers) {
616      launchedContainers.add(container.getId());
617    }
618    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
619
620
621    int numTotalContainersToRequest =
622        numTotalContainers - previousAMRunningContainers.size();
623    // Setup ask for containers from RM
624    // Send request for containers to RM
625    // Until we get our fully allocated quota, we keep on polling RM for
626    // containers
627    // Keep looping until all the containers are launched and shell script
628    // executed on them ( regardless of success/failure).
629    for (int i = 0; i < numTotalContainersToRequest; ++i) {
630      ContainerRequest containerAsk = setupContainerAskForRM();
631      amRMClient.addContainerRequest(containerAsk);
632    }
633    numRequestedContainers.set(numTotalContainers);
634  }
635
636  @VisibleForTesting
637  void startTimelineClient(final Configuration conf)
638      throws YarnException, IOException, InterruptedException {
639    try {
640      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
641        @Override
642        public Void run() throws Exception {
643          if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
644              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
645            // Creating the Timeline Client
646            timelineClient = TimelineClient.createTimelineClient();
647            timelineClient.init(conf);
648            timelineClient.start();
649          } else {
650            timelineClient = null;
651            LOG.warn("Timeline service is not enabled");
652          }
653          return null;
654        }
655      });
656    } catch (UndeclaredThrowableException e) {
657      throw new YarnException(e.getCause());
658    }
659  }
660
661  @VisibleForTesting
662  NMCallbackHandler createNMCallbackHandler() {
663    return new NMCallbackHandler(this);
664  }
665
666  @VisibleForTesting
667  protected boolean finish() {
668    // wait for completion.
669    while (!done
670        && (numCompletedContainers.get() != numTotalContainers)) {
671      try {
672        Thread.sleep(200);
673      } catch (InterruptedException ex) {}
674    }
675
676    if(timelineClient != null) {
677      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
678          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
679    }
680
681    // Join all launched threads
682    // needed for when we time out
683    // and we need to release containers
684    for (Thread launchThread : launchThreads) {
685      try {
686        launchThread.join(10000);
687      } catch (InterruptedException e) {
688        LOG.info("Exception thrown in thread join: " + e.getMessage());
689        e.printStackTrace();
690      }
691    }
692
693    // When the application completes, it should stop all running containers
694    LOG.info("Application completed. Stopping running containers");
695    nmClientAsync.stop();
696
697    // When the application completes, it should send a finish application
698    // signal to the RM
699    LOG.info("Application completed. Signalling finish to RM");
700
701    FinalApplicationStatus appStatus;
702    String appMessage = null;
703    boolean success = true;
704    if (numFailedContainers.get() == 0 && 
705        numCompletedContainers.get() == numTotalContainers) {
706      appStatus = FinalApplicationStatus.SUCCEEDED;
707    } else {
708      appStatus = FinalApplicationStatus.FAILED;
709      appMessage = "Diagnostics." + ", total=" + numTotalContainers
710          + ", completed=" + numCompletedContainers.get() + ", allocated="
711          + numAllocatedContainers.get() + ", failed="
712          + numFailedContainers.get();
713      LOG.info(appMessage);
714      success = false;
715    }
716    try {
717      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
718    } catch (YarnException ex) {
719      LOG.error("Failed to unregister application", ex);
720    } catch (IOException e) {
721      LOG.error("Failed to unregister application", e);
722    }
723    
724    amRMClient.stop();
725
726    // Stop Timeline Client
727    if(timelineClient != null) {
728      timelineClient.stop();
729    }
730
731    return success;
732  }
733
734  @VisibleForTesting
735  class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
736    @SuppressWarnings("unchecked")
737    @Override
738    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
739      LOG.info("Got response from RM for container ask, completedCnt="
740          + completedContainers.size());
741      for (ContainerStatus containerStatus : completedContainers) {
742        LOG.info(appAttemptID + " got container status for containerID="
743            + containerStatus.getContainerId() + ", state="
744            + containerStatus.getState() + ", exitStatus="
745            + containerStatus.getExitStatus() + ", diagnostics="
746            + containerStatus.getDiagnostics());
747
748        // non complete containers should not be here
749        assert (containerStatus.getState() == ContainerState.COMPLETE);
750        // ignore containers we know nothing about - probably from a previous
751        // attempt
752        if (!launchedContainers.contains(containerStatus.getContainerId())) {
753          LOG.info("Ignoring completed status of "
754              + containerStatus.getContainerId()
755              + "; unknown container(probably launched by previous attempt)");
756          continue;
757        }
758
759        // increment counters for completed/failed containers
760        int exitStatus = containerStatus.getExitStatus();
761        if (0 != exitStatus) {
762          // container failed
763          if (ContainerExitStatus.ABORTED != exitStatus) {
764            // shell script failed
765            // counts as completed
766            numCompletedContainers.incrementAndGet();
767            numFailedContainers.incrementAndGet();
768          } else {
769            // container was killed by framework, possibly preempted
770            // we should re-try as the container was lost for some reason
771            numAllocatedContainers.decrementAndGet();
772            numRequestedContainers.decrementAndGet();
773            // we do not need to release the container as it would be done
774            // by the RM
775          }
776        } else {
777          // nothing to do
778          // container completed successfully
779          numCompletedContainers.incrementAndGet();
780          LOG.info("Container completed successfully." + ", containerId="
781              + containerStatus.getContainerId());
782        }
783        if(timelineClient != null) {
784          publishContainerEndEvent(
785              timelineClient, containerStatus, domainId, appSubmitterUgi);
786        }
787      }
788      
789      // ask for more containers if any failed
790      int askCount = numTotalContainers - numRequestedContainers.get();
791      numRequestedContainers.addAndGet(askCount);
792
793      if (askCount > 0) {
794        for (int i = 0; i < askCount; ++i) {
795          ContainerRequest containerAsk = setupContainerAskForRM();
796          amRMClient.addContainerRequest(containerAsk);
797        }
798      }
799      
800      if (numCompletedContainers.get() == numTotalContainers) {
801        done = true;
802      }
803    }
804
805    @Override
806    public void onContainersAllocated(List<Container> allocatedContainers) {
807      LOG.info("Got response from RM for container ask, allocatedCnt="
808          + allocatedContainers.size());
809      numAllocatedContainers.addAndGet(allocatedContainers.size());
810      for (Container allocatedContainer : allocatedContainers) {
811        String yarnShellId = Integer.toString(yarnShellIdCounter);
812        yarnShellIdCounter++;
813        LOG.info("Launching shell command on a new container."
814            + ", containerId=" + allocatedContainer.getId()
815            + ", yarnShellId=" + yarnShellId
816            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
817            + ":" + allocatedContainer.getNodeId().getPort()
818            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
819            + ", containerResourceMemory"
820            + allocatedContainer.getResource().getMemory()
821            + ", containerResourceVirtualCores"
822            + allocatedContainer.getResource().getVirtualCores());
823        // + ", containerToken"
824        // +allocatedContainer.getContainerToken().getIdentifier().toString());
825
826        Thread launchThread = createLaunchContainerThread(allocatedContainer,
827            yarnShellId);
828
829        // launch and start the container on a separate thread to keep
830        // the main thread unblocked
831        // as all containers may not be allocated at one go.
832        launchThreads.add(launchThread);
833        launchedContainers.add(allocatedContainer.getId());
834        launchThread.start();
835      }
836    }
837
838    @Override
839    public void onContainersResourceChanged(List<Container> containers) {}
840
841    @Override
842    public void onShutdownRequest() {
843      done = true;
844    }
845
846    @Override
847    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
848
849    @Override
850    public float getProgress() {
851      // set progress to deliver to RM on next heartbeat
852      float progress = (float) numCompletedContainers.get()
853          / numTotalContainers;
854      return progress;
855    }
856
857    @Override
858    public void onError(Throwable e) {
859      done = true;
860      amRMClient.stop();
861    }
862  }
863
864  @VisibleForTesting
865  static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
866
867    private ConcurrentMap<ContainerId, Container> containers =
868        new ConcurrentHashMap<ContainerId, Container>();
869    private final ApplicationMaster applicationMaster;
870
871    public NMCallbackHandler(ApplicationMaster applicationMaster) {
872      this.applicationMaster = applicationMaster;
873    }
874
875    public void addContainer(ContainerId containerId, Container container) {
876      containers.putIfAbsent(containerId, container);
877    }
878
879    @Override
880    public void onContainerStopped(ContainerId containerId) {
881      if (LOG.isDebugEnabled()) {
882        LOG.debug("Succeeded to stop Container " + containerId);
883      }
884      containers.remove(containerId);
885    }
886
887    @Override
888    public void onContainerStatusReceived(ContainerId containerId,
889        ContainerStatus containerStatus) {
890      if (LOG.isDebugEnabled()) {
891        LOG.debug("Container Status: id=" + containerId + ", status=" +
892            containerStatus);
893      }
894    }
895
896    @Override
897    public void onContainerStarted(ContainerId containerId,
898        Map<String, ByteBuffer> allServiceResponse) {
899      if (LOG.isDebugEnabled()) {
900        LOG.debug("Succeeded to start Container " + containerId);
901      }
902      Container container = containers.get(containerId);
903      if (container != null) {
904        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
905      }
906      if(applicationMaster.timelineClient != null) {
907        ApplicationMaster.publishContainerStartEvent(
908            applicationMaster.timelineClient, container,
909            applicationMaster.domainId, applicationMaster.appSubmitterUgi);
910      }
911    }
912
913    @Override
914    public void onContainerResourceIncreased(
915        ContainerId containerId, Resource resource) {}
916
917    @Override
918    public void onStartContainerError(ContainerId containerId, Throwable t) {
919      LOG.error("Failed to start Container " + containerId);
920      containers.remove(containerId);
921      applicationMaster.numCompletedContainers.incrementAndGet();
922      applicationMaster.numFailedContainers.incrementAndGet();
923    }
924
925    @Override
926    public void onGetContainerStatusError(
927        ContainerId containerId, Throwable t) {
928      LOG.error("Failed to query the status of Container " + containerId);
929    }
930
931    @Override
932    public void onStopContainerError(ContainerId containerId, Throwable t) {
933      LOG.error("Failed to stop Container " + containerId);
934      containers.remove(containerId);
935    }
936
937    @Override
938    public void onIncreaseContainerResourceError(
939        ContainerId containerId, Throwable t) {}
940
941  }
942
943  /**
944   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
945   * that will execute the shell command.
946   */
947  private class LaunchContainerRunnable implements Runnable {
948
949    // Allocated container
950    private Container container;
951    private String shellId;
952
953    NMCallbackHandler containerListener;
954
955    /**
956     * @param lcontainer Allocated container
957     * @param containerListener Callback handler of the container
958     */
959    public LaunchContainerRunnable(Container lcontainer,
960        NMCallbackHandler containerListener, String shellId) {
961      this.container = lcontainer;
962      this.containerListener = containerListener;
963      this.shellId = shellId;
964    }
965
966    @Override
967    /**
968     * Connects to CM, sets up container launch context 
969     * for shell command and eventually dispatches the container 
970     * start request to the CM. 
971     */
972    public void run() {
973      LOG.info("Setting up container launch container for containerid="
974          + container.getId() + " with shellid=" + shellId);
975
976      // Set the local resources
977      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
978
979      // The container for the eventual shell commands needs its own local
980      // resources too.
981      // In this scenario, if a shell script is specified, we need to have it
982      // copied and made available to the container.
983      if (!scriptPath.isEmpty()) {
984        Path renamedScriptPath = null;
985        if (Shell.WINDOWS) {
986          renamedScriptPath = new Path(scriptPath + ".bat");
987        } else {
988          renamedScriptPath = new Path(scriptPath + ".sh");
989        }
990
991        try {
992          // rename the script file based on the underlying OS syntax.
993          renameScriptFile(renamedScriptPath);
994        } catch (Exception e) {
995          LOG.error(
996              "Not able to add suffix (.bat/.sh) to the shell script filename",
997              e);
998          // We know we cannot continue launching the container
999          // so we should release it.
1000          numCompletedContainers.incrementAndGet();
1001          numFailedContainers.incrementAndGet();
1002          return;
1003        }
1004
1005        URL yarnUrl = null;
1006        try {
1007          yarnUrl = ConverterUtils.getYarnUrlFromURI(
1008            new URI(renamedScriptPath.toString()));
1009        } catch (URISyntaxException e) {
1010          LOG.error("Error when trying to use shell script path specified"
1011              + " in env, path=" + renamedScriptPath, e);
1012          // A failure scenario on bad input such as invalid shell script path
1013          // We know we cannot continue launching the container
1014          // so we should release it.
1015          // TODO
1016          numCompletedContainers.incrementAndGet();
1017          numFailedContainers.incrementAndGet();
1018          return;
1019        }
1020        LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
1021          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
1022          shellScriptPathLen, shellScriptPathTimestamp);
1023        localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath :
1024            ExecShellStringPath, shellRsrc);
1025        shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
1026      }
1027
1028      // Set the necessary command to execute on the allocated container
1029      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
1030
1031      // Set executable command
1032      vargs.add(shellCommand);
1033      // Set shell script path
1034      if (!scriptPath.isEmpty()) {
1035        vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
1036            : ExecShellStringPath);
1037      }
1038
1039      // Set args for the shell command if any
1040      vargs.add(shellArgs);
1041      // Add log redirect params
1042      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
1043      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
1044
1045      // Get final commmand
1046      StringBuilder command = new StringBuilder();
1047      for (CharSequence str : vargs) {
1048        command.append(str).append(" ");
1049      }
1050
1051      List<String> commands = new ArrayList<String>();
1052      commands.add(command.toString());
1053
1054      // Set up ContainerLaunchContext, setting local resource, environment,
1055      // command and token for constructor.
1056
1057      // Note for tokens: Set up tokens for the container too. Today, for normal
1058      // shell commands, the container in distribute-shell doesn't need any
1059      // tokens. We are populating them mainly for NodeManagers to be able to
1060      // download anyfiles in the distributed file-system. The tokens are
1061      // otherwise also useful in cases, for e.g., when one is running a
1062      // "hadoop dfs" command inside the distributed shell.
1063      Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
1064      myShellEnv.put(YARN_SHELL_ID, shellId);
1065      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
1066        localResources, myShellEnv, commands, null, allTokens.duplicate(),
1067          null);
1068      containerListener.addContainer(container.getId(), container);
1069      nmClientAsync.startContainerAsync(container, ctx);
1070    }
1071  }
1072
1073  private void renameScriptFile(final Path renamedScriptPath)
1074      throws IOException, InterruptedException {
1075    appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
1076      @Override
1077      public Void run() throws IOException {
1078        FileSystem fs = renamedScriptPath.getFileSystem(conf);
1079        fs.rename(new Path(scriptPath), renamedScriptPath);
1080        return null;
1081      }
1082    });
1083    LOG.info("User " + appSubmitterUgi.getUserName()
1084        + " added suffix(.sh/.bat) to script file as " + renamedScriptPath);
1085  }
1086
1087  /**
1088   * Setup the request that will be sent to the RM for the container ask.
1089   *
1090   * @return the setup ResourceRequest to be sent to RM
1091   */
1092  private ContainerRequest setupContainerAskForRM() {
1093    // setup requirements for hosts
1094    // using * as any host will do for the distributed shell app
1095    // set the priority for the request
1096    // TODO - what is the range for priority? how to decide?
1097    Priority pri = Priority.newInstance(requestPriority);
1098
1099    // Set up resource type requirements
1100    // For now, memory and CPU are supported so we set memory and cpu requirements
1101    Resource capability = Resource.newInstance(containerMemory,
1102      containerVirtualCores);
1103
1104    ContainerRequest request = new ContainerRequest(capability, null, null,
1105        pri);
1106    LOG.info("Requested container ask: " + request.toString());
1107    return request;
1108  }
1109
1110  private boolean fileExist(String filePath) {
1111    return new File(filePath).exists();
1112  }
1113
1114  private String readContent(String filePath) throws IOException {
1115    DataInputStream ds = null;
1116    try {
1117      ds = new DataInputStream(new FileInputStream(filePath));
1118      return ds.readUTF();
1119    } finally {
1120      org.apache.commons.io.IOUtils.closeQuietly(ds);
1121    }
1122  }
1123  
1124  private static void publishContainerStartEvent(
1125      final TimelineClient timelineClient, Container container, String domainId,
1126      UserGroupInformation ugi) {
1127    final TimelineEntity entity = new TimelineEntity();
1128    entity.setEntityId(container.getId().toString());
1129    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1130    entity.setDomainId(domainId);
1131    entity.addPrimaryFilter("user", ugi.getShortUserName());
1132    TimelineEvent event = new TimelineEvent();
1133    event.setTimestamp(System.currentTimeMillis());
1134    event.setEventType(DSEvent.DS_CONTAINER_START.toString());
1135    event.addEventInfo("Node", container.getNodeId().toString());
1136    event.addEventInfo("Resources", container.getResource().toString());
1137    entity.addEvent(event);
1138
1139    try {
1140      ugi.doAs(new PrivilegedExceptionAction<TimelinePutResponse>() {
1141        @Override
1142        public TimelinePutResponse run() throws Exception {
1143          return processTimelineResponseErrors(
1144              timelineClient.putEntities(entity));
1145        }
1146      });
1147    } catch (Exception e) {
1148      LOG.error("Container start event could not be published for "
1149          + container.getId().toString(),
1150          e instanceof UndeclaredThrowableException ? e.getCause() : e);
1151    }
1152  }
1153
1154  private static void publishContainerEndEvent(
1155      final TimelineClient timelineClient, ContainerStatus container,
1156      String domainId, UserGroupInformation ugi) {
1157    final TimelineEntity entity = new TimelineEntity();
1158    entity.setEntityId(container.getContainerId().toString());
1159    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
1160    entity.setDomainId(domainId);
1161    entity.addPrimaryFilter("user", ugi.getShortUserName());
1162    TimelineEvent event = new TimelineEvent();
1163    event.setTimestamp(System.currentTimeMillis());
1164    event.setEventType(DSEvent.DS_CONTAINER_END.toString());
1165    event.addEventInfo("State", container.getState().name());
1166    event.addEventInfo("Exit Status", container.getExitStatus());
1167    entity.addEvent(event);
1168    try {
1169      TimelinePutResponse response = timelineClient.putEntities(entity);
1170      processTimelineResponseErrors(response);
1171    } catch (YarnException | IOException e) {
1172      LOG.error("Container end event could not be published for "
1173          + container.getContainerId().toString(), e);
1174    }
1175  }
1176
1177  private static void publishApplicationAttemptEvent(
1178      final TimelineClient timelineClient, String appAttemptId,
1179      DSEvent appEvent, String domainId, UserGroupInformation ugi) {
1180    final TimelineEntity entity = new TimelineEntity();
1181    entity.setEntityId(appAttemptId);
1182    entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
1183    entity.setDomainId(domainId);
1184    entity.addPrimaryFilter("user", ugi.getShortUserName());
1185    TimelineEvent event = new TimelineEvent();
1186    event.setEventType(appEvent.toString());
1187    event.setTimestamp(System.currentTimeMillis());
1188    entity.addEvent(event);
1189    try {
1190      TimelinePutResponse response = timelineClient.putEntities(entity);
1191      processTimelineResponseErrors(response);
1192    } catch (YarnException | IOException e) {
1193      LOG.error("App Attempt "
1194          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
1195          + " event could not be published for "
1196          + appAttemptId.toString(), e);
1197    }
1198  }
1199
1200  private static TimelinePutResponse processTimelineResponseErrors(
1201      TimelinePutResponse response) {
1202    List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
1203    if (errors.size() == 0) {
1204      LOG.debug("Timeline entities are successfully put");
1205    } else {
1206      for (TimelinePutResponse.TimelinePutError error : errors) {
1207        LOG.error(
1208            "Error when publishing entity [" + error.getEntityType() + ","
1209                + error.getEntityId() + "], server side error code: "
1210                + error.getErrorCode());
1211      }
1212    }
1213    return response;
1214  }
1215
1216  RMCallbackHandler getRMCallbackHandler() {
1217    return new RMCallbackHandler();
1218  }
1219
1220  @VisibleForTesting
1221  void setAmRMClient(AMRMClientAsync client) {
1222    this.amRMClient = client;
1223  }
1224
1225  @VisibleForTesting
1226  int getNumCompletedContainers() {
1227    return numCompletedContainers.get();
1228  }
1229
1230  @VisibleForTesting
1231  boolean getDone() {
1232    return done;
1233  }
1234
1235  @VisibleForTesting
1236  Thread createLaunchContainerThread(Container allocatedContainer,
1237      String shellId) {
1238    LaunchContainerRunnable runnableLaunchContainer =
1239        new LaunchContainerRunnable(allocatedContainer, containerListener,
1240            shellId);
1241    return new Thread(runnableLaunchContainer);
1242  }
1243}