001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019
020package org.apache.hadoop.fs;
021
022import com.google.common.annotations.VisibleForTesting;
023
024import java.io.BufferedOutputStream;
025import java.io.DataOutput;
026import java.io.EOFException;
027import java.io.File;
028import java.io.FileInputStream;
029import java.io.FileNotFoundException;
030import java.io.FileOutputStream;
031import java.io.IOException;
032import java.io.OutputStream;
033import java.io.FileDescriptor;
034import java.net.URI;
035import java.nio.ByteBuffer;
036import java.nio.file.Files;
037import java.nio.file.NoSuchFileException;
038import java.nio.file.attribute.BasicFileAttributes;
039import java.nio.file.attribute.BasicFileAttributeView;
040import java.nio.file.attribute.FileTime;
041import java.util.Arrays;
042import java.util.EnumSet;
043import java.util.StringTokenizer;
044
045import org.apache.hadoop.classification.InterfaceAudience;
046import org.apache.hadoop.classification.InterfaceStability;
047import org.apache.hadoop.conf.Configuration;
048import org.apache.hadoop.fs.permission.FsPermission;
049import org.apache.hadoop.io.IOUtils;
050import org.apache.hadoop.io.nativeio.NativeIO;
051import org.apache.hadoop.util.Progressable;
052import org.apache.hadoop.util.Shell;
053import org.apache.hadoop.util.StringUtils;
054
055/****************************************************************
056 * Implement the FileSystem API for the raw local filesystem.
057 *
058 *****************************************************************/
059@InterfaceAudience.Public
060@InterfaceStability.Stable
061public class RawLocalFileSystem extends FileSystem {
062  static final URI NAME = URI.create("file:///");
063  private Path workingDir;
064  // Temporary workaround for HADOOP-9652.
065  private static boolean useDeprecatedFileStatus = true;
066
067  private FsPermission umask;
068
069  @VisibleForTesting
070  public static void useStatIfAvailable() {
071    useDeprecatedFileStatus = !Stat.isAvailable();
072  }
073  
074  public RawLocalFileSystem() {
075    workingDir = getInitialWorkingDirectory();
076  }
077  
078  private Path makeAbsolute(Path f) {
079    if (f.isAbsolute()) {
080      return f;
081    } else {
082      return new Path(workingDir, f);
083    }
084  }
085  
086  /** Convert a path to a File. */
087  public File pathToFile(Path path) {
088    checkPath(path);
089    if (!path.isAbsolute()) {
090      path = new Path(getWorkingDirectory(), path);
091    }
092    return new File(path.toUri().getPath());
093  }
094
095  @Override
096  public URI getUri() { return NAME; }
097  
098  @Override
099  public void initialize(URI uri, Configuration conf) throws IOException {
100    super.initialize(uri, conf);
101    setConf(conf);
102    umask = FsPermission.getUMask(conf);
103  }
104  
105  /*******************************************************
106   * For open()'s FSInputStream.
107   *******************************************************/
108  class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor {
109    private FileInputStream fis;
110    private long position;
111
112    public LocalFSFileInputStream(Path f) throws IOException {
113      fis = new FileInputStream(pathToFile(f));
114    }
115    
116    @Override
117    public void seek(long pos) throws IOException {
118      if (pos < 0) {
119        throw new EOFException(
120          FSExceptionMessages.NEGATIVE_SEEK);
121      }
122      fis.getChannel().position(pos);
123      this.position = pos;
124    }
125    
126    @Override
127    public long getPos() throws IOException {
128      return this.position;
129    }
130    
131    @Override
132    public boolean seekToNewSource(long targetPos) throws IOException {
133      return false;
134    }
135    
136    /*
137     * Just forward to the fis
138     */
139    @Override
140    public int available() throws IOException { return fis.available(); }
141    @Override
142    public void close() throws IOException { fis.close(); }
143    @Override
144    public boolean markSupported() { return false; }
145    
146    @Override
147    public int read() throws IOException {
148      try {
149        int value = fis.read();
150        if (value >= 0) {
151          this.position++;
152          statistics.incrementBytesRead(1);
153        }
154        return value;
155      } catch (IOException e) {                 // unexpected exception
156        throw new FSError(e);                   // assume native fs error
157      }
158    }
159    
160    @Override
161    public int read(byte[] b, int off, int len) throws IOException {
162      // parameter check
163      validatePositionedReadArgs(position, b, off, len);
164      try {
165        int value = fis.read(b, off, len);
166        if (value > 0) {
167          this.position += value;
168          statistics.incrementBytesRead(value);
169        }
170        return value;
171      } catch (IOException e) {                 // unexpected exception
172        throw new FSError(e);                   // assume native fs error
173      }
174    }
175    
176    @Override
177    public int read(long position, byte[] b, int off, int len)
178      throws IOException {
179      // parameter check
180      validatePositionedReadArgs(position, b, off, len);
181      if (len == 0) {
182        return 0;
183      }
184
185      ByteBuffer bb = ByteBuffer.wrap(b, off, len);
186      try {
187        int value = fis.getChannel().read(bb, position);
188        if (value > 0) {
189          statistics.incrementBytesRead(value);
190        }
191        return value;
192      } catch (IOException e) {
193        throw new FSError(e);
194      }
195    }
196    
197    @Override
198    public long skip(long n) throws IOException {
199      long value = fis.skip(n);
200      if (value > 0) {
201        this.position += value;
202      }
203      return value;
204    }
205
206    @Override
207    public FileDescriptor getFileDescriptor() throws IOException {
208      return fis.getFD();
209    }
210  }
211  
212  @Override
213  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
214    if (!exists(f)) {
215      throw new FileNotFoundException(f.toString());
216    }
217    return new FSDataInputStream(new BufferedFSInputStream(
218        new LocalFSFileInputStream(f), bufferSize));
219  }
220  
221  /*********************************************************
222   * For create()'s FSOutputStream.
223   *********************************************************/
224  class LocalFSFileOutputStream extends OutputStream {
225    private FileOutputStream fos;
226    
227    private LocalFSFileOutputStream(Path f, boolean append,
228        FsPermission permission) throws IOException {
229      File file = pathToFile(f);
230      if (!append && permission == null) {
231        permission = FsPermission.getFileDefault();
232      }
233      if (permission == null) {
234        this.fos = new FileOutputStream(file, append);
235      } else {
236        permission = permission.applyUMask(umask);
237        if (Shell.WINDOWS && NativeIO.isAvailable()) {
238          this.fos = NativeIO.Windows.createFileOutputStreamWithMode(file,
239              append, permission.toShort());
240        } else {
241          this.fos = new FileOutputStream(file, append);
242          boolean success = false;
243          try {
244            setPermission(f, permission);
245            success = true;
246          } finally {
247            if (!success) {
248              IOUtils.cleanup(LOG, this.fos);
249            }
250          }
251        }
252      }
253    }
254    
255    /*
256     * Just forward to the fos
257     */
258    @Override
259    public void close() throws IOException { fos.close(); }
260    @Override
261    public void flush() throws IOException { fos.flush(); }
262    @Override
263    public void write(byte[] b, int off, int len) throws IOException {
264      try {
265        fos.write(b, off, len);
266      } catch (IOException e) {                // unexpected exception
267        throw new FSError(e);                  // assume native fs error
268      }
269    }
270    
271    @Override
272    public void write(int b) throws IOException {
273      try {
274        fos.write(b);
275      } catch (IOException e) {              // unexpected exception
276        throw new FSError(e);                // assume native fs error
277      }
278    }
279  }
280
281  @Override
282  public FSDataOutputStream append(Path f, int bufferSize,
283      Progressable progress) throws IOException {
284    if (!exists(f)) {
285      throw new FileNotFoundException("File " + f + " not found");
286    }
287    FileStatus status = getFileStatus(f);
288    if (status.isDirectory()) {
289      throw new IOException("Cannot append to a diretory (=" + f + " )");
290    }
291    return new FSDataOutputStream(new BufferedOutputStream(
292        createOutputStreamWithMode(f, true, null), bufferSize), statistics,
293        status.getLen());
294  }
295
296  @Override
297  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
298    short replication, long blockSize, Progressable progress)
299    throws IOException {
300    return create(f, overwrite, true, bufferSize, replication, blockSize,
301        progress, null);
302  }
303
304  private FSDataOutputStream create(Path f, boolean overwrite,
305      boolean createParent, int bufferSize, short replication, long blockSize,
306      Progressable progress, FsPermission permission) throws IOException {
307    if (exists(f) && !overwrite) {
308      throw new FileAlreadyExistsException("File already exists: " + f);
309    }
310    Path parent = f.getParent();
311    if (parent != null && !mkdirs(parent)) {
312      throw new IOException("Mkdirs failed to create " + parent.toString());
313    }
314    return new FSDataOutputStream(new BufferedOutputStream(
315        createOutputStreamWithMode(f, false, permission), bufferSize),
316        statistics);
317  }
318  
319  protected OutputStream createOutputStream(Path f, boolean append) 
320      throws IOException {
321    return createOutputStreamWithMode(f, append, null);
322  }
323
324  protected OutputStream createOutputStreamWithMode(Path f, boolean append,
325      FsPermission permission) throws IOException {
326    return new LocalFSFileOutputStream(f, append, permission);
327  }
328  
329  @Override
330  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
331      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
332      Progressable progress) throws IOException {
333    if (exists(f) && !flags.contains(CreateFlag.OVERWRITE)) {
334      throw new FileAlreadyExistsException("File already exists: " + f);
335    }
336    return new FSDataOutputStream(new BufferedOutputStream(
337        createOutputStreamWithMode(f, false, permission), bufferSize),
338            statistics);
339  }
340
341  @Override
342  public FSDataOutputStream create(Path f, FsPermission permission,
343    boolean overwrite, int bufferSize, short replication, long blockSize,
344    Progressable progress) throws IOException {
345
346    FSDataOutputStream out = create(f, overwrite, true, bufferSize, replication,
347        blockSize, progress, permission);
348    return out;
349  }
350
351  @Override
352  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
353      boolean overwrite,
354      int bufferSize, short replication, long blockSize,
355      Progressable progress) throws IOException {
356    FSDataOutputStream out = create(f, overwrite, false, bufferSize, replication,
357        blockSize, progress, permission);
358    return out;
359  }
360
361  @Override
362  public boolean rename(Path src, Path dst) throws IOException {
363    // Attempt rename using Java API.
364    File srcFile = pathToFile(src);
365    File dstFile = pathToFile(dst);
366    if (srcFile.renameTo(dstFile)) {
367      return true;
368    }
369
370    // Else try POSIX style rename on Windows only
371    if (Shell.WINDOWS &&
372        handleEmptyDstDirectoryOnWindows(src, srcFile, dst, dstFile)) {
373      return true;
374    }
375
376    // The fallback behavior accomplishes the rename by a full copy.
377    if (LOG.isDebugEnabled()) {
378      LOG.debug("Falling through to a copy of " + src + " to " + dst);
379    }
380    return FileUtil.copy(this, src, this, dst, true, getConf());
381  }
382
383  @VisibleForTesting
384  public final boolean handleEmptyDstDirectoryOnWindows(Path src, File srcFile,
385      Path dst, File dstFile) throws IOException {
386
387    // Enforce POSIX rename behavior that a source directory replaces an
388    // existing destination if the destination is an empty directory. On most
389    // platforms, this is already handled by the Java API call above. Some
390    // platforms (notably Windows) do not provide this behavior, so the Java API
391    // call renameTo(dstFile) fails. Delete destination and attempt rename
392    // again.
393    if (this.exists(dst)) {
394      FileStatus sdst = this.getFileStatus(dst);
395      if (sdst.isDirectory() && dstFile.list().length == 0) {
396        if (LOG.isDebugEnabled()) {
397          LOG.debug("Deleting empty destination and renaming " + src + " to " +
398            dst);
399        }
400        if (this.delete(dst, false) && srcFile.renameTo(dstFile)) {
401          return true;
402        }
403      }
404    }
405    return false;
406  }
407
408  @Override
409  public boolean truncate(Path f, final long newLength) throws IOException {
410    FileStatus status = getFileStatus(f);
411    if(status == null) {
412      throw new FileNotFoundException("File " + f + " not found");
413    }
414    if(status.isDirectory()) {
415      throw new IOException("Cannot truncate a directory (=" + f + ")");
416    }
417    long oldLength = status.getLen();
418    if(newLength > oldLength) {
419      throw new IllegalArgumentException(
420          "Cannot truncate to a larger file size. Current size: " + oldLength +
421          ", truncate size: " + newLength + ".");
422    }
423    try (FileOutputStream out = new FileOutputStream(pathToFile(f), true)) {
424      try {
425        out.getChannel().truncate(newLength);
426      } catch(IOException e) {
427        throw new FSError(e);
428      }
429    }
430    return true;
431  }
432  
433  /**
434   * Delete the given path to a file or directory.
435   * @param p the path to delete
436   * @param recursive to delete sub-directories
437   * @return true if the file or directory and all its contents were deleted
438   * @throws IOException if p is non-empty and recursive is false 
439   */
440  @Override
441  public boolean delete(Path p, boolean recursive) throws IOException {
442    File f = pathToFile(p);
443    if (!f.exists()) {
444      //no path, return false "nothing to delete"
445      return false;
446    }
447    if (f.isFile()) {
448      return f.delete();
449    } else if (!recursive && f.isDirectory() && 
450        (FileUtil.listFiles(f).length != 0)) {
451      throw new IOException("Directory " + f.toString() + " is not empty");
452    }
453    return FileUtil.fullyDelete(f);
454  }
455 
456  /**
457   * {@inheritDoc}
458   *
459   * (<b>Note</b>: Returned list is not sorted in any given order,
460   * due to reliance on Java's {@link File#list()} API.)
461   */
462  @Override
463  public FileStatus[] listStatus(Path f) throws IOException {
464    File localf = pathToFile(f);
465    FileStatus[] results;
466
467    if (!localf.exists()) {
468      throw new FileNotFoundException("File " + f + " does not exist");
469    }
470
471    if (localf.isDirectory()) {
472      String[] names = localf.list();
473      if (names == null) {
474        return null;
475      }
476      results = new FileStatus[names.length];
477      int j = 0;
478      for (int i = 0; i < names.length; i++) {
479        try {
480          // Assemble the path using the Path 3 arg constructor to make sure
481          // paths with colon are properly resolved on Linux
482          results[j] = getFileStatus(new Path(f, new Path(null, null,
483                                                          names[i])));
484          j++;
485        } catch (FileNotFoundException e) {
486          // ignore the files not found since the dir list may have have
487          // changed since the names[] list was generated.
488        }
489      }
490      if (j == names.length) {
491        return results;
492      }
493      return Arrays.copyOf(results, j);
494    }
495
496    if (!useDeprecatedFileStatus) {
497      return new FileStatus[] { getFileStatus(f) };
498    }
499    return new FileStatus[] {
500        new DeprecatedRawLocalFileStatus(localf,
501        getDefaultBlockSize(f), this) };
502  }
503  
504  protected boolean mkOneDir(File p2f) throws IOException {
505    return mkOneDirWithMode(new Path(p2f.getAbsolutePath()), p2f, null);
506  }
507
508  protected boolean mkOneDirWithMode(Path p, File p2f, FsPermission permission)
509      throws IOException {
510    if (permission == null) {
511      permission = FsPermission.getDirDefault();
512    }
513    permission = permission.applyUMask(umask);
514    if (Shell.WINDOWS && NativeIO.isAvailable()) {
515      try {
516        NativeIO.Windows.createDirectoryWithMode(p2f, permission.toShort());
517        return true;
518      } catch (IOException e) {
519        if (LOG.isDebugEnabled()) {
520          LOG.debug(String.format(
521              "NativeIO.createDirectoryWithMode error, path = %s, mode = %o",
522              p2f, permission.toShort()), e);
523        }
524        return false;
525      }
526    } else {
527      boolean b = p2f.mkdir();
528      if (b) {
529        setPermission(p, permission);
530      }
531      return b;
532    }
533  }
534
535  /**
536   * Creates the specified directory hierarchy. Does not
537   * treat existence as an error.
538   */
539  @Override
540  public boolean mkdirs(Path f) throws IOException {
541    return mkdirsWithOptionalPermission(f, null);
542  }
543
544  @Override
545  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
546    return mkdirsWithOptionalPermission(f, permission);
547  }
548
549  private boolean mkdirsWithOptionalPermission(Path f, FsPermission permission)
550      throws IOException {
551    if(f == null) {
552      throw new IllegalArgumentException("mkdirs path arg is null");
553    }
554    Path parent = f.getParent();
555    File p2f = pathToFile(f);
556    File parent2f = null;
557    if(parent != null) {
558      parent2f = pathToFile(parent);
559      if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
560        throw new ParentNotDirectoryException("Parent path is not a directory: "
561            + parent);
562      }
563    }
564    if (p2f.exists() && !p2f.isDirectory()) {
565      throw new FileNotFoundException("Destination exists" +
566              " and is not a directory: " + p2f.getCanonicalPath());
567    }
568    return (parent == null || parent2f.exists() || mkdirs(parent)) &&
569      (mkOneDirWithMode(f, p2f, permission) || p2f.isDirectory());
570  }
571  
572  
573  @Override
574  public Path getHomeDirectory() {
575    return this.makeQualified(new Path(System.getProperty("user.home")));
576  }
577
578  /**
579   * Set the working directory to the given directory.
580   */
581  @Override
582  public void setWorkingDirectory(Path newDir) {
583    workingDir = makeAbsolute(newDir);
584    checkPath(workingDir);
585  }
586  
587  @Override
588  public Path getWorkingDirectory() {
589    return workingDir;
590  }
591  
592  @Override
593  protected Path getInitialWorkingDirectory() {
594    return this.makeQualified(new Path(System.getProperty("user.dir")));
595  }
596
597  @Override
598  public FsStatus getStatus(Path p) throws IOException {
599    File partition = pathToFile(p == null ? new Path("/") : p);
600    //File provides getUsableSpace() and getFreeSpace()
601    //File provides no API to obtain used space, assume used = total - free
602    return new FsStatus(partition.getTotalSpace(), 
603      partition.getTotalSpace() - partition.getFreeSpace(),
604      partition.getFreeSpace());
605  }
606  
607  // In the case of the local filesystem, we can just rename the file.
608  @Override
609  public void moveFromLocalFile(Path src, Path dst) throws IOException {
610    rename(src, dst);
611  }
612  
613  // We can write output directly to the final location
614  @Override
615  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
616    throws IOException {
617    return fsOutputFile;
618  }
619  
620  // It's in the right place - nothing to do.
621  @Override
622  public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
623    throws IOException {
624  }
625  
626  @Override
627  public void close() throws IOException {
628    super.close();
629  }
630  
631  @Override
632  public String toString() {
633    return "LocalFS";
634  }
635  
636  @Override
637  public FileStatus getFileStatus(Path f) throws IOException {
638    return getFileLinkStatusInternal(f, true);
639  }
640
641  @Deprecated
642  private FileStatus deprecatedGetFileStatus(Path f) throws IOException {
643    File path = pathToFile(f);
644    if (path.exists()) {
645      return new DeprecatedRawLocalFileStatus(pathToFile(f),
646          getDefaultBlockSize(f), this);
647    } else {
648      throw new FileNotFoundException("File " + f + " does not exist");
649    }
650  }
651
652  @Deprecated
653  static class DeprecatedRawLocalFileStatus extends FileStatus {
654    /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
655     * We recognize if the information is already loaded by check if
656     * onwer.equals("").
657     */
658    private boolean isPermissionLoaded() {
659      return !super.getOwner().isEmpty(); 
660    }
661
662    private static long getLastAccessTime(File f) throws IOException {
663      long accessTime;
664      try {
665        accessTime = Files.readAttributes(f.toPath(),
666            BasicFileAttributes.class).lastAccessTime().toMillis();
667      } catch (NoSuchFileException e) {
668        throw new FileNotFoundException("File " + f + " does not exist");
669      }
670      return accessTime;
671    }
672
673    DeprecatedRawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs)
674      throws IOException {
675      super(f.length(), f.isDirectory(), 1, defaultBlockSize,
676          f.lastModified(), getLastAccessTime(f),
677          null, null, null,
678          new Path(f.getPath()).makeQualified(fs.getUri(),
679            fs.getWorkingDirectory()));
680    }
681    
682    @Override
683    public FsPermission getPermission() {
684      if (!isPermissionLoaded()) {
685        loadPermissionInfo();
686      }
687      return super.getPermission();
688    }
689
690    @Override
691    public String getOwner() {
692      if (!isPermissionLoaded()) {
693        loadPermissionInfo();
694      }
695      return super.getOwner();
696    }
697
698    @Override
699    public String getGroup() {
700      if (!isPermissionLoaded()) {
701        loadPermissionInfo();
702      }
703      return super.getGroup();
704    }
705
706    /// loads permissions, owner, and group from `ls -ld`
707    private void loadPermissionInfo() {
708      IOException e = null;
709      try {
710        String output = FileUtil.execCommand(new File(getPath().toUri()), 
711            Shell.getGetPermissionCommand());
712        StringTokenizer t =
713            new StringTokenizer(output, Shell.TOKEN_SEPARATOR_REGEX);
714        //expected format
715        //-rw-------    1 username groupname ...
716        String permission = t.nextToken();
717        if (permission.length() > FsPermission.MAX_PERMISSION_LENGTH) {
718          //files with ACLs might have a '+'
719          permission = permission.substring(0,
720            FsPermission.MAX_PERMISSION_LENGTH);
721        }
722        setPermission(FsPermission.valueOf(permission));
723        t.nextToken();
724
725        String owner = t.nextToken();
726        // If on windows domain, token format is DOMAIN\\user and we want to
727        // extract only the user name
728        if (Shell.WINDOWS) {
729          int i = owner.indexOf('\\');
730          if (i != -1)
731            owner = owner.substring(i + 1);
732        }
733        setOwner(owner);
734
735        setGroup(t.nextToken());
736      } catch (Shell.ExitCodeException ioe) {
737        if (ioe.getExitCode() != 1) {
738          e = ioe;
739        } else {
740          setPermission(null);
741          setOwner(null);
742          setGroup(null);
743        }
744      } catch (IOException ioe) {
745        e = ioe;
746      } finally {
747        if (e != null) {
748          throw new RuntimeException("Error while running command to get " +
749                                     "file permissions : " + 
750                                     StringUtils.stringifyException(e));
751        }
752      }
753    }
754
755    @Override
756    public void write(DataOutput out) throws IOException {
757      if (!isPermissionLoaded()) {
758        loadPermissionInfo();
759      }
760      super.write(out);
761    }
762  }
763
764  /**
765   * Use the command chown to set owner.
766   */
767  @Override
768  public void setOwner(Path p, String username, String groupname)
769    throws IOException {
770    FileUtil.setOwner(pathToFile(p), username, groupname);
771  }
772
773  /**
774   * Use the command chmod to set permission.
775   */
776  @Override
777  public void setPermission(Path p, FsPermission permission)
778    throws IOException {
779    if (NativeIO.isAvailable()) {
780      NativeIO.POSIX.chmod(pathToFile(p).getCanonicalPath(),
781                     permission.toShort());
782    } else {
783      String perm = String.format("%04o", permission.toShort());
784      Shell.execCommand(Shell.getSetPermissionCommand(perm, false,
785        FileUtil.makeShellPath(pathToFile(p), true)));
786    }
787  }
788 
789  /**
790   * Sets the {@link Path}'s last modified time and last access time to
791   * the given valid times.
792   *
793   * @param mtime the modification time to set (only if no less than zero).
794   * @param atime the access time to set (only if no less than zero).
795   * @throws IOException if setting the times fails.
796   */
797  @Override
798  public void setTimes(Path p, long mtime, long atime) throws IOException {
799    try {
800      BasicFileAttributeView view = Files.getFileAttributeView(
801          pathToFile(p).toPath(), BasicFileAttributeView.class);
802      FileTime fmtime = (mtime >= 0) ? FileTime.fromMillis(mtime) : null;
803      FileTime fatime = (atime >= 0) ? FileTime.fromMillis(atime) : null;
804      view.setTimes(fmtime, fatime, null);
805    } catch (NoSuchFileException e) {
806      throw new FileNotFoundException("File " + p + " does not exist");
807    }
808  }
809
810  @Override
811  public boolean supportsSymlinks() {
812    return true;
813  }
814
815  @SuppressWarnings("deprecation")
816  @Override
817  public void createSymlink(Path target, Path link, boolean createParent)
818      throws IOException {
819    if (!FileSystem.areSymlinksEnabled()) {
820      throw new UnsupportedOperationException("Symlinks not supported");
821    }
822    final String targetScheme = target.toUri().getScheme();
823    if (targetScheme != null && !"file".equals(targetScheme)) {
824      throw new IOException("Unable to create symlink to non-local file "+
825                            "system: "+target.toString());
826    }
827    if (createParent) {
828      mkdirs(link.getParent());
829    }
830
831    // NB: Use createSymbolicLink in java.nio.file.Path once available
832    int result = FileUtil.symLink(target.toString(),
833        makeAbsolute(link).toString());
834    if (result != 0) {
835      throw new IOException("Error " + result + " creating symlink " +
836          link + " to " + target);
837    }
838  }
839
840  /**
841   * Return a FileStatus representing the given path. If the path refers
842   * to a symlink return a FileStatus representing the link rather than
843   * the object the link refers to.
844   */
845  @Override
846  public FileStatus getFileLinkStatus(final Path f) throws IOException {
847    FileStatus fi = getFileLinkStatusInternal(f, false);
848    // getFileLinkStatus is supposed to return a symlink with a
849    // qualified path
850    if (fi.isSymlink()) {
851      Path targetQual = FSLinkResolver.qualifySymlinkTarget(this.getUri(),
852          fi.getPath(), fi.getSymlink());
853      fi.setSymlink(targetQual);
854    }
855    return fi;
856  }
857
858  /**
859   * Public {@link FileStatus} methods delegate to this function, which in turn
860   * either call the new {@link Stat} based implementation or the deprecated
861   * methods based on platform support.
862   * 
863   * @param f Path to stat
864   * @param dereference whether to dereference the final path component if a
865   *          symlink
866   * @return FileStatus of f
867   * @throws IOException
868   */
869  private FileStatus getFileLinkStatusInternal(final Path f,
870      boolean dereference) throws IOException {
871    if (!useDeprecatedFileStatus) {
872      return getNativeFileLinkStatus(f, dereference);
873    } else if (dereference) {
874      return deprecatedGetFileStatus(f);
875    } else {
876      return deprecatedGetFileLinkStatusInternal(f);
877    }
878  }
879
880  /**
881   * Deprecated. Remains for legacy support. Should be removed when {@link Stat}
882   * gains support for Windows and other operating systems.
883   */
884  @Deprecated
885  private FileStatus deprecatedGetFileLinkStatusInternal(final Path f)
886      throws IOException {
887    String target = FileUtil.readLink(new File(f.toString()));
888
889    try {
890      FileStatus fs = getFileStatus(f);
891      // If f refers to a regular file or directory
892      if (target.isEmpty()) {
893        return fs;
894      }
895      // Otherwise f refers to a symlink
896      return new FileStatus(fs.getLen(),
897          false,
898          fs.getReplication(),
899          fs.getBlockSize(),
900          fs.getModificationTime(),
901          fs.getAccessTime(),
902          fs.getPermission(),
903          fs.getOwner(),
904          fs.getGroup(),
905          new Path(target),
906          f);
907    } catch (FileNotFoundException e) {
908      /* The exists method in the File class returns false for dangling
909       * links so we can get a FileNotFoundException for links that exist.
910       * It's also possible that we raced with a delete of the link. Use
911       * the readBasicFileAttributes method in java.nio.file.attributes
912       * when available.
913       */
914      if (!target.isEmpty()) {
915        return new FileStatus(0, false, 0, 0, 0, 0, FsPermission.getDefault(),
916            "", "", new Path(target), f);
917      }
918      // f refers to a file or directory that does not exist
919      throw e;
920    }
921  }
922  /**
923   * Calls out to platform's native stat(1) implementation to get file metadata
924   * (permissions, user, group, atime, mtime, etc). This works around the lack
925   * of lstat(2) in Java 6.
926   * 
927   *  Currently, the {@link Stat} class used to do this only supports Linux
928   *  and FreeBSD, so the old {@link #deprecatedGetFileLinkStatusInternal(Path)}
929   *  implementation (deprecated) remains further OS support is added.
930   *
931   * @param f File to stat
932   * @param dereference whether to dereference symlinks
933   * @return FileStatus of f
934   * @throws IOException
935   */
936  private FileStatus getNativeFileLinkStatus(final Path f,
937      boolean dereference) throws IOException {
938    checkPath(f);
939    Stat stat = new Stat(f, getDefaultBlockSize(f), dereference, this);
940    FileStatus status = stat.getFileStatus();
941    return status;
942  }
943
944  @Override
945  public Path getLinkTarget(Path f) throws IOException {
946    FileStatus fi = getFileLinkStatusInternal(f, false);
947    // return an unqualified symlink target
948    return fi.getSymlink();
949  }
950}