001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.fs;
019
020import java.io.Closeable;
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.lang.ref.PhantomReference;
024import java.lang.ref.ReferenceQueue;
025import java.net.URI;
026import java.net.URISyntaxException;
027import java.security.PrivilegedExceptionAction;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.EnumSet;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.IdentityHashMap;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.NoSuchElementException;
039import java.util.ServiceLoader;
040import java.util.Set;
041import java.util.Stack;
042import java.util.TreeSet;
043import java.util.concurrent.atomic.AtomicLong;
044
045import org.apache.commons.logging.Log;
046import org.apache.commons.logging.LogFactory;
047import org.apache.hadoop.classification.InterfaceAudience;
048import org.apache.hadoop.classification.InterfaceStability;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.conf.Configured;
051import org.apache.hadoop.fs.Options.ChecksumOpt;
052import org.apache.hadoop.fs.Options.Rename;
053import org.apache.hadoop.fs.permission.AclEntry;
054import org.apache.hadoop.fs.permission.AclStatus;
055import org.apache.hadoop.fs.permission.FsAction;
056import org.apache.hadoop.fs.permission.FsPermission;
057import org.apache.hadoop.io.MultipleIOException;
058import org.apache.hadoop.io.Text;
059import org.apache.hadoop.net.NetUtils;
060import org.apache.hadoop.security.AccessControlException;
061import org.apache.hadoop.security.Credentials;
062import org.apache.hadoop.security.SecurityUtil;
063import org.apache.hadoop.security.UserGroupInformation;
064import org.apache.hadoop.security.token.Token;
065import org.apache.hadoop.util.DataChecksum;
066import org.apache.hadoop.util.Progressable;
067import org.apache.hadoop.util.ReflectionUtils;
068import org.apache.hadoop.util.ShutdownHookManager;
069import org.apache.hadoop.util.StringUtils;
070import org.apache.htrace.Span;
071import org.apache.htrace.Trace;
072import org.apache.htrace.TraceScope;
073
074import com.google.common.annotations.VisibleForTesting;
075
076/****************************************************************
077 * An abstract base class for a fairly generic filesystem.  It
078 * may be implemented as a distributed filesystem, or as a "local"
079 * one that reflects the locally-connected disk.  The local version
080 * exists for small Hadoop instances and for testing.
081 *
082 * <p>
083 *
084 * All user code that may potentially use the Hadoop Distributed
085 * File System should be written to use a FileSystem object.  The
086 * Hadoop DFS is a multi-machine system that appears as a single
087 * disk.  It's useful because of its fault tolerance and potentially
088 * very large capacity.
089 * 
090 * <p>
091 * The local implementation is {@link LocalFileSystem} and distributed
092 * implementation is DistributedFileSystem.
093 *****************************************************************/
094@InterfaceAudience.Public
095@InterfaceStability.Stable
096public abstract class FileSystem extends Configured implements Closeable {
097  public static final String FS_DEFAULT_NAME_KEY = 
098                   CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
099  public static final String DEFAULT_FS = 
100                   CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
101
102  public static final Log LOG = LogFactory.getLog(FileSystem.class);
103
104  /**
105   * Priority of the FileSystem shutdown hook.
106   */
107  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
108
109  /** FileSystem cache */
110  static final Cache CACHE = new Cache();
111
112  /** The key this instance is stored under in the cache. */
113  private Cache.Key key;
114
115  /** Recording statistics per a FileSystem class */
116  private static final Map<Class<? extends FileSystem>, Statistics> 
117    statisticsTable =
118      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
119  
120  /**
121   * The statistics for this file system.
122   */
123  protected Statistics statistics;
124
125  /**
126   * A cache of files that should be deleted when filsystem is closed
127   * or the JVM is exited.
128   */
129  private Set<Path> deleteOnExit = new TreeSet<Path>();
130  
131  boolean resolveSymlinks;
132  /**
133   * This method adds a file system for testing so that we can find it later. It
134   * is only for testing.
135   * @param uri the uri to store it under
136   * @param conf the configuration to store it under
137   * @param fs the file system to store
138   * @throws IOException
139   */
140  static void addFileSystemForTesting(URI uri, Configuration conf,
141      FileSystem fs) throws IOException {
142    CACHE.map.put(new Cache.Key(uri, conf), fs);
143  }
144
145  /**
146   * Get a filesystem instance based on the uri, the passed
147   * configuration and the user
148   * @param uri of the filesystem
149   * @param conf the configuration to use
150   * @param user to perform the get as
151   * @return the filesystem instance
152   * @throws IOException
153   * @throws InterruptedException
154   */
155  public static FileSystem get(final URI uri, final Configuration conf,
156        final String user) throws IOException, InterruptedException {
157    String ticketCachePath =
158      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
159    UserGroupInformation ugi =
160        UserGroupInformation.getBestUGI(ticketCachePath, user);
161    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
162      @Override
163      public FileSystem run() throws IOException {
164        return get(uri, conf);
165      }
166    });
167  }
168
169  /**
170   * Returns the configured filesystem implementation.
171   * @param conf the configuration to use
172   */
173  public static FileSystem get(Configuration conf) throws IOException {
174    return get(getDefaultUri(conf), conf);
175  }
176  
177  /** Get the default filesystem URI from a configuration.
178   * @param conf the configuration to use
179   * @return the uri of the default filesystem
180   */
181  public static URI getDefaultUri(Configuration conf) {
182    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));
183  }
184
185  /** Set the default filesystem URI in a configuration.
186   * @param conf the configuration to alter
187   * @param uri the new default filesystem uri
188   */
189  public static void setDefaultUri(Configuration conf, URI uri) {
190    conf.set(FS_DEFAULT_NAME_KEY, uri.toString());
191  }
192
193  /** Set the default filesystem URI in a configuration.
194   * @param conf the configuration to alter
195   * @param uri the new default filesystem uri
196   */
197  public static void setDefaultUri(Configuration conf, String uri) {
198    setDefaultUri(conf, URI.create(fixName(uri)));
199  }
200
201  /** Called after a new FileSystem instance is constructed.
202   * @param name a uri whose authority section names the host, port, etc.
203   *   for this FileSystem
204   * @param conf the configuration
205   */
206  public void initialize(URI name, Configuration conf) throws IOException {
207    statistics = getStatistics(name.getScheme(), getClass());    
208    resolveSymlinks = conf.getBoolean(
209        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_KEY,
210        CommonConfigurationKeys.FS_CLIENT_RESOLVE_REMOTE_SYMLINKS_DEFAULT);
211  }
212
213  /**
214   * Return the protocol scheme for the FileSystem.
215   * <p/>
216   * This implementation throws an <code>UnsupportedOperationException</code>.
217   *
218   * @return the protocol scheme for the FileSystem.
219   */
220  public String getScheme() {
221    throw new UnsupportedOperationException("Not implemented by the " + getClass().getSimpleName() + " FileSystem implementation");
222  }
223
224  /** Returns a URI whose scheme and authority identify this FileSystem.*/
225  public abstract URI getUri();
226  
227  /**
228   * Return a canonicalized form of this FileSystem's URI.
229   * 
230   * The default implementation simply calls {@link #canonicalizeUri(URI)}
231   * on the filesystem's own URI, so subclasses typically only need to
232   * implement that method.
233   *
234   * @see #canonicalizeUri(URI)
235   */
236  protected URI getCanonicalUri() {
237    return canonicalizeUri(getUri());
238  }
239  
240  /**
241   * Canonicalize the given URI.
242   * 
243   * This is filesystem-dependent, but may for example consist of
244   * canonicalizing the hostname using DNS and adding the default
245   * port if not specified.
246   * 
247   * The default implementation simply fills in the default port if
248   * not specified and if the filesystem has a default port.
249   *
250   * @return URI
251   * @see NetUtils#getCanonicalUri(URI, int)
252   */
253  protected URI canonicalizeUri(URI uri) {
254    if (uri.getPort() == -1 && getDefaultPort() > 0) {
255      // reconstruct the uri with the default port set
256      try {
257        uri = new URI(uri.getScheme(), uri.getUserInfo(),
258            uri.getHost(), getDefaultPort(),
259            uri.getPath(), uri.getQuery(), uri.getFragment());
260      } catch (URISyntaxException e) {
261        // Should never happen!
262        throw new AssertionError("Valid URI became unparseable: " +
263            uri);
264      }
265    }
266    
267    return uri;
268  }
269  
270  /**
271   * Get the default port for this file system.
272   * @return the default port or 0 if there isn't one
273   */
274  protected int getDefaultPort() {
275    return 0;
276  }
277
278  protected static FileSystem getFSofPath(final Path absOrFqPath,
279      final Configuration conf)
280      throws UnsupportedFileSystemException, IOException {
281    absOrFqPath.checkNotSchemeWithRelative();
282    absOrFqPath.checkNotRelative();
283
284    // Uses the default file system if not fully qualified
285    return get(absOrFqPath.toUri(), conf);
286  }
287
288  /**
289   * Get a canonical service name for this file system.  The token cache is
290   * the only user of the canonical service name, and uses it to lookup this
291   * filesystem's service tokens.
292   * If file system provides a token of its own then it must have a canonical
293   * name, otherwise canonical name can be null.
294   * 
295   * Default Impl: If the file system has child file systems 
296   * (such as an embedded file system) then it is assumed that the fs has no
297   * tokens of its own and hence returns a null name; otherwise a service
298   * name is built using Uri and port.
299   * 
300   * @return a service string that uniquely identifies this file system, null
301   *         if the filesystem does not implement tokens
302   * @see SecurityUtil#buildDTServiceName(URI, int) 
303   */
304  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
305  public String getCanonicalServiceName() {
306    return (getChildFileSystems() == null)
307      ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
308      : null;
309  }
310
311  /** @deprecated call #getUri() instead.*/
312  @Deprecated
313  public String getName() { return getUri().toString(); }
314
315  /** @deprecated call #get(URI,Configuration) instead. */
316  @Deprecated
317  public static FileSystem getNamed(String name, Configuration conf)
318    throws IOException {
319    return get(URI.create(fixName(name)), conf);
320  }
321  
322  /** Update old-format filesystem names, for back-compatibility.  This should
323   * eventually be replaced with a checkName() method that throws an exception
324   * for old-format names. */ 
325  private static String fixName(String name) {
326    // convert old-format name to new-format name
327    if (name.equals("local")) {         // "local" is now "file:///".
328      LOG.warn("\"local\" is a deprecated filesystem name."
329               +" Use \"file:///\" instead.");
330      name = "file:///";
331    } else if (name.indexOf('/')==-1) {   // unqualified is "hdfs://"
332      LOG.warn("\""+name+"\" is a deprecated filesystem name."
333               +" Use \"hdfs://"+name+"/\" instead.");
334      name = "hdfs://"+name;
335    }
336    return name;
337  }
338
339  /**
340   * Get the local file system.
341   * @param conf the configuration to configure the file system with
342   * @return a LocalFileSystem
343   */
344  public static LocalFileSystem getLocal(Configuration conf)
345    throws IOException {
346    return (LocalFileSystem)get(LocalFileSystem.NAME, conf);
347  }
348
349  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
350   * of the URI determines a configuration property name,
351   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
352   * The entire URI is passed to the FileSystem instance's initialize method.
353   */
354  public static FileSystem get(URI uri, Configuration conf) throws IOException {
355    String scheme = uri.getScheme();
356    String authority = uri.getAuthority();
357
358    if (scheme == null && authority == null) {     // use default FS
359      return get(conf);
360    }
361
362    if (scheme != null && authority == null) {     // no authority
363      URI defaultUri = getDefaultUri(conf);
364      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
365          && defaultUri.getAuthority() != null) {  // & default has authority
366        return get(defaultUri, conf);              // return default
367      }
368    }
369    
370    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
371    if (conf.getBoolean(disableCacheName, false)) {
372      return createFileSystem(uri, conf);
373    }
374
375    return CACHE.get(uri, conf);
376  }
377
378  /**
379   * Returns the FileSystem for this URI's scheme and authority and the 
380   * passed user. Internally invokes {@link #newInstance(URI, Configuration)}
381   * @param uri of the filesystem
382   * @param conf the configuration to use
383   * @param user to perform the get as
384   * @return filesystem instance
385   * @throws IOException
386   * @throws InterruptedException
387   */
388  public static FileSystem newInstance(final URI uri, final Configuration conf,
389      final String user) throws IOException, InterruptedException {
390    String ticketCachePath =
391      conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
392    UserGroupInformation ugi =
393        UserGroupInformation.getBestUGI(ticketCachePath, user);
394    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
395      @Override
396      public FileSystem run() throws IOException {
397        return newInstance(uri,conf); 
398      }
399    });
400  }
401  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
402   * of the URI determines a configuration property name,
403   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
404   * The entire URI is passed to the FileSystem instance's initialize method.
405   * This always returns a new FileSystem object.
406   */
407  public static FileSystem newInstance(URI uri, Configuration conf) throws IOException {
408    String scheme = uri.getScheme();
409    String authority = uri.getAuthority();
410
411    if (scheme == null) {                       // no scheme: use default FS
412      return newInstance(conf);
413    }
414
415    if (authority == null) {                       // no authority
416      URI defaultUri = getDefaultUri(conf);
417      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
418          && defaultUri.getAuthority() != null) {  // & default has authority
419        return newInstance(defaultUri, conf);              // return default
420      }
421    }
422    return CACHE.getUnique(uri, conf);
423  }
424
425  /** Returns a unique configured filesystem implementation.
426   * This always returns a new FileSystem object.
427   * @param conf the configuration to use
428   */
429  public static FileSystem newInstance(Configuration conf) throws IOException {
430    return newInstance(getDefaultUri(conf), conf);
431  }
432
433  /**
434   * Get a unique local file system object
435   * @param conf the configuration to configure the file system with
436   * @return a LocalFileSystem
437   * This always returns a new FileSystem object.
438   */
439  public static LocalFileSystem newInstanceLocal(Configuration conf)
440    throws IOException {
441    return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
442  }
443
444  /**
445   * Close all cached filesystems. Be sure those filesystems are not
446   * used anymore.
447   * 
448   * @throws IOException
449   */
450  public static void closeAll() throws IOException {
451    CACHE.closeAll();
452  }
453
454  /**
455   * Close all cached filesystems for a given UGI. Be sure those filesystems 
456   * are not used anymore.
457   * @param ugi user group info to close
458   * @throws IOException
459   */
460  public static void closeAllForUGI(UserGroupInformation ugi) 
461  throws IOException {
462    CACHE.closeAll(ugi);
463  }
464
465  /** 
466   * Make sure that a path specifies a FileSystem.
467   * @param path to use
468   */
469  public Path makeQualified(Path path) {
470    checkPath(path);
471    return path.makeQualified(this.getUri(), this.getWorkingDirectory());
472  }
473    
474  /**
475   * Get a new delegation token for this file system.
476   * This is an internal method that should have been declared protected
477   * but wasn't historically.
478   * Callers should use {@link #addDelegationTokens(String, Credentials)}
479   * 
480   * @param renewer the account name that is allowed to renew the token.
481   * @return a new delegation token
482   * @throws IOException
483   */
484  @InterfaceAudience.Private()
485  public Token<?> getDelegationToken(String renewer) throws IOException {
486    return null;
487  }
488  
489  /**
490   * Obtain all delegation tokens used by this FileSystem that are not
491   * already present in the given Credentials.  Existing tokens will neither
492   * be verified as valid nor having the given renewer.  Missing tokens will
493   * be acquired and added to the given Credentials.
494   * 
495   * Default Impl: works for simple fs with its own token
496   * and also for an embedded fs whose tokens are those of its
497   * children file system (i.e. the embedded fs has not tokens of its
498   * own).
499   * 
500   * @param renewer the user allowed to renew the delegation tokens
501   * @param credentials cache in which to add new delegation tokens
502   * @return list of new delegation tokens
503   * @throws IOException
504   */
505  @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
506  public Token<?>[] addDelegationTokens(
507      final String renewer, Credentials credentials) throws IOException {
508    if (credentials == null) {
509      credentials = new Credentials();
510    }
511    final List<Token<?>> tokens = new ArrayList<Token<?>>();
512    collectDelegationTokens(renewer, credentials, tokens);
513    return tokens.toArray(new Token<?>[tokens.size()]);
514  }
515  
516  /**
517   * Recursively obtain the tokens for this FileSystem and all descended
518   * FileSystems as determined by getChildFileSystems().
519   * @param renewer the user allowed to renew the delegation tokens
520   * @param credentials cache in which to add the new delegation tokens
521   * @param tokens list in which to add acquired tokens
522   * @throws IOException
523   */
524  private void collectDelegationTokens(final String renewer,
525                                       final Credentials credentials,
526                                       final List<Token<?>> tokens)
527                                           throws IOException {
528    final String serviceName = getCanonicalServiceName();
529    // Collect token of the this filesystem and then of its embedded children
530    if (serviceName != null) { // fs has token, grab it
531      final Text service = new Text(serviceName);
532      Token<?> token = credentials.getToken(service);
533      if (token == null) {
534        token = getDelegationToken(renewer);
535        if (token != null) {
536          tokens.add(token);
537          credentials.addToken(service, token);
538        }
539      }
540    }
541    // Now collect the tokens from the children
542    final FileSystem[] children = getChildFileSystems();
543    if (children != null) {
544      for (final FileSystem fs : children) {
545        fs.collectDelegationTokens(renewer, credentials, tokens);
546      }
547    }
548  }
549
550  /**
551   * Get all the immediate child FileSystems embedded in this FileSystem.
552   * It does not recurse and get grand children.  If a FileSystem
553   * has multiple child FileSystems, then it should return a unique list
554   * of those FileSystems.  Default is to return null to signify no children.
555   * 
556   * @return FileSystems used by this FileSystem
557   */
558  @InterfaceAudience.LimitedPrivate({ "HDFS" })
559  @VisibleForTesting
560  public FileSystem[] getChildFileSystems() {
561    return null;
562  }
563  
564  /** create a file with the provided permission
565   * The permission of the file is set to be the provided permission as in
566   * setPermission, not permission&~umask
567   * 
568   * It is implemented using two RPCs. It is understood that it is inefficient,
569   * but the implementation is thread-safe. The other option is to change the
570   * value of umask in configuration to be 0, but it is not thread-safe.
571   * 
572   * @param fs file system handle
573   * @param file the name of the file to be created
574   * @param permission the permission of the file
575   * @return an output stream
576   * @throws IOException
577   */
578  public static FSDataOutputStream create(FileSystem fs,
579      Path file, FsPermission permission) throws IOException {
580    // create the file with default permission
581    FSDataOutputStream out = fs.create(file);
582    // set its permission to the supplied one
583    fs.setPermission(file, permission);
584    return out;
585  }
586
587  /** create a directory with the provided permission
588   * The permission of the directory is set to be the provided permission as in
589   * setPermission, not permission&~umask
590   * 
591   * @see #create(FileSystem, Path, FsPermission)
592   * 
593   * @param fs file system handle
594   * @param dir the name of the directory to be created
595   * @param permission the permission of the directory
596   * @return true if the directory creation succeeds; false otherwise
597   * @throws IOException
598   */
599  public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
600  throws IOException {
601    // create the directory using the default permission
602    boolean result = fs.mkdirs(dir);
603    // set its permission to be the supplied one
604    fs.setPermission(dir, permission);
605    return result;
606  }
607
608  ///////////////////////////////////////////////////////////////
609  // FileSystem
610  ///////////////////////////////////////////////////////////////
611
612  protected FileSystem() {
613    super(null);
614  }
615
616  /** 
617   * Check that a Path belongs to this FileSystem.
618   * @param path to check
619   */
620  protected void checkPath(Path path) {
621    URI uri = path.toUri();
622    String thatScheme = uri.getScheme();
623    if (thatScheme == null)                // fs is relative
624      return;
625    URI thisUri = getCanonicalUri();
626    String thisScheme = thisUri.getScheme();
627    //authority and scheme are not case sensitive
628    if (thisScheme.equalsIgnoreCase(thatScheme)) {// schemes match
629      String thisAuthority = thisUri.getAuthority();
630      String thatAuthority = uri.getAuthority();
631      if (thatAuthority == null &&                // path's authority is null
632          thisAuthority != null) {                // fs has an authority
633        URI defaultUri = getDefaultUri(getConf());
634        if (thisScheme.equalsIgnoreCase(defaultUri.getScheme())) {
635          uri = defaultUri; // schemes match, so use this uri instead
636        } else {
637          uri = null; // can't determine auth of the path
638        }
639      }
640      if (uri != null) {
641        // canonicalize uri before comparing with this fs
642        uri = canonicalizeUri(uri);
643        thatAuthority = uri.getAuthority();
644        if (thisAuthority == thatAuthority ||       // authorities match
645            (thisAuthority != null &&
646             thisAuthority.equalsIgnoreCase(thatAuthority)))
647          return;
648      }
649    }
650    throw new IllegalArgumentException("Wrong FS: "+path+
651                                       ", expected: "+this.getUri());
652  }
653
654  /**
655   * Return an array containing hostnames, offset and size of 
656   * portions of the given file.  For a nonexistent 
657   * file or regions, null will be returned.
658   *
659   * This call is most helpful with DFS, where it returns 
660   * hostnames of machines that contain the given file.
661   *
662   * The FileSystem will simply return an elt containing 'localhost'.
663   *
664   * @param file FilesStatus to get data from
665   * @param start offset into the given file
666   * @param len length for which to get locations for
667   */
668  public BlockLocation[] getFileBlockLocations(FileStatus file, 
669      long start, long len) throws IOException {
670    if (file == null) {
671      return null;
672    }
673
674    if (start < 0 || len < 0) {
675      throw new IllegalArgumentException("Invalid start or len parameter");
676    }
677
678    if (file.getLen() <= start) {
679      return new BlockLocation[0];
680
681    }
682    String[] name = { "localhost:50010" };
683    String[] host = { "localhost" };
684    return new BlockLocation[] {
685      new BlockLocation(name, host, 0, file.getLen()) };
686  }
687 
688
689  /**
690   * Return an array containing hostnames, offset and size of 
691   * portions of the given file.  For a nonexistent 
692   * file or regions, null will be returned.
693   *
694   * This call is most helpful with DFS, where it returns 
695   * hostnames of machines that contain the given file.
696   *
697   * The FileSystem will simply return an elt containing 'localhost'.
698   *
699   * @param p path is used to identify an FS since an FS could have
700   *          another FS that it could be delegating the call to
701   * @param start offset into the given file
702   * @param len length for which to get locations for
703   */
704  public BlockLocation[] getFileBlockLocations(Path p, 
705      long start, long len) throws IOException {
706    if (p == null) {
707      throw new NullPointerException();
708    }
709    FileStatus file = getFileStatus(p);
710    return getFileBlockLocations(file, start, len);
711  }
712  
713  /**
714   * Return a set of server default configuration values
715   * @return server default configuration values
716   * @throws IOException
717   * @deprecated use {@link #getServerDefaults(Path)} instead
718   */
719  @Deprecated
720  public FsServerDefaults getServerDefaults() throws IOException {
721    Configuration conf = getConf();
722    // CRC32 is chosen as default as it is available in all 
723    // releases that support checksum.
724    // The client trash configuration is ignored.
725    return new FsServerDefaults(getDefaultBlockSize(), 
726        conf.getInt("io.bytes.per.checksum", 512), 
727        64 * 1024, 
728        getDefaultReplication(),
729        conf.getInt("io.file.buffer.size", 4096),
730        false,
731        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
732        DataChecksum.Type.CRC32);
733  }
734
735  /**
736   * Return a set of server default configuration values
737   * @param p path is used to identify an FS since an FS could have
738   *          another FS that it could be delegating the call to
739   * @return server default configuration values
740   * @throws IOException
741   */
742  public FsServerDefaults getServerDefaults(Path p) throws IOException {
743    return getServerDefaults();
744  }
745
746  /**
747   * Return the fully-qualified path of path f resolving the path
748   * through any symlinks or mount point
749   * @param p path to be resolved
750   * @return fully qualified path 
751   * @throws FileNotFoundException
752   */
753   public Path resolvePath(final Path p) throws IOException {
754     checkPath(p);
755     return getFileStatus(p).getPath();
756   }
757
758  /**
759   * Opens an FSDataInputStream at the indicated Path.
760   * @param f the file name to open
761   * @param bufferSize the size of the buffer to be used.
762   */
763  public abstract FSDataInputStream open(Path f, int bufferSize)
764    throws IOException;
765    
766  /**
767   * Opens an FSDataInputStream at the indicated Path.
768   * @param f the file to open
769   */
770  public FSDataInputStream open(Path f) throws IOException {
771    return open(f, getConf().getInt("io.file.buffer.size", 4096));
772  }
773
774  /**
775   * Create an FSDataOutputStream at the indicated Path.
776   * Files are overwritten by default.
777   * @param f the file to create
778   */
779  public FSDataOutputStream create(Path f) throws IOException {
780    return create(f, true);
781  }
782
783  /**
784   * Create an FSDataOutputStream at the indicated Path.
785   * @param f the file to create
786   * @param overwrite if a file with this name already exists, then if true,
787   *   the file will be overwritten, and if false an exception will be thrown.
788   */
789  public FSDataOutputStream create(Path f, boolean overwrite)
790      throws IOException {
791    return create(f, overwrite, 
792                  getConf().getInt("io.file.buffer.size", 4096),
793                  getDefaultReplication(f),
794                  getDefaultBlockSize(f));
795  }
796
797  /**
798   * Create an FSDataOutputStream at the indicated Path with write-progress
799   * reporting.
800   * Files are overwritten by default.
801   * @param f the file to create
802   * @param progress to report progress
803   */
804  public FSDataOutputStream create(Path f, Progressable progress) 
805      throws IOException {
806    return create(f, true, 
807                  getConf().getInt("io.file.buffer.size", 4096),
808                  getDefaultReplication(f),
809                  getDefaultBlockSize(f), progress);
810  }
811
812  /**
813   * Create an FSDataOutputStream at the indicated Path.
814   * Files are overwritten by default.
815   * @param f the file to create
816   * @param replication the replication factor
817   */
818  public FSDataOutputStream create(Path f, short replication)
819      throws IOException {
820    return create(f, true, 
821                  getConf().getInt("io.file.buffer.size", 4096),
822                  replication,
823                  getDefaultBlockSize(f));
824  }
825
826  /**
827   * Create an FSDataOutputStream at the indicated Path with write-progress
828   * reporting.
829   * Files are overwritten by default.
830   * @param f the file to create
831   * @param replication the replication factor
832   * @param progress to report progress
833   */
834  public FSDataOutputStream create(Path f, short replication, 
835      Progressable progress) throws IOException {
836    return create(f, true, 
837                  getConf().getInt(
838                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
839                      CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
840                  replication,
841                  getDefaultBlockSize(f), progress);
842  }
843
844    
845  /**
846   * Create an FSDataOutputStream at the indicated Path.
847   * @param f the file name to create
848   * @param overwrite if a file with this name already exists, then if true,
849   *   the file will be overwritten, and if false an error will be thrown.
850   * @param bufferSize the size of the buffer to be used.
851   */
852  public FSDataOutputStream create(Path f, 
853                                   boolean overwrite,
854                                   int bufferSize
855                                   ) throws IOException {
856    return create(f, overwrite, bufferSize, 
857                  getDefaultReplication(f),
858                  getDefaultBlockSize(f));
859  }
860    
861  /**
862   * Create an FSDataOutputStream at the indicated Path with write-progress
863   * reporting.
864   * @param f the path of the file to open
865   * @param overwrite if a file with this name already exists, then if true,
866   *   the file will be overwritten, and if false an error will be thrown.
867   * @param bufferSize the size of the buffer to be used.
868   */
869  public FSDataOutputStream create(Path f, 
870                                   boolean overwrite,
871                                   int bufferSize,
872                                   Progressable progress
873                                   ) throws IOException {
874    return create(f, overwrite, bufferSize, 
875                  getDefaultReplication(f),
876                  getDefaultBlockSize(f), progress);
877  }
878    
879    
880  /**
881   * Create an FSDataOutputStream at the indicated Path.
882   * @param f the file name to open
883   * @param overwrite if a file with this name already exists, then if true,
884   *   the file will be overwritten, and if false an error will be thrown.
885   * @param bufferSize the size of the buffer to be used.
886   * @param replication required block replication for the file. 
887   */
888  public FSDataOutputStream create(Path f, 
889                                   boolean overwrite,
890                                   int bufferSize,
891                                   short replication,
892                                   long blockSize
893                                   ) throws IOException {
894    return create(f, overwrite, bufferSize, replication, blockSize, null);
895  }
896
897  /**
898   * Create an FSDataOutputStream at the indicated Path with write-progress
899   * reporting.
900   * @param f the file name to open
901   * @param overwrite if a file with this name already exists, then if true,
902   *   the file will be overwritten, and if false an error will be thrown.
903   * @param bufferSize the size of the buffer to be used.
904   * @param replication required block replication for the file. 
905   */
906  public FSDataOutputStream create(Path f,
907                                            boolean overwrite,
908                                            int bufferSize,
909                                            short replication,
910                                            long blockSize,
911                                            Progressable progress
912                                            ) throws IOException {
913    return this.create(f, FsPermission.getFileDefault().applyUMask(
914        FsPermission.getUMask(getConf())), overwrite, bufferSize,
915        replication, blockSize, progress);
916  }
917
918  /**
919   * Create an FSDataOutputStream at the indicated Path with write-progress
920   * reporting.
921   * @param f the file name to open
922   * @param permission
923   * @param overwrite if a file with this name already exists, then if true,
924   *   the file will be overwritten, and if false an error will be thrown.
925   * @param bufferSize the size of the buffer to be used.
926   * @param replication required block replication for the file.
927   * @param blockSize
928   * @param progress
929   * @throws IOException
930   * @see #setPermission(Path, FsPermission)
931   */
932  public abstract FSDataOutputStream create(Path f,
933      FsPermission permission,
934      boolean overwrite,
935      int bufferSize,
936      short replication,
937      long blockSize,
938      Progressable progress) throws IOException;
939  
940  /**
941   * Create an FSDataOutputStream at the indicated Path with write-progress
942   * reporting.
943   * @param f the file name to open
944   * @param permission
945   * @param flags {@link CreateFlag}s to use for this stream.
946   * @param bufferSize the size of the buffer to be used.
947   * @param replication required block replication for the file.
948   * @param blockSize
949   * @param progress
950   * @throws IOException
951   * @see #setPermission(Path, FsPermission)
952   */
953  public FSDataOutputStream create(Path f,
954      FsPermission permission,
955      EnumSet<CreateFlag> flags,
956      int bufferSize,
957      short replication,
958      long blockSize,
959      Progressable progress) throws IOException {
960    return create(f, permission, flags, bufferSize, replication,
961        blockSize, progress, null);
962  }
963  
964  /**
965   * Create an FSDataOutputStream at the indicated Path with a custom
966   * checksum option
967   * @param f the file name to open
968   * @param permission
969   * @param flags {@link CreateFlag}s to use for this stream.
970   * @param bufferSize the size of the buffer to be used.
971   * @param replication required block replication for the file.
972   * @param blockSize
973   * @param progress
974   * @param checksumOpt checksum parameter. If null, the values
975   *        found in conf will be used.
976   * @throws IOException
977   * @see #setPermission(Path, FsPermission)
978   */
979  public FSDataOutputStream create(Path f,
980      FsPermission permission,
981      EnumSet<CreateFlag> flags,
982      int bufferSize,
983      short replication,
984      long blockSize,
985      Progressable progress,
986      ChecksumOpt checksumOpt) throws IOException {
987    // Checksum options are ignored by default. The file systems that
988    // implement checksum need to override this method. The full
989    // support is currently only available in DFS.
990    return create(f, permission, flags.contains(CreateFlag.OVERWRITE), 
991        bufferSize, replication, blockSize, progress);
992  }
993
994  /*.
995   * This create has been added to support the FileContext that processes
996   * the permission
997   * with umask before calling this method.
998   * This a temporary method added to support the transition from FileSystem
999   * to FileContext for user applications.
1000   */
1001  @Deprecated
1002  protected FSDataOutputStream primitiveCreate(Path f,
1003     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
1004     short replication, long blockSize, Progressable progress,
1005     ChecksumOpt checksumOpt) throws IOException {
1006
1007    boolean pathExists = exists(f);
1008    CreateFlag.validate(f, pathExists, flag);
1009    
1010    // Default impl  assumes that permissions do not matter and 
1011    // nor does the bytesPerChecksum  hence
1012    // calling the regular create is good enough.
1013    // FSs that implement permissions should override this.
1014
1015    if (pathExists && flag.contains(CreateFlag.APPEND)) {
1016      return append(f, bufferSize, progress);
1017    }
1018    
1019    return this.create(f, absolutePermission,
1020        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication,
1021        blockSize, progress);
1022  }
1023  
1024  /**
1025   * This version of the mkdirs method assumes that the permission is absolute.
1026   * It has been added to support the FileContext that processes the permission
1027   * with umask before calling this method.
1028   * This a temporary method added to support the transition from FileSystem
1029   * to FileContext for user applications.
1030   */
1031  @Deprecated
1032  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
1033    throws IOException {
1034    // Default impl is to assume that permissions do not matter and hence
1035    // calling the regular mkdirs is good enough.
1036    // FSs that implement permissions should override this.
1037   return this.mkdirs(f, absolutePermission);
1038  }
1039
1040
1041  /**
1042   * This version of the mkdirs method assumes that the permission is absolute.
1043   * It has been added to support the FileContext that processes the permission
1044   * with umask before calling this method.
1045   * This a temporary method added to support the transition from FileSystem
1046   * to FileContext for user applications.
1047   */
1048  @Deprecated
1049  protected void primitiveMkdir(Path f, FsPermission absolutePermission, 
1050                    boolean createParent)
1051    throws IOException {
1052    
1053    if (!createParent) { // parent must exist.
1054      // since the this.mkdirs makes parent dirs automatically
1055      // we must throw exception if parent does not exist.
1056      final FileStatus stat = getFileStatus(f.getParent());
1057      if (stat == null) {
1058        throw new FileNotFoundException("Missing parent:" + f);
1059      }
1060      if (!stat.isDirectory()) {
1061        throw new ParentNotDirectoryException("parent is not a dir");
1062      }
1063      // parent does exist - go ahead with mkdir of leaf
1064    }
1065    // Default impl is to assume that permissions do not matter and hence
1066    // calling the regular mkdirs is good enough.
1067    // FSs that implement permissions should override this.
1068    if (!this.mkdirs(f, absolutePermission)) {
1069      throw new IOException("mkdir of "+ f + " failed");
1070    }
1071  }
1072
1073  /**
1074   * Opens an FSDataOutputStream at the indicated Path with write-progress
1075   * reporting. Same as create(), except fails if parent directory doesn't
1076   * already exist.
1077   * @param f the file name to open
1078   * @param overwrite if a file with this name already exists, then if true,
1079   * the file will be overwritten, and if false an error will be thrown.
1080   * @param bufferSize the size of the buffer to be used.
1081   * @param replication required block replication for the file.
1082   * @param blockSize
1083   * @param progress
1084   * @throws IOException
1085   * @see #setPermission(Path, FsPermission)
1086   * @deprecated API only for 0.20-append
1087   */
1088  @Deprecated
1089  public FSDataOutputStream createNonRecursive(Path f,
1090      boolean overwrite,
1091      int bufferSize, short replication, long blockSize,
1092      Progressable progress) throws IOException {
1093    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1094        overwrite, bufferSize, replication, blockSize, progress);
1095  }
1096
1097  /**
1098   * Opens an FSDataOutputStream at the indicated Path with write-progress
1099   * reporting. Same as create(), except fails if parent directory doesn't
1100   * already exist.
1101   * @param f the file name to open
1102   * @param permission
1103   * @param overwrite if a file with this name already exists, then if true,
1104   * the file will be overwritten, and if false an error will be thrown.
1105   * @param bufferSize the size of the buffer to be used.
1106   * @param replication required block replication for the file.
1107   * @param blockSize
1108   * @param progress
1109   * @throws IOException
1110   * @see #setPermission(Path, FsPermission)
1111   * @deprecated API only for 0.20-append
1112   */
1113   @Deprecated
1114   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1115       boolean overwrite, int bufferSize, short replication, long blockSize,
1116       Progressable progress) throws IOException {
1117     return createNonRecursive(f, permission,
1118         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
1119             : EnumSet.of(CreateFlag.CREATE), bufferSize,
1120             replication, blockSize, progress);
1121   }
1122
1123   /**
1124    * Opens an FSDataOutputStream at the indicated Path with write-progress
1125    * reporting. Same as create(), except fails if parent directory doesn't
1126    * already exist.
1127    * @param f the file name to open
1128    * @param permission
1129    * @param flags {@link CreateFlag}s to use for this stream.
1130    * @param bufferSize the size of the buffer to be used.
1131    * @param replication required block replication for the file.
1132    * @param blockSize
1133    * @param progress
1134    * @throws IOException
1135    * @see #setPermission(Path, FsPermission)
1136    * @deprecated API only for 0.20-append
1137    */
1138    @Deprecated
1139    public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1140        EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1141        Progressable progress) throws IOException {
1142      throw new IOException("createNonRecursive unsupported for this filesystem "
1143          + this.getClass());
1144    }
1145
1146  /**
1147   * Creates the given Path as a brand-new zero-length file.  If
1148   * create fails, or if it already existed, return false.
1149   *
1150   * @param f path to use for create
1151   */
1152  public boolean createNewFile(Path f) throws IOException {
1153    if (exists(f)) {
1154      return false;
1155    } else {
1156      create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
1157      return true;
1158    }
1159  }
1160
1161  /**
1162   * Append to an existing file (optional operation).
1163   * Same as append(f, getConf().getInt("io.file.buffer.size", 4096), null)
1164   * @param f the existing file to be appended.
1165   * @throws IOException
1166   */
1167  public FSDataOutputStream append(Path f) throws IOException {
1168    return append(f, getConf().getInt("io.file.buffer.size", 4096), null);
1169  }
1170  /**
1171   * Append to an existing file (optional operation).
1172   * Same as append(f, bufferSize, null).
1173   * @param f the existing file to be appended.
1174   * @param bufferSize the size of the buffer to be used.
1175   * @throws IOException
1176   */
1177  public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
1178    return append(f, bufferSize, null);
1179  }
1180
1181  /**
1182   * Append to an existing file (optional operation).
1183   * @param f the existing file to be appended.
1184   * @param bufferSize the size of the buffer to be used.
1185   * @param progress for reporting progress if it is not null.
1186   * @throws IOException
1187   */
1188  public abstract FSDataOutputStream append(Path f, int bufferSize,
1189      Progressable progress) throws IOException;
1190
1191  /**
1192   * Concat existing files together.
1193   * @param trg the path to the target destination.
1194   * @param psrcs the paths to the sources to use for the concatenation.
1195   * @throws IOException
1196   */
1197  public void concat(final Path trg, final Path [] psrcs) throws IOException {
1198    throw new UnsupportedOperationException("Not implemented by the " + 
1199        getClass().getSimpleName() + " FileSystem implementation");
1200  }
1201
1202 /**
1203   * Get replication.
1204   * 
1205   * @deprecated Use getFileStatus() instead
1206   * @param src file name
1207   * @return file replication
1208   * @throws IOException
1209   */ 
1210  @Deprecated
1211  public short getReplication(Path src) throws IOException {
1212    return getFileStatus(src).getReplication();
1213  }
1214
1215  /**
1216   * Set replication for an existing file.
1217   * 
1218   * @param src file name
1219   * @param replication new replication
1220   * @throws IOException
1221   * @return true if successful;
1222   *         false if file does not exist or is a directory
1223   */
1224  public boolean setReplication(Path src, short replication)
1225    throws IOException {
1226    return true;
1227  }
1228
1229  /**
1230   * Renames Path src to Path dst.  Can take place on local fs
1231   * or remote DFS.
1232   * @param src path to be renamed
1233   * @param dst new path after rename
1234   * @throws IOException on failure
1235   * @return true if rename is successful
1236   */
1237  public abstract boolean rename(Path src, Path dst) throws IOException;
1238
1239  /**
1240   * Renames Path src to Path dst
1241   * <ul>
1242   * <li
1243   * <li>Fails if src is a file and dst is a directory.
1244   * <li>Fails if src is a directory and dst is a file.
1245   * <li>Fails if the parent of dst does not exist or is a file.
1246   * </ul>
1247   * <p>
1248   * If OVERWRITE option is not passed as an argument, rename fails
1249   * if the dst already exists.
1250   * <p>
1251   * If OVERWRITE option is passed as an argument, rename overwrites
1252   * the dst if it is a file or an empty directory. Rename fails if dst is
1253   * a non-empty directory.
1254   * <p>
1255   * Note that atomicity of rename is dependent on the file system
1256   * implementation. Please refer to the file system documentation for
1257   * details. This default implementation is non atomic.
1258   * <p>
1259   * This method is deprecated since it is a temporary method added to 
1260   * support the transition from FileSystem to FileContext for user 
1261   * applications.
1262   * 
1263   * @param src path to be renamed
1264   * @param dst new path after rename
1265   * @throws IOException on failure
1266   */
1267  @Deprecated
1268  protected void rename(final Path src, final Path dst,
1269      final Rename... options) throws IOException {
1270    // Default implementation
1271    final FileStatus srcStatus = getFileLinkStatus(src);
1272    if (srcStatus == null) {
1273      throw new FileNotFoundException("rename source " + src + " not found.");
1274    }
1275
1276    boolean overwrite = false;
1277    if (null != options) {
1278      for (Rename option : options) {
1279        if (option == Rename.OVERWRITE) {
1280          overwrite = true;
1281        }
1282      }
1283    }
1284
1285    FileStatus dstStatus;
1286    try {
1287      dstStatus = getFileLinkStatus(dst);
1288    } catch (IOException e) {
1289      dstStatus = null;
1290    }
1291    if (dstStatus != null) {
1292      if (srcStatus.isDirectory() != dstStatus.isDirectory()) {
1293        throw new IOException("Source " + src + " Destination " + dst
1294            + " both should be either file or directory");
1295      }
1296      if (!overwrite) {
1297        throw new FileAlreadyExistsException("rename destination " + dst
1298            + " already exists.");
1299      }
1300      // Delete the destination that is a file or an empty directory
1301      if (dstStatus.isDirectory()) {
1302        FileStatus[] list = listStatus(dst);
1303        if (list != null && list.length != 0) {
1304          throw new IOException(
1305              "rename cannot overwrite non empty destination directory " + dst);
1306        }
1307      }
1308      delete(dst, false);
1309    } else {
1310      final Path parent = dst.getParent();
1311      final FileStatus parentStatus = getFileStatus(parent);
1312      if (parentStatus == null) {
1313        throw new FileNotFoundException("rename destination parent " + parent
1314            + " not found.");
1315      }
1316      if (!parentStatus.isDirectory()) {
1317        throw new ParentNotDirectoryException("rename destination parent " + parent
1318            + " is a file.");
1319      }
1320    }
1321    if (!rename(src, dst)) {
1322      throw new IOException("rename from " + src + " to " + dst + " failed.");
1323    }
1324  }
1325
1326  /**
1327   * Truncate the file in the indicated path to the indicated size.
1328   * <ul>
1329   * <li>Fails if path is a directory.
1330   * <li>Fails if path does not exist.
1331   * <li>Fails if path is not closed.
1332   * <li>Fails if new size is greater than current size.
1333   * </ul>
1334   * @param f The path to the file to be truncated
1335   * @param newLength The size the file is to be truncated to
1336   *
1337   * @return <code>true</code> if the file has been truncated to the desired
1338   * <code>newLength</code> and is immediately available to be reused for
1339   * write operations such as <code>append</code>, or
1340   * <code>false</code> if a background process of adjusting the length of
1341   * the last block has been started, and clients should wait for it to
1342   * complete before proceeding with further file updates.
1343   */
1344  public boolean truncate(Path f, long newLength) throws IOException {
1345    throw new UnsupportedOperationException("Not implemented by the " +
1346        getClass().getSimpleName() + " FileSystem implementation");
1347  }
1348  
1349  /**
1350   * Delete a file 
1351   * @deprecated Use {@link #delete(Path, boolean)} instead.
1352   */
1353  @Deprecated
1354  public boolean delete(Path f) throws IOException {
1355    return delete(f, true);
1356  }
1357  
1358  /** Delete a file.
1359   *
1360   * @param f the path to delete.
1361   * @param recursive if path is a directory and set to 
1362   * true, the directory is deleted else throws an exception. In
1363   * case of a file the recursive can be set to either true or false. 
1364   * @return  true if delete is successful else false. 
1365   * @throws IOException
1366   */
1367  public abstract boolean delete(Path f, boolean recursive) throws IOException;
1368
1369  /**
1370   * Mark a path to be deleted when FileSystem is closed.
1371   * When the JVM shuts down,
1372   * all FileSystem objects will be closed automatically.
1373   * Then,
1374   * the marked path will be deleted as a result of closing the FileSystem.
1375   *
1376   * The path has to exist in the file system.
1377   * 
1378   * @param f the path to delete.
1379   * @return  true if deleteOnExit is successful, otherwise false.
1380   * @throws IOException
1381   */
1382  public boolean deleteOnExit(Path f) throws IOException {
1383    if (!exists(f)) {
1384      return false;
1385    }
1386    synchronized (deleteOnExit) {
1387      deleteOnExit.add(f);
1388    }
1389    return true;
1390  }
1391  
1392  /**
1393   * Cancel the deletion of the path when the FileSystem is closed
1394   * @param f the path to cancel deletion
1395   */
1396  public boolean cancelDeleteOnExit(Path f) {
1397    synchronized (deleteOnExit) {
1398      return deleteOnExit.remove(f);
1399    }
1400  }
1401
1402  /**
1403   * Delete all files that were marked as delete-on-exit. This recursively
1404   * deletes all files in the specified paths.
1405   */
1406  protected void processDeleteOnExit() {
1407    synchronized (deleteOnExit) {
1408      for (Iterator<Path> iter = deleteOnExit.iterator(); iter.hasNext();) {
1409        Path path = iter.next();
1410        try {
1411          if (exists(path)) {
1412            delete(path, true);
1413          }
1414        }
1415        catch (IOException e) {
1416          LOG.info("Ignoring failure to deleteOnExit for path " + path);
1417        }
1418        iter.remove();
1419      }
1420    }
1421  }
1422  
1423  /** Check if exists.
1424   * @param f source file
1425   */
1426  public boolean exists(Path f) throws IOException {
1427    try {
1428      return getFileStatus(f) != null;
1429    } catch (FileNotFoundException e) {
1430      return false;
1431    }
1432  }
1433
1434  /** True iff the named path is a directory.
1435   * Note: Avoid using this method. Instead reuse the FileStatus 
1436   * returned by getFileStatus() or listStatus() methods.
1437   * @param f path to check
1438   */
1439  public boolean isDirectory(Path f) throws IOException {
1440    try {
1441      return getFileStatus(f).isDirectory();
1442    } catch (FileNotFoundException e) {
1443      return false;               // f does not exist
1444    }
1445  }
1446
1447  /** True iff the named path is a regular file.
1448   * Note: Avoid using this method. Instead reuse the FileStatus 
1449   * returned by getFileStatus() or listStatus() methods.
1450   * @param f path to check
1451   */
1452  public boolean isFile(Path f) throws IOException {
1453    try {
1454      return getFileStatus(f).isFile();
1455    } catch (FileNotFoundException e) {
1456      return false;               // f does not exist
1457    }
1458  }
1459  
1460  /** The number of bytes in a file. */
1461  /** @deprecated Use getFileStatus() instead */
1462  @Deprecated
1463  public long getLength(Path f) throws IOException {
1464    return getFileStatus(f).getLen();
1465  }
1466    
1467  /** Return the {@link ContentSummary} of a given {@link Path}.
1468  * @param f path to use
1469  */
1470  public ContentSummary getContentSummary(Path f) throws IOException {
1471    FileStatus status = getFileStatus(f);
1472    if (status.isFile()) {
1473      // f is a file
1474      long length = status.getLen();
1475      return new ContentSummary.Builder().length(length).
1476          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1477    }
1478    // f is a directory
1479    long[] summary = {0, 0, 1};
1480    for(FileStatus s : listStatus(f)) {
1481      long length = s.getLen();
1482      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
1483          new ContentSummary.Builder().length(length).
1484          fileCount(1).directoryCount(0).spaceConsumed(length).build();
1485      summary[0] += c.getLength();
1486      summary[1] += c.getFileCount();
1487      summary[2] += c.getDirectoryCount();
1488    }
1489    return new ContentSummary.Builder().length(summary[0]).
1490        fileCount(summary[1]).directoryCount(summary[2]).
1491        spaceConsumed(summary[0]).build();
1492  }
1493
1494  final private static PathFilter DEFAULT_FILTER = new PathFilter() {
1495      @Override
1496      public boolean accept(Path file) {
1497        return true;
1498      }     
1499    };
1500    
1501  /**
1502   * List the statuses of the files/directories in the given path if the path is
1503   * a directory.
1504   * 
1505   * @param f given path
1506   * @return the statuses of the files/directories in the given patch
1507   * @throws FileNotFoundException when the path does not exist;
1508   *         IOException see specific implementation
1509   */
1510  public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, 
1511                                                         IOException;
1512    
1513  /*
1514   * Filter files/directories in the given path using the user-supplied path
1515   * filter. Results are added to the given array <code>results</code>.
1516   */
1517  private void listStatus(ArrayList<FileStatus> results, Path f,
1518      PathFilter filter) throws FileNotFoundException, IOException {
1519    FileStatus listing[] = listStatus(f);
1520    if (listing == null) {
1521      throw new IOException("Error accessing " + f);
1522    }
1523
1524    for (int i = 0; i < listing.length; i++) {
1525      if (filter.accept(listing[i].getPath())) {
1526        results.add(listing[i]);
1527      }
1528    }
1529  }
1530
1531  /**
1532   * @return an iterator over the corrupt files under the given path
1533   * (may contain duplicates if a file has more than one corrupt block)
1534   * @throws IOException
1535   */
1536  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
1537    throws IOException {
1538    throw new UnsupportedOperationException(getClass().getCanonicalName() +
1539                                            " does not support" +
1540                                            " listCorruptFileBlocks");
1541  }
1542
1543  /**
1544   * Filter files/directories in the given path using the user-supplied path
1545   * filter.
1546   * 
1547   * @param f
1548   *          a path name
1549   * @param filter
1550   *          the user-supplied path filter
1551   * @return an array of FileStatus objects for the files under the given path
1552   *         after applying the filter
1553   * @throws FileNotFoundException when the path does not exist;
1554   *         IOException see specific implementation   
1555   */
1556  public FileStatus[] listStatus(Path f, PathFilter filter) 
1557                                   throws FileNotFoundException, IOException {
1558    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1559    listStatus(results, f, filter);
1560    return results.toArray(new FileStatus[results.size()]);
1561  }
1562
1563  /**
1564   * Filter files/directories in the given list of paths using default
1565   * path filter.
1566   * 
1567   * @param files
1568   *          a list of paths
1569   * @return a list of statuses for the files under the given paths after
1570   *         applying the filter default Path filter
1571   * @throws FileNotFoundException when the path does not exist;
1572   *         IOException see specific implementation
1573   */
1574  public FileStatus[] listStatus(Path[] files)
1575      throws FileNotFoundException, IOException {
1576    return listStatus(files, DEFAULT_FILTER);
1577  }
1578
1579  /**
1580   * Filter files/directories in the given list of paths using user-supplied
1581   * path filter.
1582   * 
1583   * @param files
1584   *          a list of paths
1585   * @param filter
1586   *          the user-supplied path filter
1587   * @return a list of statuses for the files under the given paths after
1588   *         applying the filter
1589   * @throws FileNotFoundException when the path does not exist;
1590   *         IOException see specific implementation
1591   */
1592  public FileStatus[] listStatus(Path[] files, PathFilter filter)
1593      throws FileNotFoundException, IOException {
1594    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
1595    for (int i = 0; i < files.length; i++) {
1596      listStatus(results, files[i], filter);
1597    }
1598    return results.toArray(new FileStatus[results.size()]);
1599  }
1600
1601  /**
1602   * <p>Return all the files that match filePattern and are not checksum
1603   * files. Results are sorted by their names.
1604   * 
1605   * <p>
1606   * A filename pattern is composed of <i>regular</i> characters and
1607   * <i>special pattern matching</i> characters, which are:
1608   *
1609   * <dl>
1610   *  <dd>
1611   *   <dl>
1612   *    <p>
1613   *    <dt> <tt> ? </tt>
1614   *    <dd> Matches any single character.
1615   *
1616   *    <p>
1617   *    <dt> <tt> * </tt>
1618   *    <dd> Matches zero or more characters.
1619   *
1620   *    <p>
1621   *    <dt> <tt> [<i>abc</i>] </tt>
1622   *    <dd> Matches a single character from character set
1623   *     <tt>{<i>a,b,c</i>}</tt>.
1624   *
1625   *    <p>
1626   *    <dt> <tt> [<i>a</i>-<i>b</i>] </tt>
1627   *    <dd> Matches a single character from the character range
1628   *     <tt>{<i>a...b</i>}</tt>.  Note that character <tt><i>a</i></tt> must be
1629   *     lexicographically less than or equal to character <tt><i>b</i></tt>.
1630   *
1631   *    <p>
1632   *    <dt> <tt> [^<i>a</i>] </tt>
1633   *    <dd> Matches a single character that is not from character set or range
1634   *     <tt>{<i>a</i>}</tt>.  Note that the <tt>^</tt> character must occur
1635   *     immediately to the right of the opening bracket.
1636   *
1637   *    <p>
1638   *    <dt> <tt> \<i>c</i> </tt>
1639   *    <dd> Removes (escapes) any special meaning of character <i>c</i>.
1640   *
1641   *    <p>
1642   *    <dt> <tt> {ab,cd} </tt>
1643   *    <dd> Matches a string from the string set <tt>{<i>ab, cd</i>} </tt>
1644   *    
1645   *    <p>
1646   *    <dt> <tt> {ab,c{de,fh}} </tt>
1647   *    <dd> Matches a string from the string set <tt>{<i>ab, cde, cfh</i>}</tt>
1648   *
1649   *   </dl>
1650   *  </dd>
1651   * </dl>
1652   *
1653   * @param pathPattern a regular expression specifying a pth pattern
1654
1655   * @return an array of paths that match the path pattern
1656   * @throws IOException
1657   */
1658  public FileStatus[] globStatus(Path pathPattern) throws IOException {
1659    return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
1660  }
1661  
1662  /**
1663   * Return an array of FileStatus objects whose path names match pathPattern
1664   * and is accepted by the user-supplied path filter. Results are sorted by
1665   * their path names.
1666   * Return null if pathPattern has no glob and the path does not exist.
1667   * Return an empty array if pathPattern has a glob and no path matches it. 
1668   * 
1669   * @param pathPattern
1670   *          a regular expression specifying the path pattern
1671   * @param filter
1672   *          a user-supplied path filter
1673   * @return an array of FileStatus objects
1674   * @throws IOException if any I/O error occurs when fetching file status
1675   */
1676  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
1677      throws IOException {
1678    return new Globber(this, pathPattern, filter).glob();
1679  }
1680  
1681  /**
1682   * List the statuses of the files/directories in the given path if the path is
1683   * a directory. 
1684   * Return the file's status and block locations If the path is a file.
1685   * 
1686   * If a returned status is a file, it contains the file's block locations.
1687   * 
1688   * @param f is the path
1689   *
1690   * @return an iterator that traverses statuses of the files/directories 
1691   *         in the given path
1692   *
1693   * @throws FileNotFoundException If <code>f</code> does not exist
1694   * @throws IOException If an I/O error occurred
1695   */
1696  public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f)
1697  throws FileNotFoundException, IOException {
1698    return listLocatedStatus(f, DEFAULT_FILTER);
1699  }
1700
1701  /**
1702   * Listing a directory
1703   * The returned results include its block location if it is a file
1704   * The results are filtered by the given path filter
1705   * @param f a path
1706   * @param filter a path filter
1707   * @return an iterator that traverses statuses of the files/directories 
1708   *         in the given path
1709   * @throws FileNotFoundException if <code>f</code> does not exist
1710   * @throws IOException if any I/O error occurred
1711   */
1712  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
1713      final PathFilter filter)
1714  throws FileNotFoundException, IOException {
1715    return new RemoteIterator<LocatedFileStatus>() {
1716      private final FileStatus[] stats = listStatus(f, filter);
1717      private int i = 0;
1718
1719      @Override
1720      public boolean hasNext() {
1721        return i<stats.length;
1722      }
1723
1724      @Override
1725      public LocatedFileStatus next() throws IOException {
1726        if (!hasNext()) {
1727          throw new NoSuchElementException("No more entry in " + f);
1728        }
1729        FileStatus result = stats[i++];
1730        BlockLocation[] locs = result.isFile() ?
1731            getFileBlockLocations(result.getPath(), 0, result.getLen()) :
1732            null;
1733        return new LocatedFileStatus(result, locs);
1734      }
1735    };
1736  }
1737
1738  /**
1739   * Returns a remote iterator so that followup calls are made on demand
1740   * while consuming the entries. Each file system implementation should
1741   * override this method and provide a more efficient implementation, if
1742   * possible. 
1743   *
1744   * @param p target path
1745   * @return remote iterator
1746   */
1747  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
1748  throws FileNotFoundException, IOException {
1749    return new RemoteIterator<FileStatus>() {
1750      private final FileStatus[] stats = listStatus(p);
1751      private int i = 0;
1752
1753      @Override
1754      public boolean hasNext() {
1755        return i<stats.length;
1756      }
1757
1758      @Override
1759      public FileStatus next() throws IOException {
1760        if (!hasNext()) {
1761          throw new NoSuchElementException("No more entry in " + p);
1762        }
1763        return stats[i++];
1764      }
1765    };
1766  }
1767
1768  /**
1769   * List the statuses and block locations of the files in the given path.
1770   * 
1771   * If the path is a directory, 
1772   *   if recursive is false, returns files in the directory;
1773   *   if recursive is true, return files in the subtree rooted at the path.
1774   * If the path is a file, return the file's status and block locations.
1775   * 
1776   * @param f is the path
1777   * @param recursive if the subdirectories need to be traversed recursively
1778   *
1779   * @return an iterator that traverses statuses of the files
1780   *
1781   * @throws FileNotFoundException when the path does not exist;
1782   *         IOException see specific implementation
1783   */
1784  public RemoteIterator<LocatedFileStatus> listFiles(
1785      final Path f, final boolean recursive)
1786  throws FileNotFoundException, IOException {
1787    return new RemoteIterator<LocatedFileStatus>() {
1788      private Stack<RemoteIterator<LocatedFileStatus>> itors = 
1789        new Stack<RemoteIterator<LocatedFileStatus>>();
1790      private RemoteIterator<LocatedFileStatus> curItor =
1791        listLocatedStatus(f);
1792      private LocatedFileStatus curFile;
1793     
1794      @Override
1795      public boolean hasNext() throws IOException {
1796        while (curFile == null) {
1797          if (curItor.hasNext()) {
1798            handleFileStat(curItor.next());
1799          } else if (!itors.empty()) {
1800            curItor = itors.pop();
1801          } else {
1802            return false;
1803          }
1804        }
1805        return true;
1806      }
1807
1808      /**
1809       * Process the input stat.
1810       * If it is a file, return the file stat.
1811       * If it is a directory, traverse the directory if recursive is true;
1812       * ignore it if recursive is false.
1813       * @param stat input status
1814       * @throws IOException if any IO error occurs
1815       */
1816      private void handleFileStat(LocatedFileStatus stat) throws IOException {
1817        if (stat.isFile()) { // file
1818          curFile = stat;
1819        } else if (recursive) { // directory
1820          itors.push(curItor);
1821          curItor = listLocatedStatus(stat.getPath());
1822        }
1823      }
1824
1825      @Override
1826      public LocatedFileStatus next() throws IOException {
1827        if (hasNext()) {
1828          LocatedFileStatus result = curFile;
1829          curFile = null;
1830          return result;
1831        } 
1832        throw new java.util.NoSuchElementException("No more entry in " + f);
1833      }
1834    };
1835  }
1836  
1837  /** Return the current user's home directory in this filesystem.
1838   * The default implementation returns "/user/$USER/".
1839   */
1840  public Path getHomeDirectory() {
1841    return this.makeQualified(
1842        new Path("/user/"+System.getProperty("user.name")));
1843  }
1844
1845
1846  /**
1847   * Set the current working directory for the given file system. All relative
1848   * paths will be resolved relative to it.
1849   * 
1850   * @param new_dir
1851   */
1852  public abstract void setWorkingDirectory(Path new_dir);
1853    
1854  /**
1855   * Get the current working directory for the given file system
1856   * @return the directory pathname
1857   */
1858  public abstract Path getWorkingDirectory();
1859  
1860  
1861  /**
1862   * Note: with the new FilesContext class, getWorkingDirectory()
1863   * will be removed. 
1864   * The working directory is implemented in FilesContext.
1865   * 
1866   * Some file systems like LocalFileSystem have an initial workingDir
1867   * that we use as the starting workingDir. For other file systems
1868   * like HDFS there is no built in notion of an initial workingDir.
1869   * 
1870   * @return if there is built in notion of workingDir then it
1871   * is returned; else a null is returned.
1872   */
1873  protected Path getInitialWorkingDirectory() {
1874    return null;
1875  }
1876
1877  /**
1878   * Call {@link #mkdirs(Path, FsPermission)} with default permission.
1879   */
1880  public boolean mkdirs(Path f) throws IOException {
1881    return mkdirs(f, FsPermission.getDirDefault());
1882  }
1883
1884  /**
1885   * Make the given file and all non-existent parents into
1886   * directories. Has the semantics of Unix 'mkdir -p'.
1887   * Existence of the directory hierarchy is not an error.
1888   * @param f path to create
1889   * @param permission to apply to f
1890   */
1891  public abstract boolean mkdirs(Path f, FsPermission permission
1892      ) throws IOException;
1893
1894  /**
1895   * The src file is on the local disk.  Add it to FS at
1896   * the given dst name and the source is kept intact afterwards
1897   * @param src path
1898   * @param dst path
1899   */
1900  public void copyFromLocalFile(Path src, Path dst)
1901    throws IOException {
1902    copyFromLocalFile(false, src, dst);
1903  }
1904
1905  /**
1906   * The src files is on the local disk.  Add it to FS at
1907   * the given dst name, removing the source afterwards.
1908   * @param srcs path
1909   * @param dst path
1910   */
1911  public void moveFromLocalFile(Path[] srcs, Path dst)
1912    throws IOException {
1913    copyFromLocalFile(true, true, srcs, dst);
1914  }
1915
1916  /**
1917   * The src file is on the local disk.  Add it to FS at
1918   * the given dst name, removing the source afterwards.
1919   * @param src path
1920   * @param dst path
1921   */
1922  public void moveFromLocalFile(Path src, Path dst)
1923    throws IOException {
1924    copyFromLocalFile(true, src, dst);
1925  }
1926
1927  /**
1928   * The src file is on the local disk.  Add it to FS at
1929   * the given dst name.
1930   * delSrc indicates if the source should be removed
1931   * @param delSrc whether to delete the src
1932   * @param src path
1933   * @param dst path
1934   */
1935  public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
1936    throws IOException {
1937    copyFromLocalFile(delSrc, true, src, dst);
1938  }
1939  
1940  /**
1941   * The src files are on the local disk.  Add it to FS at
1942   * the given dst name.
1943   * delSrc indicates if the source should be removed
1944   * @param delSrc whether to delete the src
1945   * @param overwrite whether to overwrite an existing file
1946   * @param srcs array of paths which are source
1947   * @param dst path
1948   */
1949  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1950                                Path[] srcs, Path dst)
1951    throws IOException {
1952    Configuration conf = getConf();
1953    FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
1954  }
1955  
1956  /**
1957   * The src file is on the local disk.  Add it to FS at
1958   * the given dst name.
1959   * delSrc indicates if the source should be removed
1960   * @param delSrc whether to delete the src
1961   * @param overwrite whether to overwrite an existing file
1962   * @param src path
1963   * @param dst path
1964   */
1965  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
1966                                Path src, Path dst)
1967    throws IOException {
1968    Configuration conf = getConf();
1969    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
1970  }
1971    
1972  /**
1973   * The src file is under FS, and the dst is on the local disk.
1974   * Copy it from FS control to the local dst name.
1975   * @param src path
1976   * @param dst path
1977   */
1978  public void copyToLocalFile(Path src, Path dst) throws IOException {
1979    copyToLocalFile(false, src, dst);
1980  }
1981    
1982  /**
1983   * The src file is under FS, and the dst is on the local disk.
1984   * Copy it from FS control to the local dst name.
1985   * Remove the source afterwards
1986   * @param src path
1987   * @param dst path
1988   */
1989  public void moveToLocalFile(Path src, Path dst) throws IOException {
1990    copyToLocalFile(true, src, dst);
1991  }
1992
1993  /**
1994   * The src file is under FS, and the dst is on the local disk.
1995   * Copy it from FS control to the local dst name.
1996   * delSrc indicates if the src will be removed or not.
1997   * @param delSrc whether to delete the src
1998   * @param src path
1999   * @param dst path
2000   */   
2001  public void copyToLocalFile(boolean delSrc, Path src, Path dst)
2002    throws IOException {
2003    copyToLocalFile(delSrc, src, dst, false);
2004  }
2005  
2006    /**
2007   * The src file is under FS, and the dst is on the local disk. Copy it from FS
2008   * control to the local dst name. delSrc indicates if the src will be removed
2009   * or not. useRawLocalFileSystem indicates whether to use RawLocalFileSystem
2010   * as local file system or not. RawLocalFileSystem is non crc file system.So,
2011   * It will not create any crc files at local.
2012   * 
2013   * @param delSrc
2014   *          whether to delete the src
2015   * @param src
2016   *          path
2017   * @param dst
2018   *          path
2019   * @param useRawLocalFileSystem
2020   *          whether to use RawLocalFileSystem as local file system or not.
2021   * 
2022   * @throws IOException
2023   *           - if any IO error
2024   */
2025  public void copyToLocalFile(boolean delSrc, Path src, Path dst,
2026      boolean useRawLocalFileSystem) throws IOException {
2027    Configuration conf = getConf();
2028    FileSystem local = null;
2029    if (useRawLocalFileSystem) {
2030      local = getLocal(conf).getRawFileSystem();
2031    } else {
2032      local = getLocal(conf);
2033    }
2034    FileUtil.copy(this, src, local, dst, delSrc, conf);
2035  }
2036
2037  /**
2038   * Returns a local File that the user can write output to.  The caller
2039   * provides both the eventual FS target name and the local working
2040   * file.  If the FS is local, we write directly into the target.  If
2041   * the FS is remote, we write into the tmp local area.
2042   * @param fsOutputFile path of output file
2043   * @param tmpLocalFile path of local tmp file
2044   */
2045  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2046    throws IOException {
2047    return tmpLocalFile;
2048  }
2049
2050  /**
2051   * Called when we're all done writing to the target.  A local FS will
2052   * do nothing, because we've written to exactly the right place.  A remote
2053   * FS will copy the contents of tmpLocalFile to the correct target at
2054   * fsOutputFile.
2055   * @param fsOutputFile path of output file
2056   * @param tmpLocalFile path to local tmp file
2057   */
2058  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
2059    throws IOException {
2060    moveFromLocalFile(tmpLocalFile, fsOutputFile);
2061  }
2062
2063  /**
2064   * No more filesystem operations are needed.  Will
2065   * release any held locks.
2066   */
2067  @Override
2068  public void close() throws IOException {
2069    // delete all files that were marked as delete-on-exit.
2070    processDeleteOnExit();
2071    CACHE.remove(this.key, this);
2072  }
2073
2074  /** Return the total size of all files in the filesystem.*/
2075  public long getUsed() throws IOException{
2076    long used = 0;
2077    RemoteIterator<LocatedFileStatus> files = listFiles(new Path("/"), true);
2078    while (files.hasNext()) {
2079      used += files.next().getLen();
2080    }
2081    return used;
2082  }
2083  
2084  /**
2085   * Get the block size for a particular file.
2086   * @param f the filename
2087   * @return the number of bytes in a block
2088   */
2089  /** @deprecated Use getFileStatus() instead */
2090  @Deprecated
2091  public long getBlockSize(Path f) throws IOException {
2092    return getFileStatus(f).getBlockSize();
2093  }
2094
2095  /**
2096   * Return the number of bytes that large input files should be optimally
2097   * be split into to minimize i/o time.
2098   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
2099   */
2100  @Deprecated
2101  public long getDefaultBlockSize() {
2102    // default to 32MB: large enough to minimize the impact of seeks
2103    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
2104  }
2105    
2106  /** Return the number of bytes that large input files should be optimally
2107   * be split into to minimize i/o time.  The given path will be used to
2108   * locate the actual filesystem.  The full path does not have to exist.
2109   * @param f path of file
2110   * @return the default block size for the path's filesystem
2111   */
2112  public long getDefaultBlockSize(Path f) {
2113    return getDefaultBlockSize();
2114  }
2115
2116  /**
2117   * Get the default replication.
2118   * @deprecated use {@link #getDefaultReplication(Path)} instead
2119   */
2120  @Deprecated
2121  public short getDefaultReplication() { return 1; }
2122
2123  /**
2124   * Get the default replication for a path.   The given path will be used to
2125   * locate the actual filesystem.  The full path does not have to exist.
2126   * @param path of the file
2127   * @return default replication for the path's filesystem 
2128   */
2129  public short getDefaultReplication(Path path) {
2130    return getDefaultReplication();
2131  }
2132  
2133  /**
2134   * Return a file status object that represents the path.
2135   * @param f The path we want information from
2136   * @return a FileStatus object
2137   * @throws FileNotFoundException when the path does not exist;
2138   *         IOException see specific implementation
2139   */
2140  public abstract FileStatus getFileStatus(Path f) throws IOException;
2141
2142  /**
2143   * Checks if the user can access a path.  The mode specifies which access
2144   * checks to perform.  If the requested permissions are granted, then the
2145   * method returns normally.  If access is denied, then the method throws an
2146   * {@link AccessControlException}.
2147   * <p/>
2148   * The default implementation of this method calls {@link #getFileStatus(Path)}
2149   * and checks the returned permissions against the requested permissions.
2150   * Note that the getFileStatus call will be subject to authorization checks.
2151   * Typically, this requires search (execute) permissions on each directory in
2152   * the path's prefix, but this is implementation-defined.  Any file system
2153   * that provides a richer authorization model (such as ACLs) may override the
2154   * default implementation so that it checks against that model instead.
2155   * <p>
2156   * In general, applications should avoid using this method, due to the risk of
2157   * time-of-check/time-of-use race conditions.  The permissions on a file may
2158   * change immediately after the access call returns.  Most applications should
2159   * prefer running specific file system actions as the desired user represented
2160   * by a {@link UserGroupInformation}.
2161   *
2162   * @param path Path to check
2163   * @param mode type of access to check
2164   * @throws AccessControlException if access is denied
2165   * @throws FileNotFoundException if the path does not exist
2166   * @throws IOException see specific implementation
2167   */
2168  @InterfaceAudience.LimitedPrivate({"HDFS", "Hive"})
2169  public void access(Path path, FsAction mode) throws AccessControlException,
2170      FileNotFoundException, IOException {
2171    checkAccessPermissions(this.getFileStatus(path), mode);
2172  }
2173
2174  /**
2175   * This method provides the default implementation of
2176   * {@link #access(Path, FsAction)}.
2177   *
2178   * @param stat FileStatus to check
2179   * @param mode type of access to check
2180   * @throws IOException for any error
2181   */
2182  @InterfaceAudience.Private
2183  static void checkAccessPermissions(FileStatus stat, FsAction mode)
2184      throws IOException {
2185    FsPermission perm = stat.getPermission();
2186    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
2187    String user = ugi.getShortUserName();
2188    List<String> groups = Arrays.asList(ugi.getGroupNames());
2189    if (user.equals(stat.getOwner())) {
2190      if (perm.getUserAction().implies(mode)) {
2191        return;
2192      }
2193    } else if (groups.contains(stat.getGroup())) {
2194      if (perm.getGroupAction().implies(mode)) {
2195        return;
2196      }
2197    } else {
2198      if (perm.getOtherAction().implies(mode)) {
2199        return;
2200      }
2201    }
2202    throw new AccessControlException(String.format(
2203      "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat.getPath(),
2204      stat.getOwner(), stat.getGroup(), stat.isDirectory() ? "d" : "-", perm));
2205  }
2206
2207  /**
2208   * See {@link FileContext#fixRelativePart}
2209   */
2210  protected Path fixRelativePart(Path p) {
2211    if (p.isUriPathAbsolute()) {
2212      return p;
2213    } else {
2214      return new Path(getWorkingDirectory(), p);
2215    }
2216  }
2217
2218  /**
2219   * See {@link FileContext#createSymlink(Path, Path, boolean)}
2220   */
2221  public void createSymlink(final Path target, final Path link,
2222      final boolean createParent) throws AccessControlException,
2223      FileAlreadyExistsException, FileNotFoundException,
2224      ParentNotDirectoryException, UnsupportedFileSystemException, 
2225      IOException {
2226    // Supporting filesystems should override this method
2227    throw new UnsupportedOperationException(
2228        "Filesystem does not support symlinks!");
2229  }
2230
2231  /**
2232   * See {@link FileContext#getFileLinkStatus(Path)}
2233   */
2234  public FileStatus getFileLinkStatus(final Path f)
2235      throws AccessControlException, FileNotFoundException,
2236      UnsupportedFileSystemException, IOException {
2237    // Supporting filesystems should override this method
2238    return getFileStatus(f);
2239  }
2240
2241  /**
2242   * See {@link AbstractFileSystem#supportsSymlinks()}
2243   */
2244  public boolean supportsSymlinks() {
2245    return false;
2246  }
2247
2248  /**
2249   * See {@link FileContext#getLinkTarget(Path)}
2250   */
2251  public Path getLinkTarget(Path f) throws IOException {
2252    // Supporting filesystems should override this method
2253    throw new UnsupportedOperationException(
2254        "Filesystem does not support symlinks!");
2255  }
2256
2257  /**
2258   * See {@link AbstractFileSystem#getLinkTarget(Path)}
2259   */
2260  protected Path resolveLink(Path f) throws IOException {
2261    // Supporting filesystems should override this method
2262    throw new UnsupportedOperationException(
2263        "Filesystem does not support symlinks!");
2264  }
2265
2266  /**
2267   * Get the checksum of a file.
2268   *
2269   * @param f The file path
2270   * @return The file checksum.  The default return value is null,
2271   *  which indicates that no checksum algorithm is implemented
2272   *  in the corresponding FileSystem.
2273   */
2274  public FileChecksum getFileChecksum(Path f) throws IOException {
2275    return getFileChecksum(f, Long.MAX_VALUE);
2276  }
2277
2278  /**
2279   * Get the checksum of a file, from the beginning of the file till the
2280   * specific length.
2281   * @param f The file path
2282   * @param length The length of the file range for checksum calculation
2283   * @return The file checksum.
2284   */
2285  public FileChecksum getFileChecksum(Path f, final long length)
2286      throws IOException {
2287    return null;
2288  }
2289
2290  /**
2291   * Set the verify checksum flag. This is only applicable if the 
2292   * corresponding FileSystem supports checksum. By default doesn't do anything.
2293   * @param verifyChecksum
2294   */
2295  public void setVerifyChecksum(boolean verifyChecksum) {
2296    //doesn't do anything
2297  }
2298
2299  /**
2300   * Set the write checksum flag. This is only applicable if the 
2301   * corresponding FileSystem supports checksum. By default doesn't do anything.
2302   * @param writeChecksum
2303   */
2304  public void setWriteChecksum(boolean writeChecksum) {
2305    //doesn't do anything
2306  }
2307
2308  /**
2309   * Returns a status object describing the use and capacity of the
2310   * file system. If the file system has multiple partitions, the
2311   * use and capacity of the root partition is reflected.
2312   * 
2313   * @return a FsStatus object
2314   * @throws IOException
2315   *           see specific implementation
2316   */
2317  public FsStatus getStatus() throws IOException {
2318    return getStatus(null);
2319  }
2320
2321  /**
2322   * Returns a status object describing the use and capacity of the
2323   * file system. If the file system has multiple partitions, the
2324   * use and capacity of the partition pointed to by the specified
2325   * path is reflected.
2326   * @param p Path for which status should be obtained. null means
2327   * the default partition. 
2328   * @return a FsStatus object
2329   * @throws IOException
2330   *           see specific implementation
2331   */
2332  public FsStatus getStatus(Path p) throws IOException {
2333    return new FsStatus(Long.MAX_VALUE, 0, Long.MAX_VALUE);
2334  }
2335
2336  /**
2337   * Set permission of a path.
2338   * @param p
2339   * @param permission
2340   */
2341  public void setPermission(Path p, FsPermission permission
2342      ) throws IOException {
2343  }
2344
2345  /**
2346   * Set owner of a path (i.e. a file or a directory).
2347   * The parameters username and groupname cannot both be null.
2348   * @param p The path
2349   * @param username If it is null, the original username remains unchanged.
2350   * @param groupname If it is null, the original groupname remains unchanged.
2351   */
2352  public void setOwner(Path p, String username, String groupname
2353      ) throws IOException {
2354  }
2355
2356  /**
2357   * Set access time of a file
2358   * @param p The path
2359   * @param mtime Set the modification time of this file.
2360   *              The number of milliseconds since Jan 1, 1970. 
2361   *              A value of -1 means that this call should not set modification time.
2362   * @param atime Set the access time of this file.
2363   *              The number of milliseconds since Jan 1, 1970. 
2364   *              A value of -1 means that this call should not set access time.
2365   */
2366  public void setTimes(Path p, long mtime, long atime
2367      ) throws IOException {
2368  }
2369
2370  /**
2371   * Create a snapshot with a default name.
2372   * @param path The directory where snapshots will be taken.
2373   * @return the snapshot path.
2374   */
2375  public final Path createSnapshot(Path path) throws IOException {
2376    return createSnapshot(path, null);
2377  }
2378
2379  /**
2380   * Create a snapshot
2381   * @param path The directory where snapshots will be taken.
2382   * @param snapshotName The name of the snapshot
2383   * @return the snapshot path.
2384   */
2385  public Path createSnapshot(Path path, String snapshotName)
2386      throws IOException {
2387    throw new UnsupportedOperationException(getClass().getSimpleName()
2388        + " doesn't support createSnapshot");
2389  }
2390  
2391  /**
2392   * Rename a snapshot
2393   * @param path The directory path where the snapshot was taken
2394   * @param snapshotOldName Old name of the snapshot
2395   * @param snapshotNewName New name of the snapshot
2396   * @throws IOException
2397   */
2398  public void renameSnapshot(Path path, String snapshotOldName,
2399      String snapshotNewName) throws IOException {
2400    throw new UnsupportedOperationException(getClass().getSimpleName()
2401        + " doesn't support renameSnapshot");
2402  }
2403  
2404  /**
2405   * Delete a snapshot of a directory
2406   * @param path  The directory that the to-be-deleted snapshot belongs to
2407   * @param snapshotName The name of the snapshot
2408   */
2409  public void deleteSnapshot(Path path, String snapshotName)
2410      throws IOException {
2411    throw new UnsupportedOperationException(getClass().getSimpleName()
2412        + " doesn't support deleteSnapshot");
2413  }
2414  
2415  /**
2416   * Modifies ACL entries of files and directories.  This method can add new ACL
2417   * entries or modify the permissions on existing ACL entries.  All existing
2418   * ACL entries that are not specified in this call are retained without
2419   * changes.  (Modifications are merged into the current ACL.)
2420   *
2421   * @param path Path to modify
2422   * @param aclSpec List<AclEntry> describing modifications
2423   * @throws IOException if an ACL could not be modified
2424   */
2425  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
2426      throws IOException {
2427    throw new UnsupportedOperationException(getClass().getSimpleName()
2428        + " doesn't support modifyAclEntries");
2429  }
2430
2431  /**
2432   * Removes ACL entries from files and directories.  Other ACL entries are
2433   * retained.
2434   *
2435   * @param path Path to modify
2436   * @param aclSpec List<AclEntry> describing entries to remove
2437   * @throws IOException if an ACL could not be modified
2438   */
2439  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
2440      throws IOException {
2441    throw new UnsupportedOperationException(getClass().getSimpleName()
2442        + " doesn't support removeAclEntries");
2443  }
2444
2445  /**
2446   * Removes all default ACL entries from files and directories.
2447   *
2448   * @param path Path to modify
2449   * @throws IOException if an ACL could not be modified
2450   */
2451  public void removeDefaultAcl(Path path)
2452      throws IOException {
2453    throw new UnsupportedOperationException(getClass().getSimpleName()
2454        + " doesn't support removeDefaultAcl");
2455  }
2456
2457  /**
2458   * Removes all but the base ACL entries of files and directories.  The entries
2459   * for user, group, and others are retained for compatibility with permission
2460   * bits.
2461   *
2462   * @param path Path to modify
2463   * @throws IOException if an ACL could not be removed
2464   */
2465  public void removeAcl(Path path)
2466      throws IOException {
2467    throw new UnsupportedOperationException(getClass().getSimpleName()
2468        + " doesn't support removeAcl");
2469  }
2470
2471  /**
2472   * Fully replaces ACL of files and directories, discarding all existing
2473   * entries.
2474   *
2475   * @param path Path to modify
2476   * @param aclSpec List<AclEntry> describing modifications, must include entries
2477   *   for user, group, and others for compatibility with permission bits.
2478   * @throws IOException if an ACL could not be modified
2479   */
2480  public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
2481    throw new UnsupportedOperationException(getClass().getSimpleName()
2482        + " doesn't support setAcl");
2483  }
2484
2485  /**
2486   * Gets the ACL of a file or directory.
2487   *
2488   * @param path Path to get
2489   * @return AclStatus describing the ACL of the file or directory
2490   * @throws IOException if an ACL could not be read
2491   */
2492  public AclStatus getAclStatus(Path path) throws IOException {
2493    throw new UnsupportedOperationException(getClass().getSimpleName()
2494        + " doesn't support getAclStatus");
2495  }
2496
2497  /**
2498   * Set an xattr of a file or directory.
2499   * The name must be prefixed with the namespace followed by ".". For example,
2500   * "user.attr".
2501   * <p/>
2502   * Refer to the HDFS extended attributes user documentation for details.
2503   *
2504   * @param path Path to modify
2505   * @param name xattr name.
2506   * @param value xattr value.
2507   * @throws IOException
2508   */
2509  public void setXAttr(Path path, String name, byte[] value)
2510      throws IOException {
2511    setXAttr(path, name, value, EnumSet.of(XAttrSetFlag.CREATE,
2512        XAttrSetFlag.REPLACE));
2513  }
2514
2515  /**
2516   * Set an xattr of a file or directory.
2517   * The name must be prefixed with the namespace followed by ".". For example,
2518   * "user.attr".
2519   * <p/>
2520   * Refer to the HDFS extended attributes user documentation for details.
2521   *
2522   * @param path Path to modify
2523   * @param name xattr name.
2524   * @param value xattr value.
2525   * @param flag xattr set flag
2526   * @throws IOException
2527   */
2528  public void setXAttr(Path path, String name, byte[] value,
2529      EnumSet<XAttrSetFlag> flag) throws IOException {
2530    throw new UnsupportedOperationException(getClass().getSimpleName()
2531        + " doesn't support setXAttr");
2532  }
2533
2534  /**
2535   * Get an xattr name and value for a file or directory.
2536   * The name must be prefixed with the namespace followed by ".". For example,
2537   * "user.attr".
2538   * <p/>
2539   * Refer to the HDFS extended attributes user documentation for details.
2540   *
2541   * @param path Path to get extended attribute
2542   * @param name xattr name.
2543   * @return byte[] xattr value.
2544   * @throws IOException
2545   */
2546  public byte[] getXAttr(Path path, String name) throws IOException {
2547    throw new UnsupportedOperationException(getClass().getSimpleName()
2548        + " doesn't support getXAttr");
2549  }
2550
2551  /**
2552   * Get all of the xattr name/value pairs for a file or directory.
2553   * Only those xattrs which the logged-in user has permissions to view
2554   * are returned.
2555   * <p/>
2556   * Refer to the HDFS extended attributes user documentation for details.
2557   *
2558   * @param path Path to get extended attributes
2559   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2560   * @throws IOException
2561   */
2562  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
2563    throw new UnsupportedOperationException(getClass().getSimpleName()
2564        + " doesn't support getXAttrs");
2565  }
2566
2567  /**
2568   * Get all of the xattrs name/value pairs for a file or directory.
2569   * Only those xattrs which the logged-in user has permissions to view
2570   * are returned.
2571   * <p/>
2572   * Refer to the HDFS extended attributes user documentation for details.
2573   *
2574   * @param path Path to get extended attributes
2575   * @param names XAttr names.
2576   * @return Map<String, byte[]> describing the XAttrs of the file or directory
2577   * @throws IOException
2578   */
2579  public Map<String, byte[]> getXAttrs(Path path, List<String> names)
2580      throws IOException {
2581    throw new UnsupportedOperationException(getClass().getSimpleName()
2582        + " doesn't support getXAttrs");
2583  }
2584
2585  /**
2586   * Get all of the xattr names for a file or directory.
2587   * Only those xattr names which the logged-in user has permissions to view
2588   * are returned.
2589   * <p/>
2590   * Refer to the HDFS extended attributes user documentation for details.
2591   *
2592   * @param path Path to get extended attributes
2593   * @return List<String> of the XAttr names of the file or directory
2594   * @throws IOException
2595   */
2596  public List<String> listXAttrs(Path path) throws IOException {
2597    throw new UnsupportedOperationException(getClass().getSimpleName()
2598            + " doesn't support listXAttrs");
2599  }
2600
2601  /**
2602   * Remove an xattr of a file or directory.
2603   * The name must be prefixed with the namespace followed by ".". For example,
2604   * "user.attr".
2605   * <p/>
2606   * Refer to the HDFS extended attributes user documentation for details.
2607   *
2608   * @param path Path to remove extended attribute
2609   * @param name xattr name
2610   * @throws IOException
2611   */
2612  public void removeXAttr(Path path, String name) throws IOException {
2613    throw new UnsupportedOperationException(getClass().getSimpleName()
2614        + " doesn't support removeXAttr");
2615  }
2616
2617  /**
2618   * Set the storage policy for a given file or directory.
2619   *
2620   * @param src file or directory path.
2621   * @param policyName the name of the target storage policy. The list
2622   *                   of supported Storage policies can be retrieved
2623   *                   via {@link #getAllStoragePolicies}.
2624   * @throws IOException
2625   */
2626  public void setStoragePolicy(final Path src, final String policyName)
2627      throws IOException {
2628    throw new UnsupportedOperationException(getClass().getSimpleName()
2629        + " doesn't support setStoragePolicy");
2630  }
2631
2632  /**
2633   * Query the effective storage policy ID for the given file or directory.
2634   *
2635   * @param src file or directory path.
2636   * @return storage policy for give file.
2637   * @throws IOException
2638   */
2639  public BlockStoragePolicySpi getStoragePolicy(final Path src)
2640      throws IOException {
2641    throw new UnsupportedOperationException(getClass().getSimpleName()
2642        + " doesn't support getStoragePolicy");
2643  }
2644
2645  /**
2646   * Retrieve all the storage policies supported by this file system.
2647   *
2648   * @return all storage policies supported by this filesystem.
2649   * @throws IOException
2650   */
2651  public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
2652      throws IOException {
2653    throw new UnsupportedOperationException(getClass().getSimpleName()
2654        + " doesn't support getAllStoragePolicies");
2655  }
2656
2657  // making it volatile to be able to do a double checked locking
2658  private volatile static boolean FILE_SYSTEMS_LOADED = false;
2659
2660  private static final Map<String, Class<? extends FileSystem>>
2661    SERVICE_FILE_SYSTEMS = new HashMap<String, Class<? extends FileSystem>>();
2662
2663  private static void loadFileSystems() {
2664    synchronized (FileSystem.class) {
2665      if (!FILE_SYSTEMS_LOADED) {
2666        ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
2667        for (FileSystem fs : serviceLoader) {
2668          SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
2669        }
2670        FILE_SYSTEMS_LOADED = true;
2671      }
2672    }
2673  }
2674
2675  public static Class<? extends FileSystem> getFileSystemClass(String scheme,
2676      Configuration conf) throws IOException {
2677    if (!FILE_SYSTEMS_LOADED) {
2678      loadFileSystems();
2679    }
2680    Class<? extends FileSystem> clazz = null;
2681    if (conf != null) {
2682      clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
2683    }
2684    if (clazz == null) {
2685      clazz = SERVICE_FILE_SYSTEMS.get(scheme);
2686    }
2687    if (clazz == null) {
2688      throw new IOException("No FileSystem for scheme: " + scheme);
2689    }
2690    return clazz;
2691  }
2692
2693  private static FileSystem createFileSystem(URI uri, Configuration conf
2694      ) throws IOException {
2695    TraceScope scope = Trace.startSpan("FileSystem#createFileSystem");
2696    Span span = scope.getSpan();
2697    if (span != null) {
2698      span.addKVAnnotation("scheme", uri.getScheme());
2699    }
2700    try {
2701      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
2702      FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
2703      fs.initialize(uri, conf);
2704      return fs;
2705    } finally {
2706      scope.close();
2707    }
2708  }
2709
2710  /** Caching FileSystem objects */
2711  static class Cache {
2712    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
2713
2714    private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
2715    private final Set<Key> toAutoClose = new HashSet<Key>();
2716
2717    /** A variable that makes all objects in the cache unique */
2718    private static AtomicLong unique = new AtomicLong(1);
2719
2720    FileSystem get(URI uri, Configuration conf) throws IOException{
2721      Key key = new Key(uri, conf);
2722      return getInternal(uri, conf, key);
2723    }
2724
2725    /** The objects inserted into the cache using this method are all unique */
2726    FileSystem getUnique(URI uri, Configuration conf) throws IOException{
2727      Key key = new Key(uri, conf, unique.getAndIncrement());
2728      return getInternal(uri, conf, key);
2729    }
2730
2731    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
2732      FileSystem fs;
2733      synchronized (this) {
2734        fs = map.get(key);
2735      }
2736      if (fs != null) {
2737        return fs;
2738      }
2739
2740      fs = createFileSystem(uri, conf);
2741      synchronized (this) { // refetch the lock again
2742        FileSystem oldfs = map.get(key);
2743        if (oldfs != null) { // a file system is created while lock is releasing
2744          fs.close(); // close the new file system
2745          return oldfs;  // return the old file system
2746        }
2747        
2748        // now insert the new file system into the map
2749        if (map.isEmpty()
2750                && !ShutdownHookManager.get().isShutdownInProgress()) {
2751          ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
2752        }
2753        fs.key = key;
2754        map.put(key, fs);
2755        if (conf.getBoolean("fs.automatic.close", true)) {
2756          toAutoClose.add(key);
2757        }
2758        return fs;
2759      }
2760    }
2761
2762    synchronized void remove(Key key, FileSystem fs) {
2763      FileSystem cachedFs = map.remove(key);
2764      if (fs == cachedFs) {
2765        toAutoClose.remove(key);
2766      } else if (cachedFs != null) {
2767        map.put(key, cachedFs);
2768      }
2769    }
2770
2771    synchronized void closeAll() throws IOException {
2772      closeAll(false);
2773    }
2774
2775    /**
2776     * Close all FileSystem instances in the Cache.
2777     * @param onlyAutomatic only close those that are marked for automatic closing
2778     */
2779    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
2780      List<IOException> exceptions = new ArrayList<IOException>();
2781
2782      // Make a copy of the keys in the map since we'll be modifying
2783      // the map while iterating over it, which isn't safe.
2784      List<Key> keys = new ArrayList<Key>();
2785      keys.addAll(map.keySet());
2786
2787      for (Key key : keys) {
2788        final FileSystem fs = map.get(key);
2789
2790        if (onlyAutomatic && !toAutoClose.contains(key)) {
2791          continue;
2792        }
2793
2794        //remove from cache
2795        map.remove(key);
2796        toAutoClose.remove(key);
2797
2798        if (fs != null) {
2799          try {
2800            fs.close();
2801          }
2802          catch(IOException ioe) {
2803            exceptions.add(ioe);
2804          }
2805        }
2806      }
2807
2808      if (!exceptions.isEmpty()) {
2809        throw MultipleIOException.createIOException(exceptions);
2810      }
2811    }
2812
2813    private class ClientFinalizer implements Runnable {
2814      @Override
2815      public synchronized void run() {
2816        try {
2817          closeAll(true);
2818        } catch (IOException e) {
2819          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
2820        }
2821      }
2822    }
2823
2824    synchronized void closeAll(UserGroupInformation ugi) throws IOException {
2825      List<FileSystem> targetFSList = new ArrayList<FileSystem>();
2826      //Make a pass over the list and collect the filesystems to close
2827      //we cannot close inline since close() removes the entry from the Map
2828      for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
2829        final Key key = entry.getKey();
2830        final FileSystem fs = entry.getValue();
2831        if (ugi.equals(key.ugi) && fs != null) {
2832          targetFSList.add(fs);   
2833        }
2834      }
2835      List<IOException> exceptions = new ArrayList<IOException>();
2836      //now make a pass over the target list and close each
2837      for (FileSystem fs : targetFSList) {
2838        try {
2839          fs.close();
2840        }
2841        catch(IOException ioe) {
2842          exceptions.add(ioe);
2843        }
2844      }
2845      if (!exceptions.isEmpty()) {
2846        throw MultipleIOException.createIOException(exceptions);
2847      }
2848    }
2849
2850    /** FileSystem.Cache.Key */
2851    static class Key {
2852      final String scheme;
2853      final String authority;
2854      final UserGroupInformation ugi;
2855      final long unique;   // an artificial way to make a key unique
2856
2857      Key(URI uri, Configuration conf) throws IOException {
2858        this(uri, conf, 0);
2859      }
2860
2861      Key(URI uri, Configuration conf, long unique) throws IOException {
2862        scheme = uri.getScheme()==null ?
2863            "" : StringUtils.toLowerCase(uri.getScheme());
2864        authority = uri.getAuthority()==null ?
2865            "" : StringUtils.toLowerCase(uri.getAuthority());
2866        this.unique = unique;
2867        
2868        this.ugi = UserGroupInformation.getCurrentUser();
2869      }
2870
2871      @Override
2872      public int hashCode() {
2873        return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
2874      }
2875
2876      static boolean isEqual(Object a, Object b) {
2877        return a == b || (a != null && a.equals(b));        
2878      }
2879
2880      @Override
2881      public boolean equals(Object obj) {
2882        if (obj == this) {
2883          return true;
2884        }
2885        if (obj != null && obj instanceof Key) {
2886          Key that = (Key)obj;
2887          return isEqual(this.scheme, that.scheme)
2888                 && isEqual(this.authority, that.authority)
2889                 && isEqual(this.ugi, that.ugi)
2890                 && (this.unique == that.unique);
2891        }
2892        return false;        
2893      }
2894
2895      @Override
2896      public String toString() {
2897        return "("+ugi.toString() + ")@" + scheme + "://" + authority;        
2898      }
2899    }
2900  }
2901  
2902  /**
2903   * Tracks statistics about how many reads, writes, and so forth have been
2904   * done in a FileSystem.
2905   * 
2906   * Since there is only one of these objects per FileSystem, there will 
2907   * typically be many threads writing to this object.  Almost every operation
2908   * on an open file will involve a write to this object.  In contrast, reading
2909   * statistics is done infrequently by most programs, and not at all by others.
2910   * Hence, this is optimized for writes.
2911   * 
2912   * Each thread writes to its own thread-local area of memory.  This removes 
2913   * contention and allows us to scale up to many, many threads.  To read
2914   * statistics, the reader thread totals up the contents of all of the 
2915   * thread-local data areas.
2916   */
2917  public static final class Statistics {
2918    /**
2919     * Statistics data.
2920     * 
2921     * There is only a single writer to thread-local StatisticsData objects.
2922     * Hence, volatile is adequate here-- we do not need AtomicLong or similar
2923     * to prevent lost updates.
2924     * The Java specification guarantees that updates to volatile longs will
2925     * be perceived as atomic with respect to other threads, which is all we
2926     * need.
2927     */
2928    public static class StatisticsData {
2929      volatile long bytesRead;
2930      volatile long bytesWritten;
2931      volatile int readOps;
2932      volatile int largeReadOps;
2933      volatile int writeOps;
2934
2935      /**
2936       * Add another StatisticsData object to this one.
2937       */
2938      void add(StatisticsData other) {
2939        this.bytesRead += other.bytesRead;
2940        this.bytesWritten += other.bytesWritten;
2941        this.readOps += other.readOps;
2942        this.largeReadOps += other.largeReadOps;
2943        this.writeOps += other.writeOps;
2944      }
2945
2946      /**
2947       * Negate the values of all statistics.
2948       */
2949      void negate() {
2950        this.bytesRead = -this.bytesRead;
2951        this.bytesWritten = -this.bytesWritten;
2952        this.readOps = -this.readOps;
2953        this.largeReadOps = -this.largeReadOps;
2954        this.writeOps = -this.writeOps;
2955      }
2956
2957      @Override
2958      public String toString() {
2959        return bytesRead + " bytes read, " + bytesWritten + " bytes written, "
2960            + readOps + " read ops, " + largeReadOps + " large read ops, "
2961            + writeOps + " write ops";
2962      }
2963      
2964      public long getBytesRead() {
2965        return bytesRead;
2966      }
2967      
2968      public long getBytesWritten() {
2969        return bytesWritten;
2970      }
2971      
2972      public int getReadOps() {
2973        return readOps;
2974      }
2975      
2976      public int getLargeReadOps() {
2977        return largeReadOps;
2978      }
2979      
2980      public int getWriteOps() {
2981        return writeOps;
2982      }
2983    }
2984
2985    private interface StatisticsAggregator<T> {
2986      void accept(StatisticsData data);
2987      T aggregate();
2988    }
2989
2990    private final String scheme;
2991
2992    /**
2993     * rootData is data that doesn't belong to any thread, but will be added
2994     * to the totals.  This is useful for making copies of Statistics objects,
2995     * and for storing data that pertains to threads that have been garbage
2996     * collected.  Protected by the Statistics lock.
2997     */
2998    private final StatisticsData rootData;
2999
3000    /**
3001     * Thread-local data.
3002     */
3003    private final ThreadLocal<StatisticsData> threadData;
3004
3005    /**
3006     * Set of all thread-local data areas.  Protected by the Statistics lock.
3007     * The references to the statistics data are kept using phantom references
3008     * to the associated threads. Proper clean-up is performed by the cleaner
3009     * thread when the threads are garbage collected.
3010     */
3011    private final Set<StatisticsDataReference> allData;
3012
3013    /**
3014     * Global reference queue and a cleaner thread that manage statistics data
3015     * references from all filesystem instances.
3016     */
3017    private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
3018    private static final Thread STATS_DATA_CLEANER;
3019
3020    static {
3021      STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
3022      // start a single daemon cleaner thread
3023      STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
3024      STATS_DATA_CLEANER.
3025          setName(StatisticsDataReferenceCleaner.class.getName());
3026      STATS_DATA_CLEANER.setDaemon(true);
3027      STATS_DATA_CLEANER.start();
3028    }
3029
3030    public Statistics(String scheme) {
3031      this.scheme = scheme;
3032      this.rootData = new StatisticsData();
3033      this.threadData = new ThreadLocal<StatisticsData>();
3034      this.allData = new HashSet<StatisticsDataReference>();
3035    }
3036
3037    /**
3038     * Copy constructor.
3039     * 
3040     * @param other    The input Statistics object which is cloned.
3041     */
3042    public Statistics(Statistics other) {
3043      this.scheme = other.scheme;
3044      this.rootData = new StatisticsData();
3045      other.visitAll(new StatisticsAggregator<Void>() {
3046        @Override
3047        public void accept(StatisticsData data) {
3048          rootData.add(data);
3049        }
3050
3051        public Void aggregate() {
3052          return null;
3053        }
3054      });
3055      this.threadData = new ThreadLocal<StatisticsData>();
3056      this.allData = new HashSet<StatisticsDataReference>();
3057    }
3058
3059    /**
3060     * A phantom reference to a thread that also includes the data associated
3061     * with that thread. On the thread being garbage collected, it is enqueued
3062     * to the reference queue for clean-up.
3063     */
3064    private class StatisticsDataReference extends PhantomReference<Thread> {
3065      private final StatisticsData data;
3066
3067      public StatisticsDataReference(StatisticsData data, Thread thread) {
3068        super(thread, STATS_DATA_REF_QUEUE);
3069        this.data = data;
3070      }
3071
3072      public StatisticsData getData() {
3073        return data;
3074      }
3075
3076      /**
3077       * Performs clean-up action when the associated thread is garbage
3078       * collected.
3079       */
3080      public void cleanUp() {
3081        // use the statistics lock for safety
3082        synchronized (Statistics.this) {
3083          /*
3084           * If the thread that created this thread-local data no longer exists,
3085           * remove the StatisticsData from our list and fold the values into
3086           * rootData.
3087           */
3088          rootData.add(data);
3089          allData.remove(this);
3090        }
3091      }
3092    }
3093
3094    /**
3095     * Background action to act on references being removed.
3096     */
3097    private static class StatisticsDataReferenceCleaner implements Runnable {
3098      @Override
3099      public void run() {
3100        while (true) {
3101          try {
3102            StatisticsDataReference ref =
3103                (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
3104            ref.cleanUp();
3105          } catch (Throwable th) {
3106            // the cleaner thread should continue to run even if there are
3107            // exceptions, including InterruptedException
3108            LOG.warn("exception in the cleaner thread but it will continue to "
3109                + "run", th);
3110          }
3111        }
3112      }
3113    }
3114
3115    /**
3116     * Get or create the thread-local data associated with the current thread.
3117     */
3118    public StatisticsData getThreadStatistics() {
3119      StatisticsData data = threadData.get();
3120      if (data == null) {
3121        data = new StatisticsData();
3122        threadData.set(data);
3123        StatisticsDataReference ref =
3124            new StatisticsDataReference(data, Thread.currentThread());
3125        synchronized(this) {
3126          allData.add(ref);
3127        }
3128      }
3129      return data;
3130    }
3131
3132    /**
3133     * Increment the bytes read in the statistics
3134     * @param newBytes the additional bytes read
3135     */
3136    public void incrementBytesRead(long newBytes) {
3137      getThreadStatistics().bytesRead += newBytes;
3138    }
3139    
3140    /**
3141     * Increment the bytes written in the statistics
3142     * @param newBytes the additional bytes written
3143     */
3144    public void incrementBytesWritten(long newBytes) {
3145      getThreadStatistics().bytesWritten += newBytes;
3146    }
3147    
3148    /**
3149     * Increment the number of read operations
3150     * @param count number of read operations
3151     */
3152    public void incrementReadOps(int count) {
3153      getThreadStatistics().readOps += count;
3154    }
3155
3156    /**
3157     * Increment the number of large read operations
3158     * @param count number of large read operations
3159     */
3160    public void incrementLargeReadOps(int count) {
3161      getThreadStatistics().largeReadOps += count;
3162    }
3163
3164    /**
3165     * Increment the number of write operations
3166     * @param count number of write operations
3167     */
3168    public void incrementWriteOps(int count) {
3169      getThreadStatistics().writeOps += count;
3170    }
3171
3172    /**
3173     * Apply the given aggregator to all StatisticsData objects associated with
3174     * this Statistics object.
3175     *
3176     * For each StatisticsData object, we will call accept on the visitor.
3177     * Finally, at the end, we will call aggregate to get the final total. 
3178     *
3179     * @param         The visitor to use.
3180     * @return        The total.
3181     */
3182    private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
3183      visitor.accept(rootData);
3184      for (StatisticsDataReference ref: allData) {
3185        StatisticsData data = ref.getData();
3186        visitor.accept(data);
3187      }
3188      return visitor.aggregate();
3189    }
3190
3191    /**
3192     * Get the total number of bytes read
3193     * @return the number of bytes
3194     */
3195    public long getBytesRead() {
3196      return visitAll(new StatisticsAggregator<Long>() {
3197        private long bytesRead = 0;
3198
3199        @Override
3200        public void accept(StatisticsData data) {
3201          bytesRead += data.bytesRead;
3202        }
3203
3204        public Long aggregate() {
3205          return bytesRead;
3206        }
3207      });
3208    }
3209    
3210    /**
3211     * Get the total number of bytes written
3212     * @return the number of bytes
3213     */
3214    public long getBytesWritten() {
3215      return visitAll(new StatisticsAggregator<Long>() {
3216        private long bytesWritten = 0;
3217
3218        @Override
3219        public void accept(StatisticsData data) {
3220          bytesWritten += data.bytesWritten;
3221        }
3222
3223        public Long aggregate() {
3224          return bytesWritten;
3225        }
3226      });
3227    }
3228    
3229    /**
3230     * Get the number of file system read operations such as list files
3231     * @return number of read operations
3232     */
3233    public int getReadOps() {
3234      return visitAll(new StatisticsAggregator<Integer>() {
3235        private int readOps = 0;
3236
3237        @Override
3238        public void accept(StatisticsData data) {
3239          readOps += data.readOps;
3240          readOps += data.largeReadOps;
3241        }
3242
3243        public Integer aggregate() {
3244          return readOps;
3245        }
3246      });
3247    }
3248
3249    /**
3250     * Get the number of large file system read operations such as list files
3251     * under a large directory
3252     * @return number of large read operations
3253     */
3254    public int getLargeReadOps() {
3255      return visitAll(new StatisticsAggregator<Integer>() {
3256        private int largeReadOps = 0;
3257
3258        @Override
3259        public void accept(StatisticsData data) {
3260          largeReadOps += data.largeReadOps;
3261        }
3262
3263        public Integer aggregate() {
3264          return largeReadOps;
3265        }
3266      });
3267    }
3268
3269    /**
3270     * Get the number of file system write operations such as create, append 
3271     * rename etc.
3272     * @return number of write operations
3273     */
3274    public int getWriteOps() {
3275      return visitAll(new StatisticsAggregator<Integer>() {
3276        private int writeOps = 0;
3277
3278        @Override
3279        public void accept(StatisticsData data) {
3280          writeOps += data.writeOps;
3281        }
3282
3283        public Integer aggregate() {
3284          return writeOps;
3285        }
3286      });
3287    }
3288
3289
3290    @Override
3291    public String toString() {
3292      return visitAll(new StatisticsAggregator<String>() {
3293        private StatisticsData total = new StatisticsData();
3294
3295        @Override
3296        public void accept(StatisticsData data) {
3297          total.add(data);
3298        }
3299
3300        public String aggregate() {
3301          return total.toString();
3302        }
3303      });
3304    }
3305
3306    /**
3307     * Resets all statistics to 0.
3308     *
3309     * In order to reset, we add up all the thread-local statistics data, and
3310     * set rootData to the negative of that.
3311     *
3312     * This may seem like a counterintuitive way to reset the statsitics.  Why
3313     * can't we just zero out all the thread-local data?  Well, thread-local
3314     * data can only be modified by the thread that owns it.  If we tried to
3315     * modify the thread-local data from this thread, our modification might get
3316     * interleaved with a read-modify-write operation done by the thread that
3317     * owns the data.  That would result in our update getting lost.
3318     *
3319     * The approach used here avoids this problem because it only ever reads
3320     * (not writes) the thread-local data.  Both reads and writes to rootData
3321     * are done under the lock, so we're free to modify rootData from any thread
3322     * that holds the lock.
3323     */
3324    public void reset() {
3325      visitAll(new StatisticsAggregator<Void>() {
3326        private StatisticsData total = new StatisticsData();
3327
3328        @Override
3329        public void accept(StatisticsData data) {
3330          total.add(data);
3331        }
3332
3333        public Void aggregate() {
3334          total.negate();
3335          rootData.add(total);
3336          return null;
3337        }
3338      });
3339    }
3340    
3341    /**
3342     * Get the uri scheme associated with this statistics object.
3343     * @return the schema associated with this set of statistics
3344     */
3345    public String getScheme() {
3346      return scheme;
3347    }
3348
3349    @VisibleForTesting
3350    synchronized int getAllThreadLocalDataSize() {
3351      return allData.size();
3352    }
3353  }
3354  
3355  /**
3356   * Get the Map of Statistics object indexed by URI Scheme.
3357   * @return a Map having a key as URI scheme and value as Statistics object
3358   * @deprecated use {@link #getAllStatistics} instead
3359   */
3360  @Deprecated
3361  public static synchronized Map<String, Statistics> getStatistics() {
3362    Map<String, Statistics> result = new HashMap<String, Statistics>();
3363    for(Statistics stat: statisticsTable.values()) {
3364      result.put(stat.getScheme(), stat);
3365    }
3366    return result;
3367  }
3368
3369  /**
3370   * Return the FileSystem classes that have Statistics
3371   */
3372  public static synchronized List<Statistics> getAllStatistics() {
3373    return new ArrayList<Statistics>(statisticsTable.values());
3374  }
3375  
3376  /**
3377   * Get the statistics for a particular file system
3378   * @param cls the class to lookup
3379   * @return a statistics object
3380   */
3381  public static synchronized 
3382  Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
3383    Statistics result = statisticsTable.get(cls);
3384    if (result == null) {
3385      result = new Statistics(scheme);
3386      statisticsTable.put(cls, result);
3387    }
3388    return result;
3389  }
3390  
3391  /**
3392   * Reset all statistics for all file systems
3393   */
3394  public static synchronized void clearStatistics() {
3395    for(Statistics stat: statisticsTable.values()) {
3396      stat.reset();
3397    }
3398  }
3399
3400  /**
3401   * Print all statistics for all file systems
3402   */
3403  public static synchronized
3404  void printStatistics() throws IOException {
3405    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
3406            statisticsTable.entrySet()) {
3407      System.out.println("  FileSystem " + pair.getKey().getName() + 
3408                         ": " + pair.getValue());
3409    }
3410  }
3411
3412  // Symlinks are temporarily disabled - see HADOOP-10020 and HADOOP-10052
3413  private static boolean symlinksEnabled = false;
3414
3415  private static Configuration conf = null;
3416
3417  @VisibleForTesting
3418  public static boolean areSymlinksEnabled() {
3419    return symlinksEnabled;
3420  }
3421
3422  @VisibleForTesting
3423  public static void enableSymlinks() {
3424    symlinksEnabled = true;
3425  }
3426}