001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.io; 020 021import java.io.*; 022import java.net.Socket; 023import java.nio.ByteBuffer; 024import java.nio.channels.FileChannel; 025import java.nio.channels.WritableByteChannel; 026import java.nio.file.DirectoryStream; 027import java.nio.file.DirectoryIteratorException; 028import java.nio.file.Files; 029import java.nio.file.Path; 030import java.util.ArrayList; 031import java.util.List; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035 036import org.apache.hadoop.classification.InterfaceAudience; 037import org.apache.hadoop.classification.InterfaceStability; 038import org.apache.hadoop.conf.Configuration; 039import org.apache.hadoop.util.ChunkedArrayList; 040 041/** 042 * An utility class for I/O related functionality. 043 */ 044@InterfaceAudience.Public 045@InterfaceStability.Evolving 046public class IOUtils { 047 public static final Log LOG = LogFactory.getLog(IOUtils.class); 048 049 /** 050 * Copies from one stream to another. 051 * 052 * @param in InputStrem to read from 053 * @param out OutputStream to write to 054 * @param buffSize the size of the buffer 055 * @param close whether or not close the InputStream and 056 * OutputStream at the end. The streams are closed in the finally clause. 057 */ 058 public static void copyBytes(InputStream in, OutputStream out, 059 int buffSize, boolean close) 060 throws IOException { 061 try { 062 copyBytes(in, out, buffSize); 063 if(close) { 064 out.close(); 065 out = null; 066 in.close(); 067 in = null; 068 } 069 } finally { 070 if(close) { 071 closeStream(out); 072 closeStream(in); 073 } 074 } 075 } 076 077 /** 078 * Copies from one stream to another. 079 * 080 * @param in InputStrem to read from 081 * @param out OutputStream to write to 082 * @param buffSize the size of the buffer 083 */ 084 public static void copyBytes(InputStream in, OutputStream out, int buffSize) 085 throws IOException { 086 PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null; 087 byte buf[] = new byte[buffSize]; 088 int bytesRead = in.read(buf); 089 while (bytesRead >= 0) { 090 out.write(buf, 0, bytesRead); 091 if ((ps != null) && ps.checkError()) { 092 throw new IOException("Unable to write to output stream."); 093 } 094 bytesRead = in.read(buf); 095 } 096 } 097 098 /** 099 * Copies from one stream to another. <strong>closes the input and output streams 100 * at the end</strong>. 101 * 102 * @param in InputStrem to read from 103 * @param out OutputStream to write to 104 * @param conf the Configuration object 105 */ 106 public static void copyBytes(InputStream in, OutputStream out, Configuration conf) 107 throws IOException { 108 copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), true); 109 } 110 111 /** 112 * Copies from one stream to another. 113 * 114 * @param in InputStream to read from 115 * @param out OutputStream to write to 116 * @param conf the Configuration object 117 * @param close whether or not close the InputStream and 118 * OutputStream at the end. The streams are closed in the finally clause. 119 */ 120 public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close) 121 throws IOException { 122 copyBytes(in, out, conf.getInt("io.file.buffer.size", 4096), close); 123 } 124 125 /** 126 * Copies count bytes from one stream to another. 127 * 128 * @param in InputStream to read from 129 * @param out OutputStream to write to 130 * @param count number of bytes to copy 131 * @param close whether to close the streams 132 * @throws IOException if bytes can not be read or written 133 */ 134 public static void copyBytes(InputStream in, OutputStream out, long count, 135 boolean close) throws IOException { 136 byte buf[] = new byte[4096]; 137 long bytesRemaining = count; 138 int bytesRead; 139 140 try { 141 while (bytesRemaining > 0) { 142 int bytesToRead = (int) 143 (bytesRemaining < buf.length ? bytesRemaining : buf.length); 144 145 bytesRead = in.read(buf, 0, bytesToRead); 146 if (bytesRead == -1) 147 break; 148 149 out.write(buf, 0, bytesRead); 150 bytesRemaining -= bytesRead; 151 } 152 if (close) { 153 out.close(); 154 out = null; 155 in.close(); 156 in = null; 157 } 158 } finally { 159 if (close) { 160 closeStream(out); 161 closeStream(in); 162 } 163 } 164 } 165 166 /** 167 * Utility wrapper for reading from {@link InputStream}. It catches any errors 168 * thrown by the underlying stream (either IO or decompression-related), and 169 * re-throws as an IOException. 170 * 171 * @param is - InputStream to be read from 172 * @param buf - buffer the data is read into 173 * @param off - offset within buf 174 * @param len - amount of data to be read 175 * @return number of bytes read 176 */ 177 public static int wrappedReadForCompressedData(InputStream is, byte[] buf, 178 int off, int len) throws IOException { 179 try { 180 return is.read(buf, off, len); 181 } catch (IOException ie) { 182 throw ie; 183 } catch (Throwable t) { 184 throw new IOException("Error while reading compressed data", t); 185 } 186 } 187 188 /** 189 * Reads len bytes in a loop. 190 * 191 * @param in InputStream to read from 192 * @param buf The buffer to fill 193 * @param off offset from the buffer 194 * @param len the length of bytes to read 195 * @throws IOException if it could not read requested number of bytes 196 * for any reason (including EOF) 197 */ 198 public static void readFully(InputStream in, byte[] buf, 199 int off, int len) throws IOException { 200 int toRead = len; 201 while (toRead > 0) { 202 int ret = in.read(buf, off, toRead); 203 if (ret < 0) { 204 throw new IOException( "Premature EOF from inputStream"); 205 } 206 toRead -= ret; 207 off += ret; 208 } 209 } 210 211 /** 212 * Similar to readFully(). Skips bytes in a loop. 213 * @param in The InputStream to skip bytes from 214 * @param len number of bytes to skip. 215 * @throws IOException if it could not skip requested number of bytes 216 * for any reason (including EOF) 217 */ 218 public static void skipFully(InputStream in, long len) throws IOException { 219 long amt = len; 220 while (amt > 0) { 221 long ret = in.skip(amt); 222 if (ret == 0) { 223 // skip may return 0 even if we're not at EOF. Luckily, we can 224 // use the read() method to figure out if we're at the end. 225 int b = in.read(); 226 if (b == -1) { 227 throw new EOFException( "Premature EOF from inputStream after " + 228 "skipping " + (len - amt) + " byte(s)."); 229 } 230 ret = 1; 231 } 232 amt -= ret; 233 } 234 } 235 236 /** 237 * Close the Closeable objects and <b>ignore</b> any {@link Throwable} or 238 * null pointers. Must only be used for cleanup in exception handlers. 239 * 240 * @param log the log to record problems to at debug level. Can be null. 241 * @param closeables the objects to close 242 */ 243 public static void cleanup(Log log, java.io.Closeable... closeables) { 244 for (java.io.Closeable c : closeables) { 245 if (c != null) { 246 try { 247 c.close(); 248 } catch(Throwable e) { 249 if (log != null && log.isDebugEnabled()) { 250 log.debug("Exception in closing " + c, e); 251 } 252 } 253 } 254 } 255 } 256 257 /** 258 * Closes the stream ignoring {@link Throwable}. 259 * Must only be called in cleaning up from exception handlers. 260 * 261 * @param stream the Stream to close 262 */ 263 public static void closeStream(java.io.Closeable stream) { 264 if (stream != null) { 265 cleanup(null, stream); 266 } 267 } 268 269 /** 270 * Closes the socket ignoring {@link IOException} 271 * 272 * @param sock the Socket to close 273 */ 274 public static void closeSocket(Socket sock) { 275 if (sock != null) { 276 try { 277 sock.close(); 278 } catch (IOException ignored) { 279 LOG.debug("Ignoring exception while closing socket", ignored); 280 } 281 } 282 } 283 284 /** 285 * The /dev/null of OutputStreams. 286 */ 287 public static class NullOutputStream extends OutputStream { 288 @Override 289 public void write(byte[] b, int off, int len) throws IOException { 290 } 291 292 @Override 293 public void write(int b) throws IOException { 294 } 295 } 296 297 /** 298 * Write a ByteBuffer to a WritableByteChannel, handling short writes. 299 * 300 * @param bc The WritableByteChannel to write to 301 * @param buf The input buffer 302 * @throws IOException On I/O error 303 */ 304 public static void writeFully(WritableByteChannel bc, ByteBuffer buf) 305 throws IOException { 306 do { 307 bc.write(buf); 308 } while (buf.remaining() > 0); 309 } 310 311 /** 312 * Write a ByteBuffer to a FileChannel at a given offset, 313 * handling short writes. 314 * 315 * @param fc The FileChannel to write to 316 * @param buf The input buffer 317 * @param offset The offset in the file to start writing at 318 * @throws IOException On I/O error 319 */ 320 public static void writeFully(FileChannel fc, ByteBuffer buf, 321 long offset) throws IOException { 322 do { 323 offset += fc.write(buf, offset); 324 } while (buf.remaining() > 0); 325 } 326 327 /** 328 * Return the complete list of files in a directory as strings.<p/> 329 * 330 * This is better than File#listDir because it does not ignore IOExceptions. 331 * 332 * @param dir The directory to list. 333 * @param filter If non-null, the filter to use when listing 334 * this directory. 335 * @return The list of files in the directory. 336 * 337 * @throws IOException On I/O error 338 */ 339 public static List<String> listDirectory(File dir, FilenameFilter filter) 340 throws IOException { 341 ArrayList<String> list = new ArrayList<String> (); 342 try (DirectoryStream<Path> stream = 343 Files.newDirectoryStream(dir.toPath())) { 344 for (Path entry: stream) { 345 String fileName = entry.getFileName().toString(); 346 if ((filter == null) || filter.accept(dir, fileName)) { 347 list.add(fileName); 348 } 349 } 350 } catch (DirectoryIteratorException e) { 351 throw e.getCause(); 352 } 353 return list; 354 } 355}