001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs;
020
021import java.io.EOFException;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InputStream;
025import java.nio.channels.ClosedChannelException;
026import java.util.Arrays;
027
028import com.google.common.base.Preconditions;
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.permission.FsPermission;
033import org.apache.hadoop.util.DataChecksum;
034import org.apache.hadoop.util.Progressable;
035
/****************************************************************
 * Abstract Checksummed FileSystem.
 * It provides a basic implementation of a Checksummed FileSystem,
 * which creates a checksum file for each raw file.
 * It generates and verifies checksums at the client side.
 *
 *****************************************************************/
043@InterfaceAudience.Public
044@InterfaceStability.Stable
045public abstract class ChecksumFileSystem extends FilterFileSystem {
046  private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
047  private int bytesPerChecksum = 512;
048  private boolean verifyChecksum = true;
049  private boolean writeChecksum = true;
050
051  public static double getApproxChkSumLength(long size) {
052    return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
053  }
054  
055  public ChecksumFileSystem(FileSystem fs) {
056    super(fs);
057  }
058
059  @Override
060  public void setConf(Configuration conf) {
061    super.setConf(conf);
062    if (conf != null) {
063      bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
064                                     LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
065      Preconditions.checkState(bytesPerChecksum > 0,
066          "bytes per checksum should be positive but was %s",
067          bytesPerChecksum);
068    }
069  }
070  
071  /**
072   * Set whether to verify checksum.
073   */
074  @Override
075  public void setVerifyChecksum(boolean verifyChecksum) {
076    this.verifyChecksum = verifyChecksum;
077  }
078
079  @Override
080  public void setWriteChecksum(boolean writeChecksum) {
081    this.writeChecksum = writeChecksum;
082  }
083  
084  /** get the raw file system */
085  @Override
086  public FileSystem getRawFileSystem() {
087    return fs;
088  }
089
090  /** Return the name of the checksum file associated with a file.*/
091  public Path getChecksumFile(Path file) {
092    return new Path(file.getParent(), "." + file.getName() + ".crc");
093  }
094
095  /** Return true iff file is a checksum file name.*/
096  public static boolean isChecksumFile(Path file) {
097    String name = file.getName();
098    return name.startsWith(".") && name.endsWith(".crc");
099  }
100
101  /** Return the length of the checksum file given the size of the 
102   * actual file.
103   **/
104  public long getChecksumFileLength(Path file, long fileSize) {
105    return getChecksumLength(fileSize, getBytesPerSum());
106  }
107
108  /** Return the bytes Per Checksum */
109  public int getBytesPerSum() {
110    return bytesPerChecksum;
111  }
112
113  private int getSumBufferSize(int bytesPerSum, int bufferSize) {
114    int defaultBufferSize = getConf().getInt(
115                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
116                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
117    int proportionalBufferSize = bufferSize / bytesPerSum;
118    return Math.max(bytesPerSum,
119                    Math.max(proportionalBufferSize, defaultBufferSize));
120  }
121
122  /*******************************************************
123   * For open()'s FSInputStream
124   * It verifies that data matches checksums.
125   *******************************************************/
126  private static class ChecksumFSInputChecker extends FSInputChecker {
127    private ChecksumFileSystem fs;
128    private FSDataInputStream datas;
129    private FSDataInputStream sums;
130    
131    private static final int HEADER_LENGTH = 8;
132    
133    private int bytesPerSum = 1;
134    
135    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
136      throws IOException {
137      this(fs, file, fs.getConf().getInt(
138                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, 
139                       LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
140    }
141    
142    public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
143      throws IOException {
144      super( file, fs.getFileStatus(file).getReplication() );
145      this.datas = fs.getRawFileSystem().open(file, bufferSize);
146      this.fs = fs;
147      Path sumFile = fs.getChecksumFile(file);
148      try {
149        int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
150        sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
151
152        byte[] version = new byte[CHECKSUM_VERSION.length];
153        sums.readFully(version);
154        if (!Arrays.equals(version, CHECKSUM_VERSION))
155          throw new IOException("Not a checksum file: "+sumFile);
156        this.bytesPerSum = sums.readInt();
157        set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4);
158      } catch (FileNotFoundException e) {         // quietly ignore
159        set(fs.verifyChecksum, null, 1, 0);
160      } catch (IOException e) {                   // loudly ignore
161        LOG.warn("Problem opening checksum file: "+ file + 
162                 ".  Ignoring exception: " , e); 
163        set(fs.verifyChecksum, null, 1, 0);
164      }
165    }
166    
167    private long getChecksumFilePos( long dataPos ) {
168      return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
169    }
170    
171    @Override
172    protected long getChunkPosition( long dataPos ) {
173      return dataPos/bytesPerSum*bytesPerSum;
174    }
175    
176    @Override
177    public int available() throws IOException {
178      return datas.available() + super.available();
179    }
180    
181    @Override
182    public int read(long position, byte[] b, int off, int len)
183      throws IOException {
184      // parameter check
185      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
186        throw new IndexOutOfBoundsException();
187      } else if (len == 0) {
188        return 0;
189      }
190      if( position<0 ) {
191        throw new IllegalArgumentException(
192            "Parameter position can not to be negative");
193      }
194
195      ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
196      checker.seek(position);
197      int nread = checker.read(b, off, len);
198      checker.close();
199      return nread;
200    }
201    
202    @Override
203    public void close() throws IOException {
204      datas.close();
205      if( sums != null ) {
206        sums.close();
207      }
208      set(fs.verifyChecksum, null, 1, 0);
209    }
210    
211
212    @Override
213    public boolean seekToNewSource(long targetPos) throws IOException {
214      long sumsPos = getChecksumFilePos(targetPos);
215      fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
216      boolean newDataSource = datas.seekToNewSource(targetPos);
217      return sums.seekToNewSource(sumsPos) || newDataSource;
218    }
219
220    @Override
221    protected int readChunk(long pos, byte[] buf, int offset, int len,
222        byte[] checksum) throws IOException {
223
224      boolean eof = false;
225      if (needChecksum()) {
226        assert checksum != null; // we have a checksum buffer
227        assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
228        assert len >= bytesPerSum; // we must read at least one chunk
229
230        final int checksumsToRead = Math.min(
231          len/bytesPerSum, // number of checksums based on len to read
232          checksum.length / CHECKSUM_SIZE); // size of checksum buffer
233        long checksumPos = getChecksumFilePos(pos); 
234        if(checksumPos != sums.getPos()) {
235          sums.seek(checksumPos);
236        }
237
238        int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
239        if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
240          throw new ChecksumException(
241            "Checksum file not a length multiple of checksum size " +
242            "in " + file + " at " + pos + " checksumpos: " + checksumPos +
243            " sumLenread: " + sumLenRead,
244            pos);
245        }
246        if (sumLenRead <= 0) { // we're at the end of the file
247          eof = true;
248        } else {
249          // Adjust amount of data to read based on how many checksum chunks we read
250          len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
251        }
252      }
253      if(pos != datas.getPos()) {
254        datas.seek(pos);
255      }
256      int nread = readFully(datas, buf, offset, len);
257      if (eof && nread > 0) {
258        throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
259      }
260      return nread;
261    }
262  }
263  
264  private static class FSDataBoundedInputStream extends FSDataInputStream {
265    private FileSystem fs;
266    private Path file;
267    private long fileLen = -1L;
268
269    FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
270      super(in);
271      this.fs = fs;
272      this.file = file;
273    }
274    
275    @Override
276    public boolean markSupported() {
277      return false;
278    }
279    
280    /* Return the file length */
281    private long getFileLength() throws IOException {
282      if( fileLen==-1L ) {
283        fileLen = fs.getContentSummary(file).getLength();
284      }
285      return fileLen;
286    }
287    
288    /**
289     * Skips over and discards <code>n</code> bytes of data from the
290     * input stream.
291     *
292     *The <code>skip</code> method skips over some smaller number of bytes
293     * when reaching end of file before <code>n</code> bytes have been skipped.
294     * The actual number of bytes skipped is returned.  If <code>n</code> is
295     * negative, no bytes are skipped.
296     *
297     * @param      n   the number of bytes to be skipped.
298     * @return     the actual number of bytes skipped.
299     * @exception  IOException  if an I/O error occurs.
300     *             ChecksumException if the chunk to skip to is corrupted
301     */
302    @Override
303    public synchronized long skip(long n) throws IOException {
304      long curPos = getPos();
305      long fileLength = getFileLength();
306      if( n+curPos > fileLength ) {
307        n = fileLength - curPos;
308      }
309      return super.skip(n);
310    }
311    
312    /**
313     * Seek to the given position in the stream.
314     * The next read() will be from that position.
315     * 
316     * <p>This method does not allow seek past the end of the file.
317     * This produces IOException.
318     *
319     * @param      pos   the postion to seek to.
320     * @exception  IOException  if an I/O error occurs or seeks after EOF
321     *             ChecksumException if the chunk to seek to is corrupted
322     */
323
324    @Override
325    public synchronized void seek(long pos) throws IOException {
326      if (pos > getFileLength()) {
327        throw new EOFException("Cannot seek after EOF");
328      }
329      super.seek(pos);
330    }
331
332  }
333
334  /**
335   * Opens an FSDataInputStream at the indicated Path.
336   * @param f the file name to open
337   * @param bufferSize the size of the buffer to be used.
338   */
339  @Override
340  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
341    FileSystem fs;
342    InputStream in;
343    if (verifyChecksum) {
344      fs = this;
345      in = new ChecksumFSInputChecker(this, f, bufferSize);
346    } else {
347      fs = getRawFileSystem();
348      in = fs.open(f, bufferSize);
349    }
350    return new FSDataBoundedInputStream(fs, f, in);
351  }
352
353  @Override
354  public FSDataOutputStream append(Path f, int bufferSize,
355      Progressable progress) throws IOException {
356    throw new IOException("Not supported");
357  }
358
359  @Override
360  public boolean truncate(Path f, long newLength) throws IOException {
361    throw new IOException("Not supported");
362  }
363
364  /**
365   * Calculated the length of the checksum file in bytes.
366   * @param size the length of the data file in bytes
367   * @param bytesPerSum the number of bytes in a checksum block
368   * @return the number of bytes in the checksum file
369   */
370  public static long getChecksumLength(long size, int bytesPerSum) {
371    //the checksum length is equal to size passed divided by bytesPerSum +
372    //bytes written in the beginning of the checksum file.  
373    return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
374             CHECKSUM_VERSION.length + 4;  
375  }
376
377  /** This class provides an output stream for a checksummed file.
378   * It generates checksums for data. */
379  private static class ChecksumFSOutputSummer extends FSOutputSummer {
380    private FSDataOutputStream datas;    
381    private FSDataOutputStream sums;
382    private static final float CHKSUM_AS_FRACTION = 0.01f;
383    private boolean isClosed = false;
384    
385    public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
386                          Path file, 
387                          boolean overwrite,
388                          int bufferSize,
389                          short replication,
390                          long blockSize,
391                          Progressable progress,
392                          FsPermission permission)
393      throws IOException {
394      super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
395          fs.getBytesPerSum()));
396      int bytesPerSum = fs.getBytesPerSum();
397      this.datas = fs.getRawFileSystem().create(file, permission, overwrite,
398                                         bufferSize, replication, blockSize,
399                                         progress);
400      int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
401      this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file),
402                                               permission, true, sumBufferSize,
403                                               replication, blockSize, null);
404      sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
405      sums.writeInt(bytesPerSum);
406    }
407    
408    @Override
409    public void close() throws IOException {
410      try {
411        flushBuffer();
412        sums.close();
413        datas.close();
414      } finally {
415        isClosed = true;
416      }
417    }
418    
419    @Override
420    protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
421        int ckoff, int cklen)
422    throws IOException {
423      datas.write(b, offset, len);
424      sums.write(checksum, ckoff, cklen);
425    }
426
427    @Override
428    protected void checkClosed() throws IOException {
429      if (isClosed) {
430        throw new ClosedChannelException();
431      }
432    }
433  }
434
435  @Override
436  public FSDataOutputStream create(Path f, FsPermission permission,
437      boolean overwrite, int bufferSize, short replication, long blockSize,
438      Progressable progress) throws IOException {
439    return create(f, permission, overwrite, true, bufferSize,
440        replication, blockSize, progress);
441  }
442
443  private FSDataOutputStream create(Path f, FsPermission permission,
444      boolean overwrite, boolean createParent, int bufferSize,
445      short replication, long blockSize,
446      Progressable progress) throws IOException {
447    Path parent = f.getParent();
448    if (parent != null) {
449      if (!createParent && !exists(parent)) {
450        throw new FileNotFoundException("Parent directory doesn't exist: "
451            + parent);
452      } else if (!mkdirs(parent)) {
453        throw new IOException("Mkdirs failed to create " + parent
454            + " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
455            + ")");
456      }
457    }
458    final FSDataOutputStream out;
459    if (writeChecksum) {
460      out = new FSDataOutputStream(
461          new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
462              blockSize, progress, permission), null);
463    } else {
464      out = fs.create(f, permission, overwrite, bufferSize, replication,
465          blockSize, progress);
466      // remove the checksum file since we aren't writing one
467      Path checkFile = getChecksumFile(f);
468      if (fs.exists(checkFile)) {
469        fs.delete(checkFile, true);
470      }
471    }
472    return out;
473  }
474
475  @Override
476  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
477      boolean overwrite, int bufferSize, short replication, long blockSize,
478      Progressable progress) throws IOException {
479    return create(f, permission, overwrite, false, bufferSize, replication,
480        blockSize, progress);
481  }
482
483  /**
484   * Set replication for an existing file.
485   * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
486   * @param src file name
487   * @param replication new replication
488   * @throws IOException
489   * @return true if successful;
490   *         false if file does not exist or is a directory
491   */
492  @Override
493  public boolean setReplication(Path src, short replication) throws IOException {
494    boolean value = fs.setReplication(src, replication);
495    if (!value)
496      return false;
497
498    Path checkFile = getChecksumFile(src);
499    if (exists(checkFile))
500      fs.setReplication(checkFile, replication);
501
502    return true;
503  }
504
505  /**
506   * Rename files/dirs
507   */
508  @Override
509  public boolean rename(Path src, Path dst) throws IOException {
510    if (fs.isDirectory(src)) {
511      return fs.rename(src, dst);
512    } else {
513      if (fs.isDirectory(dst)) {
514        dst = new Path(dst, src.getName());
515      }
516
517      boolean value = fs.rename(src, dst);
518      if (!value)
519        return false;
520
521      Path srcCheckFile = getChecksumFile(src);
522      Path dstCheckFile = getChecksumFile(dst);
523      if (fs.exists(srcCheckFile)) { //try to rename checksum
524        value = fs.rename(srcCheckFile, dstCheckFile);
525      } else if (fs.exists(dstCheckFile)) {
526        // no src checksum, so remove dst checksum
527        value = fs.delete(dstCheckFile, true); 
528      }
529
530      return value;
531    }
532  }
533
534  /**
535   * Implement the delete(Path, boolean) in checksum
536   * file system.
537   */
538  @Override
539  public boolean delete(Path f, boolean recursive) throws IOException{
540    FileStatus fstatus = null;
541    try {
542      fstatus = fs.getFileStatus(f);
543    } catch(FileNotFoundException e) {
544      return false;
545    }
546    if (fstatus.isDirectory()) {
547      //this works since the crcs are in the same
548      //directories and the files. so we just delete
549      //everything in the underlying filesystem
550      return fs.delete(f, recursive);
551    } else {
552      Path checkFile = getChecksumFile(f);
553      if (fs.exists(checkFile)) {
554        fs.delete(checkFile, true);
555      }
556      return fs.delete(f, true);
557    }
558  }
559    
560  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
561    @Override
562    public boolean accept(Path file) {
563      return !isChecksumFile(file);
564    }
565  };
566
567  /**
568   * List the statuses of the files/directories in the given path if the path is
569   * a directory.
570   * 
571   * @param f
572   *          given path
573   * @return the statuses of the files/directories in the given path
574   * @throws IOException
575   */
576  @Override
577  public FileStatus[] listStatus(Path f) throws IOException {
578    return fs.listStatus(f, DEFAULT_FILTER);
579  }
580  
581  /**
582   * List the statuses of the files/directories in the given path if the path is
583   * a directory.
584   * 
585   * @param f
586   *          given path
587   * @return the statuses of the files/directories in the given patch
588   * @throws IOException
589   */
590  @Override
591  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
592  throws IOException {
593    return fs.listLocatedStatus(f, DEFAULT_FILTER);
594  }
595  
596  @Override
597  public boolean mkdirs(Path f) throws IOException {
598    return fs.mkdirs(f);
599  }
600
601  @Override
602  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
603    throws IOException {
604    Configuration conf = getConf();
605    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
606  }
607
608  /**
609   * The src file is under FS, and the dst is on the local disk.
610   * Copy it from FS control to the local dst name.
611   */
612  @Override
613  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
614    throws IOException {
615    Configuration conf = getConf();
616    FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
617  }
618
619  /**
620   * The src file is under FS, and the dst is on the local disk.
621   * Copy it from FS control to the local dst name.
622   * If src and dst are directories, the copyCrc parameter
623   * determines whether to copy CRC files.
624   */
625  public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
626    throws IOException {
627    if (!fs.isDirectory(src)) { // source is a file
628      fs.copyToLocalFile(src, dst);
629      FileSystem localFs = getLocal(getConf()).getRawFileSystem();
630      if (localFs.isDirectory(dst)) {
631        dst = new Path(dst, src.getName());
632      }
633      dst = getChecksumFile(dst);
634      if (localFs.exists(dst)) { //remove old local checksum file
635        localFs.delete(dst, true);
636      }
637      Path checksumFile = getChecksumFile(src);
638      if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
639        fs.copyToLocalFile(checksumFile, dst);
640      }
641    } else {
642      FileStatus[] srcs = listStatus(src);
643      for (FileStatus srcFile : srcs) {
644        copyToLocalFile(srcFile.getPath(), 
645                        new Path(dst, srcFile.getPath().getName()), copyCrc);
646      }
647    }
648  }
649
650  @Override
651  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
652    throws IOException {
653    return tmpLocalFile;
654  }
655
656  @Override
657  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
658    throws IOException {
659    moveFromLocalFile(tmpLocalFile, fsOutputFile);
660  }
661
662  /**
663   * Report a checksum error to the file system.
664   * @param f the file name containing the error
665   * @param in the stream open on the file
666   * @param inPos the position of the beginning of the bad data in the file
667   * @param sums the stream open on the checksum file
668   * @param sumsPos the position of the beginning of the bad data in the checksum file
669   * @return if retry is necessary
670   */
671  public boolean reportChecksumFailure(Path f, FSDataInputStream in,
672                                       long inPos, FSDataInputStream sums, long sumsPos) {
673    return false;
674  }
675}