/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.logaggregation;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Writer;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
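
/**
 * On-disk format for aggregated container logs: a single {@link TFile} per
 * application, keyed by {@link LogKey} and written by {@link LogWriter}.
 * Besides one entry per container, the file carries a few reserved entries:
 * the format VERSION, the APPLICATION_OWNER as a UTF string, and the
 * APPLICATION_ACL entry holding alternating UTF pairs of access type and
 * ACL string. Each container entry's value is, for every aggregated log
 * file, the file name, the file length as a UTF string, and the raw log
 * bytes (see {@link LogValue#write}).
 */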

@Public
@Evolving
public class AggregatedLogFormat {

  private static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
  private static final LogKey APPLICATION_ACL_KEY = new LogKey("APPLICATION_ACL");
  private static final LogKey APPLICATION_OWNER_KEY = new LogKey("APPLICATION_OWNER");
  private static final LogKey VERSION_KEY = new LogKey("VERSION");
  private static final Map<String, LogKey> RESERVED_KEYS;
  // Maybe write out the retention policy.
  // Maybe write out a list of containerLogs skipped by the retention policy.
  private static final int VERSION = 1;

  /**
   * Umask for the aggregated log file: 0640 ^ 0777 = 0137, so newly created
   * log files end up readable only by the owner and group (0640).
   */
  private static final FsPermission APP_LOG_FILE_UMASK = FsPermission
      .createImmutable((short) (0640 ^ 0777));


  static {
    RESERVED_KEYS = new HashMap<String, AggregatedLogFormat.LogKey>();
    RESERVED_KEYS.put(APPLICATION_ACL_KEY.toString(), APPLICATION_ACL_KEY);
    RESERVED_KEYS.put(APPLICATION_OWNER_KEY.toString(), APPLICATION_OWNER_KEY);
    RESERVED_KEYS.put(VERSION_KEY.toString(), VERSION_KEY);
  }

  @Public
  public static class LogKey implements Writable {

    private String keyString;

    public LogKey() {

    }

    public LogKey(ContainerId containerId) {
      this.keyString = containerId.toString();
    }

    public LogKey(String keyString) {
      this.keyString = keyString;
    }

    @Override
    public int hashCode() {
      return keyString == null ? 0 : keyString.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
      if (obj instanceof LogKey) {
        LogKey other = (LogKey) obj;
        if (this.keyString == null) {
          return other.keyString == null;
        }
        return this.keyString.equals(other.keyString);
      }
      return false;
    }

    @Private
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(this.keyString);
    }

    @Private
    @Override
    public void readFields(DataInput in) throws IOException {
      this.keyString = in.readUTF();
    }

    @Override
    public String toString() {
      return this.keyString;
    }
  }

  @Private
  public static class LogValue {

    private final List<String> rootLogDirs;
    private final ContainerId containerId;
    private final String user;
    private final LogAggregationContext logAggregationContext;
    private Set<File> uploadedFiles = new HashSet<File>();
    private final Set<String> alreadyUploadedLogFiles;
    private Set<String> allExistingFileMeta = new HashSet<String>();
    private final boolean appFinished;
    private final boolean containerFinished;
    /**
     * The retention context used to determine whether log files are older
     * than the configured retention policy.
     */
    private final LogRetentionContext logRetentionContext;
    /**
     * The set of log files that are older than the retention policy; they
     * will not be uploaded but are ready for deletion.
     */
    private final Set<File> obseleteRetentionLogFiles = new HashSet<File>();

    // TODO Maybe add a version string here. Instead of changing the version of
    // the entire k-v format

    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user) {
      this(rootLogDirs, containerId, user, null, new HashSet<String>(),
          null, true, true);
    }

    public LogValue(List<String> rootLogDirs, ContainerId containerId,
        String user, LogAggregationContext logAggregationContext,
        Set<String> alreadyUploadedLogFiles,
        LogRetentionContext retentionContext, boolean appFinished,
        boolean containerFinished) {
      this.rootLogDirs = new ArrayList<String>(rootLogDirs);
      this.containerId = containerId;
      this.user = user;

      // Ensure logs are processed in lexical order
      Collections.sort(this.rootLogDirs);
      this.logAggregationContext = logAggregationContext;
      this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
      this.appFinished = appFinished;
      this.containerFinished = containerFinished;
      this.logRetentionContext = retentionContext;
    }

    @VisibleForTesting
    public Set<File> getPendingLogFilesToUploadForThisContainer() {
      Set<File> pendingUploadFiles = new HashSet<File>();
      for (String rootLogDir : this.rootLogDirs) {
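        // Container logs live under <rootLogDir>/<appId>/<containerId>.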
        File appLogDir = new File(rootLogDir,
            this.containerId.getApplicationAttemptId().
                getApplicationId().toString());
        File containerLogDir =
            new File(appLogDir, this.containerId.toString());

        if (!containerLogDir.isDirectory()) {
          continue; // ContainerDir may have been deleted by the user.
        }

        pendingUploadFiles
          .addAll(getPendingLogFilesToUpload(containerLogDir));
      }
      return pendingUploadFiles;
    }

    public void write(DataOutputStream out, Set<File> pendingUploadFiles)
        throws IOException {
      List<File> fileList = new ArrayList<File>(pendingUploadFiles);
      Collections.sort(fileList);

      for (File logFile : fileList) {
        // We only aggregate top level files.
        // Ignore anything inside sub-folders.
        if (logFile.isDirectory()) {
          LOG.warn(logFile.getAbsolutePath() + " is a directory. Ignore it.");
          continue;
        }

        FileInputStream in = null;
        try {
          in = secureOpenFile(logFile);
        } catch (IOException e) {
          logErrorMessage(logFile, e);
          IOUtils.cleanup(LOG, in);
          continue;
        }

        final long fileLength = logFile.length();
        // Write the log file name (the log type)
        out.writeUTF(logFile.getName());

        // Write the log length as UTF so that it is printable
        out.writeUTF(String.valueOf(fileLength));

        // Write the log itself
        try {
          byte[] buf = new byte[65535];
          int len = 0;
          long bytesLeft = fileLength;
          while ((len = in.read(buf)) != -1) {
            // If the buffer contents fit within the remaining file length,
            // write them all
            if (len < bytesLeft) {
              out.write(buf, 0, len);
              bytesLeft -= len;
            }
            // else write only the bytes within fileLength, then exit early
            else {
              out.write(buf, 0, (int) bytesLeft);
              break;
            }
          }
          long newLength = logFile.length();
          if (fileLength < newLength) {
            LOG.warn("Aggregated logs truncated by approximately "
                + (newLength - fileLength) + " bytes.");
          }
          this.uploadedFiles.add(logFile);
        } catch (IOException e) {
          String message = logErrorMessage(logFile, e);
          out.write(message.getBytes(Charset.forName("UTF-8")));
        } finally {
          IOUtils.cleanup(LOG, in);
        }
      }
    }

    @VisibleForTesting
    public FileInputStream secureOpenFile(File logFile) throws IOException {
      return SecureIOUtils.openForRead(logFile, getUser(), null);
    }

    private static String logErrorMessage(File logFile, Exception e) {
      String message = "Error aggregating log file. Log file : "
          + logFile.getAbsolutePath() + ". " + e.getMessage();
      LOG.error(message, e);
      return message;
    }

    // Added for testing purposes.
    public String getUser() {
      return user;
    }

    private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
      Set<File> candidates =
          new HashSet<File>(Arrays.asList(containerLogDir.listFiles()));
      for (File logFile : candidates) {
        this.allExistingFileMeta.add(getLogFileMetaData(logFile));
      }

      // If the log files are older than the retention policy, do not upload
      // them, but schedule them for deletion.
      if (logRetentionContext != null
          && !logRetentionContext.shouldRetainLog()) {
        obseleteRetentionLogFiles.addAll(candidates);
        candidates.clear();
        return candidates;
      }

      Set<File> fileCandidates = new HashSet<File>(candidates);
      if (this.logAggregationContext != null && candidates.size() > 0) {
        fileCandidates = getFileCandidates(fileCandidates, this.appFinished);
        if (!this.appFinished && this.containerFinished) {
          Set<File> addition = new HashSet<File>(candidates);
          addition = getFileCandidates(addition, true);
          fileCandidates.addAll(addition);
        }
      }

      return fileCandidates;
    }

    private Set<File> getFileCandidates(Set<File> candidates,
        boolean useRegularPattern) {
      filterFiles(
          useRegularPattern ? this.logAggregationContext.getIncludePattern()
              : this.logAggregationContext.getRolledLogsIncludePattern(),
          candidates, false);

      filterFiles(
          useRegularPattern ? this.logAggregationContext.getExcludePattern()
              : this.logAggregationContext.getRolledLogsExcludePattern(),
          candidates, true);

      Iterable<File> mask =
          Iterables.filter(candidates, new Predicate<File>() {
            @Override
            public boolean apply(File next) {
              return !alreadyUploadedLogFiles
                  .contains(getLogFileMetaData(next));
            }
          });
      return Sets.newHashSet(mask);
    }

    private void filterFiles(String pattern, Set<File> candidates,
        boolean exclusion) {
      if (pattern != null && !pattern.isEmpty()) {
        Pattern filterPattern = Pattern.compile(pattern);
        for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
          .hasNext();) {
          File candidate = candidatesItr.next();
          boolean match = filterPattern.matcher(candidate.getName()).find();
          if ((!match && !exclusion) || (match && exclusion)) {
            candidatesItr.remove();
          }
        }
      }
    }

    public Set<Path> getCurrentUpLoadedFilesPath() {
      Set<Path> path = new HashSet<Path>();
      for (File file : this.uploadedFiles) {
        path.add(new Path(file.getAbsolutePath()));
      }
      return path;
    }

    public Set<String> getCurrentUpLoadedFileMeta() {
      Set<String> info = new HashSet<String>();
      for (File file : this.uploadedFiles) {
        info.add(getLogFileMetaData(file));
      }
      return info;
    }

    public Set<Path> getObseleteRetentionLogFiles() {
      Set<Path> path = new HashSet<Path>();
      for (File file : this.obseleteRetentionLogFiles) {
        path.add(new Path(file.getAbsolutePath()));
      }
      return path;
    }

    public Set<String> getAllExistingFilesMeta() {
      return this.allExistingFileMeta;
    }

    private String getLogFileMetaData(File file) {
      return containerId.toString() + "_" + file.getName() + "_"
          + file.lastModified();
    }
  }

  /**
   * A context for log retention to determine if files are older than
   * the retention policy configured in YarnConfiguration.
   */
  public static class LogRetentionContext {
    /**
     * The base time, used together with logRetentionMillis, to determine
     * the age of log files and whether they need to be uploaded.
     */
    private final long logInitedTimeMillis;
    /**
     * The number of milliseconds since a log file was created, used to
     * determine whether we should upload it. -1 if disabled.
     * See YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS for details.
     */
    private final long logRetentionMillis;

    public LogRetentionContext(long logInitedTimeMillis, long
        logRetentionMillis) {
      this.logInitedTimeMillis = logInitedTimeMillis;
      this.logRetentionMillis = logRetentionMillis;
    }

    public boolean isDisabled() {
      return logInitedTimeMillis < 0 || logRetentionMillis < 0;
    }
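
    // For example, with logRetentionMillis set to seven days,
    // shouldRetainLog() keeps returning true until seven days have elapsed
    // since logInitedTimeMillis.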

    public boolean shouldRetainLog() {
      return isDisabled() ||
          System.currentTimeMillis() - logInitedTimeMillis < logRetentionMillis;
    }
  }

  /**
   * The writer that writes out the aggregated logs.
   */
  @Private
  public static class LogWriter {

    private final FSDataOutputStream fsDataOStream;
    private final TFile.Writer writer;
    private FileContext fc;

    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
        UserGroupInformation userUgi) throws IOException {
      try {
        this.fsDataOStream =
            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
              @Override
              public FSDataOutputStream run() throws Exception {
                fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
                fc.setUMask(APP_LOG_FILE_UMASK);
                return fc.create(
                    remoteAppLogFile,
                    EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                    new Options.CreateOpts[] {});
              }
            });
      } catch (InterruptedException e) {
        throw new IOException(e);
      }

      // Keys are not sorted: pass null as the comparator argument.
      // 256KB minBlockSize: roughly the expected log size for each container.
      this.writer =
          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
              YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
              YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
      // Write the version string
      writeVersion();
    }

    @VisibleForTesting
    public TFile.Writer getWriter() {
      return this.writer;
    }

    private void writeVersion() throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      VERSION_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeInt(VERSION);
      out.close();
    }

    public void writeApplicationOwner(String user) throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_OWNER_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      out.writeUTF(user);
      out.close();
    }

    public void writeApplicationACLs(Map<ApplicationAccessType, String> appAcls)
        throws IOException {
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      APPLICATION_ACL_KEY.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      for (Entry<ApplicationAccessType, String> entry : appAcls.entrySet()) {
        out.writeUTF(entry.getKey().toString());
        out.writeUTF(entry.getValue());
      }
      out.close();
    }

    public void append(LogKey logKey, LogValue logValue) throws IOException {
      Set<File> pendingUploadFiles =
          logValue.getPendingLogFilesToUploadForThisContainer();
      if (pendingUploadFiles.size() == 0) {
        return;
      }
      DataOutputStream out = this.writer.prepareAppendKey(-1);
      logKey.write(out);
      out.close();
      out = this.writer.prepareAppendValue(-1);
      logValue.write(out, pendingUploadFiles);
      out.close();
    }

    public void close() {
      try {
        this.writer.close();
      } catch (IOException e) {
        LOG.warn("Exception closing writer", e);
      }
      IOUtils.closeStream(fsDataOStream);
    }
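
    // A minimal usage sketch (illustrative only; "conf", "remoteAppLogFile",
    // "ugi", "acls", "containerId" and "logValue" are assumed to be provided
    // by the caller):
    //
    //   LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
    //   try {
    //     logWriter.writeApplicationOwner(ugi.getShortUserName());
    //     logWriter.writeApplicationACLs(acls);
    //     logWriter.append(new LogKey(containerId), logValue);
    //   } finally {
    //     logWriter.close();
    //   }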
  }

  @Public
  @Evolving
  public static class LogReader {

    private final FSDataInputStream fsDataIStream;
    private final TFile.Reader.Scanner scanner;
    private final TFile.Reader reader;

    public LogReader(Configuration conf, Path remoteAppLogFile)
        throws IOException {
      FileContext fileContext =
          FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
      this.fsDataIStream = fileContext.open(remoteAppLogFile);
      reader =
          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
              remoteAppLogFile).getLen(), conf);
      this.scanner = reader.createScanner();
    }
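
    // A minimal usage sketch (illustrative only; "conf" and "remoteAppLogFile"
    // are assumed to be provided by the caller):
    //
    //   LogReader logReader = new LogReader(conf, remoteAppLogFile);
    //   try {
    //     LogKey key = new LogKey();
    //     DataInputStream valueStream = logReader.next(key);
    //     while (valueStream != null) {
    //       // each value stream holds the concatenated logs of one container;
    //       // read until an EOFException signals the end of that stream
    //       try {
    //         while (true) {
    //           readAContainerLogsForALogType(valueStream, System.out);
    //         }
    //       } catch (EOFException e) {
    //         // done with this container
    //       }
    //       valueStream = logReader.next(key);
    //     }
    //   } finally {
    //     logReader.close();
    //   }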

    private boolean atBeginning = true;

    /**
     * Returns the owner of the application.
     *
     * @return the application owner.
     * @throws IOException
     */
    public String getApplicationOwner() throws IOException {
      TFile.Reader.Scanner ownerScanner = null;
      try {
        ownerScanner = reader.createScanner();
        LogKey key = new LogKey();
        while (!ownerScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            return valueStream.readUTF();
          }
          ownerScanner.advance();
        }
        return null;
      } finally {
        IOUtils.cleanup(LOG, ownerScanner);
      }
    }

    /**
     * Returns ACLs for the application. An empty map is returned if no ACLs are
     * found.
     *
     * @return a map of the Application ACLs.
     * @throws IOException
     */
    public Map<ApplicationAccessType, String> getApplicationAcls()
        throws IOException {
      // TODO Seek directly to the key once a comparator is specified.
      TFile.Reader.Scanner aclScanner = null;
      try {
        aclScanner = reader.createScanner();
        LogKey key = new LogKey();
        Map<ApplicationAccessType, String> acls =
            new HashMap<ApplicationAccessType, String>();
        while (!aclScanner.atEnd()) {
          TFile.Reader.Scanner.Entry entry = aclScanner.entry();
          key.readFields(entry.getKeyStream());
          if (key.toString().equals(APPLICATION_ACL_KEY.toString())) {
            DataInputStream valueStream = entry.getValueStream();
            while (true) {
              String appAccessOp = null;
              String aclString = null;
              try {
                appAccessOp = valueStream.readUTF();
              } catch (EOFException e) {
                // Valid end of stream.
                break;
              }
              try {
                aclString = valueStream.readUTF();
              } catch (EOFException e) {
                throw new YarnRuntimeException("Error reading ACLs", e);
              }
              acls.put(ApplicationAccessType.valueOf(appAccessOp), aclString);
            }
          }
          aclScanner.advance();
        }
        return acls;
      } finally {
        IOUtils.cleanup(LOG, aclScanner);
      }
    }

    /**
     * Read the next key and return the value-stream.
     *
     * @param key
     * @return the valueStream if there are more keys or null otherwise.
     * @throws IOException
     */
    public DataInputStream next(LogKey key) throws IOException {
      if (!this.atBeginning) {
        this.scanner.advance();
      } else {
        this.atBeginning = false;
      }
      if (this.scanner.atEnd()) {
        return null;
      }
      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
      key.readFields(entry.getKeyStream());
      // Skip META keys
      if (RESERVED_KEYS.containsKey(key.toString())) {
        return next(key);
      }
      DataInputStream valueStream = entry.getValueStream();
      return valueStream;
    }

    /**
     * Get a ContainerLogsReader to read the logs for
     * the specified container.
     *
     * @param containerId
     * @return object to read the container's logs or null if the
     *         logs could not be found
     * @throws IOException
     */
    @Private
    public ContainerLogsReader getContainerLogsReader(
        ContainerId containerId) throws IOException {
      ContainerLogsReader logReader = null;

      final LogKey containerKey = new LogKey(containerId);
      LogKey key = new LogKey();
      DataInputStream valueStream = next(key);
      while (valueStream != null && !key.equals(containerKey)) {
        valueStream = next(key);
      }

      if (valueStream != null) {
        logReader = new ContainerLogsReader(valueStream);
      }

      return logReader;
    }

    //TODO  Change Log format and interfaces to be containerId specific.
    // Avoid returning completeValueStreams.
//    public List<String> getTypesForContainer(DataInputStream valueStream){}
//
//    /**
//     * @param valueStream
//     *          The Log stream for the container.
//     * @param fileType
//     *          the log type required.
//     * @return An InputStreamReader for the required log type or null if the
//     *         type is not found.
//     * @throws IOException
//     */
//    public InputStreamReader getLogStreamForType(DataInputStream valueStream,
//        String fileType) throws IOException {
//      valueStream.reset();
//      try {
//        while (true) {
//          String ft = valueStream.readUTF();
//          String fileLengthStr = valueStream.readUTF();
//          long fileLength = Long.parseLong(fileLengthStr);
//          if (ft.equals(fileType)) {
//            BoundedInputStream bis =
//                new BoundedInputStream(valueStream, fileLength);
//            return new InputStreamReader(bis);
//          } else {
//            long totalSkipped = 0;
//            long currSkipped = 0;
//            while (currSkipped != -1 && totalSkipped < fileLength) {
//              currSkipped = valueStream.skip(fileLength - totalSkipped);
//              totalSkipped += currSkipped;
//            }
//            // TODO Verify skip behaviour.
//            if (currSkipped == -1) {
//              return null;
//            }
//          }
//        }
//      } catch (EOFException e) {
//        return null;
//      }
//    }

    /**
     * Writes all logs for a single container to the provided writer.
     * @param valueStream
     * @param writer
     * @param logUploadedTime
     * @throws IOException
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer, long logUploadedTime) throws IOException {
      OutputStream os = null;
      PrintStream ps = null;
      try {
        os = new WriterOutputStream(writer, Charset.forName("UTF-8"));
        ps = new PrintStream(os);
        while (true) {
          try {
            readContainerLogs(valueStream, ps, logUploadedTime, Long.MAX_VALUE);
          } catch (EOFException e) {
            // EndOfFile
            return;
          }
        }
      } finally {
        IOUtils.cleanup(LOG, ps);
        IOUtils.cleanup(LOG, os);
      }
    }

    /**
     * Writes all logs for a single container to the provided writer.
     * @param valueStream
     * @param writer
     * @throws IOException
     */
    public static void readAcontainerLogs(DataInputStream valueStream,
        Writer writer) throws IOException {
      readAcontainerLogs(valueStream, writer, -1);
    }

    private static void readContainerLogs(DataInputStream valueStream,
        PrintStream out, long logUploadedTime, long bytes)
        throws IOException {
      byte[] buf = new byte[65535];

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      out.print("LogType:");
      out.println(fileType);
      if (logUploadedTime != -1) {
        out.print("Log Upload Time:");
        out.println(Times.format(logUploadedTime));
      }
      out.print("LogLength:");
      out.println(fileLengthStr);
      out.println("Log Contents:");

      long toSkip = 0;
      long totalBytesToRead = fileLength;
      long skipAfterRead = 0;
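      // "bytes" bounds how much of this log is printed: a negative value
      // selects the last |bytes| bytes (the head of the log is skipped),
      // while a non-negative value selects the first "bytes" bytes (the
      // remainder is skipped after reading).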
      if (bytes < 0) {
        long absBytes = Math.abs(bytes);
        if (absBytes < fileLength) {
          toSkip = fileLength - absBytes;
          totalBytesToRead = absBytes;
        }
        org.apache.hadoop.io.IOUtils.skipFully(
            valueStream, toSkip);
      } else {
        if (bytes < fileLength) {
          totalBytesToRead = bytes;
          skipAfterRead = fileLength - bytes;
        }
      }

      long curRead = 0;
      long pendingRead = totalBytesToRead - curRead;
      int toRead =
          pendingRead > buf.length ? buf.length : (int) pendingRead;
      int len = valueStream.read(buf, 0, toRead);
      while (len != -1 && curRead < totalBytesToRead) {
        out.write(buf, 0, len);
        curRead += len;

        pendingRead = totalBytesToRead - curRead;
        toRead =
            pendingRead > buf.length ? buf.length : (int) pendingRead;
        len = valueStream.read(buf, 0, toRead);
      }
      org.apache.hadoop.io.IOUtils.skipFully(
          valueStream, skipAfterRead);
      out.println("\nEnd of LogType:" + fileType);
      out.println("");
    }

    /**
     * Keep calling this until you get an {@link EOFException}, to read the
     * logs of all types for a single container.
     *
     * @param valueStream
     * @param out
     * @param logUploadedTime
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime)
          throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime, Long.MAX_VALUE);
    }

    /**
     * Keep calling this until you get an {@link EOFException}, to read the
     * logs of all types for a single container, limited to the specified
     * number of bytes.
     *
     * @param valueStream
     * @param out
     * @param logUploadedTime
     * @param bytes
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        long bytes) throws IOException {
      readContainerLogs(valueStream, out, logUploadedTime, bytes);
    }

    /**
     * Keep calling this until you get an {@link EOFException}, to read the
     * logs of all types for a single container.
     *
     * @param valueStream
     * @param out
     * @throws IOException
     */
    public static void readAContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out)
          throws IOException {
      readAContainerLogsForALogType(valueStream, out, -1);
    }

    /**
     * Keep calling this until you get an {@link EOFException}, to read the
     * logs of the specified types for a single container.
     * @param valueStream
     * @param out
     * @param logUploadedTime
     * @param logType
     * @return 0 if a requested log type was found and printed, -1 otherwise.
     * @throws IOException
     */
    public static int readContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        List<String> logType) throws IOException {
      return readContainerLogsForALogType(valueStream, out, logUploadedTime,
          logType, Long.MAX_VALUE);
    }

    /**
     * Keep calling this until you get an {@link EOFException}, to read the
     * logs of the specified types for a single container, limited to the
     * specified number of bytes.
     * @param valueStream
     * @param out
     * @param logUploadedTime
     * @param logType
     * @param bytes
     * @return 0 if a requested log type was found and printed, -1 otherwise.
     * @throws IOException
     */
    public static int readContainerLogsForALogType(
        DataInputStream valueStream, PrintStream out, long logUploadedTime,
        List<String> logType, long bytes) throws IOException {
      byte[] buf = new byte[65535];

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      if (logType.contains(fileType)) {
        out.print("LogType:");
        out.println(fileType);
        if (logUploadedTime != -1) {
          out.print("Log Upload Time:");
          out.println(Times.format(logUploadedTime));
        }
        out.print("LogLength:");
        out.println(fileLengthStr);
        out.println("Log Contents:");

        long toSkip = 0;
        long totalBytesToRead = fileLength;
        long skipAfterRead = 0;
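        // Same windowing semantics as readContainerLogs: a negative "bytes"
        // selects the tail of the log, a non-negative value selects the head.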
        if (bytes < 0) {
          long absBytes = Math.abs(bytes);
          if (absBytes < fileLength) {
            toSkip = fileLength - absBytes;
            totalBytesToRead = absBytes;
          }
          org.apache.hadoop.io.IOUtils.skipFully(
              valueStream, toSkip);
        } else {
          if (bytes < fileLength) {
            totalBytesToRead = bytes;
            skipAfterRead = fileLength - bytes;
          }
        }

        long curRead = 0;
        long pendingRead = totalBytesToRead - curRead;
        int toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
        int len = valueStream.read(buf, 0, toRead);
        while (len != -1 && curRead < totalBytesToRead) {
          out.write(buf, 0, len);
          curRead += len;

          pendingRead = totalBytesToRead - curRead;
          toRead = pendingRead > buf.length ? buf.length : (int) pendingRead;
          len = valueStream.read(buf, 0, toRead);
        }
        org.apache.hadoop.io.IOUtils.skipFully(
            valueStream, skipAfterRead);
        out.println("\nEnd of LogType:" + fileType);
        out.println("");
        return 0;
      } else {
        long totalSkipped = 0;
        long currSkipped = 0;
        while (currSkipped != -1 && totalSkipped < fileLength) {
          currSkipped = valueStream.skip(fileLength - totalSkipped);
          totalSkipped += currSkipped;
        }
        return -1;
      }
    }

    @Private
    public static String readContainerMetaDataAndSkipData(
        DataInputStream valueStream, PrintStream out) throws IOException {

      String fileType = valueStream.readUTF();
      String fileLengthStr = valueStream.readUTF();
      long fileLength = Long.parseLong(fileLengthStr);
      if (out != null) {
        out.print("LogType:");
        out.println(fileType);
        out.print("LogLength:");
        out.println(fileLengthStr);
      }
      long totalSkipped = 0;
      long currSkipped = 0;
      while (currSkipped != -1 && totalSkipped < fileLength) {
        currSkipped = valueStream.skip(fileLength - totalSkipped);
        totalSkipped += currSkipped;
      }
      return fileType;
    }

    public void close() {
      IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
    }
  }

  @Private
  public static class ContainerLogsReader {
    private DataInputStream valueStream;
    private String currentLogType = null;
    private long currentLogLength = 0;
    private BoundedInputStream currentLogData = null;
    private InputStreamReader currentLogISR;

    public ContainerLogsReader(DataInputStream stream) {
      valueStream = stream;
    }
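
    // A minimal usage sketch (illustrative only; "logReader" would typically
    // come from LogReader#getContainerLogsReader and "out" is any Writer):
    //
    //   String logType = logReader.nextLog();
    //   while (logType != null) {
    //     char[] cbuf = new char[4096];
    //     int n;
    //     while ((n = logReader.read(cbuf, 0, cbuf.length)) != -1) {
    //       out.write(cbuf, 0, n);
    //     }
    //     logType = logReader.nextLog();
    //   }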

    public String nextLog() throws IOException {
      if (currentLogData != null && currentLogLength > 0) {
        // seek to the end of the current log, relying on BoundedInputStream
        // to prevent seeking past the end of the current log
        do {
          if (currentLogData.skip(currentLogLength) < 0) {
            break;
          }
        } while (currentLogData.read() != -1);
      }

      currentLogType = null;
      currentLogLength = 0;
      currentLogData = null;
      currentLogISR = null;

      try {
        String logType = valueStream.readUTF();
        String logLengthStr = valueStream.readUTF();
        currentLogLength = Long.parseLong(logLengthStr);
        currentLogData =
            new BoundedInputStream(valueStream, currentLogLength);
        currentLogData.setPropagateClose(false);
        currentLogISR = new InputStreamReader(currentLogData,
            Charset.forName("UTF-8"));
        currentLogType = logType;
      } catch (EOFException e) {
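        // Valid end of stream: no more log types for this container.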
      }

      return currentLogType;
    }

    public String getCurrentLogType() {
      return currentLogType;
    }

    public long getCurrentLogLength() {
      return currentLogLength;
    }

    public long skip(long n) throws IOException {
      return currentLogData.skip(n);
    }

    public int read() throws IOException {
      return currentLogData.read();
    }

    public int read(byte[] buf, int off, int len) throws IOException {
      return currentLogData.read(buf, off, len);
    }

    public int read(char[] buf, int off, int len) throws IOException {
      return currentLogISR.read(buf, off, len);
    }
  }
}