001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.URI;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.TimeUnit;
029
030import org.apache.hadoop.classification.InterfaceAudience;
031import org.apache.hadoop.classification.InterfaceStability;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataInputStream;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.fs.FileAlreadyExistsException;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.ParentNotDirectoryException;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.fs.permission.FsPermission;
041import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
042import org.apache.hadoop.fs.s3native.S3xLoginHelper;
043import org.apache.hadoop.io.retry.RetryPolicies;
044import org.apache.hadoop.io.retry.RetryPolicy;
045import org.apache.hadoop.io.retry.RetryProxy;
046import org.apache.hadoop.util.Progressable;
047
048/**
049 * A block-based {@link FileSystem} backed by
050 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
051 *
052 * @see NativeS3FileSystem
053 */
054@InterfaceAudience.Public
055@InterfaceStability.Stable
056public class S3FileSystem extends FileSystem {
057
058  private URI uri;
059
060  private FileSystemStore store;
061
062  private Path workingDir;
063
064  public S3FileSystem() {
065    // set store in initialize()
066  }
067
068  public S3FileSystem(FileSystemStore store) {
069    this.store = store;
070  }
071
072  /**
073   * Return the protocol scheme for the FileSystem.
074   *
075   * @return <code>s3</code>
076   */
077  @Override
078  public String getScheme() {
079    return "s3";
080  }
081
082  @Override
083  public URI getUri() {
084    return uri;
085  }
086
087  @Override
088  public void initialize(URI uri, Configuration conf) throws IOException {
089    super.initialize(uri, conf);
090    if (store == null) {
091      store = createDefaultStore(conf);
092    }
093    store.initialize(uri, conf);
094    setConf(conf);
095    this.uri = S3xLoginHelper.buildFSURI(uri);
096    this.workingDir =
097      new Path("/user", System.getProperty("user.name")).makeQualified(this);
098  }
099
100  private static FileSystemStore createDefaultStore(Configuration conf) {
101    FileSystemStore store = new Jets3tFileSystemStore();
102
103    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
104                                                                               conf.getInt("fs.s3.maxRetries", 4),
105                                                                               conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
106    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
107      new HashMap<Class<? extends Exception>, RetryPolicy>();
108    exceptionToPolicyMap.put(IOException.class, basePolicy);
109    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
110
111    RetryPolicy methodPolicy = RetryPolicies.retryByException(
112                                                              RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
113    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
114    methodNameToPolicyMap.put("storeBlock", methodPolicy);
115    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
116
117    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
118                                               store, methodNameToPolicyMap);
119  }
120
121  @Override
122  public Path getWorkingDirectory() {
123    return workingDir;
124  }
125
126  @Override
127  public void setWorkingDirectory(Path dir) {
128    workingDir = makeAbsolute(dir);
129  }
130
131  private Path makeAbsolute(Path path) {
132    if (path.isAbsolute()) {
133      return path;
134    }
135    return new Path(workingDir, path);
136  }
137
138  /**
139   * Check that a Path belongs to this FileSystem.
140   * Unlike the superclass, this version does not look at authority,
141   * only hostnames.
142   * @param path to check
143   * @throws IllegalArgumentException if there is an FS mismatch
144   */
145  @Override
146  protected void checkPath(Path path) {
147    S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort());
148  }
149
150  @Override
151  protected URI canonicalizeUri(URI rawUri) {
152    return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort());
153  }
154
155  /**
156   * @param permission Currently ignored.
157   */
158  @Override
159  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
160    Path absolutePath = makeAbsolute(path);
161    List<Path> paths = new ArrayList<Path>();
162    do {
163      paths.add(0, absolutePath);
164      absolutePath = absolutePath.getParent();
165    } while (absolutePath != null);
166
167    boolean result = true;
168    for (int i = 0; i < paths.size(); i++) {
169      Path p = paths.get(i);
170      try {
171        result &= mkdir(p);
172      } catch(FileAlreadyExistsException e) {
173        if (i + 1 < paths.size()) {
174          throw new ParentNotDirectoryException(e.getMessage());
175        }
176        throw e;
177      }
178    }
179    return result;
180  }
181
182  private boolean mkdir(Path path) throws IOException {
183    Path absolutePath = makeAbsolute(path);
184    INode inode = store.retrieveINode(absolutePath);
185    if (inode == null) {
186      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
187    } else if (inode.isFile()) {
188      throw new FileAlreadyExistsException(String.format(
189          "Can't make directory for path %s since it is a file.",
190          absolutePath));
191    }
192    return true;
193  }
194
195  @Override
196  public boolean isFile(Path path) throws IOException {
197    INode inode = store.retrieveINode(makeAbsolute(path));
198    if (inode == null) {
199      return false;
200    }
201    return inode.isFile();
202  }
203
204  private INode checkFile(Path path) throws IOException {
205    INode inode = store.retrieveINode(makeAbsolute(path));
206    String message = String.format("No such file: '%s'", path.toString());
207    if (inode == null) {
208      throw new FileNotFoundException(message + " does not exist");
209    }
210    if (inode.isDirectory()) {
211      throw new FileNotFoundException(message + " is a directory");
212    }
213    return inode;
214  }
215
216  @Override
217  public FileStatus[] listStatus(Path f) throws IOException {
218    Path absolutePath = makeAbsolute(f);
219    INode inode = store.retrieveINode(absolutePath);
220    if (inode == null) {
221      throw new FileNotFoundException("File " + f + " does not exist.");
222    }
223    if (inode.isFile()) {
224      return new FileStatus[] {
225        new S3FileStatus(f.makeQualified(this), inode)
226      };
227    }
228    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
229    for (Path p : store.listSubPaths(absolutePath)) {
230      ret.add(getFileStatus(p.makeQualified(this)));
231    }
232    return ret.toArray(new FileStatus[0]);
233  }
234
235  /** This optional operation is not yet supported. */
236  @Override
237  public FSDataOutputStream append(Path f, int bufferSize,
238      Progressable progress) throws IOException {
239    throw new IOException("Not supported");
240  }
241
242  /**
243   * @param permission Currently ignored.
244   */
245  @Override
246  public FSDataOutputStream create(Path file, FsPermission permission,
247      boolean overwrite, int bufferSize,
248      short replication, long blockSize, Progressable progress)
249    throws IOException {
250
251    INode inode = store.retrieveINode(makeAbsolute(file));
252    if (inode != null) {
253      if (overwrite && !inode.isDirectory()) {
254        delete(file, true);
255      } else {
256        String message = String.format("File already exists: '%s'", file);
257        if (inode.isDirectory()) {
258          message = message + " is a directory";
259        }
260        throw new FileAlreadyExistsException(message);
261      }
262    } else {
263      Path parent = file.getParent();
264      if (parent != null) {
265        if (!mkdirs(parent)) {
266          throw new IOException("Mkdirs failed to create " + parent.toString());
267        }
268      }
269    }
270    return new FSDataOutputStream
271        (new S3OutputStream(getConf(), store, makeAbsolute(file),
272                            blockSize, progress, bufferSize),
273         statistics);
274  }
275
276  @Override
277  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
278    INode inode = checkFile(path);
279    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
280                                                   statistics));
281  }
282
283  @Override
284  public boolean rename(Path src, Path dst) throws IOException {
285    Path absoluteSrc = makeAbsolute(src);
286    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
287    INode srcINode = store.retrieveINode(absoluteSrc);
288    boolean debugEnabled = LOG.isDebugEnabled();
289    if (srcINode == null) {
290      // src path doesn't exist
291      if (debugEnabled) {
292        LOG.debug(debugPreamble + "returning false as src does not exist");
293      }
294      return false;
295    }
296
297    Path absoluteDst = makeAbsolute(dst);
298
299    //validate the parent dir of the destination
300    Path dstParent = absoluteDst.getParent();
301    if (dstParent != null) {
302      //if the dst parent is not root, make sure it exists
303      INode dstParentINode = store.retrieveINode(dstParent);
304      if (dstParentINode == null) {
305        // dst parent doesn't exist
306        if (debugEnabled) {
307          LOG.debug(debugPreamble +
308                    "returning false as dst parent does not exist");
309        }
310        return false;
311      }
312      if (dstParentINode.isFile()) {
313        // dst parent exists but is a file
314        if (debugEnabled) {
315          LOG.debug(debugPreamble +
316                    "returning false as dst parent exists and is a file");
317        }
318        return false;
319      }
320    }
321
322    //get status of source
323    boolean srcIsFile = srcINode.isFile();
324
325    INode dstINode = store.retrieveINode(absoluteDst);
326    boolean destExists = dstINode != null;
327    boolean destIsDir = destExists && !dstINode.isFile();
328    if (srcIsFile) {
329
330      //source is a simple file
331      if (destExists) {
332        if (destIsDir) {
333          //outcome #1 dest exists and is dir -filename to subdir of dest
334          if (debugEnabled) {
335            LOG.debug(debugPreamble +
336                      "copying src file under dest dir to " + absoluteDst);
337          }
338          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
339        } else {
340          //outcome #2 dest it's a file: fail iff different from src
341          boolean renamingOnToSelf = absoluteSrc.equals(absoluteDst);
342          if (debugEnabled) {
343            LOG.debug(debugPreamble +
344                      "copying file onto file, outcome is " + renamingOnToSelf);
345          }
346          return renamingOnToSelf;
347        }
348      } else {
349        // #3 dest does not exist: use dest as path for rename
350        if (debugEnabled) {
351          LOG.debug(debugPreamble +
352                    "copying file onto file");
353        }
354      }
355    } else {
356      //here the source exists and is a directory
357      // outcomes (given we know the parent dir exists if we get this far)
358      // #1 destination is a file: fail
359      // #2 destination is a directory: create a new dir under that one
360      // #3 destination doesn't exist: create a new dir with that name
361      // #3 and #4 are only allowed if the dest path is not == or under src
362
363      if (destExists) {
364        if (!destIsDir) {
365          // #1 destination is a file: fail
366          if (debugEnabled) {
367            LOG.debug(debugPreamble +
368                      "returning false as src is a directory, but not dest");
369          }
370          return false;
371        } else {
372          // the destination dir exists
373          // destination for rename becomes a subdir of the target name
374          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
375          if (debugEnabled) {
376            LOG.debug(debugPreamble +
377                      "copying src dir under dest dir to " + absoluteDst);
378          }
379        }
380      }
381      //the final destination directory is now know, so validate it for
382      //illegal moves
383
384      if (absoluteSrc.equals(absoluteDst)) {
385        //you can't rename a directory onto itself
386        if (debugEnabled) {
387          LOG.debug(debugPreamble +
388                    "Dest==source && isDir -failing");
389        }
390        return false;
391      }
392      if (absoluteDst.toString().startsWith(absoluteSrc.toString() + "/")) {
393        //you can't move a directory under itself
394        if (debugEnabled) {
395          LOG.debug(debugPreamble +
396                    "dst is equal to or under src dir -failing");
397        }
398        return false;
399      }
400    }
401    //here the dest path is set up -so rename
402    return renameRecursive(absoluteSrc, absoluteDst);
403  }
404
405  private boolean renameRecursive(Path src, Path dst) throws IOException {
406    INode srcINode = store.retrieveINode(src);
407    store.storeINode(dst, srcINode);
408    store.deleteINode(src);
409    if (srcINode.isDirectory()) {
410      for (Path oldSrc : store.listDeepSubPaths(src)) {
411        INode inode = store.retrieveINode(oldSrc);
412        if (inode == null) {
413          return false;
414        }
415        String oldSrcPath = oldSrc.toUri().getPath();
416        String srcPath = src.toUri().getPath();
417        String dstPath = dst.toUri().getPath();
418        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
419        store.storeINode(newDst, inode);
420        store.deleteINode(oldSrc);
421      }
422    }
423    return true;
424  }
425
426  @Override
427  public boolean delete(Path path, boolean recursive) throws IOException {
428   Path absolutePath = makeAbsolute(path);
429   INode inode = store.retrieveINode(absolutePath);
430   if (inode == null) {
431     return false;
432   }
433   if (inode.isFile()) {
434     store.deleteINode(absolutePath);
435     for (Block block: inode.getBlocks()) {
436       store.deleteBlock(block);
437     }
438   } else {
439     FileStatus[] contents = null;
440     try {
441       contents = listStatus(absolutePath);
442     } catch(FileNotFoundException fnfe) {
443       return false;
444     }
445
446     if ((contents.length !=0) && (!recursive)) {
447       throw new IOException("Directory " + path.toString()
448           + " is not empty.");
449     }
450     for (FileStatus p:contents) {
451       if (!delete(p.getPath(), recursive)) {
452         return false;
453       }
454     }
455     store.deleteINode(absolutePath);
456   }
457   return true;
458  }
459
460  /**
461   * FileStatus for S3 file systems.
462   */
463  @Override
464  public FileStatus getFileStatus(Path f)  throws IOException {
465    INode inode = store.retrieveINode(makeAbsolute(f));
466    if (inode == null) {
467      throw new FileNotFoundException(f + ": No such file or directory.");
468    }
469    return new S3FileStatus(f.makeQualified(this), inode);
470  }
471
472  @Override
473  public long getDefaultBlockSize() {
474    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
475  }
476
477  @Override
478  public String getCanonicalServiceName() {
479    // Does not support Token
480    return null;
481  }
482
483  // diagnostic methods
484
485  void dump() throws IOException {
486    store.dump();
487  }
488
489  void purge() throws IOException {
490    store.purge();
491  }
492
493  private static class S3FileStatus extends FileStatus {
494
495    S3FileStatus(Path f, INode inode) throws IOException {
496      super(findLength(inode), inode.isDirectory(), 1,
497            findBlocksize(inode), 0, f);
498    }
499
500    private static long findLength(INode inode) {
501      if (!inode.isDirectory()) {
502        long length = 0L;
503        for (Block block : inode.getBlocks()) {
504          length += block.getLength();
505        }
506        return length;
507      }
508      return 0;
509    }
510
511    private static long findBlocksize(INode inode) {
512      final Block[] ret = inode.getBlocks();
513      return ret == null ? 0L : ret[0].getLength();
514    }
515  }
516}