write_riemann: do not deadlock when sending from flush
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "plugin.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "utils_complain.h"
41 #include "write_riemann_threshold.h"
42
/* Fallback endpoint used by wrr_connect() when no node / port is configured. */
#define RIEMANN_HOST "localhost"
#define RIEMANN_PORT 5555
/* Default multiplier applied to a value's interval to obtain the event TTL. */
#define RIEMANN_TTL_FACTOR 2.0
/* Default packed batch size at which a pending batch is flushed. */
#define RIEMANN_BATCH_MAX 8192
47
48 struct riemann_host {
49     c_complain_t init_complaint;
50         char                    *name;
51         char                    *event_service_prefix;
52         pthread_mutex_t  lock;
53     _Bool            batch_mode;
54         _Bool            notifications;
55         _Bool            check_thresholds;
56         _Bool                    store_rates;
57         _Bool                    always_append_ds;
58         char                    *node;
59         int                      port;
60         riemann_client_type_t    client_type;
61         riemann_client_t        *client;
62         double                   ttl_factor;
63     cdtime_t         batch_init;
64     int              batch_max;
65     int              batch_timeout;
66         int                          reference_count;
67   riemann_message_t     *batch_msg;
68         char                     *tls_ca_file;
69         char                     *tls_cert_file;
70         char                     *tls_key_file;
71         struct timeval timeout;
72 };
73
/* Plugin-wide tags added to every event; riemann_tags_num entries. */
static char **riemann_tags;
static size_t riemann_tags_num;
/*
 * Plugin-wide string attributes added to every event. Stored flat as
 * key, value, key, value, ...; riemann_attrs_num counts individual entries.
 */
