/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Writer;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

/**
 * Reader and writer for the aggregated-log file: a TFile of key/value
 * entries. A few reserved keys (version, application owner, application
 * ACLs) carry metadata; every other key is written from a {@link LogKey} and
 * its value is the sequence of log files produced by {@link LogValue}, each
 * stored as (name, length as a decimal string, raw bytes).
 */
@Public
@Evolving
public class AggregatedLogFormat {

  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
  private static final LogKey VERSION_KEY = new LogKey("VERSION");
  private static final Map<String, LogKey> RESERVED_KEYS;
  //Maybe write out the retention policy.
  //Maybe write out a list of containerLogs skipped by the retention policy.
  private static final int VERSION = 1;

  /** Size of the buffer used when copying log contents. */
  private static final int COPY_BUFFER_SIZE = 65535;

  /**
   * Umask for the log file.
   */
  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0640 ^ 0777));


  static {
    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
  }

  /**
   * Key of one entry in the aggregated-log file: either a container id
   * rendered as a string or one of the reserved metadata keys.
   */
  @Public
  public static class LogKey implements Writable {

    private String keyString;

    /** Creates an empty key, to be filled in by {@link #readFields}. */
    public LogKey() {

    }

    public LogKey(ContainerId containerId) {
      this.keyString = containerId.toString();
    }

    public LogKey(String keyString) {
      this.keyString = keyString;
    }

    @Override
    public int hashCode() {
      return keyString == null ? 0 : keyString.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof LogKey) {
        LogKey other = (LogKey) obj;
        if (this.keyString == null) {
          return other.keyString == null;
        }
        return this.keyString.equals(other.keyString);
      }
      return false;
    }

    @Private
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(this.keyString);
    }

    @Private
    @Override
    public void readFields(DataInput in) throws IOException {
      this.keyString = in.readUTF();
    }

    @Override
    public String toString() {
      return this.keyString;
    }
  }

  /**
   * Value of one container entry: locates the container's log files under
   * the configured root log directories, filters them, and writes them to
   * the aggregated-log stream.
   */
  @Private
  public static class LogValue {

    private final List<String> rootLogDirs;
    private final ContainerId containerId;
    private final String user;
    private final LogAggregationContext logAggregationContext;
    private final Set<File> uploadedFiles = new HashSet<File>();
    private final Set<String> alreadyUploadedLogFiles;
    private final Set<String> allExistingFileMeta = new HashSet<String>();
    private final boolean appFinished;
    private final boolean containerFinished;

    /**
     * The retention context to determine if log files are older than
     * the retention policy configured.
     */
    private final LogRetentionContext logRetentionContext;
    /**
     * The set of log files that are older than retention policy that will
     * not be uploaded but ready for deletion.
     */
    private final Set<File> obseleteRetentionLogFiles = new HashSet<File>();

    // TODO Maybe add a version string here. Instead of changing the version of
    // the entire k-v format

    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user) {
      this(rootLogDirs, containerId, user, null, new HashSet<String>(),
          null, true, true);
    }

    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user, LogAggregationContext logAggregationContext,
        Set<String> alreadyUploadedLogFiles,
        LogRetentionContext retentionContext, boolean appFinished,
        boolean containerFinished) {
      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
      this.containerId = containerId;
      this.user = user;

      // Ensure logs are processed in lexical order
      Collections.sort(this.rootLogDirs);
      this.logAggregationContext = logAggregationContext;
      this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
      this.appFinished = appFinished;
      this.containerFinished = containerFinished;
      this.logRetentionContext = retentionContext;
    }

    /**
     * Collects, across all root log directories, the log files of this
     * container that still have to be uploaded.
     *
     * @return the pending files; empty if no container directory exists
     */
    @VisibleForTesting
    public Set<File> getPendingLogFilesToUploadForThisContainer() {
      Set<File> pendingUploadFiles = new HashSet<File>();
      for (String rootLogDir : this.rootLogDirs) {
        File appLogDir = new File(rootLogDir,
            this.containerId.getApplicationAttemptId().
                getApplicationId().toString());
        File containerLogDir =
            new File(appLogDir, this.containerId.toString());

        if (!containerLogDir.isDirectory()) {
          continue; // ContainerDir may have been deleted by the user.
        }

        pendingUploadFiles
            .addAll(getPendingLogFilesToUpload(containerLogDir));
      }
      return pendingUploadFiles;
    }

    /**
     * Writes the given files to {@code out}, in sorted order, each as
     * (file name, length as a decimal string, contents). Directories and
     * files that cannot be opened are skipped. At most the length observed
     * before copying is written, so data appended while copying is dropped.
     *
     * @param out the value stream of the aggregated-log entry
     * @param pendingUploadFiles the files to write
     * @throws IOException if writing to {@code out} fails
     */
    public void write(DataOutputStream out, Set<File> pendingUploadFiles)
        throws IOException {
      List<File> fileList = new ArrayList<File>(pendingUploadFiles);
      Collections.sort(fileList);

      // One buffer is reused for every file.
      final byte[] buf = new byte[COPY_BUFFER_SIZE];

      for (File logFile : fileList) {
        // We only aggregate top level files.
        // Ignore anything inside sub-folders.
        if (logFile.isDirectory()) {
          LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
          continue;
        }

        FileInputStream in = null;
        try {
          in = secureOpenFile(logFile);
        } catch (IOException e) {
          // Nothing was opened, so there is nothing to clean up.
          logErrorMessage(logFile, e);
          continue;
        }

        final long fileLength = logFile.length();
        // Write the logFile Type
        out.writeUTF(logFile.getName());

        // Write the log length as UTF so that it is printable
        out.writeUTF(String.valueOf(fileLength));

        // Write the log itself
        try {
          int len = 0;
          long bytesLeft = fileLength;
          while ((len = in.read(buf)) != -1) {
            //If buffer contents within fileLength, write
            if (len < bytesLeft) {
              out.write(buf, 0, len);
              bytesLeft -= len;
            }
            //else only write contents within fileLength, then exit early
            else {
              out.write(buf, 0, (int) bytesLeft);
              break;
            }
          }
          long newLength = logFile.length();
          if (fileLength < newLength) {
            LOG.warn("Aggregated logs truncated by approximately "+
                (newLength-fileLength) +" bytes.");
          }
          this.uploadedFiles.add(logFile);
        } catch (IOException e) {
          String message = logErrorMessage(logFile, e);
          out.write(message.getBytes(Charset.forName("UTF-8")));
        } finally {
          IOUtils.cleanup(LOG, in);
        }
      }
    }

    /**
     * Opens {@code logFile} for reading through {@link SecureIOUtils},
     * passing {@link #getUser()} as the expected owner.
     */
    @VisibleForTesting
    public FileInputStream secureOpenFile(File logFile) throws IOException {
      return SecureIOUtils.openForRead(logFile, getUser(), null);
    }

    /** Logs an aggregation error for {@code logFile} and returns the text. */
    private static String logErrorMessage(File logFile, Exception e) {
      String message = "Error aggregating log file. Log file : "
          + logFile.getAbsolutePath() + ". " + e.getMessage();
      LOG.error(message, e);
      return message;
    }

    // Added for testing purpose.
    public String getUser() {
      return user;
    }

    /**
     * Selects the files in {@code containerLogDir} that should be uploaded,
     * recording the metadata of every file seen. If the retention context
     * says logs should not be retained, every file is scheduled for deletion
     * and nothing is returned for upload.
     */
    private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
      // listFiles() returns null on an I/O error or when the directory has
      // vanished since the caller's isDirectory() check.
      File[] logFiles = containerLogDir.listFiles();
      if (logFiles == null) {
        return new HashSet<File>();
      }

      Set<File> candidates = new HashSet<File>(Arrays.asList(logFiles));
      for (File logFile : candidates) {
        this.allExistingFileMeta.add(getLogFileMetaData(logFile));
      }

      // if log files are older than retention policy, do not upload them.
      // but schedule them for deletion.
      if (logRetentionContext != null
          && !logRetentionContext.shouldRetainLog()) {
        obseleteRetentionLogFiles.addAll(candidates);
        candidates.clear();
        return candidates;
      }

      Set<File> fileCandidates = new HashSet<File>(candidates);
      if (this.logAggregationContext != null && candidates.size() > 0) {
        fileCandidates = getFileCandidates(fileCandidates, this.appFinished);
        if (!this.appFinished && this.containerFinished) {
          // The container is done although the app is not: also pick up the
          // files matched by the regular (non-rolled) patterns.
          Set<File> addition = new HashSet<File>(candidates);
          addition = getFileCandidates(addition, true);
          fileCandidates.addAll(addition);
        }
      }

      return fileCandidates;
    }

    /**
     * Applies the include and exclude patterns of the aggregation context to
     * {@code candidates} (modified in place) and drops files whose metadata
     * is already in the uploaded set.
     *
     * @param useRegularPattern {@code true} to use the regular patterns,
     *          {@code false} to use the rolled-logs patterns
     * @return a new set with the remaining files
     */
    private Set<File> getFileCandidates(Set<File> candidates,
        boolean useRegularPattern) {
      filterFiles(
          useRegularPattern ? this.logAggregationContext.getIncludePattern()
              : this.logAggregationContext.getRolledLogsIncludePattern(),
          candidates, false);

      filterFiles(
          useRegularPattern ? this.logAggregationContext.getExcludePattern()
              : this.logAggregationContext.getRolledLogsExcludePattern(),
          candidates, true);

      Iterable<File> mask =
          Iterables.filter(candidates, new Predicate<File>() {
            @Override
            public boolean apply(File next) {
              return !alreadyUploadedLogFiles
                  .contains(getLogFileMetaData(next));
            }
          });
      return Sets.newHashSet(mask);
    }

    /**
     * Removes from {@code candidates} the files whose name does not contain
     * a match of {@code pattern} (inclusion) or does contain one
     * (exclusion). A null or empty pattern leaves the set untouched.
     */
    private void filterFiles(String pattern, Set<File> candidates,
        boolean exclusion) {
      if (pattern != null && !pattern.isEmpty()) {
        Pattern filterPattern = Pattern.compile(pattern);
        for (Iterator<File> candidatesItr = candidates.iterator();
            candidatesItr.hasNext();) {
          File candidate = candidatesItr.next();
          boolean match = filterPattern.matcher(candidate.getName()).find();
          if ((!match && !exclusion) || (match && exclusion)) {
            candidatesItr.remove();
          }
        }
      }
    }

    /** @return the paths of the files written by {@link #write} so far. */
    public Set<Path> getCurrentUpLoadedFilesPath() {
      Set<Path> path = new HashSet<Path>();
      for (File file : this.uploadedFiles) {
        path.add(new Path(file.getAbsolutePath()));
      }
      return path;
    }

    /** @return the metadata strings of the files written so far. */
    public Set<String> getCurrentUpLoadedFileMeta() {
      Set<String> info = new HashSet<String>();
      for (File file : this.uploadedFiles) {
        info.add(getLogFileMetaData(file));
      }
      return info;
    }

    /** @return the paths of the files skipped by the retention policy. */
    public Set<Path> getObseleteRetentionLogFiles() {
      Set<Path> path = new HashSet<Path>();
      for (File file : this.obseleteRetentionLogFiles) {
        path.add(new Path(file.getAbsolutePath()));
      }
      return path;
    }

    /** @return the metadata strings of every log file seen so far. */
    public Set<String> getAllExistingFilesMeta() {
      return this.allExistingFileMeta;
    }

    /** Builds the identity string: containerId_fileName_lastModified. */
    private String getLogFileMetaData(File file) {
      return containerId.toString() + "_" + file.getName() + "_"
          + file.lastModified();
    }
  }

  /**
   * A context for log retention to determine if files are older than
   * the retention policy configured in YarnConfiguration.
   */
  public static class LogRetentionContext {
    /**
     * The time used with logRetentionMillis, to determine ages of
     * log files and if files need to be uploaded.
     */
    private final long logInitedTimeMillis;
    /**
     * The numbers of milli seconds since a log file is created to determine
     * if we should upload it. -1 if disabled.
     * see YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS for details.
     */
    private final long logRetentionMillis;

    public LogRetentionContext(long logInitedTimeMillis, long
        logRetentionMillis) {
      this.logInitedTimeMillis = logInitedTimeMillis;
      this.logRetentionMillis = logRetentionMillis;
    }

    /** @return true if either configured value is negative. */
    public boolean isDisabled() {
      return logInitedTimeMillis < 0 || logRetentionMillis < 0;
    }

    /**
     * @return true if retention is disabled or the time elapsed since
     *         logInitedTimeMillis is below logRetentionMillis
     */
    public boolean shouldRetainLog() {
      return isDisabled() ||
          System.currentTimeMillis() - logInitedTimeMillis < logRetentionMillis;
    }
  }

  /**
   * The writer that writes out the aggregated logs.
   */
  @Private
  public static class LogWriter {

    private final FSDataOutputStream fsDataOStream;
    private final TFile.Writer writer;
    private FileContext fc;

    /**
     * Creates (or overwrites) {@code remoteAppLogFile} as {@code userUgi}
     * and writes the version entry.
     *
     * @throws IOException if the file cannot be created or written, or if
     *           the calling thread is interrupted while creating it
     */
    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
        UserGroupInformation userUgi) throws IOException {
      try {
        this.fsDataOStream =
            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
              @Override
              public FSDataOutputStream run() throws Exception {
                fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
                fc.setUMask(APP_LOG_FILE_UMASK);
                return fc.create(
                    remoteAppLogFile,
                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                    new Options.CreateOpts[] {});
              }
            });
      } catch (InterruptedException e) {
        // Preserve the interrupt status for code further up the stack.
        Thread.currentThread().interrupt();
        throw new IOException(e);
      }

      // Keys are not sorted: null arg
      // 256KB minBlockSize : Expected log size for each container too
      this.writer =
          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
      //Write the version string
      writeVersion();
    }

    @VisibleForTesting
    public TFile.Writer getWriter() {
      return this.writer;
    }

    /** Writes the reserved version entry. */
    private void writeVersion() throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      VERSION_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeInt(VERSION);
      out.close();
    }

    /** Writes the reserved application-owner entry. */
    public void writeApplicationOwner(String user) throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_OWNER_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeUTF(user);
      out.close();
    }

    /**
     * Writes the reserved ACL entry as a sequence of
     * (access type, ACL string) pairs.
     */
    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
        throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_ACL_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
        out.writeUTF(entry.getKey().toString());
        out.writeUTF(entry.getValue());
      }
      out.close();
    }

    /**
     * Appends one container entry. Nothing is written when the container
     * has no pending log files.
     */
    public void append(LogKey logKey, LogValue logValue) throws IOException {
      Set<File> pendingUploadFiles =
          logValue.getPendingLogFilesToUploadForThisContainer();
      if (pendingUploadFiles.size() == 0) {
        return;
      }
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      logKey.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      logValue.write(out, pendingUploadFiles);
      out.close();
    }

    /** Closes the TFile writer, then the underlying stream. */
    public void close() {
      try {
        this.writer.close();
      } catch (IOException e) {
        LOG.warn("Exception closing writer", e);
      }
      IOUtils.closeStream(fsDataOStream);
    }
  }

  /**
   * Reader for an aggregated-log file.
   */
  @Public
  @Evolving
  public static class LogReader {

    private final FSDataInputStream fsDataIStream;
    private final TFile.Reader.Scanner scanner;
    private final TFile.Reader reader;

    public LogReader(Configuration conf, Path remoteAppLogFile)
        throws IOException {
      FileContext fileContext =
          FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
      this.fsDataIStream = fileContext.open(remoteAppLogFile);
      reader =
          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
              remoteAppLogFile).getLen(), conf);
      this.scanner = reader.createScanner();
    }

    /** False once {@link #next(LogKey)} has been called at least once. */
    private boolean atBeginning = true;

    /**
     * Returns the owner of the application.
     *
     * @return the application owner, or null if the file has no owner entry.
     * @throws IOException if the file cannot be read
     */
    public String getApplicationOwner() throws IOException {
      TFile.Reader.Scanner ownerScanner = null;
      try {
        ownerScanner = reader.createScanner();
        LogKey key = new LogKey();
        while (!ownerScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            return valueStream.readUTF();
          }
          ownerScanner.advance();
        }
        return null;
      } finally {
        IOUtils.cleanup(LOG, ownerScanner);
      }
    }

    /**
     * Returns ACLs for the application. An empty map is returned if no ACLs are
     * found.
     *
     * @return a map of the Application ACLs.
     * @throws IOException if the file cannot be read
     */
    public Map<ApplicationAccessType, String> getApplicationAcls()
        throws IOException {
      // TODO Seek directly to the key once a comparator is specified.
      TFile.Reader.Scanner aclScanner = null;
      try {
        aclScanner = reader.createScanner();
        LogKey key = new LogKey();
        Map<ApplicationAccessType, String> acls =
            new HashMap<ApplicationAccessType, String>();
        while (!aclScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = aclScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            while (true) {
              String appAccessOp = null;
              String aclString = null;
              try {
                appAccessOp = valueStream.readUTF();
              } catch (EOFException e) {
                // Valid end of stream.
                break;
              }
              try {
                aclString = valueStream.readUTF();
              } catch (EOFException e) {
                // An access type without its ACL string: malformed entry.
                throw new YarnRuntimeException("Error reading ACLs", e);
              }
              acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
            }
          }
          aclScanner.advance();
        }
        return acls;
      } finally {
        IOUtils.cleanup(LOG, aclScanner);
      }
    }

    /**
     * Read the next key and return the value-stream. Entries with a
     * reserved (metadata) key are skipped.
     *
     * @param key filled in with the key of the entry that was read
     * @return the valueStream if there are more keys or null otherwise.
     * @throws IOException if the file cannot be read
     */
    public DataInputStream next(LogKey key) throws IOException {
      while (true) {
        if (!this.atBeginning) {
          this.scanner.advance();
        } else {
          this.atBeginning = false;
        }
        if (this.scanner.atEnd()) {
          return null;
        }
        TFile.Reader.Scanner.Entry entry = this.scanner.entry();
        key.readFields(entry.getKeyStream());
        // Skip META keys
        if (!RESERVED_KEYS.containsKey(key.toString())) {
          return entry.getValueStream();
        }
      }
    }

    /**
     * Get a ContainerLogsReader to read the logs for
     * the specified container.
     *
     * @param containerId the container whose logs are wanted
     * @return object to read the container's logs or null if the
     *         logs could not be found
     * @throws IOException if the file cannot be read
     */
    @Private
    public ContainerLogsReader getContainerLogsReader(
        ContainerId containerId) throws IOException {
      ContainerLogsReader logReader = null;

      final LogKey containerKey = new LogKey(containerId);
      LogKey key = new LogKey();
      DataInputStream valueStream = next(key);
      while (valueStream != null && !key.equals(containerKey)) {
        valueStream = next(key);
      }

      if (valueStream != null) {
        logReader = new ContainerLogsReader(valueStream);
      }

      return logReader;
    }

    //TODO  Change Log format and interfaces to be containerId specific.
    // Avoid returning completeValueStreams.
