001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.azure;
020
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.EOFException;
024import java.io.FileNotFoundException;
025import java.io.IOException;
026import java.io.InputStream;
027import java.io.OutputStream;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.nio.charset.Charset;
031import java.text.SimpleDateFormat;
032import java.util.ArrayList;
033import java.util.Date;
034import java.util.EnumSet;
035import java.util.Set;
036import java.util.TimeZone;
037import java.util.TreeSet;
038import java.util.UUID;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.regex.Matcher;
041import java.util.regex.Pattern;
042
043import org.apache.commons.lang.StringUtils;
044import org.apache.hadoop.classification.InterfaceAudience;
045import org.apache.hadoop.classification.InterfaceStability;
046import org.apache.hadoop.conf.Configuration;
047import org.apache.hadoop.fs.BlockLocation;
048import org.apache.hadoop.fs.BufferedFSInputStream;
049import org.apache.hadoop.fs.CreateFlag;
050import org.apache.hadoop.fs.FSDataInputStream;
051import org.apache.hadoop.fs.FSDataOutputStream;
052import org.apache.hadoop.fs.FSExceptionMessages;
053import org.apache.hadoop.fs.FSInputStream;
054import org.apache.hadoop.fs.FileAlreadyExistsException;
055import org.apache.hadoop.fs.FileStatus;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
059import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
060import org.apache.hadoop.fs.permission.FsPermission;
061import org.apache.hadoop.fs.permission.PermissionStatus;
062import org.apache.hadoop.fs.azure.AzureException;
063import org.apache.hadoop.security.UserGroupInformation;
064import org.apache.hadoop.util.Progressable;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067import org.codehaus.jackson.JsonNode;
068import org.codehaus.jackson.JsonParseException;
069import org.codehaus.jackson.JsonParser;
070import org.codehaus.jackson.map.JsonMappingException;
071import org.codehaus.jackson.map.ObjectMapper;
072
073import com.google.common.annotations.VisibleForTesting;
074import com.microsoft.azure.storage.StorageException;
075
076
077import org.apache.hadoop.io.IOUtils;
078
079/**
080 * A {@link FileSystem} for reading and writing files stored on <a
081 * href="http://store.azure.com/">Windows Azure</a>. This implementation is
082 * blob-based and stores files on Azure in their native form so they can be read
083 * by other Azure tools.
084 */
085@InterfaceAudience.Public
086@InterfaceStability.Stable
087public class NativeAzureFileSystem extends FileSystem {
  // Octal 0300 = owner write + execute bits. NOTE(review): the identifier
  // misspells "PERMISSION"; left unrenamed because it may be referenced
  // later in this file, beyond this chunk.
  private static final int USER_WX_PERMISION = 0300;
  /**
   * A description of a folder rename operation, including the source and
   * destination keys, and descriptions of the files in the source folder.
   */
  public static class FolderRenamePending {
    // Lease held on the source folder blob while the rename is in flight.
    private SelfRenewingLease folderLease;
    // Store key of the folder being renamed.
    private String srcKey;
    // Store key of the rename destination.
    private String dstKey;
    private FileMetadata[] fileMetadata = null;    // descriptions of source files
    // File names (relative to srcKey) parsed from the FileList of a
    // -RenamePending.json file; populated only by the redo-file constructor.
    private ArrayList<String> fileStrings = null;
    // File system used to perform the listing/rename/delete operations.
    private NativeAzureFileSystem fs;
    // Upper bound on the size of a -RenamePending.json file we will read.
    private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000;
    // Headroom reserved for JSON formatting when sizing the file contents.
    private static final int FORMATTING_BUFFER = 10000;
    // True when the pending-rename metadata is complete and valid.
    private boolean committed;
    // Suffix appended to the source folder name to form the metadata file name.
    public static final String SUFFIX = "-RenamePending.json";
104
105    // Prepare in-memory information needed to do or redo a folder rename.
106    public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease,
107        NativeAzureFileSystem fs) throws IOException {
108      this.srcKey = srcKey;
109      this.dstKey = dstKey;
110      this.folderLease = lease;
111      this.fs = fs;
112      ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
113
114      // List all the files in the folder.
115      String priorLastKey = null;
116      do {
117        PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
118          AZURE_UNBOUNDED_DEPTH, priorLastKey);
119        for(FileMetadata file : listing.getFiles()) {
120          fileMetadataList.add(file);
121        }
122        priorLastKey = listing.getPriorLastKey();
123      } while (priorLastKey != null);
124      fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
125      this.committed = true;
126    }
127
128    // Prepare in-memory information needed to do or redo folder rename from
129    // a -RenamePending.json file read from storage. This constructor is to use during
130    // redo processing.
131    public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs)
132        throws IllegalArgumentException, IOException {
133
134      this.fs = fs;
135
136      // open redo file
137      Path f = redoFile;
138      FSDataInputStream input = fs.open(f);
139      byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE];
140      int l = input.read(bytes);
141      if (l <= 0) {
142        // Jira HADOOP-12678 -Handle empty rename pending metadata file during
143        // atomic rename in redo path. If during renamepending file is created
144        // but not written yet, then this means that rename operation
145        // has not started yet. So we should delete rename pending metadata file.
146        LOG.error("Deleting empty rename pending file "
147            + redoFile + " -- no data available");
148        deleteRenamePendingFile(fs, redoFile);
149        return;
150      }
151      if (l == MAX_RENAME_PENDING_FILE_SIZE) {
152        throw new IOException(
153            "Error reading pending rename file contents -- "
154                + "maximum file size exceeded");
155      }
156      String contents = new String(bytes, 0, l, Charset.forName("UTF-8"));
157
158      // parse the JSON
159      ObjectMapper objMapper = new ObjectMapper();
160      objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
161      JsonNode json = null;
162      try {
163        json = objMapper.readValue(contents, JsonNode.class);
164        this.committed = true;
165      } catch (JsonMappingException e) {
166
167        // The -RedoPending.json file is corrupted, so we assume it was
168        // not completely written
169        // and the redo operation did not commit.
170        this.committed = false;
171      } catch (JsonParseException e) {
172        this.committed = false;
173      } catch (IOException e) {
174        this.committed = false;
175      }
176      
177      if (!this.committed) {
178        LOG.error("Deleting corruped rename pending file {} \n {}",
179            redoFile, contents);
180
181        // delete the -RenamePending.json file
182        deleteRenamePendingFile(fs, redoFile);
183        return;
184      }
185
186      // initialize this object's fields
187      ArrayList<String> fileStrList = new ArrayList<String>();
188      JsonNode oldFolderName = json.get("OldFolderName");
189      JsonNode newFolderName = json.get("NewFolderName");
190      if (oldFolderName == null || newFolderName == null) {
191          this.committed = false;
192      } else {
193        this.srcKey = oldFolderName.getTextValue();
194        this.dstKey = newFolderName.getTextValue();
195        if (this.srcKey == null || this.dstKey == null) {
196          this.committed = false;
197        } else {
198          JsonNode fileList = json.get("FileList");
199          if (fileList == null) {
200            this.committed = false;
201          } else {
202            for (int i = 0; i < fileList.size(); i++) {
203              fileStrList.add(fileList.get(i).getTextValue());
204            }
205          }
206        }
207      }
208      this.fileStrings = fileStrList;
209    }
210
211    public FileMetadata[] getFiles() {
212      return fileMetadata;
213    }
214
215    public SelfRenewingLease getFolderLease() {
216      return folderLease;
217    }
218
219    /**
220     * Deletes rename pending metadata file
221     * @param fs -- the file system
222     * @param redoFile - rename pending metadata file path
223     * @throws IOException - If deletion fails
224     */
225    @VisibleForTesting
226    void deleteRenamePendingFile(FileSystem fs, Path redoFile)
227        throws IOException {
228      try {
229        fs.delete(redoFile, false);
230      } catch (IOException e) {
231        // If the rename metadata was not found then somebody probably
232        // raced with us and finished the delete first
233        Throwable t = e.getCause();
234        if (t != null && t instanceof StorageException
235            && "BlobNotFound".equals(((StorageException) t).getErrorCode())) {
236          LOG.warn("rename pending file " + redoFile + " is already deleted");
237        } else {
238          throw e;
239        }
240      }
241    }
242
243    /**
244     * Write to disk the information needed to redo folder rename,
245     * in JSON format. The file name will be
246     * {@code wasb://<sourceFolderPrefix>/folderName-RenamePending.json}
247     * The file format will be:
248     * <pre>{@code
249     * {
250     *   FormatVersion: "1.0",
251     *   OperationTime: "<YYYY-MM-DD HH:MM:SS.MMM>",
252     *   OldFolderName: "<key>",
253     *   NewFolderName: "<key>",
254     *   FileList: [ <string> , <string> , ... ]
255     * }
256     *
257     * Here's a sample:
258     * {
259     *  FormatVersion: "1.0",
260     *  OperationUTCTime: "2014-07-01 23:50:35.572",
261     *  OldFolderName: "user/ehans/folderToRename",
262     *  NewFolderName: "user/ehans/renamedFolder",
263     *  FileList: [
264     *    "innerFile",
265     *    "innerFile2"
266     *  ]
267     * } }</pre>
268     * @throws IOException
269     */
270    public void writeFile(FileSystem fs) throws IOException {
271      Path path = getRenamePendingFilePath();
272      LOG.debug("Preparing to write atomic rename state to {}", path.toString());
273      OutputStream output = null;
274
275      String contents = makeRenamePendingFileContents();
276
277      // Write file.
278      try {
279        output = fs.create(path);
280        output.write(contents.getBytes(Charset.forName("UTF-8")));
281      } catch (IOException e) {
282        throw new IOException("Unable to write RenamePending file for folder rename from "
283            + srcKey + " to " + dstKey, e);
284      } finally {
285        NativeAzureFileSystemHelper.cleanup(LOG, output);
286      }
287    }
288
289    /**
290     * Return the contents of the JSON file to represent the operations
291     * to be performed for a folder rename.
292     */
293    public String makeRenamePendingFileContents() {
294      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
295      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
296      String time = sdf.format(new Date());
297
298      // Make file list string
299      StringBuilder builder = new StringBuilder();
300      builder.append("[\n");
301      for (int i = 0; i != fileMetadata.length; i++) {
302        if (i > 0) {
303          builder.append(",\n");
304        }
305        builder.append("    ");
306        String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/");
307
308        // Quote string file names, escaping any possible " characters or other
309        // necessary characters in the name.
310        builder.append(quote(noPrefix));
311        if (builder.length() >=
312            MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) {
313
314          // Give up now to avoid using too much memory.
315          LOG.error("Internal error: Exceeded maximum rename pending file size of {} bytes.",
316              MAX_RENAME_PENDING_FILE_SIZE);
317
318          // return some bad JSON with an error message to make it human readable
319          return "exceeded maximum rename pending file size";
320        }
321      }
322      builder.append("\n  ]");
323      String fileList = builder.toString();
324
325      // Make file contents as a string. Again, quote file names, escaping
326      // characters as appropriate.
327      String contents = "{\n"
328          + "  FormatVersion: \"1.0\",\n"
329          + "  OperationUTCTime: \"" + time + "\",\n"
330          + "  OldFolderName: " + quote(srcKey) + ",\n"
331          + "  NewFolderName: " + quote(dstKey) + ",\n"
332          + "  FileList: " + fileList + "\n"
333          + "}\n";
334
335      return contents;
336    }
337    
    /**
     * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 
     * method.
     * 
     * Produce a string in double quotes with backslash sequences in all the
     * right places. A backslash will be inserted within </, allowing JSON
     * text to be delivered in HTML. In JSON text, a string cannot contain a
     * control character or an unescaped quote or backslash.
     *
     * Left byte-identical to the jettison original on purpose, so the
     * "exact copy" provenance above stays true.
     * @param string A String
     * @return  A String correctly formatted for insertion in a JSON text.
     */
    private String quote(String string) {
        if (string == null || string.length() == 0) {
            return "\"\"";
        }

        char c = 0;
        int  i;
        int  len = string.length();
        StringBuilder sb = new StringBuilder(len + 4);
        String t;

        sb.append('"');
        for (i = 0; i < len; i += 1) {
            c = string.charAt(i);
            switch (c) {
            case '\\':
            case '"':
                sb.append('\\');
                sb.append(c);
                break;
            // Note: every '/' is escaped unconditionally (jettison behavior),
            // not only when preceded by '<'.
            case '/':
                sb.append('\\');
                sb.append(c);
                break;
            case '\b':
                sb.append("\\b");
                break;
            case '\t':
                sb.append("\\t");
                break;
            case '\n':
                sb.append("\\n");
                break;
            case '\f':
                sb.append("\\f");
                break;
            case '\r':
                sb.append("\\r");
                break;
            default:
                // Control characters below U+0020 become \ u X X X X escapes.
                if (c < ' ') {
                    t = "000" + Integer.toHexString(c);
                    sb.append("\\u" + t.substring(t.length() - 4));
                } else {
                    sb.append(c);
                }
            }
        }
        sb.append('"');
        return sb.toString();
    }
