001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io; 020 021import java.io.*; 022import java.net.Socket; 023import java.nio.ByteBuffer; 024import java.nio.channels.FileChannel; 025import java.nio.channels.WritableByteChannel; 026import java.nio.file.DirectoryStream; 027import java.nio.file.DirectoryIteratorException; 028import java.nio.file.Files; 029import java.nio.file.Path; 030import java.util.ArrayList; 031import java.util.List; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035 036import org.apache.hadoop.classification.InterfaceAudience; 037import org.apache.hadoop.classification.InterfaceStability; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.util.ChunkedArrayList; 040 041/** 042 * An utility class for I/O related functionality. 043 */ 044@InterfaceAudience.Public 045@InterfaceStability.Evolving 046public class IOUtils { 047 public static final Log LOG = LogFactory.getLog(IOUtils.class); 048 049 /** 050 * Copies from one stream to another. 051 * 052 * @param in InputStrem to read from 053 * @param out OutputStream to write to 054 * @param buffSize the size of the buffer 055 * @param close whether or not close the InputStream and 056 * OutputStream at the end. The streams are closed in the finally clause. 057 */ 058 public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 059 throws IOException { 060 try { 061 copyBytes(in, out, buffSize); 062 if(close) { 063 out.close(); 064 out = null; 065 in.close(); 066 in = null; 067 } 068 } finally { 069 if(close) { 070 closeStream(out); 071 closeStream(in); 072 } 073 } 074 } 075 076 /** 077 * Copies from one stream to another. 078 * 079 * @param in InputStrem to read from 080 * @param out OutputStream to write to 081 * @param buffSize the size of the buffer 082 */ 083 public static void copyBytes(InputStream in, OutputStream out, int buffSize) 084 throws IOException { 085 PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; 086 byte buf[] = new byte[buffSize]; 087 int bytesRead = in.read(buf); 088 while (bytesRead >= 0) { 089 out.write(buf, 0, bytesRead); 090 if ((ps != null) && ps.checkError()) { 091 throw new IOException("Unable to write to output stream."); 092 } 093 bytesRead = in.read(buf); 094 } 095 } 096 097 /** 098 * Copies from one stream to another. <strong>closes the input and output streams 099 * at the end</strong>. 100 * 101 * @param in InputStrem to read from 102 * @param out OutputStream to write to 103 * @param conf the Configuration object 104 */ 105 public static void copyBytes(InputStream in, OutputStream out, Configuration conf) 106 throws IOException { 107 copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true); 108 } 109 110 /** 111 * Copies from one stream to another. 112 * 113 * @param in InputStream to read from 114 * @param out OutputStream to write to 115 * @param conf the Configuration object 116 * @param close whether or not close the InputStream and 117 * OutputStream at the end. The streams are closed in the finally clause. 118 */ 119 public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close) 120 throws IOException { 121 copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close); 122 } 123 124 /** 125 * Copies count bytes from one stream to another. 126 * 127 * @param in InputStream to read from 128 * @param out OutputStream to write to 129 * @param count number of bytes to copy 130 * @param close whether to close the streams 131 * @throws IOException if bytes can not be read or written 132 */ 133 public static void copyBytes(InputStream in, OutputStream out, long count, 134 boolean close) throws IOException { 135 byte buf[] = new byte[4096]; 136 long bytesRemaining = count; 137 int bytesRead; 138 139 try { 140 while (bytesRemaining > 0) { 141 int bytesToRead = (int) 142 (bytesRemaining < buf.length ? bytesRemaining : buf.length); 143 144 bytesRead = in.read(buf, 0, bytesToRead); 145 if (bytesRead == -1) 146 break; 147 148 out.write(buf, 0, bytesRead); 149 bytesRemaining -= bytesRead; 150 } 151 if (close) { 152 out.close(); 153 out = null; 154 in.close(); 155 in = null; 156 } 157 } finally { 158 if (close) { 159 closeStream(out); 160 closeStream(in); 161 } 162 } 163 } 164 165 /** 166 * Utility wrapper for reading from {@link InputStream}. It catches any errors 167 * thrown by the underlying stream (either IO or decompression-related), and 168 * re-throws as an IOException. 169 * 170 * @param is - InputStream to be read from 171 * @param buf - buffer the data is read into 172 * @param off - offset within buf 173 * @param len - amount of data to be read 174 * @return number of bytes read 175 */ 176 public static int wrappedReadForCompressedData(InputStream is, byte[] buf, 177 int off, int len) throws IOException { 178 try { 179 return is.read(buf, off, len); 180 } catch (IOException ie) { 181 throw ie; 182 } catch (Throwable t) { 183 throw new IOException("Error while reading compressed data", t); 184 } 185 } 186 187 /** 188 * Reads len bytes in a loop. 189 * 190 * @param in InputStream to read from 191 * @param buf The buffer to fill 192 * @param off offset from the buffer 193 * @param len the length of bytes to read 194 * @throws IOException if it could not read requested number of bytes 195 * for any reason (including EOF) 196 */ 197 public static void readFully(InputStream in, byte buf[], 198 int off, int len) throws IOException { 199 int toRead = len; 200 while (toRead > 0) { 201 int ret = in.read(buf, off, toRead); 202 if (ret < 0) { 203 throw new IOException( "Premature EOF from inputStream"); 204 } 205 toRead -= ret; 206 off += ret; 207 } 208 } 209 210 /** 211 * Similar to readFully(). Skips bytes in a loop. 212 * @param in The InputStream to skip bytes from 213 * @param len number of bytes to skip. 214 * @throws IOException if it could not skip requested number of bytes 215 * for any reason (including EOF) 216 */ 217 public static void skipFully(InputStream in, long len) throws IOException { 218 long amt = len; 219 while (amt > 0) { 220 long ret = in.skip(amt); 221 if (ret == 0) { 222 // skip may return 0 even if we're not at EOF. Luckily, we can 223 // use the read() method to figure out if we're at the end. 224 int b = in.read(); 225 if (b == -1) { 226 throw new EOFException( "Premature EOF from inputStream after " + 227 "skipping " + (len - amt) + " byte(s)."); 228 } 229 ret = 1; 230 } 231 amt -= ret; 232 } 233 } 234 235 /** 236 * Close the Closeable objects and <b>ignore</b> any {@link IOException} or 237 * null pointers. Must only be used for cleanup in exception handlers. 238 * 239 * @param log the log to record problems to at debug level. Can be null. 240 * @param closeables the objects to close 241 */ 242 public static void cleanup(Log log, java.io.Closeable... closeables) { 243 for (java.io.Closeable c : closeables) { 244 if (c != null) { 245 try { 246 c.close(); 247 } catch(Throwable e) { 248 if (log != null && log.isDebugEnabled()) { 249 log.debug("Exception in closing " + c, e); 250 } 251 } 252 } 253 } 254 } 255 256 /** 257 * Closes the stream ignoring {@link IOException}. 258 * Must only be called in cleaning up from exception handlers. 259 * 260 * @param stream the Stream to close 261 */ 262 public static void closeStream(java.io.Closeable stream) { 263 cleanup(null, stream); 264 } 265 266 /** 267 * Closes the socket ignoring {@link IOException} 268 * 269 * @param sock the Socket to close 270 */ 271 public static void closeSocket(Socket sock) { 272 if (sock != null) { 273 try { 274 sock.close(); 275 } catch (IOException ignored) { 276 LOG.debug("Ignoring exception while closing socket", ignored); 277 } 278 } 279 } 280 281 /** 282 * The /dev/null of OutputStreams. 283 */ 284 public static class NullOutputStream extends OutputStream { 285 @Override 286 public void write(byte[] b, int off, int len) throws IOException { 287 } 288 289 @Override 290 public void write(int b) throws IOException { 291 } 292 } 293 294 /** 295 * Write a ByteBuffer to a WritableByteChannel, handling short writes. 296 * 297 * @param bc The WritableByteChannel to write to 298 * @param buf The input buffer 299 * @throws IOException On I/O error 300 */ 301 public static void writeFully(WritableByteChannel bc, ByteBuffer buf) 302 throws IOException { 303 do { 304 bc.write(buf); 305 } while (buf.remaining() > 0); 306 } 307 308 /** 309 * Write a ByteBuffer to a FileChannel at a given offset, 310 * handling short writes. 311 * 312 * @param fc The FileChannel to write to 313 * @param buf The input buffer 314 * @param offset The offset in the file to start writing at 315 * @throws IOException On I/O error 316 */ 317 public static void writeFully(FileChannel fc, ByteBuffer buf, 318 long offset) throws IOException { 319 do { 320 offset += fc.write(buf, offset); 321 } while (buf.remaining() > 0); 322 } 323 324 /** 325 * Return the complete list of files in a directory as strings.<p/> 326 * 327 * This is better than File#listDir because it does not ignore IOExceptions. 328 * 329 * @param dir The directory to list. 330 * @param filter If non-null, the filter to use when listing 331 * this directory. 332 * @return The list of files in the directory. 333 * 334 * @throws IOException On I/O error 335 */ 336 public static List<String> listDirectory(File dir, FilenameFilter filter) 337 throws IOException { 338 ArrayList<String> list = new ArrayList<String> (); 339 try (DirectoryStream<Path> stream = 340 Files.newDirectoryStream(dir.toPath())) { 341 for (Path entry: stream) { 342 String fileName = entry.getFileName().toString(); 343 if ((filter == null) || filter.accept(dir, fileName)) { 344 list.add(fileName); 345 } 346 } 347 } catch (DirectoryIteratorException e) { 348 throw e.getCause(); 349 } 350 return list; 351 } 352}