001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.v2.hs;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.ConnectException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.NavigableSet;
031import java.util.Set;
032import java.util.SortedMap;
033import java.util.TreeMap;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ConcurrentMap;
036import java.util.concurrent.ConcurrentSkipListMap;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.concurrent.ThreadFactory;
039import java.util.concurrent.ThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicInteger;
042
043import org.apache.commons.logging.Log;
044import org.apache.commons.logging.LogFactory;
045import org.apache.hadoop.classification.InterfaceAudience;
046import org.apache.hadoop.classification.InterfaceStability;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FSDataInputStream;
049import org.apache.hadoop.fs.FileAlreadyExistsException;
050import org.apache.hadoop.fs.FileContext;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.Options;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.fs.PathFilter;
055import org.apache.hadoop.fs.RemoteIterator;
056import org.apache.hadoop.fs.UnsupportedFileSystemException;
057import org.apache.hadoop.fs.permission.FsPermission;
058import org.apache.hadoop.mapred.JobACLsManager;
059import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
060import org.apache.hadoop.mapreduce.v2.api.records.JobId;
061import org.apache.hadoop.mapreduce.v2.app.job.Job;
062import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
063import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
064import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
065import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
066import org.apache.hadoop.security.AccessControlException;
067import org.apache.hadoop.service.AbstractService;
068import org.apache.hadoop.util.ShutdownThreadsHelper;
069import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
070import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
071
072import com.google.common.annotations.VisibleForTesting;
073import com.google.common.util.concurrent.ThreadFactoryBuilder;
074import org.apache.hadoop.yarn.util.Clock;
075import org.apache.hadoop.yarn.util.SystemClock;
076
077/**
078 * This class provides a way to interact with history files in a thread safe
 * manner.
080 */
081@InterfaceAudience.Public
082@InterfaceStability.Unstable
083public class HistoryFileManager extends AbstractService {
  private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
  // Receives the job summary text read from a job's summary file when the job
  // is moved to the done directory (see HistoryFileInfo.moveToDone).
  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
086
087  private static enum HistoryInfoState {
088    IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
089  };
090  
  // Pattern passed to JobHistoryUtils.localGlobber by
  // findTimestampedDirectories(); presumably it matches the done-directory
  // levels above the serial-number directory (TODO confirm in JobHistoryUtils).
  // NOTE(review): not reassigned in the code visible here; could be final.
  private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
      .doneSubdirsBeforeSerialTail();
093
094  /**
095   * Maps between a serial number (generated based on jobId) and the timestamp
096   * component(s) to which it belongs. Facilitates jobId based searches. If a
097   * jobId is not found in this list - it will not be found.
098   */
099  private static class SerialNumberIndex {
100    private SortedMap<String, Set<String>> cache;
101    private int maxSize;
102
103    public SerialNumberIndex(int maxSize) {
104      this.cache = new TreeMap<String, Set<String>>();
105      this.maxSize = maxSize;
106    }
107
108    public synchronized void add(String serialPart, String timestampPart) {
109      if (!cache.containsKey(serialPart)) {
110        cache.put(serialPart, new HashSet<String>());
111        if (cache.size() > maxSize) {
112          String key = cache.firstKey();
113          LOG.error("Dropping " + key
114              + " from the SerialNumberIndex. We will no "
115              + "longer be able to see jobs that are in that serial index for "
116              + cache.get(key));
117          cache.remove(key);
118        }
119      }
120      Set<String> datePartSet = cache.get(serialPart);
121      datePartSet.add(timestampPart);
122    }
123
124    public synchronized void remove(String serialPart, String timeStampPart) {
125      if (cache.containsKey(serialPart)) {
126        Set<String> set = cache.get(serialPart);
127        set.remove(timeStampPart);
128        if (set.isEmpty()) {
129          cache.remove(serialPart);
130        }
131      }
132    }
133
134    public synchronized Set<String> get(String serialPart) {
135      Set<String> found = cache.get(serialPart);
136      if (found != null) {
137        return new HashSet<String>(found);
138      }
139      return null;
140    }
141  }
142
143  /**
144   * Wrapper around {@link ConcurrentSkipListMap} that maintains size along
145   * side for O(1) size() implementation for use in JobListCache.
146   *
147   * Note: The size is not updated atomically with changes additions/removals.
148   * This race can lead to size() returning an incorrect size at times.
149   */
150  static class JobIdHistoryFileInfoMap {
151    private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
152    private AtomicInteger mapSize;
153
154    JobIdHistoryFileInfoMap() {
155      cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
156      mapSize = new AtomicInteger();
157    }
158
159    public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
160      HistoryFileInfo ret = cache.putIfAbsent(key, value);
161      if (ret == null) {
162        mapSize.incrementAndGet();
163      }
164      return ret;
165    }
166
167    public HistoryFileInfo remove(JobId key) {
168      HistoryFileInfo ret = cache.remove(key);
169      if (ret != null) {
170        mapSize.decrementAndGet();
171      }
172      return ret;
173    }
174
175    /**
176     * Returns the recorded size of the internal map. Note that this could be out
177     * of sync with the actual size of the map
178     * @return "recorded" size
179     */
180    public int size() {
181      return mapSize.get();
182    }
183
184    public HistoryFileInfo get(JobId key) {
185      return cache.get(key);
186    }
187
188    public NavigableSet<JobId> navigableKeySet() {
189      return cache.navigableKeySet();
190    }
191
192    public Collection<HistoryFileInfo> values() {
193      return cache.values();
194    }
195  }
196
  /**
   * Bounded cache of HistoryFileInfo entries keyed by JobId, used to look up
   * and list known jobs.
   *
   * When the recorded size exceeds {@code maxSize} after an insertion,
   * entries are evicted in ascending JobId order (the key order of the
   * underlying skip-list map). Entries whose move to the done directory is
   * still pending are kept, except MOVE_FAILED entries whose finish time is
   * older than {@code maxAge}; those are removed and their files deleted.
   */
  static class JobListCache {
    private JobIdHistoryFileInfoMap cache;
    private int maxSize;
    private long maxAge;

    /**
     * @param maxSize number of entries above which addIfAbsent starts evicting
     * @param maxAge age (subtracted from System.currentTimeMillis() and
     *        compared with the job finish time) after which an entry whose
     *        move failed may be evicted and its files deleted
     */
    public JobListCache(int maxSize, long maxAge) {
      this.maxSize = maxSize;
      this.maxAge = maxAge;
      this.cache = new JobIdHistoryFileInfoMap();
    }

    /**
     * Adds {@code fileInfo} unless an entry with the same JobId exists, then,
     * if the recorded size exceeds maxSize, tries to evict entries.
     *
     * @param fileInfo the entry to add
     * @return the existing entry for the same JobId, or {@code null} if
     *         {@code fileInfo} was inserted
     */
    public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
      JobId jobId = fileInfo.getJobId();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding " + jobId + " to job list cache with "
            + fileInfo.getJobIndexInfo());
      }
      HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
      if (cache.size() > maxSize) {
        //There is a race here, where more than one thread could be trying to
        // remove entries.  This could result in too many entries being removed
        // from the cache.  This is considered OK as the size of the cache
        // should be rather large, and we would rather have performance over
        // keeping the cache size exactly at the maximum.
        Iterator<JobId> keys = cache.navigableKeySet().iterator();
        long cutoff = System.currentTimeMillis() - maxAge;

        // MAPREDUCE-6436: In order to reduce the number of logs written
        // in case of a lot of move pending histories.
        JobId firstInIntermediateKey = null;
        int inIntermediateCount = 0;
        JobId firstMoveFailedKey = null;
        int moveFailedCount = 0;

        while(cache.size() > maxSize && keys.hasNext()) {
          JobId key = keys.next();
          HistoryFileInfo firstValue = cache.get(key);
          // The entry may have been removed concurrently since iteration began.
          if(firstValue != null) {
            synchronized(firstValue) {
              if (firstValue.isMovePending()) {
                // A failed move older than the cutoff is given up on: drop it
                // from the cache and delete its files.
                if(firstValue.didMoveFail() &&
                    firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
                  cache.remove(key);
                  //Now lets try to delete it
                  try {
                    firstValue.delete();
                  } catch (IOException e) {
                    LOG.error("Error while trying to delete history files" +
                                " that could not be moved to done.", e);
                  }
                } else {
                  // Still pending: keep it, but remember it for the summary
                  // warnings logged after the loop.
                  if (firstValue.didMoveFail()) {
                    if (moveFailedCount == 0) {
                      firstMoveFailedKey = key;
                    }
                    moveFailedCount += 1;
                  } else {
                    if (inIntermediateCount == 0) {
                      firstInIntermediateKey = key;
                    }
                    inIntermediateCount += 1;
                  }
                }
              } else {
                // Already in done (or deleted): safe to evict from the cache.
                cache.remove(key);
              }
            }
          }
        }
        // Log only the first pending job history of each kind (plus a count)
        // to limit the total number of log lines.
        if (inIntermediateCount > 0) {
          LOG.warn("Waiting to remove IN_INTERMEDIATE state histories " +
                  "(e.g. " + firstInIntermediateKey + ") from JobListCache " +
                  "because it is not in done yet. Total count is " +
                  inIntermediateCount + ".");
        }
        if (moveFailedCount > 0) {
          LOG.warn("Waiting to remove MOVE_FAILED state histories " +
                  "(e.g. " + firstMoveFailedKey + ") from JobListCache " +
                  "because it is not in done yet. Total count is " +
                  moveFailedCount + ".");
        }
      }
      return old;
    }

    /** Removes the entry for {@code fileInfo}'s JobId from the cache. */
    public void delete(HistoryFileInfo fileInfo) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Removing from cache " + fileInfo);
      }
      cache.remove(fileInfo.getJobId());
    }

    /** @return a snapshot copy of the cached entries. */
    public Collection<HistoryFileInfo> values() {
      return new ArrayList<HistoryFileInfo>(cache.values());
    }

    /** @return the cached entry for {@code jobId}, or {@code null}. */
    public HistoryFileInfo get(JobId jobId) {
      return cache.get(jobId);
    }

    /** @return true if the recorded size has reached maxSize. */
    public boolean isFull() {
      return cache.size() >= maxSize;
    }
  }
