001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.s3; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.URI; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.TimeUnit; 029 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataInputStream; 034import org.apache.hadoop.fs.FSDataOutputStream; 035import org.apache.hadoop.fs.FileAlreadyExistsException; 036import org.apache.hadoop.fs.FileStatus; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.ParentNotDirectoryException; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.fs.permission.FsPermission; 041import org.apache.hadoop.fs.s3native.NativeS3FileSystem; 042import org.apache.hadoop.fs.s3native.S3xLoginHelper; 043import org.apache.hadoop.io.retry.RetryPolicies; 044import org.apache.hadoop.io.retry.RetryPolicy; 045import org.apache.hadoop.io.retry.RetryProxy; 046import org.apache.hadoop.util.Progressable; 047 048/** 049 * A block-based {@link FileSystem} backed by 050 * <a href="http://aws.amazon.com/s3">Amazon S3</a>. 051 * 052 * @see NativeS3FileSystem 053 */ 054@InterfaceAudience.Public 055@InterfaceStability.Stable 056public class S3FileSystem extends FileSystem { 057 058 private URI uri; 059 060 private FileSystemStore store; 061 062 private Path workingDir; 063 064 public S3FileSystem() { 065 // set store in initialize() 066 } 067 068 public S3FileSystem(FileSystemStore store) { 069 this.store = store; 070 } 071 072 /** 073 * Return the protocol scheme for the FileSystem. 074 * 075 * @return <code>s3</code> 076 */ 077 @Override 078 public String getScheme() { 079 return "s3"; 080 } 081 082 @Override 083 public URI getUri() { 084 return uri; 085 } 086 087 @Override 088 public void initialize(URI uri, Configuration conf) throws IOException { 089 super.initialize(uri, conf); 090 if (store == null) { 091 store = createDefaultStore(conf); 092 } 093 store.initialize(uri, conf); 094 setConf(conf); 095 this.uri = S3xLoginHelper.buildFSURI(uri); 096 this.workingDir = 097 new Path("/user", System.getProperty("user.name")).makeQualified(this); 098 } 099 100 private static FileSystemStore createDefaultStore(Configuration conf) { 101 FileSystemStore store = new Jets3tFileSystemStore(); 102 103 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 104 conf.getInt("fs.s3.maxRetries", 4), 105 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); 106 Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap = 107 new HashMap<Class<? extends Exception>, RetryPolicy>(); 108 exceptionToPolicyMap.put(IOException.class, basePolicy); 109 exceptionToPolicyMap.put(S3Exception.class, basePolicy); 110 111 RetryPolicy methodPolicy = RetryPolicies.retryByException( 112 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); 113 Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>(); 114 methodNameToPolicyMap.put("storeBlock", methodPolicy); 115 methodNameToPolicyMap.put("retrieveBlock", methodPolicy); 116 117 return (FileSystemStore) RetryProxy.create(FileSystemStore.class, 118 store, methodNameToPolicyMap); 119 } 120 121 @Override 122 public Path getWorkingDirectory() { 123 return workingDir; 124 } 125 126 @Override 127 public void setWorkingDirectory(Path dir) { 128 workingDir = makeAbsolute(dir); 129 } 130 131 private Path makeAbsolute(Path path) { 132 if (path.isAbsolute()) { 133 return path; 134 } 135 return new Path(workingDir, path); 136 } 137 138 /** 139 * Check that a Path belongs to this FileSystem. 140 * Unlike the superclass, this version does not look at authority, 141 * only hostnames. 142 * @param path to check 143 * @throws IllegalArgumentException if there is an FS mismatch 144 */ 145 @Override 146 protected void checkPath(Path path) { 147 S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort()); 148 } 149 150 @Override 151 protected URI canonicalizeUri(URI rawUri) { 152 return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort()); 153 } 154 155 /** 156 * @param permission Currently ignored. 157 */ 158 @Override 159 public boolean mkdirs(Path path, FsPermission permission) throws IOException { 160 Path absolutePath = makeAbsolute(path); 161 List<Path> paths = new ArrayList<Path>(); 162 do { 163 paths.add(0, absolutePath); 164 absolutePath = absolutePath.getParent(); 165 } while (absolutePath != null); 166 167 boolean result = true; 168 for (int i = 0; i < paths.size(); i++) { 169 Path p = paths.get(i); 170 try { 171 result &= mkdir(p); 172 } catch(FileAlreadyExistsException e) { 173 if (i + 1 < paths.size()) { 174 throw new ParentNotDirectoryException(e.getMessage()); 175 } 176 throw e; 177 } 178 } 179 return result; 180 } 181 182 private boolean mkdir(Path path) throws IOException { 183 Path absolutePath = makeAbsolute(path); 184 INode inode = store.retrieveINode(absolutePath); 185 if (inode == null) { 186 store.storeINode(absolutePath, INode.DIRECTORY_INODE); 187 } else if (inode.isFile()) { 188 throw new FileAlreadyExistsException(String.format( 189 "Can't make directory for path %s since it is a file.", 190 absolutePath)); 191 } 192 return true; 193 } 194 195 @Override 196 public boolean isFile(Path path) throws IOException { 197 INode inode = store.retrieveINode(makeAbsolute(path)); 198 if (inode == null) { 199 return false; 200 } 201 return inode.isFile(); 202 } 203 204 private INode checkFile(Path path) throws IOException { 205 INode inode = store.retrieveINode(makeAbsolute(path)); 206 String message = String.format("No such file: '%s'", path.toString()); 207 if (inode == null) { 208 throw new FileNotFoundException(message + " does not exist"); 209 } 210 if (inode.isDirectory()) { 211 throw new FileNotFoundException(message + " is a directory"); 212 } 213 return inode; 214 } 215 216 @Override 217 public FileStatus[] listStatus(Path f) throws IOException { 218 Path absolutePath = makeAbsolute(f); 219 INode inode = store.retrieveINode(absolutePath); 220 if (inode == null) { 221 throw new FileNotFoundException("File " + f + " does not exist."); 222 } 223 if (inode.isFile()) { 224 return new FileStatus[] { 225 new S3FileStatus(f.makeQualified(this), inode) 226 }; 227 } 228 ArrayList<FileStatus> ret = new ArrayList<FileStatus>(); 229 for (Path p : store.listSubPaths(absolutePath)) { 230 ret.add(getFileStatus(p.makeQualified(this))); 231 } 232 return ret.toArray(new FileStatus[0]); 233 } 234 235 /** This optional operation is not yet supported. */ 236 @Override 237 public FSDataOutputStream append(Path f, int bufferSize, 238 Progressable progress) throws IOException { 239 throw new IOException("Not supported"); 240 } 241 242 /** 243 * @param permission Currently ignored. 244 */ 245 @Override 246 public FSDataOutputStream create(Path file, FsPermission permission, 247 boolean overwrite, int bufferSize, 248 short replication, long blockSize, Progressable progress) 249 throws IOException { 250 251 INode inode = store.retrieveINode(makeAbsolute(file)); 252 if (inode != null) { 253 if (overwrite && !inode.isDirectory()) { 254 delete(file, true); 255 } else { 256 String message = String.format("File already exists: '%s'", file); 257 if (inode.isDirectory()) { 258 message = message + " is a directory"; 259 } 260 throw new FileAlreadyExistsException(message); 261 } 262 } else { 263 Path parent = file.getParent(); 264 if (parent != null) { 265 if (!mkdirs(parent)) { 266 throw new IOException("Mkdirs failed to create " + parent.toString()); 267 } 268 } 269 } 270 return new FSDataOutputStream 271 (new S3OutputStream(getConf(), store, makeAbsolute(file), 272 blockSize, progress, bufferSize), 273 statistics); 274 } 275 276 @Override 277 public FSDataInputStream open(Path path, int bufferSize) throws IOException { 278 INode inode = checkFile(path); 279 return new FSDataInputStream(new S3InputStream(getConf(), store, inode, 280 statistics)); 281 } 282 283 @Override 284 public boolean rename(Path src, Path dst) throws IOException { 285 Path absoluteSrc = makeAbsolute(src); 286 final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - "; 287 INode srcINode = store.retrieveINode(absoluteSrc); 288 boolean debugEnabled = LOG.isDebugEnabled(); 289 if (srcINode == null) { 290 // src path doesn't exist 291 if (debugEnabled) { 292 LOG.debug(debugPreamble + "returning false as src does not exist"); 293 } 294 return false; 295 } 296 297 Path absoluteDst = makeAbsolute(dst); 298 299 //validate the parent dir of the destination 300 Path dstParent = absoluteDst.getParent(); 301 if (dstParent != null) { 302 //if the dst parent is not root, make sure it exists 303 INode dstParentINode = store.retrieveINode(dstParent); 304 if (dstParentINode == null) { 305 // dst parent doesn't exist 306 if (debugEnabled) { 307 LOG.debug(debugPreamble + 308 "returning false as dst parent does not exist"); 309 } 310 return false; 311 } 312 if (dstParentINode.isFile()) { 313 // dst parent exists but is a file 314 if (debugEnabled) { 315 LOG.debug(debugPreamble + 316 "returning false as dst parent exists and is a file"); 317 } 318 return false; 319 } 320 } 321 322 //get status of source 323 boolean srcIsFile = srcINode.isFile(); 324 325 INode dstINode = store.retrieveINode(absoluteDst); 326 boolean destExists = dstINode != null; 327 boolean destIsDir = destExists && !dstINode.isFile(); 328 if (srcIsFile) { 329 330 //source is a simple file 331 if (destExists) { 332 if (destIsDir) { 333 //outcome #1 dest exists and is dir -filename to subdir of dest 334 if (debugEnabled) { 335 LOG.debug(debugPreamble + 336 "copying src file under dest dir to " + absoluteDst); 337 } 338 absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); 339 } else { 340 //outcome #2 dest it's a file: fail iff different from src 341 boolean renamingOnToSelf = absoluteSrc.equals(absoluteDst); 342 if (debugEnabled) { 343 LOG.debug(debugPreamble + 344 "copying file onto file, outcome is " + renamingOnToSelf); 345 } 346 return renamingOnToSelf; 347 } 348 } else { 349 // #3 dest does not exist: use dest as path for rename 350 if (debugEnabled) { 351 LOG.debug(debugPreamble + 352 "copying file onto file"); 353 } 354 } 355 } else { 356 //here the source exists and is a directory 357 // outcomes (given we know the parent dir exists if we get this far) 358 // #1 destination is a file: fail 359 // #2 destination is a directory: create a new dir under that one 360 // #3 destination doesn't exist: create a new dir with that name 361 // #3 and #4 are only allowed if the dest path is not == or under src 362 363 if (destExists) { 364 if (!destIsDir) { 365 // #1 destination is a file: fail 366 if (debugEnabled) { 367 LOG.debug(debugPreamble + 368 "returning false as src is a directory, but not dest"); 369 } 370 return false; 371 } else { 372 // the destination dir exists 373 // destination for rename becomes a subdir of the target name 374 absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); 375 if (debugEnabled) { 376 LOG.debug(debugPreamble + 377 "copying src dir under dest dir to " + absoluteDst); 378 } 379 } 380 } 381 //the final destination directory is now know, so validate it for 382 //illegal moves 383 384 if (absoluteSrc.equals(absoluteDst)) { 385 //you can't rename a directory onto itself 386 if (debugEnabled) { 387 LOG.debug(debugPreamble + 388 "Dest==source && isDir -failing"); 389 } 390 return false; 391 } 392 if (absoluteDst.toString().startsWith(absoluteSrc.toString() + "/")) { 393 //you can't move a directory under itself 394 if (debugEnabled) { 395 LOG.debug(debugPreamble + 396 "dst is equal to or under src dir -failing"); 397 } 398 return false; 399 } 400 } 401 //here the dest path is set up -so rename 402 return renameRecursive(absoluteSrc, absoluteDst); 403 } 404 405 private boolean renameRecursive(Path src, Path dst) throws IOException { 406 INode srcINode = store.retrieveINode(src); 407 store.storeINode(dst, srcINode); 408 store.deleteINode(src); 409 if (srcINode.isDirectory()) { 410 for (Path oldSrc : store.listDeepSubPaths(src)) { 411 INode inode = store.retrieveINode(oldSrc); 412 if (inode == null) { 413 return false; 414 } 415 String oldSrcPath = oldSrc.toUri().getPath(); 416 String srcPath = src.toUri().getPath(); 417 String dstPath = dst.toUri().getPath(); 418 Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath)); 419 store.storeINode(newDst, inode); 420 store.deleteINode(oldSrc); 421 } 422 } 423 return true; 424 } 425 426 @Override 427 public boolean delete(Path path, boolean recursive) throws IOException { 428 Path absolutePath = makeAbsolute(path); 429 INode inode = store.retrieveINode(absolutePath); 430 if (inode == null) { 431 return false; 432 } 433 if (inode.isFile()) { 434 store.deleteINode(absolutePath); 435 for (Block block: inode.getBlocks()) { 436 store.deleteBlock(block); 437 } 438 } else { 439 FileStatus[] contents = null; 440 try { 441 contents = listStatus(absolutePath); 442 } catch(FileNotFoundException fnfe) { 443 return false; 444 } 445 446 if ((contents.length !=0) && (!recursive)) { 447 throw new IOException("Directory " + path.toString() 448 + " is not empty."); 449 } 450 for (FileStatus p:contents) { 451 if (!delete(p.getPath(), recursive)) { 452 return false; 453 } 454 } 455 store.deleteINode(absolutePath); 456 } 457 return true; 458 } 459 460 /** 461 * FileStatus for S3 file systems. 462 */ 463 @Override 464 public FileStatus getFileStatus(Path f) throws IOException { 465 INode inode = store.retrieveINode(makeAbsolute(f)); 466 if (inode == null) { 467 throw new FileNotFoundException(f + ": No such file or directory."); 468 } 469 return new S3FileStatus(f.makeQualified(this), inode); 470 } 471 472 @Override 473 public long getDefaultBlockSize() { 474 return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024); 475 } 476 477 @Override 478 public String getCanonicalServiceName() { 479 // Does not support Token 480 return null; 481 } 482 483 // diagnostic methods 484 485 void dump() throws IOException { 486 store.dump(); 487 } 488 489 void purge() throws IOException { 490 store.purge(); 491 } 492 493 private static class S3FileStatus extends FileStatus { 494 495 S3FileStatus(Path f, INode inode) throws IOException { 496 super(findLength(inode), inode.isDirectory(), 1, 497 findBlocksize(inode), 0, f); 498 } 499 500 private static long findLength(INode inode) { 501 if (!inode.isDirectory()) { 502 long length = 0L; 503 for (Block block : inode.getBlocks()) { 504 length += block.getLength(); 505 } 506 return length; 507 } 508 return 0; 509 } 510 511 private static long findBlocksize(INode inode) { 512 final Block[] ret = inode.getBlocks(); 513 return ret == null ? 0L : ret[0].getLength(); 514 } 515 } 516}