001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.fs.azure;
020
021import java.io.DataInputStream;
022import java.io.FileNotFoundException;
023import java.io.IOException;
024import java.io.InputStream;
025import java.io.OutputStream;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.nio.charset.Charset;
029import java.text.SimpleDateFormat;
030import java.util.ArrayList;
031import java.util.Date;
032import java.util.EnumSet;
033import java.util.Iterator;
034import java.util.Set;
035import java.util.TimeZone;
036import java.util.TreeSet;
037import java.util.UUID;
038import java.util.concurrent.atomic.AtomicInteger;
039import java.util.regex.Matcher;
040import java.util.regex.Pattern;
041
042import org.apache.commons.lang.StringUtils;
043import org.apache.commons.lang.exception.ExceptionUtils;
044import org.apache.commons.logging.Log;
045import org.apache.commons.logging.LogFactory;
046import org.apache.hadoop.classification.InterfaceAudience;
047import org.apache.hadoop.classification.InterfaceStability;
048import org.apache.hadoop.conf.Configuration;
049import org.apache.hadoop.fs.BlockLocation;
050import org.apache.hadoop.fs.BufferedFSInputStream;
051import org.apache.hadoop.fs.CreateFlag;
052import org.apache.hadoop.fs.FSDataInputStream;
053import org.apache.hadoop.fs.FSDataOutputStream;
054import org.apache.hadoop.fs.FSInputStream;
055import org.apache.hadoop.fs.FileStatus;
056import org.apache.hadoop.fs.FileSystem;
057import org.apache.hadoop.fs.Path;
058import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
059import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
060import org.apache.hadoop.fs.permission.FsPermission;
061import org.apache.hadoop.fs.permission.PermissionStatus;
062import org.apache.hadoop.fs.azure.AzureException;
063import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper;
064import org.apache.hadoop.io.IOUtils;
065import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
066import org.apache.hadoop.security.UserGroupInformation;
067import org.apache.hadoop.util.Progressable;
068
069
070import org.codehaus.jackson.JsonNode;
071import org.codehaus.jackson.JsonParseException;
072import org.codehaus.jackson.JsonParser;
073import org.codehaus.jackson.map.JsonMappingException;
074import org.codehaus.jackson.map.ObjectMapper;
075
076import com.google.common.annotations.VisibleForTesting;
077import com.microsoft.azure.storage.AccessCondition;
078import com.microsoft.azure.storage.OperationContext;
079import com.microsoft.azure.storage.StorageException;
080import com.microsoft.azure.storage.blob.CloudBlob;
081import com.microsoft.azure.storage.core.*;
082
083/**
084 * A {@link FileSystem} for reading and writing files stored on <a
085 * href="http://store.azure.com/">Windows Azure</a>. This implementation is
086 * blob-based and stores files on Azure in their native form so they can be read
087 * by other Azure tools.
088 */
089@InterfaceAudience.Public
090@InterfaceStability.Stable
091public class NativeAzureFileSystem extends FileSystem {
092  private static final int USER_WX_PERMISION = 0300; // octal literal: owner write (0200) + owner execute (0100) bits
093
094  /**
095   * A description of a folder rename operation, including the source and
096   * destination keys, and descriptions of the files in the source folder.
097   */
098  public static class FolderRenamePending {
099    private SelfRenewingLease folderLease; // lease handed to the listing constructor; passed to the store when execute() renames the folder blob itself
100    private String srcKey; // key of the folder being renamed
101    private String dstKey; // key the folder is renamed to
102    private FileMetadata[] fileMetadata = null;    // descriptions of source files; filled only by the listing constructor
103    private ArrayList<String> fileStrings = null; // FileList entries read from the -RenamePending.json file; filled only by the redo-file constructor
104    private NativeAzureFileSystem fs; // file system (and its store) on which every operation is carried out
105    private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000; // size of the read buffer for the redo file, and the cap applied while generating it
106    private static final int FORMATTING_BUFFER = 10000; // headroom subtracted from the cap while the file list is being built
107    private boolean committed; // false when the redo file could not be parsed or lacked required fields; redo() then does nothing
108    public static final String SUFFIX = "-RenamePending.json"; // appended to srcKey to name the pending-rename file
109
110    // Prepare in-memory information needed to do or redo a folder rename.
111    public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease,
112        NativeAzureFileSystem fs) throws IOException {
113      this.srcKey = srcKey;
114      this.dstKey = dstKey;
115      this.folderLease = lease;
116      this.fs = fs;
117      ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
118
119      // List all the files in the folder.
120      String priorLastKey = null;
121      do {
122        PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
123          AZURE_UNBOUNDED_DEPTH, priorLastKey);
124        for(FileMetadata file : listing.getFiles()) {
125          fileMetadataList.add(file);
126        }
127        priorLastKey = listing.getPriorLastKey();
128      } while (priorLastKey != null);
129      fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
130      this.committed = true;
131    }
132
133    // Prepare in-memory information needed to do or redo folder rename from
134    // a -RenamePending.json file read from storage. This constructor is to use during
135    // redo processing.
136    public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs)
137        throws IllegalArgumentException, IOException {
138
139      this.fs = fs;
140
141      // open redo file
142      Path f = redoFile;
143      FSDataInputStream input = fs.open(f);
144      byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE];
145      int l = input.read(bytes);
146      if (l < 0) {
147        throw new IOException(
148            "Error reading pending rename file contents -- no data available");
149      }
150      if (l == MAX_RENAME_PENDING_FILE_SIZE) {
151        throw new IOException(
152            "Error reading pending rename file contents -- "
153                + "maximum file size exceeded");
154      }
155      String contents = new String(bytes, 0, l, Charset.forName("UTF-8"));
156
157      // parse the JSON
158      ObjectMapper objMapper = new ObjectMapper();
159      objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
160      JsonNode json = null;
161      try {
162        json = objMapper.readValue(contents, JsonNode.class);
163        this.committed = true;
164      } catch (JsonMappingException e) {
165
166        // The -RedoPending.json file is corrupted, so we assume it was
167        // not completely written
168        // and the redo operation did not commit.
169        this.committed = false;
170      } catch (JsonParseException e) {
171        this.committed = false;
172      } catch (IOException e) {
173        this.committed = false;  
174      }
175      
176      if (!this.committed) {
177        LOG.error("Deleting corruped rename pending file "
178            + redoFile + "\n" + contents);
179
180        // delete the -RenamePending.json file
181        fs.delete(redoFile, false);
182        return;
183      }
184
185      // initialize this object's fields
186      ArrayList<String> fileStrList = new ArrayList<String>();
187      JsonNode oldFolderName = json.get("OldFolderName");
188      JsonNode newFolderName = json.get("NewFolderName");
189      if (oldFolderName == null || newFolderName == null) {
190          this.committed = false;
191      } else {
192        this.srcKey = oldFolderName.getTextValue();
193        this.dstKey = newFolderName.getTextValue();
194        if (this.srcKey == null || this.dstKey == null) {
195          this.committed = false;         
196        } else {
197          JsonNode fileList = json.get("FileList");
198          if (fileList == null) {
199            this.committed = false;     
200          } else {
201            for (int i = 0; i < fileList.size(); i++) {
202              fileStrList.add(fileList.get(i).getTextValue());
203            }
204          }
205        }
206      }
207      this.fileStrings = fileStrList;
208    }
209
210    public FileMetadata[] getFiles() {
211      return fileMetadata;
212    }
213
214    public SelfRenewingLease getFolderLease() {
215      return folderLease;
216    }
217
218    /**
219     * Write to disk the information needed to redo folder rename,
220     * in JSON format. The file name will be
221     * {@code wasb://<sourceFolderPrefix>/folderName-RenamePending.json}
222     * The file format will be:
223     * <pre>{@code
224     * {
225     *   FormatVersion: "1.0",
226     *   OperationTime: "<YYYY-MM-DD HH:MM:SS.MMM>",
227     *   OldFolderName: "<key>",
228     *   NewFolderName: "<key>",
229     *   FileList: [ <string> , <string> , ... ]
230     * }
231     *
232     * Here's a sample:
233     * {
234     *  FormatVersion: "1.0",
235     *  OperationUTCTime: "2014-07-01 23:50:35.572",
236     *  OldFolderName: "user/ehans/folderToRename",
237     *  NewFolderName: "user/ehans/renamedFolder",
238     *  FileList: [
239     *    "innerFile",
240     *    "innerFile2"
241     *  ]
242     * } }</pre>
243     * @throws IOException
244     */
245    public void writeFile(FileSystem fs) throws IOException {
246      Path path = getRenamePendingFilePath();
247      if (LOG.isDebugEnabled()){
248        LOG.debug("Preparing to write atomic rename state to " + path.toString());
249      }
250      OutputStream output = null;
251
252      String contents = makeRenamePendingFileContents();
253
254      // Write file.
255      try {
256        output = fs.create(path);
257        output.write(contents.getBytes(Charset.forName("UTF-8")));
258      } catch (IOException e) {
259        throw new IOException("Unable to write RenamePending file for folder rename from "
260            + srcKey + " to " + dstKey, e);
261      } finally {
262        IOUtils.cleanup(LOG, output);
263      }
264    }
265
266    /**
267     * Return the contents of the JSON file to represent the operations
268     * to be performed for a folder rename.
269     */
270    public String makeRenamePendingFileContents() {
271      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
272      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
273      String time = sdf.format(new Date());
274
275      // Make file list string
276      StringBuilder builder = new StringBuilder();
277      builder.append("[\n");
278      for (int i = 0; i != fileMetadata.length; i++) {
279        if (i > 0) {
280          builder.append(",\n");
281        }
282        builder.append("    ");
283        String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/");
284
285        // Quote string file names, escaping any possible " characters or other
286        // necessary characters in the name.
287        builder.append(quote(noPrefix));
288        if (builder.length() >=
289            MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) {
290
291          // Give up now to avoid using too much memory.
292          LOG.error("Internal error: Exceeded maximum rename pending file size of "
293              + MAX_RENAME_PENDING_FILE_SIZE + " bytes.");
294
295          // return some bad JSON with an error message to make it human readable
296          return "exceeded maximum rename pending file size";
297        }
298      }
299      builder.append("\n  ]");
300      String fileList = builder.toString();
301
302      // Make file contents as a string. Again, quote file names, escaping
303      // characters as appropriate.
304      String contents = "{\n"
305          + "  FormatVersion: \"1.0\",\n"
306          + "  OperationUTCTime: \"" + time + "\",\n"
307          + "  OldFolderName: " + quote(srcKey) + ",\n"
308          + "  NewFolderName: " + quote(dstKey) + ",\n"
309          + "  FileList: " + fileList + "\n"
310          + "}\n";
311
312      return contents;
313    }
314    
315    /**
316     * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote 
317     * method.
318     * 
319     * Produce a string in double quotes with backslash sequences in all the
320     * right places. A backslash will be inserted within </, allowing JSON
321     * text to be delivered in HTML. In JSON text, a string cannot contain a
322     * control character or an unescaped quote or backslash.
323     * @param string A String
324     * @return  A String correctly formatted for insertion in a JSON text.
325     */
326    private String quote(String string) {
327        if (string == null || string.length() == 0) {
328            return "\"\"";
329        }
330
331        char c = 0;
332        int  i;
333        int  len = string.length();
334        StringBuilder sb = new StringBuilder(len + 4);
335        String t;
336
337        sb.append('"');
338        for (i = 0; i < len; i += 1) {
339            c = string.charAt(i);
340            switch (c) {
341            case '\\':
342            case '"':
343                sb.append('\\');
344                sb.append(c);
345                break;
346            case '/':
347                sb.append('\\');
348                sb.append(c);
349                break;
350            case '\b':
351                sb.append("\\b");
352                break;
353            case '\t':
354                sb.append("\\t");
355                break;
356            case '\n':
357                sb.append("\\n");
358                break;
359            case '\f':
360                sb.append("\\f");
361                break;
362            case '\r':
363                sb.append("\\r");
364                break;
365            default:
366                if (c < ' ') {
367                    t = "000" + Integer.toHexString(c);
368                    sb.append("\\u" + t.substring(t.length() - 4));
369                } else {
370                    sb.append(c);
371                }
372            }
373        }
374        sb.append('"');
375        return sb.toString();
376    }
377
378    public String getSrcKey() {
379      return srcKey;
380    }
381
382    public String getDstKey() {
383      return dstKey;
384    }
385
386    public FileMetadata getSourceMetadata() throws IOException {
387      return fs.getStoreInterface().retrieveMetadata(srcKey);
388    }
389
390    /**
391     * Execute a folder rename. This is the execution path followed
392     * when everything is working normally. See redo() for the alternate
393     * execution path for the case where we're recovering from a folder rename
394     * failure.
395     * @throws IOException
396     */
397    public void execute() throws IOException {
398
399      for (FileMetadata file : this.getFiles()) {
400
401        // Rename all materialized entries under the folder to point to the
402        // final destination.
403        if (file.getBlobMaterialization() == BlobMaterialization.Explicit) {
404          String srcName = file.getKey();
405          String suffix  = srcName.substring((this.getSrcKey()).length());
406          String dstName = this.getDstKey() + suffix;
407
408          // Rename gets exclusive access (via a lease) for files
409          // designated for atomic rename.
410          // The main use case is for HBase write-ahead log (WAL) and data
411          // folder processing correctness.  See the rename code for details.
412          boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName);
413          fs.getStoreInterface().rename(srcName, dstName, acquireLease, null);
414        }
415      }
416
417      // Rename the source folder 0-byte root file itself.
418      FileMetadata srcMetadata2 = this.getSourceMetadata();
419      if (srcMetadata2.getBlobMaterialization() ==
420          BlobMaterialization.Explicit) {
421
422        // It already has a lease on it from the "prepare" phase so there's no
423        // need to get one now. Pass in existing lease to allow file delete.
424        fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(),
425            false, folderLease);
426      }
427
428      // Update the last-modified time of the parent folders of both source and
429      // destination.
430      fs.updateParentFolderLastModifiedTime(srcKey);
431      fs.updateParentFolderLastModifiedTime(dstKey);
432    }
433
434    /** Clean up after execution of rename.
435     * @throws IOException */
436    public void cleanup() throws IOException {
437
438      if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) {
439
440        // Remove RenamePending file
441        fs.delete(getRenamePendingFilePath(), false);
442
443        // Freeing source folder lease is not necessary since the source
444        // folder file was deleted.
445      }
446    }
447
448    private Path getRenamePendingFilePath() {
449      String fileName = srcKey + SUFFIX;
450      Path fileNamePath = keyToPath(fileName);
451      Path path = fs.makeAbsolute(fileNamePath);
452      return path;
453    }
454
455    /**
456     * Recover from a folder rename failure by redoing the intended work,
457     * as recorded in the -RenamePending.json file.
458     * 
459     * @throws IOException
460     */
461    public void redo() throws IOException {
462
463      if (!committed) {
464
465        // Nothing to do. The -RedoPending.json file should have already been
466        // deleted.
467        return;
468      }
469
470      // Try to get a lease on source folder to block concurrent access to it.
471      // It may fail if the folder is already gone. We don't check if the
472      // source exists explicitly because that could recursively trigger redo
473      // and give an infinite recursion.
474      SelfRenewingLease lease = null;
475      boolean sourceFolderGone = false;
476      try {
477        lease = fs.leaseSourceFolder(srcKey);
478      } catch (AzureException e) {
479
480        // If the source folder was not found then somebody probably
481        // raced with us and finished the rename first, or the
482        // first rename failed right before deleting the rename pending
483        // file.
484        String errorCode = "";
485        try {
486          StorageException se = (StorageException) e.getCause();
487          errorCode = se.getErrorCode();
488        } catch (Exception e2) {
489          ; // do nothing -- could not get errorCode
490        }
491        if (errorCode.equals("BlobNotFound")) {
492          sourceFolderGone = true;
493        } else {
494          throw new IOException(
495              "Unexpected error when trying to lease source folder name during "
496              + "folder rename redo",
497              e);
498        }
499      }
500
501      if (!sourceFolderGone) {
502        // Make sure the target folder exists.
503        Path dst = fullPath(dstKey);
504        if (!fs.exists(dst)) {
505          fs.mkdirs(dst);
506        }
507
508        // For each file inside the folder to be renamed,
509        // make sure it has been renamed.
510        for(String fileName : fileStrings) {
511          finishSingleFileRename(fileName);
512        }
513
514        // Remove the source folder. Don't check explicitly if it exists,
515        // to avoid triggering redo recursively.
516        try {
517          fs.getStoreInterface().delete(srcKey, lease);
518        } catch (Exception e) {
519          LOG.info("Unable to delete source folder during folder rename redo. "
520              + "If the source folder is already gone, this is not an error "
521              + "condition. Continuing with redo.", e);
522        }
523
524        // Update the last-modified time of the parent folders of both source
525        // and destination.
526        fs.updateParentFolderLastModifiedTime(srcKey);
527        fs.updateParentFolderLastModifiedTime(dstKey);
528      }
529
530      // Remove the -RenamePending.json file.
531      fs.delete(getRenamePendingFilePath(), false);
532    }
533
534    // See if the source file is still there, and if it is, rename it.
535    private void finishSingleFileRename(String fileName)
536        throws IOException {
537      Path srcFile = fullPath(srcKey, fileName);
538      Path dstFile = fullPath(dstKey, fileName);
539      boolean srcExists = fs.exists(srcFile);
540      boolean dstExists = fs.exists(dstFile);
541      if (srcExists && !dstExists) {
542
543        // Rename gets exclusive access (via a lease) for HBase write-ahead log
544        // (WAL) file processing correctness.  See the rename code for details.
545        String srcName = fs.pathToKey(srcFile);
546        String dstName = fs.pathToKey(dstFile);
547        fs.getStoreInterface().rename(srcName, dstName, true, null);
548      } else if (srcExists && dstExists) {
549
550        // Get a lease on source to block write access.
551        String srcName = fs.pathToKey(srcFile);
552        SelfRenewingLease lease = fs.acquireLease(srcFile);
553
554        // Delete the file. This will free the lease too.
555        fs.getStoreInterface().delete(srcName, lease);
556      } else if (!srcExists && dstExists) {
557
558        // The rename already finished, so do nothing.
559        ;
560      } else {
561        throw new IOException(
562            "Attempting to complete rename of file " + srcKey + "/" + fileName
563            + " during folder rename redo, and file was not found in source "
564            + "or destination.");
565      }
566    }
567
568    // Return an absolute path for the specific fileName within the folder
569    // specified by folderKey.
570    private Path fullPath(String folderKey, String fileName) {
571      return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName);
572    }
573
574    private Path fullPath(String fileKey) {
575      return new Path(new Path(fs.getUri()), "/" + fileKey);
576    }
577  }
578
579  private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]"; // literal placeholder text; NOTE(review): presumably stands in for a trailing '.' in path components -- usage is outside this chunk, confirm
580  private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN =
581      Pattern.compile("\\[\\[\\.\\]\\](?=$|/)"); // matches the literal text [[.]] when it is followed by end of input or a '/'
582  private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)"); // matches a '.' that is followed by end of input or a '/'
583
584  @Override
585  public String getScheme() {
586    return "wasb"; // URI scheme name reported by this file system; the nested Secure subclass reports "wasbs" instead
587  }
588
589  
590  /**
591   * <p>
592   * A {@link FileSystem} for reading and writing files stored on <a
593   * href="http://store.azure.com/">Windows Azure</a>. This implementation is
594   * blob-based and stores files on Azure in their native form so they can be read
595   * by other Azure tools. This implementation uses HTTPS for secure network communication.
596   * </p>
597   */
598  public static class Secure extends NativeAzureFileSystem {
599    @Override
600    public String getScheme() {
601      return "wasbs"; // replaces the "wasb" scheme name returned by the parent class
602    }
603  }
604
605  public static final Log LOG = LogFactory.getLog(NativeAzureFileSystem.class); // logger shared with the nested classes of this file
606
607  static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size"; // configuration property name (its reader is outside this chunk)
608  /**
609   * The time span in seconds before which we consider a temp blob to be
610   * dangling (not being actively uploaded to) and up for reclamation.
611   * 
612   * So e.g. if this is 60, then any temporary blobs more than a minute old
613   * would be considered dangling.
614   */
615  static final String AZURE_TEMP_EXPIRY_PROPERTY_NAME = "fs.azure.fsck.temp.expiry.seconds";
616  private static final int AZURE_TEMP_EXPIRY_DEFAULT = 3600; // 3600 s = one hour; NOTE(review): presumably the default for the property above -- confirm at the point of use
617  static final String PATH_DELIMITER = Path.SEPARATOR;
618  static final String AZURE_TEMP_FOLDER = "_$azuretmpfolder$";
619
620  private static final int AZURE_LIST_ALL = -1; // passed as the second argument of the store's listAll(...) by FolderRenamePending
621  private static final int AZURE_UNBOUNDED_DEPTH = -1; // passed as the third argument of the store's listAll(...) by FolderRenamePending
622
623  private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L; // 512 MB, computed as a long
624
625  /**
626   * The configuration property that determines which group owns files created
627   * in WASB.
628   */
629  private static final String AZURE_DEFAULT_GROUP_PROPERTY_NAME = "fs.azure.permissions.supergroup";
630  /**
631   * The default value for fs.azure.permissions.supergroup. Chosen as the same
632   * default as DFS.
633   */
634  static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup";
635
636  static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME =
637      "fs.azure.block.location.impersonatedhost";
638  private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT =
639      "localhost";
640  static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME =
641      "fs.azure.ring.buffer.capacity";
642  static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME =
643      "fs.azure.output.stream.buffer.size";
644
645  public static final String SKIP_AZURE_METRICS_PROPERTY_NAME = "fs.azure.skip.metrics"; // configuration property name (its reader is outside this chunk)
646
647  private class NativeAzureFsInputStream extends FSInputStream {
648    private InputStream in;
649    private final String key;
650    private long pos = 0;
651    private boolean closed = false;
652    private boolean isPageBlob;
653
654    // File length, valid only for streams over block blobs.
655    private long fileLength;
656
657    public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) {
658      this.in = in;
659      this.key = key;
660      this.isPageBlob = store.isPageBlobKey(key);
661      this.fileLength = fileLength;
662    }
663
664    /**
665     * Return the size of the remaining available bytes
666     * if the size is less than or equal to {@link Integer#MAX_VALUE},
667     * otherwise, return {@link Integer#MAX_VALUE}.
668     *
669     * This is to match the behavior of DFSInputStream.available(),
670     * which some clients may rely on (HBase write-ahead log reading in
671     * particular).
672     */
673    @Override
674    public synchronized int available() throws IOException {
675      if (isPageBlob) {
676        return in.available();
677      } else {
678        if (closed) {
679          throw new IOException("Stream closed");
680        }
681        final long remaining = this.fileLength - pos;
682        return remaining <= Integer.MAX_VALUE ?
683            (int) remaining : Integer.MAX_VALUE;
684      }
685    }
686
687    /*
688     * Reads the next byte of data from the input stream. The value byte is
689     * returned as an integer in the range 0 to 255. If no byte is available
690     * because the end of the stream has been reached, the value -1 is returned.
691     * This method blocks until input data is available, the end of the stream
692     * is detected, or an exception is thrown.
693     *
694     * @returns int An integer corresponding to the byte read.
695     */
696    @Override
697    public synchronized int read() throws IOException {
698      int result = 0;
699      result = in.read();
700      if (result != -1) {
701        pos++;
702        if (statistics != null) {
703          statistics.incrementBytesRead(1);
704        }
705      }
706
707      // Return to the caller with the result.
708      //
709      return result;
710    }
711
712    /*
713     * Reads up to len bytes of data from the input stream into an array of
714     * bytes. An attempt is made to read as many as len bytes, but a smaller
715     * number may be read. The number of bytes actually read is returned as an
716     * integer. This method blocks until input data is available, end of file is
717     * detected, or an exception is thrown. If len is zero, then no bytes are
718     * read and 0 is returned; otherwise, there is an attempt to read at least
719     * one byte. If no byte is available because the stream is at end of file,
720     * the value -1 is returned; otherwise, at least one byte is read and stored
721     * into b.
722     *
723     * @param b -- the buffer into which data is read
724     *
725     * @param off -- the start offset in the array b at which data is written
726     *
727     * @param len -- the maximum number of bytes read
728     *
729     * @ returns int The total number of byes read into the buffer, or -1 if
730     * there is no more data because the end of stream is reached.
731     */
732    @Override
733    public synchronized int read(byte[] b, int off, int len) throws IOException {
734      int result = 0;
735      result = in.read(b, off, len);
736      if (result > 0) {
737        pos += result;
738      }
739
740      if (null != statistics) {
741        statistics.incrementBytesRead(result);
742      }
743
744      // Return to the caller with the result.
745      return result;
746    }
747
748    @Override
749    public void close() throws IOException {
750      in.close();
751      closed = true;
752    }
753
754    @Override
755    public synchronized void seek(long pos) throws IOException {
756     in.close();
757     in = store.retrieve(key);
758     this.pos = in.skip(pos);
759     if (LOG.isDebugEnabled()) {
760       LOG.debug(String.format("Seek to position %d. Bytes skipped %d", pos,
761         this.pos));
762     }
763    }
764
765    @Override
766    public synchronized long getPos() throws IOException {
767      return pos;
768    }
769
770    @Override
771    public boolean seekToNewSource(long targetPos) throws IOException {
772      return false;
773    }
774  }
775
776  private class NativeAzureFsOutputStream extends OutputStream {
777    // We should not override flush() to actually close current block and flush
778    // to DFS, this will break applications that assume flush() is a no-op.
779    // Applications are advised to use Syncable.hflush() for that purpose.
780    // NativeAzureFsOutputStream needs to implement Syncable if needed.
781    private String key; // blob name; assigned through setKey() by the constructor
782    private String keyEncoded; // value returned by getEncodedKey(); presumably assigned by setEncodedKey(), whose body is outside this chunk
783    private OutputStream out; // wrapped stream that receives every write; close() sets it to null
784
785    public NativeAzureFsOutputStream(OutputStream out, String aKey,
786        String anEncodedKey) throws IOException {
787      // Check input arguments. The output stream should be non-null and the
788      // keys
789      // should be valid strings.
790      if (null == out) {
791        throw new IllegalArgumentException(
792            "Illegal argument: the output stream is null.");
793      }
794
795      if (null == aKey || 0 == aKey.length()) {
796        throw new IllegalArgumentException(
797            "Illegal argument the key string is null or empty");
798      }
799
800      if (null == anEncodedKey || 0 == anEncodedKey.length()) {
801        throw new IllegalArgumentException(
802            "Illegal argument the encoded key string is null or empty");
803      }
804
805      // Initialize the member variables with the incoming parameters.
806      this.out = out;
807
808      setKey(aKey);
809      setEncodedKey(anEncodedKey);
810    }
811
812    @Override
813    public synchronized void close() throws IOException {
814      if (out != null) {
815        // Close the output stream and decode the key for the output stream
816        // before returning to the caller.
817        //
818        out.close();
819        restoreKey();
820        out = null;
821      }
822    }
823
824    /**
825     * Writes the specified byte to this output stream. The general contract for
826     * write is that one byte is written to the output stream. The byte to be
827     * written is the eight low-order bits of the argument b. The 24 high-order
828     * bits of b are ignored.
829     * 
830     * @param b
831     *          32-bit integer of block of 4 bytes
832     */
833    @Override
834    public void write(int b) throws IOException {
835      out.write(b);
836    }
837
838    /**
839     * Writes b.length bytes from the specified byte array to this output
840     * stream. The general contract for write(b) is that it should have exactly
841     * the same effect as the call write(b, 0, b.length).
842     * 
843     * @param b
844     *          Block of bytes to be written to the output stream.
845     */
846    @Override
847    public void write(byte[] b) throws IOException {
848      out.write(b);
849    }
850
851    /**
852     * Writes <code>len</code> from the specified byte array starting at offset
853     * <code>off</code> to the output stream. The general contract for write(b,
854     * off, len) is that some of the bytes in the array <code>
855     * b</code b> are written to the output stream in order; element
856     * <code>b[off]</code> is the first byte written and
857     * <code>b[off+len-1]</code> is the last byte written by this operation.
858     * 
859     * @param b
860     *          Byte array to be written.
861     * @param off
862     *          Write this offset in stream.
863     * @param len
864     *          Number of bytes to be written.
865     */
866    @Override
867    public void write(byte[] b, int off, int len) throws IOException {
868      out.write(b, off, len);
869    }
870
871    /**
872     * Get the blob name.
873     * 
874     * @return String Blob name.
875     */
876    public String getKey() {
877      return key;
878    }
879
880    /**
881     * Set the blob name.
882     * 
883     * @param key
884     *          Blob name.
885     */
886    public void setKey(String key) {
887      this.key = key;
888    }
889
890    /**
891     * Get the blob name.
892     * 
893     * @return String Blob name.
894     */
895    public String getEncodedKey() {
896      return keyEncoded;
897    }
898
899    /**
900     * Set the blob name.
901     * 
902     * @param anEncodedKey
903     *          Blob name.
904     */
905    public void setEncodedKey(String anEncodedKey) {
906      this.keyEncoded = anEncodedKey;
907    }
908
909    /**
910     * Restore the original key name from the m_key member variable. Note: The
911     * output file stream is created with an encoded blob store key to guarantee
912     * load balancing on the front end of the Azure storage partition servers.
913     * The create also includes the name of the original key value which is
914     * stored in the m_key member variable. This method should only be called
915     * when the stream is closed.
916     */
917    private void restoreKey() throws IOException {
918      store.rename(getEncodedKey(), getKey());
919    }
920  }
921
  // Scheme and authority of this file system; assigned in initialize().
  private URI uri;
  // Store used for blob operations; either passed to the constructor or
  // created by createDefaultStore() during initialize().
  private NativeFileSystemStore store;
  // Assigned only when createDefaultStore() builds the store; this is what
  // getStore() returns.
  private AzureNativeFileSystemStore actualStore;
  // Base against which makeAbsolute() resolves relative paths; assigned in
  // initialize().
  private Path workingDir;
  // Overwritten in initialize() with the AZURE_BLOCK_SIZE_PROPERTY_NAME setting.
  private long blockSize = MAX_AZURE_BLOCK_SIZE;
  // Created in initialize() and handed to the store.
  private AzureFileSystemInstrumentation instrumentation;
  // Name the instrumentation was registered under; left unset when
  // SKIP_AZURE_METRICS_PROPERTY_NAME is true.
  private String metricsSourceName;
  // NOTE(review): presumably set once the file system is closed; the code
  // that reads or writes it is outside this part of the file - confirm.
  private boolean isClosed = false;
  // Toggled by suppressRetryPolicy()/resumeRetryPolicy(); when true,
  // createDefaultStore() suppresses the retry policy of the new store.
  private static boolean suppressRetryPolicy = false;
  // A counter to create unique (within-process) names for my metrics sources.
  private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
