001/*
002 * AbstractMetricsContext.java
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements.  See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership.  The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License.  You may obtain a copy of the License at
011 *
012 *     http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 */
020
021package org.apache.hadoop.metrics.spi;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.List;
030import java.util.Map;
031import java.util.Set;
032import java.util.Timer;
033import java.util.TimerTask;
034import java.util.TreeMap;
035import java.util.Map.Entry;
036
037import org.apache.hadoop.classification.InterfaceAudience;
038import org.apache.hadoop.classification.InterfaceStability;
039import org.apache.hadoop.metrics.ContextFactory;
040import org.apache.hadoop.metrics.MetricsContext;
041import org.apache.hadoop.metrics.MetricsException;
042import org.apache.hadoop.metrics.MetricsRecord;
043import org.apache.hadoop.metrics.Updater;
044
045/**
046 * The main class of the Service Provider Interface.  This class should be
047 * extended in order to integrate the Metrics API with a specific metrics
048 * client library. <p/>
049 *
050 * This class implements the internal table of metric data, and the timer
051 * on which data is to be sent to the metrics system.  Subclasses must
052 * override the abstract <code>emitRecord</code> method in order to transmit
053 * the data. <p/>
054 *
055 * @deprecated Use org.apache.hadoop.metrics2 package instead.
056 */
057@Deprecated
058@InterfaceAudience.Public
059@InterfaceStability.Evolving
060public abstract class AbstractMetricsContext implements MetricsContext {
061    
062  private int period = MetricsContext.DEFAULT_PERIOD;
063  private Timer timer = null;
064    
065  private Set<Updater> updaters = new HashSet<Updater>(1);
066  private volatile boolean isMonitoring = false;
067    
068  private ContextFactory factory = null;
069  private String contextName = null;
070    
071  @InterfaceAudience.Private
072  public static class TagMap extends TreeMap<String,Object> {
073    private static final long serialVersionUID = 3546309335061952993L;
074    TagMap() {
075      super();
076    }
077    TagMap(TagMap orig) {
078      super(orig);
079    }
080    /**
081     * Returns true if this tagmap contains every tag in other.
082     */
083    public boolean containsAll(TagMap other) {
084      for (Map.Entry<String,Object> entry : other.entrySet()) {
085        Object value = get(entry.getKey());
086        if (value == null || !value.equals(entry.getValue())) {
087          // either key does not exist here, or the value is different
088          return false;
089        }
090      }
091      return true;
092    }
093  }
094  
095  @InterfaceAudience.Private
096  public static class MetricMap extends TreeMap<String,Number> {
097    private static final long serialVersionUID = -7495051861141631609L;
098    MetricMap() {
099      super();
100    }
101    MetricMap(MetricMap orig) {
102      super(orig);
103    }
104  }
105            
106  static class RecordMap extends HashMap<TagMap,MetricMap> {
107    private static final long serialVersionUID = 259835619700264611L;
108  }
109    
110  private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
111    
112
113  /**
114   * Creates a new instance of AbstractMetricsContext
115   */
116  protected AbstractMetricsContext() {
117  }
118    
119  /**
120   * Initializes the context.
121   */
122  @Override
123  public void init(String contextName, ContextFactory factory) 
124  {
125    this.contextName = contextName;
126    this.factory = factory;
127  }
128    
129  /**
130   * Convenience method for subclasses to access factory attributes.
131   */
132  protected String getAttribute(String attributeName) {
133    String factoryAttribute = contextName + "." + attributeName;
134    return (String) factory.getAttribute(factoryAttribute);  
135  }
136    
137  /**
138   * Returns an attribute-value map derived from the factory attributes
139   * by finding all factory attributes that begin with 
140   * <i>contextName</i>.<i>tableName</i>.  The returned map consists of
141   * those attributes with the contextName and tableName stripped off.
142   */
143  protected Map<String,String> getAttributeTable(String tableName) {
144    String prefix = contextName + "." + tableName + ".";
145    Map<String,String> result = new HashMap<String,String>();
146    for (String attributeName : factory.getAttributeNames()) {
147      if (attributeName.startsWith(prefix)) {
148        String name = attributeName.substring(prefix.length());
149        String value = (String) factory.getAttribute(attributeName);
150        result.put(name, value);
151      }
152    }
153    return result;
154  }
155    
156  /**
157   * Returns the context name.
158   */
159  @Override
160  public String getContextName() {
161    return contextName;
162  }
163    
164  /**
165   * Returns the factory by which this context was created.
166   */
167  public ContextFactory getContextFactory() {
168    return factory;
169  }
170    
171  /**
172   * Starts or restarts monitoring, the emitting of metrics records.
173   */
174  @Override
175  public synchronized void startMonitoring()
176    throws IOException {
177    if (!isMonitoring) {
178      startTimer();
179      isMonitoring = true;
180    }
181  }
182    
183  /**
184   * Stops monitoring.  This does not free buffered data. 
185   * @see #close()
186   */
187  @Override
188  public synchronized void stopMonitoring() {
189    if (isMonitoring) {
190      stopTimer();
191      isMonitoring = false;
192    }
193  }
194    
195  /**
196   * Returns true if monitoring is currently in progress.
197   */
198  @Override
199  public boolean isMonitoring() {
200    return isMonitoring;
201  }
202    
203  /**
204   * Stops monitoring and frees buffered data, returning this
205   * object to its initial state.  
206   */
207  @Override
208  public synchronized void close() {
209    stopMonitoring();
210    clearUpdaters();
211  } 
212    
213  /**
214   * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
215   * Throws an exception if the metrics implementation is configured with a fixed
216   * set of record names and <code>recordName</code> is not in that set.
217   * 
218   * @param recordName the name of the record
219   * @throws MetricsException if recordName conflicts with configuration data
220   */
221  @Override
222  public final synchronized MetricsRecord createRecord(String recordName) {
223    if (bufferedData.get(recordName) == null) {
224      bufferedData.put(recordName, new RecordMap());
225    }
226    return newRecord(recordName);
227  }
228    
229  /**
230   * Subclasses should override this if they subclass MetricsRecordImpl.
231   * @param recordName the name of the record
232   * @return newly created instance of MetricsRecordImpl or subclass
233   */
234  protected MetricsRecord newRecord(String recordName) {
235    return new MetricsRecordImpl(recordName, this);
236  }
237    
238  /**
239   * Registers a callback to be called at time intervals determined by
240   * the configuration.
241   *
242   * @param updater object to be run periodically; it should update
243   * some metrics records 
244   */
245  @Override
246  public synchronized void registerUpdater(final Updater updater) {
247    if (!updaters.contains(updater)) {
248      updaters.add(updater);
249    }
250  }
251    
252  /**
253   * Removes a callback, if it exists.
254   *
255   * @param updater object to be removed from the callback list
256   */
257  @Override
258  public synchronized void unregisterUpdater(Updater updater) {
259    updaters.remove(updater);
260  }
261    
262  private synchronized void clearUpdaters() {
263    updaters.clear();
264  }
265    
266  /**
267   * Starts timer if it is not already started
268   */
269  private synchronized void startTimer() {
270    if (timer == null) {
271      timer = new Timer("Timer thread for monitoring " + getContextName(), 
272                        true);
273      TimerTask task = new TimerTask() {
274          @Override
275          public void run() {
276            try {
277              timerEvent();
278            } catch (IOException ioe) {
279              ioe.printStackTrace();
280            }
281          }
282        };
283      long millis = period * 1000;
284      timer.scheduleAtFixedRate(task, millis, millis);
285    }
286  }
287    
288  /**
289   * Stops timer if it is running
290   */
291  private synchronized void stopTimer() {
292    if (timer != null) {
293      timer.cancel();
294      timer = null;
295    }
296  }
297    
298  /**
299   * Timer callback.
300   */
301  private void timerEvent() throws IOException {
302    if (isMonitoring) {
303      Collection<Updater> myUpdaters;
304      synchronized (this) {
305        myUpdaters = new ArrayList<Updater>(updaters);
306      }     
307      // Run all the registered updates without holding a lock
308      // on this context
309      for (Updater updater : myUpdaters) {
310        try {
311          updater.doUpdates(this);
312        } catch (Throwable throwable) {
313          throwable.printStackTrace();
314        }
315      }
316      emitRecords();
317    }
318  }
319    
320  /**
321   *  Emits the records.
322   */
323  private synchronized void emitRecords() throws IOException {
324    for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) {
325      RecordMap recordMap = recordEntry.getValue();
326      synchronized (recordMap) {
327        Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
328        for (Entry<TagMap, MetricMap> entry : entrySet) {
329          OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
330          emitRecord(contextName, recordEntry.getKey(), outRec);
331        }
332      }
333    }
334    flush();
335  }
336  
337  /**
338   * Retrieves all the records managed by this MetricsContext.
339   * Useful for monitoring systems that are polling-based.
340   * @return A non-null collection of all monitoring records.
341   */
342  @Override
343  public synchronized Map<String, Collection<OutputRecord>> getAllRecords() {
344    Map<String, Collection<OutputRecord>> out = new TreeMap<String, Collection<OutputRecord>>();
345    for (Map.Entry<String,RecordMap> recordEntry : bufferedData.entrySet()) {
346      RecordMap recordMap = recordEntry.getValue();
347      synchronized (recordMap) {
348        List<OutputRecord> records = new ArrayList<OutputRecord>();
349        Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet();
350        for (Entry<TagMap, MetricMap> entry : entrySet) {
351          OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
352          records.add(outRec);
353        }
354        out.put(recordEntry.getKey(), records);
355      }
356    }
357    return out;
358  }
359
360  /**
361   * Sends a record to the metrics system.
362   */
363  protected abstract void emitRecord(String contextName, String recordName, 
364                                     OutputRecord outRec) throws IOException;
365    
366  /**
367   * Called each period after all records have been emitted, this method does nothing.
368   * Subclasses may override it in order to perform some kind of flush.
369   */
370  protected void flush() throws IOException {
371  }
372    
373  /**
374   * Called by MetricsRecordImpl.update().  Creates or updates a row in
375   * the internal table of metric data.
376   */
377  protected void update(MetricsRecordImpl record) {
378    String recordName = record.getRecordName();
379    TagMap tagTable = record.getTagTable();
380    Map<String,MetricValue> metricUpdates = record.getMetricTable();
381        
382    RecordMap recordMap = getRecordMap(recordName);
383    synchronized (recordMap) {
384      MetricMap metricMap = recordMap.get(tagTable);
385      if (metricMap == null) {
386        metricMap = new MetricMap();
387        TagMap tagMap = new TagMap(tagTable); // clone tags
388        recordMap.put(tagMap, metricMap);
389      }
390
391      Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
392      for (Entry<String, MetricValue> entry : entrySet) {
393        String metricName = entry.getKey ();
394        MetricValue updateValue = entry.getValue ();
395        Number updateNumber = updateValue.getNumber();
396        Number currentNumber = metricMap.get(metricName);
397        if (currentNumber == null || updateValue.isAbsolute()) {
398          metricMap.put(metricName, updateNumber);
399        }
400        else {
401          Number newNumber = sum(updateNumber, currentNumber);
402          metricMap.put(metricName, newNumber);
403        }
404      }
405    }
406  }
407    
408  private synchronized RecordMap getRecordMap(String recordName) {
409    return bufferedData.get(recordName);
410  }
411    
412  /**
413   * Adds two numbers, coercing the second to the type of the first.
414   *
415   */
416  private Number sum(Number a, Number b) {
417    if (a instanceof Integer) {
418      return Integer.valueOf(a.intValue() + b.intValue());
419    }
420    else if (a instanceof Float) {
421      return new Float(a.floatValue() + b.floatValue());
422    }
423    else if (a instanceof Short) {
424      return Short.valueOf((short)(a.shortValue() + b.shortValue()));
425    }
426    else if (a instanceof Byte) {
427      return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
428    }
429    else if (a instanceof Long) {
430      return Long.valueOf((a.longValue() + b.longValue()));
431    }
432    else {
433      // should never happen
434      throw new MetricsException("Invalid number type");
435    }
436            
437  }
438    
439  /**
440   * Called by MetricsRecordImpl.remove().  Removes all matching rows in
441   * the internal table of metric data.  A row matches if it has the same
442   * tag names and values as record, but it may also have additional
443   * tags.
444   */    
445  protected void remove(MetricsRecordImpl record) {
446    String recordName = record.getRecordName();
447    TagMap tagTable = record.getTagTable();
448        
449    RecordMap recordMap = getRecordMap(recordName);
450    synchronized (recordMap) {
451      Iterator<TagMap> it = recordMap.keySet().iterator();
452      while (it.hasNext()) {
453        TagMap rowTags = it.next();
454        if (rowTags.containsAll(tagTable)) {
455          it.remove();
456        }
457      }
458    }
459  }
460    
461  /**
462   * Returns the timer period.
463   */
464  @Override
465  public int getPeriod() {
466    return period;
467  }
468    
469  /**
470   * Sets the timer period
471   */
472  protected void setPeriod(int period) {
473    this.period = period;
474  }
475  
476  /**
477   * If a period is set in the attribute passed in, override
478   * the default with it.
479   */
480  protected void parseAndSetPeriod(String attributeName) {
481    String periodStr = getAttribute(attributeName);
482    if (periodStr != null) {
483      int period = 0;
484      try {
485        period = Integer.parseInt(periodStr);
486      } catch (NumberFormatException nfe) {
487      }
488      if (period <= 0) {
489        throw new MetricsException("Invalid period: " + periodStr);
490      }
491      setPeriod(period);
492    }
493  }
494}