001/** 002* Licensed to the Apache Software Foundation (ASF) under one 003* or more contributor license agreements. See the NOTICE file 004* distributed with this work for additional information 005* regarding copyright ownership. The ASF licenses this file 006* to you under the Apache License, Version 2.0 (the 007* "License"); you may not use this file except in compliance 008* with the License. You may obtain a copy of the License at 009* 010* http://www.apache.org/licenses/LICENSE-2.0 011* 012* Unless required by applicable law or agreed to in writing, software 013* distributed under the License is distributed on an "AS IS" BASIS, 014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015* See the License for the specific language governing permissions and 016* limitations under the License. 017*/ 018 019package org.apache.hadoop.yarn.logaggregation; 020 021import java.io.DataInput; 022import java.io.DataInputStream; 023import java.io.DataOutput; 024import java.io.DataOutputStream; 025import java.io.EOFException; 026import java.io.File; 027import java.io.FileInputStream; 028import java.io.IOException; 029import java.io.InputStreamReader; 030import java.io.OutputStream; 031import java.io.PrintStream; 032import java.io.Writer; 033import java.nio.charset.Charset; 034import java.security.PrivilegedExceptionAction; 035import java.util.ArrayList; 036import java.util.Arrays; 037import java.util.Collections; 038import java.util.EnumSet; 039import java.util.HashMap; 040import java.util.HashSet; 041import java.util.Iterator; 042import java.util.List; 043import java.util.Map; 044import java.util.Map.Entry; 045import java.util.Set; 046import java.util.regex.Pattern; 047 048import org.apache.commons.io.input.BoundedInputStream; 049import org.apache.commons.io.output.WriterOutputStream; 050import org.apache.commons.logging.Log; 051import org.apache.commons.logging.LogFactory; 052import org.apache.hadoop.classification.InterfaceAudience.Private; 053import org.apache.hadoop.classification.InterfaceAudience.Public; 054import org.apache.hadoop.classification.InterfaceStability.Evolving; 055import org.apache.hadoop.conf.Configuration; 056import org.apache.hadoop.fs.CreateFlag; 057import org.apache.hadoop.fs.FSDataInputStream; 058import org.apache.hadoop.fs.FSDataOutputStream; 059import org.apache.hadoop.fs.FileContext; 060import org.apache.hadoop.fs.Options; 061import org.apache.hadoop.fs.Path; 062import org.apache.hadoop.fs.permission.FsPermission; 063import org.apache.hadoop.io.IOUtils; 064import org.apache.hadoop.io.SecureIOUtils; 065import org.apache.hadoop.io.Writable; 066import org.apache.hadoop.io.file.tfile.TFile; 067import org.apache.hadoop.security.UserGroupInformation; 068import org.apache.hadoop.yarn.api.records.ApplicationAccessType; 069import org.apache.hadoop.yarn.api.records.ContainerId; 070import org.apache.hadoop.yarn.api.records.LogAggregationContext; 071import org.apache.hadoop.yarn.conf.YarnConfiguration; 072import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 073import org.apache.hadoop.yarn.util.ConverterUtils; 074import org.apache.hadoop.yarn.util.Times; 075 076import com.google.common.annotations.VisibleForTesting; 077import com.google.common.base.Predicate; 078import com.google.common.collect.Iterables; 079import com.google.common.collect.Sets; 080 081@Public 082@Evolving 083public class AggregatedLogFormat { 084 085 private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class); 086 private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL"); 087 private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER"); 088 private static final LogKey VERSION_KEY = new LogKey("VERSION"); 089 private static final Map<String, LogKey> RESERVED_KEYS; 090 //Maybe write out the retention policy. 091 //Maybe write out a list of containerLogs skipped by the retention policy. 092 private static final int VERSION = 1; 093 094 /** 095 * Umask for the log file. 096 */ 097 private static final FsPermission APP_LOG_FILE_UMASK = FsPermission 098 .createImmutable((short) (0640 ^ 0777)); 099 100 101 static { 102 RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>(); 103 RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY); 104 RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY); 105 RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY); 106 } 107 108 @Public 109 public static class LogKey implements Writable { 110 111 private String keyString; 112 113 public LogKey() { 114 115 } 116 117 public LogKey(ContainerId containerId) { 118 this.keyString = containerId.toString(); 119 } 120 121 public LogKey(String keyString) { 122 this.keyString = keyString; 123 } 124 125 @Override 126 public int hashCode() { 127 return keyString == null ? 0 : keyString.hashCode(); 128 } 129 130 @Override 131 public boolean equals(Object obj) { 132 if (obj instanceof LogKey) { 133 LogKey other = (LogKey) obj; 134 if (this.keyString == null) { 135 return other.keyString == null; 136 } 137 return this.keyString.equals(other.keyString); 138 } 139 return false; 140 } 141 142 @Private 143 @Override 144 public void write(DataOutput out) throws IOException { 145 out.writeUTF(this.keyString); 146 } 147 148 @Private 149 @Override 150 public void readFields(DataInput in) throws IOException { 151 this.keyString = in.readUTF(); 152 } 153 154 @Override 155 public String toString() { 156 return this.keyString; 157 } 158 } 159 160 @Private 161 public static class LogValue { 162 163 private final List<String> rootLogDirs; 164 private final ContainerId containerId; 165 private final String user; 166 private final LogAggregationContext logAggregationContext; 167 private Set<File> uploadedFiles = new HashSet<File>(); 168 private final Set<String> alreadyUploadedLogFiles; 169 private Set<String> allExistingFileMeta = new HashSet<String>(); 170 private final boolean appFinished; 171 // TODO Maybe add a version string here. Instead of changing the version of 172 // the entire k-v format 173 174 public LogValue(List<String> rootLogDirs, ContainerId containerId, 175 String user) { 176 this(rootLogDirs, containerId, user, null, new HashSet<String>(), true); 177 } 178 179 public LogValue(List<String> rootLogDirs, ContainerId containerId, 180 String user, LogAggregationContext logAggregationContext, 181 Set<String> alreadyUploadedLogFiles, boolean appFinished) { 182 this.rootLogDirs = new ArrayList<String>(rootLogDirs); 183 this.containerId = containerId; 184 this.user = user; 185 186 // Ensure logs are processed in lexical order 187 Collections.sort(this.rootLogDirs); 188 this.logAggregationContext = logAggregationContext; 189 this.alreadyUploadedLogFiles = alreadyUploadedLogFiles; 190 this.appFinished = appFinished; 191 } 192 193 private Set<File> getPendingLogFilesToUploadForThisContainer() { 194 Set<File> pendingUploadFiles = new HashSet<File>(); 195 for (String rootLogDir : this.rootLogDirs) { 196 File appLogDir = 197 new File(rootLogDir, 198 ConverterUtils.toString( 199 this.containerId.getApplicationAttemptId(). 200 getApplicationId()) 201 ); 202 File containerLogDir = 203 new File(appLogDir, ConverterUtils.toString(this.containerId)); 204 205 if (!containerLogDir.isDirectory()) { 206 continue; // ContainerDir may have been deleted by the user. 207 } 208 209 pendingUploadFiles 210 .addAll(getPendingLogFilesToUpload(containerLogDir)); 211 } 212 return pendingUploadFiles; 213 } 214 215 public void write(DataOutputStream out, Set<File> pendingUploadFiles) 216 throws IOException { 217 List<File> fileList = new ArrayList<File>(pendingUploadFiles); 218 Collections.sort(fileList); 219 220 for (File logFile : fileList) { 221 // We only aggregate top level files. 222 // Ignore anything inside sub-folders. 223 if (logFile.isDirectory()) { 224 LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it."); 225 continue; 226 } 227 228 FileInputStream in = null; 229 try { 230 in = secureOpenFile(logFile); 231 } catch (IOException e) { 232 logErrorMessage(logFile, e); 233 IOUtils.cleanup(LOG, in); 234 continue; 235 } 236 237 final long fileLength = logFile.length(); 238 // Write the logFile Type 239 out.writeUTF(logFile.getName()); 240 241 // Write the log length as UTF so that it is printable 242 out.writeUTF(String.valueOf(fileLength)); 243 244 // Write the log itself 245 try { 246 byte[] buf = new byte[65535]; 247 int len = 0; 248 long bytesLeft = fileLength; 249 while ((len = in.read(buf)) != -1) { 250 //If buffer contents within fileLength, write 251 if (len < bytesLeft) { 252 out.write(buf, 0, len); 253 bytesLeft-=len; 254 } 255 //else only write contents within fileLength, then exit early 256 else { 257 out.write(buf, 0, (int)bytesLeft); 258 break; 259 } 260 } 261 long newLength = logFile.length(); 262 if(fileLength < newLength) { 263 LOG.warn("Aggregated logs truncated by approximately "+ 264 (newLength-fileLength) +" bytes."); 265 } 266 this.uploadedFiles.add(logFile); 267 } catch (IOException e) { 268 String message = logErrorMessage(logFile, e); 269 out.write(message.getBytes(Charset.forName("UTF-8"))); 270 } finally { 271 IOUtils.cleanup(LOG, in); 272 } 273 } 274 } 275 276 @VisibleForTesting 277 public FileInputStream secureOpenFile(File logFile) throws IOException { 278 return SecureIOUtils.openForRead(logFile, getUser(), null); 279 } 280 281 private static String logErrorMessage(File logFile, Exception e) { 282 String message = "Error aggregating log file. Log file : " 283 + logFile.getAbsolutePath() + ". " + e.getMessage(); 284 LOG.error(message, e); 285 return message; 286 } 287 288 // Added for testing purpose. 289 public String getUser() { 290 return user; 291 } 292 293 private Set<File> getPendingLogFilesToUpload(File containerLogDir) { 294 Set<File> candidates = 295 new HashSet<File>(Arrays.asList(containerLogDir.listFiles())); 296 for (File logFile : candidates) { 297 this.allExistingFileMeta.add(getLogFileMetaData(logFile)); 298 } 299 300 if (this.logAggregationContext != null && candidates.size() > 0) { 301 filterFiles( 302 this.appFinished ? this.logAggregationContext.getIncludePattern() 303 : this.logAggregationContext.getRolledLogsIncludePattern(), 304 candidates, false); 305 306 filterFiles( 307 this.appFinished ? this.logAggregationContext.getExcludePattern() 308 : this.logAggregationContext.getRolledLogsExcludePattern(), 309 candidates, true); 310 311 Iterable<File> mask = 312 Iterables.filter(candidates, new Predicate<File>() { 313 @Override 314 public boolean apply(File next) { 315 return !alreadyUploadedLogFiles 316 .contains(getLogFileMetaData(next)); 317 } 318 }); 319 candidates = Sets.newHashSet(mask); 320 } 321 return candidates; 322 } 323 324 private void filterFiles(String pattern, Set<File> candidates, 325 boolean exclusion) { 326 if (pattern != null && !pattern.isEmpty()) { 327 Pattern filterPattern = Pattern.compile(pattern); 328 for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr 329 .hasNext();) { 330 File candidate = candidatesItr.next(); 331 boolean match = filterPattern.matcher(candidate.getName()).find(); 332 if ((!match && !exclusion) || (match && exclusion)) { 333 candidatesItr.remove(); 334 } 335 } 336 } 337 } 338 339 public Set<Path> getCurrentUpLoadedFilesPath() { 340 Set<Path> path = new HashSet<Path>(); 341 for (File file : this.uploadedFiles) { 342 path.add(new Path(file.getAbsolutePath())); 343 } 344 return path; 345 } 346 347 public Set<String> getCurrentUpLoadedFileMeta() { 348 Set<String> info = new HashSet<String>(); 349 for (File file : this.uploadedFiles) { 350 info.add(getLogFileMetaData(file)); 351 } 352 return info; 353 } 354 355 public Set<String> getAllExistingFilesMeta() { 356 return this.allExistingFileMeta; 357 } 358 359 private String getLogFileMetaData(File file) { 360 return containerId.toString() + "_" + file.getName() + "_" 361 + file.lastModified(); 362 } 363 } 364 365 /** 366 * The writer that writes out the aggregated logs. 367 */ 368 @Private 369 public static class LogWriter { 370 371 private final FSDataOutputStream fsDataOStream; 372 private final TFile.Writer writer; 373 private FileContext fc; 374 375 public LogWriter(final Configuration conf, final Path remoteAppLogFile, 376 UserGroupInformation userUgi) throws IOException { 377 try { 378 this.fsDataOStream = 379 userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() { 380 @Override 381 public FSDataOutputStream run() throws Exception { 382 fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf); 383 fc.setUMask(APP_LOG_FILE_UMASK); 384 return fc.create( 385 remoteAppLogFile, 386 EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 387 new Options.CreateOpts[] {}); 388 } 389 }); 390 } catch (InterruptedException e) { 391 throw new IOException(e); 392 } 393 394 // Keys are not sorted: null arg 395 // 256KB minBlockSize : Expected log size for each container too 396 this.writer = 397 new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get( 398 YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE, 399 YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf); 400 //Write the version string 401 writeVersion(); 402 } 403 404 @VisibleForTesting 405 public TFile.Writer getWriter() { 406 return this.writer; 407 } 408 409 private void writeVersion() throws IOException { 410 DataOutputStream out = this.writer.prepareAppendKey(-1); 411 VERSION_KEY.write(out); 412 out.close(); 413 out = this.writer.prepareAppendValue(-1); 414 out.writeInt(VERSION); 415 out.close(); 416 } 417 418 public void writeApplicationOwner(String user) throws IOException { 419 DataOutputStream out = this.writer.prepareAppendKey(-1); 420 APPLICATION_OWNER_KEY.write(out); 421 out.close(); 422 out = this.writer.prepareAppendValue(-1); 423 out.writeUTF(user); 424 out.close(); 425 } 426 427 public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls) 428 throws IOException { 429 DataOutputStream out = this.writer.prepareAppendKey(-1); 430 APPLICATION_ACL_KEY.write(out); 431 out.close(); 432 out = this.writer.prepareAppendValue(-1); 433 for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) { 434 out.writeUTF(entry.getKey().toString()); 435 out.writeUTF(entry.getValue()); 436 } 437 out.close(); 438 } 439 440 public void append(LogKey logKey, LogValue logValue) throws IOException { 441 Set<File> pendingUploadFiles = 442 logValue.getPendingLogFilesToUploadForThisContainer(); 443 if (pendingUploadFiles.size() == 0) { 444 return; 445 } 446 DataOutputStream out = this.writer.prepareAppendKey(-1); 447 logKey.write(out); 448 out.close(); 449 out = this.writer.prepareAppendValue(-1); 450 logValue.write(out, pendingUploadFiles); 451 out.close(); 452 } 453 454 public void close() { 455 try { 456 this.writer.close(); 457 } catch (IOException e) { 458 LOG.warn("Exception closing writer", e); 459 } 460 IOUtils.closeStream(fsDataOStream); 461 } 462 } 463 464 @Public 465 @Evolving 466 public static class LogReader { 467 468 private final FSDataInputStream fsDataIStream; 469 private final TFile.Reader.Scanner scanner; 470 private final TFile.Reader reader; 471 472 public LogReader(Configuration conf, Path remoteAppLogFile) 473 throws IOException { 474 FileContext fileContext = 475 FileContext.getFileContext(remoteAppLogFile.toUri(), conf); 476 this.fsDataIStream = fileContext.open(remoteAppLogFile); 477 reader = 478 new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus( 479 remoteAppLogFile).getLen(), conf); 480 this.scanner = reader.createScanner(); 481 } 482 483 private boolean atBeginning = true; 484 485 /** 486 * Returns the owner of the application. 487 * 488 * @return the application owner. 489 * @throws IOException 490 */ 491 public String getApplicationOwner() throws IOException { 492 TFile.Reader.Scanner ownerScanner = null; 493 try { 494 ownerScanner = reader.createScanner(); 495 LogKey key = new LogKey(); 496 while (!ownerScanner.atEnd()) { 497 TFile.Reader.Scanner.Entry entry = ownerScanner.entry(); 498 key.readFields(entry.getKeyStream()); 499 if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) { 500 DataInputStream valueStream = entry.getValueStream(); 501 return valueStream.readUTF(); 502 } 503 ownerScanner.advance(); 504 } 505 return null; 506 } finally { 507 IOUtils.cleanup(LOG, ownerScanner); 508 } 509 } 510 511 /** 512 * Returns ACLs for the application. An empty map is returned if no ACLs are 513 * found. 514 * 515 * @return a map of the Application ACLs. 516 * @throws IOException 517 */ 518 public Map<ApplicationAccessType, String> getApplicationAcls() 519 throws IOException { 520 // TODO Seek directly to the key once a comparator is specified. 521 TFile.Reader.Scanner aclScanner = null; 522 try { 523 aclScanner = reader.createScanner(); 524 LogKey key = new LogKey(); 525 Map<ApplicationAccessType, String> acls = 526 new HashMap<ApplicationAccessType, String>(); 527 while (!aclScanner.atEnd()) { 528 TFile.Reader.Scanner.Entry entry = aclScanner.entry(); 529 key.readFields(entry.getKeyStream()); 530 if (key.toString().equals(APPLICATION_ACL_KEY.toString())) { 531 DataInputStream valueStream = entry.getValueStream(); 532 while (true) { 533 String appAccessOp = null; 534 String aclString = null; 535 try { 536 appAccessOp = valueStream.readUTF(); 537 } catch (EOFException e) { 538 // Valid end of stream. 539 break; 540 } 541 try { 542 aclString = valueStream.readUTF(); 543 } catch (EOFException e) { 544 throw new YarnRuntimeException("Error reading ACLs", e); 545 } 546 acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString); 547 } 548 } 549 aclScanner.advance(); 550 } 551 return acls; 552 } finally { 553 IOUtils.cleanup(LOG, aclScanner); 554 } 555 } 556 557 /** 558 * Read the next key and return the value-stream. 559 * 560 * @param key 561 * @return the valueStream if there are more keys or null otherwise. 562 * @throws IOException 563 */ 564 public DataInputStream next(LogKey key) throws IOException { 565 if (!this.atBeginning) { 566 this.scanner.advance(); 567 } else { 568 this.atBeginning = false; 569 } 570 if (this.scanner.atEnd()) { 571 return null; 572 } 573 TFile.Reader.Scanner.Entry entry = this.scanner.entry(); 574 key.readFields(entry.getKeyStream()); 575 // Skip META keys 576 if (RESERVED_KEYS.containsKey(key.toString())) { 577 return next(key); 578 } 579 DataInputStream valueStream = entry.getValueStream(); 580 return valueStream; 581 } 582 583 /** 584 * Get a ContainerLogsReader to read the logs for 585 * the specified container. 586 * 587 * @param containerId 588 * @return object to read the container's logs or null if the 589 * logs could not be found 590 * @throws IOException 591 */ 592 @Private 593 public ContainerLogsReader getContainerLogsReader( 594 ContainerId containerId) throws IOException { 595 ContainerLogsReader logReader = null; 596 597 final LogKey containerKey = new LogKey(containerId); 598 LogKey key = new LogKey(); 599 DataInputStream valueStream = next(key); 600 while (valueStream != null && !key.equals(containerKey)) { 601 valueStream = next(key); 602 } 603 604 if (valueStream != null) { 605 logReader = new ContainerLogsReader(valueStream); 606 } 607 608 return logReader; 609 } 610 611 //TODO Change Log format and interfaces to be containerId specific. 612 // Avoid returning completeValueStreams. 613// public List<String> getTypesForContainer(DataInputStream valueStream){} 614// 615// /** 616// * @param valueStream 617// * The Log stream for the container. 618// * @param fileType 619// * the log type required. 620// * @return An InputStreamReader for the required log type or null if the 621// * type is not found. 622// * @throws IOException 623// */ 624// public InputStreamReader getLogStreamForType(DataInputStream valueStream, 625// String fileType) throws IOException { 626// valueStream.reset(); 627// try { 628// while (true) { 629// String ft = valueStream.readUTF(); 630// String fileLengthStr = valueStream.readUTF(); 631// long fileLength = Long.parseLong(fileLengthStr); 632// if (ft.equals(fileType)) { 633// BoundedInputStream bis = 634// new BoundedInputStream(valueStream, fileLength); 635// return new InputStreamReader(bis); 636// } else { 637// long totalSkipped = 0; 638// long currSkipped = 0; 639// while (currSkipped != -1 && totalSkipped < fileLength) { 640// currSkipped = valueStream.skip(fileLength - totalSkipped); 641// totalSkipped += currSkipped; 642// } 643// // TODO Verify skip behaviour. 644// if (currSkipped == -1) { 645// return null; 646// } 647// } 648// } 649// } catch (EOFException e) { 650// return null; 651// } 652// } 653 654 /** 655 * Writes all logs for a single container to the provided writer. 656 * @param valueStream 657 * @param writer 658 * @param logUploadedTime 659 * @throws IOException 660 */ 661 public static void readAcontainerLogs(DataInputStream valueStream, 662 Writer writer, long logUploadedTime) throws IOException { 663 OutputStream os = null; 664 PrintStream ps = null; 665 try { 666 os = new WriterOutputStream(writer, Charset.forName("UTF-8")); 667 ps = new PrintStream(os); 668 while (true) { 669 try { 670 readContainerLogs(valueStream, ps, logUploadedTime); 671 } catch (EOFException e) { 672 // EndOfFile 673 return; 674 } 675 } 676 } finally { 677 IOUtils.cleanup(LOG, ps); 678 IOUtils.cleanup(LOG, os); 679 } 680 } 681 682 /** 683 * Writes all logs for a single container to the provided writer. 684 * @param valueStream 685 * @param writer 686 * @throws IOException 687 */ 688 public static void readAcontainerLogs(DataInputStream valueStream, 689 Writer writer) throws IOException { 690 readAcontainerLogs(valueStream, writer, -1); 691 } 692 693 private static void readContainerLogs(DataInputStream valueStream, 694 PrintStream out, long logUploadedTime) throws IOException { 695 byte[] buf = new byte[65535]; 696 697 String fileType = valueStream.readUTF(); 698 String fileLengthStr = valueStream.readUTF(); 699 long fileLength = Long.parseLong(fileLengthStr); 700 out.print("LogType:"); 701 out.println(fileType); 702 if (logUploadedTime != -1) { 703 out.print("Log Upload Time:"); 704 out.println(Times.format(logUploadedTime)); 705 } 706 out.print("LogLength:"); 707 out.println(fileLengthStr); 708 out.println("Log Contents:"); 709 710 long curRead = 0; 711 long pendingRead = fileLength - curRead; 712 int toRead = 713 pendingRead > buf.length ? buf.length : (int) pendingRead; 714 int len = valueStream.read(buf, 0, toRead); 715 while (len != -1 && curRead < fileLength) { 716 out.write(buf, 0, len); 717 curRead += len; 718 719 pendingRead = fileLength - curRead; 720 toRead = 721 pendingRead > buf.length ? buf.length : (int) pendingRead; 722 len = valueStream.read(buf, 0, toRead); 723 } 724 out.println("End of LogType:" + fileType); 725 out.println(""); 726 } 727 728 /** 729 * Keep calling this till you get a {@link EOFException} for getting logs of 730 * all types for a single container. 731 * 732 * @param valueStream 733 * @param out 734 * @param logUploadedTime 735 * @throws IOException 736 */ 737 public static void readAContainerLogsForALogType( 738 DataInputStream valueStream, PrintStream out, long logUploadedTime) 739 throws IOException { 740 readContainerLogs(valueStream, out, logUploadedTime); 741 } 742 743 /** 744 * Keep calling this till you get a {@link EOFException} for getting logs of 745 * all types for a single container. 746 * 747 * @param valueStream 748 * @param out 749 * @throws IOException 750 */ 751 public static void readAContainerLogsForALogType( 752 DataInputStream valueStream, PrintStream out) 753 throws IOException { 754 readAContainerLogsForALogType(valueStream, out, -1); 755 } 756 757 /** 758 * Keep calling this till you get a {@link EOFException} for getting logs of 759 * the specific types for a single container. 760 * @param valueStream 761 * @param out 762 * @param logUploadedTime 763 * @param logType 764 * @throws IOException 765 */ 766 public static int readContainerLogsForALogType( 767 DataInputStream valueStream, PrintStream out, long logUploadedTime, 768 List<String> logType) throws IOException { 769 byte[] buf = new byte[65535]; 770 771 String fileType = valueStream.readUTF(); 772 String fileLengthStr = valueStream.readUTF(); 773 long fileLength = Long.parseLong(fileLengthStr); 774 if (logType.contains(fileType)) { 775 out.print("LogType:"); 776 out.println(fileType); 777 if (logUploadedTime != -1) { 778 out.print("Log Upload Time:"); 779 out.println(Times.format(logUploadedTime)); 780 } 781 out.print("LogLength:"); 782 out.println(fileLengthStr); 783 out.println("Log Contents:"); 784 785 long curRead = 0; 786 long pendingRead = fileLength - curRead; 787 int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; 788 int len = valueStream.read(buf, 0, toRead); 789 while (len != -1 && curRead < fileLength) { 790 out.write(buf, 0, len); 791 curRead += len; 792 793 pendingRead = fileLength - curRead; 794 toRead = pendingRead > buf.length ? buf.length : (int) pendingRead; 795 len = valueStream.read(buf, 0, toRead); 796 } 797 out.println("End of LogType:" + fileType); 798 out.println(""); 799 return 0; 800 } else { 801 long totalSkipped = 0; 802 long currSkipped = 0; 803 while (currSkipped != -1 && totalSkipped < fileLength) { 804 currSkipped = valueStream.skip(fileLength - totalSkipped); 805 totalSkipped += currSkipped; 806 } 807 return -1; 808 } 809 } 810 811 public void close() { 812 IOUtils.cleanup(LOG, scanner, reader, fsDataIStream); 813 } 814 } 815 816 @Private 817 public static class ContainerLogsReader { 818 private DataInputStream valueStream; 819 private String currentLogType = null; 820 private long currentLogLength = 0; 821 private BoundedInputStream currentLogData = null; 822 private InputStreamReader currentLogISR; 823 824 public ContainerLogsReader(DataInputStream stream) { 825 valueStream = stream; 826 } 827 828 public String nextLog() throws IOException { 829 if (currentLogData != null && currentLogLength > 0) { 830 // seek to the end of the current log, relying on BoundedInputStream 831 // to prevent seeking past the end of the current log 832 do { 833 if (currentLogData.skip(currentLogLength) < 0) { 834 break; 835 } 836 } while (currentLogData.read() != -1); 837 } 838 839 currentLogType = null; 840 currentLogLength = 0; 841 currentLogData = null; 842 currentLogISR = null; 843 844 try { 845 String logType = valueStream.readUTF(); 846 String logLengthStr = valueStream.readUTF(); 847 currentLogLength = Long.parseLong(logLengthStr); 848 currentLogData = 849 new BoundedInputStream(valueStream, currentLogLength); 850 currentLogData.setPropagateClose(false); 851 currentLogISR = new InputStreamReader(currentLogData, 852 Charset.forName("UTF-8")); 853 currentLogType = logType; 854 } catch (EOFException e) { 855 } 856 857 return currentLogType; 858 } 859 860 public String getCurrentLogType() { 861 return currentLogType; 862 } 863 864 public long getCurrentLogLength() { 865 return currentLogLength; 866 } 867 868 public long skip(long n) throws IOException { 869 return currentLogData.skip(n); 870 } 871 872 public int read() throws IOException { 873 return currentLogData.read(); 874 } 875 876 public int read(byte[] buf, int off, int len) throws IOException { 877 return currentLogData.read(buf, off, len); 878 } 879 880 public int read(char[] buf, int off, int len) throws IOException { 881 return currentLogISR.read(buf, off, len); 882 } 883 } 884}