400
401    public String getSrcKey() {
402      return srcKey;
403    }
404
405    public String getDstKey() {
406      return dstKey;
407    }
408
409    public FileMetadata getSourceMetadata() throws IOException {
410      return fs.getStoreInterface().retrieveMetadata(srcKey);
411    }
412
413    /**
414     * Execute a folder rename. This is the execution path followed
415     * when everything is working normally. See redo() for the alternate
416     * execution path for the case where we're recovering from a folder rename
417     * failure.
418     * @throws IOException
419     */
420    public void execute() throws IOException {
421
422      for (FileMetadata file : this.getFiles()) {
423
424        // Rename all materialized entries under the folder to point to the
425        // final destination.
426        if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
427          String srcName = file.getKey();
428          String suffix  = srcName.substring((this.getSrcKey()).length());
429          String dstName = this.getDstKey() + suffix;
430
431          // Rename gets exclusive access (via a lease) for files
432          // designated for atomic rename.
433          // The main use case is for HBase write-ahead log (WAL) and data
434          // folder processing correctness.  See the rename code for details.
435          boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName);
436          fs.getStoreInterface().rename(srcName, dstName, acquireLease, null);
437        }
438      }
439
440      // Rename the source folder 0-byte root file itself.
441      FileMetadata srcMetadata2 = this.getSourceMetadata();
442      if (srcMetadata2.getBlobMaterialization() ==
443          BlobMaterialization.Explicit) {
444
445        // It already has a lease on it from the "prepare" phase so there's no
446        // need to get one now. Pass in existing lease to allow file delete.
447        fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(),
448            false, folderLease);
449      }
450
451      // Update the last-modified time of the parent folders of both source and
452      // destination.
453      fs.updateParentFolderLastModifiedTime(srcKey);
454      fs.updateParentFolderLastModifiedTime(dstKey);
455    }
456
457    /** Clean up after execution of rename.
458     * @throws IOException */
459    public void cleanup() throws IOException {
460
461      if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) {
462
463        // Remove RenamePending file
464        fs.delete(getRenamePendingFilePath(), false);
465
466        // Freeing source folder lease is not necessary since the source
467        // folder file was deleted.
468      }
469    }
470
471    private Path getRenamePendingFilePath() {
472      String fileName = srcKey + SUFFIX;
473      Path fileNamePath = keyToPath(fileName);
474      Path path = fs.makeAbsolute(fileNamePath);
475      return path;
476    }
477
478    /**
479     * Recover from a folder rename failure by redoing the intended work,
480     * as recorded in the -RenamePending.json file.
481     * 
482     * @throws IOException
483     */
484    public void redo() throws IOException {
485
486      if (!committed) {
487
488        // Nothing to do. The -RedoPending.json file should have already been
489        // deleted.
490        return;
491      }
492
493      // Try to get a lease on source folder to block concurrent access to it.
494      // It may fail if the folder is already gone. We don't check if the
495      // source exists explicitly because that could recursively trigger redo
496      // and give an infinite recursion.
497      SelfRenewingLease lease = null;
498      boolean sourceFolderGone = false;
499      try {
500        lease = fs.leaseSourceFolder(srcKey);
501      } catch (AzureException e) {
502
503        // If the source folder was not found then somebody probably
504        // raced with us and finished the rename first, or the
505        // first rename failed right before deleting the rename pending
506        // file.
507        String errorCode = "";
508        try {
509          StorageException se = (StorageException) e.getCause();
510          errorCode = se.getErrorCode();
511        } catch (Exception e2) {
512          ; // do nothing -- could not get errorCode
513        }
514        if (errorCode.equals("BlobNotFound")) {
515          sourceFolderGone = true;
516        } else {
517          throw new IOException(
518              "Unexpected error when trying to lease source folder name during "
519              + "folder rename redo",
520              e);
521        }
522      }
523
524      if (!sourceFolderGone) {
525        // Make sure the target folder exists.
526        Path dst = fullPath(dstKey);
527        if (!fs.exists(dst)) {
528          fs.mkdirs(dst);
529        }
530
531        // For each file inside the folder to be renamed,
532        // make sure it has been renamed.
533        for(String fileName : fileStrings) {
534          finishSingleFileRename(fileName);
535        }
536
537        // Remove the source folder. Don't check explicitly if it exists,
538        // to avoid triggering redo recursively.
539        try {
540          fs.getStoreInterface().delete(srcKey, lease);
541        } catch (Exception e) {
542          LOG.info("Unable to delete source folder during folder rename redo. "
543              + "If the source folder is already gone, this is not an error "
544              + "condition. Continuing with redo.", e);
545        }
546
547        // Update the last-modified time of the parent folders of both source
548        // and destination.
549        fs.updateParentFolderLastModifiedTime(srcKey);
550        fs.updateParentFolderLastModifiedTime(dstKey);
551      }
552
553      // Remove the -RenamePending.json file.
554      fs.delete(getRenamePendingFilePath(), false);
555    }
556
557    // See if the source file is still there, and if it is, rename it.
558    private void finishSingleFileRename(String fileName)
559        throws IOException {
560      Path srcFile = fullPath(srcKey, fileName);
561      Path dstFile = fullPath(dstKey, fileName);
562      String srcName = fs.pathToKey(srcFile);
563      String dstName = fs.pathToKey(dstFile);
564      boolean srcExists = fs.getStoreInterface().explicitFileExists(srcName);
565      boolean dstExists = fs.getStoreInterface().explicitFileExists(dstName);
566      if(srcExists) {
567        // Rename gets exclusive access (via a lease) for HBase write-ahead log
568        // (WAL) file processing correctness.  See the rename code for details.
569        fs.getStoreInterface().rename(srcName, dstName, true, null);
570      } else if (!srcExists && dstExists) {
571        // The rename already finished, so do nothing.
572        ;
573      } else {
574        throw new IOException(
575            "Attempting to complete rename of file " + srcKey + "/" + fileName
576            + " during folder rename redo, and file was not found in source "
577            + "or destination.");
578      }
579    }
580
581    // Return an absolute path for the specific fileName within the folder
582    // specified by folderKey.
583    private Path fullPath(String folderKey, String fileName) {
584      return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName);
585    }
586
587    private Path fullPath(String fileKey) {
588      return new Path(new Path(fs.getUri()), "/" + fileKey);
589    }
590  }
591
  // Placeholder substituted for a '.' at the end of a path component.
  // NOTE(review): the motivation (Azure's handling of trailing periods) is
  // not visible in this chunk -- confirm against the code using these.
  private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]";
  // Matches the placeholder when it ends the path or a path component
  // (i.e. is followed by end-of-string or '/').
  private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN =
      Pattern.compile("\\[\\[\\.\\]\\](?=$|/)");
  // Matches a literal '.' that ends the path or a path component.
  private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)");
596
  /** @return the URI scheme served by this file system: "wasb". */
  @Override
  public String getScheme() {
    return "wasb";
  }
601
602  
  /**
   * A secure (HTTPS) variant of {@link NativeAzureFileSystem}, registered
   * under the "wasbs" URI scheme. Apart from the scheme it inherits all
   * behavior from the parent class.
   */
  public static class Secure extends NativeAzureFileSystem {
    @Override
    public String getScheme() {
      return "wasbs";
    }
  }
617
  public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class);

  // Configuration key for the block size reported to clients.
  static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
  /**
   * The time span in seconds before which we consider a temp blob to be
   * dangling (not being actively uploaded to) and up for reclamation.
   * 
   * So e.g. if this is 60, then any temporary blobs more than a minute old
   * would be considered dangling.
   */
  static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds";
  // Default temp-blob expiry: one hour, in seconds.
  private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600;
  // Path separator used when building store keys.
  static final String PATH_DELIMITER = Path.SEPARATOR;
  // Folder name under which in-progress uploads are staged.
  static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$";

  // Sentinel values for listAll(): no limit on result count / recursion depth.
  private static final int AZURE_LIST_ALL = -1;
  private static final int AZURE_UNBOUNDED_DEPTH = -1;

  // Largest block size this file system will report: 512 MB.
  private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L;

  /**
   * The configuration property that determines which group owns files created
   * in WASB.
   */
  private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup";
  /**
   * The default value for fs.azure.permissions.supergroup. Chosen as the same
   * default as DFS.
   */
  static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";

  // Host name reported in block locations, and its default.
  static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
      "fs.azure.block.location.impersonatedhost";
  private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
      "localhost";
  // Tuning knobs for output buffering.
  static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
      "fs.azure.ring.buffer.capacity";
  static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
      "fs.azure.output.stream.buffer.size";

  // When true, metrics collection is skipped for this file system instance.
  public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics";

  /*
   * Property to enable Append API.
   */
  public static final String APPEND_SUPPORT_ENABLE_PROPERTY_NAME = "fs.azure.enable.append.support";
664
  private class NativeAzureFsInputStream extends FSInputStream {
    // Underlying stream over the blob contents; replaced on seek().
    private InputStream in;
    // Store key of the blob being read.
    private final String key;
    // Current read position within the blob.
    private long pos = 0;
    // Set once close() has run; guards further use of the stream.
    private boolean closed = false;
    // True when the key refers to a page blob rather than a block blob.
    private boolean isPageBlob;

    // File length, valid only for streams over block blobs.
    private long fileLength;

    /**
     * @param in stream over the blob contents; assumed to start at offset 0
     *     (pos starts at 0) -- confirm with callers
     * @param key store key of the blob being read
     * @param fileLength total length; only meaningful for block blobs
     */
    public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) {
      this.in = in;
      this.key = key;
      this.isPageBlob = store.isPageBlobKey(key);
      this.fileLength = fileLength;
    }
