001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.mapreduce.v2.hs; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.ConnectException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.Collections; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.NavigableSet; 031import java.util.Set; 032import java.util.SortedMap; 033import java.util.TreeMap; 034import java.util.concurrent.ConcurrentHashMap; 035import java.util.concurrent.ConcurrentMap; 036import java.util.concurrent.ConcurrentSkipListMap; 037import java.util.concurrent.LinkedBlockingQueue; 038import java.util.concurrent.ThreadFactory; 039import java.util.concurrent.ThreadPoolExecutor; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicInteger; 042 043import org.apache.commons.logging.Log; 044import org.apache.commons.logging.LogFactory; 045import org.apache.hadoop.classification.InterfaceAudience; 046import org.apache.hadoop.classification.InterfaceStability; 047import org.apache.hadoop.conf.Configuration; 048import org.apache.hadoop.fs.FSDataInputStream; 049import org.apache.hadoop.fs.FileAlreadyExistsException; 050import org.apache.hadoop.fs.FileContext; 051import org.apache.hadoop.fs.FileStatus; 052import org.apache.hadoop.fs.Options; 053import org.apache.hadoop.fs.Path; 054import org.apache.hadoop.fs.PathFilter; 055import org.apache.hadoop.fs.RemoteIterator; 056import org.apache.hadoop.fs.UnsupportedFileSystemException; 057import org.apache.hadoop.fs.permission.FsPermission; 058import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; 059import org.apache.hadoop.hdfs.server.namenode.NameNode; 060import org.apache.hadoop.ipc.RetriableException; 061import org.apache.hadoop.mapred.JobACLsManager; 062import org.apache.hadoop.mapreduce.jobhistory.JobSummary; 063import org.apache.hadoop.mapreduce.v2.api.records.JobId; 064import org.apache.hadoop.mapreduce.v2.app.job.Job; 065import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; 066import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; 067import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; 068import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; 069import org.apache.hadoop.security.AccessControlException; 070import org.apache.hadoop.service.AbstractService; 071import org.apache.hadoop.util.ShutdownThreadsHelper; 072import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; 073import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 074 075import com.google.common.annotations.VisibleForTesting; 076import com.google.common.util.concurrent.ThreadFactoryBuilder; 077import org.apache.hadoop.yarn.util.Clock; 078import org.apache.hadoop.yarn.util.SystemClock; 079 080/** 081 * This class provides a way to interact with history files in a thread safe 082 * manor. 083 */ 084@InterfaceAudience.Public 085@InterfaceStability.Unstable 086public class HistoryFileManager extends AbstractService { 087 private static final Log LOG = LogFactory.getLog(HistoryFileManager.class); 088 private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class); 089 090 private static enum HistoryInfoState { 091 IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED 092 }; 093 094 private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils 095 .doneSubdirsBeforeSerialTail(); 096 097 /** 098 * Maps between a serial number (generated based on jobId) and the timestamp 099 * component(s) to which it belongs. Facilitates jobId based searches. If a 100 * jobId is not found in this list - it will not be found. 101 */ 102 private static class SerialNumberIndex { 103 private SortedMap<String, Set<String>> cache; 104 private int maxSize; 105 106 public SerialNumberIndex(int maxSize) { 107 this.cache = new TreeMap<String, Set<String>>(); 108 this.maxSize = maxSize; 109 } 110 111 public synchronized void add(String serialPart, String timestampPart) { 112 if (!cache.containsKey(serialPart)) { 113 cache.put(serialPart, new HashSet<String>()); 114 if (cache.size() > maxSize) { 115 String key = cache.firstKey(); 116 LOG.error("Dropping " + key 117 + " from the SerialNumberIndex. We will no " 118 + "longer be able to see jobs that are in that serial index for " 119 + cache.get(key)); 120 cache.remove(key); 121 } 122 } 123 Set<String> datePartSet = cache.get(serialPart); 124 datePartSet.add(timestampPart); 125 } 126 127 public synchronized void remove(String serialPart, String timeStampPart) { 128 if (cache.containsKey(serialPart)) { 129 Set<String> set = cache.get(serialPart); 130 set.remove(timeStampPart); 131 if (set.isEmpty()) { 132 cache.remove(serialPart); 133 } 134 } 135 } 136 137 public synchronized Set<String> get(String serialPart) { 138 Set<String> found = cache.get(serialPart); 139 if (found != null) { 140 return new HashSet<String>(found); 141 } 142 return null; 143 } 144 } 145 146 /** 147 * Wrapper around {@link ConcurrentSkipListMap} that maintains size along 148 * side for O(1) size() implementation for use in JobListCache. 149 * 150 * Note: The size is not updated atomically with changes additions/removals. 151 * This race can lead to size() returning an incorrect size at times. 152 */ 153 static class JobIdHistoryFileInfoMap { 154 private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache; 155 private AtomicInteger mapSize; 156 157 JobIdHistoryFileInfoMap() { 158 cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>(); 159 mapSize = new AtomicInteger(); 160 } 161 162 public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) { 163 HistoryFileInfo ret = cache.putIfAbsent(key, value); 164 if (ret == null) { 165 mapSize.incrementAndGet(); 166 } 167 return ret; 168 } 169 170 public HistoryFileInfo remove(JobId key) { 171 HistoryFileInfo ret = cache.remove(key); 172 if (ret != null) { 173 mapSize.decrementAndGet(); 174 } 175 return ret; 176 } 177 178 /** 179 * Returns the recorded size of the internal map. Note that this could be out 180 * of sync with the actual size of the map 181 * @return "recorded" size 182 */ 183 public int size() { 184 return mapSize.get(); 185 } 186 187 public HistoryFileInfo get(JobId key) { 188 return cache.get(key); 189 } 190 191 public NavigableSet<JobId> navigableKeySet() { 192 return cache.navigableKeySet(); 193 } 194 195 public Collection<HistoryFileInfo> values() { 196 return cache.values(); 197 } 198 } 199 200 static class JobListCache { 201 private JobIdHistoryFileInfoMap cache; 202 private int maxSize; 203 private long maxAge; 204 205 public JobListCache(int maxSize, long maxAge) { 206 this.maxSize = maxSize; 207 this.maxAge = maxAge; 208 this.cache = new JobIdHistoryFileInfoMap(); 209 } 210 211 public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { 212 JobId jobId = fileInfo.getJobId(); 213 if (LOG.isDebugEnabled()) { 214 LOG.debug("Adding " + jobId + " to job list cache with " 215 + fileInfo.getJobIndexInfo()); 216 } 217 HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo); 218 if (cache.size() > maxSize) { 219 //There is a race here, where more then one thread could be trying to 220 // remove entries. This could result in too many entries being removed 221 // from the cache. This is considered OK as the size of the cache 222 // should be rather large, and we would rather have performance over 223 // keeping the cache size exactly at the maximum. 224 Iterator<JobId> keys = cache.navigableKeySet().iterator(); 225 long cutoff = System.currentTimeMillis() - maxAge; 226 227 // MAPREDUCE-6436: In order to reduce the number of logs written 228 // in case of a lot of move pending histories. 229 JobId firstInIntermediateKey = null; 230 int inIntermediateCount = 0; 231 JobId firstMoveFailedKey = null; 232 int moveFailedCount = 0; 233 234 while(cache.size() > maxSize && keys.hasNext()) { 235 JobId key = keys.next(); 236 HistoryFileInfo firstValue = cache.get(key); 237 if(firstValue != null) { 238 synchronized(firstValue) { 239 if (firstValue.isMovePending()) { 240 if(firstValue.didMoveFail() && 241 firstValue.jobIndexInfo.getFinishTime() <= cutoff) { 242 cache.remove(key); 243 //Now lets try to delete it 244 try { 245 firstValue.delete(); 246 } catch (IOException e) { 247 LOG.error("Error while trying to delete history files" + 248 " that could not be moved to done.", e); 249 } 250 } else { 251 if (firstValue.didMoveFail()) { 252 if (moveFailedCount == 0) { 253 firstMoveFailedKey = key; 254 } 255 moveFailedCount += 1; 256 } else { 257 if (inIntermediateCount == 0) { 258 firstInIntermediateKey = key; 259 } 260 inIntermediateCount += 1; 261 } 262 } 263 } else { 264 cache.remove(key); 265 } 266 } 267 } 268 } 269 // Log output only for first jobhisotry in pendings to restrict 270 // the total number of logs. 271 if (inIntermediateCount > 0) { 272 LOG.warn("Waiting to remove IN_INTERMEDIATE state histories " + 273 "(e.g. " + firstInIntermediateKey + ") from JobListCache " + 274 "because it is not in done yet. Total count is " + 275 inIntermediateCount + "."); 276 } 277 if (moveFailedCount > 0) { 278 LOG.warn("Waiting to remove MOVE_FAILED state histories " + 279 "(e.g. " + firstMoveFailedKey + ") from JobListCache " + 280 "because it is not in done yet. Total count is " + 281 moveFailedCount + "."); 282 } 283 } 284 return old; 285 } 286 287 public void delete(HistoryFileInfo fileInfo) { 288 if (LOG.isDebugEnabled()) { 289 LOG.debug("Removing from cache " + fileInfo); 290 } 291 cache.remove(fileInfo.getJobId()); 292 } 293 294 public Collection<HistoryFileInfo> values() { 295 return new ArrayList<HistoryFileInfo>(cache.values()); 296 } 297 298 public HistoryFileInfo get(JobId jobId) { 299 return cache.get(jobId); 300 } 301 302 public boolean isFull() { 303 return cache.size() >= maxSize; 304 } 305 } 306 307 /** 308 * This class represents a user dir in the intermediate done directory. This 309 * is mostly for locking purposes. 310 */ 311 private class UserLogDir { 312 long modTime = 0; 313 private long scanTime = 0; 314 315 public synchronized void scanIfNeeded(FileStatus fs) { 316 long newModTime = fs.getModificationTime(); 317 // MAPREDUCE-6680: In some Cloud FileSystem, like Azure FS or S3, file's 318 // modification time is truncated into seconds. In that case, 319 // modTime == newModTime doesn't means no file update in the directory, 320 // so we need to have additional check. 321 // Note: modTime (X second Y millisecond) could be casted to X second or 322 // X+1 second. 323 if (modTime != newModTime 324 || (scanTime/1000) == (modTime/1000) 325 || (scanTime/1000 + 1) == (modTime/1000)) { 326 // reset scanTime before scanning happens 327 scanTime = System.currentTimeMillis(); 328 Path p = fs.getPath(); 329 try { 330 scanIntermediateDirectory(p); 331 //If scanning fails, we will scan again. We assume the failure is 332 // temporary. 333 modTime = newModTime; 334 } catch (IOException e) { 335 LOG.error("Error while trying to scan the directory " + p, e); 336 } 337 } else { 338 if (LOG.isDebugEnabled()) { 339 LOG.debug("Scan not needed of " + fs.getPath()); 340 } 341 // reset scanTime 342 scanTime = System.currentTimeMillis(); 343 } 344 } 345 } 346 347 public class HistoryFileInfo { 348 private Path historyFile; 349 private Path confFile; 350 private Path summaryFile; 351 private JobIndexInfo jobIndexInfo; 352 private volatile HistoryInfoState state; 353 354 @VisibleForTesting 355 protected HistoryFileInfo(Path historyFile, Path confFile, 356 Path summaryFile, JobIndexInfo jobIndexInfo, boolean isInDone) { 357 this.historyFile = historyFile; 358 this.confFile = confFile; 359 this.summaryFile = summaryFile; 360 this.jobIndexInfo = jobIndexInfo; 361 state = isInDone ? HistoryInfoState.IN_DONE 362 : HistoryInfoState.IN_INTERMEDIATE; 363 } 364 365 @VisibleForTesting 366 boolean isMovePending() { 367 return state == HistoryInfoState.IN_INTERMEDIATE 368 || state == HistoryInfoState.MOVE_FAILED; 369 } 370 371 @VisibleForTesting 372 boolean didMoveFail() { 373 return state == HistoryInfoState.MOVE_FAILED; 374 } 375 376 /** 377 * @return true if the files backed by this were deleted. 378 */ 379 public boolean isDeleted() { 380 return state == HistoryInfoState.DELETED; 381 } 382 383 @Override 384 public String toString() { 385 return "HistoryFileInfo jobID " + getJobId() 386 + " historyFile = " + historyFile; 387 } 388 389 @VisibleForTesting 390 synchronized void moveToDone() throws IOException { 391 if (LOG.isDebugEnabled()) { 392 LOG.debug("moveToDone: " + historyFile); 393 } 394 if (!isMovePending()) { 395 // It was either deleted or is already in done. Either way do nothing 396 if (LOG.isDebugEnabled()) { 397 LOG.debug("Move no longer pending"); 398 } 399 return; 400 } 401 try { 402 long completeTime = jobIndexInfo.getFinishTime(); 403 if (completeTime == 0) { 404 completeTime = System.currentTimeMillis(); 405 } 406 JobId jobId = jobIndexInfo.getJobId(); 407 408 List<Path> paths = new ArrayList<Path>(2); 409 if (historyFile == null) { 410 LOG.info("No file for job-history with " + jobId + " found in cache!"); 411 } else { 412 paths.add(historyFile); 413 } 414 415 if (confFile == null) { 416 LOG.info("No file for jobConf with " + jobId + " found in cache!"); 417 } else { 418 paths.add(confFile); 419 } 420 421 if (summaryFile == null || !intermediateDoneDirFc.util().exists( 422 summaryFile)) { 423 LOG.info("No summary file for job: " + jobId); 424 } else { 425 String jobSummaryString = getJobSummary(intermediateDoneDirFc, 426 summaryFile); 427 SUMMARY_LOG.info(jobSummaryString); 428 LOG.info("Deleting JobSummary file: [" + summaryFile + "]"); 429 intermediateDoneDirFc.delete(summaryFile, false); 430 summaryFile = null; 431 } 432 433 Path targetDir = canonicalHistoryLogPath(jobId, completeTime); 434 addDirectoryToSerialNumberIndex(targetDir); 435 makeDoneSubdir(targetDir); 436 if (historyFile != null) { 437 Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile 438 .getName())); 439 if (!toPath.equals(historyFile)) { 440 moveToDoneNow(historyFile, toPath); 441 historyFile = toPath; 442 } 443 } 444 if (confFile != null) { 445 Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile 446 .getName())); 447 if (!toPath.equals(confFile)) { 448 moveToDoneNow(confFile, toPath); 449 confFile = toPath; 450 } 451 } 452 state = HistoryInfoState.IN_DONE; 453 } catch (Throwable t) { 454 LOG.error("Error while trying to move a job to done", t); 455 this.state = HistoryInfoState.MOVE_FAILED; 456 } 457 } 458 459 /** 460 * Parse a job from the JobHistoryFile, if the underlying file is not going 461 * to be deleted. 462 * 463 * @return the Job or null if the underlying file was deleted. 464 * @throws IOException 465 * if there is an error trying to read the file. 466 */ 467 public synchronized Job loadJob() throws IOException { 468 return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile, 469 false, jobIndexInfo.getUser(), this, aclsMgr); 470 } 471 472 /** 473 * Return the history file. 474 * @return the history file. 475 */ 476 public synchronized Path getHistoryFile() { 477 return historyFile; 478 } 479 480 protected synchronized void delete() throws IOException { 481 if (LOG.isDebugEnabled()) { 482 LOG.debug("deleting " + historyFile + " and " + confFile); 483 } 484 state = HistoryInfoState.DELETED; 485 doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); 486 doneDirFc.delete(doneDirFc.makeQualified(confFile), false); 487 } 488 489 public JobIndexInfo getJobIndexInfo() { 490 return jobIndexInfo; 491 } 492 493 public JobId getJobId() { 494 return jobIndexInfo.getJobId(); 495 } 496 497 public synchronized Path getConfFile() { 498 return confFile; 499 } 500 501 public synchronized Configuration loadConfFile() throws IOException { 502 FileContext fc = FileContext.getFileContext(confFile.toUri(), conf); 503 Configuration jobConf = new Configuration(false); 504 jobConf.addResource(fc.open(confFile), confFile.toString()); 505 return jobConf; 506 } 507 } 508 509 private SerialNumberIndex serialNumberIndex = null; 510 protected JobListCache jobListCache = null; 511 512 // Maintains a list of known done subdirectories. 513 private final Set<Path> existingDoneSubdirs = Collections 514 .synchronizedSet(new HashSet<Path>()); 515 516 /** 517 * Maintains a mapping between intermediate user directories and the last 518 * known modification time. 519 */ 520 private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = 521 new ConcurrentHashMap<String, UserLogDir>(); 522 523 private JobACLsManager aclsMgr; 524 525 @VisibleForTesting 526 Configuration conf; 527 528 private String serialNumberFormat; 529 530 private Path doneDirPrefixPath = null; // folder for completed jobs 531 private FileContext doneDirFc; // done Dir FileContext 532 533 private Path intermediateDoneDirPath = null; // Intermediate Done Dir Path 534 private FileContext intermediateDoneDirFc; // Intermediate Done Dir 535 // FileContext 536 @VisibleForTesting 537 protected ThreadPoolExecutor moveToDoneExecutor = null; 538 private long maxHistoryAge = 0; 539 540 public HistoryFileManager() { 541 super(HistoryFileManager.class.getName()); 542 } 543 544 @Override 545 protected void serviceInit(Configuration conf) throws Exception { 546 this.conf = conf; 547 548 int serialNumberLowDigits = 3; 549 serialNumberFormat = ("%0" 550 + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) 551 + "d"); 552 553 long maxFSWaitTime = conf.getLong( 554 JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME, 555 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME); 556 createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime); 557 558 this.aclsMgr = new JobACLsManager(conf); 559 560 maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, 561 JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); 562 563 jobListCache = createJobListCache(); 564 565 serialNumberIndex = new SerialNumberIndex(conf.getInt( 566 JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, 567 JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE)); 568 569 int numMoveThreads = conf.getInt( 570 JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, 571 JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); 572 moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads); 573 super.serviceInit(conf); 574 } 575 576 protected ThreadPoolExecutor createMoveToDoneThreadPool(int numMoveThreads) { 577 ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( 578 "MoveIntermediateToDone Thread #%d").build(); 579 return new HadoopThreadPoolExecutor(numMoveThreads, numMoveThreads, 580 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf); 581 } 582 583 @VisibleForTesting 584 void createHistoryDirs(Clock clock, long intervalCheckMillis, 585 long timeOutMillis) throws IOException { 586 long start = clock.getTime(); 587 boolean done = false; 588 int counter = 0; 589 while (!done && 590 ((timeOutMillis == -1) || (clock.getTime() - start < timeOutMillis))) { 591 done = tryCreatingHistoryDirs(counter++ % 3 == 0); // log every 3 attempts, 30sec 592 try { 593 Thread.sleep(intervalCheckMillis); 594 } catch (InterruptedException ex) { 595 throw new YarnRuntimeException(ex); 596 } 597 } 598 if (!done) { 599 throw new YarnRuntimeException("Timed out '" + timeOutMillis+ 600 "ms' waiting for FileSystem to become available"); 601 } 602 } 603 604 /** 605 * Check if the NameNode is still not started yet as indicated by the 606 * exception type and message. 607 * DistributedFileSystem returns a RemoteException with a message stating 608 * SafeModeException in it. So this is only way to check it is because of 609 * being in safe mode. In addition, Name Node may have not started yet, in 610 * which case, the message contains "NameNode still not started". 611 */ 612 private boolean isNameNodeStillNotStarted(Exception ex) { 613 String nameNodeNotStartedMsg = NameNode.composeNotStartedMessage( 614 HdfsServerConstants.NamenodeRole.NAMENODE); 615 return ex.toString().contains("SafeModeException") || 616 (ex instanceof RetriableException && ex.getMessage().contains( 617 nameNodeNotStartedMsg)); 618 } 619 620 /** 621 * Returns TRUE if the history dirs were created, FALSE if they could not 622 * be created because the FileSystem is not reachable or in safe mode and 623 * throws and exception otherwise. 624 */ 625 @VisibleForTesting 626 boolean tryCreatingHistoryDirs(boolean logWait) throws IOException { 627 boolean succeeded = true; 628 String doneDirPrefix = JobHistoryUtils. 629 getConfiguredHistoryServerDoneDirPrefix(conf); 630 try { 631 doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified( 632 new Path(doneDirPrefix)); 633 doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf); 634 doneDirFc.setUMask(JobHistoryUtils.HISTORY_DONE_DIR_UMASK); 635 mkdir(doneDirFc, doneDirPrefixPath, new FsPermission( 636 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION)); 637 } catch (ConnectException ex) { 638 if (logWait) { 639 LOG.info("Waiting for FileSystem at " + 640 doneDirPrefixPath.toUri().getAuthority() + "to be available"); 641 } 642 succeeded = false; 643 } catch (IOException e) { 644 if (isNameNodeStillNotStarted(e)) { 645 succeeded = false; 646 if (logWait) { 647 LOG.info("Waiting for FileSystem at " + 648 doneDirPrefixPath.toUri().getAuthority() + 649 "to be out of safe mode"); 650 } 651 } else { 652 throw new YarnRuntimeException("Error creating done directory: [" 653 + doneDirPrefixPath + "]", e); 654 } 655 } 656 if (succeeded) { 657 String intermediateDoneDirPrefix = JobHistoryUtils. 658 getConfiguredHistoryIntermediateDoneDirPrefix(conf); 659 try { 660 intermediateDoneDirPath = FileContext.getFileContext(conf).makeQualified( 661 new Path(intermediateDoneDirPrefix)); 662 intermediateDoneDirFc = FileContext.getFileContext( 663 intermediateDoneDirPath.toUri(), conf); 664 mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission( 665 JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort())); 666 } catch (ConnectException ex) { 667 succeeded = false; 668 if (logWait) { 669 LOG.info("Waiting for FileSystem at " + 670 intermediateDoneDirPath.toUri().getAuthority() + 671 "to be available"); 672 } 673 } catch (IOException e) { 674 if (isNameNodeStillNotStarted(e)) { 675 succeeded = false; 676 if (logWait) { 677 LOG.info("Waiting for FileSystem at " + 678 intermediateDoneDirPath.toUri().getAuthority() + 679 "to be out of safe mode"); 680 } 681 } else { 682 throw new YarnRuntimeException( 683 "Error creating intermediate done directory: [" 684 + intermediateDoneDirPath + "]", e); 685 } 686 } 687 } 688 return succeeded; 689 } 690 691 @Override 692 public void serviceStop() throws Exception { 693 ShutdownThreadsHelper.shutdownExecutorService(moveToDoneExecutor); 694 super.serviceStop(); 695 } 696 697 protected JobListCache createJobListCache() { 698 return new JobListCache(conf.getInt( 699 JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, 700 JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), maxHistoryAge); 701 } 702 703 private void mkdir(FileContext fc, Path path, FsPermission fsp) 704 throws IOException { 705 if (!fc.util().exists(path)) { 706 try { 707 fc.mkdir(path, fsp, true); 708 709 FileStatus fsStatus = fc.getFileStatus(path); 710 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 711 + ", Expected: " + fsp.toShort()); 712 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 713 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 714 + ", " + fsp); 715 fc.setPermission(path, fsp); 716 } 717 } catch (FileAlreadyExistsException e) { 718 LOG.info("Directory: [" + path + "] already exists."); 719 } 720 } 721 } 722 723 protected HistoryFileInfo createHistoryFileInfo(Path historyFile, 724 Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo, 725 boolean isInDone) { 726 return new HistoryFileInfo( 727 historyFile, confFile, summaryFile, jobIndexInfo, isInDone); 728 } 729 730 /** 731 * Populates index data structures. Should only be called at initialization 732 * times. 733 */ 734 @SuppressWarnings("unchecked") 735 void initExisting() throws IOException { 736 LOG.info("Initializing Existing Jobs..."); 737 List<FileStatus> timestampedDirList = findTimestampedDirectories(); 738 // Sort first just so insertion is in a consistent order 739 Collections.sort(timestampedDirList); 740 for (FileStatus fs : timestampedDirList) { 741 // TODO Could verify the correct format for these directories. 742 addDirectoryToSerialNumberIndex(fs.getPath()); 743 } 744 for (int i= timestampedDirList.size() - 1; 745 i >= 0 && !jobListCache.isFull(); i--) { 746 FileStatus fs = timestampedDirList.get(i); 747 addDirectoryToJobListCache(fs.getPath()); 748 } 749 } 750 751 private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) { 752 String serialPart = serialDirPath.getName(); 753 String timeStampPart = JobHistoryUtils 754 .getTimestampPartFromPath(serialDirPath.toString()); 755 if (timeStampPart == null) { 756 LOG.warn("Could not find timestamp portion from path: " 757 + serialDirPath.toString() + ". Continuing with next"); 758 return; 759 } 760 if (serialPart == null) { 761 LOG.warn("Could not find serial portion from path: " 762 + serialDirPath.toString() + ". Continuing with next"); 763 return; 764 } 765 serialNumberIndex.remove(serialPart, timeStampPart); 766 } 767 768 private void addDirectoryToSerialNumberIndex(Path serialDirPath) { 769 if (LOG.isDebugEnabled()) { 770 LOG.debug("Adding " + serialDirPath + " to serial index"); 771 } 772 String serialPart = serialDirPath.getName(); 773 String timestampPart = JobHistoryUtils 774 .getTimestampPartFromPath(serialDirPath.toString()); 775 if (timestampPart == null) { 776 LOG.warn("Could not find timestamp portion from path: " + serialDirPath 777 + ". Continuing with next"); 778 return; 779 } 780 if (serialPart == null) { 781 LOG.warn("Could not find serial portion from path: " 782 + serialDirPath.toString() + ". Continuing with next"); 783 } else { 784 serialNumberIndex.add(serialPart, timestampPart); 785 } 786 } 787 788 private void addDirectoryToJobListCache(Path path) throws IOException { 789 if (LOG.isDebugEnabled()) { 790 LOG.debug("Adding " + path + " to job list cache."); 791 } 792 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path, 793 doneDirFc); 794 for (FileStatus fs : historyFileList) { 795 if (LOG.isDebugEnabled()) { 796 LOG.debug("Adding in history for " + fs.getPath()); 797 } 798 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 799 .getName()); 800 String confFileName = JobHistoryUtils 801 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 802 String summaryFileName = JobHistoryUtils 803 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 804 HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs 805 .getPath().getParent(), confFileName), new Path(fs.getPath() 806 .getParent(), summaryFileName), jobIndexInfo, true); 807 jobListCache.addIfAbsent(fileInfo); 808 } 809 } 810 811 @VisibleForTesting 812 protected static List<FileStatus> scanDirectory(Path path, FileContext fc, 813 PathFilter pathFilter) throws IOException { 814 path = fc.makeQualified(path); 815 List<FileStatus> jhStatusList = new ArrayList<FileStatus>(); 816 try { 817 RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path); 818 while (fileStatusIter.hasNext()) { 819 FileStatus fileStatus = fileStatusIter.next(); 820 Path filePath = fileStatus.getPath(); 821 if (fileStatus.isFile() && pathFilter.accept(filePath)) { 822 jhStatusList.add(fileStatus); 823 } 824 } 825 } catch (FileNotFoundException fe) { 826 LOG.error("Error while scanning directory " + path, fe); 827 } 828 return jhStatusList; 829 } 830 831 protected List<FileStatus> scanDirectoryForHistoryFiles(Path path, 832 FileContext fc) throws IOException { 833 return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter()); 834 } 835 836 /** 837 * Finds all history directories with a timestamp component by scanning the 838 * filesystem. Used when the JobHistory server is started. 839 * 840 * @return list of history directories 841 */ 842 protected List<FileStatus> findTimestampedDirectories() throws IOException { 843 List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, 844 doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL); 845 return fsList; 846 } 847 848 /** 849 * Scans the intermediate directory to find user directories. Scans these for 850 * history files if the modification time for the directory has changed. Once 851 * it finds history files it starts the process of moving them to the done 852 * directory. 853 * 854 * @throws IOException 855 * if there was a error while scanning 856 */ 857 void scanIntermediateDirectory() throws IOException { 858 // TODO it would be great to limit how often this happens, except in the 859 // case where we are looking for a particular job. 860 List<FileStatus> userDirList = JobHistoryUtils.localGlobber( 861 intermediateDoneDirFc, intermediateDoneDirPath, ""); 862 LOG.debug("Scanning intermediate dirs"); 863 for (FileStatus userDir : userDirList) { 864 String name = userDir.getPath().getName(); 865 UserLogDir dir = userDirModificationTimeMap.get(name); 866 if(dir == null) { 867 dir = new UserLogDir(); 868 UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir); 869 if(old != null) { 870 dir = old; 871 } 872 } 873 dir.scanIfNeeded(userDir); 874 } 875 } 876 877 /** 878 * Scans the specified path and populates the intermediate cache. 879 * 880 * @param absPath 881 * @throws IOException 882 */ 883 private void scanIntermediateDirectory(final Path absPath) throws IOException { 884 if (LOG.isDebugEnabled()) { 885 LOG.debug("Scanning intermediate dir " + absPath); 886 } 887 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath, 888 intermediateDoneDirFc); 889 if (LOG.isDebugEnabled()) { 890 LOG.debug("Found " + fileStatusList.size() + " files"); 891 } 892 for (FileStatus fs : fileStatusList) { 893 if (LOG.isDebugEnabled()) { 894 LOG.debug("scanning file: "+ fs.getPath()); 895 } 896 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 897 .getName()); 898 String confFileName = JobHistoryUtils 899 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 900 String summaryFileName = JobHistoryUtils 901 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 902 HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs 903 .getPath().getParent(), confFileName), new Path(fs.getPath() 904 .getParent(), summaryFileName), jobIndexInfo, false); 905 906 final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo); 907 if (old == null || old.didMoveFail()) { 908 final HistoryFileInfo found = (old == null) ? fileInfo : old; 909 long cutoff = System.currentTimeMillis() - maxHistoryAge; 910 if(found.getJobIndexInfo().getFinishTime() <= cutoff) { 911 try { 912 found.delete(); 913 } catch (IOException e) { 914 LOG.warn("Error cleaning up a HistoryFile that is out of date.", e); 915 } 916 } else { 917 if (LOG.isDebugEnabled()) { 918 LOG.debug("Scheduling move to done of " +found); 919 } 920 moveToDoneExecutor.execute(new Runnable() { 921 @Override 922 public void run() { 923 try { 924 found.moveToDone(); 925 } catch (IOException e) { 926 LOG.info("Failed to process fileInfo for job: " + 927 found.getJobId(), e); 928 } 929 } 930 }); 931 } 932 } else if (!old.isMovePending()) { 933 //This is a duplicate so just delete it 934 if (LOG.isDebugEnabled()) { 935 LOG.debug("Duplicate: deleting"); 936 } 937 fileInfo.delete(); 938 } 939 } 940 } 941 942 /** 943 * Searches the job history file FileStatus list for the specified JobId. 944 * 945 * @param fileStatusList 946 * fileStatus list of Job History Files. 947 * @param jobId 948 * The JobId to find. 949 * @return A FileInfo object for the jobId, null if not found. 950 * @throws IOException 951 */ 952 private HistoryFileInfo getJobFileInfo(List<FileStatus> fileStatusList, 953 JobId jobId) throws IOException { 954 for (FileStatus fs : fileStatusList) { 955 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() 956 .getName()); 957 if (jobIndexInfo.getJobId().equals(jobId)) { 958 String confFileName = JobHistoryUtils 959 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 960 String summaryFileName = JobHistoryUtils 961 .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); 962 HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path( 963 fs.getPath().getParent(), confFileName), new Path(fs.getPath() 964 .getParent(), summaryFileName), jobIndexInfo, true); 965 return fileInfo; 966 } 967 } 968 return null; 969 } 970 971 /** 972 * Scans old directories known by the idToDateString map for the specified 973 * jobId. If the number of directories is higher than the supported size of 974 * the idToDateString cache, the jobId will not be found. 975 * 976 * @param jobId 977 * the jobId. 978 * @return 979 * @throws IOException 980 */ 981 private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException { 982 String boxedSerialNumber = JobHistoryUtils.serialNumberDirectoryComponent( 983 jobId, serialNumberFormat); 984 Set<String> dateStringSet = serialNumberIndex.get(boxedSerialNumber); 985 if (dateStringSet == null) { 986 return null; 987 } 988 for (String timestampPart : dateStringSet) { 989 Path logDir = canonicalHistoryLogPath(jobId, timestampPart); 990 List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, 991 doneDirFc); 992 HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId); 993 if (fileInfo != null) { 994 return fileInfo; 995 } 996 } 997 return null; 998 } 999 1000 public Collection<HistoryFileInfo> getAllFileInfo() throws IOException { 1001 scanIntermediateDirectory(); 1002 return jobListCache.values(); 1003 } 1004 1005 public HistoryFileInfo getFileInfo(JobId jobId) throws IOException { 1006 // FileInfo available in cache. 1007 HistoryFileInfo fileInfo = jobListCache.get(jobId); 1008 if (fileInfo != null) { 1009 return fileInfo; 1010 } 1011 // OK so scan the intermediate to be sure we did not lose it that way 1012 scanIntermediateDirectory(); 1013 fileInfo = jobListCache.get(jobId); 1014 if (fileInfo != null) { 1015 return fileInfo; 1016 } 1017 1018 // Intermediate directory does not contain job. Search through older ones. 1019 fileInfo = scanOldDirsForJob(jobId); 1020 if (fileInfo != null) { 1021 return fileInfo; 1022 } 1023 return null; 1024 } 1025 1026 private void moveToDoneNow(final Path src, final Path target) 1027 throws IOException { 1028 LOG.info("Moving " + src.toString() + " to " + target.toString()); 1029 intermediateDoneDirFc.rename(src, target, Options.Rename.NONE); 1030 } 1031 1032 private String getJobSummary(FileContext fc, Path path) throws IOException { 1033 Path qPath = fc.makeQualified(path); 1034 FSDataInputStream in = null; 1035 String jobSummaryString = null; 1036 try { 1037 in = fc.open(qPath); 1038 jobSummaryString = in.readUTF(); 1039 } finally { 1040 if (in != null) { 1041 in.close(); 1042 } 1043 } 1044 return jobSummaryString; 1045 } 1046 1047 private void makeDoneSubdir(Path path) throws IOException { 1048 try { 1049 doneDirFc.getFileStatus(path); 1050 existingDoneSubdirs.add(path); 1051 } catch (FileNotFoundException fnfE) { 1052 try { 1053 FsPermission fsp = new FsPermission( 1054 JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION); 1055 doneDirFc.mkdir(path, fsp, true); 1056 FileStatus fsStatus = doneDirFc.getFileStatus(path); 1057 LOG.info("Perms after creating " + fsStatus.getPermission().toShort() 1058 + ", Expected: " + fsp.toShort()); 1059 if (fsStatus.getPermission().toShort() != fsp.toShort()) { 1060 LOG.info("Explicitly setting permissions to : " + fsp.toShort() 1061 + ", " + fsp); 1062 doneDirFc.setPermission(path, fsp); 1063 } 1064 existingDoneSubdirs.add(path); 1065 } catch (FileAlreadyExistsException faeE) { // Nothing to do. 1066 } 1067 } 1068 } 1069 1070 private Path canonicalHistoryLogPath(JobId id, String timestampComponent) { 1071 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 1072 id, timestampComponent, serialNumberFormat)); 1073 } 1074 1075 private Path canonicalHistoryLogPath(JobId id, long millisecondTime) { 1076 String timestampComponent = JobHistoryUtils 1077 .timestampDirectoryComponent(millisecondTime); 1078 return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory( 1079 id, timestampComponent, serialNumberFormat)); 1080 } 1081 1082 private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) { 1083 if (finishTime == 0) { 1084 return fileStatus.getModificationTime(); 1085 } 1086 return finishTime; 1087 } 1088 1089 private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException { 1090 jobListCache.delete(fileInfo); 1091 fileInfo.delete(); 1092 } 1093 1094 List<FileStatus> getHistoryDirsForCleaning(long cutoff) throws IOException { 1095 return JobHistoryUtils. 1096 getHistoryDirsForCleaning(doneDirFc, doneDirPrefixPath, cutoff); 1097 } 1098 1099 /** 1100 * Clean up older history files. 1101 * 1102 * @throws IOException 1103 * on any error trying to remove the entries. 1104 */ 1105 @SuppressWarnings("unchecked") 1106 void clean() throws IOException { 1107 long cutoff = System.currentTimeMillis() - maxHistoryAge; 1108 boolean halted = false; 1109 List<FileStatus> serialDirList = getHistoryDirsForCleaning(cutoff); 1110 // Sort in ascending order. Relies on YYYY/MM/DD/Serial 1111 Collections.sort(serialDirList); 1112 for (FileStatus serialDir : serialDirList) { 1113 List<FileStatus> historyFileList = scanDirectoryForHistoryFiles( 1114 serialDir.getPath(), doneDirFc); 1115 for (FileStatus historyFile : historyFileList) { 1116 JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(historyFile 1117 .getPath().getName()); 1118 long effectiveTimestamp = getEffectiveTimestamp( 1119 jobIndexInfo.getFinishTime(), historyFile); 1120 if (effectiveTimestamp <= cutoff) { 1121 HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo 1122 .getJobId()); 1123 if (fileInfo == null) { 1124 String confFileName = JobHistoryUtils 1125 .getIntermediateConfFileName(jobIndexInfo.getJobId()); 1126 1127 fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path( 1128 historyFile.getPath().getParent(), confFileName), null, 1129 jobIndexInfo, true); 1130 } 1131 deleteJobFromDone(fileInfo); 1132 } else { 1133 halted = true; 1134 break; 1135 } 1136 } 1137 if (!halted) { 1138 deleteDir(serialDir); 1139 removeDirectoryFromSerialNumberIndex(serialDir.getPath()); 1140 existingDoneSubdirs.remove(serialDir.getPath()); 1141 } else { 1142 break; // Don't scan any more directories. 1143 } 1144 } 1145 } 1146 1147 protected boolean deleteDir(FileStatus serialDir) 1148 throws AccessControlException, FileNotFoundException, 1149 UnsupportedFileSystemException, IOException { 1150 return doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true); 1151 } 1152 1153 // for test 1154 @VisibleForTesting 1155 void setMaxHistoryAge(long newValue){ 1156 maxHistoryAge=newValue; 1157 } 1158}