2 * collectd - src/riemann.c
4 * Copyright (C) 2012 Pierre-Yves Ritschard <pyr@spootnik.org>
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15 * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23 #include "configfile.h"
24 #include "riemann.pb-c.h"
26 #include <sys/socket.h>
27 #include <arpa/inet.h>
33 #define RIEMANN_DELAY 1
34 #define RIEMANN_PORT 5555
35 #define RIEMANN_MAX_TAGS 37
36 #define RIEMANN_EXTRA_TAGS 32
39 struct riemann_host *next;
40 #define F_CONNECT 0x01
44 char name[DATA_MAX_NAME_LEN];
49 struct riemann_event {
51 char service[DATA_MAX_NAME_LEN];
52 const char *tags[RIEMANN_MAX_TAGS];
55 char *riemann_tags[RIEMANN_EXTRA_TAGS];
58 int riemann_write(const data_set_t *, const value_list_t *, user_data_t *);
59 int riemann_connect(struct riemann_host *);
60 void riemann_free(void *);
61 int riemann_config_host(oconfig_item_t *);
62 int riemann_config(oconfig_item_t *);
63 void module_register(void);
69 riemann_write(const data_set_t *ds,
70 const value_list_t *vl,
75 struct riemann_host *host = ud->data;
78 struct riemann_event *event_tab, *event;
82 if ((status = riemann_connect(host)) != 0)
85 msg.n_events = vl->values_len;
88 * Get rid of allocations up front
90 if ((msg.events = calloc(msg.n_events, sizeof(*msg.events))) == NULL ||
91 (event_tab = calloc(msg.n_events, sizeof(*event_tab))) == NULL) {
98 * Now produce valid protobuf structures
100 for (i = 0; i < vl->values_len; i++) {
101 event = &event_tab[i];
102 event__init(&event->ev);
106 ev->host = host->name;
108 ev->time = CDTIME_T_TO_TIME_T(vl->time);
110 ev->ttl = CDTIME_T_TO_TIME_T(vl->interval) + host->delay;
112 ev->tags = (char **)event->tags;
113 event->tags[0] = DS_TYPE_TO_STRING(ds->ds[i].type);
114 event->tags[1] = vl->plugin;
115 event->tags[2] = ds->ds[i].name;
116 if (vl->plugin_instance && strlen(vl->plugin_instance)) {
117 event->tags[ev->n_tags++] = vl->plugin_instance;
119 if (vl->type && strlen(vl->type)) {
120 event->tags[ev->n_tags++] = vl->type;
122 if (vl->type_instance && strlen(vl->type_instance)) {
123 event->tags[ev->n_tags++] = vl->type_instance;
126 /* add user defined extra tags */
127 for (j = 0; j < riemann_tagcount; j++)
128 event->tags[ev->n_tags++] = riemann_tags[j];
130 switch (ds->ds[i].type) {
131 case DS_TYPE_COUNTER:
132 ev->has_metric_sint64 = 1;
133 ev->metric_sint64 = vl->values[i].counter;
136 ev->has_metric_d = 1;
137 ev->metric_d = vl->values[i].gauge;
140 ev->has_metric_sint64 = 1;
141 ev->metric_sint64 = vl->values[i].derive;
143 case DS_TYPE_ABSOLUTE:
144 ev->has_metric_sint64 = 1;
145 ev->metric_sint64 = vl->values[i].absolute;
148 WARNING("riemann_write: unknown metric type: %d",
152 ssnprintf(event->service, sizeof(event->service),
153 "%s-%s-%s-%s-%s", vl->plugin, vl->plugin_instance,
154 vl->type, vl->type_instance, ds->ds[i].name);
155 ev->service = event->service;
156 DEBUG("riemann_write: %s ready to send", ev->service);
161 * we have now packed a bunch of events, let's pack them
163 len = msg__get_packed_size(&msg);
164 DEBUG("riemann_write: packed size computed: %ld", len);
165 if ((buf = calloc(1, len)) == NULL) {
166 WARNING("riemann_write: failing to alloc buf!");
172 * prepend full size to beginning of buffer
174 msg__pack(&msg, buf);
178 * we're now ready to send
180 if (write(host->s, buf, len) != len) {
181 WARNING("riemann_write: could not send out full packet");
189 riemann_connect(struct riemann_host *host)
192 struct addrinfo *ai, *res, hints;
193 struct sockaddr_in *sin4;
194 struct sockaddr_in6 *sin6;
196 if (host->flags & F_CONNECT)
199 memset(&hints, 0, sizeof(hints));
200 hints.ai_family = PF_UNSPEC;
201 hints.ai_socktype = SOCK_DGRAM;
203 if ((e = getaddrinfo(host->name, NULL, &hints, &res)) != 0) {
204 WARNING("could not resolve host \"%s\": %s",
205 host->name, gai_strerror(e));
209 for (ai = res; ai != NULL; ai = ai->ai_next) {
210 pthread_mutex_lock(&host->lock);
212 * check if another thread did not already succesfully connect
214 if (host->flags & F_CONNECT) {
219 if ((host->s = socket(ai->ai_family, SOCK_DGRAM, 0)) == -1) {
220 pthread_mutex_unlock(&host->lock);
221 WARNING("riemann_connect: could not open socket");
226 switch (ai->ai_family) {
228 sin4 = (struct sockaddr_in *)ai->ai_addr;
229 sin4->sin_port = ntohs(host->port);
232 sin6 = (struct sockaddr_in6 *)ai->ai_addr;
233 sin6->sin6_port = ntohs(host->port);
236 WARNING("riemann_connect: unsupported address family");
238 pthread_mutex_unlock(&host->lock);
243 if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
245 host->flags |= ~F_CONNECT;
246 pthread_mutex_unlock(&host->lock);
250 host->flags |= F_CONNECT;
251 DEBUG("got a succesful connection for: %s", host->name);
252 pthread_mutex_unlock(&host->lock);
258 WARNING("riemann_connect: no suitable hosts found");
266 riemann_free(void *p)
268 struct riemann_host *host = p;
270 if (host->flags & F_CONNECT)
276 riemann_config_host(oconfig_item_t *ci)
278 struct riemann_host *host = NULL;
281 oconfig_item_t *child;
282 char cb_name[DATA_MAX_NAME_LEN];
285 if (ci->values_num != 1 ||
286 ci->values[0].type != OCONFIG_TYPE_STRING) {
287 WARNING("riemann hosts need one string argument");
291 if ((host = calloc(1, sizeof (*host))) == NULL) {
292 WARNING("riemann host allocation failed");
296 if (cf_util_get_string_buffer(ci, host->name,
297 sizeof(host->name)) != 0) {
298 WARNING("riemann host name too long");
303 host->port = RIEMANN_PORT;
304 host->delay = RIEMANN_DELAY;
305 for (i = 0; i < ci->children_num; i++) {
307 * The code here could be simplified but makes room
308 * for easy adding of new options later on.
310 child = &ci->children[i];
313 if (strcasecmp(child->key, "port") == 0) {
314 if ((status = cf_util_get_port_number(child)) < 0) {
315 WARNING("invalid port number");
320 } else if (strcasecmp(child->key, "delay") == 0) {
321 if ((status = cf_util_get_int(ci, &host->delay)) != 0)
324 WARNING("riemann plugin: ignoring unknown config "
325 "option: \"%s\"", child->key);
333 pthread_mutex_init(&host->lock, NULL);
334 ssnprintf(cb_name, sizeof(cb_name), "riemann/%s:%d", host->name, host->port);
335 DEBUG("riemann cb_name: %s", cb_name);
337 ud.free_func = riemann_free;
339 if ((status = plugin_register_write(cb_name, riemann_write, &ud)) != 0)
346 riemann_config(oconfig_item_t *ci)
350 oconfig_item_t *child;
352 for (i = 0; i < ci->children_num; i++) {
353 child = &ci->children[i];
355 if (strcasecmp(child->key, "host") == 0) {
356 riemann_config_host(child);
357 } else if (strcasecmp(child->key, "tag") == 0) {
358 if (riemann_tagcount >= RIEMANN_EXTRA_TAGS) {
359 WARNING("riemann plugin: too many tags");
363 cf_util_get_string(child, &newtag);
366 riemann_tags[riemann_tagcount++] = newtag;
367 DEBUG("riemann_config: got tag: %s", newtag);
370 WARNING ("riemann plugin: Ignoring unknown "
371 "configuration option \"%s\" at top level.",
379 module_register(void)
381 DEBUG("riemann: module_register");
383 plugin_register_complex_config ("riemann", riemann_config);