001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.io;
020
021import java.io.*;
022import java.net.Socket;
023import java.nio.ByteBuffer;
024import java.nio.channels.FileChannel;
025import java.nio.channels.WritableByteChannel;
026import java.nio.file.DirectoryStream;
027import java.nio.file.DirectoryIteratorException;
028import java.nio.file.Files;
029import java.nio.file.Path;
030import java.util.ArrayList;
031import java.util.List;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035
036import org.apache.hadoop.classification.InterfaceAudience;
037import org.apache.hadoop.classification.InterfaceStability;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.util.ChunkedArrayList;
040
041/**
042 * An utility class for I/O related functionality. 
043 */
044@InterfaceAudience.Public
045@InterfaceStability.Evolving
046public class IOUtils {
047  public static final Log LOG = LogFactory.getLog(IOUtils.class);
048
049  /**
050   * Copies from one stream to another.
051   *
052   * @param in InputStrem to read from
053   * @param out OutputStream to write to
054   * @param buffSize the size of the buffer 
055   * @param close whether or not close the InputStream and 
056   * OutputStream at the end. The streams are closed in the finally clause.  
057   */
058  public static void copyBytes(InputStream in, OutputStream out,
059                               int buffSize, boolean close)
060    throws IOException {
061    try {
062      copyBytes(in, out, buffSize);
063      if(close) {
064        out.close();
065        out = null;
066        in.close();
067        in = null;
068      }
069    } finally {
070      if(close) {
071        closeStream(out);
072        closeStream(in);
073      }
074    }
075  }
076  
077  /**
078   * Copies from one stream to another.
079   * 
080   * @param in InputStrem to read from
081   * @param out OutputStream to write to
082   * @param buffSize the size of the buffer 
083   */
084  public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
085    throws IOException {
086    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
087    byte buf[] = new byte[buffSize];
088    int bytesRead = in.read(buf);
089    while (bytesRead >= 0) {
090      out.write(buf, 0, bytesRead);
091      if ((ps != null) && ps.checkError()) {
092        throw new IOException("Unable to write to output stream.");
093      }
094      bytesRead = in.read(buf);
095    }
096  }
097
098  /**
099   * Copies from one stream to another. <strong>closes the input and output streams 
100   * at the end</strong>.
101   *
102   * @param in InputStrem to read from
103   * @param out OutputStream to write to
104   * @param conf the Configuration object 
105   */
106  public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
107    throws IOException {
108    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true);
109  }
110  
111  /**
112   * Copies from one stream to another.
113   *
114   * @param in InputStream to read from
115   * @param out OutputStream to write to
116   * @param conf the Configuration object
117   * @param close whether or not close the InputStream and 
118   * OutputStream at the end. The streams are closed in the finally clause.
119   */
120  public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
121    throws IOException {
122    copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096),  close);
123  }
124
125  /**
126   * Copies count bytes from one stream to another.
127   *
128   * @param in InputStream to read from
129   * @param out OutputStream to write to
130   * @param count number of bytes to copy
131   * @param close whether to close the streams
132   * @throws IOException if bytes can not be read or written
133   */
134  public static void copyBytes(InputStream in, OutputStream out, long count,
135      boolean close) throws IOException {
136    byte buf[] = new byte[4096];
137    long bytesRemaining = count;
138    int bytesRead;
139
140    try {
141      while (bytesRemaining > 0) {
142        int bytesToRead = (int)
143          (bytesRemaining < buf.length ? bytesRemaining : buf.length);
144
145        bytesRead = in.read(buf, 0, bytesToRead);
146        if (bytesRead == -1)
147          break;
148
149        out.write(buf, 0, bytesRead);
150        bytesRemaining -= bytesRead;
151      }
152      if (close) {
153        out.close();
154        out = null;
155        in.close();
156        in = null;
157      }
158    } finally {
159      if (close) {
160        closeStream(out);
161        closeStream(in);
162      }
163    }
164  }
165  
166  /**
167   * Utility wrapper for reading from {@link InputStream}. It catches any errors
168   * thrown by the underlying stream (either IO or decompression-related), and
169   * re-throws as an IOException.
170   * 
171   * @param is - InputStream to be read from
172   * @param buf - buffer the data is read into
173   * @param off - offset within buf
174   * @param len - amount of data to be read
175   * @return number of bytes read
176   */
177  public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
178      int off, int len) throws IOException {
179    try {
180      return is.read(buf, off, len);
181    } catch (IOException ie) {
182      throw ie;
183    } catch (Throwable t) {
184      throw new IOException("Error while reading compressed data", t);
185    }
186  }
187
188  /**
189   * Reads len bytes in a loop.
190   *
191   * @param in InputStream to read from
192   * @param buf The buffer to fill
193   * @param off offset from the buffer
194   * @param len the length of bytes to read
195   * @throws IOException if it could not read requested number of bytes 
196   * for any reason (including EOF)
197   */
198  public static void readFully(InputStream in, byte[] buf,
199      int off, int len) throws IOException {
200    int toRead = len;
201    while (toRead > 0) {
202      int ret = in.read(buf, off, toRead);
203      if (ret < 0) {
204        throw new IOException( "Premature EOF from inputStream");
205      }
206      toRead -= ret;
207      off += ret;
208    }
209  }
210  
211  /**
212   * Similar to readFully(). Skips bytes in a loop.
213   * @param in The InputStream to skip bytes from
214   * @param len number of bytes to skip.
215   * @throws IOException if it could not skip requested number of bytes 
216   * for any reason (including EOF)
217   */
218  public static void skipFully(InputStream in, long len) throws IOException {
219    long amt = len;
220    while (amt > 0) {
221      long ret = in.skip(amt);
222      if (ret == 0) {
223        // skip may return 0 even if we're not at EOF.  Luckily, we can 
224        // use the read() method to figure out if we're at the end.
225        int b = in.read();
226        if (b == -1) {
227          throw new EOFException( "Premature EOF from inputStream after " +
228              "skipping " + (len - amt) + " byte(s).");
229        }
230        ret = 1;
231      }
232      amt -= ret;
233    }
234  }
235  
236  /**
237   * Close the Closeable objects and <b>ignore</b> any {@link Throwable} or
238   * null pointers. Must only be used for cleanup in exception handlers.
239   *
240   * @param log the log to record problems to at debug level. Can be null.
241   * @param closeables the objects to close
242   */
243  public static void cleanup(Log log, java.io.Closeable... closeables) {
244    for (java.io.Closeable c : closeables) {
245      if (c != null) {
246        try {
247          c.close();
248        } catch(Throwable e) {
249          if (log != null && log.isDebugEnabled()) {
250            log.debug("Exception in closing " + c, e);
251          }
252        }
253      }
254    }
255  }
256
257  /**
258   * Closes the stream ignoring {@link Throwable}.
259   * Must only be called in cleaning up from exception handlers.
260   *
261   * @param stream the Stream to close
262   */
263  public static void closeStream(java.io.Closeable stream) {
264    if (stream != null) {
265      cleanup(null, stream);
266    }
267  }
268  
269  /**
270   * Closes the socket ignoring {@link IOException}
271   *
272   * @param sock the Socket to close
273   */
274  public static void closeSocket(Socket sock) {
275    if (sock != null) {
276      try {
277        sock.close();
278      } catch (IOException ignored) {
279        LOG.debug("Ignoring exception while closing socket", ignored);
280      }
281    }
282  }
283  
284  /**
285   * The /dev/null of OutputStreams.
286   */
287  public static class NullOutputStream extends OutputStream {
288    @Override
289    public void write(byte[] b, int off, int len) throws IOException {
290    }
291
292    @Override
293    public void write(int b) throws IOException {
294    }
295  }  
296  
297  /**
298   * Write a ByteBuffer to a WritableByteChannel, handling short writes.
299   * 
300   * @param bc               The WritableByteChannel to write to
301   * @param buf              The input buffer
302   * @throws IOException     On I/O error
303   */
304  public static void writeFully(WritableByteChannel bc, ByteBuffer buf)
305      throws IOException {
306    do {
307      bc.write(buf);
308    } while (buf.remaining() > 0);
309  }
310
311  /**
312   * Write a ByteBuffer to a FileChannel at a given offset, 
313   * handling short writes.
314   * 
315   * @param fc               The FileChannel to write to
316   * @param buf              The input buffer
317   * @param offset           The offset in the file to start writing at
318   * @throws IOException     On I/O error
319   */
320  public static void writeFully(FileChannel fc, ByteBuffer buf,
321      long offset) throws IOException {
322    do {
323      offset += fc.write(buf, offset);
324    } while (buf.remaining() > 0);
325  }
326
327  /**
328   * Return the complete list of files in a directory as strings.<p/>
329   *
330   * This is better than File#listDir because it does not ignore IOExceptions.
331   *
332   * @param dir              The directory to list.
333   * @param filter           If non-null, the filter to use when listing
334   *                         this directory.
335   * @return                 The list of files in the directory.
336   *
337   * @throws IOException     On I/O error
338   */
339  public static List<String> listDirectory(File dir, FilenameFilter filter)
340      throws IOException {
341    ArrayList<String> list = new ArrayList<String> ();
342    try (DirectoryStream<Path> stream =
343             Files.newDirectoryStream(dir.toPath())) {
344      for (Path entry: stream) {
345        String fileName = entry.getFileName().toString();
346        if ((filter == null) || filter.accept(dir, fileName)) {
347          list.add(fileName);
348        }
349      }
350    } catch (DirectoryIteratorException e) {
351      throw e.getCause();
352    }
353    return list;
354  }
355}