write_riemann: Use riemann-c-client
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include <riemann/riemann-client.h>
32 #include <errno.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "plugin.h"
37 #include "common.h"
38 #include "configfile.h"
39 #include "utils_cache.h"
40 #include "write_riemann_threshold.h"
41
/* Node and port used by wrr_connect() when the configuration sets none. */
#define RIEMANN_HOST "localhost"
#define RIEMANN_PORT 5555
/* Default multiplier applied to a value's interval to obtain the event TTL. */
#define RIEMANN_TTL_FACTOR 2.0
/* Default packed-message size at which a pending batch gets flushed. */
#define RIEMANN_BATCH_MAX 8192
46
47 struct riemann_host {
48         char                    *name;
49         char                    *event_service_prefix;
50         pthread_mutex_t  lock;
51     _Bool            batch_mode;
52         _Bool            notifications;
53         _Bool            check_thresholds;
54         _Bool                    store_rates;
55         _Bool                    always_append_ds;
56         char                    *node;
57         int                      port;
58         riemann_client_type_t    client_type;
59         riemann_client_t        *client;
60         double                   ttl_factor;
61     cdtime_t         batch_init;
62     int              batch_max;
63         int                          reference_count;
64   riemann_message_t     *batch_msg;
65         char                     *tls_ca_file;
66         char                     *tls_cert_file;
67         char                     *tls_key_file;
68 };
69
/* Tags added to every event. */
static char **riemann_tags;
static size_t riemann_tags_num;
/* Attributes added to every event, stored as consecutive key/value entries;
 * riemann_attrs_num counts the individual strings, not the pairs. */
