/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.client;

import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSOutputStream;

import com.google.common.base.Preconditions;

/**
 * The Hdfs implementation of {@link FSDataOutputStream}.
 *
 * <p>The stream handed to the constructors is either a
 * {@link DFSOutputStream} or a {@link CryptoOutputStream} that wraps one.
 * The HDFS-specific operations below look through the crypto layer, when
 * present, and act on the {@link DFSOutputStream} underneath.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HdfsDataOutputStream extends FSDataOutputStream {

  /**
   * Creates a stream over a {@link DFSOutputStream}.
   *
   * @param out the DFS stream to wrap
   * @param stats statistics object handed to the
   *          {@link FSDataOutputStream} constructor
   * @param startPosition start position handed to the
   *          {@link FSDataOutputStream} constructor
   * @throws IOException if thrown by the superclass constructor
   */
  public HdfsDataOutputStream(DFSOutputStream out,
      FileSystem.Statistics stats, long startPosition) throws IOException {
    super(out, stats, startPosition);
  }

  /**
   * Creates a stream over a {@link DFSOutputStream} with a start position
   * of zero.
   *
   * @param out the DFS stream to wrap
   * @param stats statistics object handed to the
   *          {@link FSDataOutputStream} constructor
   * @throws IOException if thrown by the delegated constructor
   */
  public HdfsDataOutputStream(DFSOutputStream out,
      FileSystem.Statistics stats) throws IOException {
    this(out, stats, 0L);
  }

  /**
   * Creates a stream over a {@link CryptoOutputStream}, which in turn must
   * wrap a {@link DFSOutputStream}.
   *
   * @param out the crypto stream to wrap
   * @param stats statistics object handed to the
   *          {@link FSDataOutputStream} constructor
   * @param startPosition start position handed to the
   *          {@link FSDataOutputStream} constructor
   * @throws IOException if thrown by the superclass constructor
   * @throws IllegalArgumentException if {@code out} does not wrap a
   *           {@link DFSOutputStream}
   */
  public HdfsDataOutputStream(CryptoOutputStream out,
      FileSystem.Statistics stats, long startPosition) throws IOException {
    super(out, stats, startPosition);
    // The superclass constructor call has to be the first statement, so the
    // argument can only be validated once it has returned.
    final boolean wrapsDfsStream =
        out.getWrappedStream() instanceof DFSOutputStream;
    Preconditions.checkArgument(wrapsDfsStream,
        "CryptoOutputStream should wrap a DFSOutputStream");
  }

  /**
   * Creates a stream over a {@link CryptoOutputStream} with a start
   * position of zero.
   *
   * @param out the crypto stream to wrap; must wrap a
   *          {@link DFSOutputStream}
   * @param stats statistics object handed to the
   *          {@link FSDataOutputStream} constructor
   * @throws IOException if thrown by the delegated constructor
   */
  public HdfsDataOutputStream(CryptoOutputStream out,
      FileSystem.Statistics stats) throws IOException {
    this(out, stats, 0L);
  }

  /**
   * Get the actual number of replicas of the current block.
   *
   * This can be different from the designated replication factor of the file
   * because the namenode does not maintain replication for the blocks which
   * are currently being written to. Depending on the configuration, the
   * client may continue to write to a block even if a few datanodes in the
   * write pipeline have failed, or the client may add new datanodes once a
   * datanode has failed.
   *
   * @return the number of valid replicas of the current block
   * @throws IOException if thrown by the underlying
   *           {@link DFSOutputStream}
   */
  public synchronized int getCurrentBlockReplication() throws IOException {
    final OutputStream outer = getWrappedStream();
    // Look through the crypto layer, if there is one, to reach the DFS stream.
    final OutputStream target = (outer instanceof CryptoOutputStream)
        ? ((CryptoOutputStream) outer).getWrappedStream()
        : outer;
    return ((DFSOutputStream) target).getCurrentBlockReplication();
  }

  /**
   * Sync buffered data to DataNodes (flush to disk devices).
   *
   * <p>When the wrapped stream is a {@link CryptoOutputStream}, it is
   * flushed first and the sync is then issued on the stream it wraps.
   *
   * @param syncFlags
   *          Indicate the detailed semantic and actions of the hsync.
   * @throws IOException
   *           if thrown while flushing the crypto stream or by the
   *           underlying hsync call
   * @see FSDataOutputStream#hsync()
   */
  public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
    final OutputStream outer = getWrappedStream();
    final DFSOutputStream dfsOut;
    if (outer instanceof CryptoOutputStream) {
      final CryptoOutputStream cryptoOut = (CryptoOutputStream) outer;
      // Flush the crypto layer before syncing the stream beneath it.
      cryptoOut.flush();
      dfsOut = (DFSOutputStream) cryptoOut.getWrappedStream();
    } else {
      dfsOut = (DFSOutputStream) outer;
    }
    dfsOut.hsync(syncFlags);
  }

  /**
   * Flags that select additional actions for
   * {@link #hsync(EnumSet)}.
   */
  public enum SyncFlag {

    /**
     * When doing sync to DataNodes, also update the metadata (block length)
     * in the NameNode.
     */
    UPDATE_LENGTH,

    /**
     * Sync the data to DataNode, close the current block, and allocate a new
     * block.
     */
    END_BLOCK;
  }
}