001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce.v2.hs;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.ConnectException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.NavigableSet;
031import java.util.Set;
032import java.util.SortedMap;
033import java.util.TreeMap;
034import java.util.concurrent.ConcurrentHashMap;
035import java.util.concurrent.ConcurrentMap;
036import java.util.concurrent.ConcurrentSkipListMap;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.concurrent.ThreadFactory;
039import java.util.concurrent.ThreadPoolExecutor;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicInteger;
042
043import org.apache.commons.logging.Log;
044import org.apache.commons.logging.LogFactory;
045import org.apache.hadoop.classification.InterfaceAudience;
046import org.apache.hadoop.classification.InterfaceStability;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.FSDataInputStream;
049import org.apache.hadoop.fs.FileAlreadyExistsException;
050import org.apache.hadoop.fs.FileContext;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.Options;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.fs.PathFilter;
055import org.apache.hadoop.fs.RemoteIterator;
056import org.apache.hadoop.fs.UnsupportedFileSystemException;
057import org.apache.hadoop.fs.permission.FsPermission;
058import org.apache.hadoop.mapred.JobACLsManager;
059import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
060import org.apache.hadoop.mapreduce.v2.api.records.JobId;
061import org.apache.hadoop.mapreduce.v2.app.job.Job;
062import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
063import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
064import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
065import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
066import org.apache.hadoop.security.AccessControlException;
067import org.apache.hadoop.service.AbstractService;
068import org.apache.hadoop.util.ShutdownThreadsHelper;
069import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
070
071import com.google.common.annotations.VisibleForTesting;
072import com.google.common.util.concurrent.ThreadFactoryBuilder;
073import org.apache.hadoop.yarn.util.Clock;
074import org.apache.hadoop.yarn.util.SystemClock;
075
076/**
 * This class provides a way to interact with history files in a thread safe
 * manner.
079 */
080@InterfaceAudience.Public
081@InterfaceStability.Unstable
082public class HistoryFileManager extends AbstractService {
  // Logger for this class; the nested classes in this file log through it too.
  private static final Log LOG = LogFactory.getLog(HistoryFileManager.class);
  // Logger obtained for JobSummary; HistoryFileInfo.moveToDone() writes each
  // job's summary string to it before deleting the summary file.
  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);

  /**
   * States a HistoryFileInfo can be in. IN_INTERMEDIATE and MOVE_FAILED both
   * count as "move pending"; IN_DONE is set after a successful move and
   * DELETED when the files are deleted.
   */
  private static enum HistoryInfoState {
    IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
  };
  
  // Value supplied by JobHistoryUtils.doneSubdirsBeforeSerialTail(); passed as
  // the last argument to localGlobber by findTimestampedDirectories().
  private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
      .doneSubdirsBeforeSerialTail();
