write_riemann: Add support for timeouts
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "plugin.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "write_riemann_threshold.h"
41
42 #define RIEMANN_HOST            "localhost"
43 #define RIEMANN_PORT            5555
44 #define RIEMANN_TTL_FACTOR      2.0
45 #define RIEMANN_BATCH_MAX      8192
46
47 struct riemann_host {
48         char                    *name;
49         char                    *event_service_prefix;
50         pthread_mutex_t  lock;
51     _Bool            batch_mode;
52         _Bool            notifications;
53         _Bool            check_thresholds;
54         _Bool                    store_rates;
55         _Bool                    always_append_ds;
56         char                    *node;
57         int                      port;
58         riemann_client_type_t    client_type;
59         riemann_client_t        *client;
60         double                   ttl_factor;
61     cdtime_t         batch_init;
62     int              batch_max;
63         int                          reference_count;
64   riemann_message_t     *batch_msg;
65         char                     *tls_ca_file;
66         char                     *tls_cert_file;
67         char                     *tls_key_file;
68         struct timeval timeout;
69 };
70
/* Tags added to every event. */
static char **riemann_tags = NULL;
static size_t riemann_tags_num = 0;

/* Extra string attributes, stored flat as key, value, key, value, ... */
static char **riemann_attrs = NULL;
static size_t riemann_attrs_num = 0;
75
76 /* host->lock must be held when calling this function. */
77 static int wrr_connect(struct riemann_host *host) /* {{{ */
78 {
79         char const              *node;
80         int                      port;
81
82         if (host->client)
83                 return 0;
84
85         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
86         port = (host->port) ? host->port : RIEMANN_PORT;
87
88         host->client = NULL;
89
90         host->client = riemann_client_create(host->client_type, node, port,
91                                              RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
92                                              RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
93                                              RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
94                                              RIEMANN_CLIENT_OPTION_NONE);
95         if (host->client == NULL) {
96                 WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
97                         node, port);
98                 return -1;
99         }
100         if (host->timeout.tv_sec != 0) {
101                 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
102                         riemann_client_free(host->client);
103                         host->client = NULL;
104                         WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
105                                 node, port);
106                         return -1;
107                 }
108         }
109         DEBUG("write_riemann plugin: got a successful connection for: %s:%d",
110               node, port);
111
112         return 0;
113 } /* }}} int wrr_connect */
114
115 /* host->lock must be held when calling this function. */
116 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
117 {
118         if (!host->client)
119                 return (0);
120
121         riemann_client_free(host->client);
122         host->client = NULL;
123
124         return (0);
125 } /* }}} int wrr_disconnect */
126
127 /**
128  * Function to send messages to riemann.
129  *
130  * Acquires the host lock, disconnects on errors.
131  */
132 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
133 {
134         int status = 0;
135         pthread_mutex_lock (&host->lock);
136
137         status = wrr_connect(host);
138         if (status != 0)
139                 return status;
140
141         status = riemann_client_send_message(host->client, msg);
142         if (status != 0) {
143                 wrr_disconnect(host);
144                 pthread_mutex_unlock(&host->lock);
145                 return status;
146         }
147
148         /*
149          * For TCP we need to receive message acknowledgemenent.
150          */
151         if (host->client_type != RIEMANN_CLIENT_UDP)
152         {
153                 riemann_message_t *response;
154
155                 response = riemann_client_recv_message(host->client);
156
157                 if (response == NULL)
158                 {
159                         wrr_disconnect(host);
160                         pthread_mutex_unlock(&host->lock);
161                         return errno;
162                 }
163                 riemann_message_free(response);
164         }
165
166         pthread_mutex_unlock (&host->lock);
167         return 0;
168 } /* }}} int wrr_send */
169
170 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
171                 notification_t const *n)
172 {
173         riemann_message_t *msg;
174         riemann_event_t *event;
175         char service_buffer[6 * DATA_MAX_NAME_LEN];
176         char const *severity;
177         notification_meta_t *meta;
178         size_t i;
179
180         switch (n->severity)
181         {
182                 case NOTIF_OKAY:        severity = "ok"; break;
183                 case NOTIF_WARNING:     severity = "warning"; break;
184                 case NOTIF_FAILURE:     severity = "critical"; break;
185                 default:                severity = "unknown";
186         }
187
188         format_name(service_buffer, sizeof(service_buffer),
189                     /* host = */ "", n->plugin, n->plugin_instance,
190                     n->type, n->type_instance);
191
192         event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
193                                      RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
194                                      RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
195                                      RIEMANN_EVENT_FIELD_STATE, severity,
196                                      RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
197                                      RIEMANN_EVENT_FIELD_NONE);
198
199         if (n->host[0] != 0)
200                 riemann_event_string_attribute_add(event, "host", n->host);
201         if (n->plugin[0] != 0)
202                 riemann_event_string_attribute_add(event, "plugin", n->plugin);
203         if (n->plugin_instance[0] != 0)
204                 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
205
206         if (n->type[0] != 0)
207                 riemann_event_string_attribute_add(event, "type", n->type);
208         if (n->type_instance[0] != 0)
209                 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
210
211         for (i = 0; i < riemann_attrs_num; i += 2)
212                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
213
214         for (i = 0; i < riemann_tags_num; i++)
215                 riemann_event_tag_add(event, riemann_tags[i]);
216
217         if (n->message[0] != 0)
218                 riemann_event_string_attribute_add(event, "description", n->message);
219
220         /* Pull in values from threshold and add extra attributes */
221         for (meta = n->meta; meta != NULL; meta = meta->next)
222         {
223                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
224                 {
225                         riemann_event_set(event,
226                                           RIEMANN_EVENT_FIELD_METRIC_D,
227                                           (double) meta->nm_value.nm_double,
228                                           RIEMANN_EVENT_FIELD_NONE);
229                         continue;
230                 }
231
232                 if (meta->type == NM_TYPE_STRING) {
233                         riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
234                         continue;
235                 }
236         }
237
238         msg = riemann_message_create_with_events(event, NULL);
239         if (msg == NULL)
240         {
241                 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
242                 riemann_event_free (event);
243                 return (NULL);
244         }
245
246         DEBUG("write_riemann plugin: Successfully created message for notification: "
247               "host = \"%s\", service = \"%s\", state = \"%s\"",
248               event->host, event->service, event->state);
249         return (msg);
250 } /* }}} riemann_message_t *wrr_notification_to_message */
251
252 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
253                                            data_set_t const *ds,
254                                            value_list_t const *vl, size_t index,
255                                            gauge_t const *rates,
256                                            int status)
257 {
258         riemann_event_t *event;
259         char name_buffer[5 * DATA_MAX_NAME_LEN];
260         char service_buffer[6 * DATA_MAX_NAME_LEN];
261         size_t i;
262
263         event = riemann_event_new();
264         if (event == NULL)
265         {
266                 ERROR("write_riemann plugin: riemann_event_new() failed.");
267                 return (NULL);
268         }
269
270         format_name(name_buffer, sizeof(name_buffer),
271                     /* host = */ "", vl->plugin, vl->plugin_instance,
272                     vl->type, vl->type_instance);
273         if (host->always_append_ds || (ds->ds_num > 1))
274         {
275                 if (host->event_service_prefix == NULL)
276                         ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
277                                   &name_buffer[1], ds->ds[index].name);
278                 else
279                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
280                                   host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
281         }
282         else
283         {
284                 if (host->event_service_prefix == NULL)
285                         sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
286                 else
287                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
288                                   host->event_service_prefix, &name_buffer[1]);
289         }
290
291         riemann_event_set(event,
292                           RIEMANN_EVENT_FIELD_HOST, vl->host,
293                           RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
294                           RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
295                           RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
296                           "plugin", vl->plugin,
297                           "type", vl->type,
298                           "ds_name", ds->ds[index].name,
299                           NULL,
300                           RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
301                           RIEMANN_EVENT_FIELD_NONE);
302
303         if (host->check_thresholds) {
304                 const char *state = NULL;
305
306                 switch (status) {
307                         case STATE_OKAY:
308                                 state = "ok";
309                                 break;
310                         case STATE_ERROR:
311                                 state = "critical";
312                                 break;
313                         case STATE_WARNING:
314                                 state = "warning";
315                                 break;
316                         case STATE_MISSING:
317                                 state = "unknown";
318                                 break;
319                 }
320                 if (state)
321                         riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
322                                           RIEMANN_EVENT_FIELD_NONE);
323         }
324
325         if (vl->plugin_instance[0] != 0)
326                 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
327         if (vl->type_instance[0] != 0)
328                 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
329
330         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
331         {
332                 char ds_type[DATA_MAX_NAME_LEN];
333
334                 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
335                           DS_TYPE_TO_STRING(ds->ds[index].type));
336                 riemann_event_string_attribute_add(event, "ds_type", ds_type);
337         }
338         else
339         {
340                 riemann_event_string_attribute_add(event, "ds_type",
341                                        DS_TYPE_TO_STRING(ds->ds[index].type));
342         }
343
344         {
345                 char ds_index[DATA_MAX_NAME_LEN];
346
347                 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
348                 riemann_event_string_attribute_add(event, "ds_index", ds_index);
349         }
350
351         for (i = 0; i < riemann_attrs_num; i += 2)
352                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
353
354         for (i = 0; i < riemann_tags_num; i++)
355                 riemann_event_tag_add(event, riemann_tags[i]);
356
357         if (ds->ds[index].type == DS_TYPE_GAUGE)
358         {
359                 riemann_event_set(event,
360                                   RIEMANN_EVENT_FIELD_METRIC_D,
361                                   (double) vl->values[index].gauge,
362                                   RIEMANN_EVENT_FIELD_NONE);
363         }
364         else if (rates != NULL)
365         {
366                 riemann_event_set(event,
367                                   RIEMANN_EVENT_FIELD_METRIC_D,
368                                   (double) rates[index],
369                                   RIEMANN_EVENT_FIELD_NONE);
370         }
371         else
372         {
373                 int64_t metric;
374
375                 if (ds->ds[index].type == DS_TYPE_DERIVE)
376                         metric = (int64_t) vl->values[index].derive;
377                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
378                         metric = (int64_t) vl->values[index].absolute;
379                 else
380                         metric = (int64_t) vl->values[index].counter;
381
382                 riemann_event_set(event,
383                                   RIEMANN_EVENT_FIELD_METRIC_S64,
384                                   (int64_t) metric,
385                                   RIEMANN_EVENT_FIELD_NONE);
386         }
387
388         DEBUG("write_riemann plugin: Successfully created message for metric: "
389               "host = \"%s\", service = \"%s\"",
390               event->host, event->service);
391         return (event);
392 } /* }}} riemann_event_t *wrr_value_to_event */
393
394 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
395                                                     data_set_t const *ds,
396                                                     value_list_t const *vl,
397                                                     int *statuses)
398 {
399         riemann_message_t *msg;
400         size_t i;
401         gauge_t *rates = NULL;
402
403         /* Initialize the Msg structure. */
404         msg = riemann_message_new();
405         if (msg == NULL)
406         {
407                 ERROR ("write_riemann plugin: riemann_message_new failed.");
408                 return (NULL);
409         }
410
411         if (host->store_rates)
412         {
413                 rates = uc_get_rate(ds, vl);
414                 if (rates == NULL)
415                 {
416                         ERROR("write_riemann plugin: uc_get_rate failed.");
417                         riemann_message_free(msg);
418                         return (NULL);
419                 }
420         }
421
422         for (i = 0; i < vl->values_len; i++)
423         {
424                 riemann_event_t *event;
425
426                 event = wrr_value_to_event(host, ds, vl,
427                                            (int) i, rates, statuses[i]);
428                 if (event == NULL)
429                 {
430                         riemann_message_free(msg);
431                         sfree(rates);
432                         return (NULL);
433                 }
434                 riemann_message_append_events(msg, event, NULL);
435         }
436
437         sfree(rates);
438         return (msg);
439 } /* }}} riemann_message_t *wrr_value_list_to_message */
440
441 /*
442  * Always call while holding host->lock !
443  */
444 static int wrr_batch_flush_nolock(cdtime_t timeout,
445                                   struct riemann_host *host)
446 {
447         cdtime_t    now;
448         int         status = 0;
449
450         if (timeout > 0) {
451                 now = cdtime();
452                 if ((host->batch_init + timeout) > now)
453                         return status;
454         }
455         wrr_send(host, host->batch_msg);
456         riemann_message_free(host->batch_msg);
457
458         if (host->client_type != RIEMANN_CLIENT_UDP)
459         {
460                 riemann_message_t *response;
461
462                 response = riemann_client_recv_message(host->client);
463
464                 if (!response)
465                 {
466                         wrr_disconnect(host);
467                         return errno;
468                 }
469
470                 riemann_message_free(response);
471         }
472
473         host->batch_init = cdtime();
474         host->batch_msg = NULL;
475         return status;
476 }
477
478 static int wrr_batch_flush(cdtime_t timeout,
479         const char *identifier __attribute__((unused)),
480         user_data_t *user_data)
481 {
482         struct riemann_host *host;
483         int status;
484
485         if (user_data == NULL)
486                 return (-EINVAL);
487
488         host = user_data->data;
489         pthread_mutex_lock(&host->lock);
490         status = wrr_batch_flush_nolock(timeout, host);
491         if (status != 0)
492                 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
493                       status);
494
495         pthread_mutex_unlock(&host->lock);
496         return status;
497 }
498
499 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
500                                     data_set_t const *ds,
501                                     value_list_t const *vl,
502                                     int *statuses)
503 {
504         riemann_message_t *msg;
505         size_t len;
506         int ret;
507
508         msg = wrr_value_list_to_message(host, ds, vl, statuses);
509         if (msg == NULL)
510                 return -1;
511
512         pthread_mutex_lock(&host->lock);
513
514         if (host->batch_msg == NULL) {
515                 host->batch_msg = msg;
516         } else {
517                 int status;
518
519                 status = riemann_message_append_events_n(host->batch_msg,
520                                                          msg->n_events,
521                                                          msg->events);
522                 msg->n_events = 0;
523                 msg->events = NULL;
524
525                 riemann_message_free(msg);
526
527                 if (status != 0) {
528                         pthread_mutex_unlock(&host->lock);
529                         ERROR("write_riemann plugin: out of memory");
530                         return -1;
531                 }
532         }
533
534         len = riemann_message_get_packed_size(host->batch_msg);
535         ret = 0;
536         if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
537                 ret = wrr_batch_flush_nolock(0, host);
538         }
539
540         pthread_mutex_unlock(&host->lock);
541         return ret;
542 } /* }}} riemann_message_t *wrr_batch_add_value_list */
543
544 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
545 {
546         int                      status;
547         struct riemann_host     *host = ud->data;
548         riemann_message_t       *msg;
549
550         if (!host->notifications)
551                 return 0;
552
553     /*
554      * Never batch for notifications, send them ASAP
555      */
556         msg = wrr_notification_to_message(host, n);
557         if (msg == NULL)
558                 return (-1);
559
560         status = wrr_send(host, msg);
561         if (status != 0)
562                 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
563                       status);
564
565         riemann_message_free(msg);
566         return (status);
567 } /* }}} int wrr_notification */
568
569 static int wrr_write(const data_set_t *ds, /* {{{ */
570               const value_list_t *vl,
571               user_data_t *ud)
572 {
573         int                      status = 0;
574         int                      statuses[vl->values_len];
575         struct riemann_host     *host = ud->data;
576         riemann_message_t       *msg;
577
578         if (host->check_thresholds) {
579                 status = write_riemann_threshold_check(ds, vl, statuses);
580     if (status != 0)
581       return status;
582   } else {
583     memset (statuses, 0, sizeof (statuses));
584   }
585
586   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
587     wrr_batch_add_value_list(host, ds, vl, statuses);
588   } else {
589     msg = wrr_value_list_to_message(host, ds, vl, statuses);
590     if (msg == NULL)
591       return (-1);
592
593     status = wrr_send(host, msg);
594     if (status != 0)
595       ERROR("write_riemann plugin: riemann_client_send failed with status %i",
596             status);
597
598     riemann_message_free(msg);
599   }
600   return status;
601 } /* }}} int wrr_write */
602
603 static void wrr_free(void *p) /* {{{ */
604 {
605   struct riemann_host   *host = p;
606
607   if (host == NULL)
608     return;
609
610   pthread_mutex_lock(&host->lock);
611
612   host->reference_count--;
613   if (host->reference_count > 0)
614     {
615       pthread_mutex_unlock(&host->lock);
616       return;
617     }
618
619   wrr_disconnect(host);
620
621   pthread_mutex_destroy(&host->lock);
622   sfree(host);
623 } /* }}} void wrr_free */
624
625 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
626 {
627   struct riemann_host   *host = NULL;
628   int                    status = 0;
629   int                    i;
630   oconfig_item_t                *child;
631   char                   callback_name[DATA_MAX_NAME_LEN];
632   user_data_t            ud;
633
634   if ((host = calloc(1, sizeof(*host))) == NULL) {
635     ERROR ("write_riemann plugin: calloc failed.");
636     return ENOMEM;
637   }
638   pthread_mutex_init(&host->lock, NULL);
639   host->reference_count = 1;
640   host->node = NULL;
641   host->port = 0;
642   host->notifications = 1;
643   host->check_thresholds = 0;
644   host->store_rates = 1;
645   host->always_append_ds = 0;
646   host->batch_mode = 1;
647   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
648   host->batch_init = cdtime();
649   host->ttl_factor = RIEMANN_TTL_FACTOR;
650   host->client = NULL;
651   host->client_type = RIEMANN_CLIENT_TCP;
652   host->timeout.tv_sec = 0;
653   host->timeout.tv_usec = 0;
654
655   status = cf_util_get_string(ci, &host->name);
656   if (status != 0) {
657     WARNING("write_riemann plugin: Required host name is missing.");
658     wrr_free(host);
659     return -1;
660   }
661
662   for (i = 0; i < ci->children_num; i++) {
663     /*
664      * The code here could be simplified but makes room
665      * for easy adding of new options later on.
666      */
667     child = &ci->children[i];
668     status = 0;
669
670     if (strcasecmp("Host", child->key) == 0) {
671       status = cf_util_get_string(child, &host->node);
672       if (status != 0)
673         break;
674     } else if (strcasecmp("Notifications", child->key) == 0) {
675       status = cf_util_get_boolean(child, &host->notifications);
676       if (status != 0)
677         break;
678     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
679       status = cf_util_get_string(child, &host->event_service_prefix);
680       if (status != 0)
681         break;
682     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
683       status = cf_util_get_boolean(child, &host->check_thresholds);
684       if (status != 0)
685         break;
686     } else if (strcasecmp("Batch", child->key) == 0) {
687       status = cf_util_get_boolean(child, &host->batch_mode);
688       if (status != 0)
689         break;
690     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
691       status = cf_util_get_int(child, &host->batch_max);
692       if (status != 0)
693         break;
694     } else if (strcasecmp("Timeout", child->key) == 0) {
695       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
696       if (status != 0)
697         break;
698     } else if (strcasecmp("Port", child->key) == 0) {
699       host->port = cf_util_get_port_number(child);
700       if (host->port == -1) {
701         ERROR("write_riemann plugin: Invalid argument "
702               "configured for the \"Port\" "
703               "option.");
704         break;
705       }
706     } else if (strcasecmp("Protocol", child->key) == 0) {
707       char tmp[16];
708       status = cf_util_get_string_buffer(child,
709                                          tmp, sizeof(tmp));
710       if (status != 0)
711         {
712           ERROR("write_riemann plugin: cf_util_get_"
713                 "string_buffer failed with "
714                 "status %i.", status);
715           break;
716         }
717
718       if (strcasecmp("UDP", tmp) == 0)
719         host->client_type = RIEMANN_CLIENT_UDP;
720       else if (strcasecmp("TCP", tmp) == 0)
721         host->client_type = RIEMANN_CLIENT_TCP;
722       else if (strcasecmp("TLS", tmp) == 0)
723         host->client_type = RIEMANN_CLIENT_TLS;
724       else
725         WARNING("write_riemann plugin: The value "
726                 "\"%s\" is not valid for the "
727                 "\"Protocol\" option. Use "
728                 "either \"UDP\", \"TCP\" or \"TLS\".",
729                 tmp);
730     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
731       status = cf_util_get_string(child, &host->tls_ca_file);
732       if (status != 0)
733         {
734           ERROR("write_riemann plugin: cf_util_get_"
735                 "string_buffer failed with "
736                 "status %i.", status);
737           break;
738         }
739     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
740       status = cf_util_get_string(child, &host->tls_cert_file);
741       if (status != 0)
742         {
743           ERROR("write_riemann plugin: cf_util_get_"
744                 "string_buffer failed with "
745                 "status %i.", status);
746           break;
747         }
748     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
749       status = cf_util_get_string(child, &host->tls_key_file);
750       if (status != 0)
751         {
752           ERROR("write_riemann plugin: cf_util_get_"
753                 "string_buffer failed with "
754                 "status %i.", status);
755           break;
756         }
757     } else if (strcasecmp("StoreRates", child->key) == 0) {
758       status = cf_util_get_boolean(child, &host->store_rates);
759       if (status != 0)
760         break;
761     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
762       status = cf_util_get_boolean(child,
763                                    &host->always_append_ds);
764       if (status != 0)
765         break;
766     } else if (strcasecmp("TTLFactor", child->key) == 0) {
767       double tmp = NAN;
768       status = cf_util_get_double(child, &tmp);
769       if (status != 0)
770         break;
771       if (tmp >= 2.0) {
772         host->ttl_factor = tmp;
773       } else if (tmp >= 1.0) {
774         NOTICE("write_riemann plugin: The configured "
775                "TTLFactor is very small "
776                "(%.1f). A value of 2.0 or "
777                "greater is recommended.",
778                tmp);
779         host->ttl_factor = tmp;
780       } else if (tmp > 0.0) {
781         WARNING("write_riemann plugin: The configured "
782                 "TTLFactor is too small to be "
783                 "useful (%.1f). I'll use it "
784                 "since the user knows best, "
785                 "but under protest.",
786                 tmp);
787         host->ttl_factor = tmp;
788       } else { /* zero, negative and NAN */
789         ERROR("write_riemann plugin: The configured "
790               "TTLFactor is invalid (%.1f).",
791               tmp);
792       }
793     } else {
794       WARNING("write_riemann plugin: ignoring unknown config "
795               "option: \"%s\"", child->key);
796     }
797   }
798   if (status != 0) {
799     wrr_free(host);
800     return status;
801   }
802
803   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
804             host->name);
805   ud.data = host;
806   ud.free_func = wrr_free;
807
808   pthread_mutex_lock(&host->lock);
809
810   status = plugin_register_write(callback_name, wrr_write, &ud);
811
812   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
813     ud.free_func = NULL;
814     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
815   }
816   if (status != 0)
817     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
818             "failed with status %i.",
819             callback_name, status);
820   else /* success */
821     host->reference_count++;
822
823   status = plugin_register_notification(callback_name,
824                                         wrr_notification, &ud);
825   if (status != 0)
826     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
827             "failed with status %i.",
828             callback_name, status);
829   else /* success */
830     host->reference_count++;
831
832   if (host->reference_count <= 1)
833     {
834       /* Both callbacks failed => free memory.
835        * We need to unlock here, because riemann_free() will lock.
836        * This is not a race condition, because we're the only one
837        * holding a reference. */
838       pthread_mutex_unlock(&host->lock);
839       wrr_free(host);
840       return (-1);
841     }
842
843   host->reference_count--;
844   pthread_mutex_unlock(&host->lock);
845
846   return status;
847 } /* }}} int wrr_config_node */
848
849 static int wrr_config(oconfig_item_t *ci) /* {{{ */
850 {
851   int            i;
852   oconfig_item_t        *child;
853   int            status;
854
855   for (i = 0; i < ci->children_num; i++)  {
856     child = &ci->children[i];
857
858     if (strcasecmp("Node", child->key) == 0) {
859       wrr_config_node (child);
860     } else if (strcasecmp(child->key, "attribute") == 0) {
861       char *key = NULL;
862       char *val = NULL;
863
864       if (child->values_num != 2) {
865         WARNING("riemann attributes need both a key and a value.");
866         return (-1);
867       }
868       if (child->values[0].type != OCONFIG_TYPE_STRING ||
869           child->values[1].type != OCONFIG_TYPE_STRING) {
870         WARNING("riemann attribute needs string arguments.");
871         return (-1);
872       }
873       if ((key = strdup(child->values[0].value.string)) == NULL) {
874         WARNING("cannot allocate memory for attribute key.");
875         return (-1);
876       }
877       if ((val = strdup(child->values[1].value.string)) == NULL) {
878         WARNING("cannot allocate memory for attribute value.");
879         sfree(key);
880         return (-1);
881       }
882       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
883       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
884       DEBUG("write_riemann: got attr: %s => %s", key, val);
885       sfree(key);
886       sfree(val);
887     } else if (strcasecmp(child->key, "tag") == 0) {
888       char *tmp = NULL;
889       status = cf_util_get_string(child, &tmp);
890       if (status != 0)
891         continue;
892
893       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
894       DEBUG("write_riemann plugin: Got tag: %s", tmp);
895       sfree(tmp);
896     } else {
897       WARNING("write_riemann plugin: Ignoring unknown "
898               "configuration option \"%s\" at top level.",
899               child->key);
900     }
901   }
902   return (0);
903 } /* }}} int wrr_config */
904
905 void module_register(void)
906 {
907   plugin_register_complex_config("write_riemann", wrr_config);
908 }
909
910 /* vim: set sw=8 sts=8 ts=8 noet : */