001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.metrics.spi;
019
020import java.io.IOException;
021import java.lang.reflect.InvocationHandler;
022import java.lang.reflect.Method;
023import java.lang.reflect.Proxy;
024import java.util.ArrayList;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028
029import org.apache.hadoop.classification.InterfaceAudience;
030import org.apache.hadoop.classification.InterfaceStability;
031import org.apache.hadoop.metrics.ContextFactory;
032import org.apache.hadoop.metrics.MetricsContext;
033import org.apache.hadoop.metrics.MetricsRecord;
034import org.apache.hadoop.metrics.MetricsUtil;
035import org.apache.hadoop.metrics.Updater;
036
037@InterfaceAudience.Public
038@InterfaceStability.Evolving
039public class CompositeContext extends AbstractMetricsContext {
040
041  private static final Log LOG = LogFactory.getLog(CompositeContext.class);
042  private static final String ARITY_LABEL = "arity";
043  private static final String SUB_FMT = "%s.sub%d";
044  private final ArrayList<MetricsContext> subctxt =
045    new ArrayList<MetricsContext>();
046
047  @InterfaceAudience.Private
048  public CompositeContext() {
049  }
050
051  @Override
052  @InterfaceAudience.Private
053  public void init(String contextName, ContextFactory factory) {
054    super.init(contextName, factory);
055    int nKids;
056    try {
057      String sKids = getAttribute(ARITY_LABEL);
058      nKids = Integer.parseInt(sKids);
059    } catch (Exception e) {
060      LOG.error("Unable to initialize composite metric " + contextName +
061                ": could not init arity", e);
062      return;
063    }
064    for (int i = 0; i < nKids; ++i) {
065      MetricsContext ctxt = MetricsUtil.getContext(
066          String.format(SUB_FMT, contextName, i), contextName);
067      if (null != ctxt) {
068        subctxt.add(ctxt);
069      }
070    }
071  }
072
073  @InterfaceAudience.Private
074  @Override
075  public MetricsRecord newRecord(String recordName) {
076    return (MetricsRecord) Proxy.newProxyInstance(
077        MetricsRecord.class.getClassLoader(),
078        new Class[] { MetricsRecord.class },
079        new MetricsRecordDelegator(recordName, subctxt));
080  }
081
082  @InterfaceAudience.Private
083  @Override
084  protected void emitRecord(String contextName, String recordName,
085      OutputRecord outRec) throws IOException {
086    for (MetricsContext ctxt : subctxt) {
087      try {
088        ((AbstractMetricsContext)ctxt).emitRecord(
089          contextName, recordName, outRec);
090        if (contextName == null || recordName == null || outRec == null) {
091          throw new IOException(contextName + ":" + recordName + ":" + outRec);
092        }
093      } catch (IOException e) {
094        LOG.warn("emitRecord failed: " + ctxt.getContextName(), e);
095      }
096    }
097  }
098
099  @InterfaceAudience.Private
100  @Override
101  protected void flush() throws IOException {
102    for (MetricsContext ctxt : subctxt) {
103      try {
104        ((AbstractMetricsContext)ctxt).flush();
105      } catch (IOException e) {
106        LOG.warn("flush failed: " + ctxt.getContextName(), e);
107      }
108    }
109  }
110
111  @InterfaceAudience.Private
112  @Override
113  public void startMonitoring() throws IOException {
114    for (MetricsContext ctxt : subctxt) {
115      try {
116        ctxt.startMonitoring();
117      } catch (IOException e) {
118        LOG.warn("startMonitoring failed: " + ctxt.getContextName(), e);
119      }
120    }
121  }
122
123  @InterfaceAudience.Private
124  @Override
125  public void stopMonitoring() {
126    for (MetricsContext ctxt : subctxt) {
127      ctxt.stopMonitoring();
128    }
129  }
130
131  /**
132   * Return true if all subcontexts are monitoring.
133   */
134  @InterfaceAudience.Private
135  @Override
136  public boolean isMonitoring() {
137    boolean ret = true;
138    for (MetricsContext ctxt : subctxt) {
139      ret &= ctxt.isMonitoring();
140    }
141    return ret;
142  }
143
144  @InterfaceAudience.Private
145  @Override
146  public void close() {
147    for (MetricsContext ctxt : subctxt) {
148      ctxt.close();
149    }
150  }
151
152  @InterfaceAudience.Private
153  @Override
154  public void registerUpdater(Updater updater) {
155    for (MetricsContext ctxt : subctxt) {
156      ctxt.registerUpdater(updater);
157    }
158  }
159
160  @InterfaceAudience.Private
161  @Override
162  public void unregisterUpdater(Updater updater) {
163    for (MetricsContext ctxt : subctxt) {
164      ctxt.unregisterUpdater(updater);
165    }
166  }
167
168  private static class MetricsRecordDelegator implements InvocationHandler {
169    private static final Method m_getRecordName = initMethod();
170    private static Method initMethod() {
171      try {
172        return MetricsRecord.class.getMethod("getRecordName", new Class[0]);
173      } catch (Exception e) {
174        throw new RuntimeException("Internal error", e);
175      }
176    }
177
178    private final String recordName;
179    private final ArrayList<MetricsRecord> subrecs;
180
181    MetricsRecordDelegator(String recordName, ArrayList<MetricsContext> ctxts) {
182      this.recordName = recordName;
183      this.subrecs = new ArrayList<MetricsRecord>(ctxts.size());
184      for (MetricsContext ctxt : ctxts) {
185        subrecs.add(ctxt.createRecord(recordName));
186      }
187    }
188
189    @Override
190    public Object invoke(Object p, Method m, Object[] args) throws Throwable {
191      if (m_getRecordName.equals(m)) {
192        return recordName;
193      }
194      assert Void.TYPE.equals(m.getReturnType());
195      for (MetricsRecord rec : subrecs) {
196        m.invoke(rec, args);
197      }
198      return null;
199    }
200  }
201
202}