2 * collectd - src/riemann.c
4 * Copyright (C) 2012 Pierre-Yves Ritschard <pyr@spootnik.org>
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15 * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23 #include "configfile.h"
24 #include "riemann.pb-c.h"
26 #include <sys/socket.h>
27 #include <arpa/inet.h>
33 #define RIEMANN_DELAY 1
34 #define RIEMANN_PORT 5555
35 #define RIEMANN_MAX_TAGS 37
36 #define RIEMANN_EXTRA_TAGS 32
39 #define F_CONNECT 0x01
43 char name[DATA_MAX_NAME_LEN];
48 struct riemann_event {
50 char service[DATA_MAX_NAME_LEN];
51 const char *tags[RIEMANN_MAX_TAGS];
54 char *riemann_tags[RIEMANN_EXTRA_TAGS];
57 int riemann_send(struct riemann_host *, Msg *);
58 int riemann_notification(const notification_t *, user_data_t *);
59 int riemann_write(const data_set_t *, const value_list_t *, user_data_t *);
60 int riemann_connect(struct riemann_host *);
61 void riemann_free(void *);
62 int riemann_config_host(oconfig_item_t *);
63 int riemann_config(oconfig_item_t *);
64 void module_register(void);
67 riemann_send(struct riemann_host *host, Msg *msg)
72 len = msg__get_packed_size(msg);
73 DEBUG("riemann_write: packed size computed: %ld", len);
74 if ((buf = calloc(1, len)) == NULL) {
75 WARNING("riemann_write: failing to alloc buf!");
81 if (write(host->s, buf, len) != len) {
82 host->flags &= ~F_CONNECT;
83 WARNING("riemann_write: could not send out full packet");
92 riemann_notification(const notification_t *n, user_data_t *ud)
95 struct riemann_host *host = ud->data;
97 Event ev = EVENT__INIT;
99 const char *tags[RIEMANN_MAX_TAGS];
100 char service[DATA_MAX_NAME_LEN];
101 notification_meta_t *meta;
106 { NOTIF_OKAY, "ok" },
107 { NOTIF_WARNING, "warning" },
108 { NOTIF_FAILURE, "critical" },
116 ev.host = host->name;
117 ev.time = CDTIME_T_TO_TIME_T(n->time);
121 severities[i].code > 0 && severities[i].code != n->severity;
124 ev.state = severities[i].name;
127 ev.tags = (char **)tags;
129 tags[1] = "notification";
131 for (i = 0; i < riemann_tagcount; i++)
132 tags[ev.n_tags++] = riemann_tags[i];
134 ssnprintf(service, sizeof(service),
135 "%s-%s-%s-%s", n->plugin, n->plugin_instance,
136 n->type, n->type_instance);
137 ev.service = service;
138 ev.description = (char *)n->message;
141 * Pull in values from threshold
144 meta != NULL && strcasecmp(meta->name, "CurrentValue") != 0;
150 ev.metric_d = meta->nm_value.nm_double;
153 return riemann_send(host, &msg);
157 riemann_write(const data_set_t *ds,
158 const value_list_t *vl,
163 struct riemann_host *host = ud->data;
166 struct riemann_event *event_tab, *event;
168 if ((status = riemann_connect(host)) != 0)
171 msg.n_events = vl->values_len;
174 * Get rid of allocations up front
176 if ((msg.events = calloc(msg.n_events, sizeof(*msg.events))) == NULL ||
177 (event_tab = calloc(msg.n_events, sizeof(*event_tab))) == NULL) {
184 * Now produce valid protobuf structures
186 for (i = 0; i < vl->values_len; i++) {
187 event = &event_tab[i];
188 event__init(&event->ev);
192 ev->host = host->name;
194 ev->time = CDTIME_T_TO_TIME_T(vl->time);
196 ev->ttl = CDTIME_T_TO_TIME_T(vl->interval) + host->delay;
198 ev->tags = (char **)event->tags;
199 event->tags[0] = DS_TYPE_TO_STRING(ds->ds[i].type);
200 event->tags[1] = vl->plugin;
201 event->tags[2] = ds->ds[i].name;
202 if (vl->plugin_instance && strlen(vl->plugin_instance)) {
203 event->tags[ev->n_tags++] = vl->plugin_instance;
205 if (vl->type && strlen(vl->type)) {
206 event->tags[ev->n_tags++] = vl->type;
208 if (vl->type_instance && strlen(vl->type_instance)) {
209 event->tags[ev->n_tags++] = vl->type_instance;
212 /* add user defined extra tags */
213 for (j = 0; j < riemann_tagcount; j++)
214 event->tags[ev->n_tags++] = riemann_tags[j];
216 switch (ds->ds[i].type) {
217 case DS_TYPE_COUNTER:
218 ev->has_metric_sint64 = 1;
219 ev->metric_sint64 = vl->values[i].counter;
222 ev->has_metric_d = 1;
223 ev->metric_d = vl->values[i].gauge;
226 ev->has_metric_sint64 = 1;
227 ev->metric_sint64 = vl->values[i].derive;
229 case DS_TYPE_ABSOLUTE:
230 ev->has_metric_sint64 = 1;
231 ev->metric_sint64 = vl->values[i].absolute;
234 WARNING("riemann_write: unknown metric type: %d",
238 ssnprintf(event->service, sizeof(event->service),
239 "%s-%s-%s-%s-%s", vl->plugin, vl->plugin_instance,
240 vl->type, vl->type_instance, ds->ds[i].name);
241 ev->service = event->service;
242 DEBUG("riemann_write: %s ready to send", ev->service);
246 status = riemann_send(host, &msg);
252 riemann_connect(struct riemann_host *host)
255 struct addrinfo *ai, *res, hints;
258 if (host->flags & F_CONNECT)
261 memset(&hints, 0, sizeof(hints));
262 memset(&service, 0, sizeof(service));
263 hints.ai_family = PF_UNSPEC;
264 hints.ai_socktype = SOCK_DGRAM;
266 ssnprintf(service, sizeof(service), "%d", host->port);
268 if ((e = getaddrinfo(host->name, service, &hints, &res)) != 0) {
269 WARNING("could not resolve host \"%s\": %s",
270 host->name, gai_strerror(e));
274 for (ai = res; ai != NULL; ai = ai->ai_next) {
275 pthread_mutex_lock(&host->lock);
277 * check if another thread did not already succesfully connect
279 if (host->flags & F_CONNECT) {
284 if ((host->s = socket(ai->ai_family,
286 ai->ai_protocol)) == -1) {
287 pthread_mutex_unlock(&host->lock);
288 WARNING("riemann_connect: could not open socket");
293 if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
295 host->flags |= ~F_CONNECT;
296 pthread_mutex_unlock(&host->lock);
300 host->flags |= F_CONNECT;
301 DEBUG("got a succesful connection for: %s", host->name);
302 pthread_mutex_unlock(&host->lock);
308 WARNING("riemann_connect: no suitable hosts found");
316 riemann_free(void *p)
318 struct riemann_host *host = p;
320 if (host->flags & F_CONNECT)
326 riemann_config_host(oconfig_item_t *ci)
328 struct riemann_host *host = NULL;
331 oconfig_item_t *child;
332 char w_cb_name[DATA_MAX_NAME_LEN];
333 char n_cb_name[DATA_MAX_NAME_LEN];
336 if (ci->values_num != 1 ||
337 ci->values[0].type != OCONFIG_TYPE_STRING) {
338 WARNING("riemann hosts need one string argument");
342 if ((host = calloc(1, sizeof (*host))) == NULL) {
343 WARNING("riemann host allocation failed");
347 if (cf_util_get_string_buffer(ci, host->name,
348 sizeof(host->name)) != 0) {
349 WARNING("riemann host name too long");
354 host->port = RIEMANN_PORT;
355 host->delay = RIEMANN_DELAY;
356 for (i = 0; i < ci->children_num; i++) {
358 * The code here could be simplified but makes room
359 * for easy adding of new options later on.
361 child = &ci->children[i];
364 if (strcasecmp(child->key, "port") == 0) {
365 if ((status = cf_util_get_port_number(child)) < 0) {
366 WARNING("invalid port number");
371 } else if (strcasecmp(child->key, "delay") == 0) {
372 if ((status = cf_util_get_int(ci, &host->delay)) != 0)
375 WARNING("riemann plugin: ignoring unknown config "
376 "option: \"%s\"", child->key);
384 pthread_mutex_init(&host->lock, NULL);
385 ssnprintf(w_cb_name, sizeof(w_cb_name), "write-riemann/%s:%d",
386 host->name, host->port);
387 ssnprintf(n_cb_name, sizeof(n_cb_name), "notification-riemann/%s:%d",
388 host->name, host->port);
389 DEBUG("riemann w_cb_name: %s", w_cb_name);
390 DEBUG("riemann n_cb_name: %s", n_cb_name);
392 ud.free_func = riemann_free;
394 if ((status = plugin_register_write(w_cb_name, riemann_write, &ud)) != 0)
397 if ((status = plugin_register_notification(n_cb_name,
398 riemann_notification,
400 plugin_unregister_write(w_cb_name);
407 riemann_config(oconfig_item_t *ci)
411 oconfig_item_t *child;
413 for (i = 0; i < ci->children_num; i++) {
414 child = &ci->children[i];
416 if (strcasecmp(child->key, "host") == 0) {
417 riemann_config_host(child);
418 } else if (strcasecmp(child->key, "tag") == 0) {
419 if (riemann_tagcount >= RIEMANN_EXTRA_TAGS) {
420 WARNING("riemann plugin: too many tags");
424 cf_util_get_string(child, &newtag);
427 riemann_tags[riemann_tagcount++] = newtag;
428 DEBUG("riemann_config: got tag: %s", newtag);
431 WARNING ("riemann plugin: Ignoring unknown "
432 "configuration option \"%s\" at top level.",
440 module_register(void)
442 DEBUG("riemann: module_register");
444 plugin_register_complex_config ("riemann", riemann_config);