/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.sink;

import com.google.common.base.Strings;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * A metrics sink that writes to a Kafka broker. It requires a broker_list
 * and a topic to be set in the metrics2 configuration file. The broker_list
 * must be a comma-separated list of Kafka broker host:port pairs, and the
 * topic must name the single Kafka topic that all records are written to.
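 * <p>
 * A minimal example configuration, assuming the sink instance is named
 * {@code kafka} and the daemon prefix is {@code namenode} (both names are
 * illustrative; adjust them to your deployment):
 * <pre>
 * *.sink.kafka.class=org.apache.hadoop.metrics2.sink.KafkaSink
 * namenode.sink.kafka.broker_list=broker1:9092,broker2:9092
 * namenode.sink.kafka.topic=hadoop-metrics
 * </pre>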
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class KafkaSink implements MetricsSink, Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
  public static final String BROKER_LIST = "broker_list";
  public static final String TOPIC = "topic";

  private String hostname = null;
  private String brokerList = null;
  private String topic = null;
  private Producer<Integer, byte[]> producer = null;
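
  /**
   * Replaces the producer used by this sink, e.g. so that unit tests can
   * inject a mock producer.
   */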
  public void setProducer(Producer<Integer, byte[]> p) {
    this.producer = p;
  }

  @Override
  public void init(SubsetConfiguration conf) {
    // Get the Kafka broker configuration.
    Properties props = new Properties();
    brokerList = conf.getString(BROKER_LIST);
    if (Strings.isNullOrEmpty(brokerList)) {
      throw new MetricsException("Kafka broker_list can not be null");
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Kafka brokers: " + brokerList);
    }
    props.put("bootstrap.servers", brokerList);

    // Get the Kafka topic configuration.
    topic = conf.getString(TOPIC);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Kafka topic: " + topic);
    }
    if (Strings.isNullOrEmpty(topic)) {
      throw new MetricsException("Kafka topic can not be null");
    }

    // Set the rest of the Kafka configuration. Keys are never attached to
    // the records this sink sends, but serializers are mandatory producer
    // settings, so declare one matching the Integer key type.
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    // "acks" is the new-producer name for the old "request.required.acks";
    // 0 means fire-and-forget, trading delivery guarantees for throughput.
    props.put("acks", "0");

    // Resolve the hostname once and use it in every message. Fall back to
    // the literal string "null" so the field is always present.
    hostname = "null";
    try {
      hostname = InetAddress.getLocalHost().getHostName();
    } catch (Exception e) {
      LOG.warn("Error getting hostname, going to continue", e);
    }

    try {
      // Create the producer object.
      producer = new KafkaProducer<>(props);
    } catch (Exception e) {
      throw new MetricsException("Error creating Producer, " + brokerList, e);
    }
  }
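
  /**
   * Serializes one metrics record as a single JSON object and sends it
   * synchronously to the configured Kafka topic.
   */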
  @Override
  public void putMetrics(MetricsRecord record) {
    if (producer == null) {
      throw new MetricsException("Producer in KafkaSink is null!");
    }

    // Build the JSON object.
    StringBuilder jsonLines = new StringBuilder();

    long timestamp = record.timestamp();
    Date currDate = new Date(timestamp);
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    // HH gives a 24-hour clock; hh would fold afternoon hours back to 01-12.
    SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");
    String date = dateFormat.format(currDate);
    String time = timeFormat.format(currDate);

    // Collect datapoints and populate the JSON object. Control characters
    // are stripped from tag and metric names; values are appended as-is.
    jsonLines.append("{\"hostname\": \"" + hostname);
    jsonLines.append("\", \"timestamp\": " + timestamp);
    jsonLines.append(", \"date\": \"" + date);
    jsonLines.append("\", \"time\": \"" + time);
    jsonLines.append("\", \"name\": \"" + record.name() + "\" ");
    for (MetricsTag tag : record.tags()) {
      jsonLines.append(
          ", \"" + tag.name().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + tag.value() + "\"");
    }
    for (AbstractMetric metric : record.metrics()) {
      jsonLines.append(", \""
          + metric.name().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + metric.value() + "\"");
    }
    jsonLines.append("}");
    if (LOG.isDebugEnabled()) {
      LOG.debug("kafka message: " + jsonLines);
    }

    // Create the record to be sent from the JSON.
    ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(
        topic, jsonLines.toString().getBytes(StandardCharsets.UTF_8));

    // Send the data to the Kafka broker. Here is an example of this data:
    // {"hostname": "...", "timestamp": 1436913651516,
    // "date": "2015-07-14", "time": "22:40:51", "context": "yarn", "name":
    // "QueueMetrics", "running_0": "1", "running_60": "0", "running_300": "0",
    // "running_1440": "0", "AppsSubmitted": "1", "AppsRunning": "1",
    // "AppsPending": "0", "AppsCompleted": "0", "AppsKilled": "0",
    // "AppsFailed": "0", "AllocatedMB": "134656", "AllocatedVCores": "132",
    // "AllocatedContainers": "132", "AggregateContainersAllocated": "132",
    // "AggregateContainersReleased": "0", "AvailableMB": "0",
    // "AvailableVCores": "0", "PendingMB": "275456", "PendingVCores": "269",
    // "PendingContainers": "269", "ReservedMB": "0", "ReservedVCores": "0",
    // "ReservedContainers": "0", "ActiveUsers": "1", "ActiveApplications": "1"}
    Future<RecordMetadata> future = producer.send(data);
    try {
      future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt status before propagating the failure.
      Thread.currentThread().interrupt();
      throw new MetricsException("Error sending data", e);
    } catch (ExecutionException e) {
      throw new MetricsException("Error sending data", e);
    }
  }

  @Override
  public void flush() {
    // Every send is awaited in putMetrics(), so there is nothing left to
    // flush here.
    LOG.debug("KafkaSink flush() is a no-op: sends are synchronous");
  }

  @Override
  public void close() throws IOException {
    // Close the producer, if any, and set it to null.
    try {
      if (producer != null) {
        producer.close();
      }
    } catch (RuntimeException e) {
      throw new MetricsException("Error closing producer", e);
    } finally {
      producer = null;
    }
  }
}