001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.metrics2.lib;
020
021import static org.apache.hadoop.metrics2.lib.Interns.info;
022
023import java.util.Map;
024import java.util.concurrent.Executors;
025import java.util.concurrent.ScheduledExecutorService;
026import java.util.concurrent.TimeUnit;
027
028import org.apache.commons.lang.StringUtils;
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.metrics2.MetricsInfo;
032import org.apache.hadoop.metrics2.MetricsRecordBuilder;
033import org.apache.hadoop.metrics2.util.Quantile;
034import org.apache.hadoop.metrics2.util.QuantileEstimator;
035import org.apache.hadoop.metrics2.util.SampleQuantiles;
036
037import com.google.common.annotations.VisibleForTesting;
038import com.google.common.util.concurrent.ThreadFactoryBuilder;
039
040/**
041 * Watches a stream of long values, maintaining online estimates of specific
042 * quantiles with provably low error bounds. This is particularly useful for
043 * accurate high-percentile (e.g. 95th, 99th) latency metrics.
044 */
045@InterfaceAudience.Public
046@InterfaceStability.Evolving
047public class MutableQuantiles extends MutableMetric {
048
049  @VisibleForTesting
050  public static final Quantile[] quantiles = { new Quantile(0.50, 0.050),
051      new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
052      new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
053
054  private final MetricsInfo numInfo;
055  private final MetricsInfo[] quantileInfos;
056  private final int interval;
057
058  private QuantileEstimator estimator;
059  private long previousCount = 0;
060
061  @VisibleForTesting
062  protected Map<Quantile, Long> previousSnapshot = null;
063
064  private static final ScheduledExecutorService scheduler = Executors
065      .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true)
066          .setNameFormat("MutableQuantiles-%d").build());
067
068  /**
069   * Instantiates a new {@link MutableQuantiles} for a metric that rolls itself
070   * over on the specified time interval.
071   * 
072   * @param name
073   *          of the metric
074   * @param description
075   *          long-form textual description of the metric
076   * @param sampleName
077   *          type of items in the stream (e.g., "Ops")
078   * @param valueName
079   *          type of the values
080   * @param interval
081   *          rollover interval (in seconds) of the estimator
082   */
083  public MutableQuantiles(String name, String description, String sampleName,
084      String valueName, int interval) {
085    String ucName = StringUtils.capitalize(name);
086    String usName = StringUtils.capitalize(sampleName);
087    String uvName = StringUtils.capitalize(valueName);
088    String desc = StringUtils.uncapitalize(description);
089    String lsName = StringUtils.uncapitalize(sampleName);
090    String lvName = StringUtils.uncapitalize(valueName);
091
092    numInfo = info(ucName + "Num" + usName, String.format(
093        "Number of %s for %s with %ds interval", lsName, desc, interval));
094    // Construct the MetricsInfos for the quantiles, converting to percentiles
095    quantileInfos = new MetricsInfo[quantiles.length];
096    String nameTemplate = ucName + "%dthPercentile" + uvName;
097    String descTemplate = "%d percentile " + lvName + " with " + interval
098        + " second interval for " + desc;
099    for (int i = 0; i < quantiles.length; i++) {
100      int percentile = (int) (100 * quantiles[i].quantile);
101      quantileInfos[i] = info(String.format(nameTemplate, percentile),
102          String.format(descTemplate, percentile));
103    }
104
105    estimator = new SampleQuantiles(quantiles);
106
107    this.interval = interval;
108    scheduler.scheduleAtFixedRate(new RolloverSample(this), interval, interval,
109        TimeUnit.SECONDS);
110  }
111
112  @Override
113  public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
114    if (all || changed()) {
115      builder.addGauge(numInfo, previousCount);
116      for (int i = 0; i < quantiles.length; i++) {
117        long newValue = 0;
118        // If snapshot is null, we failed to update since the window was empty
119        if (previousSnapshot != null) {
120          newValue = previousSnapshot.get(quantiles[i]);
121        }
122        builder.addGauge(quantileInfos[i], newValue);
123      }
124      if (changed()) {
125        clearChanged();
126      }
127    }
128  }
129
130  public synchronized void add(long value) {
131    estimator.insert(value);
132  }
133
134  public int getInterval() {
135    return interval;
136  }
137
138  public synchronized void setEstimator(QuantileEstimator quantileEstimator) {
139    this.estimator = quantileEstimator;
140  }
141
142  /**
143   * Runnable used to periodically roll over the internal
144   * {@link SampleQuantiles} every interval.
145   */
146  private static class RolloverSample implements Runnable {
147
148    MutableQuantiles parent;
149
150    public RolloverSample(MutableQuantiles parent) {
151      this.parent = parent;
152    }
153
154    @Override
155    public void run() {
156      synchronized (parent) {
157        parent.previousCount = parent.estimator.getCount();
158        parent.previousSnapshot = parent.estimator.snapshot();
159        parent.estimator.clear();
160      }
161      parent.setChanged();
162    }
163
164  }
165}