write_riemann: Use riemann-c-client 1.6.0+
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "plugin.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "write_riemann_threshold.h"
41
/* Built-in defaults, used when the matching option is not configured. */
#define RIEMANN_HOST "localhost" /* node used when host->node is NULL */
#define RIEMANN_PORT 5555        /* port used when host->port is 0 */
#define RIEMANN_TTL_FACTOR 2.0   /* initial host->ttl_factor */
#define RIEMANN_BATCH_MAX 8192   /* initial host->batch_max */
46
47 struct riemann_host {
48         char                    *name;
49         char                    *event_service_prefix;
50         pthread_mutex_t  lock;
51     _Bool            batch_mode;
52         _Bool            notifications;
53         _Bool            check_thresholds;
54         _Bool                    store_rates;
55         _Bool                    always_append_ds;
56         char                    *node;
57         int                      port;
58         riemann_client_type_t    client_type;
59         riemann_client_t        *client;
60         double                   ttl_factor;
61     cdtime_t         batch_init;
62     int              batch_max;
63         int                          reference_count;
64   riemann_message_t     *batch_msg;
65         char                     *tls_ca_file;
66         char                     *tls_cert_file;
67         char                     *tls_key_file;
68 };
69
/* Tags attached to every event (top-level "Tag" option). */
static char **riemann_tags;
static size_t riemann_tags_num;

/* Extra string attributes attached to every event (top-level "Attribute"
 * option), stored flat: key, value, key, value, ... */
