001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.v2.hs; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.ConnectException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.NavigableSet; 031import java.util.Set; 032import java.util.SortedMap; 033import java.util.TreeMap; 034import java.util.concurrent.ConcurrentHashMap; 035import java.util.concurrent.ConcurrentMap; 036import java.util.concurrent.ConcurrentSkipListMap; 037import java.util.concurrent.LinkedBlockingQueue; 038import java.util.concurrent.ThreadFactory; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.apache.commons.logging.Log; 044import org.apache.commons.logging.LogFactory; 045import org.apache.hadoop.classification.InterfaceAudience; 046import org.apache.hadoop.classification.InterfaceStability; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FSDataInputStream; 049import org.apache.hadoop.fs.FileAlreadyExistsException; 050import org.apache.hadoop.fs.FileContext; 051import org.apache.hadoop.fs.FileStatus; 052import org.apache.hadoop.fs.Options; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.fs.PathFilter; 055import org.apache.hadoop.fs.RemoteIterator; 056import org.apache.hadoop.fs.UnsupportedFileSystemException; 057import org.apache.hadoop.fs.permission.FsPermission; 058import org.apache.hadoop.mapred.JobACLsManager; 059import org.apache.hadoop.mapreduce.jobhistory.JobSummary; 060import org.apache.hadoop.mapreduce.v2.api.records.JobId; 061import org.apache.hadoop.mapreduce.v2.app.job.Job; 062import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; 063import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; 064import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; 065import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; 066import org.apache.hadoop.security.AccessControlException; 067import org.apache.hadoop.service.AbstractService; 068import org.apache.hadoop.util.ShutdownThreadsHelper; 069import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; 070import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 071 072import com.google.common.annotations.VisibleForTesting; 073import com.google.common.util.concurrent.ThreadFactoryBuilder; 074import org.apache.hadoop.yarn.util.Clock; 075import org.apache.hadoop.yarn.util.SystemClock; 076 077/** 078 * This class provides a way to interact with history files in a thread safe 079 * manor. 080 */ 081@InterfaceAudience.Public 082@InterfaceStability.Unstable 083public class HistoryFileManager extends AbstractService { 084 private static final Log LOG = LogFactory.getLog(HistoryFileManager.class); 085 private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class); 086 087 private static enum HistoryInfoState { 088 IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED 089 }; 090 091 private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils 092 .doneSubdirsBeforeSerialTail(); 093 094 /** 095 * Maps between a serial number (generated based on jobId) and the timestamp 096 * component(s) to which it belongs. Facilitates jobId based searches. If a 097 * jobId is not found in this list - it will not be found. 098 */ 099 private static class SerialNumberIndex { 100 private SortedMap<String, Set<String>> cache; 101 private int maxSize; 102 103 public SerialNumberIndex(int maxSize) { 104 this.cache = new TreeMap<String, Set<String>>(); 105 this.maxSize = maxSize; 106 } 107 108 public synchronized void add(String serialPart, String timestampPart) { 109 if (!cache.containsKey(serialPart)) { 110 cache.put(serialPart, new HashSet<String>()); 111 if (cache.size() > maxSize) { 112 String key = cache.firstKey(); 113 LOG.error("Dropping " + key 114 + " from the SerialNumberIndex. We will no " 115 + "longer be able to see jobs that are in that serial index for " 116 + cache.get(key)); 117 cache.remove(key); 118 } 119 } 120 Set<String> datePartSet = cache.get(serialPart); 121 datePartSet.add(timestampPart); 122 } 123 124 public synchronized void remove(String serialPart, String timeStampPart) { 125 if (cache.containsKey(serialPart)) { 126 Set<String> set = cache.get(serialPart); 127 set.remove(timeStampPart); 128 if (set.isEmpty()) { 129 cache.remove(serialPart); 130 } 131 } 132 } 133 134 public synchronized Set<String> get(String serialPart) { 135 Set<String> found = cache.get(serialPart); 136 if (found != null) { 137 return new HashSet<String>(found); 138 } 139 return null; 140 } 141 } 142 143 /** 144 * Wrapper around {@link ConcurrentSkipListMap} that maintains size along 145 * side for O(1) size() implementation for use in JobListCache. 146 * 147 * Note: The size is not updated atomically with changes additions/removals. 148 * This race can lead to size() returning an incorrect size at times. 149 */ 150 static class JobIdHistoryFileInfoMap { 151 private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache; 152 private AtomicInteger mapSize; 153 154 JobIdHistoryFileInfoMap() { 155 cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>(); 156 mapSize = new AtomicInteger(); 157 } 158 159 public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) { 160 HistoryFileInfo ret = cache.putIfAbsent(key, value); 161 if (ret == null) { 162 mapSize.incrementAndGet(); 163 } 164 return ret; 165 } 166 167 public HistoryFileInfo remove(JobId key) { 168 HistoryFileInfo ret = cache.remove(key); 169 if (ret != null) { 170 mapSize.decrementAndGet(); 171 } 172 return ret; 173 } 174 175 /** 176 * Returns the recorded size of the internal map. Note that this could be out 177 * of sync with the actual size of the map 178 * @return "recorded" size 179 */ 180 public int size() { 181 return mapSize.get(); 182 } 183 184 public HistoryFileInfo get(JobId key) { 185 return cache.get(key); 186 } 187 188 public NavigableSet<JobId> navigableKeySet() { 189 return cache.navigableKeySet(); 190 } 191 192 public Collection<HistoryFileInfo> values() { 193 return cache.values(); 194 } 195 } 196 197 static class JobListCache { 198 private JobIdHistoryFileInfoMap cache; 199 private int maxSize; 200 private long maxAge; 201 202 public JobListCache(int maxSize, long maxAge) { 203 this.maxSize = maxSize; 204 this.maxAge = maxAge; 205 this.cache = new JobIdHistoryFileInfoMap(); 206 } 207 208 public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { 209 JobId jobId = fileInfo.getJobId(); 210 if (LOG.isDebugEnabled()) { 211 LOG.debug("Adding " + jobId + " to job list cache with " 212 + fileInfo.getJobIndexInfo()); 213 } 214 HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo); 215 if (cache.size() > maxSize) { 216 //There is a race here, where more then one thread could be trying to 217 // remove entries. This could result in too many entries being removed 218 // from the cache. This is considered OK as the size of the cache 219 // should be rather large, and we would rather have performance over 220 // keeping the cache size exactly at the maximum. 221 Iterator<JobId> keys = cache.navigableKeySet().iterator(); 222 long cutoff = System.currentTimeMillis() - maxAge; 223 224 // MAPREDUCE-6436: In order to reduce the number of logs written 225 // in case of a lot of move pending histories. 226 JobId firstInIntermediateKey = null; 227 int inIntermediateCount = 0; 228 JobId firstMoveFailedKey = null; 229 int moveFailedCount = 0; 230 231 while(cache.size() > maxSize && keys.hasNext()) { 232 JobId key = keys.next(); 233 HistoryFileInfo firstValue = cache.get(key); 234 if(firstValue != null) { 235 synchronized(firstValue) { 236 if (firstValue.isMovePending()) { 237 if(firstValue.didMoveFail() && 238 firstValue.jobIndexInfo.getFinishTime() <= cutoff) { 239 cache.remove(key); 240 //Now lets try to delete it 241 try { 242 firstValue.delete(); 243 } catch (IOException e) { 244 LOG.error("Error while trying to delete history files" + 245 " that could not be moved to done.", e); 246 } 247 } else { 248 if (firstValue.didMoveFail()) { 249 if (moveFailedCount == 0) { 250 firstMoveFailedKey = key; 251 } 252 moveFailedCount += 1; 253 } else { 254 if (inIntermediateCount == 0) { 255 firstInIntermediateKey = key; 256 } 257 inIntermediateCount += 1; 258 } 259 } 260 } else { 261 cache.remove(key); 262 } 263 } 264 } 265 } 266 // Log output only for first jobhisotry in pendings to restrict 267 // the total number of logs. 268 if (inIntermediateCount > 0) { 269 LOG.warn("Waiting to remove IN_INTERMEDIATE state histories " + 270 "(e.g. " + firstInIntermediateKey + ") from JobListCache " + 271 "because it is not in done yet. Total count is " + 272 inIntermediateCount + "."); 273 } 274 if (moveFailedCount > 0) { 275 LOG.warn("Waiting to remove MOVE_FAILED state histories " + 276 "(e.g. " + firstMoveFailedKey + ") from JobListCache " + 277 "because it is not in done yet. Total count is " + 278 moveFailedCount + "."); 279 } 280 } 281 return old; 282 } 283 284 public void delete(HistoryFileInfo fileInfo) { 285 if (LOG.isDebugEnabled()) { 286 LOG.debug("Removing from cache " + fileInfo); 287 } 288 cache.remove(fileInfo.getJobId()); 289 } 290 291 public Collection<HistoryFileInfo> values() { 292 return new ArrayList<HistoryFileInfo>(cache.values()); 293 } 294 295 public HistoryFileInfo get(JobId jobId) { 296 return cache.get(jobId); 297 } 298 299 public boolean isFull() { 300 return cache.size() >= maxSize; 301 } 302 } 303 304 /** 305 * This class represents a user dir in the intermediate done directory. This 306 * is mostly for locking purposes. 307 */ 308 private class UserLogDir { 309 long modTime = 0; 310 311 public synchronized void scanIfNeeded(FileStatus fs) { 312 long newModTime = fs.getModificationTime(); 313 if (modTime != newModTime) { 314 Path p = fs.getPath(); 315 try { 316 scanIntermediateDirectory(p); 317 //If scanning fails, we will scan again. We assume the failure is 318 // temporary. 319 modTime = newModTime; 320 } catch (IOException e) { 321 LOG.error("Error while trying to scan the directory " + p, e); 322 } 323 } else { 324 if (LOG.isDebugEnabled()) { 325 LOG.debug("Scan not needed of " + fs.getPath()); 326 } 327 } 328 } 329 } 330 331 public class HistoryFileInfo { 332 private Path historyFile; 333 private Path confFile; 334 private Path summaryFile; 335 private JobIndexInfo jobIndexInfo; 336 private HistoryInfoState state; 337 338 @VisibleForTesting 339 protected HistoryFileInfo(Path historyFile, Path confFile, 340 Path summaryFile, JobIndexInfo jobIndexInfo, boolean isInDone) { 341 this.historyFile = historyFile; 342 this.confFile = confFile; 343 this.summaryFile = summaryFile; 344 this.jobIndexInfo = jobIndexInfo; 345 state = isInDone ? HistoryInfoState.IN_DONE 346 : HistoryInfoState.IN_INTERMEDIATE; 347 } 348 349 @VisibleForTesting 350 synchronized boolean isMovePending() { 351 return state == HistoryInfoState.IN_INTERMEDIATE 352 || state == HistoryInfoState.MOVE_FAILED; 353 } 354 355 @VisibleForTesting 356 synchronized boolean didMoveFail() { 357 return state == HistoryInfoState.MOVE_FAILED; 358 } 359 360 /** 361 * @return true if the files backed by this were deleted. 362 */ 363 public synchronized boolean isDeleted() { 364 return state == HistoryInfoState.DELETED; 365 } 366 367 @Override 368 public String toString() { 369 return "HistoryFileInfo jobID " + getJobId() 370 + " historyFile = " + historyFile; 371 } 372 373 @VisibleForTesting 374 synchronized void moveToDone() throws IOException { 375 if (LOG.isDebugEnabled()) { 376 LOG.debug("moveToDone: " + historyFile); 377 } 378 if (!isMovePending()) { 379 // It was either deleted or is already in done. Either way do nothing 380 if (LOG.isDebugEnabled()) { 381 LOG.debug("Move no longer pending"); 382 } 383 return; 384 } 385 try { 386 long completeTime = jobIndexInfo.getFinishTime(); 387 if (completeTime == 0) { 388 completeTime = System.currentTimeMillis(); 389 } 390 JobId jobId = jobIndexInfo.getJobId(); 391 392 List<Path> paths = new ArrayList<Path>(2); 393 if (historyFile == null) { 394 LOG.info("No file for job-history with " + jobId + " found in cache!"); 395 } else { 396 paths.add(historyFile); 397 } 398 399 if (confFile == null) { 400 LOG.info("No file for jobConf with " + jobId + " found in cache!"); 401 } else { 402 paths.add(confFile); 403 } 404 405 if (summaryFile == null || !intermediateDoneDirFc.util().exists( 406 summaryFile)) { 407 LOG.info("No summary file for job: " + jobId); 408 } else { 409 String jobSummaryString = getJobSummary(intermediateDoneDirFc, 410 summaryFile); 411 SUMMARY_LOG.info(jobSummaryString); 412 LOG.info("Deleting JobSummary file: [" + summaryFile + "]"); 413 intermediateDoneDirFc.delete(summaryFile, false); 414 summaryFile = null; 415 } 416 417 Path targetDir = canonicalHistoryLogPath(jobId, completeTime); 418 addDirectoryToSerialNumberIndex(targetDir); 419 makeDoneSubdir(targetDir); 420 if (historyFile != null) { 421 Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile 422 .getName())); 423 if (!toPath.equals(historyFile)) { 424 moveToDoneNow(historyFile, toPath); 425 historyFile = toPath; 426 } 427 } 428 if (confFile != null) { 429 Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile 430 .getName())); 431 if (!toPath.equals(confFile)) { 432 moveToDoneNow(confFile, toPath); 433 confFile = toPath; 434 } 435 } 436 state = HistoryInfoState.IN_DONE; 437 } catch (Throwable t) { 438 LOG.error("Error while trying to move a job to done", t); 439 this.state = HistoryInfoState.MOVE_FAILED; 440 } 441 } 442 443 /** 444 * Parse a job from the JobHistoryFile, if the underlying file is not going 445 * to be deleted. 446 * 447 * @return the Job or null if the underlying file was deleted. 448 * @throws IOException 449 * if there is an error trying to read the file. 450 */ 451 public synchronized Job loadJob() throws IOException { 452 return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile, 453 false, jobIndexInfo.getUser(), this, aclsMgr); 454 } 455 456 /** 457 * Return the history file. 458 * @return the history file. 459 */ 460 public synchronized Path getHistoryFile() { 461 return historyFile; 462 } 463 464 protected synchronized void delete() throws IOException { 465 if (LOG.isDebugEnabled()) { 466 LOG.debug("deleting " + historyFile + " and " + confFile); 467 } 468 state = HistoryInfoState.DELETED; 469 doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); 470 doneDirFc.delete(doneDirFc.makeQualified(confFile), false); 471 } 472 473 public JobIndexInfo getJobIndexInfo() { 474 return jobIndexInfo; 475 } 476 477 public JobId getJobId() { 478 return jobIndexInfo.getJobId(); 479 } 480 481 public synchronized Path getConfFile() { 482 return confFile; 483 } 484 485 public synchronized Configuration loadConfFile() throws IOException { 486 FileContext fc = FileContext.getFileContext(confFile.toUri(), conf); 487 Configuration jobConf = new Configuration(false); 488 jobConf.addResource(fc.open(confFile), confFile.toString()); 489 return jobConf; 490 } 491 } 492 493 private SerialNumberIndex serialNumberIndex = null; 494 protected JobListCache jobListCache = null; 495 496 // Maintains a list of known done subdirectories. 497 private final Set<Path> existingDoneSubdirs = Collections 498 .synchronizedSet(new HashSet<Path>()); 499 500 /** 501 * Maintains a mapping between intermediate user directories and the last 502 * known modification time. 503 */ 504 private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 505 new ConcurrentHashMap<String, UserLogDir>(); 506 507 private JobACLsManager aclsMgr; 508 509 @VisibleForTesting 510 Configuration conf; 511 512 private String serialNumberFormat; 513 514 private Path doneDirPrefixPath = null; // folder for completed jobs 515 private FileContext doneDirFc; // done Dir FileContext 516 517 private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path 518 private FileContext intermediateDoneDirFc; // Intermediate Done Dir 519 // FileContext 520 @VisibleForTesting 521 protected ThreadPoolExecutor moveToDoneExecutor = null; 522 private long maxHistoryAge = 0; 523 524 public HistoryFileManager() { 525 super(HistoryFileManager.class.getName()); 526 } 527 528 @Override 529 protected void serviceInit(Configuration conf) throws Exception { 530 this.conf = conf; 531 532 int serialNumberLowDigits = 3; 533 serialNumberFormat = ("%0" 534 + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) 535 + "d"); 536 537 long maxFSWaitTime = conf.getLong( 538 JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME, 539 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME); 540 createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime); 541 542 this.aclsMgr = new JobACLsManager(conf); 543 544 maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 545 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); 546 547 jobListCache = createJobListCache(); 548 549 serialNumberIndex = new SerialNumberIndex(conf.getInt( 550 JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, 551 JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE)); 552 553 int numMoveThreads = conf.getInt( 554 JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, 555 JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); 556 ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( 557 "MoveIntermediateToDone Thread #%d").build(); 558 moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads, 559 numMoveThreads, 1, TimeUnit.HOURS, 560 new LinkedBlockingQueue<Runnable>(), tf); 561 562 super.serviceInit(conf); 563 } 564 565 @VisibleForTesting 566 void createHistoryDirs(Clock clock, long intervalCheckMillis, 567 long timeOutMillis) throws IOException { 568 long start = clock.getTime(); 569 boolean done = false; 570 int counter = 0; 571 while (!done && 572 ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) { 573 done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec 574 try { 575 Thread.sleep(intervalCheckMillis); 576 } catch (InterruptedException ex) { 577 throw new YarnRuntimeException(ex); 578 } 579 } 580 if (!done) { 581 throw new YarnRuntimeException("Timed out '" + timeOutMillis+ 582 "ms' waiting for FileSystem to become available"); 583 } 584 } 585 586 /** 587 * DistributedFileSystem returns a RemoteException with a message stating 588 * SafeModeException in it. So this is only way to check it is because of 589 * being in safe mode. 590 */ 591 private boolean isBecauseSafeMode(Throwable ex) { 592 return ex.toString().contains("SafeModeException"); 593 } 594 595 /** 596 * Returns TRUE if the history dirs were created, FALSE if they could not 597 * be created because the FileSystem is not reachable or in safe mode and 598 * throws and exception otherwise. 599 */ 600 @VisibleForTesting 601 boolean tryCreatingHistoryDirs(boolean logWait) throws IOException { 602 boolean succeeded = true; 603 String doneDirPrefix = JobHistoryUtils. 604 getConfiguredHistoryServerDoneDirPrefix(conf); 605 try { 606 doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified( 607 new Path(doneDirPrefix)); 608 doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf); 609 doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK); 610 mkdir(doneDirFc, doneDirPrefixPath, new FsPermission( 611 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION)); 612 } catch (ConnectException ex) { 613 if (logWait) { 614 LOG.info("Waiting for FileSystem at " + 615 doneDirPrefixPath.toUri().getAuthority() + "to be available"); 616 } 617 succeeded = false; 618 } catch (IOException e) { 619 if (isBecauseSafeMode(e)) { 620 succeeded = false; 621 if (logWait) { 622 LOG.info("Waiting for FileSystem at " + 623 doneDirPrefixPath.toUri().getAuthority() + 624 "to be out of safe mode"); 625 } 626 } else { 627 throw new YarnRuntimeException("Error creating done directory: [" 628 + doneDirPrefixPath + "]", e); 629 } 630 } 631 if (succeeded) { 632 String intermediateDoneDirPrefix = JobHistoryUtils. 633 getConfiguredHistoryIntermediateDoneDirPrefix(conf); 634 try { 635 intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified( 636 new Path(intermediateDoneDirPrefix)); 637 intermediateDoneDirFc = FileContext.getFileContext( 638 intermediateDoneDirPath.toUri(), conf); 639 mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission( 640 JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort())); 641 } catch (ConnectException ex) { 642 succeeded = false; 643 if (logWait) { 644 LOG.info("Waiting for FileSystem at " + 645 intermediateDoneDirPath.toUri().getAuthority() + 646 "to be available"); 647 } 648 } catch (IOException e) { 649 if (isBecauseSafeMode(e)) { 650 succeeded = false; 651 if (logWait) { 652 LOG.info("Waiting for FileSystem at " + 653 intermediateDoneDirPath.toUri().getAuthority() + 654 "to be out of safe mode"); 655 } 656 } else { 657 throw new YarnRuntimeException( 658 "Error creating intermediate done directory: [" 659 + intermediateDoneDirPath + "]", e); 660 } 661 } 662 } 663 return succeeded; 664 } 665 666 @Override 667 public void serviceStop() throws Exception { 668 ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor); 669 super.serviceStop(); 670 } 671 672 protected JobListCache createJobListCache() { 673 return new JobListCache(conf.getInt( 674 JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, 675 JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge); 676 } 677 678 private void mkdir(FileContext fc, Path path, FsPermission fsp) 679 throws IOException { 680 if (!fc.util().exists(path)) { 681 try { 682 fc.mkdir(path, fsp, true); 683 684 FileStatus fsStatus = fc.getFileStatus(path); 685 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 686 + ", Expected: " + fsp.toShort()); 687 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 688 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 689 + ", " + fsp); 690 fc.setPermission(path, fsp); 691 } 692 } catch (FileAlreadyExistsException e) { 693 LOG.info("Directory: [" + path + "] already exists."); 694 } 695 } 696 } 697 698 /** 699 * Populates index data structures. Should only be called at initialization 700 * times. 701 */ 702 @SuppressWarnings("unchecked") 703 void initExisting() throws IOException { 704 LOG.info("Initializing Existing Jobs..."); 705 List<FileStatus> timestampedDirList = findTimestampedDirectories(); 706 // Sort first just so insertion is in a consistent order 707 Collections.sort(timestampedDirList); 708 for (FileStatus fs : timestampedDirList) { 709 // TODO Could verify the correct format for these directories. 710 addDirectoryToSerialNumberIndex(fs.getPath()); 711 } 712 for (int i= timestampedDirList.size() - 1; 713 i >= 0 && !jobListCache.isFull(); i--) { 714 FileStatus fs = timestampedDirList.get(i); 715 addDirectoryToJobListCache(fs.getPath()); 716 } 717 } 718 719 private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) { 720 String serialPart = serialDirPath.getName(); 721 String timeStampPart = JobHistoryUtils 722 .getTimestampPartFromPath(serialDirPath.toString()); 723 if (timeStampPart == null) { 724 LOG.warn("Could not find timestamp portion from path: " 725 + serialDirPath.toString() + ". Continuing with next"); 726 return; 727 } 728 if (serialPart == null) { 729 LOG.warn("Could not find serial portion from path: " 730 + serialDirPath.toString() + ". Continuing with next"); 731 return; 732 } 733 serialNumberIndex.remove(serialPart, timeStampPart); 734 } 735 736 private void addDirectoryToSerialNumberIndex(Path serialDirPath) { 737 if (LOG.isDebugEnabled()) { 738 LOG.debug("Adding " + serialDirPath + " to serial index"); 739 } 740 String serialPart = serialDirPath.getName(); 741 String timestampPart = JobHistoryUtils 742 .getTimestampPartFromPath(serialDirPath.toString()); 743 if (timestampPart == null) { 744 LOG.warn("Could not find timestamp portion from path: " + serialDirPath 745 + ". Continuing with next"); 746 return; 747 } 748 if (serialPart == null) { 749 LOG.warn("Could not find serial portion from path: " 750 + serialDirPath.toString() + ". Continuing with next"); 751 } else { 752 serialNumberIndex.add(serialPart, timestampPart); 753 } 754 } 755 756 private void addDirectoryToJobListCache(Path path) throws IOException { 757 if (LOG.isDebugEnabled()) { 758 LOG.debug("Adding " + path + " to job list cache."); 759 } 760 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path, 761 doneDirFc); 762 for (FileStatus fs : historyFileList) { 763 if (LOG.isDebugEnabled()) { 764 LOG.debug("Adding in history for " + fs.getPath()); 765 } 766 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 767 .getName()); 768 String confFileName = JobHistoryUtils 769 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 770 String summaryFileName = JobHistoryUtils 771 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 772 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs 773 .getPath().getParent(), confFileName), new Path(fs.getPath() 774 .getParent(), summaryFileName), jobIndexInfo, true); 775 jobListCache.addIfAbsent(fileInfo); 776 } 777 } 778 779 @VisibleForTesting 780 protected static List<FileStatus> scanDirectory(Path path, FileContext fc, 781 PathFilter pathFilter) throws IOException { 782 path = fc.makeQualified(path); 783 List<FileStatus> jhStatusList = new ArrayList<FileStatus>(); 784 try { 785 RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path); 786 while (fileStatusIter.hasNext()) { 787 FileStatus fileStatus = fileStatusIter.next(); 788 Path filePath = fileStatus.getPath(); 789 if (fileStatus.isFile() && pathFilter.accept(filePath)) { 790 jhStatusList.add(fileStatus); 791 } 792 } 793 } catch (FileNotFoundException fe) { 794 LOG.error("Error while scanning directory " + path, fe); 795 } 796 return jhStatusList; 797 } 798 799 protected List<FileStatus> scanDirectoryForHistoryFiles(Path path, 800 FileContext fc) throws IOException { 801 return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter()); 802 } 803 804 /** 805 * Finds all history directories with a timestamp component by scanning the 806 * filesystem. Used when the JobHistory server is started. 807 * 808 * @return list of history directories 809 */ 810 protected List<FileStatus> findTimestampedDirectories() throws IOException { 811 List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, 812 doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL); 813 return fsList; 814 } 815 816 /** 817 * Scans the intermediate directory to find user directories. Scans these for 818 * history files if the modification time for the directory has changed. Once 819 * it finds history files it starts the process of moving them to the done 820 * directory. 821 * 822 * @throws IOException 823 * if there was a error while scanning 824 */ 825 void scanIntermediateDirectory() throws IOException { 826 // TODO it would be great to limit how often this happens, except in the 827 // case where we are looking for a particular job. 828 List<FileStatus> userDirList = JobHistoryUtils.localGlobber( 829 intermediateDoneDirFc, intermediateDoneDirPath, ""); 830 LOG.debug("Scanning intermediate dirs"); 831 for (FileStatus userDir : userDirList) { 832 String name = userDir.getPath().getName(); 833 UserLogDir dir = userDirModificationTimeMap.get(name); 834 if(dir == null) { 835 dir = new UserLogDir(); 836 UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir); 837 if(old != null) { 838 dir = old; 839 } 840 } 841 dir.scanIfNeeded(userDir); 842 } 843 } 844 845 /** 846 * Scans the specified path and populates the intermediate cache. 847 * 848 * @param absPath 849 * @throws IOException 850 */ 851 private void scanIntermediateDirectory(final Path absPath) throws IOException { 852 if (LOG.isDebugEnabled()) { 853 LOG.debug("Scanning intermediate dir " + absPath); 854 } 855 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath, 856 intermediateDoneDirFc); 857 if (LOG.isDebugEnabled()) { 858 LOG.debug("Found " + fileStatusList.size() + " files"); 859 } 860 for (FileStatus fs : fileStatusList) { 861 if (LOG.isDebugEnabled()) { 862 LOG.debug("scanning file: "+ fs.getPath()); 863 } 864 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 865 .getName()); 866 String confFileName = JobHistoryUtils 867 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 868 String summaryFileName = JobHistoryUtils 869 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 870 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs 871 .getPath().getParent(), confFileName), new Path(fs.getPath() 872 .getParent(), summaryFileName), jobIndexInfo, false); 873 874 final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo); 875 if (old == null || old.didMoveFail()) { 876 final HistoryFileInfo found = (old == null) ? fileInfo : old; 877 long cutoff = System.currentTimeMillis() - maxHistoryAge; 878 if(found.getJobIndexInfo().getFinishTime() <= cutoff) { 879 try { 880 found.delete(); 881 } catch (IOException e) { 882 LOG.warn("Error cleaning up a HistoryFile that is out of date.", e); 883 } 884 } else { 885 if (LOG.isDebugEnabled()) { 886 LOG.debug("Scheduling move to done of " +found); 887 } 888 moveToDoneExecutor.execute(new Runnable() { 889 @Override 890 public void run() { 891 try { 892 found.moveToDone(); 893 } catch (IOException e) { 894 LOG.info("Failed to process fileInfo for job: " + 895 found.getJobId(), e); 896 } 897 } 898 }); 899 } 900 } else if (!old.isMovePending()) { 901 //This is a duplicate so just delete it 902 if (LOG.isDebugEnabled()) { 903 LOG.debug("Duplicate: deleting"); 904 } 905 fileInfo.delete(); 906 } 907 } 908 } 909 910 /** 911 * Searches the job history file FileStatus list for the specified JobId. 912 * 913 * @param fileStatusList 914 * fileStatus list of Job History Files. 915 * @param jobId 916 * The JobId to find. 917 * @return A FileInfo object for the jobId, null if not found. 918 * @throws IOException 919 */ 920 private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList, 921 JobId jobId) throws IOException { 922 for (FileStatus fs : fileStatusList) { 923 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 924 .getName()); 925 if (jobIndexInfo.getJobId().equals(jobId)) { 926 String confFileName = JobHistoryUtils 927 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 928 String summaryFileName = JobHistoryUtils 929 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 930 HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path( 931 fs.getPath().getParent(), confFileName), new Path(fs.getPath() 932 .getParent(), summaryFileName), jobIndexInfo, true); 933 return fileInfo; 934 } 935 } 936 return null; 937 } 938 939 /** 940 * Scans old directories known by the idToDateString map for the specified 941 * jobId. If the number of directories is higher than the supported size of 942 * the idToDateString cache, the jobId will not be found. 943 * 944 * @param jobId 945 * the jobId. 946 * @return 947 * @throws IOException 948 */ 949 private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException { 950 String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent( 951 jobId, serialNumberFormat); 952 Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber); 953 if (dateStringSet == null) { 954 return null; 955 } 956 for (String timestampPart : dateStringSet) { 957 Path logDir = canonicalHistoryLogPath(jobId, timestampPart); 958 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, 959 doneDirFc); 960 HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId); 961 if (fileInfo != null) { 962 return fileInfo; 963 } 964 } 965 return null; 966 } 967 968 public Collection<HistoryFileInfo> getAllFileInfo() throws IOException { 969 scanIntermediateDirectory(); 970 return jobListCache.values(); 971 } 972 973 public HistoryFileInfo getFileInfo(JobId jobId) throws IOException { 974 // FileInfo available in cache. 975 HistoryFileInfo fileInfo = jobListCache.get(jobId); 976 if (fileInfo != null) { 977 return fileInfo; 978 } 979 // OK so scan the intermediate to be sure we did not lose it that way 980 scanIntermediateDirectory(); 981 fileInfo = jobListCache.get(jobId); 982 if (fileInfo != null) { 983 return fileInfo; 984 } 985 986 // Intermediate directory does not contain job. Search through older ones. 987 fileInfo = scanOldDirsForJob(jobId); 988 if (fileInfo != null) { 989 return fileInfo; 990 } 991 return null; 992 } 993 994 private void moveToDoneNow(final Path src, final Path target) 995 throws IOException { 996 LOG.info("Moving " + src.toString() + " to " + target.toString()); 997 intermediateDoneDirFc.rename(src, target, Options.Rename.NONE); 998 } 999 1000 private String getJobSummary(FileContext fc, Path path) throws IOException { 1001 Path qPath = fc.makeQualified(path); 1002 FSDataInputStream in = null; 1003 String jobSummaryString = null; 1004 try { 1005 in = fc.open(qPath); 1006 jobSummaryString = in.readUTF(); 1007 } finally { 1008 if (in != null) { 1009 in.close(); 1010 } 1011 } 1012 return jobSummaryString; 1013 } 1014 1015 private void makeDoneSubdir(Path path) throws IOException { 1016 try { 1017 doneDirFc.getFileStatus(path); 1018 existingDoneSubdirs.add(path); 1019 } catch (FileNotFoundException fnfE) { 1020 try { 1021 FsPermission fsp = new FsPermission( 1022 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION); 1023 doneDirFc.mkdir(path, fsp, true); 1024 FileStatus fsStatus = doneDirFc.getFileStatus(path); 1025 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 1026 + ", Expected: " + fsp.toShort()); 1027 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 1028 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 1029 + ", " + fsp); 1030 doneDirFc.setPermission(path, fsp); 1031 } 1032 existingDoneSubdirs.add(path); 1033 } catch (FileAlreadyExistsException faeE) { // Nothing to do. 1034 } 1035 } 1036 } 1037 1038 private Path canonicalHistoryLogPath(JobId id, String timestampComponent) { 1039 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 1040 id, timestampComponent, serialNumberFormat)); 1041 } 1042 1043 private Path canonicalHistoryLogPath(JobId id, long millisecondTime) { 1044 String timestampComponent = JobHistoryUtils 1045 .timestampDirectoryComponent(millisecondTime); 1046 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 1047 id, timestampComponent, serialNumberFormat)); 1048 } 1049 1050 private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) { 1051 if (finishTime == 0) { 1052 return fileStatus.getModificationTime(); 1053 } 1054 return finishTime; 1055 } 1056 1057 private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException { 1058 jobListCache.delete(fileInfo); 1059 fileInfo.delete(); 1060 } 1061 1062 List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException { 1063 return JobHistoryUtils. 1064 getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff); 1065 } 1066 1067 /** 1068 * Clean up older history files. 1069 * 1070 * @throws IOException 1071 * on any error trying to remove the entries. 1072 */ 1073 @SuppressWarnings("unchecked") 1074 void clean() throws IOException { 1075 long cutoff = System.currentTimeMillis() - maxHistoryAge; 1076 boolean halted = false; 1077 List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff); 1078 // Sort in ascending order. Relies on YYYY/MM/DD/Serial 1079 Collections.sort(serialDirList); 1080 for (FileStatus serialDir : serialDirList) { 1081 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles( 1082 serialDir.getPath(), doneDirFc); 1083 for (FileStatus historyFile : historyFileList) { 1084 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile 1085 .getPath().getName()); 1086 long effectiveTimestamp = getEffectiveTimestamp( 1087 jobIndexInfo.getFinishTime(), historyFile); 1088 if (effectiveTimestamp <= cutoff) { 1089 HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo 1090 .getJobId()); 1091 if (fileInfo == null) { 1092 String confFileName = JobHistoryUtils 1093 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 1094 1095 fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path( 1096 historyFile.getPath().getParent(), confFileName), null, 1097 jobIndexInfo, true); 1098 } 1099 deleteJobFromDone(fileInfo); 1100 } else { 1101 halted = true; 1102 break; 1103 } 1104 } 1105 if (!halted) { 1106 deleteDir(serialDir); 1107 removeDirectoryFromSerialNumberIndex(serialDir.getPath()); 1108 existingDoneSubdirs.remove(serialDir.getPath()); 1109 } else { 1110 break; // Don't scan any more directories. 1111 } 1112 } 1113 } 1114 1115 protected boolean deleteDir(FileStatus serialDir) 1116 throws AccessControlException, FileNotFoundException, 1117 UnsupportedFileSystemException, IOException { 1118 return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true); 1119 } 1120 1121 // for test 1122 @VisibleForTesting 1123 void setMaxHistoryAge(long newValue){ 1124 maxHistoryAge=newValue; 1125 } 1126}