933
934  
  /**
   * Creates a file system without a store; initialize() creates the default
   * store when none has been set.
   */
  public NativeAzureFileSystem() {
    // set store in initialize()
  }
938
  /**
   * Creates a file system backed by the given store. initialize() only
   * creates a default store when this one is null.
   *
   * @param store
   *          The store to use for blob operations.
   */
  public NativeAzureFileSystem(NativeFileSystemStore store) {
    this.store = store;
  }
942
943  /**
944   * Suppress the default retry policy for the Storage, useful in unit tests to
945   * test negative cases without waiting forever.
946   */
947  @VisibleForTesting
948  static void suppressRetryPolicy() {
949    suppressRetryPolicy = true;
950  }
951
952  /**
953   * Undo the effect of suppressRetryPolicy.
954   */
955  @VisibleForTesting
956  static void resumeRetryPolicy() {
957    suppressRetryPolicy = false;
958  }
959
960  /**
961   * Creates a new metrics source name that's unique within this process.
962   */
963  @VisibleForTesting
964  public static String newMetricsSourceName() {
965    int number = metricsSourceNameCounter.incrementAndGet();
966    final String baseName = "AzureFileSystemMetrics";
967    if (number == 1) { // No need for a suffix for the first one
968      return baseName;
969    } else {
970      return baseName + number;
971    }
972  }
973  
974  /**
975   * Checks if the given URI scheme is a scheme that's affiliated with the Azure
976   * File System.
977   * 
978   * @param scheme
979   *          The URI scheme.
980   * @return true iff it's an Azure File System URI scheme.
981   */
982  private static boolean isWasbScheme(String scheme) {
983    // The valid schemes are: asv (old name), asvs (old name over HTTPS),
984    // wasb (new name), wasbs (new name over HTTPS).
985    return scheme != null
986        && (scheme.equalsIgnoreCase("asv") || scheme.equalsIgnoreCase("asvs")
987            || scheme.equalsIgnoreCase("wasb") || scheme
988              .equalsIgnoreCase("wasbs"));
989  }
990
991  /**
992   * Puts in the authority of the default file system if it is a WASB file
993   * system and the given URI's authority is null.
994   * 
995   * @return The URI with reconstructed authority if necessary and possible.
996   */
997  private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) {
998    if (null == uri.getAuthority()) {
999      // If WASB is the default file system, get the authority from there
1000      URI defaultUri = FileSystem.getDefaultUri(conf);
1001      if (defaultUri != null && isWasbScheme(defaultUri.getScheme())) {
1002        try {
1003          // Reconstruct the URI with the authority from the default URI.
1004          return new URI(uri.getScheme(), defaultUri.getAuthority(),
1005              uri.getPath(), uri.getQuery(), uri.getFragment());
1006        } catch (URISyntaxException e) {
1007          // This should never happen.
1008          throw new Error("Bad URI construction", e);
1009        }
1010      }
1011    }
1012    return uri;
1013  }
1014
1015  @Override
1016  protected void checkPath(Path path) {
1017    // Make sure to reconstruct the path's authority if needed
1018    super.checkPath(new Path(reconstructAuthorityIfNeeded(path.toUri(),
1019        getConf())));
1020  }
1021
1022  @Override
1023  public void initialize(URI uri, Configuration conf)
1024      throws IOException, IllegalArgumentException {
1025    // Check authority for the URI to guarantee that it is non-null.
1026    uri = reconstructAuthorityIfNeeded(uri, conf);
1027    if (null == uri.getAuthority()) {
1028      final String errMsg = String
1029          .format("Cannot initialize WASB file system, URI authority not recognized.");
1030      throw new IllegalArgumentException(errMsg);
1031    }
1032    super.initialize(uri, conf);
1033
1034    if (store == null) {
1035      store = createDefaultStore(conf);
1036    }
1037
1038    instrumentation = new AzureFileSystemInstrumentation(conf);
1039    if(!conf.getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) {
1040      // Make sure the metrics system is available before interacting with Azure
1041      AzureFileSystemMetricsSystem.fileSystemStarted();
1042      metricsSourceName = newMetricsSourceName();
1043      String sourceDesc = "Azure Storage Volume File System metrics";
1044      AzureFileSystemMetricsSystem.registerSource(metricsSourceName, sourceDesc,
1045        instrumentation);
1046    }
1047
1048    store.initialize(uri, conf, instrumentation);
1049    setConf(conf);
1050    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
1051    this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
1052        .getShortUserName()).makeQualified(getUri(), getWorkingDirectory());
1053    this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
1054        MAX_AZURE_BLOCK_SIZE);
1055
1056    if (LOG.isDebugEnabled()) {
1057      LOG.debug("NativeAzureFileSystem. Initializing.");
1058      LOG.debug("  blockSize  = "
1059          + conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
1060    }
1061  }
1062
1063  private NativeFileSystemStore createDefaultStore(Configuration conf) {
1064    actualStore = new AzureNativeFileSystemStore();
1065
1066    if (suppressRetryPolicy) {
1067      actualStore.suppressRetryPolicy();
1068    }
1069    return actualStore;
1070  }
1071
1072  /**
1073   * Azure Storage doesn't allow the blob names to end in a period,
1074   * so encode this here to work around that limitation.
1075   */
1076  private static String encodeTrailingPeriod(String toEncode) {
1077    Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode);
1078    return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER);
1079  }
1080
1081  /**
1082   * Reverse the encoding done by encodeTrailingPeriod().
1083   */
1084  private static String decodeTrailingPeriod(String toDecode) {
1085    Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode);
1086    return matcher.replaceAll(".");
1087  }
1088
1089  /**
1090   * Convert the path to a key. By convention, any leading or trailing slash is
1091   * removed, except for the special case of a single slash.
1092   */
1093  @VisibleForTesting
1094  public String pathToKey(Path path) {
1095    // Convert the path to a URI to parse the scheme, the authority, and the
1096    // path from the path object.
1097    URI tmpUri = path.toUri();
1098    String pathUri = tmpUri.getPath();
1099
1100    // The scheme and authority is valid. If the path does not exist add a "/"
1101    // separator to list the root of the container.
1102    Path newPath = path;
1103    if ("".equals(pathUri)) {
1104      newPath = new Path(tmpUri.toString() + Path.SEPARATOR);
1105    }
1106
1107    // Verify path is absolute if the path refers to a windows drive scheme.
1108    if (!newPath.isAbsolute()) {
1109      throw new IllegalArgumentException("Path must be absolute: " + path);
1110    }
1111
1112    String key = null;
1113    key = newPath.toUri().getPath();
1114    key = removeTrailingSlash(key);
1115    key = encodeTrailingPeriod(key);
1116    if (key.length() == 1) {
1117      return key;
1118    } else {
1119      return key.substring(1); // remove initial slash
1120    }
1121  }
1122
1123  // Remove any trailing slash except for the case of a single slash.
1124  private static String removeTrailingSlash(String key) {
1125    if (key.length() == 0 || key.length() == 1) {
1126      return key;
1127    }
1128    if (key.charAt(key.length() - 1) == '/') {
1129      return key.substring(0, key.length() - 1);
1130    } else {
1131      return key;
1132    }
1133  }
1134
1135  private static Path keyToPath(String key) {
1136    if (key.equals("/")) {
1137      return new Path("/"); // container
1138    }
1139    return new Path("/" + decodeTrailingPeriod(key));
1140  }
1141
1142  /**
1143   * Get the absolute version of the path (fully qualified).
1144   * This is public for testing purposes.
1145   *
1146   * @param path
1147   * @return fully qualified path
1148   */
1149  @VisibleForTesting
1150  public Path makeAbsolute(Path path) {
1151    if (path.isAbsolute()) {
1152      return path;
1153    }
1154    return new Path(workingDir, path);
1155  }
1156
1157  /**
1158   * For unit test purposes, retrieves the AzureNativeFileSystemStore store
1159   * backing this file system.
1160   * 
1161   * @return The store object.
1162   */
1163  @VisibleForTesting
1164  public AzureNativeFileSystemStore getStore() {
1165    return actualStore;
1166  }
1167  
1168  NativeFileSystemStore getStoreInterface() {
1169    return store;
1170  }
1171
1172  /**
1173   * Gets the metrics source for this file system.
1174   * This is mainly here for unit testing purposes.
1175   *
1176   * @return the metrics source.
1177   */
1178  public AzureFileSystemInstrumentation getInstrumentation() {
1179    return instrumentation;
1180  }
1181
  /**
   * This optional operation is not yet supported.
   *
   * @param f
   *          Ignored.
   * @param bufferSize
   *          Ignored.
   * @param progress
   *          Ignored.
   * @return never returns normally.
   * @throws IOException
   *           always, with the message "Not supported".
   */
  @Override
  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
      throws IOException {
    throw new IOException("Not supported");
  }
