63e89186cb3eb45a7a741304d0745851b0886317
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "plugin.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "write_riemann_threshold.h"
41
42 #define RIEMANN_HOST            "localhost"
43 #define RIEMANN_PORT            5555
44 #define RIEMANN_TTL_FACTOR      2.0
45 #define RIEMANN_BATCH_MAX      8192
46
47 struct riemann_host {
48         char                    *name;
49         char                    *event_service_prefix;
50         pthread_mutex_t  lock;
51     _Bool            batch_mode;
52         _Bool            notifications;
53         _Bool            check_thresholds;
54         _Bool                    store_rates;
55         _Bool                    always_append_ds;
56         char                    *node;
57         int                      port;
58         riemann_client_type_t    client_type;
59         riemann_client_t        *client;
60         double                   ttl_factor;
61     cdtime_t         batch_init;
62     int              batch_max;
63         int                          reference_count;
64   riemann_message_t     *batch_msg;
65         char                     *tls_ca_file;
66         char                     *tls_cert_file;
67         char                     *tls_key_file;
68         struct timeval timeout;
69 };
70
71 static char     **riemann_tags;
72 static size_t     riemann_tags_num;
73 static char     **riemann_attrs;
74 static size_t     riemann_attrs_num;
75
76 /* host->lock must be held when calling this function. */
77 static int wrr_connect(struct riemann_host *host) /* {{{ */
78 {
79         char const              *node;
80         int                      port;
81
82         if (host->client)
83                 return 0;
84
85         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
86         port = (host->port) ? host->port : RIEMANN_PORT;
87
88         host->client = NULL;
89
90         host->client = riemann_client_create(host->client_type, node, port,
91                                              RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
92                                              RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
93                                              RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
94                                              RIEMANN_CLIENT_OPTION_NONE);
95         if (host->client == NULL) {
96                 WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
97                         node, port);
98                 return -1;
99         }
100         if (host->timeout.tv_sec != 0) {
101                 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
102                         riemann_client_free(host->client);
103                         host->client = NULL;
104                         WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
105                                 node, port);
106                         return -1;
107                 }
108         }
109         DEBUG("write_riemann plugin: got a successful connection for: %s:%d",
110               node, port);
111
112         return 0;
113 } /* }}} int wrr_connect */
114
115 /* host->lock must be held when calling this function. */
116 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
117 {
118         if (!host->client)
119                 return (0);
120
121         riemann_client_free(host->client);
122         host->client = NULL;
123
124         return (0);
125 } /* }}} int wrr_disconnect */
126
127 /**
128  * Function to send messages to riemann.
129  *
130  * Acquires the host lock, disconnects on errors.
131  */
132 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
133 {
134         int status = 0;
135         pthread_mutex_lock (&host->lock);
136
137         status = wrr_connect(host);
138         if (status != 0) {
139         pthread_mutex_unlock(&host->lock);
140                 return status;
141     }
142
143         status = riemann_client_send_message(host->client, msg);
144         if (status != 0) {
145                 wrr_disconnect(host);
146                 pthread_mutex_unlock(&host->lock);
147                 return status;
148         }
149
150         /*
151          * For TCP we need to receive message acknowledgemenent.
152          */
153         if (host->client_type != RIEMANN_CLIENT_UDP)
154         {
155                 riemann_message_t *response;
156
157                 response = riemann_client_recv_message(host->client);
158
159                 if (response == NULL)
160                 {
161                         wrr_disconnect(host);
162                         pthread_mutex_unlock(&host->lock);
163                         return errno;
164                 }
165                 riemann_message_free(response);
166         }
167
168         pthread_mutex_unlock (&host->lock);
169         return 0;
170 } /* }}} int wrr_send */
171
172 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
173                 notification_t const *n)
174 {
175         riemann_message_t *msg;
176         riemann_event_t *event;
177         char service_buffer[6 * DATA_MAX_NAME_LEN];
178         char const *severity;
179         notification_meta_t *meta;
180         size_t i;
181
182         switch (n->severity)
183         {
184                 case NOTIF_OKAY:        severity = "ok"; break;
185                 case NOTIF_WARNING:     severity = "warning"; break;
186                 case NOTIF_FAILURE:     severity = "critical"; break;
187                 default:                severity = "unknown";
188         }
189
190         format_name(service_buffer, sizeof(service_buffer),
191                     /* host = */ "", n->plugin, n->plugin_instance,
192                     n->type, n->type_instance);
193
194         event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
195                                      RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
196                                      RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
197                                      RIEMANN_EVENT_FIELD_STATE, severity,
198                                      RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
199                                      RIEMANN_EVENT_FIELD_NONE);
200
201         if (n->host[0] != 0)
202                 riemann_event_string_attribute_add(event, "host", n->host);
203         if (n->plugin[0] != 0)
204                 riemann_event_string_attribute_add(event, "plugin", n->plugin);
205         if (n->plugin_instance[0] != 0)
206                 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
207
208         if (n->type[0] != 0)
209                 riemann_event_string_attribute_add(event, "type", n->type);
210         if (n->type_instance[0] != 0)
211                 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
212
213         for (i = 0; i < riemann_attrs_num; i += 2)
214                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
215
216         for (i = 0; i < riemann_tags_num; i++)
217                 riemann_event_tag_add(event, riemann_tags[i]);
218
219         if (n->message[0] != 0)
220                 riemann_event_string_attribute_add(event, "description", n->message);
221
222         /* Pull in values from threshold and add extra attributes */
223         for (meta = n->meta; meta != NULL; meta = meta->next)
224         {
225                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
226                 {
227                         riemann_event_set(event,
228                                           RIEMANN_EVENT_FIELD_METRIC_D,
229                                           (double) meta->nm_value.nm_double,
230                                           RIEMANN_EVENT_FIELD_NONE);
231                         continue;
232                 }
233
234                 if (meta->type == NM_TYPE_STRING) {
235                         riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
236                         continue;
237                 }
238         }
239
240         msg = riemann_message_create_with_events(event, NULL);
241         if (msg == NULL)
242         {
243                 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
244                 riemann_event_free (event);
245                 return (NULL);
246         }
247
248         DEBUG("write_riemann plugin: Successfully created message for notification: "
249               "host = \"%s\", service = \"%s\", state = \"%s\"",
250               event->host, event->service, event->state);
251         return (msg);
252 } /* }}} riemann_message_t *wrr_notification_to_message */
253
254 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
255                                            data_set_t const *ds,
256                                            value_list_t const *vl, size_t index,
257                                            gauge_t const *rates,
258                                            int status)
259 {
260         riemann_event_t *event;
261         char name_buffer[5 * DATA_MAX_NAME_LEN];
262         char service_buffer[6 * DATA_MAX_NAME_LEN];
263         size_t i;
264
265         event = riemann_event_new();
266         if (event == NULL)
267         {
268                 ERROR("write_riemann plugin: riemann_event_new() failed.");
269                 return (NULL);
270         }
271
272         format_name(name_buffer, sizeof(name_buffer),
273                     /* host = */ "", vl->plugin, vl->plugin_instance,
274                     vl->type, vl->type_instance);
275         if (host->always_append_ds || (ds->ds_num > 1))
276         {
277                 if (host->event_service_prefix == NULL)
278                         ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
279                                   &name_buffer[1], ds->ds[index].name);
280                 else
281                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
282                                   host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
283         }
284         else
285         {
286                 if (host->event_service_prefix == NULL)
287                         sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
288                 else
289                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
290                                   host->event_service_prefix, &name_buffer[1]);
291         }
292
293         riemann_event_set(event,
294                           RIEMANN_EVENT_FIELD_HOST, vl->host,
295                           RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
296                           RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
297                           RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
298                           "plugin", vl->plugin,
299                           "type", vl->type,
300                           "ds_name", ds->ds[index].name,
301                           NULL,
302                           RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
303                           RIEMANN_EVENT_FIELD_NONE);
304
305         if (host->check_thresholds) {
306                 const char *state = NULL;
307
308                 switch (status) {
309                         case STATE_OKAY:
310                                 state = "ok";
311                                 break;
312                         case STATE_ERROR:
313                                 state = "critical";
314                                 break;
315                         case STATE_WARNING:
316                                 state = "warning";
317                                 break;
318                         case STATE_MISSING:
319                                 state = "unknown";
320                                 break;
321                 }
322                 if (state)
323                         riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
324                                           RIEMANN_EVENT_FIELD_NONE);
325         }
326
327         if (vl->plugin_instance[0] != 0)
328                 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
329         if (vl->type_instance[0] != 0)
330                 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
331
332         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
333         {
334                 char ds_type[DATA_MAX_NAME_LEN];
335
336                 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
337                           DS_TYPE_TO_STRING(ds->ds[index].type));
338                 riemann_event_string_attribute_add(event, "ds_type", ds_type);
339         }
340         else
341         {
342                 riemann_event_string_attribute_add(event, "ds_type",
343                                        DS_TYPE_TO_STRING(ds->ds[index].type));
344         }
345
346         {
347                 char ds_index[DATA_MAX_NAME_LEN];
348
349                 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
350                 riemann_event_string_attribute_add(event, "ds_index", ds_index);
351         }
352
353         for (i = 0; i < riemann_attrs_num; i += 2)
354                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
355
356         for (i = 0; i < riemann_tags_num; i++)
357                 riemann_event_tag_add(event, riemann_tags[i]);
358
359         if (ds->ds[index].type == DS_TYPE_GAUGE)
360         {
361                 riemann_event_set(event,
362                                   RIEMANN_EVENT_FIELD_METRIC_D,
363                                   (double) vl->values[index].gauge,
364                                   RIEMANN_EVENT_FIELD_NONE);
365         }
366         else if (rates != NULL)
367         {
368                 riemann_event_set(event,
369                                   RIEMANN_EVENT_FIELD_METRIC_D,
370                                   (double) rates[index],
371                                   RIEMANN_EVENT_FIELD_NONE);
372         }
373         else
374         {
375                 int64_t metric;
376
377                 if (ds->ds[index].type == DS_TYPE_DERIVE)
378                         metric = (int64_t) vl->values[index].derive;
379                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
380                         metric = (int64_t) vl->values[index].absolute;
381                 else
382                         metric = (int64_t) vl->values[index].counter;
383
384                 riemann_event_set(event,
385                                   RIEMANN_EVENT_FIELD_METRIC_S64,
386                                   (int64_t) metric,
387                                   RIEMANN_EVENT_FIELD_NONE);
388         }
389
390         DEBUG("write_riemann plugin: Successfully created message for metric: "
391               "host = \"%s\", service = \"%s\"",
392               event->host, event->service);
393         return (event);
394 } /* }}} riemann_event_t *wrr_value_to_event */
395
396 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
397                                                     data_set_t const *ds,
398                                                     value_list_t const *vl,
399                                                     int *statuses)
400 {
401         riemann_message_t *msg;
402         size_t i;
403         gauge_t *rates = NULL;
404
405         /* Initialize the Msg structure. */
406         msg = riemann_message_new();
407         if (msg == NULL)
408         {
409                 ERROR ("write_riemann plugin: riemann_message_new failed.");
410                 return (NULL);
411         }
412
413         if (host->store_rates)
414         {
415                 rates = uc_get_rate(ds, vl);
416                 if (rates == NULL)
417                 {
418                         ERROR("write_riemann plugin: uc_get_rate failed.");
419                         riemann_message_free(msg);
420                         return (NULL);
421                 }
422         }
423
424         for (i = 0; i < vl->values_len; i++)
425         {
426                 riemann_event_t *event;
427
428                 event = wrr_value_to_event(host, ds, vl,
429                                            (int) i, rates, statuses[i]);
430                 if (event == NULL)
431                 {
432                         riemann_message_free(msg);
433                         sfree(rates);
434                         return (NULL);
435                 }
436                 riemann_message_append_events(msg, event, NULL);
437         }
438
439         sfree(rates);
440         return (msg);
441 } /* }}} riemann_message_t *wrr_value_list_to_message */
442
443 /*
444  * Always call while holding host->lock !
445  */
446 static int wrr_batch_flush_nolock(cdtime_t timeout,
447                                   struct riemann_host *host)
448 {
449         cdtime_t    now;
450         int         status = 0;
451
452         if (timeout > 0) {
453                 now = cdtime();
454                 if ((host->batch_init + timeout) > now)
455                         return status;
456         }
457         wrr_send(host, host->batch_msg);
458         riemann_message_free(host->batch_msg);
459
460         if (host->client_type != RIEMANN_CLIENT_UDP)
461         {
462                 riemann_message_t *response;
463
464                 response = riemann_client_recv_message(host->client);
465
466                 if (!response)
467                 {
468                         wrr_disconnect(host);
469                         return errno;
470                 }
471
472                 riemann_message_free(response);
473         }
474
475         host->batch_init = cdtime();
476         host->batch_msg = NULL;
477         return status;
478 }
479
480 static int wrr_batch_flush(cdtime_t timeout,
481         const char *identifier __attribute__((unused)),
482         user_data_t *user_data)
483 {
484         struct riemann_host *host;
485         int status;
486
487         if (user_data == NULL)
488                 return (-EINVAL);
489
490         host = user_data->data;
491         pthread_mutex_lock(&host->lock);
492         status = wrr_batch_flush_nolock(timeout, host);
493         if (status != 0)
494                 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
495                       status);
496
497         pthread_mutex_unlock(&host->lock);
498         return status;
499 }
500
501 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
502                                     data_set_t const *ds,
503                                     value_list_t const *vl,
504                                     int *statuses)
505 {
506         riemann_message_t *msg;
507         size_t len;
508         int ret;
509
510         msg = wrr_value_list_to_message(host, ds, vl, statuses);
511         if (msg == NULL)
512                 return -1;
513
514         pthread_mutex_lock(&host->lock);
515
516         if (host->batch_msg == NULL) {
517                 host->batch_msg = msg;
518         } else {
519                 int status;
520
521                 status = riemann_message_append_events_n(host->batch_msg,
522                                                          msg->n_events,
523                                                          msg->events);
524                 msg->n_events = 0;
525                 msg->events = NULL;
526
527                 riemann_message_free(msg);
528
529                 if (status != 0) {
530                         pthread_mutex_unlock(&host->lock);
531                         ERROR("write_riemann plugin: out of memory");
532                         return -1;
533                 }
534         }
535
536         len = riemann_message_get_packed_size(host->batch_msg);
537         ret = 0;
538         if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
539                 ret = wrr_batch_flush_nolock(0, host);
540         }
541
542         pthread_mutex_unlock(&host->lock);
543         return ret;
544 } /* }}} riemann_message_t *wrr_batch_add_value_list */
545
546 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
547 {
548         int                      status;
549         struct riemann_host     *host = ud->data;
550         riemann_message_t       *msg;
551
552         if (!host->notifications)
553                 return 0;
554
555     /*
556      * Never batch for notifications, send them ASAP
557      */
558         msg = wrr_notification_to_message(host, n);
559         if (msg == NULL)
560                 return (-1);
561
562         status = wrr_send(host, msg);
563         if (status != 0)
564                 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
565                       status);
566
567         riemann_message_free(msg);
568         return (status);
569 } /* }}} int wrr_notification */
570
571 static int wrr_write(const data_set_t *ds, /* {{{ */
572               const value_list_t *vl,
573               user_data_t *ud)
574 {
575         int                      status = 0;
576         int                      statuses[vl->values_len];
577         struct riemann_host     *host = ud->data;
578         riemann_message_t       *msg;
579
580         if (host->check_thresholds) {
581                 status = write_riemann_threshold_check(ds, vl, statuses);
582     if (status != 0)
583       return status;
584   } else {
585     memset (statuses, 0, sizeof (statuses));
586   }
587
588   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
589     wrr_batch_add_value_list(host, ds, vl, statuses);
590   } else {
591     msg = wrr_value_list_to_message(host, ds, vl, statuses);
592     if (msg == NULL)
593       return (-1);
594
595     status = wrr_send(host, msg);
596     if (status != 0)
597       ERROR("write_riemann plugin: riemann_client_send failed with status %i",
598             status);
599
600     riemann_message_free(msg);
601   }
602   return status;
603 } /* }}} int wrr_write */
604
605 static void wrr_free(void *p) /* {{{ */
606 {
607   struct riemann_host   *host = p;
608
609   if (host == NULL)
610     return;
611
612   pthread_mutex_lock(&host->lock);
613
614   host->reference_count--;
615   if (host->reference_count > 0)
616     {
617       pthread_mutex_unlock(&host->lock);
618       return;
619     }
620
621   wrr_disconnect(host);
622
623   pthread_mutex_destroy(&host->lock);
624   sfree(host);
625 } /* }}} void wrr_free */
626
627 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
628 {
629   struct riemann_host   *host = NULL;
630   int                    status = 0;
631   int                    i;
632   oconfig_item_t                *child;
633   char                   callback_name[DATA_MAX_NAME_LEN];
634   user_data_t            ud;
635
636   if ((host = calloc(1, sizeof(*host))) == NULL) {
637     ERROR ("write_riemann plugin: calloc failed.");
638     return ENOMEM;
639   }
640   pthread_mutex_init(&host->lock, NULL);
641   host->reference_count = 1;
642   host->node = NULL;
643   host->port = 0;
644   host->notifications = 1;
645   host->check_thresholds = 0;
646   host->store_rates = 1;
647   host->always_append_ds = 0;
648   host->batch_mode = 1;
649   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
650   host->batch_init = cdtime();
651   host->ttl_factor = RIEMANN_TTL_FACTOR;
652   host->client = NULL;
653   host->client_type = RIEMANN_CLIENT_TCP;
654   host->timeout.tv_sec = 0;
655   host->timeout.tv_usec = 0;
656
657   status = cf_util_get_string(ci, &host->name);
658   if (status != 0) {
659     WARNING("write_riemann plugin: Required host name is missing.");
660     wrr_free(host);
661     return -1;
662   }
663
664   for (i = 0; i < ci->children_num; i++) {
665     /*
666      * The code here could be simplified but makes room
667      * for easy adding of new options later on.
668      */
669     child = &ci->children[i];
670     status = 0;
671
672     if (strcasecmp("Host", child->key) == 0) {
673       status = cf_util_get_string(child, &host->node);
674       if (status != 0)
675         break;
676     } else if (strcasecmp("Notifications", child->key) == 0) {
677       status = cf_util_get_boolean(child, &host->notifications);
678       if (status != 0)
679         break;
680     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
681       status = cf_util_get_string(child, &host->event_service_prefix);
682       if (status != 0)
683         break;
684     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
685       status = cf_util_get_boolean(child, &host->check_thresholds);
686       if (status != 0)
687         break;
688     } else if (strcasecmp("Batch", child->key) == 0) {
689       status = cf_util_get_boolean(child, &host->batch_mode);
690       if (status != 0)
691         break;
692     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
693       status = cf_util_get_int(child, &host->batch_max);
694       if (status != 0)
695         break;
696     } else if (strcasecmp("Timeout", child->key) == 0) {
697       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
698       if (status != 0)
699         break;
700     } else if (strcasecmp("Port", child->key) == 0) {
701       host->port = cf_util_get_port_number(child);
702       if (host->port == -1) {
703         ERROR("write_riemann plugin: Invalid argument "
704               "configured for the \"Port\" "
705               "option.");
706         break;
707       }
708     } else if (strcasecmp("Protocol", child->key) == 0) {
709       char tmp[16];
710       status = cf_util_get_string_buffer(child,
711                                          tmp, sizeof(tmp));
712       if (status != 0)
713         {
714           ERROR("write_riemann plugin: cf_util_get_"
715                 "string_buffer failed with "
716                 "status %i.", status);
717           break;
718         }
719
720       if (strcasecmp("UDP", tmp) == 0)
721         host->client_type = RIEMANN_CLIENT_UDP;
722       else if (strcasecmp("TCP", tmp) == 0)
723         host->client_type = RIEMANN_CLIENT_TCP;
724       else if (strcasecmp("TLS", tmp) == 0)
725         host->client_type = RIEMANN_CLIENT_TLS;
726       else
727         WARNING("write_riemann plugin: The value "
728                 "\"%s\" is not valid for the "
729                 "\"Protocol\" option. Use "
730                 "either \"UDP\", \"TCP\" or \"TLS\".",
731                 tmp);
732     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
733       status = cf_util_get_string(child, &host->tls_ca_file);
734       if (status != 0)
735         {
736           ERROR("write_riemann plugin: cf_util_get_"
737                 "string_buffer failed with "
738                 "status %i.", status);
739           break;
740         }
741     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
742       status = cf_util_get_string(child, &host->tls_cert_file);
743       if (status != 0)
744         {
745           ERROR("write_riemann plugin: cf_util_get_"
746                 "string_buffer failed with "
747                 "status %i.", status);
748           break;
749         }
750     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
751       status = cf_util_get_string(child, &host->tls_key_file);
752       if (status != 0)
753         {
754           ERROR("write_riemann plugin: cf_util_get_"
755                 "string_buffer failed with "
756                 "status %i.", status);
757           break;
758         }
759     } else if (strcasecmp("StoreRates", child->key) == 0) {
760       status = cf_util_get_boolean(child, &host->store_rates);
761       if (status != 0)
762         break;
763     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
764       status = cf_util_get_boolean(child,
765                                    &host->always_append_ds);
766       if (status != 0)
767         break;
768     } else if (strcasecmp("TTLFactor", child->key) == 0) {
769       double tmp = NAN;
770       status = cf_util_get_double(child, &tmp);
771       if (status != 0)
772         break;
773       if (tmp >= 2.0) {
774         host->ttl_factor = tmp;
775       } else if (tmp >= 1.0) {
776         NOTICE("write_riemann plugin: The configured "
777                "TTLFactor is very small "
778                "(%.1f). A value of 2.0 or "
779                "greater is recommended.",
780                tmp);
781         host->ttl_factor = tmp;
782       } else if (tmp > 0.0) {
783         WARNING("write_riemann plugin: The configured "
784                 "TTLFactor is too small to be "
785                 "useful (%.1f). I'll use it "
786                 "since the user knows best, "
787                 "but under protest.",
788                 tmp);
789         host->ttl_factor = tmp;
790       } else { /* zero, negative and NAN */
791         ERROR("write_riemann plugin: The configured "
792               "TTLFactor is invalid (%.1f).",
793               tmp);
794       }
795     } else {
796       WARNING("write_riemann plugin: ignoring unknown config "
797               "option: \"%s\"", child->key);
798     }
799   }
800   if (status != 0) {
801     wrr_free(host);
802     return status;
803   }
804
805   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
806             host->name);
807   ud.data = host;
808   ud.free_func = wrr_free;
809
810   pthread_mutex_lock(&host->lock);
811
812   status = plugin_register_write(callback_name, wrr_write, &ud);
813
814   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
815     ud.free_func = NULL;
816     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
817   }
818   if (status != 0)
819     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
820             "failed with status %i.",
821             callback_name, status);
822   else /* success */
823     host->reference_count++;
824
825   status = plugin_register_notification(callback_name,
826                                         wrr_notification, &ud);
827   if (status != 0)
828     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
829             "failed with status %i.",
830             callback_name, status);
831   else /* success */
832     host->reference_count++;
833
834   if (host->reference_count <= 1)
835     {
836       /* Both callbacks failed => free memory.
837        * We need to unlock here, because riemann_free() will lock.
838        * This is not a race condition, because we're the only one
839        * holding a reference. */
840       pthread_mutex_unlock(&host->lock);
841       wrr_free(host);
842       return (-1);
843     }
844
845   host->reference_count--;
846   pthread_mutex_unlock(&host->lock);
847
848   return status;
849 } /* }}} int wrr_config_node */
850
851 static int wrr_config(oconfig_item_t *ci) /* {{{ */
852 {
853   int            i;
854   oconfig_item_t        *child;
855   int            status;
856
857   for (i = 0; i < ci->children_num; i++)  {
858     child = &ci->children[i];
859
860     if (strcasecmp("Node", child->key) == 0) {
861       wrr_config_node (child);
862     } else if (strcasecmp(child->key, "attribute") == 0) {
863       char *key = NULL;
864       char *val = NULL;
865
866       if (child->values_num != 2) {
867         WARNING("riemann attributes need both a key and a value.");
868         return (-1);
869       }
870       if (child->values[0].type != OCONFIG_TYPE_STRING ||
871           child->values[1].type != OCONFIG_TYPE_STRING) {
872         WARNING("riemann attribute needs string arguments.");
873         return (-1);
874       }
875       if ((key = strdup(child->values[0].value.string)) == NULL) {
876         WARNING("cannot allocate memory for attribute key.");
877         return (-1);
878       }
879       if ((val = strdup(child->values[1].value.string)) == NULL) {
880         WARNING("cannot allocate memory for attribute value.");
881         sfree(key);
882         return (-1);
883       }
884       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
885       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
886       DEBUG("write_riemann: got attr: %s => %s", key, val);
887       sfree(key);
888       sfree(val);
889     } else if (strcasecmp(child->key, "tag") == 0) {
890       char *tmp = NULL;
891       status = cf_util_get_string(child, &tmp);
892       if (status != 0)
893         continue;
894
895       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
896       DEBUG("write_riemann plugin: Got tag: %s", tmp);
897       sfree(tmp);
898     } else {
899       WARNING("write_riemann plugin: Ignoring unknown "
900               "configuration option \"%s\" at top level.",
901               child->key);
902     }
903   }
904   return (0);
905 } /* }}} int wrr_config */
906
907 void module_register(void)
908 {
909   plugin_register_complex_config("write_riemann", wrr_config);
910 }
911
912 /* vim: set sw=8 sts=8 ts=8 noet : */