001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.*;
022import java.net.Socket;
023import java.nio.ByteBuffer;
024import java.nio.channels.FileChannel;
025import java.nio.channels.WritableByteChannel;
026import java.nio.file.DirectoryStream;
027import java.nio.file.DirectoryIteratorException;
028import java.nio.file.Files;
029import java.nio.file.Path;
030import java.util.ArrayList;
031import java.util.List;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035
036import org.apache.hadoop.classification.InterfaceAudience;
037import org.apache.hadoop.classification.InterfaceStability;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.util.ChunkedArrayList;
040
041/**
042 * An utility class for I/O related functionality. 
043 */
044@InterfaceAudience.Public
045@InterfaceStability.Evolving
046public class IOUtils {
047  public static final Log LOG = LogFactory.getLog(IOUtils.class);
048
049  /**
050   * Copies from one stream to another.
051   *
052   * @param in InputStrem to read from
053   * @param out OutputStream to write to
054   * @param buffSize the size of the buffer 
055   * @param close whether or not close the InputStream and 
056   * OutputStream at the end. The streams are closed in the finally clause.  
057   */
058  public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
059    throws IOException {
060    try {
061      copyBytes(in, out, buffSize);
062      if(close) {
063        out.close();
064        out = null;
065        in.close();
066        in = null;
067      }
068    } finally {
069      if(close) {
070        closeStream(out);
071        closeStream(in);
072      }
073    }
074  }
075  
076  /**
077   * Copies from one stream to another.
078   * 
079   * @param in InputStrem to read from
080   * @param out OutputStream to write to
081   * @param buffSize the size of the buffer 
082   */
083  public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
084    throws IOException {
085    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
086    byte buf[] = new byte[buffSize];
087    int bytesRead = in.read(buf);
088    while (bytesRead >= 0) {
089      out.write(buf, 0, bytesRead);
090      if ((ps != null) && ps.checkError()) {
091        throw new IOException("Unable to write to output stream.");
092      }
093      bytesRead = in.read(buf);
094    }
095  }
096
097  /**
098   * Copies from one stream to another. <strong>closes the input and output streams 
099   * at the end</strong>.
100   *
101   * @param in InputStrem to read from
102   * @param out OutputStream to write to
103   * @param conf the Configuration object 
104   */
105  public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
106    throws IOException {
107    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
108  }
109  
110  /**
111   * Copies from one stream to another.
112   *
113   * @param in InputStream to read from
114   * @param out OutputStream to write to
115   * @param conf the Configuration object
116   * @param close whether or not close the InputStream and 
117   * OutputStream at the end. The streams are closed in the finally clause.
118   */
119  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
120    throws IOException {
121    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096),  close);
122  }
123
124  /**
125   * Copies count bytes from one stream to another.
126   *
127   * @param in InputStream to read from
128   * @param out OutputStream to write to
129   * @param count number of bytes to copy
130   * @param close whether to close the streams
131   * @throws IOException if bytes can not be read or written
132   */
133  public static void copyBytes(InputStream in, OutputStream out, long count,
134      boolean close) throws IOException {
135    byte buf[] = new byte[4096];
136    long bytesRemaining = count;
137    int bytesRead;
138
139    try {
140      while (bytesRemaining > 0) {
141        int bytesToRead = (int)
142          (bytesRemaining < buf.length ? bytesRemaining : buf.length);
143
144        bytesRead = in.read(buf, 0, bytesToRead);
145        if (bytesRead == -1)
146          break;
147
148        out.write(buf, 0, bytesRead);
149        bytesRemaining -= bytesRead;
150      }
151      if (close) {
152        out.close();
153        out = null;
154        in.close();
155        in = null;
156      }
157    } finally {
158      if (close) {
159        closeStream(out);
160        closeStream(in);
161      }
162    }
163  }
164  
165  /**
166   * Utility wrapper for reading from {@link InputStream}. It catches any errors
167   * thrown by the underlying stream (either IO or decompression-related), and
168   * re-throws as an IOException.
169   * 
170   * @param is - InputStream to be read from
171   * @param buf - buffer the data is read into
172   * @param off - offset within buf
173   * @param len - amount of data to be read
174   * @return number of bytes read
175   */
176  public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
177      int off, int len) throws IOException {
178    try {
179      return is.read(buf, off, len);
180    } catch (IOException ie) {
181      throw ie;
182    } catch (Throwable t) {
183      throw new IOException("Error while reading compressed data", t);
184    }
185  }
186
187  /**
188   * Reads len bytes in a loop.
189   *
190   * @param in InputStream to read from
191   * @param buf The buffer to fill
192   * @param off offset from the buffer
193   * @param len the length of bytes to read
194   * @throws IOException if it could not read requested number of bytes 
195   * for any reason (including EOF)
196   */
197  public static void readFully(InputStream in, byte buf[],
198      int off, int len) throws IOException {
199    int toRead = len;
200    while (toRead > 0) {
201      int ret = in.read(buf, off, toRead);
202      if (ret < 0) {
203        throw new IOException( "Premature EOF from inputStream");
204      }
205      toRead -= ret;
206      off += ret;
207    }
208  }
209  
210  /**
211   * Similar to readFully(). Skips bytes in a loop.
212   * @param in The InputStream to skip bytes from
213   * @param len number of bytes to skip.
214   * @throws IOException if it could not skip requested number of bytes 
215   * for any reason (including EOF)
216   */
217  public static void skipFully(InputStream in, long len) throws IOException {
218    long amt = len;
219    while (amt > 0) {
220      long ret = in.skip(amt);
221      if (ret == 0) {
222        // skip may return 0 even if we're not at EOF.  Luckily, we can 
223        // use the read() method to figure out if we're at the end.
224        int b = in.read();
225        if (b == -1) {
226          throw new EOFException( "Premature EOF from inputStream after " +
227              "skipping " + (len - amt) + " byte(s).");
228        }
229        ret = 1;
230      }
231      amt -= ret;
232    }
233  }
234  
235  /**
236   * Close the Closeable objects and <b>ignore</b> any {@link IOException} or 
237   * null pointers. Must only be used for cleanup in exception handlers.
238   *
239   * @param log the log to record problems to at debug level. Can be null.
240   * @param closeables the objects to close
241   */
242  public static void cleanup(Log log, java.io.Closeable... closeables) {
243    for (java.io.Closeable c : closeables) {
244      if (c != null) {
245        try {
246          c.close();
247        } catch(Throwable e) {
248          if (log != null && log.isDebugEnabled()) {
249            log.debug("Exception in closing " + c, e);
250          }
251        }
252      }
253    }
254  }
255
256  /**
257   * Closes the stream ignoring {@link IOException}.
258   * Must only be called in cleaning up from exception handlers.
259   *
260   * @param stream the Stream to close
261   */
262  public static void closeStream(java.io.Closeable stream) {
263    cleanup(null, stream);
264  }
265  
266  /**
267   * Closes the socket ignoring {@link IOException}
268   *
269   * @param sock the Socket to close
270   */
271  public static void closeSocket(Socket sock) {
272    if (sock != null) {
273      try {
274        sock.close();
275      } catch (IOException ignored) {
276        LOG.debug("Ignoring exception while closing socket", ignored);
277      }
278    }
279  }
280  
281  /**
282   * The /dev/null of OutputStreams.
283   */
284  public static class NullOutputStream extends OutputStream {
285    @Override
286    public void write(byte[] b, int off, int len) throws IOException {
287    }
288
289    @Override
290    public void write(int b) throws IOException {
291    }
292  }  
293  
294  /**
295   * Write a ByteBuffer to a WritableByteChannel, handling short writes.
296   * 
297   * @param bc               The WritableByteChannel to write to
298   * @param buf              The input buffer
299   * @throws IOException     On I/O error
300   */
301  public static void writeFully(WritableByteChannel bc, ByteBuffer buf)
302      throws IOException {
303    do {
304      bc.write(buf);
305    } while (buf.remaining() > 0);
306  }
307
308  /**
309   * Write a ByteBuffer to a FileChannel at a given offset, 
310   * handling short writes.
311   * 
312   * @param fc               The FileChannel to write to
313   * @param buf              The input buffer
314   * @param offset           The offset in the file to start writing at
315   * @throws IOException     On I/O error
316   */
317  public static void writeFully(FileChannel fc, ByteBuffer buf,
318      long offset) throws IOException {
319    do {
320      offset += fc.write(buf, offset);
321    } while (buf.remaining() > 0);
322  }
323
324  /**
325   * Return the complete list of files in a directory as strings.<p/>
326   *
327   * This is better than File#listDir because it does not ignore IOExceptions.
328   *
329   * @param dir              The directory to list.
330   * @param filter           If non-null, the filter to use when listing
331   *                         this directory.
332   * @return                 The list of files in the directory.
333   *
334   * @throws IOException     On I/O error
335   */
336  public static List<String> listDirectory(File dir, FilenameFilter filter)
337      throws IOException {
338    ArrayList<String> list = new ArrayList<String> ();
339    try (DirectoryStream<Path> stream =
340             Files.newDirectoryStream(dir.toPath())) {
341      for (Path entry: stream) {
342        String fileName = entry.getFileName().toString();
343        if ((filter == null) || filter.accept(dir, fileName)) {
344          list.add(fileName);
345        }
346      }
347    } catch (DirectoryIteratorException e) {
348      throw e.getCause();
349    }
350    return list;
351  }
352}