write_riemann: reorder host struct
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "plugin.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "utils_complain.h"
41 #include "write_riemann_threshold.h"
42
43 #define RIEMANN_HOST            "localhost"
44 #define RIEMANN_PORT            5555
45 #define RIEMANN_TTL_FACTOR      2.0
46 #define RIEMANN_BATCH_MAX      8192
47
48 struct riemann_host {
49     c_complain_t init_complaint;
50         char                    *name;
51         char                    *event_service_prefix;
52         pthread_mutex_t  lock;
53     _Bool            batch_mode;
54         _Bool            notifications;
55         _Bool            check_thresholds;
56         _Bool                    store_rates;
57         _Bool                    always_append_ds;
58         char                    *node;
59         int                      port;
60         riemann_client_type_t    client_type;
61         riemann_client_t        *client;
62         double                   ttl_factor;
63     cdtime_t         batch_init;
64     int              batch_max;
65         int                          reference_count;
66   riemann_message_t     *batch_msg;
67         char                     *tls_ca_file;
68         char                     *tls_cert_file;
69         char                     *tls_key_file;
70         struct timeval timeout;
71 };
72
/* Tags added to every event; filled from the top-level "Tag" options. */
static char **riemann_tags = NULL;
static size_t riemann_tags_num = 0;

/* Attributes added to every event, stored flat as key, value, key, value. */
static char **riemann_attrs = NULL;
static size_t riemann_attrs_num = 0;
77
78 /* host->lock must be held when calling this function. */
79 static int wrr_connect(struct riemann_host *host) /* {{{ */
80 {
81         char const              *node;
82         int                      port;
83
84         if (host->client)
85                 return 0;
86
87         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
88         port = (host->port) ? host->port : RIEMANN_PORT;
89
90         host->client = NULL;
91
92         host->client = riemann_client_create(host->client_type, node, port,
93                                              RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
94                                              RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
95                                              RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
96                                              RIEMANN_CLIENT_OPTION_NONE);
97         if (host->client == NULL) {
98         c_complain (LOG_ERR, &host->init_complaint,
99                     "write_riemann plugin: Unable to connect to Riemann at %s:%d",
100                     node, port);
101                 return -1;
102         }
103         if (host->timeout.tv_sec != 0) {
104                 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
105                         riemann_client_free(host->client);
106                         host->client = NULL;
107             c_complain (LOG_ERR, &host->init_complaint,
108                         "write_riemann plugin: Unable to connect to Riemann at %s:%d",
109                         node, port);
110                         return -1;
111                 }
112         }
113
114     c_release (LOG_INFO, &host->init_complaint,
115                "write_riemann plugin: Successfully connected to %s:%d",
116                node, port);
117
118         return 0;
119 } /* }}} int wrr_connect */
120
121 /* host->lock must be held when calling this function. */
122 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
123 {
124         if (!host->client)
125                 return (0);
126
127         riemann_client_free(host->client);
128         host->client = NULL;
129
130         return (0);
131 } /* }}} int wrr_disconnect */
132
133 /**
134  * Function to send messages to riemann.
135  *
136  * Acquires the host lock, disconnects on errors.
137  */
138 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
139 {
140         int status = 0;
141         pthread_mutex_lock (&host->lock);
142
143         status = wrr_connect(host);
144         if (status != 0) {
145         pthread_mutex_unlock(&host->lock);
146                 return status;
147     }
148
149         status = riemann_client_send_message(host->client, msg);
150         if (status != 0) {
151                 wrr_disconnect(host);
152                 pthread_mutex_unlock(&host->lock);
153                 return status;
154         }
155
156         /*
157          * For TCP we need to receive message acknowledgemenent.
158          */
159         if (host->client_type != RIEMANN_CLIENT_UDP)
160         {
161                 riemann_message_t *response;
162
163                 response = riemann_client_recv_message(host->client);
164
165                 if (response == NULL)
166                 {
167                         wrr_disconnect(host);
168                         pthread_mutex_unlock(&host->lock);
169                         return errno;
170                 }
171                 riemann_message_free(response);
172         }
173
174         pthread_mutex_unlock (&host->lock);
175         return 0;
176 } /* }}} int wrr_send */
177
178 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
179                 notification_t const *n)
180 {
181         riemann_message_t *msg;
182         riemann_event_t *event;
183         char service_buffer[6 * DATA_MAX_NAME_LEN];
184         char const *severity;
185         notification_meta_t *meta;
186         size_t i;
187
188         switch (n->severity)
189         {
190                 case NOTIF_OKAY:        severity = "ok"; break;
191                 case NOTIF_WARNING:     severity = "warning"; break;
192                 case NOTIF_FAILURE:     severity = "critical"; break;
193                 default:                severity = "unknown";
194         }
195
196         format_name(service_buffer, sizeof(service_buffer),
197                     /* host = */ "", n->plugin, n->plugin_instance,
198                     n->type, n->type_instance);
199
200         event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
201                                      RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
202                                      RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
203                                      RIEMANN_EVENT_FIELD_STATE, severity,
204                                      RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
205                                      RIEMANN_EVENT_FIELD_NONE);
206
207         if (n->host[0] != 0)
208                 riemann_event_string_attribute_add(event, "host", n->host);
209         if (n->plugin[0] != 0)
210                 riemann_event_string_attribute_add(event, "plugin", n->plugin);
211         if (n->plugin_instance[0] != 0)
212                 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
213
214         if (n->type[0] != 0)
215                 riemann_event_string_attribute_add(event, "type", n->type);
216         if (n->type_instance[0] != 0)
217                 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
218
219         for (i = 0; i < riemann_attrs_num; i += 2)
220                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
221
222         for (i = 0; i < riemann_tags_num; i++)
223                 riemann_event_tag_add(event, riemann_tags[i]);
224
225         if (n->message[0] != 0)
226                 riemann_event_string_attribute_add(event, "description", n->message);
227
228         /* Pull in values from threshold and add extra attributes */
229         for (meta = n->meta; meta != NULL; meta = meta->next)
230         {
231                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
232                 {
233                         riemann_event_set(event,
234                                           RIEMANN_EVENT_FIELD_METRIC_D,
235                                           (double) meta->nm_value.nm_double,
236                                           RIEMANN_EVENT_FIELD_NONE);
237                         continue;
238                 }
239
240                 if (meta->type == NM_TYPE_STRING) {
241                         riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
242                         continue;
243                 }
244         }
245
246         msg = riemann_message_create_with_events(event, NULL);
247         if (msg == NULL)
248         {
249                 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
250                 riemann_event_free (event);
251                 return (NULL);
252         }
253
254         DEBUG("write_riemann plugin: Successfully created message for notification: "
255               "host = \"%s\", service = \"%s\", state = \"%s\"",
256               event->host, event->service, event->state);
257         return (msg);
258 } /* }}} riemann_message_t *wrr_notification_to_message */
259
260 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
261                                            data_set_t const *ds,
262                                            value_list_t const *vl, size_t index,
263                                            gauge_t const *rates,
264                                            int status)
265 {
266         riemann_event_t *event;
267         char name_buffer[5 * DATA_MAX_NAME_LEN];
268         char service_buffer[6 * DATA_MAX_NAME_LEN];
269         size_t i;
270
271         event = riemann_event_new();
272         if (event == NULL)
273         {
274                 ERROR("write_riemann plugin: riemann_event_new() failed.");
275                 return (NULL);
276         }
277
278         format_name(name_buffer, sizeof(name_buffer),
279                     /* host = */ "", vl->plugin, vl->plugin_instance,
280                     vl->type, vl->type_instance);
281         if (host->always_append_ds || (ds->ds_num > 1))
282         {
283                 if (host->event_service_prefix == NULL)
284                         ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
285                                   &name_buffer[1], ds->ds[index].name);
286                 else
287                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
288                                   host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
289         }
290         else
291         {
292                 if (host->event_service_prefix == NULL)
293                         sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
294                 else
295                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
296                                   host->event_service_prefix, &name_buffer[1]);
297         }
298
299         riemann_event_set(event,
300                           RIEMANN_EVENT_FIELD_HOST, vl->host,
301                           RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
302                           RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
303                           RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
304                           "plugin", vl->plugin,
305                           "type", vl->type,
306                           "ds_name", ds->ds[index].name,
307                           NULL,
308                           RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
309                           RIEMANN_EVENT_FIELD_NONE);
310
311         if (host->check_thresholds) {
312                 const char *state = NULL;
313
314                 switch (status) {
315                         case STATE_OKAY:
316                                 state = "ok";
317                                 break;
318                         case STATE_ERROR:
319                                 state = "critical";
320                                 break;
321                         case STATE_WARNING:
322                                 state = "warning";
323                                 break;
324                         case STATE_MISSING:
325                                 state = "unknown";
326                                 break;
327                 }
328                 if (state)
329                         riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
330                                           RIEMANN_EVENT_FIELD_NONE);
331         }
332
333         if (vl->plugin_instance[0] != 0)
334                 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
335         if (vl->type_instance[0] != 0)
336                 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
337
338         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
339         {
340                 char ds_type[DATA_MAX_NAME_LEN];
341
342                 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
343                           DS_TYPE_TO_STRING(ds->ds[index].type));
344                 riemann_event_string_attribute_add(event, "ds_type", ds_type);
345         }
346         else
347         {
348                 riemann_event_string_attribute_add(event, "ds_type",
349                                        DS_TYPE_TO_STRING(ds->ds[index].type));
350         }
351
352         {
353                 char ds_index[DATA_MAX_NAME_LEN];
354
355                 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
356                 riemann_event_string_attribute_add(event, "ds_index", ds_index);
357         }
358
359         for (i = 0; i < riemann_attrs_num; i += 2)
360                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
361
362         for (i = 0; i < riemann_tags_num; i++)
363                 riemann_event_tag_add(event, riemann_tags[i]);
364
365         if (ds->ds[index].type == DS_TYPE_GAUGE)
366         {
367                 riemann_event_set(event,
368                                   RIEMANN_EVENT_FIELD_METRIC_D,
369                                   (double) vl->values[index].gauge,
370                                   RIEMANN_EVENT_FIELD_NONE);
371         }
372         else if (rates != NULL)
373         {
374                 riemann_event_set(event,
375                                   RIEMANN_EVENT_FIELD_METRIC_D,
376                                   (double) rates[index],
377                                   RIEMANN_EVENT_FIELD_NONE);
378         }
379         else
380         {
381                 int64_t metric;
382
383                 if (ds->ds[index].type == DS_TYPE_DERIVE)
384                         metric = (int64_t) vl->values[index].derive;
385                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
386                         metric = (int64_t) vl->values[index].absolute;
387                 else
388                         metric = (int64_t) vl->values[index].counter;
389
390                 riemann_event_set(event,
391                                   RIEMANN_EVENT_FIELD_METRIC_S64,
392                                   (int64_t) metric,
393                                   RIEMANN_EVENT_FIELD_NONE);
394         }
395
396         DEBUG("write_riemann plugin: Successfully created message for metric: "
397               "host = \"%s\", service = \"%s\"",
398               event->host, event->service);
399         return (event);
400 } /* }}} riemann_event_t *wrr_value_to_event */
401
402 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
403                                                     data_set_t const *ds,
404                                                     value_list_t const *vl,
405                                                     int *statuses)
406 {
407         riemann_message_t *msg;
408         size_t i;
409         gauge_t *rates = NULL;
410
411         /* Initialize the Msg structure. */
412         msg = riemann_message_new();
413         if (msg == NULL)
414         {
415                 ERROR ("write_riemann plugin: riemann_message_new failed.");
416                 return (NULL);
417         }
418
419         if (host->store_rates)
420         {
421                 rates = uc_get_rate(ds, vl);
422                 if (rates == NULL)
423                 {
424                         ERROR("write_riemann plugin: uc_get_rate failed.");
425                         riemann_message_free(msg);
426                         return (NULL);
427                 }
428         }
429
430         for (i = 0; i < vl->values_len; i++)
431         {
432                 riemann_event_t *event;
433
434                 event = wrr_value_to_event(host, ds, vl,
435                                            (int) i, rates, statuses[i]);
436                 if (event == NULL)
437                 {
438                         riemann_message_free(msg);
439                         sfree(rates);
440                         return (NULL);
441                 }
442                 riemann_message_append_events(msg, event, NULL);
443         }
444
445         sfree(rates);
446         return (msg);
447 } /* }}} riemann_message_t *wrr_value_list_to_message */
448
449 /*
450  * Always call while holding host->lock !
451  */
452 static int wrr_batch_flush_nolock(cdtime_t timeout,
453                                   struct riemann_host *host)
454 {
455         cdtime_t    now;
456         int         status = 0;
457
458         if (timeout > 0) {
459                 now = cdtime();
460                 if ((host->batch_init + timeout) > now)
461                         return status;
462         }
463         wrr_send(host, host->batch_msg);
464         riemann_message_free(host->batch_msg);
465
466         if (host->client_type != RIEMANN_CLIENT_UDP)
467         {
468                 riemann_message_t *response;
469
470                 response = riemann_client_recv_message(host->client);
471
472                 if (!response)
473                 {
474                         wrr_disconnect(host);
475                         return errno;
476                 }
477
478                 riemann_message_free(response);
479         }
480
481         host->batch_init = cdtime();
482         host->batch_msg = NULL;
483         return status;
484 }
485
486 static int wrr_batch_flush(cdtime_t timeout,
487         const char *identifier __attribute__((unused)),
488         user_data_t *user_data)
489 {
490         struct riemann_host *host;
491         int status;
492
493         if (user_data == NULL)
494                 return (-EINVAL);
495
496         host = user_data->data;
497         pthread_mutex_lock(&host->lock);
498         status = wrr_batch_flush_nolock(timeout, host);
499         if (status != 0)
500         c_complain (LOG_ERR, &host->init_complaint,
501                     "write_riemann plugin: riemann_client_send failed with status %i",
502                     status);
503     else
504         c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent.");
505
506         pthread_mutex_unlock(&host->lock);
507         return status;
508 }
509
510 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
511                                     data_set_t const *ds,
512                                     value_list_t const *vl,
513                                     int *statuses)
514 {
515         riemann_message_t *msg;
516         size_t len;
517         int ret;
518
519         msg = wrr_value_list_to_message(host, ds, vl, statuses);
520         if (msg == NULL)
521                 return -1;
522
523         pthread_mutex_lock(&host->lock);
524
525         if (host->batch_msg == NULL) {
526                 host->batch_msg = msg;
527         } else {
528                 int status;
529
530                 status = riemann_message_append_events_n(host->batch_msg,
531                                                          msg->n_events,
532                                                          msg->events);
533                 msg->n_events = 0;
534                 msg->events = NULL;
535
536                 riemann_message_free(msg);
537
538                 if (status != 0) {
539                         pthread_mutex_unlock(&host->lock);
540                         ERROR("write_riemann plugin: out of memory");
541                         return -1;
542                 }
543         }
544
545         len = riemann_message_get_packed_size(host->batch_msg);
546         ret = 0;
547         if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
548                 ret = wrr_batch_flush_nolock(0, host);
549         }
550
551         pthread_mutex_unlock(&host->lock);
552         return ret;
553 } /* }}} riemann_message_t *wrr_batch_add_value_list */
554
555 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
556 {
557         int                      status;
558         struct riemann_host     *host = ud->data;
559         riemann_message_t       *msg;
560
561         if (!host->notifications)
562                 return 0;
563
564     /*
565      * Never batch for notifications, send them ASAP
566      */
567         msg = wrr_notification_to_message(host, n);
568         if (msg == NULL)
569                 return (-1);
570
571         status = wrr_send(host, msg);
572         if (status != 0)
573         c_complain (LOG_ERR, &host->init_complaint,
574                     "write_riemann plugin: riemann_client_send failed with status %i",
575                     status);
576     else
577         c_release (LOG_DEBUG, &host->init_complaint,
578                    "write_riemann plugin: riemann_client_send succeeded");
579
580         riemann_message_free(msg);
581         return (status);
582 } /* }}} int wrr_notification */
583
584 static int wrr_write(const data_set_t *ds, /* {{{ */
585               const value_list_t *vl,
586               user_data_t *ud)
587 {
588         int                      status = 0;
589         int                      statuses[vl->values_len];
590         struct riemann_host     *host = ud->data;
591         riemann_message_t       *msg;
592
593         if (host->check_thresholds) {
594                 status = write_riemann_threshold_check(ds, vl, statuses);
595     if (status != 0)
596       return status;
597   } else {
598     memset (statuses, 0, sizeof (statuses));
599   }
600
601   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
602     wrr_batch_add_value_list(host, ds, vl, statuses);
603   } else {
604     msg = wrr_value_list_to_message(host, ds, vl, statuses);
605     if (msg == NULL)
606       return (-1);
607
608     status = wrr_send(host, msg);
609
610     riemann_message_free(msg);
611   }
612   return status;
613 } /* }}} int wrr_write */
614
615 static void wrr_free(void *p) /* {{{ */
616 {
617   struct riemann_host   *host = p;
618
619   if (host == NULL)
620     return;
621
622   pthread_mutex_lock(&host->lock);
623
624   host->reference_count--;
625   if (host->reference_count > 0)
626     {
627       pthread_mutex_unlock(&host->lock);
628       return;
629     }
630
631   wrr_disconnect(host);
632
633   pthread_mutex_destroy(&host->lock);
634   sfree(host);
635 } /* }}} void wrr_free */
636
637 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
638 {
639   struct riemann_host   *host = NULL;
640   int                    status = 0;
641   int                    i;
642   oconfig_item_t                *child;
643   char                   callback_name[DATA_MAX_NAME_LEN];
644   user_data_t            ud;
645
646   if ((host = calloc(1, sizeof(*host))) == NULL) {
647     ERROR ("write_riemann plugin: calloc failed.");
648     return ENOMEM;
649   }
650   pthread_mutex_init(&host->lock, NULL);
651   C_COMPLAIN_INIT (&host->init_complaint);
652   host->reference_count = 1;
653   host->node = NULL;
654   host->port = 0;
655   host->notifications = 1;
656   host->check_thresholds = 0;
657   host->store_rates = 1;
658   host->always_append_ds = 0;
659   host->batch_mode = 1;
660   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
661   host->batch_init = cdtime();
662   host->ttl_factor = RIEMANN_TTL_FACTOR;
663   host->client = NULL;
664   host->client_type = RIEMANN_CLIENT_TCP;
665   host->timeout.tv_sec = 0;
666   host->timeout.tv_usec = 0;
667
668   status = cf_util_get_string(ci, &host->name);
669   if (status != 0) {
670     WARNING("write_riemann plugin: Required host name is missing.");
671     wrr_free(host);
672     return -1;
673   }
674
675   for (i = 0; i < ci->children_num; i++) {
676     /*
677      * The code here could be simplified but makes room
678      * for easy adding of new options later on.
679      */
680     child = &ci->children[i];
681     status = 0;
682
683     if (strcasecmp("Host", child->key) == 0) {
684       status = cf_util_get_string(child, &host->node);
685       if (status != 0)
686         break;
687     } else if (strcasecmp("Notifications", child->key) == 0) {
688       status = cf_util_get_boolean(child, &host->notifications);
689       if (status != 0)
690         break;
691     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
692       status = cf_util_get_string(child, &host->event_service_prefix);
693       if (status != 0)
694         break;
695     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
696       status = cf_util_get_boolean(child, &host->check_thresholds);
697       if (status != 0)
698         break;
699     } else if (strcasecmp("Batch", child->key) == 0) {
700       status = cf_util_get_boolean(child, &host->batch_mode);
701       if (status != 0)
702         break;
703     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
704       status = cf_util_get_int(child, &host->batch_max);
705       if (status != 0)
706         break;
707     } else if (strcasecmp("Timeout", child->key) == 0) {
708       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
709       if (status != 0)
710         break;
711     } else if (strcasecmp("Port", child->key) == 0) {
712       host->port = cf_util_get_port_number(child);
713       if (host->port == -1) {
714         ERROR("write_riemann plugin: Invalid argument "
715               "configured for the \"Port\" "
716               "option.");
717         break;
718       }
719     } else if (strcasecmp("Protocol", child->key) == 0) {
720       char tmp[16];
721       status = cf_util_get_string_buffer(child,
722                                          tmp, sizeof(tmp));
723       if (status != 0)
724         {
725           ERROR("write_riemann plugin: cf_util_get_"
726                 "string_buffer failed with "
727                 "status %i.", status);
728           break;
729         }
730
731       if (strcasecmp("UDP", tmp) == 0)
732         host->client_type = RIEMANN_CLIENT_UDP;
733       else if (strcasecmp("TCP", tmp) == 0)
734         host->client_type = RIEMANN_CLIENT_TCP;
735       else if (strcasecmp("TLS", tmp) == 0)
736         host->client_type = RIEMANN_CLIENT_TLS;
737       else
738         WARNING("write_riemann plugin: The value "
739                 "\"%s\" is not valid for the "
740                 "\"Protocol\" option. Use "
741                 "either \"UDP\", \"TCP\" or \"TLS\".",
742                 tmp);
743     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
744       status = cf_util_get_string(child, &host->tls_ca_file);
745       if (status != 0)
746         {
747           ERROR("write_riemann plugin: cf_util_get_"
748                 "string_buffer failed with "
749                 "status %i.", status);
750           break;
751         }
752     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
753       status = cf_util_get_string(child, &host->tls_cert_file);
754       if (status != 0)
755         {
756           ERROR("write_riemann plugin: cf_util_get_"
757                 "string_buffer failed with "
758                 "status %i.", status);
759           break;
760         }
761     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
762       status = cf_util_get_string(child, &host->tls_key_file);
763       if (status != 0)
764         {
765           ERROR("write_riemann plugin: cf_util_get_"
766                 "string_buffer failed with "
767                 "status %i.", status);
768           break;
769         }
770     } else if (strcasecmp("StoreRates", child->key) == 0) {
771       status = cf_util_get_boolean(child, &host->store_rates);
772       if (status != 0)
773         break;
774     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
775       status = cf_util_get_boolean(child,
776                                    &host->always_append_ds);
777       if (status != 0)
778         break;
779     } else if (strcasecmp("TTLFactor", child->key) == 0) {
780       double tmp = NAN;
781       status = cf_util_get_double(child, &tmp);
782       if (status != 0)
783         break;
784       if (tmp >= 2.0) {
785         host->ttl_factor = tmp;
786       } else if (tmp >= 1.0) {
787         NOTICE("write_riemann plugin: The configured "
788                "TTLFactor is very small "
789                "(%.1f). A value of 2.0 or "
790                "greater is recommended.",
791                tmp);
792         host->ttl_factor = tmp;
793       } else if (tmp > 0.0) {
794         WARNING("write_riemann plugin: The configured "
795                 "TTLFactor is too small to be "
796                 "useful (%.1f). I'll use it "
797                 "since the user knows best, "
798                 "but under protest.",
799                 tmp);
800         host->ttl_factor = tmp;
801       } else { /* zero, negative and NAN */
802         ERROR("write_riemann plugin: The configured "
803               "TTLFactor is invalid (%.1f).",
804               tmp);
805       }
806     } else {
807       WARNING("write_riemann plugin: ignoring unknown config "
808               "option: \"%s\"", child->key);
809     }
810   }
811   if (status != 0) {
812     wrr_free(host);
813     return status;
814   }
815
816   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
817             host->name);
818   ud.data = host;
819   ud.free_func = wrr_free;
820
821   pthread_mutex_lock(&host->lock);
822
823   status = plugin_register_write(callback_name, wrr_write, &ud);
824
825   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
826     ud.free_func = NULL;
827     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
828   }
829   if (status != 0)
830     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
831             "failed with status %i.",
832             callback_name, status);
833   else /* success */
834     host->reference_count++;
835
836   status = plugin_register_notification(callback_name,
837                                         wrr_notification, &ud);
838   if (status != 0)
839     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
840             "failed with status %i.",
841             callback_name, status);
842   else /* success */
843     host->reference_count++;
844
845   if (host->reference_count <= 1)
846     {
847       /* Both callbacks failed => free memory.
848        * We need to unlock here, because riemann_free() will lock.
849        * This is not a race condition, because we're the only one
850        * holding a reference. */
851       pthread_mutex_unlock(&host->lock);
852       wrr_free(host);
853       return (-1);
854     }
855
856   host->reference_count--;
857   pthread_mutex_unlock(&host->lock);
858
859   return status;
860 } /* }}} int wrr_config_node */
861
862 static int wrr_config(oconfig_item_t *ci) /* {{{ */
863 {
864   int            i;
865   oconfig_item_t        *child;
866   int            status;
867
868   for (i = 0; i < ci->children_num; i++)  {
869     child = &ci->children[i];
870
871     if (strcasecmp("Node", child->key) == 0) {
872       wrr_config_node (child);
873     } else if (strcasecmp(child->key, "attribute") == 0) {
874       char *key = NULL;
875       char *val = NULL;
876
877       if (child->values_num != 2) {
878         WARNING("riemann attributes need both a key and a value.");
879         return (-1);
880       }
881       if (child->values[0].type != OCONFIG_TYPE_STRING ||
882           child->values[1].type != OCONFIG_TYPE_STRING) {
883         WARNING("riemann attribute needs string arguments.");
884         return (-1);
885       }
886       if ((key = strdup(child->values[0].value.string)) == NULL) {
887         WARNING("cannot allocate memory for attribute key.");
888         return (-1);
889       }
890       if ((val = strdup(child->values[1].value.string)) == NULL) {
891         WARNING("cannot allocate memory for attribute value.");
892         sfree(key);
893         return (-1);
894       }
895       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
896       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
897       DEBUG("write_riemann: got attr: %s => %s", key, val);
898       sfree(key);
899       sfree(val);
900     } else if (strcasecmp(child->key, "tag") == 0) {
901       char *tmp = NULL;
902       status = cf_util_get_string(child, &tmp);
903       if (status != 0)
904         continue;
905
906       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
907       DEBUG("write_riemann plugin: Got tag: %s", tmp);
908       sfree(tmp);
909     } else {
910       WARNING("write_riemann plugin: Ignoring unknown "
911               "configuration option \"%s\" at top level.",
912               child->key);
913     }
914   }
915   return (0);
916 } /* }}} int wrr_config */
917
918 void module_register(void)
919 {
920   plugin_register_complex_config("write_riemann", wrr_config);
921 }
922
923 /* vim: set sw=8 sts=8 ts=8 noet : */