1188
1189  @Override
1190  public FSDataOutputStream create(Path f, FsPermission permission,
1191      boolean overwrite, int bufferSize, short replication, long blockSize,
1192      Progressable progress) throws IOException {
1193    return create(f, permission, overwrite, true,
1194        bufferSize, replication, blockSize, progress,
1195        (SelfRenewingLease) null);
1196  }
1197
1198  /**
1199   * Get a self-renewing lease on the specified file.
1200   */
1201  public SelfRenewingLease acquireLease(Path path) throws AzureException {
1202    String fullKey = pathToKey(makeAbsolute(path));
1203    return getStore().acquireLease(fullKey);
1204  }
1205
1206  @Override
1207  @SuppressWarnings("deprecation")
1208  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1209      boolean overwrite, int bufferSize, short replication, long blockSize,
1210      Progressable progress) throws IOException {
1211
1212    Path parent = f.getParent();
1213
1214    // Get exclusive access to folder if this is a directory designated
1215    // for atomic rename. The primary use case of for HBase write-ahead
1216    // log file management.
1217    SelfRenewingLease lease = null;
1218    if (store.isAtomicRenameKey(pathToKey(f))) {
1219      try {
1220        lease = acquireLease(parent);
1221      } catch (AzureException e) {
1222
1223        String errorCode = "";
1224        try {
1225          StorageException e2 = (StorageException) e.getCause();
1226          errorCode = e2.getErrorCode();
1227        } catch (Exception e3) {
1228          // do nothing if cast fails
1229        }
1230        if (errorCode.equals("BlobNotFound")) {
1231          throw new FileNotFoundException("Cannot create file " +
1232              f.getName() + " because parent folder does not exist.");
1233        }
1234
1235        LOG.warn("Got unexpected exception trying to get lease on "
1236          + pathToKey(parent) + ". " + e.getMessage());
1237        throw e;
1238      }
1239    }
1240
1241    // See if the parent folder exists. If not, throw error.
1242    // The exists() check will push any pending rename operation forward,
1243    // if there is one, and return false.
1244    //
1245    // At this point, we have exclusive access to the source folder
1246    // via the lease, so we will not conflict with an active folder
1247    // rename operation.
1248    if (!exists(parent)) {
1249      try {
1250
1251        // This'll let the keep-alive thread exit as soon as it wakes up.
1252        lease.free();
1253      } catch (Exception e) {
1254        LOG.warn("Unable to free lease because: " + e.getMessage());
1255      }
1256      throw new FileNotFoundException("Cannot create file " +
1257          f.getName() + " because parent folder does not exist.");
1258    }
1259
1260    // Create file inside folder.
1261    FSDataOutputStream out = null;
1262    try {
1263      out = create(f, permission, overwrite, false,
1264          bufferSize, replication, blockSize, progress, lease);
1265    } finally {
1266      // Release exclusive access to folder.
1267      try {
1268        if (lease != null) {
1269          lease.free();
1270        }
1271      } catch (Exception e) {
1272        IOUtils.cleanup(LOG, out);
1273        String msg = "Unable to free lease on " + parent.toUri();
1274        LOG.error(msg);
1275        throw new IOException(msg, e);
1276      }
1277    }
1278    return out;
1279  }
1280
1281  @Override
1282  @SuppressWarnings("deprecation")
1283  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
1284      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
1285      Progressable progress) throws IOException {
1286
1287    // Check if file should be appended or overwritten. Assume that the file
1288    // is overwritten on if the CREATE and OVERWRITE create flags are set. Note
1289    // that any other combinations of create flags will result in an open new or
1290    // open with append.
1291    final EnumSet<CreateFlag> createflags =
1292        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
1293    boolean overwrite = flags.containsAll(createflags);
1294
1295    // Delegate the create non-recursive call.
1296    return this.createNonRecursive(f, permission, overwrite,
1297        bufferSize, replication, blockSize, progress);
1298  }
1299
1300  @Override
1301  @SuppressWarnings("deprecation")
1302  public FSDataOutputStream createNonRecursive(Path f,
1303      boolean overwrite, int bufferSize, short replication, long blockSize,
1304      Progressable progress) throws IOException {
1305    return this.createNonRecursive(f, FsPermission.getFileDefault(),
1306        overwrite, bufferSize, replication, blockSize, progress);
1307  }
1308
1309
1310  /**
1311   * Create an Azure blob and return an output stream to use
1312   * to write data to it.
1313   *
1314   * @param f
1315   * @param permission
1316   * @param overwrite
1317   * @param createParent
1318   * @param bufferSize
1319   * @param replication
1320   * @param blockSize
1321   * @param progress
1322   * @param parentFolderLease Lease on parent folder (or null if
1323   * no lease).
1324   * @return
1325   * @throws IOException
1326   */
1327  private FSDataOutputStream create(Path f, FsPermission permission,
1328      boolean overwrite, boolean createParent, int bufferSize,
1329      short replication, long blockSize, Progressable progress,
1330      SelfRenewingLease parentFolderLease)
1331          throws IOException {
1332
1333    if (LOG.isDebugEnabled()) {
1334      LOG.debug("Creating file: " + f.toString());
1335    }
1336
1337    if (containsColon(f)) {
1338      throw new IOException("Cannot create file " + f
1339          + " through WASB that has colons in the name");
1340    }
1341
1342    Path absolutePath = makeAbsolute(f);
1343    String key = pathToKey(absolutePath);
1344
1345    FileMetadata existingMetadata = store.retrieveMetadata(key);
1346    if (existingMetadata != null) {
1347      if (existingMetadata.isDir()) {
1348        throw new IOException("Cannot create file " + f
1349            + "; already exists as a directory.");
1350      }
1351      if (!overwrite) {
1352        throw new IOException("File already exists:" + f);
1353      }
1354    }
1355
1356    Path parentFolder = absolutePath.getParent();
1357    if (parentFolder != null && parentFolder.getParent() != null) { // skip root
1358      // Update the parent folder last modified time if the parent folder
1359      // already exists.
1360      String parentKey = pathToKey(parentFolder);
1361      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
1362      if (parentMetadata != null && parentMetadata.isDir() &&
1363        parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
1364        if (parentFolderLease != null) {
1365          store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
1366        } else {
1367          updateParentFolderLastModifiedTime(key);
1368        }
1369      } else {
1370        // Make sure that the parent folder exists.
1371        // Create it using inherited permissions from the first existing directory going up the path
1372        Path firstExisting = parentFolder.getParent();
1373        FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting));
1374        while(metadata == null) {
1375          // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata
1376          firstExisting = firstExisting.getParent();
1377          metadata = store.retrieveMetadata(pathToKey(firstExisting));
1378        }
1379        mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true);
1380      }
1381    }
1382
1383    // Mask the permission first (with the default permission mask as well).
1384    FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile);
1385    PermissionStatus permissionStatus = createPermissionStatus(masked);
1386
1387    OutputStream bufOutStream;
1388    if (store.isPageBlobKey(key)) {
1389      // Store page blobs directly in-place without renames.
1390      bufOutStream = store.storefile(key, permissionStatus);
1391    } else {
1392      // This is a block blob, so open the output blob stream based on the
1393      // encoded key.
1394      //
1395      String keyEncoded = encodeKey(key);
1396
1397
1398      // First create a blob at the real key, pointing back to the temporary file
1399      // This accomplishes a few things:
1400      // 1. Makes sure we can create a file there.
1401      // 2. Makes it visible to other concurrent threads/processes/nodes what
1402      // we're
1403      // doing.
1404      // 3. Makes it easier to restore/cleanup data in the event of us crashing.
1405      store.storeEmptyLinkFile(key, keyEncoded, permissionStatus);
1406
1407      // The key is encoded to point to a common container at the storage server.
1408      // This reduces the number of splits on the server side when load balancing.
1409      // Ingress to Azure storage can take advantage of earlier splits. We remove
1410      // the root path to the key and prefix a random GUID to the tail (or leaf
1411      // filename) of the key. Keys are thus broadly and randomly distributed over
1412      // a single container to ease load balancing on the storage server. When the
1413      // blob is committed it is renamed to its earlier key. Uncommitted blocks
1414      // are not cleaned up and we leave it to Azure storage to garbage collect
1415      // these
1416      // blocks.
1417      bufOutStream = new NativeAzureFsOutputStream(store.storefile(
1418          keyEncoded, permissionStatus), key, keyEncoded);
1419    }
1420    // Construct the data output stream from the buffered output stream.
1421    FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
1422
1423    
1424    // Increment the counter
1425    instrumentation.fileCreated();
1426    
1427    // Return data output stream to caller.
1428    return fsOut;
1429  }
1430
1431  @Override
1432  @Deprecated
1433  public boolean delete(Path path) throws IOException {
1434    return delete(path, true);
1435  }
1436
1437  @Override
1438  public boolean delete(Path f, boolean recursive) throws IOException {
1439    return delete(f, recursive, false);
1440  }
1441
1442  /**
1443   * Delete the specified file or folder. The parameter
1444   * skipParentFolderLastModifidedTimeUpdate
1445   * is used in the case of atomic folder rename redo. In that case, there is
1446   * a lease on the parent folder, so (without reworking the code) modifying
1447   * the parent folder update time will fail because of a conflict with the
1448   * lease. Since we are going to delete the folder soon anyway so accurate
1449   * modified time is not necessary, it's easier to just skip
1450   * the modified time update.
1451   *
1452   * @param f
1453   * @param recursive
1454   * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last
1455   * modified time.
1456   * @return true if and only if the file is deleted
1457   * @throws IOException
1458   */
1459  public boolean delete(Path f, boolean recursive,
1460      boolean skipParentFolderLastModifidedTimeUpdate) throws IOException {
1461
1462    if (LOG.isDebugEnabled()) {
1463      LOG.debug("Deleting file: " + f.toString());
1464    }
1465
1466    Path absolutePath = makeAbsolute(f);
1467    String key = pathToKey(absolutePath);
1468
1469    // Capture the metadata for the path.
1470    //
1471    FileMetadata metaFile = store.retrieveMetadata(key);
1472
1473    if (null == metaFile) {
1474      // The path to be deleted does not exist.
1475      return false;
1476    }
1477
1478    // The path exists, determine if it is a folder containing objects,
1479    // an empty folder, or a simple file and take the appropriate actions.
1480    if (!metaFile.isDir()) {
1481      // The path specifies a file. We need to check the parent path
1482      // to make sure it's a proper materialized directory before we
1483      // delete the file. Otherwise we may get into a situation where
1484      // the file we were deleting was the last one in an implicit directory
1485      // (e.g. the blob store only contains the blob a/b and there's no
1486      // corresponding directory blob a) and that would implicitly delete
1487      // the directory as well, which is not correct.
1488      Path parentPath = absolutePath.getParent();
1489      if (parentPath.getParent() != null) {// Not root
1490        String parentKey = pathToKey(parentPath);
1491        FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
1492        if (!parentMetadata.isDir()) {
1493          // Invalid state: the parent path is actually a file. Throw.
1494          throw new AzureException("File " + f + " has a parent directory "
1495              + parentPath + " which is also a file. Can't resolve.");
1496        }
1497        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
1498          if (LOG.isDebugEnabled()) {
1499            LOG.debug("Found an implicit parent directory while trying to"
1500                + " delete the file " + f + ". Creating the directory blob for"
1501                + " it in " + parentKey + ".");
1502          }
1503          store.storeEmptyFolder(parentKey,
1504              createPermissionStatus(FsPermission.getDefault()));
1505        } else {
1506          if (!skipParentFolderLastModifidedTimeUpdate) {
1507            updateParentFolderLastModifiedTime(key);
1508          }
1509        }
1510      }
1511      store.delete(key);
1512      instrumentation.fileDeleted();
1513    } else {
1514      // The path specifies a folder. Recursively delete all entries under the
1515      // folder.
1516      Path parentPath = absolutePath.getParent();
1517      if (parentPath.getParent() != null) {
1518        String parentKey = pathToKey(parentPath);
1519        FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
1520
1521        if (parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
1522          if (LOG.isDebugEnabled()) {
1523            LOG.debug("Found an implicit parent directory while trying to"
1524                + " delete the directory " + f
1525                + ". Creating the directory blob for" + " it in " + parentKey
1526                + ".");
1527          }
1528          store.storeEmptyFolder(parentKey,
1529              createPermissionStatus(FsPermission.getDefault()));
1530        }
1531      }
1532
1533      // List all the blobs in the current folder.
1534      String priorLastKey = null;
1535      PartialListing listing = store.listAll(key, AZURE_LIST_ALL, 1,
1536          priorLastKey);
1537      FileMetadata[] contents = listing.getFiles();
1538      if (!recursive && contents.length > 0) {
1539        // The folder is non-empty and recursive delete was not specified.
1540        // Throw an exception indicating that a non-recursive delete was
1541        // specified for a non-empty folder.
1542        throw new IOException("Non-recursive delete of non-empty directory "
1543            + f.toString());
1544      }
1545
1546      // Delete all the files in the folder.
1547      for (FileMetadata p : contents) {
1548        // Tag on the directory name found as the suffix of the suffix of the
1549        // parent directory to get the new absolute path.
1550        String suffix = p.getKey().substring(
1551            p.getKey().lastIndexOf(PATH_DELIMITER));
1552        if (!p.isDir()) {
1553          store.delete(key + suffix);
1554          instrumentation.fileDeleted();
1555        } else {
1556          // Recursively delete contents of the sub-folders. Notice this also
1557          // deletes the blob for the directory.
1558          if (!delete(new Path(f.toString() + suffix), true)) {
1559            return false;
1560          }
1561        }
1562      }
1563      store.delete(key);
1564
1565      // Update parent directory last modified time
1566      Path parent = absolutePath.getParent();
1567      if (parent != null && parent.getParent() != null) { // not root
1568        if (!skipParentFolderLastModifidedTimeUpdate) {
1569          updateParentFolderLastModifiedTime(key);
1570        }
1571      }
1572      instrumentation.directoryDeleted();
1573    }
1574
1575    // File or directory was successfully deleted.
1576    return true;
1577  }
1578
1579  @Override
1580  public FileStatus getFileStatus(Path f) throws IOException {
1581
1582    if (LOG.isDebugEnabled()) {
1583      LOG.debug("Getting the file status for " + f.toString());
1584    }
1585
1586    // Capture the absolute path and the path to key.
1587    Path absolutePath = makeAbsolute(f);
1588    String key = pathToKey(absolutePath);
1589    if (key.length() == 0) { // root always exists
1590      return newDirectory(null, absolutePath);
1591    }
1592
1593    // The path is either a folder or a file. Retrieve metadata to
1594    // determine if it is a directory or file.
1595    FileMetadata meta = store.retrieveMetadata(key);
1596    if (meta != null) {
1597      if (meta.isDir()) {
1598        // The path is a folder with files in it.
1599        //
1600        if (LOG.isDebugEnabled()) {
1601          LOG.debug("Path " + f.toString() + "is a folder.");
1602        }
1603
1604        // If a rename operation for the folder was pending, redo it.
1605        // Then the file does not exist, so signal that.
1606        if (conditionalRedoFolderRename(f)) {
1607          throw new FileNotFoundException(
1608              absolutePath + ": No such file or directory.");
1609        }
1610
1611        // Return reference to the directory object.
1612        return newDirectory(meta, absolutePath);
1613      }
1614
1615      // The path is a file.
1616      if (LOG.isDebugEnabled()) {
1617        LOG.debug("Found the path: " + f.toString() + " as a file.");
1618      }
1619
1620      // Return with reference to a file object.
1621      return newFile(meta, absolutePath);
1622    }
1623
1624    // File not found. Throw exception no such file or directory.
1625    //
1626    throw new FileNotFoundException(
1627        absolutePath + ": No such file or directory.");
1628  }
1629
1630  // Return true if there is a rename pending and we redo it, otherwise false.
1631  private boolean conditionalRedoFolderRename(Path f) throws IOException {
1632
1633    // Can't rename /, so return immediately in that case.
1634    if (f.getName().equals("")) {
1635      return false;
1636    }
1637
1638    // Check if there is a -RenamePending.json file for this folder, and if so,
1639    // redo the rename.
1640    Path absoluteRenamePendingFile = renamePendingFilePath(f);
1641    if (exists(absoluteRenamePendingFile)) {
1642      FolderRenamePending pending =
1643          new FolderRenamePending(absoluteRenamePendingFile, this);
1644      pending.redo();
1645      return true;
1646    } else {
1647      return false;
1648    }
1649  }
1650
1651  // Return the path name that would be used for rename of folder with path f.
1652  private Path renamePendingFilePath(Path f) {
1653    Path absPath = makeAbsolute(f);
1654    String key = pathToKey(absPath);
1655    key += "-RenamePending.json";
1656    return keyToPath(key);
1657  }
1658
1659  @Override
1660  public URI getUri() {
1661    return uri;
1662  }
1663
1664  /**
1665   * Retrieve the status of a given path if it is a file, or of all the
1666   * contained files if it is a directory.
1667   */
1668  @Override
1669  public FileStatus[] listStatus(Path f) throws IOException {
1670
1671    if (LOG.isDebugEnabled()) {
1672      LOG.debug("Listing status for " + f.toString());
1673    }
1674
1675    Path absolutePath = makeAbsolute(f);
1676    String key = pathToKey(absolutePath);
1677    Set<FileStatus> status = new TreeSet<FileStatus>();
1678    FileMetadata meta = store.retrieveMetadata(key);
1679
1680    if (meta != null) {
1681      if (!meta.isDir()) {
1682        if (LOG.isDebugEnabled()) {
1683          LOG.debug("Found path as a file");
1684        }
1685        return new FileStatus[] { newFile(meta, absolutePath) };
1686      }
1687      String partialKey = null;
1688      PartialListing listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
1689
1690      // For any -RenamePending.json files in the listing,
1691      // push the rename forward.
1692      boolean renamed = conditionalRedoFolderRenames(listing);
1693
1694      // If any renames were redone, get another listing,
1695      // since the current one may have changed due to the redo.
1696      if (renamed) {
1697        listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
1698      }
1699
1700      for (FileMetadata fileMetadata : listing.getFiles()) {
1701        Path subpath = keyToPath(fileMetadata.getKey());
1702
1703        // Test whether the metadata represents a file or directory and
1704        // add the appropriate metadata object.
1705        //
1706        // Note: There was a very old bug here where directories were added
1707        // to the status set as files flattening out recursive listings
1708        // using "-lsr" down the file system hierarchy.
1709        if (fileMetadata.isDir()) {
1710          // Make sure we hide the temp upload folder
1711          if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) {
1712            // Don't expose that.
1713            continue;
1714          }
1715          status.add(newDirectory(fileMetadata, subpath));
1716        } else {
1717          status.add(newFile(fileMetadata, subpath));
1718        }
1719      }
1720      if (LOG.isDebugEnabled()) {
1721        LOG.debug("Found path as a directory with " + status.size()
1722            + " files in it.");
1723      }
1724    } else {
1725      // There is no metadata found for the path.
1726      if (LOG.isDebugEnabled()) {
1727        LOG.debug("Did not find any metadata for path: " + key);
1728      }
1729
1730      throw new FileNotFoundException("File" + f + " does not exist.");
1731    }
1732
1733    return status.toArray(new FileStatus[0]);
1734  }
1735
1736  // Redo any folder renames needed if there are rename pending files in the
1737  // directory listing. Return true if one or more redo operations were done.
1738  private boolean conditionalRedoFolderRenames(PartialListing listing)
1739      throws IllegalArgumentException, IOException {
1740    boolean renamed = false;
1741    for (FileMetadata fileMetadata : listing.getFiles()) {
1742      Path subpath = keyToPath(fileMetadata.getKey());
1743      if (isRenamePendingFile(subpath)) {
1744        FolderRenamePending pending =
1745            new FolderRenamePending(subpath, this);
1746        pending.redo();
1747        renamed = true;
1748      }
1749    }
1750    return renamed;
1751  }
1752
1753  // True if this is a folder rename pending file, else false.
1754  private boolean isRenamePendingFile(Path path) {
1755    return path.toString().endsWith(FolderRenamePending.SUFFIX);
1756  }
1757
1758  private FileStatus newFile(FileMetadata meta, Path path) {
1759    return new FileStatus (
1760        meta.getLength(),
1761        false,
1762        1,
1763        blockSize,
1764        meta.getLastModified(),
1765        0,
1766        meta.getPermissionStatus().getPermission(),
1767        meta.getPermissionStatus().getUserName(),
1768        meta.getPermissionStatus().getGroupName(),
1769        path.makeQualified(getUri(), getWorkingDirectory()));
1770  }
1771
1772  private FileStatus newDirectory(FileMetadata meta, Path path) {
1773    return new FileStatus (
1774        0,
1775        true,
1776        1,
1777        blockSize,
1778        meta == null ? 0 : meta.getLastModified(),
1779        0,
1780        meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(),
1781        meta == null ? "" : meta.getPermissionStatus().getUserName(),
1782        meta == null ? "" : meta.getPermissionStatus().getGroupName(),
1783        path.makeQualified(getUri(), getWorkingDirectory()));
1784  }
1785
1786  private static enum UMaskApplyMode {
1787    NewFile,
1788    NewDirectory,
1789    NewDirectoryNoUmask,
1790    ChangeExistingFile,
1791    ChangeExistingDirectory,
1792  }
1793
1794  /**
1795   * Applies the applicable UMASK's on the given permission.
1796   * 
1797   * @param permission
1798   *          The permission to mask.
1799   * @param applyMode
1800   *          Whether to also apply the default umask.
1801   * @return The masked persmission.
1802   */
1803  private FsPermission applyUMask(final FsPermission permission,
1804      final UMaskApplyMode applyMode) {
1805    FsPermission newPermission = new FsPermission(permission);
1806    // Apply the default umask - this applies for new files or directories.
1807    if (applyMode == UMaskApplyMode.NewFile
1808        || applyMode == UMaskApplyMode.NewDirectory) {
1809      newPermission = newPermission
1810          .applyUMask(FsPermission.getUMask(getConf()));
1811    }
1812    return newPermission;
1813  }
1814
1815  /**
1816   * Creates the PermissionStatus object to use for the given permission, based
1817   * on the current user in context.
1818   * 
1819   * @param permission
1820   *          The permission for the file.
1821   * @return The permission status object to use.
1822   * @throws IOException
1823   *           If login fails in getCurrentUser
1824   */
1825  private PermissionStatus createPermissionStatus(FsPermission permission)
1826      throws IOException {
1827    // Create the permission status for this file based on current user
1828    return new PermissionStatus(
1829        UserGroupInformation.getCurrentUser().getShortUserName(),
1830        getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME,
1831            AZURE_DEFAULT_GROUP_DEFAULT),
1832        permission);
1833  }
1834
1835  @Override
1836  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
1837      return mkdirs(f, permission, false);
1838  }
1839
1840  public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException {
1841    if (LOG.isDebugEnabled()) {
1842      LOG.debug("Creating directory: " + f.toString());
1843    }
1844
1845    if (containsColon(f)) {
1846      throw new IOException("Cannot create directory " + f
1847          + " through WASB that has colons in the name");
1848    }
1849
1850    Path absolutePath = makeAbsolute(f);
1851    PermissionStatus permissionStatus = null;
1852    if(noUmask) {
1853      // ensure owner still has wx permissions at the minimum
1854      permissionStatus = createPermissionStatus(
1855          applyUMask(FsPermission.createImmutable((short) (permission.toShort() | USER_WX_PERMISION)),
1856              UMaskApplyMode.NewDirectoryNoUmask));
1857    } else {
1858      permissionStatus = createPermissionStatus(
1859          applyUMask(permission, UMaskApplyMode.NewDirectory));
1860    }
1861
1862
1863    ArrayList<String> keysToCreateAsFolder = new ArrayList<String>();
1864    ArrayList<String> keysToUpdateAsFolder = new ArrayList<String>();
1865    boolean childCreated = false;
1866    // Check that there is no file in the parent chain of the given path.
1867    for (Path current = absolutePath, parent = current.getParent();
1868        parent != null; // Stop when you get to the root
1869        current = parent, parent = current.getParent()) {
1870      String currentKey = pathToKey(current);
1871      FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
1872      if (currentMetadata != null && !currentMetadata.isDir()) {
1873        throw new IOException("Cannot create directory " + f + " because " +
1874            current + " is an existing file.");
1875      } else if (currentMetadata == null) {
1876        keysToCreateAsFolder.add(currentKey);
1877        childCreated = true;
1878      } else {
1879        // The directory already exists. Its last modified time need to be
1880        // updated if there is a child directory created under it.
1881        if (childCreated) {
1882          keysToUpdateAsFolder.add(currentKey);
1883        }
1884        childCreated = false;
1885      }
1886    }
1887
1888    for (String currentKey : keysToCreateAsFolder) {
1889      store.storeEmptyFolder(currentKey, permissionStatus);
1890    }
1891
1892    instrumentation.directoryCreated();
1893
1894    // otherwise throws exception
1895    return true;
1896  }
1897
1898  @Override
1899  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
1900    if (LOG.isDebugEnabled()) {
1901      LOG.debug("Opening file: " + f.toString());
1902    }
1903
1904    Path absolutePath = makeAbsolute(f);
1905    String key = pathToKey(absolutePath);
1906    FileMetadata meta = store.retrieveMetadata(key);
1907    if (meta == null) {
1908      throw new FileNotFoundException(f.toString());
1909    }
1910    if (meta.isDir()) {
1911      throw new FileNotFoundException(f.toString()
1912          + " is a directory not a file.");
1913    }
1914
1915    return new FSDataInputStream(new BufferedFSInputStream(
1916        new NativeAzureFsInputStream(store.retrieve(key), key, meta.getLength()), bufferSize));
1917  }
1918
  /**
   * Renames src to dst.
   *
   * The checks run in this order: dst must not contain a colon; src must not
   * be the root; the destination is resolved (an existing directory dst means
   * "move into it under the source's name"); finally the source is looked up.
   * A file is renamed with a single store call, after which the last-modified
   * time of both parent folders is updated by this method. A folder is renamed
   * through a FolderRenamePending object (prepare, execute, cleanup) and the
   * method returns right after cleanup.
   *
   * @param src the path to rename.
   * @param dst the new path, or an existing directory to move src into.
   * @return true if the rename was performed; false if src is the root, src
   *         does not exist, dst is an existing file, or the parent of dst is
   *         missing or is a file.
   * @throws IOException if dst contains a colon or a store operation fails.
   */
  @Override
  public boolean rename(Path src, Path dst) throws IOException {

    FolderRenamePending renamePending = null;

    if (LOG.isDebugEnabled()) {
      LOG.debug("Moving " + src + " to " + dst);
    }

    if (containsColon(dst)) {
      throw new IOException("Cannot rename to file " + dst
          + " through WASB that has colons in the name");
    }

    String srcKey = pathToKey(makeAbsolute(src));

    if (srcKey.length() == 0) {
      // Cannot rename root of file system
      return false;
    }

    // Figure out the final destination
    Path absoluteDst = makeAbsolute(dst);
    String dstKey = pathToKey(absoluteDst);
    FileMetadata dstMetadata = store.retrieveMetadata(dstKey);
    if (dstMetadata != null && dstMetadata.isDir()) {
      // It's an existing directory.
      // The source keeps its own name and is placed inside that directory.
      dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Destination " + dst
            + " is a directory, adjusted the destination to be " + dstKey);
      }
    } else if (dstMetadata != null) {
      // Attempting to overwrite a file using rename()
      if (LOG.isDebugEnabled()) {
        LOG.debug("Destination " + dst
            + " is an already existing file, failing the rename.");
      }
      return false;
    } else {
      // Check that the parent directory exists.
      FileMetadata parentOfDestMetadata =
          store.retrieveMetadata(pathToKey(absoluteDst.getParent()));
      if (parentOfDestMetadata == null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Parent of the destination " + dst
              + " doesn't exist, failing the rename.");
        }
        return false;
      } else if (!parentOfDestMetadata.isDir()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Parent of the destination " + dst
              + " is a file, failing the rename.");
        }
        return false;
      }
    }
    // The source is only looked up once the destination has been validated.
    FileMetadata srcMetadata = store.retrieveMetadata(srcKey);
    if (srcMetadata == null) {
      // Source doesn't exist
      if (LOG.isDebugEnabled()) {
        LOG.debug("Source " + src + " doesn't exist, failing the rename.");
      }
      return false;
    } else if (!srcMetadata.isDir()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Source " + src + " found as a file, renaming.");
      }
      store.rename(srcKey, dstKey);
    } else {

      // Prepare for, execute and clean up after of all files in folder, and
      // the root file, and update the last modified time of the source and
      // target parent folders. The operation can be redone if it fails part
      // way through, by applying the "Rename Pending" file.

      // The following code (internally) only does atomic rename preparation
      // and lease management for page blob folders, limiting the scope of the
      // operation to HBase log file folders, where atomic rename is required.
      // In the future, we could generalize it easily to all folders.
      renamePending = prepareAtomicFolderRename(srcKey, dstKey);
      renamePending.execute();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Renamed " + src + " to " + dst + " successfully.");
      }
      renamePending.cleanup();
      // Folder renames return here and never reach the parent-time updates
      // below.
      return true;
    }

    // Only the file-rename branch falls through to this point.
    // Update the last-modified time of the parent folders of both source
    // and destination.
    updateParentFolderLastModifiedTime(srcKey);
    updateParentFolderLastModifiedTime(dstKey);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Renamed " + src + " to " + dst + " successfully.");
    }
    return true;
  }
