/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.client;

import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSOutputStream;

import com.google.common.base.Preconditions;

/**
 * The Hdfs implementation of {@link FSDataOutputStream}.
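 *
 * <p>A minimal usage sketch (illustrative only; {@code fs} and {@code path}
 * are assumed to refer to an HDFS-backed {@link FileSystem} and a path on
 * it, and are not defined by this class):
 * <pre>{@code
 * FSDataOutputStream os = fs.create(path);
 * if (os instanceof HdfsDataOutputStream) {
 *   HdfsDataOutputStream hos = (HdfsDataOutputStream) os;
 *   // HDFS-specific operations such as getCurrentBlockReplication()
 *   // and hsync(EnumSet<SyncFlag>) are available on hos.
 * }
 * }</pre>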
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HdfsDataOutputStream extends FSDataOutputStream {
  public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats,
      long startPosition) throws IOException {
    super(out, stats, startPosition);
  }

  public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats)
      throws IOException {
    this(out, stats, 0L);
  }

  public HdfsDataOutputStream(CryptoOutputStream out,
      FileSystem.Statistics stats, long startPosition) throws IOException {
    super(out, stats, startPosition);
    Preconditions.checkArgument(
        out.getWrappedStream() instanceof DFSOutputStream,
        "CryptoOutputStream should wrap a DFSOutputStream");
  }

  public HdfsDataOutputStream(CryptoOutputStream out,
      FileSystem.Statistics stats) throws IOException {
    this(out, stats, 0L);
  }

  /**
   * Get the actual number of replicas of the current block.
   *
   * This can differ from the designated replication factor of the file
   * because the namenode does not maintain replication for blocks that are
   * currently being written to. Depending on the configuration, the client may
   * continue writing to a block even if a few datanodes in the write pipeline
   * have failed, or the client may add a new datanode once a datanode has
   * failed.
   *
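   * <p>A minimal usage sketch (illustrative only; {@code out} is assumed to
   * be an open {@code HdfsDataOutputStream} and is not part of this method):
   * <pre>{@code
   * HdfsDataOutputStream out = ...;
   * int liveReplicas = out.getCurrentBlockReplication();
   * }</pre>
   *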
   * @return the number of valid replicas of the current block
   */
  public synchronized int getCurrentBlockReplication() throws IOException {
    OutputStream wrappedStream = getWrappedStream();
    if (wrappedStream instanceof CryptoOutputStream) {
      wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream();
    }
    return ((DFSOutputStream) wrappedStream).getCurrentBlockReplication();
  }

  /**
   * Sync buffered data to DataNodes (flush to disk devices).
   *
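   * <p>A minimal usage sketch (illustrative only; {@code out} is assumed to
   * be an open {@code HdfsDataOutputStream} and is not part of this method):
   * <pre>{@code
   * HdfsDataOutputStream out = ...;
   * // Persist the buffered data and also update the block length
   * // recorded in the NameNode.
   * out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
   * }</pre>
   *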
   * @param syncFlags
   *          Indicate the detailed semantics and actions of the hsync.
   * @throws IOException
   * @see FSDataOutputStream#hsync()
   */
  public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
    OutputStream wrappedStream = getWrappedStream();
    if (wrappedStream instanceof CryptoOutputStream) {
      wrappedStream.flush();
      wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream();
    }
    ((DFSOutputStream) wrappedStream).hsync(syncFlags);
  }

  public enum SyncFlag {

    /**
     * When doing sync to DataNodes, also update the metadata (block length) in
     * the NameNode.
     */
    UPDATE_LENGTH,

    /**
     * Sync the data to the DataNodes, close the current block, and allocate
     * a new block.
     */
    END_BLOCK
  }
}