681
682    /**
683     * Return the size of the remaining available bytes
684     * if the size is less than or equal to {@link Integer#MAX_VALUE},
685     * otherwise, return {@link Integer#MAX_VALUE}.
686     *
687     * This is to match the behavior of DFSInputStream.available(),
688     * which some clients may rely on (HBase write-ahead log reading in
689     * particular).
690     */
691    @Override
692    public synchronized int available() throws IOException {
693      if (isPageBlob) {
694        return in.available();
695      } else {
696        if (closed) {
697          throw new IOException("Stream closed");
698        }
699        final long remaining = this.fileLength - pos;
700        return remaining <= Integer.MAX_VALUE ?
701            (int) remaining : Integer.MAX_VALUE;
702      }
703    }
704
705    /*
706     * Reads the next byte of data from the input stream. The value byte is
707     * returned as an integer in the range 0 to 255. If no byte is available
708     * because the end of the stream has been reached, the value -1 is returned.
709     * This method blocks until input data is available, the end of the stream
710     * is detected, or an exception is thrown.
711     *
712     * @returns int An integer corresponding to the byte read.
713     */
714    @Override
715    public synchronized int read() throws FileNotFoundException, IOException {
716      try {
717        int result = 0;
718        result = in.read();
719        if (result != -1) {
720          pos++;
721          if (statistics != null) {
722            statistics.incrementBytesRead(1);
723          }
724        }
725      // Return to the caller with the result.
726      //
727        return result;
728      } catch(IOException e) {
729
730        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
731
732        if (innerException instanceof StorageException) {
733
734          LOG.error("Encountered Storage Exception for read on Blob : {}"
735              + " Exception details: {} Error Code : {}",
736              key, e, ((StorageException) innerException).getErrorCode());
737
738          if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
739            throw new FileNotFoundException(String.format("%s is not found", key));
740          }
741        }
742
743       throw e;
744      }
745    }
746
747    /*
748     * Reads up to len bytes of data from the input stream into an array of
749     * bytes. An attempt is made to read as many as len bytes, but a smaller
750     * number may be read. The number of bytes actually read is returned as an
751     * integer. This method blocks until input data is available, end of file is
752     * detected, or an exception is thrown. If len is zero, then no bytes are
753     * read and 0 is returned; otherwise, there is an attempt to read at least
754     * one byte. If no byte is available because the stream is at end of file,
755     * the value -1 is returned; otherwise, at least one byte is read and stored
756     * into b.
757     *
758     * @param b -- the buffer into which data is read
759     *
760     * @param off -- the start offset in the array b at which data is written
761     *
762     * @param len -- the maximum number of bytes read
763     *
764     * @ returns int The total number of byes read into the buffer, or -1 if
765     * there is no more data because the end of stream is reached.
766     */
767    @Override
768    public synchronized int read(byte[] b, int off, int len) throws FileNotFoundException, IOException {
769      try {
770        int result = 0;
771        result = in.read(b, off, len);
772        if (result > 0) {
773          pos += result;
774        }
775
776        if (null != statistics) {
777          statistics.incrementBytesRead(result);
778        }
779
780        // Return to the caller with the result.
781        return result;
782      } catch(IOException e) {
783
784        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
785
786        if (innerException instanceof StorageException) {
787
788          LOG.error("Encountered Storage Exception for read on Blob : {}"
789              + " Exception details: {} Error Code : {}",
790              key, e, ((StorageException) innerException).getErrorCode());
791
792          if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
793            throw new FileNotFoundException(String.format("%s is not found", key));
794          }
795        }
796
797       throw e;
798      }
799    }
800
801    @Override
802    public synchronized void close() throws IOException {
803      if (!closed) {
804        closed = true;
805        IOUtils.closeStream(in);
806        in = null;
807      }
808    }
809
    /**
     * Seek to the given position by re-opening the blob stream from the start
     * and skipping forward. NOTE(review): InputStream.skip() may skip fewer
     * bytes than requested; this.pos is set to the number actually skipped,
     * so the stream can land short of the requested offset -- confirm that
     * callers tolerate this.
     *
     * @throws EOFException if pos is negative
     * @throws FileNotFoundException if the underlying blob no longer exists
     */
    @Override
    public synchronized void seek(long pos) throws FileNotFoundException, EOFException, IOException {
      try {
        checkNotClosed();
        if (pos < 0) {
          throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
        }
        // Discard the current stream and re-open from the beginning, then
        // skip to the target offset.
        IOUtils.closeStream(in);
        in = store.retrieve(key);
        this.pos = in.skip(pos);
        LOG.debug("Seek to position {}. Bytes skipped {}", pos,
          this.pos);
      } catch(IOException e) {

        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

        // A deleted blob surfaces to callers as FileNotFoundException.
        if (innerException instanceof StorageException
            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
          throw new FileNotFoundException(String.format("%s is not found", key));
        }

        throw e;
      } catch(IndexOutOfBoundsException e) {
        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
      }
    }
836
    /**
     * Returns the current byte position in the stream.
     */
    @Override
    public synchronized long getPos() throws IOException {
      return pos;
    }
841
    /**
     * No alternate data source is available for a blob, so this never
     * switches sources.
     *
     * @return false, always
     */
    @Override
    public boolean seekToNewSource(long targetPos) throws IOException {
      return false;
    }
846
847
848    /*
849     * Helper method to check if a stream is closed.
850     */
851    private void checkNotClosed() throws IOException {
852      if (closed) {
853        throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
854      }
855    }
856  }
857
858  private class NativeAzureFsOutputStream extends OutputStream {
859    // We should not override flush() to actually close current block and flush
860    // to DFS, this will break applications that assume flush() is a no-op.
861    // Applications are advised to use Syncable.hflush() for that purpose.
862    // NativeAzureFsOutputStream needs to implement Syncable if needed.
863    private String key;
864    private String keyEncoded;
865    private OutputStream out;
866
867    public NativeAzureFsOutputStream(OutputStream out, String aKey,
868        String anEncodedKey) throws IOException {
869      // Check input arguments. The output stream should be non-null and the
870      // keys
871      // should be valid strings.
872      if (null == out) {
873        throw new IllegalArgumentException(
874            "Illegal argument: the output stream is null.");
875      }
876
877      if (null == aKey || 0 == aKey.length()) {
878        throw new IllegalArgumentException(
879            "Illegal argument the key string is null or empty");
880      }
881
882      if (null == anEncodedKey || 0 == anEncodedKey.length()) {
883        throw new IllegalArgumentException(
884            "Illegal argument the encoded key string is null or empty");
885      }
886
887      // Initialize the member variables with the incoming parameters.
888      this.out = out;
889
890      setKey(aKey);
891      setEncodedKey(anEncodedKey);
892    }
893
894    @Override
895    public synchronized void close() throws IOException {
896      if (out != null) {
897        // Close the output stream and decode the key for the output stream
898        // before returning to the caller.
899        //
900        out.close();
901        restoreKey();
902        out = null;
903      }
904    }
905
906    /**
907     * Writes the specified byte to this output stream. The general contract for
908     * write is that one byte is written to the output stream. The byte to be
909     * written is the eight low-order bits of the argument b. The 24 high-order
910     * bits of b are ignored.
911     * 
912     * @param b
913     *          32-bit integer of block of 4 bytes
914     */
915    @Override
916    public void write(int b) throws IOException {
917      try {
918        out.write(b);
919      } catch(IOException e) {
920        if (e.getCause() instanceof StorageException) {
921          StorageException storageExcp  = (StorageException) e.getCause();
922          LOG.error("Encountered Storage Exception for write on Blob : {}"
923              + " Exception details: {} Error Code : {}",
924              key, e.getMessage(), storageExcp.getErrorCode());
925        }
926        throw e;
927      }
928    }
929
930    /**
931     * Writes b.length bytes from the specified byte array to this output
932     * stream. The general contract for write(b) is that it should have exactly
933     * the same effect as the call write(b, 0, b.length).
934     * 
935     * @param b
936     *          Block of bytes to be written to the output stream.
937     */
938    @Override
939    public void write(byte[] b) throws IOException {
940      try {
941        out.write(b);
942      } catch(IOException e) {
943        if (e.getCause() instanceof StorageException) {
944          StorageException storageExcp  = (StorageException) e.getCause();
945          LOG.error("Encountered Storage Exception for write on Blob : {}"
946              + " Exception details: {} Error Code : {}",
947              key, e.getMessage(), storageExcp.getErrorCode());
948        }
949        throw e;
950      }
951    }
952
953    /**
954     * Writes <code>len</code> from the specified byte array starting at offset
955     * <code>off</code> to the output stream. The general contract for write(b,
956     * off, len) is that some of the bytes in the array <code>
957     * b</code b> are written to the output stream in order; element
958     * <code>b[off]</code> is the first byte written and
959     * <code>b[off+len-1]</code> is the last byte written by this operation.
960     * 
961     * @param b
962     *          Byte array to be written.
963     * @param off
964     *          Write this offset in stream.
965     * @param len
966     *          Number of bytes to be written.
967     */
968    @Override
969    public void write(byte[] b, int off, int len) throws IOException {
970      try {
971        out.write(b, off, len);
972      } catch(IOException e) {
973        if (e.getCause() instanceof StorageException) {
974          StorageException storageExcp  = (StorageException) e.getCause();
975          LOG.error("Encountered Storage Exception for write on Blob : {}"
976              + " Exception details: {} Error Code : {}",
977              key, e.getMessage(), storageExcp.getErrorCode());
978        }
979        throw e;
980      }
981    }
982
983    /**
984     * Get the blob name.
985     * 
986     * @return String Blob name.
987     */
988    public String getKey() {
989      return key;
990    }
991
992    /**
993     * Set the blob name.
994     * 
995     * @param key
996     *          Blob name.
997     */
998    public void setKey(String key) {
999      this.key = key;
1000    }
1001
1002    /**
1003     * Get the blob name.
1004     * 
1005     * @return String Blob name.
1006     */
1007    public String getEncodedKey() {
1008      return keyEncoded;
1009    }
1010
1011    /**
1012     * Set the blob name.
1013     * 
1014     * @param anEncodedKey
1015     *          Blob name.
1016     */
1017    public void setEncodedKey(String anEncodedKey) {
1018      this.keyEncoded = anEncodedKey;
1019    }
1020
1021    /**
1022     * Restore the original key name from the m_key member variable. Note: The
1023     * output file stream is created with an encoded blob store key to guarantee
1024     * load balancing on the front end of the Azure storage partition servers.
1025     * The create also includes the name of the original key value which is
1026     * stored in the m_key member variable. This method should only be called
1027     * when the stream is closed.
1028     */
1029    private void restoreKey() throws IOException {
1030      store.rename(getEncodedKey(), getKey());
1031    }
1032  }
1033
  // Scheme + authority URI that this file system instance serves.
  private URI uri;
  // Store interface used for all blob operations; may be injected via the
  // one-argument constructor or created by createDefaultStore().
  private NativeFileSystemStore store;
  // Concrete store implementation created by createDefaultStore(); exposed to
  // tests via getStore().
  private AzureNativeFileSystemStore actualStore;
  // Working directory used by makeAbsolute() to resolve relative paths.
  private Path workingDir;
  // Block size reported to clients; read from AZURE_BLOCK_SIZE_PROPERTY_NAME
  // in initialize().
  private long blockSize = MAX_AZURE_BLOCK_SIZE;
  // Metrics instrumentation for this instance, created in initialize().
  private AzureFileSystemInstrumentation instrumentation;
  // Name this instance's metrics source was registered under.
  private String metricsSourceName;
  // Presumably flipped when the file system is closed — close() is not
  // visible in this section; confirm against the close() implementation.
  private boolean isClosed = false;
  // Test hook: when true, createDefaultStore() suppresses the retry policy.
  private static boolean suppressRetryPolicy = false;
  // A counter to create unique (within-process) names for my metrics sources.
  private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
  // Gates append(); read from APPEND_SUPPORT_ENABLE_PROPERTY_NAME in initialize().
  private boolean appendSupportEnabled = false;
1046  
  /**
   * No-argument constructor; the backing store is created later in
   * initialize().
   */
  public NativeAzureFileSystem() {
    // set store in initialize()
  }
1050
  /**
   * Creates the file system with a pre-constructed store; initialize() then
   * skips creating the default store.
   *
   * @param store the store to back this file system
   */
  public NativeAzureFileSystem(NativeFileSystemStore store) {
    this.store = store;
  }
