001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.metrics2.lib; 020 021import static org.apache.hadoop.metrics2.lib.Interns.info; 022 023import java.util.Map; 024import java.util.concurrent.Executors; 025import java.util.concurrent.ScheduledExecutorService; 026import java.util.concurrent.TimeUnit; 027 028import org.apache.commons.lang.StringUtils; 029import org.apache.hadoop.classification.InterfaceAudience; 030import org.apache.hadoop.classification.InterfaceStability; 031import org.apache.hadoop.metrics2.MetricsInfo; 032import org.apache.hadoop.metrics2.MetricsRecordBuilder; 033import org.apache.hadoop.metrics2.util.Quantile; 034import org.apache.hadoop.metrics2.util.QuantileEstimator; 035import org.apache.hadoop.metrics2.util.SampleQuantiles; 036 037import com.google.common.annotations.VisibleForTesting; 038import com.google.common.util.concurrent.ThreadFactoryBuilder; 039 040/** 041 * Watches a stream of long values, maintaining online estimates of specific 042 * quantiles with provably low error bounds. This is particularly useful for 043 * accurate high-percentile (e.g. 95th, 99th) latency metrics. 044 */ 045@InterfaceAudience.Public 046@InterfaceStability.Evolving 047public class MutableQuantiles extends MutableMetric { 048 049 @VisibleForTesting 050 public static final Quantile[] quantiles = { new Quantile(0.50, 0.050), 051 new Quantile(0.75, 0.025), new Quantile(0.90, 0.010), 052 new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) }; 053 054 private final MetricsInfo numInfo; 055 private final MetricsInfo[] quantileInfos; 056 private final int interval; 057 058 private QuantileEstimator estimator; 059 private long previousCount = 0; 060 061 @VisibleForTesting 062 protected Map<Quantile, Long> previousSnapshot = null; 063 064 private static final ScheduledExecutorService scheduler = Executors 065 .newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true) 066 .setNameFormat("MutableQuantiles-%d").build()); 067 068 /** 069 * Instantiates a new {@link MutableQuantiles} for a metric that rolls itself 070 * over on the specified time interval. 071 * 072 * @param name 073 * of the metric 074 * @param description 075 * long-form textual description of the metric 076 * @param sampleName 077 * type of items in the stream (e.g., "Ops") 078 * @param valueName 079 * type of the values 080 * @param interval 081 * rollover interval (in seconds) of the estimator 082 */ 083 public MutableQuantiles(String name, String description, String sampleName, 084 String valueName, int interval) { 085 String ucName = StringUtils.capitalize(name); 086 String usName = StringUtils.capitalize(sampleName); 087 String uvName = StringUtils.capitalize(valueName); 088 String desc = StringUtils.uncapitalize(description); 089 String lsName = StringUtils.uncapitalize(sampleName); 090 String lvName = StringUtils.uncapitalize(valueName); 091 092 numInfo = info(ucName + "Num" + usName, String.format( 093 "Number of %s for %s with %ds interval", lsName, desc, interval)); 094 // Construct the MetricsInfos for the quantiles, converting to percentiles 095 quantileInfos = new MetricsInfo[quantiles.length]; 096 String nameTemplate = ucName + "%dthPercentile" + uvName; 097 String descTemplate = "%d percentile " + lvName + " with " + interval 098 + " second interval for " + desc; 099 for (int i = 0; i < quantiles.length; i++) { 100 int percentile = (int) (100 * quantiles[i].quantile); 101 quantileInfos[i] = info(String.format(nameTemplate, percentile), 102 String.format(descTemplate, percentile)); 103 } 104 105 estimator = new SampleQuantiles(quantiles); 106 107 this.interval = interval; 108 scheduler.scheduleAtFixedRate(new RolloverSample(this), interval, interval, 109 TimeUnit.SECONDS); 110 } 111 112 @Override 113 public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) { 114 if (all || changed()) { 115 builder.addGauge(numInfo, previousCount); 116 for (int i = 0; i < quantiles.length; i++) { 117 long newValue = 0; 118 // If snapshot is null, we failed to update since the window was empty 119 if (previousSnapshot != null) { 120 newValue = previousSnapshot.get(quantiles[i]); 121 } 122 builder.addGauge(quantileInfos[i], newValue); 123 } 124 if (changed()) { 125 clearChanged(); 126 } 127 } 128 } 129 130 public synchronized void add(long value) { 131 estimator.insert(value); 132 } 133 134 public int getInterval() { 135 return interval; 136 } 137 138 public synchronized void setEstimator(QuantileEstimator quantileEstimator) { 139 this.estimator = quantileEstimator; 140 } 141 142 /** 143 * Runnable used to periodically roll over the internal 144 * {@link SampleQuantiles} every interval. 145 */ 146 private static class RolloverSample implements Runnable { 147 148 MutableQuantiles parent; 149 150 public RolloverSample(MutableQuantiles parent) { 151 this.parent = parent; 152 } 153 154 @Override 155 public void run() { 156 synchronized (parent) { 157 parent.previousCount = parent.estimator.getCount(); 158 parent.previousSnapshot = parent.estimator.snapshot(); 159 parent.estimator.clear(); 160 } 161 parent.setChanged(); 162 } 163 164 } 165}