001/**
002 * 
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.fs;
020
021import java.io.*;
022import java.nio.ByteBuffer;
023import java.util.EnumSet;
024
025import org.apache.hadoop.classification.InterfaceAudience;
026import org.apache.hadoop.classification.InterfaceStability;
027import org.apache.hadoop.io.ByteBufferPool;
028import org.apache.hadoop.fs.ByteBufferUtil;
029import org.apache.hadoop.util.IdentityHashStore;
030
031/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
032 * and buffers input through a {@link BufferedInputStream}. */
033@InterfaceAudience.Public
034@InterfaceStability.Stable
035public class FSDataInputStream extends DataInputStream
036    implements Seekable, PositionedReadable, 
037      ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
038      HasEnhancedByteBufferAccess, CanUnbuffer {
039  /**
040   * Map ByteBuffers that we have handed out to readers to ByteBufferPool 
041   * objects
042   */
043  private final IdentityHashStore<ByteBuffer, ByteBufferPool>
044    extendedReadBuffers
045      = new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
046
047  public FSDataInputStream(InputStream in) {
048    super(in);
049    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
050      throw new IllegalArgumentException(
051          "In is not an instance of Seekable or PositionedReadable");
052    }
053  }
054  
055  /**
056   * Seek to the given offset.
057   *
058   * @param desired offset to seek to
059   */
060  @Override
061  public void seek(long desired) throws IOException {
062    ((Seekable)in).seek(desired);
063  }
064
065  /**
066   * Get the current position in the input stream.
067   *
068   * @return current position in the input stream
069   */
070  @Override
071  public long getPos() throws IOException {
072    return ((Seekable)in).getPos();
073  }
074  
075  /**
076   * Read bytes from the given position in the stream to the given buffer.
077   *
078   * @param position  position in the input stream to seek
079   * @param buffer    buffer into which data is read
080   * @param offset    offset into the buffer in which data is written
081   * @param length    maximum number of bytes to read
082   * @return total number of bytes read into the buffer, or <code>-1</code>
083   *         if there is no more data because the end of the stream has been
084   *         reached
085   */
086  @Override
087  public int read(long position, byte[] buffer, int offset, int length)
088    throws IOException {
089    return ((PositionedReadable)in).read(position, buffer, offset, length);
090  }
091
092  /**
093   * Read bytes from the given position in the stream to the given buffer.
094   * Continues to read until <code>length</code> bytes have been read.
095   *
096   * @param position  position in the input stream to seek
097   * @param buffer    buffer into which data is read
098   * @param offset    offset into the buffer in which data is written
099   * @param length    the number of bytes to read
100   * @throws EOFException If the end of stream is reached while reading.
101   *                      If an exception is thrown an undetermined number
102   *                      of bytes in the buffer may have been written. 
103   */
104  @Override
105  public void readFully(long position, byte[] buffer, int offset, int length)
106    throws IOException {
107    ((PositionedReadable)in).readFully(position, buffer, offset, length);
108  }
109  
110  /**
111   * See {@link #readFully(long, byte[], int, int)}.
112   */
113  @Override
114  public void readFully(long position, byte[] buffer)
115    throws IOException {
116    ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
117  }
118  
119  /**
120   * Seek to the given position on an alternate copy of the data.
121   *
122   * @param  targetPos  position to seek to
123   * @return true if a new source is found, false otherwise
124   */
125  @Override
126  public boolean seekToNewSource(long targetPos) throws IOException {
127    return ((Seekable)in).seekToNewSource(targetPos); 
128  }
129  
130  /**
131   * Get a reference to the wrapped input stream. Used by unit tests.
132   *
133   * @return the underlying input stream
134   */
135  @InterfaceAudience.LimitedPrivate({"HDFS"})
136  public InputStream getWrappedStream() {
137    return in;
138  }
139
140  @Override
141  public int read(ByteBuffer buf) throws IOException {
142    if (in instanceof ByteBufferReadable) {
143      return ((ByteBufferReadable)in).read(buf);
144    }
145
146    throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
147  }
148
149  @Override
150  public FileDescriptor getFileDescriptor() throws IOException {
151    if (in instanceof HasFileDescriptor) {
152      return ((HasFileDescriptor) in).getFileDescriptor();
153    } else if (in instanceof FileInputStream) {
154      return ((FileInputStream) in).getFD();
155    } else {
156      return null;
157    }
158  }
159
160  @Override
161  public void setReadahead(Long readahead)
162      throws IOException, UnsupportedOperationException {
163    try {
164      ((CanSetReadahead)in).setReadahead(readahead);
165    } catch (ClassCastException e) {
166      throw new UnsupportedOperationException(
167          "this stream does not support setting the readahead " +
168          "caching strategy.");
169    }
170  }
171
172  @Override
173  public void setDropBehind(Boolean dropBehind)
174      throws IOException, UnsupportedOperationException {
175    try {
176      ((CanSetDropBehind)in).setDropBehind(dropBehind);
177    } catch (ClassCastException e) {
178      throw new UnsupportedOperationException("this stream does not " +
179          "support setting the drop-behind caching setting.");
180    }
181  }
182
183  @Override
184  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
185      EnumSet<ReadOption> opts) 
186          throws IOException, UnsupportedOperationException {
187    try {
188      return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
189          maxLength, opts);
190    }
191    catch (ClassCastException e) {
192      ByteBuffer buffer = ByteBufferUtil.
193          fallbackRead(this, bufferPool, maxLength);
194      if (buffer != null) {
195        extendedReadBuffers.put(buffer, bufferPool);
196      }
197      return buffer;
198    }
199  }
200
201  private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET =
202      EnumSet.noneOf(ReadOption.class);
203
204  final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
205          throws IOException, UnsupportedOperationException {
206    return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
207  }
208  
209  @Override
210  public void releaseBuffer(ByteBuffer buffer) {
211    try {
212      ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
213    }
214    catch (ClassCastException e) {
215      ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer);
216      if (bufferPool == null) {
217        throw new IllegalArgumentException("tried to release a buffer " +
218            "that was not created by this stream.");
219      }
220      bufferPool.putBuffer(buffer);
221    }
222  }
223
224  @Override
225  public void unbuffer() {
226    try {
227      ((CanUnbuffer)in).unbuffer();
228    } catch (ClassCastException e) {
229      throw new UnsupportedOperationException("this stream does not " +
230          "support unbuffering.");
231    }
232  }
233}