001/* 002 * AbstractMetricsContext.java 003 * 004 * Licensed to the Apache Software Foundation (ASF) under one 005 * or more contributor license agreements. See the NOTICE file 006 * distributed with this work for additional information 007 * regarding copyright ownership. The ASF licenses this file 008 * to you under the Apache License, Version 2.0 (the 009 * "License"); you may not use this file except in compliance 010 * with the License. You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 */ 020 021package org.apache.hadoop.metrics.spi; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.Collection; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.Map; 031import java.util.Set; 032import java.util.Timer; 033import java.util.TimerTask; 034import java.util.TreeMap; 035import java.util.Map.Entry; 036 037import org.apache.hadoop.classification.InterfaceAudience; 038import org.apache.hadoop.classification.InterfaceStability; 039import org.apache.hadoop.metrics.ContextFactory; 040import org.apache.hadoop.metrics.MetricsContext; 041import org.apache.hadoop.metrics.MetricsException; 042import org.apache.hadoop.metrics.MetricsRecord; 043import org.apache.hadoop.metrics.Updater; 044 045/** 046 * The main class of the Service Provider Interface. This class should be 047 * extended in order to integrate the Metrics API with a specific metrics 048 * client library. <p/> 049 * 050 * This class implements the internal table of metric data, and the timer 051 * on which data is to be sent to the metrics system. Subclasses must 052 * override the abstract <code>emitRecord</code> method in order to transmit 053 * the data. <p/> 054 * 055 * @deprecated Use org.apache.hadoop.metrics2 package instead. 056 */ 057@Deprecated 058@InterfaceAudience.Public 059@InterfaceStability.Evolving 060public abstract class AbstractMetricsContext implements MetricsContext { 061 062 private int period = MetricsContext.DEFAULT_PERIOD; 063 private Timer timer = null; 064 065 private Set<Updater> updaters = new HashSet<Updater>(1); 066 private volatile boolean isMonitoring = false; 067 068 private ContextFactory factory = null; 069 private String contextName = null; 070 071 @InterfaceAudience.Private 072 public static class TagMap extends TreeMap<String,Object> { 073 private static final long serialVersionUID = 3546309335061952993L; 074 TagMap() { 075 super(); 076 } 077 TagMap(TagMap orig) { 078 super(orig); 079 } 080 /** 081 * Returns true if this tagmap contains every tag in other. 082 */ 083 public boolean containsAll(TagMap other) { 084 for (Map.Entry<String,Object> entry : other.entrySet()) { 085 Object value = get(entry.getKey()); 086 if (value == null || !value.equals(entry.getValue())) { 087 // either key does not exist here, or the value is different 088 return false; 089 } 090 } 091 return true; 092 } 093 } 094 095 @InterfaceAudience.Private 096 public static class MetricMap extends TreeMap<String,Number> { 097 private static final long serialVersionUID = -7495051861141631609L; 098 MetricMap() { 099 super(); 100 } 101 MetricMap(MetricMap orig) { 102 super(orig); 103 } 104 } 105 106 static class RecordMap extends HashMap<TagMap,MetricMap> { 107 private static final long serialVersionUID = 259835619700264611L; 108 } 109 110 private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>(); 111 112 113 /** 114 * Creates a new instance of AbstractMetricsContext 115 */ 116 protected AbstractMetricsContext() { 117 } 118 119 /** 120 * Initializes the context. 121 */ 122 @Override 123 public void init(String contextName, ContextFactory factory) 124 { 125 this.contextName = contextName; 126 this.factory = factory; 127 } 128 129 /** 130 * Convenience method for subclasses to access factory attributes. 131 */ 132 protected String getAttribute(String attributeName) { 133 String factoryAttribute = contextName + "." + attributeName; 134 return (String) factory.getAttribute(factoryAttribute); 135 } 136 137 /** 138 * Returns an attribute-value map derived from the factory attributes 139 * by finding all factory attributes that begin with 140 * <i>contextName</i>.<i>tableName</i>. The returned map consists of 141 * those attributes with the contextName and tableName stripped off. 142 */ 143 protected Map<String,String> getAttributeTable(String tableName) { 144 String prefix = contextName + "." + tableName + "."; 145 Map<String,String> result = new HashMap<String,String>(); 146 for (String attributeName : factory.getAttributeNames()) { 147 if (attributeName.startsWith(prefix)) { 148 String name = attributeName.substring(prefix.length()); 149 String value = (String) factory.getAttribute(attributeName); 150 result.put(name, value); 151 } 152 } 153 return result; 154 } 155 156 /** 157 * Returns the context name. 158 */ 159 @Override 160 public String getContextName() { 161 return contextName; 162 } 163 164 /** 165 * Returns the factory by which this context was created. 166 */ 167 public ContextFactory getContextFactory() { 168 return factory; 169 } 170 171 /** 172 * Starts or restarts monitoring, the emitting of metrics records. 173 */ 174 @Override 175 public synchronized void startMonitoring() 176 throws IOException { 177 if (!isMonitoring) { 178 startTimer(); 179 isMonitoring = true; 180 } 181 } 182 183 /** 184 * Stops monitoring. This does not free buffered data. 185 * @see #close() 186 */ 187 @Override 188 public synchronized void stopMonitoring() { 189 if (isMonitoring) { 190 stopTimer(); 191 isMonitoring = false; 192 } 193 } 194 195 /** 196 * Returns true if monitoring is currently in progress. 197 */ 198 @Override 199 public boolean isMonitoring() { 200 return isMonitoring; 201 } 202 203 /** 204 * Stops monitoring and frees buffered data, returning this 205 * object to its initial state. 206 */ 207 @Override 208 public synchronized void close() { 209 stopMonitoring(); 210 clearUpdaters(); 211 } 212 213 /** 214 * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>. 215 * Throws an exception if the metrics implementation is configured with a fixed 216 * set of record names and <code>recordName</code> is not in that set. 217 * 218 * @param recordName the name of the record 219 * @throws MetricsException if recordName conflicts with configuration data 220 */ 221 @Override 222 public final synchronized MetricsRecord createRecord(String recordName) { 223 if (bufferedData.get(recordName) == null) { 224 bufferedData.put(recordName, new RecordMap()); 225 } 226 return newRecord(recordName); 227 } 228 229 /** 230 * Subclasses should override this if they subclass MetricsRecordImpl. 231 * @param recordName the name of the record 232 * @return newly created instance of MetricsRecordImpl or subclass 233 */ 234 protected MetricsRecord newRecord(String recordName) { 235 return new MetricsRecordImpl(recordName, this); 236 } 237 238 /** 239 * Registers a callback to be called at time intervals determined by 240 * the configuration. 241 * 242 * @param updater object to be run periodically; it should update 243 * some metrics records 244 */ 245 @Override 246 public synchronized void registerUpdater(final Updater updater) { 247 if (!updaters.contains(updater)) { 248 updaters.add(updater); 249 } 250 } 251 252 /** 253 * Removes a callback, if it exists. 254 * 255 * @param updater object to be removed from the callback list 256 */ 257 @Override 258 public synchronized void unregisterUpdater(Updater updater) { 259 updaters.remove(updater); 260 } 261 262 private synchronized void clearUpdaters() { 263 updaters.clear(); 264 } 265 266 /** 267 * Starts timer if it is not already started 268 */ 269 private synchronized void startTimer() { 270 if (timer == null) { 271 timer = new Timer("Timer thread for monitoring " + getContextName(), 272 true); 273 TimerTask task = new TimerTask() { 274 @Override 275 public void run() { 276 try { 277 timerEvent(); 278 } catch (IOException ioe) { 279 ioe.printStackTrace(); 280 } 281 } 282 }; 283 long millis = period * 1000; 284 timer.scheduleAtFixedRate(task, millis, millis); 285 } 286 } 287 288 /** 289 * Stops timer if it is running 290 */ 291 private synchronized void stopTimer() { 292 if (timer != null) { 293 timer.cancel(); 294 timer = null; 295 } 296 } 297 298 /** 299 * Timer callback. 300 */ 301 private void timerEvent() throws IOException { 302 if (isMonitoring) { 303 Collection<Updater> myUpdaters; 304 synchronized (this) { 305 myUpdaters = new ArrayList<Updater>(updaters); 306 } 307 // Run all the registered updates without holding a lock 308 // on this context 309 for (Updater updater : myUpdaters) { 310 try { 311 updater.doUpdates(this); 312 } catch (Throwable throwable) { 313 throwable.printStackTrace(); 314 } 315 } 316 emitRecords(); 317 } 318 } 319 320 /** 321 * Emits the records. 322 */ 323 private synchronized void emitRecords() throws IOException { 324 for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) { 325 RecordMap recordMap = recordEntry.getValue(); 326 synchronized (recordMap) { 327 Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet (); 328 for (Entry<TagMap, MetricMap> entry : entrySet) { 329 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue()); 330 emitRecord(contextName, recordEntry.getKey(), outRec); 331 } 332 } 333 } 334 flush(); 335 } 336 337 /** 338 * Retrieves all the records managed by this MetricsContext. 339 * Useful for monitoring systems that are polling-based. 340 * @return A non-null collection of all monitoring records. 341 */ 342 @Override 343 public synchronized Map<String, Collection<OutputRecord>> getAllRecords() { 344 Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>(); 345 for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) { 346 RecordMap recordMap = recordEntry.getValue(); 347 synchronized (recordMap) { 348 List<OutputRecord> records = new ArrayList<OutputRecord>(); 349 Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet(); 350 for (Entry<TagMap, MetricMap> entry : entrySet) { 351 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue()); 352 records.add(outRec); 353 } 354 out.put(recordEntry.getKey(), records); 355 } 356 } 357 return out; 358 } 359 360 /** 361 * Sends a record to the metrics system. 362 */ 363 protected abstract void emitRecord(String contextName, String recordName, 364 OutputRecord outRec) throws IOException; 365 366 /** 367 * Called each period after all records have been emitted, this method does nothing. 368 * Subclasses may override it in order to perform some kind of flush. 369 */ 370 protected void flush() throws IOException { 371 } 372 373 /** 374 * Called by MetricsRecordImpl.update(). Creates or updates a row in 375 * the internal table of metric data. 376 */ 377 protected void update(MetricsRecordImpl record) { 378 String recordName = record.getRecordName(); 379 TagMap tagTable = record.getTagTable(); 380 Map<String,MetricValue> metricUpdates = record.getMetricTable(); 381 382 RecordMap recordMap = getRecordMap(recordName); 383 synchronized (recordMap) { 384 MetricMap metricMap = recordMap.get(tagTable); 385 if (metricMap == null) { 386 metricMap = new MetricMap(); 387 TagMap tagMap = new TagMap(tagTable); // clone tags 388 recordMap.put(tagMap, metricMap); 389 } 390 391 Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet(); 392 for (Entry<String, MetricValue> entry : entrySet) { 393 String metricName = entry.getKey (); 394 MetricValue updateValue = entry.getValue (); 395 Number updateNumber = updateValue.getNumber(); 396 Number currentNumber = metricMap.get(metricName); 397 if (currentNumber == null || updateValue.isAbsolute()) { 398 metricMap.put(metricName, updateNumber); 399 } 400 else { 401 Number newNumber = sum(updateNumber, currentNumber); 402 metricMap.put(metricName, newNumber); 403 } 404 } 405 } 406 } 407 408 private synchronized RecordMap getRecordMap(String recordName) { 409 return bufferedData.get(recordName); 410 } 411 412 /** 413 * Adds two numbers, coercing the second to the type of the first. 414 * 415 */ 416 private Number sum(Number a, Number b) { 417 if (a instanceof Integer) { 418 return Integer.valueOf(a.intValue() + b.intValue()); 419 } 420 else if (a instanceof Float) { 421 return new Float(a.floatValue() + b.floatValue()); 422 } 423 else if (a instanceof Short) { 424 return Short.valueOf((short)(a.shortValue() + b.shortValue())); 425 } 426 else if (a instanceof Byte) { 427 return Byte.valueOf((byte)(a.byteValue() + b.byteValue())); 428 } 429 else if (a instanceof Long) { 430 return Long.valueOf((a.longValue() + b.longValue())); 431 } 432 else { 433 // should never happen 434 throw new MetricsException("Invalid number type"); 435 } 436 437 } 438 439 /** 440 * Called by MetricsRecordImpl.remove(). Removes all matching rows in 441 * the internal table of metric data. A row matches if it has the same 442 * tag names and values as record, but it may also have additional 443 * tags. 444 */ 445 protected void remove(MetricsRecordImpl record) { 446 String recordName = record.getRecordName(); 447 TagMap tagTable = record.getTagTable(); 448 449 RecordMap recordMap = getRecordMap(recordName); 450 synchronized (recordMap) { 451 Iterator<TagMap> it = recordMap.keySet().iterator(); 452 while (it.hasNext()) { 453 TagMap rowTags = it.next(); 454 if (rowTags.containsAll(tagTable)) { 455 it.remove(); 456 } 457 } 458 } 459 } 460 461 /** 462 * Returns the timer period. 463 */ 464 @Override 465 public int getPeriod() { 466 return period; 467 } 468 469 /** 470 * Sets the timer period 471 */ 472 protected void setPeriod(int period) { 473 this.period = period; 474 } 475 476 /** 477 * If a period is set in the attribute passed in, override 478 * the default with it. 479 */ 480 protected void parseAndSetPeriod(String attributeName) { 481 String periodStr = getAttribute(attributeName); 482 if (periodStr != null) { 483 int period = 0; 484 try { 485 period = Integer.parseInt(periodStr); 486 } catch (NumberFormatException nfe) { 487 } 488 if (period <= 0) { 489 throw new MetricsException("Invalid period: " + periodStr); 490 } 491 setPeriod(period); 492 } 493 } 494}