001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.fs.azure; 020 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.EOFException; 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStream; 027import java.io.OutputStream; 028import java.net.URI; 029import java.net.URISyntaxException; 030import java.nio.charset.Charset; 031import java.text.SimpleDateFormat; 032import java.util.ArrayList; 033import java.util.Date; 034import java.util.EnumSet; 035import java.util.Set; 036import java.util.TimeZone; 037import java.util.TreeSet; 038import java.util.UUID; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.regex.Matcher; 041import java.util.regex.Pattern; 042 043import org.apache.commons.lang.StringUtils; 044import org.apache.hadoop.classification.InterfaceAudience; 045import org.apache.hadoop.classification.InterfaceStability; 046import org.apache.hadoop.conf.Configuration; 047import org.apache.hadoop.fs.BlockLocation; 048import org.apache.hadoop.fs.BufferedFSInputStream; 049import org.apache.hadoop.fs.CreateFlag; 050import 
org.apache.hadoop.fs.FSDataInputStream; 051import org.apache.hadoop.fs.FSDataOutputStream; 052import org.apache.hadoop.fs.FSExceptionMessages; 053import org.apache.hadoop.fs.FSInputStream; 054import org.apache.hadoop.fs.FileAlreadyExistsException; 055import org.apache.hadoop.fs.FileStatus; 056import org.apache.hadoop.fs.FileSystem; 057import org.apache.hadoop.fs.Path; 058import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; 059import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; 060import org.apache.hadoop.fs.permission.FsPermission; 061import org.apache.hadoop.fs.permission.PermissionStatus; 062import org.apache.hadoop.fs.azure.AzureException; 063import org.apache.hadoop.security.UserGroupInformation; 064import org.apache.hadoop.util.Progressable; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067import org.codehaus.jackson.JsonNode; 068import org.codehaus.jackson.JsonParseException; 069import org.codehaus.jackson.JsonParser; 070import org.codehaus.jackson.map.JsonMappingException; 071import org.codehaus.jackson.map.ObjectMapper; 072 073import com.google.common.annotations.VisibleForTesting; 074import com.microsoft.azure.storage.StorageException; 075 076 077import org.apache.hadoop.io.IOUtils; 078 079/** 080 * A {@link FileSystem} for reading and writing files stored on <a 081 * href="http://store.azure.com/">Windows Azure</a>. This implementation is 082 * blob-based and stores files on Azure in their native form so they can be read 083 * by other Azure tools. 084 */ 085@InterfaceAudience.Public 086@InterfaceStability.Stable 087public class NativeAzureFileSystem extends FileSystem { 088 private static final int USER_WX_PERMISION = 0300; 089 /** 090 * A description of a folder rename operation, including the source and 091 * destination keys, and descriptions of the files in the source folder. 
092 */ 093 public static class FolderRenamePending { 094 private SelfRenewingLease folderLease; 095 private String srcKey; 096 private String dstKey; 097 private FileMetadata[] fileMetadata = null; // descriptions of source files 098 private ArrayList<String> fileStrings = null; 099 private NativeAzureFileSystem fs; 100 private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000; 101 private static final int FORMATTING_BUFFER = 10000; 102 private boolean committed; 103 public static final String SUFFIX = "-RenamePending.json"; 104 105 // Prepare in-memory information needed to do or redo a folder rename. 106 public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease, 107 NativeAzureFileSystem fs) throws IOException { 108 this.srcKey = srcKey; 109 this.dstKey = dstKey; 110 this.folderLease = lease; 111 this.fs = fs; 112 ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>(); 113 114 // List all the files in the folder. 115 String priorLastKey = null; 116 do { 117 PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL, 118 AZURE_UNBOUNDED_DEPTH, priorLastKey); 119 for(FileMetadata file : listing.getFiles()) { 120 fileMetadataList.add(file); 121 } 122 priorLastKey = listing.getPriorLastKey(); 123 } while (priorLastKey != null); 124 fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]); 125 this.committed = true; 126 } 127 128 // Prepare in-memory information needed to do or redo folder rename from 129 // a -RenamePending.json file read from storage. This constructor is to use during 130 // redo processing. 
131 public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs) 132 throws IllegalArgumentException, IOException { 133 134 this.fs = fs; 135 136 // open redo file 137 Path f = redoFile; 138 FSDataInputStream input = fs.open(f); 139 byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE]; 140 int l = input.read(bytes); 141 if (l <= 0) { 142 // Jira HADOOP-12678 -Handle empty rename pending metadata file during 143 // atomic rename in redo path. If during renamepending file is created 144 // but not written yet, then this means that rename operation 145 // has not started yet. So we should delete rename pending metadata file. 146 LOG.error("Deleting empty rename pending file " 147 + redoFile + " -- no data available"); 148 deleteRenamePendingFile(fs, redoFile); 149 return; 150 } 151 if (l == MAX_RENAME_PENDING_FILE_SIZE) { 152 throw new IOException( 153 "Error reading pending rename file contents -- " 154 + "maximum file size exceeded"); 155 } 156 String contents = new String(bytes, 0, l, Charset.forName("UTF-8")); 157 158 // parse the JSON 159 ObjectMapper objMapper = new ObjectMapper(); 160 objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); 161 JsonNode json = null; 162 try { 163 json = objMapper.readValue(contents, JsonNode.class); 164 this.committed = true; 165 } catch (JsonMappingException e) { 166 167 // The -RedoPending.json file is corrupted, so we assume it was 168 // not completely written 169 // and the redo operation did not commit. 
170 this.committed = false; 171 } catch (JsonParseException e) { 172 this.committed = false; 173 } catch (IOException e) { 174 this.committed = false; 175 } 176 177 if (!this.committed) { 178 LOG.error("Deleting corruped rename pending file {} \n {}", 179 redoFile, contents); 180 181 // delete the -RenamePending.json file 182 deleteRenamePendingFile(fs, redoFile); 183 return; 184 } 185 186 // initialize this object's fields 187 ArrayList<String> fileStrList = new ArrayList<String>(); 188 JsonNode oldFolderName = json.get("OldFolderName"); 189 JsonNode newFolderName = json.get("NewFolderName"); 190 if (oldFolderName == null || newFolderName == null) { 191 this.committed = false; 192 } else { 193 this.srcKey = oldFolderName.getTextValue(); 194 this.dstKey = newFolderName.getTextValue(); 195 if (this.srcKey == null || this.dstKey == null) { 196 this.committed = false; 197 } else { 198 JsonNode fileList = json.get("FileList"); 199 if (fileList == null) { 200 this.committed = false; 201 } else { 202 for (int i = 0; i < fileList.size(); i++) { 203 fileStrList.add(fileList.get(i).getTextValue()); 204 } 205 } 206 } 207 } 208 this.fileStrings = fileStrList; 209 } 210 211 public FileMetadata[] getFiles() { 212 return fileMetadata; 213 } 214 215 public SelfRenewingLease getFolderLease() { 216 return folderLease; 217 } 218 219 /** 220 * Deletes rename pending metadata file 221 * @param fs -- the file system 222 * @param redoFile - rename pending metadata file path 223 * @throws IOException - If deletion fails 224 */ 225 @VisibleForTesting 226 void deleteRenamePendingFile(FileSystem fs, Path redoFile) 227 throws IOException { 228 try { 229 fs.delete(redoFile, false); 230 } catch (IOException e) { 231 // If the rename metadata was not found then somebody probably 232 // raced with us and finished the delete first 233 Throwable t = e.getCause(); 234 if (t != null && t instanceof StorageException 235 && "BlobNotFound".equals(((StorageException) t).getErrorCode())) { 236 
LOG.warn("rename pending file " + redoFile + " is already deleted"); 237 } else { 238 throw e; 239 } 240 } 241 } 242 243 /** 244 * Write to disk the information needed to redo folder rename, 245 * in JSON format. The file name will be 246 * {@code wasb://<sourceFolderPrefix>/folderName-RenamePending.json} 247 * The file format will be: 248 * <pre>{@code 249 * { 250 * FormatVersion: "1.0", 251 * OperationTime: "<YYYY-MM-DD HH:MM:SS.MMM>", 252 * OldFolderName: "<key>", 253 * NewFolderName: "<key>", 254 * FileList: [ <string> , <string> , ... ] 255 * } 256 * 257 * Here's a sample: 258 * { 259 * FormatVersion: "1.0", 260 * OperationUTCTime: "2014-07-01 23:50:35.572", 261 * OldFolderName: "user/ehans/folderToRename", 262 * NewFolderName: "user/ehans/renamedFolder", 263 * FileList: [ 264 * "innerFile", 265 * "innerFile2" 266 * ] 267 * } }</pre> 268 * @throws IOException 269 */ 270 public void writeFile(FileSystem fs) throws IOException { 271 Path path = getRenamePendingFilePath(); 272 LOG.debug("Preparing to write atomic rename state to {}", path.toString()); 273 OutputStream output = null; 274 275 String contents = makeRenamePendingFileContents(); 276 277 // Write file. 278 try { 279 output = fs.create(path); 280 output.write(contents.getBytes(Charset.forName("UTF-8"))); 281 } catch (IOException e) { 282 throw new IOException("Unable to write RenamePending file for folder rename from " 283 + srcKey + " to " + dstKey, e); 284 } finally { 285 NativeAzureFileSystemHelper.cleanup(LOG, output); 286 } 287 } 288 289 /** 290 * Return the contents of the JSON file to represent the operations 291 * to be performed for a folder rename. 
292 */ 293 public String makeRenamePendingFileContents() { 294 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 295 sdf.setTimeZone(TimeZone.getTimeZone("UTC")); 296 String time = sdf.format(new Date()); 297 298 // Make file list string 299 StringBuilder builder = new StringBuilder(); 300 builder.append("[\n"); 301 for (int i = 0; i != fileMetadata.length; i++) { 302 if (i > 0) { 303 builder.append(",\n"); 304 } 305 builder.append(" "); 306 String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/"); 307 308 // Quote string file names, escaping any possible " characters or other 309 // necessary characters in the name. 310 builder.append(quote(noPrefix)); 311 if (builder.length() >= 312 MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) { 313 314 // Give up now to avoid using too much memory. 315 LOG.error("Internal error: Exceeded maximum rename pending file size of {} bytes.", 316 MAX_RENAME_PENDING_FILE_SIZE); 317 318 // return some bad JSON with an error message to make it human readable 319 return "exceeded maximum rename pending file size"; 320 } 321 } 322 builder.append("\n ]"); 323 String fileList = builder.toString(); 324 325 // Make file contents as a string. Again, quote file names, escaping 326 // characters as appropriate. 327 String contents = "{\n" 328 + " FormatVersion: \"1.0\",\n" 329 + " OperationUTCTime: \"" + time + "\",\n" 330 + " OldFolderName: " + quote(srcKey) + ",\n" 331 + " NewFolderName: " + quote(dstKey) + ",\n" 332 + " FileList: " + fileList + "\n" 333 + "}\n"; 334 335 return contents; 336 } 337 338 /** 339 * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 340 * method. 341 * 342 * Produce a string in double quotes with backslash sequences in all the 343 * right places. A backslash will be inserted within </, allowing JSON 344 * text to be delivered in HTML. In JSON text, a string cannot contain a 345 * control character or an unescaped quote or backslash. 
346 * @param string A String 347 * @return A String correctly formatted for insertion in a JSON text. 348 */ 349 private String quote(String string) { 350 if (string == null || string.length() == 0) { 351 return "\"\""; 352 } 353 354 char c = 0; 355 int i; 356 int len = string.length(); 357 StringBuilder sb = new StringBuilder(len + 4); 358 String t; 359 360 sb.append('"'); 361 for (i = 0; i < len; i += 1) { 362 c = string.charAt(i); 363 switch (c) { 364 case '\\': 365 case '"': 366 sb.append('\\'); 367 sb.append(c); 368 break; 369 case '/': 370 sb.append('\\'); 371 sb.append(c); 372 break; 373 case '\b': 374 sb.append("\\b"); 375 break; 376 case '\t': 377 sb.append("\\t"); 378 break; 379 case '\n': 380 sb.append("\\n"); 381 break; 382 case '\f': 383 sb.append("\\f"); 384 break; 385 case '\r': 386 sb.append("\\r"); 387 break; 388 default: 389 if (c < ' ') { 390 t = "000" + Integer.toHexString(c); 391 sb.append("\\u" + t.substring(t.length() - 4)); 392 } else { 393 sb.append(c); 394 } 395 } 396 } 397 sb.append('"'); 398 return sb.toString(); 399 } 400 401 public String getSrcKey() { 402 return srcKey; 403 } 404 405 public String getDstKey() { 406 return dstKey; 407 } 408 409 public FileMetadata getSourceMetadata() throws IOException { 410 return fs.getStoreInterface().retrieveMetadata(srcKey); 411 } 412 413 /** 414 * Execute a folder rename. This is the execution path followed 415 * when everything is working normally. See redo() for the alternate 416 * execution path for the case where we're recovering from a folder rename 417 * failure. 418 * @throws IOException 419 */ 420 public void execute() throws IOException { 421 422 for (FileMetadata file : this.getFiles()) { 423 424 // Rename all materialized entries under the folder to point to the 425 // final destination. 
426 if (file.getBlobMaterialization() == BlobMaterialization.Explicit) { 427 String srcName = file.getKey(); 428 String suffix = srcName.substring((this.getSrcKey()).length()); 429 String dstName = this.getDstKey() + suffix; 430 431 // Rename gets exclusive access (via a lease) for files 432 // designated for atomic rename. 433 // The main use case is for HBase write-ahead log (WAL) and data 434 // folder processing correctness. See the rename code for details. 435 boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName); 436 fs.getStoreInterface().rename(srcName, dstName, acquireLease, null); 437 } 438 } 439 440 // Rename the source folder 0-byte root file itself. 441 FileMetadata srcMetadata2 = this.getSourceMetadata(); 442 if (srcMetadata2.getBlobMaterialization() == 443 BlobMaterialization.Explicit) { 444 445 // It already has a lease on it from the "prepare" phase so there's no 446 // need to get one now. Pass in existing lease to allow file delete. 447 fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(), 448 false, folderLease); 449 } 450 451 // Update the last-modified time of the parent folders of both source and 452 // destination. 453 fs.updateParentFolderLastModifiedTime(srcKey); 454 fs.updateParentFolderLastModifiedTime(dstKey); 455 } 456 457 /** Clean up after execution of rename. 458 * @throws IOException */ 459 public void cleanup() throws IOException { 460 461 if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) { 462 463 // Remove RenamePending file 464 fs.delete(getRenamePendingFilePath(), false); 465 466 // Freeing source folder lease is not necessary since the source 467 // folder file was deleted. 
468 } 469 } 470 471 private Path getRenamePendingFilePath() { 472 String fileName = srcKey + SUFFIX; 473 Path fileNamePath = keyToPath(fileName); 474 Path path = fs.makeAbsolute(fileNamePath); 475 return path; 476 } 477 478 /** 479 * Recover from a folder rename failure by redoing the intended work, 480 * as recorded in the -RenamePending.json file. 481 * 482 * @throws IOException 483 */ 484 public void redo() throws IOException { 485 486 if (!committed) { 487 488 // Nothing to do. The -RedoPending.json file should have already been 489 // deleted. 490 return; 491 } 492 493 // Try to get a lease on source folder to block concurrent access to it. 494 // It may fail if the folder is already gone. We don't check if the 495 // source exists explicitly because that could recursively trigger redo 496 // and give an infinite recursion. 497 SelfRenewingLease lease = null; 498 boolean sourceFolderGone = false; 499 try { 500 lease = fs.leaseSourceFolder(srcKey); 501 } catch (AzureException e) { 502 503 // If the source folder was not found then somebody probably 504 // raced with us and finished the rename first, or the 505 // first rename failed right before deleting the rename pending 506 // file. 507 String errorCode = ""; 508 try { 509 StorageException se = (StorageException) e.getCause(); 510 errorCode = se.getErrorCode(); 511 } catch (Exception e2) { 512 ; // do nothing -- could not get errorCode 513 } 514 if (errorCode.equals("BlobNotFound")) { 515 sourceFolderGone = true; 516 } else { 517 throw new IOException( 518 "Unexpected error when trying to lease source folder name during " 519 + "folder rename redo", 520 e); 521 } 522 } 523 524 if (!sourceFolderGone) { 525 // Make sure the target folder exists. 526 Path dst = fullPath(dstKey); 527 if (!fs.exists(dst)) { 528 fs.mkdirs(dst); 529 } 530 531 // For each file inside the folder to be renamed, 532 // make sure it has been renamed. 
533 for(String fileName : fileStrings) { 534 finishSingleFileRename(fileName); 535 } 536 537 // Remove the source folder. Don't check explicitly if it exists, 538 // to avoid triggering redo recursively. 539 try { 540 fs.getStoreInterface().delete(srcKey, lease); 541 } catch (Exception e) { 542 LOG.info("Unable to delete source folder during folder rename redo. " 543 + "If the source folder is already gone, this is not an error " 544 + "condition. Continuing with redo.", e); 545 } 546 547 // Update the last-modified time of the parent folders of both source 548 // and destination. 549 fs.updateParentFolderLastModifiedTime(srcKey); 550 fs.updateParentFolderLastModifiedTime(dstKey); 551 } 552 553 // Remove the -RenamePending.json file. 554 fs.delete(getRenamePendingFilePath(), false); 555 } 556 557 // See if the source file is still there, and if it is, rename it. 558 private void finishSingleFileRename(String fileName) 559 throws IOException { 560 Path srcFile = fullPath(srcKey, fileName); 561 Path dstFile = fullPath(dstKey, fileName); 562 String srcName = fs.pathToKey(srcFile); 563 String dstName = fs.pathToKey(dstFile); 564 boolean srcExists = fs.getStoreInterface().explicitFileExists(srcName); 565 boolean dstExists = fs.getStoreInterface().explicitFileExists(dstName); 566 if(srcExists) { 567 // Rename gets exclusive access (via a lease) for HBase write-ahead log 568 // (WAL) file processing correctness. See the rename code for details. 569 fs.getStoreInterface().rename(srcName, dstName, true, null); 570 } else if (!srcExists && dstExists) { 571 // The rename already finished, so do nothing. 572 ; 573 } else { 574 throw new IOException( 575 "Attempting to complete rename of file " + srcKey + "/" + fileName 576 + " during folder rename redo, and file was not found in source " 577 + "or destination."); 578 } 579 } 580 581 // Return an absolute path for the specific fileName within the folder 582 // specified by folderKey. 
583 private Path fullPath(String folderKey, String fileName) { 584 return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName); 585 } 586 587 private Path fullPath(String fileKey) { 588 return new Path(new Path(fs.getUri()), "/" + fileKey); 589 } 590 } 591 592 private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]"; 593 private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN = 594 Pattern.compile("\\[\\[\\.\\]\\](?=$|/)"); 595 private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)"); 596 597 @Override 598 public String getScheme() { 599 return "wasb"; 600 } 601 602 603 /** 604 * <p> 605 * A {@link FileSystem} for reading and writing files stored on <a 606 * href="http://store.azure.com/">Windows Azure</a>. This implementation is 607 * blob-based and stores files on Azure in their native form so they can be read 608 * by other Azure tools. This implementation uses HTTPS for secure network communication. 609 * </p> 610 */ 611 public static class Secure extends NativeAzureFileSystem { 612 @Override 613 public String getScheme() { 614 return "wasbs"; 615 } 616 } 617 618 public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class); 619 620 static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; 621 /** 622 * The time span in seconds before which we consider a temp blob to be 623 * dangling (not being actively uploaded to) and up for reclamation. 624 * 625 * So e.g. if this is 60, then any temporary blobs more than a minute old 626 * would be considered dangling. 
627 */ 628 static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds"; 629 private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600; 630 static final String PATH_DELIMITER = Path.SEPARATOR; 631 static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$"; 632 633 private static final int AZURE_LIST_ALL = -1; 634 private static final int AZURE_UNBOUNDED_DEPTH = -1; 635 636 private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L; 637 638 /** 639 * The configuration property that determines which group owns files created 640 * in WASB. 641 */ 642 private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup"; 643 /** 644 * The default value for fs.azure.permissions.supergroup. Chosen as the same 645 * default as DFS. 646 */ 647 static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup"; 648 649 static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = 650 "fs.azure.block.location.impersonatedhost"; 651 private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = 652 "localhost"; 653 static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME = 654 "fs.azure.ring.buffer.capacity"; 655 static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME = 656 "fs.azure.output.stream.buffer.size"; 657 658 public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics"; 659 660 /* 661 * Property to enable Append API. 662 */ 663 public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support"; 664 665 private class NativeAzureFsInputStream extends FSInputStream { 666 private InputStream in; 667 private final String key; 668 private long pos = 0; 669 private boolean closed = false; 670 private boolean isPageBlob; 671 672 // File length, valid only for streams over block blobs. 
673 private long fileLength; 674 675 public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) { 676 this.in = in; 677 this.key = key; 678 this.isPageBlob = store.isPageBlobKey(key); 679 this.fileLength = fileLength; 680 } 681 682 /** 683 * Return the size of the remaining available bytes 684 * if the size is less than or equal to {@link Integer#MAX_VALUE}, 685 * otherwise, return {@link Integer#MAX_VALUE}. 686 * 687 * This is to match the behavior of DFSInputStream.available(), 688 * which some clients may rely on (HBase write-ahead log reading in 689 * particular). 690 */ 691 @Override 692 public synchronized int available() throws IOException { 693 if (isPageBlob) { 694 return in.available(); 695 } else { 696 if (closed) { 697 throw new IOException("Stream closed"); 698 } 699 final long remaining = this.fileLength - pos; 700 return remaining <= Integer.MAX_VALUE ? 701 (int) remaining : Integer.MAX_VALUE; 702 } 703 } 704 705 /* 706 * Reads the next byte of data from the input stream. The value byte is 707 * returned as an integer in the range 0 to 255. If no byte is available 708 * because the end of the stream has been reached, the value -1 is returned. 709 * This method blocks until input data is available, the end of the stream 710 * is detected, or an exception is thrown. 711 * 712 * @returns int An integer corresponding to the byte read. 713 */ 714 @Override 715 public synchronized int read() throws FileNotFoundException, IOException { 716 try { 717 int result = 0; 718 result = in.read(); 719 if (result != -1) { 720 pos++; 721 if (statistics != null) { 722 statistics.incrementBytesRead(1); 723 } 724 } 725 // Return to the caller with the result. 
726 // 727 return result; 728 } catch(IOException e) { 729 730 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 731 732 if (innerException instanceof StorageException) { 733 734 LOG.error("Encountered Storage Exception for read on Blob : {}" 735 + " Exception details: {} Error Code : {}", 736 key, e, ((StorageException) innerException).getErrorCode()); 737 738 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 739 throw new FileNotFoundException(String.format("%s is not found", key)); 740 } 741 } 742 743 throw e; 744 } 745 } 746 747 /* 748 * Reads up to len bytes of data from the input stream into an array of 749 * bytes. An attempt is made to read as many as len bytes, but a smaller 750 * number may be read. The number of bytes actually read is returned as an 751 * integer. This method blocks until input data is available, end of file is 752 * detected, or an exception is thrown. If len is zero, then no bytes are 753 * read and 0 is returned; otherwise, there is an attempt to read at least 754 * one byte. If no byte is available because the stream is at end of file, 755 * the value -1 is returned; otherwise, at least one byte is read and stored 756 * into b. 757 * 758 * @param b -- the buffer into which data is read 759 * 760 * @param off -- the start offset in the array b at which data is written 761 * 762 * @param len -- the maximum number of bytes read 763 * 764 * @ returns int The total number of byes read into the buffer, or -1 if 765 * there is no more data because the end of stream is reached. 766 */ 767 @Override 768 public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException, IOException { 769 try { 770 int result = 0; 771 result = in.read(b, off, len); 772 if (result > 0) { 773 pos += result; 774 } 775 776 if (null != statistics) { 777 statistics.incrementBytesRead(result); 778 } 779 780 // Return to the caller with the result. 
781 return result; 782 } catch(IOException e) { 783 784 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 785 786 if (innerException instanceof StorageException) { 787 788 LOG.error("Encountered Storage Exception for read on Blob : {}" 789 + " Exception details: {} Error Code : {}", 790 key, e, ((StorageException) innerException).getErrorCode()); 791 792 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 793 throw new FileNotFoundException(String.format("%s is not found", key)); 794 } 795 } 796 797 throw e; 798 } 799 } 800 801 @Override 802 public synchronized void close() throws IOException { 803 if (!closed) { 804 closed = true; 805 IOUtils.closeStream(in); 806 in = null; 807 } 808 } 809 810 @Override 811 public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException { 812 try { 813 checkNotClosed(); 814 if (pos < 0) { 815 throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); 816 } 817 IOUtils.closeStream(in); 818 in = store.retrieve(key); 819 this.pos = in.skip(pos); 820 LOG.debug("Seek to position {}. Bytes skipped {}", pos, 821 this.pos); 822 } catch(IOException e) { 823 824 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); 825 826 if (innerException instanceof StorageException 827 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 828 throw new FileNotFoundException(String.format("%s is not found", key)); 829 } 830 831 throw e; 832 } catch(IndexOutOfBoundsException e) { 833 throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); 834 } 835 } 836 837 @Override 838 public synchronized long getPos() throws IOException { 839 return pos; 840 } 841 842 @Override 843 public boolean seekToNewSource(long targetPos) throws IOException { 844 return false; 845 } 846 847 848 /* 849 * Helper method to check if a stream is closed. 
850 */ 851 private void checkNotClosed() throws IOException { 852 if (closed) { 853 throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); 854 } 855 } 856 } 857 858 private class NativeAzureFsOutputStream extends OutputStream { 859 // We should not override flush() to actually close current block and flush 860 // to DFS, this will break applications that assume flush() is a no-op. 861 // Applications are advised to use Syncable.hflush() for that purpose. 862 // NativeAzureFsOutputStream needs to implement Syncable if needed. 863 private String key; 864 private String keyEncoded; 865 private OutputStream out; 866 867 public NativeAzureFsOutputStream(OutputStream out, String aKey, 868 String anEncodedKey) throws IOException { 869 // Check input arguments. The output stream should be non-null and the 870 // keys 871 // should be valid strings. 872 if (null == out) { 873 throw new IllegalArgumentException( 874 "Illegal argument: the output stream is null."); 875 } 876 877 if (null == aKey || 0 == aKey.length()) { 878 throw new IllegalArgumentException( 879 "Illegal argument the key string is null or empty"); 880 } 881 882 if (null == anEncodedKey || 0 == anEncodedKey.length()) { 883 throw new IllegalArgumentException( 884 "Illegal argument the encoded key string is null or empty"); 885 } 886 887 // Initialize the member variables with the incoming parameters. 888 this.out = out; 889 890 setKey(aKey); 891 setEncodedKey(anEncodedKey); 892 } 893 894 @Override 895 public synchronized void close() throws IOException { 896 if (out != null) { 897 // Close the output stream and decode the key for the output stream 898 // before returning to the caller. 899 // 900 out.close(); 901 restoreKey(); 902 out = null; 903 } 904 } 905 906 /** 907 * Writes the specified byte to this output stream. The general contract for 908 * write is that one byte is written to the output stream. The byte to be 909 * written is the eight low-order bits of the argument b. 
The 24 high-order 910 * bits of b are ignored. 911 * 912 * @param b 913 * 32-bit integer of block of 4 bytes 914 */ 915 @Override 916 public void write(int b) throws IOException { 917 try { 918 out.write(b); 919 } catch(IOException e) { 920 if (e.getCause() instanceof StorageException) { 921 StorageException storageExcp = (StorageException) e.getCause(); 922 LOG.error("Encountered Storage Exception for write on Blob : {}" 923 + " Exception details: {} Error Code : {}", 924 key, e.getMessage(), storageExcp.getErrorCode()); 925 } 926 throw e; 927 } 928 } 929 930 /** 931 * Writes b.length bytes from the specified byte array to this output 932 * stream. The general contract for write(b) is that it should have exactly 933 * the same effect as the call write(b, 0, b.length). 934 * 935 * @param b 936 * Block of bytes to be written to the output stream. 937 */ 938 @Override 939 public void write(byte[] b) throws IOException { 940 try { 941 out.write(b); 942 } catch(IOException e) { 943 if (e.getCause() instanceof StorageException) { 944 StorageException storageExcp = (StorageException) e.getCause(); 945 LOG.error("Encountered Storage Exception for write on Blob : {}" 946 + " Exception details: {} Error Code : {}", 947 key, e.getMessage(), storageExcp.getErrorCode()); 948 } 949 throw e; 950 } 951 } 952 953 /** 954 * Writes <code>len</code> from the specified byte array starting at offset 955 * <code>off</code> to the output stream. The general contract for write(b, 956 * off, len) is that some of the bytes in the array <code> 957 * b</code b> are written to the output stream in order; element 958 * <code>b[off]</code> is the first byte written and 959 * <code>b[off+len-1]</code> is the last byte written by this operation. 960 * 961 * @param b 962 * Byte array to be written. 963 * @param off 964 * Write this offset in stream. 965 * @param len 966 * Number of bytes to be written. 
967 */ 968 @Override 969 public void write(byte[] b, int off, int len) throws IOException { 970 try { 971 out.write(b, off, len); 972 } catch(IOException e) { 973 if (e.getCause() instanceof StorageException) { 974 StorageException storageExcp = (StorageException) e.getCause(); 975 LOG.error("Encountered Storage Exception for write on Blob : {}" 976 + " Exception details: {} Error Code : {}", 977 key, e.getMessage(), storageExcp.getErrorCode()); 978 } 979 throw e; 980 } 981 } 982 983 /** 984 * Get the blob name. 985 * 986 * @return String Blob name. 987 */ 988 public String getKey() { 989 return key; 990 } 991 992 /** 993 * Set the blob name. 994 * 995 * @param key 996 * Blob name. 997 */ 998 public void setKey(String key) { 999 this.key = key; 1000 } 1001 1002 /** 1003 * Get the blob name. 1004 * 1005 * @return String Blob name. 1006 */ 1007 public String getEncodedKey() { 1008 return keyEncoded; 1009 } 1010 1011 /** 1012 * Set the blob name. 1013 * 1014 * @param anEncodedKey 1015 * Blob name. 1016 */ 1017 public void setEncodedKey(String anEncodedKey) { 1018 this.keyEncoded = anEncodedKey; 1019 } 1020 1021 /** 1022 * Restore the original key name from the m_key member variable. Note: The 1023 * output file stream is created with an encoded blob store key to guarantee 1024 * load balancing on the front end of the Azure storage partition servers. 1025 * The create also includes the name of the original key value which is 1026 * stored in the m_key member variable. This method should only be called 1027 * when the stream is closed. 
1028 */ 1029 private void restoreKey() throws IOException { 1030 store.rename(getEncodedKey(), getKey()); 1031 } 1032 } 1033 1034 private URI uri; 1035 private NativeFileSystemStore store; 1036 private AzureNativeFileSystemStore actualStore; 1037 private Path workingDir; 1038 private long blockSize = MAX_AZURE_BLOCK_SIZE; 1039 private AzureFileSystemInstrumentation instrumentation; 1040 private String metricsSourceName; 1041 private boolean isClosed = false; 1042 private static boolean suppressRetryPolicy = false; 1043 // A counter to create unique (within-process) names for my metrics sources. 1044 private static AtomicInteger metricsSourceNameCounter = new AtomicInteger(); 1045 private boolean appendSupportEnabled = false; 1046 1047 public NativeAzureFileSystem() { 1048 // set store in initialize() 1049 } 1050 1051 public NativeAzureFileSystem(NativeFileSystemStore store) { 1052 this.store = store; 1053 } 1054 1055 /** 1056 * Suppress the default retry policy for the Storage, useful in unit tests to 1057 * test negative cases without waiting forever. 1058 */ 1059 @VisibleForTesting 1060 static void suppressRetryPolicy() { 1061 suppressRetryPolicy = true; 1062 } 1063 1064 /** 1065 * Undo the effect of suppressRetryPolicy. 1066 */ 1067 @VisibleForTesting 1068 static void resumeRetryPolicy() { 1069 suppressRetryPolicy = false; 1070 } 1071 1072 /** 1073 * Creates a new metrics source name that's unique within this process. 1074 */ 1075 @VisibleForTesting 1076 public static String newMetricsSourceName() { 1077 int number = metricsSourceNameCounter.incrementAndGet(); 1078 final String baseName = "AzureFileSystemMetrics"; 1079 if (number == 1) { // No need for a suffix for the first one 1080 return baseName; 1081 } else { 1082 return baseName + number; 1083 } 1084 } 1085 1086 /** 1087 * Checks if the given URI scheme is a scheme that's affiliated with the Azure 1088 * File System. 1089 * 1090 * @param scheme 1091 * The URI scheme. 
1092 * @return true iff it's an Azure File System URI scheme. 1093 */ 1094 private static boolean isWasbScheme(String scheme) { 1095 // The valid schemes are: asv (old name), asvs (old name over HTTPS), 1096 // wasb (new name), wasbs (new name over HTTPS). 1097 return scheme != null 1098 && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs") 1099 || scheme.equalsIgnoreCase("wasb") || scheme 1100 .equalsIgnoreCase("wasbs")); 1101 } 1102 1103 /** 1104 * Puts in the authority of the default file system if it is a WASB file 1105 * system and the given URI's authority is null. 1106 * 1107 * @return The URI with reconstructed authority if necessary and possible. 1108 */ 1109 private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) { 1110 if (null == uri.getAuthority()) { 1111 // If WASB is the default file system, get the authority from there 1112 URI defaultUri = FileSystem.getDefaultUri(conf); 1113 if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) { 1114 try { 1115 // Reconstruct the URI with the authority from the default URI. 1116 return new URI(uri.getScheme(), defaultUri.getAuthority(), 1117 uri.getPath(), uri.getQuery(), uri.getFragment()); 1118 } catch (URISyntaxException e) { 1119 // This should never happen. 1120 throw new Error("Bad URI construction", e); 1121 } 1122 } 1123 } 1124 return uri; 1125 } 1126 1127 @Override 1128 protected void checkPath(Path path) { 1129 // Make sure to reconstruct the path's authority if needed 1130 super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(), 1131 getConf()))); 1132 } 1133 1134 @Override 1135 public void initialize(URI uri, Configuration conf) 1136 throws IOException, IllegalArgumentException { 1137 // Check authority for the URI to guarantee that it is non-null. 
1138 uri = reconstructAuthorityIfNeeded(uri, conf); 1139 if (null == uri.getAuthority()) { 1140 final String errMsg = String 1141 .format("Cannot initialize WASB file system, URI authority not recognized."); 1142 throw new IllegalArgumentException(errMsg); 1143 } 1144 super.initialize(uri, conf); 1145 1146 if (store == null) { 1147 store = createDefaultStore(conf); 1148 } 1149 1150 instrumentation = new AzureFileSystemInstrumentation(conf); 1151 if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { 1152 // Make sure the metrics system is available before interacting with Azure 1153 AzureFileSystemMetricsSystem.fileSystemStarted(); 1154 metricsSourceName = newMetricsSourceName(); 1155 String sourceDesc = "Azure Storage Volume File System metrics"; 1156 AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc, 1157 instrumentation); 1158 } 1159 1160 store.initialize(uri, conf, instrumentation); 1161 setConf(conf); 1162 this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); 1163 this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() 1164 .getShortUserName()).makeQualified(getUri(), getWorkingDirectory()); 1165 this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, 1166 MAX_AZURE_BLOCK_SIZE); 1167 1168 this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false); 1169 LOG.debug("NativeAzureFileSystem. Initializing."); 1170 LOG.debug(" blockSize = {}", 1171 conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE)); 1172 1173 } 1174 1175 private NativeFileSystemStore createDefaultStore(Configuration conf) { 1176 actualStore = new AzureNativeFileSystemStore(); 1177 1178 if (suppressRetryPolicy) { 1179 actualStore.suppressRetryPolicy(); 1180 } 1181 return actualStore; 1182 } 1183 1184 /** 1185 * Azure Storage doesn't allow the blob names to end in a period, 1186 * so encode this here to work around that limitation. 
1187 */ 1188 private static String encodeTrailingPeriod(String toEncode) { 1189 Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode); 1190 return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER); 1191 } 1192 1193 /** 1194 * Reverse the encoding done by encodeTrailingPeriod(). 1195 */ 1196 private static String decodeTrailingPeriod(String toDecode) { 1197 Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode); 1198 return matcher.replaceAll("."); 1199 } 1200 1201 /** 1202 * Convert the path to a key. By convention, any leading or trailing slash is 1203 * removed, except for the special case of a single slash. 1204 */ 1205 @VisibleForTesting 1206 public String pathToKey(Path path) { 1207 // Convert the path to a URI to parse the scheme, the authority, and the 1208 // path from the path object. 1209 URI tmpUri = path.toUri(); 1210 String pathUri = tmpUri.getPath(); 1211 1212 // The scheme and authority is valid. If the path does not exist add a "/" 1213 // separator to list the root of the container. 1214 Path newPath = path; 1215 if ("".equals(pathUri)) { 1216 newPath = new Path(tmpUri.toString() + Path.SEPARATOR); 1217 } 1218 1219 // Verify path is absolute if the path refers to a windows drive scheme. 1220 if (!newPath.isAbsolute()) { 1221 throw new IllegalArgumentException("Path must be absolute: " + path); 1222 } 1223 1224 String key = null; 1225 key = newPath.toUri().getPath(); 1226 key = removeTrailingSlash(key); 1227 key = encodeTrailingPeriod(key); 1228 if (key.length() == 1) { 1229 return key; 1230 } else { 1231 return key.substring(1); // remove initial slash 1232 } 1233 } 1234 1235 // Remove any trailing slash except for the case of a single slash. 
1236 private static String removeTrailingSlash(String key) { 1237 if (key.length() == 0 || key.length() == 1) { 1238 return key; 1239 } 1240 if (key.charAt(key.length() - 1) == '/') { 1241 return key.substring(0, key.length() - 1); 1242 } else { 1243 return key; 1244 } 1245 } 1246 1247 private static Path keyToPath(String key) { 1248 if (key.equals("/")) { 1249 return new Path("/"); // container 1250 } 1251 return new Path("/" + decodeTrailingPeriod(key)); 1252 } 1253 1254 /** 1255 * Get the absolute version of the path (fully qualified). 1256 * This is public for testing purposes. 1257 * 1258 * @param path 1259 * @return fully qualified path 1260 */ 1261 @VisibleForTesting 1262 public Path makeAbsolute(Path path) { 1263 if (path.isAbsolute()) { 1264 return path; 1265 } 1266 return new Path(workingDir, path); 1267 } 1268 1269 /** 1270 * For unit test purposes, retrieves the AzureNativeFileSystemStore store 1271 * backing this file system. 1272 * 1273 * @return The store object. 1274 */ 1275 @VisibleForTesting 1276 public AzureNativeFileSystemStore getStore() { 1277 return actualStore; 1278 } 1279 1280 NativeFileSystemStore getStoreInterface() { 1281 return store; 1282 } 1283 1284 /** 1285 * Gets the metrics source for this file system. 1286 * This is mainly here for unit testing purposes. 1287 * 1288 * @return the metrics source. 1289 */ 1290 public AzureFileSystemInstrumentation getInstrumentation() { 1291 return instrumentation; 1292 } 1293 1294 /** This optional operation is not yet supported. 
*/ 1295 @Override 1296 public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) 1297 throws IOException { 1298 1299 if (!appendSupportEnabled) { 1300 throw new UnsupportedOperationException("Append Support not enabled"); 1301 } 1302 1303 LOG.debug("Opening file: {} for append", f); 1304 1305 Path absolutePath = makeAbsolute(f); 1306 String key = pathToKey(absolutePath); 1307 FileMetadata meta = null; 1308 try { 1309 meta = store.retrieveMetadata(key); 1310 } catch(Exception ex) { 1311 1312 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 1313 1314 if (innerException instanceof StorageException 1315 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1316 1317 throw new FileNotFoundException(String.format("%s is not found", key)); 1318 } else { 1319 throw ex; 1320 } 1321 } 1322 1323 if (meta == null) { 1324 throw new FileNotFoundException(f.toString()); 1325 } 1326 1327 if (meta.isDir()) { 1328 throw new FileNotFoundException(f.toString() 1329 + " is a directory not a file."); 1330 } 1331 1332 if (store.isPageBlobKey(key)) { 1333 throw new IOException("Append not supported for Page Blobs"); 1334 } 1335 1336 DataOutputStream appendStream = null; 1337 1338 try { 1339 appendStream = store.retrieveAppendStream(key, bufferSize); 1340 } catch (Exception ex) { 1341 1342 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 1343 1344 if (innerException instanceof StorageException 1345 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 1346 throw new FileNotFoundException(String.format("%s is not found", key)); 1347 } else { 1348 throw ex; 1349 } 1350 } 1351 1352 return new FSDataOutputStream(appendStream, statistics); 1353 } 1354 1355 @Override 1356 public FSDataOutputStream create(Path f, FsPermission permission, 1357 boolean overwrite, int bufferSize, short replication, long blockSize, 1358 
Progressable progress) throws IOException { 1359 return create(f, permission, overwrite, true, 1360 bufferSize, replication, blockSize, progress, 1361 (SelfRenewingLease) null); 1362 } 1363 1364 /** 1365 * Get a self-renewing lease on the specified file. 1366 */ 1367 public SelfRenewingLease acquireLease(Path path) throws AzureException { 1368 String fullKey = pathToKey(makeAbsolute(path)); 1369 return getStore().acquireLease(fullKey); 1370 } 1371 1372 @Override 1373 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 1374 boolean overwrite, int bufferSize, short replication, long blockSize, 1375 Progressable progress) throws IOException { 1376 1377 Path parent = f.getParent(); 1378 1379 // Get exclusive access to folder if this is a directory designated 1380 // for atomic rename. The primary use case of for HBase write-ahead 1381 // log file management. 1382 SelfRenewingLease lease = null; 1383 if (store.isAtomicRenameKey(pathToKey(f))) { 1384 try { 1385 lease = acquireLease(parent); 1386 } catch (AzureException e) { 1387 1388 String errorCode = ""; 1389 try { 1390 StorageException e2 = (StorageException) e.getCause(); 1391 errorCode = e2.getErrorCode(); 1392 } catch (Exception e3) { 1393 // do nothing if cast fails 1394 } 1395 if (errorCode.equals("BlobNotFound")) { 1396 throw new FileNotFoundException("Cannot create file " + 1397 f.getName() + " because parent folder does not exist."); 1398 } 1399 1400 LOG.warn("Got unexpected exception trying to get lease on {} . {}", 1401 pathToKey(parent), e.getMessage()); 1402 throw e; 1403 } 1404 } 1405 1406 // See if the parent folder exists. If not, throw error. 1407 // The exists() check will push any pending rename operation forward, 1408 // if there is one, and return false. 1409 // 1410 // At this point, we have exclusive access to the source folder 1411 // via the lease, so we will not conflict with an active folder 1412 // rename operation. 
1413 if (!exists(parent)) { 1414 try { 1415 1416 // This'll let the keep-alive thread exit as soon as it wakes up. 1417 lease.free(); 1418 } catch (Exception e) { 1419 LOG.warn("Unable to free lease because: {}", e.getMessage()); 1420 } 1421 throw new FileNotFoundException("Cannot create file " + 1422 f.getName() + " because parent folder does not exist."); 1423 } 1424 1425 // Create file inside folder. 1426 FSDataOutputStream out = null; 1427 try { 1428 out = create(f, permission, overwrite, false, 1429 bufferSize, replication, blockSize, progress, lease); 1430 } finally { 1431 // Release exclusive access to folder. 1432 try { 1433 if (lease != null) { 1434 lease.free(); 1435 } 1436 } catch (Exception e) { 1437 NativeAzureFileSystemHelper.cleanup(LOG, out); 1438 String msg = "Unable to free lease on " + parent.toUri(); 1439 LOG.error(msg); 1440 throw new IOException(msg, e); 1441 } 1442 } 1443 return out; 1444 } 1445 1446 @Override 1447 public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, 1448 EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, 1449 Progressable progress) throws IOException { 1450 1451 // Check if file should be appended or overwritten. Assume that the file 1452 // is overwritten on if the CREATE and OVERWRITE create flags are set. Note 1453 // that any other combinations of create flags will result in an open new or 1454 // open with append. 1455 final EnumSet<CreateFlag> createflags = 1456 EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); 1457 boolean overwrite = flags.containsAll(createflags); 1458 1459 // Delegate the create non-recursive call. 
1460 return this.createNonRecursive(f, permission, overwrite, 1461 bufferSize, replication, blockSize, progress); 1462 } 1463 1464 @Override 1465 public FSDataOutputStream createNonRecursive(Path f, 1466 boolean overwrite, int bufferSize, short replication, long blockSize, 1467 Progressable progress) throws IOException { 1468 return this.createNonRecursive(f, FsPermission.getFileDefault(), 1469 overwrite, bufferSize, replication, blockSize, progress); 1470 } 1471 1472 1473 /** 1474 * Create an Azure blob and return an output stream to use 1475 * to write data to it. 1476 * 1477 * @param f 1478 * @param permission 1479 * @param overwrite 1480 * @param createParent 1481 * @param bufferSize 1482 * @param replication 1483 * @param blockSize 1484 * @param progress 1485 * @param parentFolderLease Lease on parent folder (or null if 1486 * no lease). 1487 * @return 1488 * @throws IOException 1489 */ 1490 private FSDataOutputStream create(Path f, FsPermission permission, 1491 boolean overwrite, boolean createParent, int bufferSize, 1492 short replication, long blockSize, Progressable progress, 1493 SelfRenewingLease parentFolderLease) 1494 throws FileAlreadyExistsException, IOException { 1495 1496 LOG.debug("Creating file: {}", f.toString()); 1497 1498 if (containsColon(f)) { 1499 throw new IOException("Cannot create file " + f 1500 + " through WASB that has colons in the name"); 1501 } 1502 1503 Path absolutePath = makeAbsolute(f); 1504 String key = pathToKey(absolutePath); 1505 1506 FileMetadata existingMetadata = store.retrieveMetadata(key); 1507 if (existingMetadata != null) { 1508 if (existingMetadata.isDir()) { 1509 throw new FileAlreadyExistsException("Cannot create file " + f 1510 + "; already exists as a directory."); 1511 } 1512 if (!overwrite) { 1513 throw new FileAlreadyExistsException("File already exists:" + f); 1514 } 1515 } 1516 1517 Path parentFolder = absolutePath.getParent(); 1518 if (parentFolder != null && parentFolder.getParent() != null) { // skip 
root 1519 // Update the parent folder last modified time if the parent folder 1520 // already exists. 1521 String parentKey = pathToKey(parentFolder); 1522 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 1523 if (parentMetadata != null && parentMetadata.isDir() && 1524 parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) { 1525 if (parentFolderLease != null) { 1526 store.updateFolderLastModifiedTime(parentKey, parentFolderLease); 1527 } else { 1528 updateParentFolderLastModifiedTime(key); 1529 } 1530 } else { 1531 // Make sure that the parent folder exists. 1532 // Create it using inherited permissions from the first existing directory going up the path 1533 Path firstExisting = parentFolder.getParent(); 1534 FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting)); 1535 while(metadata == null) { 1536 // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata 1537 firstExisting = firstExisting.getParent(); 1538 metadata = store.retrieveMetadata(pathToKey(firstExisting)); 1539 } 1540 mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true); 1541 } 1542 } 1543 1544 // Mask the permission first (with the default permission mask as well). 1545 FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile); 1546 PermissionStatus permissionStatus = createPermissionStatus(masked); 1547 1548 OutputStream bufOutStream; 1549 if (store.isPageBlobKey(key)) { 1550 // Store page blobs directly in-place without renames. 1551 bufOutStream = store.storefile(key, permissionStatus); 1552 } else { 1553 // This is a block blob, so open the output blob stream based on the 1554 // encoded key. 1555 // 1556 String keyEncoded = encodeKey(key); 1557 1558 1559 // First create a blob at the real key, pointing back to the temporary file 1560 // This accomplishes a few things: 1561 // 1. Makes sure we can create a file there. 1562 // 2. 
Makes it visible to other concurrent threads/processes/nodes what 1563 // we're 1564 // doing. 1565 // 3. Makes it easier to restore/cleanup data in the event of us crashing. 1566 store.storeEmptyLinkFile(key, keyEncoded, permissionStatus); 1567 1568 // The key is encoded to point to a common container at the storage server. 1569 // This reduces the number of splits on the server side when load balancing. 1570 // Ingress to Azure storage can take advantage of earlier splits. We remove 1571 // the root path to the key and prefix a random GUID to the tail (or leaf 1572 // filename) of the key. Keys are thus broadly and randomly distributed over 1573 // a single container to ease load balancing on the storage server. When the 1574 // blob is committed it is renamed to its earlier key. Uncommitted blocks 1575 // are not cleaned up and we leave it to Azure storage to garbage collect 1576 // these 1577 // blocks. 1578 bufOutStream = new NativeAzureFsOutputStream(store.storefile( 1579 keyEncoded, permissionStatus), key, keyEncoded); 1580 } 1581 // Construct the data output stream from the buffered output stream. 1582 FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics); 1583 1584 1585 // Increment the counter 1586 instrumentation.fileCreated(); 1587 1588 // Return data output stream to caller. 1589 return fsOut; 1590 } 1591 1592 @Override 1593 @Deprecated 1594 public boolean delete(Path path) throws IOException { 1595 return delete(path, true); 1596 } 1597 1598 @Override 1599 public boolean delete(Path f, boolean recursive) throws IOException { 1600 return delete(f, recursive, false); 1601 } 1602 1603 /** 1604 * Delete the specified file or folder. The parameter 1605 * skipParentFolderLastModifidedTimeUpdate 1606 * is used in the case of atomic folder rename redo. 
* In that case, there is
   * a lease on the parent folder, so (without reworking the code) modifying
   * the parent folder update time will fail because of a conflict with the
   * lease. Since we are going to delete the folder soon anyway so accurate
   * modified time is not necessary, it's easier to just skip
   * the modified time update.
   *
   * @param f path of the file or folder to delete.
   * @param recursive whether a non-empty folder may be deleted together
   *          with its contents.
   * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last
   * modified time.
   * @return true if and only if the file is deleted
   * @throws IOException if a non-empty folder is deleted non-recursively,
   *           the parent folder is in an invalid state, or the store fails.
   */
  public boolean delete(Path f, boolean recursive,
      boolean skipParentFolderLastModifidedTimeUpdate) throws IOException {

    LOG.debug("Deleting file: {}", f.toString());

    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    // Capture the metadata for the path. A storage-level "not found" is
    // reported as false rather than as an exception.
    //
    FileMetadata metaFile = null;
    try {
      metaFile = store.retrieveMetadata(key);
    } catch (IOException e) {

      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

      if (innerException instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {

        return false;
      }
      throw e;
    }

    if (null == metaFile) {
      // The path to be deleted does not exist.
      return false;
    }

    // The path exists, determine if it is a folder containing objects,
    // an empty folder, or a simple file and take the appropriate actions.
    if (!metaFile.isDir()) {
      // The path specifies a file. We need to check the parent path
      // to make sure it's a proper materialized directory before we
      // delete the file. Otherwise we may get into a situation where
      // the file we were deleting was the last one in an implicit directory
      // (e.g. the blob store only contains the blob a/b and there's no
      // corresponding directory blob a) and that would implicitly delete
      // the directory as well, which is not correct.
      Path parentPath = absolutePath.getParent();
      if (parentPath.getParent() != null) {// Not root
        String parentKey = pathToKey(parentPath);

        FileMetadata parentMetadata = null;
        try {
          parentMetadata = store.retrieveMetadata(parentKey);
        } catch (IOException e) {

          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

          if (innerException instanceof StorageException) {
            // Invalid State.
            // A FileNotFoundException is not thrown here as the API returns false
            // if the file not present. But not retrieving metadata here is an
            // unrecoverable state and can only happen if there is a race condition
            // hence throwing a IOException
            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
              throw new IOException("File " + f + " has a parent directory "
                  + parentPath + " whose metadata cannot be retrieved. Can't resolve");
            }
          }
          throw e;
        }

        // Invalid State.
        // A FileNotFoundException is not thrown here as the API returns false
        // if the file not present. But not retrieving metadata here is an
        // unrecoverable state and can only happen if there is a race condition
        // hence throwing a IOException
        if (parentMetadata == null) {
          throw new IOException("File " + f + " has a parent directory "
              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
        }

        if (!parentMetadata.isDir()) {
          // Invalid state: the parent path is actually a file. Throw.
          throw new AzureException("File " + f + " has a parent directory "
              + parentPath + " which is also a file. Can't resolve.");
        }

        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
          LOG.debug("Found an implicit parent directory while trying to"
              + " delete the file {}. Creating the directory blob for"
              + " it in {}.", f, parentKey);

          // Materialize the parent so it survives the removal of this file.
          store.storeEmptyFolder(parentKey,
              createPermissionStatus(FsPermission.getDefault()));
        } else {
          if (!skipParentFolderLastModifidedTimeUpdate) {
            updateParentFolderLastModifiedTime(key);
          }
        }
      }

      try {
        store.delete(key);
        instrumentation.fileDeleted();
      } catch(IOException e) {

        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

        // The blob vanished before the delete reached it: report false.
        if (innerException instanceof StorageException
            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
          return false;
        }

       throw e;
      }
    } else {
      // The path specifies a folder. Recursively delete all entries under the
      // folder.
      LOG.debug("Directory Delete encountered: {}", f.toString());
      Path parentPath = absolutePath.getParent();
      if (parentPath.getParent() != null) {
        String parentKey = pathToKey(parentPath);
        FileMetadata parentMetadata = null;

        try {
          parentMetadata = store.retrieveMetadata(parentKey);
        } catch (IOException e) {

          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

          if (innerException instanceof StorageException) {
            // Invalid State.
            // A FileNotFoundException is not thrown here as the API returns false
            // if the file not present. But not retrieving metadata here is an
            // unrecoverable state and can only happen if there is a race condition
            // hence throwing a IOException
            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
              throw new IOException("File " + f + " has a parent directory "
                  + parentPath + " whose metadata cannot be retrieved. Can't resolve");
            }
          }
          throw e;
        }

        // Invalid State.
        // A FileNotFoundException is not thrown here as the API returns false
        // if the file not present. But not retrieving metadata here is an
        // unrecoverable state and can only happen if there is a race condition
        // hence throwing a IOException
        if (parentMetadata == null) {
          throw new IOException("File " + f + " has a parent directory "
              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
        }

        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
          LOG.debug("Found an implicit parent directory while trying to"
              + " delete the directory {}. Creating the directory blob for"
              + " it in {}. ", f, parentKey);

          // Materialize the parent so it survives the removal of this folder.
          store.storeEmptyFolder(parentKey,
              createPermissionStatus(FsPermission.getDefault()));
        }
      }

      // List all the blobs in the current folder.
      String priorLastKey = null;
      PartialListing listing = null;
      try {
        listing = store.listAll(key, AZURE_LIST_ALL, 1,
            priorLastKey);
      } catch(IOException e) {

        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

        if (innerException instanceof StorageException
            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
          return false;
        }

        throw e;
      }

      if (listing == null) {
        return false;
      }

      FileMetadata[] contents = listing.getFiles();
      if (!recursive && contents.length > 0) {
        // The folder is non-empty and recursive delete was not specified.
        // Throw an exception indicating that a non-recursive delete was
        // specified for a non-empty folder.
        throw new IOException("Non-recursive delete of non-empty directory "
            + f.toString());
      }

      // Delete all the files in the folder.
      for (FileMetadata p : contents) {
        // Take the last component of the entry's key (including its leading
        // delimiter) and append it to this folder's key or path to address
        // the entry.
        String suffix = p.getKey().substring(
            p.getKey().lastIndexOf(PATH_DELIMITER));
        if (!p.isDir()) {
          try {
            store.delete(key + suffix);
            instrumentation.fileDeleted();
          } catch(IOException e) {

            Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

            if (innerException instanceof StorageException
                && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
              return false;
            }

            throw e;
          }
        } else {
          // Recursively delete contents of the sub-folders. Notice this also
          // deletes the blob for the directory.
          if (!delete(new Path(f.toString() + suffix), true)) {
            return false;
          }
        }
      }

      // All entries are gone; now remove the blob for the folder itself.
      try {
        store.delete(key);
      } catch(IOException e) {

        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

        if (innerException instanceof StorageException
            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
          return false;
        }

        throw e;
      }

      // Update parent directory last modified time
      Path parent = absolutePath.getParent();
      if (parent != null && parent.getParent() != null) { // not root
        if (!skipParentFolderLastModifidedTimeUpdate) {
          updateParentFolderLastModifiedTime(key);
        }
      }
      instrumentation.directoryDeleted();
    }

    // File or directory was successfully deleted.
    LOG.debug("Delete Successful for : {}", f.toString());
    return true;
  }

  /**
   * Returns the status of the file or folder at the given path. If the path
   * is a folder with a pending rename, the rename is redone first and the
   * folder is then reported as not found.
   *
   * @param f the path to look up.
   * @return the status of the file or folder.
   * @throws FileNotFoundException if nothing exists at the path, or the
   *           folder there was moved away by a redone rename.
   * @throws IOException if the store fails.
   */
  @Override
  public FileStatus getFileStatus(Path f) throws FileNotFoundException, IOException {

    LOG.debug("Getting the file status for {}", f.toString());

    // Capture the absolute path and the path to key.
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    // NOTE(review): pathToKey() in this file returns a one-character key for
    // a single slash, so this empty-key branch looks unreachable for the
    // root path - confirm.
    if (key.length() == 0) { // root always exists
      return newDirectory(null, absolutePath);
    }

    // The path is either a folder or a file. Retrieve metadata to
    // determine if it is a directory or file.
    FileMetadata meta = null;
    try {
      meta = store.retrieveMetadata(key);
    } catch(Exception ex) {

      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);

      if (innerException instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {

          throw new FileNotFoundException(String.format("%s is not found", key));
       }

      throw ex;
    }

    if (meta != null) {
      if (meta.isDir()) {
        // The path is a folder.
        //

        LOG.debug("Path {} is a folder.", f.toString());

        // If a rename operation for the folder was pending, redo it.
        // Then the file does not exist, so signal that.
        if (conditionalRedoFolderRename(f)) {
          throw new FileNotFoundException(
              absolutePath + ": No such file or directory.");
        }

        // Return reference to the directory object.
        return newDirectory(meta, absolutePath);
      }

      // The path is a file.
      LOG.debug("Found the path: {} as a file.", f.toString());

      // Return with reference to a file object.
      return newFile(meta, absolutePath);
    }

    // File not found. Throw exception no such file or directory.
    //
    throw new FileNotFoundException(
        absolutePath + ": No such file or directory.");
  }

  // Return true if there is a rename pending and we redo it, otherwise false.
  private boolean conditionalRedoFolderRename(Path f) throws IOException {

    // Can't rename /, so return immediately in that case.
    if (f.getName().equals("")) {
      return false;
    }

    // Check if there is a -RenamePending.json file for this folder, and if so,
    // redo the rename.
    Path absoluteRenamePendingFile = renamePendingFilePath(f);
    if (exists(absoluteRenamePendingFile)) {
      FolderRenamePending pending =
          new FolderRenamePending(absoluteRenamePendingFile, this);
      pending.redo();
      return true;
    } else {
      return false;
    }
  }

  // Return the path name that would be used for rename of folder with path f:
  // the folder's key with "-RenamePending.json" appended, converted to a path.
  private Path renamePendingFilePath(Path f) {
    Path absPath = makeAbsolute(f);
    String key = pathToKey(absPath);
    key += "-RenamePending.json";
    return keyToPath(key);
  }

  /**
   * Returns the URI of this file system, as stored in the uri field.
   */
  @Override
  public URI getUri() {
    return uri;
  }

  /**
   * Retrieve the status of a given path if it is a file, or of all the
   * contained files if it is a directory.
   */
  @Override
  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {

    LOG.debug("Listing status for {}", f.toString());

    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    Set<FileStatus> status = new TreeSet<FileStatus>();
    FileMetadata meta = null;
    try {
      meta = store.retrieveMetadata(key);
    } catch (IOException ex) {

      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);

      // Translate a storage-level "not found" into the file system exception.
      if (innerException instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {

        throw new FileNotFoundException(String.format("%s is not found", f));
      }

      throw ex;
    }

    if (meta != null) {
      if (!meta.isDir()) {

        LOG.debug("Found path as a file");

        // A file lists as a single-element array holding its own status.
        return new FileStatus[] { newFile(meta, absolutePath) };
      }

      String partialKey = null;
      PartialListing listing = null;

      try {
        listing  = store.list(key, AZURE_LIST_ALL, 1, partialKey);
      } catch (IOException ex) {

        Throwable innerException =
NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2007 2008 if (innerException instanceof StorageException 2009 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2010 2011 throw new FileNotFoundException(String.format("%s is not found", key)); 2012 } 2013 2014 throw ex; 2015 } 2016 // NOTE: We don't check for Null condition as the Store API should return 2017 // an empty list if there are not listing. 2018 2019 // For any -RenamePending.json files in the listing, 2020 // push the rename forward. 2021 boolean renamed = conditionalRedoFolderRenames(listing); 2022 2023 // If any renames were redone, get another listing, 2024 // since the current one may have changed due to the redo. 2025 if (renamed) { 2026 listing = null; 2027 try { 2028 listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); 2029 } catch (IOException ex) { 2030 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2031 2032 if (innerException instanceof StorageException 2033 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2034 2035 throw new FileNotFoundException(String.format("%s is not found", key)); 2036 } 2037 2038 throw ex; 2039 } 2040 } 2041 2042 // NOTE: We don't check for Null condition as the Store API should return 2043 // and empty list if there are not listing. 2044 2045 for (FileMetadata fileMetadata : listing.getFiles()) { 2046 Path subpath = keyToPath(fileMetadata.getKey()); 2047 2048 // Test whether the metadata represents a file or directory and 2049 // add the appropriate metadata object. 2050 // 2051 // Note: There was a very old bug here where directories were added 2052 // to the status set as files flattening out recursive listings 2053 // using "-lsr" down the file system hierarchy. 
2054 if (fileMetadata.isDir()) { 2055 // Make sure we hide the temp upload folder 2056 if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) { 2057 // Don't expose that. 2058 continue; 2059 } 2060 status.add(newDirectory(fileMetadata, subpath)); 2061 } else { 2062 status.add(newFile(fileMetadata, subpath)); 2063 } 2064 } 2065 2066 LOG.debug("Found path as a directory with {}" 2067 + " files in it.", status.size()); 2068 2069 } else { 2070 // There is no metadata found for the path. 2071 LOG.debug("Did not find any metadata for path: {}", key); 2072 2073 throw new FileNotFoundException("File" + f + " does not exist."); 2074 } 2075 2076 return status.toArray(new FileStatus[0]); 2077 } 2078 2079 // Redo any folder renames needed if there are rename pending files in the 2080 // directory listing. Return true if one or more redo operations were done. 2081 private boolean conditionalRedoFolderRenames(PartialListing listing) 2082 throws IllegalArgumentException, IOException { 2083 boolean renamed = false; 2084 for (FileMetadata fileMetadata : listing.getFiles()) { 2085 Path subpath = keyToPath(fileMetadata.getKey()); 2086 if (isRenamePendingFile(subpath)) { 2087 FolderRenamePending pending = 2088 new FolderRenamePending(subpath, this); 2089 pending.redo(); 2090 renamed = true; 2091 } 2092 } 2093 return renamed; 2094 } 2095 2096 // True if this is a folder rename pending file, else false. 
2097 private boolean isRenamePendingFile(Path path) { 2098 return path.toString().endsWith(FolderRenamePending.SUFFIX); 2099 } 2100 2101 private FileStatus newFile(FileMetadata meta, Path path) { 2102 return new FileStatus ( 2103 meta.getLength(), 2104 false, 2105 1, 2106 blockSize, 2107 meta.getLastModified(), 2108 0, 2109 meta.getPermissionStatus().getPermission(), 2110 meta.getPermissionStatus().getUserName(), 2111 meta.getPermissionStatus().getGroupName(), 2112 path.makeQualified(getUri(), getWorkingDirectory())); 2113 } 2114 2115 private FileStatus newDirectory(FileMetadata meta, Path path) { 2116 return new FileStatus ( 2117 0, 2118 true, 2119 1, 2120 blockSize, 2121 meta == null ? 0 : meta.getLastModified(), 2122 0, 2123 meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(), 2124 meta == null ? "" : meta.getPermissionStatus().getUserName(), 2125 meta == null ? "" : meta.getPermissionStatus().getGroupName(), 2126 path.makeQualified(getUri(), getWorkingDirectory())); 2127 } 2128 2129 private static enum UMaskApplyMode { 2130 NewFile, 2131 NewDirectory, 2132 NewDirectoryNoUmask, 2133 ChangeExistingFile, 2134 ChangeExistingDirectory, 2135 } 2136 2137 /** 2138 * Applies the applicable UMASK's on the given permission. 2139 * 2140 * @param permission 2141 * The permission to mask. 2142 * @param applyMode 2143 * Whether to also apply the default umask. 2144 * @return The masked persmission. 2145 */ 2146 private FsPermission applyUMask(final FsPermission permission, 2147 final UMaskApplyMode applyMode) { 2148 FsPermission newPermission = new FsPermission(permission); 2149 // Apply the default umask - this applies for new files or directories. 
2150 if (applyMode == UMaskApplyMode.NewFile 2151 || applyMode == UMaskApplyMode.NewDirectory) { 2152 newPermission = newPermission 2153 .applyUMask(FsPermission.getUMask(getConf())); 2154 } 2155 return newPermission; 2156 } 2157 2158 /** 2159 * Creates the PermissionStatus object to use for the given permission, based 2160 * on the current user in context. 2161 * 2162 * @param permission 2163 * The permission for the file. 2164 * @return The permission status object to use. 2165 * @throws IOException 2166 * If login fails in getCurrentUser 2167 */ 2168 @VisibleForTesting 2169 PermissionStatus createPermissionStatus(FsPermission permission) 2170 throws IOException { 2171 // Create the permission status for this file based on current user 2172 return new PermissionStatus( 2173 UserGroupInformation.getCurrentUser().getShortUserName(), 2174 getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME, 2175 AZURE_DEFAULT_GROUP_DEFAULT), 2176 permission); 2177 } 2178 2179 @Override 2180 public boolean mkdirs(Path f, FsPermission permission) throws IOException { 2181 return mkdirs(f, permission, false); 2182 } 2183 2184 public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException { 2185 2186 2187 LOG.debug("Creating directory: {}", f.toString()); 2188 2189 if (containsColon(f)) { 2190 throw new IOException("Cannot create directory " + f 2191 + " through WASB that has colons in the name"); 2192 } 2193 2194 Path absolutePath = makeAbsolute(f); 2195 PermissionStatus permissionStatus = null; 2196 if(noUmask) { 2197 // ensure owner still has wx permissions at the minimum 2198 permissionStatus = createPermissionStatus( 2199 applyUMask(FsPermission.createImmutable((short) (permission.toShort() | USER_WX_PERMISION)), 2200 UMaskApplyMode.NewDirectoryNoUmask)); 2201 } else { 2202 permissionStatus = createPermissionStatus( 2203 applyUMask(permission, UMaskApplyMode.NewDirectory)); 2204 } 2205 2206 2207 ArrayList<String> keysToCreateAsFolder = new 
ArrayList<String>(); 2208 ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>(); 2209 boolean childCreated = false; 2210 // Check that there is no file in the parent chain of the given path. 2211 for (Path current = absolutePath, parent = current.getParent(); 2212 parent != null; // Stop when you get to the root 2213 current = parent, parent = current.getParent()) { 2214 String currentKey = pathToKey(current); 2215 FileMetadata currentMetadata = store.retrieveMetadata(currentKey); 2216 if (currentMetadata != null && !currentMetadata.isDir()) { 2217 throw new FileAlreadyExistsException("Cannot create directory " + f + " because " 2218 + current + " is an existing file."); 2219 } else if (currentMetadata == null) { 2220 keysToCreateAsFolder.add(currentKey); 2221 childCreated = true; 2222 } else { 2223 // The directory already exists. Its last modified time need to be 2224 // updated if there is a child directory created under it. 2225 if (childCreated) { 2226 keysToUpdateAsFolder.add(currentKey); 2227 } 2228 childCreated = false; 2229 } 2230 } 2231 2232 for (String currentKey : keysToCreateAsFolder) { 2233 store.storeEmptyFolder(currentKey, permissionStatus); 2234 } 2235 2236 instrumentation.directoryCreated(); 2237 2238 // otherwise throws exception 2239 return true; 2240 } 2241 2242 @Override 2243 public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException { 2244 2245 LOG.debug("Opening file: {}", f.toString()); 2246 2247 Path absolutePath = makeAbsolute(f); 2248 String key = pathToKey(absolutePath); 2249 FileMetadata meta = null; 2250 try { 2251 meta = store.retrieveMetadata(key); 2252 } catch(Exception ex) { 2253 2254 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2255 2256 if (innerException instanceof StorageException 2257 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2258 2259 throw new FileNotFoundException(String.format("%s 
is not found", key)); 2260 } 2261 2262 throw ex; 2263 } 2264 2265 if (meta == null) { 2266 throw new FileNotFoundException(f.toString()); 2267 } 2268 if (meta.isDir()) { 2269 throw new FileNotFoundException(f.toString() 2270 + " is a directory not a file."); 2271 } 2272 2273 DataInputStream inputStream = null; 2274 try { 2275 inputStream = store.retrieve(key); 2276 } catch(Exception ex) { 2277 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2278 2279 if (innerException instanceof StorageException 2280 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2281 2282 throw new FileNotFoundException(String.format("%s is not found", key)); 2283 } 2284 2285 throw ex; 2286 } 2287 2288 return new FSDataInputStream(new BufferedFSInputStream( 2289 new NativeAzureFsInputStream(inputStream, key, meta.getLength()), bufferSize)); 2290 } 2291 2292 @Override 2293 public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException { 2294 2295 FolderRenamePending renamePending = null; 2296 2297 LOG.debug("Moving {} to {}", src, dst); 2298 2299 if (containsColon(dst)) { 2300 throw new IOException("Cannot rename to file " + dst 2301 + " through WASB that has colons in the name"); 2302 } 2303 2304 String srcKey = pathToKey(makeAbsolute(src)); 2305 2306 if (srcKey.length() == 0) { 2307 // Cannot rename root of file system 2308 return false; 2309 } 2310 2311 // Figure out the final destination 2312 Path absoluteDst = makeAbsolute(dst); 2313 String dstKey = pathToKey(absoluteDst); 2314 FileMetadata dstMetadata = null; 2315 try { 2316 dstMetadata = store.retrieveMetadata(dstKey); 2317 } catch (IOException ex) { 2318 2319 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2320 2321 // A BlobNotFound storage exception in only thrown from retrieveMetdata API when 2322 // there is a race condition. 
If there is another thread which deletes the destination 2323 // file or folder, then this thread calling rename should be able to continue with 2324 // rename gracefully. Hence the StorageException is swallowed here. 2325 if (innerException instanceof StorageException) { 2326 if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2327 LOG.debug("BlobNotFound exception encountered for Destination key : {}. " 2328 + "Swallowin the exception to handle race condition gracefully", dstKey); 2329 } 2330 } else { 2331 throw ex; 2332 } 2333 } 2334 2335 if (dstMetadata != null && dstMetadata.isDir()) { 2336 // It's an existing directory. 2337 dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); 2338 LOG.debug("Destination {} " 2339 + " is a directory, adjusted the destination to be {}", dst, dstKey); 2340 } else if (dstMetadata != null) { 2341 // Attempting to overwrite a file using rename() 2342 LOG.debug("Destination {}" 2343 + " is an already existing file, failing the rename.", dst); 2344 return false; 2345 } else { 2346 // Check that the parent directory exists. 2347 FileMetadata parentOfDestMetadata = null; 2348 try { 2349 parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent())); 2350 } catch (IOException ex) { 2351 2352 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2353 2354 if (innerException instanceof StorageException 2355 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2356 2357 LOG.debug("Parent of destination {} doesn't exists. 
Failing rename", dst); 2358 return false; 2359 } 2360 2361 throw ex; 2362 } 2363 2364 if (parentOfDestMetadata == null) { 2365 LOG.debug("Parent of the destination {}" 2366 + " doesn't exist, failing the rename.", dst); 2367 return false; 2368 } else if (!parentOfDestMetadata.isDir()) { 2369 LOG.debug("Parent of the destination {}" 2370 + " is a file, failing the rename.", dst); 2371 return false; 2372 } 2373 } 2374 FileMetadata srcMetadata = null; 2375 try { 2376 srcMetadata = store.retrieveMetadata(srcKey); 2377 } catch (IOException ex) { 2378 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2379 2380 if (innerException instanceof StorageException 2381 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2382 2383 LOG.debug("Source {} doesn't exists. Failing rename", src); 2384 return false; 2385 } 2386 2387 throw ex; 2388 } 2389 2390 if (srcMetadata == null) { 2391 // Source doesn't exist 2392 LOG.debug("Source {} doesn't exist, failing the rename.", src); 2393 return false; 2394 } else if (!srcMetadata.isDir()) { 2395 LOG.debug("Source {} found as a file, renaming.", src); 2396 try { 2397 store.rename(srcKey, dstKey); 2398 } catch(IOException ex) { 2399 2400 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2401 2402 if (innerException instanceof StorageException 2403 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2404 2405 LOG.debug("BlobNotFoundException encountered. Failing rename", src); 2406 return false; 2407 } 2408 2409 throw ex; 2410 } 2411 } else { 2412 2413 // Prepare for, execute and clean up after of all files in folder, and 2414 // the root file, and update the last modified time of the source and 2415 // target parent folders. The operation can be redone if it fails part 2416 // way through, by applying the "Rename Pending" file. 
2417 2418 // The following code (internally) only does atomic rename preparation 2419 // and lease management for page blob folders, limiting the scope of the 2420 // operation to HBase log file folders, where atomic rename is required. 2421 // In the future, we could generalize it easily to all folders. 2422 renamePending = prepareAtomicFolderRename(srcKey, dstKey); 2423 renamePending.execute(); 2424 2425 LOG.debug("Renamed {} to {} successfully.", src, dst); 2426 renamePending.cleanup(); 2427 return true; 2428 } 2429 2430 // Update the last-modified time of the parent folders of both source 2431 // and destination. 2432 updateParentFolderLastModifiedTime(srcKey); 2433 updateParentFolderLastModifiedTime(dstKey); 2434 2435 LOG.debug("Renamed {} to {} successfully.", src, dst); 2436 return true; 2437 } 2438 2439 /** 2440 * Update the last-modified time of the parent folder of the file 2441 * identified by key. 2442 * @param key 2443 * @throws IOException 2444 */ 2445 private void updateParentFolderLastModifiedTime(String key) 2446 throws IOException { 2447 Path parent = makeAbsolute(keyToPath(key)).getParent(); 2448 if (parent != null && parent.getParent() != null) { // not root 2449 String parentKey = pathToKey(parent); 2450 2451 // ensure the parent is a materialized folder 2452 FileMetadata parentMetadata = store.retrieveMetadata(parentKey); 2453 // The metadata could be null if the implicit folder only contains a 2454 // single file. In this case, the parent folder no longer exists if the 2455 // file is renamed; so we can safely ignore the null pointer case. 
2456 if (parentMetadata != null) { 2457 if (parentMetadata.isDir() 2458 && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2459 store.storeEmptyFolder(parentKey, 2460 createPermissionStatus(FsPermission.getDefault())); 2461 } 2462 2463 if (store.isAtomicRenameKey(parentKey)) { 2464 SelfRenewingLease lease = null; 2465 try { 2466 lease = leaseSourceFolder(parentKey); 2467 store.updateFolderLastModifiedTime(parentKey, lease); 2468 } catch (AzureException e) { 2469 String errorCode = ""; 2470 try { 2471 StorageException e2 = (StorageException) e.getCause(); 2472 errorCode = e2.getErrorCode(); 2473 } catch (Exception e3) { 2474 // do nothing if cast fails 2475 } 2476 if (errorCode.equals("BlobNotFound")) { 2477 throw new FileNotFoundException("Folder does not exist: " + parentKey); 2478 } 2479 LOG.warn("Got unexpected exception trying to get lease on {}. {}", 2480 parentKey, e.getMessage()); 2481 throw e; 2482 } finally { 2483 try { 2484 if (lease != null) { 2485 lease.free(); 2486 } 2487 } catch (Exception e) { 2488 LOG.error("Unable to free lease on {}", parentKey, e); 2489 } 2490 } 2491 } else { 2492 store.updateFolderLastModifiedTime(parentKey, null); 2493 } 2494 } 2495 } 2496 } 2497 2498 /** 2499 * If the source is a page blob folder, 2500 * prepare to rename this folder atomically. This means to get exclusive 2501 * access to the source folder, and record the actions to be performed for 2502 * this rename in a "Rename Pending" file. This code was designed to 2503 * meet the needs of HBase, which requires atomic rename of write-ahead log 2504 * (WAL) folders for correctness. 2505 * 2506 * Before calling this method, the caller must ensure that the source is a 2507 * folder. 2508 * 2509 * For non-page-blob directories, prepare the in-memory information needed, 2510 * but don't take the lease or write the redo file. This is done to limit the 2511 * scope of atomic folder rename to HBase, at least at the time of writing 2512 * this code. 
2513 * 2514 * @param srcKey Source folder name. 2515 * @param dstKey Destination folder name. 2516 * @throws IOException 2517 */ 2518 private FolderRenamePending prepareAtomicFolderRename( 2519 String srcKey, String dstKey) throws IOException { 2520 2521 if (store.isAtomicRenameKey(srcKey)) { 2522 2523 // Block unwanted concurrent access to source folder. 2524 SelfRenewingLease lease = leaseSourceFolder(srcKey); 2525 2526 // Prepare in-memory information needed to do or redo a folder rename. 2527 FolderRenamePending renamePending = 2528 new FolderRenamePending(srcKey, dstKey, lease, this); 2529 2530 // Save it to persistent storage to help recover if the operation fails. 2531 renamePending.writeFile(this); 2532 return renamePending; 2533 } else { 2534 FolderRenamePending renamePending = 2535 new FolderRenamePending(srcKey, dstKey, null, this); 2536 return renamePending; 2537 } 2538 } 2539 2540 /** 2541 * Get a self-renewing Azure blob lease on the source folder zero-byte file. 2542 */ 2543 private SelfRenewingLease leaseSourceFolder(String srcKey) 2544 throws AzureException { 2545 return store.acquireLease(srcKey); 2546 } 2547 2548 /** 2549 * Return an array containing hostnames, offset and size of 2550 * portions of the given file. For WASB we'll just lie and give 2551 * fake hosts to make sure we get many splits in MR jobs. 
2552 */ 2553 @Override 2554 public BlockLocation[] getFileBlockLocations(FileStatus file, 2555 long start, long len) throws IOException { 2556 if (file == null) { 2557 return null; 2558 } 2559 2560 if ((start < 0) || (len < 0)) { 2561 throw new IllegalArgumentException("Invalid start or len parameter"); 2562 } 2563 2564 if (file.getLen() < start) { 2565 return new BlockLocation[0]; 2566 } 2567 final String blobLocationHost = getConf().get( 2568 AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, 2569 AZURE_BLOCK_LOCATION_HOST_DEFAULT); 2570 final String[] name = { blobLocationHost }; 2571 final String[] host = { blobLocationHost }; 2572 long blockSize = file.getBlockSize(); 2573 if (blockSize <= 0) { 2574 throw new IllegalArgumentException( 2575 "The block size for the given file is not a positive number: " 2576 + blockSize); 2577 } 2578 int numberOfLocations = (int) (len / blockSize) 2579 + ((len % blockSize == 0) ? 0 : 1); 2580 BlockLocation[] locations = new BlockLocation[numberOfLocations]; 2581 for (int i = 0; i < locations.length; i++) { 2582 long currentOffset = start + (i * blockSize); 2583 long currentLength = Math.min(blockSize, start + len - currentOffset); 2584 locations[i] = new BlockLocation(name, host, currentOffset, currentLength); 2585 } 2586 return locations; 2587 } 2588 2589 /** 2590 * Set the working directory to the given directory. 
2591 */ 2592 @Override 2593 public void setWorkingDirectory(Path newDir) { 2594 workingDir = makeAbsolute(newDir); 2595 } 2596 2597 @Override 2598 public Path getWorkingDirectory() { 2599 return workingDir; 2600 } 2601 2602 @Override 2603 public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException { 2604 Path absolutePath = makeAbsolute(p); 2605 String key = pathToKey(absolutePath); 2606 FileMetadata metadata = null; 2607 try { 2608 metadata = store.retrieveMetadata(key); 2609 } catch (IOException ex) { 2610 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2611 2612 if (innerException instanceof StorageException 2613 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2614 2615 throw new FileNotFoundException(String.format("File %s doesn't exists.", p)); 2616 } 2617 2618 throw ex; 2619 } 2620 2621 if (metadata == null) { 2622 throw new FileNotFoundException("File doesn't exist: " + p); 2623 } 2624 permission = applyUMask(permission, 2625 metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory 2626 : UMaskApplyMode.ChangeExistingFile); 2627 if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2628 // It's an implicit folder, need to materialize it. 2629 store.storeEmptyFolder(key, createPermissionStatus(permission)); 2630 } else if (!metadata.getPermissionStatus().getPermission(). 
2631 equals(permission)) { 2632 store.changePermissionStatus(key, new PermissionStatus( 2633 metadata.getPermissionStatus().getUserName(), 2634 metadata.getPermissionStatus().getGroupName(), 2635 permission)); 2636 } 2637 } 2638 2639 @Override 2640 public void setOwner(Path p, String username, String groupname) 2641 throws IOException { 2642 Path absolutePath = makeAbsolute(p); 2643 String key = pathToKey(absolutePath); 2644 FileMetadata metadata = null; 2645 2646 try { 2647 metadata = store.retrieveMetadata(key); 2648 } catch (IOException ex) { 2649 Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex); 2650 2651 if (innerException instanceof StorageException 2652 && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) { 2653 2654 throw new FileNotFoundException(String.format("File %s doesn't exists.", p)); 2655 } 2656 2657 throw ex; 2658 } 2659 2660 if (metadata == null) { 2661 throw new FileNotFoundException("File doesn't exist: " + p); 2662 } 2663 2664 PermissionStatus newPermissionStatus = new PermissionStatus( 2665 username == null ? 2666 metadata.getPermissionStatus().getUserName() : username, 2667 groupname == null ? 2668 metadata.getPermissionStatus().getGroupName() : groupname, 2669 metadata.getPermissionStatus().getPermission()); 2670 if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { 2671 // It's an implicit folder, need to materialize it. 2672 store.storeEmptyFolder(key, newPermissionStatus); 2673 } else { 2674 store.changePermissionStatus(key, newPermissionStatus); 2675 } 2676 } 2677 2678 @Override 2679 public synchronized void close() throws IOException { 2680 if (isClosed) { 2681 return; 2682 } 2683 2684 // Call the base close() to close any resources there. 2685 super.close(); 2686 // Close the store to close any resources there - e.g. the bandwidth 2687 // updater thread would be stopped at this time. 
2688 store.close(); 2689 // Notify the metrics system that this file system is closed, which may 2690 // trigger one final metrics push to get the accurate final file system 2691 // metrics out. 2692 2693 long startTime = System.currentTimeMillis(); 2694 2695 if(!getConf().getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { 2696 AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName); 2697 AzureFileSystemMetricsSystem.fileSystemClosed(); 2698 } 2699 2700 LOG.debug("Submitting metrics when file system closed took {} ms.", 2701 (System.currentTimeMillis() - startTime)); 2702 isClosed = true; 2703 } 2704 2705 /** 2706 * A handler that defines what to do with blobs whose upload was 2707 * interrupted. 2708 */ 2709 private abstract class DanglingFileHandler { 2710 abstract void handleFile(FileMetadata file, FileMetadata tempFile) 2711 throws IOException; 2712 } 2713 2714 /** 2715 * Handler implementation for just deleting dangling files and cleaning 2716 * them up. 2717 */ 2718 private class DanglingFileDeleter extends DanglingFileHandler { 2719 @Override 2720 void handleFile(FileMetadata file, FileMetadata tempFile) 2721 throws IOException { 2722 2723 LOG.debug("Deleting dangling file {}", file.getKey()); 2724 store.delete(file.getKey()); 2725 store.delete(tempFile.getKey()); 2726 } 2727 } 2728 2729 /** 2730 * Handler implementation for just moving dangling files to recovery 2731 * location (/lost+found). 
2732 */ 2733 private class DanglingFileRecoverer extends DanglingFileHandler { 2734 private final Path destination; 2735 2736 DanglingFileRecoverer(Path destination) { 2737 this.destination = destination; 2738 } 2739 2740 @Override 2741 void handleFile(FileMetadata file, FileMetadata tempFile) 2742 throws IOException { 2743 2744 LOG.debug("Recovering {}", file.getKey()); 2745 // Move to the final destination 2746 String finalDestinationKey = 2747 pathToKey(new Path(destination, file.getKey())); 2748 store.rename(tempFile.getKey(), finalDestinationKey); 2749 if (!finalDestinationKey.equals(file.getKey())) { 2750 // Delete the empty link file now that we've restored it. 2751 store.delete(file.getKey()); 2752 } 2753 } 2754 } 2755 2756 /** 2757 * Check if a path has colons in its name 2758 */ 2759 private boolean containsColon(Path p) { 2760 return p.toUri().getPath().toString().contains(":"); 2761 } 2762 2763 /** 2764 * Implements recover and delete (-move and -delete) behaviors for handling 2765 * dangling files (blobs whose upload was interrupted). 2766 * 2767 * @param root 2768 * The root path to check from. 2769 * @param handler 2770 * The handler that deals with dangling files. 2771 */ 2772 private void handleFilesWithDanglingTempData(Path root, 2773 DanglingFileHandler handler) throws IOException { 2774 // Calculate the cut-off for when to consider a blob to be dangling. 2775 long cutoffForDangling = new Date().getTime() 2776 - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME, 2777 AZURE_TEMP_EXPIRY_DEFAULT) * 1000; 2778 // Go over all the blobs under the given root and look for blobs to 2779 // recover. 
2780 String priorLastKey = null; 2781 do { 2782 PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL, 2783 AZURE_UNBOUNDED_DEPTH, priorLastKey); 2784 2785 for (FileMetadata file : listing.getFiles()) { 2786 if (!file.isDir()) { // We don't recover directory blobs 2787 // See if this blob has a link in it (meaning it's a place-holder 2788 // blob for when the upload to the temp blob is complete). 2789 String link = store.getLinkInFileMetadata(file.getKey()); 2790 if (link != null) { 2791 // It has a link, see if the temp blob it is pointing to is 2792 // existent and old enough to be considered dangling. 2793 FileMetadata linkMetadata = store.retrieveMetadata(link); 2794 if (linkMetadata != null 2795 && linkMetadata.getLastModified() >= cutoffForDangling) { 2796 // Found one! 2797 handler.handleFile(file, linkMetadata); 2798 } 2799 } 2800 } 2801 } 2802 priorLastKey = listing.getPriorLastKey(); 2803 } while (priorLastKey != null); 2804 } 2805 2806 /** 2807 * Looks under the given root path for any blob that are left "dangling", 2808 * meaning that they are place-holder blobs that we created while we upload 2809 * the data to a temporary blob, but for some reason we crashed in the middle 2810 * of the upload and left them there. If any are found, we move them to the 2811 * destination given. 2812 * 2813 * @param root 2814 * The root path to consider. 2815 * @param destination 2816 * The destination path to move any recovered files to. 
2817 * @throws IOException 2818 */ 2819 public void recoverFilesWithDanglingTempData(Path root, Path destination) 2820 throws IOException { 2821 2822 LOG.debug("Recovering files with dangling temp data in {}", root); 2823 handleFilesWithDanglingTempData(root, 2824 new DanglingFileRecoverer(destination)); 2825 } 2826 2827 /** 2828 * Looks under the given root path for any blob that are left "dangling", 2829 * meaning that they are place-holder blobs that we created while we upload 2830 * the data to a temporary blob, but for some reason we crashed in the middle 2831 * of the upload and left them there. If any are found, we delete them. 2832 * 2833 * @param root 2834 * The root path to consider. 2835 * @throws IOException 2836 */ 2837 public void deleteFilesWithDanglingTempData(Path root) throws IOException { 2838 2839 LOG.debug("Deleting files with dangling temp data in {}", root); 2840 handleFilesWithDanglingTempData(root, new DanglingFileDeleter()); 2841 } 2842 2843 @Override 2844 protected void finalize() throws Throwable { 2845 LOG.debug("finalize() called."); 2846 close(); 2847 super.finalize(); 2848 } 2849 2850 /** 2851 * Encode the key with a random prefix for load balancing in Azure storage. 2852 * Upload data to a random temporary file then do storage side renaming to 2853 * recover the original key. 2854 * 2855 * @param aKey 2856 * @return Encoded version of the original key. 2857 */ 2858 private static String encodeKey(String aKey) { 2859 // Get the tail end of the key name. 2860 // 2861 String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1, 2862 aKey.length()); 2863 2864 // Construct the randomized prefix of the file name. The prefix ensures the 2865 // file always drops into the same folder but with a varying tail key name. 2866 String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR 2867 + UUID.randomUUID().toString(); 2868 2869 // Concatenate the randomized prefix with the tail of the key name. 
2870 String randomizedKey = filePrefix + fileName; 2871 2872 // Return to the caller with the randomized key. 2873 return randomizedKey; 2874 } 2875}