001/**
002* Licensed to the Apache Software Foundation (ASF) under one
003* or more contributor license agreements.  See the NOTICE file
004* distributed with this work for additional information
005* regarding copyright ownership.  The ASF licenses this file
006* to you under the Apache License, Version 2.0 (the
007* "License"); you may not use this file except in compliance
008* with the License.  You may obtain a copy of the License at
009*
010*     http://www.apache.org/licenses/LICENSE-2.0
011*
012* Unless required by applicable law or agreed to in writing, software
013* distributed under the License is distributed on an "AS IS" BASIS,
014* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015* See the License for the specific language governing permissions and
016* limitations under the License.
017*/
018
019package org.apache.hadoop.yarn.logaggregation;
020
021import java.io.DataInput;
022import java.io.DataInputStream;
023import java.io.DataOutput;
024import java.io.DataOutputStream;
025import java.io.EOFException;
026import java.io.File;
027import java.io.FileInputStream;
028import java.io.IOException;
029import java.io.InputStreamReader;
030import java.io.OutputStream;
031import java.io.PrintStream;
032import java.io.Writer;
033import java.nio.charset.Charset;
034import java.security.PrivilegedExceptionAction;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Collections;
038import java.util.EnumSet;
039import java.util.HashMap;
040import java.util.HashSet;
041import java.util.Iterator;
042import java.util.List;
043import java.util.Map;
044import java.util.Map.Entry;
045import java.util.Set;
046import java.util.regex.Pattern;
047
048import org.apache.commons.io.input.BoundedInputStream;
049import org.apache.commons.io.output.WriterOutputStream;
050import org.apache.commons.logging.Log;
051import org.apache.commons.logging.LogFactory;
052import org.apache.hadoop.classification.InterfaceAudience.Private;
053import org.apache.hadoop.classification.InterfaceAudience.Public;
054import org.apache.hadoop.classification.InterfaceStability.Evolving;
055import org.apache.hadoop.conf.Configuration;
056import org.apache.hadoop.fs.CreateFlag;
057import org.apache.hadoop.fs.FSDataInputStream;
058import org.apache.hadoop.fs.FSDataOutputStream;
059import org.apache.hadoop.fs.FileContext;
060import org.apache.hadoop.fs.Options;
061import org.apache.hadoop.fs.Path;
062import org.apache.hadoop.fs.permission.FsPermission;
063import org.apache.hadoop.io.IOUtils;
064import org.apache.hadoop.io.SecureIOUtils;
065import org.apache.hadoop.io.Writable;
066import org.apache.hadoop.io.file.tfile.TFile;
067import org.apache.hadoop.security.UserGroupInformation;
068import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
069import org.apache.hadoop.yarn.api.records.ContainerId;
070import org.apache.hadoop.yarn.api.records.LogAggregationContext;
071import org.apache.hadoop.yarn.conf.YarnConfiguration;
072import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
073import org.apache.hadoop.yarn.util.ConverterUtils;
074import org.apache.hadoop.yarn.util.Times;
075
076import com.google.common.annotations.VisibleForTesting;
077import com.google.common.base.Predicate;
078import com.google.common.collect.Iterables;
079import com.google.common.collect.Sets;
080
081@Public
082@Evolving
083public class AggregatedLogFormat {
084
085  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
086  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
087  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
088  private static final LogKey VERSION_KEY = new LogKey("VERSION");
089  private static final Map<String, LogKey> RESERVED_KEYS;
090  //Maybe write out the retention policy.
091  //Maybe write out a list of containerLogs skipped by the retention policy.
092  private static final int VERSION = 1;
093
094  /**
095   * Umask for the log file.
096   */
097  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
098      .createImmutable((short) (0640 ^ 0777));
099
100
101  static {
102    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
103    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
104    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
105    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
106  }
107
108  @Public
109  public static class LogKey implements Writable {
110
111    private String keyString;
112
113    public LogKey() {
114
115    }
116
117    public LogKey(ContainerId containerId) {
118      this.keyString = containerId.toString();
119    }
120
121    public LogKey(String keyString) {
122      this.keyString = keyString;
123    }
124    
125    @Override
126    public int hashCode() {
127      return keyString == null ? 0 : keyString.hashCode();
128    }
129
130    @Override
131    public boolean equals(Object obj) {
132      if (obj instanceof LogKey) {
133        LogKey other = (LogKey) obj;
134        if (this.keyString == null) {
135          return other.keyString == null;
136        }
137        return this.keyString.equals(other.keyString);
138      }
139      return false;
140    }
141
142    @Private
143    @Override
144    public void write(DataOutput out) throws IOException {
145      out.writeUTF(this.keyString);
146    }
147
148    @Private
149    @Override
150    public void readFields(DataInput in) throws IOException {
151      this.keyString = in.readUTF();
152    }
153
154    @Override
155    public String toString() {
156      return this.keyString;
157    }
158  }
159
160  @Private
161  public static class LogValue {
162
163    private final List<String> rootLogDirs;
164    private final ContainerId containerId;
165    private final String user;
166    private final LogAggregationContext logAggregationContext;
167    private Set<File> uploadedFiles = new HashSet<File>();
168    private final Set<String> alreadyUploadedLogFiles;
169    private Set<String> allExistingFileMeta = new HashSet<String>();
170    private final boolean appFinished;
171    // TODO Maybe add a version string here. Instead of changing the version of
172    // the entire k-v format
173
174    public LogValue(List<String> rootLogDirs, ContainerId containerId,
175        String user) {
176      this(rootLogDirs, containerId, user, null, new HashSet<String>(), true);
177    }
178
179    public LogValue(List<String> rootLogDirs, ContainerId containerId,
180        String user, LogAggregationContext logAggregationContext,
181        Set<String> alreadyUploadedLogFiles, boolean appFinished) {
182      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
183      this.containerId = containerId;
184      this.user = user;
185
186      // Ensure logs are processed in lexical order
187      Collections.sort(this.rootLogDirs);
188      this.logAggregationContext = logAggregationContext;
189      this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
190      this.appFinished = appFinished;
191    }
192
193    private Set<File> getPendingLogFilesToUploadForThisContainer() {
194      Set<File> pendingUploadFiles = new HashSet<File>();
195      for (String rootLogDir : this.rootLogDirs) {
196        File appLogDir =
197            new File(rootLogDir, 
198                ConverterUtils.toString(
199                    this.containerId.getApplicationAttemptId().
200                        getApplicationId())
201                );
202        File containerLogDir =
203            new File(appLogDir, ConverterUtils.toString(this.containerId));
204
205        if (!containerLogDir.isDirectory()) {
206          continue; // ContainerDir may have been deleted by the user.
207        }
208
209        pendingUploadFiles
210          .addAll(getPendingLogFilesToUpload(containerLogDir));
211      }
212      return pendingUploadFiles;
213    }
214
215    public void write(DataOutputStream out, Set<File> pendingUploadFiles)
216        throws IOException {
217      List<File> fileList = new ArrayList<File>(pendingUploadFiles);
218      Collections.sort(fileList);
219
220      for (File logFile : fileList) {
221        // We only aggregate top level files.
222        // Ignore anything inside sub-folders.
223        if (logFile.isDirectory()) {
224          LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
225          continue;
226        }
227
228        FileInputStream in = null;
229        try {
230          in = secureOpenFile(logFile);
231        } catch (IOException e) {
232          logErrorMessage(logFile, e);
233          IOUtils.cleanup(LOG, in);
234          continue;
235        }
236
237        final long fileLength = logFile.length();
238        // Write the logFile Type
239        out.writeUTF(logFile.getName());
240
241        // Write the log length as UTF so that it is printable
242        out.writeUTF(String.valueOf(fileLength));
243
244        // Write the log itself
245        try {
246          byte[] buf = new byte[65535];
247          int len = 0;
248          long bytesLeft = fileLength;
249          while ((len = in.read(buf)) != -1) {
250            //If buffer contents within fileLength, write
251            if (len < bytesLeft) {
252              out.write(buf, 0, len);
253              bytesLeft-=len;
254            }
255            //else only write contents within fileLength, then exit early
256            else {
257              out.write(buf, 0, (int)bytesLeft);
258              break;
259            }
260          }
261          long newLength = logFile.length();
262          if(fileLength < newLength) {
263            LOG.warn("Aggregated logs truncated by approximately "+
264                (newLength-fileLength) +" bytes.");
265          }
266          this.uploadedFiles.add(logFile);
267        } catch (IOException e) {
268          String message = logErrorMessage(logFile, e);
269          out.write(message.getBytes(Charset.forName("UTF-8")));
270        } finally {
271          IOUtils.cleanup(LOG, in);
272        }
273      }
274    }
275
276    @VisibleForTesting
277    public FileInputStream secureOpenFile(File logFile) throws IOException {
278      return SecureIOUtils.openForRead(logFile, getUser(), null);
279    }
280
281    private static String logErrorMessage(File logFile, Exception e) {
282      String message = "Error aggregating log file. Log file : "
283          + logFile.getAbsolutePath() + ". " + e.getMessage();
284      LOG.error(message, e);
285      return message;
286    }
287
288    // Added for testing purpose.
289    public String getUser() {
290      return user;
291    }
292
293    private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
294      Set<File> candidates =
295          new HashSet<File>(Arrays.asList(containerLogDir.listFiles()));
296      for (File logFile : candidates) {
297        this.allExistingFileMeta.add(getLogFileMetaData(logFile));
298      }
299
300      if (this.logAggregationContext != null && candidates.size() > 0) {
301        filterFiles(
302          this.appFinished ? this.logAggregationContext.getIncludePattern()
303              : this.logAggregationContext.getRolledLogsIncludePattern(),
304          candidates, false);
305
306        filterFiles(
307          this.appFinished ? this.logAggregationContext.getExcludePattern()
308              : this.logAggregationContext.getRolledLogsExcludePattern(),
309          candidates, true);
310
311        Iterable<File> mask =
312            Iterables.filter(candidates, new Predicate<File>() {
313              @Override
314              public boolean apply(File next) {
315                return !alreadyUploadedLogFiles
316                  .contains(getLogFileMetaData(next));
317              }
318            });
319        candidates = Sets.newHashSet(mask);
320      }
321      return candidates;
322    }
323
324    private void filterFiles(String pattern, Set<File> candidates,
325        boolean exclusion) {
326      if (pattern != null && !pattern.isEmpty()) {
327        Pattern filterPattern = Pattern.compile(pattern);
328        for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
329          .hasNext();) {
330          File candidate = candidatesItr.next();
331          boolean match = filterPattern.matcher(candidate.getName()).find();
332          if ((!match && !exclusion) || (match && exclusion)) {
333            candidatesItr.remove();
334          }
335        }
336      }
337    }
338
339    public Set<Path> getCurrentUpLoadedFilesPath() {
340      Set<Path> path = new HashSet<Path>();
341      for (File file : this.uploadedFiles) {
342        path.add(new Path(file.getAbsolutePath()));
343      }
344      return path;
345    }
346
347    public Set<String> getCurrentUpLoadedFileMeta() {
348      Set<String> info = new HashSet<String>();
349      for (File file : this.uploadedFiles) {
350        info.add(getLogFileMetaData(file));
351      }
352      return info;
353    }
354
355    public Set<String> getAllExistingFilesMeta() {
356      return this.allExistingFileMeta;
357    }
358
359    private String getLogFileMetaData(File file) {
360      return containerId.toString() + "_" + file.getName() + "_"
361          + file.lastModified();
362    }
363  }
364
365  /**
366   * The writer that writes out the aggregated logs.
367   */
368  @Private
369  public static class LogWriter {
370
371    private final FSDataOutputStream fsDataOStream;
372    private final TFile.Writer writer;
373    private FileContext fc;
374
375    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
376        UserGroupInformation userUgi) throws IOException {
377      try {
378        this.fsDataOStream =
379            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
380              @Override
381              public FSDataOutputStream run() throws Exception {
382                fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
383                fc.setUMask(APP_LOG_FILE_UMASK);
384                return fc.create(
385                    remoteAppLogFile,
386                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
387                    new Options.CreateOpts[] {});
388              }
389            });
390      } catch (InterruptedException e) {
391        throw new IOException(e);
392      }
393
394      // Keys are not sorted: null arg
395      // 256KB minBlockSize : Expected log size for each container too
396      this.writer =
397          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
398              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
399              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
400      //Write the version string
401      writeVersion();
402    }
403
404    @VisibleForTesting
405    public TFile.Writer getWriter() {
406      return this.writer;
407    }
408
409    private void writeVersion() throws IOException {
410      DataOutputStream out = this.writer.prepareAppendKey(-1);
411      VERSION_KEY.write(out);
412      out.close();
413      out = this.writer.prepareAppendValue(-1);
414      out.writeInt(VERSION);
415      out.close();
416    }
417
418    public void writeApplicationOwner(String user) throws IOException {
419      DataOutputStream out = this.writer.prepareAppendKey(-1);
420      APPLICATION_OWNER_KEY.write(out);
421      out.close();
422      out = this.writer.prepareAppendValue(-1);
423      out.writeUTF(user);
424      out.close();
425    }
426
427    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
428        throws IOException {
429      DataOutputStream out = this.writer.prepareAppendKey(-1);
430      APPLICATION_ACL_KEY.write(out);
431      out.close();
432      out = this.writer.prepareAppendValue(-1);
433      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
434        out.writeUTF(entry.getKey().toString());
435        out.writeUTF(entry.getValue());
436      }
437      out.close();
438    }
439
440    public void append(LogKey logKey, LogValue logValue) throws IOException {
441      Set<File> pendingUploadFiles =
442          logValue.getPendingLogFilesToUploadForThisContainer();
443      if (pendingUploadFiles.size() == 0) {
444        return;
445      }
446      DataOutputStream out = this.writer.prepareAppendKey(-1);
447      logKey.write(out);
448      out.close();
449      out = this.writer.prepareAppendValue(-1);
450      logValue.write(out, pendingUploadFiles);
451      out.close();
452    }
453
454    public void close() {
455      try {
456        this.writer.close();
457      } catch (IOException e) {
458        LOG.warn("Exception closing writer", e);
459      }
460      IOUtils.closeStream(fsDataOStream);
461    }
462  }
463
464  @Public
465  @Evolving
466  public static class LogReader {
467
468    private final FSDataInputStream fsDataIStream;
469    private final TFile.Reader.Scanner scanner;
470    private final TFile.Reader reader;
471
472    public LogReader(Configuration conf, Path remoteAppLogFile)
473        throws IOException {
474      FileContext fileContext =
475          FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
476      this.fsDataIStream = fileContext.open(remoteAppLogFile);
477      reader =
478          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
479              remoteAppLogFile).getLen(), conf);
480      this.scanner = reader.createScanner();
481    }
482
483    private boolean atBeginning = true;
484
485    /**
486     * Returns the owner of the application.
487     * 
488     * @return the application owner.
489     * @throws IOException
490     */
491    public String getApplicationOwner() throws IOException {
492      TFile.Reader.Scanner ownerScanner = null;
493      try {
494        ownerScanner = reader.createScanner();
495        LogKey key = new LogKey();
496        while (!ownerScanner.atEnd()) {
497          TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
498          key.readFields(entry.getKeyStream());
499          if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
500            DataInputStream valueStream = entry.getValueStream();
501            return valueStream.readUTF();
502          }
503          ownerScanner.advance();
504        }
505        return null;
506      } finally {
507        IOUtils.cleanup(LOG, ownerScanner);
508      }
509    }
510
511    /**
512     * Returns ACLs for the application. An empty map is returned if no ACLs are
513     * found.
514     * 
515     * @return a map of the Application ACLs.
516     * @throws IOException
517     */
518    public Map<ApplicationAccessType, String> getApplicationAcls()
519        throws IOException {
520      // TODO Seek directly to the key once a comparator is specified.
521      TFile.Reader.Scanner aclScanner = null;
522      try {
523        aclScanner = reader.createScanner();
524        LogKey key = new LogKey();
525        Map<ApplicationAccessType, String> acls =
526            new HashMap<ApplicationAccessType, String>();
527        while (!aclScanner.atEnd()) {
528          TFile.Reader.Scanner.Entry entry = aclScanner.entry();
529          key.readFields(entry.getKeyStream());
530          if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
531            DataInputStream valueStream = entry.getValueStream();
532            while (true) {
533              String appAccessOp = null;
534              String aclString = null;
535              try {
536                appAccessOp = valueStream.readUTF();
537              } catch (EOFException e) {
538                // Valid end of stream.
539                break;
540              }
541              try {
542                aclString = valueStream.readUTF();
543              } catch (EOFException e) {
544                throw new YarnRuntimeException("Error reading ACLs", e);
545              }
546              acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
547            }
548          }
549          aclScanner.advance();
550        }
551        return acls;
552      } finally {
553        IOUtils.cleanup(LOG, aclScanner);
554      }
555    }
556
557    /**
558     * Read the next key and return the value-stream.
559     * 
560     * @param key
561     * @return the valueStream if there are more keys or null otherwise.
562     * @throws IOException
563     */
564    public DataInputStream next(LogKey key) throws IOException {
565      if (!this.atBeginning) {
566        this.scanner.advance();
567      } else {
568        this.atBeginning = false;
569      }
570      if (this.scanner.atEnd()) {
571        return null;
572      }
573      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
574      key.readFields(entry.getKeyStream());
575      // Skip META keys
576      if (RESERVED_KEYS.containsKey(key.toString())) {
577        return next(key);
578      }
579      DataInputStream valueStream = entry.getValueStream();
580      return valueStream;
581    }
582
583    /**
584     * Get a ContainerLogsReader to read the logs for
585     * the specified container.
586     *
587     * @param containerId
588     * @return object to read the container's logs or null if the
589     *         logs could not be found
590     * @throws IOException
591     */
592    @Private
593    public ContainerLogsReader getContainerLogsReader(
594        ContainerId containerId) throws IOException {
595      ContainerLogsReader logReader = null;
596
597      final LogKey containerKey = new LogKey(containerId);
598      LogKey key = new LogKey();
599      DataInputStream valueStream = next(key);
600      while (valueStream != null && !key.equals(containerKey)) {
601        valueStream = next(key);
602      }
603
604      if (valueStream != null) {
605        logReader = new ContainerLogsReader(valueStream);
606      }
607
608      return logReader;
609    }
610
611    //TODO  Change Log format and interfaces to be containerId specific.
612    // Avoid returning completeValueStreams.
613//    public List<String> getTypesForContainer(DataInputStream valueStream){}
614//    
615//    /**
616//     * @param valueStream
617//     *          The Log stream for the container.
618//     * @param fileType
619//     *          the log type required.
620//     * @return An InputStreamReader for the required log type or null if the
621//     *         type is not found.
622//     * @throws IOException
623//     */
624//    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
625//        String fileType) throws IOException {
626//      valueStream.reset();
627//      try {
628//        while (true) {
629//          String ft = valueStream.readUTF();
630//          String fileLengthStr = valueStream.readUTF();
631//          long fileLength = Long.parseLong(fileLengthStr);
632//          if (ft.equals(fileType)) {
633//            BoundedInputStream bis =
634//                new BoundedInputStream(valueStream, fileLength);
635//            return new InputStreamReader(bis);
636//          } else {
637//            long totalSkipped = 0;
638//            long currSkipped = 0;
639//            while (currSkipped != -1 && totalSkipped < fileLength) {
640//              currSkipped = valueStream.skip(fileLength - totalSkipped);
641//              totalSkipped += currSkipped;
642//            }
643//            // TODO Verify skip behaviour.
644//            if (currSkipped == -1) {
645//              return null;
646//            }
647//          }
648//        }
649//      } catch (EOFException e) {
650//        return null;
651//      }
652//    }
653
654    /**
655     * Writes all logs for a single container to the provided writer.
656     * @param valueStream
657     * @param writer
658     * @param logUploadedTime
659     * @throws IOException
660     */
661    public static void readAcontainerLogs(DataInputStream valueStream,
662        Writer writer, long logUploadedTime) throws IOException {
663      OutputStream os = null;
664      PrintStream ps = null;
665      try {
666        os = new WriterOutputStream(writer, Charset.forName("UTF-8"));
667        ps = new PrintStream(os);
668        while (true) {
669          try {
670            readContainerLogs(valueStream, ps, logUploadedTime);
671          } catch (EOFException e) {
672            // EndOfFile
673            return;
674          }
675        }
676      } finally {
677        IOUtils.cleanup(LOG, ps);
678        IOUtils.cleanup(LOG, os);
679      }
680    }
681
682    /**
683     * Writes all logs for a single container to the provided writer.
684     * @param valueStream
685     * @param writer
686     * @throws IOException
687     */
688    public static void readAcontainerLogs(DataInputStream valueStream,
689        Writer writer) throws IOException {
690      readAcontainerLogs(valueStream, writer, -1);
691    }
692
693    private static void readContainerLogs(DataInputStream valueStream,
694        PrintStream out, long logUploadedTime) throws IOException {
695      byte[] buf = new byte[65535];
696
697      String fileType = valueStream.readUTF();
698      String fileLengthStr = valueStream.readUTF();
699      long fileLength = Long.parseLong(fileLengthStr);
700      out.print("LogType:");
701      out.println(fileType);
702      if (logUploadedTime != -1) {
703        out.print("Log Upload Time:");
704        out.println(Times.format(logUploadedTime));
705      }
706      out.print("LogLength:");
707      out.println(fileLengthStr);
708      out.println("Log Contents:");
709
710      long curRead = 0;
711      long pendingRead = fileLength - curRead;
712      int toRead =
713                pendingRead > buf.length ? buf.length : (int) pendingRead;
714      int len = valueStream.read(buf, 0, toRead);
715      while (len != -1 && curRead < fileLength) {
716        out.write(buf, 0, len);
717        curRead += len;
718
719        pendingRead = fileLength - curRead;
720        toRead =
721                  pendingRead > buf.length ? buf.length : (int) pendingRead;
722        len = valueStream.read(buf, 0, toRead);
723      }
724      out.println("End of LogType:" + fileType);
725      out.println("");
726    }
727
728    /**
729     * Keep calling this till you get a {@link EOFException} for getting logs of
730     * all types for a single container.
731     * 
732     * @param valueStream
733     * @param out
734     * @param logUploadedTime
735     * @throws IOException
736     */
737    public static void readAContainerLogsForALogType(
738        DataInputStream valueStream, PrintStream out, long logUploadedTime)
739          throws IOException {
740      readContainerLogs(valueStream, out, logUploadedTime);
741    }
742
743    /**
744     * Keep calling this till you get a {@link EOFException} for getting logs of
745     * all types for a single container.
746     * 
747     * @param valueStream
748     * @param out
749     * @throws IOException
750     */
751    public static void readAContainerLogsForALogType(
752        DataInputStream valueStream, PrintStream out)
753          throws IOException {
754      readAContainerLogsForALogType(valueStream, out, -1);
755    }
756
757    /**
758     * Keep calling this till you get a {@link EOFException} for getting logs of
759     * the specific types for a single container.
760     * @param valueStream
761     * @param out
762     * @param logUploadedTime
763     * @param logType
764     * @throws IOException
765     */
766    public static int readContainerLogsForALogType(
767        DataInputStream valueStream, PrintStream out, long logUploadedTime,
768        List<String> logType) throws IOException {
769      byte[] buf = new byte[65535];
770
771      String fileType = valueStream.readUTF();
772      String fileLengthStr = valueStream.readUTF();
773      long fileLength = Long.parseLong(fileLengthStr);
774      if (logType.contains(fileType)) {
775        out.print("LogType:");
776        out.println(fileType);
777        if (logUploadedTime != -1) {
778          out.print("Log Upload Time:");
779          out.println(Times.format(logUploadedTime));
780        }
781        out.print("LogLength:");
782        out.println(fileLengthStr);
783        out.println("Log Contents:");
784
785        long curRead = 0;
786        long pendingRead = fileLength - curRead;
787        int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
788        int len = valueStream.read(buf, 0, toRead);
789        while (len != -1 && curRead < fileLength) {
790          out.write(buf, 0, len);
791          curRead += len;
792
793          pendingRead = fileLength - curRead;
794          toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
795          len = valueStream.read(buf, 0, toRead);
796        }
797        out.println("End of LogType:" + fileType);
798        out.println("");
799        return 0;
800      } else {
801        long totalSkipped = 0;
802        long currSkipped = 0;
803        while (currSkipped != -1 && totalSkipped < fileLength) {
804          currSkipped = valueStream.skip(fileLength - totalSkipped);
805          totalSkipped += currSkipped;
806        }
807        return -1;
808      }
809    }
810
811    public void close() {
812      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
813    }
814  }
815
816  @Private
817  public static class ContainerLogsReader {
818    private DataInputStream valueStream;
819    private String currentLogType = null;
820    private long currentLogLength = 0;
821    private BoundedInputStream currentLogData = null;
822    private InputStreamReader currentLogISR;
823
824    public ContainerLogsReader(DataInputStream stream) {
825      valueStream = stream;
826    }
827
828    public String nextLog() throws IOException {
829      if (currentLogData != null && currentLogLength > 0) {
830        // seek to the end of the current log, relying on BoundedInputStream
831        // to prevent seeking past the end of the current log
832        do {
833          if (currentLogData.skip(currentLogLength) < 0) {
834            break;
835          }
836        } while (currentLogData.read() != -1);
837      }
838
839      currentLogType = null;
840      currentLogLength = 0;
841      currentLogData = null;
842      currentLogISR = null;
843
844      try {
845        String logType = valueStream.readUTF();
846        String logLengthStr = valueStream.readUTF();
847        currentLogLength = Long.parseLong(logLengthStr);
848        currentLogData =
849            new BoundedInputStream(valueStream, currentLogLength);
850        currentLogData.setPropagateClose(false);
851        currentLogISR = new InputStreamReader(currentLogData,
852            Charset.forName("UTF-8"));
853        currentLogType = logType;
854      } catch (EOFException e) {
855      }
856
857      return currentLogType;
858    }
859
860    public String getCurrentLogType() {
861      return currentLogType;
862    }
863
864    public long getCurrentLogLength() {
865      return currentLogLength;
866    }
867
868    public long skip(long n) throws IOException {
869      return currentLogData.skip(n);
870    }
871
872    public int read() throws IOException {
873      return currentLogData.read();
874    }
875
876    public int read(byte[] buf, int off, int len) throws IOException {
877      return currentLogData.read(buf, off, len);
878    }
879
880    public int read(char[] buf, int off, int len) throws IOException {
881      return currentLogISR.read(buf, off, len);
882    }
883  }
884}