static char **riemann_attrs;
static size_t riemann_attrs_num;
78
79 /* host->lock must be held when calling this function. */
80 static int wrr_connect(struct riemann_host *host) /* {{{ */
81 {
82         char const              *node;
83         int                      port;
84
85         if (host->client)
86                 return 0;
87
88         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
89         port = (host->port) ? host->port : RIEMANN_PORT;
90
91         host->client = NULL;
92
93         host->client = riemann_client_create(host->client_type, node, port,
94                                              RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
95                                              RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
96                                              RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
97                                              RIEMANN_CLIENT_OPTION_NONE);
98         if (host->client == NULL) {
99         c_complain (LOG_ERR, &host->init_complaint,
100                     "write_riemann plugin: Unable to connect to Riemann at %s:%d",
101                     node, port);
102                 return -1;
103         }
104         if (host->timeout.tv_sec != 0) {
105                 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
106                         riemann_client_free(host->client);
107                         host->client = NULL;
108             c_complain (LOG_ERR, &host->init_complaint,
109                         "write_riemann plugin: Unable to connect to Riemann at %s:%d",
110                         node, port);
111                         return -1;
112                 }
113         }
114
115     c_release (LOG_INFO, &host->init_complaint,
116                "write_riemann plugin: Successfully connected to %s:%d",
117                node, port);
118
119         return 0;
120 } /* }}} int wrr_connect */
121
122 /* host->lock must be held when calling this function. */
123 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
124 {
125         if (!host->client)
126                 return (0);
127
128         riemann_client_free(host->client);
129         host->client = NULL;
130
131         return (0);
132 } /* }}} int wrr_disconnect */
133
134 /**
135  * Function to send messages to riemann.
136  *
137  * Acquires the host lock, disconnects on errors.
138  */
139 static int wrr_send_nolock(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
140 {
141         int status = 0;
142
143         status = wrr_connect(host);
144         if (status != 0) {
145                 return status;
146     }
147
148         status = riemann_client_send_message(host->client, msg);
149         if (status != 0) {
150                 wrr_disconnect(host);
151                 return status;
152         }
153
154         /*
155          * For TCP we need to receive message acknowledgemenent.
156          */
157         if (host->client_type != RIEMANN_CLIENT_UDP)
158         {
159                 riemann_message_t *response;
160
161                 response = riemann_client_recv_message(host->client);
162
163                 if (response == NULL)
164                 {
165                         wrr_disconnect(host);
166                         return errno;
167                 }
168                 riemann_message_free(response);
169         }
170
171         return 0;
172 } /* }}} int wrr_send */
173
174 static int wrr_send(struct riemann_host *host, riemann_message_t *msg)
175 {
176     int status = 0;
177
178     pthread_mutex_lock (&host->lock);
179     status = wrr_send_nolock(host, msg);
180     pthread_mutex_unlock (&host->lock);
181     return status;
182 }
183
184 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
185                 notification_t const *n)
186 {
187         riemann_message_t *msg;
188         riemann_event_t *event;
189         char service_buffer[6 * DATA_MAX_NAME_LEN];
190         char const *severity;
191         notification_meta_t *meta;
192         size_t i;
193
194         switch (n->severity)
195         {
196                 case NOTIF_OKAY:        severity = "ok"; break;
197                 case NOTIF_WARNING:     severity = "warning"; break;
198                 case NOTIF_FAILURE:     severity = "critical"; break;
199                 default:                severity = "unknown";
200         }
201
202         format_name(service_buffer, sizeof(service_buffer),
203                     /* host = */ "", n->plugin, n->plugin_instance,
204                     n->type, n->type_instance);
205
206         event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
207                                      RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
208                                      RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
209                                      RIEMANN_EVENT_FIELD_STATE, severity,
210                                      RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
211                                      RIEMANN_EVENT_FIELD_NONE);
212
213         if (n->host[0] != 0)
214                 riemann_event_string_attribute_add(event, "host", n->host);
215         if (n->plugin[0] != 0)
216                 riemann_event_string_attribute_add(event, "plugin", n->plugin);
217         if (n->plugin_instance[0] != 0)
218                 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
219
220         if (n->type[0] != 0)
221                 riemann_event_string_attribute_add(event, "type", n->type);
222         if (n->type_instance[0] != 0)
223                 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
224
225         for (i = 0; i < riemann_attrs_num; i += 2)
226                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
227
228         for (i = 0; i < riemann_tags_num; i++)
229                 riemann_event_tag_add(event, riemann_tags[i]);
230
231         if (n->message[0] != 0)
232                 riemann_event_string_attribute_add(event, "description", n->message);
233
234         /* Pull in values from threshold and add extra attributes */
235         for (meta = n->meta; meta != NULL; meta = meta->next)
236         {
237                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
238                 {
239                         riemann_event_set(event,
240                                           RIEMANN_EVENT_FIELD_METRIC_D,
241                                           (double) meta->nm_value.nm_double,
242                                           RIEMANN_EVENT_FIELD_NONE);
243                         continue;
244                 }
245
246                 if (meta->type == NM_TYPE_STRING) {
247                         riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
248                         continue;
249                 }
250         }
251
252         msg = riemann_message_create_with_events(event, NULL);
253         if (msg == NULL)
254         {
255                 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
256                 riemann_event_free (event);
257                 return (NULL);
258         }
259
260         DEBUG("write_riemann plugin: Successfully created message for notification: "
261               "host = \"%s\", service = \"%s\", state = \"%s\"",
262               event->host, event->service, event->state);
263         return (msg);
264 } /* }}} riemann_message_t *wrr_notification_to_message */
265
266 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
267                                            data_set_t const *ds,
268                                            value_list_t const *vl, size_t index,
269                                            gauge_t const *rates,
270                                            int status)
271 {
272         riemann_event_t *event;
273         char name_buffer[5 * DATA_MAX_NAME_LEN];
274         char service_buffer[6 * DATA_MAX_NAME_LEN];
275         size_t i;
276
277         event = riemann_event_new();
278         if (event == NULL)
279         {
280                 ERROR("write_riemann plugin: riemann_event_new() failed.");
281                 return (NULL);
282         }
283
284         format_name(name_buffer, sizeof(name_buffer),
285                     /* host = */ "", vl->plugin, vl->plugin_instance,
286                     vl->type, vl->type_instance);
287         if (host->always_append_ds || (ds->ds_num > 1))
288         {
289                 if (host->event_service_prefix == NULL)
290                         ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
291                                   &name_buffer[1], ds->ds[index].name);
292                 else
293                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
294                                   host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
295         }
296         else
297         {
298                 if (host->event_service_prefix == NULL)
299                         sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
300                 else
301                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
302                                   host->event_service_prefix, &name_buffer[1]);
303         }
304
305         riemann_event_set(event,
306                           RIEMANN_EVENT_FIELD_HOST, vl->host,
307                           RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
308                           RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
309                           RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
310                           "plugin", vl->plugin,
311                           "type", vl->type,
312                           "ds_name", ds->ds[index].name,
313                           NULL,
314                           RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
315                           RIEMANN_EVENT_FIELD_NONE);
316
317         if (host->check_thresholds) {
318                 const char *state = NULL;
319
320                 switch (status) {
321                         case STATE_OKAY:
322                                 state = "ok";
323                                 break;
324                         case STATE_ERROR:
325                                 state = "critical";
326                                 break;
327                         case STATE_WARNING:
328                                 state = "warning";
329                                 break;
330                         case STATE_MISSING:
331                                 state = "unknown";
332                                 break;
333                 }
334                 if (state)
335                         riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
336                                           RIEMANN_EVENT_FIELD_NONE);
337         }
338
339         if (vl->plugin_instance[0] != 0)
340                 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
341         if (vl->type_instance[0] != 0)
342                 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
343
344         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
345         {
346                 char ds_type[DATA_MAX_NAME_LEN];
347
348                 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
349                           DS_TYPE_TO_STRING(ds->ds[index].type));
350                 riemann_event_string_attribute_add(event, "ds_type", ds_type);
351         }
352         else
353         {
354                 riemann_event_string_attribute_add(event, "ds_type",
355                                        DS_TYPE_TO_STRING(ds->ds[index].type));
356         }
357
358         {
359                 char ds_index[DATA_MAX_NAME_LEN];
360
361                 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
362                 riemann_event_string_attribute_add(event, "ds_index", ds_index);
363         }
364
365         for (i = 0; i < riemann_attrs_num; i += 2)
366                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
367
368         for (i = 0; i < riemann_tags_num; i++)
369                 riemann_event_tag_add(event, riemann_tags[i]);
370
371         if (ds->ds[index].type == DS_TYPE_GAUGE)
372         {
373                 riemann_event_set(event,
374                                   RIEMANN_EVENT_FIELD_METRIC_D,
375                                   (double) vl->values[index].gauge,
376                                   RIEMANN_EVENT_FIELD_NONE);
377         }
378         else if (rates != NULL)
379         {
380                 riemann_event_set(event,
381                                   RIEMANN_EVENT_FIELD_METRIC_D,
382                                   (double) rates[index],
383                                   RIEMANN_EVENT_FIELD_NONE);
384         }
385         else
386         {
387                 int64_t metric;
388
389                 if (ds->ds[index].type == DS_TYPE_DERIVE)
390                         metric = (int64_t) vl->values[index].derive;
391                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
392                         metric = (int64_t) vl->values[index].absolute;
393                 else
394                         metric = (int64_t) vl->values[index].counter;
395
396                 riemann_event_set(event,
397                                   RIEMANN_EVENT_FIELD_METRIC_S64,
398                                   (int64_t) metric,
399                                   RIEMANN_EVENT_FIELD_NONE);
400         }
401
402         DEBUG("write_riemann plugin: Successfully created message for metric: "
403               "host = \"%s\", service = \"%s\"",
404               event->host, event->service);
405         return (event);
406 } /* }}} riemann_event_t *wrr_value_to_event */
407
408 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
409                                                     data_set_t const *ds,
410                                                     value_list_t const *vl,
411                                                     int *statuses)
412 {
413         riemann_message_t *msg;
414         size_t i;
415         gauge_t *rates = NULL;
416
417         /* Initialize the Msg structure. */
418         msg = riemann_message_new();
419         if (msg == NULL)
420         {
421                 ERROR ("write_riemann plugin: riemann_message_new failed.");
422                 return (NULL);
423         }
424
425         if (host->store_rates)
426         {
427                 rates = uc_get_rate(ds, vl);
428                 if (rates == NULL)
429                 {
430                         ERROR("write_riemann plugin: uc_get_rate failed.");
431                         riemann_message_free(msg);
432                         return (NULL);
433                 }
434         }
435
436         for (i = 0; i < vl->values_len; i++)
437         {
438                 riemann_event_t *event;
439
440                 event = wrr_value_to_event(host, ds, vl,
441                                            (int) i, rates, statuses[i]);
442                 if (event == NULL)
443                 {
444                         riemann_message_free(msg);
445                         sfree(rates);
446                         return (NULL);
447                 }
448                 riemann_message_append_events(msg, event, NULL);
449         }
450
451         sfree(rates);
452         return (msg);
453 } /* }}} riemann_message_t *wrr_value_list_to_message */
454
455 /*
456  * Always call while holding host->lock !
457  */
458 static int wrr_batch_flush_nolock(cdtime_t timeout,
459                                   struct riemann_host *host)
460 {
461         cdtime_t    now;
462         int         status = 0;
463
464         if (timeout > 0) {
465                 now = cdtime();
466                 if ((host->batch_init + timeout) > now)
467                         return status;
468         }
469         wrr_send_nolock(host, host->batch_msg);
470         riemann_message_free(host->batch_msg);
471
472         if (host->client_type != RIEMANN_CLIENT_UDP)
473         {
474                 riemann_message_t *response;
475
476                 response = riemann_client_recv_message(host->client);
477
478                 if (!response)
479                 {
480                         wrr_disconnect(host);
481                         return errno;
482                 }
483
484                 riemann_message_free(response);
485         }
486
487         host->batch_init = cdtime();
488         host->batch_msg = NULL;
489         return status;
490 }
491
492 static int wrr_batch_flush(cdtime_t timeout,
493         const char *identifier __attribute__((unused)),
494         user_data_t *user_data)
495 {
496         struct riemann_host *host;
497         int status;
498
499         if (user_data == NULL)
500                 return (-EINVAL);
501
502         host = user_data->data;
503         pthread_mutex_lock(&host->lock);
504         status = wrr_batch_flush_nolock(timeout, host);
505         if (status != 0)
506         c_complain (LOG_ERR, &host->init_complaint,
507                     "write_riemann plugin: riemann_client_send failed with status %i",
508                     status);
509     else
510         c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent.");
511
512         pthread_mutex_unlock(&host->lock);
513         return status;
514 }
515
516 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
517                                     data_set_t const *ds,
518                                     value_list_t const *vl,
519                                     int *statuses)
520 {
521         riemann_message_t *msg;
522         size_t len;
523         int ret;
524     cdtime_t timeout;
525
526         msg = wrr_value_list_to_message(host, ds, vl, statuses);
527         if (msg == NULL)
528                 return -1;
529
530         pthread_mutex_lock(&host->lock);
531
532         if (host->batch_msg == NULL) {
533                 host->batch_msg = msg;
534         } else {
535                 int status;
536
537                 status = riemann_message_append_events_n(host->batch_msg,
538                                                          msg->n_events,
539                                                          msg->events);
540                 msg->n_events = 0;
541                 msg->events = NULL;
542
543                 riemann_message_free(msg);
544
545                 if (status != 0) {
546                         pthread_mutex_unlock(&host->lock);
547                         ERROR("write_riemann plugin: out of memory");
548                         return -1;
549                 }
550         }
551
552         len = riemann_message_get_packed_size(host->batch_msg);
553         ret = 0;
554         if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
555                 ret = wrr_batch_flush_nolock(0, host);
556         } else {
557         if (host->batch_timeout > 0) {
558             timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
559             ret = wrr_batch_flush_nolock(timeout, host);
560         }
561     }
562
563         pthread_mutex_unlock(&host->lock);
564         return ret;
565 } /* }}} riemann_message_t *wrr_batch_add_value_list */
566
567 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
568 {
569         int                      status;
570         struct riemann_host     *host = ud->data;
571         riemann_message_t       *msg;
572
573         if (!host->notifications)
574                 return 0;
575
576     /*
577      * Never batch for notifications, send them ASAP
578      */
579         msg = wrr_notification_to_message(host, n);
580         if (msg == NULL)
581                 return (-1);
582
583         status = wrr_send(host, msg);
584         if (status != 0)
585         c_complain (LOG_ERR, &host->init_complaint,
586                     "write_riemann plugin: riemann_client_send failed with status %i",
587                     status);
588     else
589         c_release (LOG_DEBUG, &host->init_complaint,
590                    "write_riemann plugin: riemann_client_send succeeded");
591
592         riemann_message_free(msg);
593         return (status);
594 } /* }}} int wrr_notification */
595
596 static int wrr_write(const data_set_t *ds, /* {{{ */
597               const value_list_t *vl,
598               user_data_t *ud)
599 {
600         int                      status = 0;
601         int                      statuses[vl->values_len];
602         struct riemann_host     *host = ud->data;
603         riemann_message_t       *msg;
604
605         if (host->check_thresholds) {
606                 status = write_riemann_threshold_check(ds, vl, statuses);
607     if (status != 0)
608       return status;
609   } else {
610     memset (statuses, 0, sizeof (statuses));
611   }
612
613   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
614     wrr_batch_add_value_list(host, ds, vl, statuses);
615   } else {
616     msg = wrr_value_list_to_message(host, ds, vl, statuses);
617     if (msg == NULL)
618       return (-1);
619
620     status = wrr_send(host, msg);
621
622     riemann_message_free(msg);
623   }
624   return status;
625 } /* }}} int wrr_write */
626
627 static void wrr_free(void *p) /* {{{ */
628 {
629   struct riemann_host   *host = p;
630
631   if (host == NULL)
632     return;
633
634   pthread_mutex_lock(&host->lock);
635
636   host->reference_count--;
637   if (host->reference_count > 0)
638     {
639       pthread_mutex_unlock(&host->lock);
640       return;
641     }
642
643   wrr_disconnect(host);
644
645   pthread_mutex_destroy(&host->lock);
646   sfree(host);
647 } /* }}} void wrr_free */
648
649 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
650 {
651   struct riemann_host   *host = NULL;
652   int                    status = 0;
653   int                    i;
654   oconfig_item_t                *child;
655   char                   callback_name[DATA_MAX_NAME_LEN];
656   user_data_t            ud;
657
658   if ((host = calloc(1, sizeof(*host))) == NULL) {
659     ERROR ("write_riemann plugin: calloc failed.");
660     return ENOMEM;
661   }
662   pthread_mutex_init(&host->lock, NULL);
663   C_COMPLAIN_INIT (&host->init_complaint);
664   host->reference_count = 1;
665   host->node = NULL;
666   host->port = 0;
667   host->notifications = 1;
668   host->check_thresholds = 0;
669   host->store_rates = 1;
670   host->always_append_ds = 0;
671   host->batch_mode = 1;
672   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
673   host->batch_init = cdtime();
674   host->batch_timeout = 0;
675   host->ttl_factor = RIEMANN_TTL_FACTOR;
676   host->client = NULL;
677   host->client_type = RIEMANN_CLIENT_TCP;
678   host->timeout.tv_sec = 0;
679   host->timeout.tv_usec = 0;
680
681   status = cf_util_get_string(ci, &host->name);
682   if (status != 0) {
683     WARNING("write_riemann plugin: Required host name is missing.");
684     wrr_free(host);
685     return -1;
686   }
687
688   for (i = 0; i < ci->children_num; i++) {
689     /*
690      * The code here could be simplified but makes room
691      * for easy adding of new options later on.
692      */
693     child = &ci->children[i];
694     status = 0;
695
696     if (strcasecmp("Host", child->key) == 0) {
697       status = cf_util_get_string(child, &host->node);
698       if (status != 0)
699         break;
700     } else if (strcasecmp("Notifications", child->key) == 0) {
701       status = cf_util_get_boolean(child, &host->notifications);
702       if (status != 0)
703         break;
704     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
705       status = cf_util_get_string(child, &host->event_service_prefix);
706       if (status != 0)
707         break;
708     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
709       status = cf_util_get_boolean(child, &host->check_thresholds);
710       if (status != 0)
711         break;
712     } else if (strcasecmp("Batch", child->key) == 0) {
713       status = cf_util_get_boolean(child, &host->batch_mode);
714       if (status != 0)
715         break;
716     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
717       status = cf_util_get_int(child, &host->batch_max);
718       if (status != 0)
719         break;
720     } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
721       status = cf_util_get_int(child, &host->batch_timeout);
722       if (status != 0)
723         break;
724     } else if (strcasecmp("Timeout", child->key) == 0) {
725       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
726       if (status != 0)
727         break;
728     } else if (strcasecmp("Port", child->key) == 0) {
729       host->port = cf_util_get_port_number(child);
730       if (host->port == -1) {
731         ERROR("write_riemann plugin: Invalid argument "
732               "configured for the \"Port\" "
733               "option.");
734         break;
735       }
736     } else if (strcasecmp("Protocol", child->key) == 0) {
737       char tmp[16];
738       status = cf_util_get_string_buffer(child,
739                                          tmp, sizeof(tmp));
740       if (status != 0)
741         {
742           ERROR("write_riemann plugin: cf_util_get_"
743                 "string_buffer failed with "
744                 "status %i.", status);
745           break;
746         }
747
748       if (strcasecmp("UDP", tmp) == 0)
749         host->client_type = RIEMANN_CLIENT_UDP;
750       else if (strcasecmp("TCP", tmp) == 0)
751         host->client_type = RIEMANN_CLIENT_TCP;
752       else if (strcasecmp("TLS", tmp) == 0)
753         host->client_type = RIEMANN_CLIENT_TLS;
754       else
755         WARNING("write_riemann plugin: The value "
756                 "\"%s\" is not valid for the "
757                 "\"Protocol\" option. Use "
758                 "either \"UDP\", \"TCP\" or \"TLS\".",
759                 tmp);
760     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
761       status = cf_util_get_string(child, &host->tls_ca_file);
762       if (status != 0)
763         {
764           ERROR("write_riemann plugin: cf_util_get_"
765                 "string_buffer failed with "
766                 "status %i.", status);
767           break;
768         }
769     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
770       status = cf_util_get_string(child, &host->tls_cert_file);
771       if (status != 0)
772         {
773           ERROR("write_riemann plugin: cf_util_get_"
774                 "string_buffer failed with "
775                 "status %i.", status);
776           break;
777         }
778     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
779       status = cf_util_get_string(child, &host->tls_key_file);
780       if (status != 0)
781         {
782           ERROR("write_riemann plugin: cf_util_get_"
783                 "string_buffer failed with "
784                 "status %i.", status);
785           break;
786         }
787     } else if (strcasecmp("StoreRates", child->key) == 0) {
788       status = cf_util_get_boolean(child, &host->store_rates);
789       if (status != 0)
790         break;
791     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
792       status = cf_util_get_boolean(child,
793                                    &host->always_append_ds);
794       if (status != 0)
795         break;
796     } else if (strcasecmp("TTLFactor", child->key) == 0) {
797       double tmp = NAN;
798       status = cf_util_get_double(child, &tmp);
799       if (status != 0)
800         break;
801       if (tmp >= 2.0) {
802         host->ttl_factor = tmp;
803       } else if (tmp >= 1.0) {
804         NOTICE("write_riemann plugin: The configured "
805                "TTLFactor is very small "
806                "(%.1f). A value of 2.0 or "
807                "greater is recommended.",
808                tmp);
809         host->ttl_factor = tmp;
810       } else if (tmp > 0.0) {
811         WARNING("write_riemann plugin: The configured "
812                 "TTLFactor is too small to be "
813                 "useful (%.1f). I'll use it "
814                 "since the user knows best, "
815                 "but under protest.",
816                 tmp);
817         host->ttl_factor = tmp;
818       } else { /* zero, negative and NAN */
819         ERROR("write_riemann plugin: The configured "
820               "TTLFactor is invalid (%.1f).",
821               tmp);
822       }
823     } else {
824       WARNING("write_riemann plugin: ignoring unknown config "
825               "option: \"%s\"", child->key);
826     }
827   }
828   if (status != 0) {
829     wrr_free(host);
830     return status;
831   }
832
833   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
834             host->name);
835   ud.data = host;
836   ud.free_func = wrr_free;
837
838   pthread_mutex_lock(&host->lock);
839
840   status = plugin_register_write(callback_name, wrr_write, &ud);
841
842   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
843     ud.free_func = NULL;
844     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
845   }
846   if (status != 0)
847     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
848             "failed with status %i.",
849             callback_name, status);
850   else /* success */
851     host->reference_count++;
852
853   status = plugin_register_notification(callback_name,
854                                         wrr_notification, &ud);
855   if (status != 0)
856     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
857             "failed with status %i.",
858             callback_name, status);
859   else /* success */
860     host->reference_count++;
861
862   if (host->reference_count <= 1)
863     {
864       /* Both callbacks failed => free memory.
865        * We need to unlock here, because riemann_free() will lock.
866        * This is not a race condition, because we're the only one
867        * holding a reference. */
868       pthread_mutex_unlock(&host->lock);
869       wrr_free(host);
870       return (-1);
871     }
872
873   host->reference_count--;
874   pthread_mutex_unlock(&host->lock);
875
876   return status;
877 } /* }}} int wrr_config_node */
878
879 static int wrr_config(oconfig_item_t *ci) /* {{{ */
880 {
881   int            i;
882   oconfig_item_t        *child;
883   int            status;
884
885   for (i = 0; i < ci->children_num; i++)  {
886     child = &ci->children[i];
887
888     if (strcasecmp("Node", child->key) == 0) {
889       wrr_config_node (child);
890     } else if (strcasecmp(child->key, "attribute") == 0) {
891       char *key = NULL;
892       char *val = NULL;
893
894       if (child->values_num != 2) {
895         WARNING("riemann attributes need both a key and a value.");
896         return (-1);
897       }
898       if (child->values[0].type != OCONFIG_TYPE_STRING ||
899           child->values[1].type != OCONFIG_TYPE_STRING) {
900         WARNING("riemann attribute needs string arguments.");
901         return (-1);
902       }
903       if ((key = strdup(child->values[0].value.string)) == NULL) {
904         WARNING("cannot allocate memory for attribute key.");
905         return (-1);
906       }
907       if ((val = strdup(child->values[1].value.string)) == NULL) {
908         WARNING("cannot allocate memory for attribute value.");
909         sfree(key);
910         return (-1);
911       }
912       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
913       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
914       DEBUG("write_riemann: got attr: %s => %s", key, val);
915       sfree(key);
916       sfree(val);
917     } else if (strcasecmp(child->key, "tag") == 0) {
918       char *tmp = NULL;
919       status = cf_util_get_string(child, &tmp);
920       if (status != 0)
921         continue;
922
923       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
924       DEBUG("write_riemann plugin: Got tag: %s", tmp);
925       sfree(tmp);
926     } else {
927       WARNING("write_riemann plugin: Ignoring unknown "
928               "configuration option \"%s\" at top level.",
929               child->key);
930     }
931   }
932   return (0);
933 } /* }}} int wrr_config */
934
935 void module_register(void)
936 {
937   plugin_register_complex_config("write_riemann", wrr_config);
938 }
939
940 /* vim: set sw=8 sts=8 ts=8 noet : */