static char **riemann_attrs;
static size_t riemann_attrs_num;
74
75 /* host->lock must be held when calling this function. */
76 static int wrr_connect(struct riemann_host *host) /* {{{ */
77 {
78         char const              *node;
79         int                      port;
80
81         if (host->client)
82                 return 0;
83
84         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
85         port = (host->port) ? host->port : RIEMANN_PORT;
86
87         host->client = NULL;
88
89         host->client = riemann_client_create(host->client_type, node, port,
90                                              RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
91                                              RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
92                                              RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
93                                              RIEMANN_CLIENT_OPTION_NONE);
94         if (host->client == NULL) {
95                 WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
96                         node, port);
97                 return -1;
98         }
99         DEBUG("write_riemann plugin: got a successful connection for: %s:%d",
100               node, port);
101
102         return 0;
103 } /* }}} int wrr_connect */
104
105 /* host->lock must be held when calling this function. */
106 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
107 {
108         if (!host->client)
109                 return (0);
110
111         riemann_client_free(host->client);
112         host->client = NULL;
113
114         return (0);
115 } /* }}} int wrr_disconnect */
116
117 /**
118  * Function to send messages to riemann.
119  *
120  * Acquires the host lock, disconnects on errors.
121  */
122 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
123 {
124         int status = 0;
125         pthread_mutex_lock (&host->lock);
126
127         status = wrr_connect(host);
128         if (status != 0)
129                 return status;
130
131         status = riemann_client_send_message(host->client, msg);
132         if (status != 0) {
133                 wrr_disconnect(host);
134                 pthread_mutex_unlock(&host->lock);
135                 return status;
136         }
137
138         /*
139          * For TCP we need to receive message acknowledgemenent.
140          */
141         if (host->client_type != RIEMANN_CLIENT_UDP)
142         {
143                 riemann_message_t *response;
144
145                 response = riemann_client_recv_message(host->client);
146
147                 if (response == NULL)
148                 {
149                         wrr_disconnect(host);
150                         pthread_mutex_unlock(&host->lock);
151                         return errno;
152                 }
153                 riemann_message_free(response);
154         }
155
156         pthread_mutex_unlock (&host->lock);
157         return 0;
158 } /* }}} int wrr_send */
159
160 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
161                 notification_t const *n)
162 {
163         riemann_message_t *msg;
164         riemann_event_t *event;
165         char service_buffer[6 * DATA_MAX_NAME_LEN];
166         char const *severity;
167         notification_meta_t *meta;
168         size_t i;
169
170         switch (n->severity)
171         {
172                 case NOTIF_OKAY:        severity = "ok"; break;
173                 case NOTIF_WARNING:     severity = "warning"; break;
174                 case NOTIF_FAILURE:     severity = "critical"; break;
175                 default:                severity = "unknown";
176         }
177
178         format_name(service_buffer, sizeof(service_buffer),
179                     /* host = */ "", n->plugin, n->plugin_instance,
180                     n->type, n->type_instance);
181
182         event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
183                                      RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
184                                      RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
185                                      RIEMANN_EVENT_FIELD_STATE, severity,
186                                      RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
187                                      RIEMANN_EVENT_FIELD_NONE);
188
189         if (n->host[0] != 0)
190                 riemann_event_attribute_add(event,
191                                             riemann_attribute_create("host", n->host));
192         if (n->plugin[0] != 0)
193                 riemann_event_attribute_add(event,
194                                             riemann_attribute_create("plugin", n->plugin));
195         if (n->plugin_instance[0] != 0)
196                 riemann_event_attribute_add(event,
197                                             riemann_attribute_create("plugin_instance",
198                                                                      n->plugin_instance));
199
200         if (n->type[0] != 0)
201                 riemann_event_attribute_add(event,
202                                             riemann_attribute_create("type", n->type));
203         if (n->type_instance[0] != 0)
204                 riemann_event_attribute_add(event,
205                                             riemann_attribute_create("type_instance",
206                                                                      n->type_instance));
207
208         for (i = 0; i < riemann_attrs_num; i += 2)
209                 riemann_event_attribute_add(event,
210                                             riemann_attribute_create(riemann_attrs[i],
211                                                                      riemann_attrs[i +1]));
212
213         for (i = 0; i < riemann_tags_num; i++)
214                 riemann_event_tag_add(event, riemann_tags[i]);
215
216         if (n->message[0] != 0)
217                 riemann_event_attribute_add(event,
218                                             riemann_attribute_create("description", n->message));
219
220         /* Pull in values from threshold and add extra attributes */
221         for (meta = n->meta; meta != NULL; meta = meta->next)
222         {
223                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
224                 {
225                         riemann_event_set(event,
226                                           RIEMANN_EVENT_FIELD_METRIC_D,
227                                           (double) meta->nm_value.nm_double,
228                                           RIEMANN_EVENT_FIELD_NONE);
229                         continue;
230                 }
231
232                 if (meta->type == NM_TYPE_STRING) {
233                         riemann_event_attribute_add(event,
234                                                     riemann_attribute_create(meta->name,
235                                                                              meta->nm_value.nm_string));
236                         continue;
237                 }
238         }
239
240         msg = riemann_message_create_with_events(event, NULL);
241         if (msg == NULL)
242         {
243                 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
244                 riemann_event_free (event);
245                 return (NULL);
246         }
247
248         DEBUG("write_riemann plugin: Successfully created message for notification: "
249               "host = \"%s\", service = \"%s\", state = \"%s\"",
250               event->host, event->service, event->state);
251         return (msg);
252 } /* }}} riemann_message_t *wrr_notification_to_message */
253
254 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
255                                            data_set_t const *ds,
256                                            value_list_t const *vl, size_t index,
257                                            gauge_t const *rates,
258                                            int status)
259 {
260         riemann_event_t *event;
261         char name_buffer[5 * DATA_MAX_NAME_LEN];
262         char service_buffer[6 * DATA_MAX_NAME_LEN];
263         size_t i;
264
265         event = riemann_event_new();
266         if (event == NULL)
267         {
268                 ERROR("write_riemann plugin: riemann_event_new() failed.");
269                 return (NULL);
270         }
271
272         format_name(name_buffer, sizeof(name_buffer),
273                     /* host = */ "", vl->plugin, vl->plugin_instance,
274                     vl->type, vl->type_instance);
275         if (host->always_append_ds || (ds->ds_num > 1))
276         {
277                 if (host->event_service_prefix == NULL)
278                         ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
279                                   &name_buffer[1], ds->ds[index].name);
280                 else
281                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
282                                   host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
283         }
284         else
285         {
286                 if (host->event_service_prefix == NULL)
287                         sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
288                 else
289                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
290                                   host->event_service_prefix, &name_buffer[1]);
291         }
292
293         riemann_event_set(event,
294                           RIEMANN_EVENT_FIELD_HOST, vl->host,
295                           RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
296                           RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
297                           RIEMANN_EVENT_FIELD_ATTRIBUTES,
298                           riemann_attribute_create("plugin", vl->plugin),
299                           riemann_attribute_create("type", vl->type),
300                           riemann_attribute_create("ds_name", ds->ds[index].name),
301                           NULL,
302                           RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
303                           RIEMANN_EVENT_FIELD_NONE);
304
305         if (host->check_thresholds) {
306                 const char *state = NULL;
307
308                 switch (status) {
309                         case STATE_OKAY:
310                                 state = "ok";
311                                 break;
312                         case STATE_ERROR:
313                                 state = "critical";
314                                 break;
315                         case STATE_WARNING:
316                                 state = "warning";
317                                 break;
318                         case STATE_MISSING:
319                                 state = "unknown";
320                                 break;
321                 }
322                 if (state)
323                         riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
324                                           RIEMANN_EVENT_FIELD_NONE);
325         }
326
327         if (vl->plugin_instance[0] != 0)
328                 riemann_event_attribute_add(event,
329                                             riemann_attribute_create("plugin_instance",
330                                                                      vl->plugin_instance));
331         if (vl->type_instance[0] != 0)
332                 riemann_event_attribute_add(event,
333                                             riemann_attribute_create("type_instance",
334                                                                      vl->type_instance));
335
336         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
337         {
338                 char ds_type[DATA_MAX_NAME_LEN];
339
340                 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
341                           DS_TYPE_TO_STRING(ds->ds[index].type));
342                 riemann_event_attribute_add(event,
343                                             riemann_attribute_create("ds_type", ds_type));
344         }
345         else
346         {
347                 riemann_event_attribute_add(event,
348                                             riemann_attribute_create("ds_type",
349                                                                      DS_TYPE_TO_STRING(ds->ds[index].type)));
350         }
351
352         {
353                 char ds_index[DATA_MAX_NAME_LEN];
354
355                 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
356                 riemann_event_attribute_add(event,
357                                             riemann_attribute_create("ds_index", ds_index));
358         }
359
360         for (i = 0; i < riemann_attrs_num; i += 2)
361                 riemann_event_attribute_add(event,
362                                             riemann_attribute_create(riemann_attrs[i],
363                                                                      riemann_attrs[i +1]));
364
365         for (i = 0; i < riemann_tags_num; i++)
366                 riemann_event_tag_add(event, riemann_tags[i]);
367
368         if (ds->ds[index].type == DS_TYPE_GAUGE)
369         {
370                 riemann_event_set(event,
371                                   RIEMANN_EVENT_FIELD_METRIC_D,
372                                   (double) vl->values[index].gauge,
373                                   RIEMANN_EVENT_FIELD_NONE);
374         }
375         else if (rates != NULL)
376         {
377                 riemann_event_set(event,
378                                   RIEMANN_EVENT_FIELD_METRIC_D,
379                                   (double) rates[index],
380                                   RIEMANN_EVENT_FIELD_NONE);
381         }
382         else
383         {
384                 int64_t metric;
385
386                 if (ds->ds[index].type == DS_TYPE_DERIVE)
387                         metric = (int64_t) vl->values[index].derive;
388                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
389                         metric = (int64_t) vl->values[index].absolute;
390                 else
391                         metric = (int64_t) vl->values[index].counter;
392
393                 riemann_event_set(event,
394                                   RIEMANN_EVENT_FIELD_METRIC_S64,
395                                   (int64_t) metric,
396                                   RIEMANN_EVENT_FIELD_NONE);
397         }
398
399         DEBUG("write_riemann plugin: Successfully created message for metric: "
400               "host = \"%s\", service = \"%s\"",
401               event->host, event->service);
402         return (event);
403 } /* }}} riemann_event_t *wrr_value_to_event */
404
405 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
406                                                     data_set_t const *ds,
407                                                     value_list_t const *vl,
408                                                     int *statuses)
409 {
410         riemann_message_t *msg;
411         size_t i;
412         gauge_t *rates = NULL;
413
414         /* Initialize the Msg structure. */
415         msg = riemann_message_new();
416         if (msg == NULL)
417         {
418                 ERROR ("write_riemann plugin: riemann_message_new failed.");
419                 return (NULL);
420         }
421
422         if (host->store_rates)
423         {
424                 rates = uc_get_rate(ds, vl);
425                 if (rates == NULL)
426                 {
427                         ERROR("write_riemann plugin: uc_get_rate failed.");
428                         riemann_message_free(msg);
429                         return (NULL);
430                 }
431         }
432
433         for (i = 0; i < vl->values_len; i++)
434         {
435                 riemann_event_t *event;
436
437                 event = wrr_value_to_event(host, ds, vl,
438                                            (int) i, rates, statuses[i]);
439                 if (event == NULL)
440                 {
441                         riemann_message_free(msg);
442                         sfree(rates);
443                         return (NULL);
444                 }
445                 riemann_message_append_events(msg, event, NULL);
446         }
447
448         sfree(rates);
449         return (msg);
450 } /* }}} riemann_message_t *wrr_value_list_to_message */
451
452 /*
453  * Always call while holding host->lock !
454  */
455 static int wrr_batch_flush_nolock(cdtime_t timeout,
456                                   struct riemann_host *host)
457 {
458         cdtime_t    now;
459         int         status = 0;
460
461         if (timeout > 0) {
462                 now = cdtime();
463                 if ((host->batch_init + timeout) > now)
464                         return status;
465         }
466         wrr_send(host, host->batch_msg);
467         riemann_message_free(host->batch_msg);
468
469         if (host->client_type != RIEMANN_CLIENT_UDP)
470         {
471                 riemann_message_t *response;
472
473                 response = riemann_client_recv_message(host->client);
474
475                 if (!response)
476                 {
477                         wrr_disconnect(host);
478                         return errno;
479                 }
480
481                 riemann_message_free(response);
482         }
483
484         host->batch_init = cdtime();
485         host->batch_msg = NULL;
486         return status;
487 }
488
489 static int wrr_batch_flush(cdtime_t timeout,
490         const char *identifier __attribute__((unused)),
491         user_data_t *user_data)
492 {
493         struct riemann_host *host;
494         int status;
495
496         if (user_data == NULL)
497                 return (-EINVAL);
498
499         host = user_data->data;
500         pthread_mutex_lock(&host->lock);
501         status = wrr_batch_flush_nolock(timeout, host);
502         if (status != 0)
503                 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
504                       status);
505
506         pthread_mutex_unlock(&host->lock);
507         return status;
508 }
509
510 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
511                                     data_set_t const *ds,
512                                     value_list_t const *vl,
513                                     int *statuses)
514 {
515         riemann_message_t *msg;
516         size_t len;
517         int ret;
518
519         msg = wrr_value_list_to_message(host, ds, vl, statuses);
520         if (msg == NULL)
521                 return -1;
522
523         pthread_mutex_lock(&host->lock);
524
525         if (host->batch_msg == NULL) {
526                 host->batch_msg = msg;
527         } else {
528                 int status;
529
530                 status = riemann_message_append_events_n(host->batch_msg,
531                                                          msg->n_events,
532                                                          msg->events);
533                 msg->n_events = 0;
534                 msg->events = NULL;
535
536                 riemann_message_free(msg);
537
538                 if (status != 0) {
539                         pthread_mutex_unlock(&host->lock);
540                         ERROR("write_riemann plugin: out of memory");
541                         return -1;
542                 }
543         }
544
545         len = protobuf_c_message_get_packed_size((const ProtobufCMessage*)(host->batch_msg));
546         ret = 0;
547         if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
548                 ret = wrr_batch_flush_nolock(0, host);
549         }
550
551         pthread_mutex_unlock(&host->lock);
552         return ret;
553 } /* }}} riemann_message_t *wrr_batch_add_value_list */
554
555 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
556 {
557         int                      status;
558         struct riemann_host     *host = ud->data;
559         riemann_message_t       *msg;
560
561         if (!host->notifications)
562                 return 0;
563
564     /*
565      * Never batch for notifications, send them ASAP
566      */
567         msg = wrr_notification_to_message(host, n);
568         if (msg == NULL)
569                 return (-1);
570
571         status = wrr_send(host, msg);
572         if (status != 0)
573                 ERROR("write_riemann plugin: riemann_client_send failed with status %i",
574                       status);
575
576         riemann_message_free(msg);
577         return (status);
578 } /* }}} int wrr_notification */
579
580 static int wrr_write(const data_set_t *ds, /* {{{ */
581               const value_list_t *vl,
582               user_data_t *ud)
583 {
584         int                      status = 0;
585         int                      statuses[vl->values_len];
586         struct riemann_host     *host = ud->data;
587         riemann_message_t       *msg;
588
589         if (host->check_thresholds) {
590                 status = write_riemann_threshold_check(ds, vl, statuses);
591     if (status != 0)
592       return status;
593   } else {
594     memset (statuses, 0, sizeof (statuses));
595   }
596
597   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
598     wrr_batch_add_value_list(host, ds, vl, statuses);
599   } else {
600     msg = wrr_value_list_to_message(host, ds, vl, statuses);
601     if (msg == NULL)
602       return (-1);
603
604     status = wrr_send(host, msg);
605     if (status != 0)
606       ERROR("write_riemann plugin: riemann_client_send failed with status %i",
607             status);
608
609     riemann_message_free(msg);
610   }
611   return status;
612 } /* }}} int wrr_write */
613
614 static void wrr_free(void *p) /* {{{ */
615 {
616   struct riemann_host   *host = p;
617
618   if (host == NULL)
619     return;
620
621   pthread_mutex_lock(&host->lock);
622
623   host->reference_count--;
624   if (host->reference_count > 0)
625     {
626       pthread_mutex_unlock(&host->lock);
627       return;
628     }
629
630   wrr_disconnect(host);
631
632   pthread_mutex_destroy(&host->lock);
633   sfree(host);
634 } /* }}} void wrr_free */
635
636 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
637 {
638   struct riemann_host   *host = NULL;
639   int                    status = 0;
640   int                    i;
641   oconfig_item_t                *child;
642   char                   callback_name[DATA_MAX_NAME_LEN];
643   user_data_t            ud;
644
645   if ((host = calloc(1, sizeof(*host))) == NULL) {
646     ERROR ("write_riemann plugin: calloc failed.");
647     return ENOMEM;
648   }
649   pthread_mutex_init(&host->lock, NULL);
650   host->reference_count = 1;
651   host->node = NULL;
652   host->port = 0;
653   host->notifications = 1;
654   host->check_thresholds = 0;
655   host->store_rates = 1;
656   host->always_append_ds = 0;
657   host->batch_mode = 1;
658   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
659   host->batch_init = cdtime();
660   host->ttl_factor = RIEMANN_TTL_FACTOR;
661   host->client = NULL;
662   host->client_type = RIEMANN_CLIENT_TCP;
663
664   status = cf_util_get_string(ci, &host->name);
665   if (status != 0) {
666     WARNING("write_riemann plugin: Required host name is missing.");
667     wrr_free(host);
668     return -1;
669   }
670
671   for (i = 0; i < ci->children_num; i++) {
672     /*
673      * The code here could be simplified but makes room
674      * for easy adding of new options later on.
675      */
676     child = &ci->children[i];
677     status = 0;
678
679     if (strcasecmp("Host", child->key) == 0) {
680       status = cf_util_get_string(child, &host->node);
681       if (status != 0)
682         break;
683     } else if (strcasecmp("Notifications", child->key) == 0) {
684       status = cf_util_get_boolean(child, &host->notifications);
685       if (status != 0)
686         break;
687     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
688       status = cf_util_get_string(child, &host->event_service_prefix);
689       if (status != 0)
690         break;
691     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
692       status = cf_util_get_boolean(child, &host->check_thresholds);
693       if (status != 0)
694         break;
695     } else if (strcasecmp("Batch", child->key) == 0) {
696       status = cf_util_get_boolean(child, &host->batch_mode);
697       if (status != 0)
698         break;
699     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
700       status = cf_util_get_int(child, &host->batch_max);
701       if (status != 0)
702         break;
703     } else if (strcasecmp("Port", child->key) == 0) {
704       host->port = cf_util_get_port_number(child);
705       if (host->port == -1) {
706         ERROR("write_riemann plugin: Invalid argument "
707               "configured for the \"Port\" "
708               "option.");
709         break;
710       }
711     } else if (strcasecmp("Protocol", child->key) == 0) {
712       char tmp[16];
713       status = cf_util_get_string_buffer(child,
714                                          tmp, sizeof(tmp));
715       if (status != 0)
716         {
717           ERROR("write_riemann plugin: cf_util_get_"
718                 "string_buffer failed with "
719                 "status %i.", status);
720           break;
721         }
722
723       if (strcasecmp("UDP", tmp) == 0)
724         host->client_type = RIEMANN_CLIENT_UDP;
725       else if (strcasecmp("TCP", tmp) == 0)
726         host->client_type = RIEMANN_CLIENT_TCP;
727       else if (strcasecmp("TLS", tmp) == 0)
728         host->client_type = RIEMANN_CLIENT_TLS;
729       else
730         WARNING("write_riemann plugin: The value "
731                 "\"%s\" is not valid for the "
732                 "\"Protocol\" option. Use "
733                 "either \"UDP\", \"TCP\" or \"TLS\".",
734                 tmp);
735     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
736       status = cf_util_get_string(child, &host->tls_ca_file);
737       if (status != 0)
738         {
739           ERROR("write_riemann plugin: cf_util_get_"
740                 "string_buffer failed with "
741                 "status %i.", status);
742           break;
743         }
744     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
745       status = cf_util_get_string(child, &host->tls_cert_file);
746       if (status != 0)
747         {
748           ERROR("write_riemann plugin: cf_util_get_"
749                 "string_buffer failed with "
750                 "status %i.", status);
751           break;
752         }
753     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
754       status = cf_util_get_string(child, &host->tls_key_file);
755       if (status != 0)
756         {
757           ERROR("write_riemann plugin: cf_util_get_"
758                 "string_buffer failed with "
759                 "status %i.", status);
760           break;
761         }
762     } else if (strcasecmp("StoreRates", child->key) == 0) {
763       status = cf_util_get_boolean(child, &host->store_rates);
764       if (status != 0)
765         break;
766     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
767       status = cf_util_get_boolean(child,
768                                    &host->always_append_ds);
769       if (status != 0)
770         break;
771     } else if (strcasecmp("TTLFactor", child->key) == 0) {
772       double tmp = NAN;
773       status = cf_util_get_double(child, &tmp);
774       if (status != 0)
775         break;
776       if (tmp >= 2.0) {
777         host->ttl_factor = tmp;
778       } else if (tmp >= 1.0) {
779         NOTICE("write_riemann plugin: The configured "
780                "TTLFactor is very small "
781                "(%.1f). A value of 2.0 or "
782                "greater is recommended.",
783                tmp);
784         host->ttl_factor = tmp;
785       } else if (tmp > 0.0) {
786         WARNING("write_riemann plugin: The configured "
787                 "TTLFactor is too small to be "
788                 "useful (%.1f). I'll use it "
789                 "since the user knows best, "
790                 "but under protest.",
791                 tmp);
792         host->ttl_factor = tmp;
793       } else { /* zero, negative and NAN */
794         ERROR("write_riemann plugin: The configured "
795               "TTLFactor is invalid (%.1f).",
796               tmp);
797       }
798     } else {
799       WARNING("write_riemann plugin: ignoring unknown config "
800               "option: \"%s\"", child->key);
801     }
802   }
803   if (status != 0) {
804     wrr_free(host);
805     return status;
806   }
807
808   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
809             host->name);
810   ud.data = host;
811   ud.free_func = wrr_free;
812
813   pthread_mutex_lock(&host->lock);
814
815   status = plugin_register_write(callback_name, wrr_write, &ud);
816
817   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
818     ud.free_func = NULL;
819     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
820   }
821   if (status != 0)
822     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
823             "failed with status %i.",
824             callback_name, status);
825   else /* success */
826     host->reference_count++;
827
828   status = plugin_register_notification(callback_name,
829                                         wrr_notification, &ud);
830   if (status != 0)
831     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
832             "failed with status %i.",
833             callback_name, status);
834   else /* success */
835     host->reference_count++;
836
837   if (host->reference_count <= 1)
838     {
839       /* Both callbacks failed => free memory.
840        * We need to unlock here, because riemann_free() will lock.
841        * This is not a race condition, because we're the only one
842        * holding a reference. */
843       pthread_mutex_unlock(&host->lock);
844       wrr_free(host);
845       return (-1);
846     }
847
848   host->reference_count--;
849   pthread_mutex_unlock(&host->lock);
850
851   return status;
852 } /* }}} int wrr_config_node */
853
854 static int wrr_config(oconfig_item_t *ci) /* {{{ */
855 {
856   int            i;
857   oconfig_item_t        *child;
858   int            status;
859
860   for (i = 0; i < ci->children_num; i++)  {
861     child = &ci->children[i];
862
863     if (strcasecmp("Node", child->key) == 0) {
864       wrr_config_node (child);
865     } else if (strcasecmp(child->key, "attribute") == 0) {
866       char *key = NULL;
867       char *val = NULL;
868
869       if (child->values_num != 2) {
870         WARNING("riemann attributes need both a key and a value.");
871         return (-1);
872       }
873       if (child->values[0].type != OCONFIG_TYPE_STRING ||
874           child->values[1].type != OCONFIG_TYPE_STRING) {
875         WARNING("riemann attribute needs string arguments.");
876         return (-1);
877       }
878       if ((key = strdup(child->values[0].value.string)) == NULL) {
879         WARNING("cannot allocate memory for attribute key.");
880         return (-1);
881       }
882       if ((val = strdup(child->values[1].value.string)) == NULL) {
883         WARNING("cannot allocate memory for attribute value.");
884         sfree(key);
885         return (-1);
886       }
887       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
888       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
889       DEBUG("write_riemann: got attr: %s => %s", key, val);
890       sfree(key);
891       sfree(val);
892     } else if (strcasecmp(child->key, "tag") == 0) {
893       char *tmp = NULL;
894       status = cf_util_get_string(child, &tmp);
895       if (status != 0)
896         continue;
897
898       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
899       DEBUG("write_riemann plugin: Got tag: %s", tmp);
900       sfree(tmp);
901     } else {
902       WARNING("write_riemann plugin: Ignoring unknown "
903               "configuration option \"%s\" at top level.",
904               child->key);
905     }
906   }
907   return (0);
908 } /* }}} int wrr_config */
909
910 void module_register(void)
911 {
912   plugin_register_complex_config("write_riemann", wrr_config);
913 }
914
915 /* vim: set sw=8 sts=8 ts=8 noet : */