001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.ref.PhantomReference;
024import java.lang.ref.ReferenceQueue;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.security.PrivilegedExceptionAction;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.IdentityHashMap;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.ServiceLoader;
040import java.util.Set;
041import java.util.Stack;
042import java.util.TreeSet;
043import java.util.concurrent.atomic.AtomicLong;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.apache.hadoop.classification.InterfaceAudience;
048import org.apache.hadoop.classification.InterfaceStability;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.Options.ChecksumOpt;
052import org.apache.hadoop.fs.Options.Rename;
053import org.apache.hadoop.fs.permission.AclEntry;
054import org.apache.hadoop.fs.permission.AclStatus;
055import org.apache.hadoop.fs.permission.FsAction;
056import org.apache.hadoop.fs.permission.FsPermission;
057import org.apache.hadoop.io.MultipleIOException;
058import org.apache.hadoop.io.Text;
059import org.apache.hadoop.net.NetUtils;
060import org.apache.hadoop.security.AccessControlException;
061import org.apache.hadoop.security.Credentials;
062import org.apache.hadoop.security.SecurityUtil;
063import org.apache.hadoop.security.UserGroupInformation;
064import org.apache.hadoop.security.token.Token;
065import org.apache.hadoop.util.DataChecksum;
066import org.apache.hadoop.util.Progressable;
067import org.apache.hadoop.util.ReflectionUtils;
068import org.apache.hadoop.util.ShutdownHookManager;
069import org.apache.hadoop.util.StringUtils;
070import org.apache.htrace.Span;
071import org.apache.htrace.Trace;
072import org.apache.htrace.TraceScope;
073
074import com.google.common.annotations.VisibleForTesting;
075
076/****************************************************************
077 * An abstract base class for a fairly generic filesystem.  It
078 * may be implemented as a distributed filesystem, or as a "local"
079 * one that reflects the locally-connected disk.  The local version
080 * exists for small Hadoop instances and for testing.
081 *
082 * <p>
083 *
084 * All user code that may potentially use the Hadoop Distributed
085 * File System should be written to use a FileSystem object.  The
086 * Hadoop DFS is a multi-machine system that appears as a single
087 * disk.  It's useful because of its fault tolerance and potentially
088 * very large capacity.
089 * 
090 * <p>
091 * The local implementation is {@link LocalFileSystem} and distributed
092 * implementation is DistributedFileSystem.
093 *****************************************************************/
094@InterfaceAudience.Public
095@InterfaceStability.Stable
096public abstract class FileSystem extends Configured implements Closeable {
097  public static final String FS_DEFAULT_NAME_KEY = 
098                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
099  public static final String DEFAULT_FS = 
100                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
101
102  public static final Log LOG = LogFactory.getLog(FileSystem.class);
103
104  /**
105   * Priority of the FileSystem shutdown hook.
106   */
107  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
108
109  /** FileSystem cache */
110  static final Cache CACHE = new Cache();
111
112  /** The key this instance is stored under in the cache. */
113  private Cache.Key key;
114
115  /** Recording statistics per a FileSystem class */
116  private static final Map<Class<? extends FileSystem>, Statistics> 
117    statisticsTable =
118      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
119  
120  /**
121   * The statistics for this file system.
122   */
123  protected Statistics statistics;
124
125  /**
126   * A cache of files that should be deleted when filesystem is closed
127   * or the JVM is exited.
128   */
129  private Set<Path> deleteOnExit = new TreeSet<Path>();
130  
131  boolean resolveSymlinks;
132  /**
133   * This method adds a file system for testing so that we can find it later. It
134   * is only for testing.
135   * @param uri the uri to store it under
136   * @param conf the configuration to store it under
137   * @param fs the file system to store
138   * @throws IOException
139   */
140  static void addFileSystemForTesting(URI uri, Configuration conf,
141      FileSystem fs) throws IOException {
142    CACHE.map.put(new Cache.Key(uri, conf), fs);
143  }
144
145  /**
146   * Get a filesystem instance based on the uri, the passed
147   * configuration and the user
148   * @param uri of the filesystem
149   * @param conf the configuration to use
150   * @param user to perform the get as
151   * @return the filesystem instance
152   * @throws IOException
153   * @throws InterruptedException
154   */
155  public static FileSystem get(final URI uri, final Configuration conf,
156        final String user) throws IOException, InterruptedException {
157    String ticketCachePath =
158      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
159    UserGroupInformation ugi =
160        UserGroupInformation.getBestUGI(ticketCachePath, user);
161    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
162      @Override
163      public FileSystem run() throws IOException {
164        return get(uri, conf);
165      }
166    });
167  }
168
169  /**
170   * Returns the configured filesystem implementation.
171   * @param conf the configuration to use
172   */
173  public static FileSystem get(Configuration conf) throws IOException {
174    return get(getDefaultUri(conf), conf);
175  }
176  
177  /** Get the default filesystem URI from a configuration.
178   * @param conf the configuration to use
179   * @return the uri of the default filesystem
180   */
181  public static URI getDefaultUri(Configuration conf) {
182    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
183  }
184
185  /** Set the default filesystem URI in a configuration.
186   * @param conf the configuration to alter
187   * @param uri the new default filesystem uri
188   */
189  public static void setDefaultUri(Configuration conf, URI uri) {
190    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
191  }
192
193  /** Set the default filesystem URI in a configuration.
194   * @param conf the configuration to alter
195   * @param uri the new default filesystem uri
196   */
197  public static void setDefaultUri(Configuration conf, String uri) {
198    setDefaultUri(conf, URI.create(fixName(uri)));
199  }
200
201  /** Called after a new FileSystem instance is constructed.
202   * @param name a uri whose authority section names the host, port, etc.
203   *   for this FileSystem
204   * @param conf the configuration
205   */
206  public void initialize(URI name, Configuration conf) throws IOException {
207    statistics = getStatistics(name.getScheme(), getClass());    
208    resolveSymlinks = conf.getBoolean(
209        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
210        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
211  }
212
213  /**
214   * Return the protocol scheme for the FileSystem.
215   * <p/>
216   * This implementation throws an <code>UnsupportedOperationException</code>.
217   *
218   * @return the protocol scheme for the FileSystem.
219   */
220  public String getScheme() {
221    throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
222  }
223
224  /** Returns a URI whose scheme and authority identify this FileSystem.*/
225  public abstract URI getUri();
226  
227  /**
228   * Return a canonicalized form of this FileSystem's URI.
229   * 
230   * The default implementation simply calls {@link #canonicalizeUri(URI)}
231   * on the filesystem's own URI, so subclasses typically only need to
232   * implement that method.
233   *
234   * @see #canonicalizeUri(URI)
235   */
236  protected URI getCanonicalUri() {
237    return canonicalizeUri(getUri());
238  }
239  
240  /**
241   * Canonicalize the given URI.
242   * 
243   * This is filesystem-dependent, but may for example consist of
244   * canonicalizing the hostname using DNS and adding the default
245   * port if not specified.
246   * 
247   * The default implementation simply fills in the default port if
248   * not specified and if the filesystem has a default port.
249   *
250   * @return URI
251   * @see NetUtils#getCanonicalUri(URI, int)
252   */
253  protected URI canonicalizeUri(URI uri) {
254    if (uri.getPort() == -1 && getDefaultPort() > 0) {
255      // reconstruct the uri with the default port set
256      try {
257        uri = new URI(uri.getScheme(), uri.getUserInfo(),
258            uri.getHost(), getDefaultPort(),
259            uri.getPath(), uri.getQuery(), uri.getFragment());
260      } catch (URISyntaxException e) {
261        // Should never happen!
262        throw new AssertionError("Valid URI became unparseable: " +
263            uri);
264      }
265    }
266    
267    return uri;
268  }
269  
270  /**
271   * Get the default port for this file system.
272   * @return the default port or 0 if there isn't one
273   */
274  protected int getDefaultPort() {
275    return 0;
276  }
277
278  protected static FileSystem getFSofPath(final Path absOrFqPath,
279      final Configuration conf)
280      throws UnsupportedFileSystemException, IOException {
281    absOrFqPath.checkNotSchemeWithRelative();
282    absOrFqPath.checkNotRelative();
283
284    // Uses the default file system if not fully qualified
285    return get(absOrFqPath.toUri(), conf);
286  }
287
288  /**
289   * Get a canonical service name for this file system.  The token cache is
290   * the only user of the canonical service name, and uses it to lookup this
291   * filesystem's service tokens.
292   * If file system provides a token of its own then it must have a canonical
293   * name, otherwise canonical name can be null.
294   * 
295   * Default Impl: If the file system has child file systems 
296   * (such as an embedded file system) then it is assumed that the fs has no
297   * tokens of its own and hence returns a null name; otherwise a service
298   * name is built using Uri and port.
299   * 
300   * @return a service string that uniquely identifies this file system, null
301   *         if the filesystem does not implement tokens
302   * @see SecurityUtil#buildDTServiceName(URI, int) 
303   */
304  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
305  public String getCanonicalServiceName() {
306    return (getChildFileSystems() == null)
307      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
308      : null;
309  }
310
311  /** @deprecated call #getUri() instead.*/
312  @Deprecated
313  public String getName() { return getUri().toString(); }
314
315  /** @deprecated call #get(URI,Configuration) instead. */
316  @Deprecated
317  public static FileSystem getNamed(String name, Configuration conf)
318    throws IOException {
319    return get(URI.create(fixName(name)), conf);
320  }
321  
322  /** Update old-format filesystem names, for back-compatibility.  This should
323   * eventually be replaced with a checkName() method that throws an exception
324   * for old-format names. */ 
325  private static String fixName(String name) {
326    // convert old-format name to new-format name
327    if (name.equals("local")) {         // "local" is now "file:///".
328      LOG.warn("\"local\" is a deprecated filesystem name."
329               +" Use \"file:///\" instead.");
330      name = "file:///";
331    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
332      LOG.warn("\""+name+"\" is a deprecated filesystem name."
333               +" Use \"hdfs://"+name+"/\" instead.");
334      name = "hdfs://"+name;
335    }
336    return name;
337  }
338
339  /**
340   * Get the local file system.
341   * @param conf the configuration to configure the file system with
342   * @return a LocalFileSystem
343   */
344  public static LocalFileSystem getLocal(Configuration conf)
345    throws IOException {
346    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
347  }
348
349  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
350   * of the URI determines a configuration property name,
351   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
352   * The entire URI is passed to the FileSystem instance's initialize method.
353   */
354  public static FileSystem get(URI uri, Configuration conf) throws IOException {
355    String scheme = uri.getScheme();
356    String authority = uri.getAuthority();
357
358    if (scheme == null && authority == null) {     // use default FS
359      return get(conf);
360    }
361
362    if (scheme != null && authority == null) {     // no authority
363      URI defaultUri = getDefaultUri(conf);
364      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
365          && defaultUri.getAuthority() != null) {  // & default has authority
366        return get(defaultUri, conf);              // return default
367      }
368    }
369    
370    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
371    if (conf.getBoolean(disableCacheName, false)) {
372      return createFileSystem(uri, conf);
373    }
374
375    return CACHE.get(uri, conf);
376  }
377
378  /**
379   * Returns the FileSystem for this URI's scheme and authority and the 
380   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
381   * @param uri of the filesystem
382   * @param conf the configuration to use
383   * @param user to perform the get as
384   * @return filesystem instance
385   * @throws IOException
386   * @throws InterruptedException
387   */
388  public static FileSystem newInstance(final URI uri, final Configuration conf,
389      final String user) throws IOException, InterruptedException {
390    String ticketCachePath =
391      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
392    UserGroupInformation ugi =
393        UserGroupInformation.getBestUGI(ticketCachePath, user);
394    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
395      @Override
396      public FileSystem run() throws IOException {
397        return newInstance(uri,conf); 
398      }
399    });
400  }
401  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
402   * of the URI determines a configuration property name,
403   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
404   * The entire URI is passed to the FileSystem instance's initialize method.
405   * This always returns a new FileSystem object.
406   */
407  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
408    String scheme = uri.getScheme();
409    String authority = uri.getAuthority();
410
411    if (scheme == null) {                       // no scheme: use default FS
412      return newInstance(conf);
413    }
414
415    if (authority == null) {                       // no authority
416      URI defaultUri = getDefaultUri(conf);
417      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
418          && defaultUri.getAuthority() != null) {  // & default has authority
419        return newInstance(defaultUri, conf);              // return default
420      }
421    }
422    return CACHE.getUnique(uri, conf);
423  }
424
425  /** Returns a unique configured filesystem implementation.
426   * This always returns a new FileSystem object.
427   * @param conf the configuration to use
428   */
429  public static FileSystem newInstance(Configuration conf) throws IOException {
430    return newInstance(getDefaultUri(conf), conf);
431  }
432
433  /**
434   * Get a unique local file system object
435   * @param conf the configuration to configure the file system with
436   * @return a LocalFileSystem
437   * This always returns a new FileSystem object.
438   */
439  public static LocalFileSystem newInstanceLocal(Configuration conf)
440    throws IOException {
441    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
442  }
443
444  /**
445   * Close all cached filesystems. Be sure those filesystems are not
446   * used anymore.
447   * 
448   * @throws IOException
449   */
450  public static void closeAll() throws IOException {
451    CACHE.closeAll();
452  }
453
454  /**
455   * Close all cached filesystems for a given UGI. Be sure those filesystems 
456   * are not used anymore.
457   * @param ugi user group info to close
458   * @throws IOException
459   */
460  public static void closeAllForUGI(UserGroupInformation ugi) 
461  throws IOException {
462    CACHE.closeAll(ugi);
463  }
464
465  /** 
466   * Make sure that a path specifies a FileSystem.
467   * @param path to use
468   */
469  public Path makeQualified(Path path) {
470    checkPath(path);
471    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
472  }
473    
474  /**
475   * Get a new delegation token for this file system.
476   * This is an internal method that should have been declared protected
477   * but wasn't historically.
478   * Callers should use {@link #addDelegationTokens(String, Credentials)}
479   * 
480   * @param renewer the account name that is allowed to renew the token.
481   * @return a new delegation token
482   * @throws IOException
483   */
484  @InterfaceAudience.Private()
485  public Token<?> getDelegationToken(String renewer) throws IOException {
486    return null;
487  }
488  
489  /**
490   * Obtain all delegation tokens used by this FileSystem that are not
491   * already present in the given Credentials.  Existing tokens will neither
492   * be verified as valid nor having the given renewer.  Missing tokens will
493   * be acquired and added to the given Credentials.
494   * 
495   * Default Impl: works for simple fs with its own token
496   * and also for an embedded fs whose tokens are those of its
497   * children file system (i.e. the embedded fs has not tokens of its
498   * own).
499   * 
500   * @param renewer the user allowed to renew the delegation tokens
501   * @param credentials cache in which to add new delegation tokens
502   * @return list of new delegation tokens
503   * @throws IOException
504   */
505  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
506  public Token<?>[] addDelegationTokens(
507      final String renewer, Credentials credentials) throws IOException {
508    if (credentials == null) {
509      credentials = new Credentials();
510    }
511    final List<Token<?>> tokens = new ArrayList<Token<?>>();
512    collectDelegationTokens(renewer, credentials, tokens);
513    return tokens.toArray(new Token<?>[tokens.size()]);
514  }
515  
516  /**
517   * Recursively obtain the tokens for this FileSystem and all descended
518   * FileSystems as determined by getChildFileSystems().
519   * @param renewer the user allowed to renew the delegation tokens
520   * @param credentials cache in which to add the new delegation tokens
521   * @param tokens list in which to add acquired tokens
522   * @throws IOException
523   */
524  private void collectDelegationTokens(final String renewer,
525                                       final Credentials credentials,
526                                       final List<Token<?>> tokens)
527                                           throws IOException {
528    final String serviceName = getCanonicalServiceName();
529    // Collect token of the this filesystem and then of its embedded children
530    if (serviceName != null) { // fs has token, grab it
531      final Text service = new Text(serviceName);
532      Token<?> token = credentials.getToken(service);
533      if (token == null) {
534        token = getDelegationToken(renewer);
535        if (token != null) {
536          tokens.add(token);
537          credentials.addToken(service, token);
538        }
539      }
540    }
541    // Now collect the tokens from the children
542    final FileSystem[] children = getChildFileSystems();
543    if (children != null) {
544      for (final FileSystem fs : children) {
545        fs.collectDelegationTokens(renewer, credentials, tokens);
546      }
547    }
548  }
549
550  /**
551   * Get all the immediate child FileSystems embedded in this FileSystem.
552   * It does not recurse and get grand children.  If a FileSystem
553   * has multiple child FileSystems, then it should return a unique list
554   * of those FileSystems.  Default is to return null to signify no children.
555   * 
556   * @return FileSystems used by this FileSystem
557   */
558  @InterfaceAudience.LimitedPrivate({ "HDFS" })
559  @VisibleForTesting
560  public FileSystem[] getChildFileSystems() {
561    return null;
562  }
563  
564  /** create a file with the provided permission
565   * The permission of the file is set to be the provided permission as in
566   * setPermission, not permission&~umask
567   * 
568   * It is implemented using two RPCs. It is understood that it is inefficient,
569   * but the implementation is thread-safe. The other option is to change the
570   * value of umask in configuration to be 0, but it is not thread-safe.
571   * 
572   * @param fs file system handle
573   * @param file the name of the file to be created
574   * @param permission the permission of the file
575   * @return an output stream
576   * @throws IOException
577   */
578  public static FSDataOutputStream create(FileSystem fs,
579      Path file, FsPermission permission) throws IOException {
580    // create the file with default permission
581    FSDataOutputStream out = fs.create(file);
582    // set its permission to the supplied one
583    fs.setPermission(file, permission);
584    return out;
585  }
586
587  /** create a directory with the provided permission
588   * The permission of the directory is set to be the provided permission as in
589   * setPermission, not permission&~umask
590   * 
591   * @see #create(FileSystem, Path, FsPermission)
592   * 
593   * @param fs file system handle
594   * @param dir the name of the directory to be created
595   * @param permission the permission of the directory
596   * @return true if the directory creation succeeds; false otherwise
597   * @throws IOException
598   */
599  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
600  throws IOException {
601    // create the directory using the default permission
602    boolean result = fs.mkdirs(dir);
603    // set its permission to be the supplied one
604    fs.setPermission(dir, permission);
605    return result;
606  }
607
608  ///////////////////////////////////////////////////////////////
609  // FileSystem
610  ///////////////////////////////////////////////////////////////
611
612  protected FileSystem() {
613    super(null);
614  }
615
616  /** 
617   * Check that a Path belongs to this FileSystem.
618   * @param path to check
619   */
620  protected void checkPath(Path path) {
621    URI uri = path.toUri();
622    String thatScheme = uri.getScheme();
623    if (thatScheme == null)                // fs is relative
624      return;
625    URI thisUri = getCanonicalUri();
626    String thisScheme = thisUri.getScheme();
627    //authority and scheme are not case sensitive
628    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
629      String thisAuthority = thisUri.getAuthority();
630      String thatAuthority = uri.getAuthority();
631      if (thatAuthority == null &&                // path's authority is null
632          thisAuthority != null) {                // fs has an authority
633        URI defaultUri = getDefaultUri(getConf());
634        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
635          uri = defaultUri; // schemes match, so use this uri instead
636        } else {
637          uri = null; // can't determine auth of the path
638        }
639      }
640      if (uri != null) {
641        // canonicalize uri before comparing with this fs
642        uri = canonicalizeUri(uri);
643        thatAuthority = uri.getAuthority();
644        if (thisAuthority == thatAuthority ||       // authorities match
645            (thisAuthority != null &&
646             thisAuthority.equalsIgnoreCase(thatAuthority)))
647          return;
648      }
649    }
650    throw new IllegalArgumentException("Wrong FS: "+path+
651                                       ", expected: "+this.getUri());
652  }
653
654  /**
655   * Return an array containing hostnames, offset and size of 
656   * portions of the given file.  For a nonexistent 
657   * file or regions, null will be returned.
658   *
659   * This call is most helpful with DFS, where it returns 
660   * hostnames of machines that contain the given file.
661   *
662   * The FileSystem will simply return an elt containing 'localhost'.
663   *
664   * @param file FilesStatus to get data from
665   * @param start offset into the given file
666   * @param len length for which to get locations for
667   */
668  public BlockLocation[] getFileBlockLocations(FileStatus file, 
669      long start, long len) throws IOException {
670    if (file == null) {
671      return null;
672    }
673
674    if (start < 0 || len < 0) {
675      throw new IllegalArgumentException("Invalid start or len parameter");
676    }
677
678    if (file.getLen() <= start) {
679      return new BlockLocation[0];
680
681    }
682    String[] name = { "localhost:50010" };
683    String[] host = { "localhost" };
684    return new BlockLocation[] {
685      new BlockLocation(name, host, 0, file.getLen()) };
686  }
687 
688
689  /**
690   * Return an array containing hostnames, offset and size of 
691   * portions of the given file.  For a nonexistent 
692   * file or regions, null will be returned.
693   *
694   * This call is most helpful with DFS, where it returns 
695   * hostnames of machines that contain the given file.
696   *
697   * The FileSystem will simply return an elt containing 'localhost'.
698   *
699   * @param p path is used to identify an FS since an FS could have
700   *          another FS that it could be delegating the call to
701   * @param start offset into the given file
702   * @param len length for which to get locations for
703   */
704  public BlockLocation[] getFileBlockLocations(Path p, 
705      long start, long len) throws IOException {
706    if (p == null) {
707      throw new NullPointerException();
708    }
709    FileStatus file = getFileStatus(p);
710    return getFileBlockLocations(file, start, len);
711  }
712  
713  /**
714   * Return a set of server default configuration values
715   * @return server default configuration values
716   * @throws IOException
717   * @deprecated use {@link #getServerDefaults(Path)} instead
718   */
719  @Deprecated
720  public FsServerDefaults getServerDefaults() throws IOException {
721    Configuration conf = getConf();
722    // CRC32 is chosen as default as it is available in all 
723    // releases that support checksum.
724    // The client trash configuration is ignored.
725    return new FsServerDefaults(getDefaultBlockSize(), 
726        conf.getInt("io.bytes.per.checksum", 512), 
727        64 * 1024, 
728        getDefaultReplication(),
729        conf.getInt("io.file.buffer.size", 4096),
730        false,
731        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
732        DataChecksum.Type.CRC32);
733  }
734
735  /**
736   * Return a set of server default configuration values
737   * @param p path is used to identify an FS since an FS could have
738   *          another FS that it could be delegating the call to
739   * @return server default configuration values
740   * @throws IOException
741   */
742  public FsServerDefaults getServerDefaults(Path p) throws IOException {
743    return getServerDefaults();
744  }
745
746  /**
747   * Return the fully-qualified path of path f resolving the path
748   * through any symlinks or mount point
749   * @param p path to be resolved
750   * @return fully qualified path 
751   * @throws FileNotFoundException
752   */
753   public Path resolvePath(final Path p) throws IOException {
754     checkPath(p);
755     return getFileStatus(p).getPath();
756   }
757
758  /**
759   * Opens an FSDataInputStream at the indicated Path.
760   * @param f the file name to open
761   * @param bufferSize the size of the buffer to be used.
762   */
763  public abstract FSDataInputStream open(Path f, int bufferSize)
764    throws IOException;
765    
766  /**
767   * Opens an FSDataInputStream at the indicated Path.
768   * @param f the file to open
769   */
770  public FSDataInputStream open(Path f) throws IOException {
771    return open(f, getConf().getInt("io.file.buffer.size", 4096));
772  }
773
774  /**
775   * Create an FSDataOutputStream at the indicated Path.
776   * Files are overwritten by default.
777   * @param f the file to create
778   */
779  public FSDataOutputStream create(Path f) throws IOException {
780    return create(f, true);
781  }
782
783  /**
784   * Create an FSDataOutputStream at the indicated Path.
785   * @param f the file to create
786   * @param overwrite if a file with this name already exists, then if true,
787   *   the file will be overwritten, and if false an exception will be thrown.
788   */
789  public FSDataOutputStream create(Path f, boolean overwrite)
790      throws IOException {
791    return create(f, overwrite, 
792                  getConf().getInt("io.file.buffer.size", 4096),
793                  getDefaultReplication(f),
794                  getDefaultBlockSize(f));
795  }
796
797  /**
798   * Create an FSDataOutputStream at the indicated Path with write-progress
799   * reporting.
800   * Files are overwritten by default.
801   * @param f the file to create
802   * @param progress to report progress
803   */
804  public FSDataOutputStream create(Path f, Progressable progress) 
805      throws IOException {
806    return create(f, true, 
807                  getConf().getInt("io.file.buffer.size", 4096),
808                  getDefaultReplication(f),
809                  getDefaultBlockSize(f), progress);
810  }
811
812  /**
813   * Create an FSDataOutputStream at the indicated Path.
814   * Files are overwritten by default.
815   * @param f the file to create
816   * @param replication the replication factor
817   */
818  public FSDataOutputStream create(Path f, short replication)
819      throws IOException {
820    return create(f, true, 
821                  getConf().getInt("io.file.buffer.size", 4096),
822                  replication,
823                  getDefaultBlockSize(f));
824  }
825
826  /**
827   * Create an FSDataOutputStream at the indicated Path with write-progress
828   * reporting.
829   * Files are overwritten by default.
830   * @param f the file to create
831   * @param replication the replication factor
832   * @param progress to report progress
833   */
834  public FSDataOutputStream create(Path f, short replication, 
835      Progressable progress) throws IOException {
836    return create(f, true, 
837                  getConf().getInt(
838                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
839                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
840                  replication,
841                  getDefaultBlockSize(f), progress);
842  }
843
844    
845  /**
846   * Create an FSDataOutputStream at the indicated Path.
847   * @param f the file name to create
848   * @param overwrite if a file with this name already exists, then if true,
849   *   the file will be overwritten, and if false an error will be thrown.
850   * @param bufferSize the size of the buffer to be used.
851   */
852  public FSDataOutputStream create(Path f, 
853                                   boolean overwrite,
854                                   int bufferSize
855                                   ) throws IOException {
856    return create(f, overwrite, bufferSize, 
857                  getDefaultReplication(f),
858                  getDefaultBlockSize(f));
859  }
860    
861  /**
862   * Create an FSDataOutputStream at the indicated Path with write-progress
863   * reporting.
864   * @param f the path of the file to open
865   * @param overwrite if a file with this name already exists, then if true,
866   *   the file will be overwritten, and if false an error will be thrown.
867   * @param bufferSize the size of the buffer to be used.
868   */
869  public FSDataOutputStream create(Path f, 
870                                   boolean overwrite,
871                                   int bufferSize,
872                                   Progressable progress
873                                   ) throws IOException {
874    return create(f, overwrite, bufferSize, 
875                  getDefaultReplication(f),
876                  getDefaultBlockSize(f), progress);
877  }
878    
879    
880  /**
881   * Create an FSDataOutputStream at the indicated Path.
882   * @param f the file name to open
883   * @param overwrite if a file with this name already exists, then if true,
884   *   the file will be overwritten, and if false an error will be thrown.
885   * @param bufferSize the size of the buffer to be used.
886   * @param replication required block replication for the file. 
887   */
888  public FSDataOutputStream create(Path f, 
889                                   boolean overwrite,
890                                   int bufferSize,
891                                   short replication,
892                                   long blockSize
893                                   ) throws IOException {
894    return create(f, overwrite, bufferSize, replication, blockSize, null);
895  }
896
897  /**
898   * Create an FSDataOutputStream at the indicated Path with write-progress
899   * reporting.
900   * @param f the file name to open
901   * @param overwrite if a file with this name already exists, then if true,
902   *   the file will be overwritten, and if false an error will be thrown.
903   * @param bufferSize the size of the buffer to be used.
904   * @param replication required block replication for the file. 
905   */
906  public FSDataOutputStream create(Path f,
907                                            boolean overwrite,
908                                            int bufferSize,
909                                            short replication,
910                                            long blockSize,
911                                            Progressable progress
912                                            ) throws IOException {
913    return this.create(f, FsPermission.getFileDefault().applyUMask(
914        FsPermission.getUMask(getConf())), overwrite, bufferSize,
915        replication, blockSize, progress);
916  }
917
918  /**
919   * Create an FSDataOutputStream at the indicated Path with write-progress
920   * reporting.
921   * @param f the file name to open
922   * @param permission
923   * @param overwrite if a file with this name already exists, then if true,
924   *   the file will be overwritten, and if false an error will be thrown.
925   * @param bufferSize the size of the buffer to be used.
926   * @param replication required block replication for the file.
927   * @param blockSize
928   * @param progress
929   * @throws IOException
930   * @see #setPermission(Path, FsPermission)
931   */
932  public abstract FSDataOutputStream create(Path f,
933      FsPermission permission,
934      boolean overwrite,
935      int bufferSize,
936      short replication,
937      long blockSize,
938      Progressable progress) throws IOException;
939  
940  /**
941   * Create an FSDataOutputStream at the indicated Path with write-progress
942   * reporting.
943   * @param f the file name to open
944   * @param permission
945   * @param flags {@link CreateFlag}s to use for this stream.
946   * @param bufferSize the size of the buffer to be used.
947   * @param replication required block replication for the file.
948   * @param blockSize
949   * @param progress
950   * @throws IOException
951   * @see #setPermission(Path, FsPermission)
952   */
953  public FSDataOutputStream create(Path f,
954      FsPermission permission,
955      EnumSet<CreateFlag> flags,
956      int bufferSize,
957      short replication,
958      long blockSize,
959      Progressable progress) throws IOException {
960    return create(f, permission, flags, bufferSize, replication,
961        blockSize, progress, null);
962  }
963  
964  /**
965   * Create an FSDataOutputStream at the indicated Path with a custom
966   * checksum option
967   * @param f the file name to open
968   * @param permission
969   * @param flags {@link CreateFlag}s to use for this stream.
970   * @param bufferSize the size of the buffer to be used.
971   * @param replication required block replication for the file.
972   * @param blockSize
973   * @param progress
974   * @param checksumOpt checksum parameter. If null, the values
975   *        found in conf will be used.
976   * @throws IOException
977   * @see #setPermission(Path, FsPermission)
978   */
979  public FSDataOutputStream create(Path f,
980      FsPermission permission,
981      EnumSet<CreateFlag> flags,
982      int bufferSize,
983      short replication,
984      long blockSize,
985      Progressable progress,
986      ChecksumOpt checksumOpt) throws IOException {
987    // Checksum options are ignored by default. The file systems that
988    // implement checksum need to override this method. The full
989    // support is currently only available in DFS.
990    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
991        bufferSize, replication, blockSize, progress);
992  }
993
994  /*.
995   * This create has been added to support the FileContext that processes
996   * the permission
997   * with umask before calling this method.
998   * This a temporary method added to support the transition from FileSystem
999   * to FileContext for user applications.
1000   */
1001  @Deprecated
1002  protected FSDataOutputStream primitiveCreate(Path f,
1003     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
1004     short replication, long blockSize, Progressable progress,
1005     ChecksumOpt checksumOpt) throws IOException {
1006
1007    boolean pathExists = exists(f);
1008    CreateFlag.validate(f, pathExists, flag);
1009    
1010    // Default impl  assumes that permissions do not matter and 
1011    // nor does the bytesPerChecksum  hence
1012    // calling the regular create is good enough.
1013    // FSs that implement permissions should override this.
1014
1015    if (pathExists && flag.contains(CreateFlag.APPEND)) {
1016      return append(f, bufferSize, progress);
1017    }
1018    
1019    return this.create(f, absolutePermission,
1020        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1021        blockSize, progress);
1022  }
1023  
1024  /**
1025   * This version of the mkdirs method assumes that the permission is absolute.
1026   * It has been added to support the FileContext that processes the permission
1027   * with umask before calling this method.
1028   * This a temporary method added to support the transition from FileSystem
1029   * to FileContext for user applications.
1030   */
1031  @Deprecated
1032  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1033    throws IOException {
1034    // Default impl is to assume that permissions do not matter and hence
1035    // calling the regular mkdirs is good enough.
1036    // FSs that implement permissions should override this.
1037   return this.mkdirs(f, absolutePermission);
1038  }
1039
1040
1041  /**
1042   * This version of the mkdirs method assumes that the permission is absolute.
1043   * It has been added to support the FileContext that processes the permission
1044   * with umask before calling this method.
1045   * This a temporary method added to support the transition from FileSystem
1046   * to FileContext for user applications.
1047   */
1048  @Deprecated
1049  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1050                    boolean createParent)
1051    throws IOException {
1052    
1053    if (!createParent) { // parent must exist.
1054      // since the this.mkdirs makes parent dirs automatically
1055      // we must throw exception if parent does not exist.
1056      final FileStatus stat = getFileStatus(f.getParent());
1057      if (stat == null) {
1058        throw new FileNotFoundException("Missing parent:" + f);
1059      }
1060      if (!stat.isDirectory()) {
1061        throw new ParentNotDirectoryException("parent is not a dir");
1062      }
1063      // parent does exist - go ahead with mkdir of leaf
1064    }
1065    // Default impl is to assume that permissions do not matter and hence
1066    // calling the regular mkdirs is good enough.
1067    // FSs that implement permissions should override this.
1068    if (!this.mkdirs(f, absolutePermission)) {
1069      throw new IOException("mkdir of "+ f + " failed");
1070    }
1071  }
1072
1073  /**
1074   * Opens an FSDataOutputStream at the indicated Path with write-progress
1075   * reporting. Same as create(), except fails if parent directory doesn't
1076   * already exist.
1077   * @param f the file name to open
1078   * @param overwrite if a file with this name already exists, then if true,
1079   * the file will be overwritten, and if false an error will be thrown.
1080   * @param bufferSize the size of the buffer to be used.
1081   * @param replication required block replication for the file.
1082   * @param blockSize
1083   * @param progress
1084   * @throws IOException
1085   * @see #setPermission(Path, FsPermission)
1086   * @deprecated API only for 0.20-append
1087   */
1088  @Deprecated
1089  public FSDataOutputStream createNonRecursive(Path f,
1090      boolean overwrite,
1091      int bufferSize, short replication, long blockSize,
1092      Progressable progress) throws IOException {
1093    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1094        overwrite, bufferSize, replication, blockSize, progress);
1095  }
1096
1097  /**
1098   * Opens an FSDataOutputStream at the indicated Path with write-progress
1099   * reporting. Same as create(), except fails if parent directory doesn't
1100   * already exist.
1101   * @param f the file name to open
1102   * @param permission
1103   * @param overwrite if a file with this name already exists, then if true,
1104   * the file will be overwritten, and if false an error will be thrown.
1105   * @param bufferSize the size of the buffer to be used.
1106   * @param replication required block replication for the file.
1107   * @param blockSize
1108   * @param progress
1109   * @throws IOException
1110   * @see #setPermission(Path, FsPermission)
1111   * @deprecated API only for 0.20-append
1112   */
1113   @Deprecated
1114   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1115       boolean overwrite, int bufferSize, short replication, long blockSize,
1116       Progressable progress) throws IOException {
1117     return createNonRecursive(f, permission,
1118         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1119             : EnumSet.of(CreateFlag.CREATE), bufferSize,
1120             replication, blockSize, progress);
1121   }
1122
1123   /**
1124    * Opens an FSDataOutputStream at the indicated Path with write-progress
1125    * reporting. Same as create(), except fails if parent directory doesn't
1126    * already exist.
1127    * @param f the file name to open
1128    * @param permission
1129    * @param flags {@link CreateFlag}s to use for this stream.
1130    * @param bufferSize the size of the buffer to be used.
1131    * @param replication required block replication for the file.
1132    * @param blockSize
1133    * @param progress
1134    * @throws IOException
1135    * @see #setPermission(Path, FsPermission)
1136    * @deprecated API only for 0.20-append
1137    */
1138    @Deprecated
1139    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1140        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1141        Progressable progress) throws IOException {
1142      throw new IOException("createNonRecursive unsupported for this filesystem "
1143          + this.getClass());
1144    }
1145
1146  /**
1147   * Creates the given Path as a brand-new zero-length file.  If
1148   * create fails, or if it already existed, return false.
1149   *
1150   * @param f path to use for create
1151   */
1152  public boolean createNewFile(Path f) throws IOException {
1153    if (exists(f)) {
1154      return false;
1155    } else {
1156      create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1157      return true;
1158    }
1159  }
1160
1161  /**
1162   * Append to an existing file (optional operation).
1163   * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1164   * @param f the existing file to be appended.
1165   * @throws IOException
1166   */
1167  public FSDataOutputStream append(Path f) throws IOException {
1168    return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1169  }
1170  /**
1171   * Append to an existing file (optional operation).
1172   * Same as append(f, bufferSize, null).
1173   * @param f the existing file to be appended.
1174   * @param bufferSize the size of the buffer to be used.
1175   * @throws IOException
1176   */
1177  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1178    return append(f, bufferSize, null);
1179  }
1180
1181  /**
1182   * Append to an existing file (optional operation).
1183   * @param f the existing file to be appended.
1184   * @param bufferSize the size of the buffer to be used.
1185   * @param progress for reporting progress if it is not null.
1186   * @throws IOException
1187   */
1188  public abstract FSDataOutputStream append(Path f, int bufferSize,
1189      Progressable progress) throws IOException;
1190
1191  /**
1192   * Concat existing files together.
1193   * @param trg the path to the target destination.
1194   * @param psrcs the paths to the sources to use for the concatenation.
1195   * @throws IOException
1196   */
1197  public void concat(final Path trg, final Path [] psrcs) throws IOException {
1198    throw new UnsupportedOperationException("Not implemented by the " + 
1199        getClass().getSimpleName() + " FileSystem implementation");
1200  }
1201
1202 /**
1203   * Get replication.
1204   * 
1205   * @deprecated Use getFileStatus() instead
1206   * @param src file name
1207   * @return file replication
1208   * @throws IOException
1209   */ 
1210  @Deprecated
1211  public short getReplication(Path src) throws IOException {
1212    return getFileStatus(src).getReplication();
1213  }
1214
1215  /**
1216   * Set replication for an existing file.
1217   * 
1218   * @param src file name
1219   * @param replication new replication
1220   * @throws IOException
1221   * @return true if successful;
1222   *         false if file does not exist or is a directory
1223   */
1224  public boolean setReplication(Path src, short replication)
1225    throws IOException {
1226    return true;
1227  }
1228
1229  /**
1230   * Renames Path src to Path dst.  Can take place on local fs
1231   * or remote DFS.
1232   * @param src path to be renamed
1233   * @param dst new path after rename
1234   * @throws IOException on failure
1235   * @return true if rename is successful
1236   */
1237  public abstract boolean rename(Path src, Path dst) throws IOException;
1238
1239  /**
1240   * Renames Path src to Path dst
1241   * <ul>
1242   * <li
1243   * <li>Fails if src is a file and dst is a directory.
1244   * <li>Fails if src is a directory and dst is a file.
1245   * <li>Fails if the parent of dst does not exist or is a file.
1246   * </ul>
1247   * <p>
1248   * If OVERWRITE option is not passed as an argument, rename fails
1249   * if the dst already exists.
1250   * <p>
1251   * If OVERWRITE option is passed as an argument, rename overwrites
1252   * the dst if it is a file or an empty directory. Rename fails if dst is
1253   * a non-empty directory.
1254   * <p>
1255   * Note that atomicity of rename is dependent on the file system
1256   * implementation. Please refer to the file system documentation for
1257   * details. This default implementation is non atomic.
1258   * <p>
1259   * This method is deprecated since it is a temporary method added to 
1260   * support the transition from FileSystem to FileContext for user 
1261   * applications.
1262   * 
1263   * @param src path to be renamed
1264   * @param dst new path after rename
1265   * @throws IOException on failure
1266   */
1267  @Deprecated
1268  protected void rename(final Path src, final Path dst,
1269      final Rename... options) throws IOException {
1270    // Default implementation
1271    final FileStatus srcStatus = getFileLinkStatus(src);
1272    if (srcStatus == null) {
1273      throw new FileNotFoundException("rename source " + src + " not found.");
1274    }
1275
1276    boolean overwrite = false;
1277    if (null != options) {
1278      for (Rename option : options) {
1279        if (option == Rename.OVERWRITE) {
1280          overwrite = true;
1281        }
1282      }
1283    }
1284
1285    FileStatus dstStatus;
1286    try {
1287      dstStatus = getFileLinkStatus(dst);
1288    } catch (IOException e) {
1289      dstStatus = null;
1290    }
1291    if (dstStatus != null) {
1292      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1293        throw new IOException("Source " + src + " Destination " + dst
1294            + " both should be either file or directory");
1295      }
1296      if (!overwrite) {
1297        throw new FileAlreadyExistsException("rename destination " + dst
1298            + " already exists.");
1299      }
1300      // Delete the destination that is a file or an empty directory
1301      if (dstStatus.isDirectory()) {
1302        FileStatus[] list = listStatus(dst);
1303        if (list != null && list.length != 0) {
1304          throw new IOException(
1305              "rename cannot overwrite non empty destination directory " + dst);
1306        }
1307      }
1308      delete(dst, false);
1309    } else {
1310      final Path parent = dst.getParent();
1311      final FileStatus parentStatus = getFileStatus(parent);
1312      if (parentStatus == null) {
1313        throw new FileNotFoundException("rename destination parent " + parent
1314            + " not found.");
1315      }
1316      if (!parentStatus.isDirectory()) {
1317        throw new ParentNotDirectoryException("rename destination parent " + parent
1318            + " is a file.");
1319      }
1320    }
1321    if (!rename(src, dst)) {
1322      throw new IOException("rename from " + src + " to " + dst + " failed.");
1323    }
1324  }
1325
1326  /**
1327   * Truncate the file in the indicated path to the indicated size.
1328   * <ul>
1329   * <li>Fails if path is a directory.
1330   * <li>Fails if path does not exist.
1331   * <li>Fails if path is not closed.
1332   * <li>Fails if new size is greater than current size.
1333   * </ul>
1334   * @param f The path to the file to be truncated
1335   * @param newLength The size the file is to be truncated to
1336   *
1337   * @return <code>true</code> if the file has been truncated to the desired
1338   * <code>newLength</code> and is immediately available to be reused for
1339   * write operations such as <code>append</code>, or
1340   * <code>false</code> if a background process of adjusting the length of
1341   * the last block has been started, and clients should wait for it to
1342   * complete before proceeding with further file updates.
1343   */
1344  public boolean truncate(Path f, long newLength) throws IOException {
1345    throw new UnsupportedOperationException("Not implemented by the " +
1346        getClass().getSimpleName() + " FileSystem implementation");
1347  }
1348  
1349  /**
1350   * Delete a file 
1351   * @deprecated Use {@link #delete(Path, boolean)} instead.
1352   */
1353  @Deprecated
1354  public boolean delete(Path f) throws IOException {
1355    return delete(f, true);
1356  }
1357  
1358  /** Delete a file.
1359   *
1360   * @param f the path to delete.
1361   * @param recursive if path is a directory and set to 
1362   * true, the directory is deleted else throws an exception. In
1363   * case of a file the recursive can be set to either true or false. 
1364   * @return  true if delete is successful else false. 
1365   * @throws IOException
1366   */
1367  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1368
1369  /**
1370   * Mark a path to be deleted when FileSystem is closed.
1371   * When the JVM shuts down,
1372   * all FileSystem objects will be closed automatically.
1373   * Then,
1374   * the marked path will be deleted as a result of closing the FileSystem.
1375   *
1376   * The path has to exist in the file system.
1377   * 
1378   * @param f the path to delete.
1379   * @return  true if deleteOnExit is successful, otherwise false.
1380   * @throws IOException
1381   */
1382  public boolean deleteOnExit(Path f) throws IOException {
1383    if (!exists(f)) {
1384      return false;
1385    }
1386    synchronized (deleteOnExit) {
1387      deleteOnExit.add(f);
1388    }
1389    return true;
1390  }
1391  
1392  /**
1393   * Cancel the deletion of the path when the FileSystem is closed
1394   * @param f the path to cancel deletion
1395   */
1396  public boolean cancelDeleteOnExit(Path f) {
1397    synchronized (deleteOnExit) {
1398      return deleteOnExit.remove(f);
1399    }
1400  }
1401
1402  /**
1403   * Delete all files that were marked as delete-on-exit. This recursively
1404   * deletes all files in the specified paths.
1405   */
1406  protected void processDeleteOnExit() {
1407    synchronized (deleteOnExit) {
1408      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1409        Path path = iter.next();
1410        try {
1411          if (exists(path)) {
1412            delete(path, true);
1413          }
1414        }
1415        catch (IOException e) {
1416          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1417        }
1418        iter.remove();
1419      }
1420    }
1421  }
1422  
1423  /** Check if exists.
1424   * @param f source file
1425   */
1426  public boolean exists(Path f) throws IOException {
1427    try {
1428      return getFileStatus(f) != null;
1429    } catch (FileNotFoundException e) {
1430      return false;
1431    }
1432  }
1433
1434  /** True iff the named path is a directory.
1435   * Note: Avoid using this method. Instead reuse the FileStatus 
1436   * returned by getFileStatus() or listStatus() methods.
1437   * @param f path to check
1438   */
1439  public boolean isDirectory(Path f) throws IOException {
1440    try {
1441      return getFileStatus(f).isDirectory();
1442    } catch (FileNotFoundException e) {
1443      return false;               // f does not exist
1444    }
1445  }
1446
1447  /** True iff the named path is a regular file.
1448   * Note: Avoid using this method. Instead reuse the FileStatus 
1449   * returned by getFileStatus() or listStatus() methods.
1450   * @param f path to check
1451   */
1452  public boolean isFile(Path f) throws IOException {
1453    try {
1454      return getFileStatus(f).isFile();
1455    } catch (FileNotFoundException e) {
1456      return false;               // f does not exist
1457    }
1458  }
1459  
1460  /** The number of bytes in a file. */
1461  /** @deprecated Use getFileStatus() instead */
1462  @Deprecated
1463  public long getLength(Path f) throws IOException {
1464    return getFileStatus(f).getLen();
1465  }
1466    
1467  /** Return the {@link ContentSummary} of a given {@link Path}.
1468  * @param f path to use
1469  */
1470  public ContentSummary getContentSummary(Path f) throws IOException {
1471    FileStatus status = getFileStatus(f);
1472    if (status.isFile()) {
1473      // f is a file
1474      long length = status.getLen();
1475      return new ContentSummary.Builder().length(length).
1476          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1477    }
1478    // f is a directory
1479    long[] summary = {0, 0, 1};
1480    for(FileStatus s : listStatus(f)) {
1481      long length = s.getLen();
1482      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1483          new ContentSummary.Builder().length(length).
1484          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1485      summary[0] += c.getLength();
1486      summary[1] += c.getFileCount();
1487      summary[2] += c.getDirectoryCount();
1488    }
1489    return new ContentSummary.Builder().length(summary[0]).
1490        fileCount(summary[1]).directoryCount(summary[2]).
1491        spaceConsumed(summary[0]).build();
1492  }
1493
1494  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1495    @Override
1496    public boolean accept(Path file) {
1497      return true;
1498    }
1499  };
1500    
1501  /**
1502   * List the statuses of the files/directories in the given path if the path is
1503   * a directory.
1504   * <p>
1505   * Does not guarantee to return the List of files/directories status in a
1506   * sorted order.
1507   * @param f given path
1508   * @return the statuses of the files/directories in the given patch
1509   * @throws FileNotFoundException when the path does not exist;
1510   *         IOException see specific implementation
1511   */
1512  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1513                                                         IOException;
1514    
1515  /*
1516   * Filter files/directories in the given path using the user-supplied path
1517   * filter. Results are added to the given array <code>results</code>.
1518   */
1519  private void listStatus(ArrayList<FileStatus> results, Path f,
1520      PathFilter filter) throws FileNotFoundException, IOException {
1521    FileStatus listing[] = listStatus(f);
1522    if (listing == null) {
1523      throw new IOException("Error accessing " + f);
1524    }
1525
1526    for (int i = 0; i < listing.length; i++) {
1527      if (filter.accept(listing[i].getPath())) {
1528        results.add(listing[i]);
1529      }
1530    }
1531  }
1532
1533  /**
1534   * @return an iterator over the corrupt files under the given path
1535   * (may contain duplicates if a file has more than one corrupt block)
1536   * @throws IOException
1537   */
1538  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1539    throws IOException {
1540    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1541                                            " does not support" +
1542                                            " listCorruptFileBlocks");
1543  }
1544
1545  /**
1546   * Filter files/directories in the given path using the user-supplied path
1547   * filter.
1548   * <p>
1549   * Does not guarantee to return the List of files/directories status in a
1550   * sorted order.
1551   * 
1552   * @param f
1553   *          a path name
1554   * @param filter
1555   *          the user-supplied path filter
1556   * @return an array of FileStatus objects for the files under the given path
1557   *         after applying the filter
1558   * @throws FileNotFoundException when the path does not exist;
1559   *         IOException see specific implementation   
1560   */
1561  public FileStatus[] listStatus(Path f, PathFilter filter) 
1562                                   throws FileNotFoundException, IOException {
1563    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1564    listStatus(results, f, filter);
1565    return results.toArray(new FileStatus[results.size()]);
1566  }
1567
1568  /**
1569   * Filter files/directories in the given list of paths using default
1570   * path filter.
1571   * <p>
1572   * Does not guarantee to return the List of files/directories status in a
1573   * sorted order.
1574   * 
1575   * @param files
1576   *          a list of paths
1577   * @return a list of statuses for the files under the given paths after
1578   *         applying the filter default Path filter
1579   * @throws FileNotFoundException when the path does not exist;
1580   *         IOException see specific implementation
1581   */
1582  public FileStatus[] listStatus(Path[] files)
1583      throws FileNotFoundException, IOException {
1584    return listStatus(files, DEFAULT_FILTER);
1585  }
1586
1587  /**
1588   * Filter files/directories in the given list of paths using user-supplied
1589   * path filter.
1590   * <p>
1591   * Does not guarantee to return the List of files/directories status in a
1592   * sorted order.
1593   * 
1594   * @param files
1595   *          a list of paths
1596   * @param filter
1597   *          the user-supplied path filter
1598   * @return a list of statuses for the files under the given paths after
1599   *         applying the filter
1600   * @throws FileNotFoundException when the path does not exist;
1601   *         IOException see specific implementation
1602   */
1603  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1604      throws FileNotFoundException, IOException {
1605    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1606    for (int i = 0; i < files.length; i++) {
1607      listStatus(results, files[i], filter);
1608    }
1609    return results.toArray(new FileStatus[results.size()]);
1610  }
1611
1612  /**
1613   * <p>Return all the files that match filePattern and are not checksum
1614   * files. Results are sorted by their names.
1615   * 
1616   * <p>
1617   * A filename pattern is composed of <i>regular</i> characters and
1618   * <i>special pattern matching</i> characters, which are:
1619   *
1620   * <dl>
1621   *  <dd>
1622   *   <dl>
1623   *    <p>
1624   *    <dt> <tt> ? </tt>
1625   *    <dd> Matches any single character.
1626   *
1627   *    <p>
1628   *    <dt> <tt> * </tt>
1629   *    <dd> Matches zero or more characters.
1630   *
1631   *    <p>
1632   *    <dt> <tt> [<i>abc</i>] </tt>
1633   *    <dd> Matches a single character from character set
1634   *     <tt>{<i>a,b,c</i>}</tt>.
1635   *
1636   *    <p>
1637   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1638   *    <dd> Matches a single character from the character range
1639   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1640   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1641   *
1642   *    <p>
1643   *    <dt> <tt> [^<i>a</i>] </tt>
1644   *    <dd> Matches a single character that is not from character set or range
1645   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1646   *     immediately to the right of the opening bracket.
1647   *
1648   *    <p>
1649   *    <dt> <tt> \<i>c</i> </tt>
1650   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1651   *
1652   *    <p>
1653   *    <dt> <tt> {ab,cd} </tt>
1654   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1655   *    
1656   *    <p>
1657   *    <dt> <tt> {ab,c{de,fh}} </tt>
1658   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1659   *
1660   *   </dl>
1661   *  </dd>
1662   * </dl>
1663   *
1664   * @param pathPattern a regular expression specifying a pth pattern
1665
1666   * @return an array of paths that match the path pattern
1667   * @throws IOException
1668   */
1669  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1670    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1671  }
1672  
1673  /**
1674   * Return an array of FileStatus objects whose path names match
1675   * {@code pathPattern} and is accepted by the user-supplied path filter.
1676   * Results are sorted by their path names.
1677   * 
1678   * @param pathPattern a regular expression specifying the path pattern
1679   * @param filter a user-supplied path filter
1680   * @return null if {@code pathPattern} has no glob and the path does not exist
1681   *         an empty array if {@code pathPattern} has a glob and no path
1682   *         matches it else an array of {@link FileStatus} objects matching the
1683   *         pattern
1684   * @throws IOException if any I/O error occurs when fetching file status
1685   */
1686  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1687      throws IOException {
1688    return new Globber(this, pathPattern, filter).glob();
1689  }
1690  
1691  /**
1692   * List the statuses of the files/directories in the given path if the path is
1693   * a directory. 
1694   * Return the file's status and block locations If the path is a file.
1695   * 
1696   * If a returned status is a file, it contains the file's block locations.
1697   * 
1698   * @param f is the path
1699   *
1700   * @return an iterator that traverses statuses of the files/directories 
1701   *         in the given path
1702   *
1703   * @throws FileNotFoundException If <code>f</code> does not exist
1704   * @throws IOException If an I/O error occurred
1705   */
1706  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1707  throws FileNotFoundException, IOException {
1708    return listLocatedStatus(f, DEFAULT_FILTER);
1709  }
1710
1711  /**
1712   * Listing a directory
1713   * The returned results include its block location if it is a file
1714   * The results are filtered by the given path filter
1715   * @param f a path
1716   * @param filter a path filter
1717   * @return an iterator that traverses statuses of the files/directories 
1718   *         in the given path
1719   * @throws FileNotFoundException if <code>f</code> does not exist
1720   * @throws IOException if any I/O error occurred
1721   */
1722  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1723      final PathFilter filter)
1724  throws FileNotFoundException, IOException {
1725    return new RemoteIterator<LocatedFileStatus>() {
1726      private final FileStatus[] stats = listStatus(f, filter);
1727      private int i = 0;
1728
1729      @Override
1730      public boolean hasNext() {
1731        return i<stats.length;
1732      }
1733
1734      @Override
1735      public LocatedFileStatus next() throws IOException {
1736        if (!hasNext()) {
1737          throw new NoSuchElementException("No more entries in " + f);
1738        }
1739        FileStatus result = stats[i++];
1740        BlockLocation[] locs = result.isFile() ?
1741            getFileBlockLocations(result.getPath(), 0, result.getLen()) :
1742            null;
1743        return new LocatedFileStatus(result, locs);
1744      }
1745    };
1746  }
1747
1748  /**
1749   * Returns a remote iterator so that followup calls are made on demand
1750   * while consuming the entries. Each file system implementation should
1751   * override this method and provide a more efficient implementation, if
1752   * possible. 
1753   * Does not guarantee to return the iterator that traverses statuses
1754   * of the files in a sorted order.
1755   *
1756   * @param p target path
1757   * @return remote iterator
1758   */
1759  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
1760  throws FileNotFoundException, IOException {
1761    return new RemoteIterator<FileStatus>() {
1762      private final FileStatus[] stats = listStatus(p);
1763      private int i = 0;
1764
1765      @Override
1766      public boolean hasNext() {
1767        return i<stats.length;
1768      }
1769
1770      @Override
1771      public FileStatus next() throws IOException {
1772        if (!hasNext()) {
1773          throw new NoSuchElementException("No more entry in " + p);
1774        }
1775        return stats[i++];
1776      }
1777    };
1778  }
1779
1780  /**
1781   * List the statuses and block locations of the files in the given path.
1782   * Does not guarantee to return the iterator that traverses statuses
1783   * of the files in a sorted order.
1784   * 
1785   * If the path is a directory, 
1786   *   if recursive is false, returns files in the directory;
1787   *   if recursive is true, return files in the subtree rooted at the path.
1788   * If the path is a file, return the file's status and block locations.
1789   * 
1790   * @param f is the path
1791   * @param recursive if the subdirectories need to be traversed recursively
1792   *
1793   * @return an iterator that traverses statuses of the files
1794   *
1795   * @throws FileNotFoundException when the path does not exist;
1796   *         IOException see specific implementation
1797   */
1798  public RemoteIterator<LocatedFileStatus> listFiles(
1799      final Path f, final boolean recursive)
1800  throws FileNotFoundException, IOException {
1801    return new RemoteIterator<LocatedFileStatus>() {
1802      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1803        new Stack<RemoteIterator<LocatedFileStatus>>();
1804      private RemoteIterator<LocatedFileStatus> curItor =
1805        listLocatedStatus(f);
1806      private LocatedFileStatus curFile;
1807     
1808      @Override
1809      public boolean hasNext() throws IOException {
1810        while (curFile == null) {
1811          if (curItor.hasNext()) {
1812            handleFileStat(curItor.next());
1813          } else if (!itors.empty()) {
1814            curItor = itors.pop();
1815          } else {
1816            return false;
1817          }
1818        }
1819        return true;
1820      }
1821
1822      /**
1823       * Process the input stat.
1824       * If it is a file, return the file stat.
1825       * If it is a directory, traverse the directory if recursive is true;
1826       * ignore it if recursive is false.
1827       * @param stat input status
1828       * @throws IOException if any IO error occurs
1829       */
1830      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1831        if (stat.isFile()) { // file
1832          curFile = stat;
1833        } else if (recursive) { // directory
1834          itors.push(curItor);
1835          curItor = listLocatedStatus(stat.getPath());
1836        }
1837      }
1838
1839      @Override
1840      public LocatedFileStatus next() throws IOException {
1841        if (hasNext()) {
1842          LocatedFileStatus result = curFile;
1843          curFile = null;
1844          return result;
1845        } 
1846        throw new java.util.NoSuchElementException("No more entry in " + f);
1847      }
1848    };
1849  }
1850  
1851  /** Return the current user's home directory in this filesystem.
1852   * The default implementation returns "/user/$USER/".
1853   */
1854  public Path getHomeDirectory() {
1855    return this.makeQualified(
1856        new Path("/user/"+System.getProperty("user.name")));
1857  }
1858
1859
1860  /**
1861   * Set the current working directory for the given file system. All relative
1862   * paths will be resolved relative to it.
1863   * 
1864   * @param new_dir
1865   */
1866  public abstract void setWorkingDirectory(Path new_dir);
1867    
1868  /**
1869   * Get the current working directory for the given file system
1870   * @return the directory pathname
1871   */
1872  public abstract Path getWorkingDirectory();
1873  
1874  
1875  /**
1876   * Note: with the new FilesContext class, getWorkingDirectory()
1877   * will be removed. 
1878   * The working directory is implemented in FilesContext.
1879   * 
1880   * Some file systems like LocalFileSystem have an initial workingDir
1881   * that we use as the starting workingDir. For other file systems
1882   * like HDFS there is no built in notion of an initial workingDir.
1883   * 
1884   * @return if there is built in notion of workingDir then it
1885   * is returned; else a null is returned.
1886   */
1887  protected Path getInitialWorkingDirectory() {
1888    return null;
1889  }
1890
1891  /**
1892   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1893   */
1894  public boolean mkdirs(Path f) throws IOException {
1895    return mkdirs(f, FsPermission.getDirDefault());
1896  }
1897
1898  /**
1899   * Make the given file and all non-existent parents into
1900   * directories. Has the semantics of Unix 'mkdir -p'.
1901   * Existence of the directory hierarchy is not an error.
1902   * @param f path to create
1903   * @param permission to apply to f
1904   */
1905  public abstract boolean mkdirs(Path f, FsPermission permission
1906      ) throws IOException;
1907
1908  /**
1909   * The src file is on the local disk.  Add it to FS at
1910   * the given dst name and the source is kept intact afterwards
1911   * @param src path
1912   * @param dst path
1913   */
1914  public void copyFromLocalFile(Path src, Path dst)
1915    throws IOException {
1916    copyFromLocalFile(false, src, dst);
1917  }
1918
1919  /**
1920   * The src files is on the local disk.  Add it to FS at
1921   * the given dst name, removing the source afterwards.
1922   * @param srcs path
1923   * @param dst path
1924   */
1925  public void moveFromLocalFile(Path[] srcs, Path dst)
1926    throws IOException {
1927    copyFromLocalFile(true, true, srcs, dst);
1928  }
1929
1930  /**
1931   * The src file is on the local disk.  Add it to FS at
1932   * the given dst name, removing the source afterwards.
1933   * @param src path
1934   * @param dst path
1935   */
1936  public void moveFromLocalFile(Path src, Path dst)
1937    throws IOException {
1938    copyFromLocalFile(true, src, dst);
1939  }
1940
1941  /**
1942   * The src file is on the local disk.  Add it to FS at
1943   * the given dst name.
1944   * delSrc indicates if the source should be removed
1945   * @param delSrc whether to delete the src
1946   * @param src path
1947   * @param dst path
1948   */
1949  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1950    throws IOException {
1951    copyFromLocalFile(delSrc, true, src, dst);
1952  }
1953  
1954  /**
1955   * The src files are on the local disk.  Add it to FS at
1956   * the given dst name.
1957   * delSrc indicates if the source should be removed
1958   * @param delSrc whether to delete the src
1959   * @param overwrite whether to overwrite an existing file
1960   * @param srcs array of paths which are source
1961   * @param dst path
1962   */
1963  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1964                                Path[] srcs, Path dst)
1965    throws IOException {
1966    Configuration conf = getConf();
1967    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1968  }
1969  
1970  /**
1971   * The src file is on the local disk.  Add it to FS at
1972   * the given dst name.
1973   * delSrc indicates if the source should be removed
1974   * @param delSrc whether to delete the src
1975   * @param overwrite whether to overwrite an existing file
1976   * @param src path
1977   * @param dst path
1978   */
1979  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1980                                Path src, Path dst)
1981    throws IOException {
1982    Configuration conf = getConf();
1983    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1984  }
1985    
1986  /**
1987   * The src file is under FS, and the dst is on the local disk.
1988   * Copy it from FS control to the local dst name.
1989   * @param src path
1990   * @param dst path
1991   */
1992  public void copyToLocalFile(Path src, Path dst) throws IOException {
1993    copyToLocalFile(false, src, dst);
1994  }
1995    
1996  /**
1997   * The src file is under FS, and the dst is on the local disk.
1998   * Copy it from FS control to the local dst name.
1999   * Remove the source afterwards
2000   * @param src path
2001   * @param dst path
2002   */
2003  public void moveToLocalFile(Path src, Path dst) throws IOException {
2004    copyToLocalFile(true, src, dst);
2005  }
2006
2007  /**
2008   * The src file is under FS, and the dst is on the local disk.
2009   * Copy it from FS control to the local dst name.
2010   * delSrc indicates if the src will be removed or not.
2011   * @param delSrc whether to delete the src
2012   * @param src path
2013   * @param dst path
2014   */   
2015  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
2016    throws IOException {
2017    copyToLocalFile(delSrc, src, dst, false);
2018  }
2019  
2020    /**
2021   * The src file is under FS, and the dst is on the local disk. Copy it from FS
2022   * control to the local dst name. delSrc indicates if the src will be removed
2023   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
2024   * as local file system or not. RawLocalFileSystem is non crc file system.So,
2025   * It will not create any crc files at local.
2026   * 
2027   * @param delSrc
2028   *          whether to delete the src
2029   * @param src
2030   *          path
2031   * @param dst
2032   *          path
2033   * @param useRawLocalFileSystem
2034   *          whether to use RawLocalFileSystem as local file system or not.
2035   * 
2036   * @throws IOException
2037   *           - if any IO error
2038   */
2039  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
2040      boolean useRawLocalFileSystem) throws IOException {
2041    Configuration conf = getConf();
2042    FileSystem local = null;
2043    if (useRawLocalFileSystem) {
2044      local = getLocal(conf).getRawFileSystem();
2045    } else {
2046      local = getLocal(conf);
2047    }
2048    FileUtil.copy(this, src, local, dst, delSrc, conf);
2049  }
2050
2051  /**
2052   * Returns a local File that the user can write output to.  The caller
2053   * provides both the eventual FS target name and the local working
2054   * file.  If the FS is local, we write directly into the target.  If
2055   * the FS is remote, we write into the tmp local area.
2056   * @param fsOutputFile path of output file
2057   * @param tmpLocalFile path of local tmp file
2058   */
2059  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2060    throws IOException {
2061    return tmpLocalFile;
2062  }
2063
2064  /**
2065   * Called when we're all done writing to the target.  A local FS will
2066   * do nothing, because we've written to exactly the right place.  A remote
2067   * FS will copy the contents of tmpLocalFile to the correct target at
2068   * fsOutputFile.
2069   * @param fsOutputFile path of output file
2070   * @param tmpLocalFile path to local tmp file
2071   */
2072  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2073    throws IOException {
2074    moveFromLocalFile(tmpLocalFile, fsOutputFile);
2075  }
2076
2077  /**
2078   * No more filesystem operations are needed.  Will
2079   * release any held locks.
2080   */
2081  @Override
2082  public void close() throws IOException {
2083    // delete all files that were marked as delete-on-exit.
2084    processDeleteOnExit();
2085    CACHE.remove(this.key, this);
2086  }
2087
2088  /** Return the total size of all files in the filesystem.*/
2089  public long getUsed() throws IOException{
2090    long used = 0;
2091    RemoteIterator<LocatedFileStatus> files = listFiles(new Path("/"), true);
2092    while (files.hasNext()) {
2093      used += files.next().getLen();
2094    }
2095    return used;
2096  }
2097  
2098  /**
2099   * Get the block size for a particular file.
2100   * @param f the filename
2101   * @return the number of bytes in a block
2102   */
2103  /** @deprecated Use getFileStatus() instead */
2104  @Deprecated
2105  public long getBlockSize(Path f) throws IOException {
2106    return getFileStatus(f).getBlockSize();
2107  }
2108
2109  /**
2110   * Return the number of bytes that large input files should be optimally
2111   * be split into to minimize i/o time.
2112   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2113   */
2114  @Deprecated
2115  public long getDefaultBlockSize() {
2116    // default to 32MB: large enough to minimize the impact of seeks
2117    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2118  }
2119    
2120  /** Return the number of bytes that large input files should be optimally
2121   * be split into to minimize i/o time.  The given path will be used to
2122   * locate the actual filesystem.  The full path does not have to exist.
2123   * @param f path of file
2124   * @return the default block size for the path's filesystem
2125   */
2126  public long getDefaultBlockSize(Path f) {
2127    return getDefaultBlockSize();
2128  }
2129
2130  /**
2131   * Get the default replication.
2132   * @deprecated use {@link #getDefaultReplication(Path)} instead
2133   */
2134  @Deprecated
2135  public short getDefaultReplication() { return 1; }
2136
2137  /**
2138   * Get the default replication for a path.   The given path will be used to
2139   * locate the actual filesystem.  The full path does not have to exist.
2140   * @param path of the file
2141   * @return default replication for the path's filesystem 
2142   */
2143  public short getDefaultReplication(Path path) {
2144    return getDefaultReplication();
2145  }
2146  
2147  /**
2148   * Return a file status object that represents the path.
2149   * @param f The path we want information from
2150   * @return a FileStatus object
2151   * @throws FileNotFoundException when the path does not exist;
2152   *         IOException see specific implementation
2153   */
2154  public abstract FileStatus getFileStatus(Path f) throws IOException;
2155
2156  /**
2157   * Checks if the user can access a path.  The mode specifies which access
2158   * checks to perform.  If the requested permissions are granted, then the
2159   * method returns normally.  If access is denied, then the method throws an
2160   * {@link AccessControlException}.
2161   * <p/>
2162   * The default implementation of this method calls {@link #getFileStatus(Path)}
2163   * and checks the returned permissions against the requested permissions.
2164   * Note that the getFileStatus call will be subject to authorization checks.
2165   * Typically, this requires search (execute) permissions on each directory in
2166   * the path's prefix, but this is implementation-defined.  Any file system
2167   * that provides a richer authorization model (such as ACLs) may override the
2168   * default implementation so that it checks against that model instead.
2169   * <p>
2170   * In general, applications should avoid using this method, due to the risk of
2171   * time-of-check/time-of-use race conditions.  The permissions on a file may
2172   * change immediately after the access call returns.  Most applications should
2173   * prefer running specific file system actions as the desired user represented
2174   * by a {@link UserGroupInformation}.
2175   *
2176   * @param path Path to check
2177   * @param mode type of access to check
2178   * @throws AccessControlException if access is denied
2179   * @throws FileNotFoundException if the path does not exist
2180   * @throws IOException see specific implementation
2181   */
2182  @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2183  public void access(Path path, FsAction mode) throws AccessControlException,
2184      FileNotFoundException, IOException {
2185    checkAccessPermissions(this.getFileStatus(path), mode);
2186  }
2187
2188  /**
2189   * This method provides the default implementation of
2190   * {@link #access(Path, FsAction)}.
2191   *
2192   * @param stat FileStatus to check
2193   * @param mode type of access to check
2194   * @throws IOException for any error
2195   */
2196  @InterfaceAudience.Private
2197  static void checkAccessPermissions(FileStatus stat, FsAction mode)
2198      throws IOException {
2199    FsPermission perm = stat.getPermission();
2200    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2201    String user = ugi.getShortUserName();
2202    List<String> groups = Arrays.asList(ugi.getGroupNames());
2203    if (user.equals(stat.getOwner())) {
2204      if (perm.getUserAction().implies(mode)) {
2205        return;
2206      }
2207    } else if (groups.contains(stat.getGroup())) {
2208      if (perm.getGroupAction().implies(mode)) {
2209        return;
2210      }
2211    } else {
2212      if (perm.getOtherAction().implies(mode)) {
2213        return;
2214      }
2215    }
2216    throw new AccessControlException(String.format(
2217      "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2218      stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2219  }
2220
2221  /**
2222   * See {@link FileContext#fixRelativePart}
2223   */
2224  protected Path fixRelativePart(Path p) {
2225    if (p.isUriPathAbsolute()) {
2226      return p;
2227    } else {
2228      return new Path(getWorkingDirectory(), p);
2229    }
2230  }
2231
2232  /**
2233   * See {@link FileContext#createSymlink(Path, Path, boolean)}
2234   */
2235  public void createSymlink(final Path target, final Path link,
2236      final boolean createParent) throws AccessControlException,
2237      FileAlreadyExistsException, FileNotFoundException,
2238      ParentNotDirectoryException, UnsupportedFileSystemException, 
2239      IOException {
2240    // Supporting filesystems should override this method
2241    throw new UnsupportedOperationException(
2242        "Filesystem does not support symlinks!");
2243  }
2244
2245  /**
2246   * See {@link FileContext#getFileLinkStatus(Path)}
2247   */
2248  public FileStatus getFileLinkStatus(final Path f)
2249      throws AccessControlException, FileNotFoundException,
2250      UnsupportedFileSystemException, IOException {
2251    // Supporting filesystems should override this method
2252    return getFileStatus(f);
2253  }
2254
2255  /**
2256   * See {@link AbstractFileSystem#supportsSymlinks()}
2257   */
2258  public boolean supportsSymlinks() {
2259    return false;
2260  }
2261
2262  /**
2263   * See {@link FileContext#getLinkTarget(Path)}
2264   */
2265  public Path getLinkTarget(Path f) throws IOException {
2266    // Supporting filesystems should override this method
2267    throw new UnsupportedOperationException(
2268        "Filesystem does not support symlinks!");
2269  }
2270
2271  /**
2272   * See {@link AbstractFileSystem#getLinkTarget(Path)}
2273   */
2274  protected Path resolveLink(Path f) throws IOException {
2275    // Supporting filesystems should override this method
2276    throw new UnsupportedOperationException(
2277        "Filesystem does not support symlinks!");
2278  }
2279
2280  /**
2281   * Get the checksum of a file.
2282   *
2283   * @param f The file path
2284   * @return The file checksum.  The default return value is null,
2285   *  which indicates that no checksum algorithm is implemented
2286   *  in the corresponding FileSystem.
2287   */
2288  public FileChecksum getFileChecksum(Path f) throws IOException {
2289    return getFileChecksum(f, Long.MAX_VALUE);
2290  }
2291
2292  /**
2293   * Get the checksum of a file, from the beginning of the file till the
2294   * specific length.
2295   * @param f The file path
2296   * @param length The length of the file range for checksum calculation
2297   * @return The file checksum.
2298   */
2299  public FileChecksum getFileChecksum(Path f, final long length)
2300      throws IOException {
2301    return null;
2302  }
2303
2304  /**
2305   * Set the verify checksum flag. This is only applicable if the 
2306   * corresponding FileSystem supports checksum. By default doesn't do anything.
2307   * @param verifyChecksum
2308   */
2309  public void setVerifyChecksum(boolean verifyChecksum) {
2310    //doesn't do anything
2311  }
2312
2313  /**
2314   * Set the write checksum flag. This is only applicable if the 
2315   * corresponding FileSystem supports checksum. By default doesn't do anything.
2316   * @param writeChecksum
2317   */
2318  public void setWriteChecksum(boolean writeChecksum) {
2319    //doesn't do anything
2320  }
2321
2322  /**
2323   * Returns a status object describing the use and capacity of the
2324   * file system. If the file system has multiple partitions, the
2325   * use and capacity of the root partition is reflected.
2326   * 
2327   * @return a FsStatus object
2328   * @throws IOException
2329   *           see specific implementation
2330   */
2331  public FsStatus getStatus() throws IOException {
2332    return getStatus(null);
2333  }
2334
2335  /**
2336   * Returns a status object describing the use and capacity of the
2337   * file system. If the file system has multiple partitions, the
2338   * use and capacity of the partition pointed to by the specified
2339   * path is reflected.
2340   * @param p Path for which status should be obtained. null means
2341   * the default partition. 
2342   * @return a FsStatus object
2343   * @throws IOException
2344   *           see specific implementation
2345   */
2346  public FsStatus getStatus(Path p) throws IOException {
2347    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2348  }
2349
2350  /**
2351   * Set permission of a path.
2352   * @param p
2353   * @param permission
2354   */
2355  public void setPermission(Path p, FsPermission permission
2356      ) throws IOException {
2357  }
2358
2359  /**
2360   * Set owner of a path (i.e. a file or a directory).
2361   * The parameters username and groupname cannot both be null.
2362   * @param p The path
2363   * @param username If it is null, the original username remains unchanged.
2364   * @param groupname If it is null, the original groupname remains unchanged.
2365   */
2366  public void setOwner(Path p, String username, String groupname
2367      ) throws IOException {
2368  }
2369
2370  /**
2371   * Set access time of a file
2372   * @param p The path
2373   * @param mtime Set the modification time of this file.
2374   *              The number of milliseconds since Jan 1, 1970. 
2375   *              A value of -1 means that this call should not set modification time.
2376   * @param atime Set the access time of this file.
2377   *              The number of milliseconds since Jan 1, 1970. 
2378   *              A value of -1 means that this call should not set access time.
2379   */
2380  public void setTimes(Path p, long mtime, long atime
2381      ) throws IOException {
2382  }
2383
2384  /**
2385   * Create a snapshot with a default name.
2386   * @param path The directory where snapshots will be taken.
2387   * @return the snapshot path.
2388   */
2389  public final Path createSnapshot(Path path) throws IOException {
2390    return createSnapshot(path, null);
2391  }
2392
2393  /**
2394   * Create a snapshot
2395   * @param path The directory where snapshots will be taken.
2396   * @param snapshotName The name of the snapshot
2397   * @return the snapshot path.
2398   */
2399  public Path createSnapshot(Path path, String snapshotName)
2400      throws IOException {
2401    throw new UnsupportedOperationException(getClass().getSimpleName()
2402        + " doesn't support createSnapshot");
2403  }
2404  
2405  /**
2406   * Rename a snapshot
2407   * @param path The directory path where the snapshot was taken
2408   * @param snapshotOldName Old name of the snapshot
2409   * @param snapshotNewName New name of the snapshot
2410   * @throws IOException
2411   */
2412  public void renameSnapshot(Path path, String snapshotOldName,
2413      String snapshotNewName) throws IOException {
2414    throw new UnsupportedOperationException(getClass().getSimpleName()
2415        + " doesn't support renameSnapshot");
2416  }
2417  
2418  /**
2419   * Delete a snapshot of a directory
2420   * @param path  The directory that the to-be-deleted snapshot belongs to
2421   * @param snapshotName The name of the snapshot
2422   */
2423  public void deleteSnapshot(Path path, String snapshotName)
2424      throws IOException {
2425    throw new UnsupportedOperationException(getClass().getSimpleName()
2426        + " doesn't support deleteSnapshot");
2427  }
2428  
2429  /**
2430   * Modifies ACL entries of files and directories.  This method can add new ACL
2431   * entries or modify the permissions on existing ACL entries.  All existing
2432   * ACL entries that are not specified in this call are retained without
2433   * changes.  (Modifications are merged into the current ACL.)
2434   *
2435   * @param path Path to modify
2436   * @param aclSpec List<AclEntry> describing modifications
2437   * @throws IOException if an ACL could not be modified
2438   */
2439  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2440      throws IOException {
2441    throw new UnsupportedOperationException(getClass().getSimpleName()
2442        + " doesn't support modifyAclEntries");
2443  }
2444
2445  /**
2446   * Removes ACL entries from files and directories.  Other ACL entries are
2447   * retained.
2448   *
2449   * @param path Path to modify
2450   * @param aclSpec List<AclEntry> describing entries to remove
2451   * @throws IOException if an ACL could not be modified
2452   */
2453  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2454      throws IOException {
2455    throw new UnsupportedOperationException(getClass().getSimpleName()
2456        + " doesn't support removeAclEntries");
2457  }
2458
2459  /**
2460   * Removes all default ACL entries from files and directories.
2461   *
2462   * @param path Path to modify
2463   * @throws IOException if an ACL could not be modified
2464   */
2465  public void removeDefaultAcl(Path path)
2466      throws IOException {
2467    throw new UnsupportedOperationException(getClass().getSimpleName()
2468        + " doesn't support removeDefaultAcl");
2469  }
2470
2471  /**
2472   * Removes all but the base ACL entries of files and directories.  The entries
2473   * for user, group, and others are retained for compatibility with permission
2474   * bits.
2475   *
2476   * @param path Path to modify
2477   * @throws IOException if an ACL could not be removed
2478   */
2479  public void removeAcl(Path path)
2480      throws IOException {
2481    throw new UnsupportedOperationException(getClass().getSimpleName()
2482        + " doesn't support removeAcl");
2483  }
2484
2485  /**
2486   * Fully replaces ACL of files and directories, discarding all existing
2487   * entries.
2488   *
2489   * @param path Path to modify
2490   * @param aclSpec List<AclEntry> describing modifications, must include entries
2491   *   for user, group, and others for compatibility with permission bits.
2492   * @throws IOException if an ACL could not be modified
2493   */
2494  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2495    throw new UnsupportedOperationException(getClass().getSimpleName()
2496        + " doesn't support setAcl");
2497  }
2498
2499  /**
2500   * Gets the ACL of a file or directory.
2501   *
2502   * @param path Path to get
2503   * @return AclStatus describing the ACL of the file or directory
2504   * @throws IOException if an ACL could not be read
2505   */
2506  public AclStatus getAclStatus(Path path) throws IOException {
2507    throw new UnsupportedOperationException(getClass().getSimpleName()
2508        + " doesn't support getAclStatus");
2509  }
2510
2511  /**
2512   * Set an xattr of a file or directory.
2513   * The name must be prefixed with the namespace followed by ".". For example,
2514   * "user.attr".
2515   * <p/>
2516   * Refer to the HDFS extended attributes user documentation for details.
2517   *
2518   * @param path Path to modify
2519   * @param name xattr name.
2520   * @param value xattr value.
2521   * @throws IOException
2522   */
2523  public void setXAttr(Path path, String name, byte[] value)
2524      throws IOException {
2525    setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2526        XAttrSetFlag.REPLACE));
2527  }
2528
2529  /**
2530   * Set an xattr of a file or directory.
2531   * The name must be prefixed with the namespace followed by ".". For example,
2532   * "user.attr".
2533   * <p/>
2534   * Refer to the HDFS extended attributes user documentation for details.
2535   *
2536   * @param path Path to modify
2537   * @param name xattr name.
2538   * @param value xattr value.
2539   * @param flag xattr set flag
2540   * @throws IOException
2541   */
2542  public void setXAttr(Path path, String name, byte[] value,
2543      EnumSet<XAttrSetFlag> flag) throws IOException {
2544    throw new UnsupportedOperationException(getClass().getSimpleName()
2545        + " doesn't support setXAttr");
2546  }
2547
2548  /**
2549   * Get an xattr name and value for a file or directory.
2550   * The name must be prefixed with the namespace followed by ".". For example,
2551   * "user.attr".
2552   * <p/>
2553   * Refer to the HDFS extended attributes user documentation for details.
2554   *
2555   * @param path Path to get extended attribute
2556   * @param name xattr name.
2557   * @return byte[] xattr value.
2558   * @throws IOException
2559   */
2560  public byte[] getXAttr(Path path, String name) throws IOException {
2561    throw new UnsupportedOperationException(getClass().getSimpleName()
2562        + " doesn't support getXAttr");
2563  }
2564
2565  /**
2566   * Get all of the xattr name/value pairs for a file or directory.
2567   * Only those xattrs which the logged-in user has permissions to view
2568   * are returned.
2569   * <p/>
2570   * Refer to the HDFS extended attributes user documentation for details.
2571   *
2572   * @param path Path to get extended attributes
2573   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2574   * @throws IOException
2575   */
2576  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2577    throw new UnsupportedOperationException(getClass().getSimpleName()
2578        + " doesn't support getXAttrs");
2579  }
2580
2581  /**
2582   * Get all of the xattrs name/value pairs for a file or directory.
2583   * Only those xattrs which the logged-in user has permissions to view
2584   * are returned.
2585   * <p/>
2586   * Refer to the HDFS extended attributes user documentation for details.
2587   *
2588   * @param path Path to get extended attributes
2589   * @param names XAttr names.
2590   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2591   * @throws IOException
2592   */
2593  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2594      throws IOException {
2595    throw new UnsupportedOperationException(getClass().getSimpleName()
2596        + " doesn't support getXAttrs");
2597  }
2598
2599  /**
2600   * Get all of the xattr names for a file or directory.
2601   * Only those xattr names which the logged-in user has permissions to view
2602   * are returned.
2603   * <p/>
2604   * Refer to the HDFS extended attributes user documentation for details.
2605   *
2606   * @param path Path to get extended attributes
2607   * @return List<String> of the XAttr names of the file or directory
2608   * @throws IOException
2609   */
2610  public List<String> listXAttrs(Path path) throws IOException {
2611    throw new UnsupportedOperationException(getClass().getSimpleName()
2612            + " doesn't support listXAttrs");
2613  }
2614
2615  /**
2616   * Remove an xattr of a file or directory.
2617   * The name must be prefixed with the namespace followed by ".". For example,
2618   * "user.attr".
2619   * <p/>
2620   * Refer to the HDFS extended attributes user documentation for details.
2621   *
2622   * @param path Path to remove extended attribute
2623   * @param name xattr name
2624   * @throws IOException
2625   */
2626  public void removeXAttr(Path path, String name) throws IOException {
2627    throw new UnsupportedOperationException(getClass().getSimpleName()
2628        + " doesn't support removeXAttr");
2629  }
2630
2631  /**
2632   * Set the storage policy for a given file or directory.
2633   *
2634   * @param src file or directory path.
2635   * @param policyName the name of the target storage policy. The list
2636   *                   of supported Storage policies can be retrieved
2637   *                   via {@link #getAllStoragePolicies}.
2638   * @throws IOException
2639   */
2640  public void setStoragePolicy(final Path src, final String policyName)
2641      throws IOException {
2642    throw new UnsupportedOperationException(getClass().getSimpleName()
2643        + " doesn't support setStoragePolicy");
2644  }
2645
2646  /**
2647   * Query the effective storage policy ID for the given file or directory.
2648   *
2649   * @param src file or directory path.
2650   * @return storage policy for give file.
2651   * @throws IOException
2652   */
2653  public BlockStoragePolicySpi getStoragePolicy(final Path src)
2654      throws IOException {
2655    throw new UnsupportedOperationException(getClass().getSimpleName()
2656        + " doesn't support getStoragePolicy");
2657  }
2658
2659  /**
2660   * Retrieve all the storage policies supported by this file system.
2661   *
2662   * @return all storage policies supported by this filesystem.
2663   * @throws IOException
2664   */
2665  public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
2666      throws IOException {
2667    throw new UnsupportedOperationException(getClass().getSimpleName()
2668        + " doesn't support getAllStoragePolicies");
2669  }
2670
2671  // making it volatile to be able to do a double checked locking
2672  private volatile static boolean FILE_SYSTEMS_LOADED = false;
2673
2674  private static final Map<String, Class<? extends FileSystem>>
2675    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2676
2677  private static void loadFileSystems() {
2678    synchronized (FileSystem.class) {
2679      if (!FILE_SYSTEMS_LOADED) {
2680        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2681        for (FileSystem fs : serviceLoader) {
2682          SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2683        }
2684        FILE_SYSTEMS_LOADED = true;
2685      }
2686    }
2687  }
2688
2689  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2690      Configuration conf) throws IOException {
2691    if (!FILE_SYSTEMS_LOADED) {
2692      loadFileSystems();
2693    }
2694    Class<? extends FileSystem> clazz = null;
2695    if (conf != null) {
2696      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2697    }
2698    if (clazz == null) {
2699      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2700    }
2701    if (clazz == null) {
2702      throw new IOException("No FileSystem for scheme: " + scheme);
2703    }
2704    return clazz;
2705  }
2706
2707  private static FileSystem createFileSystem(URI uri, Configuration conf
2708      ) throws IOException {
2709    TraceScope scope = Trace.startSpan("FileSystem#createFileSystem");
2710    Span span = scope.getSpan();
2711    if (span != null) {
2712      span.addKVAnnotation("scheme", uri.getScheme());
2713    }
2714    try {
2715      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2716      FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2717      fs.initialize(uri, conf);
2718      return fs;
2719    } finally {
2720      scope.close();
2721    }
2722  }
2723
2724  /** Caching FileSystem objects */
2725  static class Cache {
2726    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2727
2728    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2729    private final Set<Key> toAutoClose = new HashSet<Key>();
2730
2731    /** A variable that makes all objects in the cache unique */
2732    private static AtomicLong unique = new AtomicLong(1);
2733
2734    FileSystem get(URI uri, Configuration conf) throws IOException{
2735      Key key = new Key(uri, conf);
2736      return getInternal(uri, conf, key);
2737    }
2738
2739    /** The objects inserted into the cache using this method are all unique */
2740    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2741      Key key = new Key(uri, conf, unique.getAndIncrement());
2742      return getInternal(uri, conf, key);
2743    }
2744
2745    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2746      FileSystem fs;
2747      synchronized (this) {
2748        fs = map.get(key);
2749      }
2750      if (fs != null) {
2751        return fs;
2752      }
2753
2754      fs = createFileSystem(uri, conf);
2755      synchronized (this) { // refetch the lock again
2756        FileSystem oldfs = map.get(key);
2757        if (oldfs != null) { // a file system is created while lock is releasing
2758          fs.close(); // close the new file system
2759          return oldfs;  // return the old file system
2760        }
2761        
2762        // now insert the new file system into the map
2763        if (map.isEmpty()
2764                && !ShutdownHookManager.get().isShutdownInProgress()) {
2765          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2766        }
2767        fs.key = key;
2768        map.put(key, fs);
2769        if (conf.getBoolean("fs.automatic.close", true)) {
2770          toAutoClose.add(key);
2771        }
2772        return fs;
2773      }
2774    }
2775
2776    synchronized void remove(Key key, FileSystem fs) {
2777      FileSystem cachedFs = map.remove(key);
2778      if (fs == cachedFs) {
2779        toAutoClose.remove(key);
2780      } else if (cachedFs != null) {
2781        map.put(key, cachedFs);
2782      }
2783    }
2784
2785    synchronized void closeAll() throws IOException {
2786      closeAll(false);
2787    }
2788
2789    /**
2790     * Close all FileSystem instances in the Cache.
2791     * @param onlyAutomatic only close those that are marked for automatic closing
2792     */
2793    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2794      List<IOException> exceptions = new ArrayList<IOException>();
2795
2796      // Make a copy of the keys in the map since we'll be modifying
2797      // the map while iterating over it, which isn't safe.
2798      List<Key> keys = new ArrayList<Key>();
2799      keys.addAll(map.keySet());
2800
2801      for (Key key : keys) {
2802        final FileSystem fs = map.get(key);
2803
2804        if (onlyAutomatic && !toAutoClose.contains(key)) {
2805          continue;
2806        }
2807
2808        //remove from cache
2809        map.remove(key);
2810        toAutoClose.remove(key);
2811
2812        if (fs != null) {
2813          try {
2814            fs.close();
2815          }
2816          catch(IOException ioe) {
2817            exceptions.add(ioe);
2818          }
2819        }
2820      }
2821
2822      if (!exceptions.isEmpty()) {
2823        throw MultipleIOException.createIOException(exceptions);
2824      }
2825    }
2826
2827    private class ClientFinalizer implements Runnable {
2828      @Override
2829      public synchronized void run() {
2830        try {
2831          closeAll(true);
2832        } catch (IOException e) {
2833          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2834        }
2835      }
2836    }
2837
2838    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2839      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2840      //Make a pass over the list and collect the filesystems to close
2841      //we cannot close inline since close() removes the entry from the Map
2842      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2843        final Key key = entry.getKey();
2844        final FileSystem fs = entry.getValue();
2845        if (ugi.equals(key.ugi) && fs != null) {
2846          targetFSList.add(fs);   
2847        }
2848      }
2849      List<IOException> exceptions = new ArrayList<IOException>();
2850      //now make a pass over the target list and close each
2851      for (FileSystem fs : targetFSList) {
2852        try {
2853          fs.close();
2854        }
2855        catch(IOException ioe) {
2856          exceptions.add(ioe);
2857        }
2858      }
2859      if (!exceptions.isEmpty()) {
2860        throw MultipleIOException.createIOException(exceptions);
2861      }
2862    }
2863
2864    /** FileSystem.Cache.Key */
2865    static class Key {
2866      final String scheme;
2867      final String authority;
2868      final UserGroupInformation ugi;
2869      final long unique;   // an artificial way to make a key unique
2870
2871      Key(URI uri, Configuration conf) throws IOException {
2872        this(uri, conf, 0);
2873      }
2874
2875      Key(URI uri, Configuration conf, long unique) throws IOException {
2876        scheme = uri.getScheme()==null ?
2877            "" : StringUtils.toLowerCase(uri.getScheme());
2878        authority = uri.getAuthority()==null ?
2879            "" : StringUtils.toLowerCase(uri.getAuthority());
2880        this.unique = unique;
2881        
2882        this.ugi = UserGroupInformation.getCurrentUser();
2883      }
2884
2885      @Override
2886      public int hashCode() {
2887        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2888      }
2889
2890      static boolean isEqual(Object a, Object b) {
2891        return a == b || (a != null && a.equals(b));        
2892      }
2893
2894      @Override
2895      public boolean equals(Object obj) {
2896        if (obj == this) {
2897          return true;
2898        }
2899        if (obj != null && obj instanceof Key) {
2900          Key that = (Key)obj;
2901          return isEqual(this.scheme, that.scheme)
2902                 && isEqual(this.authority, that.authority)
2903                 && isEqual(this.ugi, that.ugi)
2904                 && (this.unique == that.unique);
2905        }
2906        return false;        
2907      }
2908
2909      @Override
2910      public String toString() {
2911        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2912      }
2913    }
2914  }
2915  
2916  /**
2917   * Tracks statistics about how many reads, writes, and so forth have been
2918   * done in a FileSystem.
2919   * 
2920   * Since there is only one of these objects per FileSystem, there will 
2921   * typically be many threads writing to this object.  Almost every operation
2922   * on an open file will involve a write to this object.  In contrast, reading
2923   * statistics is done infrequently by most programs, and not at all by others.
2924   * Hence, this is optimized for writes.
2925   * 
2926   * Each thread writes to its own thread-local area of memory.  This removes 
2927   * contention and allows us to scale up to many, many threads.  To read
2928   * statistics, the reader thread totals up the contents of all of the 
2929   * thread-local data areas.
2930   */
2931  public static final class Statistics {
2932    /**
2933     * Statistics data.
2934     * 
2935     * There is only a single writer to thread-local StatisticsData objects.
2936     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
2937     * to prevent lost updates.
2938     * The Java specification guarantees that updates to volatile longs will
2939     * be perceived as atomic with respect to other threads, which is all we
2940     * need.
2941     */
2942    public static class StatisticsData {
2943      volatile long bytesRead;
2944      volatile long bytesWritten;
2945      volatile int readOps;
2946      volatile int largeReadOps;
2947      volatile int writeOps;
2948
2949      /**
2950       * Add another StatisticsData object to this one.
2951       */
2952      void add(StatisticsData other) {
2953        this.bytesRead += other.bytesRead;
2954        this.bytesWritten += other.bytesWritten;
2955        this.readOps += other.readOps;
2956        this.largeReadOps += other.largeReadOps;
2957        this.writeOps += other.writeOps;
2958      }
2959
2960      /**
2961       * Negate the values of all statistics.
2962       */
2963      void negate() {
2964        this.bytesRead = -this.bytesRead;
2965        this.bytesWritten = -this.bytesWritten;
2966        this.readOps = -this.readOps;
2967        this.largeReadOps = -this.largeReadOps;
2968        this.writeOps = -this.writeOps;
2969      }
2970
2971      @Override
2972      public String toString() {
2973        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2974            + readOps + " read ops, " + largeReadOps + " large read ops, "
2975            + writeOps + " write ops";
2976      }
2977      
2978      public long getBytesRead() {
2979        return bytesRead;
2980      }
2981      
2982      public long getBytesWritten() {
2983        return bytesWritten;
2984      }
2985      
2986      public int getReadOps() {
2987        return readOps;
2988      }
2989      
2990      public int getLargeReadOps() {
2991        return largeReadOps;
2992      }
2993      
2994      public int getWriteOps() {
2995        return writeOps;
2996      }
2997    }
2998
2999    private interface StatisticsAggregator<T> {
3000      void accept(StatisticsData data);
3001      T aggregate();
3002    }
3003
3004    private final String scheme;
3005
3006    /**
3007     * rootData is data that doesn't belong to any thread, but will be added
3008     * to the totals.  This is useful for making copies of Statistics objects,
3009     * and for storing data that pertains to threads that have been garbage
3010     * collected.  Protected by the Statistics lock.
3011     */
3012    private final StatisticsData rootData;
3013
3014    /**
3015     * Thread-local data.
3016     */
3017    private final ThreadLocal<StatisticsData> threadData;
3018
3019    /**
3020     * Set of all thread-local data areas.  Protected by the Statistics lock.
3021     * The references to the statistics data are kept using phantom references
3022     * to the associated threads. Proper clean-up is performed by the cleaner
3023     * thread when the threads are garbage collected.
3024     */
3025    private final Set<StatisticsDataReference> allData;
3026
3027    /**
3028     * Global reference queue and a cleaner thread that manage statistics data
3029     * references from all filesystem instances.
3030     */
3031    private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
3032    private static final Thread STATS_DATA_CLEANER;
3033
3034    static {
3035      STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
3036      // start a single daemon cleaner thread
3037      STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
3038      STATS_DATA_CLEANER.
3039          setName(StatisticsDataReferenceCleaner.class.getName());
3040      STATS_DATA_CLEANER.setDaemon(true);
3041      STATS_DATA_CLEANER.start();
3042    }
3043
3044    public Statistics(String scheme) {
3045      this.scheme = scheme;
3046      this.rootData = new StatisticsData();
3047      this.threadData = new ThreadLocal<StatisticsData>();
3048      this.allData = new HashSet<StatisticsDataReference>();
3049    }
3050
3051    /**
3052     * Copy constructor.
3053     * 
3054     * @param other    The input Statistics object which is cloned.
3055     */
3056    public Statistics(Statistics other) {
3057      this.scheme = other.scheme;
3058      this.rootData = new StatisticsData();
3059      other.visitAll(new StatisticsAggregator<Void>() {
3060        @Override
3061        public void accept(StatisticsData data) {
3062          rootData.add(data);
3063        }
3064
3065        public Void aggregate() {
3066          return null;
3067        }
3068      });
3069      this.threadData = new ThreadLocal<StatisticsData>();
3070      this.allData = new HashSet<StatisticsDataReference>();
3071    }
3072
3073    /**
3074     * A phantom reference to a thread that also includes the data associated
3075     * with that thread. On the thread being garbage collected, it is enqueued
3076     * to the reference queue for clean-up.
3077     */
3078    private class StatisticsDataReference extends PhantomReference<Thread> {
3079      private final StatisticsData data;
3080
3081      public StatisticsDataReference(StatisticsData data, Thread thread) {
3082        super(thread, STATS_DATA_REF_QUEUE);
3083        this.data = data;
3084      }
3085
3086      public StatisticsData getData() {
3087        return data;
3088      }
3089
3090      /**
3091       * Performs clean-up action when the associated thread is garbage
3092       * collected.
3093       */
3094      public void cleanUp() {
3095        // use the statistics lock for safety
3096        synchronized (Statistics.this) {
3097          /*
3098           * If the thread that created this thread-local data no longer exists,
3099           * remove the StatisticsData from our list and fold the values into
3100           * rootData.
3101           */
3102          rootData.add(data);
3103          allData.remove(this);
3104        }
3105      }
3106    }
3107
3108    /**
3109     * Background action to act on references being removed.
3110     */
3111    private static class StatisticsDataReferenceCleaner implements Runnable {
3112      @Override
3113      public void run() {
3114        while (true) {
3115          try {
3116            StatisticsDataReference ref =
3117                (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
3118            ref.cleanUp();
3119          } catch (Throwable th) {
3120            // the cleaner thread should continue to run even if there are
3121            // exceptions, including InterruptedException
3122            LOG.warn("exception in the cleaner thread but it will continue to "
3123                + "run", th);
3124          }
3125        }
3126      }
3127    }
3128
3129    /**
3130     * Get or create the thread-local data associated with the current thread.
3131     */
3132    public StatisticsData getThreadStatistics() {
3133      StatisticsData data = threadData.get();
3134      if (data == null) {
3135        data = new StatisticsData();
3136        threadData.set(data);
3137        StatisticsDataReference ref =
3138            new StatisticsDataReference(data, Thread.currentThread());
3139        synchronized(this) {
3140          allData.add(ref);
3141        }
3142      }
3143      return data;
3144    }
3145
3146    /**
3147     * Increment the bytes read in the statistics
3148     * @param newBytes the additional bytes read
3149     */
3150    public void incrementBytesRead(long newBytes) {
3151      getThreadStatistics().bytesRead += newBytes;
3152    }
3153    
3154    /**
3155     * Increment the bytes written in the statistics
3156     * @param newBytes the additional bytes written
3157     */
3158    public void incrementBytesWritten(long newBytes) {
3159      getThreadStatistics().bytesWritten += newBytes;
3160    }
3161    
3162    /**
3163     * Increment the number of read operations
3164     * @param count number of read operations
3165     */
3166    public void incrementReadOps(int count) {
3167      getThreadStatistics().readOps += count;
3168    }
3169
3170    /**
3171     * Increment the number of large read operations
3172     * @param count number of large read operations
3173     */
3174    public void incrementLargeReadOps(int count) {
3175      getThreadStatistics().largeReadOps += count;
3176    }
3177
3178    /**
3179     * Increment the number of write operations
3180     * @param count number of write operations
3181     */
3182    public void incrementWriteOps(int count) {
3183      getThreadStatistics().writeOps += count;
3184    }
3185
3186    /**
3187     * Apply the given aggregator to all StatisticsData objects associated with
3188     * this Statistics object.
3189     *
3190     * For each StatisticsData object, we will call accept on the visitor.
3191     * Finally, at the end, we will call aggregate to get the final total. 
3192     *
3193     * @param         The visitor to use.
3194     * @return        The total.
3195     */
3196    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3197      visitor.accept(rootData);
3198      for (StatisticsDataReference ref: allData) {
3199        StatisticsData data = ref.getData();
3200        visitor.accept(data);
3201      }
3202      return visitor.aggregate();
3203    }
3204
3205    /**
3206     * Get the total number of bytes read
3207     * @return the number of bytes
3208     */
3209    public long getBytesRead() {
3210      return visitAll(new StatisticsAggregator<Long>() {
3211        private long bytesRead = 0;
3212
3213        @Override
3214        public void accept(StatisticsData data) {
3215          bytesRead += data.bytesRead;
3216        }
3217
3218        public Long aggregate() {
3219          return bytesRead;
3220        }
3221      });
3222    }
3223    
3224    /**
3225     * Get the total number of bytes written
3226     * @return the number of bytes
3227     */
3228    public long getBytesWritten() {
3229      return visitAll(new StatisticsAggregator<Long>() {
3230        private long bytesWritten = 0;
3231
3232        @Override
3233        public void accept(StatisticsData data) {
3234          bytesWritten += data.bytesWritten;
3235        }
3236
3237        public Long aggregate() {
3238          return bytesWritten;
3239        }
3240      });
3241    }
3242    
3243    /**
3244     * Get the number of file system read operations such as list files
3245     * @return number of read operations
3246     */
3247    public int getReadOps() {
3248      return visitAll(new StatisticsAggregator<Integer>() {
3249        private int readOps = 0;
3250
3251        @Override
3252        public void accept(StatisticsData data) {
3253          readOps += data.readOps;
3254          readOps += data.largeReadOps;
3255        }
3256
3257        public Integer aggregate() {
3258          return readOps;
3259        }
3260      });
3261    }
3262
3263    /**
3264     * Get the number of large file system read operations such as list files
3265     * under a large directory
3266     * @return number of large read operations
3267     */
3268    public int getLargeReadOps() {
3269      return visitAll(new StatisticsAggregator<Integer>() {
3270        private int largeReadOps = 0;
3271
3272        @Override
3273        public void accept(StatisticsData data) {
3274          largeReadOps += data.largeReadOps;
3275        }
3276
3277        public Integer aggregate() {
3278          return largeReadOps;
3279        }
3280      });
3281    }
3282
3283    /**
3284     * Get the number of file system write operations such as create, append 
3285     * rename etc.
3286     * @return number of write operations
3287     */
3288    public int getWriteOps() {
3289      return visitAll(new StatisticsAggregator<Integer>() {
3290        private int writeOps = 0;
3291
3292        @Override
3293        public void accept(StatisticsData data) {
3294          writeOps += data.writeOps;
3295        }
3296
3297        public Integer aggregate() {
3298          return writeOps;
3299        }
3300      });
3301    }
3302
3303
3304    @Override
3305    public String toString() {
3306      return visitAll(new StatisticsAggregator<String>() {
3307        private StatisticsData total = new StatisticsData();
3308
3309        @Override
3310        public void accept(StatisticsData data) {
3311          total.add(data);
3312        }
3313
3314        public String aggregate() {
3315          return total.toString();
3316        }
3317      });
3318    }
3319
3320    /**
3321     * Resets all statistics to 0.
3322     *
3323     * In order to reset, we add up all the thread-local statistics data, and
3324     * set rootData to the negative of that.
3325     *
3326     * This may seem like a counterintuitive way to reset the statsitics.  Why
3327     * can't we just zero out all the thread-local data?  Well, thread-local
3328     * data can only be modified by the thread that owns it.  If we tried to
3329     * modify the thread-local data from this thread, our modification might get
3330     * interleaved with a read-modify-write operation done by the thread that
3331     * owns the data.  That would result in our update getting lost.
3332     *
3333     * The approach used here avoids this problem because it only ever reads
3334     * (not writes) the thread-local data.  Both reads and writes to rootData
3335     * are done under the lock, so we're free to modify rootData from any thread
3336     * that holds the lock.
3337     */
3338    public void reset() {
3339      visitAll(new StatisticsAggregator<Void>() {
3340        private StatisticsData total = new StatisticsData();
3341
3342        @Override
3343        public void accept(StatisticsData data) {
3344          total.add(data);
3345        }
3346
3347        public Void aggregate() {
3348          total.negate();
3349          rootData.add(total);
3350          return null;
3351        }
3352      });
3353    }
3354    
3355    /**
3356     * Get the uri scheme associated with this statistics object.
3357     * @return the schema associated with this set of statistics
3358     */
3359    public String getScheme() {
3360      return scheme;
3361    }
3362
3363    @VisibleForTesting
3364    synchronized int getAllThreadLocalDataSize() {
3365      return allData.size();
3366    }
3367  }
3368  
3369  /**
3370   * Get the Map of Statistics object indexed by URI Scheme.
3371   * @return a Map having a key as URI scheme and value as Statistics object
3372   * @deprecated use {@link #getAllStatistics} instead
3373   */
3374  @Deprecated
3375  public static synchronized Map<String, Statistics> getStatistics() {
3376    Map<String, Statistics> result = new HashMap<String, Statistics>();
3377    for(Statistics stat: statisticsTable.values()) {
3378      result.put(stat.getScheme(), stat);
3379    }
3380    return result;
3381  }
3382
3383  /**
3384   * Return the FileSystem classes that have Statistics
3385   */
3386  public static synchronized List<Statistics> getAllStatistics() {
3387    return new ArrayList<Statistics>(statisticsTable.values());
3388  }
3389  
3390  /**
3391   * Get the statistics for a particular file system
3392   * @param cls the class to lookup
3393   * @return a statistics object
3394   */
3395  public static synchronized 
3396  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
3397    Statistics result = statisticsTable.get(cls);
3398    if (result == null) {
3399      result = new Statistics(scheme);
3400      statisticsTable.put(cls, result);
3401    }
3402    return result;
3403  }
3404  
3405  /**
3406   * Reset all statistics for all file systems
3407   */
3408  public static synchronized void clearStatistics() {
3409    for(Statistics stat: statisticsTable.values()) {
3410      stat.reset();
3411    }
3412  }
3413
3414  /**
3415   * Print all statistics for all file systems
3416   */
3417  public static synchronized
3418  void printStatistics() throws IOException {
3419    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3420            statisticsTable.entrySet()) {
3421      System.out.println("  FileSystem " + pair.getKey().getName() + 
3422                         ": " + pair.getValue());
3423    }
3424  }
3425
3426  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
3427  private static boolean symlinksEnabled = false;
3428
3429  private static Configuration conf = null;
3430
3431  @VisibleForTesting
3432  public static boolean areSymlinksEnabled() {
3433    return symlinksEnabled;
3434  }
3435
3436  @VisibleForTesting
3437  public static void enableSymlinks() {
3438    symlinksEnabled = true;
3439  }
3440}