2018
2019  /**
2020   * Update the last-modified time of the parent folder of the file
2021   * identified by key.
2022   * @param key
2023   * @throws IOException
2024   */
2025  private void updateParentFolderLastModifiedTime(String key)
2026      throws IOException {
2027    Path parent = makeAbsolute(keyToPath(key)).getParent();
2028    if (parent != null && parent.getParent() != null) { // not root
2029      String parentKey = pathToKey(parent);
2030
2031      // ensure the parent is a materialized folder
2032      FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
2033      // The metadata could be null if the implicit folder only contains a
2034      // single file. In this case, the parent folder no longer exists if the
2035      // file is renamed; so we can safely ignore the null pointer case.
2036      if (parentMetadata != null) {
2037        if (parentMetadata.isDir()
2038            && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2039          store.storeEmptyFolder(parentKey,
2040              createPermissionStatus(FsPermission.getDefault()));
2041        }
2042
2043        if (store.isAtomicRenameKey(parentKey)) {
2044          SelfRenewingLease lease = null;
2045          try {
2046            lease = leaseSourceFolder(parentKey);
2047            store.updateFolderLastModifiedTime(parentKey, lease);
2048          } catch (AzureException e) {
2049            String errorCode = "";
2050            try {
2051              StorageException e2 = (StorageException) e.getCause();
2052              errorCode = e2.getErrorCode();
2053            } catch (Exception e3) {
2054              // do nothing if cast fails
2055            }
2056            if (errorCode.equals("BlobNotFound")) {
2057              throw new FileNotFoundException("Folder does not exist: " + parentKey);
2058            }
2059            LOG.warn("Got unexpected exception trying to get lease on "
2060                + parentKey + ". " + e.getMessage());
2061            throw e;
2062          } finally {
2063            try {
2064              if (lease != null) {
2065                lease.free();
2066              }
2067            } catch (Exception e) {
2068              LOG.error("Unable to free lease on " + parentKey, e);
2069            }
2070          }
2071        } else {
2072          store.updateFolderLastModifiedTime(parentKey, null);
2073        }
2074      }
2075    }
2076  }
2077
2078  /**
2079   * If the source is a page blob folder,
2080   * prepare to rename this folder atomically. This means to get exclusive
2081   * access to the source folder, and record the actions to be performed for
2082   * this rename in a "Rename Pending" file. This code was designed to
2083   * meet the needs of HBase, which requires atomic rename of write-ahead log
2084   * (WAL) folders for correctness.
2085   *
2086   * Before calling this method, the caller must ensure that the source is a
2087   * folder.
2088   *
2089   * For non-page-blob directories, prepare the in-memory information needed,
2090   * but don't take the lease or write the redo file. This is done to limit the
2091   * scope of atomic folder rename to HBase, at least at the time of writing
2092   * this code.
2093   *
2094   * @param srcKey Source folder name.
2095   * @param dstKey Destination folder name.
2096   * @throws IOException
2097   */
2098  private FolderRenamePending prepareAtomicFolderRename(
2099      String srcKey, String dstKey) throws IOException {
2100
2101    if (store.isAtomicRenameKey(srcKey)) {
2102
2103      // Block unwanted concurrent access to source folder.
2104      SelfRenewingLease lease = leaseSourceFolder(srcKey);
2105
2106      // Prepare in-memory information needed to do or redo a folder rename.
2107      FolderRenamePending renamePending =
2108          new FolderRenamePending(srcKey, dstKey, lease, this);
2109
2110      // Save it to persistent storage to help recover if the operation fails.
2111      renamePending.writeFile(this);
2112      return renamePending;
2113    } else {
2114      FolderRenamePending renamePending =
2115          new FolderRenamePending(srcKey, dstKey, null, this);
2116      return renamePending;
2117    }
2118  }
2119
2120  /**
2121   * Get a self-renewing Azure blob lease on the source folder zero-byte file.
2122   */
2123  private SelfRenewingLease leaseSourceFolder(String srcKey)
2124      throws AzureException {
2125    return store.acquireLease(srcKey);
2126  }
2127
2128  /**
2129   * Return an array containing hostnames, offset and size of
2130   * portions of the given file. For WASB we'll just lie and give
2131   * fake hosts to make sure we get many splits in MR jobs.
2132   */
2133  @Override
2134  public BlockLocation[] getFileBlockLocations(FileStatus file,
2135      long start, long len) throws IOException {
2136    if (file == null) {
2137      return null;
2138    }
2139
2140    if ((start < 0) || (len < 0)) {
2141      throw new IllegalArgumentException("Invalid start or len parameter");
2142    }
2143
2144    if (file.getLen() < start) {
2145      return new BlockLocation[0];
2146    }
2147    final String blobLocationHost = getConf().get(
2148        AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
2149        AZURE_BLOCK_LOCATION_HOST_DEFAULT);
2150    final String[] name = { blobLocationHost };
2151    final String[] host = { blobLocationHost };
2152    long blockSize = file.getBlockSize();
2153    if (blockSize <= 0) {
2154      throw new IllegalArgumentException(
2155          "The block size for the given file is not a positive number: "
2156              + blockSize);
2157    }
2158    int numberOfLocations = (int) (len / blockSize)
2159        + ((len % blockSize == 0) ? 0 : 1);
2160    BlockLocation[] locations = new BlockLocation[numberOfLocations];
2161    for (int i = 0; i < locations.length; i++) {
2162      long currentOffset = start + (i * blockSize);
2163      long currentLength = Math.min(blockSize, start + len - currentOffset);
2164      locations[i] = new BlockLocation(name, host, currentOffset, currentLength);
2165    }
2166    return locations;
2167  }
2168
2169  /**
2170   * Set the working directory to the given directory.
2171   */
2172  @Override
2173  public void setWorkingDirectory(Path newDir) {
2174    workingDir = makeAbsolute(newDir);
2175  }
2176
2177  @Override
2178  public Path getWorkingDirectory() {
2179    return workingDir;
2180  }
2181
2182  @Override
2183  public void setPermission(Path p, FsPermission permission) throws IOException {
2184    Path absolutePath = makeAbsolute(p);
2185    String key = pathToKey(absolutePath);
2186    FileMetadata metadata = store.retrieveMetadata(key);
2187    if (metadata == null) {
2188      throw new FileNotFoundException("File doesn't exist: " + p);
2189    }
2190    permission = applyUMask(permission,
2191        metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory
2192            : UMaskApplyMode.ChangeExistingFile);
2193    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2194      // It's an implicit folder, need to materialize it.
2195      store.storeEmptyFolder(key, createPermissionStatus(permission));
2196    } else if (!metadata.getPermissionStatus().getPermission().
2197        equals(permission)) {
2198      store.changePermissionStatus(key, new PermissionStatus(
2199          metadata.getPermissionStatus().getUserName(),
2200          metadata.getPermissionStatus().getGroupName(),
2201          permission));
2202    }
2203  }
2204
2205  @Override
2206  public void setOwner(Path p, String username, String groupname)
2207      throws IOException {
2208    Path absolutePath = makeAbsolute(p);
2209    String key = pathToKey(absolutePath);
2210    FileMetadata metadata = store.retrieveMetadata(key);
2211    if (metadata == null) {
2212      throw new FileNotFoundException("File doesn't exist: " + p);
2213    }
2214    PermissionStatus newPermissionStatus = new PermissionStatus(
2215        username == null ?
2216            metadata.getPermissionStatus().getUserName() : username,
2217        groupname == null ?
2218            metadata.getPermissionStatus().getGroupName() : groupname,
2219        metadata.getPermissionStatus().getPermission());
2220    if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
2221      // It's an implicit folder, need to materialize it.
2222      store.storeEmptyFolder(key, newPermissionStatus);
2223    } else {
2224      store.changePermissionStatus(key, newPermissionStatus);
2225    }
2226  }
2227
2228  @Override
2229  public synchronized void close() throws IOException {
2230    if (isClosed) {
2231      return;
2232    }
2233
2234    // Call the base close() to close any resources there.
2235    super.close();
2236    // Close the store to close any resources there - e.g. the bandwidth
2237    // updater thread would be stopped at this time.
2238    store.close();
2239    // Notify the metrics system that this file system is closed, which may
2240    // trigger one final metrics push to get the accurate final file system
2241    // metrics out.
2242
2243    long startTime = System.currentTimeMillis();
2244
2245    if(!getConf().getBoolean(SKIP_AZURE_METRICS_PROPERTY_NAME, false)) {
2246      AzureFileSystemMetricsSystem.unregisterSource(metricsSourceName);
2247      AzureFileSystemMetricsSystem.fileSystemClosed();
2248    }
2249
2250    if (LOG.isDebugEnabled()) {
2251        LOG.debug("Submitting metrics when file system closed took "
2252                + (System.currentTimeMillis() - startTime) + " ms.");
2253    }
2254    isClosed = true;
2255  }
2256
2257  /**
2258   * A handler that defines what to do with blobs whose upload was
2259   * interrupted.
2260   */
2261  private abstract class DanglingFileHandler {
2262    abstract void handleFile(FileMetadata file, FileMetadata tempFile)
2263      throws IOException;
2264  }
2265
2266  /**
2267   * Handler implementation for just deleting dangling files and cleaning
2268   * them up.
2269   */
2270  private class DanglingFileDeleter extends DanglingFileHandler {
2271    @Override
2272    void handleFile(FileMetadata file, FileMetadata tempFile)
2273        throws IOException {
2274      if (LOG.isDebugEnabled()) {
2275        LOG.debug("Deleting dangling file " + file.getKey());
2276      }
2277      store.delete(file.getKey());
2278      store.delete(tempFile.getKey());
2279    }
2280  }
2281
2282  /**
2283   * Handler implementation for just moving dangling files to recovery
2284   * location (/lost+found).
2285   */
2286  private class DanglingFileRecoverer extends DanglingFileHandler {
2287    private final Path destination;
2288
2289    DanglingFileRecoverer(Path destination) {
2290      this.destination = destination;
2291    }
2292
2293    @Override
2294    void handleFile(FileMetadata file, FileMetadata tempFile)
2295        throws IOException {
2296      if (LOG.isDebugEnabled()) {
2297        LOG.debug("Recovering " + file.getKey());
2298      }
2299      // Move to the final destination
2300      String finalDestinationKey =
2301          pathToKey(new Path(destination, file.getKey()));
2302      store.rename(tempFile.getKey(), finalDestinationKey);
2303      if (!finalDestinationKey.equals(file.getKey())) {
2304        // Delete the empty link file now that we've restored it.
2305        store.delete(file.getKey());
2306      }
2307    }
2308  }
2309
2310  /**
2311   * Check if a path has colons in its name
2312   */
2313  private boolean containsColon(Path p) {
2314    return p.toUri().getPath().toString().contains(":");
2315  }
2316
2317  /**
2318   * Implements recover and delete (-move and -delete) behaviors for handling
2319   * dangling files (blobs whose upload was interrupted).
2320   * 
2321   * @param root
2322   *          The root path to check from.
2323   * @param handler
2324   *          The handler that deals with dangling files.
2325   */
2326  private void handleFilesWithDanglingTempData(Path root,
2327      DanglingFileHandler handler) throws IOException {
2328    // Calculate the cut-off for when to consider a blob to be dangling.
2329    long cutoffForDangling = new Date().getTime()
2330        - getConf().getInt(AZURE_TEMP_EXPIRY_PROPERTY_NAME,
2331            AZURE_TEMP_EXPIRY_DEFAULT) * 1000;
2332    // Go over all the blobs under the given root and look for blobs to
2333    // recover.
2334    String priorLastKey = null;
2335    do {
2336      PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL,
2337          AZURE_UNBOUNDED_DEPTH, priorLastKey);
2338
2339      for (FileMetadata file : listing.getFiles()) {
2340        if (!file.isDir()) { // We don't recover directory blobs
2341          // See if this blob has a link in it (meaning it's a place-holder
2342          // blob for when the upload to the temp blob is complete).
2343          String link = store.getLinkInFileMetadata(file.getKey());
2344          if (link != null) {
2345            // It has a link, see if the temp blob it is pointing to is
2346            // existent and old enough to be considered dangling.
2347            FileMetadata linkMetadata = store.retrieveMetadata(link);
2348            if (linkMetadata != null
2349                && linkMetadata.getLastModified() >= cutoffForDangling) {
2350              // Found one!
2351              handler.handleFile(file, linkMetadata);
2352            }
2353          }
2354        }
2355      }
2356      priorLastKey = listing.getPriorLastKey();
2357    } while (priorLastKey != null);
2358  }
2359
2360  /**
2361   * Looks under the given root path for any blob that are left "dangling",
2362   * meaning that they are place-holder blobs that we created while we upload
2363   * the data to a temporary blob, but for some reason we crashed in the middle
2364   * of the upload and left them there. If any are found, we move them to the
2365   * destination given.
2366   * 
2367   * @param root
2368   *          The root path to consider.
2369   * @param destination
2370   *          The destination path to move any recovered files to.
2371   * @throws IOException
2372   */
2373  public void recoverFilesWithDanglingTempData(Path root, Path destination)
2374      throws IOException {
2375    if (LOG.isDebugEnabled()) {
2376      LOG.debug("Recovering files with dangling temp data in " + root);
2377    }
2378    handleFilesWithDanglingTempData(root,
2379        new DanglingFileRecoverer(destination));
2380  }
2381
2382  /**
2383   * Looks under the given root path for any blob that are left "dangling",
2384   * meaning that they are place-holder blobs that we created while we upload
2385   * the data to a temporary blob, but for some reason we crashed in the middle
2386   * of the upload and left them there. If any are found, we delete them.
2387   * 
2388   * @param root
2389   *          The root path to consider.
2390   * @throws IOException
2391   */
2392  public void deleteFilesWithDanglingTempData(Path root) throws IOException {
2393    if (LOG.isDebugEnabled()) {
2394      LOG.debug("Deleting files with dangling temp data in " + root);
2395    }
2396    handleFilesWithDanglingTempData(root, new DanglingFileDeleter());
2397  }
2398
2399  @Override
2400  protected void finalize() throws Throwable {
2401    LOG.debug("finalize() called.");
2402    close();
2403    super.finalize();
2404  }
2405
2406  /**
2407   * Encode the key with a random prefix for load balancing in Azure storage.
2408   * Upload data to a random temporary file then do storage side renaming to
2409   * recover the original key.
2410   * 
2411   * @param aKey
2412   * @return Encoded version of the original key.
2413   */
2414  private static String encodeKey(String aKey) {
2415    // Get the tail end of the key name.
2416    //
2417    String fileName = aKey.substring(aKey.lastIndexOf(Path.SEPARATOR) + 1,
2418        aKey.length());
2419
2420    // Construct the randomized prefix of the file name. The prefix ensures the
2421    // file always drops into the same folder but with a varying tail key name.
2422    String filePrefix = AZURE_TEMP_FOLDER + Path.SEPARATOR
2423        + UUID.randomUUID().toString();
2424
2425    // Concatenate the randomized prefix with the tail of the key name.
2426    String randomizedKey = filePrefix + fileName;
2427
2428    // Return to the caller with the randomized key.
2429    return randomizedKey;
2430  }
2431}