001/*
002 * AbstractMetricsContext.java
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package org.apache.hadoop.metrics.spi;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.Timer;
033import java.util.TimerTask;
034import java.util.TreeMap;
035import java.util.Map.Entry;
036
037import org.apache.hadoop.classification.InterfaceAudience;
038import org.apache.hadoop.classification.InterfaceStability;
039import org.apache.hadoop.metrics.ContextFactory;
040import org.apache.hadoop.metrics.MetricsContext;
041import org.apache.hadoop.metrics.MetricsException;
042import org.apache.hadoop.metrics.MetricsRecord;
043import org.apache.hadoop.metrics.Updater;
044
045/**
046 * The main class of the Service Provider Interface.  This class should be
047 * extended in order to integrate the Metrics API with a specific metrics
048 * client library. <p/>
049 *
050 * This class implements the internal table of metric data, and the timer
051 * on which data is to be sent to the metrics system.  Subclasses must
052 * override the abstract <code>emitRecord</code> method in order to transmit
053 * the data. <p/>
054 */
055@InterfaceAudience.Public
056@InterfaceStability.Evolving
057public abstract class AbstractMetricsContext implements MetricsContext {
058    
059  private int period = MetricsContext.DEFAULT_PERIOD;
060  private Timer timer = null;
061    
062  private Set<Updater> updaters = new HashSet<Updater>(1);
063  private volatile boolean isMonitoring = false;
064    
065  private ContextFactory factory = null;
066  private String contextName = null;
067    
068  @InterfaceAudience.Private
069  public static class TagMap extends TreeMap<String,Object> {
070    private static final long serialVersionUID = 3546309335061952993L;
071    TagMap() {
072      super();
073    }
074    TagMap(TagMap orig) {
075      super(orig);
076    }
077    /**
078     * Returns true if this tagmap contains every tag in other.
079     */
080    public boolean containsAll(TagMap other) {
081      for (Map.Entry<String,Object> entry : other.entrySet()) {
082        Object value = get(entry.getKey());
083        if (value == null || !value.equals(entry.getValue())) {
084          // either key does not exist here, or the value is different
085          return false;
086        }
087      }
088      return true;
089    }
090  }
091  
092  @InterfaceAudience.Private
093  public static class MetricMap extends TreeMap<String,Number> {
094    private static final long serialVersionUID = -7495051861141631609L;
095    MetricMap() {
096      super();
097    }
098    MetricMap(MetricMap orig) {
099      super(orig);
100    }
101  }
102            
103  static class RecordMap extends HashMap<TagMap,MetricMap> {
104    private static final long serialVersionUID = 259835619700264611L;
105  }
106    
107  private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
108    
109
110  /**
111   * Creates a new instance of AbstractMetricsContext
112   */
113  protected AbstractMetricsContext() {
114  }
115    
116  /**
117   * Initializes the context.
118   */
119  @Override
120  public void init(String contextName, ContextFactory factory) 
121  {
122    this.contextName = contextName;
123    this.factory = factory;
124  }
125    
126  /**
127   * Convenience method for subclasses to access factory attributes.
128   */
129  protected String getAttribute(String attributeName) {
130    String factoryAttribute = contextName + "." + attributeName;
131    return (String) factory.getAttribute(factoryAttribute);  
132  }
133    
134  /**
135   * Returns an attribute-value map derived from the factory attributes
136   * by finding all factory attributes that begin with 
137   * <i>contextName</i>.<i>tableName</i>.  The returned map consists of
138   * those attributes with the contextName and tableName stripped off.
139   */
140  protected Map<String,String> getAttributeTable(String tableName) {
141    String prefix = contextName + "." + tableName + ".";
142    Map<String,String> result = new HashMap<String,String>();
143    for (String attributeName : factory.getAttributeNames()) {
144      if (attributeName.startsWith(prefix)) {
145        String name = attributeName.substring(prefix.length());
146        String value = (String) factory.getAttribute(attributeName);
147        result.put(name, value);
148      }
149    }
150    return result;
151  }
152    
153  /**
154   * Returns the context name.
155   */
156  @Override
157  public String getContextName() {
158    return contextName;
159  }
160    
161  /**
162   * Returns the factory by which this context was created.
163   */
164  public ContextFactory getContextFactory() {
165    return factory;
166  }
167    
168  /**
169   * Starts or restarts monitoring, the emitting of metrics records.
170   */
171  @Override
172  public synchronized void startMonitoring()
173    throws IOException {
174    if (!isMonitoring) {
175      startTimer();
176      isMonitoring = true;
177    }
178  }
179    
180  /**
181   * Stops monitoring.  This does not free buffered data. 
182   * @see #close()
183   */
184  @Override
185  public synchronized void stopMonitoring() {
186    if (isMonitoring) {
187      stopTimer();
188      isMonitoring = false;
189    }
190  }
191    
192  /**
193   * Returns true if monitoring is currently in progress.
194   */
195  @Override
196  public boolean isMonitoring() {
197    return isMonitoring;
198  }
199    
200  /**
201   * Stops monitoring and frees buffered data, returning this
202   * object to its initial state.  
203   */
204  @Override
205  public synchronized void close() {
206    stopMonitoring();
207    clearUpdaters();
208  } 
209    
210  /**
211   * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
212   * Throws an exception if the metrics implementation is configured with a fixed
213   * set of record names and <code>recordName</code> is not in that set.
214   * 
215   * @param recordName the name of the record
216   * @throws MetricsException if recordName conflicts with configuration data
217   */
218  @Override
219  public final synchronized MetricsRecord createRecord(String recordName) {
220    if (bufferedData.get(recordName) == null) {
221      bufferedData.put(recordName, new RecordMap());
222    }
223    return newRecord(recordName);
224  }
225    
226  /**
227   * Subclasses should override this if they subclass MetricsRecordImpl.
228   * @param recordName the name of the record
229   * @return newly created instance of MetricsRecordImpl or subclass
230   */
231  protected MetricsRecord newRecord(String recordName) {
232    return new MetricsRecordImpl(recordName, this);
233  }
234    
235  /**
236   * Registers a callback to be called at time intervals determined by
237   * the configuration.
238   *
239   * @param updater object to be run periodically; it should update
240   * some metrics records 
241   */
242  @Override
243  public synchronized void registerUpdater(final Updater updater) {
244    if (!updaters.contains(updater)) {
245      updaters.add(updater);
246    }
247  }
248    
249  /**
250   * Removes a callback, if it exists.
251   *
252   * @param updater object to be removed from the callback list
253   */
254  @Override
255  public synchronized void unregisterUpdater(Updater updater) {
256    updaters.remove(updater);
257  }
258    
259  private synchronized void clearUpdaters() {
260    updaters.clear();
261  }
262    
263  /**
264   * Starts timer if it is not already started
265   */
266  private synchronized void startTimer() {
267    if (timer == null) {
268      timer = new Timer("Timer thread for monitoring " + getContextName(), 
269                        true);
270      TimerTask task = new TimerTask() {
271          @Override
272          public void run() {
273            try {
274              timerEvent();
275            } catch (IOException ioe) {
276              ioe.printStackTrace();
277            }
278          }
279        };
280      long millis = period * 1000;
281      timer.scheduleAtFixedRate(task, millis, millis);
282    }
283  }
284    
285  /**
286   * Stops timer if it is running
287   */
288  private synchronized void stopTimer() {
289    if (timer != null) {
290      timer.cancel();
291      timer = null;
292    }
293  }
294    
295  /**
296   * Timer callback.
297   */
298  private void timerEvent() throws IOException {
299    if (isMonitoring) {
300      Collection<Updater> myUpdaters;
301      synchronized (this) {
302        myUpdaters = new ArrayList<Updater>(updaters);
303      }     
304      // Run all the registered updates without holding a lock
305      // on this context
306      for (Updater updater : myUpdaters) {
307        try {
308          updater.doUpdates(this);
309        } catch (Throwable throwable) {
310          throwable.printStackTrace();
311        }
312      }
313      emitRecords();
314    }
315  }
316    
317  /**
318   *  Emits the records.
319   */
320  private synchronized void emitRecords() throws IOException {
321    for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) {
322      RecordMap recordMap = recordEntry.getValue();
323      synchronized (recordMap) {
324        Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
325        for (Entry<TagMap, MetricMap> entry : entrySet) {
326          OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
327          emitRecord(contextName, recordEntry.getKey(), outRec);
328        }
329      }
330    }
331    flush();
332  }
333  
334  /**
335   * Retrieves all the records managed by this MetricsContext.
336   * Useful for monitoring systems that are polling-based.
337   * @return A non-null collection of all monitoring records.
338   */
339  @Override
340  public synchronized Map<String, Collection<OutputRecord>> getAllRecords() {
341    Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>();
342    for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) {
343      RecordMap recordMap = recordEntry.getValue();
344      synchronized (recordMap) {
345        List<OutputRecord> records = new ArrayList<OutputRecord>();
346        Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet();
347        for (Entry<TagMap, MetricMap> entry : entrySet) {
348          OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
349          records.add(outRec);
350        }
351        out.put(recordEntry.getKey(), records);
352      }
353    }
354    return out;
355  }
356
357  /**
358   * Sends a record to the metrics system.
359   */
360  protected abstract void emitRecord(String contextName, String recordName, 
361                                     OutputRecord outRec) throws IOException;
362    
363  /**
364   * Called each period after all records have been emitted, this method does nothing.
365   * Subclasses may override it in order to perform some kind of flush.
366   */
367  protected void flush() throws IOException {
368  }
369    
370  /**
371   * Called by MetricsRecordImpl.update().  Creates or updates a row in
372   * the internal table of metric data.
373   */
374  protected void update(MetricsRecordImpl record) {
375    String recordName = record.getRecordName();
376    TagMap tagTable = record.getTagTable();
377    Map<String,MetricValue> metricUpdates = record.getMetricTable();
378        
379    RecordMap recordMap = getRecordMap(recordName);
380    synchronized (recordMap) {
381      MetricMap metricMap = recordMap.get(tagTable);
382      if (metricMap == null) {
383        metricMap = new MetricMap();
384        TagMap tagMap = new TagMap(tagTable); // clone tags
385        recordMap.put(tagMap, metricMap);
386      }
387
388      Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
389      for (Entry<String, MetricValue> entry : entrySet) {
390        String metricName = entry.getKey ();
391        MetricValue updateValue = entry.getValue ();
392        Number updateNumber = updateValue.getNumber();
393        Number currentNumber = metricMap.get(metricName);
394        if (currentNumber == null || updateValue.isAbsolute()) {
395          metricMap.put(metricName, updateNumber);
396        }
397        else {
398          Number newNumber = sum(updateNumber, currentNumber);
399          metricMap.put(metricName, newNumber);
400        }
401      }
402    }
403  }
404    
405  private synchronized RecordMap getRecordMap(String recordName) {
406    return bufferedData.get(recordName);
407  }
408    
409  /**
410   * Adds two numbers, coercing the second to the type of the first.
411   *
412   */
413  private Number sum(Number a, Number b) {
414    if (a instanceof Integer) {
415      return Integer.valueOf(a.intValue() + b.intValue());
416    }
417    else if (a instanceof Float) {
418      return new Float(a.floatValue() + b.floatValue());
419    }
420    else if (a instanceof Short) {
421      return Short.valueOf((short)(a.shortValue() + b.shortValue()));
422    }
423    else if (a instanceof Byte) {
424      return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
425    }
426    else if (a instanceof Long) {
427      return Long.valueOf((a.longValue() + b.longValue()));
428    }
429    else {
430      // should never happen
431      throw new MetricsException("Invalid number type");
432    }
433            
434  }
435    
436  /**
437   * Called by MetricsRecordImpl.remove().  Removes all matching rows in
438   * the internal table of metric data.  A row matches if it has the same
439   * tag names and values as record, but it may also have additional
440   * tags.
441   */    
442  protected void remove(MetricsRecordImpl record) {
443    String recordName = record.getRecordName();
444    TagMap tagTable = record.getTagTable();
445        
446    RecordMap recordMap = getRecordMap(recordName);
447    synchronized (recordMap) {
448      Iterator<TagMap> it = recordMap.keySet().iterator();
449      while (it.hasNext()) {
450        TagMap rowTags = it.next();
451        if (rowTags.containsAll(tagTable)) {
452          it.remove();
453        }
454      }
455    }
456  }
457    
458  /**
459   * Returns the timer period.
460   */
461  @Override
462  public int getPeriod() {
463    return period;
464  }
465    
466  /**
467   * Sets the timer period
468   */
469  protected void setPeriod(int period) {
470    this.period = period;
471  }
472  
473  /**
474   * If a period is set in the attribute passed in, override
475   * the default with it.
476   */
477  protected void parseAndSetPeriod(String attributeName) {
478    String periodStr = getAttribute(attributeName);
479    if (periodStr != null) {
480      int period = 0;
481      try {
482        period = Integer.parseInt(periodStr);
483      } catch (NumberFormatException nfe) {
484      }
485      if (period <= 0) {
486        throw new MetricsException("Invalid period: " + periodStr);
487      }
488      setPeriod(period);
489    }
490  }
491}