303
304  /**
305   * This class represents a user dir in the intermediate done directory.  This
306   * is mostly for locking purposes. 
307   */
308  private class UserLogDir {
309    long modTime = 0;
310    
311    public synchronized void scanIfNeeded(FileStatus fs) {
312      long newModTime = fs.getModificationTime();
313      if (modTime != newModTime) {
314        Path p = fs.getPath();
315        try {
316          scanIntermediateDirectory(p);
317          //If scanning fails, we will scan again.  We assume the failure is
318          // temporary.
319          modTime = newModTime;
320        } catch (IOException e) {
321          LOG.error("Error while trying to scan the directory " + p, e);
322        }
323      } else {
324        if (LOG.isDebugEnabled()) {
325          LOG.debug("Scan not needed of " + fs.getPath());
326        }
327      }
328    }
329  }
330  
331  public class HistoryFileInfo {
332    private Path historyFile;
333    private Path confFile;
334    private Path summaryFile;
335    private JobIndexInfo jobIndexInfo;
336    private HistoryInfoState state;
337
338    @VisibleForTesting
339    protected HistoryFileInfo(Path historyFile, Path confFile,
340        Path summaryFile, JobIndexInfo jobIndexInfo, boolean isInDone) {
341      this.historyFile = historyFile;
342      this.confFile = confFile;
343      this.summaryFile = summaryFile;
344      this.jobIndexInfo = jobIndexInfo;
345      state = isInDone ? HistoryInfoState.IN_DONE
346          : HistoryInfoState.IN_INTERMEDIATE;
347    }
348
349    @VisibleForTesting
350    synchronized boolean isMovePending() {
351      return state == HistoryInfoState.IN_INTERMEDIATE
352          || state == HistoryInfoState.MOVE_FAILED;
353    }
354
355    @VisibleForTesting
356    synchronized boolean didMoveFail() {
357      return state == HistoryInfoState.MOVE_FAILED;
358    }
359    
360    /**
361     * @return true if the files backed by this were deleted.
362     */
363    public synchronized boolean isDeleted() {
364      return state == HistoryInfoState.DELETED;
365    }
366
367    @Override
368    public String toString() {
369      return "HistoryFileInfo jobID " + getJobId()
370             + " historyFile = " + historyFile;
371    }
372
373    @VisibleForTesting
374    synchronized void moveToDone() throws IOException {
375      if (LOG.isDebugEnabled()) {
376        LOG.debug("moveToDone: " + historyFile);
377      }
378      if (!isMovePending()) {
379        // It was either deleted or is already in done. Either way do nothing
380        if (LOG.isDebugEnabled()) {
381          LOG.debug("Move no longer pending");
382        }
383        return;
384      }
385      try {
386        long completeTime = jobIndexInfo.getFinishTime();
387        if (completeTime == 0) {
388          completeTime = System.currentTimeMillis();
389        }
390        JobId jobId = jobIndexInfo.getJobId();
391
392        List<Path> paths = new ArrayList<Path>(2);
393        if (historyFile == null) {
394          LOG.info("No file for job-history with " + jobId + " found in cache!");
395        } else {
396          paths.add(historyFile);
397        }
398
399        if (confFile == null) {
400          LOG.info("No file for jobConf with " + jobId + " found in cache!");
401        } else {
402          paths.add(confFile);
403        }
404
405        if (summaryFile == null || !intermediateDoneDirFc.util().exists(
406            summaryFile)) {
407          LOG.info("No summary file for job: " + jobId);
408        } else {
409          String jobSummaryString = getJobSummary(intermediateDoneDirFc,
410              summaryFile);
411          SUMMARY_LOG.info(jobSummaryString);
412          LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
413          intermediateDoneDirFc.delete(summaryFile, false);
414          summaryFile = null;
415        }
416
417        Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
418        addDirectoryToSerialNumberIndex(targetDir);
419        makeDoneSubdir(targetDir);
420        if (historyFile != null) {
421          Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
422              .getName()));
423          if (!toPath.equals(historyFile)) {
424            moveToDoneNow(historyFile, toPath);
425            historyFile = toPath;
426          }
427        }
428        if (confFile != null) {
429          Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
430              .getName()));
431          if (!toPath.equals(confFile)) {
432            moveToDoneNow(confFile, toPath);
433            confFile = toPath;
434          }
435        }
436        state = HistoryInfoState.IN_DONE;
437      } catch (Throwable t) {
438        LOG.error("Error while trying to move a job to done", t);
439        this.state = HistoryInfoState.MOVE_FAILED;
440      }
441    }
442
443    /**
444     * Parse a job from the JobHistoryFile, if the underlying file is not going
445     * to be deleted.
446     * 
447     * @return the Job or null if the underlying file was deleted.
448     * @throws IOException
449     *           if there is an error trying to read the file.
450     */
451    public synchronized Job loadJob() throws IOException {
452      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
453          false, jobIndexInfo.getUser(), this, aclsMgr);
454    }
455
456    /**
457     * Return the history file.
458     * @return the history file.
459     */
460    public synchronized Path getHistoryFile() {
461      return historyFile;
462    }
463    
464    protected synchronized void delete() throws IOException {
465      if (LOG.isDebugEnabled()) {
466        LOG.debug("deleting " + historyFile + " and " + confFile);
467      }
468      state = HistoryInfoState.DELETED;
469      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
470      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
471    }
472
473    public JobIndexInfo getJobIndexInfo() {
474      return jobIndexInfo;
475    }
476
477    public JobId getJobId() {
478      return jobIndexInfo.getJobId();
479    }
480
481    public synchronized Path getConfFile() {
482      return confFile;
483    }
484    
485    public synchronized Configuration loadConfFile() throws IOException {
486      FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
487      Configuration jobConf = new Configuration(false);
488      jobConf.addResource(fc.open(confFile), confFile.toString());
489      return jobConf;
490    }
491  }
492
  // Index from serial-number directory name to the timestamp components that
  // contain it; created in serviceInit.
  private SerialNumberIndex serialNumberIndex = null;
  // Cache of known jobs; created in serviceInit via createJobListCache().
  protected JobListCache jobListCache = null;

  // Maintains a list of known done subdirectories.
  private final Set<Path> existingDoneSubdirs = Collections
      .synchronizedSet(new HashSet<Path>());

  /**
   * Maps the name of each directory under the intermediate done directory to
   * the UserLogDir that remembers its last successfully scanned modification
   * time and serializes scans of that directory.
   */
  private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 
    new ConcurrentHashMap<String, UserLogDir>();

  // Passed to every CompletedJob created by HistoryFileInfo.loadJob.
  private JobACLsManager aclsMgr;

  // Service configuration; assigned at the start of serviceInit.
  @VisibleForTesting
  Configuration conf;

  // Zero-padded integer format string ("%0<digits>d") built in serviceInit.
  private String serialNumberFormat;

  private Path doneDirPrefixPath = null; // folder for completed jobs
  private FileContext doneDirFc; // done Dir FileContext

  private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
  private FileContext intermediateDoneDirFc; // Intermediate Done Dir
                                             // FileContext
  // "MoveIntermediateToDone" thread pool; created in serviceInit and shut
  // down in serviceStop.
  @VisibleForTesting
  protected ThreadPoolExecutor moveToDoneExecutor = null;
  // Maximum job history age in milliseconds (MR_HISTORY_MAX_AGE_MS).
  private long maxHistoryAge = 0;
