curl stats: Use CamelCase identifiers instead of using underscores.
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33
34 #include "collectd.h"
35 #include "plugin.h"
36 #include "common.h"
37 #include "configfile.h"
38 #include "utils_cache.h"
39 #include "utils_complain.h"
40 #include "write_riemann_threshold.h"
41
42 #define RIEMANN_HOST            "localhost"
43 #define RIEMANN_PORT            5555
44 #define RIEMANN_TTL_FACTOR      2.0
45 #define RIEMANN_BATCH_MAX      8192
46
47 struct riemann_host {
48     c_complain_t init_complaint;
49         char                    *name;
50         char                    *event_service_prefix;
51         pthread_mutex_t  lock;
52     _Bool            batch_mode;
53         _Bool            notifications;
54         _Bool            check_thresholds;
55         _Bool                    store_rates;
56         _Bool                    always_append_ds;
57         char                    *node;
58         int                      port;
59         riemann_client_type_t    client_type;
60         riemann_client_t        *client;
61         double                   ttl_factor;
62     cdtime_t         batch_init;
63     int              batch_max;
64     int              batch_timeout;
65         int                          reference_count;
66   riemann_message_t     *batch_msg;
67         char                     *tls_ca_file;
68         char                     *tls_cert_file;
69         char                     *tls_key_file;
70         struct timeval timeout;
71 };
72
73 static char     **riemann_tags;
74 static size_t     riemann_tags_num;
75 static char     **riemann_attrs;
76 static size_t     riemann_attrs_num;
77
78 /* host->lock must be held when calling this function. */
79 static int wrr_connect(struct riemann_host *host) /* {{{ */
80 {
81         char const              *node;
82         int                      port;
83
84         if (host->client)
85                 return 0;
86
87         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
88         port = (host->port) ? host->port : RIEMANN_PORT;
89
90         host->client = NULL;
91
92         host->client = riemann_client_create(host->client_type, node, port,
93                                              RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
94                                              RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
95                                              RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
96                                              RIEMANN_CLIENT_OPTION_NONE);
97         if (host->client == NULL) {
98         c_complain (LOG_ERR, &host->init_complaint,
99                     "write_riemann plugin: Unable to connect to Riemann at %s:%d",
100                     node, port);
101                 return -1;
102         }
103         if (host->timeout.tv_sec != 0) {
104                 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
105                         riemann_client_free(host->client);
106                         host->client = NULL;
107             c_complain (LOG_ERR, &host->init_complaint,
108                         "write_riemann plugin: Unable to connect to Riemann at %s:%d",
109                         node, port);
110                         return -1;
111                 }
112         }
113
114     c_release (LOG_INFO, &host->init_complaint,
115                "write_riemann plugin: Successfully connected to %s:%d",
116                node, port);
117
118         return 0;
119 } /* }}} int wrr_connect */
120
121 /* host->lock must be held when calling this function. */
122 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
123 {
124         if (!host->client)
125                 return (0);
126
127         riemann_client_free(host->client);
128         host->client = NULL;
129
130         return (0);
131 } /* }}} int wrr_disconnect */
132
133 /**
134  * Function to send messages to riemann.
135  *
136  * Acquires the host lock, disconnects on errors.
137  */
138 static int wrr_send_nolock(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
139 {
140         int status = 0;
141
142         status = wrr_connect(host);
143         if (status != 0) {
144                 return status;
145     }
146
147         status = riemann_client_send_message(host->client, msg);
148         if (status != 0) {
149                 wrr_disconnect(host);
150                 return status;
151         }
152
153         /*
154          * For TCP we need to receive message acknowledgemenent.
155          */
156         if (host->client_type != RIEMANN_CLIENT_UDP)
157         {
158                 riemann_message_t *response;
159
160                 response = riemann_client_recv_message(host->client);
161
162                 if (response == NULL)
163                 {
164                         wrr_disconnect(host);
165                         return errno;
166                 }
167                 riemann_message_free(response);
168         }
169
170         return 0;
171 } /* }}} int wrr_send */
172
173 static int wrr_send(struct riemann_host *host, riemann_message_t *msg)
174 {
175     int status = 0;
176
177     pthread_mutex_lock (&host->lock);
178     status = wrr_send_nolock(host, msg);
179     pthread_mutex_unlock (&host->lock);
180     return status;
181 }
182
183 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
184                 notification_t const *n)
185 {
186         riemann_message_t *msg;
187         riemann_event_t *event;
188         char service_buffer[6 * DATA_MAX_NAME_LEN];
189         char const *severity;
190         notification_meta_t *meta;
191         size_t i;
192
193         switch (n->severity)
194         {
195                 case NOTIF_OKAY:        severity = "ok"; break;
196                 case NOTIF_WARNING:     severity = "warning"; break;
197                 case NOTIF_FAILURE:     severity = "critical"; break;
198                 default:                severity = "unknown";
199         }
200
201         format_name(service_buffer, sizeof(service_buffer),
202                     /* host = */ "", n->plugin, n->plugin_instance,
203                     n->type, n->type_instance);
204
205         event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
206                                      RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
207                                      RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
208                                      RIEMANN_EVENT_FIELD_STATE, severity,
209                                      RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
210                                      RIEMANN_EVENT_FIELD_NONE);
211
212         if (n->host[0] != 0)
213                 riemann_event_string_attribute_add(event, "host", n->host);
214         if (n->plugin[0] != 0)
215                 riemann_event_string_attribute_add(event, "plugin", n->plugin);
216         if (n->plugin_instance[0] != 0)
217                 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
218
219         if (n->type[0] != 0)
220                 riemann_event_string_attribute_add(event, "type", n->type);
221         if (n->type_instance[0] != 0)
222                 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
223
224         for (i = 0; i < riemann_attrs_num; i += 2)
225                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
226
227         for (i = 0; i < riemann_tags_num; i++)
228                 riemann_event_tag_add(event, riemann_tags[i]);
229
230         if (n->message[0] != 0)
231                 riemann_event_string_attribute_add(event, "description", n->message);
232
233         /* Pull in values from threshold and add extra attributes */
234         for (meta = n->meta; meta != NULL; meta = meta->next)
235         {
236                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
237                 {
238                         riemann_event_set(event,
239                                           RIEMANN_EVENT_FIELD_METRIC_D,
240                                           (double) meta->nm_value.nm_double,
241                                           RIEMANN_EVENT_FIELD_NONE);
242                         continue;
243                 }
244
245                 if (meta->type == NM_TYPE_STRING) {
246                         riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
247                         continue;
248                 }
249         }
250
251         msg = riemann_message_create_with_events(event, NULL);
252         if (msg == NULL)
253         {
254                 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
255                 riemann_event_free (event);
256                 return (NULL);
257         }
258
259         DEBUG("write_riemann plugin: Successfully created message for notification: "
260               "host = \"%s\", service = \"%s\", state = \"%s\"",
261               event->host, event->service, event->state);
262         return (msg);
263 } /* }}} riemann_message_t *wrr_notification_to_message */
264
265 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
266                                            data_set_t const *ds,
267                                            value_list_t const *vl, size_t index,
268                                            gauge_t const *rates,
269                                            int status)
270 {
271         riemann_event_t *event;
272         char name_buffer[5 * DATA_MAX_NAME_LEN];
273         char service_buffer[6 * DATA_MAX_NAME_LEN];
274         size_t i;
275
276         event = riemann_event_new();
277         if (event == NULL)
278         {
279                 ERROR("write_riemann plugin: riemann_event_new() failed.");
280                 return (NULL);
281         }
282
283         format_name(name_buffer, sizeof(name_buffer),
284                     /* host = */ "", vl->plugin, vl->plugin_instance,
285                     vl->type, vl->type_instance);
286         if (host->always_append_ds || (ds->ds_num > 1))
287         {
288                 if (host->event_service_prefix == NULL)
289                         ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
290                                   &name_buffer[1], ds->ds[index].name);
291                 else
292                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
293                                   host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
294         }
295         else
296         {
297                 if (host->event_service_prefix == NULL)
298                         sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
299                 else
300                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
301                                   host->event_service_prefix, &name_buffer[1]);
302         }
303
304         riemann_event_set(event,
305                           RIEMANN_EVENT_FIELD_HOST, vl->host,
306                           RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
307                           RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
308                           RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
309                           "plugin", vl->plugin,
310                           "type", vl->type,
311                           "ds_name", ds->ds[index].name,
312                           NULL,
313                           RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
314                           RIEMANN_EVENT_FIELD_NONE);
315
316         if (host->check_thresholds) {
317                 const char *state = NULL;
318
319                 switch (status) {
320                         case STATE_OKAY:
321                                 state = "ok";
322                                 break;
323                         case STATE_ERROR:
324                                 state = "critical";
325                                 break;
326                         case STATE_WARNING:
327                                 state = "warning";
328                                 break;
329                         case STATE_MISSING:
330                                 state = "unknown";
331                                 break;
332                 }
333                 if (state)
334                         riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
335                                           RIEMANN_EVENT_FIELD_NONE);
336         }
337
338         if (vl->plugin_instance[0] != 0)
339                 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
340         if (vl->type_instance[0] != 0)
341                 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
342
343         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
344         {
345                 char ds_type[DATA_MAX_NAME_LEN];
346
347                 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
348                           DS_TYPE_TO_STRING(ds->ds[index].type));
349                 riemann_event_string_attribute_add(event, "ds_type", ds_type);
350         }
351         else
352         {
353                 riemann_event_string_attribute_add(event, "ds_type",
354                                        DS_TYPE_TO_STRING(ds->ds[index].type));
355         }
356
357         {
358                 char ds_index[DATA_MAX_NAME_LEN];
359
360                 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
361                 riemann_event_string_attribute_add(event, "ds_index", ds_index);
362         }
363
364         for (i = 0; i < riemann_attrs_num; i += 2)
365                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
366
367         for (i = 0; i < riemann_tags_num; i++)
368                 riemann_event_tag_add(event, riemann_tags[i]);
369
370         if (ds->ds[index].type == DS_TYPE_GAUGE)
371         {
372                 riemann_event_set(event,
373                                   RIEMANN_EVENT_FIELD_METRIC_D,
374                                   (double) vl->values[index].gauge,
375                                   RIEMANN_EVENT_FIELD_NONE);
376         }
377         else if (rates != NULL)
378         {
379                 riemann_event_set(event,
380                                   RIEMANN_EVENT_FIELD_METRIC_D,
381                                   (double) rates[index],
382                                   RIEMANN_EVENT_FIELD_NONE);
383         }
384         else
385         {
386                 int64_t metric;
387
388                 if (ds->ds[index].type == DS_TYPE_DERIVE)
389                         metric = (int64_t) vl->values[index].derive;
390                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
391                         metric = (int64_t) vl->values[index].absolute;
392                 else
393                         metric = (int64_t) vl->values[index].counter;
394
395                 riemann_event_set(event,
396                                   RIEMANN_EVENT_FIELD_METRIC_S64,
397                                   (int64_t) metric,
398                                   RIEMANN_EVENT_FIELD_NONE);
399         }
400
401         DEBUG("write_riemann plugin: Successfully created message for metric: "
402               "host = \"%s\", service = \"%s\"",
403               event->host, event->service);
404         return (event);
405 } /* }}} riemann_event_t *wrr_value_to_event */
406
407 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
408                                                     data_set_t const *ds,
409                                                     value_list_t const *vl,
410                                                     int *statuses)
411 {
412         riemann_message_t *msg;
413         size_t i;
414         gauge_t *rates = NULL;
415
416         /* Initialize the Msg structure. */
417         msg = riemann_message_new();
418         if (msg == NULL)
419         {
420                 ERROR ("write_riemann plugin: riemann_message_new failed.");
421                 return (NULL);
422         }
423
424         if (host->store_rates)
425         {
426                 rates = uc_get_rate(ds, vl);
427                 if (rates == NULL)
428                 {
429                         ERROR("write_riemann plugin: uc_get_rate failed.");
430                         riemann_message_free(msg);
431                         return (NULL);
432                 }
433         }
434
435         for (i = 0; i < vl->values_len; i++)
436         {
437                 riemann_event_t *event;
438
439                 event = wrr_value_to_event(host, ds, vl,
440                                            (int) i, rates, statuses[i]);
441                 if (event == NULL)
442                 {
443                         riemann_message_free(msg);
444                         sfree(rates);
445                         return (NULL);
446                 }
447                 riemann_message_append_events(msg, event, NULL);
448         }
449
450         sfree(rates);
451         return (msg);
452 } /* }}} riemann_message_t *wrr_value_list_to_message */
453
454 /*
455  * Always call while holding host->lock !
456  */
457 static int wrr_batch_flush_nolock(cdtime_t timeout,
458                                   struct riemann_host *host)
459 {
460         cdtime_t    now;
461         int         status = 0;
462
463     now = cdtime();
464         if (timeout > 0) {
465                 if ((host->batch_init + timeout) > now) {
466                         return status;
467         }
468         }
469         wrr_send_nolock(host, host->batch_msg);
470         riemann_message_free(host->batch_msg);
471
472         host->batch_init = now;
473         host->batch_msg = NULL;
474         return status;
475 }
476
477 static int wrr_batch_flush(cdtime_t timeout,
478         const char *identifier __attribute__((unused)),
479         user_data_t *user_data)
480 {
481         struct riemann_host *host;
482         int status;
483
484         if (user_data == NULL)
485                 return (-EINVAL);
486
487         host = user_data->data;
488         pthread_mutex_lock(&host->lock);
489         status = wrr_batch_flush_nolock(timeout, host);
490         if (status != 0)
491         c_complain (LOG_ERR, &host->init_complaint,
492                     "write_riemann plugin: riemann_client_send failed with status %i",
493                     status);
494     else
495         c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent.");
496
497         pthread_mutex_unlock(&host->lock);
498         return status;
499 }
500
501 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
502                                     data_set_t const *ds,
503                                     value_list_t const *vl,
504                                     int *statuses)
505 {
506         riemann_message_t *msg;
507         size_t len;
508         int ret;
509     cdtime_t timeout;
510
511         msg = wrr_value_list_to_message(host, ds, vl, statuses);
512         if (msg == NULL)
513                 return -1;
514
515         pthread_mutex_lock(&host->lock);
516
517         if (host->batch_msg == NULL) {
518                 host->batch_msg = msg;
519         } else {
520                 int status;
521
522                 status = riemann_message_append_events_n(host->batch_msg,
523                                                          msg->n_events,
524                                                          msg->events);
525                 msg->n_events = 0;
526                 msg->events = NULL;
527
528                 riemann_message_free(msg);
529
530                 if (status != 0) {
531                         pthread_mutex_unlock(&host->lock);
532                         ERROR("write_riemann plugin: out of memory");
533                         return -1;
534                 }
535         }
536
537         len = riemann_message_get_packed_size(host->batch_msg);
538         ret = 0;
539         if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
540                 ret = wrr_batch_flush_nolock(0, host);
541         } else {
542         if (host->batch_timeout > 0) {
543             timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
544             ret = wrr_batch_flush_nolock(timeout, host);
545         }
546     }
547
548         pthread_mutex_unlock(&host->lock);
549         return ret;
550 } /* }}} riemann_message_t *wrr_batch_add_value_list */
551
552 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
553 {
554         int                      status;
555         struct riemann_host     *host = ud->data;
556         riemann_message_t       *msg;
557
558         if (!host->notifications)
559                 return 0;
560
561     /*
562      * Never batch for notifications, send them ASAP
563      */
564         msg = wrr_notification_to_message(host, n);
565         if (msg == NULL)
566                 return (-1);
567
568         status = wrr_send(host, msg);
569         if (status != 0)
570         c_complain (LOG_ERR, &host->init_complaint,
571                     "write_riemann plugin: riemann_client_send failed with status %i",
572                     status);
573     else
574         c_release (LOG_DEBUG, &host->init_complaint,
575                    "write_riemann plugin: riemann_client_send succeeded");
576
577         riemann_message_free(msg);
578         return (status);
579 } /* }}} int wrr_notification */
580
581 static int wrr_write(const data_set_t *ds, /* {{{ */
582               const value_list_t *vl,
583               user_data_t *ud)
584 {
585         int                      status = 0;
586         int                      statuses[vl->values_len];
587         struct riemann_host     *host = ud->data;
588         riemann_message_t       *msg;
589
590         if (host->check_thresholds) {
591                 status = write_riemann_threshold_check(ds, vl, statuses);
592     if (status != 0)
593       return status;
594   } else {
595     memset (statuses, 0, sizeof (statuses));
596   }
597
598   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
599       wrr_batch_add_value_list(host, ds, vl, statuses);
600   } else {
601     msg = wrr_value_list_to_message(host, ds, vl, statuses);
602     if (msg == NULL)
603       return (-1);
604
605     status = wrr_send(host, msg);
606
607     riemann_message_free(msg);
608   }
609   return status;
610 } /* }}} int wrr_write */
611
612 static void wrr_free(void *p) /* {{{ */
613 {
614   struct riemann_host   *host = p;
615
616   if (host == NULL)
617     return;
618
619   pthread_mutex_lock(&host->lock);
620
621   host->reference_count--;
622   if (host->reference_count > 0)
623     {
624       pthread_mutex_unlock(&host->lock);
625       return;
626     }
627
628   wrr_disconnect(host);
629
630   pthread_mutex_destroy(&host->lock);
631   sfree(host);
632 } /* }}} void wrr_free */
633
634 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
635 {
636   struct riemann_host   *host = NULL;
637   int                    status = 0;
638   int                    i;
639   oconfig_item_t                *child;
640   char                   callback_name[DATA_MAX_NAME_LEN];
641   user_data_t            ud;
642
643   if ((host = calloc(1, sizeof(*host))) == NULL) {
644     ERROR ("write_riemann plugin: calloc failed.");
645     return ENOMEM;
646   }
647   pthread_mutex_init(&host->lock, NULL);
648   C_COMPLAIN_INIT (&host->init_complaint);
649   host->reference_count = 1;
650   host->node = NULL;
651   host->port = 0;
652   host->notifications = 1;
653   host->check_thresholds = 0;
654   host->store_rates = 1;
655   host->always_append_ds = 0;
656   host->batch_mode = 1;
657   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
658   host->batch_init = cdtime();
659   host->batch_timeout = 0;
660   host->ttl_factor = RIEMANN_TTL_FACTOR;
661   host->client = NULL;
662   host->client_type = RIEMANN_CLIENT_TCP;
663   host->timeout.tv_sec = 0;
664   host->timeout.tv_usec = 0;
665
666   status = cf_util_get_string(ci, &host->name);
667   if (status != 0) {
668     WARNING("write_riemann plugin: Required host name is missing.");
669     wrr_free(host);
670     return -1;
671   }
672
673   for (i = 0; i < ci->children_num; i++) {
674     /*
675      * The code here could be simplified but makes room
676      * for easy adding of new options later on.
677      */
678     child = &ci->children[i];
679     status = 0;
680
681     if (strcasecmp("Host", child->key) == 0) {
682       status = cf_util_get_string(child, &host->node);
683       if (status != 0)
684         break;
685     } else if (strcasecmp("Notifications", child->key) == 0) {
686       status = cf_util_get_boolean(child, &host->notifications);
687       if (status != 0)
688         break;
689     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
690       status = cf_util_get_string(child, &host->event_service_prefix);
691       if (status != 0)
692         break;
693     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
694       status = cf_util_get_boolean(child, &host->check_thresholds);
695       if (status != 0)
696         break;
697     } else if (strcasecmp("Batch", child->key) == 0) {
698       status = cf_util_get_boolean(child, &host->batch_mode);
699       if (status != 0)
700         break;
701     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
702       status = cf_util_get_int(child, &host->batch_max);
703       if (status != 0)
704         break;
705     } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
706       status = cf_util_get_int(child, &host->batch_timeout);
707       if (status != 0)
708         break;
709     } else if (strcasecmp("Timeout", child->key) == 0) {
710       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
711       if (status != 0)
712         break;
713     } else if (strcasecmp("Port", child->key) == 0) {
714       host->port = cf_util_get_port_number(child);
715       if (host->port == -1) {
716         ERROR("write_riemann plugin: Invalid argument "
717               "configured for the \"Port\" "
718               "option.");
719         break;
720       }
721     } else if (strcasecmp("Protocol", child->key) == 0) {
722       char tmp[16];
723       status = cf_util_get_string_buffer(child,
724                                          tmp, sizeof(tmp));
725       if (status != 0)
726         {
727           ERROR("write_riemann plugin: cf_util_get_"
728                 "string_buffer failed with "
729                 "status %i.", status);
730           break;
731         }
732
733       if (strcasecmp("UDP", tmp) == 0)
734         host->client_type = RIEMANN_CLIENT_UDP;
735       else if (strcasecmp("TCP", tmp) == 0)
736         host->client_type = RIEMANN_CLIENT_TCP;
737       else if (strcasecmp("TLS", tmp) == 0)
738         host->client_type = RIEMANN_CLIENT_TLS;
739       else
740         WARNING("write_riemann plugin: The value "
741                 "\"%s\" is not valid for the "
742                 "\"Protocol\" option. Use "
743                 "either \"UDP\", \"TCP\" or \"TLS\".",
744                 tmp);
745     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
746       status = cf_util_get_string(child, &host->tls_ca_file);
747       if (status != 0)
748         {
749           ERROR("write_riemann plugin: cf_util_get_"
750                 "string_buffer failed with "
751                 "status %i.", status);
752           break;
753         }
754     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
755       status = cf_util_get_string(child, &host->tls_cert_file);
756       if (status != 0)
757         {
758           ERROR("write_riemann plugin: cf_util_get_"
759                 "string_buffer failed with "
760                 "status %i.", status);
761           break;
762         }
763     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
764       status = cf_util_get_string(child, &host->tls_key_file);
765       if (status != 0)
766         {
767           ERROR("write_riemann plugin: cf_util_get_"
768                 "string_buffer failed with "
769                 "status %i.", status);
770           break;
771         }
772     } else if (strcasecmp("StoreRates", child->key) == 0) {
773       status = cf_util_get_boolean(child, &host->store_rates);
774       if (status != 0)
775         break;
776     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
777       status = cf_util_get_boolean(child,
778                                    &host->always_append_ds);
779       if (status != 0)
780         break;
781     } else if (strcasecmp("TTLFactor", child->key) == 0) {
782       double tmp = NAN;
783       status = cf_util_get_double(child, &tmp);
784       if (status != 0)
785         break;
786       if (tmp >= 2.0) {
787         host->ttl_factor = tmp;
788       } else if (tmp >= 1.0) {
789         NOTICE("write_riemann plugin: The configured "
790                "TTLFactor is very small "
791                "(%.1f). A value of 2.0 or "
792                "greater is recommended.",
793                tmp);
794         host->ttl_factor = tmp;
795       } else if (tmp > 0.0) {
796         WARNING("write_riemann plugin: The configured "
797                 "TTLFactor is too small to be "
798                 "useful (%.1f). I'll use it "
799                 "since the user knows best, "
800                 "but under protest.",
801                 tmp);
802         host->ttl_factor = tmp;
803       } else { /* zero, negative and NAN */
804         ERROR("write_riemann plugin: The configured "
805               "TTLFactor is invalid (%.1f).",
806               tmp);
807       }
808     } else {
809       WARNING("write_riemann plugin: ignoring unknown config "
810               "option: \"%s\"", child->key);
811     }
812   }
813   if (status != 0) {
814     wrr_free(host);
815     return status;
816   }
817
818   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
819             host->name);
820   ud.data = host;
821   ud.free_func = wrr_free;
822
823   pthread_mutex_lock(&host->lock);
824
825   status = plugin_register_write(callback_name, wrr_write, &ud);
826
827   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
828     ud.free_func = NULL;
829     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
830   }
831   if (status != 0)
832     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
833             "failed with status %i.",
834             callback_name, status);
835   else /* success */
836     host->reference_count++;
837
838   status = plugin_register_notification(callback_name,
839                                         wrr_notification, &ud);
840   if (status != 0)
841     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
842             "failed with status %i.",
843             callback_name, status);
844   else /* success */
845     host->reference_count++;
846
847   if (host->reference_count <= 1)
848     {
849       /* Both callbacks failed => free memory.
850        * We need to unlock here, because riemann_free() will lock.
851        * This is not a race condition, because we're the only one
852        * holding a reference. */
853       pthread_mutex_unlock(&host->lock);
854       wrr_free(host);
855       return (-1);
856     }
857
858   host->reference_count--;
859   pthread_mutex_unlock(&host->lock);
860
861   return status;
862 } /* }}} int wrr_config_node */
863
864 static int wrr_config(oconfig_item_t *ci) /* {{{ */
865 {
866   int            i;
867   oconfig_item_t        *child;
868   int            status;
869
870   for (i = 0; i < ci->children_num; i++)  {
871     child = &ci->children[i];
872
873     if (strcasecmp("Node", child->key) == 0) {
874       wrr_config_node (child);
875     } else if (strcasecmp(child->key, "attribute") == 0) {
876       char *key = NULL;
877       char *val = NULL;
878
879       if (child->values_num != 2) {
880         WARNING("riemann attributes need both a key and a value.");
881         return (-1);
882       }
883       if (child->values[0].type != OCONFIG_TYPE_STRING ||
884           child->values[1].type != OCONFIG_TYPE_STRING) {
885         WARNING("riemann attribute needs string arguments.");
886         return (-1);
887       }
888       if ((key = strdup(child->values[0].value.string)) == NULL) {
889         WARNING("cannot allocate memory for attribute key.");
890         return (-1);
891       }
892       if ((val = strdup(child->values[1].value.string)) == NULL) {
893         WARNING("cannot allocate memory for attribute value.");
894         sfree(key);
895         return (-1);
896       }
897       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
898       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
899       DEBUG("write_riemann: got attr: %s => %s", key, val);
900       sfree(key);
901       sfree(val);
902     } else if (strcasecmp(child->key, "tag") == 0) {
903       char *tmp = NULL;
904       status = cf_util_get_string(child, &tmp);
905       if (status != 0)
906         continue;
907
908       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
909       DEBUG("write_riemann plugin: Got tag: %s", tmp);
910       sfree(tmp);
911     } else {
912       WARNING("write_riemann plugin: Ignoring unknown "
913               "configuration option \"%s\" at top level.",
914               child->key);
915     }
916   }
917   return (0);
918 } /* }}} int wrr_config */
919
920 void module_register(void)
921 {
922   plugin_register_complex_config("write_riemann", wrr_config);
923 }
924
925 /* vim: set sw=8 sts=8 ts=8 noet : */