001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.DataOutputStream;
021import java.io.FilterOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027
028/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
029 */
030@InterfaceAudience.Public
031@InterfaceStability.Stable
032public class FSDataOutputStream extends DataOutputStream
033    implements Syncable, CanSetDropBehind {
034  private final OutputStream wrappedStream;
035
036  private static class PositionCache extends FilterOutputStream {
037    private final FileSystem.Statistics statistics;
038    private long position;
039
040    PositionCache(OutputStream out, FileSystem.Statistics stats, long pos) {
041      super(out);
042      statistics = stats;
043      position = pos;
044    }
045
046    @Override
047    public void write(int b) throws IOException {
048      out.write(b);
049      position++;
050      if (statistics != null) {
051        statistics.incrementBytesWritten(1);
052      }
053    }
054    
055    @Override
056    public void write(byte b[], int off, int len) throws IOException {
057      out.write(b, off, len);
058      position += len;                            // update position
059      if (statistics != null) {
060        statistics.incrementBytesWritten(len);
061      }
062    }
063      
064    long getPos() {
065      return position;                            // return cached position
066    }
067
068    @Override
069    public void close() throws IOException {
070      // ensure close works even if a null reference was passed in
071      if (out != null) {
072        out.close();
073      }
074    }
075  }
076
077  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats) {
078    this(out, stats, 0);
079  }
080
081  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats,
082                            long startPosition) {
083    super(new PositionCache(out, stats, startPosition));
084    wrappedStream = out;
085  }
086  
087  /**
088   * Get the current position in the output stream.
089   *
090   * @return the current position in the output stream
091   */
092  public long getPos() {
093    return ((PositionCache)out).getPos();
094  }
095
096  /**
097   * Close the underlying output stream.
098   */
099  @Override
100  public void close() throws IOException {
101    out.close(); // This invokes PositionCache.close()
102  }
103
104  /**
105   * Get a reference to the wrapped output stream.
106   *
107   * @return the underlying output stream
108   */
109  @InterfaceAudience.LimitedPrivate({"HDFS"})
110  public OutputStream getWrappedStream() {
111    return wrappedStream;
112  }
113
114  @Override  // Syncable
115  public void hflush() throws IOException {
116    if (wrappedStream instanceof Syncable) {
117      ((Syncable)wrappedStream).hflush();
118    } else {
119      wrappedStream.flush();
120    }
121  }
122  
123  @Override  // Syncable
124  public void hsync() throws IOException {
125    if (wrappedStream instanceof Syncable) {
126      ((Syncable)wrappedStream).hsync();
127    } else {
128      wrappedStream.flush();
129    }
130  }
131
132  @Override
133  public void setDropBehind(Boolean dropBehind) throws IOException {
134    try {
135      ((CanSetDropBehind)wrappedStream).setDropBehind(dropBehind);
136    } catch (ClassCastException e) {
137      throw new UnsupportedOperationException("the wrapped stream does " +
138          "not support setting the drop-behind caching setting.");
139    }
140  }
141}