001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.azure; 020 021import java.io.DataInputStream; 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.InputStream; 025import java.io.OutputStream; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.nio.charset.Charset; 029import java.text.SimpleDateFormat; 030import java.util.ArrayList; 031import java.util.Date; 032import java.util.EnumSet; 033import java.util.Iterator; 034import java.util.Set; 035import java.util.TimeZone; 036import java.util.TreeSet; 037import java.util.UUID; 038import java.util.concurrent.atomic.AtomicInteger; 039import java.util.regex.Matcher; 040import java.util.regex.Pattern; 041 042import org.apache.commons.lang.StringUtils; 043import org.apache.commons.lang.exception.ExceptionUtils; 044import org.apache.commons.logging.Log; 045import org.apache.commons.logging.LogFactory; 046import org.apache.hadoop.classification.InterfaceAudience; 047import org.apache.hadoop.classification.InterfaceStability; 048import org.apache.hadoop.conf.Configuration; 049import org.apache.hadoop.fs.BlockLocation; 050import 
org.apache.hadoop.fs.BufferedFSInputStream; 051import org.apache.hadoop.fs.CreateFlag; 052import org.apache.hadoop.fs.FSDataInputStream; 053import org.apache.hadoop.fs.FSDataOutputStream; 054import org.apache.hadoop.fs.FSInputStream; 055import org.apache.hadoop.fs.FileStatus; 056import org.apache.hadoop.fs.FileSystem; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; 059import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; 060import org.apache.hadoop.fs.permission.FsPermission; 061import org.apache.hadoop.fs.permission.PermissionStatus; 062import org.apache.hadoop.fs.azure.AzureException; 063import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper; 064import org.apache.hadoop.io.IOUtils; 065import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; 066import org.apache.hadoop.security.UserGroupInformation; 067import org.apache.hadoop.util.Progressable; 068 069 070import org.codehaus.jackson.JsonNode; 071import org.codehaus.jackson.JsonParseException; 072import org.codehaus.jackson.JsonParser; 073import org.codehaus.jackson.map.JsonMappingException; 074import org.codehaus.jackson.map.ObjectMapper; 075 076import com.google.common.annotations.VisibleForTesting; 077import com.microsoft.azure.storage.AccessCondition; 078import com.microsoft.azure.storage.OperationContext; 079import com.microsoft.azure.storage.StorageException; 080import com.microsoft.azure.storage.blob.CloudBlob; 081import com.microsoft.azure.storage.core.*; 082 083/** 084 * A {@link FileSystem} for reading and writing files stored on <a 085 * href="http://store.azure.com/">Windows Azure</a>. This implementation is 086 * blob-based and stores files on Azure in their native form so they can be read 087 * by other Azure tools. 
088 */ 089@InterfaceAudience.Public 090@InterfaceStability.Stable 091public class NativeAzureFileSystem extends FileSystem { 092 private static final int USER_WX_PERMISION = 0300; 093 094 /** 095 * A description of a folder rename operation, including the source and 096 * destination keys, and descriptions of the files in the source folder. 097 */ 098 public static class FolderRenamePending { 099 private SelfRenewingLease folderLease; 100 private String srcKey; 101 private String dstKey; 102 private FileMetadata[] fileMetadata = null; // descriptions of source files 103 private ArrayList<String> fileStrings = null; 104 private NativeAzureFileSystem fs; 105 private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000; 106 private static final int FORMATTING_BUFFER = 10000; 107 private boolean committed; 108 public static final String SUFFIX = "-RenamePending.json"; 109 110 // Prepare in-memory information needed to do or redo a folder rename. 111 public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease, 112 NativeAzureFileSystem fs) throws IOException { 113 this.srcKey = srcKey; 114 this.dstKey = dstKey; 115 this.folderLease = lease; 116 this.fs = fs; 117 ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>(); 118 119 // List all the files in the folder. 120 String priorLastKey = null; 121 do { 122 PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL, 123 AZURE_UNBOUNDED_DEPTH, priorLastKey); 124 for(FileMetadata file : listing.getFiles()) { 125 fileMetadataList.add(file); 126 } 127 priorLastKey = listing.getPriorLastKey(); 128 } while (priorLastKey != null); 129 fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]); 130 this.committed = true; 131 } 132 133 // Prepare in-memory information needed to do or redo folder rename from 134 // a -RenamePending.json file read from storage. This constructor is to use during 135 // redo processing. 
136 public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs) 137 throws IllegalArgumentException, IOException { 138 139 this.fs = fs; 140 141 // open redo file 142 Path f = redoFile; 143 FSDataInputStream input = fs.open(f); 144 byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE]; 145 int l = input.read(bytes); 146 if (l < 0) { 147 throw new IOException( 148 "Error reading pending rename file contents -- no data available"); 149 } 150 if (l == MAX_RENAME_PENDING_FILE_SIZE) { 151 throw new IOException( 152 "Error reading pending rename file contents -- " 153 + "maximum file size exceeded"); 154 } 155 String contents = new String(bytes, 0, l, Charset.forName("UTF-8")); 156 157 // parse the JSON 158 ObjectMapper objMapper = new ObjectMapper(); 159 objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); 160 JsonNode json = null; 161 try { 162 json = objMapper.readValue(contents, JsonNode.class); 163 this.committed = true; 164 } catch (JsonMappingException e) { 165 166 // The -RedoPending.json file is corrupted, so we assume it was 167 // not completely written 168 // and the redo operation did not commit. 
169 this.committed = false; 170 } catch (JsonParseException e) { 171 this.committed = false; 172 } catch (IOException e) { 173 this.committed = false; 174 } 175 176 if (!this.committed) { 177 LOG.error("Deleting corruped rename pending file " 178 + redoFile + "\n" + contents); 179 180 // delete the -RenamePending.json file 181 fs.delete(redoFile, false); 182 return; 183 } 184 185 // initialize this object's fields 186 ArrayList<String> fileStrList = new ArrayList<String>(); 187 JsonNode oldFolderName = json.get("OldFolderName"); 188 JsonNode newFolderName = json.get("NewFolderName"); 189 if (oldFolderName == null || newFolderName == null) { 190 this.committed = false; 191 } else { 192 this.srcKey = oldFolderName.getTextValue(); 193 this.dstKey = newFolderName.getTextValue(); 194 if (this.srcKey == null || this.dstKey == null) { 195 this.committed = false; 196 } else { 197 JsonNode fileList = json.get("FileList"); 198 if (fileList == null) { 199 this.committed = false; 200 } else { 201 for (int i = 0; i < fileList.size(); i++) { 202 fileStrList.add(fileList.get(i).getTextValue()); 203 } 204 } 205 } 206 } 207 this.fileStrings = fileStrList; 208 } 209 210 public FileMetadata[] getFiles() { 211 return fileMetadata; 212 } 213 214 public SelfRenewingLease getFolderLease() { 215 return folderLease; 216 } 217 218 /** 219 * Write to disk the information needed to redo folder rename, 220 * in JSON format. The file name will be 221 * {@code wasb://<sourceFolderPrefix>/folderName-RenamePending.json} 222 * The file format will be: 223 * <pre>{@code 224 * { 225 * FormatVersion: "1.0", 226 * OperationTime: "<YYYY-MM-DD HH:MM:SS.MMM>", 227 * OldFolderName: "<key>", 228 * NewFolderName: "<key>", 229 * FileList: [ <string> , <string> , ... 
] 230 * } 231 * 232 * Here's a sample: 233 * { 234 * FormatVersion: "1.0", 235 * OperationUTCTime: "2014-07-01 23:50:35.572", 236 * OldFolderName: "user/ehans/folderToRename", 237 * NewFolderName: "user/ehans/renamedFolder", 238 * FileList: [ 239 * "innerFile", 240 * "innerFile2" 241 * ] 242 * } }</pre> 243 * @throws IOException 244 */ 245 public void writeFile(FileSystem fs) throws IOException { 246 Path path = getRenamePendingFilePath(); 247 if (LOG.isDebugEnabled()){ 248 LOG.debug("Preparing to write atomic rename state to " + path.toString()); 249 } 250 OutputStream output = null; 251 252 String contents = makeRenamePendingFileContents(); 253 254 // Write file. 255 try { 256 output = fs.create(path); 257 output.write(contents.getBytes(Charset.forName("UTF-8"))); 258 } catch (IOException e) { 259 throw new IOException("Unable to write RenamePending file for folder rename from " 260 + srcKey + " to " + dstKey, e); 261 } finally { 262 IOUtils.cleanup(LOG, output); 263 } 264 } 265 266 /** 267 * Return the contents of the JSON file to represent the operations 268 * to be performed for a folder rename. 269 */ 270 public String makeRenamePendingFileContents() { 271 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 272 sdf.setTimeZone(TimeZone.getTimeZone("UTC")); 273 String time = sdf.format(new Date()); 274 275 // Make file list string 276 StringBuilder builder = new StringBuilder(); 277 builder.append("[\n"); 278 for (int i = 0; i != fileMetadata.length; i++) { 279 if (i > 0) { 280 builder.append(",\n"); 281 } 282 builder.append(" "); 283 String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/"); 284 285 // Quote string file names, escaping any possible " characters or other 286 // necessary characters in the name. 287 builder.append(quote(noPrefix)); 288 if (builder.length() >= 289 MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) { 290 291 // Give up now to avoid using too much memory. 
292 LOG.error("Internal error: Exceeded maximum rename pending file size of " 293 + MAX_RENAME_PENDING_FILE_SIZE + " bytes."); 294 295 // return some bad JSON with an error message to make it human readable 296 return "exceeded maximum rename pending file size"; 297 } 298 } 299 builder.append("\n ]"); 300 String fileList = builder.toString(); 301 302 // Make file contents as a string. Again, quote file names, escaping 303 // characters as appropriate. 304 String contents = "{\n" 305 + " FormatVersion: \"1.0\",\n" 306 + " OperationUTCTime: \"" + time + "\",\n" 307 + " OldFolderName: " + quote(srcKey) + ",\n" 308 + " NewFolderName: " + quote(dstKey) + ",\n" 309 + " FileList: " + fileList + "\n" 310 + "}\n"; 311 312 return contents; 313 } 314 315 /** 316 * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 317 * method. 318 * 319 * Produce a string in double quotes with backslash sequences in all the 320 * right places. A backslash will be inserted within </, allowing JSON 321 * text to be delivered in HTML. In JSON text, a string cannot contain a 322 * control character or an unescaped quote or backslash. 323 * @param string A String 324 * @return A String correctly formatted for insertion in a JSON text. 
325 */ 326 private String quote(String string) { 327 if (string == null || string.length() == 0) { 328 return "\"\""; 329 } 330 331 char c = 0; 332 int i; 333 int len = string.length(); 334 StringBuilder sb = new StringBuilder(len + 4); 335 String t; 336 337 sb.append('"'); 338 for (i = 0; i < len; i += 1) { 339 c = string.charAt(i); 340 switch (c) { 341 case '\\': 342 case '"': 343 sb.append('\\'); 344 sb.append(c); 345 break; 346 case '/': 347 sb.append('\\'); 348 sb.append(c); 349 break; 350 case '\b': 351 sb.append("\\b"); 352 break; 353 case '\t': 354 sb.append("\\t"); 355 break; 356 case '\n': 357 sb.append("\\n"); 358 break; 359 case '\f': 360 sb.append("\\f"); 361 break; 362 case '\r': 363 sb.append("\\r"); 364 break; 365 default: 366 if (c < ' ') { 367 t = "000" + Integer.toHexString(c); 368 sb.append("\\u" + t.substring(t.length() - 4)); 369 } else { 370 sb.append(c); 371 } 372 } 373 } 374 sb.append('"'); 375 return sb.toString(); 376 } 377 378 public String getSrcKey() { 379 return srcKey; 380 } 381 382 public String getDstKey() { 383 return dstKey; 384 } 385 386 public FileMetadata getSourceMetadata() throws IOException { 387 return fs.getStoreInterface().retrieveMetadata(srcKey); 388 } 389 390 /** 391 * Execute a folder rename. This is the execution path followed 392 * when everything is working normally. See redo() for the alternate 393 * execution path for the case where we're recovering from a folder rename 394 * failure. 395 * @throws IOException 396 */ 397 public void execute() throws IOException { 398 399 for (FileMetadata file : this.getFiles()) { 400 401 // Rename all materialized entries under the folder to point to the 402 // final destination. 
403 if (file.getBlobMaterialization() == BlobMaterialization.Explicit) { 404 String srcName = file.getKey(); 405 String suffix = srcName.substring((this.getSrcKey()).length()); 406 String dstName = this.getDstKey() + suffix; 407 408 // Rename gets exclusive access (via a lease) for files 409 // designated for atomic rename. 410 // The main use case is for HBase write-ahead log (WAL) and data 411 // folder processing correctness. See the rename code for details. 412 boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName); 413 fs.getStoreInterface().rename(srcName, dstName, acquireLease, null); 414 } 415 } 416 417 // Rename the source folder 0-byte root file itself. 418 FileMetadata srcMetadata2 = this.getSourceMetadata(); 419 if (srcMetadata2.getBlobMaterialization() == 420 BlobMaterialization.Explicit) { 421 422 // It already has a lease on it from the "prepare" phase so there's no 423 // need to get one now. Pass in existing lease to allow file delete. 424 fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(), 425 false, folderLease); 426 } 427 428 // Update the last-modified time of the parent folders of both source and 429 // destination. 430 fs.updateParentFolderLastModifiedTime(srcKey); 431 fs.updateParentFolderLastModifiedTime(dstKey); 432 } 433 434 /** Clean up after execution of rename. 435 * @throws IOException */ 436 public void cleanup() throws IOException { 437 438 if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) { 439 440 // Remove RenamePending file 441 fs.delete(getRenamePendingFilePath(), false); 442 443 // Freeing source folder lease is not necessary since the source 444 // folder file was deleted. 
445 } 446 } 447 448 private Path getRenamePendingFilePath() { 449 String fileName = srcKey + SUFFIX; 450 Path fileNamePath = keyToPath(fileName); 451 Path path = fs.makeAbsolute(fileNamePath); 452 return path; 453 } 454 455 /** 456 * Recover from a folder rename failure by redoing the intended work, 457 * as recorded in the -RenamePending.json file. 458 * 459 * @throws IOException 460 */ 461 public void redo() throws IOException { 462 463 if (!committed) { 464 465 // Nothing to do. The -RedoPending.json file should have already been 466 // deleted. 467 return; 468 } 469 470 // Try to get a lease on source folder to block concurrent access to it. 471 // It may fail if the folder is already gone. We don't check if the 472 // source exists explicitly because that could recursively trigger redo 473 // and give an infinite recursion. 474 SelfRenewingLease lease = null; 475 boolean sourceFolderGone = false; 476 try { 477 lease = fs.leaseSourceFolder(srcKey); 478 } catch (AzureException e) { 479 480 // If the source folder was not found then somebody probably 481 // raced with us and finished the rename first, or the 482 // first rename failed right before deleting the rename pending 483 // file. 484 String errorCode = ""; 485 try { 486 StorageException se = (StorageException) e.getCause(); 487 errorCode = se.getErrorCode(); 488 } catch (Exception e2) { 489 ; // do nothing -- could not get errorCode 490 } 491 if (errorCode.equals("BlobNotFound")) { 492 sourceFolderGone = true; 493 } else { 494 throw new IOException( 495 "Unexpected error when trying to lease source folder name during " 496 + "folder rename redo", 497 e); 498 } 499 } 500 501 if (!sourceFolderGone) { 502 // Make sure the target folder exists. 503 Path dst = fullPath(dstKey); 504 if (!fs.exists(dst)) { 505 fs.mkdirs(dst); 506 } 507 508 // For each file inside the folder to be renamed, 509 // make sure it has been renamed. 
510 for(String fileName : fileStrings) { 511 finishSingleFileRename(fileName); 512 } 513 514 // Remove the source folder. Don't check explicitly if it exists, 515 // to avoid triggering redo recursively. 516 try { 517 fs.getStoreInterface().delete(srcKey, lease); 518 } catch (Exception e) { 519 LOG.info("Unable to delete source folder during folder rename redo. " 520 + "If the source folder is already gone, this is not an error " 521 + "condition. Continuing with redo.", e); 522 } 523 524 // Update the last-modified time of the parent folders of both source 525 // and destination. 526 fs.updateParentFolderLastModifiedTime(srcKey); 527 fs.updateParentFolderLastModifiedTime(dstKey); 528 } 529 530 // Remove the -RenamePending.json file. 531 fs.delete(getRenamePendingFilePath(), false); 532 } 533 534 // See if the source file is still there, and if it is, rename it. 535 private void finishSingleFileRename(String fileName) 536 throws IOException { 537 Path srcFile = fullPath(srcKey, fileName); 538 Path dstFile = fullPath(dstKey, fileName); 539 boolean srcExists = fs.exists(srcFile); 540 boolean dstExists = fs.exists(dstFile); 541 if (srcExists && !dstExists) { 542 543 // Rename gets exclusive access (via a lease) for HBase write-ahead log 544 // (WAL) file processing correctness. See the rename code for details. 545 String srcName = fs.pathToKey(srcFile); 546 String dstName = fs.pathToKey(dstFile); 547 fs.getStoreInterface().rename(srcName, dstName, true, null); 548 } else if (srcExists && dstExists) { 549 550 // Get a lease on source to block write access. 551 String srcName = fs.pathToKey(srcFile); 552 SelfRenewingLease lease = fs.acquireLease(srcFile); 553 554 // Delete the file. This will free the lease too. 555 fs.getStoreInterface().delete(srcName, lease); 556 } else if (!srcExists && dstExists) { 557 558 // The rename already finished, so do nothing. 
559 ; 560 } else { 561 throw new IOException( 562 "Attempting to complete rename of file " + srcKey + "/" + fileName 563 + " during folder rename redo, and file was not found in source " 564 + "or destination."); 565 } 566 } 567 568 // Return an absolute path for the specific fileName within the folder 569 // specified by folderKey. 570 private Path fullPath(String folderKey, String fileName) { 571 return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName); 572 } 573 574 private Path fullPath(String fileKey) { 575 return new Path(new Path(fs.getUri()), "/" + fileKey); 576 } 577 } 578 579 private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]"; 580 private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN = 581 Pattern.compile("\\[\\[\\.\\]\\](?=$|/)"); 582 private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)"); 583 584 @Override 585 public String getScheme() { 586 return "wasb"; 587 } 588 589 590 /** 591 * <p> 592 * A {@link FileSystem} for reading and writing files stored on <a 593 * href="http://store.azure.com/">Windows Azure</a>. This implementation is 594 * blob-based and stores files on Azure in their native form so they can be read 595 * by other Azure tools. This implementation uses HTTPS for secure network communication. 596 * </p> 597 */ 598 public static class Secure extends NativeAzureFileSystem { 599 @Override 600 public String getScheme() { 601 return "wasbs"; 602 } 603 } 604 605 public static final Log LOG = LogFactory.getLog(NativeAzureFileSystem.class); 606 607 static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; 608 /** 609 * The time span in seconds before which we consider a temp blob to be 610 * dangling (not being actively uploaded to) and up for reclamation. 611 * 612 * So e.g. if this is 60, then any temporary blobs more than a minute old 613 * would be considered dangling. 
614 */ 615 static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds"; 616 private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600; 617 static final String PATH_DELIMITER = Path.SEPARATOR; 618 static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$"; 619 620 private static final int AZURE_LIST_ALL = -1; 621 private static final int AZURE_UNBOUNDED_DEPTH = -1; 622 623 private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L; 624 625 /** 626 * The configuration property that determines which group owns files created 627 * in WASB. 628 */ 629 private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup"; 630 /** 631 * The default value for fs.azure.permissions.supergroup. Chosen as the same 632 * default as DFS. 633 */ 634 static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup"; 635 636 static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = 637 "fs.azure.block.location.impersonatedhost"; 638 private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = 639 "localhost"; 640 static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME = 641 "fs.azure.ring.buffer.capacity"; 642 static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME = 643 "fs.azure.output.stream.buffer.size"; 644 645 public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics"; 646 647 private class NativeAzureFsInputStream extends FSInputStream { 648 private InputStream in; 649 private final String key; 650 private long pos = 0; 651 private boolean closed = false; 652 private boolean isPageBlob; 653 654 // File length, valid only for streams over block blobs. 
655 private long fileLength; 656 657 public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) { 658 this.in = in; 659 this.key = key; 660 this.isPageBlob = store.isPageBlobKey(key); 661 this.fileLength = fileLength; 662 } 663 664 /** 665 * Return the size of the remaining available bytes 666 * if the size is less than or equal to {@link Integer#MAX_VALUE}, 667 * otherwise, return {@link Integer#MAX_VALUE}. 668 * 669 * This is to match the behavior of DFSInputStream.available(), 670 * which some clients may rely on (HBase write-ahead log reading in 671 * particular). 672 */ 673 @Override 674 public synchronized int available() throws IOException { 675 if (isPageBlob) { 676 return in.available(); 677 } else { 678 if (closed) { 679 throw new IOException("Stream closed"); 680 } 681 final long remaining = this.fileLength - pos; 682 return remaining <= Integer.MAX_VALUE ? 683 (int) remaining : Integer.MAX_VALUE; 684 } 685 } 686 687 /* 688 * Reads the next byte of data from the input stream. The value byte is 689 * returned as an integer in the range 0 to 255. If no byte is available 690 * because the end of the stream has been reached, the value -1 is returned. 691 * This method blocks until input data is available, the end of the stream 692 * is detected, or an exception is thrown. 693 * 694 * @returns int An integer corresponding to the byte read. 695 */ 696 @Override 697 public synchronized int read() throws IOException { 698 int result = 0; 699 result = in.read(); 700 if (result != -1) { 701 pos++; 702 if (statistics != null) { 703 statistics.incrementBytesRead(1); 704 } 705 } 706 707 // Return to the caller with the result. 708 // 709 return result; 710 } 711 712 /* 713 * Reads up to len bytes of data from the input stream into an array of 714 * bytes. An attempt is made to read as many as len bytes, but a smaller 715 * number may be read. The number of bytes actually read is returned as an 716 * integer. 
This method blocks until input data is available, end of file is 717 * detected, or an exception is thrown. If len is zero, then no bytes are 718 * read and 0 is returned; otherwise, there is an attempt to read at least 719 * one byte. If no byte is available because the stream is at end of file, 720 * the value -1 is returned; otherwise, at least one byte is read and stored 721 * into b. 722 * 723 * @param b -- the buffer into which data is read 724 * 725 * @param off -- the start offset in the array b at which data is written 726 * 727 * @param len -- the maximum number of bytes read 728 * 729 * @ returns int The total number of byes read into the buffer, or -1 if 730 * there is no more data because the end of stream is reached. 731 */ 732 @Override 733 public synchronized int read(byte[] b, int off, int len) throws IOException { 734 int result = 0; 735 result = in.read(b, off, len); 736 if (result > 0) { 737 pos += result; 738 } 739 740 if (null != statistics) { 741 statistics.incrementBytesRead(result); 742 } 743 744 // Return to the caller with the result. 745 return result; 746 } 747 748 @Override 749 public void close() throws IOException { 750 in.close(); 751 closed = true; 752 } 753 754 @Override 755 public synchronized void seek(long pos) throws IOException { 756 in.close(); 757 in = store.retrieve(key); 758 this.pos = in.skip(pos); 759 if (LOG.isDebugEnabled()) { 760 LOG.debug(String.format("Seek to position %d. Bytes skipped %d", pos, 761 this.pos)); 762 } 763 } 764 765 @Override 766 public synchronized long getPos() throws IOException { 767 return pos; 768 } 769 770 @Override 771 public boolean seekToNewSource(long targetPos) throws IOException { 772 return false; 773 } 774 } 775 776 private class NativeAzureFsOutputStream extends OutputStream { 777 // We should not override flush() to actually close current block and flush 778 // to DFS, this will break applications that assume flush() is a no-op. 
779 // Applications are advised to use Syncable.hflush() for that purpose. 780 // NativeAzureFsOutputStream needs to implement Syncable if needed. 781 private String key; 782 private String keyEncoded; 783 private OutputStream out; 784 785 public NativeAzureFsOutputStream(OutputStream out, String aKey, 786 String anEncodedKey) throws IOException { 787 // Check input arguments. The output stream should be non-null and the 788 // keys 789 // should be valid strings. 790 if (null == out) { 791 throw new IllegalArgumentException( 792 "Illegal argument: the output stream is null."); 793 } 794 795 if (null == aKey || 0 == aKey.length()) { 796 throw new IllegalArgumentException( 797 "Illegal argument the key string is null or empty"); 798 } 799 800 if (null == anEncodedKey || 0 == anEncodedKey.length()) { 801 throw new IllegalArgumentException( 802 "Illegal argument the encoded key string is null or empty"); 803 } 804 805 // Initialize the member variables with the incoming parameters. 806 this.out = out; 807 808 setKey(aKey); 809 setEncodedKey(anEncodedKey); 810 } 811 812 @Override 813 public synchronized void close() throws IOException { 814 if (out != null) { 815 // Close the output stream and decode the key for the output stream 816 // before returning to the caller. 817 // 818 out.close(); 819 restoreKey(); 820 out = null; 821 } 822 } 823 824 /** 825 * Writes the specified byte to this output stream. The general contract for 826 * write is that one byte is written to the output stream. The byte to be 827 * written is the eight low-order bits of the argument b. The 24 high-order 828 * bits of b are ignored. 829 * 830 * @param b 831 * 32-bit integer of block of 4 bytes 832 */ 833 @Override 834 public void write(int b) throws IOException { 835 out.write(b); 836 } 837 838 /** 839 * Writes b.length bytes from the specified byte array to this output 840 * stream. 
The general contract for write(b) is that it should have exactly 841 * the same effect as the call write(b, 0, b.length). 842 * 843 * @param b 844 * Block of bytes to be written to the output stream. 845 */ 846 @Override 847 public void write(byte[] b) throws IOException { 848 out.write(b); 849 } 850 851 /** 852 * Writes <code>len</code> from the specified byte array starting at offset 853 * <code>off</code> to the output stream. The general contract for write(b, 854 * off, len) is that some of the bytes in the array <code> 855 * b</code b> are written to the output stream in order; element 856 * <code>b[off]</code> is the first byte written and 857 * <code>b[off+len-1]</code> is the last byte written by this operation. 858 * 859 * @param b 860 * Byte array to be written. 861 * @param off 862 * Write this offset in stream. 863 * @param len 864 * Number of bytes to be written. 865 */ 866 @Override 867 public void write(byte[] b, int off, int len) throws IOException { 868 out.write(b, off, len); 869 } 870 871 /** 872 * Get the blob name. 873 * 874 * @return String Blob name. 875 */ 876 public String getKey() { 877 return key; 878 } 879 880 /** 881 * Set the blob name. 882 * 883 * @param key 884 * Blob name. 885 */ 886 public void setKey(String key) { 887 this.key = key; 888 } 889 890 /** 891 * Get the blob name. 892 * 893 * @return String Blob name. 894 */ 895 public String getEncodedKey() { 896 return keyEncoded; 897 } 898 899 /** 900 * Set the blob name. 901 * 902 * @param anEncodedKey 903 * Blob name. 904 */ 905 public void setEncodedKey(String anEncodedKey) { 906 this.keyEncoded = anEncodedKey; 907 } 908 909 /** 910 * Restore the original key name from the m_key member variable. Note: The 911 * output file stream is created with an encoded blob store key to guarantee 912 * load balancing on the front end of the Azure storage partition servers. 913 * The create also includes the name of the original key value which is 914 * stored in the m_key member variable. 
This method should only be called 915 * when the stream is closed. 916 */ 917 private void restoreKey() throws IOException { 918 store.rename(getEncodedKey(), getKey()); 919 } 920 } 921 922 private URI uri; 923 private NativeFileSystemStore store; 924 private AzureNativeFileSystemStore actualStore; 925 private Path workingDir; 926 private long blockSize = MAX_AZURE_BLOCK_SIZE; 927 private AzureFileSystemInstrumentation instrumentation; 928 private String metricsSourceName; 929 private boolean isClosed = false; 930 private static boolean suppressRetryPolicy = false; 931 // A counter to create unique (within-process) names for my metrics sources. 932 private static AtomicInteger metricsSourceNameCounter = new AtomicInteger(); 933 934 935 public NativeAzureFileSystem() { 936 // set store in initialize() 937 } 938 939 public NativeAzureFileSystem(NativeFileSystemStore store) { 940 this.store = store; 941 } 942 943 /** 944 * Suppress the default retry policy for the Storage, useful in unit tests to 945 * test negative cases without waiting forever. 946 */ 947 @VisibleForTesting 948 static void suppressRetryPolicy() { 949 suppressRetryPolicy = true; 950 } 951 952 /** 953 * Undo the effect of suppressRetryPolicy. 954 */ 955 @VisibleForTesting 956 static void resumeRetryPolicy() { 957 suppressRetryPolicy = false; 958 } 959 960 /** 961 * Creates a new metrics source name that's unique within this process. 962 */ 963 @VisibleForTesting 964 public static String newMetricsSourceName() { 965 int number = metricsSourceNameCounter.incrementAndGet(); 966 final String baseName = "AzureFileSystemMetrics"; 967 if (number == 1) { // No need for a suffix for the first one 968 return baseName; 969 } else { 970 return baseName + number; 971 } 972 } 973 974 /** 975 * Checks if the given URI scheme is a scheme that's affiliated with the Azure 976 * File System. 977 * 978 * @param scheme 979 * The URI scheme. 980 * @return true iff it's an Azure File System URI scheme. 
981 */ 982 private static boolean isWasbScheme(String scheme) { 983 // The valid schemes are: asv (old name), asvs (old name over HTTPS), 984 // wasb (new name), wasbs (new name over HTTPS). 985 return scheme != null 986 && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs") 987 || scheme.equalsIgnoreCase("wasb") || scheme 988 .equalsIgnoreCase("wasbs")); 989 } 990 991 /** 992 * Puts in the authority of the default file system if it is a WASB file 993 * system and the given URI's authority is null. 994 * 995 * @return The URI with reconstructed authority if necessary and possible. 996 */ 997 private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) { 998 if (null == uri.getAuthority()) { 999 // If WASB is the default file system, get the authority from there 1000 URI defaultUri = FileSystem.getDefaultUri(conf); 1001 if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) { 1002 try { 1003 // Reconstruct the URI with the authority from the default URI. 1004 return new URI(uri.getScheme(), defaultUri.getAuthority(), 1005 uri.getPath(), uri.getQuery(), uri.getFragment()); 1006 } catch (URISyntaxException e) { 1007 // This should never happen. 1008 throw new Error("Bad URI construction", e); 1009 } 1010 } 1011 } 1012 return uri; 1013 } 1014 1015 @Override 1016 protected void checkPath(Path path) { 1017 // Make sure to reconstruct the path's authority if needed 1018 super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(), 1019 getConf()))); 1020 } 1021 1022 @Override 1023 public void initialize(URI uri, Configuration conf) 1024 throws IOException, IllegalArgumentException { 1025 // Check authority for the URI to guarantee that it is non-null. 
1026 uri = reconstructAuthorityIfNeeded(uri, conf); 1027 if (null == uri.getAuthority()) { 1028 final String errMsg = String 1029 .format("Cannot initialize WASB file system, URI authority not recognized."); 1030 throw new IllegalArgumentException(errMsg); 1031 } 1032 super.initialize(uri, conf); 1033 1034 if (store == null) { 1035 store = createDefaultStore(conf); 1036 } 1037 1038 instrumentation = new AzureFileSystemInstrumentation(conf); 1039 if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { 1040 // Make sure the metrics system is available before interacting with Azure 1041 AzureFileSystemMetricsSystem.fileSystemStarted(); 1042 metricsSourceName = newMetricsSourceName(); 1043 String sourceDesc = "Azure Storage Volume File System metrics"; 1044 AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc, 1045 instrumentation); 1046 } 1047 1048 store.initialize(uri, conf, instrumentation); 1049 setConf(conf); 1050 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 1051 this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() 1052 .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); 1053 this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, 1054 MAX_AZURE_BLOCK_SIZE); 1055 1056 if (LOG.isDebugEnabled()) { 1057 LOG.debug("NativeAzureFileSystem. Initializing."); 1058 LOG.debug(" blockSize = " 1059 + conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); 1060 } 1061 } 1062 1063 private NativeFileSystemStore createDefaultStore(Configuration conf) { 1064 actualStore = new AzureNativeFileSystemStore(); 1065 1066 if (suppressRetryPolicy) { 1067 actualStore.suppressRetryPolicy(); 1068 } 1069 return actualStore; 1070 } 1071 1072 /** 1073 * Azure Storage doesn't allow the blob names to end in a period, 1074 * so encode this here to work around that limitation. 
1075 */ 1076 private static String encodeTrailingPeriod(String toEncode) { 1077 Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode); 1078 return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER); 1079 } 1080 1081 /** 1082 * Reverse the encoding done by encodeTrailingPeriod(). 1083 */ 1084 private static String decodeTrailingPeriod(String toDecode) { 1085 Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode); 1086 return matcher.replaceAll("."); 1087 } 1088 1089 /** 1090 * Convert the path to a key. By convention, any leading or trailing slash is 1091 * removed, except for the special case of a single slash. 1092 */ 1093 @VisibleForTesting 1094 public String pathToKey(Path path) { 1095 // Convert the path to a URI to parse the scheme, the authority, and the 1096 // path from the path object. 1097 URI tmpUri = path.toUri(); 1098 String pathUri = tmpUri.getPath(); 1099 1100 // The scheme and authority is valid. If the path does not exist add a "/" 1101 // separator to list the root of the container. 1102 Path newPath = path; 1103 if ("".equals(pathUri)) { 1104 newPath = new Path(tmpUri.toString() + Path.SEPARATOR); 1105 } 1106 1107 // Verify path is absolute if the path refers to a windows drive scheme. 1108 if (!newPath.isAbsolute()) { 1109 throw new IllegalArgumentException("Path must be absolute: " + path); 1110 } 1111 1112 String key = null; 1113 key = newPath.toUri().getPath(); 1114 key = removeTrailingSlash(key); 1115 key = encodeTrailingPeriod(key); 1116 if (key.length() == 1) { 1117 return key; 1118 } else { 1119 return key.substring(1); // remove initial slash 1120 } 1121 } 1122 1123 // Remove any trailing slash except for the case of a single slash. 
1124 private static String removeTrailingSlash(String key) { 1125 if (key.length() == 0 || key.length() == 1) { 1126 return key; 1127 } 1128 if (key.charAt(key.length() - 1) == '/') { 1129 return key.substring(0, key.length() - 1); 1130 } else { 1131 return key; 1132 } 1133 } 1134 1135 private static Path keyToPath(String key) { 1136 if (key.equals("/")) { 1137 return new Path("/"); // container 1138 } 1139 return new Path("/" + decodeTrailingPeriod(key)); 1140 } 1141 1142 /** 1143 * Get the absolute version of the path (fully qualified). 1144 * This is public for testing purposes. 1145 * 1146 * @param path 1147 * @return fully qualified path 1148 */ 1149 @VisibleForTesting 1150 public Path makeAbsolute(Path path) { 1151 if (path.isAbsolute()) { 1152 return path; 1153 } 1154 return new Path(workingDir, path); 1155 } 1156 1157 /** 1158 * For unit test purposes, retrieves the AzureNativeFileSystemStore store 1159 * backing this file system. 1160 * 1161 * @return The store object. 1162 */ 1163 @VisibleForTesting 1164 public AzureNativeFileSystemStore getStore() { 1165 return actualStore; 1166 } 1167 1168 NativeFileSystemStore getStoreInterface() { 1169 return store; 1170 } 1171 1172 /** 1173 * Gets the metrics source for this file system. 1174 * This is mainly here for unit testing purposes. 1175 * 1176 * @return the metrics source. 1177 */ 1178 public AzureFileSystemInstrumentation getInstrumentation() { 1179 return instrumentation; 1180 } 1181 1182 /** This optional operation is not yet supported. 
*/ 1183 @Override 1184 public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) 1185 throws IOException { 1186 throw new IOException("Not supported"); 1187 } 1188 1189 @Override 1190 public FSDataOutputStream create(Path f, FsPermission permission, 1191 boolean overwrite, int bufferSize, short replication, long blockSize, 1192 Progressable progress) throws IOException { 1193 return create(f, permission, overwrite, true, 1194 bufferSize, replication, blockSize, progress, 1195 (SelfRenewingLease) null); 1196 } 1197 1198 /** 1199 * Get a self-renewing lease on the specified file. 1200 */ 1201 public SelfRenewingLease acquireLease(Path path) throws AzureException { 1202 String fullKey = pathToKey(makeAbsolute(path)); 1203 return getStore().acquireLease(fullKey); 1204 } 1205 1206 @Override 1207 @SuppressWarnings("deprecation") 1208 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 1209 boolean overwrite, int bufferSize, short replication, long blockSize, 1210 Progressable progress) throws IOException { 1211 1212 Path parent = f.getParent(); 1213 1214 // Get exclusive access to folder if this is a directory designated 1215 // for atomic rename. The primary use case of for HBase write-ahead 1216 // log file management. 1217 SelfRenewingLease lease = null; 1218 if (store.isAtomicRenameKey(pathToKey(f))) { 1219 try { 1220 lease = acquireLease(parent); 1221 } catch (AzureException e) { 1222 1223 String errorCode = ""; 1224 try { 1225 StorageException e2 = (StorageException) e.getCause(); 1226 errorCode = e2.getErrorCode(); 1227 } catch (Exception e3) { 1228 // do nothing if cast fails 1229 } 1230 if (errorCode.equals("BlobNotFound")) { 1231 throw new FileNotFoundException("Cannot create file " + 1232 f.getName() + " because parent folder does not exist."); 1233 } 1234 1235 LOG.warn("Got unexpected exception trying to get lease on " 1236 + pathToKey(parent) + ". 
" + e.getMessage()); 1237 throw e; 1238 } 1239 } 1240 1241 // See if the parent folder exists. If not, throw error. 1242 // The exists() check will push any pending rename operation forward, 1243 // if there is one, and return false. 1244 // 1245 // At this point, we have exclusive access to the source folder 1246 // via the lease, so we will not conflict with an active folder 1247 // rename operation. 1248 if (!exists(parent)) { 1249 try { 1250 1251 // This'll let the keep-alive thread exit as soon as it wakes up. 1252 lease.free(); 1253 } catch (Exception e) { 1254 LOG.warn("Unable to free lease because: " + e.getMessage()); 1255 } 1256 throw new FileNotFoundException("Cannot create file " + 1257 f.getName() + " because parent folder does not exist."); 1258 } 1259 1260 // Create file inside folder. 1261 FSDataOutputStream out = null; 1262 try { 1263 out = create(f, permission, overwrite, false, 1264 bufferSize, replication, blockSize, progress, lease); 1265 } finally { 1266 // Release exclusive access to folder. 1267 try { 1268 if (lease != null) { 1269 lease.free(); 1270 } 1271 } catch (Exception e) { 1272 IOUtils.cleanup(LOG, out); 1273 String msg = "Unable to free lease on " + parent.toUri(); 1274 LOG.error(msg); 1275 throw new IOException(msg, e); 1276 } 1277 } 1278 return out; 1279 } 1280 1281 @Override 1282 @SuppressWarnings("deprecation") 1283 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 1284 EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, 1285 Progressable progress) throws IOException { 1286 1287 // Check if file should be appended or overwritten. Assume that the file 1288 // is overwritten on if the CREATE and OVERWRITE create flags are set. Note 1289 // that any other combinations of create flags will result in an open new or 1290 // open with append. 
1291 final EnumSet<CreateFlag> createflags = 1292 EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); 1293 boolean overwrite = flags.containsAll(createflags); 1294 1295 // Delegate the create non-recursive call. 1296 return this.createNonRecursive(f, permission, overwrite, 1297 bufferSize, replication, blockSize, progress); 1298 } 1299 1300 @Override 1301 @SuppressWarnings("deprecation") 1302 public FSDataOutputStream createNonRecursive(Path f, 1303 boolean overwrite, int bufferSize, short replication, long blockSize, 1304 Progressable progress) throws IOException { 1305 return this.createNonRecursive(f, FsPermission.getFileDefault(), 1306 overwrite, bufferSize, replication, blockSize, progress); 1307 } 1308 1309 1310 /** 1311 * Create an Azure blob and return an output stream to use 1312 * to write data to it. 1313 * 1314 * @param f 1315 * @param permission 1316 * @param overwrite 1317 * @param createParent 1318 * @param bufferSize 1319 * @param replication 1320 * @param blockSize 1321 * @param progress 1322 * @param parentFolderLease Lease on parent folder (or null if 1323 * no lease). 
* @return The stream to write the new blob's data to. For block blobs it
   *         wraps a stream opened on the encoded key together with the real
   *         key; page blobs are written directly under the real key.
   * @throws IOException
   *           If {@code f} contains a colon, already exists as a directory,
   *           already exists while {@code overwrite} is false, or a store
   *           operation fails.
   */
  private FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, boolean createParent, int bufferSize,
      short replication, long blockSize, Progressable progress,
      SelfRenewingLease parentFolderLease)
      throws IOException {
    // NOTE(review): createParent is never read below; a parent folder that is
    // not an explicit directory is always (re)created via mkdirs(). Confirm
    // this is intended for the non-recursive callers.

    if (LOG.isDebugEnabled()) {
      LOG.debug("Creating file: " + f.toString());
    }

    if (containsColon(f)) {
      throw new IOException("Cannot create file " + f
          + " through WASB that has colons in the name");
    }

    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    // Refuse to replace a directory, and refuse to replace a file unless
    // overwrite was requested.
    FileMetadata existingMetadata = store.retrieveMetadata(key);
    if (existingMetadata != null) {
      if (existingMetadata.isDir()) {
        throw new IOException("Cannot create file " + f
            + "; already exists as a directory.");
      }
      if (!overwrite) {
        throw new IOException("File already exists:" + f);
      }
    }

    Path parentFolder = absolutePath.getParent();
    if (parentFolder != null && parentFolder.getParent() != null) { // skip root
      // Update the parent folder last modified time if the parent folder
      // already exists.
      String parentKey = pathToKey(parentFolder);
      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
      if (parentMetadata != null && parentMetadata.isDir() &&
          parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
        // With a lease held on the parent, the update must go through it.
        if (parentFolderLease != null) {
          store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
        } else {
          updateParentFolderLastModifiedTime(key);
        }
      } else {
        // Make sure that the parent folder exists.
        // Create it using inherited permissions from the first existing directory going up the path
        Path firstExisting = parentFolder.getParent();
        FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting));
        while(metadata == null) {
          // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata
          // NOTE(review): relies on the store returning non-null metadata for
          // the root key; otherwise firstExisting would become null - confirm.
          firstExisting = firstExisting.getParent();
          metadata = store.retrieveMetadata(pathToKey(firstExisting));
        }
        // noUmask = true: reuse the ancestor's permission without the umask.
        mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true);
      }
    }

    // Mask the permission first (with the default permission mask as well).
    FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile);
    PermissionStatus permissionStatus = createPermissionStatus(masked);

    OutputStream bufOutStream;
    if (store.isPageBlobKey(key)) {
      // Store page blobs directly in-place without renames.
      bufOutStream = store.storefile(key, permissionStatus);
    } else {
      // This is a block blob, so open the output blob stream based on the
      // encoded key.
      //
      String keyEncoded = encodeKey(key);


      // First create a blob at the real key, pointing back to the temporary file
      // This accomplishes a few things:
      // 1. Makes sure we can create a file there.
      // 2. Makes it visible to other concurrent threads/processes/nodes what
      // we're
      // doing.
      // 3. Makes it easier to restore/cleanup data in the event of us crashing.
      store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);

      // The key is encoded to point to a common container at the storage server.
      // This reduces the number of splits on the server side when load balancing.
      // Ingress to Azure storage can take advantage of earlier splits. We remove
      // the root path to the key and prefix a random GUID to the tail (or leaf
      // filename) of the key. Keys are thus broadly and randomly distributed over
      // a single container to ease load balancing on the storage server. When the
      // blob is committed it is renamed to its earlier key. Uncommitted blocks
      // are not cleaned up and we leave it to Azure storage to garbage collect
      // these
      // blocks.
      bufOutStream = new NativeAzureFsOutputStream(store.storefile(
          keyEncoded, permissionStatus), key, keyEncoded);
    }
    // Construct the data output stream from the buffered output stream.
    FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);


    // Increment the counter
    instrumentation.fileCreated();

    // Return data output stream to caller.
    return fsOut;
  }

  /**
   * Deletes the path recursively; equivalent to delete(path, true).
   */
  @Override
  @Deprecated
  public boolean delete(Path path) throws IOException {
    return delete(path, true);
  }

  /**
   * Deletes the path, updating the parent folder's last modified time;
   * equivalent to delete(f, recursive, false).
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    return delete(f, recursive, false);
  }

  /**
   * Delete the specified file or folder. The parameter
   * skipParentFolderLastModifidedTimeUpdate
   * is used in the case of atomic folder rename redo. In that case, there is
   * a lease on the parent folder, so (without reworking the code) modifying
   * the parent folder update time will fail because of a conflict with the
   * lease. Since the folder is going to be deleted soon anyway, an accurate
   * modified time is not necessary, so it's easier to just skip
   * the modified time update.
   *
   * @param f
   *          The file or folder to delete.
   * @param recursive
   *          Whether a non-empty folder may be deleted.
   * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last
   * modified time.
1456 * @return true if and only if the file is deleted 1457 * @throws IOException 1458 */ 1459 public boolean delete(Path f, boolean recursive, 1460 boolean skipParentFolderLastModifidedTimeUpdate) throws IOException { 1461 1462 if (LOG.isDebugEnabled()) { 1463 LOG.debug("Deleting file: " + f.toString()); 1464 } 1465 1466 Path absolutePath = makeAbsolute(f); 1467 String key = pathToKey(absolutePath); 1468 1469 // Capture the metadata for the path. 1470 // 1471 FileMetadata metaFile = store.retrieveMetadata(key); 1472 1473 if (null == metaFile) { 1474 // The path to be deleted does not exist. 1475 return false; 1476 } 1477 1478 // The path exists, determine if it is a folder containing objects, 1479 // an empty folder, or a simple file and take the appropriate actions. 1480 if (!metaFile.isDir()) { 1481 // The path specifies a file. We need to check the parent path 1482 // to make sure it's a proper materialized directory before we 1483 // delete the file. Otherwise we may get into a situation where 1484 // the file we were deleting was the last one in an implicit directory 1485 // (e.g. the blob store only contains the blob a/b and there's no 1486 // corresponding directory blob a) and that would implicitly delete 1487 // the directory as well, which is not correct. 1488 Path parentPath = absolutePath.getParent(); 1489 if (parentPath.getParent() != null) {// Not root 1490 String parentKey = pathToKey(parentPath); 1491 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 1492 if (!parentMetadata.isDir()) { 1493 // Invalid state: the parent path is actually a file. Throw. 1494 throw new AzureException("File " + f + " has a parent directory " 1495 + parentPath + " which is also a file. Can't resolve."); 1496 } 1497 if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 1498 if (LOG.isDebugEnabled()) { 1499 LOG.debug("Found an implicit parent directory while trying to" 1500 + " delete the file " + f + ". 
Creating the directory blob for" 1501 + " it in " + parentKey + "."); 1502 } 1503 store.storeEmptyFolder(parentKey, 1504 createPermissionStatus(FsPermission.getDefault())); 1505 } else { 1506 if (!skipParentFolderLastModifidedTimeUpdate) { 1507 updateParentFolderLastModifiedTime(key); 1508 } 1509 } 1510 } 1511 store.delete(key); 1512 instrumentation.fileDeleted(); 1513 } else { 1514 // The path specifies a folder. Recursively delete all entries under the 1515 // folder. 1516 Path parentPath = absolutePath.getParent(); 1517 if (parentPath.getParent() != null) { 1518 String parentKey = pathToKey(parentPath); 1519 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 1520 1521 if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 1522 if (LOG.isDebugEnabled()) { 1523 LOG.debug("Found an implicit parent directory while trying to" 1524 + " delete the directory " + f 1525 + ". Creating the directory blob for" + " it in " + parentKey 1526 + "."); 1527 } 1528 store.storeEmptyFolder(parentKey, 1529 createPermissionStatus(FsPermission.getDefault())); 1530 } 1531 } 1532 1533 // List all the blobs in the current folder. 1534 String priorLastKey = null; 1535 PartialListing listing = store.listAll(key, AZURE_LIST_ALL, 1, 1536 priorLastKey); 1537 FileMetadata[] contents = listing.getFiles(); 1538 if (!recursive && contents.length > 0) { 1539 // The folder is non-empty and recursive delete was not specified. 1540 // Throw an exception indicating that a non-recursive delete was 1541 // specified for a non-empty folder. 1542 throw new IOException("Non-recursive delete of non-empty directory " 1543 + f.toString()); 1544 } 1545 1546 // Delete all the files in the folder. 1547 for (FileMetadata p : contents) { 1548 // Tag on the directory name found as the suffix of the suffix of the 1549 // parent directory to get the new absolute path. 
1550 String suffix = p.getKey().substring( 1551 p.getKey().lastIndexOf(PATH_DELIMITER)); 1552 if (!p.isDir()) { 1553 store.delete(key + suffix); 1554 instrumentation.fileDeleted(); 1555 } else { 1556 // Recursively delete contents of the sub-folders. Notice this also 1557 // deletes the blob for the directory. 1558 if (!delete(new Path(f.toString() + suffix), true)) { 1559 return false; 1560 } 1561 } 1562 } 1563 store.delete(key); 1564 1565 // Update parent directory last modified time 1566 Path parent = absolutePath.getParent(); 1567 if (parent != null && parent.getParent() != null) { // not root 1568 if (!skipParentFolderLastModifidedTimeUpdate) { 1569 updateParentFolderLastModifiedTime(key); 1570 } 1571 } 1572 instrumentation.directoryDeleted(); 1573 } 1574 1575 // File or directory was successfully deleted. 1576 return true; 1577 } 1578 1579 @Override 1580 public FileStatus getFileStatus(Path f) throws IOException { 1581 1582 if (LOG.isDebugEnabled()) { 1583 LOG.debug("Getting the file status for " + f.toString()); 1584 } 1585 1586 // Capture the absolute path and the path to key. 1587 Path absolutePath = makeAbsolute(f); 1588 String key = pathToKey(absolutePath); 1589 if (key.length() == 0) { // root always exists 1590 return newDirectory(null, absolutePath); 1591 } 1592 1593 // The path is either a folder or a file. Retrieve metadata to 1594 // determine if it is a directory or file. 1595 FileMetadata meta = store.retrieveMetadata(key); 1596 if (meta != null) { 1597 if (meta.isDir()) { 1598 // The path is a folder with files in it. 1599 // 1600 if (LOG.isDebugEnabled()) { 1601 LOG.debug("Path " + f.toString() + "is a folder."); 1602 } 1603 1604 // If a rename operation for the folder was pending, redo it. 1605 // Then the file does not exist, so signal that. 1606 if (conditionalRedoFolderRename(f)) { 1607 throw new FileNotFoundException( 1608 absolutePath + ": No such file or directory."); 1609 } 1610 1611 // Return reference to the directory object. 
1612 return newDirectory(meta, absolutePath); 1613 } 1614 1615 // The path is a file. 1616 if (LOG.isDebugEnabled()) { 1617 LOG.debug("Found the path: " + f.toString() + " as a file."); 1618 } 1619 1620 // Return with reference to a file object. 1621 return newFile(meta, absolutePath); 1622 } 1623 1624 // File not found. Throw exception no such file or directory. 1625 // 1626 throw new FileNotFoundException( 1627 absolutePath + ": No such file or directory."); 1628 } 1629 1630 // Return true if there is a rename pending and we redo it, otherwise false. 1631 private boolean conditionalRedoFolderRename(Path f) throws IOException { 1632 1633 // Can't rename /, so return immediately in that case. 1634 if (f.getName().equals("")) { 1635 return false; 1636 } 1637 1638 // Check if there is a -RenamePending.json file for this folder, and if so, 1639 // redo the rename. 1640 Path absoluteRenamePendingFile = renamePendingFilePath(f); 1641 if (exists(absoluteRenamePendingFile)) { 1642 FolderRenamePending pending = 1643 new FolderRenamePending(absoluteRenamePendingFile, this); 1644 pending.redo(); 1645 return true; 1646 } else { 1647 return false; 1648 } 1649 } 1650 1651 // Return the path name that would be used for rename of folder with path f. 1652 private Path renamePendingFilePath(Path f) { 1653 Path absPath = makeAbsolute(f); 1654 String key = pathToKey(absPath); 1655 key += "-RenamePending.json"; 1656 return keyToPath(key); 1657 } 1658 1659 @Override 1660 public URI getUri() { 1661 return uri; 1662 } 1663 1664 /** 1665 * Retrieve the status of a given path if it is a file, or of all the 1666 * contained files if it is a directory. 
1667 */ 1668 @Override 1669 public FileStatus[] listStatus(Path f) throws IOException { 1670 1671 if (LOG.isDebugEnabled()) { 1672 LOG.debug("Listing status for " + f.toString()); 1673 } 1674 1675 Path absolutePath = makeAbsolute(f); 1676 String key = pathToKey(absolutePath); 1677 Set<FileStatus> status = new TreeSet<FileStatus>(); 1678 FileMetadata meta = store.retrieveMetadata(key); 1679 1680 if (meta != null) { 1681 if (!meta.isDir()) { 1682 if (LOG.isDebugEnabled()) { 1683 LOG.debug("Found path as a file"); 1684 } 1685 return new FileStatus[] { newFile(meta, absolutePath) }; 1686 } 1687 String partialKey = null; 1688 PartialListing listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); 1689 1690 // For any -RenamePending.json files in the listing, 1691 // push the rename forward. 1692 boolean renamed = conditionalRedoFolderRenames(listing); 1693 1694 // If any renames were redone, get another listing, 1695 // since the current one may have changed due to the redo. 1696 if (renamed) { 1697 listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); 1698 } 1699 1700 for (FileMetadata fileMetadata : listing.getFiles()) { 1701 Path subpath = keyToPath(fileMetadata.getKey()); 1702 1703 // Test whether the metadata represents a file or directory and 1704 // add the appropriate metadata object. 1705 // 1706 // Note: There was a very old bug here where directories were added 1707 // to the status set as files flattening out recursive listings 1708 // using "-lsr" down the file system hierarchy. 1709 if (fileMetadata.isDir()) { 1710 // Make sure we hide the temp upload folder 1711 if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) { 1712 // Don't expose that. 
1713 continue; 1714 } 1715 status.add(newDirectory(fileMetadata, subpath)); 1716 } else { 1717 status.add(newFile(fileMetadata, subpath)); 1718 } 1719 } 1720 if (LOG.isDebugEnabled()) { 1721 LOG.debug("Found path as a directory with " + status.size() 1722 + " files in it."); 1723 } 1724 } else { 1725 // There is no metadata found for the path. 1726 if (LOG.isDebugEnabled()) { 1727 LOG.debug("Did not find any metadata for path: " + key); 1728 } 1729 1730 throw new FileNotFoundException("File" + f + " does not exist."); 1731 } 1732 1733 return status.toArray(new FileStatus[0]); 1734 } 1735 1736 // Redo any folder renames needed if there are rename pending files in the 1737 // directory listing. Return true if one or more redo operations were done. 1738 private boolean conditionalRedoFolderRenames(PartialListing listing) 1739 throws IllegalArgumentException, IOException { 1740 boolean renamed = false; 1741 for (FileMetadata fileMetadata : listing.getFiles()) { 1742 Path subpath = keyToPath(fileMetadata.getKey()); 1743 if (isRenamePendingFile(subpath)) { 1744 FolderRenamePending pending = 1745 new FolderRenamePending(subpath, this); 1746 pending.redo(); 1747 renamed = true; 1748 } 1749 } 1750 return renamed; 1751 } 1752 1753 // True if this is a folder rename pending file, else false. 1754 private boolean isRenamePendingFile(Path path) { 1755 return path.toString().endsWith(FolderRenamePending.SUFFIX); 1756 } 1757 1758 private FileStatus newFile(FileMetadata meta, Path path) { 1759 return new FileStatus ( 1760 meta.getLength(), 1761 false, 1762 1, 1763 blockSize, 1764 meta.getLastModified(), 1765 0, 1766 meta.getPermissionStatus().getPermission(), 1767 meta.getPermissionStatus().getUserName(), 1768 meta.getPermissionStatus().getGroupName(), 1769 path.makeQualified(getUri(), getWorkingDirectory())); 1770 } 1771 1772 private FileStatus newDirectory(FileMetadata meta, Path path) { 1773 return new FileStatus ( 1774 0, 1775 true, 1776 1, 1777 blockSize, 1778 meta == null ? 
0 : meta.getLastModified(), 1779 0, 1780 meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(), 1781 meta == null ? "" : meta.getPermissionStatus().getUserName(), 1782 meta == null ? "" : meta.getPermissionStatus().getGroupName(), 1783 path.makeQualified(getUri(), getWorkingDirectory())); 1784 } 1785 1786 private static enum UMaskApplyMode { 1787 NewFile, 1788 NewDirectory, 1789 NewDirectoryNoUmask, 1790 ChangeExistingFile, 1791 ChangeExistingDirectory, 1792 } 1793 1794 /** 1795 * Applies the applicable UMASK's on the given permission. 1796 * 1797 * @param permission 1798 * The permission to mask. 1799 * @param applyMode 1800 * Whether to also apply the default umask. 1801 * @return The masked persmission. 1802 */ 1803 private FsPermission applyUMask(final FsPermission permission, 1804 final UMaskApplyMode applyMode) { 1805 FsPermission newPermission = new FsPermission(permission); 1806 // Apply the default umask - this applies for new files or directories. 1807 if (applyMode == UMaskApplyMode.NewFile 1808 || applyMode == UMaskApplyMode.NewDirectory) { 1809 newPermission = newPermission 1810 .applyUMask(FsPermission.getUMask(getConf())); 1811 } 1812 return newPermission; 1813 } 1814 1815 /** 1816 * Creates the PermissionStatus object to use for the given permission, based 1817 * on the current user in context. 1818 * 1819 * @param permission 1820 * The permission for the file. 1821 * @return The permission status object to use. 
* @throws IOException
   *           If login fails in getCurrentUser
   */
  private PermissionStatus createPermissionStatus(FsPermission permission)
      throws IOException {
    // Create the permission status for this file based on current user
    // (short user name); the group comes from configuration, not the user.
    return new PermissionStatus(
        UserGroupInformation.getCurrentUser().getShortUserName(),
        getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
            AZURE_DEFAULT_GROUP_DEFAULT),
        permission);
  }

  /**
   * Creates the directory and any missing ancestors, applying the default
   * umask; equivalent to mkdirs(f, permission, false).
   */
  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return mkdirs(f, permission, false);
  }

  /**
   * Creates the directory and any missing ancestors.
   *
   * @param f
   *          The directory to create; resolved against the working directory.
   * @param permission
   *          Permission for every folder blob created by this call.
   * @param noUmask
   *          If true, the default umask is not applied, and the owner's
   *          write/execute bits (USER_WX_PERMISION) are OR-ed in.
   * @return true; failures are reported by throwing.
   * @throws IOException
   *           If {@code f} contains a colon, or any path in the chain is an
   *           existing file.
   */
  public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Creating directory: " + f.toString());
    }

    if (containsColon(f)) {
      throw new IOException("Cannot create directory " + f
          + " through WASB that has colons in the name");
    }

    Path absolutePath = makeAbsolute(f);
    PermissionStatus permissionStatus = null;
    if(noUmask) {
      // ensure owner still has wx permissions at the minimum
      permissionStatus = createPermissionStatus(
          applyUMask(FsPermission.createImmutable((short) (permission.toShort() | USER_WX_PERMISION)),
              UMaskApplyMode.NewDirectoryNoUmask));
    } else {
      permissionStatus = createPermissionStatus(
          applyUMask(permission, UMaskApplyMode.NewDirectory));
    }


    ArrayList<String> keysToCreateAsFolder = new ArrayList<String>();
    // NOTE(review): keysToUpdateAsFolder is filled in the loop below but never
    // read afterwards, so existing ancestors' last-modified times are not
    // updated despite the comment in the loop - confirm intent.
    ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>();
    boolean childCreated = false;
    // Check that there is no file in the parent chain of the given path.
    // The walk goes from the leaf upward and excludes the root itself.
    for (Path current = absolutePath, parent = current.getParent();
        parent != null; // Stop when you get to the root
        current = parent, parent = current.getParent()) {
      String currentKey = pathToKey(current);
      FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
      if (currentMetadata != null && !currentMetadata.isDir()) {
        throw new IOException("Cannot create directory " + f + " because " +
            current + " is an existing file.");
      } else if (currentMetadata == null) {
        keysToCreateAsFolder.add(currentKey);
        childCreated = true;
      } else {
        // The directory already exists. Its last modified time need to be
        // updated if there is a child directory created under it.
        if (childCreated) {
          keysToUpdateAsFolder.add(currentKey);
        }
        childCreated = false;
      }
    }

    // Folder blobs are stored in leaf-to-root order (the order collected).
    for (String currentKey : keysToCreateAsFolder) {
      store.storeEmptyFolder(currentKey, permissionStatus);
    }

    // Counted once per call, even if every folder already existed.
    instrumentation.directoryCreated();

    // otherwise throws exception
    return true;
  }

  /**
   * Opens a file for reading through a buffered input stream.
   *
   * @throws FileNotFoundException
   *           If the path does not exist or is a directory.
   */
  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Opening file: " + f.toString());
    }

    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    FileMetadata meta = store.retrieveMetadata(key);
    if (meta == null) {
      throw new FileNotFoundException(f.toString());
    }
    if (meta.isDir()) {
      throw new FileNotFoundException(f.toString()
          + " is a directory not a file.");
    }

    return new FSDataInputStream(new BufferedFSInputStream(
        new NativeAzureFsInputStream(store.retrieve(key), key, meta.getLength()), bufferSize));
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException {

    FolderRenamePending renamePending = null;

    if (LOG.isDebugEnabled()) {
      LOG.debug("Moving " + src + " to " + dst);
    }

    if (containsColon(dst)) {
      throw new IOException("Cannot rename to file " + dst
          + " through WASB that has colons in the name");
    }

    String srcKey = pathToKey(makeAbsolute(src));

    if (srcKey.length() == 0) {
      // Cannot rename root of file system
      return false;
    }

    // Figure out the final destination
    Path absoluteDst = makeAbsolute(dst);
    String dstKey = pathToKey(absoluteDst);
    FileMetadata dstMetadata = store.retrieveMetadata(dstKey);
    if (dstMetadata != null && dstMetadata.isDir()) {
      // It's an existing directory.
      dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Destination " + dst
            + " is a directory, adjusted the destination to be " + dstKey);
      }
    } else if (dstMetadata != null) {
      // Attempting to overwrite a file using rename()
      if (LOG.isDebugEnabled()) {
        LOG.debug("Destination " + dst
            + " is an already existing file, failing the rename.");
      }
      return false;
    } else {
      // Check that the parent directory exists.
      FileMetadata parentOfDestMetadata =
          store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
      if (parentOfDestMetadata == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Parent of the destination " + dst
              + " doesn't exist, failing the rename.");
        }
        return false;
      } else if (!parentOfDestMetadata.isDir()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Parent of the destination " + dst
              + " is a file, failing the rename.");
        }
        return false;
      }
    }
    FileMetadata srcMetadata = store.retrieveMetadata(srcKey);
    if (srcMetadata == null) {
      // Source doesn't exist
      if (LOG.isDebugEnabled()) {
        LOG.debug("Source " + src + " doesn't exist, failing the rename.");
      }
      return false;
    } else if (!srcMetadata.isDir()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Source " + src + " found as a file, renaming.");
      }
      store.rename(srcKey, dstKey);
    } else {

      // Prepare for, execute and clean up after of all files in folder, and
      // the root file, and update the last modified time of the source and
      // target parent folders. The operation can be redone if it fails part
      // way through, by applying the "Rename Pending" file.

      // The following code (internally) only does atomic rename preparation
      // and lease management for page blob folders, limiting the scope of the
      // operation to HBase log file folders, where atomic rename is required.
      // In the future, we could generalize it easily to all folders.
      renamePending = prepareAtomicFolderRename(srcKey, dstKey);
      renamePending.execute();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Renamed " + src + " to " + dst + " successfully.");
      }
      renamePending.cleanup();
      return true;
    }

    // Update the last-modified time of the parent folders of both source
    // and destination.
