/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.hs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownThreadsHelper;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This class provides a way to interact with history files in a thread-safe
 * manner.
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class HistoryFileManager extends AbstractService {
  private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);

  private static enum HistoryInfoState {
    IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
  }

  private static final String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
      .doneSubdirsBeforeSerialTail();

  /**
   * Maps a serial number (generated from a jobId) to the timestamp
   * component(s) of the done directories to which it belongs, facilitating
   * jobId-based searches. If a jobId is not present in this index, its
   * history files cannot be located through it.
   */
  private static class SerialNumberIndex {
    private SortedMap<String, Set<String>> cache;
    private int maxSize;

    public SerialNumberIndex(int maxSize) {
      this.cache = new TreeMap<String, Set<String>>();
      this.maxSize = maxSize;
    }

    public synchronized void add(String serialPart, String timestampPart) {
      if (!cache.containsKey(serialPart)) {
        cache.put(serialPart, new HashSet<String>());
        if (cache.size() > maxSize) {
          String key = cache.firstKey();
          LOG.error("Dropping " + key
              + " from the SerialNumberIndex. We will no "
              + "longer be able to see jobs that are in that serial index for "
              + cache.get(key));
          cache.remove(key);
        }
      }
      Set<String> datePartSet = cache.get(serialPart);
      datePartSet.add(timestampPart);
    }

    public synchronized void remove(String serialPart, String timeStampPart) {
      if (cache.containsKey(serialPart)) {
        Set<String> set = cache.get(serialPart);
        set.remove(timeStampPart);
        if (set.isEmpty()) {
          cache.remove(serialPart);
        }
      }
    }

    public synchronized Set<String> get(String serialPart) {
      Set<String> found = cache.get(serialPart);
      if (found != null) {
        return new HashSet<String>(found);
      }
      return null;
    }
  }
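
  // Illustrative sketch only (values below are hypothetical, not produced by
  // this class): the index maps the zero-padded serial component of a job's
  // done directory to the timestamp directories that may hold it.
  //
  //   SerialNumberIndex index = new SerialNumberIndex(2000);
  //   index.add("000000001", "2016/04/06");        // register a done subdir
  //   Set<String> dates = index.get("000000001");  // -> {"2016/04/06"}
  //   index.remove("000000001", "2016/04/06");     // entry dropped when empty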

  /**
   * Wrapper around {@link ConcurrentSkipListMap} that maintains the size
   * alongside the map for an O(1) size() implementation, for use in
   * JobListCache.
   *
   * Note: the size is not updated atomically with additions/removals, so
   * size() may occasionally return an incorrect value.
   */
  static class JobIdHistoryFileInfoMap {
    private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
    private AtomicInteger mapSize;

    JobIdHistoryFileInfoMap() {
      cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
      mapSize = new AtomicInteger();
    }

    public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
      HistoryFileInfo ret = cache.putIfAbsent(key, value);
      if (ret == null) {
        mapSize.incrementAndGet();
      }
      return ret;
    }

    public HistoryFileInfo remove(JobId key) {
      HistoryFileInfo ret = cache.remove(key);
      if (ret != null) {
        mapSize.decrementAndGet();
      }
      return ret;
    }

    /**
     * Returns the recorded size of the internal map. Note that this could be
     * out of sync with the actual size of the map.
     * @return the "recorded" size
     */
    public int size() {
      return mapSize.get();
    }

    public HistoryFileInfo get(JobId key) {
      return cache.get(key);
    }

    public NavigableSet<JobId> navigableKeySet() {
      return cache.navigableKeySet();
    }

    public Collection<HistoryFileInfo> values() {
      return cache.values();
    }
  }

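  /**
   * A bounded cache of {@link HistoryFileInfo} entries keyed by JobId. When
   * the cache grows past maxSize, entries are evicted in JobId order, except
   * for entries that are still pending a move to the done directory: those
   * are kept unless the move failed and the job finished more than maxAge
   * ago, in which case their files are deleted.
   */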
  static class JobListCache {
    private JobIdHistoryFileInfoMap cache;
    private int maxSize;
    private long maxAge;

    public JobListCache(int maxSize, long maxAge) {
      this.maxSize = maxSize;
      this.maxAge = maxAge;
      this.cache = new JobIdHistoryFileInfoMap();
    }

    public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
      JobId jobId = fileInfo.getJobId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding " + jobId + " to job list cache with "
            + fileInfo.getJobIndexInfo());
      }
      HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
      if (cache.size() > maxSize) {
        // There is a race here, where more than one thread could be trying to
        // remove entries.  This could result in too many entries being removed
        // from the cache.  This is considered OK as the size of the cache
        // should be rather large, and we would rather have performance over
        // keeping the cache size exactly at the maximum.
        Iterator<JobId> keys = cache.navigableKeySet().iterator();
        long cutoff = System.currentTimeMillis() - maxAge;

        // MAPREDUCE-6436: track the first key and a count so that we limit
        // the number of log messages written when many histories are still
        // pending a move.
        JobId firstInIntermediateKey = null;
        int inIntermediateCount = 0;
        JobId firstMoveFailedKey = null;
        int moveFailedCount = 0;

        while (cache.size() > maxSize && keys.hasNext()) {
          JobId key = keys.next();
          HistoryFileInfo firstValue = cache.get(key);
          if (firstValue != null) {
            synchronized (firstValue) {
              if (firstValue.isMovePending()) {
                if (firstValue.didMoveFail() &&
                    firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
                  cache.remove(key);
                  // Now let's try to delete it
                  try {
                    firstValue.delete();
                  } catch (IOException e) {
                    LOG.error("Error while trying to delete history files" +
                                " that could not be moved to done.", e);
                  }
                } else {
                  if (firstValue.didMoveFail()) {
                    if (moveFailedCount == 0) {
                      firstMoveFailedKey = key;
                    }
                    moveFailedCount += 1;
                  } else {
                    if (inIntermediateCount == 0) {
                      firstInIntermediateKey = key;
                    }
                    inIntermediateCount += 1;
                  }
                }
              } else {
                cache.remove(key);
              }
            }
          }
        }
        // Log only the first job history among the pending ones to restrict
        // the total number of log messages.
        if (inIntermediateCount > 0) {
          LOG.warn("Waiting to remove IN_INTERMEDIATE state histories " +
                  "(e.g. " + firstInIntermediateKey + ") from JobListCache " +
                  "because they are not in the done directory yet. Total " +
                  "count is " + inIntermediateCount + ".");
        }
        if (moveFailedCount > 0) {
          LOG.warn("Waiting to remove MOVE_FAILED state histories " +
                  "(e.g. " + firstMoveFailedKey + ") from JobListCache " +
                  "because they are not in the done directory yet. Total " +
                  "count is " + moveFailedCount + ".");
        }
      }
      return old;
    }

    public void delete(HistoryFileInfo fileInfo) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Removing from cache " + fileInfo);
      }
      cache.remove(fileInfo.getJobId());
    }

    public Collection<HistoryFileInfo> values() {
      return new ArrayList<HistoryFileInfo>(cache.values());
    }

    public HistoryFileInfo get(JobId jobId) {
      return cache.get(jobId);
    }

    public boolean isFull() {
      return cache.size() >= maxSize;
    }
  }
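
  // Illustrative sketch only (the sizes are hypothetical): entries are
  // evicted in JobId order once the cache overflows, except those still
  // pending a move to done.
  //
  //   JobListCache jobs = new JobListCache(20000, TimeUnit.DAYS.toMillis(7));
  //   HistoryFileInfo prev = jobs.addIfAbsent(fileInfo); // null if newly added
  //   HistoryFileInfo hit = jobs.get(jobId);             // null on cache miss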

  /**
   * This class represents a user dir in the intermediate done directory. This
   * is mostly for locking purposes.
   */
  private class UserLogDir {
    long modTime = 0;
    private long scanTime = 0;

    public synchronized void scanIfNeeded(FileStatus fs) {
      long newModTime = fs.getModificationTime();
      // MAPREDUCE-6680: In some cloud FileSystems, like Azure FS or S3, a
      // file's modification time is truncated to seconds. In that case,
      // modTime == newModTime does not mean that there was no file update
      // in the directory, so we need an additional check.
      // Note: a modTime of X seconds and Y milliseconds may be cast to
      // either X or X+1 seconds.
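      // Worked example (hypothetical values): the directory is modified at
      // 12.4s, which the store truncates to modTime == 12s, and the last
      // scan ran at 12.7s (scanTime/1000 == 12 == modTime/1000). Even though
      // modTime is unchanged on the next call, the check below still
      // rescans, catching files written later within that same second.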
      if (modTime != newModTime
          || (scanTime/1000) == (modTime/1000)
          || (scanTime/1000 + 1) == (modTime/1000)) {
        // reset scanTime before scanning happens
        scanTime = System.currentTimeMillis();
        Path p = fs.getPath();
        try {
          scanIntermediateDirectory(p);
          // If scanning fails, we will scan again.  We assume the failure is
          // temporary.
          modTime = newModTime;
        } catch (IOException e) {
          LOG.error("Error while trying to scan the directory " + p, e);
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Scan not needed of " + fs.getPath());
        }
        // reset scanTime
        scanTime = System.currentTimeMillis();
      }
    }
  }

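  /**
   * Tracks the history, configuration and summary files that belong to a
   * single job, together with the state of their move from the intermediate
   * directory to the done directory.
   */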
  public class HistoryFileInfo {
    private Path historyFile;
    private Path confFile;
    private Path summaryFile;
    private JobIndexInfo jobIndexInfo;
    private volatile HistoryInfoState state;

    @VisibleForTesting
    protected HistoryFileInfo(Path historyFile, Path confFile,
        Path summaryFile, JobIndexInfo jobIndexInfo, boolean isInDone) {
      this.historyFile = historyFile;
      this.confFile = confFile;
      this.summaryFile = summaryFile;
      this.jobIndexInfo = jobIndexInfo;
      state = isInDone ? HistoryInfoState.IN_DONE
          : HistoryInfoState.IN_INTERMEDIATE;
    }

    @VisibleForTesting
    boolean isMovePending() {
      return state == HistoryInfoState.IN_INTERMEDIATE
          || state == HistoryInfoState.MOVE_FAILED;
    }

    @VisibleForTesting
    boolean didMoveFail() {
      return state == HistoryInfoState.MOVE_FAILED;
    }

    /**
     * @return true if the files backed by this were deleted.
     */
    public boolean isDeleted() {
      return state == HistoryInfoState.DELETED;
    }

    @Override
    public String toString() {
      return "HistoryFileInfo jobID " + getJobId()
             + " historyFile = " + historyFile;
    }

    @VisibleForTesting
    synchronized void moveToDone() throws IOException {
      if (LOG.isDebugEnabled()) {
        LOG.debug("moveToDone: " + historyFile);
      }
      if (!isMovePending()) {
        // It was either deleted or is already in done. Either way do nothing.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Move no longer pending");
        }
        return;
      }
      try {
        long completeTime = jobIndexInfo.getFinishTime();
        if (completeTime == 0) {
          completeTime = System.currentTimeMillis();
        }
        JobId jobId = jobIndexInfo.getJobId();

        List<Path> paths = new ArrayList<Path>(2);
        if (historyFile == null) {
          LOG.info("No job history file for " + jobId + " found in cache!");
        } else {
          paths.add(historyFile);
        }

        if (confFile == null) {
          LOG.info("No job conf file for " + jobId + " found in cache!");
        } else {
          paths.add(confFile);
        }

        if (summaryFile == null || !intermediateDoneDirFc.util().exists(
            summaryFile)) {
          LOG.info("No summary file for job: " + jobId);
        } else {
          String jobSummaryString = getJobSummary(intermediateDoneDirFc,
              summaryFile);
          SUMMARY_LOG.info(jobSummaryString);
          LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
          intermediateDoneDirFc.delete(summaryFile, false);
          summaryFile = null;
        }

        Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
        addDirectoryToSerialNumberIndex(targetDir);
        makeDoneSubdir(targetDir);
        if (historyFile != null) {
          Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
              .getName()));
          if (!toPath.equals(historyFile)) {
            moveToDoneNow(historyFile, toPath);
            historyFile = toPath;
          }
        }
        if (confFile != null) {
          Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
              .getName()));
          if (!toPath.equals(confFile)) {
            moveToDoneNow(confFile, toPath);
            confFile = toPath;
          }
        }
        state = HistoryInfoState.IN_DONE;
      } catch (Throwable t) {
        LOG.error("Error while trying to move a job to done", t);
        this.state = HistoryInfoState.MOVE_FAILED;
      }
    }

    /**
     * Parse a job out of the job history file, provided the underlying files
     * are not going to be deleted.
     *
     * @return the Job, or null if the underlying files were deleted.
     * @throws IOException
     *           if there is an error trying to read the file.
     */
    public synchronized Job loadJob() throws IOException {
      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
          false, jobIndexInfo.getUser(), this, aclsMgr);
    }

    /**
     * Return the history file.
     * @return the history file.
     */
    public synchronized Path getHistoryFile() {
      return historyFile;
    }

    protected synchronized void delete() throws IOException {
      if (LOG.isDebugEnabled()) {
        LOG.debug("deleting " + historyFile + " and " + confFile);
      }
      state = HistoryInfoState.DELETED;
      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
    }

    public JobIndexInfo getJobIndexInfo() {
      return jobIndexInfo;
    }

    public JobId getJobId() {
      return jobIndexInfo.getJobId();
    }

    public synchronized Path getConfFile() {
      return confFile;
    }

    public synchronized Configuration loadConfFile() throws IOException {
      FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
      Configuration jobConf = new Configuration(false);
      jobConf.addResource(fc.open(confFile), confFile.toString());
      return jobConf;
    }
  }

  private SerialNumberIndex serialNumberIndex = null;
  protected JobListCache jobListCache = null;

  // Maintains a list of known done subdirectories.
  private final Set<Path> existingDoneSubdirs = Collections
      .synchronizedSet(new HashSet<Path>());

  /**
   * Maintains a mapping between intermediate user directories and the last
   * known modification time.
   */
  private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap =
      new ConcurrentHashMap<String, UserLogDir>();

  private JobACLsManager aclsMgr;

  @VisibleForTesting
  Configuration conf;

  private String serialNumberFormat;

  private Path doneDirPrefixPath = null; // folder for completed jobs
  private FileContext doneDirFc; // done Dir FileContext

  private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
  private FileContext intermediateDoneDirFc; // Intermediate Done Dir
                                             // FileContext
  @VisibleForTesting
  protected ThreadPoolExecutor moveToDoneExecutor = null;
  private long maxHistoryAge = 0;

  public HistoryFileManager() {
    super(HistoryFileManager.class.getName());
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    this.conf = conf;

    int serialNumberLowDigits = 3;
    serialNumberFormat = ("%0"
        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
        + "d");
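    // For illustration: assuming SERIAL_NUMBER_DIRECTORY_DIGITS is 6, this
    // builds the format string "%09d", so a serial number of 37 renders as
    // "000000037" (the leading 6 digits name the serial directory, while the
    // low 3 digits vary within it).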

    long maxFSWaitTime = conf.getLong(
        JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
    createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);

    this.aclsMgr = new JobACLsManager(conf);

    maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);

    jobListCache = createJobListCache();

    serialNumberIndex = new SerialNumberIndex(conf.getInt(
        JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));

    int numMoveThreads = conf.getInt(
        JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
    moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads);
    super.serviceInit(conf);
  }

  protected ThreadPoolExecutor createMoveToDoneThreadPool(int numMoveThreads) {
    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
        "MoveIntermediateToDone Thread #%d").build();
    return new HadoopThreadPoolExecutor(numMoveThreads, numMoveThreads,
        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
  }
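
  // Note: the pool above is fixed-size (core == max) and its
  // LinkedBlockingQueue is unbounded, so a burst of intermediate histories
  // is queued for later processing rather than rejected.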

  @VisibleForTesting
  void createHistoryDirs(Clock clock, long intervalCheckMillis,
      long timeOutMillis) throws IOException {
    long start = clock.getTime();
    boolean done = false;
    int counter = 0;
    while (!done &&
        ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
      done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
      try {
        Thread.sleep(intervalCheckMillis);
      } catch (InterruptedException ex) {
        throw new YarnRuntimeException(ex);
      }
    }
    if (!done) {
      throw new YarnRuntimeException("Timed out after " + timeOutMillis +
              " ms waiting for FileSystem to become available");
    }
  }

  /**
   * Check whether the NameNode has not started yet, as indicated by the
   * exception type and message.
   * DistributedFileSystem returns a RemoteException whose message mentions
   * SafeModeException, so inspecting the message is the only way to tell
   * that the failure is due to safe mode. In addition, the NameNode may not
   * have started yet, in which case the message contains "NameNode still
   * not started".
   */
  private boolean isNameNodeStillNotStarted(Exception ex) {
    String nameNodeNotStartedMsg = NameNode.composeNotStartedMessage(
        HdfsServerConstants.NamenodeRole.NAMENODE);
    return ex.toString().contains("SafeModeException") ||
        (ex instanceof RetriableException && ex.getMessage().contains(
            nameNodeNotStartedMsg));
  }
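
  // For illustration, the kind of string the check above matches (the exact
  // wording varies by Hadoop version; this example is hypothetical):
  //   org.apache.hadoop.ipc.RemoteException(
  //       org.apache.hadoop.hdfs.server.namenode.SafeModeException):
  //       Cannot create directory ... Name node is in safe mode.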

  /**
   * Returns TRUE if the history dirs were created, FALSE if they could not
   * be created because the FileSystem is not reachable or in safe mode, and
   * throws an exception otherwise.
   */
  @VisibleForTesting
  boolean tryCreatingHistoryDirs(boolean logWait) throws IOException {
    boolean succeeded = true;
    String doneDirPrefix = JobHistoryUtils.
        getConfiguredHistoryServerDoneDirPrefix(conf);
    try {
      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
          new Path(doneDirPrefix));
      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
    } catch (ConnectException ex) {
      if (logWait) {
        LOG.info("Waiting for FileSystem at " +
            doneDirPrefixPath.toUri().getAuthority() + " to be available");
      }
      succeeded = false;
    } catch (IOException e) {
      if (isNameNodeStillNotStarted(e)) {
        succeeded = false;
        if (logWait) {
          LOG.info("Waiting for FileSystem at " +
              doneDirPrefixPath.toUri().getAuthority() +
              " to be out of safe mode");
        }
      } else {
        throw new YarnRuntimeException("Error creating done directory: ["
            + doneDirPrefixPath + "]", e);
      }
    }
    if (succeeded) {
      String intermediateDoneDirPrefix = JobHistoryUtils.
          getConfiguredHistoryIntermediateDoneDirPrefix(conf);
      try {
        intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
            new Path(intermediateDoneDirPrefix));
        intermediateDoneDirFc = FileContext.getFileContext(
            intermediateDoneDirPath.toUri(), conf);
        mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
            JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
      } catch (ConnectException ex) {
        succeeded = false;
        if (logWait) {
          LOG.info("Waiting for FileSystem at " +
              intermediateDoneDirPath.toUri().getAuthority() +
              " to be available");
        }
      } catch (IOException e) {
        if (isNameNodeStillNotStarted(e)) {
          succeeded = false;
          if (logWait) {
            LOG.info("Waiting for FileSystem at " +
                intermediateDoneDirPath.toUri().getAuthority() +
                " to be out of safe mode");
          }
        } else {
          throw new YarnRuntimeException(
              "Error creating intermediate done directory: ["
              + intermediateDoneDirPath + "]", e);
        }
      }
    }
    return succeeded;
  }

  @Override
  public void serviceStop() throws Exception {
    ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
    super.serviceStop();
  }

  protected JobListCache createJobListCache() {
    return new JobListCache(conf.getInt(
        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
  }

  private void mkdir(FileContext fc, Path path, FsPermission fsp)
      throws IOException {
    if (!fc.util().exists(path)) {
      try {
        fc.mkdir(path, fsp, true);

        FileStatus fsStatus = fc.getFileStatus(path);
        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
            + ", Expected: " + fsp.toShort());
        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
          LOG.info("Explicitly setting permissions to: " + fsp.toShort()
              + ", " + fsp);
          fc.setPermission(path, fsp);
        }
      } catch (FileAlreadyExistsException e) {
        LOG.info("Directory: [" + path + "] already exists.");
      }
    }
  }

  protected HistoryFileInfo createHistoryFileInfo(Path historyFile,
      Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo,
      boolean isInDone) {
    return new HistoryFileInfo(
        historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
  }

  /**
   * Populates index data structures. Should only be called at initialization
   * time.
   */
  @SuppressWarnings("unchecked")
  void initExisting() throws IOException {
    LOG.info("Initializing Existing Jobs...");
    List<FileStatus> timestampedDirList = findTimestampedDirectories();
    // Sort first just so insertion is in a consistent order
    Collections.sort(timestampedDirList);
    for (FileStatus fs : timestampedDirList) {
      // TODO Could verify the correct format for these directories.
      addDirectoryToSerialNumberIndex(fs.getPath());
    }
    for (int i = timestampedDirList.size() - 1;
        i >= 0 && !jobListCache.isFull(); i--) {
      FileStatus fs = timestampedDirList.get(i);
      addDirectoryToJobListCache(fs.getPath());
    }
  }

  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
    String serialPart = serialDirPath.getName();
    String timeStampPart = JobHistoryUtils
        .getTimestampPartFromPath(serialDirPath.toString());
    if (timeStampPart == null) {
      LOG.warn("Could not find the timestamp portion of path: "
          + serialDirPath.toString() + ". Continuing with next");
      return;
    }
    if (serialPart == null) {
      LOG.warn("Could not find the serial portion of path: "
          + serialDirPath.toString() + ". Continuing with next");
      return;
    }
    serialNumberIndex.remove(serialPart, timeStampPart);
  }

  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding " + serialDirPath + " to serial index");
    }
    String serialPart = serialDirPath.getName();
    String timestampPart = JobHistoryUtils
        .getTimestampPartFromPath(serialDirPath.toString());
    if (timestampPart == null) {
      LOG.warn("Could not find the timestamp portion of path: " + serialDirPath
          + ". Continuing with next");
      return;
    }
    if (serialPart == null) {
      LOG.warn("Could not find the serial portion of path: "
          + serialDirPath.toString() + ". Continuing with next");
    } else {
      serialNumberIndex.add(serialPart, timestampPart);
    }
  }

  private void addDirectoryToJobListCache(Path path) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding " + path + " to job list cache.");
    }
    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
        doneDirFc);
    for (FileStatus fs : historyFileList) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding in history for " + fs.getPath());
      }
      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
          .getName());
      String confFileName = JobHistoryUtils
          .getIntermediateConfFileName(jobIndexInfo.getJobId());
      String summaryFileName = JobHistoryUtils
          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
      HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
          .getPath().getParent(), confFileName), new Path(fs.getPath()
          .getParent(), summaryFileName), jobIndexInfo, true);
      jobListCache.addIfAbsent(fileInfo);
    }
  }

  @VisibleForTesting
  protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
      PathFilter pathFilter) throws IOException {
    path = fc.makeQualified(path);
    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
    try {
      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
      while (fileStatusIter.hasNext()) {
        FileStatus fileStatus = fileStatusIter.next();
        Path filePath = fileStatus.getPath();
        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
          jhStatusList.add(fileStatus);
        }
      }
    } catch (FileNotFoundException fe) {
      LOG.error("Error while scanning directory " + path, fe);
    }
    return jhStatusList;
  }
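
  // Illustrative call only (the path below is hypothetical): list the job
  // history files in one done-directory bucket.
  //
  //   List<FileStatus> jhist = scanDirectory(
  //       new Path("/mr-history/done/2016/04/06/000000"),
  //       doneDirFc, JobHistoryUtils.getHistoryFileFilter());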

  protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
      FileContext fc) throws IOException {
    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
  }

  /**
   * Finds all history directories with a timestamp component by scanning the
   * filesystem. Used when the JobHistory server is started.
   *
   * @return list of history directories
   */
  protected List<FileStatus> findTimestampedDirectories() throws IOException {
    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
    return fsList;
  }

  /**
   * Scans the intermediate directory to find user directories. Scans these for
   * history files if the modification time for the directory has changed. Once
   * it finds history files it starts the process of moving them to the done
   * directory.
   *
   * @throws IOException
   *           if there was an error while scanning
   */
  void scanIntermediateDirectory() throws IOException {
    // TODO it would be great to limit how often this happens, except in the
    // case where we are looking for a particular job.
    List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
        intermediateDoneDirFc, intermediateDoneDirPath, "");
    LOG.debug("Scanning intermediate dirs");
    for (FileStatus userDir : userDirList) {
      String name = userDir.getPath().getName();
      UserLogDir dir = userDirModificationTimeMap.get(name);
      if (dir == null) {
        dir = new UserLogDir();
        UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
        if (old != null) {
          dir = old;
        }
      }
      dir.scanIfNeeded(userDir);
    }
  }

  /**
   * Scans the specified path and populates the intermediate cache.
   *
   * @param absPath the absolute path of the directory to scan
   * @throws IOException if there was an error while scanning
   */
  private void scanIntermediateDirectory(final Path absPath) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Scanning intermediate dir " + absPath);
    }
    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
        intermediateDoneDirFc);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Found " + fileStatusList.size() + " files");
    }
    for (FileStatus fs : fileStatusList) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("scanning file: " + fs.getPath());
      }
      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
          .getName());
      String confFileName = JobHistoryUtils
          .getIntermediateConfFileName(jobIndexInfo.getJobId());
      String summaryFileName = JobHistoryUtils
          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
      HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
          .getPath().getParent(), confFileName), new Path(fs.getPath()
          .getParent(), summaryFileName), jobIndexInfo, false);

      final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
      if (old == null || old.didMoveFail()) {
        final HistoryFileInfo found = (old == null) ? fileInfo : old;
        long cutoff = System.currentTimeMillis() - maxHistoryAge;
        if (found.getJobIndexInfo().getFinishTime() <= cutoff) {
          try {
            found.delete();
          } catch (IOException e) {
            LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
          }
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Scheduling move to done of " + found);
          }
          moveToDoneExecutor.execute(new Runnable() {
            @Override
            public void run() {
              try {
                found.moveToDone();
              } catch (IOException e) {
                LOG.info("Failed to process fileInfo for job: " +
                    found.getJobId(), e);
              }
            }
          });
        }
      } else if (!old.isMovePending()) {
        // This is a duplicate so just delete it
        if (LOG.isDebugEnabled()) {
          LOG.debug("Duplicate: deleting");
        }
        fileInfo.delete();
      }
    }
  }

  /**
   * Searches the job history file FileStatus list for the specified JobId.
   *
   * @param fileStatusList
   *          fileStatus list of Job History Files.
   * @param jobId
   *          The JobId to find.
   * @return A FileInfo object for the jobId, null if not found.
   * @throws IOException
   */
  private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
      JobId jobId) throws IOException {
    for (FileStatus fs : fileStatusList) {
      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
          .getName());
      if (jobIndexInfo.getJobId().equals(jobId)) {
        String confFileName = JobHistoryUtils
            .getIntermediateConfFileName(jobIndexInfo.getJobId());
        String summaryFileName = JobHistoryUtils
            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
        HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(
            fs.getPath().getParent(), confFileName), new Path(fs.getPath()
            .getParent(), summaryFileName), jobIndexInfo, true);
        return fileInfo;
      }
    }
    return null;
  }

  /**
   * Scans old directories known by the idToDateString map for the specified
   * jobId. If the number of directories is higher than the supported size of
   * the idToDateString cache, the jobId will not be found.
   *
   * @param jobId
   *          the jobId.
   * @return the HistoryFileInfo for the job, or null if it was not found.
   * @throws IOException
   */
  private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
    String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
        jobId, serialNumberFormat);
    Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
    if (dateStringSet == null) {
      return null;
    }
    for (String timestampPart : dateStringSet) {
      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
          doneDirFc);
      HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
      if (fileInfo != null) {
        return fileInfo;
      }
    }
    return null;
  }

  public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
    scanIntermediateDirectory();
    return jobListCache.values();
  }

  public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
    // FileInfo available in cache.
    HistoryFileInfo fileInfo = jobListCache.get(jobId);
    if (fileInfo != null) {
      return fileInfo;
    }
    // Not in the cache; scan the intermediate directory to be sure we did
    // not miss it there.
    scanIntermediateDirectory();
    fileInfo = jobListCache.get(jobId);
    if (fileInfo != null) {
      return fileInfo;
    }

    // Intermediate directory does not contain job. Search through older ones.
    fileInfo = scanOldDirsForJob(jobId);
    if (fileInfo != null) {
      return fileInfo;
    }
    return null;
  }

  private void moveToDoneNow(final Path src, final Path target)
      throws IOException {
    LOG.info("Moving " + src.toString() + " to " + target.toString());
    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
  }

  private String getJobSummary(FileContext fc, Path path) throws IOException {
    Path qPath = fc.makeQualified(path);
    FSDataInputStream in = null;
    String jobSummaryString = null;
    try {
      in = fc.open(qPath);
      jobSummaryString = in.readUTF();
    } finally {
      if (in != null) {
        in.close();
      }
    }
    return jobSummaryString;
  }

  private void makeDoneSubdir(Path path) throws IOException {
    try {
      doneDirFc.getFileStatus(path);
      existingDoneSubdirs.add(path);
    } catch (FileNotFoundException fnfE) {
      try {
        FsPermission fsp = new FsPermission(
            JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
        doneDirFc.mkdir(path, fsp, true);
        FileStatus fsStatus = doneDirFc.getFileStatus(path);
        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
            + ", Expected: " + fsp.toShort());
        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
          LOG.info("Explicitly setting permissions to: " + fsp.toShort()
              + ", " + fsp);
          doneDirFc.setPermission(path, fsp);
        }
        existingDoneSubdirs.add(path);
      } catch (FileAlreadyExistsException faeE) { // Nothing to do.
      }
    }
  }

  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
        id, timestampComponent, serialNumberFormat));
  }

  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
    String timestampComponent = JobHistoryUtils
        .timestampDirectoryComponent(millisecondTime);
    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
        id, timestampComponent, serialNumberFormat));
  }
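
  // For illustration, the done directory layout these helpers produce is
  // <doneDirPrefix>/<YYYY>/<MM>/<DD>/<serial>, e.g. (hypothetical values):
  //   /mr-history/done/2016/04/06/000000/job_1459884415720_0001-...jhist
  // clean() below relies on this YYYY/MM/DD/Serial ordering when sorting.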

  private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
    if (finishTime == 0) {
      return fileStatus.getModificationTime();
    }
    return finishTime;
  }

  private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
    jobListCache.delete(fileInfo);
    fileInfo.delete();
  }

  List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException {
    return JobHistoryUtils.
        getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff);
  }

  /**
   * Clean up older history files.
   *
   * @throws IOException
   *           on any error trying to remove the entries.
   */
  @SuppressWarnings("unchecked")
  void clean() throws IOException {
    long cutoff = System.currentTimeMillis() - maxHistoryAge;
    boolean halted = false;
    List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
    // Sort in ascending order. Relies on YYYY/MM/DD/Serial
    Collections.sort(serialDirList);
    for (FileStatus serialDir : serialDirList) {
      List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
          serialDir.getPath(), doneDirFc);
      for (FileStatus historyFile : historyFileList) {
        JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
            .getPath().getName());
        long effectiveTimestamp = getEffectiveTimestamp(
            jobIndexInfo.getFinishTime(), historyFile);
        if (effectiveTimestamp <= cutoff) {
          HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
              .getJobId());
          if (fileInfo == null) {
            String confFileName = JobHistoryUtils
                .getIntermediateConfFileName(jobIndexInfo.getJobId());

            fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path(
                historyFile.getPath().getParent(), confFileName), null,
                jobIndexInfo, true);
          }
          deleteJobFromDone(fileInfo);
        } else {
          halted = true;
          break;
        }
      }
      if (!halted) {
        deleteDir(serialDir);
        removeDirectoryFromSerialNumberIndex(serialDir.getPath());
        existingDoneSubdirs.remove(serialDir.getPath());
      } else {
        break; // Don't scan any more directories.
      }
    }
  }

  protected boolean deleteDir(FileStatus serialDir)
      throws AccessControlException, FileNotFoundException,
      UnsupportedFileSystemException, IOException {
    return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
  }

  @VisibleForTesting
  void setMaxHistoryAge(long newValue) {
    maxHistoryAge = newValue;
  }
}