1054
1055  /**
1056   * Suppress the default retry policy for the Storage, useful in unit tests to
1057   * test negative cases without waiting forever.
1058   */
1059  @VisibleForTesting
1060  static void suppressRetryPolicy() {
1061    suppressRetryPolicy = true;
1062  }
1063
1064  /**
1065   * Undo the effect of suppressRetryPolicy.
1066   */
1067  @VisibleForTesting
1068  static void resumeRetryPolicy() {
1069    suppressRetryPolicy = false;
1070  }
1071
1072  /**
1073   * Creates a new metrics source name that's unique within this process.
1074   */
1075  @VisibleForTesting
1076  public static String newMetricsSourceName() {
1077    int number = metricsSourceNameCounter.incrementAndGet();
1078    final String baseName = "AzureFileSystemMetrics";
1079    if (number == 1) { // No need for a suffix for the first one
1080      return baseName;
1081    } else {
1082      return baseName + number;
1083    }
1084  }
1085  
1086  /**
1087   * Checks if the given URI scheme is a scheme that's affiliated with the Azure
1088   * File System.
1089   * 
1090   * @param scheme
1091   *          The URI scheme.
1092   * @return true iff it's an Azure File System URI scheme.
1093   */
1094  private static boolean isWasbScheme(String scheme) {
1095    // The valid schemes are: asv (old name), asvs (old name over HTTPS),
1096    // wasb (new name), wasbs (new name over HTTPS).
1097    return scheme != null
1098        && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs")
1099            || scheme.equalsIgnoreCase("wasb") || scheme
1100              .equalsIgnoreCase("wasbs"));
1101  }
1102
1103  /**
1104   * Puts in the authority of the default file system if it is a WASB file
1105   * system and the given URI's authority is null.
1106   * 
1107   * @return The URI with reconstructed authority if necessary and possible.
1108   */
1109  private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) {
1110    if (null == uri.getAuthority()) {
1111      // If WASB is the default file system, get the authority from there
1112      URI defaultUri = FileSystem.getDefaultUri(conf);
1113      if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) {
1114        try {
1115          // Reconstruct the URI with the authority from the default URI.
1116          return new URI(uri.getScheme(), defaultUri.getAuthority(),
1117              uri.getPath(), uri.getQuery(), uri.getFragment());
1118        } catch (URISyntaxException e) {
1119          // This should never happen.
1120          throw new Error("Bad URI construction", e);
1121        }
1122      }
1123    }
1124    return uri;
1125  }
1126
  /**
   * Validates that the path belongs to this file system, first filling in a
   * missing authority from the default file system URI so that
   * authority-less WASB paths are accepted.
   */
  @Override
  protected void checkPath(Path path) {
    // Make sure to reconstruct the path's authority if needed
    super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(),
        getConf())));
  }
1133
1134  @Override
1135  public void initialize(URI uri, Configuration conf)
1136      throws IOException, IllegalArgumentException {
1137    // Check authority for the URI to guarantee that it is non-null.
1138    uri = reconstructAuthorityIfNeeded(uri, conf);
1139    if (null == uri.getAuthority()) {
1140      final String errMsg = String
1141          .format("Cannot initialize WASB file system, URI authority not recognized.");
1142      throw new IllegalArgumentException(errMsg);
1143    }
1144    super.initialize(uri, conf);
1145
1146    if (store == null) {
1147      store = createDefaultStore(conf);
1148    }
1149
1150    instrumentation = new AzureFileSystemInstrumentation(conf);
1151    if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) {
1152      // Make sure the metrics system is available before interacting with Azure
1153      AzureFileSystemMetricsSystem.fileSystemStarted();
1154      metricsSourceName = newMetricsSourceName();
1155      String sourceDesc = "Azure Storage Volume File System metrics";
1156      AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc,
1157        instrumentation);
1158    }
1159
1160    store.initialize(uri, conf, instrumentation);
1161    setConf(conf);
1162    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
1163    this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
1164        .getShortUserName()).makeQualified(getUri(), getWorkingDirectory());
1165    this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
1166        MAX_AZURE_BLOCK_SIZE);
1167
1168    this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
1169    LOG.debug("NativeAzureFileSystem. Initializing.");
1170    LOG.debug("  blockSize  = {}",
1171        conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
1172
1173  }
1174
  /**
   * Creates the default AzureNativeFileSystemStore, recording the concrete
   * instance in actualStore (so tests can reach it via getStore()) and
   * suppressing its retry policy if a test has requested that.
   *
   * @param conf configuration (currently unused here)
   * @return the newly created store
   */
  private NativeFileSystemStore createDefaultStore(Configuration conf) {
    actualStore = new AzureNativeFileSystemStore();

    if (suppressRetryPolicy) {
      actualStore.suppressRetryPolicy();
    }
    return actualStore;
  }
1183
1184  /**
1185   * Azure Storage doesn't allow the blob names to end in a period,
1186   * so encode this here to work around that limitation.
1187   */
1188  private static String encodeTrailingPeriod(String toEncode) {
1189    Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode);
1190    return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER);
1191  }
1192
1193  /**
1194   * Reverse the encoding done by encodeTrailingPeriod().
1195   */
1196  private static String decodeTrailingPeriod(String toDecode) {
1197    Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode);
1198    return matcher.replaceAll(".");
1199  }
1200
1201  /**
1202   * Convert the path to a key. By convention, any leading or trailing slash is
1203   * removed, except for the special case of a single slash.
1204   */
1205  @VisibleForTesting
1206  public String pathToKey(Path path) {
1207    // Convert the path to a URI to parse the scheme, the authority, and the
1208    // path from the path object.
1209    URI tmpUri = path.toUri();
1210    String pathUri = tmpUri.getPath();
1211
1212    // The scheme and authority is valid. If the path does not exist add a "/"
1213    // separator to list the root of the container.
1214    Path newPath = path;
1215    if ("".equals(pathUri)) {
1216      newPath = new Path(tmpUri.toString() + Path.SEPARATOR);
1217    }
1218
1219    // Verify path is absolute if the path refers to a windows drive scheme.
1220    if (!newPath.isAbsolute()) {
1221      throw new IllegalArgumentException("Path must be absolute: " + path);
1222    }
1223
1224    String key = null;
1225    key = newPath.toUri().getPath();
1226    key = removeTrailingSlash(key);
1227    key = encodeTrailingPeriod(key);
1228    if (key.length() == 1) {
1229      return key;
1230    } else {
1231      return key.substring(1); // remove initial slash
1232    }
1233  }
1234
1235  // Remove any trailing slash except for the case of a single slash.
1236  private static String removeTrailingSlash(String key) {
1237    if (key.length() == 0 || key.length() == 1) {
1238      return key;
1239    }
1240    if (key.charAt(key.length() - 1) == '/') {
1241      return key.substring(0, key.length() - 1);
1242    } else {
1243      return key;
1244    }
1245  }
1246
1247  private static Path keyToPath(String key) {
1248    if (key.equals("/")) {
1249      return new Path("/"); // container
1250    }
1251    return new Path("/" + decodeTrailingPeriod(key));
1252  }
1253
1254  /**
1255   * Get the absolute version of the path (fully qualified).
1256   * This is public for testing purposes.
1257   *
1258   * @param path
1259   * @return fully qualified path
1260   */
1261  @VisibleForTesting
1262  public Path makeAbsolute(Path path) {
1263    if (path.isAbsolute()) {
1264      return path;
1265    }
1266    return new Path(workingDir, path);
1267  }
1268
1269  /**
1270   * For unit test purposes, retrieves the AzureNativeFileSystemStore store
1271   * backing this file system.
1272   * 
1273   * @return The store object.
1274   */
1275  @VisibleForTesting
1276  public AzureNativeFileSystemStore getStore() {
1277    return actualStore;
1278  }
1279  
  /**
   * Returns the store interface in use, which may be an injected
   * implementation rather than the default one.
   */
  NativeFileSystemStore getStoreInterface() {
    return store;
  }
1283
1284  /**
1285   * Gets the metrics source for this file system.
1286   * This is mainly here for unit testing purposes.
1287   *
1288   * @return the metrics source.
1289   */
1290  public AzureFileSystemInstrumentation getInstrumentation() {
1291    return instrumentation;
1292  }
1293
  /**
   * Appends to an existing file (optional operation). Only available when
   * append support has been enabled in the configuration, and never
   * supported for page blobs.
   *
   * @param f the file to append to
   * @param bufferSize buffer size for the append stream
   * @param progress progress reporter (unused here)
   * @return an output stream for appending to the file
   * @throws UnsupportedOperationException if append support is not enabled
   * @throws FileNotFoundException if the file does not exist, is a
   *         directory, or the blob has vanished from the store
   * @throws IOException if the blob is a page blob or the store fails
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
      throws IOException {

    if (!appendSupportEnabled) {
      throw new UnsupportedOperationException("Append Support not enabled");
    }

    LOG.debug("Opening file: {} for append", f);

    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    FileMetadata meta = null;
    try {
      meta = store.retrieveMetadata(key);
    } catch(Exception ex) {

      // Translate a storage-level "blob not found" into the standard
      // FileNotFoundException; rethrow anything else untouched.
      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);

      if (innerException instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {

        throw new FileNotFoundException(String.format("%s is not found", key));
      } else {
        throw ex;
      }
    }

    if (meta == null) {
      throw new FileNotFoundException(f.toString());
    }

    if (meta.isDir()) {
      throw new FileNotFoundException(f.toString()
          + " is a directory not a file.");
    }

    if (store.isPageBlobKey(key)) {
      throw new IOException("Append not supported for Page Blobs");
    }

    DataOutputStream appendStream = null;

    try {
      appendStream = store.retrieveAppendStream(key, bufferSize);
    } catch (Exception ex) {

      // Same not-found translation as above, for the stream-open step.
      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);

      if (innerException instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
        throw new FileNotFoundException(String.format("%s is not found", key));
      } else {
        throw ex;
      }
    }

    return new FSDataOutputStream(appendStream, statistics);
  }
1354
  /**
   * Creates a file, delegating to the internal create with parent creation
   * enabled and no parent-folder lease.
   */
  @Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return create(f, permission, overwrite, true,
        bufferSize, replication, blockSize, progress,
        (SelfRenewingLease) null);
  }
1363
1364  /**
1365   * Get a self-renewing lease on the specified file.
1366   */
1367  public SelfRenewingLease acquireLease(Path path) throws AzureException {
1368    String fullKey = pathToKey(makeAbsolute(path));
1369    return getStore().acquireLease(fullKey);
1370  }
1371
1372  @Override
1373  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1374      boolean overwrite, int bufferSize, short replication, long blockSize,
1375      Progressable progress) throws IOException {
1376
1377    Path parent = f.getParent();
1378
1379    // Get exclusive access to folder if this is a directory designated
1380    // for atomic rename. The primary use case of for HBase write-ahead
1381    // log file management.
1382    SelfRenewingLease lease = null;
1383    if (store.isAtomicRenameKey(pathToKey(f))) {
1384      try {
1385        lease = acquireLease(parent);
1386      } catch (AzureException e) {
1387
1388        String errorCode = "";
1389        try {
1390          StorageException e2 = (StorageException) e.getCause();
1391          errorCode = e2.getErrorCode();
1392        } catch (Exception e3) {
1393          // do nothing if cast fails
1394        }
1395        if (errorCode.equals("BlobNotFound")) {
1396          throw new FileNotFoundException("Cannot create file " +
1397              f.getName() + " because parent folder does not exist.");
1398        }
1399
1400        LOG.warn("Got unexpected exception trying to get lease on {} . {}",
1401          pathToKey(parent), e.getMessage());
1402        throw e;
1403      }
1404    }
1405
1406    // See if the parent folder exists. If not, throw error.
1407    // The exists() check will push any pending rename operation forward,
1408    // if there is one, and return false.
1409    //
1410    // At this point, we have exclusive access to the source folder
1411    // via the lease, so we will not conflict with an active folder
1412    // rename operation.
1413    if (!exists(parent)) {
1414      try {
1415
1416        // This'll let the keep-alive thread exit as soon as it wakes up.
1417        lease.free();
1418      } catch (Exception e) {
1419        LOG.warn("Unable to free lease because: {}", e.getMessage());
1420      }
1421      throw new FileNotFoundException("Cannot create file " +
1422          f.getName() + " because parent folder does not exist.");
1423    }
1424
1425    // Create file inside folder.
1426    FSDataOutputStream out = null;
1427    try {
1428      out = create(f, permission, overwrite, false,
1429          bufferSize, replication, blockSize, progress, lease);
1430    } finally {
1431      // Release exclusive access to folder.
1432      try {
1433        if (lease != null) {
1434          lease.free();
1435        }
1436      } catch (Exception e) {
1437        NativeAzureFileSystemHelper.cleanup(LOG, out);
1438        String msg = "Unable to free lease on " + parent.toUri();
1439        LOG.error(msg);
1440        throw new IOException(msg, e);
1441      }
1442    }
1443    return out;
1444  }
1445
1446  @Override
1447  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1448      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1449      Progressable progress) throws IOException {
1450
1451    // Check if file should be appended or overwritten. Assume that the file
1452    // is overwritten on if the CREATE and OVERWRITE create flags are set. Note
1453    // that any other combinations of create flags will result in an open new or
1454    // open with append.
1455    final EnumSet<CreateFlag> createflags =
1456        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
1457    boolean overwrite = flags.containsAll(createflags);
1458
1459    // Delegate the create non-recursive call.
1460    return this.createNonRecursive(f, permission, overwrite,
1461        bufferSize, replication, blockSize, progress);
1462  }
1463
  /**
   * Variant of createNonRecursive that applies the default file permission.
   */
  @Override
  public FSDataOutputStream createNonRecursive(Path f,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return this.createNonRecursive(f, FsPermission.getFileDefault(),
        overwrite, bufferSize, replication, blockSize, progress);
  }