2010 updateParentFolderLastModifiedTime(srcKey); 2011 updateParentFolderLastModifiedTime(dstKey); 2012 2013 if (LOG.isDebugEnabled()) { 2014 LOG.debug("Renamed " + src + " to " + dst + " successfully."); 2015 } 2016 return true; 2017 } 2018 2019 /** 2020 * Update the last-modified time of the parent folder of the file 2021 * identified by key. 2022 * @param key 2023 * @throws IOException 2024 */ 2025 private void updateParentFolderLastModifiedTime(String key) 2026 throws IOException { 2027 Path parent = makeAbsolute(keyToPath(key)).getParent(); 2028 if (parent != null && parent.getParent() != null) { // not root 2029 String parentKey = pathToKey(parent); 2030 2031 // ensure the parent is a materialized folder 2032 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 2033 // The metadata could be null if the implicit folder only contains a 2034 // single file. In this case, the parent folder no longer exists if the 2035 // file is renamed; so we can safely ignore the null pointer case. 2036 if (parentMetadata != null) { 2037 if (parentMetadata.isDir() 2038 && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2039 store.storeEmptyFolder(parentKey, 2040 createPermissionStatus(FsPermission.getDefault())); 2041 } 2042 2043 if (store.isAtomicRenameKey(parentKey)) { 2044 SelfRenewingLease lease = null; 2045 try { 2046 lease = leaseSourceFolder(parentKey); 2047 store.updateFolderLastModifiedTime(parentKey, lease); 2048 } catch (AzureException e) { 2049 String errorCode = ""; 2050 try { 2051 StorageException e2 = (StorageException) e.getCause(); 2052 errorCode = e2.getErrorCode(); 2053 } catch (Exception e3) { 2054 // do nothing if cast fails 2055 } 2056 if (errorCode.equals("BlobNotFound")) { 2057 throw new FileNotFoundException("Folder does not exist: " + parentKey); 2058 } 2059 LOG.warn("Got unexpected exception trying to get lease on " 2060 + parentKey + ". 
" + e.getMessage()); 2061 throw e; 2062 } finally { 2063 try { 2064 if (lease != null) { 2065 lease.free(); 2066 } 2067 } catch (Exception e) { 2068 LOG.error("Unable to free lease on " + parentKey, e); 2069 } 2070 } 2071 } else { 2072 store.updateFolderLastModifiedTime(parentKey, null); 2073 } 2074 } 2075 } 2076 } 2077 2078 /** 2079 * If the source is a page blob folder, 2080 * prepare to rename this folder atomically. This means to get exclusive 2081 * access to the source folder, and record the actions to be performed for 2082 * this rename in a "Rename Pending" file. This code was designed to 2083 * meet the needs of HBase, which requires atomic rename of write-ahead log 2084 * (WAL) folders for correctness. 2085 * 2086 * Before calling this method, the caller must ensure that the source is a 2087 * folder. 2088 * 2089 * For non-page-blob directories, prepare the in-memory information needed, 2090 * but don't take the lease or write the redo file. This is done to limit the 2091 * scope of atomic folder rename to HBase, at least at the time of writing 2092 * this code. 2093 * 2094 * @param srcKey Source folder name. 2095 * @param dstKey Destination folder name. 2096 * @throws IOException 2097 */ 2098 private FolderRenamePending prepareAtomicFolderRename( 2099 String srcKey, String dstKey) throws IOException { 2100 2101 if (store.isAtomicRenameKey(srcKey)) { 2102 2103 // Block unwanted concurrent access to source folder. 2104 SelfRenewingLease lease = leaseSourceFolder(srcKey); 2105 2106 // Prepare in-memory information needed to do or redo a folder rename. 2107 FolderRenamePending renamePending = 2108 new FolderRenamePending(srcKey, dstKey, lease, this); 2109 2110 // Save it to persistent storage to help recover if the operation fails. 
2111 renamePending.writeFile(this); 2112 return renamePending; 2113 } else { 2114 FolderRenamePending renamePending = 2115 new FolderRenamePending(srcKey, dstKey, null, this); 2116 return renamePending; 2117 } 2118 } 2119 2120 /** 2121 * Get a self-renewing Azure blob lease on the source folder zero-byte file. 2122 */ 2123 private SelfRenewingLease leaseSourceFolder(String srcKey) 2124 throws AzureException { 2125 return store.acquireLease(srcKey); 2126 } 2127 2128 /** 2129 * Return an array containing hostnames, offset and size of 2130 * portions of the given file. For WASB we'll just lie and give 2131 * fake hosts to make sure we get many splits in MR jobs. 2132 */ 2133 @Override 2134 public BlockLocation[] getFileBlockLocations(FileStatus file, 2135 long start, long len) throws IOException { 2136 if (file == null) { 2137 return null; 2138 } 2139 2140 if ((start < 0) || (len < 0)) { 2141 throw new IllegalArgumentException("Invalid start or len parameter"); 2142 } 2143 2144 if (file.getLen() < start) { 2145 return new BlockLocation[0]; 2146 } 2147 final String blobLocationHost = getConf().get( 2148 AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, 2149 AZURE_BLOCK_LOCATION_HOST_DEFAULT); 2150 final String[] name = { blobLocationHost }; 2151 final String[] host = { blobLocationHost }; 2152 long blockSize = file.getBlockSize(); 2153 if (blockSize <= 0) { 2154 throw new IllegalArgumentException( 2155 "The block size for the given file is not a positive number: " 2156 + blockSize); 2157 } 2158 int numberOfLocations = (int) (len / blockSize) 2159 + ((len % blockSize == 0) ? 
0 : 1); 2160 BlockLocation[] locations = new BlockLocation[numberOfLocations]; 2161 for (int i = 0; i < locations.length; i++) { 2162 long currentOffset = start + (i * blockSize); 2163 long currentLength = Math.min(blockSize, start + len - currentOffset); 2164 locations[i] = new BlockLocation(name, host, currentOffset, currentLength); 2165 } 2166 return locations; 2167 } 2168 2169 /** 2170 * Set the working directory to the given directory. 2171 */ 2172 @Override 2173 public void setWorkingDirectory(Path newDir) { 2174 workingDir = makeAbsolute(newDir); 2175 } 2176 2177 @Override 2178 public Path getWorkingDirectory() { 2179 return workingDir; 2180 } 2181 2182 @Override 2183 public void setPermission(Path p, FsPermission permission) throws IOException { 2184 Path absolutePath = makeAbsolute(p); 2185 String key = pathToKey(absolutePath); 2186 FileMetadata metadata = store.retrieveMetadata(key); 2187 if (metadata == null) { 2188 throw new FileNotFoundException("File doesn't exist: " + p); 2189 } 2190 permission = applyUMask(permission, 2191 metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory 2192 : UMaskApplyMode.ChangeExistingFile); 2193 if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2194 // It's an implicit folder, need to materialize it. 2195 store.storeEmptyFolder(key, createPermissionStatus(permission)); 2196 } else if (!metadata.getPermissionStatus().getPermission(). 
2197 equals(permission)) { 2198 store.changePermissionStatus(key, new PermissionStatus( 2199 metadata.getPermissionStatus().getUserName(), 2200 metadata.getPermissionStatus().getGroupName(), 2201 permission)); 2202 } 2203 } 2204 2205 @Override 2206 public void setOwner(Path p, String username, String groupname) 2207 throws IOException { 2208 Path absolutePath = makeAbsolute(p); 2209 String key = pathToKey(absolutePath); 2210 FileMetadata metadata = store.retrieveMetadata(key); 2211 if (metadata == null) { 2212 throw new FileNotFoundException("File doesn't exist: " + p); 2213 } 2214 PermissionStatus newPermissionStatus = new PermissionStatus( 2215 username == null ? 2216 metadata.getPermissionStatus().getUserName() : username, 2217 groupname == null ? 2218 metadata.getPermissionStatus().getGroupName() : groupname, 2219 metadata.getPermissionStatus().getPermission()); 2220 if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2221 // It's an implicit folder, need to materialize it. 2222 store.storeEmptyFolder(key, newPermissionStatus); 2223 } else { 2224 store.changePermissionStatus(key, newPermissionStatus); 2225 } 2226 } 2227 2228 @Override 2229 public synchronized void close() throws IOException { 2230 if (isClosed) { 2231 return; 2232 } 2233 2234 // Call the base close() to close any resources there. 2235 super.close(); 2236 // Close the store to close any resources there - e.g. the bandwidth 2237 // updater thread would be stopped at this time. 2238 store.close(); 2239 // Notify the metrics system that this file system is closed, which may 2240 // trigger one final metrics push to get the accurate final file system 2241 // metrics out. 
2242 2243 long startTime = System.currentTimeMillis(); 2244 2245 if(!getConf().getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { 2246 AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName); 2247 AzureFileSystemMetricsSystem.fileSystemClosed(); 2248 } 2249 2250 if (LOG.isDebugEnabled()) { 2251 LOG.debug("Submitting metrics when file system closed took " 2252 + (System.currentTimeMillis() - startTime) + " ms."); 2253 } 2254 isClosed = true; 2255 } 2256 2257 /** 2258 * A handler that defines what to do with blobs whose upload was 2259 * interrupted. 2260 */ 2261 private abstract class DanglingFileHandler { 2262 abstract void handleFile(FileMetadata file, FileMetadata tempFile) 2263 throws IOException; 2264 } 2265 2266 /** 2267 * Handler implementation for just deleting dangling files and cleaning 2268 * them up. 2269 */ 2270 private class DanglingFileDeleter extends DanglingFileHandler { 2271 @Override 2272 void handleFile(FileMetadata file, FileMetadata tempFile) 2273 throws IOException { 2274 if (LOG.isDebugEnabled()) { 2275 LOG.debug("Deleting dangling file " + file.getKey()); 2276 } 2277 store.delete(file.getKey()); 2278 store.delete(tempFile.getKey()); 2279 } 2280 } 2281 2282 /** 2283 * Handler implementation for just moving dangling files to recovery 2284 * location (/lost+found). 
2285 */ 2286 private class DanglingFileRecoverer extends DanglingFileHandler { 2287 private final Path destination; 2288 2289 DanglingFileRecoverer(Path destination) { 2290 this.destination = destination; 2291 } 2292 2293 @Override 2294 void handleFile(FileMetadata file, FileMetadata tempFile) 2295 throws IOException { 2296 if (LOG.isDebugEnabled()) { 2297 LOG.debug("Recovering " + file.getKey()); 2298 } 2299 // Move to the final destination 2300 String finalDestinationKey = 2301 pathToKey(new Path(destination, file.getKey())); 2302 store.rename(tempFile.getKey(), finalDestinationKey); 2303 if (!finalDestinationKey.equals(file.getKey())) { 2304 // Delete the empty link file now that we've restored it. 2305 store.delete(file.getKey()); 2306 } 2307 } 2308 } 2309 2310 /** 2311 * Check if a path has colons in its name 2312 */ 2313 private boolean containsColon(Path p) { 2314 return p.toUri().getPath().toString().contains(":"); 2315 } 2316 2317 /** 2318 * Implements recover and delete (-move and -delete) behaviors for handling 2319 * dangling files (blobs whose upload was interrupted). 2320 * 2321 * @param root 2322 * The root path to check from. 2323 * @param handler 2324 * The handler that deals with dangling files. 2325 */ 2326 private void handleFilesWithDanglingTempData(Path root, 2327 DanglingFileHandler handler) throws IOException { 2328 // Calculate the cut-off for when to consider a blob to be dangling. 2329 long cutoffForDangling = new Date().getTime() 2330 - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME, 2331 AZURE_TEMP_EXPIRY_DEFAULT) * 1000; 2332 // Go over all the blobs under the given root and look for blobs to 2333 // recover. 
2334 String priorLastKey = null; 2335 do { 2336 PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL, 2337 AZURE_UNBOUNDED_DEPTH, priorLastKey); 2338 2339 for (FileMetadata file : listing.getFiles()) { 2340 if (!file.isDir()) { // We don't recover directory blobs 2341 // See if this blob has a link in it (meaning it's a place-holder 2342 // blob for when the upload to the temp blob is complete). 2343 String link = store.getLinkInFileMetadata(file.getKey()); 2344 if (link != null) { 2345 // It has a link, see if the temp blob it is pointing to is 2346 // existent and old enough to be considered dangling. 2347 FileMetadata linkMetadata = store.retrieveMetadata(link); 2348 if (linkMetadata != null 2349 && linkMetadata.getLastModified() >= cutoffForDangling) { 2350 // Found one! 2351 handler.handleFile(file, linkMetadata); 2352 } 2353 } 2354 } 2355 } 2356 priorLastKey = listing.getPriorLastKey(); 2357 } while (priorLastKey != null); 2358 } 2359 2360 /** 2361 * Looks under the given root path for any blob that are left "dangling", 2362 * meaning that they are place-holder blobs that we created while we upload 2363 * the data to a temporary blob, but for some reason we crashed in the middle 2364 * of the upload and left them there. If any are found, we move them to the 2365 * destination given. 2366 * 2367 * @param root 2368 * The root path to consider. 2369 * @param destination 2370 * The destination path to move any recovered files to. 
2371 * @throws IOException 2372 */ 2373 public void recoverFilesWithDanglingTempData(Path root, Path destination) 2374 throws IOException { 2375 if (LOG.isDebugEnabled()) { 2376 LOG.debug("Recovering files with dangling temp data in " + root); 2377 } 2378 handleFilesWithDanglingTempData(root, 2379 new DanglingFileRecoverer(destination)); 2380 } 2381 2382 /** 2383 * Looks under the given root path for any blob that are left "dangling", 2384 * meaning that they are place-holder blobs that we created while we upload 2385 * the data to a temporary blob, but for some reason we crashed in the middle 2386 * of the upload and left them there. If any are found, we delete them. 2387 * 2388 * @param root 2389 * The root path to consider. 2390 * @throws IOException 2391 */ 2392 public void deleteFilesWithDanglingTempData(Path root) throws IOException { 2393 if (LOG.isDebugEnabled()) { 2394 LOG.debug("Deleting files with dangling temp data in " + root); 2395 } 2396 handleFilesWithDanglingTempData(root, new DanglingFileDeleter()); 2397 } 2398 2399 @Override 2400 protected void finalize() throws Throwable { 2401 LOG.debug("finalize() called."); 2402 close(); 2403 super.finalize(); 2404 } 2405 2406 /** 2407 * Encode the key with a random prefix for load balancing in Azure storage. 2408 * Upload data to a random temporary file then do storage side renaming to 2409 * recover the original key. 2410 * 2411 * @param aKey 2412 * @return Encoded version of the original key. 2413 */ 2414 private static String encodeKey(String aKey) { 2415 // Get the tail end of the key name. 2416 // 2417 String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1, 2418 aKey.length()); 2419 2420 // Construct the randomized prefix of the file name. The prefix ensures the 2421 // file always drops into the same folder but with a varying tail key name. 
2422 String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR 2423 + UUID.randomUUID().toString(); 2424 2425 // Concatenate the randomized prefix with the tail of the key name. 2426 String randomizedKey = filePrefix + fileName; 2427 2428 // Return to the caller with the randomized key. 2429 return randomizedKey; 2430 } 2431}