001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs; 020 021import java.io.EOFException; 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.InputStream; 025import java.nio.channels.ClosedChannelException; 026import java.util.Arrays; 027import java.util.List; 028 029import com.google.common.base.Preconditions; 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.permission.AclEntry; 034import org.apache.hadoop.fs.permission.FsPermission; 035import org.apache.hadoop.util.DataChecksum; 036import org.apache.hadoop.util.Progressable; 037 038/**************************************************************** 039 * Abstract Checksumed FileSystem. 040 * It provide a basic implementation of a Checksumed FileSystem, 041 * which creates a checksum file for each raw file. 042 * It generates & verifies checksums at the client side. 043 * 044 *****************************************************************/ 045@InterfaceAudience.Public 046@InterfaceStability.Stable 047public abstract class ChecksumFileSystem extends FilterFileSystem { 048 private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0}; 049 private int bytesPerChecksum = 512; 050 private boolean verifyChecksum = true; 051 private boolean writeChecksum = true; 052 053 public static double getApproxChkSumLength(long size) { 054 return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size; 055 } 056 057 public ChecksumFileSystem(FileSystem fs) { 058 super(fs); 059 } 060 061 @Override 062 public void setConf(Configuration conf) { 063 super.setConf(conf); 064 if (conf != null) { 065 bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY, 066 LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT); 067 Preconditions.checkState(bytesPerChecksum > 0, 068 "bytes per checksum should be positive but was %s", 069 bytesPerChecksum); 070 } 071 } 072 073 /** 074 * Set whether to verify checksum. 075 */ 076 @Override 077 public void setVerifyChecksum(boolean verifyChecksum) { 078 this.verifyChecksum = verifyChecksum; 079 } 080 081 @Override 082 public void setWriteChecksum(boolean writeChecksum) { 083 this.writeChecksum = writeChecksum; 084 } 085 086 /** get the raw file system */ 087 @Override 088 public FileSystem getRawFileSystem() { 089 return fs; 090 } 091 092 /** Return the name of the checksum file associated with a file.*/ 093 public Path getChecksumFile(Path file) { 094 return new Path(file.getParent(), "." + file.getName() + ".crc"); 095 } 096 097 /** Return true iff file is a checksum file name.*/ 098 public static boolean isChecksumFile(Path file) { 099 String name = file.getName(); 100 return name.startsWith(".") && name.endsWith(".crc"); 101 } 102 103 /** Return the length of the checksum file given the size of the 104 * actual file. 105 **/ 106 public long getChecksumFileLength(Path file, long fileSize) { 107 return getChecksumLength(fileSize, getBytesPerSum()); 108 } 109 110 /** Return the bytes Per Checksum */ 111 public int getBytesPerSum() { 112 return bytesPerChecksum; 113 } 114 115 private int getSumBufferSize(int bytesPerSum, int bufferSize) { 116 int defaultBufferSize = getConf().getInt( 117 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 118 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT); 119 int proportionalBufferSize = bufferSize / bytesPerSum; 120 return Math.max(bytesPerSum, 121 Math.max(proportionalBufferSize, defaultBufferSize)); 122 } 123 124 /******************************************************* 125 * For open()'s FSInputStream 126 * It verifies that data matches checksums. 127 *******************************************************/ 128 private static class ChecksumFSInputChecker extends FSInputChecker { 129 private ChecksumFileSystem fs; 130 private FSDataInputStream datas; 131 private FSDataInputStream sums; 132 133 private static final int HEADER_LENGTH = 8; 134 135 private int bytesPerSum = 1; 136 137 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file) 138 throws IOException { 139 this(fs, file, fs.getConf().getInt( 140 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 141 LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT)); 142 } 143 144 public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize) 145 throws IOException { 146 super( file, fs.getFileStatus(file).getReplication() ); 147 this.datas = fs.getRawFileSystem().open(file, bufferSize); 148 this.fs = fs; 149 Path sumFile = fs.getChecksumFile(file); 150 try { 151 int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize); 152 sums = fs.getRawFileSystem().open(sumFile, sumBufferSize); 153 154 byte[] version = new byte[CHECKSUM_VERSION.length]; 155 sums.readFully(version); 156 if (!Arrays.equals(version, CHECKSUM_VERSION)) 157 throw new IOException("Not a checksum file: "+sumFile); 158 this.bytesPerSum = sums.readInt(); 159 set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4); 160 } catch (IOException e) { 161 // mincing the message is terrible, but java throws permission 162 // exceptions as FNF because that's all the method signatures allow! 163 if (!(e instanceof FileNotFoundException) || 164 e.getMessage().endsWith(" (Permission denied)")) { 165 LOG.warn("Problem opening checksum file: "+ file + 166 ". Ignoring exception: " , e); 167 } 168 set(fs.verifyChecksum, null, 1, 0); 169 } 170 } 171 172 private long getChecksumFilePos( long dataPos ) { 173 return HEADER_LENGTH + 4*(dataPos/bytesPerSum); 174 } 175 176 @Override 177 protected long getChunkPosition( long dataPos ) { 178 return dataPos/bytesPerSum*bytesPerSum; 179 } 180 181 @Override 182 public int available() throws IOException { 183 return datas.available() + super.available(); 184 } 185 186 @Override 187 public int read(long position, byte[] b, int off, int len) 188 throws IOException { 189 // parameter check 190 validatePositionedReadArgs(position, b, off, len); 191 if (len == 0) { 192 return 0; 193 } 194 195 int nread; 196 try (ChecksumFSInputChecker checker = 197 new ChecksumFSInputChecker(fs, file)) { 198 checker.seek(position); 199 nread = checker.read(b, off, len); 200 checker.close(); 201 } 202 return nread; 203 } 204 205 @Override 206 public void close() throws IOException { 207 datas.close(); 208 if( sums != null ) { 209 sums.close(); 210 } 211 set(fs.verifyChecksum, null, 1, 0); 212 } 213 214 215 @Override 216 public boolean seekToNewSource(long targetPos) throws IOException { 217 long sumsPos = getChecksumFilePos(targetPos); 218 fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos); 219 boolean newDataSource = datas.seekToNewSource(targetPos); 220 return sums.seekToNewSource(sumsPos) || newDataSource; 221 } 222 223 @Override 224 protected int readChunk(long pos, byte[] buf, int offset, int len, 225 byte[] checksum) throws IOException { 226 227 boolean eof = false; 228 if (needChecksum()) { 229 assert checksum != null; // we have a checksum buffer 230 assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length 231 assert len >= bytesPerSum; // we must read at least one chunk 232 233 final int checksumsToRead = Math.min( 234 len/bytesPerSum, // number of checksums based on len to read 235 checksum.length / CHECKSUM_SIZE); // size of checksum buffer 236 long checksumPos = getChecksumFilePos(pos); 237 if(checksumPos != sums.getPos()) { 238 sums.seek(checksumPos); 239 } 240 241 int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead); 242 if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) { 243 throw new ChecksumException( 244 "Checksum file not a length multiple of checksum size " + 245 "in " + file + " at " + pos + " checksumpos: " + checksumPos + 246 " sumLenread: " + sumLenRead, 247 pos); 248 } 249 if (sumLenRead <= 0) { // we're at the end of the file 250 eof = true; 251 } else { 252 // Adjust amount of data to read based on how many checksum chunks we read 253 len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE)); 254 } 255 } 256 if(pos != datas.getPos()) { 257 datas.seek(pos); 258 } 259 int nread = readFully(datas, buf, offset, len); 260 if (eof && nread > 0) { 261 throw new ChecksumException("Checksum error: "+file+" at "+pos, pos); 262 } 263 return nread; 264 } 265 } 266 267 private static class FSDataBoundedInputStream extends FSDataInputStream { 268 private FileSystem fs; 269 private Path file; 270 private long fileLen = -1L; 271 272 FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) { 273 super(in); 274 this.fs = fs; 275 this.file = file; 276 } 277 278 @Override 279 public boolean markSupported() { 280 return false; 281 } 282 283 /* Return the file length */ 284 private long getFileLength() throws IOException { 285 if( fileLen==-1L ) { 286 fileLen = fs.getContentSummary(file).getLength(); 287 } 288 return fileLen; 289 } 290 291 /** 292 * Skips over and discards <code>n</code> bytes of data from the 293 * input stream. 294 * 295 *The <code>skip</code> method skips over some smaller number of bytes 296 * when reaching end of file before <code>n</code> bytes have been skipped. 297 * The actual number of bytes skipped is returned. If <code>n</code> is 298 * negative, no bytes are skipped. 299 * 300 * @param n the number of bytes to be skipped. 301 * @return the actual number of bytes skipped. 302 * @exception IOException if an I/O error occurs. 303 * ChecksumException if the chunk to skip to is corrupted 304 */ 305 @Override 306 public synchronized long skip(long n) throws IOException { 307 long curPos = getPos(); 308 long fileLength = getFileLength(); 309 if( n+curPos > fileLength ) { 310 n = fileLength - curPos; 311 } 312 return super.skip(n); 313 } 314 315 /** 316 * Seek to the given position in the stream. 317 * The next read() will be from that position. 318 * 319 * <p>This method does not allow seek past the end of the file. 320 * This produces IOException. 321 * 322 * @param pos the postion to seek to. 323 * @exception IOException if an I/O error occurs or seeks after EOF 324 * ChecksumException if the chunk to seek to is corrupted 325 */ 326 327 @Override 328 public synchronized void seek(long pos) throws IOException { 329 if (pos > getFileLength()) { 330 throw new EOFException("Cannot seek after EOF"); 331 } 332 super.seek(pos); 333 } 334 335 } 336 337 /** 338 * Opens an FSDataInputStream at the indicated Path. 339 * @param f the file name to open 340 * @param bufferSize the size of the buffer to be used. 341 */ 342 @Override 343 public FSDataInputStream open(Path f, int bufferSize) throws IOException { 344 FileSystem fs; 345 InputStream in; 346 if (verifyChecksum) { 347 fs = this; 348 in = new ChecksumFSInputChecker(this, f, bufferSize); 349 } else { 350 fs = getRawFileSystem(); 351 in = fs.open(f, bufferSize); 352 } 353 return new FSDataBoundedInputStream(fs, f, in); 354 } 355 356 @Override 357 public FSDataOutputStream append(Path f, int bufferSize, 358 Progressable progress) throws IOException { 359 throw new IOException("Not supported"); 360 } 361 362 @Override 363 public boolean truncate(Path f, long newLength) throws IOException { 364 throw new IOException("Not supported"); 365 } 366 367 /** 368 * Calculated the length of the checksum file in bytes. 369 * @param size the length of the data file in bytes 370 * @param bytesPerSum the number of bytes in a checksum block 371 * @return the number of bytes in the checksum file 372 */ 373 public static long getChecksumLength(long size, int bytesPerSum) { 374 //the checksum length is equal to size passed divided by bytesPerSum + 375 //bytes written in the beginning of the checksum file. 376 return ((size + bytesPerSum - 1) / bytesPerSum) * 4 + 377 CHECKSUM_VERSION.length + 4; 378 } 379 380 /** This class provides an output stream for a checksummed file. 381 * It generates checksums for data. */ 382 private static class ChecksumFSOutputSummer extends FSOutputSummer { 383 private FSDataOutputStream datas; 384 private FSDataOutputStream sums; 385 private static final float CHKSUM_AS_FRACTION = 0.01f; 386 private boolean isClosed = false; 387 388 public ChecksumFSOutputSummer(ChecksumFileSystem fs, 389 Path file, 390 boolean overwrite, 391 int bufferSize, 392 short replication, 393 long blockSize, 394 Progressable progress, 395 FsPermission permission) 396 throws IOException { 397 super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 398 fs.getBytesPerSum())); 399 int bytesPerSum = fs.getBytesPerSum(); 400 this.datas = fs.getRawFileSystem().create(file, permission, overwrite, 401 bufferSize, replication, blockSize, 402 progress); 403 int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize); 404 this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), 405 permission, true, sumBufferSize, 406 replication, blockSize, null); 407 sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length); 408 sums.writeInt(bytesPerSum); 409 } 410 411 @Override 412 public void close() throws IOException { 413 try { 414 flushBuffer(); 415 sums.close(); 416 datas.close(); 417 } finally { 418 isClosed = true; 419 } 420 } 421 422 @Override 423 protected void writeChunk(byte[] b, int offset, int len, byte[] checksum, 424 int ckoff, int cklen) 425 throws IOException { 426 datas.write(b, offset, len); 427 sums.write(checksum, ckoff, cklen); 428 } 429 430 @Override 431 protected void checkClosed() throws IOException { 432 if (isClosed) { 433 throw new ClosedChannelException(); 434 } 435 } 436 } 437 438 @Override 439 public FSDataOutputStream create(Path f, FsPermission permission, 440 boolean overwrite, int bufferSize, short replication, long blockSize, 441 Progressable progress) throws IOException { 442 return create(f, permission, overwrite, true, bufferSize, 443 replication, blockSize, progress); 444 } 445 446 private FSDataOutputStream create(Path f, FsPermission permission, 447 boolean overwrite, boolean createParent, int bufferSize, 448 short replication, long blockSize, 449 Progressable progress) throws IOException { 450 Path parent = f.getParent(); 451 if (parent != null) { 452 if (!createParent && !exists(parent)) { 453 throw new FileNotFoundException("Parent directory doesn't exist: " 454 + parent); 455 } else if (!mkdirs(parent)) { 456 throw new IOException("Mkdirs failed to create " + parent 457 + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory() 458 + ")"); 459 } 460 } 461 final FSDataOutputStream out; 462 if (writeChecksum) { 463 out = new FSDataOutputStream( 464 new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication, 465 blockSize, progress, permission), null); 466 } else { 467 out = fs.create(f, permission, overwrite, bufferSize, replication, 468 blockSize, progress); 469 // remove the checksum file since we aren't writing one 470 Path checkFile = getChecksumFile(f); 471 if (fs.exists(checkFile)) { 472 fs.delete(checkFile, true); 473 } 474 } 475 return out; 476 } 477 478 @Override 479 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 480 boolean overwrite, int bufferSize, short replication, long blockSize, 481 Progressable progress) throws IOException { 482 return create(f, permission, overwrite, false, bufferSize, replication, 483 blockSize, progress); 484 } 485 486 abstract class FsOperation { 487 boolean run(Path p) throws IOException { 488 boolean status = apply(p); 489 if (status) { 490 Path checkFile = getChecksumFile(p); 491 if (fs.exists(checkFile)) { 492 apply(checkFile); 493 } 494 } 495 return status; 496 } 497 abstract boolean apply(Path p) throws IOException; 498 } 499 500 501 @Override 502 public void setPermission(Path src, final FsPermission permission) 503 throws IOException { 504 new FsOperation(){ 505 @Override 506 boolean apply(Path p) throws IOException { 507 fs.setPermission(p, permission); 508 return true; 509 } 510 }.run(src); 511 } 512 513 @Override 514 public void setOwner(Path src, final String username, final String groupname) 515 throws IOException { 516 new FsOperation(){ 517 @Override 518 boolean apply(Path p) throws IOException { 519 fs.setOwner(p, username, groupname); 520 return true; 521 } 522 }.run(src); 523 } 524 525 @Override 526 public void setAcl(Path src, final List<AclEntry> aclSpec) 527 throws IOException { 528 new FsOperation(){ 529 @Override 530 boolean apply(Path p) throws IOException { 531 fs.setAcl(p, aclSpec); 532 return true; 533 } 534 }.run(src); 535 } 536 537 @Override 538 public void modifyAclEntries(Path src, final List<AclEntry> aclSpec) 539 throws IOException { 540 new FsOperation(){ 541 @Override 542 boolean apply(Path p) throws IOException { 543 fs.modifyAclEntries(p, aclSpec); 544 return true; 545 } 546 }.run(src); 547 } 548 549 @Override 550 public void removeAcl(Path src) throws IOException { 551 new FsOperation(){ 552 @Override 553 boolean apply(Path p) throws IOException { 554 fs.removeAcl(p); 555 return true; 556 } 557 }.run(src); 558 } 559 560 @Override 561 public void removeAclEntries(Path src, final List<AclEntry> aclSpec) 562 throws IOException { 563 new FsOperation(){ 564 @Override 565 boolean apply(Path p) throws IOException { 566 fs.removeAclEntries(p, aclSpec); 567 return true; 568 } 569 }.run(src); 570 } 571 572 @Override 573 public void removeDefaultAcl(Path src) throws IOException { 574 new FsOperation(){ 575 @Override 576 boolean apply(Path p) throws IOException { 577 fs.removeDefaultAcl(p); 578 return true; 579 } 580 }.run(src); 581 } 582 583 /** 584 * Set replication for an existing file. 585 * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt> 586 * @param src file name 587 * @param replication new replication 588 * @throws IOException 589 * @return true if successful; 590 * false if file does not exist or is a directory 591 */ 592 @Override 593 public boolean setReplication(Path src, final short replication) 594 throws IOException { 595 return new FsOperation(){ 596 @Override 597 boolean apply(Path p) throws IOException { 598 return fs.setReplication(p, replication); 599 } 600 }.run(src); 601 } 602 603 /** 604 * Rename files/dirs 605 */ 606 @Override 607 public boolean rename(Path src, Path dst) throws IOException { 608 if (fs.isDirectory(src)) { 609 return fs.rename(src, dst); 610 } else { 611 if (fs.isDirectory(dst)) { 612 dst = new Path(dst, src.getName()); 613 } 614 615 boolean value = fs.rename(src, dst); 616 if (!value) 617 return false; 618 619 Path srcCheckFile = getChecksumFile(src); 620 Path dstCheckFile = getChecksumFile(dst); 621 if (fs.exists(srcCheckFile)) { //try to rename checksum 622 value = fs.rename(srcCheckFile, dstCheckFile); 623 } else if (fs.exists(dstCheckFile)) { 624 // no src checksum, so remove dst checksum 625 value = fs.delete(dstCheckFile, true); 626 } 627 628 return value; 629 } 630 } 631 632 /** 633 * Implement the delete(Path, boolean) in checksum 634 * file system. 635 */ 636 @Override 637 public boolean delete(Path f, boolean recursive) throws IOException{ 638 FileStatus fstatus = null; 639 try { 640 fstatus = fs.getFileStatus(f); 641 } catch(FileNotFoundException e) { 642 return false; 643 } 644 if (fstatus.isDirectory()) { 645 //this works since the crcs are in the same 646 //directories and the files. so we just delete 647 //everything in the underlying filesystem 648 return fs.delete(f, recursive); 649 } else { 650 Path checkFile = getChecksumFile(f); 651 if (fs.exists(checkFile)) { 652 fs.delete(checkFile, true); 653 } 654 return fs.delete(f, true); 655 } 656 } 657 658 final private static PathFilter DEFAULT_FILTER = new PathFilter() { 659 @Override 660 public boolean accept(Path file) { 661 return !isChecksumFile(file); 662 } 663 }; 664 665 /** 666 * List the statuses of the files/directories in the given path if the path is 667 * a directory. 668 * 669 * @param f 670 * given path 671 * @return the statuses of the files/directories in the given path 672 * @throws IOException 673 */ 674 @Override 675 public FileStatus[] listStatus(Path f) throws IOException { 676 return fs.listStatus(f, DEFAULT_FILTER); 677 } 678 679 /** 680 * List the statuses of the files/directories in the given path if the path is 681 * a directory. 682 * 683 * @param f 684 * given path 685 * @return the statuses of the files/directories in the given patch 686 * @throws IOException 687 */ 688 @Override 689 public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) 690 throws IOException { 691 return fs.listLocatedStatus(f, DEFAULT_FILTER); 692 } 693 694 @Override 695 public boolean mkdirs(Path f) throws IOException { 696 return fs.mkdirs(f); 697 } 698 699 @Override 700 public void copyFromLocalFile(boolean delSrc, Path src, Path dst) 701 throws IOException { 702 Configuration conf = getConf(); 703 FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf); 704 } 705 706 /** 707 * The src file is under FS, and the dst is on the local disk. 708 * Copy it from FS control to the local dst name. 709 */ 710 @Override 711 public void copyToLocalFile(boolean delSrc, Path src, Path dst) 712 throws IOException { 713 Configuration conf = getConf(); 714 FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf); 715 } 716 717 /** 718 * The src file is under FS, and the dst is on the local disk. 719 * Copy it from FS control to the local dst name. 720 * If src and dst are directories, the copyCrc parameter 721 * determines whether to copy CRC files. 722 */ 723 public void copyToLocalFile(Path src, Path dst, boolean copyCrc) 724 throws IOException { 725 if (!fs.isDirectory(src)) { // source is a file 726 fs.copyToLocalFile(src, dst); 727 FileSystem localFs = getLocal(getConf()).getRawFileSystem(); 728 if (localFs.isDirectory(dst)) { 729 dst = new Path(dst, src.getName()); 730 } 731 dst = getChecksumFile(dst); 732 if (localFs.exists(dst)) { //remove old local checksum file 733 localFs.delete(dst, true); 734 } 735 Path checksumFile = getChecksumFile(src); 736 if (copyCrc && fs.exists(checksumFile)) { //copy checksum file 737 fs.copyToLocalFile(checksumFile, dst); 738 } 739 } else { 740 FileStatus[] srcs = listStatus(src); 741 for (FileStatus srcFile : srcs) { 742 copyToLocalFile(srcFile.getPath(), 743 new Path(dst, srcFile.getPath().getName()), copyCrc); 744 } 745 } 746 } 747 748 @Override 749 public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) 750 throws IOException { 751 return tmpLocalFile; 752 } 753 754 @Override 755 public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) 756 throws IOException { 757 moveFromLocalFile(tmpLocalFile, fsOutputFile); 758 } 759 760 /** 761 * Report a checksum error to the file system. 762 * @param f the file name containing the error 763 * @param in the stream open on the file 764 * @param inPos the position of the beginning of the bad data in the file 765 * @param sums the stream open on the checksum file 766 * @param sumsPos the position of the beginning of the bad data in the checksum file 767 * @return if retry is necessary 768 */ 769 public boolean reportChecksumFailure(Path f, FSDataInputStream in, 770 long inPos, FSDataInputStream sums, long sumsPos) { 771 return false; 772 } 773}