092
093  /**
094   * Maps between a serial number (generated based on jobId) and the timestamp
095   * component(s) to which it belongs. Facilitates jobId based searches. If a
096   * jobId is not found in this list - it will not be found.
097   */
098  private static class SerialNumberIndex {
099    private SortedMap<String, Set<String>> cache;
100    private int maxSize;
101
102    public SerialNumberIndex(int maxSize) {
103      this.cache = new TreeMap<String, Set<String>>();
104      this.maxSize = maxSize;
105    }
106
107    public synchronized void add(String serialPart, String timestampPart) {
108      if (!cache.containsKey(serialPart)) {
109        cache.put(serialPart, new HashSet<String>());
110        if (cache.size() > maxSize) {
111          String key = cache.firstKey();
112          LOG.error("Dropping " + key
113              + " from the SerialNumberIndex. We will no "
114              + "longer be able to see jobs that are in that serial index for "
115              + cache.get(key));
116          cache.remove(key);
117        }
118      }
119      Set<String> datePartSet = cache.get(serialPart);
120      datePartSet.add(timestampPart);
121    }
122
123    public synchronized void remove(String serialPart, String timeStampPart) {
124      if (cache.containsKey(serialPart)) {
125        Set<String> set = cache.get(serialPart);
126        set.remove(timeStampPart);
127        if (set.isEmpty()) {
128          cache.remove(serialPart);
129        }
130      }
131    }
132
133    public synchronized Set<String> get(String serialPart) {
134      Set<String> found = cache.get(serialPart);
135      if (found != null) {
136        return new HashSet<String>(found);
137      }
138      return null;
139    }
140  }
141
142  /**
143   * Wrapper around {@link ConcurrentSkipListMap} that maintains size along
144   * side for O(1) size() implementation for use in JobListCache.
145   *
146   * Note: The size is not updated atomically with changes additions/removals.
147   * This race can lead to size() returning an incorrect size at times.
148   */
149  static class JobIdHistoryFileInfoMap {
150    private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
151    private AtomicInteger mapSize;
152
153    JobIdHistoryFileInfoMap() {
154      cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
155      mapSize = new AtomicInteger();
156    }
157
158    public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
159      HistoryFileInfo ret = cache.putIfAbsent(key, value);
160      if (ret == null) {
161        mapSize.incrementAndGet();
162      }
163      return ret;
164    }
165
166    public HistoryFileInfo remove(JobId key) {
167      HistoryFileInfo ret = cache.remove(key);
168      if (ret != null) {
169        mapSize.decrementAndGet();
170      }
171      return ret;
172    }
173
174    /**
175     * Returns the recorded size of the internal map. Note that this could be out
176     * of sync with the actual size of the map
177     * @return "recorded" size
178     */
179    public int size() {
180      return mapSize.get();
181    }
182
183    public HistoryFileInfo get(JobId key) {
184      return cache.get(key);
185    }
186
187    public NavigableSet<JobId> navigableKeySet() {
188      return cache.navigableKeySet();
189    }
190
191    public Collection<HistoryFileInfo> values() {
192      return cache.values();
193    }
194  }
195
196  static class JobListCache {
197    private JobIdHistoryFileInfoMap cache;
198    private int maxSize;
199    private long maxAge;
200
201    public JobListCache(int maxSize, long maxAge) {
202      this.maxSize = maxSize;
203      this.maxAge = maxAge;
204      this.cache = new JobIdHistoryFileInfoMap();
205    }
206
207    public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
208      JobId jobId = fileInfo.getJobId();
209      if (LOG.isDebugEnabled()) {
210        LOG.debug("Adding " + jobId + " to job list cache with "
211            + fileInfo.getJobIndexInfo());
212      }
213      HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo);
214      if (cache.size() > maxSize) {
215        //There is a race here, where more then one thread could be trying to
216        // remove entries.  This could result in too many entries being removed
217        // from the cache.  This is considered OK as the size of the cache
218        // should be rather large, and we would rather have performance over
219        // keeping the cache size exactly at the maximum.
220        Iterator<JobId> keys = cache.navigableKeySet().iterator();
221        long cutoff = System.currentTimeMillis() - maxAge;
222        while(cache.size() > maxSize && keys.hasNext()) {
223          JobId key = keys.next();
224          HistoryFileInfo firstValue = cache.get(key);
225          if(firstValue != null) {
226            synchronized(firstValue) {
227              if (firstValue.isMovePending()) {
228                if(firstValue.didMoveFail() && 
229                    firstValue.jobIndexInfo.getFinishTime() <= cutoff) {
230                  cache.remove(key);
231                  //Now lets try to delete it
232                  try {
233                    firstValue.delete();
234                  } catch (IOException e) {
235                    LOG.error("Error while trying to delete history files" +
236                                " that could not be moved to done.", e);
237                  }
238                } else {
239                  LOG.warn("Waiting to remove " + key
240                      + " from JobListCache because it is not in done yet.");
241                }
242              } else {
243                cache.remove(key);
244              }
245            }
246          }
247        }
248      }
249      return old;
250    }
251
252    public void delete(HistoryFileInfo fileInfo) {
253      if (LOG.isDebugEnabled()) {
254        LOG.debug("Removing from cache " + fileInfo);
255      }
256      cache.remove(fileInfo.getJobId());
257    }
258
259    public Collection<HistoryFileInfo> values() {
260      return new ArrayList<HistoryFileInfo>(cache.values());
261    }
262
263    public HistoryFileInfo get(JobId jobId) {
264      return cache.get(jobId);
265    }
266
267    public boolean isFull() {
268      return cache.size() >= maxSize;
269    }
270  }
271
272  /**
273   * This class represents a user dir in the intermediate done directory.  This
274   * is mostly for locking purposes. 
275   */
276  private class UserLogDir {
277    long modTime = 0;
278    
279    public synchronized void scanIfNeeded(FileStatus fs) {
280      long newModTime = fs.getModificationTime();
281      if (modTime != newModTime) {
282        Path p = fs.getPath();
283        try {
284          scanIntermediateDirectory(p);
285          //If scanning fails, we will scan again.  We assume the failure is
286          // temporary.
287          modTime = newModTime;
288        } catch (IOException e) {
289          LOG.error("Error while trying to scan the directory " + p, e);
290        }
291      } else {
292        if (LOG.isDebugEnabled()) {
293          LOG.debug("Scan not needed of " + fs.getPath());
294        }
295      }
296    }
297  }
298  
299  public class HistoryFileInfo {
300    private Path historyFile;
301    private Path confFile;
302    private Path summaryFile;
303    private JobIndexInfo jobIndexInfo;
304    private HistoryInfoState state;
305
306    @VisibleForTesting
307    protected HistoryFileInfo(Path historyFile, Path confFile,
308        Path summaryFile, JobIndexInfo jobIndexInfo, boolean isInDone) {
309      this.historyFile = historyFile;
310      this.confFile = confFile;
311      this.summaryFile = summaryFile;
312      this.jobIndexInfo = jobIndexInfo;
313      state = isInDone ? HistoryInfoState.IN_DONE
314          : HistoryInfoState.IN_INTERMEDIATE;
315    }
316
317    @VisibleForTesting
318    synchronized boolean isMovePending() {
319      return state == HistoryInfoState.IN_INTERMEDIATE
320          || state == HistoryInfoState.MOVE_FAILED;
321    }
322
323    @VisibleForTesting
324    synchronized boolean didMoveFail() {
325      return state == HistoryInfoState.MOVE_FAILED;
326    }
327    
328    /**
329     * @return true if the files backed by this were deleted.
330     */
331    public synchronized boolean isDeleted() {
332      return state == HistoryInfoState.DELETED;
333    }
334
335    @Override
336    public String toString() {
337      return "HistoryFileInfo jobID " + getJobId()
338             + " historyFile = " + historyFile;
339    }
340
341    @VisibleForTesting
342    synchronized void moveToDone() throws IOException {
343      if (LOG.isDebugEnabled()) {
344        LOG.debug("moveToDone: " + historyFile);
345      }
346      if (!isMovePending()) {
347        // It was either deleted or is already in done. Either way do nothing
348        if (LOG.isDebugEnabled()) {
349          LOG.debug("Move no longer pending");
350        }
351        return;
352      }
353      try {
354        long completeTime = jobIndexInfo.getFinishTime();
355        if (completeTime == 0) {
356          completeTime = System.currentTimeMillis();
357        }
358        JobId jobId = jobIndexInfo.getJobId();
359
360        List<Path> paths = new ArrayList<Path>(2);
361        if (historyFile == null) {
362          LOG.info("No file for job-history with " + jobId + " found in cache!");
363        } else {
364          paths.add(historyFile);
365        }
366
367        if (confFile == null) {
368          LOG.info("No file for jobConf with " + jobId + " found in cache!");
369        } else {
370          paths.add(confFile);
371        }
372
373        if (summaryFile == null || !intermediateDoneDirFc.util().exists(
374            summaryFile)) {
375          LOG.info("No summary file for job: " + jobId);
376        } else {
377          String jobSummaryString = getJobSummary(intermediateDoneDirFc,
378              summaryFile);
379          SUMMARY_LOG.info(jobSummaryString);
380          LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
381          intermediateDoneDirFc.delete(summaryFile, false);
382          summaryFile = null;
383        }
384
385        Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
386        addDirectoryToSerialNumberIndex(targetDir);
387        makeDoneSubdir(targetDir);
388        if (historyFile != null) {
389          Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile
390              .getName()));
391          if (!toPath.equals(historyFile)) {
392            moveToDoneNow(historyFile, toPath);
393            historyFile = toPath;
394          }
395        }
396        if (confFile != null) {
397          Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile
398              .getName()));
399          if (!toPath.equals(confFile)) {
400            moveToDoneNow(confFile, toPath);
401            confFile = toPath;
402          }
403        }
404        state = HistoryInfoState.IN_DONE;
405      } catch (Throwable t) {
406        LOG.error("Error while trying to move a job to done", t);
407        this.state = HistoryInfoState.MOVE_FAILED;
408      }
409    }
410
411    /**
412     * Parse a job from the JobHistoryFile, if the underlying file is not going
413     * to be deleted.
414     * 
415     * @return the Job or null if the underlying file was deleted.
416     * @throws IOException
417     *           if there is an error trying to read the file.
418     */
419    public synchronized Job loadJob() throws IOException {
420      return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile,
421          false, jobIndexInfo.getUser(), this, aclsMgr);
422    }
423
424    /**
425     * Return the history file.  This should only be used for testing.
426     * @return the history file.
427     */
428    synchronized Path getHistoryFile() {
429      return historyFile;
430    }
431    
432    protected synchronized void delete() throws IOException {
433      if (LOG.isDebugEnabled()) {
434        LOG.debug("deleting " + historyFile + " and " + confFile);
435      }
436      state = HistoryInfoState.DELETED;
437      doneDirFc.delete(doneDirFc.makeQualified(historyFile), false);
438      doneDirFc.delete(doneDirFc.makeQualified(confFile), false);
439    }
440
441    public JobIndexInfo getJobIndexInfo() {
442      return jobIndexInfo;
443    }
444
445    public JobId getJobId() {
446      return jobIndexInfo.getJobId();
447    }
448
449    public synchronized Path getConfFile() {
450      return confFile;
451    }
452    
453    public synchronized Configuration loadConfFile() throws IOException {
454      FileContext fc = FileContext.getFileContext(confFile.toUri(), conf);
455      Configuration jobConf = new Configuration(false);
456      jobConf.addResource(fc.open(confFile), confFile.toString());
457      return jobConf;
458    }
459  }
460
  // Index from serial-number directory names to their timestamp parts;
  // created in serviceInit().
  private SerialNumberIndex serialNumberIndex = null;
  // Cache of HistoryFileInfo entries; created in serviceInit() through
  // createJobListCache().
  protected JobListCache jobListCache = null;

  // Maintains a list of known done subdirectories.
  private final Set<Path> existingDoneSubdirs = Collections
      .synchronizedSet(new HashSet<Path>());

  /**
   * Maintains a mapping between intermediate user directories and the last
   * known modification time.
   */
  private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 
    new ConcurrentHashMap<String, UserLogDir>();

  // Built from the configuration in serviceInit(); handed to CompletedJob by
  // HistoryFileInfo.loadJob().
  private JobACLsManager aclsMgr;

  // The configuration passed to serviceInit().
  @VisibleForTesting
  Configuration conf;

  // Zero-padded decimal format string ("%0<width>d") built in serviceInit().
  private String serialNumberFormat;

  private Path doneDirPrefixPath = null; // folder for completed jobs
  private FileContext doneDirFc; // done Dir FileContext

  private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path
  private FileContext intermediateDoneDirFc; // Intermediate Done Dir
                                             // FileContext
  // Thread pool that runs the moveToDone() tasks scheduled while scanning an
  // intermediate directory; created in serviceInit().
  @VisibleForTesting
  protected ThreadPoolExecutor moveToDoneExecutor = null;
  // Read from JHAdminConfig.MR_HISTORY_MAX_AGE_MS in serviceInit(); subtracted
  // from the current time to get the cutoff below which files are deleted.
  private long maxHistoryAge = 0;
