001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.fs; 019 020import java.io.DataOutputStream; 021import java.io.FilterOutputStream; 022import java.io.IOException; 023import java.io.OutputStream; 024 025import org.apache.hadoop.classification.InterfaceAudience; 026import org.apache.hadoop.classification.InterfaceStability; 027 028/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}. 029 */ 030@InterfaceAudience.Public 031@InterfaceStability.Stable 032public class FSDataOutputStream extends DataOutputStream 033 implements Syncable, CanSetDropBehind { 034 private final OutputStream wrappedStream; 035 036 private static class PositionCache extends FilterOutputStream { 037 private final FileSystem.Statistics statistics; 038 private long position; 039 040 PositionCache(OutputStream out, FileSystem.Statistics stats, long pos) { 041 super(out); 042 statistics = stats; 043 position = pos; 044 } 045 046 @Override 047 public void write(int b) throws IOException { 048 out.write(b); 049 position++; 050 if (statistics != null) { 051 statistics.incrementBytesWritten(1); 052 } 053 } 054 055 @Override 056 public void write(byte b[], int off, int len) throws IOException { 057 out.write(b, off, len); 058 position += len; // update position 059 if (statistics != null) { 060 statistics.incrementBytesWritten(len); 061 } 062 } 063 064 long getPos() { 065 return position; // return cached position 066 } 067 068 @Override 069 public void close() throws IOException { 070 // ensure close works even if a null reference was passed in 071 if (out != null) { 072 out.close(); 073 } 074 } 075 } 076 077 public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats) { 078 this(out, stats, 0); 079 } 080 081 public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats, 082 long startPosition) { 083 super(new PositionCache(out, stats, startPosition)); 084 wrappedStream = out; 085 } 086 087 /** 088 * Get the current position in the output stream. 089 * 090 * @return the current position in the output stream 091 */ 092 public long getPos() { 093 return ((PositionCache)out).getPos(); 094 } 095 096 /** 097 * Close the underlying output stream. 098 */ 099 @Override 100 public void close() throws IOException { 101 out.close(); // This invokes PositionCache.close() 102 } 103 104 /** 105 * Get a reference to the wrapped output stream. 106 * 107 * @return the underlying output stream 108 */ 109 @InterfaceAudience.LimitedPrivate({"HDFS"}) 110 public OutputStream getWrappedStream() { 111 return wrappedStream; 112 } 113 114 @Override // Syncable 115 public void hflush() throws IOException { 116 if (wrappedStream instanceof Syncable) { 117 ((Syncable)wrappedStream).hflush(); 118 } else { 119 wrappedStream.flush(); 120 } 121 } 122 123 @Override // Syncable 124 public void hsync() throws IOException { 125 if (wrappedStream instanceof Syncable) { 126 ((Syncable)wrappedStream).hsync(); 127 } else { 128 wrappedStream.flush(); 129 } 130 } 131 132 @Override 133 public void setDropBehind(Boolean dropBehind) throws IOException { 134 try { 135 ((CanSetDropBehind)wrappedStream).setDropBehind(dropBehind); 136 } catch (ClassCastException e) { 137 throw new UnsupportedOperationException("the wrapped stream does " + 138 "not support setting the drop-behind caching setting."); 139 } 140 } 141}