1471
1472
1473  /**
1474   * Create an Azure blob and return an output stream to use
1475   * to write data to it.
1476   *
1477   * @param f
1478   * @param permission
1479   * @param overwrite
1480   * @param createParent
1481   * @param bufferSize
1482   * @param replication
1483   * @param blockSize
1484   * @param progress
1485   * @param parentFolderLease Lease on parent folder (or null if
1486   * no lease).
1487   * @return
1488   * @throws IOException
1489   */
1490  private FSDataOutputStream create(Path f, FsPermission permission,
1491      boolean overwrite, boolean createParent, int bufferSize,
1492      short replication, long blockSize, Progressable progress,
1493      SelfRenewingLease parentFolderLease)
1494          throws FileAlreadyExistsException, IOException {
1495
1496    LOG.debug("Creating file: {}", f.toString());
1497
1498    if (containsColon(f)) {
1499      throw new IOException("Cannot create file " + f
1500          + " through WASB that has colons in the name");
1501    }
1502
1503    Path absolutePath = makeAbsolute(f);
1504    String key = pathToKey(absolutePath);
1505
1506    FileMetadata existingMetadata = store.retrieveMetadata(key);
1507    if (existingMetadata != null) {
1508      if (existingMetadata.isDir()) {
1509        throw new FileAlreadyExistsException("Cannot create file " + f
1510            + "; already exists as a directory.");
1511      }
1512      if (!overwrite) {
1513        throw new FileAlreadyExistsException("File already exists:" + f);
1514      }
1515    }
1516
1517    Path parentFolder = absolutePath.getParent();
1518    if (parentFolder != null && parentFolder.getParent() != null) { // skip root
1519      // Update the parent folder last modified time if the parent folder
1520      // already exists.
1521      String parentKey = pathToKey(parentFolder);
1522      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
1523      if (parentMetadata != null && parentMetadata.isDir() &&
1524        parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
1525        if (parentFolderLease != null) {
1526          store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
1527        } else {
1528          updateParentFolderLastModifiedTime(key);
1529        }
1530      } else {
1531        // Make sure that the parent folder exists.
1532        // Create it using inherited permissions from the first existing directory going up the path
1533        Path firstExisting = parentFolder.getParent();
1534        FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting));
1535        while(metadata == null) {
1536          // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata
1537          firstExisting = firstExisting.getParent();
1538          metadata = store.retrieveMetadata(pathToKey(firstExisting));
1539        }
1540        mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true);
1541      }
1542    }
1543
1544    // Mask the permission first (with the default permission mask as well).
1545    FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile);
1546    PermissionStatus permissionStatus = createPermissionStatus(masked);
1547
1548    OutputStream bufOutStream;
1549    if (store.isPageBlobKey(key)) {
1550      // Store page blobs directly in-place without renames.
1551      bufOutStream = store.storefile(key, permissionStatus);
1552    } else {
1553      // This is a block blob, so open the output blob stream based on the
1554      // encoded key.
1555      //
1556      String keyEncoded = encodeKey(key);
1557
1558
1559      // First create a blob at the real key, pointing back to the temporary file
1560      // This accomplishes a few things:
1561      // 1. Makes sure we can create a file there.
1562      // 2. Makes it visible to other concurrent threads/processes/nodes what
1563      // we're
1564      // doing.
1565      // 3. Makes it easier to restore/cleanup data in the event of us crashing.
1566      store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);
1567
1568      // The key is encoded to point to a common container at the storage server.
1569      // This reduces the number of splits on the server side when load balancing.
1570      // Ingress to Azure storage can take advantage of earlier splits. We remove
1571      // the root path to the key and prefix a random GUID to the tail (or leaf
1572      // filename) of the key. Keys are thus broadly and randomly distributed over
1573      // a single container to ease load balancing on the storage server. When the
1574      // blob is committed it is renamed to its earlier key. Uncommitted blocks
1575      // are not cleaned up and we leave it to Azure storage to garbage collect
1576      // these
1577      // blocks.
1578      bufOutStream = new NativeAzureFsOutputStream(store.storefile(
1579          keyEncoded, permissionStatus), key, keyEncoded);
1580    }
1581    // Construct the data output stream from the buffered output stream.
1582    FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
1583
1584    
1585    // Increment the counter
1586    instrumentation.fileCreated();
1587    
1588    // Return data output stream to caller.
1589    return fsOut;
1590  }
1591
  /**
   * Recursively deletes the given path.
   *
   * @deprecated Use {@link #delete(Path, boolean)} instead.
   */
  @Override
  @Deprecated
  public boolean delete(Path path) throws IOException {
    return delete(path, true);
  }
1597
  /**
   * Deletes the given path; delegates to the three-argument delete without
   * skipping the parent folder's last-modified-time update.
   */
  @Override
  public boolean delete(Path f, boolean recursive) throws IOException {
    return delete(f, recursive, false);
  }
1602
1603  /**
1604   * Delete the specified file or folder. The parameter
1605   * skipParentFolderLastModifidedTimeUpdate
1606   * is used in the case of atomic folder rename redo. In that case, there is
1607   * a lease on the parent folder, so (without reworking the code) modifying
1608   * the parent folder update time will fail because of a conflict with the
1609   * lease. Since we are going to delete the folder soon anyway so accurate
1610   * modified time is not necessary, it's easier to just skip
1611   * the modified time update.
1612   *
1613   * @param f
1614   * @param recursive
1615   * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last
1616   * modified time.
1617   * @return true if and only if the file is deleted
1618   * @throws IOException
1619   */
  /**
   * Delete the file or folder at {@code f}.
   *
   * For a file, the parent folder is checked first: if it only exists
   * implicitly (no directory blob), a directory blob is materialized for it
   * before the file blob is removed, so deleting the last blob inside does
   * not implicitly delete the folder too. For a folder, all contained
   * entries are deleted (recursively when {@code recursive} is set) before
   * the folder blob itself.
   *
   * @param f the path to delete.
   * @param recursive if true, delete a non-empty folder together with its
   *        contents; if false, deleting a non-empty folder throws.
   * @param skipParentFolderLastModifidedTimeUpdate if true, do not update
   *        the parent folder's last-modified time after the delete.
   * @return true if the path was deleted; false if it did not exist or was
   *         removed concurrently by another client.
   * @throws IOException on store failures, or on a non-recursive delete of
   *         a non-empty folder.
   */
  public boolean delete(Path f, boolean recursive,
      boolean skipParentFolderLastModifidedTimeUpdate) throws IOException {

    LOG.debug("Deleting file: {}", f.toString());

    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);

    // Capture the metadata for the path.
    //
    FileMetadata metaFile = null;
    try {
      metaFile = store.retrieveMetadata(key);
    } catch (IOException e) {

      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

      if (innerException instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {

        // The blob vanished concurrently; report "nothing deleted".
        return false;
      }
      throw e;
    }

    if (null == metaFile) {
      // The path to be deleted does not exist.
      return false;
    }

    // The path exists, determine if it is a folder containing objects,
    // an empty folder, or a simple file and take the appropriate actions.
    if (!metaFile.isDir()) {
      // The path specifies a file. We need to check the parent path
      // to make sure it's a proper materialized directory before we
      // delete the file. Otherwise we may get into a situation where
      // the file we were deleting was the last one in an implicit directory
      // (e.g. the blob store only contains the blob a/b and there's no
      // corresponding directory blob a) and that would implicitly delete
      // the directory as well, which is not correct.
      Path parentPath = absolutePath.getParent();
      if (parentPath.getParent() != null) {// Not root
        String parentKey = pathToKey(parentPath);

        FileMetadata parentMetadata = null;
        try {
          parentMetadata = store.retrieveMetadata(parentKey);
        } catch (IOException e) {

          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

          if (innerException instanceof StorageException) {
            // Invalid State.
            // A FileNotFoundException is not thrown here as the API returns false
            // if the file not present. But not retrieving metadata here is an
            // unrecoverable state and can only happen if there is a race condition
            // hence throwing a IOException
            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
              throw new IOException("File " + f + " has a parent directory "
                  + parentPath + " whose metadata cannot be retrieved. Can't resolve");
            }
          }
          throw e;
        }

        // Invalid State.
        // A FileNotFoundException is not thrown here as the API returns false
        // if the file not present. But not retrieving metadata here is an
        // unrecoverable state and can only happen if there is a race condition
        // hence throwing a IOException
        if (parentMetadata == null) {
          throw new IOException("File " + f + " has a parent directory "
              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
        }

        if (!parentMetadata.isDir()) {
          // Invalid state: the parent path is actually a file. Throw.
          throw new AzureException("File " + f + " has a parent directory "
              + parentPath + " which is also a file. Can't resolve.");
        }

        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
          LOG.debug("Found an implicit parent directory while trying to"
              + " delete the file {}. Creating the directory blob for"
              + " it in {}.", f, parentKey);

          // Materialize the parent so it survives the deletion of its last child.
          store.storeEmptyFolder(parentKey,
              createPermissionStatus(FsPermission.getDefault()));
        } else {
          if (!skipParentFolderLastModifidedTimeUpdate) {
            updateParentFolderLastModifiedTime(key);
          }
        }
      }

      // Finally delete the file blob itself.
      try {
        store.delete(key);
        instrumentation.fileDeleted();
      } catch(IOException e) {

        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

        if (innerException instanceof StorageException
            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
          return false;
        }

       throw e;
      }
    } else {
      // The path specifies a folder. Recursively delete all entries under the
      // folder.
      LOG.debug("Directory Delete encountered: {}", f.toString());
      Path parentPath = absolutePath.getParent();
      if (parentPath.getParent() != null) {
        String parentKey = pathToKey(parentPath);
        FileMetadata parentMetadata = null;

        try {
          parentMetadata = store.retrieveMetadata(parentKey);
        } catch (IOException e) {

          Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

          if (innerException instanceof StorageException) {
            // Invalid State.
            // A FileNotFoundException is not thrown here as the API returns false
            // if the file not present. But not retrieving metadata here is an
            // unrecoverable state and can only happen if there is a race condition
            // hence throwing a IOException
            if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
              throw new IOException("File " + f + " has a parent directory "
                  + parentPath + " whose metadata cannot be retrieved. Can't resolve");
            }
          }
          throw e;
        }

        // Invalid State.
        // A FileNotFoundException is not thrown here as the API returns false
        // if the file not present. But not retrieving metadata here is an
        // unrecoverable state and can only happen if there is a race condition
        // hence throwing a IOException
        if (parentMetadata == null) {
          throw new IOException("File " + f + " has a parent directory "
              + parentPath + " whose metadata cannot be retrieved. Can't resolve");
        }

        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
          LOG.debug("Found an implicit parent directory while trying to"
              + " delete the directory {}. Creating the directory blob for"
              + " it in {}. ", f, parentKey);

          store.storeEmptyFolder(parentKey,
              createPermissionStatus(FsPermission.getDefault()));
        }
      }

      // List all the blobs in the current folder.
      // NOTE(review): the "1" argument appears to be the listing depth —
      // confirm against the store's listAll contract.
      String priorLastKey = null;
      PartialListing listing = null;
      try {
        listing = store.listAll(key, AZURE_LIST_ALL, 1,
            priorLastKey);
      } catch(IOException e) {

        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

        if (innerException instanceof StorageException
            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
          return false;
        }

        throw e;
      }

      if (listing == null) {
        return false;
      }

      FileMetadata[] contents = listing.getFiles();
      if (!recursive && contents.length > 0) {
        // The folder is non-empty and recursive delete was not specified.
        // Throw an exception indicating that a non-recursive delete was
        // specified for a non-empty folder.
        throw new IOException("Non-recursive delete of non-empty directory "
            + f.toString());
      }

      // Delete all the files in the folder.
      for (FileMetadata p : contents) {
        // Tag on the directory name found as the suffix of the suffix of the
        // parent directory to get the new absolute path.
        String suffix = p.getKey().substring(
            p.getKey().lastIndexOf(PATH_DELIMITER));
        if (!p.isDir()) {
          try {
            store.delete(key + suffix);
            instrumentation.fileDeleted();
          } catch(IOException e) {

            Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

            if (innerException instanceof StorageException
                && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
              return false;
            }

            throw e;
          }
        } else {
          // Recursively delete contents of the sub-folders. Notice this also
          // deletes the blob for the directory.
          if (!delete(new Path(f.toString() + suffix), true)) {
            return false;
          }
        }
      }

      // All children are gone; remove the folder blob itself.
      try {
        store.delete(key);
      } catch(IOException e) {

        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);

        if (innerException instanceof StorageException
            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
          return false;
        }

        throw e;
      }

      // Update parent directory last modified time
      Path parent = absolutePath.getParent();
      if (parent != null && parent.getParent() != null) { // not root
        if (!skipParentFolderLastModifidedTimeUpdate) {
          updateParentFolderLastModifiedTime(key);
        }
      }
      instrumentation.directoryDeleted();
    }

    // File or directory was successfully deleted.
    LOG.debug("Delete Successful for : {}", f.toString());
    return true;
  }
