Merge branch 'master' into feat_mcelog_notification_bugfixes
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "plugin.h"
35 #include "utils_cache.h"
36 #include "utils_complain.h"
37 #include "write_riemann_threshold.h"
38
39 #include <riemann/riemann-client.h>
40
41 #define RIEMANN_HOST "localhost"
42 #define RIEMANN_PORT 5555
43 #define RIEMANN_TTL_FACTOR 2.0
44 #define RIEMANN_BATCH_MAX 8192
45
46 struct riemann_host {
47   c_complain_t init_complaint;
48   char *name;
49   char *event_service_prefix;
50   pthread_mutex_t lock;
51   _Bool batch_mode;
52   _Bool notifications;
53   _Bool check_thresholds;
54   _Bool store_rates;
55   _Bool always_append_ds;
56   char *node;
57   int port;
58   riemann_client_type_t client_type;
59   riemann_client_t *client;
60   double ttl_factor;
61   cdtime_t batch_init;
62   int batch_max;
63   int batch_timeout;
64   int reference_count;
65   riemann_message_t *batch_msg;
66   char *tls_ca_file;
67   char *tls_cert_file;
68   char *tls_key_file;
69   struct timeval timeout;
70 };
71
72 static char **riemann_tags;
73 static size_t riemann_tags_num;
74 static char **riemann_attrs;
75 static size_t riemann_attrs_num;
76
77 /* host->lock must be held when calling this function. */
78 static int wrr_connect(struct riemann_host *host) /* {{{ */
79 {
80   char const *node;
81   int port;
82
83   if (host->client)
84     return 0;
85
86   node = (host->node != NULL) ? host->node : RIEMANN_HOST;
87   port = (host->port) ? host->port : RIEMANN_PORT;
88
89   host->client = NULL;
90
91   host->client = riemann_client_create(
92       host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE,
93       host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE,
94       host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE,
95       host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE);
96   if (host->client == NULL) {
97     c_complain(LOG_ERR, &host->init_complaint,
98                "write_riemann plugin: Unable to connect to Riemann at %s:%d",
99                node, port);
100     return -1;
101   }
102 #if RCC_VERSION_NUMBER >= 0x010800
103   if (host->timeout.tv_sec != 0) {
104     if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
105       riemann_client_free(host->client);
106       host->client = NULL;
107       c_complain(LOG_ERR, &host->init_complaint,
108                  "write_riemann plugin: Unable to connect to Riemann at %s:%d",
109                  node, port);
110       return -1;
111     }
112   }
113 #endif
114
115   set_sock_opts(riemann_client_get_fd(host->client));
116
117   c_release(LOG_INFO, &host->init_complaint,
118             "write_riemann plugin: Successfully connected to %s:%d", node,
119             port);
120
121   return 0;
122 } /* }}} int wrr_connect */
123
124 /* host->lock must be held when calling this function. */
125 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
126 {
127   if (!host->client)
128     return 0;
129
130   riemann_client_free(host->client);
131   host->client = NULL;
132
133   return 0;
134 } /* }}} int wrr_disconnect */
135
136 /**
137  * Function to send messages to riemann.
138  *
139  * Acquires the host lock, disconnects on errors.
140  */
141 static int wrr_send_nolock(struct riemann_host *host,
142                            riemann_message_t *msg) /* {{{ */
143 {
144   int status = 0;
145
146   status = wrr_connect(host);
147   if (status != 0) {
148     return status;
149   }
150
151   status = riemann_client_send_message(host->client, msg);
152   if (status != 0) {
153     wrr_disconnect(host);
154     return status;
155   }
156
157   /*
158    * For TCP we need to receive message acknowledgemenent.
159    */
160   if (host->client_type != RIEMANN_CLIENT_UDP) {
161     riemann_message_t *response;
162
163     response = riemann_client_recv_message(host->client);
164
165     if (response == NULL) {
166       wrr_disconnect(host);
167       return errno;
168     }
169     riemann_message_free(response);
170   }
171
172   return 0;
173 } /* }}} int wrr_send */
174
175 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) {
176   int status = 0;
177
178   pthread_mutex_lock(&host->lock);
179   status = wrr_send_nolock(host, msg);
180   pthread_mutex_unlock(&host->lock);
181   return status;
182 }
183
184 static riemann_message_t *
185 wrr_notification_to_message(struct riemann_host *host, /* {{{ */
186                             notification_t const *n) {
187   riemann_message_t *msg;
188   riemann_event_t *event;
189   char service_buffer[6 * DATA_MAX_NAME_LEN];
190   char const *severity;
191
192   switch (n->severity) {
193   case NOTIF_OKAY:
194     severity = "ok";
195     break;
196   case NOTIF_WARNING:
197     severity = "warning";
198     break;
199   case NOTIF_FAILURE:
200     severity = "critical";
201     break;
202   default:
203     severity = "unknown";
204   }
205
206   format_name(service_buffer, sizeof(service_buffer),
207               /* host = */ "", n->plugin, n->plugin_instance, n->type,
208               n->type_instance);
209
210   event = riemann_event_create(
211       RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME,
212       (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS,
213       "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity,
214       RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
215       RIEMANN_EVENT_FIELD_NONE);
216
217 #if RCC_VERSION_NUMBER >= 0x010A00
218   riemann_event_set(event, RIEMANN_EVENT_FIELD_TIME_MICROS,
219                     (int64_t)CDTIME_T_TO_US(n->time));
220 #endif
221
222   if (n->host[0] != 0)
223     riemann_event_string_attribute_add(event, "host", n->host);
224   if (n->plugin[0] != 0)
225     riemann_event_string_attribute_add(event, "plugin", n->plugin);
226   if (n->plugin_instance[0] != 0)
227     riemann_event_string_attribute_add(event, "plugin_instance",
228                                        n->plugin_instance);
229
230   if (n->type[0] != 0)
231     riemann_event_string_attribute_add(event, "type", n->type);
232   if (n->type_instance[0] != 0)
233     riemann_event_string_attribute_add(event, "type_instance",
234                                        n->type_instance);
235
236   for (size_t i = 0; i < riemann_attrs_num; i += 2)
237     riemann_event_string_attribute_add(event, riemann_attrs[i],
238                                        riemann_attrs[i + 1]);
239
240   for (size_t i = 0; i < riemann_tags_num; i++)
241     riemann_event_tag_add(event, riemann_tags[i]);
242
243   if (n->message[0] != 0)
244     riemann_event_string_attribute_add(event, "description", n->message);
245
246   /* Pull in values from threshold and add extra attributes */
247   for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
248     if (strcasecmp("CurrentValue", meta->name) == 0 &&
249         meta->type == NM_TYPE_DOUBLE) {
250       riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
251                         (double)meta->nm_value.nm_double,
252                         RIEMANN_EVENT_FIELD_NONE);
253       continue;
254     }
255
256     if (meta->type == NM_TYPE_STRING) {
257       riemann_event_string_attribute_add(event, meta->name,
258                                          meta->nm_value.nm_string);
259       continue;
260     }
261   }
262
263   msg = riemann_message_create_with_events(event, NULL);
264   if (msg == NULL) {
265     ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
266     riemann_event_free(event);
267     return NULL;
268   }
269
270   DEBUG("write_riemann plugin: Successfully created message for notification: "
271         "host = \"%s\", service = \"%s\", state = \"%s\"",
272         event->host, event->service, event->state);
273   return msg;
274 } /* }}} riemann_message_t *wrr_notification_to_message */
275
276 static riemann_event_t *
277 wrr_value_to_event(struct riemann_host const *host, /* {{{ */
278                    data_set_t const *ds, value_list_t const *vl, size_t index,
279                    gauge_t const *rates, int status) {
280   riemann_event_t *event;
281   char name_buffer[5 * DATA_MAX_NAME_LEN];
282   char service_buffer[6 * DATA_MAX_NAME_LEN];
283   size_t i;
284
285   event = riemann_event_new();
286   if (event == NULL) {
287     ERROR("write_riemann plugin: riemann_event_new() failed.");
288     return NULL;
289   }
290
291   format_name(name_buffer, sizeof(name_buffer),
292               /* host = */ "", vl->plugin, vl->plugin_instance, vl->type,
293               vl->type_instance);
294   if (host->always_append_ds || (ds->ds_num > 1)) {
295     if (host->event_service_prefix == NULL)
296       ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
297                 &name_buffer[1], ds->ds[index].name);
298     else
299       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
300                 host->event_service_prefix, &name_buffer[1],
301                 ds->ds[index].name);
302   } else {
303     if (host->event_service_prefix == NULL)
304       sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
305     else
306       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
307                 host->event_service_prefix, &name_buffer[1]);
308   }
309
310   riemann_event_set(
311       event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME,
312       (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL,
313       (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
314       RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type",
315       vl->type, "ds_name", ds->ds[index].name, NULL,
316       RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
317
318 #if RCC_VERSION_NUMBER >= 0x010A00
319   riemann_event_set(event, RIEMANN_EVENT_FIELD_TIME_MICROS,
320                     (int64_t)CDTIME_T_TO_US(vl->time));
321 #endif
322
323   if (host->check_thresholds) {
324     const char *state = NULL;
325
326     switch (status) {
327     case STATE_OKAY:
328       state = "ok";
329       break;
330     case STATE_ERROR:
331       state = "critical";
332       break;
333     case STATE_WARNING:
334       state = "warning";
335       break;
336     case STATE_MISSING:
337       state = "unknown";
338       break;
339     }
340     if (state)
341       riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
342                         RIEMANN_EVENT_FIELD_NONE);
343   }
344
345   if (vl->plugin_instance[0] != 0)
346     riemann_event_string_attribute_add(event, "plugin_instance",
347                                        vl->plugin_instance);
348   if (vl->type_instance[0] != 0)
349     riemann_event_string_attribute_add(event, "type_instance",
350                                        vl->type_instance);
351
352   if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
353     char ds_type[DATA_MAX_NAME_LEN];
354
355     ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
356               DS_TYPE_TO_STRING(ds->ds[index].type));
357     riemann_event_string_attribute_add(event, "ds_type", ds_type);
358   } else {
359     riemann_event_string_attribute_add(event, "ds_type",
360                                        DS_TYPE_TO_STRING(ds->ds[index].type));
361   }
362
363   {
364     char ds_index[DATA_MAX_NAME_LEN];
365
366     ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
367     riemann_event_string_attribute_add(event, "ds_index", ds_index);
368   }
369
370   for (i = 0; i < riemann_attrs_num; i += 2)
371     riemann_event_string_attribute_add(event, riemann_attrs[i],
372                                        riemann_attrs[i + 1]);
373
374   for (i = 0; i < riemann_tags_num; i++)
375     riemann_event_tag_add(event, riemann_tags[i]);
376
377   if (ds->ds[index].type == DS_TYPE_GAUGE) {
378     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
379                       (double)vl->values[index].gauge,
380                       RIEMANN_EVENT_FIELD_NONE);
381   } else if (rates != NULL) {
382     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index],
383                       RIEMANN_EVENT_FIELD_NONE);
384   } else {
385     int64_t metric;
386
387     if (ds->ds[index].type == DS_TYPE_DERIVE)
388       metric = (int64_t)vl->values[index].derive;
389     else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
390       metric = (int64_t)vl->values[index].absolute;
391     else
392       metric = (int64_t)vl->values[index].counter;
393
394     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric,
395                       RIEMANN_EVENT_FIELD_NONE);
396   }
397
398   DEBUG("write_riemann plugin: Successfully created message for metric: "
399         "host = \"%s\", service = \"%s\"",
400         event->host, event->service);
401   return event;
402 } /* }}} riemann_event_t *wrr_value_to_event */
403
404 static riemann_message_t *
405 wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
406                           data_set_t const *ds, value_list_t const *vl,
407                           int *statuses) {
408   riemann_message_t *msg;
409   size_t i;
410   gauge_t *rates = NULL;
411
412   /* Initialize the Msg structure. */
413   msg = riemann_message_new();
414   if (msg == NULL) {
415     ERROR("write_riemann plugin: riemann_message_new failed.");
416     return NULL;
417   }
418
419   if (host->store_rates) {
420     rates = uc_get_rate(ds, vl);
421     if (rates == NULL) {
422       ERROR("write_riemann plugin: uc_get_rate failed.");
423       riemann_message_free(msg);
424       return NULL;
425     }
426   }
427
428   for (i = 0; i < vl->values_len; i++) {
429     riemann_event_t *event;
430
431     event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]);
432     if (event == NULL) {
433       riemann_message_free(msg);
434       sfree(rates);
435       return NULL;
436     }
437     riemann_message_append_events(msg, event, NULL);
438   }
439
440   sfree(rates);
441   return msg;
442 } /* }}} riemann_message_t *wrr_value_list_to_message */
443
444 /*
445  * Always call while holding host->lock !
446  */
447 static int wrr_batch_flush_nolock(cdtime_t timeout, struct riemann_host *host) {
448   cdtime_t now;
449   int status = 0;
450
451   now = cdtime();
452   if (timeout > 0) {
453     if ((host->batch_init + timeout) > now) {
454       return status;
455     }
456   }
457   wrr_send_nolock(host, host->batch_msg);
458   riemann_message_free(host->batch_msg);
459
460   host->batch_init = now;
461   host->batch_msg = NULL;
462   return status;
463 }
464
465 static int wrr_batch_flush(cdtime_t timeout,
466                            const char *identifier __attribute__((unused)),
467                            user_data_t *user_data) {
468   struct riemann_host *host;
469   int status;
470
471   if (user_data == NULL)
472     return -EINVAL;
473
474   host = user_data->data;
475   pthread_mutex_lock(&host->lock);
476   status = wrr_batch_flush_nolock(timeout, host);
477   if (status != 0)
478     c_complain(
479         LOG_ERR, &host->init_complaint,
480         "write_riemann plugin: riemann_client_send failed with status %i",
481         status);
482   else
483     c_release(LOG_DEBUG, &host->init_complaint,
484               "write_riemann plugin: batch sent.");
485
486   pthread_mutex_unlock(&host->lock);
487   return status;
488 }
489
490 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
491                                     data_set_t const *ds,
492                                     value_list_t const *vl, int *statuses) {
493   riemann_message_t *msg;
494   size_t len;
495   int ret;
496   cdtime_t timeout;
497
498   msg = wrr_value_list_to_message(host, ds, vl, statuses);
499   if (msg == NULL)
500     return -1;
501
502   pthread_mutex_lock(&host->lock);
503
504   if (host->batch_msg == NULL) {
505     host->batch_msg = msg;
506   } else {
507     int status;
508
509     status = riemann_message_append_events_n(host->batch_msg, msg->n_events,
510                                              msg->events);
511     msg->n_events = 0;
512     msg->events = NULL;
513
514     riemann_message_free(msg);
515
516     if (status != 0) {
517       pthread_mutex_unlock(&host->lock);
518       ERROR("write_riemann plugin: out of memory");
519       return -1;
520     }
521   }
522
523   len = riemann_message_get_packed_size(host->batch_msg);
524   ret = 0;
525   if ((host->batch_max < 0) || (((size_t)host->batch_max) <= len)) {
526     ret = wrr_batch_flush_nolock(0, host);
527   } else {
528     if (host->batch_timeout > 0) {
529       timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
530       ret = wrr_batch_flush_nolock(timeout, host);
531     }
532   }
533
534   pthread_mutex_unlock(&host->lock);
535   return ret;
536 } /* }}} riemann_message_t *wrr_batch_add_value_list */
537
538 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
539 {
540   int status;
541   struct riemann_host *host = ud->data;
542   riemann_message_t *msg;
543
544   if (!host->notifications)
545     return 0;
546
547   /*
548    * Never batch for notifications, send them ASAP
549    */
550   msg = wrr_notification_to_message(host, n);
551   if (msg == NULL)
552     return -1;
553
554   status = wrr_send(host, msg);
555   if (status != 0)
556     c_complain(
557         LOG_ERR, &host->init_complaint,
558         "write_riemann plugin: riemann_client_send failed with status %i",
559         status);
560   else
561     c_release(LOG_DEBUG, &host->init_complaint,
562               "write_riemann plugin: riemann_client_send succeeded");
563
564   riemann_message_free(msg);
565   return status;
566 } /* }}} int wrr_notification */
567
568 static int wrr_write(const data_set_t *ds, /* {{{ */
569                      const value_list_t *vl, user_data_t *ud) {
570   int status = 0;
571   int statuses[vl->values_len];
572   struct riemann_host *host = ud->data;
573   riemann_message_t *msg;
574
575   if (host->check_thresholds) {
576     status = write_riemann_threshold_check(ds, vl, statuses);
577     if (status != 0)
578       return status;
579   } else {
580     memset(statuses, 0, sizeof(statuses));
581   }
582
583   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
584     wrr_batch_add_value_list(host, ds, vl, statuses);
585   } else {
586     msg = wrr_value_list_to_message(host, ds, vl, statuses);
587     if (msg == NULL)
588       return -1;
589
590     status = wrr_send(host, msg);
591
592     riemann_message_free(msg);
593   }
594   return status;
595 } /* }}} int wrr_write */
596
597 static void wrr_free(void *p) /* {{{ */
598 {
599   struct riemann_host *host = p;
600
601   if (host == NULL)
602     return;
603
604   pthread_mutex_lock(&host->lock);
605
606   host->reference_count--;
607   if (host->reference_count > 0) {
608     pthread_mutex_unlock(&host->lock);
609     return;
610   }
611
612   wrr_disconnect(host);
613
614   pthread_mutex_destroy(&host->lock);
615   sfree(host);
616 } /* }}} void wrr_free */
617
618 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
619 {
620   struct riemann_host *host = NULL;
621   int status = 0;
622   int i;
623   oconfig_item_t *child;
624   char callback_name[DATA_MAX_NAME_LEN];
625
626   if ((host = calloc(1, sizeof(*host))) == NULL) {
627     ERROR("write_riemann plugin: calloc failed.");
628     return ENOMEM;
629   }
630   pthread_mutex_init(&host->lock, NULL);
631   C_COMPLAIN_INIT(&host->init_complaint);
632   host->reference_count = 1;
633   host->node = NULL;
634   host->port = 0;
635   host->notifications = 1;
636   host->check_thresholds = 0;
637   host->store_rates = 1;
638   host->always_append_ds = 0;
639   host->batch_mode = 1;
640   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
641   host->batch_init = cdtime();
642   host->batch_timeout = 0;
643   host->ttl_factor = RIEMANN_TTL_FACTOR;
644   host->client = NULL;
645   host->client_type = RIEMANN_CLIENT_TCP;
646   host->timeout.tv_sec = 0;
647   host->timeout.tv_usec = 0;
648
649   status = cf_util_get_string(ci, &host->name);
650   if (status != 0) {
651     WARNING("write_riemann plugin: Required host name is missing.");
652     wrr_free(host);
653     return -1;
654   }
655
656   for (i = 0; i < ci->children_num; i++) {
657     /*
658      * The code here could be simplified but makes room
659      * for easy adding of new options later on.
660      */
661     child = &ci->children[i];
662     status = 0;
663
664     if (strcasecmp("Host", child->key) == 0) {
665       status = cf_util_get_string(child, &host->node);
666       if (status != 0)
667         break;
668     } else if (strcasecmp("Notifications", child->key) == 0) {
669       status = cf_util_get_boolean(child, &host->notifications);
670       if (status != 0)
671         break;
672     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
673       status = cf_util_get_string(child, &host->event_service_prefix);
674       if (status != 0)
675         break;
676     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
677       status = cf_util_get_boolean(child, &host->check_thresholds);
678       if (status != 0)
679         break;
680     } else if (strcasecmp("Batch", child->key) == 0) {
681       status = cf_util_get_boolean(child, &host->batch_mode);
682       if (status != 0)
683         break;
684     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
685       status = cf_util_get_int(child, &host->batch_max);
686       if (status != 0)
687         break;
688     } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
689       status = cf_util_get_int(child, &host->batch_timeout);
690       if (status != 0)
691         break;
692     } else if (strcasecmp("Timeout", child->key) == 0) {
693 #if RCC_VERSION_NUMBER >= 0x010800
694       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
695       if (status != 0)
696         break;
697 #else
698       WARNING("write_riemann plugin: The Timeout option is not supported. "
699               "Please upgrade the Riemann client to at least 1.8.0.");
700 #endif
701     } else if (strcasecmp("Port", child->key) == 0) {
702       host->port = cf_util_get_port_number(child);
703       if (host->port == -1) {
704         ERROR("write_riemann plugin: Invalid argument "
705               "configured for the \"Port\" "
706               "option.");
707         break;
708       }
709     } else if (strcasecmp("Protocol", child->key) == 0) {
710       char tmp[16];
711       status = cf_util_get_string_buffer(child, tmp, sizeof(tmp));
712       if (status != 0) {
713         ERROR("write_riemann plugin: cf_util_get_"
714               "string_buffer failed with "
715               "status %i.",
716               status);
717         break;
718       }
719
720       if (strcasecmp("UDP", tmp) == 0)
721         host->client_type = RIEMANN_CLIENT_UDP;
722       else if (strcasecmp("TCP", tmp) == 0)
723         host->client_type = RIEMANN_CLIENT_TCP;
724       else if (strcasecmp("TLS", tmp) == 0)
725         host->client_type = RIEMANN_CLIENT_TLS;
726       else
727         WARNING("write_riemann plugin: The value "
728                 "\"%s\" is not valid for the "
729                 "\"Protocol\" option. Use "
730                 "either \"UDP\", \"TCP\" or \"TLS\".",
731                 tmp);
732     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
733       status = cf_util_get_string(child, &host->tls_ca_file);
734       if (status != 0) {
735         ERROR("write_riemann plugin: cf_util_get_"
736               "string_buffer failed with "
737               "status %i.",
738               status);
739         break;
740       }
741     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
742       status = cf_util_get_string(child, &host->tls_cert_file);
743       if (status != 0) {
744         ERROR("write_riemann plugin: cf_util_get_"
745               "string_buffer failed with "
746               "status %i.",
747               status);
748         break;
749       }
750     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
751       status = cf_util_get_string(child, &host->tls_key_file);
752       if (status != 0) {
753         ERROR("write_riemann plugin: cf_util_get_"
754               "string_buffer failed with "
755               "status %i.",
756               status);
757         break;
758       }
759     } else if (strcasecmp("StoreRates", child->key) == 0) {
760       status = cf_util_get_boolean(child, &host->store_rates);
761       if (status != 0)
762         break;
763     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
764       status = cf_util_get_boolean(child, &host->always_append_ds);
765       if (status != 0)
766         break;
767     } else if (strcasecmp("TTLFactor", child->key) == 0) {
768       double tmp = NAN;
769       status = cf_util_get_double(child, &tmp);
770       if (status != 0)
771         break;
772       if (tmp >= 2.0) {
773         host->ttl_factor = tmp;
774       } else if (tmp >= 1.0) {
775         NOTICE("write_riemann plugin: The configured "
776                "TTLFactor is very small "
777                "(%.1f). A value of 2.0 or "
778                "greater is recommended.",
779                tmp);
780         host->ttl_factor = tmp;
781       } else if (tmp > 0.0) {
782         WARNING("write_riemann plugin: The configured "
783                 "TTLFactor is too small to be "
784                 "useful (%.1f). I'll use it "
785                 "since the user knows best, "
786                 "but under protest.",
787                 tmp);
788         host->ttl_factor = tmp;
789       } else { /* zero, negative and NAN */
790         ERROR("write_riemann plugin: The configured "
791               "TTLFactor is invalid (%.1f).",
792               tmp);
793       }
794     } else {
795       WARNING("write_riemann plugin: ignoring unknown config "
796               "option: \"%s\"",
797               child->key);
798     }
799   }
800   if (status != 0) {
801     wrr_free(host);
802     return status;
803   }
804
805   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
806             host->name);
807
808   user_data_t ud = {.data = host, .free_func = wrr_free};
809
810   pthread_mutex_lock(&host->lock);
811
812   status = plugin_register_write(callback_name, wrr_write, &ud);
813
814   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
815     ud.free_func = NULL;
816     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
817   }
818   if (status != 0)
819     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
820             "failed with status %i.",
821             callback_name, status);
822   else /* success */
823     host->reference_count++;
824
825   status = plugin_register_notification(callback_name, wrr_notification, &ud);
826   if (status != 0)
827     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
828             "failed with status %i.",
829             callback_name, status);
830   else /* success */
831     host->reference_count++;
832
833   if (host->reference_count <= 1) {
834     /* Both callbacks failed => free memory.
835      * We need to unlock here, because riemann_free() will lock.
836      * This is not a race condition, because we're the only one
837      * holding a reference. */
838     pthread_mutex_unlock(&host->lock);
839     wrr_free(host);
840     return -1;
841   }
842
843   host->reference_count--;
844   pthread_mutex_unlock(&host->lock);
845
846   return status;
847 } /* }}} int wrr_config_node */
848
849 static int wrr_config(oconfig_item_t *ci) /* {{{ */
850 {
851   int i;
852   oconfig_item_t *child;
853   int status;
854
855   for (i = 0; i < ci->children_num; i++) {
856     child = &ci->children[i];
857
858     if (strcasecmp("Node", child->key) == 0) {
859       wrr_config_node(child);
860     } else if (strcasecmp(child->key, "attribute") == 0) {
861       char *key = NULL;
862       char *val = NULL;
863
864       if (child->values_num != 2) {
865         WARNING("riemann attributes need both a key and a value.");
866         return -1;
867       }
868       if (child->values[0].type != OCONFIG_TYPE_STRING ||
869           child->values[1].type != OCONFIG_TYPE_STRING) {
870         WARNING("riemann attribute needs string arguments.");
871         return -1;
872       }
873       if ((key = strdup(child->values[0].value.string)) == NULL) {
874         WARNING("cannot allocate memory for attribute key.");
875         return -1;
876       }
877       if ((val = strdup(child->values[1].value.string)) == NULL) {
878         WARNING("cannot allocate memory for attribute value.");
879         sfree(key);
880         return -1;
881       }
882       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
883       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
884       DEBUG("write_riemann: got attr: %s => %s", key, val);
885       sfree(key);
886       sfree(val);
887     } else if (strcasecmp(child->key, "tag") == 0) {
888       char *tmp = NULL;
889       status = cf_util_get_string(child, &tmp);
890       if (status != 0)
891         continue;
892
893       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
894       DEBUG("write_riemann plugin: Got tag: %s", tmp);
895       sfree(tmp);
896     } else {
897       WARNING("write_riemann plugin: Ignoring unknown "
898               "configuration option \"%s\" at top level.",
899               child->key);
900     }
901   }
902   return 0;
903 } /* }}} int wrr_config */
904
905 void module_register(void) {
906   plugin_register_complex_config("write_riemann", wrr_config);
907 }