001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.s3; 020 021import java.io.FileNotFoundException; 022import java.io.IOException; 023import java.net.URI; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.concurrent.TimeUnit; 029 030import org.apache.hadoop.classification.InterfaceAudience; 031import org.apache.hadoop.classification.InterfaceStability; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FSDataInputStream; 034import org.apache.hadoop.fs.FSDataOutputStream; 035import org.apache.hadoop.fs.FileAlreadyExistsException; 036import org.apache.hadoop.fs.FileStatus; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.fs.permission.FsPermission; 040import org.apache.hadoop.fs.s3native.NativeS3FileSystem; 041import org.apache.hadoop.io.retry.RetryPolicies; 042import org.apache.hadoop.io.retry.RetryPolicy; 043import org.apache.hadoop.io.retry.RetryProxy; 044import org.apache.hadoop.util.Progressable; 045 046/** 047 * A block-based {@link FileSystem} backed by 048 * <a 
href="http://aws.amazon.com/s3">Amazon S3</a>.
 *
 * <p>Every file and directory is represented by an {@link INode} kept in a
 * {@link FileSystemStore}; a file inode lists the {@link Block}s holding its
 * data.
 *
 * @see NativeS3FileSystem
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class S3FileSystem extends FileSystem {

  /** Scheme and authority of this filesystem; set by {@link #initialize}. */
  private URI uri;

  /** Inode/block store; injected via the constructor or built in {@link #initialize}. */
  private FileSystemStore store;

  /** Directory against which relative paths are resolved. */
  private Path workingDir;

  /** Creates a filesystem whose store is built in {@link #initialize}. */
  public S3FileSystem() {
    // set store in initialize()
  }

  /**
   * Creates a filesystem that uses {@code store} instead of the default
   * retrying store built in {@link #initialize}.
   *
   * @param store the store to use; it is still initialized by {@link #initialize}
   */
  public S3FileSystem(FileSystemStore store) {
    this.store = store;
  }

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>s3</code>
   */
  @Override
  public String getScheme() {
    return "s3";
  }

  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * Binds this filesystem to {@code uri}.
   *
   * <p>Builds the default store if none was injected, initializes the store,
   * records the scheme and authority as this filesystem's URI and sets the
   * working directory to {@code /user/<user.name>}.
   */
  @Override
  public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    if (store == null) {
      store = createDefaultStore(conf);
    }
    store.initialize(uri, conf);
    setConf(conf);
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir =
      new Path("/user", System.getProperty("user.name")).makeQualified(this);
  }

  /**
   * Builds a {@link Jets3tFileSystemStore} wrapped in a retry proxy.
   *
   * <p>Only {@code storeBlock} and {@code retrieveBlock} have a retry policy.
   * On an {@link IOException} or {@link S3Exception} they are retried up to
   * {@code fs.s3.maxRetries} times (default 4), with a fixed sleep of
   * {@code fs.s3.sleepTimeSeconds} seconds (default 10) between attempts.
   * Any other exception fails on the first attempt. Other store methods are
   * not in the policy map.
   */
  private static FileSystemStore createDefaultStore(Configuration conf) {
    FileSystemStore store = new Jets3tFileSystemStore();

    RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
                                                                               conf.getInt("fs.s3.maxRetries", 4),
                                                                               conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
      new HashMap<Class<? extends Exception>, RetryPolicy>();
    exceptionToPolicyMap.put(IOException.class, basePolicy);
    exceptionToPolicyMap.put(S3Exception.class, basePolicy);

    RetryPolicy methodPolicy = RetryPolicies.retryByException(
                                                              RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
    methodNameToPolicyMap.put("storeBlock", methodPolicy);
    methodNameToPolicyMap.put("retrieveBlock", methodPolicy);

    return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
                                               store, methodNameToPolicyMap);
  }

  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }

  @Override
  public void setWorkingDirectory(Path dir) {
    workingDir = makeAbsolute(dir);
  }

  /** Resolves a relative path against the working directory; absolute paths pass through. */
  private Path makeAbsolute(Path path) {
    if (path.isAbsolute()) {
      return path;
    }
    return new Path(workingDir, path);
  }

  /**
   * Returns true when both paths name the same location, ignoring whether
   * either one is qualified with a scheme and authority.
   */
  private static boolean samePath(Path a, Path b) {
    return a.toUri().getPath().equals(b.toUri().getPath());
  }

  /**
   * Returns true when {@code child} lies strictly below directory
   * {@code parent}. The comparison is by whole path components, so
   * {@code /ab} is not under {@code /a}, and it is root-safe.
   */
  private static boolean isStrictlyUnder(Path child, Path parent) {
    return child.toUri().getPath()
        .startsWith(withTrailingSlash(parent.toUri().getPath()));
  }

  /** Appends a {@code '/'} unless the string already ends with one (e.g. root). */
  private static String withTrailingSlash(String path) {
    return path.endsWith("/") ? path : path + "/";
  }

  /**
   * Creates {@code path} and every missing ancestor as a directory.
   *
   * @param permission Currently ignored.
   * @return always {@code true}; failures are reported by exception
   * @throws IOException if a component of the path is an existing file
   */
  @Override
  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
    Path absolutePath = makeAbsolute(path);
    List<Path> paths = new ArrayList<Path>();
    // collect root-first so that ancestors are created before descendants
    do {
      paths.add(0, absolutePath);
      absolutePath = absolutePath.getParent();
    } while (absolutePath != null);

    boolean result = true;
    for (Path p : paths) {
      result &= mkdir(p);
    }
    return result;
  }

  /**
   * Stores a directory inode at {@code path} unless one already exists.
   *
   * @return always {@code true}
   * @throws IOException if {@code path} is an existing file
   */
  private boolean mkdir(Path path) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
    } else if (inode.isFile()) {
      throw new IOException(String.format(
          "Can't make directory for path %s since it is a file.",
          absolutePath));
    }
    return true;
  }

  @Override
  public boolean isFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    if (inode == null) {
      return false;
    }
    return inode.isFile();
  }

  /**
   * Returns the inode of the file at {@code path}.
   *
   * @throws FileNotFoundException if nothing exists at {@code path}
   * @throws IOException if {@code path} is a directory
   */
  private INode checkFile(Path path) throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(path));
    if (inode == null) {
      throw new FileNotFoundException("No such file: " + path);
    }
    if (inode.isDirectory()) {
      throw new IOException("Path " + path + " is a directory.");
    }
    return inode;
  }

  /**
   * Lists {@code f}. A file yields a single-element array with its own
   * status; a directory yields the status of each direct child.
   *
   * @throws FileNotFoundException if {@code f} does not exist
   */
  @Override
  public FileStatus[] listStatus(Path f) throws IOException {
    Path absolutePath = makeAbsolute(f);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      throw new FileNotFoundException("File " + f + " does not exist.");
    }
    if (inode.isFile()) {
      return new FileStatus[] {
        new S3FileStatus(f.makeQualified(this), inode)
      };
    }
    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
    for (Path p : store.listSubPaths(absolutePath)) {
      ret.add(getFileStatus(p.makeQualified(this)));
    }
    return ret.toArray(new FileStatus[0]);
  }

  /** This optional operation is not yet supported. */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }

  /**
   * Opens a new file for writing. Missing parent directories are created.
   *
   * @param permission Currently ignored.
   * @throws FileAlreadyExistsException if {@code file} is an existing
   *     directory (regardless of {@code overwrite}), or an existing file and
   *     {@code overwrite} is false
   */
  @Override
  public FSDataOutputStream create(Path file, FsPermission permission,
      boolean overwrite, int bufferSize,
      short replication, long blockSize, Progressable progress)
    throws IOException {

    INode inode = store.retrieveINode(makeAbsolute(file));
    if (inode != null) {
      if (inode.isDirectory()) {
        // Never replace a directory with a file: honouring overwrite here
        // would recursively delete the whole directory tree.
        throw new FileAlreadyExistsException(
            "File already exists: " + file + " [is a directory]");
      }
      if (!overwrite) {
        throw new FileAlreadyExistsException("File already exists: " + file);
      }
      delete(file, true);
    } else {
      Path parent = file.getParent();
      if (parent != null) {
        if (!mkdirs(parent)) {
          throw new IOException("Mkdirs failed to create " + parent.toString());
        }
      }
    }
    return new FSDataOutputStream
        (new S3OutputStream(getConf(), store, makeAbsolute(file),
                            blockSize, progress, bufferSize),
         statistics);
  }

  /**
   * Opens the file at {@code path} for reading.
   *
   * @throws FileNotFoundException if {@code path} does not exist
   * @throws IOException if {@code path} is a directory
   */
  @Override
  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
    INode inode = checkFile(path);
    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
                                                   statistics));
  }

  /**
   * Renames {@code src} to {@code dst}.
   *
   * <p>If {@code dst} is an existing directory, {@code src} is moved beneath
   * it under its own name. Returns false, without changing anything, when:
   * <ul>
   *   <li>{@code src} is missing;</li>
   *   <li>the parent of {@code dst} is missing or is a file;</li>
   *   <li>a file is renamed onto a different existing file;</li>
   *   <li>a directory is renamed onto a file, onto itself, or beneath itself.</li>
   * </ul>
   * Renaming a file onto itself, or into the directory that already contains
   * it, returns true and leaves the file in place.
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    Path absoluteSrc = makeAbsolute(src);
    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
    INode srcINode = store.retrieveINode(absoluteSrc);
    boolean debugEnabled = LOG.isDebugEnabled();
    if (srcINode == null) {
      // src path doesn't exist
      if (debugEnabled) {
        LOG.debug(debugPreamble + "returning false as src does not exist");
      }
      return false;
    }

    Path absoluteDst = makeAbsolute(dst);

    //validate the parent dir of the destination
    Path dstParent = absoluteDst.getParent();
    if (dstParent != null) {
      //dst is not root, so its parent must exist and be a directory
      INode dstParentINode = store.retrieveINode(dstParent);
      if (dstParentINode == null) {
        // dst parent doesn't exist
        if (debugEnabled) {
          LOG.debug(debugPreamble +
                    "returning false as dst parent does not exist");
        }
        return false;
      }
      if (dstParentINode.isFile()) {
        // dst parent exists but is a file
        if (debugEnabled) {
          LOG.debug(debugPreamble +
                    "returning false as dst parent exists and is a file");
        }
        return false;
      }
    }

    //get status of source
    boolean srcIsFile = srcINode.isFile();

    INode dstINode = store.retrieveINode(absoluteDst);
    boolean destExists = dstINode != null;
    boolean destIsDir = destExists && !dstINode.isFile();
    if (srcIsFile) {

      //source is a simple file
      if (destExists) {
        if (destIsDir) {
          //outcome #1 dest exists and is dir -filename to subdir of dest
          if (debugEnabled) {
            LOG.debug(debugPreamble +
                      "copying src file under dest dir to " + absoluteDst);
          }
          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
          if (samePath(absoluteSrc, absoluteDst)) {
            // dest is the directory src already lives in. Carrying on would
            // make renameRecursive store and then delete the inode at the
            // same location, losing the file.
            if (debugEnabled) {
              LOG.debug(debugPreamble +
                        "src file is already in dest dir, nothing to do");
            }
            return true;
          }
          // NOTE(review): an existing entry at dst/<name> is overwritten by
          // renameRecursive without any check.
        } else {
          //outcome #2 dest it's a file: fail iff different from src
          boolean renamingOnToSelf = samePath(absoluteSrc, absoluteDst);
          if (debugEnabled) {
            LOG.debug(debugPreamble +
                      "copying file onto file, outcome is " + renamingOnToSelf);
          }
          return renamingOnToSelf;
        }
      } else {
        // #3 dest does not exist: use dest as path for rename
        if (debugEnabled) {
          LOG.debug(debugPreamble +
                    "copying file onto file");
        }
      }
    } else {
      //here the source exists and is a directory
      // outcomes (given we know the parent dir exists if we get this far)
      // #1 destination is a file: fail
      // #2 destination is a directory: create a new dir under that one
      // #3 destination doesn't exist: create a new dir with that name
      // #2 and #3 are only allowed if the final dest path is not == or under src

      if (destExists) {
        if (!destIsDir) {
          // #1 destination is a file: fail
          if (debugEnabled) {
            LOG.debug(debugPreamble +
                      "returning false as src is a directory, but not dest");
          }
          return false;
        } else {
          // the destination dir exists
          // destination for rename becomes a subdir of the target name
          absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
          if (debugEnabled) {
            LOG.debug(debugPreamble +
                      "copying src dir under dest dir to " + absoluteDst);
          }
        }
      }
      //the final destination directory is now known, so validate it for
      //illegal moves. Paths are compared without scheme/authority so that
      //qualified and unqualified forms of the same location agree.

      if (samePath(absoluteSrc, absoluteDst)) {
        //you can't rename a directory onto itself
        if (debugEnabled) {
          LOG.debug(debugPreamble +
                    "Dest==source && isDir -failing");
        }
        return false;
      }
      if (isStrictlyUnder(absoluteDst, absoluteSrc)) {
        //you can't move a directory under itself
        if (debugEnabled) {
          LOG.debug(debugPreamble +
                    "dst is equal to or under src dir -failing");
        }
        return false;
      }
    }
    //here the dest path is set up -so rename
    return renameRecursive(absoluteSrc, absoluteDst);
  }

  /**
   * Moves the inode at {@code src} to {@code dst}. For a directory, this also
   * re-keys every inode returned by {@code listDeepSubPaths(src)}. Blocks are
   * not copied; the inodes keep referencing them.
   *
   * @return false if a listed sub-path has no inode; the rename is then
   *     only partially applied
   */
  private boolean renameRecursive(Path src, Path dst) throws IOException {
    INode srcINode = store.retrieveINode(src);
    store.storeINode(dst, srcINode);
    store.deleteINode(src);
    if (srcINode.isDirectory()) {
      // Literal prefix substitution; the previous String.replaceFirst treated
      // the paths as a regex and a replacement pattern.
      String srcPrefix = withTrailingSlash(src.toUri().getPath());
      String dstPrefix = withTrailingSlash(dst.toUri().getPath());
      for (Path oldSrc : store.listDeepSubPaths(src)) {
        String oldSrcPath = oldSrc.toUri().getPath();
        if (!oldSrcPath.startsWith(srcPrefix)) {
          // Defensive: never re-key (and thereby delete) an entry that does
          // not live below src.
          continue;
        }
        INode inode = store.retrieveINode(oldSrc);
        if (inode == null) {
          return false;
        }
        Path newDst =
            new Path(dstPrefix + oldSrcPath.substring(srcPrefix.length()));
        store.storeINode(newDst, inode);
        store.deleteINode(oldSrc);
      }
    }
    return true;
  }

  /**
   * Deletes {@code path}. A file's inode and all of its blocks are removed. A
   * directory's children are deleted depth-first, then the directory itself.
   *
   * @return false if {@code path} does not exist or a child could not be deleted
   * @throws IOException if {@code path} is a non-empty directory and
   *     {@code recursive} is false
   */
  @Override
  public boolean delete(Path path, boolean recursive) throws IOException {
    Path absolutePath = makeAbsolute(path);
    INode inode = store.retrieveINode(absolutePath);
    if (inode == null) {
      return false;
    }
    if (inode.isFile()) {
      store.deleteINode(absolutePath);
      for (Block block: inode.getBlocks()) {
        store.deleteBlock(block);
      }
    } else {
      FileStatus[] contents = null;
      try {
        contents = listStatus(absolutePath);
      } catch(FileNotFoundException fnfe) {
        // the directory vanished between the lookup and the listing
        return false;
      }

      if ((contents.length !=0) && (!recursive)) {
        throw new IOException("Directory " + path.toString()
                              + " is not empty.");
      }
      for (FileStatus p:contents) {
        if (!delete(p.getPath(), recursive)) {
          return false;
        }
      }
      store.deleteINode(absolutePath);
    }
    return true;
  }

  /**
   * Returns the status of the file or directory at {@code f}.
   *
   * @throws FileNotFoundException if nothing exists at {@code f}
   */
  @Override
  public FileStatus getFileStatus(Path f)  throws IOException {
    INode inode = store.retrieveINode(makeAbsolute(f));
    if (inode == null) {
      throw new FileNotFoundException(f + ": No such file or directory.");
    }
    return new S3FileStatus(f.makeQualified(this), inode);
  }

  /** Block size from {@code fs.s3.block.size}, defaulting to 64 MiB. */
  @Override
  public long getDefaultBlockSize() {
    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
  }

  @Override
  public String getCanonicalServiceName() {
    // Does not support Token
    return null;
  }

  // diagnostic methods

  void dump() throws IOException {
    store.dump();
  }

  void purge() throws IOException {
    store.purge();
  }

  /**
   * FileStatus for S3 file systems. Length is the sum of the block lengths,
   * and block size is the length of the first block. Replication is always 1
   * and the modification time is always 0.
   */
  private static class S3FileStatus extends FileStatus {

    S3FileStatus(Path f, INode inode) throws IOException {
      super(findLength(inode), inode.isDirectory(), 1,
            findBlocksize(inode), 0, f);
    }

    /** Sum of block lengths for a file; 0 for a directory or a block-less file. */
    private static long findLength(INode inode) {
      if (inode.isDirectory()) {
        return 0;
      }
      final Block[] blocks = inode.getBlocks();
      if (blocks == null) {
        return 0L;
      }
      long length = 0L;
      for (Block block : blocks) {
        length += block.getLength();
      }
      return length;
    }

    /** Length of the first block, or 0 when there are no blocks (e.g. an empty file). */
    private static long findBlocksize(INode inode) {
      final Block[] blocks = inode.getBlocks();
      return (blocks == null || blocks.length == 0) ? 0L : blocks[0].getLength();
    }
  }
}