001/* 002 * AbstractMetricsContext.java 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package org.apache.hadoop.metrics.spi; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032import java.util.Timer; 033import java.util.TimerTask; 034import java.util.TreeMap; 035import java.util.Map.Entry; 036 037import org.apache.hadoop.classification.InterfaceAudience; 038import org.apache.hadoop.classification.InterfaceStability; 039import org.apache.hadoop.metrics.ContextFactory; 040import org.apache.hadoop.metrics.MetricsContext; 041import org.apache.hadoop.metrics.MetricsException; 042import org.apache.hadoop.metrics.MetricsRecord; 043import org.apache.hadoop.metrics.Updater; 044 045/** 046 * The main class of the Service Provider Interface. This class should be 047 * extended in order to integrate the Metrics API with a specific metrics 048 * client library. <p/> 049 * 050 * This class implements the internal table of metric data, and the timer 051 * on which data is to be sent to the metrics system. Subclasses must 052 * override the abstract <code>emitRecord</code> method in order to transmit 053 * the data. <p/> 054 */ 055@InterfaceAudience.Public 056@InterfaceStability.Evolving 057public abstract class AbstractMetricsContext implements MetricsContext { 058 059 private int period = MetricsContext.DEFAULT_PERIOD; 060 private Timer timer = null; 061 062 private Set<Updater> updaters = new HashSet<Updater>(1); 063 private volatile boolean isMonitoring = false; 064 065 private ContextFactory factory = null; 066 private String contextName = null; 067 068 @InterfaceAudience.Private 069 public static class TagMap extends TreeMap<String,Object> { 070 private static final long serialVersionUID = 3546309335061952993L; 071 TagMap() { 072 super(); 073 } 074 TagMap(TagMap orig) { 075 super(orig); 076 } 077 /** 078 * Returns true if this tagmap contains every tag in other. 079 */ 080 public boolean containsAll(TagMap other) { 081 for (Map.Entry<String,Object> entry : other.entrySet()) { 082 Object value = get(entry.getKey()); 083 if (value == null || !value.equals(entry.getValue())) { 084 // either key does not exist here, or the value is different 085 return false; 086 } 087 } 088 return true; 089 } 090 } 091 092 @InterfaceAudience.Private 093 public static class MetricMap extends TreeMap<String,Number> { 094 private static final long serialVersionUID = -7495051861141631609L; 095 MetricMap() { 096 super(); 097 } 098 MetricMap(MetricMap orig) { 099 super(orig); 100 } 101 } 102 103 static class RecordMap extends HashMap<TagMap,MetricMap> { 104 private static final long serialVersionUID = 259835619700264611L; 105 } 106 107 private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>(); 108 109 110 /** 111 * Creates a new instance of AbstractMetricsContext 112 */ 113 protected AbstractMetricsContext() { 114 } 115 116 /** 117 * Initializes the context. 118 */ 119 @Override 120 public void init(String contextName, ContextFactory factory) 121 { 122 this.contextName = contextName; 123 this.factory = factory; 124 } 125 126 /** 127 * Convenience method for subclasses to access factory attributes. 128 */ 129 protected String getAttribute(String attributeName) { 130 String factoryAttribute = contextName + "." + attributeName; 131 return (String) factory.getAttribute(factoryAttribute); 132 } 133 134 /** 135 * Returns an attribute-value map derived from the factory attributes 136 * by finding all factory attributes that begin with 137 * <i>contextName</i>.<i>tableName</i>. The returned map consists of 138 * those attributes with the contextName and tableName stripped off. 139 */ 140 protected Map<String,String> getAttributeTable(String tableName) { 141 String prefix = contextName + "." + tableName + "."; 142 Map<String,String> result = new HashMap<String,String>(); 143 for (String attributeName : factory.getAttributeNames()) { 144 if (attributeName.startsWith(prefix)) { 145 String name = attributeName.substring(prefix.length()); 146 String value = (String) factory.getAttribute(attributeName); 147 result.put(name, value); 148 } 149 } 150 return result; 151 } 152 153 /** 154 * Returns the context name. 155 */ 156 @Override 157 public String getContextName() { 158 return contextName; 159 } 160 161 /** 162 * Returns the factory by which this context was created. 163 */ 164 public ContextFactory getContextFactory() { 165 return factory; 166 } 167 168 /** 169 * Starts or restarts monitoring, the emitting of metrics records. 170 */ 171 @Override 172 public synchronized void startMonitoring() 173 throws IOException { 174 if (!isMonitoring) { 175 startTimer(); 176 isMonitoring = true; 177 } 178 } 179 180 /** 181 * Stops monitoring. This does not free buffered data. 182 * @see #close() 183 */ 184 @Override 185 public synchronized void stopMonitoring() { 186 if (isMonitoring) { 187 stopTimer(); 188 isMonitoring = false; 189 } 190 } 191 192 /** 193 * Returns true if monitoring is currently in progress. 194 */ 195 @Override 196 public boolean isMonitoring() { 197 return isMonitoring; 198 } 199 200 /** 201 * Stops monitoring and frees buffered data, returning this 202 * object to its initial state. 203 */ 204 @Override 205 public synchronized void close() { 206 stopMonitoring(); 207 clearUpdaters(); 208 } 209 210 /** 211 * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>. 212 * Throws an exception if the metrics implementation is configured with a fixed 213 * set of record names and <code>recordName</code> is not in that set. 214 * 215 * @param recordName the name of the record 216 * @throws MetricsException if recordName conflicts with configuration data 217 */ 218 @Override 219 public final synchronized MetricsRecord createRecord(String recordName) { 220 if (bufferedData.get(recordName) == null) { 221 bufferedData.put(recordName, new RecordMap()); 222 } 223 return newRecord(recordName); 224 } 225 226 /** 227 * Subclasses should override this if they subclass MetricsRecordImpl. 228 * @param recordName the name of the record 229 * @return newly created instance of MetricsRecordImpl or subclass 230 */ 231 protected MetricsRecord newRecord(String recordName) { 232 return new MetricsRecordImpl(recordName, this); 233 } 234 235 /** 236 * Registers a callback to be called at time intervals determined by 237 * the configuration. 238 * 239 * @param updater object to be run periodically; it should update 240 * some metrics records 241 */ 242 @Override 243 public synchronized void registerUpdater(final Updater updater) { 244 if (!updaters.contains(updater)) { 245 updaters.add(updater); 246 } 247 } 248 249 /** 250 * Removes a callback, if it exists. 251 * 252 * @param updater object to be removed from the callback list 253 */ 254 @Override 255 public synchronized void unregisterUpdater(Updater updater) { 256 updaters.remove(updater); 257 } 258 259 private synchronized void clearUpdaters() { 260 updaters.clear(); 261 } 262 263 /** 264 * Starts timer if it is not already started 265 */ 266 private synchronized void startTimer() { 267 if (timer == null) { 268 timer = new Timer("Timer thread for monitoring " + getContextName(), 269 true); 270 TimerTask task = new TimerTask() { 271 @Override 272 public void run() { 273 try { 274 timerEvent(); 275 } catch (IOException ioe) { 276 ioe.printStackTrace(); 277 } 278 } 279 }; 280 long millis = period * 1000; 281 timer.scheduleAtFixedRate(task, millis, millis); 282 } 283 } 284 285 /** 286 * Stops timer if it is running 287 */ 288 private synchronized void stopTimer() { 289 if (timer != null) { 290 timer.cancel(); 291 timer = null; 292 } 293 } 294 295 /** 296 * Timer callback. 297 */ 298 private void timerEvent() throws IOException { 299 if (isMonitoring) { 300 Collection<Updater> myUpdaters; 301 synchronized (this) { 302 myUpdaters = new ArrayList<Updater>(updaters); 303 } 304 // Run all the registered updates without holding a lock 305 // on this context 306 for (Updater updater : myUpdaters) { 307 try { 308 updater.doUpdates(this); 309 } catch (Throwable throwable) { 310 throwable.printStackTrace(); 311 } 312 } 313 emitRecords(); 314 } 315 } 316 317 /** 318 * Emits the records. 319 */ 320 private synchronized void emitRecords() throws IOException { 321 for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) { 322 RecordMap recordMap = recordEntry.getValue(); 323 synchronized (recordMap) { 324 Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet (); 325 for (Entry<TagMap, MetricMap> entry : entrySet) { 326 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue()); 327 emitRecord(contextName, recordEntry.getKey(), outRec); 328 } 329 } 330 } 331 flush(); 332 } 333 334 /** 335 * Retrieves all the records managed by this MetricsContext. 336 * Useful for monitoring systems that are polling-based. 337 * @return A non-null collection of all monitoring records. 338 */ 339 @Override 340 public synchronized Map<String, Collection<OutputRecord>> getAllRecords() { 341 Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>(); 342 for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) { 343 RecordMap recordMap = recordEntry.getValue(); 344 synchronized (recordMap) { 345 List<OutputRecord> records = new ArrayList<OutputRecord>(); 346 Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet(); 347 for (Entry<TagMap, MetricMap> entry : entrySet) { 348 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue()); 349 records.add(outRec); 350 } 351 out.put(recordEntry.getKey(), records); 352 } 353 } 354 return out; 355 } 356 357 /** 358 * Sends a record to the metrics system. 359 */ 360 protected abstract void emitRecord(String contextName, String recordName, 361 OutputRecord outRec) throws IOException; 362 363 /** 364 * Called each period after all records have been emitted, this method does nothing. 365 * Subclasses may override it in order to perform some kind of flush. 366 */ 367 protected void flush() throws IOException { 368 } 369 370 /** 371 * Called by MetricsRecordImpl.update(). Creates or updates a row in 372 * the internal table of metric data. 373 */ 374 protected void update(MetricsRecordImpl record) { 375 String recordName = record.getRecordName(); 376 TagMap tagTable = record.getTagTable(); 377 Map<String,MetricValue> metricUpdates = record.getMetricTable(); 378 379 RecordMap recordMap = getRecordMap(recordName); 380 synchronized (recordMap) { 381 MetricMap metricMap = recordMap.get(tagTable); 382 if (metricMap == null) { 383 metricMap = new MetricMap(); 384 TagMap tagMap = new TagMap(tagTable); // clone tags 385 recordMap.put(tagMap, metricMap); 386 } 387 388 Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet(); 389 for (Entry<String, MetricValue> entry : entrySet) { 390 String metricName = entry.getKey (); 391 MetricValue updateValue = entry.getValue (); 392 Number updateNumber = updateValue.getNumber(); 393 Number currentNumber = metricMap.get(metricName); 394 if (currentNumber == null || updateValue.isAbsolute()) { 395 metricMap.put(metricName, updateNumber); 396 } 397 else { 398 Number newNumber = sum(updateNumber, currentNumber); 399 metricMap.put(metricName, newNumber); 400 } 401 } 402 } 403 } 404 405 private synchronized RecordMap getRecordMap(String recordName) { 406 return bufferedData.get(recordName); 407 } 408 409 /** 410 * Adds two numbers, coercing the second to the type of the first. 411 * 412 */ 413 private Number sum(Number a, Number b) { 414 if (a instanceof Integer) { 415 return Integer.valueOf(a.intValue() + b.intValue()); 416 } 417 else if (a instanceof Float) { 418 return new Float(a.floatValue() + b.floatValue()); 419 } 420 else if (a instanceof Short) { 421 return Short.valueOf((short)(a.shortValue() + b.shortValue())); 422 } 423 else if (a instanceof Byte) { 424 return Byte.valueOf((byte)(a.byteValue() + b.byteValue())); 425 } 426 else if (a instanceof Long) { 427 return Long.valueOf((a.longValue() + b.longValue())); 428 } 429 else { 430 // should never happen 431 throw new MetricsException("Invalid number type"); 432 } 433 434 } 435 436 /** 437 * Called by MetricsRecordImpl.remove(). Removes all matching rows in 438 * the internal table of metric data. A row matches if it has the same 439 * tag names and values as record, but it may also have additional 440 * tags. 441 */ 442 protected void remove(MetricsRecordImpl record) { 443 String recordName = record.getRecordName(); 444 TagMap tagTable = record.getTagTable(); 445 446 RecordMap recordMap = getRecordMap(recordName); 447 synchronized (recordMap) { 448 Iterator<TagMap> it = recordMap.keySet().iterator(); 449 while (it.hasNext()) { 450 TagMap rowTags = it.next(); 451 if (rowTags.containsAll(tagTable)) { 452 it.remove(); 453 } 454 } 455 } 456 } 457 458 /** 459 * Returns the timer period. 460 */ 461 @Override 462 public int getPeriod() { 463 return period; 464 } 465 466 /** 467 * Sets the timer period 468 */ 469 protected void setPeriod(int period) { 470 this.period = period; 471 } 472 473 /** 474 * If a period is set in the attribute passed in, override 475 * the default with it. 476 */ 477 protected void parseAndSetPeriod(String attributeName) { 478 String periodStr = getAttribute(attributeName); 479 if (periodStr != null) { 480 int period = 0; 481 try { 482 period = Integer.parseInt(periodStr); 483 } catch (NumberFormatException nfe) { 484 } 485 if (period <= 0) { 486 throw new MetricsException("Invalid period: " + periodStr); 487 } 488 setPeriod(period); 489 } 490 } 491}