001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.s3;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.URI;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.TimeUnit;
029
030import org.apache.hadoop.classification.InterfaceAudience;
031import org.apache.hadoop.classification.InterfaceStability;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataInputStream;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.fs.FileAlreadyExistsException;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.fs.permission.FsPermission;
040import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
041import org.apache.hadoop.io.retry.RetryPolicies;
042import org.apache.hadoop.io.retry.RetryPolicy;
043import org.apache.hadoop.io.retry.RetryProxy;
044import org.apache.hadoop.util.Progressable;
045
046/**
047 * A block-based {@link FileSystem} backed by
048 * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
049 *
050 * @see NativeS3FileSystem
051 */
052@InterfaceAudience.Public
053@InterfaceStability.Stable
054public class S3FileSystem extends FileSystem {
055
056  private URI uri;
057
058  private FileSystemStore store;
059
060  private Path workingDir;
061
062  public S3FileSystem() {
063    // set store in initialize()
064  }
065  
066  public S3FileSystem(FileSystemStore store) {
067    this.store = store;
068  }
069
070  /**
071   * Return the protocol scheme for the FileSystem.
072   *
073   * @return <code>s3</code>
074   */
075  @Override
076  public String getScheme() {
077    return "s3";
078  }
079
080  @Override
081  public URI getUri() {
082    return uri;
083  }
084
085  @Override
086  public void initialize(URI uri, Configuration conf) throws IOException {
087    super.initialize(uri, conf);
088    if (store == null) {
089      store = createDefaultStore(conf);
090    }
091    store.initialize(uri, conf);
092    setConf(conf);
093    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());    
094    this.workingDir =
095      new Path("/user", System.getProperty("user.name")).makeQualified(this);
096  }  
097
098  private static FileSystemStore createDefaultStore(Configuration conf) {
099    FileSystemStore store = new Jets3tFileSystemStore();
100    
101    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
102                                                                               conf.getInt("fs.s3.maxRetries", 4),
103                                                                               conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
104    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
105      new HashMap<Class<? extends Exception>, RetryPolicy>();
106    exceptionToPolicyMap.put(IOException.class, basePolicy);
107    exceptionToPolicyMap.put(S3Exception.class, basePolicy);
108    
109    RetryPolicy methodPolicy = RetryPolicies.retryByException(
110                                                              RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
111    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
112    methodNameToPolicyMap.put("storeBlock", methodPolicy);
113    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
114    
115    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
116                                               store, methodNameToPolicyMap);
117  }
118
119  @Override
120  public Path getWorkingDirectory() {
121    return workingDir;
122  }
123
124  @Override
125  public void setWorkingDirectory(Path dir) {
126    workingDir = makeAbsolute(dir);
127  }
128
129  private Path makeAbsolute(Path path) {
130    if (path.isAbsolute()) {
131      return path;
132    }
133    return new Path(workingDir, path);
134  }
135
136  /**
137   * @param permission Currently ignored.
138   */
139  @Override
140  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
141    Path absolutePath = makeAbsolute(path);
142    List<Path> paths = new ArrayList<Path>();
143    do {
144      paths.add(0, absolutePath);
145      absolutePath = absolutePath.getParent();
146    } while (absolutePath != null);
147    
148    boolean result = true;
149    for (Path p : paths) {
150      result &= mkdir(p);
151    }
152    return result;
153  }
154  
155  private boolean mkdir(Path path) throws IOException {
156    Path absolutePath = makeAbsolute(path);
157    INode inode = store.retrieveINode(absolutePath);
158    if (inode == null) {
159      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
160    } else if (inode.isFile()) {
161      throw new IOException(String.format(
162          "Can't make directory for path %s since it is a file.",
163          absolutePath));
164    }
165    return true;
166  }
167
168  @Override
169  public boolean isFile(Path path) throws IOException {
170    INode inode = store.retrieveINode(makeAbsolute(path));
171    if (inode == null) {
172      return false;
173    }
174    return inode.isFile();
175  }
176
177  private INode checkFile(Path path) throws IOException {
178    INode inode = store.retrieveINode(makeAbsolute(path));
179    if (inode == null) {
180      throw new IOException("No such file.");
181    }
182    if (inode.isDirectory()) {
183      throw new IOException("Path " + path + " is a directory.");
184    }
185    return inode;
186  }
187
188  @Override
189  public FileStatus[] listStatus(Path f) throws IOException {
190    Path absolutePath = makeAbsolute(f);
191    INode inode = store.retrieveINode(absolutePath);
192    if (inode == null) {
193      throw new FileNotFoundException("File " + f + " does not exist.");
194    }
195    if (inode.isFile()) {
196      return new FileStatus[] {
197        new S3FileStatus(f.makeQualified(this), inode)
198      };
199    }
200    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
201    for (Path p : store.listSubPaths(absolutePath)) {
202      ret.add(getFileStatus(p.makeQualified(this)));
203    }
204    return ret.toArray(new FileStatus[0]);
205  }
206
207  /** This optional operation is not yet supported. */
208  @Override
209  public FSDataOutputStream append(Path f, int bufferSize,
210      Progressable progress) throws IOException {
211    throw new IOException("Not supported");
212  }
213
214  /**
215   * @param permission Currently ignored.
216   */
217  @Override
218  public FSDataOutputStream create(Path file, FsPermission permission,
219      boolean overwrite, int bufferSize,
220      short replication, long blockSize, Progressable progress)
221    throws IOException {
222
223    INode inode = store.retrieveINode(makeAbsolute(file));
224    if (inode != null) {
225      if (overwrite) {
226        delete(file, true);
227      } else {
228        throw new FileAlreadyExistsException("File already exists: " + file);
229      }
230    } else {
231      Path parent = file.getParent();
232      if (parent != null) {
233        if (!mkdirs(parent)) {
234          throw new IOException("Mkdirs failed to create " + parent.toString());
235        }
236      }      
237    }
238    return new FSDataOutputStream
239        (new S3OutputStream(getConf(), store, makeAbsolute(file),
240                            blockSize, progress, bufferSize),
241         statistics);
242  }
243
244  @Override
245  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
246    INode inode = checkFile(path);
247    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
248                                                   statistics));
249  }
250
251  @Override
252  public boolean rename(Path src, Path dst) throws IOException {
253    Path absoluteSrc = makeAbsolute(src);
254    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
255    INode srcINode = store.retrieveINode(absoluteSrc);
256    boolean debugEnabled = LOG.isDebugEnabled();
257    if (srcINode == null) {
258      // src path doesn't exist
259      if (debugEnabled) {
260        LOG.debug(debugPreamble + "returning false as src does not exist");
261      }
262      return false; 
263    }
264
265    Path absoluteDst = makeAbsolute(dst);
266
267    //validate the parent dir of the destination
268    Path dstParent = absoluteDst.getParent();
269    if (dstParent != null) {
270      //if the dst parent is not root, make sure it exists
271      INode dstParentINode = store.retrieveINode(dstParent);
272      if (dstParentINode == null) {
273        // dst parent doesn't exist
274        if (debugEnabled) {
275          LOG.debug(debugPreamble +
276                    "returning false as dst parent does not exist");
277        }
278        return false;
279      }
280      if (dstParentINode.isFile()) {
281        // dst parent exists but is a file
282        if (debugEnabled) {
283          LOG.debug(debugPreamble +
284                    "returning false as dst parent exists and is a file");
285        }
286        return false;
287      }
288    }
289
290    //get status of source
291    boolean srcIsFile = srcINode.isFile();
292
293    INode dstINode = store.retrieveINode(absoluteDst);
294    boolean destExists = dstINode != null;
295    boolean destIsDir = destExists && !dstINode.isFile();
296    if (srcIsFile) {
297
298      //source is a simple file
299      if (destExists) {
300        if (destIsDir) {
301          //outcome #1 dest exists and is dir -filename to subdir of dest
302          if (debugEnabled) {
303            LOG.debug(debugPreamble +
304                      "copying src file under dest dir to " + absoluteDst);
305          }
306          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
307        } else {
308          //outcome #2 dest it's a file: fail iff different from src
309          boolean renamingOnToSelf = absoluteSrc.equals(absoluteDst);
310          if (debugEnabled) {
311            LOG.debug(debugPreamble +
312                      "copying file onto file, outcome is " + renamingOnToSelf);
313          }
314          return renamingOnToSelf;
315        }
316      } else {
317        // #3 dest does not exist: use dest as path for rename
318        if (debugEnabled) {
319          LOG.debug(debugPreamble +
320                    "copying file onto file");
321        }
322      }
323    } else {
324      //here the source exists and is a directory
325      // outcomes (given we know the parent dir exists if we get this far)
326      // #1 destination is a file: fail
327      // #2 destination is a directory: create a new dir under that one
328      // #3 destination doesn't exist: create a new dir with that name
329      // #3 and #4 are only allowed if the dest path is not == or under src
330
331      if (destExists) {
332        if (!destIsDir) {
333          // #1 destination is a file: fail
334          if (debugEnabled) {
335            LOG.debug(debugPreamble +
336                      "returning false as src is a directory, but not dest");
337          }
338          return false;
339        } else {
340          // the destination dir exists
341          // destination for rename becomes a subdir of the target name
342          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
343          if (debugEnabled) {
344            LOG.debug(debugPreamble +
345                      "copying src dir under dest dir to " + absoluteDst);
346          }
347        }
348      }
349      //the final destination directory is now know, so validate it for
350      //illegal moves
351
352      if (absoluteSrc.equals(absoluteDst)) {
353        //you can't rename a directory onto itself
354        if (debugEnabled) {
355          LOG.debug(debugPreamble +
356                    "Dest==source && isDir -failing");
357        }
358        return false;
359      }
360      if (absoluteDst.toString().startsWith(absoluteSrc.toString() + "/")) {
361        //you can't move a directory under itself
362        if (debugEnabled) {
363          LOG.debug(debugPreamble +
364                    "dst is equal to or under src dir -failing");
365        }
366        return false;
367      }
368    }
369    //here the dest path is set up -so rename
370    return renameRecursive(absoluteSrc, absoluteDst);
371  }
372
373  private boolean renameRecursive(Path src, Path dst) throws IOException {
374    INode srcINode = store.retrieveINode(src);
375    store.storeINode(dst, srcINode);
376    store.deleteINode(src);
377    if (srcINode.isDirectory()) {
378      for (Path oldSrc : store.listDeepSubPaths(src)) {
379        INode inode = store.retrieveINode(oldSrc);
380        if (inode == null) {
381          return false;
382        }
383        String oldSrcPath = oldSrc.toUri().getPath();
384        String srcPath = src.toUri().getPath();
385        String dstPath = dst.toUri().getPath();
386        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
387        store.storeINode(newDst, inode);
388        store.deleteINode(oldSrc);
389      }
390    }
391    return true;
392  }
393
394  @Override
395  public boolean delete(Path path, boolean recursive) throws IOException {
396   Path absolutePath = makeAbsolute(path);
397   INode inode = store.retrieveINode(absolutePath);
398   if (inode == null) {
399     return false;
400   }
401   if (inode.isFile()) {
402     store.deleteINode(absolutePath);
403     for (Block block: inode.getBlocks()) {
404       store.deleteBlock(block);
405     }
406   } else {
407     FileStatus[] contents = null; 
408     try {
409       contents = listStatus(absolutePath);
410     } catch(FileNotFoundException fnfe) {
411       return false;
412     }
413
414     if ((contents.length !=0) && (!recursive)) {
415       throw new IOException("Directory " + path.toString() 
416           + " is not empty.");
417     }
418     for (FileStatus p:contents) {
419       if (!delete(p.getPath(), recursive)) {
420         return false;
421       }
422     }
423     store.deleteINode(absolutePath);
424   }
425   return true;
426  }
427  
428  /**
429   * FileStatus for S3 file systems. 
430   */
431  @Override
432  public FileStatus getFileStatus(Path f)  throws IOException {
433    INode inode = store.retrieveINode(makeAbsolute(f));
434    if (inode == null) {
435      throw new FileNotFoundException(f + ": No such file or directory.");
436    }
437    return new S3FileStatus(f.makeQualified(this), inode);
438  }
439  
440  @Override
441  public long getDefaultBlockSize() {
442    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
443  }
444
445  @Override
446  public String getCanonicalServiceName() {
447    // Does not support Token
448    return null;
449  }
450
451  // diagnostic methods
452
453  void dump() throws IOException {
454    store.dump();
455  }
456
457  void purge() throws IOException {
458    store.purge();
459  }
460
461  private static class S3FileStatus extends FileStatus {
462
463    S3FileStatus(Path f, INode inode) throws IOException {
464      super(findLength(inode), inode.isDirectory(), 1,
465            findBlocksize(inode), 0, f);
466    }
467
468    private static long findLength(INode inode) {
469      if (!inode.isDirectory()) {
470        long length = 0L;
471        for (Block block : inode.getBlocks()) {
472          length += block.getLength();
473        }
474        return length;
475      }
476      return 0;
477    }
478
479    private static long findBlocksize(INode inode) {
480      final Block[] ret = inode.getBlocks();
481      return ret == null ? 0L : ret[0].getLength();
482    }
483  }
484}