001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.ref.PhantomReference;
024import java.lang.ref.ReferenceQueue;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.security.PrivilegedExceptionAction;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.IdentityHashMap;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.ServiceConfigurationError;
040import java.util.ServiceLoader;
041import java.util.Set;
042import java.util.Stack;
043import java.util.TreeSet;
044import java.util.concurrent.atomic.AtomicLong;
045
046import org.apache.commons.logging.Log;
047import org.apache.commons.logging.LogFactory;
048import org.apache.hadoop.classification.InterfaceAudience;
049import org.apache.hadoop.classification.InterfaceStability;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.conf.Configured;
052import org.apache.hadoop.fs.Options.ChecksumOpt;
053import org.apache.hadoop.fs.Options.Rename;
054import org.apache.hadoop.fs.permission.AclEntry;
055import org.apache.hadoop.fs.permission.AclStatus;
056import org.apache.hadoop.fs.permission.FsAction;
057import org.apache.hadoop.fs.permission.FsPermission;
058import org.apache.hadoop.io.MultipleIOException;
059import org.apache.hadoop.io.Text;
060import org.apache.hadoop.net.NetUtils;
061import org.apache.hadoop.security.AccessControlException;
062import org.apache.hadoop.security.Credentials;
063import org.apache.hadoop.security.SecurityUtil;
064import org.apache.hadoop.security.UserGroupInformation;
065import org.apache.hadoop.security.token.Token;
066import org.apache.hadoop.util.ClassUtil;
067import org.apache.hadoop.util.DataChecksum;
068import org.apache.hadoop.util.Progressable;
069import org.apache.hadoop.util.ReflectionUtils;
070import org.apache.hadoop.util.ShutdownHookManager;
071import org.apache.hadoop.util.StringUtils;
072import org.apache.htrace.core.Tracer;
073import org.apache.htrace.core.TraceScope;
074
075import com.google.common.annotations.VisibleForTesting;
076
077import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
078
079/****************************************************************
080 * An abstract base class for a fairly generic filesystem.  It
081 * may be implemented as a distributed filesystem, or as a "local"
082 * one that reflects the locally-connected disk.  The local version
083 * exists for small Hadoop instances and for testing.
084 *
085 * <p>
086 *
087 * All user code that may potentially use the Hadoop Distributed
088 * File System should be written to use a FileSystem object.  The
089 * Hadoop DFS is a multi-machine system that appears as a single
090 * disk.  It's useful because of its fault tolerance and potentially
091 * very large capacity.
092 * 
093 * <p>
094 * The local implementation is {@link LocalFileSystem} and distributed
095 * implementation is DistributedFileSystem.
096 *****************************************************************/
097@InterfaceAudience.Public
098@InterfaceStability.Stable
099public abstract class FileSystem extends Configured implements Closeable {
100  public static final String FS_DEFAULT_NAME_KEY = 
101                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
102  public static final String DEFAULT_FS = 
103                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
104
105  public static final Log LOG = LogFactory.getLog(FileSystem.class);
106
107  /**
108   * Priority of the FileSystem shutdown hook.
109   */
110  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
111
112  public static final String TRASH_PREFIX = ".Trash";
113
114  /** FileSystem cache */
115  static final Cache CACHE = new Cache();
116
117  /** The key this instance is stored under in the cache. */
118  private Cache.Key key;
119
120  /** Recording statistics per a FileSystem class */
121  private static final Map<Class<? extends FileSystem>, Statistics> 
122    statisticsTable =
123      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
124  
125  /**
126   * The statistics for this file system.
127   */
128  protected Statistics statistics;
129
130  /**
131   * A cache of files that should be deleted when filesystem is closed
132   * or the JVM is exited.
133   */
134  private Set<Path> deleteOnExit = new TreeSet<Path>();
135  
136  boolean resolveSymlinks;
137
138  /**
139   * This method adds a file system for testing so that we can find it later. It
140   * is only for testing.
141   * @param uri the uri to store it under
142   * @param conf the configuration to store it under
143   * @param fs the file system to store
144   * @throws IOException
145   */
146  static void addFileSystemForTesting(URI uri, Configuration conf,
147      FileSystem fs) throws IOException {
148    CACHE.map.put(new Cache.Key(uri, conf), fs);
149  }
150
151  /**
152   * Get a filesystem instance based on the uri, the passed
153   * configuration and the user
154   * @param uri of the filesystem
155   * @param conf the configuration to use
156   * @param user to perform the get as
157   * @return the filesystem instance
158   * @throws IOException
159   * @throws InterruptedException
160   */
161  public static FileSystem get(final URI uri, final Configuration conf,
162        final String user) throws IOException, InterruptedException {
163    String ticketCachePath =
164      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
165    UserGroupInformation ugi =
166        UserGroupInformation.getBestUGI(ticketCachePath, user);
167    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
168      @Override
169      public FileSystem run() throws IOException {
170        return get(uri, conf);
171      }
172    });
173  }
174
175  /**
176   * Returns the configured filesystem implementation.
177   * @param conf the configuration to use
178   */
179  public static FileSystem get(Configuration conf) throws IOException {
180    return get(getDefaultUri(conf), conf);
181  }
182  
183  /** Get the default filesystem URI from a configuration.
184   * @param conf the configuration to use
185   * @return the uri of the default filesystem
186   */
187  public static URI getDefaultUri(Configuration conf) {
188    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
189  }
190
191  /** Set the default filesystem URI in a configuration.
192   * @param conf the configuration to alter
193   * @param uri the new default filesystem uri
194   */
195  public static void setDefaultUri(Configuration conf, URI uri) {
196    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
197  }
198
199  /** Set the default filesystem URI in a configuration.
200   * @param conf the configuration to alter
201   * @param uri the new default filesystem uri
202   */
203  public static void setDefaultUri(Configuration conf, String uri) {
204    setDefaultUri(conf, URI.create(fixName(uri)));
205  }
206
207  /** Called after a new FileSystem instance is constructed.
208   * @param name a uri whose authority section names the host, port, etc.
209   *   for this FileSystem
210   * @param conf the configuration
211   */
212  public void initialize(URI name, Configuration conf) throws IOException {
213    statistics = getStatistics(name.getScheme(), getClass());    
214    resolveSymlinks = conf.getBoolean(
215        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
216        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
217  }
218
219  /**
220   * Return the protocol scheme for the FileSystem.
221   * <p/>
222   * This implementation throws an <code>UnsupportedOperationException</code>.
223   *
224   * @return the protocol scheme for the FileSystem.
225   */
226  public String getScheme() {
227    throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
228  }
229
230  /** Returns a URI whose scheme and authority identify this FileSystem.*/
231  public abstract URI getUri();
232  
233  /**
234   * Return a canonicalized form of this FileSystem's URI.
235   * 
236   * The default implementation simply calls {@link #canonicalizeUri(URI)}
237   * on the filesystem's own URI, so subclasses typically only need to
238   * implement that method.
239   *
240   * @see #canonicalizeUri(URI)
241   */
242  protected URI getCanonicalUri() {
243    return canonicalizeUri(getUri());
244  }
245  
246  /**
247   * Canonicalize the given URI.
248   * 
249   * This is filesystem-dependent, but may for example consist of
250   * canonicalizing the hostname using DNS and adding the default
251   * port if not specified.
252   * 
253   * The default implementation simply fills in the default port if
254   * not specified and if the filesystem has a default port.
255   *
256   * @return URI
257   * @see NetUtils#getCanonicalUri(URI, int)
258   */
259  protected URI canonicalizeUri(URI uri) {
260    if (uri.getPort() == -1 && getDefaultPort() > 0) {
261      // reconstruct the uri with the default port set
262      try {
263        uri = new URI(uri.getScheme(), uri.getUserInfo(),
264            uri.getHost(), getDefaultPort(),
265            uri.getPath(), uri.getQuery(), uri.getFragment());
266      } catch (URISyntaxException e) {
267        // Should never happen!
268        throw new AssertionError("Valid URI became unparseable: " +
269            uri);
270      }
271    }
272    
273    return uri;
274  }
275  
276  /**
277   * Get the default port for this file system.
278   * @return the default port or 0 if there isn't one
279   */
280  protected int getDefaultPort() {
281    return 0;
282  }
283
284  protected static FileSystem getFSofPath(final Path absOrFqPath,
285      final Configuration conf)
286      throws UnsupportedFileSystemException, IOException {
287    absOrFqPath.checkNotSchemeWithRelative();
288    absOrFqPath.checkNotRelative();
289
290    // Uses the default file system if not fully qualified
291    return get(absOrFqPath.toUri(), conf);
292  }
293
294  /**
295   * Get a canonical service name for this file system.  The token cache is
296   * the only user of the canonical service name, and uses it to lookup this
297   * filesystem's service tokens.
298   * If file system provides a token of its own then it must have a canonical
299   * name, otherwise canonical name can be null.
300   * 
301   * Default Impl: If the file system has child file systems 
302   * (such as an embedded file system) then it is assumed that the fs has no
303   * tokens of its own and hence returns a null name; otherwise a service
304   * name is built using Uri and port.
305   * 
306   * @return a service string that uniquely identifies this file system, null
307   *         if the filesystem does not implement tokens
308   * @see SecurityUtil#buildDTServiceName(URI, int) 
309   */
310  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
311  public String getCanonicalServiceName() {
312    return (getChildFileSystems() == null)
313      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
314      : null;
315  }
316
317  /** @deprecated call #getUri() instead.*/
318  @Deprecated
319  public String getName() { return getUri().toString(); }
320
321  /** @deprecated call #get(URI,Configuration) instead. */
322  @Deprecated
323  public static FileSystem getNamed(String name, Configuration conf)
324    throws IOException {
325    return get(URI.create(fixName(name)), conf);
326  }
327  
328  /** Update old-format filesystem names, for back-compatibility.  This should
329   * eventually be replaced with a checkName() method that throws an exception
330   * for old-format names. */ 
331  private static String fixName(String name) {
332    // convert old-format name to new-format name
333    if (name.equals("local")) {         // "local" is now "file:///".
334      LOG.warn("\"local\" is a deprecated filesystem name."
335               +" Use \"file:///\" instead.");
336      name = "file:///";
337    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
338      LOG.warn("\""+name+"\" is a deprecated filesystem name."
339               +" Use \"hdfs://"+name+"/\" instead.");
340      name = "hdfs://"+name;
341    }
342    return name;
343  }
344
345  /**
346   * Get the local file system.
347   * @param conf the configuration to configure the file system with
348   * @return a LocalFileSystem
349   */
350  public static LocalFileSystem getLocal(Configuration conf)
351    throws IOException {
352    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
353  }
354
355  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
356   * of the URI determines a configuration property name,
357   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
358   * The entire URI is passed to the FileSystem instance's initialize method.
359   */
360  public static FileSystem get(URI uri, Configuration conf) throws IOException {
361    String scheme = uri.getScheme();
362    String authority = uri.getAuthority();
363
364    if (scheme == null && authority == null) {     // use default FS
365      return get(conf);
366    }
367
368    if (scheme != null && authority == null) {     // no authority
369      URI defaultUri = getDefaultUri(conf);
370      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
371          && defaultUri.getAuthority() != null) {  // & default has authority
372        return get(defaultUri, conf);              // return default
373      }
374    }
375    
376    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
377    if (conf.getBoolean(disableCacheName, false)) {
378      return createFileSystem(uri, conf);
379    }
380
381    return CACHE.get(uri, conf);
382  }
383
384  /**
385   * Returns the FileSystem for this URI's scheme and authority and the 
386   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
387   * @param uri of the filesystem
388   * @param conf the configuration to use
389   * @param user to perform the get as
390   * @return filesystem instance
391   * @throws IOException
392   * @throws InterruptedException
393   */
394  public static FileSystem newInstance(final URI uri, final Configuration conf,
395      final String user) throws IOException, InterruptedException {
396    String ticketCachePath =
397      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
398    UserGroupInformation ugi =
399        UserGroupInformation.getBestUGI(ticketCachePath, user);
400    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
401      @Override
402      public FileSystem run() throws IOException {
403        return newInstance(uri,conf); 
404      }
405    });
406  }
407  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
408   * of the URI determines a configuration property name,
409   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
410   * The entire URI is passed to the FileSystem instance's initialize method.
411   * This always returns a new FileSystem object.
412   */
413  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
414    String scheme = uri.getScheme();
415    String authority = uri.getAuthority();
416
417    if (scheme == null) {                       // no scheme: use default FS
418      return newInstance(conf);
419    }
420
421    if (authority == null) {                       // no authority
422      URI defaultUri = getDefaultUri(conf);
423      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
424          && defaultUri.getAuthority() != null) {  // & default has authority
425        return newInstance(defaultUri, conf);              // return default
426      }
427    }
428    return CACHE.getUnique(uri, conf);
429  }
430
431  /** Returns a unique configured filesystem implementation.
432   * This always returns a new FileSystem object.
433   * @param conf the configuration to use
434   */
435  public static FileSystem newInstance(Configuration conf) throws IOException {
436    return newInstance(getDefaultUri(conf), conf);
437  }
438
439  /**
440   * Get a unique local file system object
441   * @param conf the configuration to configure the file system with
442   * @return a LocalFileSystem
443   * This always returns a new FileSystem object.
444   */
445  public static LocalFileSystem newInstanceLocal(Configuration conf)
446    throws IOException {
447    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
448  }
449
450  /**
451   * Close all cached filesystems. Be sure those filesystems are not
452   * used anymore.
453   * 
454   * @throws IOException
455   */
456  public static void closeAll() throws IOException {
457    CACHE.closeAll();
458  }
459
460  /**
461   * Close all cached filesystems for a given UGI. Be sure those filesystems 
462   * are not used anymore.
463   * @param ugi user group info to close
464   * @throws IOException
465   */
466  public static void closeAllForUGI(UserGroupInformation ugi) 
467  throws IOException {
468    CACHE.closeAll(ugi);
469  }
470
471  /** 
472   * Make sure that a path specifies a FileSystem.
473   * @param path to use
474   */
475  public Path makeQualified(Path path) {
476    checkPath(path);
477    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
478  }
479    
480  /**
481   * Get a new delegation token for this file system.
482   * This is an internal method that should have been declared protected
483   * but wasn't historically.
484   * Callers should use {@link #addDelegationTokens(String, Credentials)}
485   * 
486   * @param renewer the account name that is allowed to renew the token.
487   * @return a new delegation token
488   * @throws IOException
489   */
490  @InterfaceAudience.Private()
491  public Token<?> getDelegationToken(String renewer) throws IOException {
492    return null;
493  }
494  
495  /**
496   * Obtain all delegation tokens used by this FileSystem that are not
497   * already present in the given Credentials.  Existing tokens will neither
498   * be verified as valid nor having the given renewer.  Missing tokens will
499   * be acquired and added to the given Credentials.
500   * 
501   * Default Impl: works for simple fs with its own token
502   * and also for an embedded fs whose tokens are those of its
503   * children file system (i.e. the embedded fs has not tokens of its
504   * own).
505   * 
506   * @param renewer the user allowed to renew the delegation tokens
507   * @param credentials cache in which to add new delegation tokens
508   * @return list of new delegation tokens
509   * @throws IOException
510   */
511  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
512  public Token<?>[] addDelegationTokens(
513      final String renewer, Credentials credentials) throws IOException {
514    if (credentials == null) {
515      credentials = new Credentials();
516    }
517    final List<Token<?>> tokens = new ArrayList<Token<?>>();
518    collectDelegationTokens(renewer, credentials, tokens);
519    return tokens.toArray(new Token<?>[tokens.size()]);
520  }
521  
522  /**
523   * Recursively obtain the tokens for this FileSystem and all descended
524   * FileSystems as determined by getChildFileSystems().
525   * @param renewer the user allowed to renew the delegation tokens
526   * @param credentials cache in which to add the new delegation tokens
527   * @param tokens list in which to add acquired tokens
528   * @throws IOException
529   */
530  private void collectDelegationTokens(final String renewer,
531                                       final Credentials credentials,
532                                       final List<Token<?>> tokens)
533                                           throws IOException {
534    final String serviceName = getCanonicalServiceName();
535    // Collect token of the this filesystem and then of its embedded children
536    if (serviceName != null) { // fs has token, grab it
537      final Text service = new Text(serviceName);
538      Token<?> token = credentials.getToken(service);
539      if (token == null) {
540        token = getDelegationToken(renewer);
541        if (token != null) {
542          tokens.add(token);
543          credentials.addToken(service, token);
544        }
545      }
546    }
547    // Now collect the tokens from the children
548    final FileSystem[] children = getChildFileSystems();
549    if (children != null) {
550      for (final FileSystem fs : children) {
551        fs.collectDelegationTokens(renewer, credentials, tokens);
552      }
553    }
554  }
555
556  /**
557   * Get all the immediate child FileSystems embedded in this FileSystem.
558   * It does not recurse and get grand children.  If a FileSystem
559   * has multiple child FileSystems, then it should return a unique list
560   * of those FileSystems.  Default is to return null to signify no children.
561   * 
562   * @return FileSystems used by this FileSystem
563   */
564  @InterfaceAudience.LimitedPrivate({ "HDFS" })
565  @VisibleForTesting
566  public FileSystem[] getChildFileSystems() {
567    return null;
568  }
569  
570  /** create a file with the provided permission
571   * The permission of the file is set to be the provided permission as in
572   * setPermission, not permission&~umask
573   * 
574   * It is implemented using two RPCs. It is understood that it is inefficient,
575   * but the implementation is thread-safe. The other option is to change the
576   * value of umask in configuration to be 0, but it is not thread-safe.
577   * 
578   * @param fs file system handle
579   * @param file the name of the file to be created
580   * @param permission the permission of the file
581   * @return an output stream
582   * @throws IOException
583   */
584  public static FSDataOutputStream create(FileSystem fs,
585      Path file, FsPermission permission) throws IOException {
586    // create the file with default permission
587    FSDataOutputStream out = fs.create(file);
588    // set its permission to the supplied one
589    fs.setPermission(file, permission);
590    return out;
591  }
592
593  /** create a directory with the provided permission
594   * The permission of the directory is set to be the provided permission as in
595   * setPermission, not permission&~umask
596   * 
597   * @see #create(FileSystem, Path, FsPermission)
598   * 
599   * @param fs file system handle
600   * @param dir the name of the directory to be created
601   * @param permission the permission of the directory
602   * @return true if the directory creation succeeds; false otherwise
603   * @throws IOException
604   */
605  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
606  throws IOException {
607    // create the directory using the default permission
608    boolean result = fs.mkdirs(dir);
609    // set its permission to be the supplied one
610    fs.setPermission(dir, permission);
611    return result;
612  }
613
614  ///////////////////////////////////////////////////////////////
615  // FileSystem
616  ///////////////////////////////////////////////////////////////
617
618  protected FileSystem() {
619    super(null);
620  }
621
622  /** 
623   * Check that a Path belongs to this FileSystem.
624   * @param path to check
625   */
626  protected void checkPath(Path path) {
627    URI uri = path.toUri();
628    String thatScheme = uri.getScheme();
629    if (thatScheme == null)                // fs is relative
630      return;
631    URI thisUri = getCanonicalUri();
632    String thisScheme = thisUri.getScheme();
633    //authority and scheme are not case sensitive
634    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
635      String thisAuthority = thisUri.getAuthority();
636      String thatAuthority = uri.getAuthority();
637      if (thatAuthority == null &&                // path's authority is null
638          thisAuthority != null) {                // fs has an authority
639        URI defaultUri = getDefaultUri(getConf());
640        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
641          uri = defaultUri; // schemes match, so use this uri instead
642        } else {
643          uri = null; // can't determine auth of the path
644        }
645      }
646      if (uri != null) {
647        // canonicalize uri before comparing with this fs
648        uri = canonicalizeUri(uri);
649        thatAuthority = uri.getAuthority();
650        if (thisAuthority == thatAuthority ||       // authorities match
651            (thisAuthority != null &&
652             thisAuthority.equalsIgnoreCase(thatAuthority)))
653          return;
654      }
655    }
656    throw new IllegalArgumentException("Wrong FS: "+path+
657                                       ", expected: "+this.getUri());
658  }
659
660  /**
661   * Return an array containing hostnames, offset and size of 
662   * portions of the given file.  For a nonexistent 
663   * file or regions, null will be returned.
664   *
665   * This call is most helpful with DFS, where it returns 
666   * hostnames of machines that contain the given file.
667   *
668   * The FileSystem will simply return an elt containing 'localhost'.
669   *
670   * @param file FilesStatus to get data from
671   * @param start offset into the given file
672   * @param len length for which to get locations for
673   */
674  public BlockLocation[] getFileBlockLocations(FileStatus file, 
675      long start, long len) throws IOException {
676    if (file == null) {
677      return null;
678    }
679
680    if (start < 0 || len < 0) {
681      throw new IllegalArgumentException("Invalid start or len parameter");
682    }
683
684    if (file.getLen() <= start) {
685      return new BlockLocation[0];
686
687    }
688    String[] name = { "localhost:50010" };
689    String[] host = { "localhost" };
690    return new BlockLocation[] {
691      new BlockLocation(name, host, 0, file.getLen()) };
692  }
693 
694
695  /**
696   * Return an array containing hostnames, offset and size of 
697   * portions of the given file.  For a nonexistent 
698   * file or regions, null will be returned.
699   *
700   * This call is most helpful with DFS, where it returns 
701   * hostnames of machines that contain the given file.
702   *
703   * The FileSystem will simply return an elt containing 'localhost'.
704   *
705   * @param p path is used to identify an FS since an FS could have
706   *          another FS that it could be delegating the call to
707   * @param start offset into the given file
708   * @param len length for which to get locations for
709   */
710  public BlockLocation[] getFileBlockLocations(Path p, 
711      long start, long len) throws IOException {
712    if (p == null) {
713      throw new NullPointerException();
714    }
715    FileStatus file = getFileStatus(p);
716    return getFileBlockLocations(file, start, len);
717  }
718  
719  /**
720   * Return a set of server default configuration values
721   * @return server default configuration values
722   * @throws IOException
723   * @deprecated use {@link #getServerDefaults(Path)} instead
724   */
725  @Deprecated
726  public FsServerDefaults getServerDefaults() throws IOException {
727    Configuration conf = getConf();
728    // CRC32 is chosen as default as it is available in all 
729    // releases that support checksum.
730    // The client trash configuration is ignored.
731    return new FsServerDefaults(getDefaultBlockSize(), 
732        conf.getInt("io.bytes.per.checksum", 512), 
733        64 * 1024, 
734        getDefaultReplication(),
735        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
736        false,
737        FS_TRASH_INTERVAL_DEFAULT,
738        DataChecksum.Type.CRC32);
739  }
740
741  /**
742   * Return a set of server default configuration values
743   * @param p path is used to identify an FS since an FS could have
744   *          another FS that it could be delegating the call to
745   * @return server default configuration values
746   * @throws IOException
747   */
748  public FsServerDefaults getServerDefaults(Path p) throws IOException {
749    return getServerDefaults();
750  }
751
752  /**
753   * Return the fully-qualified path of path f resolving the path
754   * through any symlinks or mount point
755   * @param p path to be resolved
756   * @return fully qualified path 
757   * @throws FileNotFoundException
758   */
759   public Path resolvePath(final Path p) throws IOException {
760     checkPath(p);
761     return getFileStatus(p).getPath();
762   }
763
764  /**
765   * Opens an FSDataInputStream at the indicated Path.
766   * @param f the file name to open
767   * @param bufferSize the size of the buffer to be used.
768   */
769  public abstract FSDataInputStream open(Path f, int bufferSize)
770    throws IOException;
771    
772  /**
773   * Opens an FSDataInputStream at the indicated Path.
774   * @param f the file to open
775   */
776  public FSDataInputStream open(Path f) throws IOException {
777    return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
778        IO_FILE_BUFFER_SIZE_DEFAULT));
779  }
780
781  /**
782   * Create an FSDataOutputStream at the indicated Path.
783   * Files are overwritten by default.
784   * @param f the file to create
785   */
786  public FSDataOutputStream create(Path f) throws IOException {
787    return create(f, true);
788  }
789
790  /**
791   * Create an FSDataOutputStream at the indicated Path.
792   * @param f the file to create
793   * @param overwrite if a file with this name already exists, then if true,
794   *   the file will be overwritten, and if false an exception will be thrown.
795   */
796  public FSDataOutputStream create(Path f, boolean overwrite)
797      throws IOException {
798    return create(f, overwrite, 
799                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
800                      IO_FILE_BUFFER_SIZE_DEFAULT),
801                  getDefaultReplication(f),
802                  getDefaultBlockSize(f));
803  }
804
805  /**
806   * Create an FSDataOutputStream at the indicated Path with write-progress
807   * reporting.
808   * Files are overwritten by default.
809   * @param f the file to create
810   * @param progress to report progress
811   */
812  public FSDataOutputStream create(Path f, Progressable progress) 
813      throws IOException {
814    return create(f, true, 
815                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
816                      IO_FILE_BUFFER_SIZE_DEFAULT),
817                  getDefaultReplication(f),
818                  getDefaultBlockSize(f), progress);
819  }
820
821  /**
822   * Create an FSDataOutputStream at the indicated Path.
823   * Files are overwritten by default.
824   * @param f the file to create
825   * @param replication the replication factor
826   */
827  public FSDataOutputStream create(Path f, short replication)
828      throws IOException {
829    return create(f, true, 
830                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
831                      IO_FILE_BUFFER_SIZE_DEFAULT),
832                  replication,
833                  getDefaultBlockSize(f));
834  }
835
836  /**
837   * Create an FSDataOutputStream at the indicated Path with write-progress
838   * reporting.
839   * Files are overwritten by default.
840   * @param f the file to create
841   * @param replication the replication factor
842   * @param progress to report progress
843   */
844  public FSDataOutputStream create(Path f, short replication, 
845      Progressable progress) throws IOException {
846    return create(f, true, 
847                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
848                      IO_FILE_BUFFER_SIZE_DEFAULT),
849                  replication, getDefaultBlockSize(f), progress);
850  }
851
852    
853  /**
854   * Create an FSDataOutputStream at the indicated Path.
855   * @param f the file name to create
856   * @param overwrite if a file with this name already exists, then if true,
857   *   the file will be overwritten, and if false an error will be thrown.
858   * @param bufferSize the size of the buffer to be used.
859   */
860  public FSDataOutputStream create(Path f, 
861                                   boolean overwrite,
862                                   int bufferSize
863                                   ) throws IOException {
864    return create(f, overwrite, bufferSize, 
865                  getDefaultReplication(f),
866                  getDefaultBlockSize(f));
867  }
868    
869  /**
870   * Create an FSDataOutputStream at the indicated Path with write-progress
871   * reporting.
872   * @param f the path of the file to open
873   * @param overwrite if a file with this name already exists, then if true,
874   *   the file will be overwritten, and if false an error will be thrown.
875   * @param bufferSize the size of the buffer to be used.
876   */
877  public FSDataOutputStream create(Path f, 
878                                   boolean overwrite,
879                                   int bufferSize,
880                                   Progressable progress
881                                   ) throws IOException {
882    return create(f, overwrite, bufferSize, 
883                  getDefaultReplication(f),
884                  getDefaultBlockSize(f), progress);
885  }
886    
887    
888  /**
889   * Create an FSDataOutputStream at the indicated Path.
890   * @param f the file name to open
891   * @param overwrite if a file with this name already exists, then if true,
892   *   the file will be overwritten, and if false an error will be thrown.
893   * @param bufferSize the size of the buffer to be used.
894   * @param replication required block replication for the file. 
895   */
896  public FSDataOutputStream create(Path f, 
897                                   boolean overwrite,
898                                   int bufferSize,
899                                   short replication,
900                                   long blockSize
901                                   ) throws IOException {
902    return create(f, overwrite, bufferSize, replication, blockSize, null);
903  }
904
905  /**
906   * Create an FSDataOutputStream at the indicated Path with write-progress
907   * reporting.
908   * @param f the file name to open
909   * @param overwrite if a file with this name already exists, then if true,
910   *   the file will be overwritten, and if false an error will be thrown.
911   * @param bufferSize the size of the buffer to be used.
912   * @param replication required block replication for the file. 
913   */
914  public FSDataOutputStream create(Path f,
915                                            boolean overwrite,
916                                            int bufferSize,
917                                            short replication,
918                                            long blockSize,
919                                            Progressable progress
920                                            ) throws IOException {
921    return this.create(f, FsPermission.getFileDefault().applyUMask(
922        FsPermission.getUMask(getConf())), overwrite, bufferSize,
923        replication, blockSize, progress);
924  }
925
926  /**
927   * Create an FSDataOutputStream at the indicated Path with write-progress
928   * reporting.
929   * @param f the file name to open
930   * @param permission
931   * @param overwrite if a file with this name already exists, then if true,
932   *   the file will be overwritten, and if false an error will be thrown.
933   * @param bufferSize the size of the buffer to be used.
934   * @param replication required block replication for the file.
935   * @param blockSize
936   * @param progress
937   * @throws IOException
938   * @see #setPermission(Path, FsPermission)
939   */
940  public abstract FSDataOutputStream create(Path f,
941      FsPermission permission,
942      boolean overwrite,
943      int bufferSize,
944      short replication,
945      long blockSize,
946      Progressable progress) throws IOException;
947  
948  /**
949   * Create an FSDataOutputStream at the indicated Path with write-progress
950   * reporting.
951   * @param f the file name to open
952   * @param permission
953   * @param flags {@link CreateFlag}s to use for this stream.
954   * @param bufferSize the size of the buffer to be used.
955   * @param replication required block replication for the file.
956   * @param blockSize
957   * @param progress
958   * @throws IOException
959   * @see #setPermission(Path, FsPermission)
960   */
961  public FSDataOutputStream create(Path f,
962      FsPermission permission,
963      EnumSet<CreateFlag> flags,
964      int bufferSize,
965      short replication,
966      long blockSize,
967      Progressable progress) throws IOException {
968    return create(f, permission, flags, bufferSize, replication,
969        blockSize, progress, null);
970  }
971  
972  /**
973   * Create an FSDataOutputStream at the indicated Path with a custom
974   * checksum option
975   * @param f the file name to open
976   * @param permission
977   * @param flags {@link CreateFlag}s to use for this stream.
978   * @param bufferSize the size of the buffer to be used.
979   * @param replication required block replication for the file.
980   * @param blockSize
981   * @param progress
982   * @param checksumOpt checksum parameter. If null, the values
983   *        found in conf will be used.
984   * @throws IOException
985   * @see #setPermission(Path, FsPermission)
986   */
987  public FSDataOutputStream create(Path f,
988      FsPermission permission,
989      EnumSet<CreateFlag> flags,
990      int bufferSize,
991      short replication,
992      long blockSize,
993      Progressable progress,
994      ChecksumOpt checksumOpt) throws IOException {
995    // Checksum options are ignored by default. The file systems that
996    // implement checksum need to override this method. The full
997    // support is currently only available in DFS.
998    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
999        bufferSize, replication, blockSize, progress);
1000  }
1001
1002  /*.
1003   * This create has been added to support the FileContext that processes
1004   * the permission
1005   * with umask before calling this method.
1006   * This a temporary method added to support the transition from FileSystem
1007   * to FileContext for user applications.
1008   */
1009  @Deprecated
1010  protected FSDataOutputStream primitiveCreate(Path f,
1011     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
1012     short replication, long blockSize, Progressable progress,
1013     ChecksumOpt checksumOpt) throws IOException {
1014
1015    boolean pathExists = exists(f);
1016    CreateFlag.validate(f, pathExists, flag);
1017    
1018    // Default impl  assumes that permissions do not matter and 
1019    // nor does the bytesPerChecksum  hence
1020    // calling the regular create is good enough.
1021    // FSs that implement permissions should override this.
1022
1023    if (pathExists && flag.contains(CreateFlag.APPEND)) {
1024      return append(f, bufferSize, progress);
1025    }
1026    
1027    return this.create(f, absolutePermission,
1028        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1029        blockSize, progress);
1030  }
1031  
1032  /**
1033   * This version of the mkdirs method assumes that the permission is absolute.
1034   * It has been added to support the FileContext that processes the permission
1035   * with umask before calling this method.
1036   * This a temporary method added to support the transition from FileSystem
1037   * to FileContext for user applications.
1038   */
1039  @Deprecated
1040  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1041    throws IOException {
1042    // Default impl is to assume that permissions do not matter and hence
1043    // calling the regular mkdirs is good enough.
1044    // FSs that implement permissions should override this.
1045   return this.mkdirs(f, absolutePermission);
1046  }
1047
1048
1049  /**
1050   * This version of the mkdirs method assumes that the permission is absolute.
1051   * It has been added to support the FileContext that processes the permission
1052   * with umask before calling this method.
1053   * This a temporary method added to support the transition from FileSystem
1054   * to FileContext for user applications.
1055   */
1056  @Deprecated
1057  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1058                    boolean createParent)
1059    throws IOException {
1060    
1061    if (!createParent) { // parent must exist.
1062      // since the this.mkdirs makes parent dirs automatically
1063      // we must throw exception if parent does not exist.
1064      final FileStatus stat = getFileStatus(f.getParent());
1065      if (stat == null) {
1066        throw new FileNotFoundException("Missing parent:" + f);
1067      }
1068      if (!stat.isDirectory()) {
1069        throw new ParentNotDirectoryException("parent is not a dir");
1070      }
1071      // parent does exist - go ahead with mkdir of leaf
1072    }
1073    // Default impl is to assume that permissions do not matter and hence
1074    // calling the regular mkdirs is good enough.
1075    // FSs that implement permissions should override this.
1076    if (!this.mkdirs(f, absolutePermission)) {
1077      throw new IOException("mkdir of "+ f + " failed");
1078    }
1079  }
1080
1081  /**
1082   * Opens an FSDataOutputStream at the indicated Path with write-progress
1083   * reporting. Same as create(), except fails if parent directory doesn't
1084   * already exist.
1085   * @param f the file name to open
1086   * @param overwrite if a file with this name already exists, then if true,
1087   * the file will be overwritten, and if false an error will be thrown.
1088   * @param bufferSize the size of the buffer to be used.
1089   * @param replication required block replication for the file.
1090   * @param blockSize
1091   * @param progress
1092   * @throws IOException
1093   * @see #setPermission(Path, FsPermission)
1094   */
1095  public FSDataOutputStream createNonRecursive(Path f,
1096      boolean overwrite,
1097      int bufferSize, short replication, long blockSize,
1098      Progressable progress) throws IOException {
1099    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1100        overwrite, bufferSize, replication, blockSize, progress);
1101  }
1102
1103  /**
1104   * Opens an FSDataOutputStream at the indicated Path with write-progress
1105   * reporting. Same as create(), except fails if parent directory doesn't
1106   * already exist.
1107   * @param f the file name to open
1108   * @param permission
1109   * @param overwrite if a file with this name already exists, then if true,
1110   * the file will be overwritten, and if false an error will be thrown.
1111   * @param bufferSize the size of the buffer to be used.
1112   * @param replication required block replication for the file.
1113   * @param blockSize
1114   * @param progress
1115   * @throws IOException
1116   * @see #setPermission(Path, FsPermission)
1117   */
1118   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1119       boolean overwrite, int bufferSize, short replication, long blockSize,
1120       Progressable progress) throws IOException {
1121     return createNonRecursive(f, permission,
1122         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1123             : EnumSet.of(CreateFlag.CREATE), bufferSize,
1124             replication, blockSize, progress);
1125   }
1126
1127   /**
1128    * Opens an FSDataOutputStream at the indicated Path with write-progress
1129    * reporting. Same as create(), except fails if parent directory doesn't
1130    * already exist.
1131    * @param f the file name to open
1132    * @param permission
1133    * @param flags {@link CreateFlag}s to use for this stream.
1134    * @param bufferSize the size of the buffer to be used.
1135    * @param replication required block replication for the file.
1136    * @param blockSize
1137    * @param progress
1138    * @throws IOException
1139    * @see #setPermission(Path, FsPermission)
1140    */
1141    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1142        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1143        Progressable progress) throws IOException {
1144      throw new IOException("createNonRecursive unsupported for this filesystem "
1145          + this.getClass());
1146    }
1147
1148  /**
1149   * Creates the given Path as a brand-new zero-length file.  If
1150   * create fails, or if it already existed, return false.
1151   *
1152   * @param f path to use for create
1153   */
1154  public boolean createNewFile(Path f) throws IOException {
1155    if (exists(f)) {
1156      return false;
1157    } else {
1158      create(f, false, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
1159          IO_FILE_BUFFER_SIZE_DEFAULT)).close();
1160      return true;
1161    }
1162  }
1163
1164  /**
1165   * Append to an existing file (optional operation).
1166   * Same as append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
1167   *     IO_FILE_BUFFER_SIZE_DEFAULT), null)
1168   * @param f the existing file to be appended.
1169   * @throws IOException
1170   */
1171  public FSDataOutputStream append(Path f) throws IOException {
1172    return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
1173        IO_FILE_BUFFER_SIZE_DEFAULT), null);
1174  }
1175  /**
1176   * Append to an existing file (optional operation).
1177   * Same as append(f, bufferSize, null).
1178   * @param f the existing file to be appended.
1179   * @param bufferSize the size of the buffer to be used.
1180   * @throws IOException
1181   */
1182  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1183    return append(f, bufferSize, null);
1184  }
1185
1186  /**
1187   * Append to an existing file (optional operation).
1188   * @param f the existing file to be appended.
1189   * @param bufferSize the size of the buffer to be used.
1190   * @param progress for reporting progress if it is not null.
1191   * @throws IOException
1192   */
1193  public abstract FSDataOutputStream append(Path f, int bufferSize,
1194      Progressable progress) throws IOException;
1195
1196  /**
1197   * Concat existing files together.
1198   * @param trg the path to the target destination.
1199   * @param psrcs the paths to the sources to use for the concatenation.
1200   * @throws IOException
1201   */
1202  public void concat(final Path trg, final Path [] psrcs) throws IOException {
1203    throw new UnsupportedOperationException("Not implemented by the " + 
1204        getClass().getSimpleName() + " FileSystem implementation");
1205  }
1206
1207 /**
1208   * Get replication.
1209   * 
1210   * @deprecated Use getFileStatus() instead
1211   * @param src file name
1212   * @return file replication
1213   * @throws IOException
1214   */ 
1215  @Deprecated
1216  public short getReplication(Path src) throws IOException {
1217    return getFileStatus(src).getReplication();
1218  }
1219
1220  /**
1221   * Set replication for an existing file.
1222   * 
1223   * @param src file name
1224   * @param replication new replication
1225   * @throws IOException
1226   * @return true if successful;
1227   *         false if file does not exist or is a directory
1228   */
1229  public boolean setReplication(Path src, short replication)
1230    throws IOException {
1231    return true;
1232  }
1233
1234  /**
1235   * Renames Path src to Path dst.  Can take place on local fs
1236   * or remote DFS.
1237   * @param src path to be renamed
1238   * @param dst new path after rename
1239   * @throws IOException on failure
1240   * @return true if rename is successful
1241   */
1242  public abstract boolean rename(Path src, Path dst) throws IOException;
1243
1244  /**
1245   * Renames Path src to Path dst
1246   * <ul>
1247   * <li
1248   * <li>Fails if src is a file and dst is a directory.
1249   * <li>Fails if src is a directory and dst is a file.
1250   * <li>Fails if the parent of dst does not exist or is a file.
1251   * </ul>
1252   * <p>
1253   * If OVERWRITE option is not passed as an argument, rename fails
1254   * if the dst already exists.
1255   * <p>
1256   * If OVERWRITE option is passed as an argument, rename overwrites
1257   * the dst if it is a file or an empty directory. Rename fails if dst is
1258   * a non-empty directory.
1259   * <p>
1260   * Note that atomicity of rename is dependent on the file system
1261   * implementation. Please refer to the file system documentation for
1262   * details. This default implementation is non atomic.
1263   * <p>
1264   * This method is deprecated since it is a temporary method added to 
1265   * support the transition from FileSystem to FileContext for user 
1266   * applications.
1267   * 
1268   * @param src path to be renamed
1269   * @param dst new path after rename
1270   * @throws IOException on failure
1271   */
1272  @Deprecated
1273  protected void rename(final Path src, final Path dst,
1274      final Rename... options) throws IOException {
1275    // Default implementation
1276    final FileStatus srcStatus = getFileLinkStatus(src);
1277    if (srcStatus == null) {
1278      throw new FileNotFoundException("rename source " + src + " not found.");
1279    }
1280
1281    boolean overwrite = false;
1282    if (null != options) {
1283      for (Rename option : options) {
1284        if (option == Rename.OVERWRITE) {
1285          overwrite = true;
1286        }
1287      }
1288    }
1289
1290    FileStatus dstStatus;
1291    try {
1292      dstStatus = getFileLinkStatus(dst);
1293    } catch (IOException e) {
1294      dstStatus = null;
1295    }
1296    if (dstStatus != null) {
1297      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1298        throw new IOException("Source " + src + " Destination " + dst
1299            + " both should be either file or directory");
1300      }
1301      if (!overwrite) {
1302        throw new FileAlreadyExistsException("rename destination " + dst
1303            + " already exists.");
1304      }
1305      // Delete the destination that is a file or an empty directory
1306      if (dstStatus.isDirectory()) {
1307        FileStatus[] list = listStatus(dst);
1308        if (list != null && list.length != 0) {
1309          throw new IOException(
1310              "rename cannot overwrite non empty destination directory " + dst);
1311        }
1312      }
1313      delete(dst, false);
1314    } else {
1315      final Path parent = dst.getParent();
1316      final FileStatus parentStatus = getFileStatus(parent);
1317      if (parentStatus == null) {
1318        throw new FileNotFoundException("rename destination parent " + parent
1319            + " not found.");
1320      }
1321      if (!parentStatus.isDirectory()) {
1322        throw new ParentNotDirectoryException("rename destination parent " + parent
1323            + " is a file.");
1324      }
1325    }
1326    if (!rename(src, dst)) {
1327      throw new IOException("rename from " + src + " to " + dst + " failed.");
1328    }
1329  }
1330
1331  /**
1332   * Truncate the file in the indicated path to the indicated size.
1333   * <ul>
1334   * <li>Fails if path is a directory.
1335   * <li>Fails if path does not exist.
1336   * <li>Fails if path is not closed.
1337   * <li>Fails if new size is greater than current size.
1338   * </ul>
1339   * @param f The path to the file to be truncated
1340   * @param newLength The size the file is to be truncated to
1341   *
1342   * @return <code>true</code> if the file has been truncated to the desired
1343   * <code>newLength</code> and is immediately available to be reused for
1344   * write operations such as <code>append</code>, or
1345   * <code>false</code> if a background process of adjusting the length of
1346   * the last block has been started, and clients should wait for it to
1347   * complete before proceeding with further file updates.
1348   */
1349  public boolean truncate(Path f, long newLength) throws IOException {
1350    throw new UnsupportedOperationException("Not implemented by the " +
1351        getClass().getSimpleName() + " FileSystem implementation");
1352  }
1353  
1354  /**
1355   * Delete a file 
1356   * @deprecated Use {@link #delete(Path, boolean)} instead.
1357   */
1358  @Deprecated
1359  public boolean delete(Path f) throws IOException {
1360    return delete(f, true);
1361  }
1362  
1363  /** Delete a file.
1364   *
1365   * @param f the path to delete.
1366   * @param recursive if path is a directory and set to 
1367   * true, the directory is deleted else throws an exception. In
1368   * case of a file the recursive can be set to either true or false. 
1369   * @return  true if delete is successful else false. 
1370   * @throws IOException
1371   */
1372  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1373
1374  /**
1375   * Mark a path to be deleted when FileSystem is closed.
1376   * When the JVM shuts down,
1377   * all FileSystem objects will be closed automatically.
1378   * Then,
1379   * the marked path will be deleted as a result of closing the FileSystem.
1380   *
1381   * The path has to exist in the file system.
1382   * 
1383   * @param f the path to delete.
1384   * @return  true if deleteOnExit is successful, otherwise false.
1385   * @throws IOException
1386   */
1387  public boolean deleteOnExit(Path f) throws IOException {
1388    if (!exists(f)) {
1389      return false;
1390    }
1391    synchronized (deleteOnExit) {
1392      deleteOnExit.add(f);
1393    }
1394    return true;
1395  }
1396  
1397  /**
1398   * Cancel the deletion of the path when the FileSystem is closed
1399   * @param f the path to cancel deletion
1400   */
1401  public boolean cancelDeleteOnExit(Path f) {
1402    synchronized (deleteOnExit) {
1403      return deleteOnExit.remove(f);
1404    }
1405  }
1406
1407  /**
1408   * Delete all files that were marked as delete-on-exit. This recursively
1409   * deletes all files in the specified paths.
1410   */
1411  protected void processDeleteOnExit() {
1412    synchronized (deleteOnExit) {
1413      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1414        Path path = iter.next();
1415        try {
1416          if (exists(path)) {
1417            delete(path, true);
1418          }
1419        }
1420        catch (IOException e) {
1421          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1422        }
1423        iter.remove();
1424      }
1425    }
1426  }
1427  
1428  /** Check if exists.
1429   * @param f source file
1430   */
1431  public boolean exists(Path f) throws IOException {
1432    try {
1433      return getFileStatus(f) != null;
1434    } catch (FileNotFoundException e) {
1435      return false;
1436    }
1437  }
1438
1439  /** True iff the named path is a directory.
1440   * Note: Avoid using this method. Instead reuse the FileStatus 
1441   * returned by getFileStatus() or listStatus() methods.
1442   * @param f path to check
1443   */
1444  public boolean isDirectory(Path f) throws IOException {
1445    try {
1446      return getFileStatus(f).isDirectory();
1447    } catch (FileNotFoundException e) {
1448      return false;               // f does not exist
1449    }
1450  }
1451
1452  /** True iff the named path is a regular file.
1453   * Note: Avoid using this method. Instead reuse the FileStatus 
1454   * returned by getFileStatus() or listStatus() methods.
1455   * @param f path to check
1456   */
1457  public boolean isFile(Path f) throws IOException {
1458    try {
1459      return getFileStatus(f).isFile();
1460    } catch (FileNotFoundException e) {
1461      return false;               // f does not exist
1462    }
1463  }
1464  
1465  /** The number of bytes in a file. */
1466  /** @deprecated Use getFileStatus() instead */
1467  @Deprecated
1468  public long getLength(Path f) throws IOException {
1469    return getFileStatus(f).getLen();
1470  }
1471    
1472  /** Return the {@link ContentSummary} of a given {@link Path}.
1473  * @param f path to use
1474  */
1475  public ContentSummary getContentSummary(Path f) throws IOException {
1476    FileStatus status = getFileStatus(f);
1477    if (status.isFile()) {
1478      // f is a file
1479      long length = status.getLen();
1480      return new ContentSummary.Builder().length(length).
1481          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1482    }
1483    // f is a directory
1484    long[] summary = {0, 0, 1};
1485    for(FileStatus s : listStatus(f)) {
1486      long length = s.getLen();
1487      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1488          new ContentSummary.Builder().length(length).
1489          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1490      summary[0] += c.getLength();
1491      summary[1] += c.getFileCount();
1492      summary[2] += c.getDirectoryCount();
1493    }
1494    return new ContentSummary.Builder().length(summary[0]).
1495        fileCount(summary[1]).directoryCount(summary[2]).
1496        spaceConsumed(summary[0]).build();
1497  }
1498
1499  /** Return the {@link QuotaUsage} of a given {@link Path}.
1500   * @param f path to use
1501   */
1502  public QuotaUsage getQuotaUsage(Path f) throws IOException {
1503    return getContentSummary(f);
1504  }
1505
1506  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1507    @Override
1508    public boolean accept(Path file) {
1509      return true;
1510    }
1511  };
1512    
1513  /**
1514   * List the statuses of the files/directories in the given path if the path is
1515   * a directory.
1516   * <p>
1517   * Does not guarantee to return the List of files/directories status in a
1518   * sorted order.
1519   * @param f given path
1520   * @return the statuses of the files/directories in the given patch
1521   * @throws FileNotFoundException when the path does not exist;
1522   *         IOException see specific implementation
1523   */
1524  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1525                                                         IOException;
1526    
1527  /*
1528   * Filter files/directories in the given path using the user-supplied path
1529   * filter. Results are added to the given array <code>results</code>.
1530   */
1531  private void listStatus(ArrayList<FileStatus> results, Path f,
1532      PathFilter filter) throws FileNotFoundException, IOException {
1533    FileStatus listing[] = listStatus(f);
1534    if (listing == null) {
1535      throw new IOException("Error accessing " + f);
1536    }
1537
1538    for (int i = 0; i < listing.length; i++) {
1539      if (filter.accept(listing[i].getPath())) {
1540        results.add(listing[i]);
1541      }
1542    }
1543  }
1544
1545  /**
1546   * @return an iterator over the corrupt files under the given path
1547   * (may contain duplicates if a file has more than one corrupt block)
1548   * @throws IOException
1549   */
1550  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1551    throws IOException {
1552    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1553                                            " does not support" +
1554                                            " listCorruptFileBlocks");
1555  }
1556
1557  /**
1558   * Filter files/directories in the given path using the user-supplied path
1559   * filter.
1560   * <p>
1561   * Does not guarantee to return the List of files/directories status in a
1562   * sorted order.
1563   * 
1564   * @param f
1565   *          a path name
1566   * @param filter
1567   *          the user-supplied path filter
1568   * @return an array of FileStatus objects for the files under the given path
1569   *         after applying the filter
1570   * @throws FileNotFoundException when the path does not exist;
1571   *         IOException see specific implementation   
1572   */
1573  public FileStatus[] listStatus(Path f, PathFilter filter) 
1574                                   throws FileNotFoundException, IOException {
1575    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1576    listStatus(results, f, filter);
1577    return results.toArray(new FileStatus[results.size()]);
1578  }
1579
1580  /**
1581   * Filter files/directories in the given list of paths using default
1582   * path filter.
1583   * <p>
1584   * Does not guarantee to return the List of files/directories status in a
1585   * sorted order.
1586   * 
1587   * @param files
1588   *          a list of paths
1589   * @return a list of statuses for the files under the given paths after
1590   *         applying the filter default Path filter
1591   * @throws FileNotFoundException when the path does not exist;
1592   *         IOException see specific implementation
1593   */
1594  public FileStatus[] listStatus(Path[] files)
1595      throws FileNotFoundException, IOException {
1596    return listStatus(files, DEFAULT_FILTER);
1597  }
1598
1599  /**
1600   * Filter files/directories in the given list of paths using user-supplied
1601   * path filter.
1602   * <p>
1603   * Does not guarantee to return the List of files/directories status in a
1604   * sorted order.
1605   * 
1606   * @param files
1607   *          a list of paths
1608   * @param filter
1609   *          the user-supplied path filter
1610   * @return a list of statuses for the files under the given paths after
1611   *         applying the filter
1612   * @throws FileNotFoundException when the path does not exist;
1613   *         IOException see specific implementation
1614   */
1615  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1616      throws FileNotFoundException, IOException {
1617    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1618    for (int i = 0; i < files.length; i++) {
1619      listStatus(results, files[i], filter);
1620    }
1621    return results.toArray(new FileStatus[results.size()]);
1622  }
1623
1624  /**
1625   * <p>Return all the files that match filePattern and are not checksum
1626   * files. Results are sorted by their names.
1627   * 
1628   * <p>
1629   * A filename pattern is composed of <i>regular</i> characters and
1630   * <i>special pattern matching</i> characters, which are:
1631   *
1632   * <dl>
1633   *  <dd>
1634   *   <dl>
1635   *    <p>
1636   *    <dt> <tt> ? </tt>
1637   *    <dd> Matches any single character.
1638   *
1639   *    <p>
1640   *    <dt> <tt> * </tt>
1641   *    <dd> Matches zero or more characters.
1642   *
1643   *    <p>
1644   *    <dt> <tt> [<i>abc</i>] </tt>
1645   *    <dd> Matches a single character from character set
1646   *     <tt>{<i>a,b,c</i>}</tt>.
1647   *
1648   *    <p>
1649   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1650   *    <dd> Matches a single character from the character range
1651   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1652   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1653   *
1654   *    <p>
1655   *    <dt> <tt> [^<i>a</i>] </tt>
1656   *    <dd> Matches a single character that is not from character set or range
1657   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1658   *     immediately to the right of the opening bracket.
1659   *
1660   *    <p>
1661   *    <dt> <tt> \<i>c</i> </tt>
1662   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1663   *
1664   *    <p>
1665   *    <dt> <tt> {ab,cd} </tt>
1666   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1667   *    
1668   *    <p>
1669   *    <dt> <tt> {ab,c{de,fh}} </tt>
1670   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1671   *
1672   *   </dl>
1673   *  </dd>
1674   * </dl>
1675   *
1676   * @param pathPattern a regular expression specifying a pth pattern
1677
1678   * @return an array of paths that match the path pattern
1679   * @throws IOException
1680   */
1681  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1682    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1683  }
1684  
1685  /**
1686   * Return an array of FileStatus objects whose path names match
1687   * {@code pathPattern} and is accepted by the user-supplied path filter.
1688   * Results are sorted by their path names.
1689   * 
1690   * @param pathPattern a regular expression specifying the path pattern
1691   * @param filter a user-supplied path filter
1692   * @return null if {@code pathPattern} has no glob and the path does not exist
1693   *         an empty array if {@code pathPattern} has a glob and no path
1694   *         matches it else an array of {@link FileStatus} objects matching the
1695   *         pattern
1696   * @throws IOException if any I/O error occurs when fetching file status
1697   */
1698  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1699      throws IOException {
1700    return new Globber(this, pathPattern, filter).glob();
1701  }
1702  
1703  /**
1704   * List the statuses of the files/directories in the given path if the path is
1705   * a directory. 
1706   * Return the file's status and block locations If the path is a file.
1707   * 
1708   * If a returned status is a file, it contains the file's block locations.
1709   * 
1710   * @param f is the path
1711   *
1712   * @return an iterator that traverses statuses of the files/directories 
1713   *         in the given path
1714   *
1715   * @throws FileNotFoundException If <code>f</code> does not exist
1716   * @throws IOException If an I/O error occurred
1717   */
1718  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1719  throws FileNotFoundException, IOException {
1720    return listLocatedStatus(f, DEFAULT_FILTER);
1721  }
1722
1723  /**
1724   * Listing a directory
1725   * The returned results include its block location if it is a file
1726   * The results are filtered by the given path filter
1727   * @param f a path
1728   * @param filter a path filter
1729   * @return an iterator that traverses statuses of the files/directories 
1730   *         in the given path
1731   * @throws FileNotFoundException if <code>f</code> does not exist
1732   * @throws IOException if any I/O error occurred
1733   */
1734  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1735      final PathFilter filter)
1736  throws FileNotFoundException, IOException {
1737    return new RemoteIterator<LocatedFileStatus>() {
1738      private final FileStatus[] stats = listStatus(f, filter);
1739      private int i = 0;
1740
1741      @Override
1742      public boolean hasNext() {
1743        return i<stats.length;
1744      }
1745
1746      @Override
1747      public LocatedFileStatus next() throws IOException {
1748        if (!hasNext()) {
1749          throw new NoSuchElementException("No more entries in " + f);
1750        }
1751        FileStatus result = stats[i++];
1752        // for files, use getBlockLocations(FileStatus, int, int) to avoid
1753        // calling getFileStatus(Path) to load the FileStatus again
1754        BlockLocation[] locs = result.isFile() ?
1755            getFileBlockLocations(result, 0, result.getLen()) :
1756            null;
1757        return new LocatedFileStatus(result, locs);
1758      }
1759    };
1760  }
1761
1762  /**
1763   * Returns a remote iterator so that followup calls are made on demand
1764   * while consuming the entries. Each file system implementation should
1765   * override this method and provide a more efficient implementation, if
1766   * possible. 
1767   * Does not guarantee to return the iterator that traverses statuses
1768   * of the files in a sorted order.
1769   *
1770   * @param p target path
1771   * @return remote iterator
1772   */
1773  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
1774  throws FileNotFoundException, IOException {
1775    return new RemoteIterator<FileStatus>() {
1776      private final FileStatus[] stats = listStatus(p);
1777      private int i = 0;
1778
1779      @Override
1780      public boolean hasNext() {
1781        return i<stats.length;
1782      }
1783
1784      @Override
1785      public FileStatus next() throws IOException {
1786        if (!hasNext()) {
1787          throw new NoSuchElementException("No more entry in " + p);
1788        }
1789        return stats[i++];
1790      }
1791    };
1792  }
1793
1794  /**
1795   * List the statuses and block locations of the files in the given path.
1796   * Does not guarantee to return the iterator that traverses statuses
1797   * of the files in a sorted order.
1798   * 
1799   * If the path is a directory, 
1800   *   if recursive is false, returns files in the directory;
1801   *   if recursive is true, return files in the subtree rooted at the path.
1802   * If the path is a file, return the file's status and block locations.
1803   * 
1804   * @param f is the path
1805   * @param recursive if the subdirectories need to be traversed recursively
1806   *
1807   * @return an iterator that traverses statuses of the files
1808   *
1809   * @throws FileNotFoundException when the path does not exist;
1810   *         IOException see specific implementation
1811   */
1812  public RemoteIterator<LocatedFileStatus> listFiles(
1813      final Path f, final boolean recursive)
1814  throws FileNotFoundException, IOException {
1815    return new RemoteIterator<LocatedFileStatus>() {
1816      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1817        new Stack<RemoteIterator<LocatedFileStatus>>();
1818      private RemoteIterator<LocatedFileStatus> curItor =
1819        listLocatedStatus(f);
1820      private LocatedFileStatus curFile;
1821     
1822      @Override
1823      public boolean hasNext() throws IOException {
1824        while (curFile == null) {
1825          if (curItor.hasNext()) {
1826            handleFileStat(curItor.next());
1827          } else if (!itors.empty()) {
1828            curItor = itors.pop();
1829          } else {
1830            return false;
1831          }
1832        }
1833        return true;
1834      }
1835
1836      /**
1837       * Process the input stat.
1838       * If it is a file, return the file stat.
1839       * If it is a directory, traverse the directory if recursive is true;
1840       * ignore it if recursive is false.
1841       * @param stat input status
1842       * @throws IOException if any IO error occurs
1843       */
1844      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1845        if (stat.isFile()) { // file
1846          curFile = stat;
1847        } else if (recursive) { // directory
1848          itors.push(curItor);
1849          curItor = listLocatedStatus(stat.getPath());
1850        }
1851      }
1852
1853      @Override
1854      public LocatedFileStatus next() throws IOException {
1855        if (hasNext()) {
1856          LocatedFileStatus result = curFile;
1857          curFile = null;
1858          return result;
1859        } 
1860        throw new java.util.NoSuchElementException("No more entry in " + f);
1861      }
1862    };
1863  }
1864  
1865  /** Return the current user's home directory in this filesystem.
1866   * The default implementation returns "/user/$USER/".
1867   */
1868  public Path getHomeDirectory() {
1869    return this.makeQualified(
1870        new Path("/user/"+System.getProperty("user.name")));
1871  }
1872
1873
1874  /**
1875   * Set the current working directory for the given file system. All relative
1876   * paths will be resolved relative to it.
1877   * 
1878   * @param new_dir
1879   */
1880  public abstract void setWorkingDirectory(Path new_dir);
1881    
1882  /**
1883   * Get the current working directory for the given file system
1884   * @return the directory pathname
1885   */
1886  public abstract Path getWorkingDirectory();
1887  
1888  
1889  /**
1890   * Note: with the new FilesContext class, getWorkingDirectory()
1891   * will be removed. 
1892   * The working directory is implemented in FilesContext.
1893   * 
1894   * Some file systems like LocalFileSystem have an initial workingDir
1895   * that we use as the starting workingDir. For other file systems
1896   * like HDFS there is no built in notion of an initial workingDir.
1897   * 
1898   * @return if there is built in notion of workingDir then it
1899   * is returned; else a null is returned.
1900   */
1901  protected Path getInitialWorkingDirectory() {
1902    return null;
1903  }
1904
1905  /**
1906   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1907   */
1908  public boolean mkdirs(Path f) throws IOException {
1909    return mkdirs(f, FsPermission.getDirDefault());
1910  }
1911
1912  /**
1913   * Make the given file and all non-existent parents into
1914   * directories. Has the semantics of Unix 'mkdir -p'.
1915   * Existence of the directory hierarchy is not an error.
1916   * @param f path to create
1917   * @param permission to apply to f
1918   */
1919  public abstract boolean mkdirs(Path f, FsPermission permission
1920      ) throws IOException;
1921
1922  /**
1923   * The src file is on the local disk.  Add it to FS at
1924   * the given dst name and the source is kept intact afterwards
1925   * @param src path
1926   * @param dst path
1927   */
1928  public void copyFromLocalFile(Path src, Path dst)
1929    throws IOException {
1930    copyFromLocalFile(false, src, dst);
1931  }
1932
1933  /**
1934   * The src files is on the local disk.  Add it to FS at
1935   * the given dst name, removing the source afterwards.
1936   * @param srcs path
1937   * @param dst path
1938   */
1939  public void moveFromLocalFile(Path[] srcs, Path dst)
1940    throws IOException {
1941    copyFromLocalFile(true, true, srcs, dst);
1942  }
1943
1944  /**
1945   * The src file is on the local disk.  Add it to FS at
1946   * the given dst name, removing the source afterwards.
1947   * @param src path
1948   * @param dst path
1949   */
1950  public void moveFromLocalFile(Path src, Path dst)
1951    throws IOException {
1952    copyFromLocalFile(true, src, dst);
1953  }
1954
1955  /**
1956   * The src file is on the local disk.  Add it to FS at
1957   * the given dst name.
1958   * delSrc indicates if the source should be removed
1959   * @param delSrc whether to delete the src
1960   * @param src path
1961   * @param dst path
1962   */
1963  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1964    throws IOException {
1965    copyFromLocalFile(delSrc, true, src, dst);
1966  }
1967  
1968  /**
1969   * The src files are on the local disk.  Add it to FS at
1970   * the given dst name.
1971   * delSrc indicates if the source should be removed
1972   * @param delSrc whether to delete the src
1973   * @param overwrite whether to overwrite an existing file
1974   * @param srcs array of paths which are source
1975   * @param dst path
1976   */
1977  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1978                                Path[] srcs, Path dst)
1979    throws IOException {
1980    Configuration conf = getConf();
1981    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1982  }
1983  
1984  /**
1985   * The src file is on the local disk.  Add it to FS at
1986   * the given dst name.
1987   * delSrc indicates if the source should be removed
1988   * @param delSrc whether to delete the src
1989   * @param overwrite whether to overwrite an existing file
1990   * @param src path
1991   * @param dst path
1992   */
1993  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1994                                Path src, Path dst)
1995    throws IOException {
1996    Configuration conf = getConf();
1997    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1998  }
1999    
2000  /**
2001   * The src file is under FS, and the dst is on the local disk.
2002   * Copy it from FS control to the local dst name.
2003   * @param src path
2004   * @param dst path
2005   */
2006  public void copyToLocalFile(Path src, Path dst) throws IOException {
2007    copyToLocalFile(false, src, dst);
2008  }
2009    
2010  /**
2011   * The src file is under FS, and the dst is on the local disk.
2012   * Copy it from FS control to the local dst name.
2013   * Remove the source afterwards
2014   * @param src path
2015   * @param dst path
2016   */
2017  public void moveToLocalFile(Path src, Path dst) throws IOException {
2018    copyToLocalFile(true, src, dst);
2019  }
2020
2021  /**
2022   * The src file is under FS, and the dst is on the local disk.
2023   * Copy it from FS control to the local dst name.
2024   * delSrc indicates if the src will be removed or not.
2025   * @param delSrc whether to delete the src
2026   * @param src path
2027   * @param dst path
2028   */   
2029  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
2030    throws IOException {
2031    copyToLocalFile(delSrc, src, dst, false);
2032  }
2033  
2034    /**
2035   * The src file is under FS, and the dst is on the local disk. Copy it from FS
2036   * control to the local dst name. delSrc indicates if the src will be removed
2037   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
2038   * as local file system or not. RawLocalFileSystem is non crc file system.So,
2039   * It will not create any crc files at local.
2040   * 
2041   * @param delSrc
2042   *          whether to delete the src
2043   * @param src
2044   *          path
2045   * @param dst
2046   *          path
2047   * @param useRawLocalFileSystem
2048   *          whether to use RawLocalFileSystem as local file system or not.
2049   * 
2050   * @throws IOException
2051   *           - if any IO error
2052   */
2053  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
2054      boolean useRawLocalFileSystem) throws IOException {
2055    Configuration conf = getConf();
2056    FileSystem local = null;
2057    if (useRawLocalFileSystem) {
2058      local = getLocal(conf).getRawFileSystem();
2059    } else {
2060      local = getLocal(conf);
2061    }
2062    FileUtil.copy(this, src, local, dst, delSrc, conf);
2063  }
2064
2065  /**
2066   * Returns a local File that the user can write output to.  The caller
2067   * provides both the eventual FS target name and the local working
2068   * file.  If the FS is local, we write directly into the target.  If
2069   * the FS is remote, we write into the tmp local area.
2070   * @param fsOutputFile path of output file
2071   * @param tmpLocalFile path of local tmp file
2072   */
2073  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2074    throws IOException {
2075    return tmpLocalFile;
2076  }
2077
2078  /**
2079   * Called when we're all done writing to the target.  A local FS will
2080   * do nothing, because we've written to exactly the right place.  A remote
2081   * FS will copy the contents of tmpLocalFile to the correct target at
2082   * fsOutputFile.
2083   * @param fsOutputFile path of output file
2084   * @param tmpLocalFile path to local tmp file
2085   */
2086  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2087    throws IOException {
2088    moveFromLocalFile(tmpLocalFile, fsOutputFile);
2089  }
2090
2091  /**
2092   * No more filesystem operations are needed.  Will
2093   * release any held locks.
2094   */
2095  @Override
2096  public void close() throws IOException {
2097    // delete all files that were marked as delete-on-exit.
2098    processDeleteOnExit();
2099    CACHE.remove(this.key, this);
2100  }
2101
2102  /** Return the total size of all files in the filesystem. */
2103  public long getUsed() throws IOException {
2104    Path path = new Path("/");
2105    return getUsed(path);
2106  }
2107
2108  /** Return the total size of all files from a specified path. */
2109  public long getUsed(Path path) throws IOException {
2110    return getContentSummary(path).getLength();
2111  }
2112
2113  /**
2114   * Get the block size for a particular file.
2115   * @param f the filename
2116   * @return the number of bytes in a block
2117   */
2118  /** @deprecated Use getFileStatus() instead */
2119  @Deprecated
2120  public long getBlockSize(Path f) throws IOException {
2121    return getFileStatus(f).getBlockSize();
2122  }
2123
2124  /**
2125   * Return the number of bytes that large input files should be optimally
2126   * be split into to minimize i/o time.
2127   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2128   */
2129  @Deprecated
2130  public long getDefaultBlockSize() {
2131    // default to 32MB: large enough to minimize the impact of seeks
2132    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2133  }
2134    
2135  /** Return the number of bytes that large input files should be optimally
2136   * be split into to minimize i/o time.  The given path will be used to
2137   * locate the actual filesystem.  The full path does not have to exist.
2138   * @param f path of file
2139   * @return the default block size for the path's filesystem
2140   */
2141  public long getDefaultBlockSize(Path f) {
2142    return getDefaultBlockSize();
2143  }
2144
2145  /**
2146   * Get the default replication.
2147   * @deprecated use {@link #getDefaultReplication(Path)} instead
2148   */
2149  @Deprecated
2150  public short getDefaultReplication() { return 1; }
2151
2152  /**
2153   * Get the default replication for a path.   The given path will be used to
2154   * locate the actual filesystem.  The full path does not have to exist.
2155   * @param path of the file
2156   * @return default replication for the path's filesystem 
2157   */
2158  public short getDefaultReplication(Path path) {
2159    return getDefaultReplication();
2160  }
2161  
2162  /**
2163   * Return a file status object that represents the path.
2164   * @param f The path we want information from
2165   * @return a FileStatus object
2166   * @throws FileNotFoundException when the path does not exist;
2167   *         IOException see specific implementation
2168   */
2169  public abstract FileStatus getFileStatus(Path f) throws IOException;
2170
2171  /**
2172   * Checks if the user can access a path.  The mode specifies which access
2173   * checks to perform.  If the requested permissions are granted, then the
2174   * method returns normally.  If access is denied, then the method throws an
2175   * {@link AccessControlException}.
2176   * <p/>
2177   * The default implementation of this method calls {@link #getFileStatus(Path)}
2178   * and checks the returned permissions against the requested permissions.
2179   * Note that the getFileStatus call will be subject to authorization checks.
2180   * Typically, this requires search (execute) permissions on each directory in
2181   * the path's prefix, but this is implementation-defined.  Any file system
2182   * that provides a richer authorization model (such as ACLs) may override the
2183   * default implementation so that it checks against that model instead.
2184   * <p>
2185   * In general, applications should avoid using this method, due to the risk of
2186   * time-of-check/time-of-use race conditions.  The permissions on a file may
2187   * change immediately after the access call returns.  Most applications should
2188   * prefer running specific file system actions as the desired user represented
2189   * by a {@link UserGroupInformation}.
2190   *
2191   * @param path Path to check
2192   * @param mode type of access to check
2193   * @throws AccessControlException if access is denied
2194   * @throws FileNotFoundException if the path does not exist
2195   * @throws IOException see specific implementation
2196   */
2197  @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2198  public void access(Path path, FsAction mode) throws AccessControlException,
2199      FileNotFoundException, IOException {
2200    checkAccessPermissions(this.getFileStatus(path), mode);
2201  }
2202
2203  /**
2204   * This method provides the default implementation of
2205   * {@link #access(Path, FsAction)}.
2206   *
2207   * @param stat FileStatus to check
2208   * @param mode type of access to check
2209   * @throws IOException for any error
2210   */
2211  @InterfaceAudience.Private
2212  static void checkAccessPermissions(FileStatus stat, FsAction mode)
2213      throws IOException {
2214    FsPermission perm = stat.getPermission();
2215    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2216    String user = ugi.getShortUserName();
2217    List<String> groups = Arrays.asList(ugi.getGroupNames());
2218    if (user.equals(stat.getOwner())) {
2219      if (perm.getUserAction().implies(mode)) {
2220        return;
2221      }
2222    } else if (groups.contains(stat.getGroup())) {
2223      if (perm.getGroupAction().implies(mode)) {
2224        return;
2225      }
2226    } else {
2227      if (perm.getOtherAction().implies(mode)) {
2228        return;
2229      }
2230    }
2231    throw new AccessControlException(String.format(
2232      "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2233      stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2234  }
2235
2236  /**
2237   * See {@link FileContext#fixRelativePart}
2238   */
2239  protected Path fixRelativePart(Path p) {
2240    if (p.isUriPathAbsolute()) {
2241      return p;
2242    } else {
2243      return new Path(getWorkingDirectory(), p);
2244    }
2245  }
2246
2247  /**
2248   * See {@link FileContext#createSymlink(Path, Path, boolean)}
2249   */
2250  public void createSymlink(final Path target, final Path link,
2251      final boolean createParent) throws AccessControlException,
2252      FileAlreadyExistsException, FileNotFoundException,
2253      ParentNotDirectoryException, UnsupportedFileSystemException, 
2254      IOException {
2255    // Supporting filesystems should override this method
2256    throw new UnsupportedOperationException(
2257        "Filesystem does not support symlinks!");
2258  }
2259
2260  /**
2261   * See {@link FileContext#getFileLinkStatus(Path)}
2262   */
2263  public FileStatus getFileLinkStatus(final Path f)
2264      throws AccessControlException, FileNotFoundException,
2265      UnsupportedFileSystemException, IOException {
2266    // Supporting filesystems should override this method
2267    return getFileStatus(f);
2268  }
2269
2270  /**
2271   * See {@link AbstractFileSystem#supportsSymlinks()}
2272   */
2273  public boolean supportsSymlinks() {
2274    return false;
2275  }
2276
2277  /**
2278   * See {@link FileContext#getLinkTarget(Path)}
2279   */
2280  public Path getLinkTarget(Path f) throws IOException {
2281    // Supporting filesystems should override this method
2282    throw new UnsupportedOperationException(
2283        "Filesystem does not support symlinks!");
2284  }
2285
2286  /**
2287   * See {@link AbstractFileSystem#getLinkTarget(Path)}
2288   */
2289  protected Path resolveLink(Path f) throws IOException {
2290    // Supporting filesystems should override this method
2291    throw new UnsupportedOperationException(
2292        "Filesystem does not support symlinks!");
2293  }
2294
2295  /**
2296   * Get the checksum of a file.
2297   *
2298   * @param f The file path
2299   * @return The file checksum.  The default return value is null,
2300   *  which indicates that no checksum algorithm is implemented
2301   *  in the corresponding FileSystem.
2302   */
2303  public FileChecksum getFileChecksum(Path f) throws IOException {
2304    return getFileChecksum(f, Long.MAX_VALUE);
2305  }
2306
2307  /**
2308   * Get the checksum of a file, from the beginning of the file till the
2309   * specific length.
2310   * @param f The file path
2311   * @param length The length of the file range for checksum calculation
2312   * @return The file checksum.
2313   */
2314  public FileChecksum getFileChecksum(Path f, final long length)
2315      throws IOException {
2316    return null;
2317  }
2318
2319  /**
2320   * Set the verify checksum flag. This is only applicable if the 
2321   * corresponding FileSystem supports checksum. By default doesn't do anything.
2322   * @param verifyChecksum
2323   */
2324  public void setVerifyChecksum(boolean verifyChecksum) {
2325    //doesn't do anything
2326  }
2327
2328  /**
2329   * Set the write checksum flag. This is only applicable if the 
2330   * corresponding FileSystem supports checksum. By default doesn't do anything.
2331   * @param writeChecksum
2332   */
2333  public void setWriteChecksum(boolean writeChecksum) {
2334    //doesn't do anything
2335  }
2336
2337  /**
2338   * Returns a status object describing the use and capacity of the
2339   * file system. If the file system has multiple partitions, the
2340   * use and capacity of the root partition is reflected.
2341   * 
2342   * @return a FsStatus object
2343   * @throws IOException
2344   *           see specific implementation
2345   */
2346  public FsStatus getStatus() throws IOException {
2347    return getStatus(null);
2348  }
2349
2350  /**
2351   * Returns a status object describing the use and capacity of the
2352   * file system. If the file system has multiple partitions, the
2353   * use and capacity of the partition pointed to by the specified
2354   * path is reflected.
2355   * @param p Path for which status should be obtained. null means
2356   * the default partition. 
2357   * @return a FsStatus object
2358   * @throws IOException
2359   *           see specific implementation
2360   */
2361  public FsStatus getStatus(Path p) throws IOException {
2362    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2363  }
2364
2365  /**
2366   * Set permission of a path.
2367   * @param p
2368   * @param permission
2369   */
2370  public void setPermission(Path p, FsPermission permission
2371      ) throws IOException {
2372  }
2373
2374  /**
2375   * Set owner of a path (i.e. a file or a directory).
2376   * The parameters username and groupname cannot both be null.
2377   * @param p The path
2378   * @param username If it is null, the original username remains unchanged.
2379   * @param groupname If it is null, the original groupname remains unchanged.
2380   */
2381  public void setOwner(Path p, String username, String groupname
2382      ) throws IOException {
2383  }
2384
2385  /**
2386   * Set access time of a file
2387   * @param p The path
2388   * @param mtime Set the modification time of this file.
2389   *              The number of milliseconds since Jan 1, 1970. 
2390   *              A value of -1 means that this call should not set modification time.
2391   * @param atime Set the access time of this file.
2392   *              The number of milliseconds since Jan 1, 1970. 
2393   *              A value of -1 means that this call should not set access time.
2394   */
2395  public void setTimes(Path p, long mtime, long atime
2396      ) throws IOException {
2397  }
2398
2399  /**
2400   * Create a snapshot with a default name.
2401   * @param path The directory where snapshots will be taken.
2402   * @return the snapshot path.
2403   */
2404  public final Path createSnapshot(Path path) throws IOException {
2405    return createSnapshot(path, null);
2406  }
2407
2408  /**
2409   * Create a snapshot
2410   * @param path The directory where snapshots will be taken.
2411   * @param snapshotName The name of the snapshot
2412   * @return the snapshot path.
2413   */
2414  public Path createSnapshot(Path path, String snapshotName)
2415      throws IOException {
2416    throw new UnsupportedOperationException(getClass().getSimpleName()
2417        + " doesn't support createSnapshot");
2418  }
2419  
2420  /**
2421   * Rename a snapshot
2422   * @param path The directory path where the snapshot was taken
2423   * @param snapshotOldName Old name of the snapshot
2424   * @param snapshotNewName New name of the snapshot
2425   * @throws IOException
2426   */
2427  public void renameSnapshot(Path path, String snapshotOldName,
2428      String snapshotNewName) throws IOException {
2429    throw new UnsupportedOperationException(getClass().getSimpleName()
2430        + " doesn't support renameSnapshot");
2431  }
2432  
2433  /**
2434   * Delete a snapshot of a directory
2435   * @param path  The directory that the to-be-deleted snapshot belongs to
2436   * @param snapshotName The name of the snapshot
2437   */
2438  public void deleteSnapshot(Path path, String snapshotName)
2439      throws IOException {
2440    throw new UnsupportedOperationException(getClass().getSimpleName()
2441        + " doesn't support deleteSnapshot");
2442  }
2443  
2444  /**
2445   * Modifies ACL entries of files and directories.  This method can add new ACL
2446   * entries or modify the permissions on existing ACL entries.  All existing
2447   * ACL entries that are not specified in this call are retained without
2448   * changes.  (Modifications are merged into the current ACL.)
2449   *
2450   * @param path Path to modify
2451   * @param aclSpec List<AclEntry> describing modifications
2452   * @throws IOException if an ACL could not be modified
2453   */
2454  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2455      throws IOException {
2456    throw new UnsupportedOperationException(getClass().getSimpleName()
2457        + " doesn't support modifyAclEntries");
2458  }
2459
2460  /**
2461   * Removes ACL entries from files and directories.  Other ACL entries are
2462   * retained.
2463   *
2464   * @param path Path to modify
2465   * @param aclSpec List<AclEntry> describing entries to remove
2466   * @throws IOException if an ACL could not be modified
2467   */
2468  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2469      throws IOException {
2470    throw new UnsupportedOperationException(getClass().getSimpleName()
2471        + " doesn't support removeAclEntries");
2472  }
2473
2474  /**
2475   * Removes all default ACL entries from files and directories.
2476   *
2477   * @param path Path to modify
2478   * @throws IOException if an ACL could not be modified
2479   */
2480  public void removeDefaultAcl(Path path)
2481      throws IOException {
2482    throw new UnsupportedOperationException(getClass().getSimpleName()
2483        + " doesn't support removeDefaultAcl");
2484  }
2485
2486  /**
2487   * Removes all but the base ACL entries of files and directories.  The entries
2488   * for user, group, and others are retained for compatibility with permission
2489   * bits.
2490   *
2491   * @param path Path to modify
2492   * @throws IOException if an ACL could not be removed
2493   */
2494  public void removeAcl(Path path)
2495      throws IOException {
2496    throw new UnsupportedOperationException(getClass().getSimpleName()
2497        + " doesn't support removeAcl");
2498  }
2499
2500  /**
2501   * Fully replaces ACL of files and directories, discarding all existing
2502   * entries.
2503   *
2504   * @param path Path to modify
2505   * @param aclSpec List<AclEntry> describing modifications, must include entries
2506   *   for user, group, and others for compatibility with permission bits.
2507   * @throws IOException if an ACL could not be modified
2508   */
2509  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2510    throw new UnsupportedOperationException(getClass().getSimpleName()
2511        + " doesn't support setAcl");
2512  }
2513
2514  /**
2515   * Gets the ACL of a file or directory.
2516   *
2517   * @param path Path to get
2518   * @return AclStatus describing the ACL of the file or directory
2519   * @throws IOException if an ACL could not be read
2520   */
2521  public AclStatus getAclStatus(Path path) throws IOException {
2522    throw new UnsupportedOperationException(getClass().getSimpleName()
2523        + " doesn't support getAclStatus");
2524  }
2525
2526  /**
2527   * Set an xattr of a file or directory.
2528   * The name must be prefixed with the namespace followed by ".". For example,
2529   * "user.attr".
2530   * <p/>
2531   * Refer to the HDFS extended attributes user documentation for details.
2532   *
2533   * @param path Path to modify
2534   * @param name xattr name.
2535   * @param value xattr value.
2536   * @throws IOException
2537   */
2538  public void setXAttr(Path path, String name, byte[] value)
2539      throws IOException {
2540    setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2541        XAttrSetFlag.REPLACE));
2542  }
2543
2544  /**
2545   * Set an xattr of a file or directory.
2546   * The name must be prefixed with the namespace followed by ".". For example,
2547   * "user.attr".
2548   * <p/>
2549   * Refer to the HDFS extended attributes user documentation for details.
2550   *
2551   * @param path Path to modify
2552   * @param name xattr name.
2553   * @param value xattr value.
2554   * @param flag xattr set flag
2555   * @throws IOException
2556   */
2557  public void setXAttr(Path path, String name, byte[] value,
2558      EnumSet<XAttrSetFlag> flag) throws IOException {
2559    throw new UnsupportedOperationException(getClass().getSimpleName()
2560        + " doesn't support setXAttr");
2561  }
2562
2563  /**
2564   * Get an xattr name and value for a file or directory.
2565   * The name must be prefixed with the namespace followed by ".". For example,
2566   * "user.attr".
2567   * <p/>
2568   * Refer to the HDFS extended attributes user documentation for details.
2569   *
2570   * @param path Path to get extended attribute
2571   * @param name xattr name.
2572   * @return byte[] xattr value.
2573   * @throws IOException
2574   */
2575  public byte[] getXAttr(Path path, String name) throws IOException {
2576    throw new UnsupportedOperationException(getClass().getSimpleName()
2577        + " doesn't support getXAttr");
2578  }
2579
2580  /**
2581   * Get all of the xattr name/value pairs for a file or directory.
2582   * Only those xattrs which the logged-in user has permissions to view
2583   * are returned.
2584   * <p/>
2585   * Refer to the HDFS extended attributes user documentation for details.
2586   *
2587   * @param path Path to get extended attributes
2588   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2589   * @throws IOException
2590   */
2591  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2592    throw new UnsupportedOperationException(getClass().getSimpleName()
2593        + " doesn't support getXAttrs");
2594  }
2595
2596  /**
2597   * Get all of the xattrs name/value pairs for a file or directory.
2598   * Only those xattrs which the logged-in user has permissions to view
2599   * are returned.
2600   * <p/>
2601   * Refer to the HDFS extended attributes user documentation for details.
2602   *
2603   * @param path Path to get extended attributes
2604   * @param names XAttr names.
2605   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2606   * @throws IOException
2607   */
2608  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2609      throws IOException {
2610    throw new UnsupportedOperationException(getClass().getSimpleName()
2611        + " doesn't support getXAttrs");
2612  }
2613
2614  /**
2615   * Get all of the xattr names for a file or directory.
2616   * Only those xattr names which the logged-in user has permissions to view
2617   * are returned.
2618   * <p/>
2619   * Refer to the HDFS extended attributes user documentation for details.
2620   *
2621   * @param path Path to get extended attributes
2622   * @return List<String> of the XAttr names of the file or directory
2623   * @throws IOException
2624   */
2625  public List<String> listXAttrs(Path path) throws IOException {
2626    throw new UnsupportedOperationException(getClass().getSimpleName()
2627            + " doesn't support listXAttrs");
2628  }
2629
2630  /**
2631   * Remove an xattr of a file or directory.
2632   * The name must be prefixed with the namespace followed by ".". For example,
2633   * "user.attr".
2634   * <p/>
2635   * Refer to the HDFS extended attributes user documentation for details.
2636   *
2637   * @param path Path to remove extended attribute
2638   * @param name xattr name
2639   * @throws IOException
2640   */
2641  public void removeXAttr(Path path, String name) throws IOException {
2642    throw new UnsupportedOperationException(getClass().getSimpleName()
2643        + " doesn't support removeXAttr");
2644  }
2645
2646  /**
2647   * Set the storage policy for a given file or directory.
2648   *
2649   * @param src file or directory path.
2650   * @param policyName the name of the target storage policy. The list
2651   *                   of supported Storage policies can be retrieved
2652   *                   via {@link #getAllStoragePolicies}.
2653   * @throws IOException
2654   */
2655  public void setStoragePolicy(final Path src, final String policyName)
2656      throws IOException {
2657    throw new UnsupportedOperationException(getClass().getSimpleName()
2658        + " doesn't support setStoragePolicy");
2659  }
2660
2661  /**
2662   * Query the effective storage policy ID for the given file or directory.
2663   *
2664   * @param src file or directory path.
2665   * @return storage policy for give file.
2666   * @throws IOException
2667   */
2668  public BlockStoragePolicySpi getStoragePolicy(final Path src)
2669      throws IOException {
2670    throw new UnsupportedOperationException(getClass().getSimpleName()
2671        + " doesn't support getStoragePolicy");
2672  }
2673
2674  /**
2675   * Retrieve all the storage policies supported by this file system.
2676   *
2677   * @return all storage policies supported by this filesystem.
2678   * @throws IOException
2679   */
2680  public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
2681      throws IOException {
2682    throw new UnsupportedOperationException(getClass().getSimpleName()
2683        + " doesn't support getAllStoragePolicies");
2684  }
2685
2686  /**
2687   * Get the root directory of Trash for current user when the path specified
2688   * is deleted.
2689   *
2690   * @param path the trash root of the path to be determined.
2691   * @return the default implementation returns "/user/$USER/.Trash".
2692   */
2693  public Path getTrashRoot(Path path) {
2694    return this.makeQualified(new Path(getHomeDirectory().toUri().getPath(),
2695        TRASH_PREFIX));
2696  }
2697
2698  /**
2699   * Get all the trash roots for current user or all users.
2700   *
2701   * @param allUsers return trash roots for all users if true.
2702   * @return all the trash root directories.
2703   *         Default FileSystem returns .Trash under users' home directories if
2704   *         /user/$USER/.Trash exists.
2705   */
2706  public Collection<FileStatus> getTrashRoots(boolean allUsers) {
2707    Path userHome = new Path(getHomeDirectory().toUri().getPath());
2708    List<FileStatus> ret = new ArrayList<>();
2709    try {
2710      if (!allUsers) {
2711        Path userTrash = new Path(userHome, TRASH_PREFIX);
2712        if (exists(userTrash)) {
2713          ret.add(getFileStatus(userTrash));
2714        }
2715      } else {
2716        Path homeParent = userHome.getParent();
2717        if (exists(homeParent)) {
2718          FileStatus[] candidates = listStatus(homeParent);
2719          for (FileStatus candidate : candidates) {
2720            Path userTrash = new Path(candidate.getPath(), TRASH_PREFIX);
2721            if (exists(userTrash)) {
2722              candidate.setPath(userTrash);
2723              ret.add(candidate);
2724            }
2725          }
2726        }
2727      }
2728    } catch (IOException e) {
2729      LOG.warn("Cannot get all trash roots", e);
2730    }
2731    return ret;
2732  }
2733
2734  // making it volatile to be able to do a double checked locking
2735  private volatile static boolean FILE_SYSTEMS_LOADED = false;
2736
2737  private static final Map<String, Class<? extends FileSystem>>
2738    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2739
2740  private static void loadFileSystems() {
2741    synchronized (FileSystem.class) {
2742      if (!FILE_SYSTEMS_LOADED) {
2743        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2744        Iterator<FileSystem> it = serviceLoader.iterator();
2745        while (it.hasNext()) {
2746          FileSystem fs = null;
2747          try {
2748            fs = it.next();
2749            try {
2750              SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2751            } catch (Exception e) {
2752              LOG.warn("Cannot load: " + fs + " from " +
2753                  ClassUtil.findContainingJar(fs.getClass()), e);
2754            }
2755          } catch (ServiceConfigurationError ee) {
2756            LOG.warn("Cannot load filesystem", ee);
2757          }
2758        }
2759        FILE_SYSTEMS_LOADED = true;
2760      }
2761    }
2762  }
2763
2764  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2765      Configuration conf) throws IOException {
2766    if (!FILE_SYSTEMS_LOADED) {
2767      loadFileSystems();
2768    }
2769    Class<? extends FileSystem> clazz = null;
2770    if (conf != null) {
2771      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2772    }
2773    if (clazz == null) {
2774      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2775    }
2776    if (clazz == null) {
2777      throw new IOException("No FileSystem for scheme: " + scheme);
2778    }
2779    return clazz;
2780  }
2781
2782  private static FileSystem createFileSystem(URI uri, Configuration conf
2783      ) throws IOException {
2784    Tracer tracer = FsTracer.get(conf);
2785    TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
2786    scope.addKVAnnotation("scheme", uri.getScheme());
2787    try {
2788      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2789      FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2790      fs.initialize(uri, conf);
2791      return fs;
2792    } finally {
2793      scope.close();
2794    }
2795  }
2796
2797  /** Caching FileSystem objects */
2798  static class Cache {
2799    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2800
2801    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2802    private final Set<Key> toAutoClose = new HashSet<Key>();
2803
2804    /** A variable that makes all objects in the cache unique */
2805    private static AtomicLong unique = new AtomicLong(1);
2806
2807    FileSystem get(URI uri, Configuration conf) throws IOException{
2808      Key key = new Key(uri, conf);
2809      return getInternal(uri, conf, key);
2810    }
2811
2812    /** The objects inserted into the cache using this method are all unique */
2813    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2814      Key key = new Key(uri, conf, unique.getAndIncrement());
2815      return getInternal(uri, conf, key);
2816    }
2817
2818    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2819      FileSystem fs;
2820      synchronized (this) {
2821        fs = map.get(key);
2822      }
2823      if (fs != null) {
2824        return fs;
2825      }
2826
2827      fs = createFileSystem(uri, conf);
2828      synchronized (this) { // refetch the lock again
2829        FileSystem oldfs = map.get(key);
2830        if (oldfs != null) { // a file system is created while lock is releasing
2831          fs.close(); // close the new file system
2832          return oldfs;  // return the old file system
2833        }
2834        
2835        // now insert the new file system into the map
2836        if (map.isEmpty()
2837                && !ShutdownHookManager.get().isShutdownInProgress()) {
2838          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2839        }
2840        fs.key = key;
2841        map.put(key, fs);
2842        if (conf.getBoolean("fs.automatic.close", true)) {
2843          toAutoClose.add(key);
2844        }
2845        return fs;
2846      }
2847    }
2848
2849    synchronized void remove(Key key, FileSystem fs) {
2850      FileSystem cachedFs = map.remove(key);
2851      if (fs == cachedFs) {
2852        toAutoClose.remove(key);
2853      } else if (cachedFs != null) {
2854        map.put(key, cachedFs);
2855      }
2856    }
2857
2858    synchronized void closeAll() throws IOException {
2859      closeAll(false);
2860    }
2861
2862    /**
2863     * Close all FileSystem instances in the Cache.
2864     * @param onlyAutomatic only close those that are marked for automatic closing
2865     */
2866    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2867      List<IOException> exceptions = new ArrayList<IOException>();
2868
2869      // Make a copy of the keys in the map since we'll be modifying
2870      // the map while iterating over it, which isn't safe.
2871      List<Key> keys = new ArrayList<Key>();
2872      keys.addAll(map.keySet());
2873
2874      for (Key key : keys) {
2875        final FileSystem fs = map.get(key);
2876
2877        if (onlyAutomatic && !toAutoClose.contains(key)) {
2878          continue;
2879        }
2880
2881        //remove from cache
2882        map.remove(key);
2883        toAutoClose.remove(key);
2884
2885        if (fs != null) {
2886          try {
2887            fs.close();
2888          }
2889          catch(IOException ioe) {
2890            exceptions.add(ioe);
2891          }
2892        }
2893      }
2894
2895      if (!exceptions.isEmpty()) {
2896        throw MultipleIOException.createIOException(exceptions);
2897      }
2898    }
2899
2900    private class ClientFinalizer implements Runnable {
2901      @Override
2902      public synchronized void run() {
2903        try {
2904          closeAll(true);
2905        } catch (IOException e) {
2906          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2907        }
2908      }
2909    }
2910
2911    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2912      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2913      //Make a pass over the list and collect the filesystems to close
2914      //we cannot close inline since close() removes the entry from the Map
2915      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2916        final Key key = entry.getKey();
2917        final FileSystem fs = entry.getValue();
2918        if (ugi.equals(key.ugi) && fs != null) {
2919          targetFSList.add(fs);   
2920        }
2921      }
2922      List<IOException> exceptions = new ArrayList<IOException>();
2923      //now make a pass over the target list and close each
2924      for (FileSystem fs : targetFSList) {
2925        try {
2926          fs.close();
2927        }
2928        catch(IOException ioe) {
2929          exceptions.add(ioe);
2930        }
2931      }
2932      if (!exceptions.isEmpty()) {
2933        throw MultipleIOException.createIOException(exceptions);
2934      }
2935    }
2936
2937    /** FileSystem.Cache.Key */
2938    static class Key {
2939      final String scheme;
2940      final String authority;
2941      final UserGroupInformation ugi;
2942      final long unique;   // an artificial way to make a key unique
2943
2944      Key(URI uri, Configuration conf) throws IOException {
2945        this(uri, conf, 0);
2946      }
2947
2948      Key(URI uri, Configuration conf, long unique) throws IOException {
2949        scheme = uri.getScheme()==null ?
2950            "" : StringUtils.toLowerCase(uri.getScheme());
2951        authority = uri.getAuthority()==null ?
2952            "" : StringUtils.toLowerCase(uri.getAuthority());
2953        this.unique = unique;
2954        
2955        this.ugi = UserGroupInformation.getCurrentUser();
2956      }
2957
2958      @Override
2959      public int hashCode() {
2960        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2961      }
2962
2963      static boolean isEqual(Object a, Object b) {
2964        return a == b || (a != null && a.equals(b));        
2965      }
2966
2967      @Override
2968      public boolean equals(Object obj) {
2969        if (obj == this) {
2970          return true;
2971        }
2972        if (obj != null && obj instanceof Key) {
2973          Key that = (Key)obj;
2974          return isEqual(this.scheme, that.scheme)
2975                 && isEqual(this.authority, that.authority)
2976                 && isEqual(this.ugi, that.ugi)
2977                 && (this.unique == that.unique);
2978        }
2979        return false;        
2980      }
2981
2982      @Override
2983      public String toString() {
2984        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2985      }
2986    }
2987  }
2988  
2989  /**
2990   * Tracks statistics about how many reads, writes, and so forth have been
2991   * done in a FileSystem.
2992   * 
2993   * Since there is only one of these objects per FileSystem, there will 
2994   * typically be many threads writing to this object.  Almost every operation
2995   * on an open file will involve a write to this object.  In contrast, reading
2996   * statistics is done infrequently by most programs, and not at all by others.
2997   * Hence, this is optimized for writes.
2998   * 
2999   * Each thread writes to its own thread-local area of memory.  This removes 
3000   * contention and allows us to scale up to many, many threads.  To read
3001   * statistics, the reader thread totals up the contents of all of the 
3002   * thread-local data areas.
3003   */
3004  public static final class Statistics {
3005    /**
3006     * Statistics data.
3007     * 
3008     * There is only a single writer to thread-local StatisticsData objects.
3009     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
3010     * to prevent lost updates.
3011     * The Java specification guarantees that updates to volatile longs will
3012     * be perceived as atomic with respect to other threads, which is all we
3013     * need.
3014     */
3015    public static class StatisticsData {
3016      volatile long bytesRead;
3017      volatile long bytesWritten;
3018      volatile int readOps;
3019      volatile int largeReadOps;
3020      volatile int writeOps;
3021
3022      /**
3023       * Add another StatisticsData object to this one.
3024       */
3025      void add(StatisticsData other) {
3026        this.bytesRead += other.bytesRead;
3027        this.bytesWritten += other.bytesWritten;
3028        this.readOps += other.readOps;
3029        this.largeReadOps += other.largeReadOps;
3030        this.writeOps += other.writeOps;
3031      }
3032
3033      /**
3034       * Negate the values of all statistics.
3035       */
3036      void negate() {
3037        this.bytesRead = -this.bytesRead;
3038        this.bytesWritten = -this.bytesWritten;
3039        this.readOps = -this.readOps;
3040        this.largeReadOps = -this.largeReadOps;
3041        this.writeOps = -this.writeOps;
3042      }
3043
3044      @Override
3045      public String toString() {
3046        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
3047            + readOps + " read ops, " + largeReadOps + " large read ops, "
3048            + writeOps + " write ops";
3049      }
3050      
3051      public long getBytesRead() {
3052        return bytesRead;
3053      }
3054      
3055      public long getBytesWritten() {
3056        return bytesWritten;
3057      }
3058      
3059      public int getReadOps() {
3060        return readOps;
3061      }
3062      
3063      public int getLargeReadOps() {
3064        return largeReadOps;
3065      }
3066      
3067      public int getWriteOps() {
3068        return writeOps;
3069      }
3070    }
3071
3072    private interface StatisticsAggregator<T> {
3073      void accept(StatisticsData data);
3074      T aggregate();
3075    }
3076
3077    private final String scheme;
3078
3079    /**
3080     * rootData is data that doesn't belong to any thread, but will be added
3081     * to the totals.  This is useful for making copies of Statistics objects,
3082     * and for storing data that pertains to threads that have been garbage
3083     * collected.  Protected by the Statistics lock.
3084     */
3085    private final StatisticsData rootData;
3086
3087    /**
3088     * Thread-local data.
3089     */
3090    private final ThreadLocal<StatisticsData> threadData;
3091
3092    /**
3093     * Set of all thread-local data areas.  Protected by the Statistics lock.
3094     * The references to the statistics data are kept using phantom references
3095     * to the associated threads. Proper clean-up is performed by the cleaner
3096     * thread when the threads are garbage collected.
3097     */
3098    private final Set<StatisticsDataReference> allData;
3099
3100    /**
3101     * Global reference queue and a cleaner thread that manage statistics data
3102     * references from all filesystem instances.
3103     */
3104    private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
3105    private static final Thread STATS_DATA_CLEANER;
3106
3107    static {
3108      STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
3109      // start a single daemon cleaner thread
3110      STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
3111      STATS_DATA_CLEANER.
3112          setName(StatisticsDataReferenceCleaner.class.getName());
3113      STATS_DATA_CLEANER.setDaemon(true);
3114      STATS_DATA_CLEANER.start();
3115    }
3116
3117    public Statistics(String scheme) {
3118      this.scheme = scheme;
3119      this.rootData = new StatisticsData();
3120      this.threadData = new ThreadLocal<StatisticsData>();
3121      this.allData = new HashSet<StatisticsDataReference>();
3122    }
3123
3124    /**
3125     * Copy constructor.
3126     * 
3127     * @param other    The input Statistics object which is cloned.
3128     */
3129    public Statistics(Statistics other) {
3130      this.scheme = other.scheme;
3131      this.rootData = new StatisticsData();
3132      other.visitAll(new StatisticsAggregator<Void>() {
3133        @Override
3134        public void accept(StatisticsData data) {
3135          rootData.add(data);
3136        }
3137
3138        public Void aggregate() {
3139          return null;
3140        }
3141      });
3142      this.threadData = new ThreadLocal<StatisticsData>();
3143      this.allData = new HashSet<StatisticsDataReference>();
3144    }
3145
3146    /**
3147     * A phantom reference to a thread that also includes the data associated
3148     * with that thread. On the thread being garbage collected, it is enqueued
3149     * to the reference queue for clean-up.
3150     */
3151    private class StatisticsDataReference extends PhantomReference<Thread> {
3152      private final StatisticsData data;
3153
3154      public StatisticsDataReference(StatisticsData data, Thread thread) {
3155        super(thread, STATS_DATA_REF_QUEUE);
3156        this.data = data;
3157      }
3158
3159      public StatisticsData getData() {
3160        return data;
3161      }
3162
3163      /**
3164       * Performs clean-up action when the associated thread is garbage
3165       * collected.
3166       */
3167      public void cleanUp() {
3168        // use the statistics lock for safety
3169        synchronized (Statistics.this) {
3170          /*
3171           * If the thread that created this thread-local data no longer exists,
3172           * remove the StatisticsData from our list and fold the values into
3173           * rootData.
3174           */
3175          rootData.add(data);
3176          allData.remove(this);
3177        }
3178      }
3179    }
3180
3181    /**
3182     * Background action to act on references being removed.
3183     */
3184    private static class StatisticsDataReferenceCleaner implements Runnable {
3185      @Override
3186      public void run() {
3187        while (!Thread.interrupted()) {
3188          try {
3189            StatisticsDataReference ref =
3190                (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
3191            ref.cleanUp();
3192          } catch (InterruptedException ie) {
3193            LOG.warn("Cleaner thread interrupted, will stop", ie);
3194            Thread.currentThread().interrupt();
3195          } catch (Throwable th) {
3196            LOG.warn("Exception in the cleaner thread but it will continue to "
3197                + "run", th);
3198          }
3199        }
3200      }
3201    }
3202
3203    /**
3204     * Get or create the thread-local data associated with the current thread.
3205     */
3206    public StatisticsData getThreadStatistics() {
3207      StatisticsData data = threadData.get();
3208      if (data == null) {
3209        data = new StatisticsData();
3210        threadData.set(data);
3211        StatisticsDataReference ref =
3212            new StatisticsDataReference(data, Thread.currentThread());
3213        synchronized(this) {
3214          allData.add(ref);
3215        }
3216      }
3217      return data;
3218    }
3219
3220    /**
3221     * Increment the bytes read in the statistics
3222     * @param newBytes the additional bytes read
3223     */
3224    public void incrementBytesRead(long newBytes) {
3225      getThreadStatistics().bytesRead += newBytes;
3226    }
3227    
3228    /**
3229     * Increment the bytes written in the statistics
3230     * @param newBytes the additional bytes written
3231     */
3232    public void incrementBytesWritten(long newBytes) {
3233      getThreadStatistics().bytesWritten += newBytes;
3234    }
3235    
3236    /**
3237     * Increment the number of read operations
3238     * @param count number of read operations
3239     */
3240    public void incrementReadOps(int count) {
3241      getThreadStatistics().readOps += count;
3242    }
3243
3244    /**
3245     * Increment the number of large read operations
3246     * @param count number of large read operations
3247     */
3248    public void incrementLargeReadOps(int count) {
3249      getThreadStatistics().largeReadOps += count;
3250    }
3251
3252    /**
3253     * Increment the number of write operations
3254     * @param count number of write operations
3255     */
3256    public void incrementWriteOps(int count) {
3257      getThreadStatistics().writeOps += count;
3258    }
3259
3260    /**
3261     * Apply the given aggregator to all StatisticsData objects associated with
3262     * this Statistics object.
3263     *
3264     * For each StatisticsData object, we will call accept on the visitor.
3265     * Finally, at the end, we will call aggregate to get the final total. 
3266     *
3267     * @param         visitor to use.
3268     * @return        The total.
3269     */
3270    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3271      visitor.accept(rootData);
3272      for (StatisticsDataReference ref: allData) {
3273        StatisticsData data = ref.getData();
3274        visitor.accept(data);
3275      }
3276      return visitor.aggregate();
3277    }
3278
3279    /**
3280     * Get the total number of bytes read
3281     * @return the number of bytes
3282     */
3283    public long getBytesRead() {
3284      return visitAll(new StatisticsAggregator<Long>() {
3285        private long bytesRead = 0;
3286
3287        @Override
3288        public void accept(StatisticsData data) {
3289          bytesRead += data.bytesRead;
3290        }
3291
3292        public Long aggregate() {
3293          return bytesRead;
3294        }
3295      });
3296    }
3297    
3298    /**
3299     * Get the total number of bytes written
3300     * @return the number of bytes
3301     */
3302    public long getBytesWritten() {
3303      return visitAll(new StatisticsAggregator<Long>() {
3304        private long bytesWritten = 0;
3305
3306        @Override
3307        public void accept(StatisticsData data) {
3308          bytesWritten += data.bytesWritten;
3309        }
3310
3311        public Long aggregate() {
3312          return bytesWritten;
3313        }
3314      });
3315    }
3316    
3317    /**
3318     * Get the number of file system read operations such as list files
3319     * @return number of read operations
3320     */
3321    public int getReadOps() {
3322      return visitAll(new StatisticsAggregator<Integer>() {
3323        private int readOps = 0;
3324
3325        @Override
3326        public void accept(StatisticsData data) {
3327          readOps += data.readOps;
3328          readOps += data.largeReadOps;
3329        }
3330
3331        public Integer aggregate() {
3332          return readOps;
3333        }
3334      });
3335    }
3336
3337    /**
3338     * Get the number of large file system read operations such as list files
3339     * under a large directory
3340     * @return number of large read operations
3341     */
3342    public int getLargeReadOps() {
3343      return visitAll(new StatisticsAggregator<Integer>() {
3344        private int largeReadOps = 0;
3345
3346        @Override
3347        public void accept(StatisticsData data) {
3348          largeReadOps += data.largeReadOps;
3349        }
3350
3351        public Integer aggregate() {
3352          return largeReadOps;
3353        }
3354      });
3355    }
3356
3357    /**
3358     * Get the number of file system write operations such as create, append 
3359     * rename etc.
3360     * @return number of write operations
3361     */
3362    public int getWriteOps() {
3363      return visitAll(new StatisticsAggregator<Integer>() {
3364        private int writeOps = 0;
3365
3366        @Override
3367        public void accept(StatisticsData data) {
3368          writeOps += data.writeOps;
3369        }
3370
3371        public Integer aggregate() {
3372          return writeOps;
3373        }
3374      });
3375    }
3376
3377
3378    @Override
3379    public String toString() {
3380      return visitAll(new StatisticsAggregator<String>() {
3381        private StatisticsData total = new StatisticsData();
3382
3383        @Override
3384        public void accept(StatisticsData data) {
3385          total.add(data);
3386        }
3387
3388        public String aggregate() {
3389          return total.toString();
3390        }
3391      });
3392    }
3393
3394    /**
3395     * Resets all statistics to 0.
3396     *
3397     * In order to reset, we add up all the thread-local statistics data, and
3398     * set rootData to the negative of that.
3399     *
3400     * This may seem like a counterintuitive way to reset the statsitics.  Why
3401     * can't we just zero out all the thread-local data?  Well, thread-local
3402     * data can only be modified by the thread that owns it.  If we tried to
3403     * modify the thread-local data from this thread, our modification might get
3404     * interleaved with a read-modify-write operation done by the thread that
3405     * owns the data.  That would result in our update getting lost.
3406     *
3407     * The approach used here avoids this problem because it only ever reads
3408     * (not writes) the thread-local data.  Both reads and writes to rootData
3409     * are done under the lock, so we're free to modify rootData from any thread
3410     * that holds the lock.
3411     */
3412    public void reset() {
3413      visitAll(new StatisticsAggregator<Void>() {
3414        private StatisticsData total = new StatisticsData();
3415
3416        @Override
3417        public void accept(StatisticsData data) {
3418          total.add(data);
3419        }
3420
3421        public Void aggregate() {
3422          total.negate();
3423          rootData.add(total);
3424          return null;
3425        }
3426      });
3427    }
3428    
3429    /**
3430     * Get the uri scheme associated with this statistics object.
3431     * @return the schema associated with this set of statistics
3432     */
3433    public String getScheme() {
3434      return scheme;
3435    }
3436
3437    @VisibleForTesting
3438    synchronized int getAllThreadLocalDataSize() {
3439      return allData.size();
3440    }
3441  }
3442  
3443  /**
3444   * Get the Map of Statistics object indexed by URI Scheme.
3445   * @return a Map having a key as URI scheme and value as Statistics object
3446   * @deprecated use {@link #getAllStatistics} instead
3447   */
3448  @Deprecated
3449  public static synchronized Map<String, Statistics> getStatistics() {
3450    Map<String, Statistics> result = new HashMap<String, Statistics>();
3451    for(Statistics stat: statisticsTable.values()) {
3452      result.put(stat.getScheme(), stat);
3453    }
3454    return result;
3455  }
3456
3457  /**
3458   * Return the FileSystem classes that have Statistics
3459   */
3460  public static synchronized List<Statistics> getAllStatistics() {
3461    return new ArrayList<Statistics>(statisticsTable.values());
3462  }
3463  
3464  /**
3465   * Get the statistics for a particular file system
3466   * @param cls the class to lookup
3467   * @return a statistics object
3468   */
3469  public static synchronized 
3470  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
3471    Statistics result = statisticsTable.get(cls);
3472    if (result == null) {
3473      result = new Statistics(scheme);
3474      statisticsTable.put(cls, result);
3475    }
3476    return result;
3477  }
3478  
3479  /**
3480   * Reset all statistics for all file systems
3481   */
3482  public static synchronized void clearStatistics() {
3483    for(Statistics stat: statisticsTable.values()) {
3484      stat.reset();
3485    }
3486  }
3487
3488  /**
3489   * Print all statistics for all file systems
3490   */
3491  public static synchronized
3492  void printStatistics() throws IOException {
3493    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3494            statisticsTable.entrySet()) {
3495      System.out.println("  FileSystem " + pair.getKey().getName() + 
3496                         ": " + pair.getValue());
3497    }
3498  }
3499
3500  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
3501  private static boolean symlinksEnabled = false;
3502
3503  private static Configuration conf = null;
3504
3505  @VisibleForTesting
3506  public static boolean areSymlinksEnabled() {
3507    return symlinksEnabled;
3508  }
3509
3510  @VisibleForTesting
3511  public static void enableSymlinks() {
3512    symlinksEnabled = true;
3513  }
3514}