//    public List<String> getTypesForContainer(DataInputStream valueStream){}
//
//    /**
//     * @param valueStream
//     *          The Log stream for the container.
//     * @param fileType
//     *          the log type required.
//     * @return An InputStreamReader for the required log type or null if the
//     *         type is not found.
//     * @throws IOException
//     */
//    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
//        String fileType) throws IOException {
//      valueStream.reset();
//      try {
//        while (true) {
//          String ft = valueStream.readUTF();
//          String fileLengthStr = valueStream.readUTF();
//          long fileLength = Long.parseLong(fileLengthStr);
//          if (ft.equals(fileType)) {
//            BoundedInputStream bis =
//                new BoundedInputStream(valueStream, fileLength);
//            return new InputStreamReader(bis);
//          } else {
//            long totalSkipped = 0;
//            long currSkipped = 0;
//            while (currSkipped != -1 && totalSkipped < fileLength) {
//              currSkipped = valueStream.skip(fileLength - totalSkipped);
//              totalSkipped += currSkipped;
//            }
//            // TODO Verify skip behaviour.
//            if (currSkipped == -1) {
//              return null;
//            }
//          }
//        }
//      } catch (EOFException e) {
//        return null;
//      }
//    }

    /**
     * Writes all logs for a single container to the provided writer.
     * The writer is closed when this method returns.
     *
     * @param valueStream the value stream of the container entry
     * @param writer destination of the formatted logs
     * @param logUploadedTime upload time to print, or -1 to omit it
     * @throws IOException if reading the logs fails
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer, long logUploadedTime) throws IOException {
      OutputStream os = null;
      PrintStream ps = null;
      try {
        os = new WriterOutputStream(writer, Charset.forName("UTF-8"));
        // Encode with the charset that the WriterOutputStream decodes.
        ps = new PrintStream(os, false, "UTF-8");
        while (true) {
          try {
            readContainerLogs(valueStream, ps, logUploadedTime, Long.MAX_VALUE);
          } catch (EOFException e) {
            // EndOfFile
            return;
          }
        }
      } finally {
        IOUtils.cleanup(LOG, ps);
        IOUtils.cleanup(LOG, os);
      }
    }

    /**
     * Writes all logs for a single container to the provided writer.
     * @param valueStream the value stream of the container entry
     * @param writer destination of the formatted logs
     * @throws IOException if reading the logs fails
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer) throws IOException {
      readAcontainerLogs(valueStream, writer, -1);
    }

    /** Reads the next log of the value stream and prints it. */
    private static void readContainerLogs(DataInputStream valueStream,
        PrintStream out, long logUploadedTime, long bytes)
        throws IOException {
      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      printLog(valueStream, out, fileType, fileLengthStr, fileLength,
          logUploadedTime, bytes);
    }

    /**
     * Prints the header, the selected part of the contents and the footer
     * of one log whose name and length have already been read.
     */
    private static void printLog(DataInputStream valueStream,
        PrintStream out, String fileType, String fileLengthStr,
        long fileLength, long logUploadedTime, long bytes)
        throws IOException {
      out.print("LogType:");
      out.println(fileType);
      if (logUploadedTime != -1) {
        out.print("Log Upload Time:");
        out.println(Times.format(logUploadedTime));
      }
      out.print("LogLength:");
      out.println(fileLengthStr);
      out.println("Log Contents:");

      copyLogContents(valueStream, out, fileLength, bytes);

      out.println("\nEnd of LogType:" + fileType);
      out.println("");
    }

    /**
     * Copies part of a log of {@code fileLength} bytes from
     * {@code valueStream} to {@code out} and leaves the stream positioned
     * just after the log. A non-negative {@code bytes} selects the first
     * {@code bytes} bytes, a negative one the last {@code |bytes|} bytes;
     * in both cases the whole log is copied if it is shorter.
     */
    private static void copyLogContents(DataInputStream valueStream,
        PrintStream out, long fileLength, long bytes) throws IOException {
      long toSkip = 0;
      long totalBytesToRead = fileLength;
      long skipAfterRead = 0;
      if (bytes < 0) {
        // Math.abs(Long.MIN_VALUE) is still negative; treat it as "all".
        long absBytes = bytes == Long.MIN_VALUE ? Long.MAX_VALUE : -bytes;
        if (absBytes < fileLength) {
          toSkip = fileLength - absBytes;
          totalBytesToRead = absBytes;
        }
        IOUtils.skipFully(valueStream, toSkip);
      } else if (bytes < fileLength) {
        totalBytesToRead = bytes;
        skipAfterRead = fileLength - bytes;
      }

      byte[] buf = new byte[COPY_BUFFER_SIZE];
      long pendingRead = totalBytesToRead;
      while (pendingRead > 0) {
        int toRead = (int) Math.min(buf.length, pendingRead);
        int len = valueStream.read(buf, 0, toRead);
        if (len == -1) {
          break;
        }
        out.write(buf, 0, len);
        pendingRead -= len;
      }
      IOUtils.skipFully(valueStream, skipAfterRead);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container.
     *
     * @param valueStream the value stream of the container entry
     * @param out destination of the formatted log
     * @param logUploadedTime upload time to print, or -1 to omit it
     * @throws IOException if reading fails; {@link EOFException} once the
     *           stream holds no further log
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime)
        throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime, Long.MAX_VALUE);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container for the specific bytes.
     *
     * @param valueStream the value stream of the container entry
     * @param out destination of the formatted log
     * @param logUploadedTime upload time to print, or -1 to omit it
     * @param bytes number of bytes to print per log: the first
     *          {@code bytes} if non-negative, the last {@code |bytes|} if
     *          negative
     * @throws IOException if reading fails; {@link EOFException} once the
     *           stream holds no further log
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        long bytes) throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime, bytes);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * all types for a single container.
     *
     * @param valueStream the value stream of the container entry
     * @param out destination of the formatted log
     * @throws IOException if reading fails; {@link EOFException} once the
     *           stream holds no further log
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out)
          throws IOException {
      readAContainerLogsForALogType(valueStream, out, -1);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * the specific types for a single container.
     * @param valueStream the value stream of the container entry
     * @param out destination of the formatted log
     * @param logUploadedTime upload time to print, or -1 to omit it
     * @param logType the log types to print
     * @return 0 if the log read was printed, -1 if it was skipped
     * @throws IOException if reading fails; {@link EOFException} once the
     *           stream holds no further log
     */
    public static int readContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        List<String> logType) throws IOException {
      return readContainerLogsForALogType(valueStream, out, logUploadedTime,
          logType, Long.MAX_VALUE);
    }

    /**
     * Keep calling this till you get a {@link EOFException} for getting logs of
     * the specific types for a single container.
     * @param valueStream the value stream of the container entry
     * @param out destination of the formatted log
     * @param logUploadedTime upload time to print, or -1 to omit it
     * @param logType the log types to print
     * @param bytes number of bytes to print: the first {@code bytes} if
     *          non-negative, the last {@code |bytes|} if negative
     * @return 0 if the log read was printed, -1 if it was skipped
     * @throws IOException if reading fails; {@link EOFException} once the
     *           stream holds no further log or ends inside a log
     */
    public static int readContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        List<String> logType, long bytes) throws IOException {
      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      if (logType.contains(fileType)) {
        printLog(valueStream, out, fileType, fileLengthStr, fileLength,
            logUploadedTime, bytes);
        return 0;
      } else {
        // InputStream.skip() never returns -1, so looping on its result can
        // spin forever on a truncated stream; skipFully fails instead.
        IOUtils.skipFully(valueStream, fileLength);
        return -1;
      }
    }

    /**
     * Reads the name and length of the next log, optionally prints them,
     * and skips over the log's contents.
     *
     * @param valueStream the value stream of the container entry
     * @param out where to print the metadata, or null to print nothing
     * @return the log type (file name) that was read
     * @throws IOException if reading fails; {@link EOFException} once the
     *           stream holds no further log or ends inside a log
     */
    @Private
    public static String readContainerMetaDataAndSkipData(
        DataInputStream valueStream, PrintStream out) throws IOException {

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      if (out != null) {
        out.print("LogType:");
        out.println(fileType);
        out.print("LogLength:");
        out.println(fileLengthStr);
      }
      // See readContainerLogsForALogType for why skip() is not looped on.
      IOUtils.skipFully(valueStream, fileLength);
      return fileType;
    }

    /** Closes the scanner, the TFile reader and the underlying stream. */
    public void close() {
      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
    }
  }

  /**
   * Iterates over the logs stored in the value stream of one container
   * entry, exposing the current log's type, length and contents.
   */
  @Private
  public static class ContainerLogsReader {
    private DataInputStream valueStream;
    private String currentLogType = null;
    private long currentLogLength = 0;
    private BoundedInputStream currentLogData = null;
    private InputStreamReader currentLogISR;

    public ContainerLogsReader(DataInputStream stream) {
      valueStream = stream;
    }

    /**
     * Moves to the next log, skipping whatever is left of the current one.
     *
     * @return the type of the next log, or null if there is none
     * @throws IOException if the stream cannot be read
     */
    public String nextLog() throws IOException {
      if (currentLogData != null && currentLogLength > 0) {
        // seek to the end of the current log, relying on BoundedInputStream
        // to prevent seeking past the end of the current log
        do {
          if (currentLogData.skip(currentLogLength) < 0) {
            break;
          }
        } while (currentLogData.read() != -1);
      }

      currentLogType = null;
      currentLogLength = 0;
      currentLogData = null;
      currentLogISR = null;

      try {
        String logType = valueStream.readUTF();
        String logLengthStr = valueStream.readUTF();
        currentLogLength = Long.parseLong(logLengthStr);
        currentLogData =
            new BoundedInputStream(valueStream, currentLogLength);
        currentLogData.setPropagateClose(false);
        currentLogISR = new InputStreamReader(currentLogData,
            Charset.forName("UTF-8"));
        currentLogType = logType;
      } catch (EOFException e) {
        // End of the value stream: no further log, so null is returned.
      }

      return currentLogType;
    }

    public String getCurrentLogType() {
      return currentLogType;
    }

    public long getCurrentLogLength() {
      return currentLogLength;
    }

    public long skip(long n) throws IOException {
      return currentLogData.skip(n);
    }

    public int read() throws IOException {
      return currentLogData.read();
    }

    public int read(byte[] buf, int off, int len) throws IOException {
      return currentLogData.read(buf, off, len);
    }

    public int read(char[] buf, int off, int len) throws IOException {
      return currentLogISR.read(buf, off, len);
    }
  }
}