Merge remote-tracking branch 'origin/pr/1346'
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "configfile.h"
35 #include "plugin.h"
36 #include "utils_cache.h"
37 #include "utils_complain.h"
38 #include "write_riemann_threshold.h"
39
40 #include <errno.h>
41 #include <riemann/riemann-client.h>
42
43 #define RIEMANN_HOST "localhost"
44 #define RIEMANN_PORT 5555
45 #define RIEMANN_TTL_FACTOR 2.0
46 #define RIEMANN_BATCH_MAX 8192
47
48 struct riemann_host {
49   c_complain_t init_complaint;
50   char *name;
51   char *event_service_prefix;
52   pthread_mutex_t lock;
53   _Bool batch_mode;
54   _Bool notifications;
55   _Bool check_thresholds;
56   _Bool store_rates;
57   _Bool always_append_ds;
58   char *node;
59   int port;
60   riemann_client_type_t client_type;
61   riemann_client_t *client;
62   double ttl_factor;
63   cdtime_t batch_init;
64   int batch_max;
65   int batch_timeout;
66   int reference_count;
67   riemann_message_t *batch_msg;
68   char *tls_ca_file;
69   char *tls_cert_file;
70   char *tls_key_file;
71   struct timeval timeout;
72 };
73
74 static char **riemann_tags;
75 static size_t riemann_tags_num;
76 static char **riemann_attrs;
77 static size_t riemann_attrs_num;
78
79 /* host->lock must be held when calling this function. */
80 static int wrr_connect(struct riemann_host *host) /* {{{ */
81 {
82   char const *node;
83   int port;
84
85   if (host->client)
86     return 0;
87
88   node = (host->node != NULL) ? host->node : RIEMANN_HOST;
89   port = (host->port) ? host->port : RIEMANN_PORT;
90
91   host->client = NULL;
92
93   host->client = riemann_client_create(
94       host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE,
95       host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE,
96       host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE,
97       host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE);
98   if (host->client == NULL) {
99     c_complain(LOG_ERR, &host->init_complaint,
100                "write_riemann plugin: Unable to connect to Riemann at %s:%d",
101                node, port);
102     return -1;
103   }
104   if (host->timeout.tv_sec != 0) {
105     if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
106       riemann_client_free(host->client);
107       host->client = NULL;
108       c_complain(LOG_ERR, &host->init_complaint,
109                  "write_riemann plugin: Unable to connect to Riemann at %s:%d",
110                  node, port);
111       return -1;
112     }
113   }
114
115   c_release(LOG_INFO, &host->init_complaint,
116             "write_riemann plugin: Successfully connected to %s:%d", node,
117             port);
118
119   return 0;
120 } /* }}} int wrr_connect */
121
122 /* host->lock must be held when calling this function. */
123 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
124 {
125   if (!host->client)
126     return (0);
127
128   riemann_client_free(host->client);
129   host->client = NULL;
130
131   return (0);
132 } /* }}} int wrr_disconnect */
133
134 /**
135  * Function to send messages to riemann.
136  *
137  * Acquires the host lock, disconnects on errors.
138  */
139 static int wrr_send_nolock(struct riemann_host *host,
140                            riemann_message_t *msg) /* {{{ */
141 {
142   int status = 0;
143
144   status = wrr_connect(host);
145   if (status != 0) {
146     return status;
147   }
148
149   status = riemann_client_send_message(host->client, msg);
150   if (status != 0) {
151     wrr_disconnect(host);
152     return status;
153   }
154
155   /*
156    * For TCP we need to receive message acknowledgemenent.
157    */
158   if (host->client_type != RIEMANN_CLIENT_UDP) {
159     riemann_message_t *response;
160
161     response = riemann_client_recv_message(host->client);
162
163     if (response == NULL) {
164       wrr_disconnect(host);
165       return errno;
166     }
167     riemann_message_free(response);
168   }
169
170   return 0;
171 } /* }}} int wrr_send */
172
173 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) {
174   int status = 0;
175
176   pthread_mutex_lock(&host->lock);
177   status = wrr_send_nolock(host, msg);
178   pthread_mutex_unlock(&host->lock);
179   return status;
180 }
181
182 static riemann_message_t *
183 wrr_notification_to_message(struct riemann_host *host, /* {{{ */
184                             notification_t const *n) {
185   riemann_message_t *msg;
186   riemann_event_t *event;
187   char service_buffer[6 * DATA_MAX_NAME_LEN];
188   char const *severity;
189
190   switch (n->severity) {
191   case NOTIF_OKAY:
192     severity = "ok";
193     break;
194   case NOTIF_WARNING:
195     severity = "warning";
196     break;
197   case NOTIF_FAILURE:
198     severity = "critical";
199     break;
200   default:
201     severity = "unknown";
202   }
203
204   format_name(service_buffer, sizeof(service_buffer),
205               /* host = */ "", n->plugin, n->plugin_instance, n->type,
206               n->type_instance);
207
208   event = riemann_event_create(
209       RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME,
210       (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS,
211       "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity,
212       RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
213       RIEMANN_EVENT_FIELD_NONE);
214
215   if (n->host[0] != 0)
216     riemann_event_string_attribute_add(event, "host", n->host);
217   if (n->plugin[0] != 0)
218     riemann_event_string_attribute_add(event, "plugin", n->plugin);
219   if (n->plugin_instance[0] != 0)
220     riemann_event_string_attribute_add(event, "plugin_instance",
221                                        n->plugin_instance);
222
223   if (n->type[0] != 0)
224     riemann_event_string_attribute_add(event, "type", n->type);
225   if (n->type_instance[0] != 0)
226     riemann_event_string_attribute_add(event, "type_instance",
227                                        n->type_instance);
228
229   for (size_t i = 0; i < riemann_attrs_num; i += 2)
230     riemann_event_string_attribute_add(event, riemann_attrs[i],
231                                        riemann_attrs[i + 1]);
232
233   for (size_t i = 0; i < riemann_tags_num; i++)
234     riemann_event_tag_add(event, riemann_tags[i]);
235
236   if (n->message[0] != 0)
237     riemann_event_string_attribute_add(event, "description", n->message);
238
239   /* Pull in values from threshold and add extra attributes */
240   for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
241     if (strcasecmp("CurrentValue", meta->name) == 0 &&
242         meta->type == NM_TYPE_DOUBLE) {
243       riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
244                         (double)meta->nm_value.nm_double,
245                         RIEMANN_EVENT_FIELD_NONE);
246       continue;
247     }
248
249     if (meta->type == NM_TYPE_STRING) {
250       riemann_event_string_attribute_add(event, meta->name,
251                                          meta->nm_value.nm_string);
252       continue;
253     }
254   }
255
256   msg = riemann_message_create_with_events(event, NULL);
257   if (msg == NULL) {
258     ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
259     riemann_event_free(event);
260     return (NULL);
261   }
262
263   DEBUG("write_riemann plugin: Successfully created message for notification: "
264         "host = \"%s\", service = \"%s\", state = \"%s\"",
265         event->host, event->service, event->state);
266   return (msg);
267 } /* }}} riemann_message_t *wrr_notification_to_message */
268
269 static riemann_event_t *
270 wrr_value_to_event(struct riemann_host const *host, /* {{{ */
271                    data_set_t const *ds, value_list_t const *vl, size_t index,
272                    gauge_t const *rates, int status) {
273   riemann_event_t *event;
274   char name_buffer[5 * DATA_MAX_NAME_LEN];
275   char service_buffer[6 * DATA_MAX_NAME_LEN];
276   size_t i;
277
278   event = riemann_event_new();
279   if (event == NULL) {
280     ERROR("write_riemann plugin: riemann_event_new() failed.");
281     return (NULL);
282   }
283
284   format_name(name_buffer, sizeof(name_buffer),
285               /* host = */ "", vl->plugin, vl->plugin_instance, vl->type,
286               vl->type_instance);
287   if (host->always_append_ds || (ds->ds_num > 1)) {
288     if (host->event_service_prefix == NULL)
289       ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
290                 &name_buffer[1], ds->ds[index].name);
291     else
292       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
293                 host->event_service_prefix, &name_buffer[1],
294                 ds->ds[index].name);
295   } else {
296     if (host->event_service_prefix == NULL)
297       sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
298     else
299       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
300                 host->event_service_prefix, &name_buffer[1]);
301   }
302
303   riemann_event_set(
304       event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME,
305       (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL,
306       (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
307       RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type",
308       vl->type, "ds_name", ds->ds[index].name, NULL,
309       RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
310
311   if (host->check_thresholds) {
312     const char *state = NULL;
313
314     switch (status) {
315     case STATE_OKAY:
316       state = "ok";
317       break;
318     case STATE_ERROR:
319       state = "critical";
320       break;
321     case STATE_WARNING:
322       state = "warning";
323       break;
324     case STATE_MISSING:
325       state = "unknown";
326       break;
327     }
328     if (state)
329       riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
330                         RIEMANN_EVENT_FIELD_NONE);
331   }
332
333   if (vl->plugin_instance[0] != 0)
334     riemann_event_string_attribute_add(event, "plugin_instance",
335                                        vl->plugin_instance);
336   if (vl->type_instance[0] != 0)
337     riemann_event_string_attribute_add(event, "type_instance",
338                                        vl->type_instance);
339
340   if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
341     char ds_type[DATA_MAX_NAME_LEN];
342
343     ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
344               DS_TYPE_TO_STRING(ds->ds[index].type));
345     riemann_event_string_attribute_add(event, "ds_type", ds_type);
346   } else {
347     riemann_event_string_attribute_add(event, "ds_type",
348                                        DS_TYPE_TO_STRING(ds->ds[index].type));
349   }
350
351   {
352     char ds_index[DATA_MAX_NAME_LEN];
353
354     ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
355     riemann_event_string_attribute_add(event, "ds_index", ds_index);
356   }
357
358   for (i = 0; i < riemann_attrs_num; i += 2)
359     riemann_event_string_attribute_add(event, riemann_attrs[i],
360                                        riemann_attrs[i + 1]);
361
362   for (i = 0; i < riemann_tags_num; i++)
363     riemann_event_tag_add(event, riemann_tags[i]);
364
365   if (ds->ds[index].type == DS_TYPE_GAUGE) {
366     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
367                       (double)vl->values[index].gauge,
368                       RIEMANN_EVENT_FIELD_NONE);
369   } else if (rates != NULL) {
370     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index],
371                       RIEMANN_EVENT_FIELD_NONE);
372   } else {
373     int64_t metric;
374
375     if (ds->ds[index].type == DS_TYPE_DERIVE)
376       metric = (int64_t)vl->values[index].derive;
377     else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
378       metric = (int64_t)vl->values[index].absolute;
379     else
380       metric = (int64_t)vl->values[index].counter;
381
382     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric,
383                       RIEMANN_EVENT_FIELD_NONE);
384   }
385
386   DEBUG("write_riemann plugin: Successfully created message for metric: "
387         "host = \"%s\", service = \"%s\"",
388         event->host, event->service);
389   return (event);
390 } /* }}} riemann_event_t *wrr_value_to_event */
391
392 static riemann_message_t *
393 wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
394                           data_set_t const *ds, value_list_t const *vl,
395                           int *statuses) {
396   riemann_message_t *msg;
397   size_t i;
398   gauge_t *rates = NULL;
399
400   /* Initialize the Msg structure. */
401   msg = riemann_message_new();
402   if (msg == NULL) {
403     ERROR("write_riemann plugin: riemann_message_new failed.");
404     return (NULL);
405   }
406
407   if (host->store_rates) {
408     rates = uc_get_rate(ds, vl);
409     if (rates == NULL) {
410       ERROR("write_riemann plugin: uc_get_rate failed.");
411       riemann_message_free(msg);
412       return (NULL);
413     }
414   }
415
416   for (i = 0; i < vl->values_len; i++) {
417     riemann_event_t *event;
418
419     event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]);
420     if (event == NULL) {
421       riemann_message_free(msg);
422       sfree(rates);
423       return (NULL);
424     }
425     riemann_message_append_events(msg, event, NULL);
426   }
427
428   sfree(rates);
429   return (msg);
430 } /* }}} riemann_message_t *wrr_value_list_to_message */
431
432 /*
433  * Always call while holding host->lock !
434  */
435 static int wrr_batch_flush_nolock(cdtime_t timeout, struct riemann_host *host) {
436   cdtime_t now;
437   int status = 0;
438
439   now = cdtime();
440   if (timeout > 0) {
441     if ((host->batch_init + timeout) > now) {
442       return status;
443     }
444   }
445   wrr_send_nolock(host, host->batch_msg);
446   riemann_message_free(host->batch_msg);
447
448   host->batch_init = now;
449   host->batch_msg = NULL;
450   return status;
451 }
452
453 static int wrr_batch_flush(cdtime_t timeout,
454                            const char *identifier __attribute__((unused)),
455                            user_data_t *user_data) {
456   struct riemann_host *host;
457   int status;
458
459   if (user_data == NULL)
460     return (-EINVAL);
461
462   host = user_data->data;
463   pthread_mutex_lock(&host->lock);
464   status = wrr_batch_flush_nolock(timeout, host);
465   if (status != 0)
466     c_complain(
467         LOG_ERR, &host->init_complaint,
468         "write_riemann plugin: riemann_client_send failed with status %i",
469         status);
470   else
471     c_release(LOG_DEBUG, &host->init_complaint,
472               "write_riemann plugin: batch sent.");
473
474   pthread_mutex_unlock(&host->lock);
475   return status;
476 }
477
478 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
479                                     data_set_t const *ds,
480                                     value_list_t const *vl, int *statuses) {
481   riemann_message_t *msg;
482   size_t len;
483   int ret;
484   cdtime_t timeout;
485
486   msg = wrr_value_list_to_message(host, ds, vl, statuses);
487   if (msg == NULL)
488     return -1;
489
490   pthread_mutex_lock(&host->lock);
491
492   if (host->batch_msg == NULL) {
493     host->batch_msg = msg;
494   } else {
495     int status;
496
497     status = riemann_message_append_events_n(host->batch_msg, msg->n_events,
498                                              msg->events);
499     msg->n_events = 0;
500     msg->events = NULL;
501
502     riemann_message_free(msg);
503
504     if (status != 0) {
505       pthread_mutex_unlock(&host->lock);
506       ERROR("write_riemann plugin: out of memory");
507       return -1;
508     }
509   }
510
511   len = riemann_message_get_packed_size(host->batch_msg);
512   ret = 0;
513   if ((host->batch_max < 0) || (((size_t)host->batch_max) <= len)) {
514     ret = wrr_batch_flush_nolock(0, host);
515   } else {
516     if (host->batch_timeout > 0) {
517       timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
518       ret = wrr_batch_flush_nolock(timeout, host);
519     }
520   }
521
522   pthread_mutex_unlock(&host->lock);
523   return ret;
524 } /* }}} riemann_message_t *wrr_batch_add_value_list */
525
526 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
527 {
528   int status;
529   struct riemann_host *host = ud->data;
530   riemann_message_t *msg;
531
532   if (!host->notifications)
533     return 0;
534
535   /*
536    * Never batch for notifications, send them ASAP
537    */
538   msg = wrr_notification_to_message(host, n);
539   if (msg == NULL)
540     return (-1);
541
542   status = wrr_send(host, msg);
543   if (status != 0)
544     c_complain(
545         LOG_ERR, &host->init_complaint,
546         "write_riemann plugin: riemann_client_send failed with status %i",
547         status);
548   else
549     c_release(LOG_DEBUG, &host->init_complaint,
550               "write_riemann plugin: riemann_client_send succeeded");
551
552   riemann_message_free(msg);
553   return (status);
554 } /* }}} int wrr_notification */
555
556 static int wrr_write(const data_set_t *ds, /* {{{ */
557                      const value_list_t *vl, user_data_t *ud) {
558   int status = 0;
559   int statuses[vl->values_len];
560   struct riemann_host *host = ud->data;
561   riemann_message_t *msg;
562
563   if (host->check_thresholds) {
564     status = write_riemann_threshold_check(ds, vl, statuses);
565     if (status != 0)
566       return status;
567   } else {
568     memset(statuses, 0, sizeof(statuses));
569   }
570
571   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
572     wrr_batch_add_value_list(host, ds, vl, statuses);
573   } else {
574     msg = wrr_value_list_to_message(host, ds, vl, statuses);
575     if (msg == NULL)
576       return (-1);
577
578     status = wrr_send(host, msg);
579
580     riemann_message_free(msg);
581   }
582   return status;
583 } /* }}} int wrr_write */
584
585 static void wrr_free(void *p) /* {{{ */
586 {
587   struct riemann_host *host = p;
588
589   if (host == NULL)
590     return;
591
592   pthread_mutex_lock(&host->lock);
593
594   host->reference_count--;
595   if (host->reference_count > 0) {
596     pthread_mutex_unlock(&host->lock);
597     return;
598   }
599
600   wrr_disconnect(host);
601
602   pthread_mutex_destroy(&host->lock);
603   sfree(host);
604 } /* }}} void wrr_free */
605
606 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
607 {
608   struct riemann_host *host = NULL;
609   int status = 0;
610   int i;
611   oconfig_item_t *child;
612   char callback_name[DATA_MAX_NAME_LEN];
613   user_data_t ud;
614
615   if ((host = calloc(1, sizeof(*host))) == NULL) {
616     ERROR("write_riemann plugin: calloc failed.");
617     return ENOMEM;
618   }
619   pthread_mutex_init(&host->lock, NULL);
620   C_COMPLAIN_INIT(&host->init_complaint);
621   host->reference_count = 1;
622   host->node = NULL;
623   host->port = 0;
624   host->notifications = 1;
625   host->check_thresholds = 0;
626   host->store_rates = 1;
627   host->always_append_ds = 0;
628   host->batch_mode = 1;
629   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
630   host->batch_init = cdtime();
631   host->batch_timeout = 0;
632   host->ttl_factor = RIEMANN_TTL_FACTOR;
633   host->client = NULL;
634   host->client_type = RIEMANN_CLIENT_TCP;
635   host->timeout.tv_sec = 0;
636   host->timeout.tv_usec = 0;
637
638   status = cf_util_get_string(ci, &host->name);
639   if (status != 0) {
640     WARNING("write_riemann plugin: Required host name is missing.");
641     wrr_free(host);
642     return -1;
643   }
644
645   for (i = 0; i < ci->children_num; i++) {
646     /*
647      * The code here could be simplified but makes room
648      * for easy adding of new options later on.
649      */
650     child = &ci->children[i];
651     status = 0;
652
653     if (strcasecmp("Host", child->key) == 0) {
654       status = cf_util_get_string(child, &host->node);
655       if (status != 0)
656         break;
657     } else if (strcasecmp("Notifications", child->key) == 0) {
658       status = cf_util_get_boolean(child, &host->notifications);
659       if (status != 0)
660         break;
661     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
662       status = cf_util_get_string(child, &host->event_service_prefix);
663       if (status != 0)
664         break;
665     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
666       status = cf_util_get_boolean(child, &host->check_thresholds);
667       if (status != 0)
668         break;
669     } else if (strcasecmp("Batch", child->key) == 0) {
670       status = cf_util_get_boolean(child, &host->batch_mode);
671       if (status != 0)
672         break;
673     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
674       status = cf_util_get_int(child, &host->batch_max);
675       if (status != 0)
676         break;
677     } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
678       status = cf_util_get_int(child, &host->batch_timeout);
679       if (status != 0)
680         break;
681     } else if (strcasecmp("Timeout", child->key) == 0) {
682       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
683       if (status != 0)
684         break;
685     } else if (strcasecmp("Port", child->key) == 0) {
686       host->port = cf_util_get_port_number(child);
687       if (host->port == -1) {
688         ERROR("write_riemann plugin: Invalid argument "
689               "configured for the \"Port\" "
690               "option.");
691         break;
692       }
693     } else if (strcasecmp("Protocol", child->key) == 0) {
694       char tmp[16];
695       status = cf_util_get_string_buffer(child, tmp, sizeof(tmp));
696       if (status != 0) {
697         ERROR("write_riemann plugin: cf_util_get_"
698               "string_buffer failed with "
699               "status %i.",
700               status);
701         break;
702       }
703
704       if (strcasecmp("UDP", tmp) == 0)
705         host->client_type = RIEMANN_CLIENT_UDP;
706       else if (strcasecmp("TCP", tmp) == 0)
707         host->client_type = RIEMANN_CLIENT_TCP;
708       else if (strcasecmp("TLS", tmp) == 0)
709         host->client_type = RIEMANN_CLIENT_TLS;
710       else
711         WARNING("write_riemann plugin: The value "
712                 "\"%s\" is not valid for the "
713                 "\"Protocol\" option. Use "
714                 "either \"UDP\", \"TCP\" or \"TLS\".",
715                 tmp);
716     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
717       status = cf_util_get_string(child, &host->tls_ca_file);
718       if (status != 0) {
719         ERROR("write_riemann plugin: cf_util_get_"
720               "string_buffer failed with "
721               "status %i.",
722               status);
723         break;
724       }
725     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
726       status = cf_util_get_string(child, &host->tls_cert_file);
727       if (status != 0) {
728         ERROR("write_riemann plugin: cf_util_get_"
729               "string_buffer failed with "
730               "status %i.",
731               status);
732         break;
733       }
734     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
735       status = cf_util_get_string(child, &host->tls_key_file);
736       if (status != 0) {
737         ERROR("write_riemann plugin: cf_util_get_"
738               "string_buffer failed with "
739               "status %i.",
740               status);
741         break;
742       }
743     } else if (strcasecmp("StoreRates", child->key) == 0) {
744       status = cf_util_get_boolean(child, &host->store_rates);
745       if (status != 0)
746         break;
747     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
748       status = cf_util_get_boolean(child, &host->always_append_ds);
749       if (status != 0)
750         break;
751     } else if (strcasecmp("TTLFactor", child->key) == 0) {
752       double tmp = NAN;
753       status = cf_util_get_double(child, &tmp);
754       if (status != 0)
755         break;
756       if (tmp >= 2.0) {
757         host->ttl_factor = tmp;
758       } else if (tmp >= 1.0) {
759         NOTICE("write_riemann plugin: The configured "
760                "TTLFactor is very small "
761                "(%.1f). A value of 2.0 or "
762                "greater is recommended.",
763                tmp);
764         host->ttl_factor = tmp;
765       } else if (tmp > 0.0) {
766         WARNING("write_riemann plugin: The configured "
767                 "TTLFactor is too small to be "
768                 "useful (%.1f). I'll use it "
769                 "since the user knows best, "
770                 "but under protest.",
771                 tmp);
772         host->ttl_factor = tmp;
773       } else { /* zero, negative and NAN */
774         ERROR("write_riemann plugin: The configured "
775               "TTLFactor is invalid (%.1f).",
776               tmp);
777       }
778     } else {
779       WARNING("write_riemann plugin: ignoring unknown config "
780               "option: \"%s\"",
781               child->key);
782     }
783   }
784   if (status != 0) {
785     wrr_free(host);
786     return status;
787   }
788
789   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
790             host->name);
791   ud.data = host;
792   ud.free_func = wrr_free;
793
794   pthread_mutex_lock(&host->lock);
795
796   status = plugin_register_write(callback_name, wrr_write, &ud);
797
798   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
799     ud.free_func = NULL;
800     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
801   }
802   if (status != 0)
803     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
804             "failed with status %i.",
805             callback_name, status);
806   else /* success */
807     host->reference_count++;
808
809   status = plugin_register_notification(callback_name, wrr_notification, &ud);
810   if (status != 0)
811     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
812             "failed with status %i.",
813             callback_name, status);
814   else /* success */
815     host->reference_count++;
816
817   if (host->reference_count <= 1) {
818     /* Both callbacks failed => free memory.
819      * We need to unlock here, because riemann_free() will lock.
820      * This is not a race condition, because we're the only one
821      * holding a reference. */
822     pthread_mutex_unlock(&host->lock);
823     wrr_free(host);
824     return (-1);
825   }
826
827   host->reference_count--;
828   pthread_mutex_unlock(&host->lock);
829
830   return status;
831 } /* }}} int wrr_config_node */
832
833 static int wrr_config(oconfig_item_t *ci) /* {{{ */
834 {
835   int i;
836   oconfig_item_t *child;
837   int status;
838
839   for (i = 0; i < ci->children_num; i++) {
840     child = &ci->children[i];
841
842     if (strcasecmp("Node", child->key) == 0) {
843       wrr_config_node(child);
844     } else if (strcasecmp(child->key, "attribute") == 0) {
845       char *key = NULL;
846       char *val = NULL;
847
848       if (child->values_num != 2) {
849         WARNING("riemann attributes need both a key and a value.");
850         return (-1);
851       }
852       if (child->values[0].type != OCONFIG_TYPE_STRING ||
853           child->values[1].type != OCONFIG_TYPE_STRING) {
854         WARNING("riemann attribute needs string arguments.");
855         return (-1);
856       }
857       if ((key = strdup(child->values[0].value.string)) == NULL) {
858         WARNING("cannot allocate memory for attribute key.");
859         return (-1);
860       }
861       if ((val = strdup(child->values[1].value.string)) == NULL) {
862         WARNING("cannot allocate memory for attribute value.");
863         sfree(key);
864         return (-1);
865       }
866       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
867       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
868       DEBUG("write_riemann: got attr: %s => %s", key, val);
869       sfree(key);
870       sfree(val);
871     } else if (strcasecmp(child->key, "tag") == 0) {
872       char *tmp = NULL;
873       status = cf_util_get_string(child, &tmp);
874       if (status != 0)
875         continue;
876
877       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
878       DEBUG("write_riemann plugin: Got tag: %s", tmp);
879       sfree(tmp);
880     } else {
881       WARNING("write_riemann plugin: Ignoring unknown "
882               "configuration option \"%s\" at top level.",
883               child->key);
884     }
885   }
886   return (0);
887 } /* }}} int wrr_config */
888
889 void module_register(void) {
890   plugin_register_complex_config("write_riemann", wrr_config);
891 }
892
893 /* vim: set sw=8 sts=8 ts=8 noet : */