1867
  /**
   * Return the {@link FileStatus} for the file or folder at {@code f}.
   *
   * The root path always exists and is reported as a directory. For a
   * folder with a pending rename, the rename is first redone and then
   * FileNotFoundException is thrown, since the redo removes the source.
   *
   * @throws FileNotFoundException if the path does not exist (including the
   *         pending-rename case above).
   * @throws IOException on other store failures.
   */
  @Override
  public FileStatus getFileStatus(Path f) throws FileNotFoundException, IOException {

    LOG.debug("Getting the file status for {}", f.toString());

    // Capture the absolute path and the path to key.
    Path absolutePath = makeAbsolute(f);
    String key = pathToKey(absolutePath);
    if (key.length() == 0) { // root always exists
      return newDirectory(null, absolutePath);
    }

    // The path is either a folder or a file. Retrieve metadata to
    // determine if it is a directory or file.
    FileMetadata meta = null;
    try {
      meta = store.retrieveMetadata(key);
    } catch(Exception ex) {
      // NOTE(review): this catch is broader (Exception) than the IOException
      // catches used by sibling methods such as listStatus — presumably to
      // also translate runtime storage errors; confirm before narrowing.

      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);

      if (innerException instanceof StorageException
          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {

          throw new FileNotFoundException(String.format("%s is not found", key));
       }

      throw ex;
    }

    if (meta != null) {
      if (meta.isDir()) {
        // The path is a folder with files in it.
        //

        LOG.debug("Path {} is a folder.", f.toString());

        // If a rename operation for the folder was pending, redo it.
        // Then the file does not exist, so signal that.
        if (conditionalRedoFolderRename(f)) {
          throw new FileNotFoundException(
              absolutePath + ": No such file or directory.");
        }

        // Return reference to the directory object.
        return newDirectory(meta, absolutePath);
      }

      // The path is a file.
      LOG.debug("Found the path: {} as a file.", f.toString());

      // Return with reference to a file object.
      return newFile(meta, absolutePath);
    }

    // File not found. Throw exception no such file or directory.
    //
    throw new FileNotFoundException(
        absolutePath + ": No such file or directory.");
  }
1928
1929  // Return true if there is a rename pending and we redo it, otherwise false.
1930  private boolean conditionalRedoFolderRename(Path f) throws IOException {
1931
1932    // Can't rename /, so return immediately in that case.
1933    if (f.getName().equals("")) {
1934      return false;
1935    }
1936
1937    // Check if there is a -RenamePending.json file for this folder, and if so,
1938    // redo the rename.
1939    Path absoluteRenamePendingFile = renamePendingFilePath(f);
1940    if (exists(absoluteRenamePendingFile)) {
1941      FolderRenamePending pending =
1942          new FolderRenamePending(absoluteRenamePendingFile, this);
1943      pending.redo();
1944      return true;
1945    } else {
1946      return false;
1947    }
1948  }
1949
1950  // Return the path name that would be used for rename of folder with path f.
1951  private Path renamePendingFilePath(Path f) {
1952    Path absPath = makeAbsolute(f);
1953    String key = pathToKey(absPath);
1954    key += "-RenamePending.json";
1955    return keyToPath(key);
1956  }
1957
  /** Returns the URI of this filesystem instance. */
  @Override
  public URI getUri() {
    return uri;
  }
