001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.ref.WeakReference;
024import java.lang.ref.ReferenceQueue;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.security.PrivilegedExceptionAction;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.IdentityHashMap;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.ServiceConfigurationError;
040import java.util.ServiceLoader;
041import java.util.Set;
042import java.util.Stack;
043import java.util.TreeSet;
044import java.util.concurrent.atomic.AtomicLong;
045
046import org.apache.commons.logging.Log;
047import org.apache.commons.logging.LogFactory;
048import org.apache.hadoop.classification.InterfaceAudience;
049import org.apache.hadoop.classification.InterfaceStability;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.conf.Configured;
052import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
053import org.apache.hadoop.fs.Options.ChecksumOpt;
054import org.apache.hadoop.fs.Options.Rename;
055import org.apache.hadoop.fs.permission.AclEntry;
056import org.apache.hadoop.fs.permission.AclStatus;
057import org.apache.hadoop.fs.permission.FsAction;
058import org.apache.hadoop.fs.permission.FsPermission;
059import org.apache.hadoop.io.MultipleIOException;
060import org.apache.hadoop.io.Text;
061import org.apache.hadoop.net.NetUtils;
062import org.apache.hadoop.security.AccessControlException;
063import org.apache.hadoop.security.Credentials;
064import org.apache.hadoop.security.SecurityUtil;
065import org.apache.hadoop.security.UserGroupInformation;
066import org.apache.hadoop.security.token.Token;
067import org.apache.hadoop.util.ClassUtil;
068import org.apache.hadoop.util.DataChecksum;
069import org.apache.hadoop.util.Progressable;
070import org.apache.hadoop.util.ReflectionUtils;
071import org.apache.hadoop.util.ShutdownHookManager;
072import org.apache.hadoop.util.StringUtils;
073import org.apache.htrace.core.Tracer;
074import org.apache.htrace.core.TraceScope;
075
076import com.google.common.annotations.VisibleForTesting;
077
078import static com.google.common.base.Preconditions.checkArgument;
079import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
080
081/****************************************************************
082 * An abstract base class for a fairly generic filesystem.  It
083 * may be implemented as a distributed filesystem, or as a "local"
084 * one that reflects the locally-connected disk.  The local version
085 * exists for small Hadoop instances and for testing.
086 *
087 * <p>
088 *
089 * All user code that may potentially use the Hadoop Distributed
090 * File System should be written to use a FileSystem object.  The
091 * Hadoop DFS is a multi-machine system that appears as a single
092 * disk.  It's useful because of its fault tolerance and potentially
093 * very large capacity.
094 * 
095 * <p>
 * The local implementation is {@link LocalFileSystem} and the distributed
 * implementation is DistributedFileSystem.
098 *****************************************************************/
099@InterfaceAudience.Public
100@InterfaceStability.Stable
101public abstract class FileSystem extends Configured implements Closeable {
  /**
   * Configuration key holding the default filesystem URI; an alias of
   * {@link CommonConfigurationKeys#FS_DEFAULT_NAME_KEY}. Read by
   * {@link #getDefaultUri(Configuration)} and written by
   * {@link #setDefaultUri(Configuration, URI)}.
   */
  public static final String FS_DEFAULT_NAME_KEY = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
  /**
   * Value used by {@link #getDefaultUri(Configuration)} when
   * {@link #FS_DEFAULT_NAME_KEY} is not set in the configuration.
   */
  public static final String DEFAULT_FS = 
                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;

  /** Logger shared by the FileSystem class. */
  public static final Log LOG = LogFactory.getLog(FileSystem.class);

  /**
   * Priority of the FileSystem shutdown hook.
   */
  public static final int SHUTDOWN_HOOK_PRIORITY = 10;

  /**
   * Name prefix for trash locations.
   * NOTE(review): usage is outside this part of the file - confirm.
   */
  public static final String TRASH_PREFIX = ".Trash";

  /**
   * FileSystem cache. Consulted by {@link #get(URI, Configuration)} and
   * {@link #newInstance(URI, Configuration)}, and emptied by
   * {@link #closeAll()}.
   */
  static final Cache CACHE = new Cache();

  /** The key this instance is stored under in the cache. */
  private Cache.Key key;

  /**
   * Recording statistics per a FileSystem class. The map is keyed by class
   * identity (it is an {@link IdentityHashMap}).
   */
  private static final Map<Class<? extends FileSystem>, Statistics> 
    statisticsTable =
      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
  
  /**
   * The statistics for this file system. Assigned in
   * {@link #initialize(URI, Configuration)}.
   */
  protected Statistics statistics;

  /**
   * A cache of files that should be deleted when filesystem is closed
   * or the JVM is exited.
   */
  private Set<Path> deleteOnExit = new TreeSet<Path>();
  
  /**
   * Whether remote symlinks are resolved by the client. Loaded from
   * {@link CommonConfigurationKeys#FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY} in
   * {@link #initialize(URI, Configuration)}.
   */
  boolean resolveSymlinks;
139
140  /**
141   * This method adds a file system for testing so that we can find it later. It
142   * is only for testing.
143   * @param uri the uri to store it under
144   * @param conf the configuration to store it under
145   * @param fs the file system to store
146   * @throws IOException
147   */
148  static void addFileSystemForTesting(URI uri, Configuration conf,
149      FileSystem fs) throws IOException {
150    CACHE.map.put(new Cache.Key(uri, conf), fs);
151  }
152
153  /**
154   * Get a filesystem instance based on the uri, the passed
155   * configuration and the user
156   * @param uri of the filesystem
157   * @param conf the configuration to use
158   * @param user to perform the get as
159   * @return the filesystem instance
160   * @throws IOException
161   * @throws InterruptedException
162   */
163  public static FileSystem get(final URI uri, final Configuration conf,
164        final String user) throws IOException, InterruptedException {
165    String ticketCachePath =
166      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
167    UserGroupInformation ugi =
168        UserGroupInformation.getBestUGI(ticketCachePath, user);
169    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
170      @Override
171      public FileSystem run() throws IOException {
172        return get(uri, conf);
173      }
174    });
175  }
176
177  /**
178   * Returns the configured filesystem implementation.
179   * @param conf the configuration to use
180   */
181  public static FileSystem get(Configuration conf) throws IOException {
182    return get(getDefaultUri(conf), conf);
183  }
184  
185  /** Get the default filesystem URI from a configuration.
186   * @param conf the configuration to use
187   * @return the uri of the default filesystem
188   */
189  public static URI getDefaultUri(Configuration conf) {
190    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
191  }
192
193  /** Set the default filesystem URI in a configuration.
194   * @param conf the configuration to alter
195   * @param uri the new default filesystem uri
196   */
197  public static void setDefaultUri(Configuration conf, URI uri) {
198    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
199  }
200
201  /** Set the default filesystem URI in a configuration.
202   * @param conf the configuration to alter
203   * @param uri the new default filesystem uri
204   */
205  public static void setDefaultUri(Configuration conf, String uri) {
206    setDefaultUri(conf, URI.create(fixName(uri)));
207  }
208
209  /** Called after a new FileSystem instance is constructed.
210   * @param name a uri whose authority section names the host, port, etc.
211   *   for this FileSystem
212   * @param conf the configuration
213   */
214  public void initialize(URI name, Configuration conf) throws IOException {
215    final String scheme;
216    if (name.getScheme() == null || name.getScheme().isEmpty()) {
217      scheme = getDefaultUri(conf).getScheme();
218    } else {
219      scheme = name.getScheme();
220    }
221    statistics = getStatistics(scheme, getClass());
222    resolveSymlinks = conf.getBoolean(
223        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
224        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
225  }
226
227  /**
228   * Return the protocol scheme for the FileSystem.
229   * <p/>
230   * This implementation throws an <code>UnsupportedOperationException</code>.
231   *
232   * @return the protocol scheme for the FileSystem.
233   */
234  public String getScheme() {
235    throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
236  }
237
  /**
   * Returns a URI whose scheme and authority identify this FileSystem.
   *
   * @return the URI of this FileSystem
   */
  public abstract URI getUri();
240  
241  /**
242   * Return a canonicalized form of this FileSystem's URI.
243   * 
244   * The default implementation simply calls {@link #canonicalizeUri(URI)}
245   * on the filesystem's own URI, so subclasses typically only need to
246   * implement that method.
247   *
248   * @see #canonicalizeUri(URI)
249   */
250  protected URI getCanonicalUri() {
251    return canonicalizeUri(getUri());
252  }
253  
254  /**
255   * Canonicalize the given URI.
256   * 
257   * This is filesystem-dependent, but may for example consist of
258   * canonicalizing the hostname using DNS and adding the default
259   * port if not specified.
260   * 
261   * The default implementation simply fills in the default port if
262   * not specified and if the filesystem has a default port.
263   *
264   * @return URI
265   * @see NetUtils#getCanonicalUri(URI, int)
266   */
267  protected URI canonicalizeUri(URI uri) {
268    if (uri.getPort() == -1 && getDefaultPort() > 0) {
269      // reconstruct the uri with the default port set
270      try {
271        uri = new URI(uri.getScheme(), uri.getUserInfo(),
272            uri.getHost(), getDefaultPort(),
273            uri.getPath(), uri.getQuery(), uri.getFragment());
274      } catch (URISyntaxException e) {
275        // Should never happen!
276        throw new AssertionError("Valid URI became unparseable: " +
277            uri);
278      }
279    }
280    
281    return uri;
282  }
283  
  /**
   * Get the default port for this file system.
   * <p>
   * This base implementation reports that there is no default port.
   * {@link #canonicalizeUri(URI)} only fills in a port when the value
   * returned here is greater than zero.
   *
   * @return the default port or 0 if there isn't one
   */
  protected int getDefaultPort() {
    return 0;
  }
291
292  protected static FileSystem getFSofPath(final Path absOrFqPath,
293      final Configuration conf)
294      throws UnsupportedFileSystemException, IOException {
295    absOrFqPath.checkNotSchemeWithRelative();
296    absOrFqPath.checkNotRelative();
297
298    // Uses the default file system if not fully qualified
299    return get(absOrFqPath.toUri(), conf);
300  }
301
302  /**
303   * Get a canonical service name for this file system.  The token cache is
304   * the only user of the canonical service name, and uses it to lookup this
305   * filesystem's service tokens.
306   * If file system provides a token of its own then it must have a canonical
307   * name, otherwise canonical name can be null.
308   * 
309   * Default Impl: If the file system has child file systems 
310   * (such as an embedded file system) then it is assumed that the fs has no
311   * tokens of its own and hence returns a null name; otherwise a service
312   * name is built using Uri and port.
313   * 
314   * @return a service string that uniquely identifies this file system, null
315   *         if the filesystem does not implement tokens
316   * @see SecurityUtil#buildDTServiceName(URI, int) 
317   */
318  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
319  public String getCanonicalServiceName() {
320    return (getChildFileSystems() == null)
321      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
322      : null;
323  }
324
325  /** @deprecated call #getUri() instead.*/
326  @Deprecated
327  public String getName() { return getUri().toString(); }
328
329  /** @deprecated call #get(URI,Configuration) instead. */
330  @Deprecated
331  public static FileSystem getNamed(String name, Configuration conf)
332    throws IOException {
333    return get(URI.create(fixName(name)), conf);
334  }
335  
336  /** Update old-format filesystem names, for back-compatibility.  This should
337   * eventually be replaced with a checkName() method that throws an exception
338   * for old-format names. */ 
339  private static String fixName(String name) {
340    // convert old-format name to new-format name
341    if (name.equals("local")) {         // "local" is now "file:///".
342      LOG.warn("\"local\" is a deprecated filesystem name."
343               +" Use \"file:///\" instead.");
344      name = "file:///";
345    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
346      LOG.warn("\""+name+"\" is a deprecated filesystem name."
347               +" Use \"hdfs://"+name+"/\" instead.");
348      name = "hdfs://"+name;
349    }
350    return name;
351  }
352
353  /**
354   * Get the local file system.
355   * @param conf the configuration to configure the file system with
356   * @return a LocalFileSystem
357   */
358  public static LocalFileSystem getLocal(Configuration conf)
359    throws IOException {
360    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
361  }
362
363  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
364   * of the URI determines a configuration property name,
365   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
366   * The entire URI is passed to the FileSystem instance's initialize method.
367   */
368  public static FileSystem get(URI uri, Configuration conf) throws IOException {
369    String scheme = uri.getScheme();
370    String authority = uri.getAuthority();
371
372    if (scheme == null && authority == null) {     // use default FS
373      return get(conf);
374    }
375
376    if (scheme != null && authority == null) {     // no authority
377      URI defaultUri = getDefaultUri(conf);
378      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
379          && defaultUri.getAuthority() != null) {  // & default has authority
380        return get(defaultUri, conf);              // return default
381      }
382    }
383    
384    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
385    if (conf.getBoolean(disableCacheName, false)) {
386      return createFileSystem(uri, conf);
387    }
388
389    return CACHE.get(uri, conf);
390  }
391
392  /**
393   * Returns the FileSystem for this URI's scheme and authority and the 
394   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
395   * @param uri of the filesystem
396   * @param conf the configuration to use
397   * @param user to perform the get as
398   * @return filesystem instance
399   * @throws IOException
400   * @throws InterruptedException
401   */
402  public static FileSystem newInstance(final URI uri, final Configuration conf,
403      final String user) throws IOException, InterruptedException {
404    String ticketCachePath =
405      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
406    UserGroupInformation ugi =
407        UserGroupInformation.getBestUGI(ticketCachePath, user);
408    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
409      @Override
410      public FileSystem run() throws IOException {
411        return newInstance(uri,conf); 
412      }
413    });
414  }
415  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
416   * of the URI determines a configuration property name,
417   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
418   * The entire URI is passed to the FileSystem instance's initialize method.
419   * This always returns a new FileSystem object.
420   */
421  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
422    String scheme = uri.getScheme();
423    String authority = uri.getAuthority();
424
425    if (scheme == null) {                       // no scheme: use default FS
426      return newInstance(conf);
427    }
428
429    if (authority == null) {                       // no authority
430      URI defaultUri = getDefaultUri(conf);
431      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
432          && defaultUri.getAuthority() != null) {  // & default has authority
433        return newInstance(defaultUri, conf);              // return default
434      }
435    }
436    return CACHE.getUnique(uri, conf);
437  }
438
439  /** Returns a unique configured filesystem implementation.
440   * This always returns a new FileSystem object.
441   * @param conf the configuration to use
442   */
443  public static FileSystem newInstance(Configuration conf) throws IOException {
444    return newInstance(getDefaultUri(conf), conf);
445  }
446
447  /**
448   * Get a unique local file system object
449   * @param conf the configuration to configure the file system with
450   * @return a LocalFileSystem
451   * This always returns a new FileSystem object.
452   */
453  public static LocalFileSystem newInstanceLocal(Configuration conf)
454    throws IOException {
455    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
456  }
457
  /**
   * Close all cached filesystems by delegating to the shared cache. Be sure
   * those filesystems are not used anymore.
   * 
   * @throws IOException
   */
  public static void closeAll() throws IOException {
    CACHE.closeAll();
  }
467
  /**
   * Close all cached filesystems for a given UGI by delegating to the shared
   * cache. Be sure those filesystems are not used anymore.
   * @param ugi user group info to close
   * @throws IOException
   */
  public static void closeAllForUGI(UserGroupInformation ugi) 
  throws IOException {
    CACHE.closeAll(ugi);
  }
478
479  /** 
480   * Make sure that a path specifies a FileSystem.
481   * @param path to use
482   */
483  public Path makeQualified(Path path) {
484    checkPath(path);
485    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
486  }
487    
  /**
   * Get a new delegation token for this file system.
   * This is an internal method that should have been declared protected
   * but wasn't historically.
   * Callers should use {@link #addDelegationTokens(String, Credentials)}
   * <p>
   * This base implementation issues no token and always returns null.
   * 
   * @param renewer the account name that is allowed to renew the token.
   * @return a new delegation token, or null when none is issued
   * @throws IOException
   */
  @InterfaceAudience.Private()
  public Token<?> getDelegationToken(String renewer) throws IOException {
    return null;
  }
502  
503  /**
504   * Obtain all delegation tokens used by this FileSystem that are not
505   * already present in the given Credentials.  Existing tokens will neither
506   * be verified as valid nor having the given renewer.  Missing tokens will
507   * be acquired and added to the given Credentials.
508   * 
509   * Default Impl: works for simple fs with its own token
510   * and also for an embedded fs whose tokens are those of its
511   * children file system (i.e. the embedded fs has not tokens of its
512   * own).
513   * 
514   * @param renewer the user allowed to renew the delegation tokens
515   * @param credentials cache in which to add new delegation tokens
516   * @return list of new delegation tokens
517   * @throws IOException
518   */
519  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
520  public Token<?>[] addDelegationTokens(
521      final String renewer, Credentials credentials) throws IOException {
522    if (credentials == null) {
523      credentials = new Credentials();
524    }
525    final List<Token<?>> tokens = new ArrayList<Token<?>>();
526    collectDelegationTokens(renewer, credentials, tokens);
527    return tokens.toArray(new Token<?>[tokens.size()]);
528  }
529  
530  /**
531   * Recursively obtain the tokens for this FileSystem and all descended
532   * FileSystems as determined by getChildFileSystems().
533   * @param renewer the user allowed to renew the delegation tokens
534   * @param credentials cache in which to add the new delegation tokens
535   * @param tokens list in which to add acquired tokens
536   * @throws IOException
537   */
538  private void collectDelegationTokens(final String renewer,
539                                       final Credentials credentials,
540                                       final List<Token<?>> tokens)
541                                           throws IOException {
542    final String serviceName = getCanonicalServiceName();
543    // Collect token of the this filesystem and then of its embedded children
544    if (serviceName != null) { // fs has token, grab it
545      final Text service = new Text(serviceName);
546      Token<?> token = credentials.getToken(service);
547      if (token == null) {
548        token = getDelegationToken(renewer);
549        if (token != null) {
550          tokens.add(token);
551          credentials.addToken(service, token);
552        }
553      }
554    }
555    // Now collect the tokens from the children
556    final FileSystem[] children = getChildFileSystems();
557    if (children != null) {
558      for (final FileSystem fs : children) {
559        fs.collectDelegationTokens(renewer, credentials, tokens);
560      }
561    }
562  }
563
  /**
   * Get all the immediate child FileSystems embedded in this FileSystem.
   * It does not recurse and get grand children.  If a FileSystem
   * has multiple child FileSystems, then it should return a unique list
   * of those FileSystems.  Default is to return null to signify no children.
   * <p>
   * {@link #getCanonicalServiceName()} treats a non-null result as "this
   * filesystem has no tokens of its own".
   * 
   * @return FileSystems used by this FileSystem, or null if there are none
   */
  @InterfaceAudience.LimitedPrivate({ "HDFS" })
  @VisibleForTesting
  public FileSystem[] getChildFileSystems() {
    return null;
  }
577  
578  /** create a file with the provided permission
579   * The permission of the file is set to be the provided permission as in
580   * setPermission, not permission&~umask
581   * 
582   * It is implemented using two RPCs. It is understood that it is inefficient,
583   * but the implementation is thread-safe. The other option is to change the
584   * value of umask in configuration to be 0, but it is not thread-safe.
585   * 
586   * @param fs file system handle
587   * @param file the name of the file to be created
588   * @param permission the permission of the file
589   * @return an output stream
590   * @throws IOException
591   */
592  public static FSDataOutputStream create(FileSystem fs,
593      Path file, FsPermission permission) throws IOException {
594    // create the file with default permission
595    FSDataOutputStream out = fs.create(file);
596    // set its permission to the supplied one
597    fs.setPermission(file, permission);
598    return out;
599  }
600
601  /** create a directory with the provided permission
602   * The permission of the directory is set to be the provided permission as in
603   * setPermission, not permission&~umask
604   * 
605   * @see #create(FileSystem, Path, FsPermission)
606   * 
607   * @param fs file system handle
608   * @param dir the name of the directory to be created
609   * @param permission the permission of the directory
610   * @return true if the directory creation succeeds; false otherwise
611   * @throws IOException
612   */
613  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
614  throws IOException {
615    // create the directory using the default permission
616    boolean result = fs.mkdirs(dir);
617    // set its permission to be the supplied one
618    fs.setPermission(dir, permission);
619    return result;
620  }
621
622  ///////////////////////////////////////////////////////////////
623  // FileSystem
624  ///////////////////////////////////////////////////////////////
625
  /**
   * Creates a FileSystem without a configuration: null is passed to the
   * {@link Configured} superclass constructor.
   * NOTE(review): the configuration is presumably attached later, before or
   * during {@link #initialize(URI, Configuration)} - confirm against callers.
   */
  protected FileSystem() {
    super(null);
  }
629
630  /** 
631   * Check that a Path belongs to this FileSystem.
632   * @param path to check
633   */
634  protected void checkPath(Path path) {
635    URI uri = path.toUri();
636    String thatScheme = uri.getScheme();
637    if (thatScheme == null)                // fs is relative
638      return;
639    URI thisUri = getCanonicalUri();
640    String thisScheme = thisUri.getScheme();
641    //authority and scheme are not case sensitive
642    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
643      String thisAuthority = thisUri.getAuthority();
644      String thatAuthority = uri.getAuthority();
645      if (thatAuthority == null &&                // path's authority is null
646          thisAuthority != null) {                // fs has an authority
647        URI defaultUri = getDefaultUri(getConf());
648        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
649          uri = defaultUri; // schemes match, so use this uri instead
650        } else {
651          uri = null; // can't determine auth of the path
652        }
653      }
654      if (uri != null) {
655        // canonicalize uri before comparing with this fs
656        uri = canonicalizeUri(uri);
657        thatAuthority = uri.getAuthority();
658        if (thisAuthority == thatAuthority ||       // authorities match
659            (thisAuthority != null &&
660             thisAuthority.equalsIgnoreCase(thatAuthority)))
661          return;
662      }
663    }
664    throw new IllegalArgumentException("Wrong FS: "+path+
665                                       ", expected: "+this.getUri());
666  }
667
668  /**
669   * Return an array containing hostnames, offset and size of 
670   * portions of the given file.  For a nonexistent 
671   * file or regions, null will be returned.
672   *
673   * This call is most helpful with DFS, where it returns 
674   * hostnames of machines that contain the given file.
675   *
676   * The FileSystem will simply return an elt containing 'localhost'.
677   *
678   * @param file FilesStatus to get data from
679   * @param start offset into the given file
680   * @param len length for which to get locations for
681   */
682  public BlockLocation[] getFileBlockLocations(FileStatus file, 
683      long start, long len) throws IOException {
684    if (file == null) {
685      return null;
686    }
687
688    if (start < 0 || len < 0) {
689      throw new IllegalArgumentException("Invalid start or len parameter");
690    }
691
692    if (file.getLen() <= start) {
693      return new BlockLocation[0];
694
695    }
696    String[] name = {"localhost:9866"};
697    String[] host = {"localhost"};
698    return new BlockLocation[] {
699      new BlockLocation(name, host, 0, file.getLen()) };
700  }
701 
702
703  /**
704   * Return an array containing hostnames, offset and size of 
705   * portions of the given file.  For a nonexistent 
706   * file or regions, null will be returned.
707   *
708   * This call is most helpful with DFS, where it returns 
709   * hostnames of machines that contain the given file.
710   *
711   * The FileSystem will simply return an elt containing 'localhost'.
712   *
713   * @param p path is used to identify an FS since an FS could have
714   *          another FS that it could be delegating the call to
715   * @param start offset into the given file
716   * @param len length for which to get locations for
717   */
718  public BlockLocation[] getFileBlockLocations(Path p, 
719      long start, long len) throws IOException {
720    if (p == null) {
721      throw new NullPointerException();
722    }
723    FileStatus file = getFileStatus(p);
724    return getFileBlockLocations(file, start, len);
725  }
726  
727  /**
728   * Return a set of server default configuration values
729   * @return server default configuration values
730   * @throws IOException
731   * @deprecated use {@link #getServerDefaults(Path)} instead
732   */
733  @Deprecated
734  public FsServerDefaults getServerDefaults() throws IOException {
735    Configuration conf = getConf();
736    // CRC32 is chosen as default as it is available in all 
737    // releases that support checksum.
738    // The client trash configuration is ignored.
739    return new FsServerDefaults(getDefaultBlockSize(), 
740        conf.getInt("io.bytes.per.checksum", 512), 
741        64 * 1024, 
742        getDefaultReplication(),
743        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT),
744        false,
745        FS_TRASH_INTERVAL_DEFAULT,
746        DataChecksum.Type.CRC32);
747  }
748
  /**
   * Return a set of server default configuration values.
   * <p>
   * This base implementation ignores the path and delegates to
   * {@link #getServerDefaults()}.
   * @param p path is used to identify an FS since an FS could have
   *          another FS that it could be delegating the call to
   * @return server default configuration values
   * @throws IOException
   */
  public FsServerDefaults getServerDefaults(Path p) throws IOException {
    return getServerDefaults();
  }
759
760  /**
761   * Return the fully-qualified path of path f resolving the path
762   * through any symlinks or mount point
763   * @param p path to be resolved
764   * @return fully qualified path 
765   * @throws FileNotFoundException
766   */
767   public Path resolvePath(final Path p) throws IOException {
768     checkPath(p);
769     return getFileStatus(p).getPath();
770   }
771
  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file name to open
   * @param bufferSize the size of the buffer to be used.
   * @return an input stream for the file
   * @throws IOException
   */
  public abstract FSDataInputStream open(Path f, int bufferSize)
    throws IOException;
779    
780  /**
781   * Opens an FSDataInputStream at the indicated Path.
782   * @param f the file to open
783   */
784  public FSDataInputStream open(Path f) throws IOException {
785    return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
786        IO_FILE_BUFFER_SIZE_DEFAULT));
787  }
788
789  /**
790   * Create an FSDataOutputStream at the indicated Path.
791   * Files are overwritten by default.
792   * @param f the file to create
793   */
794  public FSDataOutputStream create(Path f) throws IOException {
795    return create(f, true);
796  }
797
798  /**
799   * Create an FSDataOutputStream at the indicated Path.
800   * @param f the file to create
801   * @param overwrite if a file with this name already exists, then if true,
802   *   the file will be overwritten, and if false an exception will be thrown.
803   */
804  public FSDataOutputStream create(Path f, boolean overwrite)
805      throws IOException {
806    return create(f, overwrite, 
807                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
808                      IO_FILE_BUFFER_SIZE_DEFAULT),
809                  getDefaultReplication(f),
810                  getDefaultBlockSize(f));
811  }
812
813  /**
814   * Create an FSDataOutputStream at the indicated Path with write-progress
815   * reporting.
816   * Files are overwritten by default.
817   * @param f the file to create
818   * @param progress to report progress
819   */
820  public FSDataOutputStream create(Path f, Progressable progress) 
821      throws IOException {
822    return create(f, true, 
823                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
824                      IO_FILE_BUFFER_SIZE_DEFAULT),
825                  getDefaultReplication(f),
826                  getDefaultBlockSize(f), progress);
827  }
828
829  /**
830   * Create an FSDataOutputStream at the indicated Path.
831   * Files are overwritten by default.
832   * @param f the file to create
833   * @param replication the replication factor
834   */
835  public FSDataOutputStream create(Path f, short replication)
836      throws IOException {
837    return create(f, true, 
838                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
839                      IO_FILE_BUFFER_SIZE_DEFAULT),
840                  replication,
841                  getDefaultBlockSize(f));
842  }
843
844  /**
845   * Create an FSDataOutputStream at the indicated Path with write-progress
846   * reporting.
847   * Files are overwritten by default.
848   * @param f the file to create
849   * @param replication the replication factor
850   * @param progress to report progress
851   */
852  public FSDataOutputStream create(Path f, short replication, 
853      Progressable progress) throws IOException {
854    return create(f, true, 
855                  getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
856                      IO_FILE_BUFFER_SIZE_DEFAULT),
857                  replication, getDefaultBlockSize(f), progress);
858  }
859
860    
861  /**
862   * Create an FSDataOutputStream at the indicated Path.
863   * @param f the file name to create
864   * @param overwrite if a file with this name already exists, then if true,
865   *   the file will be overwritten, and if false an error will be thrown.
866   * @param bufferSize the size of the buffer to be used.
867   */
868  public FSDataOutputStream create(Path f, 
869                                   boolean overwrite,
870                                   int bufferSize
871                                   ) throws IOException {
872    return create(f, overwrite, bufferSize, 
873                  getDefaultReplication(f),
874                  getDefaultBlockSize(f));
875  }
876    
877  /**
878   * Create an FSDataOutputStream at the indicated Path with write-progress
879   * reporting.
880   * @param f the path of the file to open
881   * @param overwrite if a file with this name already exists, then if true,
882   *   the file will be overwritten, and if false an error will be thrown.
883   * @param bufferSize the size of the buffer to be used.
884   */
885  public FSDataOutputStream create(Path f, 
886                                   boolean overwrite,
887                                   int bufferSize,
888                                   Progressable progress
889                                   ) throws IOException {
890    return create(f, overwrite, bufferSize, 
891                  getDefaultReplication(f),
892                  getDefaultBlockSize(f), progress);
893  }
894    
895    
896  /**
897   * Create an FSDataOutputStream at the indicated Path.
898   * @param f the file name to open
899   * @param overwrite if a file with this name already exists, then if true,
900   *   the file will be overwritten, and if false an error will be thrown.
901   * @param bufferSize the size of the buffer to be used.
902   * @param replication required block replication for the file. 
903   */
904  public FSDataOutputStream create(Path f, 
905                                   boolean overwrite,
906                                   int bufferSize,
907                                   short replication,
908                                   long blockSize
909                                   ) throws IOException {
910    return create(f, overwrite, bufferSize, replication, blockSize, null);
911  }
912
913  /**
914   * Create an FSDataOutputStream at the indicated Path with write-progress
915   * reporting.
916   * @param f the file name to open
917   * @param overwrite if a file with this name already exists, then if true,
918   *   the file will be overwritten, and if false an error will be thrown.
919   * @param bufferSize the size of the buffer to be used.
920   * @param replication required block replication for the file. 
921   */
922  public FSDataOutputStream create(Path f,
923                                            boolean overwrite,
924                                            int bufferSize,
925                                            short replication,
926                                            long blockSize,
927                                            Progressable progress
928                                            ) throws IOException {
929    return this.create(f, FsPermission.getFileDefault().applyUMask(
930        FsPermission.getUMask(getConf())), overwrite, bufferSize,
931        replication, blockSize, progress);
932  }
933
934  /**
935   * Create an FSDataOutputStream at the indicated Path with write-progress
936   * reporting.
937   * @param f the file name to open
938   * @param permission
939   * @param overwrite if a file with this name already exists, then if true,
940   *   the file will be overwritten, and if false an error will be thrown.
941   * @param bufferSize the size of the buffer to be used.
942   * @param replication required block replication for the file.
943   * @param blockSize
944   * @param progress
945   * @throws IOException
946   * @see #setPermission(Path, FsPermission)
947   */
948  public abstract FSDataOutputStream create(Path f,
949      FsPermission permission,
950      boolean overwrite,
951      int bufferSize,
952      short replication,
953      long blockSize,
954      Progressable progress) throws IOException;
955  
956  /**
957   * Create an FSDataOutputStream at the indicated Path with write-progress
958   * reporting.
959   * @param f the file name to open
960   * @param permission
961   * @param flags {@link CreateFlag}s to use for this stream.
962   * @param bufferSize the size of the buffer to be used.
963   * @param replication required block replication for the file.
964   * @param blockSize
965   * @param progress
966   * @throws IOException
967   * @see #setPermission(Path, FsPermission)
968   */
969  public FSDataOutputStream create(Path f,
970      FsPermission permission,
971      EnumSet<CreateFlag> flags,
972      int bufferSize,
973      short replication,
974      long blockSize,
975      Progressable progress) throws IOException {
976    return create(f, permission, flags, bufferSize, replication,
977        blockSize, progress, null);
978  }
979  
980  /**
981   * Create an FSDataOutputStream at the indicated Path with a custom
982   * checksum option
983   * @param f the file name to open
984   * @param permission
985   * @param flags {@link CreateFlag}s to use for this stream.
986   * @param bufferSize the size of the buffer to be used.
987   * @param replication required block replication for the file.
988   * @param blockSize
989   * @param progress
990   * @param checksumOpt checksum parameter. If null, the values
991   *        found in conf will be used.
992   * @throws IOException
993   * @see #setPermission(Path, FsPermission)
994   */
995  public FSDataOutputStream create(Path f,
996      FsPermission permission,
997      EnumSet<CreateFlag> flags,
998      int bufferSize,
999      short replication,
1000      long blockSize,
1001      Progressable progress,
1002      ChecksumOpt checksumOpt) throws IOException {
1003    // Checksum options are ignored by default. The file systems that
1004    // implement checksum need to override this method. The full
1005    // support is currently only available in DFS.
1006    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
1007        bufferSize, replication, blockSize, progress);
1008  }
1009
1010  /*.
1011   * This create has been added to support the FileContext that processes
1012   * the permission
1013   * with umask before calling this method.
1014   * This a temporary method added to support the transition from FileSystem
1015   * to FileContext for user applications.
1016   */
1017  @Deprecated
1018  protected FSDataOutputStream primitiveCreate(Path f,
1019     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
1020     short replication, long blockSize, Progressable progress,
1021     ChecksumOpt checksumOpt) throws IOException {
1022
1023    boolean pathExists = exists(f);
1024    CreateFlag.validate(f, pathExists, flag);
1025    
1026    // Default impl  assumes that permissions do not matter and 
1027    // nor does the bytesPerChecksum  hence
1028    // calling the regular create is good enough.
1029    // FSs that implement permissions should override this.
1030
1031    if (pathExists && flag.contains(CreateFlag.APPEND)) {
1032      return append(f, bufferSize, progress);
1033    }
1034    
1035    return this.create(f, absolutePermission,
1036        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1037        blockSize, progress);
1038  }
1039  
1040  /**
1041   * This version of the mkdirs method assumes that the permission is absolute.
1042   * It has been added to support the FileContext that processes the permission
1043   * with umask before calling this method.
1044   * This a temporary method added to support the transition from FileSystem
1045   * to FileContext for user applications.
1046   */
1047  @Deprecated
1048  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1049    throws IOException {
1050    // Default impl is to assume that permissions do not matter and hence
1051    // calling the regular mkdirs is good enough.
1052    // FSs that implement permissions should override this.
1053   return this.mkdirs(f, absolutePermission);
1054  }
1055
1056
1057  /**
1058   * This version of the mkdirs method assumes that the permission is absolute.
1059   * It has been added to support the FileContext that processes the permission
1060   * with umask before calling this method.
1061   * This a temporary method added to support the transition from FileSystem
1062   * to FileContext for user applications.
1063   */
1064  @Deprecated
1065  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1066                    boolean createParent)
1067    throws IOException {
1068    
1069    if (!createParent) { // parent must exist.
1070      // since the this.mkdirs makes parent dirs automatically
1071      // we must throw exception if parent does not exist.
1072      final FileStatus stat = getFileStatus(f.getParent());
1073      if (stat == null) {
1074        throw new FileNotFoundException("Missing parent:" + f);
1075      }
1076      if (!stat.isDirectory()) {
1077        throw new ParentNotDirectoryException("parent is not a dir");
1078      }
1079      // parent does exist - go ahead with mkdir of leaf
1080    }
1081    // Default impl is to assume that permissions do not matter and hence
1082    // calling the regular mkdirs is good enough.
1083    // FSs that implement permissions should override this.
1084    if (!this.mkdirs(f, absolutePermission)) {
1085      throw new IOException("mkdir of "+ f + " failed");
1086    }
1087  }
1088
1089  /**
1090   * Opens an FSDataOutputStream at the indicated Path with write-progress
1091   * reporting. Same as create(), except fails if parent directory doesn't
1092   * already exist.
1093   * @param f the file name to open
1094   * @param overwrite if a file with this name already exists, then if true,
1095   * the file will be overwritten, and if false an error will be thrown.
1096   * @param bufferSize the size of the buffer to be used.
1097   * @param replication required block replication for the file.
1098   * @param blockSize
1099   * @param progress
1100   * @throws IOException
1101   * @see #setPermission(Path, FsPermission)
1102   */
1103  public FSDataOutputStream createNonRecursive(Path f,
1104      boolean overwrite,
1105      int bufferSize, short replication, long blockSize,
1106      Progressable progress) throws IOException {
1107    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1108        overwrite, bufferSize, replication, blockSize, progress);
1109  }
1110
1111  /**
1112   * Opens an FSDataOutputStream at the indicated Path with write-progress
1113   * reporting. Same as create(), except fails if parent directory doesn't
1114   * already exist.
1115   * @param f the file name to open
1116   * @param permission
1117   * @param overwrite if a file with this name already exists, then if true,
1118   * the file will be overwritten, and if false an error will be thrown.
1119   * @param bufferSize the size of the buffer to be used.
1120   * @param replication required block replication for the file.
1121   * @param blockSize
1122   * @param progress
1123   * @throws IOException
1124   * @see #setPermission(Path, FsPermission)
1125   */
1126   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1127       boolean overwrite, int bufferSize, short replication, long blockSize,
1128       Progressable progress) throws IOException {
1129     return createNonRecursive(f, permission,
1130         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1131             : EnumSet.of(CreateFlag.CREATE), bufferSize,
1132             replication, blockSize, progress);
1133   }
1134
1135   /**
1136    * Opens an FSDataOutputStream at the indicated Path with write-progress
1137    * reporting. Same as create(), except fails if parent directory doesn't
1138    * already exist.
1139    * @param f the file name to open
1140    * @param permission
1141    * @param flags {@link CreateFlag}s to use for this stream.
1142    * @param bufferSize the size of the buffer to be used.
1143    * @param replication required block replication for the file.
1144    * @param blockSize
1145    * @param progress
1146    * @throws IOException
1147    * @see #setPermission(Path, FsPermission)
1148    */
1149    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1150        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1151        Progressable progress) throws IOException {
1152      throw new IOException("createNonRecursive unsupported for this filesystem "
1153          + this.getClass());
1154    }
1155
1156  /**
1157   * Creates the given Path as a brand-new zero-length file.  If
1158   * create fails, or if it already existed, return false.
1159   *
1160   * @param f path to use for create
1161   */
1162  public boolean createNewFile(Path f) throws IOException {
1163    if (exists(f)) {
1164      return false;
1165    } else {
1166      create(f, false, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
1167          IO_FILE_BUFFER_SIZE_DEFAULT)).close();
1168      return true;
1169    }
1170  }
1171
1172  /**
1173   * Append to an existing file (optional operation).
1174   * Same as append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
1175   *     IO_FILE_BUFFER_SIZE_DEFAULT), null)
1176   * @param f the existing file to be appended.
1177   * @throws IOException
1178   */
1179  public FSDataOutputStream append(Path f) throws IOException {
1180    return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
1181        IO_FILE_BUFFER_SIZE_DEFAULT), null);
1182  }
1183  /**
1184   * Append to an existing file (optional operation).
1185   * Same as append(f, bufferSize, null).
1186   * @param f the existing file to be appended.
1187   * @param bufferSize the size of the buffer to be used.
1188   * @throws IOException
1189   */
1190  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1191    return append(f, bufferSize, null);
1192  }
1193
1194  /**
1195   * Append to an existing file (optional operation).
1196   * @param f the existing file to be appended.
1197   * @param bufferSize the size of the buffer to be used.
1198   * @param progress for reporting progress if it is not null.
1199   * @throws IOException
1200   */
1201  public abstract FSDataOutputStream append(Path f, int bufferSize,
1202      Progressable progress) throws IOException;
1203
1204  /**
1205   * Concat existing files together.
1206   * @param trg the path to the target destination.
1207   * @param psrcs the paths to the sources to use for the concatenation.
1208   * @throws IOException
1209   */
1210  public void concat(final Path trg, final Path [] psrcs) throws IOException {
1211    throw new UnsupportedOperationException("Not implemented by the " + 
1212        getClass().getSimpleName() + " FileSystem implementation");
1213  }
1214
1215 /**
1216   * Get replication.
1217   * 
1218   * @deprecated Use getFileStatus() instead
1219   * @param src file name
1220   * @return file replication
1221   * @throws IOException
1222   */ 
1223  @Deprecated
1224  public short getReplication(Path src) throws IOException {
1225    return getFileStatus(src).getReplication();
1226  }
1227
1228  /**
1229   * Set replication for an existing file.
1230   * 
1231   * @param src file name
1232   * @param replication new replication
1233   * @throws IOException
1234   * @return true if successful;
1235   *         false if file does not exist or is a directory
1236   */
1237  public boolean setReplication(Path src, short replication)
1238    throws IOException {
1239    return true;
1240  }
1241
1242  /**
1243   * Renames Path src to Path dst.  Can take place on local fs
1244   * or remote DFS.
1245   * @param src path to be renamed
1246   * @param dst new path after rename
1247   * @throws IOException on failure
1248   * @return true if rename is successful
1249   */
1250  public abstract boolean rename(Path src, Path dst) throws IOException;
1251
1252  /**
1253   * Renames Path src to Path dst
1254   * <ul>
1255   * <li>Fails if src is a file and dst is a directory.
1256   * <li>Fails if src is a directory and dst is a file.
1257   * <li>Fails if the parent of dst does not exist or is a file.
1258   * </ul>
1259   * <p>
1260   * If OVERWRITE option is not passed as an argument, rename fails
1261   * if the dst already exists.
1262   * <p>
1263   * If OVERWRITE option is passed as an argument, rename overwrites
1264   * the dst if it is a file or an empty directory. Rename fails if dst is
1265   * a non-empty directory.
1266   * <p>
1267   * Note that atomicity of rename is dependent on the file system
1268   * implementation. Please refer to the file system documentation for
1269   * details. This default implementation is non atomic.
1270   * <p>
1271   * This method is deprecated since it is a temporary method added to 
1272   * support the transition from FileSystem to FileContext for user 
1273   * applications.
1274   * 
1275   * @param src path to be renamed
1276   * @param dst new path after rename
1277   * @throws IOException on failure
1278   */
1279  @Deprecated
1280  protected void rename(final Path src, final Path dst,
1281      final Rename... options) throws IOException {
1282    // Default implementation
1283    final FileStatus srcStatus = getFileLinkStatus(src);
1284    if (srcStatus == null) {
1285      throw new FileNotFoundException("rename source " + src + " not found.");
1286    }
1287
1288    boolean overwrite = false;
1289    if (null != options) {
1290      for (Rename option : options) {
1291        if (option == Rename.OVERWRITE) {
1292          overwrite = true;
1293        }
1294      }
1295    }
1296
1297    FileStatus dstStatus;
1298    try {
1299      dstStatus = getFileLinkStatus(dst);
1300    } catch (IOException e) {
1301      dstStatus = null;
1302    }
1303    if (dstStatus != null) {
1304      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1305        throw new IOException("Source " + src + " Destination " + dst
1306            + " both should be either file or directory");
1307      }
1308      if (!overwrite) {
1309        throw new FileAlreadyExistsException("rename destination " + dst
1310            + " already exists.");
1311      }
1312      // Delete the destination that is a file or an empty directory
1313      if (dstStatus.isDirectory()) {
1314        FileStatus[] list = listStatus(dst);
1315        if (list != null && list.length != 0) {
1316          throw new IOException(
1317              "rename cannot overwrite non empty destination directory " + dst);
1318        }
1319      }
1320      delete(dst, false);
1321    } else {
1322      final Path parent = dst.getParent();
1323      final FileStatus parentStatus = getFileStatus(parent);
1324      if (parentStatus == null) {
1325        throw new FileNotFoundException("rename destination parent " + parent
1326            + " not found.");
1327      }
1328      if (!parentStatus.isDirectory()) {
1329        throw new ParentNotDirectoryException("rename destination parent " + parent
1330            + " is a file.");
1331      }
1332    }
1333    if (!rename(src, dst)) {
1334      throw new IOException("rename from " + src + " to " + dst + " failed.");
1335    }
1336  }
1337
1338  /**
1339   * Truncate the file in the indicated path to the indicated size.
1340   * <ul>
1341   * <li>Fails if path is a directory.
1342   * <li>Fails if path does not exist.
1343   * <li>Fails if path is not closed.
1344   * <li>Fails if new size is greater than current size.
1345   * </ul>
1346   * @param f The path to the file to be truncated
1347   * @param newLength The size the file is to be truncated to
1348   *
1349   * @return <code>true</code> if the file has been truncated to the desired
1350   * <code>newLength</code> and is immediately available to be reused for
1351   * write operations such as <code>append</code>, or
1352   * <code>false</code> if a background process of adjusting the length of
1353   * the last block has been started, and clients should wait for it to
1354   * complete before proceeding with further file updates.
1355   */
1356  public boolean truncate(Path f, long newLength) throws IOException {
1357    throw new UnsupportedOperationException("Not implemented by the " +
1358        getClass().getSimpleName() + " FileSystem implementation");
1359  }
1360  
1361  /**
1362   * Delete a file 
1363   * @deprecated Use {@link #delete(Path, boolean)} instead.
1364   */
1365  @Deprecated
1366  public boolean delete(Path f) throws IOException {
1367    return delete(f, true);
1368  }
1369  
1370  /** Delete a file.
1371   *
1372   * @param f the path to delete.
1373   * @param recursive if path is a directory and set to 
1374   * true, the directory is deleted else throws an exception. In
1375   * case of a file the recursive can be set to either true or false. 
1376   * @return  true if delete is successful else false. 
1377   * @throws IOException
1378   */
1379  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1380
1381  /**
1382   * Mark a path to be deleted when FileSystem is closed.
1383   * When the JVM shuts down,
1384   * all FileSystem objects will be closed automatically.
1385   * Then,
1386   * the marked path will be deleted as a result of closing the FileSystem.
1387   *
1388   * The path has to exist in the file system.
1389   * 
1390   * @param f the path to delete.
1391   * @return  true if deleteOnExit is successful, otherwise false.
1392   * @throws IOException
1393   */
1394  public boolean deleteOnExit(Path f) throws IOException {
1395    if (!exists(f)) {
1396      return false;
1397    }
1398    synchronized (deleteOnExit) {
1399      deleteOnExit.add(f);
1400    }
1401    return true;
1402  }
1403  
1404  /**
1405   * Cancel the deletion of the path when the FileSystem is closed
1406   * @param f the path to cancel deletion
1407   */
1408  public boolean cancelDeleteOnExit(Path f) {
1409    synchronized (deleteOnExit) {
1410      return deleteOnExit.remove(f);
1411    }
1412  }
1413
1414  /**
1415   * Delete all files that were marked as delete-on-exit. This recursively
1416   * deletes all files in the specified paths.
1417   */
1418  protected void processDeleteOnExit() {
1419    synchronized (deleteOnExit) {
1420      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1421        Path path = iter.next();
1422        try {
1423          if (exists(path)) {
1424            delete(path, true);
1425          }
1426        }
1427        catch (IOException e) {
1428          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1429        }
1430        iter.remove();
1431      }
1432    }
1433  }
1434  
1435  /** Check if exists.
1436   * @param f source file
1437   */
1438  public boolean exists(Path f) throws IOException {
1439    try {
1440      return getFileStatus(f) != null;
1441    } catch (FileNotFoundException e) {
1442      return false;
1443    }
1444  }
1445
1446  /** True iff the named path is a directory.
1447   * Note: Avoid using this method. Instead reuse the FileStatus 
1448   * returned by getFileStatus() or listStatus() methods.
1449   * @param f path to check
1450   */
1451  public boolean isDirectory(Path f) throws IOException {
1452    try {
1453      return getFileStatus(f).isDirectory();
1454    } catch (FileNotFoundException e) {
1455      return false;               // f does not exist
1456    }
1457  }
1458
1459  /** True iff the named path is a regular file.
1460   * Note: Avoid using this method. Instead reuse the FileStatus 
1461   * returned by getFileStatus() or listStatus() methods.
1462   * @param f path to check
1463   */
1464  public boolean isFile(Path f) throws IOException {
1465    try {
1466      return getFileStatus(f).isFile();
1467    } catch (FileNotFoundException e) {
1468      return false;               // f does not exist
1469    }
1470  }
1471  
1472  /** The number of bytes in a file. */
1473  /** @deprecated Use getFileStatus() instead */
1474  @Deprecated
1475  public long getLength(Path f) throws IOException {
1476    return getFileStatus(f).getLen();
1477  }
1478    
1479  /** Return the {@link ContentSummary} of a given {@link Path}.
1480  * @param f path to use
1481  */
1482  public ContentSummary getContentSummary(Path f) throws IOException {
1483    FileStatus status = getFileStatus(f);
1484    if (status.isFile()) {
1485      // f is a file
1486      long length = status.getLen();
1487      return new ContentSummary.Builder().length(length).
1488          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1489    }
1490    // f is a directory
1491    long[] summary = {0, 0, 1};
1492    for(FileStatus s : listStatus(f)) {
1493      long length = s.getLen();
1494      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1495          new ContentSummary.Builder().length(length).
1496          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1497      summary[0] += c.getLength();
1498      summary[1] += c.getFileCount();
1499      summary[2] += c.getDirectoryCount();
1500    }
1501    return new ContentSummary.Builder().length(summary[0]).
1502        fileCount(summary[1]).directoryCount(summary[2]).
1503        spaceConsumed(summary[0]).build();
1504  }
1505
1506  /** Return the {@link QuotaUsage} of a given {@link Path}.
1507   * @param f path to use
1508   */
1509  public QuotaUsage getQuotaUsage(Path f) throws IOException {
1510    return getContentSummary(f);
1511  }
1512
1513  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1514    @Override
1515    public boolean accept(Path file) {
1516      return true;
1517    }
1518  };
1519    
1520  /**
1521   * List the statuses of the files/directories in the given path if the path is
1522   * a directory.
1523   * <p>
1524   * Does not guarantee to return the List of files/directories status in a
1525   * sorted order.
1526   * @param f given path
1527   * @return the statuses of the files/directories in the given patch
1528   * @throws FileNotFoundException when the path does not exist;
1529   *         IOException see specific implementation
1530   */
1531  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1532                                                         IOException;
1533    
1534  /*
1535   * Filter files/directories in the given path using the user-supplied path
1536   * filter. Results are added to the given array <code>results</code>.
1537   */
1538  private void listStatus(ArrayList<FileStatus> results, Path f,
1539      PathFilter filter) throws FileNotFoundException, IOException {
1540    FileStatus listing[] = listStatus(f);
1541    if (listing == null) {
1542      throw new IOException("Error accessing " + f);
1543    }
1544
1545    for (int i = 0; i < listing.length; i++) {
1546      if (filter.accept(listing[i].getPath())) {
1547        results.add(listing[i]);
1548      }
1549    }
1550  }
1551
1552  /**
1553   * @return an iterator over the corrupt files under the given path
1554   * (may contain duplicates if a file has more than one corrupt block)
1555   * @throws IOException
1556   */
1557  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1558    throws IOException {
1559    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1560                                            " does not support" +
1561                                            " listCorruptFileBlocks");
1562  }
1563
1564  /**
1565   * Filter files/directories in the given path using the user-supplied path
1566   * filter.
1567   * <p>
1568   * Does not guarantee to return the List of files/directories status in a
1569   * sorted order.
1570   * 
1571   * @param f
1572   *          a path name
1573   * @param filter
1574   *          the user-supplied path filter
1575   * @return an array of FileStatus objects for the files under the given path
1576   *         after applying the filter
1577   * @throws FileNotFoundException when the path does not exist;
1578   *         IOException see specific implementation   
1579   */
1580  public FileStatus[] listStatus(Path f, PathFilter filter) 
1581                                   throws FileNotFoundException, IOException {
1582    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1583    listStatus(results, f, filter);
1584    return results.toArray(new FileStatus[results.size()]);
1585  }
1586
1587  /**
1588   * Filter files/directories in the given list of paths using default
1589   * path filter.
1590   * <p>
1591   * Does not guarantee to return the List of files/directories status in a
1592   * sorted order.
1593   * 
1594   * @param files
1595   *          a list of paths
1596   * @return a list of statuses for the files under the given paths after
1597   *         applying the filter default Path filter
1598   * @throws FileNotFoundException when the path does not exist;
1599   *         IOException see specific implementation
1600   */
1601  public FileStatus[] listStatus(Path[] files)
1602      throws FileNotFoundException, IOException {
1603    return listStatus(files, DEFAULT_FILTER);
1604  }
1605
1606  /**
1607   * Filter files/directories in the given list of paths using user-supplied
1608   * path filter.
1609   * <p>
1610   * Does not guarantee to return the List of files/directories status in a
1611   * sorted order.
1612   * 
1613   * @param files
1614   *          a list of paths
1615   * @param filter
1616   *          the user-supplied path filter
1617   * @return a list of statuses for the files under the given paths after
1618   *         applying the filter
1619   * @throws FileNotFoundException when the path does not exist;
1620   *         IOException see specific implementation
1621   */
1622  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1623      throws FileNotFoundException, IOException {
1624    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1625    for (int i = 0; i < files.length; i++) {
1626      listStatus(results, files[i], filter);
1627    }
1628    return results.toArray(new FileStatus[results.size()]);
1629  }
1630
1631  /**
1632   * <p>Return all the files that match filePattern and are not checksum
1633   * files. Results are sorted by their names.
1634   * 
1635   * <p>
1636   * A filename pattern is composed of <i>regular</i> characters and
1637   * <i>special pattern matching</i> characters, which are:
1638   *
1639   * <dl>
1640   *  <dd>
1641   *   <dl>
1642   *    <p>
1643   *    <dt> <tt> ? </tt>
1644   *    <dd> Matches any single character.
1645   *
1646   *    <p>
1647   *    <dt> <tt> * </tt>
1648   *    <dd> Matches zero or more characters.
1649   *
1650   *    <p>
1651   *    <dt> <tt> [<i>abc</i>] </tt>
1652   *    <dd> Matches a single character from character set
1653   *     <tt>{<i>a,b,c</i>}</tt>.
1654   *
1655   *    <p>
1656   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1657   *    <dd> Matches a single character from the character range
1658   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1659   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1660   *
1661   *    <p>
1662   *    <dt> <tt> [^<i>a</i>] </tt>
1663   *    <dd> Matches a single character that is not from character set or range
1664   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1665   *     immediately to the right of the opening bracket.
1666   *
1667   *    <p>
1668   *    <dt> <tt> \<i>c</i> </tt>
1669   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1670   *
1671   *    <p>
1672   *    <dt> <tt> {ab,cd} </tt>
1673   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1674   *    
1675   *    <p>
1676   *    <dt> <tt> {ab,c{de,fh}} </tt>
1677   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1678   *
1679   *   </dl>
1680   *  </dd>
1681   * </dl>
1682   *
1683   * @param pathPattern a regular expression specifying a pth pattern
1684
1685   * @return an array of paths that match the path pattern
1686   * @throws IOException
1687   */
1688  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1689    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1690  }
1691  
1692  /**
1693   * Return an array of FileStatus objects whose path names match
1694   * {@code pathPattern} and is accepted by the user-supplied path filter.
1695   * Results are sorted by their path names.
1696   * 
1697   * @param pathPattern a regular expression specifying the path pattern
1698   * @param filter a user-supplied path filter
1699   * @return null if {@code pathPattern} has no glob and the path does not exist
1700   *         an empty array if {@code pathPattern} has a glob and no path
1701   *         matches it else an array of {@link FileStatus} objects matching the
1702   *         pattern
1703   * @throws IOException if any I/O error occurs when fetching file status
1704   */
1705  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1706      throws IOException {
1707    return new Globber(this, pathPattern, filter).glob();
1708  }
1709  
1710  /**
1711   * List the statuses of the files/directories in the given path if the path is
1712   * a directory. 
1713   * Return the file's status and block locations If the path is a file.
1714   * 
1715   * If a returned status is a file, it contains the file's block locations.
1716   * 
1717   * @param f is the path
1718   *
1719   * @return an iterator that traverses statuses of the files/directories 
1720   *         in the given path
1721   *
1722   * @throws FileNotFoundException If <code>f</code> does not exist
1723   * @throws IOException If an I/O error occurred
1724   */
1725  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1726  throws FileNotFoundException, IOException {
1727    return listLocatedStatus(f, DEFAULT_FILTER);
1728  }
1729
1730  /**
1731   * Listing a directory
1732   * The returned results include its block location if it is a file
1733   * The results are filtered by the given path filter
1734   * @param f a path
1735   * @param filter a path filter
1736   * @return an iterator that traverses statuses of the files/directories 
1737   *         in the given path
1738   * @throws FileNotFoundException if <code>f</code> does not exist
1739   * @throws IOException if any I/O error occurred
1740   */
1741  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1742      final PathFilter filter)
1743  throws FileNotFoundException, IOException {
1744    return new RemoteIterator<LocatedFileStatus>() {
1745      private final FileStatus[] stats = listStatus(f, filter);
1746      private int i = 0;
1747
1748      @Override
1749      public boolean hasNext() {
1750        return i<stats.length;
1751      }
1752
1753      @Override
1754      public LocatedFileStatus next() throws IOException {
1755        if (!hasNext()) {
1756          throw new NoSuchElementException("No more entries in " + f);
1757        }
1758        FileStatus result = stats[i++];
1759        // for files, use getBlockLocations(FileStatus, int, int) to avoid
1760        // calling getFileStatus(Path) to load the FileStatus again
1761        BlockLocation[] locs = result.isFile() ?
1762            getFileBlockLocations(result, 0, result.getLen()) :
1763            null;
1764        return new LocatedFileStatus(result, locs);
1765      }
1766    };
1767  }
1768
1769  /**
1770   * Returns a remote iterator so that followup calls are made on demand
1771   * while consuming the entries. Each file system implementation should
1772   * override this method and provide a more efficient implementation, if
1773   * possible. 
1774   * Does not guarantee to return the iterator that traverses statuses
1775   * of the files in a sorted order.
1776   *
1777   * @param p target path
1778   * @return remote iterator
1779   */
1780  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
1781  throws FileNotFoundException, IOException {
1782    return new RemoteIterator<FileStatus>() {
1783      private final FileStatus[] stats = listStatus(p);
1784      private int i = 0;
1785
1786      @Override
1787      public boolean hasNext() {
1788        return i<stats.length;
1789      }
1790
1791      @Override
1792      public FileStatus next() throws IOException {
1793        if (!hasNext()) {
1794          throw new NoSuchElementException("No more entry in " + p);
1795        }
1796        return stats[i++];
1797      }
1798    };
1799  }
1800
1801  /**
1802   * List the statuses and block locations of the files in the given path.
1803   * Does not guarantee to return the iterator that traverses statuses
1804   * of the files in a sorted order.
1805   * 
1806   * If the path is a directory, 
1807   *   if recursive is false, returns files in the directory;
1808   *   if recursive is true, return files in the subtree rooted at the path.
1809   * If the path is a file, return the file's status and block locations.
1810   * 
1811   * @param f is the path
1812   * @param recursive if the subdirectories need to be traversed recursively
1813   *
1814   * @return an iterator that traverses statuses of the files
1815   *
1816   * @throws FileNotFoundException when the path does not exist;
1817   *         IOException see specific implementation
1818   */
1819  public RemoteIterator<LocatedFileStatus> listFiles(
1820      final Path f, final boolean recursive)
1821  throws FileNotFoundException, IOException {
1822    return new RemoteIterator<LocatedFileStatus>() {
1823      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1824        new Stack<RemoteIterator<LocatedFileStatus>>();
1825      private RemoteIterator<LocatedFileStatus> curItor =
1826        listLocatedStatus(f);
1827      private LocatedFileStatus curFile;
1828     
1829      @Override
1830      public boolean hasNext() throws IOException {
1831        while (curFile == null) {
1832          if (curItor.hasNext()) {
1833            handleFileStat(curItor.next());
1834          } else if (!itors.empty()) {
1835            curItor = itors.pop();
1836          } else {
1837            return false;
1838          }
1839        }
1840        return true;
1841      }
1842
1843      /**
1844       * Process the input stat.
1845       * If it is a file, return the file stat.
1846       * If it is a directory, traverse the directory if recursive is true;
1847       * ignore it if recursive is false.
1848       * @param stat input status
1849       * @throws IOException if any IO error occurs
1850       */
1851      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1852        if (stat.isFile()) { // file
1853          curFile = stat;
1854        } else if (recursive) { // directory
1855          itors.push(curItor);
1856          curItor = listLocatedStatus(stat.getPath());
1857        }
1858      }
1859
1860      @Override
1861      public LocatedFileStatus next() throws IOException {
1862        if (hasNext()) {
1863          LocatedFileStatus result = curFile;
1864          curFile = null;
1865          return result;
1866        } 
1867        throw new java.util.NoSuchElementException("No more entry in " + f);
1868      }
1869    };
1870  }
1871  
1872  /** Return the current user's home directory in this filesystem.
1873   * The default implementation returns "/user/$USER/".
1874   */
1875  public Path getHomeDirectory() {
1876    return this.makeQualified(
1877        new Path("/user/"+System.getProperty("user.name")));
1878  }
1879
1880
1881  /**
1882   * Set the current working directory for the given file system. All relative
1883   * paths will be resolved relative to it.
1884   * 
1885   * @param new_dir
1886   */
1887  public abstract void setWorkingDirectory(Path new_dir);
1888    
1889  /**
1890   * Get the current working directory for the given file system
1891   * @return the directory pathname
1892   */
1893  public abstract Path getWorkingDirectory();
1894  
1895  
1896  /**
1897   * Note: with the new FilesContext class, getWorkingDirectory()
1898   * will be removed. 
1899   * The working directory is implemented in FilesContext.
1900   * 
1901   * Some file systems like LocalFileSystem have an initial workingDir
1902   * that we use as the starting workingDir. For other file systems
1903   * like HDFS there is no built in notion of an initial workingDir.
1904   * 
1905   * @return if there is built in notion of workingDir then it
1906   * is returned; else a null is returned.
1907   */
1908  protected Path getInitialWorkingDirectory() {
1909    return null;
1910  }
1911
1912  /**
1913   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1914   */
1915  public boolean mkdirs(Path f) throws IOException {
1916    return mkdirs(f, FsPermission.getDirDefault());
1917  }
1918
1919  /**
1920   * Make the given file and all non-existent parents into
1921   * directories. Has the semantics of Unix 'mkdir -p'.
1922   * Existence of the directory hierarchy is not an error.
1923   * @param f path to create
1924   * @param permission to apply to f
1925   */
1926  public abstract boolean mkdirs(Path f, FsPermission permission
1927      ) throws IOException;
1928
1929  /**
1930   * The src file is on the local disk.  Add it to FS at
1931   * the given dst name and the source is kept intact afterwards
1932   * @param src path
1933   * @param dst path
1934   */
1935  public void copyFromLocalFile(Path src, Path dst)
1936    throws IOException {
1937    copyFromLocalFile(false, src, dst);
1938  }
1939
1940  /**
1941   * The src files is on the local disk.  Add it to FS at
1942   * the given dst name, removing the source afterwards.
1943   * @param srcs path
1944   * @param dst path
1945   */
1946  public void moveFromLocalFile(Path[] srcs, Path dst)
1947    throws IOException {
1948    copyFromLocalFile(true, true, srcs, dst);
1949  }
1950
1951  /**
1952   * The src file is on the local disk.  Add it to FS at
1953   * the given dst name, removing the source afterwards.
1954   * @param src path
1955   * @param dst path
1956   */
1957  public void moveFromLocalFile(Path src, Path dst)
1958    throws IOException {
1959    copyFromLocalFile(true, src, dst);
1960  }
1961
1962  /**
1963   * The src file is on the local disk.  Add it to FS at
1964   * the given dst name.
1965   * delSrc indicates if the source should be removed
1966   * @param delSrc whether to delete the src
1967   * @param src path
1968   * @param dst path
1969   */
1970  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1971    throws IOException {
1972    copyFromLocalFile(delSrc, true, src, dst);
1973  }
1974  
1975  /**
1976   * The src files are on the local disk.  Add it to FS at
1977   * the given dst name.
1978   * delSrc indicates if the source should be removed
1979   * @param delSrc whether to delete the src
1980   * @param overwrite whether to overwrite an existing file
1981   * @param srcs array of paths which are source
1982   * @param dst path
1983   */
1984  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1985                                Path[] srcs, Path dst)
1986    throws IOException {
1987    Configuration conf = getConf();
1988    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1989  }
1990  
1991  /**
1992   * The src file is on the local disk.  Add it to FS at
1993   * the given dst name.
1994   * delSrc indicates if the source should be removed
1995   * @param delSrc whether to delete the src
1996   * @param overwrite whether to overwrite an existing file
1997   * @param src path
1998   * @param dst path
1999   */
2000  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
2001                                Path src, Path dst)
2002    throws IOException {
2003    Configuration conf = getConf();
2004    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
2005  }
2006    
2007  /**
2008   * The src file is under FS, and the dst is on the local disk.
2009   * Copy it from FS control to the local dst name.
2010   * @param src path
2011   * @param dst path
2012   */
2013  public void copyToLocalFile(Path src, Path dst) throws IOException {
2014    copyToLocalFile(false, src, dst);
2015  }
2016    
2017  /**
2018   * The src file is under FS, and the dst is on the local disk.
2019   * Copy it from FS control to the local dst name.
2020   * Remove the source afterwards
2021   * @param src path
2022   * @param dst path
2023   */
2024  public void moveToLocalFile(Path src, Path dst) throws IOException {
2025    copyToLocalFile(true, src, dst);
2026  }
2027
2028  /**
2029   * The src file is under FS, and the dst is on the local disk.
2030   * Copy it from FS control to the local dst name.
2031   * delSrc indicates if the src will be removed or not.
2032   * @param delSrc whether to delete the src
2033   * @param src path
2034   * @param dst path
2035   */   
2036  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
2037    throws IOException {
2038    copyToLocalFile(delSrc, src, dst, false);
2039  }
2040  
2041    /**
2042   * The src file is under FS, and the dst is on the local disk. Copy it from FS
2043   * control to the local dst name. delSrc indicates if the src will be removed
2044   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
2045   * as local file system or not. RawLocalFileSystem is non crc file system.So,
2046   * It will not create any crc files at local.
2047   * 
2048   * @param delSrc
2049   *          whether to delete the src
2050   * @param src
2051   *          path
2052   * @param dst
2053   *          path
2054   * @param useRawLocalFileSystem
2055   *          whether to use RawLocalFileSystem as local file system or not.
2056   * 
2057   * @throws IOException
2058   *           - if any IO error
2059   */
2060  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
2061      boolean useRawLocalFileSystem) throws IOException {
2062    Configuration conf = getConf();
2063    FileSystem local = null;
2064    if (useRawLocalFileSystem) {
2065      local = getLocal(conf).getRawFileSystem();
2066    } else {
2067      local = getLocal(conf);
2068    }
2069    FileUtil.copy(this, src, local, dst, delSrc, conf);
2070  }
2071
2072  /**
2073   * Returns a local File that the user can write output to.  The caller
2074   * provides both the eventual FS target name and the local working
2075   * file.  If the FS is local, we write directly into the target.  If
2076   * the FS is remote, we write into the tmp local area.
2077   * @param fsOutputFile path of output file
2078   * @param tmpLocalFile path of local tmp file
2079   */
2080  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2081    throws IOException {
2082    return tmpLocalFile;
2083  }
2084
2085  /**
2086   * Called when we're all done writing to the target.  A local FS will
2087   * do nothing, because we've written to exactly the right place.  A remote
2088   * FS will copy the contents of tmpLocalFile to the correct target at
2089   * fsOutputFile.
2090   * @param fsOutputFile path of output file
2091   * @param tmpLocalFile path to local tmp file
2092   */
2093  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2094    throws IOException {
2095    moveFromLocalFile(tmpLocalFile, fsOutputFile);
2096  }
2097
2098  /**
2099   * No more filesystem operations are needed.  Will
2100   * release any held locks.
2101   */
2102  @Override
2103  public void close() throws IOException {
2104    // delete all files that were marked as delete-on-exit.
2105    processDeleteOnExit();
2106    CACHE.remove(this.key, this);
2107  }
2108
2109  /** Return the total size of all files in the filesystem. */
2110  public long getUsed() throws IOException {
2111    Path path = new Path("/");
2112    return getUsed(path);
2113  }
2114
2115  /** Return the total size of all files from a specified path. */
2116  public long getUsed(Path path) throws IOException {
2117    return getContentSummary(path).getLength();
2118  }
2119
2120  /**
2121   * Get the block size for a particular file.
2122   * @param f the filename
2123   * @return the number of bytes in a block
2124   */
2125  /** @deprecated Use getFileStatus() instead */
2126  @Deprecated
2127  public long getBlockSize(Path f) throws IOException {
2128    return getFileStatus(f).getBlockSize();
2129  }
2130
2131  /**
2132   * Return the number of bytes that large input files should be optimally
2133   * be split into to minimize i/o time.
2134   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2135   */
2136  @Deprecated
2137  public long getDefaultBlockSize() {
2138    // default to 32MB: large enough to minimize the impact of seeks
2139    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2140  }
2141    
  /** Return the number of bytes that large input files should be optimally
   * be split into to minimize i/o time.  The given path will be used to
   * locate the actual filesystem.  The full path does not have to exist.
   * <p>
   * This base implementation ignores the path and returns
   * {@code getDefaultBlockSize()}.
   * @param f path of file
   * @return the default block size for the path's filesystem
   */
  public long getDefaultBlockSize(Path f) {
    return getDefaultBlockSize();
  }
2151
2152  /**
2153   * Get the default replication.
2154   * @deprecated use {@link #getDefaultReplication(Path)} instead
2155   */
2156  @Deprecated
2157  public short getDefaultReplication() { return 1; }
2158
2159  /**
2160   * Get the default replication for a path.   The given path will be used to
2161   * locate the actual filesystem.  The full path does not have to exist.
2162   * @param path of the file
2163   * @return default replication for the path's filesystem 
2164   */
2165  public short getDefaultReplication(Path path) {
2166    return getDefaultReplication();
2167  }
2168  
2169  /**
2170   * Return a file status object that represents the path.
2171   * @param f The path we want information from
2172   * @return a FileStatus object
2173   * @throws FileNotFoundException when the path does not exist;
2174   *         IOException see specific implementation
2175   */
2176  public abstract FileStatus getFileStatus(Path f) throws IOException;
2177
2178  /**
2179   * Checks if the user can access a path.  The mode specifies which access
2180   * checks to perform.  If the requested permissions are granted, then the
2181   * method returns normally.  If access is denied, then the method throws an
2182   * {@link AccessControlException}.
2183   * <p/>
2184   * The default implementation of this method calls {@link #getFileStatus(Path)}
2185   * and checks the returned permissions against the requested permissions.
2186   * Note that the getFileStatus call will be subject to authorization checks.
2187   * Typically, this requires search (execute) permissions on each directory in
2188   * the path's prefix, but this is implementation-defined.  Any file system
2189   * that provides a richer authorization model (such as ACLs) may override the
2190   * default implementation so that it checks against that model instead.
2191   * <p>
2192   * In general, applications should avoid using this method, due to the risk of
2193   * time-of-check/time-of-use race conditions.  The permissions on a file may
2194   * change immediately after the access call returns.  Most applications should
2195   * prefer running specific file system actions as the desired user represented
2196   * by a {@link UserGroupInformation}.
2197   *
2198   * @param path Path to check
2199   * @param mode type of access to check
2200   * @throws AccessControlException if access is denied
2201   * @throws FileNotFoundException if the path does not exist
2202   * @throws IOException see specific implementation
2203   */
2204  @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2205  public void access(Path path, FsAction mode) throws AccessControlException,
2206      FileNotFoundException, IOException {
2207    checkAccessPermissions(this.getFileStatus(path), mode);
2208  }
2209
2210  /**
2211   * This method provides the default implementation of
2212   * {@link #access(Path, FsAction)}.
2213   *
2214   * @param stat FileStatus to check
2215   * @param mode type of access to check
2216   * @throws IOException for any error
2217   */
2218  @InterfaceAudience.Private
2219  static void checkAccessPermissions(FileStatus stat, FsAction mode)
2220      throws IOException {
2221    FsPermission perm = stat.getPermission();
2222    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2223    String user = ugi.getShortUserName();
2224    List<String> groups = Arrays.asList(ugi.getGroupNames());
2225    if (user.equals(stat.getOwner())) {
2226      if (perm.getUserAction().implies(mode)) {
2227        return;
2228      }
2229    } else if (groups.contains(stat.getGroup())) {
2230      if (perm.getGroupAction().implies(mode)) {
2231        return;
2232      }
2233    } else {
2234      if (perm.getOtherAction().implies(mode)) {
2235        return;
2236      }
2237    }
2238    throw new AccessControlException(String.format(
2239      "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2240      stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2241  }
2242
2243  /**
2244   * See {@link FileContext#fixRelativePart}
2245   */
2246  protected Path fixRelativePart(Path p) {
2247    if (p.isUriPathAbsolute()) {
2248      return p;
2249    } else {
2250      return new Path(getWorkingDirectory(), p);
2251    }
2252  }
2253
2254  /**
2255   * See {@link FileContext#createSymlink(Path, Path, boolean)}
2256   */
2257  public void createSymlink(final Path target, final Path link,
2258      final boolean createParent) throws AccessControlException,
2259      FileAlreadyExistsException, FileNotFoundException,
2260      ParentNotDirectoryException, UnsupportedFileSystemException, 
2261      IOException {
2262    // Supporting filesystems should override this method
2263    throw new UnsupportedOperationException(
2264        "Filesystem does not support symlinks!");
2265  }
2266
2267  /**
2268   * See {@link FileContext#getFileLinkStatus(Path)}
2269   */
2270  public FileStatus getFileLinkStatus(final Path f)
2271      throws AccessControlException, FileNotFoundException,
2272      UnsupportedFileSystemException, IOException {
2273    // Supporting filesystems should override this method
2274    return getFileStatus(f);
2275  }
2276
2277  /**
2278   * See {@link AbstractFileSystem#supportsSymlinks()}
2279   */
2280  public boolean supportsSymlinks() {
2281    return false;
2282  }
2283
2284  /**
2285   * See {@link FileContext#getLinkTarget(Path)}
2286   */
2287  public Path getLinkTarget(Path f) throws IOException {
2288    // Supporting filesystems should override this method
2289    throw new UnsupportedOperationException(
2290        "Filesystem does not support symlinks!");
2291  }
2292
2293  /**
2294   * See {@link AbstractFileSystem#getLinkTarget(Path)}
2295   */
2296  protected Path resolveLink(Path f) throws IOException {
2297    // Supporting filesystems should override this method
2298    throw new UnsupportedOperationException(
2299        "Filesystem does not support symlinks!");
2300  }
2301
2302  /**
2303   * Get the checksum of a file.
2304   *
2305   * @param f The file path
2306   * @return The file checksum.  The default return value is null,
2307   *  which indicates that no checksum algorithm is implemented
2308   *  in the corresponding FileSystem.
2309   */
2310  public FileChecksum getFileChecksum(Path f) throws IOException {
2311    return getFileChecksum(f, Long.MAX_VALUE);
2312  }
2313
2314  /**
2315   * Get the checksum of a file, from the beginning of the file till the
2316   * specific length.
2317   * @param f The file path
2318   * @param length The length of the file range for checksum calculation
2319   * @return The file checksum.
2320   */
2321  public FileChecksum getFileChecksum(Path f, final long length)
2322      throws IOException {
2323    return null;
2324  }
2325
2326  /**
2327   * Set the verify checksum flag. This is only applicable if the 
2328   * corresponding FileSystem supports checksum. By default doesn't do anything.
2329   * @param verifyChecksum
2330   */
2331  public void setVerifyChecksum(boolean verifyChecksum) {
2332    //doesn't do anything
2333  }
2334
2335  /**
2336   * Set the write checksum flag. This is only applicable if the 
2337   * corresponding FileSystem supports checksum. By default doesn't do anything.
2338   * @param writeChecksum
2339   */
2340  public void setWriteChecksum(boolean writeChecksum) {
2341    //doesn't do anything
2342  }
2343
2344  /**
2345   * Returns a status object describing the use and capacity of the
2346   * file system. If the file system has multiple partitions, the
2347   * use and capacity of the root partition is reflected.
2348   * 
2349   * @return a FsStatus object
2350   * @throws IOException
2351   *           see specific implementation
2352   */
2353  public FsStatus getStatus() throws IOException {
2354    return getStatus(null);
2355  }
2356
2357  /**
2358   * Returns a status object describing the use and capacity of the
2359   * file system. If the file system has multiple partitions, the
2360   * use and capacity of the partition pointed to by the specified
2361   * path is reflected.
2362   * @param p Path for which status should be obtained. null means
2363   * the default partition. 
2364   * @return a FsStatus object
2365   * @throws IOException
2366   *           see specific implementation
2367   */
2368  public FsStatus getStatus(Path p) throws IOException {
2369    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2370  }
2371
2372  /**
2373   * Set permission of a path.
2374   * @param p
2375   * @param permission
2376   */
2377  public void setPermission(Path p, FsPermission permission
2378      ) throws IOException {
2379  }
2380
2381  /**
2382   * Set owner of a path (i.e. a file or a directory).
2383   * The parameters username and groupname cannot both be null.
2384   * @param p The path
2385   * @param username If it is null, the original username remains unchanged.
2386   * @param groupname If it is null, the original groupname remains unchanged.
2387   */
2388  public void setOwner(Path p, String username, String groupname
2389      ) throws IOException {
2390  }
2391
2392  /**
2393   * Set access time of a file
2394   * @param p The path
2395   * @param mtime Set the modification time of this file.
2396   *              The number of milliseconds since Jan 1, 1970. 
2397   *              A value of -1 means that this call should not set modification time.
2398   * @param atime Set the access time of this file.
2399   *              The number of milliseconds since Jan 1, 1970. 
2400   *              A value of -1 means that this call should not set access time.
2401   */
2402  public void setTimes(Path p, long mtime, long atime
2403      ) throws IOException {
2404  }
2405
2406  /**
2407   * Create a snapshot with a default name.
2408   * @param path The directory where snapshots will be taken.
2409   * @return the snapshot path.
2410   */
2411  public final Path createSnapshot(Path path) throws IOException {
2412    return createSnapshot(path, null);
2413  }
2414
2415  /**
2416   * Create a snapshot
2417   * @param path The directory where snapshots will be taken.
2418   * @param snapshotName The name of the snapshot
2419   * @return the snapshot path.
2420   */
2421  public Path createSnapshot(Path path, String snapshotName)
2422      throws IOException {
2423    throw new UnsupportedOperationException(getClass().getSimpleName()
2424        + " doesn't support createSnapshot");
2425  }
2426  
2427  /**
2428   * Rename a snapshot
2429   * @param path The directory path where the snapshot was taken
2430   * @param snapshotOldName Old name of the snapshot
2431   * @param snapshotNewName New name of the snapshot
2432   * @throws IOException
2433   */
2434  public void renameSnapshot(Path path, String snapshotOldName,
2435      String snapshotNewName) throws IOException {
2436    throw new UnsupportedOperationException(getClass().getSimpleName()
2437        + " doesn't support renameSnapshot");
2438  }
2439  
2440  /**
2441   * Delete a snapshot of a directory
2442   * @param path  The directory that the to-be-deleted snapshot belongs to
2443   * @param snapshotName The name of the snapshot
2444   */
2445  public void deleteSnapshot(Path path, String snapshotName)
2446      throws IOException {
2447    throw new UnsupportedOperationException(getClass().getSimpleName()
2448        + " doesn't support deleteSnapshot");
2449  }
2450  
2451  /**
2452   * Modifies ACL entries of files and directories.  This method can add new ACL
2453   * entries or modify the permissions on existing ACL entries.  All existing
2454   * ACL entries that are not specified in this call are retained without
2455   * changes.  (Modifications are merged into the current ACL.)
2456   *
2457   * @param path Path to modify
2458   * @param aclSpec List<AclEntry> describing modifications
2459   * @throws IOException if an ACL could not be modified
2460   */
2461  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2462      throws IOException {
2463    throw new UnsupportedOperationException(getClass().getSimpleName()
2464        + " doesn't support modifyAclEntries");
2465  }
2466
2467  /**
2468   * Removes ACL entries from files and directories.  Other ACL entries are
2469   * retained.
2470   *
2471   * @param path Path to modify
2472   * @param aclSpec List<AclEntry> describing entries to remove
2473   * @throws IOException if an ACL could not be modified
2474   */
2475  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2476      throws IOException {
2477    throw new UnsupportedOperationException(getClass().getSimpleName()
2478        + " doesn't support removeAclEntries");
2479  }
2480
2481  /**
2482   * Removes all default ACL entries from files and directories.
2483   *
2484   * @param path Path to modify
2485   * @throws IOException if an ACL could not be modified
2486   */
2487  public void removeDefaultAcl(Path path)
2488      throws IOException {
2489    throw new UnsupportedOperationException(getClass().getSimpleName()
2490        + " doesn't support removeDefaultAcl");
2491  }
2492
2493  /**
2494   * Removes all but the base ACL entries of files and directories.  The entries
2495   * for user, group, and others are retained for compatibility with permission
2496   * bits.
2497   *
2498   * @param path Path to modify
2499   * @throws IOException if an ACL could not be removed
2500   */
2501  public void removeAcl(Path path)
2502      throws IOException {
2503    throw new UnsupportedOperationException(getClass().getSimpleName()
2504        + " doesn't support removeAcl");
2505  }
2506
2507  /**
2508   * Fully replaces ACL of files and directories, discarding all existing
2509   * entries.
2510   *
2511   * @param path Path to modify
2512   * @param aclSpec List<AclEntry> describing modifications, must include entries
2513   *   for user, group, and others for compatibility with permission bits.
2514   * @throws IOException if an ACL could not be modified
2515   */
2516  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2517    throw new UnsupportedOperationException(getClass().getSimpleName()
2518        + " doesn't support setAcl");
2519  }
2520
2521  /**
2522   * Gets the ACL of a file or directory.
2523   *
2524   * @param path Path to get
2525   * @return AclStatus describing the ACL of the file or directory
2526   * @throws IOException if an ACL could not be read
2527   */
2528  public AclStatus getAclStatus(Path path) throws IOException {
2529    throw new UnsupportedOperationException(getClass().getSimpleName()
2530        + " doesn't support getAclStatus");
2531  }
2532
2533  /**
2534   * Set an xattr of a file or directory.
2535   * The name must be prefixed with the namespace followed by ".". For example,
2536   * "user.attr".
2537   * <p/>
2538   * Refer to the HDFS extended attributes user documentation for details.
2539   *
2540   * @param path Path to modify
2541   * @param name xattr name.
2542   * @param value xattr value.
2543   * @throws IOException
2544   */
2545  public void setXAttr(Path path, String name, byte[] value)
2546      throws IOException {
2547    setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2548        XAttrSetFlag.REPLACE));
2549  }
2550
2551  /**
2552   * Set an xattr of a file or directory.
2553   * The name must be prefixed with the namespace followed by ".". For example,
2554   * "user.attr".
2555   * <p/>
2556   * Refer to the HDFS extended attributes user documentation for details.
2557   *
2558   * @param path Path to modify
2559   * @param name xattr name.
2560   * @param value xattr value.
2561   * @param flag xattr set flag
2562   * @throws IOException
2563   */
2564  public void setXAttr(Path path, String name, byte[] value,
2565      EnumSet<XAttrSetFlag> flag) throws IOException {
2566    throw new UnsupportedOperationException(getClass().getSimpleName()
2567        + " doesn't support setXAttr");
2568  }
2569
2570  /**
2571   * Get an xattr name and value for a file or directory.
2572   * The name must be prefixed with the namespace followed by ".". For example,
2573   * "user.attr".
2574   * <p/>
2575   * Refer to the HDFS extended attributes user documentation for details.
2576   *
2577   * @param path Path to get extended attribute
2578   * @param name xattr name.
2579   * @return byte[] xattr value.
2580   * @throws IOException
2581   */
2582  public byte[] getXAttr(Path path, String name) throws IOException {
2583    throw new UnsupportedOperationException(getClass().getSimpleName()
2584        + " doesn't support getXAttr");
2585  }
2586
2587  /**
2588   * Get all of the xattr name/value pairs for a file or directory.
2589   * Only those xattrs which the logged-in user has permissions to view
2590   * are returned.
2591   * <p/>
2592   * Refer to the HDFS extended attributes user documentation for details.
2593   *
2594   * @param path Path to get extended attributes
2595   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2596   * @throws IOException
2597   */
2598  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2599    throw new UnsupportedOperationException(getClass().getSimpleName()
2600        + " doesn't support getXAttrs");
2601  }
2602
2603  /**
2604   * Get all of the xattrs name/value pairs for a file or directory.
2605   * Only those xattrs which the logged-in user has permissions to view
2606   * are returned.
2607   * <p/>
2608   * Refer to the HDFS extended attributes user documentation for details.
2609   *
2610   * @param path Path to get extended attributes
2611   * @param names XAttr names.
2612   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2613   * @throws IOException
2614   */
2615  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2616      throws IOException {
2617    throw new UnsupportedOperationException(getClass().getSimpleName()
2618        + " doesn't support getXAttrs");
2619  }
2620
2621  /**
2622   * Get all of the xattr names for a file or directory.
2623   * Only those xattr names which the logged-in user has permissions to view
2624   * are returned.
2625   * <p/>
2626   * Refer to the HDFS extended attributes user documentation for details.
2627   *
2628   * @param path Path to get extended attributes
2629   * @return List<String> of the XAttr names of the file or directory
2630   * @throws IOException
2631   */
2632  public List<String> listXAttrs(Path path) throws IOException {
2633    throw new UnsupportedOperationException(getClass().getSimpleName()
2634            + " doesn't support listXAttrs");
2635  }
2636
2637  /**
2638   * Remove an xattr of a file or directory.
2639   * The name must be prefixed with the namespace followed by ".". For example,
2640   * "user.attr".
2641   * <p/>
2642   * Refer to the HDFS extended attributes user documentation for details.
2643   *
2644   * @param path Path to remove extended attribute
2645   * @param name xattr name
2646   * @throws IOException
2647   */
2648  public void removeXAttr(Path path, String name) throws IOException {
2649    throw new UnsupportedOperationException(getClass().getSimpleName()
2650        + " doesn't support removeXAttr");
2651  }
2652
2653  /**
2654   * Set the storage policy for a given file or directory.
2655   *
2656   * @param src file or directory path.
2657   * @param policyName the name of the target storage policy. The list
2658   *                   of supported Storage policies can be retrieved
2659   *                   via {@link #getAllStoragePolicies}.
2660   * @throws IOException
2661   */
2662  public void setStoragePolicy(final Path src, final String policyName)
2663      throws IOException {
2664    throw new UnsupportedOperationException(getClass().getSimpleName()
2665        + " doesn't support setStoragePolicy");
2666  }
2667
2668  /**
2669   * Unset the storage policy set for a given file or directory.
2670   * @param src file or directory path.
2671   * @throws IOException
2672   */
2673  public void unsetStoragePolicy(final Path src) throws IOException {
2674    throw new UnsupportedOperationException(getClass().getSimpleName()
2675        + " doesn't support unsetStoragePolicy");
2676  }
2677
2678  /**
2679   * Query the effective storage policy ID for the given file or directory.
2680   *
2681   * @param src file or directory path.
2682   * @return storage policy for give file.
2683   * @throws IOException
2684   */
2685  public BlockStoragePolicySpi getStoragePolicy(final Path src)
2686      throws IOException {
2687    throw new UnsupportedOperationException(getClass().getSimpleName()
2688        + " doesn't support getStoragePolicy");
2689  }
2690
2691  /**
2692   * Retrieve all the storage policies supported by this file system.
2693   *
2694   * @return all storage policies supported by this filesystem.
2695   * @throws IOException
2696   */
2697  public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
2698      throws IOException {
2699    throw new UnsupportedOperationException(getClass().getSimpleName()
2700        + " doesn't support getAllStoragePolicies");
2701  }
2702
2703  /**
2704   * Get the root directory of Trash for current user when the path specified
2705   * is deleted.
2706   *
2707   * @param path the trash root of the path to be determined.
2708   * @return the default implementation returns "/user/$USER/.Trash".
2709   */
2710  public Path getTrashRoot(Path path) {
2711    return this.makeQualified(new Path(getHomeDirectory().toUri().getPath(),
2712        TRASH_PREFIX));
2713  }
2714
2715  /**
2716   * Get all the trash roots for current user or all users.
2717   *
2718   * @param allUsers return trash roots for all users if true.
2719   * @return all the trash root directories.
2720   *         Default FileSystem returns .Trash under users' home directories if
2721   *         /user/$USER/.Trash exists.
2722   */
2723  public Collection<FileStatus> getTrashRoots(boolean allUsers) {
2724    Path userHome = new Path(getHomeDirectory().toUri().getPath());
2725    List<FileStatus> ret = new ArrayList<>();
2726    try {
2727      if (!allUsers) {
2728        Path userTrash = new Path(userHome, TRASH_PREFIX);
2729        if (exists(userTrash)) {
2730          ret.add(getFileStatus(userTrash));
2731        }
2732      } else {
2733        Path homeParent = userHome.getParent();
2734        if (exists(homeParent)) {
2735          FileStatus[] candidates = listStatus(homeParent);
2736          for (FileStatus candidate : candidates) {
2737            Path userTrash = new Path(candidate.getPath(), TRASH_PREFIX);
2738            if (exists(userTrash)) {
2739              candidate.setPath(userTrash);
2740              ret.add(candidate);
2741            }
2742          }
2743        }
2744      }
2745    } catch (IOException e) {
2746      LOG.warn("Cannot get all trash roots", e);
2747    }
2748    return ret;
2749  }
2750
2751  // making it volatile to be able to do a double checked locking
2752  private volatile static boolean FILE_SYSTEMS_LOADED = false;
2753
2754  private static final Map<String, Class<? extends FileSystem>>
2755    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2756
2757  private static void loadFileSystems() {
2758    synchronized (FileSystem.class) {
2759      if (!FILE_SYSTEMS_LOADED) {
2760        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2761        Iterator<FileSystem> it = serviceLoader.iterator();
2762        while (it.hasNext()) {
2763          FileSystem fs = null;
2764          try {
2765            fs = it.next();
2766            try {
2767              SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2768            } catch (Exception e) {
2769              LOG.warn("Cannot load: " + fs + " from " +
2770                  ClassUtil.findContainingJar(fs.getClass()), e);
2771            }
2772          } catch (ServiceConfigurationError ee) {
2773            LOG.warn("Cannot load filesystem", ee);
2774          }
2775        }
2776        FILE_SYSTEMS_LOADED = true;
2777      }
2778    }
2779  }
2780
2781  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2782      Configuration conf) throws IOException {
2783    if (!FILE_SYSTEMS_LOADED) {
2784      loadFileSystems();
2785    }
2786    Class<? extends FileSystem> clazz = null;
2787    if (conf != null) {
2788      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2789    }
2790    if (clazz == null) {
2791      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2792    }
2793    if (clazz == null) {
2794      throw new IOException("No FileSystem for scheme: " + scheme);
2795    }
2796    return clazz;
2797  }
2798
2799  private static FileSystem createFileSystem(URI uri, Configuration conf
2800      ) throws IOException {
2801    Tracer tracer = FsTracer.get(conf);
2802    TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
2803    scope.addKVAnnotation("scheme", uri.getScheme());
2804    try {
2805      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2806      FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2807      fs.initialize(uri, conf);
2808      return fs;
2809    } finally {
2810      scope.close();
2811    }
2812  }
2813
2814  /** Caching FileSystem objects */
2815  static class Cache {
2816    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2817
2818    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2819    private final Set<Key> toAutoClose = new HashSet<Key>();
2820
2821    /** A variable that makes all objects in the cache unique */
2822    private static AtomicLong unique = new AtomicLong(1);
2823
2824    FileSystem get(URI uri, Configuration conf) throws IOException{
2825      Key key = new Key(uri, conf);
2826      return getInternal(uri, conf, key);
2827    }
2828
2829    /** The objects inserted into the cache using this method are all unique */
2830    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2831      Key key = new Key(uri, conf, unique.getAndIncrement());
2832      return getInternal(uri, conf, key);
2833    }
2834
2835    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2836      FileSystem fs;
2837      synchronized (this) {
2838        fs = map.get(key);
2839      }
2840      if (fs != null) {
2841        return fs;
2842      }
2843
2844      fs = createFileSystem(uri, conf);
2845      synchronized (this) { // refetch the lock again
2846        FileSystem oldfs = map.get(key);
2847        if (oldfs != null) { // a file system is created while lock is releasing
2848          fs.close(); // close the new file system
2849          return oldfs;  // return the old file system
2850        }
2851        
2852        // now insert the new file system into the map
2853        if (map.isEmpty()
2854                && !ShutdownHookManager.get().isShutdownInProgress()) {
2855          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2856        }
2857        fs.key = key;
2858        map.put(key, fs);
2859        if (conf.getBoolean("fs.automatic.close", true)) {
2860          toAutoClose.add(key);
2861        }
2862        return fs;
2863      }
2864    }
2865
2866    synchronized void remove(Key key, FileSystem fs) {
2867      FileSystem cachedFs = map.remove(key);
2868      if (fs == cachedFs) {
2869        toAutoClose.remove(key);
2870      } else if (cachedFs != null) {
2871        map.put(key, cachedFs);
2872      }
2873    }
2874
2875    synchronized void closeAll() throws IOException {
2876      closeAll(false);
2877    }
2878
2879    /**
2880     * Close all FileSystem instances in the Cache.
2881     * @param onlyAutomatic only close those that are marked for automatic closing
2882     */
2883    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2884      List<IOException> exceptions = new ArrayList<IOException>();
2885
2886      // Make a copy of the keys in the map since we'll be modifying
2887      // the map while iterating over it, which isn't safe.
2888      List<Key> keys = new ArrayList<Key>();
2889      keys.addAll(map.keySet());
2890
2891      for (Key key : keys) {
2892        final FileSystem fs = map.get(key);
2893
2894        if (onlyAutomatic && !toAutoClose.contains(key)) {
2895          continue;
2896        }
2897
2898        //remove from cache
2899        map.remove(key);
2900        toAutoClose.remove(key);
2901
2902        if (fs != null) {
2903          try {
2904            fs.close();
2905          }
2906          catch(IOException ioe) {
2907            exceptions.add(ioe);
2908          }
2909        }
2910      }
2911
2912      if (!exceptions.isEmpty()) {
2913        throw MultipleIOException.createIOException(exceptions);
2914      }
2915    }
2916
2917    private class ClientFinalizer implements Runnable {
2918      @Override
2919      public synchronized void run() {
2920        try {
2921          closeAll(true);
2922        } catch (IOException e) {
2923          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2924        }
2925      }
2926    }
2927
2928    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2929      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2930      //Make a pass over the list and collect the filesystems to close
2931      //we cannot close inline since close() removes the entry from the Map
2932      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2933        final Key key = entry.getKey();
2934        final FileSystem fs = entry.getValue();
2935        if (ugi.equals(key.ugi) && fs != null) {
2936          targetFSList.add(fs);   
2937        }
2938      }
2939      List<IOException> exceptions = new ArrayList<IOException>();
2940      //now make a pass over the target list and close each
2941      for (FileSystem fs : targetFSList) {
2942        try {
2943          fs.close();
2944        }
2945        catch(IOException ioe) {
2946          exceptions.add(ioe);
2947        }
2948      }
2949      if (!exceptions.isEmpty()) {
2950        throw MultipleIOException.createIOException(exceptions);
2951      }
2952    }
2953
2954    /** FileSystem.Cache.Key */
2955    static class Key {
2956      final String scheme;
2957      final String authority;
2958      final UserGroupInformation ugi;
2959      final long unique;   // an artificial way to make a key unique
2960
2961      Key(URI uri, Configuration conf) throws IOException {
2962        this(uri, conf, 0);
2963      }
2964
2965      Key(URI uri, Configuration conf, long unique) throws IOException {
2966        scheme = uri.getScheme()==null ?
2967            "" : StringUtils.toLowerCase(uri.getScheme());
2968        authority = uri.getAuthority()==null ?
2969            "" : StringUtils.toLowerCase(uri.getAuthority());
2970        this.unique = unique;
2971        
2972        this.ugi = UserGroupInformation.getCurrentUser();
2973      }
2974
2975      @Override
2976      public int hashCode() {
2977        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2978      }
2979
2980      static boolean isEqual(Object a, Object b) {
2981        return a == b || (a != null && a.equals(b));        
2982      }
2983
2984      @Override
2985      public boolean equals(Object obj) {
2986        if (obj == this) {
2987          return true;
2988        }
2989        if (obj != null && obj instanceof Key) {
2990          Key that = (Key)obj;
2991          return isEqual(this.scheme, that.scheme)
2992                 && isEqual(this.authority, that.authority)
2993                 && isEqual(this.ugi, that.ugi)
2994                 && (this.unique == that.unique);
2995        }
2996        return false;        
2997      }
2998
2999      @Override
3000      public String toString() {
3001        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
3002      }
3003    }
3004  }
3005  
3006  /**
3007   * Tracks statistics about how many reads, writes, and so forth have been
3008   * done in a FileSystem.
3009   * 
3010   * Since there is only one of these objects per FileSystem, there will 
3011   * typically be many threads writing to this object.  Almost every operation
3012   * on an open file will involve a write to this object.  In contrast, reading
3013   * statistics is done infrequently by most programs, and not at all by others.
3014   * Hence, this is optimized for writes.
3015   * 
3016   * Each thread writes to its own thread-local area of memory.  This removes 
3017   * contention and allows us to scale up to many, many threads.  To read
3018   * statistics, the reader thread totals up the contents of all of the 
3019   * thread-local data areas.
3020   */
3021  public static final class Statistics {
3022    /**
3023     * Statistics data.
3024     * 
3025     * There is only a single writer to thread-local StatisticsData objects.
3026     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
3027     * to prevent lost updates.
3028     * The Java specification guarantees that updates to volatile longs will
3029     * be perceived as atomic with respect to other threads, which is all we
3030     * need.
3031     */
3032    public static class StatisticsData {
3033      private volatile long bytesRead;
3034      private volatile long bytesWritten;
3035      private volatile int readOps;
3036      private volatile int largeReadOps;
3037      private volatile int writeOps;
3038      private volatile long bytesReadLocalHost;
3039      private volatile long bytesReadDistanceOfOneOrTwo;
3040      private volatile long bytesReadDistanceOfThreeOrFour;
3041      private volatile long bytesReadDistanceOfFiveOrLarger;
3042
3043      /**
3044       * Add another StatisticsData object to this one.
3045       */
3046      void add(StatisticsData other) {
3047        this.bytesRead += other.bytesRead;
3048        this.bytesWritten += other.bytesWritten;
3049        this.readOps += other.readOps;
3050        this.largeReadOps += other.largeReadOps;
3051        this.writeOps += other.writeOps;
3052        this.bytesReadLocalHost += other.bytesReadLocalHost;
3053        this.bytesReadDistanceOfOneOrTwo += other.bytesReadDistanceOfOneOrTwo;
3054        this.bytesReadDistanceOfThreeOrFour +=
3055            other.bytesReadDistanceOfThreeOrFour;
3056        this.bytesReadDistanceOfFiveOrLarger +=
3057            other.bytesReadDistanceOfFiveOrLarger;
3058      }
3059
3060      /**
3061       * Negate the values of all statistics.
3062       */
3063      void negate() {
3064        this.bytesRead = -this.bytesRead;
3065        this.bytesWritten = -this.bytesWritten;
3066        this.readOps = -this.readOps;
3067        this.largeReadOps = -this.largeReadOps;
3068        this.writeOps = -this.writeOps;
3069        this.bytesReadLocalHost = -this.bytesReadLocalHost;
3070        this.bytesReadDistanceOfOneOrTwo = -this.bytesReadDistanceOfOneOrTwo;
3071        this.bytesReadDistanceOfThreeOrFour =
3072            -this.bytesReadDistanceOfThreeOrFour;
3073        this.bytesReadDistanceOfFiveOrLarger =
3074            -this.bytesReadDistanceOfFiveOrLarger;
3075      }
3076
3077      @Override
3078      public String toString() {
3079        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
3080            + readOps + " read ops, " + largeReadOps + " large read ops, "
3081            + writeOps + " write ops";
3082      }
3083      
3084      public long getBytesRead() {
3085        return bytesRead;
3086      }
3087      
3088      public long getBytesWritten() {
3089        return bytesWritten;
3090      }
3091      
3092      public int getReadOps() {
3093        return readOps;
3094      }
3095      
3096      public int getLargeReadOps() {
3097        return largeReadOps;
3098      }
3099      
3100      public int getWriteOps() {
3101        return writeOps;
3102      }
3103
3104      public long getBytesReadLocalHost() {
3105        return bytesReadLocalHost;
3106      }
3107
3108      public long getBytesReadDistanceOfOneOrTwo() {
3109        return bytesReadDistanceOfOneOrTwo;
3110      }
3111
3112      public long getBytesReadDistanceOfThreeOrFour() {
3113        return bytesReadDistanceOfThreeOrFour;
3114      }
3115
3116      public long getBytesReadDistanceOfFiveOrLarger() {
3117        return bytesReadDistanceOfFiveOrLarger;
3118      }
3119    }
3120
    /**
     * Visitor over StatisticsData objects that produces a result of type T.
     * visitAll() calls {@link #accept} once for rootData and once for each
     * per-thread data object, then returns the value of {@link #aggregate}.
     */
    private interface StatisticsAggregator<T> {
      /** Called once for each StatisticsData object being visited. */
      void accept(StatisticsData data);
      /** Called after all accept() calls; its value is the visit's result. */
      T aggregate();
    }
3125
    /**
     * The scheme given to the constructor, or copied from the source object
     * by the copy constructor.
     */
    private final String scheme;

    /**
     * rootData is data that doesn't belong to any thread, but will be added
     * to the totals.  This is useful for making copies of Statistics objects,
     * and for storing data that pertains to threads that have been garbage
     * collected.  Protected by the Statistics lock.
     */
    private final StatisticsData rootData;

    /**
     * Thread-local data.  Holds no value for a thread until that thread
     * first calls getThreadStatistics().
     */
    private final ThreadLocal<StatisticsData> threadData;

    /**
     * Set of all thread-local data areas.  Protected by the Statistics lock.
     * The references to the statistics data are kept using weak references
     * to the associated threads. Proper clean-up is performed by the cleaner
     * thread when the threads are garbage collected.
     */
    private final Set<StatisticsDataReference> allData;

    /**
     * Global reference queue and a cleaner thread that manage statistics data
     * references from all filesystem instances.  Both are assigned in the
     * static initializer.
     */
    private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
    private static final Thread STATS_DATA_CLEANER;
3155
3156    static {
3157      STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
3158      // start a single daemon cleaner thread
3159      STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
3160      STATS_DATA_CLEANER.
3161          setName(StatisticsDataReferenceCleaner.class.getName());
3162      STATS_DATA_CLEANER.setDaemon(true);
3163      STATS_DATA_CLEANER.start();
3164    }
3165
3166    public Statistics(String scheme) {
3167      this.scheme = scheme;
3168      this.rootData = new StatisticsData();
3169      this.threadData = new ThreadLocal<StatisticsData>();
3170      this.allData = new HashSet<StatisticsDataReference>();
3171    }
3172
3173    /**
3174     * Copy constructor.
3175     * 
3176     * @param other    The input Statistics object which is cloned.
3177     */
3178    public Statistics(Statistics other) {
3179      this.scheme = other.scheme;
3180      this.rootData = new StatisticsData();
3181      other.visitAll(new StatisticsAggregator<Void>() {
3182        @Override
3183        public void accept(StatisticsData data) {
3184          rootData.add(data);
3185        }
3186
3187        public Void aggregate() {
3188          return null;
3189        }
3190      });
3191      this.threadData = new ThreadLocal<StatisticsData>();
3192      this.allData = new HashSet<StatisticsDataReference>();
3193    }
3194
3195    /**
3196     * A weak reference to a thread that also includes the data associated
3197     * with that thread. On the thread being garbage collected, it is enqueued
3198     * to the reference queue for clean-up.
3199     */
3200    private class StatisticsDataReference extends WeakReference<Thread> {
3201      private final StatisticsData data;
3202
3203      public StatisticsDataReference(StatisticsData data, Thread thread) {
3204        super(thread, STATS_DATA_REF_QUEUE);
3205        this.data = data;
3206      }
3207
3208      public StatisticsData getData() {
3209        return data;
3210      }
3211
3212      /**
3213       * Performs clean-up action when the associated thread is garbage
3214       * collected.
3215       */
3216      public void cleanUp() {
3217        // use the statistics lock for safety
3218        synchronized (Statistics.this) {
3219          /*
3220           * If the thread that created this thread-local data no longer exists,
3221           * remove the StatisticsData from our list and fold the values into
3222           * rootData.
3223           */
3224          rootData.add(data);
3225          allData.remove(this);
3226        }
3227      }
3228    }
3229
3230    /**
3231     * Background action to act on references being removed.
3232     */
3233    private static class StatisticsDataReferenceCleaner implements Runnable {
3234      @Override
3235      public void run() {
3236        while (!Thread.interrupted()) {
3237          try {
3238            StatisticsDataReference ref =
3239                (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
3240            ref.cleanUp();
3241          } catch (InterruptedException ie) {
3242            LOG.warn("Cleaner thread interrupted, will stop", ie);
3243            Thread.currentThread().interrupt();
3244          } catch (Throwable th) {
3245            LOG.warn("Exception in the cleaner thread but it will continue to "
3246                + "run", th);
3247          }
3248        }
3249      }
3250    }
3251
3252    /**
3253     * Get or create the thread-local data associated with the current thread.
3254     */
3255    public StatisticsData getThreadStatistics() {
3256      StatisticsData data = threadData.get();
3257      if (data == null) {
3258        data = new StatisticsData();
3259        threadData.set(data);
3260        StatisticsDataReference ref =
3261            new StatisticsDataReference(data, Thread.currentThread());
3262        synchronized(this) {
3263          allData.add(ref);
3264        }
3265      }
3266      return data;
3267    }
3268
3269    /**
3270     * Increment the bytes read in the statistics
3271     * @param newBytes the additional bytes read
3272     */
3273    public void incrementBytesRead(long newBytes) {
3274      getThreadStatistics().bytesRead += newBytes;
3275    }
3276    
3277    /**
3278     * Increment the bytes written in the statistics
3279     * @param newBytes the additional bytes written
3280     */
3281    public void incrementBytesWritten(long newBytes) {
3282      getThreadStatistics().bytesWritten += newBytes;
3283    }
3284    
3285    /**
3286     * Increment the number of read operations
3287     * @param count number of read operations
3288     */
3289    public void incrementReadOps(int count) {
3290      getThreadStatistics().readOps += count;
3291    }
3292
3293    /**
3294     * Increment the number of large read operations
3295     * @param count number of large read operations
3296     */
3297    public void incrementLargeReadOps(int count) {
3298      getThreadStatistics().largeReadOps += count;
3299    }
3300
3301    /**
3302     * Increment the number of write operations
3303     * @param count number of write operations
3304     */
3305    public void incrementWriteOps(int count) {
3306      getThreadStatistics().writeOps += count;
3307    }
3308
3309    /**
3310     * Increment the bytes read by the network distance in the statistics
3311     * In the common network topology setup, distance value should be an even
3312     * number such as 0, 2, 4, 6. To make it more general, we group distance
3313     * by {1, 2}, {3, 4} and {5 and beyond} for accounting.
3314     * @param distance the network distance
3315     * @param newBytes the additional bytes read
3316     */
3317    public void incrementBytesReadByDistance(int distance, long newBytes) {
3318      switch (distance) {
3319      case 0:
3320        getThreadStatistics().bytesReadLocalHost += newBytes;
3321        break;
3322      case 1:
3323      case 2:
3324        getThreadStatistics().bytesReadDistanceOfOneOrTwo += newBytes;
3325        break;
3326      case 3:
3327      case 4:
3328        getThreadStatistics().bytesReadDistanceOfThreeOrFour += newBytes;
3329        break;
3330      default:
3331        getThreadStatistics().bytesReadDistanceOfFiveOrLarger += newBytes;
3332        break;
3333      }
3334    }
3335
3336    /**
3337     * Apply the given aggregator to all StatisticsData objects associated with
3338     * this Statistics object.
3339     *
3340     * For each StatisticsData object, we will call accept on the visitor.
3341     * Finally, at the end, we will call aggregate to get the final total. 
3342     *
     * @param visitor The aggregator to apply to each StatisticsData object.
3344     * @return        The total.
3345     */
3346    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3347      visitor.accept(rootData);
3348      for (StatisticsDataReference ref: allData) {
3349        StatisticsData data = ref.getData();
3350        visitor.accept(data);
3351      }
3352      return visitor.aggregate();
3353    }
3354
3355    /**
3356     * Get the total number of bytes read
3357     * @return the number of bytes
3358     */
3359    public long getBytesRead() {
3360      return visitAll(new StatisticsAggregator<Long>() {
3361        private long bytesRead = 0;
3362
3363        @Override
3364        public void accept(StatisticsData data) {
3365          bytesRead += data.bytesRead;
3366        }
3367
3368        public Long aggregate() {
3369          return bytesRead;
3370        }
3371      });
3372    }
3373    
3374    /**
3375     * Get the total number of bytes written
3376     * @return the number of bytes
3377     */
3378    public long getBytesWritten() {
3379      return visitAll(new StatisticsAggregator<Long>() {
3380        private long bytesWritten = 0;
3381
3382        @Override
3383        public void accept(StatisticsData data) {
3384          bytesWritten += data.bytesWritten;
3385        }
3386
3387        public Long aggregate() {
3388          return bytesWritten;
3389        }
3390      });
3391    }
3392    
3393    /**
     * Get the number of file system read operations such as list files.
     * The returned count also includes large read operations.
     * @return number of read operations
3396     */
3397    public int getReadOps() {
3398      return visitAll(new StatisticsAggregator<Integer>() {
3399        private int readOps = 0;
3400
3401        @Override
3402        public void accept(StatisticsData data) {
3403          readOps += data.readOps;
3404          readOps += data.largeReadOps;
3405        }
3406
3407        public Integer aggregate() {
3408          return readOps;
3409        }
3410      });
3411    }
3412
3413    /**
3414     * Get the number of large file system read operations such as list files
3415     * under a large directory
3416     * @return number of large read operations
3417     */
3418    public int getLargeReadOps() {
3419      return visitAll(new StatisticsAggregator<Integer>() {
3420        private int largeReadOps = 0;
3421
3422        @Override
3423        public void accept(StatisticsData data) {
3424          largeReadOps += data.largeReadOps;
3425        }
3426
3427        public Integer aggregate() {
3428          return largeReadOps;
3429        }
3430      });
3431    }
3432
3433    /**
     * Get the number of file system write operations such as create, append,
     * rename etc.
3436     * @return number of write operations
3437     */
3438    public int getWriteOps() {
3439      return visitAll(new StatisticsAggregator<Integer>() {
3440        private int writeOps = 0;
3441
3442        @Override
3443        public void accept(StatisticsData data) {
3444          writeOps += data.writeOps;
3445        }
3446
3447        public Integer aggregate() {
3448          return writeOps;
3449        }
3450      });
3451    }
3452
3453    /**
3454     * In the common network topology setup, distance value should be an even
3455     * number such as 0, 2, 4, 6. To make it more general, we group distance
     * by {1, 2}, {3, 4} and {5 and beyond} for accounting. So if the caller
     * asks for bytes read for distance 2, the function will return the value
3458     * for group {1, 2}.
3459     * @param distance the network distance
3460     * @return the total number of bytes read by the network distance
3461     */
3462    public long getBytesReadByDistance(int distance) {
3463      long bytesRead;
3464      switch (distance) {
3465      case 0:
3466        bytesRead = getData().getBytesReadLocalHost();
3467        break;
3468      case 1:
3469      case 2:
3470        bytesRead = getData().getBytesReadDistanceOfOneOrTwo();
3471        break;
3472      case 3:
3473      case 4:
3474        bytesRead = getData().getBytesReadDistanceOfThreeOrFour();
3475        break;
3476      default:
3477        bytesRead = getData().getBytesReadDistanceOfFiveOrLarger();
3478        break;
3479      }
3480      return bytesRead;
3481    }
3482
3483    /**
3484     * Get all statistics data
3485     * MR or other frameworks can use the method to get all statistics at once.
3486     * @return the StatisticsData
3487     */
3488    public StatisticsData getData() {
3489      return visitAll(new StatisticsAggregator<StatisticsData>() {
3490        private StatisticsData all = new StatisticsData();
3491
3492        @Override
3493        public void accept(StatisticsData data) {
3494          all.add(data);
3495        }
3496
3497        public StatisticsData aggregate() {
3498          return all;
3499        }
3500      });
3501    }
3502
3503    @Override
3504    public String toString() {
3505      return visitAll(new StatisticsAggregator<String>() {
3506        private StatisticsData total = new StatisticsData();
3507
3508        @Override
3509        public void accept(StatisticsData data) {
3510          total.add(data);
3511        }
3512
3513        public String aggregate() {
3514          return total.toString();
3515        }
3516      });
3517    }
3518
3519    /**
3520     * Resets all statistics to 0.
3521     *
3522     * In order to reset, we add up all the thread-local statistics data, and
3523     * set rootData to the negative of that.
3524     *
     * This may seem like a counterintuitive way to reset the statistics.  Why
3526     * can't we just zero out all the thread-local data?  Well, thread-local
3527     * data can only be modified by the thread that owns it.  If we tried to
3528     * modify the thread-local data from this thread, our modification might get
3529     * interleaved with a read-modify-write operation done by the thread that
3530     * owns the data.  That would result in our update getting lost.
3531     *
3532     * The approach used here avoids this problem because it only ever reads
3533     * (not writes) the thread-local data.  Both reads and writes to rootData
3534     * are done under the lock, so we're free to modify rootData from any thread
3535     * that holds the lock.
3536     */
3537    public void reset() {
3538      visitAll(new StatisticsAggregator<Void>() {
3539        private StatisticsData total = new StatisticsData();
3540
3541        @Override
3542        public void accept(StatisticsData data) {
3543          total.add(data);
3544        }
3545
3546        public Void aggregate() {
3547          total.negate();
3548          rootData.add(total);
3549          return null;
3550        }
3551      });
3552    }
3553    
3554    /**
3555     * Get the uri scheme associated with this statistics object.
     * @return the scheme associated with this set of statistics
3557     */
    public String getScheme() {
      // Plain accessor: hands back the scheme field as-is. The method is not
      // synchronized and does no other work.
      return scheme;
    }
3561
3562    @VisibleForTesting
    synchronized int getAllThreadLocalDataSize() {
      // Synchronized on this object, the same monitor visitAll() holds while
      // it iterates allData, so the size is not read mid-traversal.
      return allData.size();
    }
3566  }
3567  
3568  /**
3569   * Get the Map of Statistics object indexed by URI Scheme.
3570   * @return a Map having a key as URI scheme and value as Statistics object
3571   * @deprecated use {@link #getGlobalStorageStatistics()}
3572   */
3573  @Deprecated
3574  public static synchronized Map<String, Statistics> getStatistics() {
3575    Map<String, Statistics> result = new HashMap<String, Statistics>();
3576    for(Statistics stat: statisticsTable.values()) {
3577      result.put(stat.getScheme(), stat);
3578    }
3579    return result;
3580  }
3581
3582  /**
   * Return the Statistics objects of all FileSystem classes that have
   * Statistics.
   * @return a new list holding every registered Statistics object
3584   * @deprecated use {@link #getGlobalStorageStatistics()}
3585   */
3586  @Deprecated
3587  public static synchronized List<Statistics> getAllStatistics() {
3588    return new ArrayList<Statistics>(statisticsTable.values());
3589  }
3590  
3591  /**
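   * Get the statistics for a particular file system, creating and
   * registering them on first use.
   * @param scheme the URI scheme of the file system; must not be null
   * @param cls the class to look up
   * @return a statistics object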
3595   * @deprecated use {@link #getGlobalStorageStatistics()}
3596   */
3597  @Deprecated
3598  public static synchronized Statistics getStatistics(final String scheme,
3599      Class<? extends FileSystem> cls) {
3600    checkArgument(scheme != null,
3601        "No statistics is allowed for a file system with null scheme!");
3602    Statistics result = statisticsTable.get(cls);
3603    if (result == null) {
3604      final Statistics newStats = new Statistics(scheme);
3605      statisticsTable.put(cls, newStats);
3606      result = newStats;
3607      GlobalStorageStatistics.INSTANCE.put(scheme,
3608          new StorageStatisticsProvider() {
3609            @Override
3610            public StorageStatistics provide() {
3611              return new FileSystemStorageStatistics(scheme, newStats);
3612            }
3613          });
3614    }
3615    return result;
3616  }
3617  
3618  /**
3619   * Reset all statistics for all file systems
3620   */
3621  public static synchronized void clearStatistics() {
3622    for(Statistics stat: statisticsTable.values()) {
3623      stat.reset();
3624    }
3625  }
3626
3627  /**
3628   * Print all statistics for all file systems
3629   */
3630  public static synchronized
3631  void printStatistics() throws IOException {
3632    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3633            statisticsTable.entrySet()) {
3634      System.out.println("  FileSystem " + pair.getKey().getName() + 
3635                         ": " + pair.getValue());
3636    }
3637  }
3638
  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
  // This static flag is the value areSymlinksEnabled() returns and the one
  // enableSymlinks() sets to true.
  private static boolean symlinksEnabled = false;

  // NOTE(review): no read or write of this field is visible in this part of
  // the file; confirm where it is assigned before relying on it being
  // non-null.
  private static Configuration conf = null;
3643
3644  @VisibleForTesting
  public static boolean areSymlinksEnabled() {
    // Reports the class-wide flag. It is initialized to false, and the only
    // writer visible in this part of the file is enableSymlinks().
    return symlinksEnabled;
  }
3648
3649  @VisibleForTesting
  public static void enableSymlinks() {
    // Sets the static flag to true. No counterpart that switches it back
    // off is visible in this part of the file.
    symlinksEnabled = true;
  }
3653
3654  /**
3655   * Get the StorageStatistics for this FileSystem object.  These statistics are
3656   * per-instance.  They are not shared with any other FileSystem object.
3657   *
3658   * <p>This is a default method which is intended to be overridden by
3659   * subclasses. The default implementation returns an empty storage statistics
3660   * object.</p>
3661   *
3662   * @return    The StorageStatistics for this FileSystem instance.
3663   *            Will never be null.
3664   */
3665  public StorageStatistics getStorageStatistics() {
3666    return new EmptyStorageStatistics(getUri().toString());
3667  }
3668
3669  /**
3670   * Get the global storage statistics.
3671   */
  public static GlobalStorageStatistics getGlobalStorageStatistics() {
    // Returns the shared GlobalStorageStatistics.INSTANCE, the same object
    // that getStatistics(String, Class) registers its providers with.
    return GlobalStorageStatistics.INSTANCE;
  }
3675}