491  
  /**
   * Creates the manager, passing this class's fully qualified name to the
   * AbstractService constructor. Directories, caches and the executor are
   * set up later in serviceInit().
   */
  public HistoryFileManager() {
    super(HistoryFileManager.class.getName());
  }
495
496  @Override
497  protected void serviceInit(Configuration conf) throws Exception {
498    this.conf = conf;
499
500    int serialNumberLowDigits = 3;
501    serialNumberFormat = ("%0"
502        + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits)
503        + "d");
504
505    long maxFSWaitTime = conf.getLong(
506        JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
507        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
508    createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime);
509
510    this.aclsMgr = new JobACLsManager(conf);
511
512    maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
513        JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
514    
515    jobListCache = createJobListCache();
516
517    serialNumberIndex = new SerialNumberIndex(conf.getInt(
518        JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
519        JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));
520
521    int numMoveThreads = conf.getInt(
522        JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
523        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
524    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
525        "MoveIntermediateToDone Thread #%d").build();
526    moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
527        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
528
529    super.serviceInit(conf);
530  }
531
532  @VisibleForTesting
533  void createHistoryDirs(Clock clock, long intervalCheckMillis,
534      long timeOutMillis) throws IOException {
535    long start = clock.getTime();
536    boolean done = false;
537    int counter = 0;
538    while (!done &&
539        ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) {
540      done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec
541      try {
542        Thread.sleep(intervalCheckMillis);
543      } catch (InterruptedException ex) {
544        throw new YarnRuntimeException(ex);
545      }
546    }
547    if (!done) {
548      throw new YarnRuntimeException("Timed out '" + timeOutMillis+
549              "ms' waiting for FileSystem to become available");
550    }
551  }
552
553  /**
554   * DistributedFileSystem returns a RemoteException with a message stating
555   * SafeModeException in it. So this is only way to check it is because of
556   * being in safe mode.
557   */
558  private boolean isBecauseSafeMode(Throwable ex) {
559    return ex.toString().contains("SafeModeException");
560  }
561
562  /**
563   * Returns TRUE if the history dirs were created, FALSE if they could not
564   * be created because the FileSystem is not reachable or in safe mode and
565   * throws and exception otherwise.
566   */
567  @VisibleForTesting
568  boolean tryCreatingHistoryDirs(boolean logWait) throws IOException {
569    boolean succeeded = true;
570    String doneDirPrefix = JobHistoryUtils.
571        getConfiguredHistoryServerDoneDirPrefix(conf);
572    try {
573      doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
574          new Path(doneDirPrefix));
575      doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
576      doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK);
577      mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
578          JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
579    } catch (ConnectException ex) {
580      if (logWait) {
581        LOG.info("Waiting for FileSystem at " +
582            doneDirPrefixPath.toUri().getAuthority()  + "to be available");
583      }
584      succeeded = false;
585    } catch (IOException e) {
586      if (isBecauseSafeMode(e)) {
587        succeeded = false;
588        if (logWait) {
589          LOG.info("Waiting for FileSystem at " +
590              doneDirPrefixPath.toUri().getAuthority() +
591              "to be out of safe mode");
592        }
593      } else {
594        throw new YarnRuntimeException("Error creating done directory: ["
595            + doneDirPrefixPath + "]", e);
596      }
597    }
598    if (succeeded) {
599      String intermediateDoneDirPrefix = JobHistoryUtils.
600          getConfiguredHistoryIntermediateDoneDirPrefix(conf);
601      try {
602        intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified(
603            new Path(intermediateDoneDirPrefix));
604        intermediateDoneDirFc = FileContext.getFileContext(
605            intermediateDoneDirPath.toUri(), conf);
606        mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
607            JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
608      } catch (ConnectException ex) {
609        succeeded = false;
610        if (logWait) {
611          LOG.info("Waiting for FileSystem at " +
612              intermediateDoneDirPath.toUri().getAuthority() +
613              "to be available");
614        }
615      } catch (IOException e) {
616        if (isBecauseSafeMode(e)) {
617          succeeded = false;
618          if (logWait) {
619            LOG.info("Waiting for FileSystem at " +
620                intermediateDoneDirPath.toUri().getAuthority() +
621                "to be out of safe mode");
622          }
623        } else {
624          throw new YarnRuntimeException(
625              "Error creating intermediate done directory: ["
626              + intermediateDoneDirPath + "]", e);
627        }
628      }
629    }
630    return succeeded;
631  }
632
  /**
   * Shuts down the move-to-done executor through ShutdownThreadsHelper and
   * then stops the parent service.
   *
   * @throws Exception if the shutdown helper or the parent stop fails.
   */
  @Override
  public void serviceStop() throws Exception {
    // NOTE(review): moveToDoneExecutor is only assigned in serviceInit();
    // whether the helper tolerates a null executor is not visible here -
    // confirm.
    ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor);
    super.serviceStop();
  }
