/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;

/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class ChecksumFileSystem extends FilterFileSystem {
  /** Magic bytes written at the start of every checksum file. */
  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
  private int bytesPerChecksum = 512;
  private boolean verifyChecksum = true;
  private boolean writeChecksum = true;

  /**
   * Return an approximate checksum length for the given data size, computed
   * as a fixed fraction of {@code size}.
   *
   * @param size the length of the data in bytes
   * @return the approximate checksum length
   */
  public static double getApproxChkSumLength(long size) {
    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
  }

  /**
   * Wrap the given raw file system.
   *
   * @param fs the underlying file system that stores data and checksum files
   */
  public ChecksumFileSystem(FileSystem fs) {
    super(fs);
  }

  /**
   * Set the configuration and, when it is non-null, reload the
   * bytes-per-checksum setting from it.
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf != null) {
      bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
                                     LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
    }
  }

  /**
   * Set whether to verify checksum.
   */
  @Override
  public void setVerifyChecksum(boolean verifyChecksum) {
    this.verifyChecksum = verifyChecksum;
  }

  /**
   * Set whether checksum files are written when files are created.
   */
  @Override
  public void setWriteChecksum(boolean writeChecksum) {
    this.writeChecksum = writeChecksum;
  }

  /** Get the raw file system. */
  @Override
  public FileSystem getRawFileSystem() {
    return fs;
  }

  /** Return the name of the checksum file associated with a file. */
  public Path getChecksumFile(Path file) {
    return new Path(file.getParent(), "." + file.getName() + ".crc");
  }

  /** Return true iff file is a checksum file name. */
  public static boolean isChecksumFile(Path file) {
    String name = file.getName();
    return name.startsWith(".") && name.endsWith(".crc");
  }

  /**
   * Return the length of the checksum file given the size of the
   * actual file.
   */
  public long getChecksumFileLength(Path file, long fileSize) {
    return getChecksumLength(fileSize, getBytesPerSum());
  }

  /** Return the bytes per checksum. */
  public int getBytesPerSum() {
    return bytesPerChecksum;
  }

  /**
   * Choose the buffer size used for a checksum stream: the largest of
   * {@code bytesPerSum}, {@code bufferSize / bytesPerSum} and the configured
   * default stream buffer size.
   */
  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
    int defaultBufferSize = getConf().getInt(
                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
    int proportionalBufferSize = bufferSize / bytesPerSum;
    return Math.max(bytesPerSum,
                    Math.max(proportionalBufferSize, defaultBufferSize));
  }

  /*******************************************************
   * For open()'s FSInputStream
   * It verifies that data matches checksums.
   *******************************************************/
  private static class ChecksumFSInputChecker extends FSInputChecker {
    private ChecksumFileSystem fs;
    private FSDataInputStream datas;
    private FSDataInputStream sums;

    /** Size of the checksum file header: version bytes plus bytesPerSum int. */
    private static final int HEADER_LENGTH = 8;

    private int bytesPerSum = 1;

    /**
     * Open {@code file} using the configured default stream buffer size.
     */
    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
      throws IOException {
      this(fs, file, fs.getConf().getInt(
                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
    }

    /**
     * Open the data file and, if possible, its checksum file. A missing or
     * unreadable checksum file disables checksum use for this stream rather
     * than failing the open.
     */
    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
      throws IOException {
      super( file, fs.getFileStatus(file).getReplication() );
      this.datas = fs.getRawFileSystem().open(file, bufferSize);
      this.fs = fs;
      Path sumFile = fs.getChecksumFile(file);
      try {
        int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);

        byte[] version = new byte[CHECKSUM_VERSION.length];
        sums.readFully(version);
        if (!Arrays.equals(version, CHECKSUM_VERSION))
          throw new IOException("Not a checksum file: "+sumFile);
        this.bytesPerSum = sums.readInt();
        set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
      } catch (FileNotFoundException e) {         // quietly ignore
        set(fs.verifyChecksum, null, 1, 0);
      } catch (IOException e) {                   // loudly ignore
        LOG.warn("Problem opening checksum file: "+ file +
                 ". Ignoring exception: " , e);
        set(fs.verifyChecksum, null, 1, 0);
      }
    }

    /** Map a data file offset to the offset of its checksum in the sum file. */
    private long getChecksumFilePos( long dataPos ) {
      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
    }

    /** Round {@code dataPos} down to the start of its checksum chunk. */
    @Override
    protected long getChunkPosition( long dataPos ) {
      return dataPos/bytesPerSum*bytesPerSum;
    }

    @Override
    public int available() throws IOException {
      return datas.available() + super.available();
    }

    /**
     * Positional read. The read is served by a separate, temporary checker
     * opened on the same file, so this stream's own position is not moved.
     *
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
     *         describe a valid range of {@code b}
     * @throws IllegalArgumentException if {@code position} is negative
     */
    @Override
    public int read(long position, byte[] b, int off, int len)
      throws IOException {
      // parameter check
      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
        throw new IndexOutOfBoundsException();
      } else if (len == 0) {
        return 0;
      }
      if( position<0 ) {
        throw new IllegalArgumentException(
            "Parameter position can not to be negative");
      }

      ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
      try {
        checker.seek(position);
        return checker.read(b, off, len);
      } finally {
        // Always release the temporary streams, even when seek/read fails.
        checker.close();
      }
    }

    /**
     * Close the data stream and, if open, the checksum stream, then reset
     * the checker state. Both streams are closed even if one close fails.
     */
    @Override
    public void close() throws IOException {
      try {
        datas.close();
      } finally {
        try {
          if( sums != null ) {
            sums.close();
          }
        } finally {
          set(fs.verifyChecksum, null, 1, 0);
        }
      }
    }


    /**
     * Report a checksum failure at {@code targetPos} to the file system and
     * try to switch both the data and checksum streams to a new source.
     *
     * @return true if either stream found a new source
     */
    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      long sumsPos = getChecksumFilePos(targetPos);
      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
      boolean newDataSource = datas.seekToNewSource(targetPos);
      return sums.seekToNewSource(sumsPos) || newDataSource;
    }

    /**
     * Read data starting at {@code pos} into {@code buf} and, when checksums
     * are needed, the matching checksums into {@code checksum}.
     *
     * @return the number of data bytes read
     * @throws ChecksumException if the checksum file ends in a partial
     *         checksum, or ends while data is still available
     */
    @Override
    protected int readChunk(long pos, byte[] buf, int offset, int len,
        byte[] checksum) throws IOException {

      boolean eof = false;
      if (needChecksum()) {
        assert checksum != null; // we have a checksum buffer
        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
        assert len >= bytesPerSum; // we must read at least one chunk

        final int checksumsToRead = Math.min(
          len/bytesPerSum, // number of checksums based on len to read
          checksum.length / CHECKSUM_SIZE); // size of checksum buffer
        long checksumPos = getChecksumFilePos(pos);
        if(checksumPos != sums.getPos()) {
          sums.seek(checksumPos);
        }

        int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
          throw new ChecksumException(
            "Checksum file not a length multiple of checksum size " +
            "in " + file + " at " + pos + " checksumpos: " + checksumPos +
            " sumLenread: " + sumLenRead,
            pos);
        }
        if (sumLenRead <= 0) { // we're at the end of the file
          eof = true;
        } else {
          // Adjust amount of data to read based on how many checksum chunks we read
          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
        }
      }
      if(pos != datas.getPos()) {
        datas.seek(pos);
      }
      int nread = readFully(datas, buf, offset, len);
      // Data without a matching checksum means the checksum file is short.
      if (eof && nread > 0) {
        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
      }
      return nread;
    }
  }

  /**
   * Input stream wrapper that refuses to seek or skip past the end of the
   * file. The file length is looked up lazily and cached.
   */
  private static class FSDataBoundedInputStream extends FSDataInputStream {
    private FileSystem fs;
    private Path file;
    private long fileLen = -1L;

    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
      super(in);
      this.fs = fs;
      this.file = file;
    }

    @Override
    public boolean markSupported() {
      return false;
    }

    /* Return the file length */
    private long getFileLength() throws IOException {
      if( fileLen==-1L ) {
        fileLen = fs.getContentSummary(file).getLength();
      }
      return fileLen;
    }

    /**
     * Skips over and discards <code>n</code> bytes of data from the
     * input stream.
     *
     * The <code>skip</code> method skips over some smaller number of bytes
     * when reaching end of file before <code>n</code> bytes have been skipped.
     * The actual number of bytes skipped is returned.  If <code>n</code> is
     * negative, no bytes are skipped.
     *
     * @param      n   the number of bytes to be skipped.
     * @return     the actual number of bytes skipped.
     * @exception  IOException  if an I/O error occurs.
     *             ChecksumException if the chunk to skip to is corrupted
     */
    @Override
    public synchronized long skip(long n) throws IOException {
      long curPos = getPos();
      long fileLength = getFileLength();
      // Compare against the remaining length instead of computing
      // n + curPos, which overflows for very large n (e.g. Long.MAX_VALUE)
      // and would let the skip run past the end of the file.
      long remaining = fileLength - curPos;
      if( n > remaining ) {
        n = remaining;
      }
      return super.skip(n);
    }

    /**
     * Seek to the given position in the stream.
     * The next read() will be from that position.
     *
     * <p>This method does not allow seek past the end of the file.
     * This produces IOException.
     *
     * @param      pos   the position to seek to.
     * @exception  IOException  if an I/O error occurs or seeks after EOF
     *             ChecksumException if the chunk to seek to is corrupted
     */

    @Override
    public synchronized void seek(long pos) throws IOException {
      if (pos > getFileLength()) {
        throw new EOFException("Cannot seek after EOF");
      }
      super.seek(pos);
    }

  }

  /**
   * Opens an FSDataInputStream at the indicated Path. When checksum
   * verification is enabled the stream reads through a checksum checker;
   * otherwise it reads the raw file system directly.
   *
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    FileSystem fs;
    InputStream in;
    if (verifyChecksum) {
      fs = this;
      in = new ChecksumFSInputChecker(this, f, bufferSize);
    } else {
      fs = getRawFileSystem();
      in = fs.open(f, bufferSize);
    }
    return new FSDataBoundedInputStream(fs, f, in);
  }

  /**
   * Append is not supported.
   *
   * @throws IOException always
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Truncate is not supported.
   *
   * @throws IOException always
   */
  @Override
  public boolean truncate(Path f, long newLength) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Calculate the length of the checksum file in bytes.
   * @param size the length of the data file in bytes
   * @param bytesPerSum the number of bytes in a checksum block
   * @return the number of bytes in the checksum file
   */
  public static long getChecksumLength(long size, int bytesPerSum) {
    //the checksum length is equal to size passed divided by bytesPerSum +
    //bytes written in the beginning of the checksum file.
    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
             CHECKSUM_VERSION.length + 4;
  }

  /** This class provides an output stream for a checksummed file.
   * It generates checksums for data. */
  private static class ChecksumFSOutputSummer extends FSOutputSummer {
    private FSDataOutputStream datas;
    private FSDataOutputStream sums;
    private static final float CHKSUM_AS_FRACTION = 0.01f;
    private boolean isClosed = false;

    /**
     * Create the data file and its checksum file on the raw file system and
     * write the checksum file header (version bytes and bytes-per-checksum).
     * If setting up the checksum file fails, any stream already opened is
     * closed before the failure propagates.
     */
    public ChecksumFSOutputSummer(ChecksumFileSystem fs,
                          Path file,
                          boolean overwrite,
                          int bufferSize,
                          short replication,
                          long blockSize,
                          Progressable progress,
                          FsPermission permission)
      throws IOException {
      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
          fs.getBytesPerSum()));
      int bytesPerSum = fs.getBytesPerSum();
      this.datas = fs.getRawFileSystem().create(file, permission, overwrite,
                                         bufferSize, replication, blockSize,
                                         progress);
      boolean success = false;
      try {
        int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
        this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file),
                                                 permission, true, sumBufferSize,
                                                 replication, blockSize, null);
        sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
        sums.writeInt(bytesPerSum);
        success = true;
      } finally {
        if (!success) {
          // Do not leak the streams opened so far; the original failure
          // is the one that propagates to the caller.
          closeQuietly(sums);
          closeQuietly(datas);
        }
      }
    }

    /** Close {@code out} if non-null, discarding any close failure. */
    private static void closeQuietly(FSDataOutputStream out) {
      if (out != null) {
        try {
          out.close();
        } catch (IOException ignored) {
          // best-effort cleanup while another exception is propagating
        }
      }
    }

    /**
     * Flush buffered data, then close the checksum stream followed by the
     * data stream. Each stream is closed even if an earlier step fails, and
     * the summer is always marked closed.
     */
    @Override
    public void close() throws IOException {
      try {
        flushBuffer();
      } finally {
        try {
          sums.close();
        } finally {
          try {
            datas.close();
          } finally {
            isClosed = true;
          }
        }
      }
    }

    /** Write one chunk of data and its checksum to their respective streams. */
    @Override
    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
        int ckoff, int cklen)
    throws IOException {
      datas.write(b, offset, len);
      sums.write(checksum, ckoff, cklen);
    }

    /**
     * @throws ClosedChannelException if this stream has been closed
     */
    @Override
    protected void checkClosed() throws IOException {
      if (isClosed) {
        throw new ClosedChannelException();
      }
    }
  }

  /**
   * Create a file, creating missing parent directories as needed.
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, true, bufferSize,
        replication, blockSize, progress);
  }

  /**
   * Shared implementation of create and createNonRecursive.
   *
   * @param createParent when false, a missing parent directory is an error
   * @throws FileNotFoundException if {@code createParent} is false and the
   *         parent directory does not exist
   * @throws IOException if the parent directory cannot be created
   */
  private FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, boolean createParent, int bufferSize,
      short replication, long blockSize,
      Progressable progress) throws IOException {
    Path parent = f.getParent();
    if (parent != null) {
      if (!createParent && !exists(parent)) {
        throw new FileNotFoundException("Parent directory doesn't exist: "
            + parent);
      } else if (!mkdirs(parent)) {
        throw new IOException("Mkdirs failed to create " + parent
            + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
            + ")");
      }
    }
    final FSDataOutputStream out;
    if (writeChecksum) {
      out = new FSDataOutputStream(
          new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
              blockSize, progress, permission), null);
    } else {
      out = fs.create(f, permission, overwrite, bufferSize, replication,
          blockSize, progress);
      // remove the checksum file since we aren't writing one
      Path checkFile = getChecksumFile(f);
      if (fs.exists(checkFile)) {
        fs.delete(checkFile, true);
      }
    }
    return out;
  }

  /**
   * Create a file without creating missing parent directories.
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, false, bufferSize, replication,
        blockSize, progress);
  }

  /**
   * Set replication for an existing file.
   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>.
   * The replication of the checksum file, if one exists, is updated too.
   * @param src file name
   * @param replication new replication
   * @throws IOException
   * @return true if successful;
   *         false if file does not exist or is a directory
   */
  @Override
  public boolean setReplication(Path src, short replication) throws IOException {
    boolean value = fs.setReplication(src, replication);
    if (!value)
      return false;

    Path checkFile = getChecksumFile(src);
    if (exists(checkFile))
      fs.setReplication(checkFile, replication);

    return true;
  }

  /**
   * Rename files/dirs. When a file is renamed its checksum file is renamed
   * with it; if the source has no checksum file, a stale checksum file at
   * the destination is removed.
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    if (fs.isDirectory(src)) {
      return fs.rename(src, dst);
    } else {
      if (fs.isDirectory(dst)) {
        dst = new Path(dst, src.getName());
      }

      boolean value = fs.rename(src, dst);
      if (!value)
        return false;

      Path srcCheckFile = getChecksumFile(src);
      Path dstCheckFile = getChecksumFile(dst);
      if (fs.exists(srcCheckFile)) { //try to rename checksum
        value = fs.rename(srcCheckFile, dstCheckFile);
      } else if (fs.exists(dstCheckFile)) {
        // no src checksum, so remove dst checksum
        value = fs.delete(dstCheckFile, true);
      }

      return value;
    }
  }

  /**
   * Implement the delete(Path, boolean) in checksum
   * file system.
   *
   * @return false if the path does not exist; otherwise the result of the
   *         delete on the underlying file system
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException{
    FileStatus fstatus = null;
    try {
      fstatus = fs.getFileStatus(f);
    } catch(FileNotFoundException e) {
      return false;
    }
    if (fstatus.isDirectory()) {
      //this works since the crcs are in the same
      //directories and the files. so we just delete
      //everything in the underlying filesystem
      return fs.delete(f, recursive);
    } else {
      Path checkFile = getChecksumFile(f);
      if (fs.exists(checkFile)) {
        fs.delete(checkFile, true);
      }
      return fs.delete(f, true);
    }
  }

  /** Filter that hides checksum files from directory listings. */
  private static final PathFilter DEFAULT_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path file) {
      return !isChecksumFile(file);
    }
  };

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. Checksum files are filtered out.
   *
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    return fs.listStatus(f, DEFAULT_FILTER);
  }

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. Checksum files are filtered out.
   *
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
  throws IOException {
    return fs.listLocatedStatus(f, DEFAULT_FILTER);
  }

  @Override
  public boolean mkdirs(Path f) throws IOException {
    return fs.mkdirs(f);
  }

  /**
   * Copy {@code src} from the local file system to {@code dst} on this
   * file system.
   */
  @Override
  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   */
  @Override
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
    throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   * If src and dst are directories, the copyCrc parameter
   * determines whether to copy CRC files.
   */
  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
    throws IOException {
    if (!fs.isDirectory(src)) { // source is a file
      fs.copyToLocalFile(src, dst);
      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
      if (localFs.isDirectory(dst)) {
        dst = new Path(dst, src.getName());
      }
      dst = getChecksumFile(dst);
      if (localFs.exists(dst)) { //remove old local checksum file
        localFs.delete(dst, true);
      }
      Path checksumFile = getChecksumFile(src);
      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
        fs.copyToLocalFile(checksumFile, dst);
      }
    } else {
      FileStatus[] srcs = listStatus(src);
      for (FileStatus srcFile : srcs) {
        copyToLocalFile(srcFile.getPath(),
                        new Path(dst, srcFile.getPath().getName()), copyCrc);
      }
    }
  }

  /**
   * Return the local temporary file that output should be written to;
   * this implementation returns {@code tmpLocalFile} unchanged.
   */
  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    return tmpLocalFile;
  }

  /**
   * Move the finished local temporary file to its final location on this
   * file system.
   */
  @Override
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
    throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }

  /**
   * Report a checksum error to the file system.
   * This implementation does nothing and returns false.
   * @param f the file name containing the error
   * @param in the stream open on the file
   * @param inPos the position of the beginning of the bad data in the file
   * @param sums the stream open on the checksum file
   * @param sumsPos the position of the beginning of the bad data in the checksum file
   * @return if retry is necessary
   */
  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
                                       long inPos, FSDataInputStream sums, long sumsPos) {
    return false;
  }
}