523  
  /**
   * Creates the service, named after this class. Configuration, directory
   * creation and the internal caches are set up later in serviceInit.
   */
  public HistoryFileManager() {
    super(HistoryFileManager.class.getName());
  }
527
528  @Override
529  protected void serviceInit(Configuration conf) throws Exception {
530    this.conf = conf;
531
532    int serialNumberLowDigits = 3;
533    serialNumberFormat = ("%0"
534        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
535        + "d");
536
537    long maxFSWaitTime = conf.getLong(
538        JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
539        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
540    createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
541
542    this.aclsMgr = new JobACLsManager(conf);
543
544    maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
545        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
546    
547    jobListCache = createJobListCache();
548
549    serialNumberIndex = new SerialNumberIndex(conf.getInt(
550        JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
551        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));
552
553    int numMoveThreads = conf.getInt(
554        JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
555        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
556    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
557        "MoveIntermediateToDone Thread #%d").build();
558    moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads,
559        numMoveThreads, 1, TimeUnit.HOURS,
560        new LinkedBlockingQueue<Runnable>(), tf);
561
562    super.serviceInit(conf);
563  }
564
565  @VisibleForTesting
566  void createHistoryDirs(Clock clock, long intervalCheckMillis,
567      long timeOutMillis) throws IOException {
568    long start = clock.getTime();
569    boolean done = false;
570    int counter = 0;
571    while (!done &&
572        ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
573      done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
574      try {
575        Thread.sleep(intervalCheckMillis);
576      } catch (InterruptedException ex) {
577        throw new YarnRuntimeException(ex);
578      }
579    }
580    if (!done) {
581      throw new YarnRuntimeException("Timed out '" + timeOutMillis+
582              "ms' waiting for FileSystem to become available");
583    }
584  }
585
586  /**
587   * DistributedFileSystem returns a RemoteException with a message stating
588   * SafeModeException in it. So this is only way to check it is because of
589   * being in safe mode.
590   */
591  private boolean isBecauseSafeMode(Throwable ex) {
592    return ex.toString().contains("SafeModeException");
593  }
594
595  /**
596   * Returns TRUE if the history dirs were created, FALSE if they could not
597   * be created because the FileSystem is not reachable or in safe mode and
598   * throws and exception otherwise.
599   */
600  @VisibleForTesting
601  boolean tryCreatingHistoryDirs(boolean logWait) throws IOException {
602    boolean succeeded = true;
603    String doneDirPrefix = JobHistoryUtils.
604        getConfiguredHistoryServerDoneDirPrefix(conf);
605    try {
606      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
607          new Path(doneDirPrefix));
608      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
609      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
610      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
611          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
612    } catch (ConnectException ex) {
613      if (logWait) {
614        LOG.info("Waiting for FileSystem at " +
615            doneDirPrefixPath.toUri().getAuthority()  + "to be available");
616      }
617      succeeded = false;
618    } catch (IOException e) {
619      if (isBecauseSafeMode(e)) {
620        succeeded = false;
621        if (logWait) {
622          LOG.info("Waiting for FileSystem at " +
623              doneDirPrefixPath.toUri().getAuthority() +
624              "to be out of safe mode");
625        }
626      } else {
627        throw new YarnRuntimeException("Error creating done directory: ["
628            + doneDirPrefixPath + "]", e);
629      }
630    }
631    if (succeeded) {
632      String intermediateDoneDirPrefix = JobHistoryUtils.
633          getConfiguredHistoryIntermediateDoneDirPrefix(conf);
634      try {
635        intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
636            new Path(intermediateDoneDirPrefix));
637        intermediateDoneDirFc = FileContext.getFileContext(
638            intermediateDoneDirPath.toUri(), conf);
639        mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
640            JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
641      } catch (ConnectException ex) {
642        succeeded = false;
643        if (logWait) {
644          LOG.info("Waiting for FileSystem at " +
645              intermediateDoneDirPath.toUri().getAuthority() +
646              "to be available");
647        }
648      } catch (IOException e) {
649        if (isBecauseSafeMode(e)) {
650          succeeded = false;
651          if (logWait) {
652            LOG.info("Waiting for FileSystem at " +
653                intermediateDoneDirPath.toUri().getAuthority() +
654                "to be out of safe mode");
655          }
656        } else {
657          throw new YarnRuntimeException(
658              "Error creating intermediate done directory: ["
659              + intermediateDoneDirPath + "]", e);
660        }
661      }
662    }
663    return succeeded;
664  }
665
666  @Override
667  public void serviceStop() throws Exception {
668    ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
669    super.serviceStop();
670  }
671
672  protected JobListCache createJobListCache() {
673    return new JobListCache(conf.getInt(
674        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
675        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
676  }
677
678  private void mkdir(FileContext fc, Path path, FsPermission fsp)
679      throws IOException {
680    if (!fc.util().exists(path)) {
681      try {
682        fc.mkdir(path, fsp, true);
683
684        FileStatus fsStatus = fc.getFileStatus(path);
685        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
686            + ", Expected: " + fsp.toShort());
687        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
688          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
689              + ", " + fsp);
690          fc.setPermission(path, fsp);
691        }
692      } catch (FileAlreadyExistsException e) {
693        LOG.info("Directory: [" + path + "] already exists.");
694      }
695    }
696  }
697
698  /**
699   * Populates index data structures. Should only be called at initialization
700   * times.
701   */
702  @SuppressWarnings("unchecked")
703  void initExisting() throws IOException {
704    LOG.info("Initializing Existing Jobs...");
705    List<FileStatus> timestampedDirList = findTimestampedDirectories();
706    // Sort first just so insertion is in a consistent order
707    Collections.sort(timestampedDirList);
708    for (FileStatus fs : timestampedDirList) {
709      // TODO Could verify the correct format for these directories.
710      addDirectoryToSerialNumberIndex(fs.getPath());
711    }
712    for (int i= timestampedDirList.size() - 1;
713        i >= 0 && !jobListCache.isFull(); i--) {
714      FileStatus fs = timestampedDirList.get(i); 
715      addDirectoryToJobListCache(fs.getPath());
716    }
717  }
718
719  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
720    String serialPart = serialDirPath.getName();
721    String timeStampPart = JobHistoryUtils
722        .getTimestampPartFromPath(serialDirPath.toString());
723    if (timeStampPart == null) {
724      LOG.warn("Could not find timestamp portion from path: "
725          + serialDirPath.toString() + ". Continuing with next");
726      return;
727    }
728    if (serialPart == null) {
729      LOG.warn("Could not find serial portion from path: "
730          + serialDirPath.toString() + ". Continuing with next");
731      return;
732    }
733    serialNumberIndex.remove(serialPart, timeStampPart);
734  }
735
736  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
737    if (LOG.isDebugEnabled()) {
738      LOG.debug("Adding " + serialDirPath + " to serial index");
739    }
740    String serialPart = serialDirPath.getName();
741    String timestampPart = JobHistoryUtils
742        .getTimestampPartFromPath(serialDirPath.toString());
743    if (timestampPart == null) {
744      LOG.warn("Could not find timestamp portion from path: " + serialDirPath
745          + ". Continuing with next");
746      return;
747    }
748    if (serialPart == null) {
749      LOG.warn("Could not find serial portion from path: "
750          + serialDirPath.toString() + ". Continuing with next");
751    } else {
752      serialNumberIndex.add(serialPart, timestampPart);
753    }
754  }
755
756  private void addDirectoryToJobListCache(Path path) throws IOException {
757    if (LOG.isDebugEnabled()) {
758      LOG.debug("Adding " + path + " to job list cache.");
759    }
760    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
761        doneDirFc);
762    for (FileStatus fs : historyFileList) {
763      if (LOG.isDebugEnabled()) {
764        LOG.debug("Adding in history for " + fs.getPath());
765      }
766      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
767          .getName());
768      String confFileName = JobHistoryUtils
769          .getIntermediateConfFileName(jobIndexInfo.getJobId());
770      String summaryFileName = JobHistoryUtils
771          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
772      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
773          .getPath().getParent(), confFileName), new Path(fs.getPath()
774          .getParent(), summaryFileName), jobIndexInfo, true);
775      jobListCache.addIfAbsent(fileInfo);
776    }
777  }
778
779  @VisibleForTesting
780  protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
781      PathFilter pathFilter) throws IOException {
782    path = fc.makeQualified(path);
783    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
784    try {
785      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
786      while (fileStatusIter.hasNext()) {
787        FileStatus fileStatus = fileStatusIter.next();
788        Path filePath = fileStatus.getPath();
789        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
790          jhStatusList.add(fileStatus);
791        }
792      }
793    } catch (FileNotFoundException fe) {
794      LOG.error("Error while scanning directory " + path, fe);
795    }
796    return jhStatusList;
797  }
798
799  protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
800      FileContext fc) throws IOException {
801    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
802  }
803  
804  /**
805   * Finds all history directories with a timestamp component by scanning the
806   * filesystem. Used when the JobHistory server is started.
807   * 
808   * @return list of history directories
809   */
810  protected List<FileStatus> findTimestampedDirectories() throws IOException {
811    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
812        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
813    return fsList;
814  }
815
816  /**
817   * Scans the intermediate directory to find user directories. Scans these for
818   * history files if the modification time for the directory has changed. Once
819   * it finds history files it starts the process of moving them to the done 
820   * directory.
821   * 
822   * @throws IOException
823   *           if there was a error while scanning
824   */
825  void scanIntermediateDirectory() throws IOException {
826    // TODO it would be great to limit how often this happens, except in the
827    // case where we are looking for a particular job.
828    List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
829        intermediateDoneDirFc, intermediateDoneDirPath, "");
830    LOG.debug("Scanning intermediate dirs");
831    for (FileStatus userDir : userDirList) {
832      String name = userDir.getPath().getName();
833      UserLogDir dir = userDirModificationTimeMap.get(name);
834      if(dir == null) {
835        dir = new UserLogDir();
836        UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
837        if(old != null) {
838          dir = old;
839        }
840      }
841      dir.scanIfNeeded(userDir);
842    }
843  }
844
845  /**
846   * Scans the specified path and populates the intermediate cache.
847   * 
848   * @param absPath
849   * @throws IOException
850   */
851  private void scanIntermediateDirectory(final Path absPath) throws IOException {
852    if (LOG.isDebugEnabled()) {
853      LOG.debug("Scanning intermediate dir " + absPath);
854    }
855    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
856        intermediateDoneDirFc);
857    if (LOG.isDebugEnabled()) {
858      LOG.debug("Found " + fileStatusList.size() + " files");
859    }
860    for (FileStatus fs : fileStatusList) {
861      if (LOG.isDebugEnabled()) {
862        LOG.debug("scanning file: "+ fs.getPath());
863      }
864      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
865          .getName());
866      String confFileName = JobHistoryUtils
867          .getIntermediateConfFileName(jobIndexInfo.getJobId());
868      String summaryFileName = JobHistoryUtils
869          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
870      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
871          .getPath().getParent(), confFileName), new Path(fs.getPath()
872          .getParent(), summaryFileName), jobIndexInfo, false);
873
874      final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
875      if (old == null || old.didMoveFail()) {
876        final HistoryFileInfo found = (old == null) ? fileInfo : old;
877        long cutoff = System.currentTimeMillis() - maxHistoryAge;
878        if(found.getJobIndexInfo().getFinishTime() <= cutoff) {
879          try {
880            found.delete();
881          } catch (IOException e) {
882            LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
883          }
884        } else {
885          if (LOG.isDebugEnabled()) {
886            LOG.debug("Scheduling move to done of " +found);
887          }
888          moveToDoneExecutor.execute(new Runnable() {
889            @Override
890            public void run() {
891              try {
892                found.moveToDone();
893              } catch (IOException e) {
894                LOG.info("Failed to process fileInfo for job: " + 
895                    found.getJobId(), e);
896              }
897            }
898          });
899        }
900      } else if (!old.isMovePending()) {
901        //This is a duplicate so just delete it
902        if (LOG.isDebugEnabled()) {
903          LOG.debug("Duplicate: deleting");
904        }
905        fileInfo.delete();
906      }
907    }
908  }
909
910  /**
911   * Searches the job history file FileStatus list for the specified JobId.
912   * 
913   * @param fileStatusList
914   *          fileStatus list of Job History Files.
915   * @param jobId
916   *          The JobId to find.
917   * @return A FileInfo object for the jobId, null if not found.
918   * @throws IOException
919   */
920  private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
921      JobId jobId) throws IOException {
922    for (FileStatus fs : fileStatusList) {
923      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
924          .getName());
925      if (jobIndexInfo.getJobId().equals(jobId)) {
926        String confFileName = JobHistoryUtils
927            .getIntermediateConfFileName(jobIndexInfo.getJobId());
928        String summaryFileName = JobHistoryUtils
929            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
930        HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
931            fs.getPath().getParent(), confFileName), new Path(fs.getPath()
932            .getParent(), summaryFileName), jobIndexInfo, true);
933        return fileInfo;
934      }
935    }
936    return null;
937  }
938
939  /**
940   * Scans old directories known by the idToDateString map for the specified
941   * jobId. If the number of directories is higher than the supported size of
942   * the idToDateString cache, the jobId will not be found.
943   * 
944   * @param jobId
945   *          the jobId.
946   * @return
947   * @throws IOException
948   */
949  private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
950    String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
951        jobId, serialNumberFormat);
952    Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
953    if (dateStringSet == null) {
954      return null;
955    }
956    for (String timestampPart : dateStringSet) {
957      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
958      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
959          doneDirFc);
960      HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
961      if (fileInfo != null) {
962        return fileInfo;
963      }
964    }
965    return null;
966  }
967
968  public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
969    scanIntermediateDirectory();
970    return jobListCache.values();
971  }
972
973  public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
974    // FileInfo available in cache.
975    HistoryFileInfo fileInfo = jobListCache.get(jobId);
976    if (fileInfo != null) {
977      return fileInfo;
978    }
979    // OK so scan the intermediate to be sure we did not lose it that way
980    scanIntermediateDirectory();
981    fileInfo = jobListCache.get(jobId);
982    if (fileInfo != null) {
983      return fileInfo;
984    }
985
986    // Intermediate directory does not contain job. Search through older ones.
987    fileInfo = scanOldDirsForJob(jobId);
988    if (fileInfo != null) {
989      return fileInfo;
990    }
991    return null;
992  }
993
994  private void moveToDoneNow(final Path src, final Path target)
995      throws IOException {
996    LOG.info("Moving " + src.toString() + " to " + target.toString());
997    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
998  }
999
1000  private String getJobSummary(FileContext fc, Path path) throws IOException {
1001    Path qPath = fc.makeQualified(path);
1002    FSDataInputStream in = null;
1003    String jobSummaryString = null;
1004    try {
1005      in = fc.open(qPath);
1006      jobSummaryString = in.readUTF();
1007    } finally {
1008      if (in != null) {
1009        in.close();
1010      }
1011    }
1012    return jobSummaryString;
1013  }
1014
1015  private void makeDoneSubdir(Path path) throws IOException {
1016    try {
1017      doneDirFc.getFileStatus(path);
1018      existingDoneSubdirs.add(path);
1019    } catch (FileNotFoundException fnfE) {
1020      try {
1021        FsPermission fsp = new FsPermission(
1022            JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
1023        doneDirFc.mkdir(path, fsp, true);
1024        FileStatus fsStatus = doneDirFc.getFileStatus(path);
1025        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
1026            + ", Expected: " + fsp.toShort());
1027        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
1028          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
1029              + ", " + fsp);
1030          doneDirFc.setPermission(path, fsp);
1031        }
1032        existingDoneSubdirs.add(path);
1033      } catch (FileAlreadyExistsException faeE) { // Nothing to do.
1034      }
1035    }
1036  }
1037
1038  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
1039    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
1040        id, timestampComponent, serialNumberFormat));
1041  }
1042
1043  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
1044    String timestampComponent = JobHistoryUtils
1045        .timestampDirectoryComponent(millisecondTime);
1046    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
1047        id, timestampComponent, serialNumberFormat));
1048  }
1049
1050  private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
1051    if (finishTime == 0) {
1052      return fileStatus.getModificationTime();
1053    }
1054    return finishTime;
1055  }
1056
1057  private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
1058    jobListCache.delete(fileInfo);
1059    fileInfo.delete();
1060  }
1061
1062  List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException {
1063      return JobHistoryUtils.
1064        getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff);
1065  }
1066
1067  /**
1068   * Clean up older history files.
1069   * 
1070   * @throws IOException
1071   *           on any error trying to remove the entries.
1072   */
1073  @SuppressWarnings("unchecked")
1074  void clean() throws IOException {
1075    long cutoff = System.currentTimeMillis() - maxHistoryAge;
1076    boolean halted = false;
1077    List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
1078    // Sort in ascending order. Relies on YYYY/MM/DD/Serial
1079    Collections.sort(serialDirList);
1080    for (FileStatus serialDir : serialDirList) {
1081      List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
1082          serialDir.getPath(), doneDirFc);
1083      for (FileStatus historyFile : historyFileList) {
1084        JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
1085            .getPath().getName());
1086        long effectiveTimestamp = getEffectiveTimestamp(
1087            jobIndexInfo.getFinishTime(), historyFile);
1088        if (effectiveTimestamp <= cutoff) {
1089          HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
1090              .getJobId());
1091          if (fileInfo == null) {
1092            String confFileName = JobHistoryUtils
1093                .getIntermediateConfFileName(jobIndexInfo.getJobId());
1094
1095            fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
1096                historyFile.getPath().getParent(), confFileName), null,
1097                jobIndexInfo, true);
1098          }
1099          deleteJobFromDone(fileInfo);
1100        } else {
1101          halted = true;
1102          break;
1103        }
1104      }
1105      if (!halted) {
1106        deleteDir(serialDir);
1107        removeDirectoryFromSerialNumberIndex(serialDir.getPath());
1108        existingDoneSubdirs.remove(serialDir.getPath());
1109      } else {
1110        break; // Don't scan any more directories.
1111      }
1112    }
1113  }
1114  
1115  protected boolean deleteDir(FileStatus serialDir)
1116      throws AccessControlException, FileNotFoundException,
1117      UnsupportedFileSystemException, IOException {
1118    return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
1119  }
1120
1121  // for test
1122  @VisibleForTesting
1123  void setMaxHistoryAge(long newValue){
1124    maxHistoryAge=newValue;
1125  } 
1126}