Merge pull request #3329 from efuss/fix-3311
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include "collectd.h"
32
33 #include "plugin.h"
34 #include "utils/common/common.h"
35 #include "utils_cache.h"
36 #include "utils_complain.h"
37 #include "write_riemann_threshold.h"
38
39 #include <riemann/riemann-client.h>
40
41 #define RIEMANN_HOST "localhost"
42 #define RIEMANN_PORT 5555
43 #define RIEMANN_TTL_FACTOR 2.0
44 #define RIEMANN_BATCH_MAX 8192
45
46 struct riemann_host {
47   c_complain_t init_complaint;
48   char *name;
49   char *event_service_prefix;
50   pthread_mutex_t lock;
51   bool batch_mode;
52   bool notifications;
53   bool check_thresholds;
54   bool store_rates;
55   bool always_append_ds;
56   char *node;
57   int port;
58   riemann_client_type_t client_type;
59   riemann_client_t *client;
60   double ttl_factor;
61   cdtime_t batch_init;
62   int batch_max;
63   int batch_timeout;
64   int reference_count;
65   riemann_message_t *batch_msg;
66   char *tls_ca_file;
67   char *tls_cert_file;
68   char *tls_key_file;
69   struct timeval timeout;
70 };
71
72 static char **riemann_tags;
73 static size_t riemann_tags_num;
74 static char **riemann_attrs;
75 static size_t riemann_attrs_num;
76
77 /* host->lock must be held when calling this function. */
78 static int wrr_connect(struct riemann_host *host) /* {{{ */
79 {
80   char const *node;
81   int port;
82
83   if (host->client)
84     return 0;
85
86   node = (host->node != NULL) ? host->node : RIEMANN_HOST;
87   port = (host->port) ? host->port : RIEMANN_PORT;
88
89   host->client = NULL;
90
91   host->client = riemann_client_create(
92       host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE,
93       host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE,
94       host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE,
95       host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE);
96   if (host->client == NULL) {
97     c_complain(LOG_ERR, &host->init_complaint,
98                "write_riemann plugin: Unable to connect to Riemann at %s:%d",
99                node, port);
100     return -1;
101   }
102 #if RCC_VERSION_NUMBER >= 0x010800
103   if (host->timeout.tv_sec != 0) {
104     if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
105       riemann_client_free(host->client);
106       host->client = NULL;
107       c_complain(LOG_ERR, &host->init_complaint,
108                  "write_riemann plugin: Unable to connect to Riemann at %s:%d",
109                  node, port);
110       return -1;
111     }
112   }
113 #endif
114
115   set_sock_opts(riemann_client_get_fd(host->client));
116
117   c_release(LOG_INFO, &host->init_complaint,
118             "write_riemann plugin: Successfully connected to %s:%d", node,
119             port);
120
121   return 0;
122 } /* }}} int wrr_connect */
123
124 /* host->lock must be held when calling this function. */
125 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
126 {
127   if (!host->client)
128     return 0;
129
130   riemann_client_free(host->client);
131   host->client = NULL;
132
133   return 0;
134 } /* }}} int wrr_disconnect */
135
136 /**
137  * Function to send messages to riemann.
138  *
139  * Acquires the host lock, disconnects on errors.
140  */
141 static int wrr_send_nolock(struct riemann_host *host,
142                            riemann_message_t *msg) /* {{{ */
143 {
144   int status = 0;
145
146   status = wrr_connect(host);
147   if (status != 0) {
148     return status;
149   }
150
151   status = riemann_client_send_message(host->client, msg);
152   if (status != 0) {
153     wrr_disconnect(host);
154     return status;
155   }
156
157   /*
158    * For TCP we need to receive message acknowledgemenent.
159    */
160   if (host->client_type != RIEMANN_CLIENT_UDP) {
161     riemann_message_t *response;
162
163     response = riemann_client_recv_message(host->client);
164
165     if (response == NULL) {
166       wrr_disconnect(host);
167       return errno;
168     }
169     riemann_message_free(response);
170   }
171
172   return 0;
173 } /* }}} int wrr_send */
174
175 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) {
176   int status = 0;
177
178   pthread_mutex_lock(&host->lock);
179   status = wrr_send_nolock(host, msg);
180   pthread_mutex_unlock(&host->lock);
181   return status;
182 }
183
184 static riemann_message_t *wrr_notification_to_message(notification_t const *n) {
185   riemann_message_t *msg;
186   riemann_event_t *event;
187   char service_buffer[6 * DATA_MAX_NAME_LEN];
188   char const *severity;
189
190   switch (n->severity) {
191   case NOTIF_OKAY:
192     severity = "ok";
193     break;
194   case NOTIF_WARNING:
195     severity = "warning";
196     break;
197   case NOTIF_FAILURE:
198     severity = "critical";
199     break;
200   default:
201     severity = "unknown";
202   }
203
204   format_name(service_buffer, sizeof(service_buffer),
205               /* host = */ "", n->plugin, n->plugin_instance, n->type,
206               n->type_instance);
207
208   event = riemann_event_create(
209       RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME,
210       (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS,
211       "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity,
212       RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
213       RIEMANN_EVENT_FIELD_NONE);
214
215 #if RCC_VERSION_NUMBER >= 0x010A00
216   riemann_event_set(event, RIEMANN_EVENT_FIELD_TIME_MICROS,
217                     (int64_t)CDTIME_T_TO_US(n->time));
218 #endif
219
220   if (n->host[0] != 0)
221     riemann_event_string_attribute_add(event, "host", n->host);
222   if (n->plugin[0] != 0)
223     riemann_event_string_attribute_add(event, "plugin", n->plugin);
224   if (n->plugin_instance[0] != 0)
225     riemann_event_string_attribute_add(event, "plugin_instance",
226                                        n->plugin_instance);
227
228   if (n->type[0] != 0)
229     riemann_event_string_attribute_add(event, "type", n->type);
230   if (n->type_instance[0] != 0)
231     riemann_event_string_attribute_add(event, "type_instance",
232                                        n->type_instance);
233
234   for (size_t i = 0; i < riemann_attrs_num; i += 2)
235     riemann_event_string_attribute_add(event, riemann_attrs[i],
236                                        riemann_attrs[i + 1]);
237
238   for (size_t i = 0; i < riemann_tags_num; i++)
239     riemann_event_tag_add(event, riemann_tags[i]);
240
241   if (n->message[0] != 0)
242     riemann_event_string_attribute_add(event, "description", n->message);
243
244   /* Pull in values from threshold and add extra attributes */
245   for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
246     if (strcasecmp("CurrentValue", meta->name) == 0 &&
247         meta->type == NM_TYPE_DOUBLE) {
248       riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
249                         (double)meta->nm_value.nm_double,
250                         RIEMANN_EVENT_FIELD_NONE);
251       continue;
252     }
253
254     if (meta->type == NM_TYPE_STRING) {
255       riemann_event_string_attribute_add(event, meta->name,
256                                          meta->nm_value.nm_string);
257       continue;
258     }
259   }
260
261   msg = riemann_message_create_with_events(event, NULL);
262   if (msg == NULL) {
263     ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
264     riemann_event_free(event);
265     return NULL;
266   }
267
268   DEBUG("write_riemann plugin: Successfully created message for notification: "
269         "host = \"%s\", service = \"%s\", state = \"%s\"",
270         event->host, event->service, event->state);
271   return msg;
272 }
273
274 static riemann_event_t *
275 wrr_value_to_event(struct riemann_host const *host, /* {{{ */
276                    data_set_t const *ds, value_list_t const *vl, size_t index,
277                    gauge_t const *rates, int status) {
278   riemann_event_t *event;
279   char name_buffer[5 * DATA_MAX_NAME_LEN];
280   char service_buffer[6 * DATA_MAX_NAME_LEN];
281   size_t i;
282
283   event = riemann_event_new();
284   if (event == NULL) {
285     ERROR("write_riemann plugin: riemann_event_new() failed.");
286     return NULL;
287   }
288
289   format_name(name_buffer, sizeof(name_buffer),
290               /* host = */ "", vl->plugin, vl->plugin_instance, vl->type,
291               vl->type_instance);
292   if (host->always_append_ds || (ds->ds_num > 1)) {
293     if (host->event_service_prefix == NULL)
294       ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
295                 &name_buffer[1], ds->ds[index].name);
296     else
297       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
298                 host->event_service_prefix, &name_buffer[1],
299                 ds->ds[index].name);
300   } else {
301     if (host->event_service_prefix == NULL)
302       sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
303     else
304       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
305                 host->event_service_prefix, &name_buffer[1]);
306   }
307
308   riemann_event_set(
309       event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME,
310       (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL,
311       (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
312       RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type",
313       vl->type, "ds_name", ds->ds[index].name, NULL,
314       RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
315
316 #if RCC_VERSION_NUMBER >= 0x010A00
317   riemann_event_set(event, RIEMANN_EVENT_FIELD_TIME_MICROS,
318                     (int64_t)CDTIME_T_TO_US(vl->time));
319 #endif
320
321   if (host->check_thresholds) {
322     const char *state = NULL;
323
324     switch (status) {
325     case STATE_OKAY:
326       state = "ok";
327       break;
328     case STATE_ERROR:
329       state = "critical";
330       break;
331     case STATE_WARNING:
332       state = "warning";
333       break;
334     case STATE_MISSING:
335       state = "unknown";
336       break;
337     }
338     if (state)
339       riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
340                         RIEMANN_EVENT_FIELD_NONE);
341   }
342
343   if (vl->plugin_instance[0] != 0)
344     riemann_event_string_attribute_add(event, "plugin_instance",
345                                        vl->plugin_instance);
346   if (vl->type_instance[0] != 0)
347     riemann_event_string_attribute_add(event, "type_instance",
348                                        vl->type_instance);
349
350   if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
351     char ds_type[DATA_MAX_NAME_LEN];
352
353     ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
354               DS_TYPE_TO_STRING(ds->ds[index].type));
355     riemann_event_string_attribute_add(event, "ds_type", ds_type);
356   } else {
357     riemann_event_string_attribute_add(event, "ds_type",
358                                        DS_TYPE_TO_STRING(ds->ds[index].type));
359   }
360
361   {
362     char ds_index[DATA_MAX_NAME_LEN];
363
364     ssnprintf(ds_index, sizeof(ds_index), "%" PRIsz, index);
365     riemann_event_string_attribute_add(event, "ds_index", ds_index);
366   }
367
368   for (i = 0; i < riemann_attrs_num; i += 2)
369     riemann_event_string_attribute_add(event, riemann_attrs[i],
370                                        riemann_attrs[i + 1]);
371
372   for (i = 0; i < riemann_tags_num; i++)
373     riemann_event_tag_add(event, riemann_tags[i]);
374
375   if (ds->ds[index].type == DS_TYPE_GAUGE) {
376     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
377                       (double)vl->values[index].gauge,
378                       RIEMANN_EVENT_FIELD_NONE);
379   } else if (rates != NULL) {
380     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index],
381                       RIEMANN_EVENT_FIELD_NONE);
382   } else {
383     int64_t metric;
384
385     if (ds->ds[index].type == DS_TYPE_DERIVE)
386       metric = (int64_t)vl->values[index].derive;
387     else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
388       metric = (int64_t)vl->values[index].absolute;
389     else
390       metric = (int64_t)vl->values[index].counter;
391
392     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric,
393                       RIEMANN_EVENT_FIELD_NONE);
394   }
395
396   if (vl->meta) {
397     char **toc;
398     int n = meta_data_toc(vl->meta, &toc);
399
400     for (int i = 0; i < n; i++) {
401       char *key = toc[i];
402       char *value;
403
404       if (0 == meta_data_as_string(vl->meta, key, &value)) {
405         riemann_event_string_attribute_add(event, key, value);
406         free(value);
407       }
408     }
409
410     free(toc);
411   }
412
413   DEBUG("write_riemann plugin: Successfully created message for metric: "
414         "host = \"%s\", service = \"%s\"",
415         event->host, event->service);
416   return event;
417 } /* }}} riemann_event_t *wrr_value_to_event */
418
419 static riemann_message_t *
420 wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
421                           data_set_t const *ds, value_list_t const *vl,
422                           int *statuses) {
423   riemann_message_t *msg;
424   size_t i;
425   gauge_t *rates = NULL;
426
427   /* Initialize the Msg structure. */
428   msg = riemann_message_new();
429   if (msg == NULL) {
430     ERROR("write_riemann plugin: riemann_message_new failed.");
431     return NULL;
432   }
433
434   if (host->store_rates) {
435     rates = uc_get_rate(ds, vl);
436     if (rates == NULL) {
437       ERROR("write_riemann plugin: uc_get_rate failed.");
438       riemann_message_free(msg);
439       return NULL;
440     }
441   }
442
443   for (i = 0; i < vl->values_len; i++) {
444     riemann_event_t *event;
445
446     event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]);
447     if (event == NULL) {
448       riemann_message_free(msg);
449       sfree(rates);
450       return NULL;
451     }
452     riemann_message_append_events(msg, event, NULL);
453   }
454
455   sfree(rates);
456   return msg;
457 } /* }}} riemann_message_t *wrr_value_list_to_message */
458
459 /*
460  * Always call while holding host->lock !
461  */
462 static int wrr_batch_flush_nolock(cdtime_t timeout, struct riemann_host *host) {
463   cdtime_t now;
464   int status = 0;
465
466   now = cdtime();
467   if (timeout > 0) {
468     if ((host->batch_init + timeout) > now) {
469       return status;
470     }
471   }
472   wrr_send_nolock(host, host->batch_msg);
473   riemann_message_free(host->batch_msg);
474
475   host->batch_init = now;
476   host->batch_msg = NULL;
477   return status;
478 }
479
480 static int wrr_batch_flush(cdtime_t timeout,
481                            const char *identifier __attribute__((unused)),
482                            user_data_t *user_data) {
483   struct riemann_host *host;
484   int status;
485
486   if (user_data == NULL)
487     return -EINVAL;
488
489   host = user_data->data;
490   pthread_mutex_lock(&host->lock);
491   status = wrr_batch_flush_nolock(timeout, host);
492   if (status != 0)
493     c_complain(
494         LOG_ERR, &host->init_complaint,
495         "write_riemann plugin: riemann_client_send failed with status %i",
496         status);
497   else
498     c_release(LOG_DEBUG, &host->init_complaint,
499               "write_riemann plugin: batch sent.");
500
501   pthread_mutex_unlock(&host->lock);
502   return status;
503 }
504
505 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
506                                     data_set_t const *ds,
507                                     value_list_t const *vl, int *statuses) {
508   riemann_message_t *msg;
509   size_t len;
510   int ret;
511   cdtime_t timeout;
512
513   msg = wrr_value_list_to_message(host, ds, vl, statuses);
514   if (msg == NULL)
515     return -1;
516
517   pthread_mutex_lock(&host->lock);
518
519   if (host->batch_msg == NULL) {
520     host->batch_msg = msg;
521   } else {
522     int status;
523
524     status = riemann_message_append_events_n(host->batch_msg, msg->n_events,
525                                              msg->events);
526     msg->n_events = 0;
527     msg->events = NULL;
528
529     riemann_message_free(msg);
530
531     if (status != 0) {
532       pthread_mutex_unlock(&host->lock);
533       ERROR("write_riemann plugin: out of memory");
534       return -1;
535     }
536   }
537
538   len = riemann_message_get_packed_size(host->batch_msg);
539   ret = 0;
540   if ((host->batch_max < 0) || (((size_t)host->batch_max) <= len)) {
541     ret = wrr_batch_flush_nolock(0, host);
542   } else {
543     if (host->batch_timeout > 0) {
544       timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
545       ret = wrr_batch_flush_nolock(timeout, host);
546     }
547   }
548
549   pthread_mutex_unlock(&host->lock);
550   return ret;
551 } /* }}} riemann_message_t *wrr_batch_add_value_list */
552
553 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
554 {
555   int status;
556   struct riemann_host *host = ud->data;
557   riemann_message_t *msg;
558
559   if (!host->notifications)
560     return 0;
561
562   /*
563    * Never batch for notifications, send them ASAP
564    */
565   msg = wrr_notification_to_message(n);
566   if (msg == NULL)
567     return -1;
568
569   status = wrr_send(host, msg);
570   if (status != 0)
571     c_complain(
572         LOG_ERR, &host->init_complaint,
573         "write_riemann plugin: riemann_client_send failed with status %i",
574         status);
575   else
576     c_release(LOG_DEBUG, &host->init_complaint,
577               "write_riemann plugin: riemann_client_send succeeded");
578
579   riemann_message_free(msg);
580   return status;
581 } /* }}} int wrr_notification */
582
583 static int wrr_write(const data_set_t *ds, /* {{{ */
584                      const value_list_t *vl, user_data_t *ud) {
585   int status = 0;
586   int statuses[vl->values_len];
587   struct riemann_host *host = ud->data;
588   riemann_message_t *msg;
589
590   if (host->check_thresholds) {
591     status = write_riemann_threshold_check(ds, vl, statuses);
592     if (status != 0)
593       return status;
594   } else {
595     memset(statuses, 0, sizeof(statuses));
596   }
597
598   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
599     wrr_batch_add_value_list(host, ds, vl, statuses);
600   } else {
601     msg = wrr_value_list_to_message(host, ds, vl, statuses);
602     if (msg == NULL)
603       return -1;
604
605     status = wrr_send(host, msg);
606
607     riemann_message_free(msg);
608   }
609   return status;
610 } /* }}} int wrr_write */
611
612 static void wrr_free(void *p) /* {{{ */
613 {
614   struct riemann_host *host = p;
615
616   if (host == NULL)
617     return;
618
619   pthread_mutex_lock(&host->lock);
620
621   host->reference_count--;
622   if (host->reference_count > 0) {
623     pthread_mutex_unlock(&host->lock);
624     return;
625   }
626
627   wrr_disconnect(host);
628
629   pthread_mutex_lock(&host->lock);
630   pthread_mutex_destroy(&host->lock);
631   sfree(host);
632 } /* }}} void wrr_free */
633
634 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
635 {
636   struct riemann_host *host = NULL;
637   int status = 0;
638   int i;
639   oconfig_item_t *child;
640   char callback_name[DATA_MAX_NAME_LEN];
641
642   if ((host = calloc(1, sizeof(*host))) == NULL) {
643     ERROR("write_riemann plugin: calloc failed.");
644     return ENOMEM;
645   }
646   pthread_mutex_init(&host->lock, NULL);
647   C_COMPLAIN_INIT(&host->init_complaint);
648   host->reference_count = 1;
649   host->node = NULL;
650   host->port = 0;
651   host->notifications = true;
652   host->check_thresholds = false;
653   host->store_rates = true;
654   host->always_append_ds = false;
655   host->batch_mode = true;
656   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
657   host->batch_init = cdtime();
658   host->batch_timeout = 0;
659   host->ttl_factor = RIEMANN_TTL_FACTOR;
660   host->client = NULL;
661   host->client_type = RIEMANN_CLIENT_TCP;
662   host->timeout.tv_sec = 0;
663   host->timeout.tv_usec = 0;
664
665   status = cf_util_get_string(ci, &host->name);
666   if (status != 0) {
667     WARNING("write_riemann plugin: Required host name is missing.");
668     wrr_free(host);
669     return -1;
670   }
671
672   for (i = 0; i < ci->children_num; i++) {
673     /*
674      * The code here could be simplified but makes room
675      * for easy adding of new options later on.
676      */
677     child = &ci->children[i];
678     status = 0;
679
680     if (strcasecmp("Host", child->key) == 0) {
681       status = cf_util_get_string(child, &host->node);
682       if (status != 0)
683         break;
684     } else if (strcasecmp("Notifications", child->key) == 0) {
685       status = cf_util_get_boolean(child, &host->notifications);
686       if (status != 0)
687         break;
688     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
689       status = cf_util_get_string(child, &host->event_service_prefix);
690       if (status != 0)
691         break;
692     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
693       status = cf_util_get_boolean(child, &host->check_thresholds);
694       if (status != 0)
695         break;
696     } else if (strcasecmp("Batch", child->key) == 0) {
697       status = cf_util_get_boolean(child, &host->batch_mode);
698       if (status != 0)
699         break;
700     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
701       status = cf_util_get_int(child, &host->batch_max);
702       if (status != 0)
703         break;
704     } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
705       status = cf_util_get_int(child, &host->batch_timeout);
706       if (status != 0)
707         break;
708     } else if (strcasecmp("Timeout", child->key) == 0) {
709 #if RCC_VERSION_NUMBER >= 0x010800
710       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
711       if (status != 0)
712         break;
713 #else
714       WARNING("write_riemann plugin: The Timeout option is not supported. "
715               "Please upgrade the Riemann client to at least 1.8.0.");
716 #endif
717     } else if (strcasecmp("Port", child->key) == 0) {
718       host->port = cf_util_get_port_number(child);
719       if (host->port == -1) {
720         break;
721       }
722     } else if (strcasecmp("Protocol", child->key) == 0) {
723       char tmp[16];
724       status = cf_util_get_string_buffer(child, tmp, sizeof(tmp));
725       if (status != 0)
726         break;
727
728       if (strcasecmp("UDP", tmp) == 0)
729         host->client_type = RIEMANN_CLIENT_UDP;
730       else if (strcasecmp("TCP", tmp) == 0)
731         host->client_type = RIEMANN_CLIENT_TCP;
732       else if (strcasecmp("TLS", tmp) == 0)
733         host->client_type = RIEMANN_CLIENT_TLS;
734       else
735         WARNING("write_riemann plugin: The value "
736                 "\"%s\" is not valid for the "
737                 "\"Protocol\" option. Use "
738                 "either \"UDP\", \"TCP\" or \"TLS\".",
739                 tmp);
740     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
741       status = cf_util_get_string(child, &host->tls_ca_file);
742       if (status != 0)
743         break;
744     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
745       status = cf_util_get_string(child, &host->tls_cert_file);
746       if (status != 0)
747         break;
748     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
749       status = cf_util_get_string(child, &host->tls_key_file);
750       if (status != 0)
751         break;
752     } else if (strcasecmp("StoreRates", child->key) == 0) {
753       status = cf_util_get_boolean(child, &host->store_rates);
754       if (status != 0)
755         break;
756     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
757       status = cf_util_get_boolean(child, &host->always_append_ds);
758       if (status != 0)
759         break;
760     } else if (strcasecmp("TTLFactor", child->key) == 0) {
761       double tmp = NAN;
762       status = cf_util_get_double(child, &tmp);
763       if (status != 0)
764         break;
765       if (tmp >= 2.0) {
766         host->ttl_factor = tmp;
767       } else if (tmp >= 1.0) {
768         NOTICE("write_riemann plugin: The configured "
769                "TTLFactor is very small "
770                "(%.1f). A value of 2.0 or "
771                "greater is recommended.",
772                tmp);
773         host->ttl_factor = tmp;
774       } else if (tmp > 0.0) {
775         WARNING("write_riemann plugin: The configured "
776                 "TTLFactor is too small to be "
777                 "useful (%.1f). I'll use it "
778                 "since the user knows best, "
779                 "but under protest.",
780                 tmp);
781         host->ttl_factor = tmp;
782       } else { /* zero, negative and NAN */
783         ERROR("write_riemann plugin: The configured "
784               "TTLFactor is invalid (%.1f).",
785               tmp);
786       }
787     } else {
788       WARNING("write_riemann plugin: ignoring unknown config "
789               "option: \"%s\"",
790               child->key);
791     }
792   }
793   if (status != 0) {
794     wrr_free(host);
795     return status;
796   }
797
798   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
799             host->name);
800
801   user_data_t ud = {.data = host, .free_func = wrr_free};
802
803   pthread_mutex_lock(&host->lock);
804
805   status = plugin_register_write(callback_name, wrr_write, &ud);
806
807   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
808     ud.free_func = NULL;
809     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
810   }
811   if (status != 0)
812     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
813             "failed with status %i.",
814             callback_name, status);
815   else /* success */
816     host->reference_count++;
817
818   status = plugin_register_notification(callback_name, wrr_notification, &ud);
819   if (status != 0)
820     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
821             "failed with status %i.",
822             callback_name, status);
823   else /* success */
824     host->reference_count++;
825
826   if (host->reference_count <= 1) {
827     /* Both callbacks failed => free memory.
828      * We need to unlock here, because riemann_free() will lock.
829      * This is not a race condition, because we're the only one
830      * holding a reference. */
831     pthread_mutex_unlock(&host->lock);
832     wrr_free(host);
833     return -1;
834   }
835
836   host->reference_count--;
837   pthread_mutex_unlock(&host->lock);
838
839   return status;
840 } /* }}} int wrr_config_node */
841
842 static int wrr_config(oconfig_item_t *ci) /* {{{ */
843 {
844   int i;
845   oconfig_item_t *child;
846   int status;
847
848   for (i = 0; i < ci->children_num; i++) {
849     child = &ci->children[i];
850
851     if (strcasecmp("Node", child->key) == 0) {
852       wrr_config_node(child);
853     } else if (strcasecmp(child->key, "attribute") == 0) {
854       char *key = NULL;
855       char *val = NULL;
856
857       if (child->values_num != 2) {
858         WARNING("riemann attributes need both a key and a value.");
859         return -1;
860       }
861       if (child->values[0].type != OCONFIG_TYPE_STRING ||
862           child->values[1].type != OCONFIG_TYPE_STRING) {
863         WARNING("riemann attribute needs string arguments.");
864         return -1;
865       }
866       if ((key = strdup(child->values[0].value.string)) == NULL) {
867         WARNING("cannot allocate memory for attribute key.");
868         return -1;
869       }
870       if ((val = strdup(child->values[1].value.string)) == NULL) {
871         WARNING("cannot allocate memory for attribute value.");
872         sfree(key);
873         return -1;
874       }
875       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
876       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
877       DEBUG("write_riemann: got attr: %s => %s", key, val);
878       sfree(key);
879       sfree(val);
880     } else if (strcasecmp(child->key, "tag") == 0) {
881       char *tmp = NULL;
882       status = cf_util_get_string(child, &tmp);
883       if (status != 0)
884         continue;
885
886       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
887       DEBUG("write_riemann plugin: Got tag: %s", tmp);
888       sfree(tmp);
889     } else {
890       WARNING("write_riemann plugin: Ignoring unknown "
891               "configuration option \"%s\" at top level.",
892               child->key);
893     }
894   }
895   return 0;
896 } /* }}} int wrr_config */
897
898 void module_register(void) {
899   plugin_register_complex_config("write_riemann", wrr_config);
900 }