/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.sink;

import com.google.common.base.Strings;

import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * A metrics sink that writes to a Kafka broker. The sink requires two settings
 * in the metrics2 configuration file: broker_list, a comma-separated list of
 * Kafka broker host:port pairs, and topic, the name of the single Kafka topic
 * that every record is written to.
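 * <p>
 * For example, a hadoop-metrics2.properties entry might look like the sketch
 * below; the "namenode" prefix, broker hosts, and topic name are illustrative
 * values, not defaults defined by this class:
 * <pre>
 * namenode.sink.kafka.class=org.apache.hadoop.metrics2.sink.KafkaSink
 * namenode.sink.kafka.broker_list=broker1:9092,broker2:9092
 * namenode.sink.kafka.topic=hadoop-metrics
 * </pre>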
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class KafkaSink implements MetricsSink, Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
  public static final String BROKER_LIST = "broker_list";
  public static final String TOPIC = "topic";

  private String hostname = null;
  private String brokerList = null;
  private String topic = null;
  private Producer<Integer, byte[]> producer = null;

  /**
   * Allows tests to inject a producer (e.g. a mock) in place of the one
   * created in {@link #init(SubsetConfiguration)}.
   */
  public void setProducer(Producer<Integer, byte[]> p) {
    this.producer = p;
  }

  @Override
  public void init(SubsetConfiguration conf) {
    // Get the Kafka broker configuration.
    Properties props = new Properties();
    brokerList = conf.getString(BROKER_LIST);
    if (Strings.isNullOrEmpty(brokerList)) {
      throw new MetricsException("Kafka broker_list can not be null or empty");
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Kafka brokers: " + brokerList);
    }
    props.put("bootstrap.servers", brokerList);

    // Get the Kafka topic configuration.
    topic = conf.getString(TOPIC);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Kafka topic: " + topic);
    }
    if (Strings.isNullOrEmpty(topic)) {
      throw new MetricsException("Kafka topic can not be null or empty");
    }

    // Set the rest of the Kafka configuration. No keys are ever set on the
    // records this sink sends, so the key serializer is never invoked.
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    // "acks" is the setting the Java KafkaProducer understands;
    // "request.required.acks" is an old-producer property that it ignores.
    props.put("acks", "0");

    // Look up the hostname once and use it in every message.
    hostname = "null";
    try {
      hostname = InetAddress.getLocalHost().getHostName();
    } catch (Exception e) {
      LOG.warn("Error getting hostname, going to continue", e);
    }

    try {
      // Create the producer object.
      producer = new KafkaProducer<Integer, byte[]>(props);
    } catch (Exception e) {
      throw new MetricsException("Error creating Producer, " + brokerList, e);
    }
  }

  @Override
  public void putMetrics(MetricsRecord record) {
    if (producer == null) {
      throw new MetricsException("Producer in KafkaSink is null!");
    }

    // Build the JSON message.
    StringBuilder jsonLines = new StringBuilder();

    long timestamp = record.timestamp();
    Date currDate = new Date(timestamp);
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    // HH (not hh) gives 24-hour time, matching the example below.
    SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");
    String date = dateFormat.format(currDate);
    String time = timeFormat.format(currDate);

    // Collect the datapoints and populate the JSON object.
    jsonLines.append("{\"hostname\": \"" + hostname);
    jsonLines.append("\", \"timestamp\": " + timestamp);
    jsonLines.append(", \"date\": \"" + date);
    jsonLines.append("\",\"time\": \"" + time);
    jsonLines.append("\",\"name\": \"" + record.name() + "\" ");
    for (MetricsTag tag : record.tags()) {
      jsonLines.append(
          ", \"" + tag.name().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + tag.value() + "\"");
    }
    for (AbstractMetric metric : record.metrics()) {
      jsonLines.append(", \""
          + metric.name().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + metric.value() + "\"");
    }
    jsonLines.append("}");
    LOG.debug("kafka message: " + jsonLines.toString());
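    // Note: control characters are stripped from tag and metric names above,
    // but values are embedded verbatim. A value containing a quote or a
    // backslash would therefore produce malformed JSON; metrics2 values are
    // numbers or plain identifiers in practice, so this is normally harmless.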
    // Create the record to be sent from the JSON.
    ProducerRecord<Integer, byte[]> data = new ProducerRecord<Integer, byte[]>(
        topic, jsonLines.toString().getBytes(StandardCharsets.UTF_8));

    // Send the data to the Kafka broker. Here is an example of this data:
    // {"hostname": "...", "timestamp": 1436913651516,
    // "date": "2015-07-14","time": "22:40:51","context": "yarn","name":
    // "QueueMetrics", "running_0": "1", "running_60": "0", "running_300": "0",
    // "running_1440": "0", "AppsSubmitted": "1", "AppsRunning": "1",
    // "AppsPending": "0", "AppsCompleted": "0", "AppsKilled": "0",
    // "AppsFailed": "0", "AllocatedMB": "134656", "AllocatedVCores": "132",
    // "AllocatedContainers": "132", "AggregateContainersAllocated": "132",
    // "AggregateContainersReleased": "0", "AvailableMB": "0",
    // "AvailableVCores": "0", "PendingMB": "275456", "PendingVCores": "269",
    // "PendingContainers": "269", "ReservedMB": "0", "ReservedVCores": "0",
    // "ReservedContainers": "0", "ActiveUsers": "1", "ActiveApplications": "1"}
    Future<RecordMetadata> future = producer.send(data);
    try {
      // Block until the send completes so that errors surface here rather
      // than being silently dropped.
      future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt status before propagating.
      Thread.currentThread().interrupt();
      throw new MetricsException("Error sending data", e);
    } catch (ExecutionException e) {
      throw new MetricsException("Error sending data", e);
    }
  }

  @Override
  public void flush() {
    // Every record is already sent synchronously in putMetrics(), so this
    // sink never has anything buffered to flush.
    LOG.debug("Nothing to flush; records are sent synchronously.");
  }

  @Override
  public void close() throws IOException {
    // Close the producer and set it to null. Guard against a second close()
    // or a close() before init().
    if (producer == null) {
      return;
    }
    try {
      producer.close();
    } catch (RuntimeException e) {
      throw new MetricsException("Error closing producer", e);
    } finally {
      producer = null;
    }
  }
}
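// Usage sketch (illustrative, not part of the sink): setProducer() lets tests
// exercise putMetrics() without a live broker, for example with Kafka's
// MockProducer from the kafka-clients artifact. The "conf" and "record"
// objects below are assumed to be supplied by the test harness.
//
//   KafkaSink sink = new KafkaSink();
//   sink.init(conf);                          // reads broker_list and topic
//   MockProducer<Integer, byte[]> mock = new MockProducer<Integer, byte[]>(
//       true, new IntegerSerializer(), new ByteArraySerializer());
//   sink.setProducer(mock);                   // swap in the mock producer
//   sink.putMetrics(record);                  // serialize and "send" one record
//   mock.history();                           // inspect the records produced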