/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Arrays;

import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;

/****************************************************************
 * Abstract checksummed FileSystem.
 * It provides a basic implementation of a checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class ChecksumFileSystem extends FilterFileSystem {
  /** Magic bytes written at the start of every checksum file. */
  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
  private int bytesPerChecksum = 512;
  private boolean verifyChecksum = true;
  private boolean writeChecksum = true;

  /**
   * Return an approximate checksum length for a file of the given size,
   * computed as a fixed fraction of {@code size}.
   *
   * @param size the length of the data file in bytes
   * @return the approximate number of checksum bytes
   */
  public static double getApproxChkSumLength(long size) {
    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
  }

  /**
   * Wrap the given file system.
   *
   * @param fs the raw file system that stores data and checksum files
   */
  public ChecksumFileSystem(FileSystem fs) {
    super(fs);
  }

  /**
   * Set the configuration and, when it is non-null, read the number of
   * bytes per checksum from it.
   *
   * @param conf the configuration; may be null
   * @throws IllegalStateException if the configured value is not positive
   */
  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    if (conf != null) {
      bytesPerChecksum = conf.getInt(
          LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
          LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
      Preconditions.checkState(bytesPerChecksum > 0,
          "bytes per checksum should be positive but was %s",
          bytesPerChecksum);
    }
  }

  /**
   * Set whether to verify checksum.
   */
  @Override
  public void setVerifyChecksum(boolean verifyChecksum) {
    this.verifyChecksum = verifyChecksum;
  }

  /**
   * Set whether to write a checksum file when creating files.
   */
  @Override
  public void setWriteChecksum(boolean writeChecksum) {
    this.writeChecksum = writeChecksum;
  }

  /** Get the raw file system. */
  @Override
  public FileSystem getRawFileSystem() {
    return fs;
  }

  /** Return the name of the checksum file associated with a file. */
  public Path getChecksumFile(Path file) {
    return new Path(file.getParent(), "." + file.getName() + ".crc");
  }

  /** Return true iff file is a checksum file name. */
  public static boolean isChecksumFile(Path file) {
    String name = file.getName();
    return name.startsWith(".") && name.endsWith(".crc");
  }

  /**
   * Return the length of the checksum file given the size of the
   * actual file.
   *
   * @param file the data file (not consulted by this implementation)
   * @param fileSize the length of the data file in bytes
   * @return the number of bytes in the checksum file
   */
  public long getChecksumFileLength(Path file, long fileSize) {
    return getChecksumLength(fileSize, getBytesPerSum());
  }

  /** Return the bytes per checksum. */
  public int getBytesPerSum() {
    return bytesPerChecksum;
  }

  /**
   * Pick the buffer size for a checksum stream: the largest of
   * {@code bytesPerSum}, {@code bufferSize / bytesPerSum} and the
   * configured local stream buffer size.
   */
  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
    int defaultBufferSize = getConf().getInt(
        LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
        LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
    int proportionalBufferSize = bufferSize / bytesPerSum;
    return Math.max(bytesPerSum,
        Math.max(proportionalBufferSize, defaultBufferSize));
  }

  /*******************************************************
   * For open()'s FSInputStream.
   * It verifies that data matches checksums.
   *******************************************************/
  private static class ChecksumFSInputChecker extends FSInputChecker {
    private ChecksumFileSystem fs;
    private FSDataInputStream datas;
    /** Stream on the checksum file; stays null if that file could not be opened. */
    private FSDataInputStream sums;

    /** Size of the checksum file header: version bytes plus an int. */
    private static final int HEADER_LENGTH = 8;

    private int bytesPerSum = 1;

    /**
     * Open a checker using the configured local stream buffer size.
     */
    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
        throws IOException {
      this(fs, file, fs.getConf().getInt(
          LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
          LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
    }

    /**
     * Open the data file and, if possible, its checksum file.
     * A missing or unreadable checksum file is not fatal: the checker is
     * then configured without a checksum.
     *
     * @param fs the checksum file system the file belongs to
     * @param file the data file to open
     * @param bufferSize buffer size used for the data stream
     * @throws IOException if the data file cannot be opened
     */
    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file,
        int bufferSize) throws IOException {
      super(file, fs.getFileStatus(file).getReplication());
      this.datas = fs.getRawFileSystem().open(file, bufferSize);
      this.fs = fs;
      Path sumFile = fs.getChecksumFile(file);
      try {
        int sumBufferSize =
            fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);

        byte[] version = new byte[CHECKSUM_VERSION.length];
        sums.readFully(version);
        if (!Arrays.equals(version, CHECKSUM_VERSION)) {
          throw new IOException("Not a checksum file: "+sumFile);
        }
        this.bytesPerSum = sums.readInt();
        set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
      } catch (FileNotFoundException e) {         // quietly ignore
        set(fs.verifyChecksum, null, 1, 0);
      } catch (IOException e) {                   // loudly ignore
        LOG.warn("Problem opening checksum file: "+ file +
                 ".  Ignoring exception: " , e);
        set(fs.verifyChecksum, null, 1, 0);
      }
    }

    /** Map a data-file offset to the offset of its checksum entry. */
    private long getChecksumFilePos(long dataPos) {
      return HEADER_LENGTH + 4 * (dataPos / bytesPerSum);
    }

    /** Round a data-file offset down to the start of its chunk. */
    @Override
    protected long getChunkPosition(long dataPos) {
      return dataPos / bytesPerSum * bytesPerSum;
    }

    @Override
    public int available() throws IOException {
      return datas.available() + super.available();
    }

    /**
     * Positioned read. Uses a separate, temporary checker on the same file
     * so that this stream's own position is not used for the read.
     *
     * @param position offset in the file to read from
     * @param b destination buffer
     * @param off offset in {@code b} to start writing at
     * @param len maximum number of bytes to read
     * @return the number of bytes read, as returned by the temporary checker
     * @throws IndexOutOfBoundsException if off/len do not fit in {@code b}
     * @throws IllegalArgumentException if {@code position} is negative
     * @throws IOException if an I/O error occurs
     */
    @Override
    public int read(long position, byte[] b, int off, int len)
        throws IOException {
      // parameter check
      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
        throw new IndexOutOfBoundsException();
      } else if (len == 0) {
        return 0;
      }
      if (position < 0) {
        throw new IllegalArgumentException(
            "Parameter position can not to be negative");
      }

      ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
      try {
        checker.seek(position);
        return checker.read(b, off, len);
      } finally {
        // always release the temporary streams, even if seek/read failed
        checker.close();
      }
    }

    /**
     * Close the data stream and, if it was opened, the checksum stream.
     * The checksum stream is closed even if closing the data stream fails.
     */
    @Override
    public void close() throws IOException {
      try {
        datas.close();
      } finally {
        try {
          if (sums != null) {
            sums.close();
          }
        } finally {
          set(fs.verifyChecksum, null, 1, 0);
        }
      }
    }

    /**
     * Report a checksum failure to the file system, then ask the data
     * stream and the checksum stream (when one is open) to switch source.
     *
     * @return true if either stream reports a new source
     */
    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      long sumsPos = getChecksumFilePos(targetPos);
      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
      boolean newDataSource = datas.seekToNewSource(targetPos);
      // sums is null when the checksum file could not be opened
      boolean newSumsSource = sums != null && sums.seekToNewSource(sumsPos);
      return newSumsSource || newDataSource;
    }

    /**
     * Read one or more chunks of data and, when checksums are needed,
     * the matching checksums.
     *
     * @param pos position in the data file to read from
     * @param buf destination buffer for data
     * @param offset offset in {@code buf} to start writing at
     * @param len maximum number of data bytes to read
     * @param checksum destination buffer for checksums
     * @return the number of data bytes read
     * @throws ChecksumException if the checksum file is truncated mid-entry,
     *         or has ended while data is still available
     */
    @Override
    protected int readChunk(long pos, byte[] buf, int offset, int len,
        byte[] checksum) throws IOException {

      boolean eof = false;
      if (needChecksum()) {
        assert checksum != null; // we have a checksum buffer
        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
        assert len >= bytesPerSum; // we must read at least one chunk

        final int checksumsToRead = Math.min(
            len / bytesPerSum, // number of checksums based on len to read
            checksum.length / CHECKSUM_SIZE); // size of checksum buffer
        long checksumPos = getChecksumFilePos(pos);
        if (checksumPos != sums.getPos()) {
          sums.seek(checksumPos);
        }

        int sumLenRead =
            sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
          throw new ChecksumException(
              "Checksum file not a length multiple of checksum size " +
              "in " + file + " at " + pos + " checksumpos: " + checksumPos +
              " sumLenread: " + sumLenRead,
              pos);
        }
        if (sumLenRead <= 0) { // we're at the end of the file
          eof = true;
        } else {
          // Adjust amount of data to read based on how many checksum
          // chunks we read
          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
        }
      }
      if (pos != datas.getPos()) {
        datas.seek(pos);
      }
      int nread = readFully(datas, buf, offset, len);
      if (eof && nread > 0) {
        // data exists past the end of the checksum file
        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
      }
      return nread;
    }
  }

  /**
   * Data input stream that refuses to seek or skip past the end of the file.
   */
  private static class FSDataBoundedInputStream extends FSDataInputStream {
    private FileSystem fs;
    private Path file;
    /** Cached file length; -1 until first computed. */
    private long fileLen = -1L;

    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
      super(in);
      this.fs = fs;
      this.file = file;
    }

    @Override
    public boolean markSupported() {
      return false;
    }

    /* Return the file length, looking it up on first use. */
    private long getFileLength() throws IOException {
      if (fileLen == -1L) {
        fileLen = fs.getContentSummary(file).getLength();
      }
      return fileLen;
    }

    /**
     * Skips over and discards {@code n} bytes of data from the
     * input stream.
     *
     * The {@code skip} method skips over some smaller number of bytes
     * when reaching end of file before {@code n} bytes have been skipped.
     * The actual number of bytes skipped is returned.  If {@code n} is
     * negative, no bytes are skipped.
     *
     * @param n the number of bytes to be skipped.
     * @return the actual number of bytes skipped.
     * @exception IOException if an I/O error occurs.
     *            ChecksumException if the chunk to skip to is corrupted
     */
    @Override
    public synchronized long skip(long n) throws IOException {
      long curPos = getPos();
      long fileLength = getFileLength();
      // Compare against the remaining length rather than computing
      // n + curPos, which overflows for large n (e.g. Long.MAX_VALUE).
      long remaining = fileLength - curPos;
      if (n > remaining) {
        n = remaining;
      }
      return super.skip(n);
    }

    /**
     * Seek to the given position in the stream.
     * The next read() will be from that position.
     *
     * This method does not allow seek past the end of the file.
     * This produces IOException.
     *
     * @param pos the position to seek to.
     * @exception IOException if an I/O error occurs or seeks after EOF
     *            ChecksumException if the chunk to seek to is corrupted
     */
    @Override
    public synchronized void seek(long pos) throws IOException {
      if (pos > getFileLength()) {
        throw new EOFException("Cannot seek after EOF");
      }
      super.seek(pos);
    }

  }

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * When checksum verification is enabled the stream verifies data against
   * the checksum file; otherwise the raw file system's stream is used.
   *
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    FileSystem fs;
    InputStream in;
    if (verifyChecksum) {
      fs = this;
      in = new ChecksumFSInputChecker(this, f, bufferSize);
    } else {
      fs = getRawFileSystem();
      in = fs.open(f, bufferSize);
    }
    return new FSDataBoundedInputStream(fs, f, in);
  }

  /**
   * Append is not supported.
   *
   * @throws IOException always
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Truncate is not supported.
   *
   * @throws IOException always
   */
  @Override
  public boolean truncate(Path f, long newLength) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Calculate the length of the checksum file in bytes.
   *
   * @param size the length of the data file in bytes
   * @param bytesPerSum the number of bytes in a checksum block
   * @return the number of bytes in the checksum file
   */
  public static long getChecksumLength(long size, int bytesPerSum) {
    // the checksum length is equal to size passed divided by bytesPerSum
    // (rounded up, 4 bytes per checksum) plus the bytes written at the
    // beginning of the checksum file.
    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
        CHECKSUM_VERSION.length + 4;
  }

  /**
   * This class provides an output stream for a checksummed file.
   * It generates checksums for data.
   */
  private static class ChecksumFSOutputSummer extends FSOutputSummer {
    private FSDataOutputStream datas;
    private FSDataOutputStream sums;
    private static final float CHKSUM_AS_FRACTION = 0.01f;
    private boolean isClosed = false;

    /**
     * Create the data file and its checksum file on the raw file system
     * and write the checksum file header (version bytes, bytes per sum).
     */
    public ChecksumFSOutputSummer(ChecksumFileSystem fs,
                                  Path file,
                                  boolean overwrite,
                                  int bufferSize,
                                  short replication,
                                  long blockSize,
                                  Progressable progress,
                                  FsPermission permission)
        throws IOException {
      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
          fs.getBytesPerSum()));
      int bytesPerSum = fs.getBytesPerSum();
      this.datas = fs.getRawFileSystem().create(file, permission, overwrite,
                                                bufferSize, replication,
                                                blockSize, progress);
      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
      this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file),
                                               permission, true,
                                               sumBufferSize, replication,
                                               blockSize, null);
      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
      sums.writeInt(bytesPerSum);
    }

    /**
     * Flush buffered data and close both underlying streams.
     * Both streams are closed even if the flush, or closing the checksum
     * stream, fails; the stream is marked closed in every case.
     */
    @Override
    public void close() throws IOException {
      try {
        try {
          flushBuffer();
        } finally {
          try {
            sums.close();
          } finally {
            datas.close();
          }
        }
      } finally {
        isClosed = true;
      }
    }

    /** Write one chunk of data and its checksum to the two streams. */
    @Override
    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
        int ckoff, int cklen) throws IOException {
      datas.write(b, offset, len);
      sums.write(checksum, ckoff, cklen);
    }

    @Override
    protected void checkClosed() throws IOException {
      if (isClosed) {
        throw new ClosedChannelException();
      }
    }
  }

  /**
   * Create a file, creating missing parent directories.
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, true, bufferSize,
        replication, blockSize, progress);
  }

  /**
   * Shared implementation of create and createNonRecursive.
   *
   * @param createParent if false, a missing parent directory is an error
   * @throws FileNotFoundException if {@code createParent} is false and the
   *         parent directory does not exist
   * @throws IOException if the parent directory cannot be created
   */
  private FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, boolean createParent, int bufferSize,
      short replication, long blockSize,
      Progressable progress) throws IOException {
    Path parent = f.getParent();
    if (parent != null) {
      if (!createParent && !exists(parent)) {
        throw new FileNotFoundException("Parent directory doesn't exist: "
            + parent);
      } else if (!mkdirs(parent)) {
        throw new IOException("Mkdirs failed to create " + parent
            + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
            + ")");
      }
    }
    final FSDataOutputStream out;
    if (writeChecksum) {
      out = new FSDataOutputStream(
          new ChecksumFSOutputSummer(this, f, overwrite, bufferSize,
              replication, blockSize, progress, permission), null);
    } else {
      out = fs.create(f, permission, overwrite, bufferSize, replication,
          blockSize, progress);
      // remove the checksum file since we aren't writing one
      Path checkFile = getChecksumFile(f);
      if (fs.exists(checkFile)) {
        fs.delete(checkFile, true);
      }
    }
    return out;
  }

  /**
   * Create a file without creating missing parent directories.
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, false, bufferSize, replication,
        blockSize, progress);
  }

  /**
   * Set replication for an existing file, and for its checksum file
   * when one exists.
   *
   * @param src file name
   * @param replication new replication
   * @throws IOException
   * @return true if the raw file system reported success for {@code src};
   *         false otherwise
   */
  @Override
  public boolean setReplication(Path src, short replication)
      throws IOException {
    boolean value = fs.setReplication(src, replication);
    if (!value) {
      return false;
    }

    Path checkFile = getChecksumFile(src);
    if (exists(checkFile)) {
      fs.setReplication(checkFile, replication);
    }

    return true;
  }

  /**
   * Rename files/dirs.
   * Directories are renamed directly on the raw file system. For a file,
   * the checksum file is renamed along with it; if the source has no
   * checksum file, any checksum file at the destination is removed.
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    if (fs.isDirectory(src)) {
      return fs.rename(src, dst);
    } else {
      if (fs.isDirectory(dst)) {
        dst = new Path(dst, src.getName());
      }

      boolean value = fs.rename(src, dst);
      if (!value) {
        return false;
      }

      Path srcCheckFile = getChecksumFile(src);
      Path dstCheckFile = getChecksumFile(dst);
      if (fs.exists(srcCheckFile)) { //try to rename checksum
        value = fs.rename(srcCheckFile, dstCheckFile);
      } else if (fs.exists(dstCheckFile)) {
        // no src checksum, so remove dst checksum
        value = fs.delete(dstCheckFile, true);
      }

      return value;
    }
  }

  /**
   * Implement the delete(Path, boolean) in checksum
   * file system.
   *
   * @return false if {@code f} does not exist; otherwise the result of
   *         deleting it on the raw file system
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    FileStatus fstatus = null;
    try {
      fstatus = fs.getFileStatus(f);
    } catch (FileNotFoundException e) {
      return false;
    }
    if (fstatus.isDirectory()) {
      //this works since the crcs are in the same
      //directories and the files. so we just delete
      //everything in the underlying filesystem
      return fs.delete(f, recursive);
    } else {
      Path checkFile = getChecksumFile(f);
      if (fs.exists(checkFile)) {
        fs.delete(checkFile, true);
      }
      return fs.delete(f, true);
    }
  }

  /** Filter that hides checksum files from directory listings. */
  private static final PathFilter DEFAULT_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path file) {
      return !isChecksumFile(file);
    }
  };

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. Checksum files are filtered out.
   *
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    return fs.listStatus(f, DEFAULT_FILTER);
  }

  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory. Checksum files are filtered out.
   *
   * @param f
   *          given path
   * @return the statuses of the files/directories in the given path
   * @throws IOException
   */
  @Override
  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
      throws IOException {
    return fs.listLocatedStatus(f, DEFAULT_FILTER);
  }

  @Override
  public boolean mkdirs(Path f) throws IOException {
    return fs.mkdirs(f);
  }

  /**
   * Copy a file from the local file system into this file system.
   */
  @Override
  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
      throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   */
  @Override
  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
      throws IOException {
    Configuration conf = getConf();
    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
  }

  /**
   * The src file is under FS, and the dst is on the local disk.
   * Copy it from FS control to the local dst name.
   * If src and dst are directories, the copyCrc parameter
   * determines whether to copy CRC files.
   */
  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
      throws IOException {
    if (!fs.isDirectory(src)) { // source is a file
      fs.copyToLocalFile(src, dst);
      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
      if (localFs.isDirectory(dst)) {
        dst = new Path(dst, src.getName());
      }
      dst = getChecksumFile(dst);
      if (localFs.exists(dst)) { //remove old local checksum file
        localFs.delete(dst, true);
      }
      Path checksumFile = getChecksumFile(src);
      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
        fs.copyToLocalFile(checksumFile, dst);
      }
    } else {
      FileStatus[] srcs = listStatus(src);
      for (FileStatus srcFile : srcs) {
        copyToLocalFile(srcFile.getPath(),
            new Path(dst, srcFile.getPath().getName()), copyCrc);
      }
    }
  }

  /**
   * Return the local file to write output to; this implementation
   * returns {@code tmpLocalFile} unchanged.
   */
  @Override
  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
      throws IOException {
    return tmpLocalFile;
  }

  /**
   * Move the finished local file into this file system.
   */
  @Override
  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
      throws IOException {
    moveFromLocalFile(tmpLocalFile, fsOutputFile);
  }

  /**
   * Report a checksum error to the file system.
   * This base implementation does nothing and returns false.
   *
   * @param f the file name containing the error
   * @param in the stream open on the file
   * @param inPos the position of the beginning of the bad data in the file
   * @param sums the stream open on the checksum file
   * @param sumsPos the position of the beginning of the bad data in the checksum file
   * @return if retry is necessary
   */
  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
      long inPos, FSDataInputStream sums, long sumsPos) {
    return false;
  }
}