1962
1963  /**
1964   * Retrieve the status of a given path if it is a file, or of all the
1965   * contained files if it is a directory.
1966   */
1967  @Override
1968  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
1969
1970    LOG.debug("Listing status for {}", f.toString());
1971
1972    Path absolutePath = makeAbsolute(f);
1973    String key = pathToKey(absolutePath);
1974    Set<FileStatus> status = new TreeSet<FileStatus>();
1975    FileMetadata meta = null;
1976    try {
1977      meta = store.retrieveMetadata(key);
1978    } catch (IOException ex) {
1979
1980      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
1981
1982      if (innerException instanceof StorageException
1983          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
1984
1985        throw new FileNotFoundException(String.format("%s is not found", f));
1986      }
1987
1988      throw ex;
1989    }
1990
1991    if (meta != null) {
1992      if (!meta.isDir()) {
1993
1994        LOG.debug("Found path as a file");
1995
1996        return new FileStatus[] { newFile(meta, absolutePath) };
1997      }
1998
1999      String partialKey = null;
2000      PartialListing listing = null;
2001
2002      try {
2003        listing  = store.list(key, AZURE_LIST_ALL, 1, partialKey);
2004      } catch (IOException ex) {
2005
2006        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2007
2008        if (innerException instanceof StorageException
2009            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2010
2011            throw new FileNotFoundException(String.format("%s is not found", key));
2012        }
2013
2014        throw ex;
2015      }
2016      // NOTE: We don't check for Null condition as the Store API should return
2017      // an empty list if there are not listing.
2018
2019      // For any -RenamePending.json files in the listing,
2020      // push the rename forward.
2021      boolean renamed = conditionalRedoFolderRenames(listing);
2022
2023      // If any renames were redone, get another listing,
2024      // since the current one may have changed due to the redo.
2025      if (renamed) {
2026       listing = null;
2027       try {
2028         listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
2029       } catch (IOException ex) {
2030         Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2031
2032         if (innerException instanceof StorageException
2033             && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2034
2035           throw new FileNotFoundException(String.format("%s is not found", key));
2036         }
2037
2038         throw ex;
2039       }
2040      }
2041
2042      // NOTE: We don't check for Null condition as the Store API should return
2043      // and empty list if there are not listing.
2044
2045      for (FileMetadata fileMetadata : listing.getFiles()) {
2046        Path subpath = keyToPath(fileMetadata.getKey());
2047
2048        // Test whether the metadata represents a file or directory and
2049        // add the appropriate metadata object.
2050        //
2051        // Note: There was a very old bug here where directories were added
2052        // to the status set as files flattening out recursive listings
2053        // using "-lsr" down the file system hierarchy.
2054        if (fileMetadata.isDir()) {
2055          // Make sure we hide the temp upload folder
2056          if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) {
2057            // Don't expose that.
2058            continue;
2059          }
2060          status.add(newDirectory(fileMetadata, subpath));
2061        } else {
2062          status.add(newFile(fileMetadata, subpath));
2063        }
2064      }
2065
2066      LOG.debug("Found path as a directory with {}"
2067          + " files in it.", status.size());
2068
2069    } else {
2070      // There is no metadata found for the path.
2071      LOG.debug("Did not find any metadata for path: {}", key);
2072
2073      throw new FileNotFoundException("File" + f + " does not exist.");
2074    }
2075
2076    return status.toArray(new FileStatus[0]);
2077  }
2078
2079  // Redo any folder renames needed if there are rename pending files in the
2080  // directory listing. Return true if one or more redo operations were done.
2081  private boolean conditionalRedoFolderRenames(PartialListing listing)
2082      throws IllegalArgumentException, IOException {
2083    boolean renamed = false;
2084    for (FileMetadata fileMetadata : listing.getFiles()) {
2085      Path subpath = keyToPath(fileMetadata.getKey());
2086      if (isRenamePendingFile(subpath)) {
2087        FolderRenamePending pending =
2088            new FolderRenamePending(subpath, this);
2089        pending.redo();
2090        renamed = true;
2091      }
2092    }
2093    return renamed;
2094  }
2095
2096  // True if this is a folder rename pending file, else false.
2097  private boolean isRenamePendingFile(Path path) {
2098    return path.toString().endsWith(FolderRenamePending.SUFFIX);
2099  }
2100
2101  private FileStatus newFile(FileMetadata meta, Path path) {
2102    return new FileStatus (
2103        meta.getLength(),
2104        false,
2105        1,
2106        blockSize,
2107        meta.getLastModified(),
2108        0,
2109        meta.getPermissionStatus().getPermission(),
2110        meta.getPermissionStatus().getUserName(),
2111        meta.getPermissionStatus().getGroupName(),
2112        path.makeQualified(getUri(), getWorkingDirectory()));
2113  }
2114
2115  private FileStatus newDirectory(FileMetadata meta, Path path) {
2116    return new FileStatus (
2117        0,
2118        true,
2119        1,
2120        blockSize,
2121        meta == null ? 0 : meta.getLastModified(),
2122        0,
2123        meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(),
2124        meta == null ? "" : meta.getPermissionStatus().getUserName(),
2125        meta == null ? "" : meta.getPermissionStatus().getGroupName(),
2126        path.makeQualified(getUri(), getWorkingDirectory()));
2127  }
2128
2129  private static enum UMaskApplyMode {
2130    NewFile,
2131    NewDirectory,
2132    NewDirectoryNoUmask,
2133    ChangeExistingFile,
2134    ChangeExistingDirectory,
2135  }
2136
2137  /**
2138   * Applies the applicable UMASK's on the given permission.
2139   * 
2140   * @param permission
2141   *          The permission to mask.
2142   * @param applyMode
2143   *          Whether to also apply the default umask.
2144   * @return The masked persmission.
2145   */
2146  private FsPermission applyUMask(final FsPermission permission,
2147      final UMaskApplyMode applyMode) {
2148    FsPermission newPermission = new FsPermission(permission);
2149    // Apply the default umask - this applies for new files or directories.
2150    if (applyMode == UMaskApplyMode.NewFile
2151        || applyMode == UMaskApplyMode.NewDirectory) {
2152      newPermission = newPermission
2153          .applyUMask(FsPermission.getUMask(getConf()));
2154    }
2155    return newPermission;
2156  }
2157
2158  /**
2159   * Creates the PermissionStatus object to use for the given permission, based
2160   * on the current user in context.
2161   * 
2162   * @param permission
2163   *          The permission for the file.
2164   * @return The permission status object to use.
2165   * @throws IOException
2166   *           If login fails in getCurrentUser
2167   */
2168  @VisibleForTesting
2169  PermissionStatus createPermissionStatus(FsPermission permission)
2170    throws IOException {
2171    // Create the permission status for this file based on current user
2172    return new PermissionStatus(
2173        UserGroupInformation.getCurrentUser().getShortUserName(),
2174        getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
2175            AZURE_DEFAULT_GROUP_DEFAULT),
2176        permission);
2177  }
2178
2179  @Override
2180  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
2181      return mkdirs(f, permission, false);
2182  }
2183
2184  public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException {
2185
2186
2187    LOG.debug("Creating directory: {}", f.toString());
2188
2189    if (containsColon(f)) {
2190      throw new IOException("Cannot create directory " + f
2191          + " through WASB that has colons in the name");
2192    }
2193
2194    Path absolutePath = makeAbsolute(f);
2195    PermissionStatus permissionStatus = null;
2196    if(noUmask) {
2197      // ensure owner still has wx permissions at the minimum
2198      permissionStatus = createPermissionStatus(
2199          applyUMask(FsPermission.createImmutable((short) (permission.toShort() | USER_WX_PERMISION)),
2200              UMaskApplyMode.NewDirectoryNoUmask));
2201    } else {
2202      permissionStatus = createPermissionStatus(
2203          applyUMask(permission, UMaskApplyMode.NewDirectory));
2204    }
2205
2206
2207    ArrayList<String> keysToCreateAsFolder = new ArrayList<String>();
2208    ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>();
2209    boolean childCreated = false;
2210    // Check that there is no file in the parent chain of the given path.
2211    for (Path current = absolutePath, parent = current.getParent();
2212        parent != null; // Stop when you get to the root
2213        current = parent, parent = current.getParent()) {
2214      String currentKey = pathToKey(current);
2215      FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
2216      if (currentMetadata != null && !currentMetadata.isDir()) {
2217        throw new FileAlreadyExistsException("Cannot create directory " + f + " because "
2218            + current + " is an existing file.");
2219      } else if (currentMetadata == null) {
2220        keysToCreateAsFolder.add(currentKey);
2221        childCreated = true;
2222      } else {
2223        // The directory already exists. Its last modified time need to be
2224        // updated if there is a child directory created under it.
2225        if (childCreated) {
2226          keysToUpdateAsFolder.add(currentKey);
2227        }
2228        childCreated = false;
2229      }
2230    }
2231
2232    for (String currentKey : keysToCreateAsFolder) {
2233      store.storeEmptyFolder(currentKey, permissionStatus);
2234    }
2235
2236    instrumentation.directoryCreated();
2237
2238    // otherwise throws exception
2239    return true;
2240  }
2241
2242  @Override
2243  public FSDataInputStream open(Path f, int bufferSize) throws FileNotFoundException, IOException {
2244
2245    LOG.debug("Opening file: {}", f.toString());
2246
2247    Path absolutePath = makeAbsolute(f);
2248    String key = pathToKey(absolutePath);
2249    FileMetadata meta = null;
2250    try {
2251      meta = store.retrieveMetadata(key);
2252    } catch(Exception ex) {
2253
2254      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2255
2256      if (innerException instanceof StorageException
2257          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2258
2259        throw new FileNotFoundException(String.format("%s is not found", key));
2260      }
2261
2262      throw ex;
2263    }
2264
2265    if (meta == null) {
2266      throw new FileNotFoundException(f.toString());
2267    }
2268    if (meta.isDir()) {
2269      throw new FileNotFoundException(f.toString()
2270          + " is a directory not a file.");
2271    }
2272
2273    DataInputStream inputStream = null;
2274    try {
2275      inputStream = store.retrieve(key);
2276    } catch(Exception ex) {
2277      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2278
2279      if (innerException instanceof StorageException
2280          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2281
2282        throw new FileNotFoundException(String.format("%s is not found", key));
2283      }
2284
2285      throw ex;
2286    }
2287
2288    return new FSDataInputStream(new BufferedFSInputStream(
2289        new NativeAzureFsInputStream(inputStream, key, meta.getLength()), bufferSize));
2290  }
2291
2292  @Override
2293  public boolean rename(Path src, Path dst) throws FileNotFoundException, IOException {
2294
2295    FolderRenamePending renamePending = null;
2296
2297    LOG.debug("Moving {} to {}", src, dst);
2298
2299    if (containsColon(dst)) {
2300      throw new IOException("Cannot rename to file " + dst
2301          + " through WASB that has colons in the name");
2302    }
2303
2304    String srcKey = pathToKey(makeAbsolute(src));
2305
2306    if (srcKey.length() == 0) {
2307      // Cannot rename root of file system
2308      return false;
2309    }
2310
2311    // Figure out the final destination
2312    Path absoluteDst = makeAbsolute(dst);
2313    String dstKey = pathToKey(absoluteDst);
2314    FileMetadata dstMetadata = null;
2315    try {
2316      dstMetadata = store.retrieveMetadata(dstKey);
2317    } catch (IOException ex) {
2318
2319      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2320
2321      // A BlobNotFound storage exception in only thrown from retrieveMetdata API when
2322      // there is a race condition. If there is another thread which deletes the destination
2323      // file or folder, then this thread calling rename should be able to continue with
2324      // rename gracefully. Hence the StorageException is swallowed here.
2325      if (innerException instanceof StorageException) {
2326        if (NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2327          LOG.debug("BlobNotFound exception encountered for Destination key : {}. "
2328              + "Swallowin the exception to handle race condition gracefully", dstKey);
2329        }
2330      } else {
2331        throw ex;
2332      }
2333    }
2334
2335    if (dstMetadata != null && dstMetadata.isDir()) {
2336      // It's an existing directory.
2337      dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
2338      LOG.debug("Destination {} "
2339          + " is a directory, adjusted the destination to be {}", dst, dstKey);
2340    } else if (dstMetadata != null) {
2341      // Attempting to overwrite a file using rename()
2342      LOG.debug("Destination {}"
2343          + " is an already existing file, failing the rename.", dst);
2344      return false;
2345    } else {
2346      // Check that the parent directory exists.
2347      FileMetadata parentOfDestMetadata = null;
2348      try {
2349        parentOfDestMetadata = store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
2350      } catch (IOException ex) {
2351
2352        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2353
2354        if (innerException instanceof StorageException
2355            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2356
2357          LOG.debug("Parent of destination {} doesn't exists. Failing rename", dst);
2358          return false;
2359        }
2360
2361        throw ex;
2362      }
2363
2364      if (parentOfDestMetadata == null) {
2365        LOG.debug("Parent of the destination {}"
2366            + " doesn't exist, failing the rename.", dst);
2367        return false;
2368      } else if (!parentOfDestMetadata.isDir()) {
2369        LOG.debug("Parent of the destination {}"
2370            + " is a file, failing the rename.", dst);
2371        return false;
2372      }
2373    }
2374    FileMetadata srcMetadata = null;
2375    try {
2376      srcMetadata = store.retrieveMetadata(srcKey);
2377    } catch (IOException ex) {
2378      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2379
2380      if (innerException instanceof StorageException
2381          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2382
2383        LOG.debug("Source {} doesn't exists. Failing rename", src);
2384        return false;
2385      }
2386
2387      throw ex;
2388    }
2389
2390    if (srcMetadata == null) {
2391      // Source doesn't exist
2392      LOG.debug("Source {} doesn't exist, failing the rename.", src);
2393      return false;
2394    } else if (!srcMetadata.isDir()) {
2395      LOG.debug("Source {} found as a file, renaming.", src);
2396      try {
2397        store.rename(srcKey, dstKey);
2398      } catch(IOException ex) {
2399
2400        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2401
2402        if (innerException instanceof StorageException
2403            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2404
2405          LOG.debug("BlobNotFoundException encountered. Failing rename", src);
2406          return false;
2407        }
2408
2409        throw ex;
2410      }
2411    } else {
2412
2413      // Prepare for, execute and clean up after of all files in folder, and
2414      // the root file, and update the last modified time of the source and
2415      // target parent folders. The operation can be redone if it fails part
2416      // way through, by applying the "Rename Pending" file.
2417
2418      // The following code (internally) only does atomic rename preparation
2419      // and lease management for page blob folders, limiting the scope of the
2420      // operation to HBase log file folders, where atomic rename is required.
2421      // In the future, we could generalize it easily to all folders.
2422      renamePending = prepareAtomicFolderRename(srcKey, dstKey);
2423      renamePending.execute();
2424
2425      LOG.debug("Renamed {} to {} successfully.", src, dst);
2426      renamePending.cleanup();
2427      return true;
2428    }
2429
2430    // Update the last-modified time of the parent folders of both source
2431    // and destination.
2432    updateParentFolderLastModifiedTime(srcKey);
2433    updateParentFolderLastModifiedTime(dstKey);
2434
2435    LOG.debug("Renamed {} to {} successfully.", src, dst);
2436    return true;
2437  }
2438
2439  /**
2440   * Update the last-modified time of the parent folder of the file
2441   * identified by key.
2442   * @param key
2443   * @throws IOException
2444   */
2445  private void updateParentFolderLastModifiedTime(String key)
2446      throws IOException {
2447    Path parent = makeAbsolute(keyToPath(key)).getParent();
2448    if (parent != null && parent.getParent() != null) { // not root
2449      String parentKey = pathToKey(parent);
2450
2451      // ensure the parent is a materialized folder
2452      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
2453      // The metadata could be null if the implicit folder only contains a
2454      // single file. In this case, the parent folder no longer exists if the
2455      // file is renamed; so we can safely ignore the null pointer case.
2456      if (parentMetadata != null) {
2457        if (parentMetadata.isDir()
2458            && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2459          store.storeEmptyFolder(parentKey,
2460              createPermissionStatus(FsPermission.getDefault()));
2461        }
2462
2463        if (store.isAtomicRenameKey(parentKey)) {
2464          SelfRenewingLease lease = null;
2465          try {
2466            lease = leaseSourceFolder(parentKey);
2467            store.updateFolderLastModifiedTime(parentKey, lease);
2468          } catch (AzureException e) {
2469            String errorCode = "";
2470            try {
2471              StorageException e2 = (StorageException) e.getCause();
2472              errorCode = e2.getErrorCode();
2473            } catch (Exception e3) {
2474              // do nothing if cast fails
2475            }
2476            if (errorCode.equals("BlobNotFound")) {
2477              throw new FileNotFoundException("Folder does not exist: " + parentKey);
2478            }
2479            LOG.warn("Got unexpected exception trying to get lease on {}. {}",
2480                parentKey, e.getMessage());
2481            throw e;
2482          } finally {
2483            try {
2484              if (lease != null) {
2485                lease.free();
2486              }
2487            } catch (Exception e) {
2488              LOG.error("Unable to free lease on {}", parentKey, e);
2489            }
2490          }
2491        } else {
2492          store.updateFolderLastModifiedTime(parentKey, null);
2493        }
2494      }
2495    }
2496  }
2497
2498  /**
2499   * If the source is a page blob folder,
2500   * prepare to rename this folder atomically. This means to get exclusive
2501   * access to the source folder, and record the actions to be performed for
2502   * this rename in a "Rename Pending" file. This code was designed to
2503   * meet the needs of HBase, which requires atomic rename of write-ahead log
2504   * (WAL) folders for correctness.
2505   *
2506   * Before calling this method, the caller must ensure that the source is a
2507   * folder.
2508   *
2509   * For non-page-blob directories, prepare the in-memory information needed,
2510   * but don't take the lease or write the redo file. This is done to limit the
2511   * scope of atomic folder rename to HBase, at least at the time of writing
2512   * this code.
2513   *
2514   * @param srcKey Source folder name.
2515   * @param dstKey Destination folder name.
2516   * @throws IOException
2517   */
2518  private FolderRenamePending prepareAtomicFolderRename(
2519      String srcKey, String dstKey) throws IOException {
2520
2521    if (store.isAtomicRenameKey(srcKey)) {
2522
2523      // Block unwanted concurrent access to source folder.
2524      SelfRenewingLease lease = leaseSourceFolder(srcKey);
2525
2526      // Prepare in-memory information needed to do or redo a folder rename.
2527      FolderRenamePending renamePending =
2528          new FolderRenamePending(srcKey, dstKey, lease, this);
2529
2530      // Save it to persistent storage to help recover if the operation fails.
2531      renamePending.writeFile(this);
2532      return renamePending;
2533    } else {
2534      FolderRenamePending renamePending =
2535          new FolderRenamePending(srcKey, dstKey, null, this);
2536      return renamePending;
2537    }
2538  }
2539
2540  /**
2541   * Get a self-renewing Azure blob lease on the source folder zero-byte file.
2542   */
2543  private SelfRenewingLease leaseSourceFolder(String srcKey)
2544      throws AzureException {
2545    return store.acquireLease(srcKey);
2546  }
2547
2548  /**
2549   * Return an array containing hostnames, offset and size of
2550   * portions of the given file. For WASB we'll just lie and give
2551   * fake hosts to make sure we get many splits in MR jobs.
2552   */
2553  @Override
2554  public BlockLocation[] getFileBlockLocations(FileStatus file,
2555      long start, long len) throws IOException {
2556    if (file == null) {
2557      return null;
2558    }
2559
2560    if ((start < 0) || (len < 0)) {
2561      throw new IllegalArgumentException("Invalid start or len parameter");
2562    }
2563
2564    if (file.getLen() < start) {
2565      return new BlockLocation[0];
2566    }
2567    final String blobLocationHost = getConf().get(
2568        AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
2569        AZURE_BLOCK_LOCATION_HOST_DEFAULT);
2570    final String[] name = { blobLocationHost };
2571    final String[] host = { blobLocationHost };
2572    long blockSize = file.getBlockSize();
2573    if (blockSize <= 0) {
2574      throw new IllegalArgumentException(
2575          "The block size for the given file is not a positive number: "
2576              + blockSize);
2577    }
2578    int numberOfLocations = (int) (len / blockSize)
2579        + ((len % blockSize == 0) ? 0 : 1);
2580    BlockLocation[] locations = new BlockLocation[numberOfLocations];
2581    for (int i = 0; i < locations.length; i++) {
2582      long currentOffset = start + (i * blockSize);
2583      long currentLength = Math.min(blockSize, start + len - currentOffset);
2584      locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
2585    }
2586    return locations;
2587  }
2588
2589  /**
2590   * Set the working directory to the given directory.
2591   */
2592  @Override
2593  public void setWorkingDirectory(Path newDir) {
2594    workingDir = makeAbsolute(newDir);
2595  }
2596
  /** Return the current working directory of this file system instance. */
  @Override
  public Path getWorkingDirectory() {
    return workingDir;
  }
