/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.client;

import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSOutputStream;

import com.google.common.base.Preconditions;

/**
 * The Hdfs implementation of {@link FSDataOutputStream}.
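 * <p>
 * A minimal usage sketch (not taken from the original documentation; the
 * configuration and path below are illustrative): client code normally obtains
 * an instance by casting the stream returned by an HDFS {@code FileSystem#create}
 * call rather than by invoking the constructors directly.
 * <pre>{@code
 * FileSystem fs = FileSystem.get(conf); // conf assumed to point at an HDFS URI
 * FSDataOutputStream out = fs.create(new Path("/tmp/example"));
 * if (out instanceof HdfsDataOutputStream) {
 *   HdfsDataOutputStream hdfsOut = (HdfsDataOutputStream) out;
 *   // HDFS-specific operations such as hsync(EnumSet) are available here
 * }
 * }</pre>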
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HdfsDataOutputStream extends FSDataOutputStream {
  public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats,
      long startPosition) throws IOException {
    super(out, stats, startPosition);
  }

  public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats)
      throws IOException {
    this(out, stats, 0L);
  }

  public HdfsDataOutputStream(CryptoOutputStream out, FileSystem.Statistics stats,
      long startPosition) throws IOException {
    super(out, stats, startPosition);
    Preconditions.checkArgument(out.getWrappedStream() instanceof DFSOutputStream,
        "CryptoOutputStream should wrap a DFSOutputStream");
  }

  public HdfsDataOutputStream(CryptoOutputStream out, FileSystem.Statistics stats)
      throws IOException {
    this(out, stats, 0L);
  }

  /**
   * Get the actual number of replicas of the current block.
   *
   * This can differ from the designated replication factor of the file
   * because the NameNode does not maintain replication for blocks that are
   * currently being written. Depending on the configuration, the client may
   * continue writing to a block even after some datanodes in the write
   * pipeline have failed, or it may add new datanodes once a datanode has
   * failed.
   *
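   * <p>
   * A minimal sketch of polling this value during a write (the variable
   * names below are illustrative, not part of the original documentation):
   * <pre>{@code
   * HdfsDataOutputStream out = ...; // an open HDFS output stream
   * out.write(data);
   * int liveReplicas = out.getCurrentBlockReplication();
   * if (liveReplicas < expectedReplication) {
   *   // some datanodes in the pipeline have failed; decide whether to keep writing
   * }
   * }</pre>
   *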
   * @return the number of valid replicas of the current block
   */
  public synchronized int getCurrentBlockReplication() throws IOException {
    OutputStream wrappedStream = getWrappedStream();
    if (wrappedStream instanceof CryptoOutputStream) {
      wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream();
    }
    return ((DFSOutputStream) wrappedStream).getCurrentBlockReplication();
  }

  /**
   * Sync buffered data to DataNodes (flush to disk devices).
   *
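   * <p>
   * A minimal usage sketch (the stream and data variables are illustrative,
   * not part of the original documentation):
   * <pre>{@code
   * HdfsDataOutputStream out = ...; // an open HDFS output stream
   * out.write(data);
   * // persist the data and update the block length on the NameNode
   * out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
   * // or: persist the data, close the current block, and start a new one
   * out.hsync(EnumSet.of(SyncFlag.END_BLOCK));
   * }</pre>
   *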
   * @param syncFlags
   *          Indicates the detailed semantics and actions of the hsync.
   * @throws IOException if an error occurs while syncing
   * @see FSDataOutputStream#hsync()
   */
  public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
    OutputStream wrappedStream = getWrappedStream();
    if (wrappedStream instanceof CryptoOutputStream) {
      ((CryptoOutputStream) wrappedStream).flush();
      wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream();
    }
    ((DFSOutputStream) wrappedStream).hsync(syncFlags);
  }

  public enum SyncFlag {

    /**
     * When syncing to the DataNodes, also update the metadata (block length)
     * on the NameNode.
     */
    UPDATE_LENGTH,

    /**
     * Sync the data to the DataNodes, close the current block, and allocate a
     * new block for subsequent writes.
     */
    END_BLOCK;
  }
}