001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.s3; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.URI; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.TimeUnit; 029 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataInputStream; 034import org.apache.hadoop.fs.FSDataOutputStream; 035import org.apache.hadoop.fs.FileAlreadyExistsException; 036import org.apache.hadoop.fs.FileStatus; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.ParentNotDirectoryException; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.fs.permission.FsPermission; 041import org.apache.hadoop.fs.s3native.NativeS3FileSystem; 042import org.apache.hadoop.io.retry.RetryPolicies; 043import org.apache.hadoop.io.retry.RetryPolicy; 044import org.apache.hadoop.io.retry.RetryProxy; 045import org.apache.hadoop.util.Progressable; 046 047/** 048 * A block-based {@link FileSystem} backed by 049 * <a href="http://aws.amazon.com/s3">Amazon S3</a>. 050 * 051 * @see NativeS3FileSystem 052 */ 053@InterfaceAudience.Public 054@InterfaceStability.Stable 055public class S3FileSystem extends FileSystem { 056 057 private URI uri; 058 059 private FileSystemStore store; 060 061 private Path workingDir; 062 063 public S3FileSystem() { 064 // set store in initialize() 065 } 066 067 public S3FileSystem(FileSystemStore store) { 068 this.store = store; 069 } 070 071 /** 072 * Return the protocol scheme for the FileSystem. 073 * 074 * @return <code>s3</code> 075 */ 076 @Override 077 public String getScheme() { 078 return "s3"; 079 } 080 081 @Override 082 public URI getUri() { 083 return uri; 084 } 085 086 @Override 087 public void initialize(URI uri, Configuration conf) throws IOException { 088 super.initialize(uri, conf); 089 if (store == null) { 090 store = createDefaultStore(conf); 091 } 092 store.initialize(uri, conf); 093 setConf(conf); 094 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 095 this.workingDir = 096 new Path("/user", System.getProperty("user.name")).makeQualified(this); 097 } 098 099 private static FileSystemStore createDefaultStore(Configuration conf) { 100 FileSystemStore store = new Jets3tFileSystemStore(); 101 102 RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep( 103 conf.getInt("fs.s3.maxRetries", 4), 104 conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS); 105 Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap = 106 new HashMap<Class<? extends Exception>, RetryPolicy>(); 107 exceptionToPolicyMap.put(IOException.class, basePolicy); 108 exceptionToPolicyMap.put(S3Exception.class, basePolicy); 109 110 RetryPolicy methodPolicy = RetryPolicies.retryByException( 111 RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap); 112 Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>(); 113 methodNameToPolicyMap.put("storeBlock", methodPolicy); 114 methodNameToPolicyMap.put("retrieveBlock", methodPolicy); 115 116 return (FileSystemStore) RetryProxy.create(FileSystemStore.class, 117 store, methodNameToPolicyMap); 118 } 119 120 @Override 121 public Path getWorkingDirectory() { 122 return workingDir; 123 } 124 125 @Override 126 public void setWorkingDirectory(Path dir) { 127 workingDir = makeAbsolute(dir); 128 } 129 130 private Path makeAbsolute(Path path) { 131 if (path.isAbsolute()) { 132 return path; 133 } 134 return new Path(workingDir, path); 135 } 136 137 /** 138 * @param permission Currently ignored. 139 */ 140 @Override 141 public boolean mkdirs(Path path, FsPermission permission) throws IOException { 142 Path absolutePath = makeAbsolute(path); 143 List<Path> paths = new ArrayList<Path>(); 144 do { 145 paths.add(0, absolutePath); 146 absolutePath = absolutePath.getParent(); 147 } while (absolutePath != null); 148 149 boolean result = true; 150 for (int i = 0; i < paths.size(); i++) { 151 Path p = paths.get(i); 152 try { 153 result &= mkdir(p); 154 } catch(FileAlreadyExistsException e) { 155 if (i + 1 < paths.size()) { 156 throw new ParentNotDirectoryException(e.getMessage()); 157 } 158 throw e; 159 } 160 } 161 return result; 162 } 163 164 private boolean mkdir(Path path) throws IOException { 165 Path absolutePath = makeAbsolute(path); 166 INode inode = store.retrieveINode(absolutePath); 167 if (inode == null) { 168 store.storeINode(absolutePath, INode.DIRECTORY_INODE); 169 } else if (inode.isFile()) { 170 throw new FileAlreadyExistsException(String.format( 171 "Can't make directory for path %s since it is a file.", 172 absolutePath)); 173 } 174 return true; 175 } 176 177 @Override 178 public boolean isFile(Path path) throws IOException { 179 INode inode = store.retrieveINode(makeAbsolute(path)); 180 if (inode == null) { 181 return false; 182 } 183 return inode.isFile(); 184 } 185 186 private INode checkFile(Path path) throws IOException { 187 INode inode = store.retrieveINode(makeAbsolute(path)); 188 String message = String.format("No such file: '%s'", path.toString()); 189 if (inode == null) { 190 throw new FileNotFoundException(message + " does not exist"); 191 } 192 if (inode.isDirectory()) { 193 throw new FileNotFoundException(message + " is a directory"); 194 } 195 return inode; 196 } 197 198 @Override 199 public FileStatus[] listStatus(Path f) throws IOException { 200 Path absolutePath = makeAbsolute(f); 201 INode inode = store.retrieveINode(absolutePath); 202 if (inode == null) { 203 throw new FileNotFoundException("File " + f + " does not exist."); 204 } 205 if (inode.isFile()) { 206 return new FileStatus[] { 207 new S3FileStatus(f.makeQualified(this), inode) 208 }; 209 } 210 ArrayList<FileStatus> ret = new ArrayList<FileStatus>(); 211 for (Path p : store.listSubPaths(absolutePath)) { 212 ret.add(getFileStatus(p.makeQualified(this))); 213 } 214 return ret.toArray(new FileStatus[0]); 215 } 216 217 /** This optional operation is not yet supported. */ 218 @Override 219 public FSDataOutputStream append(Path f, int bufferSize, 220 Progressable progress) throws IOException { 221 throw new IOException("Not supported"); 222 } 223 224 /** 225 * @param permission Currently ignored. 226 */ 227 @Override 228 public FSDataOutputStream create(Path file, FsPermission permission, 229 boolean overwrite, int bufferSize, 230 short replication, long blockSize, Progressable progress) 231 throws IOException { 232 233 INode inode = store.retrieveINode(makeAbsolute(file)); 234 if (inode != null) { 235 if (overwrite && !inode.isDirectory()) { 236 delete(file, true); 237 } else { 238 String message = String.format("File already exists: '%s'", file); 239 if (inode.isDirectory()) { 240 message = message + " is a directory"; 241 } 242 throw new FileAlreadyExistsException(message); 243 } 244 } else { 245 Path parent = file.getParent(); 246 if (parent != null) { 247 if (!mkdirs(parent)) { 248 throw new IOException("Mkdirs failed to create " + parent.toString()); 249 } 250 } 251 } 252 return new FSDataOutputStream 253 (new S3OutputStream(getConf(), store, makeAbsolute(file), 254 blockSize, progress, bufferSize), 255 statistics); 256 } 257 258 @Override 259 public FSDataInputStream open(Path path, int bufferSize) throws IOException { 260 INode inode = checkFile(path); 261 return new FSDataInputStream(new S3InputStream(getConf(), store, inode, 262 statistics)); 263 } 264 265 @Override 266 public boolean rename(Path src, Path dst) throws IOException { 267 Path absoluteSrc = makeAbsolute(src); 268 final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - "; 269 INode srcINode = store.retrieveINode(absoluteSrc); 270 boolean debugEnabled = LOG.isDebugEnabled(); 271 if (srcINode == null) { 272 // src path doesn't exist 273 if (debugEnabled) { 274 LOG.debug(debugPreamble + "returning false as src does not exist"); 275 } 276 return false; 277 } 278 279 Path absoluteDst = makeAbsolute(dst); 280 281 //validate the parent dir of the destination 282 Path dstParent = absoluteDst.getParent(); 283 if (dstParent != null) { 284 //if the dst parent is not root, make sure it exists 285 INode dstParentINode = store.retrieveINode(dstParent); 286 if (dstParentINode == null) { 287 // dst parent doesn't exist 288 if (debugEnabled) { 289 LOG.debug(debugPreamble + 290 "returning false as dst parent does not exist"); 291 } 292 return false; 293 } 294 if (dstParentINode.isFile()) { 295 // dst parent exists but is a file 296 if (debugEnabled) { 297 LOG.debug(debugPreamble + 298 "returning false as dst parent exists and is a file"); 299 } 300 return false; 301 } 302 } 303 304 //get status of source 305 boolean srcIsFile = srcINode.isFile(); 306 307 INode dstINode = store.retrieveINode(absoluteDst); 308 boolean destExists = dstINode != null; 309 boolean destIsDir = destExists && !dstINode.isFile(); 310 if (srcIsFile) { 311 312 //source is a simple file 313 if (destExists) { 314 if (destIsDir) { 315 //outcome #1 dest exists and is dir -filename to subdir of dest 316 if (debugEnabled) { 317 LOG.debug(debugPreamble + 318 "copying src file under dest dir to " + absoluteDst); 319 } 320 absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); 321 } else { 322 //outcome #2 dest it's a file: fail iff different from src 323 boolean renamingOnToSelf = absoluteSrc.equals(absoluteDst); 324 if (debugEnabled) { 325 LOG.debug(debugPreamble + 326 "copying file onto file, outcome is " + renamingOnToSelf); 327 } 328 return renamingOnToSelf; 329 } 330 } else { 331 // #3 dest does not exist: use dest as path for rename 332 if (debugEnabled) { 333 LOG.debug(debugPreamble + 334 "copying file onto file"); 335 } 336 } 337 } else { 338 //here the source exists and is a directory 339 // outcomes (given we know the parent dir exists if we get this far) 340 // #1 destination is a file: fail 341 // #2 destination is a directory: create a new dir under that one 342 // #3 destination doesn't exist: create a new dir with that name 343 // #3 and #4 are only allowed if the dest path is not == or under src 344 345 if (destExists) { 346 if (!destIsDir) { 347 // #1 destination is a file: fail 348 if (debugEnabled) { 349 LOG.debug(debugPreamble + 350 "returning false as src is a directory, but not dest"); 351 } 352 return false; 353 } else { 354 // the destination dir exists 355 // destination for rename becomes a subdir of the target name 356 absoluteDst = new Path(absoluteDst, absoluteSrc.getName()); 357 if (debugEnabled) { 358 LOG.debug(debugPreamble + 359 "copying src dir under dest dir to " + absoluteDst); 360 } 361 } 362 } 363 //the final destination directory is now know, so validate it for 364 //illegal moves 365 366 if (absoluteSrc.equals(absoluteDst)) { 367 //you can't rename a directory onto itself 368 if (debugEnabled) { 369 LOG.debug(debugPreamble + 370 "Dest==source && isDir -failing"); 371 } 372 return false; 373 } 374 if (absoluteDst.toString().startsWith(absoluteSrc.toString() + "/")) { 375 //you can't move a directory under itself 376 if (debugEnabled) { 377 LOG.debug(debugPreamble + 378 "dst is equal to or under src dir -failing"); 379 } 380 return false; 381 } 382 } 383 //here the dest path is set up -so rename 384 return renameRecursive(absoluteSrc, absoluteDst); 385 } 386 387 private boolean renameRecursive(Path src, Path dst) throws IOException { 388 INode srcINode = store.retrieveINode(src); 389 store.storeINode(dst, srcINode); 390 store.deleteINode(src); 391 if (srcINode.isDirectory()) { 392 for (Path oldSrc : store.listDeepSubPaths(src)) { 393 INode inode = store.retrieveINode(oldSrc); 394 if (inode == null) { 395 return false; 396 } 397 String oldSrcPath = oldSrc.toUri().getPath(); 398 String srcPath = src.toUri().getPath(); 399 String dstPath = dst.toUri().getPath(); 400 Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath)); 401 store.storeINode(newDst, inode); 402 store.deleteINode(oldSrc); 403 } 404 } 405 return true; 406 } 407 408 @Override 409 public boolean delete(Path path, boolean recursive) throws IOException { 410 Path absolutePath = makeAbsolute(path); 411 INode inode = store.retrieveINode(absolutePath); 412 if (inode == null) { 413 return false; 414 } 415 if (inode.isFile()) { 416 store.deleteINode(absolutePath); 417 for (Block block: inode.getBlocks()) { 418 store.deleteBlock(block); 419 } 420 } else { 421 FileStatus[] contents = null; 422 try { 423 contents = listStatus(absolutePath); 424 } catch(FileNotFoundException fnfe) { 425 return false; 426 } 427 428 if ((contents.length !=0) && (!recursive)) { 429 throw new IOException("Directory " + path.toString() 430 + " is not empty."); 431 } 432 for (FileStatus p:contents) { 433 if (!delete(p.getPath(), recursive)) { 434 return false; 435 } 436 } 437 store.deleteINode(absolutePath); 438 } 439 return true; 440 } 441 442 /** 443 * FileStatus for S3 file systems. 444 */ 445 @Override 446 public FileStatus getFileStatus(Path f) throws IOException { 447 INode inode = store.retrieveINode(makeAbsolute(f)); 448 if (inode == null) { 449 throw new FileNotFoundException(f + ": No such file or directory."); 450 } 451 return new S3FileStatus(f.makeQualified(this), inode); 452 } 453 454 @Override 455 public long getDefaultBlockSize() { 456 return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024); 457 } 458 459 @Override 460 public String getCanonicalServiceName() { 461 // Does not support Token 462 return null; 463 } 464 465 // diagnostic methods 466 467 void dump() throws IOException { 468 store.dump(); 469 } 470 471 void purge() throws IOException { 472 store.purge(); 473 } 474 475 private static class S3FileStatus extends FileStatus { 476 477 S3FileStatus(Path f, INode inode) throws IOException { 478 super(findLength(inode), inode.isDirectory(), 1, 479 findBlocksize(inode), 0, f); 480 } 481 482 private static long findLength(INode inode) { 483 if (!inode.isDirectory()) { 484 long length = 0L; 485 for (Block block : inode.getBlocks()) { 486 length += block.getLength(); 487 } 488 return length; 489 } 490 return 0; 491 } 492 493 private static long findBlocksize(INode inode) { 494 final Block[] ret = inode.getBlocks(); 495 return ret == null ? 0L : ret[0].getLength(); 496 } 497 } 498}