2601
2602  @Override
2603  public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
2604    Path absolutePath = makeAbsolute(p);
2605    String key = pathToKey(absolutePath);
2606    FileMetadata metadata = null;
2607    try {
2608      metadata = store.retrieveMetadata(key);
2609    } catch (IOException ex) {
2610      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2611
2612      if (innerException instanceof StorageException
2613          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2614
2615        throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
2616      }
2617
2618      throw ex;
2619    }
2620
2621    if (metadata == null) {
2622      throw new FileNotFoundException("File doesn't exist: " + p);
2623    }
2624    permission = applyUMask(permission,
2625        metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory
2626            : UMaskApplyMode.ChangeExistingFile);
2627    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2628      // It's an implicit folder, need to materialize it.
2629      store.storeEmptyFolder(key, createPermissionStatus(permission));
2630    } else if (!metadata.getPermissionStatus().getPermission().
2631        equals(permission)) {
2632      store.changePermissionStatus(key, new PermissionStatus(
2633          metadata.getPermissionStatus().getUserName(),
2634          metadata.getPermissionStatus().getGroupName(),
2635          permission));
2636    }
2637  }
2638
2639  @Override
2640  public void setOwner(Path p, String username, String groupname)
2641      throws IOException {
2642    Path absolutePath = makeAbsolute(p);
2643    String key = pathToKey(absolutePath);
2644    FileMetadata metadata = null;
2645
2646    try {
2647      metadata = store.retrieveMetadata(key);
2648    } catch (IOException ex) {
2649      Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
2650
2651      if (innerException instanceof StorageException
2652          && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
2653
2654        throw new FileNotFoundException(String.format("File %s doesn't exists.", p));
2655      }
2656
2657      throw ex;
2658    }
2659
2660    if (metadata == null) {
2661      throw new FileNotFoundException("File doesn't exist: " + p);
2662    }
2663
2664    PermissionStatus newPermissionStatus = new PermissionStatus(
2665        username == null ?
2666            metadata.getPermissionStatus().getUserName() : username,
2667        groupname == null ?
2668            metadata.getPermissionStatus().getGroupName() : groupname,
2669        metadata.getPermissionStatus().getPermission());
2670    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2671      // It's an implicit folder, need to materialize it.
2672      store.storeEmptyFolder(key, newPermissionStatus);
2673    } else {
2674      store.changePermissionStatus(key, newPermissionStatus);
2675    }
2676  }
2677
2678  @Override
2679  public synchronized void close() throws IOException {
2680    if (isClosed) {
2681      return;
2682    }
2683
2684    // Call the base close() to close any resources there.
2685    super.close();
2686    // Close the store to close any resources there - e.g. the bandwidth
2687    // updater thread would be stopped at this time.
2688    store.close();
2689    // Notify the metrics system that this file system is closed, which may
2690    // trigger one final metrics push to get the accurate final file system
2691    // metrics out.
2692
2693    long startTime = System.currentTimeMillis();
2694
2695    if(!getConf().getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) {
2696      AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName);
2697      AzureFileSystemMetricsSystem.fileSystemClosed();
2698    }
2699
2700    LOG.debug("Submitting metrics when file system closed took {} ms.",
2701        (System.currentTimeMillis() - startTime));
2702    isClosed = true;
2703  }
2704
2705  /**
2706   * A handler that defines what to do with blobs whose upload was
2707   * interrupted.
2708   */
2709  private abstract class DanglingFileHandler {
2710    abstract void handleFile(FileMetadata file, FileMetadata tempFile)
2711      throws IOException;
2712  }
2713
2714  /**
2715   * Handler implementation for just deleting dangling files and cleaning
2716   * them up.
2717   */
2718  private class DanglingFileDeleter extends DanglingFileHandler {
2719    @Override
2720    void handleFile(FileMetadata file, FileMetadata tempFile)
2721        throws IOException {
2722
2723      LOG.debug("Deleting dangling file {}", file.getKey());
2724      store.delete(file.getKey());
2725      store.delete(tempFile.getKey());
2726    }
2727  }
2728
2729  /**
2730   * Handler implementation for just moving dangling files to recovery
2731   * location (/lost+found).
2732   */
2733  private class DanglingFileRecoverer extends DanglingFileHandler {
2734    private final Path destination;
2735
2736    DanglingFileRecoverer(Path destination) {
2737      this.destination = destination;
2738    }
2739
2740    @Override
2741    void handleFile(FileMetadata file, FileMetadata tempFile)
2742        throws IOException {
2743
2744      LOG.debug("Recovering {}", file.getKey());
2745      // Move to the final destination
2746      String finalDestinationKey =
2747          pathToKey(new Path(destination, file.getKey()));
2748      store.rename(tempFile.getKey(), finalDestinationKey);
2749      if (!finalDestinationKey.equals(file.getKey())) {
2750        // Delete the empty link file now that we've restored it.
2751        store.delete(file.getKey());
2752      }
2753    }
2754  }
2755
2756  /**
2757   * Check if a path has colons in its name
2758   */
2759  private boolean containsColon(Path p) {
2760    return p.toUri().getPath().toString().contains(":");
2761  }
2762
2763  /**
2764   * Implements recover and delete (-move and -delete) behaviors for handling
2765   * dangling files (blobs whose upload was interrupted).
2766   * 
2767   * @param root
2768   *          The root path to check from.
2769   * @param handler
2770   *          The handler that deals with dangling files.
2771   */
2772  private void handleFilesWithDanglingTempData(Path root,
2773      DanglingFileHandler handler) throws IOException {
2774    // Calculate the cut-off for when to consider a blob to be dangling.
2775    long cutoffForDangling = new Date().getTime()
2776        - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME,
2777            AZURE_TEMP_EXPIRY_DEFAULT) * 1000;
2778    // Go over all the blobs under the given root and look for blobs to
2779    // recover.
2780    String priorLastKey = null;
2781    do {
2782      PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL,
2783          AZURE_UNBOUNDED_DEPTH, priorLastKey);
2784
2785      for (FileMetadata file : listing.getFiles()) {
2786        if (!file.isDir()) { // We don't recover directory blobs
2787          // See if this blob has a link in it (meaning it's a place-holder
2788          // blob for when the upload to the temp blob is complete).
2789          String link = store.getLinkInFileMetadata(file.getKey());
2790          if (link != null) {
2791            // It has a link, see if the temp blob it is pointing to is
2792            // existent and old enough to be considered dangling.
2793            FileMetadata linkMetadata = store.retrieveMetadata(link);
2794            if (linkMetadata != null
2795                && linkMetadata.getLastModified() >= cutoffForDangling) {
2796              // Found one!
2797              handler.handleFile(file, linkMetadata);
2798            }
2799          }
2800        }
2801      }
2802      priorLastKey = listing.getPriorLastKey();
2803    } while (priorLastKey != null);
2804  }
2805
2806  /**
2807   * Looks under the given root path for any blob that are left "dangling",
2808   * meaning that they are place-holder blobs that we created while we upload
2809   * the data to a temporary blob, but for some reason we crashed in the middle
2810   * of the upload and left them there. If any are found, we move them to the
2811   * destination given.
2812   * 
2813   * @param root
2814   *          The root path to consider.
2815   * @param destination
2816   *          The destination path to move any recovered files to.
2817   * @throws IOException
2818   */
2819  public void recoverFilesWithDanglingTempData(Path root, Path destination)
2820      throws IOException {
2821
2822    LOG.debug("Recovering files with dangling temp data in {}", root);
2823    handleFilesWithDanglingTempData(root,
2824        new DanglingFileRecoverer(destination));
2825  }
2826
2827  /**
2828   * Looks under the given root path for any blob that are left "dangling",
2829   * meaning that they are place-holder blobs that we created while we upload
2830   * the data to a temporary blob, but for some reason we crashed in the middle
2831   * of the upload and left them there. If any are found, we delete them.
2832   * 
2833   * @param root
2834   *          The root path to consider.
2835   * @throws IOException
2836   */
2837  public void deleteFilesWithDanglingTempData(Path root) throws IOException {
2838
2839    LOG.debug("Deleting files with dangling temp data in {}", root);
2840    handleFilesWithDanglingTempData(root, new DanglingFileDeleter());
2841  }
2842
  /**
   * Last-resort cleanup if this file system is garbage-collected without
   * being closed. close() is idempotent (guarded by isClosed), so a
   * redundant call here is harmless.
   * NOTE(review): finalize() is deprecated since Java 9 and offers no
   * timeliness guarantee; kept for compatibility with existing behavior.
   */
  @Override
  protected void finalize() throws Throwable {
    LOG.debug("finalize() called.");
    close();
    super.finalize();
  }
2849
2850  /**
2851   * Encode the key with a random prefix for load balancing in Azure storage.
2852   * Upload data to a random temporary file then do storage side renaming to
2853   * recover the original key.
2854   * 
2855   * @param aKey
2856   * @return Encoded version of the original key.
2857   */
2858  private static String encodeKey(String aKey) {
2859    // Get the tail end of the key name.
2860    //
2861    String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1,
2862        aKey.length());
2863
2864    // Construct the randomized prefix of the file name. The prefix ensures the
2865    // file always drops into the same folder but with a varying tail key name.
2866    String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR
2867        + UUID.randomUUID().toString();
2868
2869    // Concatenate the randomized prefix with the tail of the key name.
2870    String randomizedKey = filePrefix + fileName;
2871
2872    // Return to the caller with the randomized key.
2873    return randomizedKey;
2874  }
2875}