001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.URI;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.TimeUnit;
029
030import org.apache.hadoop.classification.InterfaceAudience;
031import org.apache.hadoop.classification.InterfaceStability;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataInputStream;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.fs.FileAlreadyExistsException;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.ParentNotDirectoryException;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.fs.permission.FsPermission;
041import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
042import org.apache.hadoop.io.retry.RetryPolicies;
043import org.apache.hadoop.io.retry.RetryPolicy;
044import org.apache.hadoop.io.retry.RetryProxy;
045import org.apache.hadoop.util.Progressable;
046
047/**
048 * A block-based {@link FileSystem} backed by
049 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
050 *
051 * @see NativeS3FileSystem
052 */
053@InterfaceAudience.Public
054@InterfaceStability.Stable
055public class S3FileSystem extends FileSystem {
056
057  private URI uri;
058
059  private FileSystemStore store;
060
061  private Path workingDir;
062
063  public S3FileSystem() {
064    // set store in initialize()
065  }
066
067  public S3FileSystem(FileSystemStore store) {
068    this.store = store;
069  }
070
071  /**
072   * Return the protocol scheme for the FileSystem.
073   *
074   * @return <code>s3</code>
075   */
076  @Override
077  public String getScheme() {
078    return "s3";
079  }
080
081  @Override
082  public URI getUri() {
083    return uri;
084  }
085
086  @Override
087  public void initialize(URI uri, Configuration conf) throws IOException {
088    super.initialize(uri, conf);
089    if (store == null) {
090      store = createDefaultStore(conf);
091    }
092    store.initialize(uri, conf);
093    setConf(conf);
094    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
095    this.workingDir =
096      new Path("/user", System.getProperty("user.name")).makeQualified(this);
097  }
098
099  private static FileSystemStore createDefaultStore(Configuration conf) {
100    FileSystemStore store = new Jets3tFileSystemStore();
101
102    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
103                                                                               conf.getInt("fs.s3.maxRetries", 4),
104                                                                               conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
105    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
106      new HashMap<Class<? extends Exception>, RetryPolicy>();
107    exceptionToPolicyMap.put(IOException.class, basePolicy);
108    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
109
110    RetryPolicy methodPolicy = RetryPolicies.retryByException(
111                                                              RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
112    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
113    methodNameToPolicyMap.put("storeBlock", methodPolicy);
114    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
115
116    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
117                                               store, methodNameToPolicyMap);
118  }
119
120  @Override
121  public Path getWorkingDirectory() {
122    return workingDir;
123  }
124
125  @Override
126  public void setWorkingDirectory(Path dir) {
127    workingDir = makeAbsolute(dir);
128  }
129
130  private Path makeAbsolute(Path path) {
131    if (path.isAbsolute()) {
132      return path;
133    }
134    return new Path(workingDir, path);
135  }
136
137  /**
138   * @param permission Currently ignored.
139   */
140  @Override
141  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
142    Path absolutePath = makeAbsolute(path);
143    List<Path> paths = new ArrayList<Path>();
144    do {
145      paths.add(0, absolutePath);
146      absolutePath = absolutePath.getParent();
147    } while (absolutePath != null);
148
149    boolean result = true;
150    for (int i = 0; i < paths.size(); i++) {
151      Path p = paths.get(i);
152      try {
153        result &= mkdir(p);
154      } catch(FileAlreadyExistsException e) {
155        if (i + 1 < paths.size()) {
156          throw new ParentNotDirectoryException(e.getMessage());
157        }
158        throw e;
159      }
160    }
161    return result;
162  }
163
164  private boolean mkdir(Path path) throws IOException {
165    Path absolutePath = makeAbsolute(path);
166    INode inode = store.retrieveINode(absolutePath);
167    if (inode == null) {
168      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
169    } else if (inode.isFile()) {
170      throw new FileAlreadyExistsException(String.format(
171          "Can't make directory for path %s since it is a file.",
172          absolutePath));
173    }
174    return true;
175  }
176
177  @Override
178  public boolean isFile(Path path) throws IOException {
179    INode inode = store.retrieveINode(makeAbsolute(path));
180    if (inode == null) {
181      return false;
182    }
183    return inode.isFile();
184  }
185
186  private INode checkFile(Path path) throws IOException {
187    INode inode = store.retrieveINode(makeAbsolute(path));
188    String message = String.format("No such file: '%s'", path.toString());
189    if (inode == null) {
190      throw new FileNotFoundException(message + " does not exist");
191    }
192    if (inode.isDirectory()) {
193      throw new FileNotFoundException(message + " is a directory");
194    }
195    return inode;
196  }
197
198  @Override
199  public FileStatus[] listStatus(Path f) throws IOException {
200    Path absolutePath = makeAbsolute(f);
201    INode inode = store.retrieveINode(absolutePath);
202    if (inode == null) {
203      throw new FileNotFoundException("File " + f + " does not exist.");
204    }
205    if (inode.isFile()) {
206      return new FileStatus[] {
207        new S3FileStatus(f.makeQualified(this), inode)
208      };
209    }
210    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
211    for (Path p : store.listSubPaths(absolutePath)) {
212      ret.add(getFileStatus(p.makeQualified(this)));
213    }
214    return ret.toArray(new FileStatus[0]);
215  }
216
217  /** This optional operation is not yet supported. */
218  @Override
219  public FSDataOutputStream append(Path f, int bufferSize,
220      Progressable progress) throws IOException {
221    throw new IOException("Not supported");
222  }
223
224  /**
225   * @param permission Currently ignored.
226   */
227  @Override
228  public FSDataOutputStream create(Path file, FsPermission permission,
229      boolean overwrite, int bufferSize,
230      short replication, long blockSize, Progressable progress)
231    throws IOException {
232
233    INode inode = store.retrieveINode(makeAbsolute(file));
234    if (inode != null) {
235      if (overwrite && !inode.isDirectory()) {
236        delete(file, true);
237      } else {
238        String message = String.format("File already exists: '%s'", file);
239        if (inode.isDirectory()) {
240          message = message + " is a directory";
241        }
242        throw new FileAlreadyExistsException(message);
243      }
244    } else {
245      Path parent = file.getParent();
246      if (parent != null) {
247        if (!mkdirs(parent)) {
248          throw new IOException("Mkdirs failed to create " + parent.toString());
249        }
250      }
251    }
252    return new FSDataOutputStream
253        (new S3OutputStream(getConf(), store, makeAbsolute(file),
254                            blockSize, progress, bufferSize),
255         statistics);
256  }
257
258  @Override
259  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
260    INode inode = checkFile(path);
261    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
262                                                   statistics));
263  }
264
265  @Override
266  public boolean rename(Path src, Path dst) throws IOException {
267    Path absoluteSrc = makeAbsolute(src);
268    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
269    INode srcINode = store.retrieveINode(absoluteSrc);
270    boolean debugEnabled = LOG.isDebugEnabled();
271    if (srcINode == null) {
272      // src path doesn't exist
273      if (debugEnabled) {
274        LOG.debug(debugPreamble + "returning false as src does not exist");
275      }
276      return false;
277    }
278
279    Path absoluteDst = makeAbsolute(dst);
280
281    //validate the parent dir of the destination
282    Path dstParent = absoluteDst.getParent();
283    if (dstParent != null) {
284      //if the dst parent is not root, make sure it exists
285      INode dstParentINode = store.retrieveINode(dstParent);
286      if (dstParentINode == null) {
287        // dst parent doesn't exist
288        if (debugEnabled) {
289          LOG.debug(debugPreamble +
290                    "returning false as dst parent does not exist");
291        }
292        return false;
293      }
294      if (dstParentINode.isFile()) {
295        // dst parent exists but is a file
296        if (debugEnabled) {
297          LOG.debug(debugPreamble +
298                    "returning false as dst parent exists and is a file");
299        }
300        return false;
301      }
302    }
303
304    //get status of source
305    boolean srcIsFile = srcINode.isFile();
306
307    INode dstINode = store.retrieveINode(absoluteDst);
308    boolean destExists = dstINode != null;
309    boolean destIsDir = destExists && !dstINode.isFile();
310    if (srcIsFile) {
311
312      //source is a simple file
313      if (destExists) {
314        if (destIsDir) {
315          //outcome #1 dest exists and is dir -filename to subdir of dest
316          if (debugEnabled) {
317            LOG.debug(debugPreamble +
318                      "copying src file under dest dir to " + absoluteDst);
319          }
320          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
321        } else {
322          //outcome #2 dest it's a file: fail iff different from src
323          boolean renamingOnToSelf = absoluteSrc.equals(absoluteDst);
324          if (debugEnabled) {
325            LOG.debug(debugPreamble +
326                      "copying file onto file, outcome is " + renamingOnToSelf);
327          }
328          return renamingOnToSelf;
329        }
330      } else {
331        // #3 dest does not exist: use dest as path for rename
332        if (debugEnabled) {
333          LOG.debug(debugPreamble +
334                    "copying file onto file");
335        }
336      }
337    } else {
338      //here the source exists and is a directory
339      // outcomes (given we know the parent dir exists if we get this far)
340      // #1 destination is a file: fail
341      // #2 destination is a directory: create a new dir under that one
342      // #3 destination doesn't exist: create a new dir with that name
343      // #3 and #4 are only allowed if the dest path is not == or under src
344
345      if (destExists) {
346        if (!destIsDir) {
347          // #1 destination is a file: fail
348          if (debugEnabled) {
349            LOG.debug(debugPreamble +
350                      "returning false as src is a directory, but not dest");
351          }
352          return false;
353        } else {
354          // the destination dir exists
355          // destination for rename becomes a subdir of the target name
356          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
357          if (debugEnabled) {
358            LOG.debug(debugPreamble +
359                      "copying src dir under dest dir to " + absoluteDst);
360          }
361        }
362      }
363      //the final destination directory is now know, so validate it for
364      //illegal moves
365
366      if (absoluteSrc.equals(absoluteDst)) {
367        //you can't rename a directory onto itself
368        if (debugEnabled) {
369          LOG.debug(debugPreamble +
370                    "Dest==source && isDir -failing");
371        }
372        return false;
373      }
374      if (absoluteDst.toString().startsWith(absoluteSrc.toString() + "/")) {
375        //you can't move a directory under itself
376        if (debugEnabled) {
377          LOG.debug(debugPreamble +
378                    "dst is equal to or under src dir -failing");
379        }
380        return false;
381      }
382    }
383    //here the dest path is set up -so rename
384    return renameRecursive(absoluteSrc, absoluteDst);
385  }
386
387  private boolean renameRecursive(Path src, Path dst) throws IOException {
388    INode srcINode = store.retrieveINode(src);
389    store.storeINode(dst, srcINode);
390    store.deleteINode(src);
391    if (srcINode.isDirectory()) {
392      for (Path oldSrc : store.listDeepSubPaths(src)) {
393        INode inode = store.retrieveINode(oldSrc);
394        if (inode == null) {
395          return false;
396        }
397        String oldSrcPath = oldSrc.toUri().getPath();
398        String srcPath = src.toUri().getPath();
399        String dstPath = dst.toUri().getPath();
400        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
401        store.storeINode(newDst, inode);
402        store.deleteINode(oldSrc);
403      }
404    }
405    return true;
406  }
407
408  @Override
409  public boolean delete(Path path, boolean recursive) throws IOException {
410   Path absolutePath = makeAbsolute(path);
411   INode inode = store.retrieveINode(absolutePath);
412   if (inode == null) {
413     return false;
414   }
415   if (inode.isFile()) {
416     store.deleteINode(absolutePath);
417     for (Block block: inode.getBlocks()) {
418       store.deleteBlock(block);
419     }
420   } else {
421     FileStatus[] contents = null;
422     try {
423       contents = listStatus(absolutePath);
424     } catch(FileNotFoundException fnfe) {
425       return false;
426     }
427
428     if ((contents.length !=0) && (!recursive)) {
429       throw new IOException("Directory " + path.toString()
430           + " is not empty.");
431     }
432     for (FileStatus p:contents) {
433       if (!delete(p.getPath(), recursive)) {
434         return false;
435       }
436     }
437     store.deleteINode(absolutePath);
438   }
439   return true;
440  }
441
442  /**
443   * FileStatus for S3 file systems.
444   */
445  @Override
446  public FileStatus getFileStatus(Path f)  throws IOException {
447    INode inode = store.retrieveINode(makeAbsolute(f));
448    if (inode == null) {
449      throw new FileNotFoundException(f + ": No such file or directory.");
450    }
451    return new S3FileStatus(f.makeQualified(this), inode);
452  }
453
454  @Override
455  public long getDefaultBlockSize() {
456    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
457  }
458
459  @Override
460  public String getCanonicalServiceName() {
461    // Does not support Token
462    return null;
463  }
464
465  // diagnostic methods
466
467  void dump() throws IOException {
468    store.dump();
469  }
470
471  void purge() throws IOException {
472    store.purge();
473  }
474
475  private static class S3FileStatus extends FileStatus {
476
477    S3FileStatus(Path f, INode inode) throws IOException {
478      super(findLength(inode), inode.isDirectory(), 1,
479            findBlocksize(inode), 0, f);
480    }
481
482    private static long findLength(INode inode) {
483      if (!inode.isDirectory()) {
484        long length = 0L;
485        for (Block block : inode.getBlocks()) {
486          length += block.getLength();
487        }
488        return length;
489      }
490      return 0;
491    }
492
493    private static long findBlocksize(INode inode) {
494      final Block[] ret = inode.getBlocks();
495      return ret == null ? 0L : ret[0].getLength();
496    }
497  }
498}