638
639  protected JobListCache createJobListCache() {
640    return new JobListCache(conf.getInt(
641        JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
642        JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge);
643  }
644
645  private void mkdir(FileContext fc, Path path, FsPermission fsp)
646      throws IOException {
647    if (!fc.util().exists(path)) {
648      try {
649        fc.mkdir(path, fsp, true);
650
651        FileStatus fsStatus = fc.getFileStatus(path);
652        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
653            + ", Expected: " + fsp.toShort());
654        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
655          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
656              + ", " + fsp);
657          fc.setPermission(path, fsp);
658        }
659      } catch (FileAlreadyExistsException e) {
660        LOG.info("Directory: [" + path + "] already exists.");
661      }
662    }
663  }
664
665  /**
666   * Populates index data structures. Should only be called at initialization
667   * times.
668   */
669  @SuppressWarnings("unchecked")
670  void initExisting() throws IOException {
671    LOG.info("Initializing Existing Jobs...");
672    List<FileStatus> timestampedDirList = findTimestampedDirectories();
673    // Sort first just so insertion is in a consistent order
674    Collections.sort(timestampedDirList);
675    for (FileStatus fs : timestampedDirList) {
676      // TODO Could verify the correct format for these directories.
677      addDirectoryToSerialNumberIndex(fs.getPath());
678    }
679    for (int i= timestampedDirList.size() - 1;
680        i >= 0 && !jobListCache.isFull(); i--) {
681      FileStatus fs = timestampedDirList.get(i); 
682      addDirectoryToJobListCache(fs.getPath());
683    }
684  }
685
686  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
687    String serialPart = serialDirPath.getName();
688    String timeStampPart = JobHistoryUtils
689        .getTimestampPartFromPath(serialDirPath.toString());
690    if (timeStampPart == null) {
691      LOG.warn("Could not find timestamp portion from path: "
692          + serialDirPath.toString() + ". Continuing with next");
693      return;
694    }
695    if (serialPart == null) {
696      LOG.warn("Could not find serial portion from path: "
697          + serialDirPath.toString() + ". Continuing with next");
698      return;
699    }
700    serialNumberIndex.remove(serialPart, timeStampPart);
701  }
702
703  private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
704    if (LOG.isDebugEnabled()) {
705      LOG.debug("Adding " + serialDirPath + " to serial index");
706    }
707    String serialPart = serialDirPath.getName();
708    String timestampPart = JobHistoryUtils
709        .getTimestampPartFromPath(serialDirPath.toString());
710    if (timestampPart == null) {
711      LOG.warn("Could not find timestamp portion from path: " + serialDirPath
712          + ". Continuing with next");
713      return;
714    }
715    if (serialPart == null) {
716      LOG.warn("Could not find serial portion from path: "
717          + serialDirPath.toString() + ". Continuing with next");
718    } else {
719      serialNumberIndex.add(serialPart, timestampPart);
720    }
721  }
722
723  private void addDirectoryToJobListCache(Path path) throws IOException {
724    if (LOG.isDebugEnabled()) {
725      LOG.debug("Adding " + path + " to job list cache.");
726    }
727    List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
728        doneDirFc);
729    for (FileStatus fs : historyFileList) {
730      if (LOG.isDebugEnabled()) {
731        LOG.debug("Adding in history for " + fs.getPath());
732      }
733      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
734          .getName());
735      String confFileName = JobHistoryUtils
736          .getIntermediateConfFileName(jobIndexInfo.getJobId());
737      String summaryFileName = JobHistoryUtils
738          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
739      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
740          .getPath().getParent(), confFileName), new Path(fs.getPath()
741          .getParent(), summaryFileName), jobIndexInfo, true);
742      jobListCache.addIfAbsent(fileInfo);
743    }
744  }
745
746  @VisibleForTesting
747  protected static List<FileStatus> scanDirectory(Path path, FileContext fc,
748      PathFilter pathFilter) throws IOException {
749    path = fc.makeQualified(path);
750    List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
751    try {
752      RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
753      while (fileStatusIter.hasNext()) {
754        FileStatus fileStatus = fileStatusIter.next();
755        Path filePath = fileStatus.getPath();
756        if (fileStatus.isFile() && pathFilter.accept(filePath)) {
757          jhStatusList.add(fileStatus);
758        }
759      }
760    } catch (FileNotFoundException fe) {
761      LOG.error("Error while scanning directory " + path, fe);
762    }
763    return jhStatusList;
764  }
765
766  protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
767      FileContext fc) throws IOException {
768    return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
769  }
770  
771  /**
772   * Finds all history directories with a timestamp component by scanning the
773   * filesystem. Used when the JobHistory server is started.
774   * 
775   * @return list of history directories
776   */
777  protected List<FileStatus> findTimestampedDirectories() throws IOException {
778    List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
779        doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
780    return fsList;
781  }
782
783  /**
784   * Scans the intermediate directory to find user directories. Scans these for
785   * history files if the modification time for the directory has changed. Once
786   * it finds history files it starts the process of moving them to the done 
787   * directory.
788   * 
789   * @throws IOException
790   *           if there was a error while scanning
791   */
792  void scanIntermediateDirectory() throws IOException {
793    // TODO it would be great to limit how often this happens, except in the
794    // case where we are looking for a particular job.
795    List<FileStatus> userDirList = JobHistoryUtils.localGlobber(
796        intermediateDoneDirFc, intermediateDoneDirPath, "");
797    LOG.debug("Scanning intermediate dirs");
798    for (FileStatus userDir : userDirList) {
799      String name = userDir.getPath().getName();
800      UserLogDir dir = userDirModificationTimeMap.get(name);
801      if(dir == null) {
802        dir = new UserLogDir();
803        UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
804        if(old != null) {
805          dir = old;
806        }
807      }
808      dir.scanIfNeeded(userDir);
809    }
810  }
811
812  /**
813   * Scans the specified path and populates the intermediate cache.
814   * 
815   * @param absPath
816   * @throws IOException
817   */
818  private void scanIntermediateDirectory(final Path absPath) throws IOException {
819    if (LOG.isDebugEnabled()) {
820      LOG.debug("Scanning intermediate dir " + absPath);
821    }
822    List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
823        intermediateDoneDirFc);
824    if (LOG.isDebugEnabled()) {
825      LOG.debug("Found " + fileStatusList.size() + " files");
826    }
827    for (FileStatus fs : fileStatusList) {
828      if (LOG.isDebugEnabled()) {
829        LOG.debug("scanning file: "+ fs.getPath());
830      }
831      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
832          .getName());
833      String confFileName = JobHistoryUtils
834          .getIntermediateConfFileName(jobIndexInfo.getJobId());
835      String summaryFileName = JobHistoryUtils
836          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
837      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
838          .getPath().getParent(), confFileName), new Path(fs.getPath()
839          .getParent(), summaryFileName), jobIndexInfo, false);
840
841      final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo);
842      if (old == null || old.didMoveFail()) {
843        final HistoryFileInfo found = (old == null) ? fileInfo : old;
844        long cutoff = System.currentTimeMillis() - maxHistoryAge;
845        if(found.getJobIndexInfo().getFinishTime() <= cutoff) {
846          try {
847            found.delete();
848          } catch (IOException e) {
849            LOG.warn("Error cleaning up a HistoryFile that is out of date.", e);
850          }
851        } else {
852          if (LOG.isDebugEnabled()) {
853            LOG.debug("Scheduling move to done of " +found);
854          }
855          moveToDoneExecutor.execute(new Runnable() {
856            @Override
857            public void run() {
858              try {
859                found.moveToDone();
860              } catch (IOException e) {
861                LOG.info("Failed to process fileInfo for job: " + 
862                    found.getJobId(), e);
863              }
864            }
865          });
866        }
867      } else if (!old.isMovePending()) {
868        //This is a duplicate so just delete it
869        if (LOG.isDebugEnabled()) {
870          LOG.debug("Duplicate: deleting");
871        }
872        fileInfo.delete();
873      }
874    }
875  }
876
877  /**
878   * Searches the job history file FileStatus list for the specified JobId.
879   * 
880   * @param fileStatusList
881   *          fileStatus list of Job History Files.
882   * @param jobId
883   *          The JobId to find.
884   * @return A FileInfo object for the jobId, null if not found.
885   * @throws IOException
886   */
887  private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList,
888      JobId jobId) throws IOException {
889    for (FileStatus fs : fileStatusList) {
890      JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
891          .getName());
892      if (jobIndexInfo.getJobId().equals(jobId)) {
893        String confFileName = JobHistoryUtils
894            .getIntermediateConfFileName(jobIndexInfo.getJobId());
895        String summaryFileName = JobHistoryUtils
896            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
897        HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
898            fs.getPath().getParent(), confFileName), new Path(fs.getPath()
899            .getParent(), summaryFileName), jobIndexInfo, true);
900        return fileInfo;
901      }
902    }
903    return null;
904  }
905
906  /**
907   * Scans old directories known by the idToDateString map for the specified
908   * jobId. If the number of directories is higher than the supported size of
909   * the idToDateString cache, the jobId will not be found.
910   * 
911   * @param jobId
912   *          the jobId.
913   * @return
914   * @throws IOException
915   */
916  private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
917    String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent(
918        jobId, serialNumberFormat);
919    Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber);
920    if (dateStringSet == null) {
921      return null;
922    }
923    for (String timestampPart : dateStringSet) {
924      Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
925      List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
926          doneDirFc);
927      HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId);
928      if (fileInfo != null) {
929        return fileInfo;
930      }
931    }
932    return null;
933  }
934
935  public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
936    scanIntermediateDirectory();
937    return jobListCache.values();
938  }
939
940  public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
941    // FileInfo available in cache.
942    HistoryFileInfo fileInfo = jobListCache.get(jobId);
943    if (fileInfo != null) {
944      return fileInfo;
945    }
946    // OK so scan the intermediate to be sure we did not lose it that way
947    scanIntermediateDirectory();
948    fileInfo = jobListCache.get(jobId);
949    if (fileInfo != null) {
950      return fileInfo;
951    }
952
953    // Intermediate directory does not contain job. Search through older ones.
954    fileInfo = scanOldDirsForJob(jobId);
955    if (fileInfo != null) {
956      return fileInfo;
957    }
958    return null;
959  }
960
961  private void moveToDoneNow(final Path src, final Path target)
962      throws IOException {
963    LOG.info("Moving " + src.toString() + " to " + target.toString());
964    intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
965  }
966
967  private String getJobSummary(FileContext fc, Path path) throws IOException {
968    Path qPath = fc.makeQualified(path);
969    FSDataInputStream in = fc.open(qPath);
970    String jobSummaryString = in.readUTF();
971    in.close();
972    return jobSummaryString;
973  }
974
975  private void makeDoneSubdir(Path path) throws IOException {
976    try {
977      doneDirFc.getFileStatus(path);
978      existingDoneSubdirs.add(path);
979    } catch (FileNotFoundException fnfE) {
980      try {
981        FsPermission fsp = new FsPermission(
982            JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
983        doneDirFc.mkdir(path, fsp, true);
984        FileStatus fsStatus = doneDirFc.getFileStatus(path);
985        LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
986            + ", Expected: " + fsp.toShort());
987        if (fsStatus.getPermission().toShort() != fsp.toShort()) {
988          LOG.info("Explicitly setting permissions to : " + fsp.toShort()
989              + ", " + fsp);
990          doneDirFc.setPermission(path, fsp);
991        }
992        existingDoneSubdirs.add(path);
993      } catch (FileAlreadyExistsException faeE) { // Nothing to do.
994      }
995    }
996  }
997
998  private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
999    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
1000        id, timestampComponent, serialNumberFormat));
1001  }
1002
1003  private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
1004    String timestampComponent = JobHistoryUtils
1005        .timestampDirectoryComponent(millisecondTime);
1006    return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(
1007        id, timestampComponent, serialNumberFormat));
1008  }
1009
1010  private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
1011    if (finishTime == 0) {
1012      return fileStatus.getModificationTime();
1013    }
1014    return finishTime;
1015  }
1016
1017  private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException {
1018    jobListCache.delete(fileInfo);
1019    fileInfo.delete();
1020  }
1021
1022  List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException {
1023      return JobHistoryUtils.
1024        getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff);
1025  }
1026
1027  /**
1028   * Clean up older history files.
1029   * 
1030   * @throws IOException
1031   *           on any error trying to remove the entries.
1032   */
1033  @SuppressWarnings("unchecked")
1034  void clean() throws IOException {
1035    long cutoff = System.currentTimeMillis() - maxHistoryAge;
1036    boolean halted = false;
1037    List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff);
1038    // Sort in ascending order. Relies on YYYY/MM/DD/Serial
1039    Collections.sort(serialDirList);
1040    for (FileStatus serialDir : serialDirList) {
1041      List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(
1042          serialDir.getPath(), doneDirFc);
1043      for (FileStatus historyFile : historyFileList) {
1044        JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile
1045            .getPath().getName());
1046        long effectiveTimestamp = getEffectiveTimestamp(
1047            jobIndexInfo.getFinishTime(), historyFile);
1048        if (effectiveTimestamp <= cutoff) {
1049          HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo
1050              .getJobId());
1051          if (fileInfo == null) {
1052            String confFileName = JobHistoryUtils
1053                .getIntermediateConfFileName(jobIndexInfo.getJobId());
1054
1055            fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
1056                historyFile.getPath().getParent(), confFileName), null,
1057                jobIndexInfo, true);
1058          }
1059          deleteJobFromDone(fileInfo);
1060        } else {
1061          halted = true;
1062          break;
1063        }
1064      }
1065      if (!halted) {
1066        deleteDir(serialDir);
1067        removeDirectoryFromSerialNumberIndex(serialDir.getPath());
1068        existingDoneSubdirs.remove(serialDir.getPath());
1069      } else {
1070        break; // Don't scan any more directories.
1071      }
1072    }
1073  }
1074  
1075  protected boolean deleteDir(FileStatus serialDir)
1076      throws AccessControlException, FileNotFoundException,
1077      UnsupportedFileSystemException, IOException {
1078    return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true);
1079  }
1080
1081  // for test
1082  @VisibleForTesting
1083  void setMaxHistoryAge(long newValue){
1084    maxHistoryAge=newValue;
1085  } 
1086}