001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019
020package org.apache.hadoop.fs;
021
022import com.google.common.annotations.VisibleForTesting;
023
024import java.io.BufferedOutputStream;
025import java.io.DataOutput;
026import java.io.EOFException;
027import java.io.File;
028import java.io.FileInputStream;
029import java.io.FileNotFoundException;
030import java.io.FileOutputStream;
031import java.io.IOException;
032import java.io.OutputStream;
033import java.io.FileDescriptor;
034import java.net.URI;
035import java.nio.ByteBuffer;
036import java.nio.file.AccessDeniedException;
037import java.nio.file.Files;
038import java.nio.file.NoSuchFileException;
039import java.nio.file.attribute.BasicFileAttributes;
040import java.nio.file.attribute.BasicFileAttributeView;
041import java.nio.file.attribute.FileTime;
042import java.util.Arrays;
043import java.util.EnumSet;
044import java.util.StringTokenizer;
045
046import org.apache.hadoop.classification.InterfaceAudience;
047import org.apache.hadoop.classification.InterfaceStability;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.permission.FsPermission;
050import org.apache.hadoop.io.IOUtils;
051import org.apache.hadoop.io.nativeio.NativeIO;
052import org.apache.hadoop.util.Progressable;
053import org.apache.hadoop.util.Shell;
054import org.apache.hadoop.util.StringUtils;
055
056/****************************************************************
057 * Implement the FileSystem API for the raw local filesystem.
058 *
059 *****************************************************************/
060@InterfaceAudience.Public
061@InterfaceStability.Stable
062public class RawLocalFileSystem extends FileSystem {
  /** URI returned by {@link #getUri()} for this file system ({@code file:///}). */
  static final URI NAME = URI.create("file:///");
  /** Directory against which relative paths are resolved. */
  private Path workingDir;
  // Temporary workaround for HADOOP-9652.
  // While true, status lookups use the deprecated java.io.File / shell based
  // code paths instead of the Stat based one.
  private static boolean useDeprecatedFileStatus = true;

  /** Umask read from the configuration in initialize() and applied to requested permissions. */
  private FsPermission umask;
069
070  @VisibleForTesting
071  public static void useStatIfAvailable() {
072    useDeprecatedFileStatus = !Stat.isAvailable();
073  }
074  
075  public RawLocalFileSystem() {
076    workingDir = getInitialWorkingDirectory();
077  }
078  
079  private Path makeAbsolute(Path f) {
080    if (f.isAbsolute()) {
081      return f;
082    } else {
083      return new Path(workingDir, f);
084    }
085  }
086  
087  /** Convert a path to a File. */
088  public File pathToFile(Path path) {
089    checkPath(path);
090    if (!path.isAbsolute()) {
091      path = new Path(getWorkingDirectory(), path);
092    }
093    return new File(path.toUri().getPath());
094  }
095
096  @Override
097  public URI getUri() { return NAME; }
098  
099  @Override
100  public void initialize(URI uri, Configuration conf) throws IOException {
101    super.initialize(uri, conf);
102    setConf(conf);
103    umask = FsPermission.getUMask(conf);
104  }
105  
106  /*******************************************************
107   * For open()'s FSInputStream.
108   *******************************************************/
109  class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor {
110    private FileInputStream fis;
111    private long position;
112
113    public LocalFSFileInputStream(Path f) throws IOException {
114      fis = new FileInputStream(pathToFile(f));
115    }
116    
117    @Override
118    public void seek(long pos) throws IOException {
119      if (pos < 0) {
120        throw new EOFException(
121          FSExceptionMessages.NEGATIVE_SEEK);
122      }
123      fis.getChannel().position(pos);
124      this.position = pos;
125    }
126    
127    @Override
128    public long getPos() throws IOException {
129      return this.position;
130    }
131    
132    @Override
133    public boolean seekToNewSource(long targetPos) throws IOException {
134      return false;
135    }
136    
137    /*
138     * Just forward to the fis
139     */
140    @Override
141    public int available() throws IOException { return fis.available(); }
142    @Override
143    public void close() throws IOException { fis.close(); }
144    @Override
145    public boolean markSupported() { return false; }
146    
147    @Override
148    public int read() throws IOException {
149      try {
150        int value = fis.read();
151        if (value >= 0) {
152          this.position++;
153          statistics.incrementBytesRead(1);
154        }
155        return value;
156      } catch (IOException e) {                 // unexpected exception
157        throw new FSError(e);                   // assume native fs error
158      }
159    }
160    
161    @Override
162    public int read(byte[] b, int off, int len) throws IOException {
163      try {
164        int value = fis.read(b, off, len);
165        if (value > 0) {
166          this.position += value;
167          statistics.incrementBytesRead(value);
168        }
169        return value;
170      } catch (IOException e) {                 // unexpected exception
171        throw new FSError(e);                   // assume native fs error
172      }
173    }
174    
175    @Override
176    public int read(long position, byte[] b, int off, int len)
177      throws IOException {
178      ByteBuffer bb = ByteBuffer.wrap(b, off, len);
179      try {
180        int value = fis.getChannel().read(bb, position);
181        if (value > 0) {
182          statistics.incrementBytesRead(value);
183        }
184        return value;
185      } catch (IOException e) {
186        throw new FSError(e);
187      }
188    }
189    
190    @Override
191    public long skip(long n) throws IOException {
192      long value = fis.skip(n);
193      if (value > 0) {
194        this.position += value;
195      }
196      return value;
197    }
198
199    @Override
200    public FileDescriptor getFileDescriptor() throws IOException {
201      return fis.getFD();
202    }
203  }
204  
205  @Override
206  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
207    if (!exists(f)) {
208      throw new FileNotFoundException(f.toString());
209    }
210    return new FSDataInputStream(new BufferedFSInputStream(
211        new LocalFSFileInputStream(f), bufferSize));
212  }
213  
214  /*********************************************************
215   * For create()'s FSOutputStream.
216   *********************************************************/
217  class LocalFSFileOutputStream extends OutputStream {
218    private FileOutputStream fos;
219    
220    private LocalFSFileOutputStream(Path f, boolean append,
221        FsPermission permission) throws IOException {
222      File file = pathToFile(f);
223      if (!append && permission == null) {
224        permission = FsPermission.getFileDefault();
225      }
226      if (permission == null) {
227        this.fos = new FileOutputStream(file, append);
228      } else {
229        permission = permission.applyUMask(umask);
230        if (Shell.WINDOWS && NativeIO.isAvailable()) {
231          this.fos = NativeIO.Windows.createFileOutputStreamWithMode(file,
232              append, permission.toShort());
233        } else {
234          this.fos = new FileOutputStream(file, append);
235          boolean success = false;
236          try {
237            setPermission(f, permission);
238            success = true;
239          } finally {
240            if (!success) {
241              IOUtils.cleanup(LOG, this.fos);
242            }
243          }
244        }
245      }
246    }
247    
248    /*
249     * Just forward to the fos
250     */
251    @Override
252    public void close() throws IOException { fos.close(); }
253    @Override
254    public void flush() throws IOException { fos.flush(); }
255    @Override
256    public void write(byte[] b, int off, int len) throws IOException {
257      try {
258        fos.write(b, off, len);
259      } catch (IOException e) {                // unexpected exception
260        throw new FSError(e);                  // assume native fs error
261      }
262    }
263    
264    @Override
265    public void write(int b) throws IOException {
266      try {
267        fos.write(b);
268      } catch (IOException e) {              // unexpected exception
269        throw new FSError(e);                // assume native fs error
270      }
271    }
272  }
273
274  @Override
275  public FSDataOutputStream append(Path f, int bufferSize,
276      Progressable progress) throws IOException {
277    if (!exists(f)) {
278      throw new FileNotFoundException("File " + f + " not found");
279    }
280    FileStatus status = getFileStatus(f);
281    if (status.isDirectory()) {
282      throw new IOException("Cannot append to a diretory (=" + f + " )");
283    }
284    return new FSDataOutputStream(new BufferedOutputStream(
285        createOutputStreamWithMode(f, true, null), bufferSize), statistics,
286        status.getLen());
287  }
288
289  @Override
290  public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
291    short replication, long blockSize, Progressable progress)
292    throws IOException {
293    return create(f, overwrite, true, bufferSize, replication, blockSize,
294        progress, null);
295  }
296
297  private FSDataOutputStream create(Path f, boolean overwrite,
298      boolean createParent, int bufferSize, short replication, long blockSize,
299      Progressable progress, FsPermission permission) throws IOException {
300    if (exists(f) && !overwrite) {
301      throw new FileAlreadyExistsException("File already exists: " + f);
302    }
303    Path parent = f.getParent();
304    if (parent != null && !mkdirs(parent)) {
305      throw new IOException("Mkdirs failed to create " + parent.toString());
306    }
307    return new FSDataOutputStream(new BufferedOutputStream(
308        createOutputStreamWithMode(f, false, permission), bufferSize),
309        statistics);
310  }
311  
312  protected OutputStream createOutputStream(Path f, boolean append) 
313      throws IOException {
314    return createOutputStreamWithMode(f, append, null);
315  }
316
317  protected OutputStream createOutputStreamWithMode(Path f, boolean append,
318      FsPermission permission) throws IOException {
319    return new LocalFSFileOutputStream(f, append, permission);
320  }
321  
322  @Override
323  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
324      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
325      Progressable progress) throws IOException {
326    if (exists(f) && !flags.contains(CreateFlag.OVERWRITE)) {
327      throw new FileAlreadyExistsException("File already exists: " + f);
328    }
329    return new FSDataOutputStream(new BufferedOutputStream(
330        createOutputStreamWithMode(f, false, permission), bufferSize),
331            statistics);
332  }
333
334  @Override
335  public FSDataOutputStream create(Path f, FsPermission permission,
336    boolean overwrite, int bufferSize, short replication, long blockSize,
337    Progressable progress) throws IOException {
338
339    FSDataOutputStream out = create(f, overwrite, true, bufferSize, replication,
340        blockSize, progress, permission);
341    return out;
342  }
343
344  @Override
345  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
346      boolean overwrite,
347      int bufferSize, short replication, long blockSize,
348      Progressable progress) throws IOException {
349    FSDataOutputStream out = create(f, overwrite, false, bufferSize, replication,
350        blockSize, progress, permission);
351    return out;
352  }
353
354  @Override
355  public boolean rename(Path src, Path dst) throws IOException {
356    // Attempt rename using Java API.
357    File srcFile = pathToFile(src);
358    File dstFile = pathToFile(dst);
359    if (srcFile.renameTo(dstFile)) {
360      return true;
361    }
362
363    // Else try POSIX style rename on Windows only
364    if (Shell.WINDOWS &&
365        handleEmptyDstDirectoryOnWindows(src, srcFile, dst, dstFile)) {
366      return true;
367    }
368
369    // The fallback behavior accomplishes the rename by a full copy.
370    if (LOG.isDebugEnabled()) {
371      LOG.debug("Falling through to a copy of " + src + " to " + dst);
372    }
373    return FileUtil.copy(this, src, this, dst, true, getConf());
374  }
375
376  @VisibleForTesting
377  public final boolean handleEmptyDstDirectoryOnWindows(Path src, File srcFile,
378      Path dst, File dstFile) throws IOException {
379
380    // Enforce POSIX rename behavior that a source directory replaces an
381    // existing destination if the destination is an empty directory. On most
382    // platforms, this is already handled by the Java API call above. Some
383    // platforms (notably Windows) do not provide this behavior, so the Java API
384    // call renameTo(dstFile) fails. Delete destination and attempt rename
385    // again.
386    if (this.exists(dst)) {
387      FileStatus sdst = this.getFileStatus(dst);
388      if (sdst.isDirectory() && dstFile.list().length == 0) {
389        if (LOG.isDebugEnabled()) {
390          LOG.debug("Deleting empty destination and renaming " + src + " to " +
391            dst);
392        }
393        if (this.delete(dst, false) && srcFile.renameTo(dstFile)) {
394          return true;
395        }
396      }
397    }
398    return false;
399  }
400
401  @Override
402  public boolean truncate(Path f, final long newLength) throws IOException {
403    FileStatus status = getFileStatus(f);
404    if(status == null) {
405      throw new FileNotFoundException("File " + f + " not found");
406    }
407    if(status.isDirectory()) {
408      throw new IOException("Cannot truncate a directory (=" + f + ")");
409    }
410    long oldLength = status.getLen();
411    if(newLength > oldLength) {
412      throw new IllegalArgumentException(
413          "Cannot truncate to a larger file size. Current size: " + oldLength +
414          ", truncate size: " + newLength + ".");
415    }
416    try (FileOutputStream out = new FileOutputStream(pathToFile(f), true)) {
417      try {
418        out.getChannel().truncate(newLength);
419      } catch(IOException e) {
420        throw new FSError(e);
421      }
422    }
423    return true;
424  }
425  
426  /**
427   * Delete the given path to a file or directory.
428   * @param p the path to delete
429   * @param recursive to delete sub-directories
430   * @return true if the file or directory and all its contents were deleted
431   * @throws IOException if p is non-empty and recursive is false 
432   */
433  @Override
434  public boolean delete(Path p, boolean recursive) throws IOException {
435    File f = pathToFile(p);
436    if (!f.exists()) {
437      //no path, return false "nothing to delete"
438      return false;
439    }
440    if (f.isFile()) {
441      return f.delete();
442    } else if (!recursive && f.isDirectory() && 
443        (FileUtil.listFiles(f).length != 0)) {
444      throw new IOException("Directory " + f.toString() + " is not empty");
445    }
446    return FileUtil.fullyDelete(f);
447  }
448 
449  /**
450   * {@inheritDoc}
451   *
452   * (<b>Note</b>: Returned list is not sorted in any given order,
453   * due to reliance on Java's {@link File#list()} API.)
454   */
455  @Override
456  public FileStatus[] listStatus(Path f) throws IOException {
457    File localf = pathToFile(f);
458    FileStatus[] results;
459
460    if (!localf.exists()) {
461      throw new FileNotFoundException("File " + f + " does not exist");
462    }
463
464    if (localf.isDirectory()) {
465      String[] names = localf.list();
466      if (names == null) {
467        if (!localf.canRead()) {
468          throw new AccessDeniedException("cannot open directory " + f +
469              ": Permission denied");
470        }
471        return null;
472      }
473      results = new FileStatus[names.length];
474      int j = 0;
475      for (int i = 0; i < names.length; i++) {
476        try {
477          // Assemble the path using the Path 3 arg constructor to make sure
478          // paths with colon are properly resolved on Linux
479          results[j] = getFileStatus(new Path(f, new Path(null, null,
480                                                          names[i])));
481          j++;
482        } catch (FileNotFoundException e) {
483          // ignore the files not found since the dir list may have have
484          // changed since the names[] list was generated.
485        }
486      }
487      if (j == names.length) {
488        return results;
489      }
490      return Arrays.copyOf(results, j);
491    }
492
493    if (!useDeprecatedFileStatus) {
494      return new FileStatus[] { getFileStatus(f) };
495    }
496    return new FileStatus[] {
497        new DeprecatedRawLocalFileStatus(localf,
498        getDefaultBlockSize(f), this) };
499  }
500  
501  protected boolean mkOneDir(File p2f) throws IOException {
502    return mkOneDirWithMode(new Path(p2f.getAbsolutePath()), p2f, null);
503  }
504
505  protected boolean mkOneDirWithMode(Path p, File p2f, FsPermission permission)
506      throws IOException {
507    if (permission == null) {
508      permission = FsPermission.getDirDefault();
509    }
510    permission = permission.applyUMask(umask);
511    if (Shell.WINDOWS && NativeIO.isAvailable()) {
512      try {
513        NativeIO.Windows.createDirectoryWithMode(p2f, permission.toShort());
514        return true;
515      } catch (IOException e) {
516        if (LOG.isDebugEnabled()) {
517          LOG.debug(String.format(
518              "NativeIO.createDirectoryWithMode error, path = %s, mode = %o",
519              p2f, permission.toShort()), e);
520        }
521        return false;
522      }
523    } else {
524      boolean b = p2f.mkdir();
525      if (b) {
526        setPermission(p, permission);
527      }
528      return b;
529    }
530  }
531
532  /**
533   * Creates the specified directory hierarchy. Does not
534   * treat existence as an error.
535   */
536  @Override
537  public boolean mkdirs(Path f) throws IOException {
538    return mkdirsWithOptionalPermission(f, null);
539  }
540
541  @Override
542  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
543    return mkdirsWithOptionalPermission(f, permission);
544  }
545
546  private boolean mkdirsWithOptionalPermission(Path f, FsPermission permission)
547      throws IOException {
548    if(f == null) {
549      throw new IllegalArgumentException("mkdirs path arg is null");
550    }
551    Path parent = f.getParent();
552    File p2f = pathToFile(f);
553    File parent2f = null;
554    if(parent != null) {
555      parent2f = pathToFile(parent);
556      if(parent2f != null && parent2f.exists() && !parent2f.isDirectory()) {
557        throw new ParentNotDirectoryException("Parent path is not a directory: "
558            + parent);
559      }
560    }
561    if (p2f.exists() && !p2f.isDirectory()) {
562      throw new FileNotFoundException("Destination exists" +
563              " and is not a directory: " + p2f.getCanonicalPath());
564    }
565    return (parent == null || parent2f.exists() || mkdirs(parent)) &&
566      (mkOneDirWithMode(f, p2f, permission) || p2f.isDirectory());
567  }
568  
569  
570  @Override
571  public Path getHomeDirectory() {
572    return this.makeQualified(new Path(System.getProperty("user.home")));
573  }
574
575  /**
576   * Set the working directory to the given directory.
577   */
578  @Override
579  public void setWorkingDirectory(Path newDir) {
580    workingDir = makeAbsolute(newDir);
581    checkPath(workingDir);
582  }
583  
584  @Override
585  public Path getWorkingDirectory() {
586    return workingDir;
587  }
588  
589  @Override
590  protected Path getInitialWorkingDirectory() {
591    return this.makeQualified(new Path(System.getProperty("user.dir")));
592  }
593
594  @Override
595  public FsStatus getStatus(Path p) throws IOException {
596    File partition = pathToFile(p == null ? new Path("/") : p);
597    //File provides getUsableSpace() and getFreeSpace()
598    //File provides no API to obtain used space, assume used = total - free
599    return new FsStatus(partition.getTotalSpace(), 
600      partition.getTotalSpace() - partition.getFreeSpace(),
601      partition.getFreeSpace());
602  }
603  
604  // In the case of the local filesystem, we can just rename the file.
605  @Override
606  public void moveFromLocalFile(Path src, Path dst) throws IOException {
607    rename(src, dst);
608  }
609  
610  // We can write output directly to the final location
611  @Override
612  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
613    throws IOException {
614    return fsOutputFile;
615  }
616  
617  // It's in the right place - nothing to do.
618  @Override
619  public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
620    throws IOException {
621  }
622  
623  @Override
624  public void close() throws IOException {
625    super.close();
626  }
627  
628  @Override
629  public String toString() {
630    return "LocalFS";
631  }
632  
633  @Override
634  public FileStatus getFileStatus(Path f) throws IOException {
635    return getFileLinkStatusInternal(f, true);
636  }
637
638  @Deprecated
639  private FileStatus deprecatedGetFileStatus(Path f) throws IOException {
640    File path = pathToFile(f);
641    if (path.exists()) {
642      return new DeprecatedRawLocalFileStatus(pathToFile(f),
643          getDefaultBlockSize(f), this);
644    } else {
645      throw new FileNotFoundException("File " + f + " does not exist");
646    }
647  }
648
649  @Deprecated
650  static class DeprecatedRawLocalFileStatus extends FileStatus {
651    /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
652     * We recognize if the information is already loaded by check if
653     * onwer.equals("").
654     */
655    private boolean isPermissionLoaded() {
656      return !super.getOwner().isEmpty(); 
657    }
658
659    private static long getLastAccessTime(File f) throws IOException {
660      long accessTime;
661      try {
662        accessTime = Files.readAttributes(f.toPath(),
663            BasicFileAttributes.class).lastAccessTime().toMillis();
664      } catch (NoSuchFileException e) {
665        throw new FileNotFoundException("File " + f + " does not exist");
666      }
667      return accessTime;
668    }
669
670    DeprecatedRawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs)
671      throws IOException {
672      super(f.length(), f.isDirectory(), 1, defaultBlockSize,
673          f.lastModified(), getLastAccessTime(f),
674          null, null, null,
675          new Path(f.getPath()).makeQualified(fs.getUri(),
676            fs.getWorkingDirectory()));
677    }
678    
679    @Override
680    public FsPermission getPermission() {
681      if (!isPermissionLoaded()) {
682        loadPermissionInfo();
683      }
684      return super.getPermission();
685    }
686
687    @Override
688    public String getOwner() {
689      if (!isPermissionLoaded()) {
690        loadPermissionInfo();
691      }
692      return super.getOwner();
693    }
694
695    @Override
696    public String getGroup() {
697      if (!isPermissionLoaded()) {
698        loadPermissionInfo();
699      }
700      return super.getGroup();
701    }
702
703    /// loads permissions, owner, and group from `ls -ld`
704    private void loadPermissionInfo() {
705      IOException e = null;
706      try {
707        String output = FileUtil.execCommand(new File(getPath().toUri()), 
708            Shell.getGetPermissionCommand());
709        StringTokenizer t =
710            new StringTokenizer(output, Shell.TOKEN_SEPARATOR_REGEX);
711        //expected format
712        //-rw-------    1 username groupname ...
713        String permission = t.nextToken();
714        if (permission.length() > FsPermission.MAX_PERMISSION_LENGTH) {
715          //files with ACLs might have a '+'
716          permission = permission.substring(0,
717            FsPermission.MAX_PERMISSION_LENGTH);
718        }
719        setPermission(FsPermission.valueOf(permission));
720        t.nextToken();
721
722        String owner = t.nextToken();
723        // If on windows domain, token format is DOMAIN\\user and we want to
724        // extract only the user name
725        if (Shell.WINDOWS) {
726          int i = owner.indexOf('\\');
727          if (i != -1)
728            owner = owner.substring(i + 1);
729        }
730        setOwner(owner);
731
732        setGroup(t.nextToken());
733      } catch (Shell.ExitCodeException ioe) {
734        if (ioe.getExitCode() != 1) {
735          e = ioe;
736        } else {
737          setPermission(null);
738          setOwner(null);
739          setGroup(null);
740        }
741      } catch (IOException ioe) {
742        e = ioe;
743      } finally {
744        if (e != null) {
745          throw new RuntimeException("Error while running command to get " +
746                                     "file permissions : " + 
747                                     StringUtils.stringifyException(e));
748        }
749      }
750    }
751
752    @Override
753    public void write(DataOutput out) throws IOException {
754      if (!isPermissionLoaded()) {
755        loadPermissionInfo();
756      }
757      super.write(out);
758    }
759  }
760
761  /**
762   * Use the command chown to set owner.
763   */
764  @Override
765  public void setOwner(Path p, String username, String groupname)
766    throws IOException {
767    FileUtil.setOwner(pathToFile(p), username, groupname);
768  }
769
770  /**
771   * Use the command chmod to set permission.
772   */
773  @Override
774  public void setPermission(Path p, FsPermission permission)
775    throws IOException {
776    if (NativeIO.isAvailable()) {
777      NativeIO.POSIX.chmod(pathToFile(p).getCanonicalPath(),
778                     permission.toShort());
779    } else {
780      String perm = String.format("%04o", permission.toShort());
781      Shell.execCommand(Shell.getSetPermissionCommand(perm, false,
782        FileUtil.makeShellPath(pathToFile(p), true)));
783    }
784  }
785 
786  /**
787   * Sets the {@link Path}'s last modified time and last access time to
788   * the given valid times.
789   *
790   * @param mtime the modification time to set (only if no less than zero).
791   * @param atime the access time to set (only if no less than zero).
792   * @throws IOException if setting the times fails.
793   */
794  @Override
795  public void setTimes(Path p, long mtime, long atime) throws IOException {
796    try {
797      BasicFileAttributeView view = Files.getFileAttributeView(
798          pathToFile(p).toPath(), BasicFileAttributeView.class);
799      FileTime fmtime = (mtime >= 0) ? FileTime.fromMillis(mtime) : null;
800      FileTime fatime = (atime >= 0) ? FileTime.fromMillis(atime) : null;
801      view.setTimes(fmtime, fatime, null);
802    } catch (NoSuchFileException e) {
803      throw new FileNotFoundException("File " + p + " does not exist");
804    }
805  }
806
807  @Override
808  public boolean supportsSymlinks() {
809    return true;
810  }
811
812  @SuppressWarnings("deprecation")
813  @Override
814  public void createSymlink(Path target, Path link, boolean createParent)
815      throws IOException {
816    if (!FileSystem.areSymlinksEnabled()) {
817      throw new UnsupportedOperationException("Symlinks not supported");
818    }
819    final String targetScheme = target.toUri().getScheme();
820    if (targetScheme != null && !"file".equals(targetScheme)) {
821      throw new IOException("Unable to create symlink to non-local file "+
822                            "system: "+target.toString());
823    }
824    if (createParent) {
825      mkdirs(link.getParent());
826    }
827
828    // NB: Use createSymbolicLink in java.nio.file.Path once available
829    int result = FileUtil.symLink(target.toString(),
830        makeAbsolute(link).toString());
831    if (result != 0) {
832      throw new IOException("Error " + result + " creating symlink " +
833          link + " to " + target);
834    }
835  }
836
837  /**
838   * Return a FileStatus representing the given path. If the path refers
839   * to a symlink return a FileStatus representing the link rather than
840   * the object the link refers to.
841   */
842  @Override
843  public FileStatus getFileLinkStatus(final Path f) throws IOException {
844    FileStatus fi = getFileLinkStatusInternal(f, false);
845    // getFileLinkStatus is supposed to return a symlink with a
846    // qualified path
847    if (fi.isSymlink()) {
848      Path targetQual = FSLinkResolver.qualifySymlinkTarget(this.getUri(),
849          fi.getPath(), fi.getSymlink());
850      fi.setSymlink(targetQual);
851    }
852    return fi;
853  }
854
855  /**
856   * Public {@link FileStatus} methods delegate to this function, which in turn
857   * either call the new {@link Stat} based implementation or the deprecated
858   * methods based on platform support.
859   * 
860   * @param f Path to stat
861   * @param dereference whether to dereference the final path component if a
862   *          symlink
863   * @return FileStatus of f
864   * @throws IOException
865   */
866  private FileStatus getFileLinkStatusInternal(final Path f,
867      boolean dereference) throws IOException {
868    if (!useDeprecatedFileStatus) {
869      return getNativeFileLinkStatus(f, dereference);
870    } else if (dereference) {
871      return deprecatedGetFileStatus(f);
872    } else {
873      return deprecatedGetFileLinkStatusInternal(f);
874    }
875  }
876
877  /**
878   * Deprecated. Remains for legacy support. Should be removed when {@link Stat}
879   * gains support for Windows and other operating systems.
880   */
881  @Deprecated
882  private FileStatus deprecatedGetFileLinkStatusInternal(final Path f)
883      throws IOException {
884    String target = FileUtil.readLink(new File(f.toString()));
885
886    try {
887      FileStatus fs = getFileStatus(f);
888      // If f refers to a regular file or directory
889      if (target.isEmpty()) {
890        return fs;
891      }
892      // Otherwise f refers to a symlink
893      return new FileStatus(fs.getLen(),
894          false,
895          fs.getReplication(),
896          fs.getBlockSize(),
897          fs.getModificationTime(),
898          fs.getAccessTime(),
899          fs.getPermission(),
900          fs.getOwner(),
901          fs.getGroup(),
902          new Path(target),
903          f);
904    } catch (FileNotFoundException e) {
905      /* The exists method in the File class returns false for dangling
906       * links so we can get a FileNotFoundException for links that exist.
907       * It's also possible that we raced with a delete of the link. Use
908       * the readBasicFileAttributes method in java.nio.file.attributes
909       * when available.
910       */
911      if (!target.isEmpty()) {
912        return new FileStatus(0, false, 0, 0, 0, 0, FsPermission.getDefault(),
913            "", "", new Path(target), f);
914      }
915      // f refers to a file or directory that does not exist
916      throw e;
917    }
918  }
919  /**
920   * Calls out to platform's native stat(1) implementation to get file metadata
921   * (permissions, user, group, atime, mtime, etc). This works around the lack
922   * of lstat(2) in Java 6.
923   * 
924   *  Currently, the {@link Stat} class used to do this only supports Linux
925   *  and FreeBSD, so the old {@link #deprecatedGetFileLinkStatusInternal(Path)}
926   *  implementation (deprecated) remains further OS support is added.
927   *
928   * @param f File to stat
929   * @param dereference whether to dereference symlinks
930   * @return FileStatus of f
931   * @throws IOException
932   */
933  private FileStatus getNativeFileLinkStatus(final Path f,
934      boolean dereference) throws IOException {
935    checkPath(f);
936    Stat stat = new Stat(f, getDefaultBlockSize(f), dereference, this);
937    FileStatus status = stat.getFileStatus();
938    return status;
939  }
940
941  @Override
942  public Path getLinkTarget(Path f) throws IOException {
943    FileStatus fi = getFileLinkStatusInternal(f, false);
944    // return an unqualified symlink target
945    return fi.getSymlink();
946  }
947}