static char **riemann_attrs;
static size_t riemann_attrs_num;
74
75 /* host->lock must be held when calling this function. */
76 static int wrr_connect(struct riemann_host *host) /* {{{ */
77 {
78         char const              *node;
79         int                      port;
80
81         if (host->client)
82                 return 0;
83
84         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
85         port = (host->port) ? host->port : RIEMANN_PORT;
86
87         host->client = NULL;
88
89         host->client = riemann_client_create(host->client_type, node, port,
90                                              RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
91                                              RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
92                                              RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
93                                              RIEMANN_CLIENT_OPTION_NONE);
94         if (host->client == NULL) {
95                 WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
96                         node, port);
97                 return -1;
98         }
99         DEBUG("write_riemann plugin: got a successful connection for: %s:%d",
100               node, port);
101
102         return 0;
103 } /* }}} int wrr_connect */
104
105 /* host->lock must be held when calling this function. */
106 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
107 {
108         if (!host->client)
109                 return (0);
110
111         riemann_client_free(host->client);
112         host->client = NULL;
113
114         return (0);
115 } /* }}} int wrr_disconnect */
116
117 /**
118  * Function to send messages to riemann.
119  *
120  * Acquires the host lock, disconnects on errors.
121  */
122 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
123 {
124         int status = 0;
125         pthread_mutex_lock (&host->lock);
126
127         status = wrr_connect(host);
128         if (status != 0)
129                 return status;
130
131         status = riemann_client_send_message(host->client, msg);
132         if (status != 0) {
133                 wrr_disconnect(host);
134                 pthread_mutex_unlock(&host->lock);
135                 return status;
136         }
137
138         /*
139          * For TCP we need to receive message acknowledgemenent.
140          */
141         if (host->client_type != RIEMANN_CLIENT_UDP)
142         {
143                 riemann_message_t *response;
144
145                 response = riemann_client_recv_message(host->client);
146
147                 if (response == NULL)
148                 {
149                         wrr_disconnect(host);
150                         pthread_mutex_unlock(&host->lock);
151                         return errno;
152                 }
153                 riemann_message_free(response);
154         }
155
156         pthread_mutex_unlock (&host->lock);
157         return 0;
158 } /* }}} int wrr_send */
159
160 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
161                 notification_t const *n)
162 {
163         riemann_message_t *msg;
164         riemann_event_t *event;
165         char service_buffer[6 * DATA_MAX_NAME_LEN];
166         char const *severity;
167         notification_meta_t *meta;
168         size_t i;
169
170         switch (n->severity)
171         {
172                 case NOTIF_OKAY:        severity = "ok"; break;
173                 case NOTIF_WARNING:     severity = "warning"; break;
174                 case NOTIF_FAILURE:     severity = "critical"; break;
175                 default:                severity = "unknown";
176         }
177
178         format_name(service_buffer, sizeof(service_buffer),
179                     /* host = */ "", n->plugin, n->plugin_instance,
180                     n->type, n->type_instance);
181
182         event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
183                                      RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
184                                      RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
185                                      RIEMANN_EVENT_FIELD_STATE, severity,
186                                      RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
187                                      RIEMANN_EVENT_FIELD_NONE);
188
189         if (n->host[0] != 0)
190                 riemann_event_string_attribute_add(event, "host", n->host);
191         if (n->plugin[0] != 0)
192                 riemann_event_string_attribute_add(event, "plugin", n->plugin);
193         if (n->plugin_instance[0] != 0)
194                 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
195
196         if (n->type[0] != 0)
197                 riemann_event_string_attribute_add(event, "type", n->type);
198         if (n->type_instance[0] != 0)
199                 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
200
201         for (i = 0; i < riemann_attrs_num; i += 2)
202                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
203
204         for (i = 0; i < riemann_tags_num; i++)
205                 riemann_event_tag_add(event, riemann_tags[i]);
206
207         if (n->message[0] != 0)
208                 riemann_event_string_attribute_add(event, "description", n->message);
209
210         /* Pull in values from threshold and add extra attributes */
211         for (meta = n->meta; meta != NULL; meta = meta->next)
212         {
213                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
214                 {
215                         riemann_event_set(event,
216                                           RIEMANN_EVENT_FIELD_METRIC_D,
217                                           (double) meta->nm_value.nm_double,
218                                           RIEMANN_EVENT_FIELD_NONE);
219                         continue;
220                 }
221
222                 if (meta->type == NM_TYPE_STRING) {
223                         riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
224                         continue;
225                 }
226         }
227
228         msg = riemann_message_create_with_events(event, NULL);
229         if (msg == NULL)
230         {
231                 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
232                 riemann_event_free (event);
233                 return (NULL);
234         }
235
236         DEBUG("write_riemann plugin: Successfully created message for notification: "
237               "host = \"%s\", service = \"%s\", state = \"%s\"",
238               event->host, event->service, event->state);
239         return (msg);
240 } /* }}} riemann_message_t *wrr_notification_to_message */
241
242 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
243                                            data_set_t const *ds,
244                                            value_list_t const *vl, size_t index,
245                                            gauge_t const *rates,
246                                            int status)
247 {
248         riemann_event_t *event;
249         char name_buffer[5 * DATA_MAX_NAME_LEN];
250         char service_buffer[6 * DATA_MAX_NAME_LEN];
251         size_t i;
252
253         event = riemann_event_new();
254         if (event == NULL)
255         {
256                 ERROR("write_riemann plugin: riemann_event_new() failed.");
257                 return (NULL);
258         }
259
260         format_name(name_buffer, sizeof(name_buffer),
261                     /* host = */ "", vl->plugin, vl->plugin_instance,
262                     vl->type, vl->type_instance);
263         if (host->always_append_ds || (ds->ds_num > 1))
264         {
265                 if (host->event_service_prefix == NULL)
266                         ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
267                                   &name_buffer[1], ds->ds[index].name);
268                 else
269                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
270                                   host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
271         }
272         else
273         {
274                 if (host->event_service_prefix == NULL)
275                         sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
276                 else
277                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
278                                   host->event_service_prefix, &name_buffer[1]);
279         }
280
281         riemann_event_set(event,
282                           RIEMANN_EVENT_FIELD_HOST, vl->host,
283                           RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
284                           RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
285                           RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
286                           "plugin", vl->plugin,
287                           "type", vl->type,
288                           "ds_name", ds->ds[index].name,
289                           NULL,
290                           RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
291                           RIEMANN_EVENT_FIELD_NONE);
292
293         if (host->check_thresholds) {
294                 const char *state = NULL;
295
296                 switch (status) {
297                         case STATE_OKAY:
298                                 state = "ok";
299                                 break;
300                         case STATE_ERROR:
301                                 state = "critical";
302                                 break;
303                         case STATE_WARNING:
304                                 state = "warning";
305                                 break;
306                         case STATE_MISSING:
307                                 state = "unknown";
308                                 break;
309                 }
310                 if (state)
311                         riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
312                                           RIEMANN_EVENT_FIELD_NONE);
313         }
314
315         if (vl->plugin_instance[0] != 0)
316                 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
317         if (vl->type_instance[0] != 0)
318                 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
319
320         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
321         {
322                 char ds_type[DATA_MAX_NAME_LEN];
323
324                 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
325                           DS_TYPE_TO_STRING(ds->ds[index].type));
326                 riemann_event_string_attribute_add(event, "ds_type", ds_type);
327         }
328         else
329         {
330                 riemann_event_string_attribute_add(event, "ds_type",
331                                        DS_TYPE_TO_STRING(ds->ds[index].type));
332         }
333
334         {
335                 char ds_index[DATA_MAX_NAME_LEN];
336
337                 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
338                 riemann_event_string_attribute_add(event, "ds_index", ds_index);
339         }
340
341         for (i = 0; i < riemann_attrs_num; i += 2)
342                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
343
344         for (i = 0; i < riemann_tags_num; i++)
345                 riemann_event_tag_add(event, riemann_tags[i]);
346
347         if (ds->ds[index].type == DS_TYPE_GAUGE)
348         {
349                 riemann_event_set(event,
350                                   RIEMANN_EVENT_FIELD_METRIC_D,
351                                   (double) vl->values[index].gauge,
352                                   RIEMANN_EVENT_FIELD_NONE);
353         }
354         else if (rates != NULL)
355         {
356                 riemann_event_set(event,
357                                   RIEMANN_EVENT_FIELD_METRIC_D,
358                                   (double) rates[index],
359                                   RIEMANN_EVENT_FIELD_NONE);
360         }
361         else
362         {
363                 int64_t metric;
364
365                 if (ds->ds[index].type == DS_TYPE_DERIVE)
366                         metric = (int64_t) vl->values[index].derive;
367                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
368                         metric = (int64_t) vl->values[index].absolute;
369                 else
370                         metric = (int64_t) vl->values[index].counter;
371
372                 riemann_event_set(event,
373                                   RIEMANN_EVENT_FIELD_METRIC_S64,
374                                   (int64_t) metric,
375                                   RIEMANN_EVENT_FIELD_NONE);
376         }
377
378         DEBUG("write_riemann plugin: Successfully created message for metric: "
379               "host = \"%s\", service = \"%s\"",
380               event->host, event->service);
381         return (event);
382 } /* }}} riemann_event_t *wrr_value_to_event */
383
384 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
385                                                     data_set_t const *ds,
386                                                     value_list_t const *vl,
387                                                     int *statuses)
388 {
389         riemann_message_t *msg;
390         size_t i;
391         gauge_t *rates = NULL;
392
393         /* Initialize the Msg structure. */
394         msg = riemann_message_new();
395         if (msg == NULL)
396         {
397                 ERROR ("write_riemann plugin: riemann_message_new failed.");
398                 return (NULL);
399         }
400
401         if (host->store_rates)
402         {
403                 rates = uc_get_rate(ds, vl);
404                 if (rates == NULL)
405                 {
406                         ERROR("write_riemann plugin: uc_get_rate failed.");
407                         riemann_message_free(msg);
408                         return (NULL);
409                 }
410         }
411
412         for (i = 0; i < vl->values_len; i++)
413         {
414                 riemann_event_t *event;
415
416                 event = wrr_value_to_event(host, ds, vl,
417                                            (int) i, rates, statuses[i]);
418                 if (event == NULL)
419                 {
420                         riemann_message_free(msg);
421                         sfree(rates);
422                         return (NULL);
423                 }
424                 riemann_message_append_events(msg, event, NULL);
425         }
426
427         sfree(rates);
428         return (msg);
429 } /* }}} riemann_message_t *wrr_value_list_to_message */
430
431 /*
432  * Always call while holding host->lock !
433  */
434 static int wrr_batch_flush_nolock(cdtime_t timeout,
435                                   struct riemann_host *host)
436 {
437         cdtime_t    now;
438         int         status = 0;
439
440         if (timeout > 0) {
441                 now = cdtime();
442                 if ((host->batch_init + timeout) > now)
443                         return status;
444         }
445         wrr_send(host, host->batch_msg);
446         riemann_message_free(host->batch_msg);
447
448         if (host->client_type != RIEMANN_CLIENT_UDP)
449         {
450                 riemann_message_t *response;
451
452                 response = riemann_client_recv_message(host->client);
453
454                 if (!response)
455                 {
456                         wrr_disconnect(host);
457                         return errno;
458                 }
459
460                 riemann_message_free(response);
461         }
462
463         host->batch_init = cdtime();
464         host->batch_msg = NULL;
465         return status;
466 }
467
468 static int wrr_batch_flush(cdtime_t timeout,
469         const char *identifier __attribute__((unused)),
470         user_data_t *user_data)
471 {
472         struct riemann_host *host;
473         int status;
474
475         if (user_data == NULL)
476                 return (-EINVAL);
477
478         host = user_data->data;
479         pthread_mutex_lock(&host->lock);
480         status = wrr_batch_flush_nolock(timeout, host);
481         if (status != 0)
482                 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
483                       status);
484
485         pthread_mutex_unlock(&host->lock);
486         return status;
487 }
488
489 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
490                                     data_set_t const *ds,
491                                     value_list_t const *vl,
492                                     int *statuses)
493 {
494         riemann_message_t *msg;
495         size_t len;
496         int ret;
497
498         msg = wrr_value_list_to_message(host, ds, vl, statuses);
499         if (msg == NULL)
500                 return -1;
501
502         pthread_mutex_lock(&host->lock);
503
504         if (host->batch_msg == NULL) {
505                 host->batch_msg = msg;
506         } else {
507                 int status;
508
509                 status = riemann_message_append_events_n(host->batch_msg,
510                                                          msg->n_events,
511                                                          msg->events);
512                 msg->n_events = 0;
513                 msg->events = NULL;
514
515                 riemann_message_free(msg);
516
517                 if (status != 0) {
518                         pthread_mutex_unlock(&host->lock);
519                         ERROR("write_riemann plugin: out of memory");
520                         return -1;
521                 }
522         }
523
524         len = riemann_message_get_packed_size(host->batch_msg);
525         ret = 0;
526         if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
527                 ret = wrr_batch_flush_nolock(0, host);
528         }
529
530         pthread_mutex_unlock(&host->lock);
531         return ret;
532 } /* }}} riemann_message_t *wrr_batch_add_value_list */
533
534 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
535 {
536         int                      status;
537         struct riemann_host     *host = ud->data;
538         riemann_message_t       *msg;
539
540         if (!host->notifications)
541                 return 0;
542
543     /*
544      * Never batch for notifications, send them ASAP
545      */
546         msg = wrr_notification_to_message(host, n);
547         if (msg == NULL)
548                 return (-1);
549
550         status = wrr_send(host, msg);
551         if (status != 0)
552                 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
553                       status);
554
555         riemann_message_free(msg);
556         return (status);
557 } /* }}} int wrr_notification */
558
559 static int wrr_write(const data_set_t *ds, /* {{{ */
560               const value_list_t *vl,
561               user_data_t *ud)
562 {
563         int                      status = 0;
564         int                      statuses[vl->values_len];
565         struct riemann_host     *host = ud->data;
566         riemann_message_t       *msg;
567
568         if (host->check_thresholds) {
569                 status = write_riemann_threshold_check(ds, vl, statuses);
570     if (status != 0)
571       return status;
572   } else {
573     memset (statuses, 0, sizeof (statuses));
574   }
575
576   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
577     wrr_batch_add_value_list(host, ds, vl, statuses);
578   } else {
579     msg = wrr_value_list_to_message(host, ds, vl, statuses);
580     if (msg == NULL)
581       return (-1);
582
583     status = wrr_send(host, msg);
584     if (status != 0)
585       ERROR("write_riemann plugin: riemann_client_send failed with status %i",
586             status);
587
588     riemann_message_free(msg);
589   }
590   return status;
591 } /* }}} int wrr_write */
592
593 static void wrr_free(void *p) /* {{{ */
594 {
595   struct riemann_host   *host = p;
596
597   if (host == NULL)
598     return;
599
600   pthread_mutex_lock(&host->lock);
601
602   host->reference_count--;
603   if (host->reference_count > 0)
604     {
605       pthread_mutex_unlock(&host->lock);
606       return;
607     }
608
609   wrr_disconnect(host);
610
611   pthread_mutex_destroy(&host->lock);
612   sfree(host);
613 } /* }}} void wrr_free */
614
615 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
616 {
617   struct riemann_host   *host = NULL;
618   int                    status = 0;
619   int                    i;
620   oconfig_item_t                *child;
621   char                   callback_name[DATA_MAX_NAME_LEN];
622   user_data_t            ud;
623
624   if ((host = calloc(1, sizeof(*host))) == NULL) {
625     ERROR ("write_riemann plugin: calloc failed.");
626     return ENOMEM;
627   }
628   pthread_mutex_init(&host->lock, NULL);
629   host->reference_count = 1;
630   host->node = NULL;
631   host->port = 0;
632   host->notifications = 1;
633   host->check_thresholds = 0;
634   host->store_rates = 1;
635   host->always_append_ds = 0;
636   host->batch_mode = 1;
637   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
638   host->batch_init = cdtime();
639   host->ttl_factor = RIEMANN_TTL_FACTOR;
640   host->client = NULL;
641   host->client_type = RIEMANN_CLIENT_TCP;
642
643   status = cf_util_get_string(ci, &host->name);
644   if (status != 0) {
645     WARNING("write_riemann plugin: Required host name is missing.");
646     wrr_free(host);
647     return -1;
648   }
649
650   for (i = 0; i < ci->children_num; i++) {
651     /*
652      * The code here could be simplified but makes room
653      * for easy adding of new options later on.
654      */
655     child = &ci->children[i];
656     status = 0;
657
658     if (strcasecmp("Host", child->key) == 0) {
659       status = cf_util_get_string(child, &host->node);
660       if (status != 0)
661         break;
662     } else if (strcasecmp("Notifications", child->key) == 0) {
663       status = cf_util_get_boolean(child, &host->notifications);
664       if (status != 0)
665         break;
666     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
667       status = cf_util_get_string(child, &host->event_service_prefix);
668       if (status != 0)
669         break;
670     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
671       status = cf_util_get_boolean(child, &host->check_thresholds);
672       if (status != 0)
673         break;
674     } else if (strcasecmp("Batch", child->key) == 0) {
675       status = cf_util_get_boolean(child, &host->batch_mode);
676       if (status != 0)
677         break;
678     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
679       status = cf_util_get_int(child, &host->batch_max);
680       if (status != 0)
681         break;
682     } else if (strcasecmp("Port", child->key) == 0) {
683       host->port = cf_util_get_port_number(child);
684       if (host->port == -1) {
685         ERROR("write_riemann plugin: Invalid argument "
686               "configured for the \"Port\" "
687               "option.");
688         break;
689       }
690     } else if (strcasecmp("Protocol", child->key) == 0) {
691       char tmp[16];
692       status = cf_util_get_string_buffer(child,
693                                          tmp, sizeof(tmp));
694       if (status != 0)
695         {
696           ERROR("write_riemann plugin: cf_util_get_"
697                 "string_buffer failed with "
698                 "status %i.", status);
699           break;
700         }
701
702       if (strcasecmp("UDP", tmp) == 0)
703         host->client_type = RIEMANN_CLIENT_UDP;
704       else if (strcasecmp("TCP", tmp) == 0)
705         host->client_type = RIEMANN_CLIENT_TCP;
706       else if (strcasecmp("TLS", tmp) == 0)
707         host->client_type = RIEMANN_CLIENT_TLS;
708       else
709         WARNING("write_riemann plugin: The value "
710                 "\"%s\" is not valid for the "
711                 "\"Protocol\" option. Use "
712                 "either \"UDP\", \"TCP\" or \"TLS\".",
713                 tmp);
714     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
715       status = cf_util_get_string(child, &host->tls_ca_file);
716       if (status != 0)
717         {
718           ERROR("write_riemann plugin: cf_util_get_"
719                 "string_buffer failed with "
720                 "status %i.", status);
721           break;
722         }
723     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
724       status = cf_util_get_string(child, &host->tls_cert_file);
725       if (status != 0)
726         {
727           ERROR("write_riemann plugin: cf_util_get_"
728                 "string_buffer failed with "
729                 "status %i.", status);
730           break;
731         }
732     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
733       status = cf_util_get_string(child, &host->tls_key_file);
734       if (status != 0)
735         {
736           ERROR("write_riemann plugin: cf_util_get_"
737                 "string_buffer failed with "
738                 "status %i.", status);
739           break;
740         }
741     } else if (strcasecmp("StoreRates", child->key) == 0) {
742       status = cf_util_get_boolean(child, &host->store_rates);
743       if (status != 0)
744         break;
745     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
746       status = cf_util_get_boolean(child,
747                                    &host->always_append_ds);
748       if (status != 0)
749         break;
750     } else if (strcasecmp("TTLFactor", child->key) == 0) {
751       double tmp = NAN;
752       status = cf_util_get_double(child, &tmp);
753       if (status != 0)
754         break;
755       if (tmp >= 2.0) {
756         host->ttl_factor = tmp;
757       } else if (tmp >= 1.0) {
758         NOTICE("write_riemann plugin: The configured "
759                "TTLFactor is very small "
760                "(%.1f). A value of 2.0 or "
761                "greater is recommended.",
762                tmp);
763         host->ttl_factor = tmp;
764       } else if (tmp > 0.0) {
765         WARNING("write_riemann plugin: The configured "
766                 "TTLFactor is too small to be "
767                 "useful (%.1f). I'll use it "
768                 "since the user knows best, "
769                 "but under protest.",
770                 tmp);
771         host->ttl_factor = tmp;
772       } else { /* zero, negative and NAN */
773         ERROR("write_riemann plugin: The configured "
774               "TTLFactor is invalid (%.1f).",
775               tmp);
776       }
777     } else {
778       WARNING("write_riemann plugin: ignoring unknown config "
779               "option: \"%s\"", child->key);
780     }
781   }
782   if (status != 0) {
783     wrr_free(host);
784     return status;
785   }
786
787   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
788             host->name);
789   ud.data = host;
790   ud.free_func = wrr_free;
791
792   pthread_mutex_lock(&host->lock);
793
794   status = plugin_register_write(callback_name, wrr_write, &ud);
795
796   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
797     ud.free_func = NULL;
798     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
799   }
800   if (status != 0)
801     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
802             "failed with status %i.",
803             callback_name, status);
804   else /* success */
805     host->reference_count++;
806
807   status = plugin_register_notification(callback_name,
808                                         wrr_notification, &ud);
809   if (status != 0)
810     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
811             "failed with status %i.",
812             callback_name, status);
813   else /* success */
814     host->reference_count++;
815
816   if (host->reference_count <= 1)
817     {
818       /* Both callbacks failed => free memory.
819        * We need to unlock here, because riemann_free() will lock.
820        * This is not a race condition, because we're the only one
821        * holding a reference. */
822       pthread_mutex_unlock(&host->lock);
823       wrr_free(host);
824       return (-1);
825     }
826
827   host->reference_count--;
828   pthread_mutex_unlock(&host->lock);
829
830   return status;
831 } /* }}} int wrr_config_node */
832
833 static int wrr_config(oconfig_item_t *ci) /* {{{ */
834 {
835   int            i;
836   oconfig_item_t        *child;
837   int            status;
838
839   for (i = 0; i < ci->children_num; i++)  {
840     child = &ci->children[i];
841
842     if (strcasecmp("Node", child->key) == 0) {
843       wrr_config_node (child);
844     } else if (strcasecmp(child->key, "attribute") == 0) {
845       char *key = NULL;
846       char *val = NULL;
847
848       if (child->values_num != 2) {
849         WARNING("riemann attributes need both a key and a value.");
850         return (-1);
851       }
852       if (child->values[0].type != OCONFIG_TYPE_STRING ||
853           child->values[1].type != OCONFIG_TYPE_STRING) {
854         WARNING("riemann attribute needs string arguments.");
855         return (-1);
856       }
857       if ((key = strdup(child->values[0].value.string)) == NULL) {
858         WARNING("cannot allocate memory for attribute key.");
859         return (-1);
860       }
861       if ((val = strdup(child->values[1].value.string)) == NULL) {
862         WARNING("cannot allocate memory for attribute value.");
863         sfree(key);
864         return (-1);
865       }
866       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
867       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
868       DEBUG("write_riemann: got attr: %s => %s", key, val);
869       sfree(key);
870       sfree(val);
871     } else if (strcasecmp(child->key, "tag") == 0) {
872       char *tmp = NULL;
873       status = cf_util_get_string(child, &tmp);
874       if (status != 0)
875         continue;
876
877       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
878       DEBUG("write_riemann plugin: Got tag: %s", tmp);
879       sfree(tmp);
880     } else {
881       WARNING("write_riemann plugin: Ignoring unknown "
882               "configuration option \"%s\" at top level.",
883               child->key);
884     }
885   }
886   return (0);
887 } /* }}} int wrr_config */
888
889 void module_register(void)
890 {
891   plugin_register_complex_config("write_riemann", wrr_config);
892 }
893
894 /* vim: set sw=8 sts=8 ts=8 noet : */