Merge branch 'pr/1791'
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include "collectd.h"
32
33
34 #include "plugin.h"
35 #include "common.h"
36 #include "configfile.h"
37 #include "utils_cache.h"
38 #include "utils_complain.h"
39 #include "write_riemann_threshold.h"
40
41 #include <errno.h>
42 #include <riemann/riemann-client.h>
43
44 #define RIEMANN_HOST            "localhost"
45 #define RIEMANN_PORT            5555
46 #define RIEMANN_TTL_FACTOR      2.0
47 #define RIEMANN_BATCH_MAX      8192
48
49 struct riemann_host {
50     c_complain_t init_complaint;
51         char                    *name;
52         char                    *event_service_prefix;
53         pthread_mutex_t  lock;
54     _Bool            batch_mode;
55         _Bool            notifications;
56         _Bool            check_thresholds;
57         _Bool                    store_rates;
58         _Bool                    always_append_ds;
59         char                    *node;
60         int                      port;
61         riemann_client_type_t    client_type;
62         riemann_client_t        *client;
63         double                   ttl_factor;
64     cdtime_t         batch_init;
65     int              batch_max;
66     int              batch_timeout;
67         int                          reference_count;
68   riemann_message_t     *batch_msg;
69         char                     *tls_ca_file;
70         char                     *tls_cert_file;
71         char                     *tls_key_file;
72         struct timeval timeout;
73 };
74
75 static char     **riemann_tags;
76 static size_t     riemann_tags_num;
77 static char     **riemann_attrs;
78 static size_t     riemann_attrs_num;
79
80 /* host->lock must be held when calling this function. */
81 static int wrr_connect(struct riemann_host *host) /* {{{ */
82 {
83         char const              *node;
84         int                      port;
85
86         if (host->client)
87                 return 0;
88
89         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
90         port = (host->port) ? host->port : RIEMANN_PORT;
91
92         host->client = NULL;
93
94         host->client = riemann_client_create(host->client_type, node, port,
95                                              RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file,
96                                              RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file,
97                                              RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
98                                              RIEMANN_CLIENT_OPTION_NONE);
99         if (host->client == NULL) {
100         c_complain (LOG_ERR, &host->init_complaint,
101                     "write_riemann plugin: Unable to connect to Riemann at %s:%d",
102                     node, port);
103                 return -1;
104         }
105         if (host->timeout.tv_sec != 0) {
106                 if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
107                         riemann_client_free(host->client);
108                         host->client = NULL;
109             c_complain (LOG_ERR, &host->init_complaint,
110                         "write_riemann plugin: Unable to connect to Riemann at %s:%d",
111                         node, port);
112                         return -1;
113                 }
114         }
115
116     c_release (LOG_INFO, &host->init_complaint,
117                "write_riemann plugin: Successfully connected to %s:%d",
118                node, port);
119
120         return 0;
121 } /* }}} int wrr_connect */
122
123 /* host->lock must be held when calling this function. */
124 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
125 {
126         if (!host->client)
127                 return (0);
128
129         riemann_client_free(host->client);
130         host->client = NULL;
131
132         return (0);
133 } /* }}} int wrr_disconnect */
134
135 /**
136  * Function to send messages to riemann.
137  *
138  * Acquires the host lock, disconnects on errors.
139  */
140 static int wrr_send_nolock(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
141 {
142         int status = 0;
143
144         status = wrr_connect(host);
145         if (status != 0) {
146                 return status;
147     }
148
149         status = riemann_client_send_message(host->client, msg);
150         if (status != 0) {
151                 wrr_disconnect(host);
152                 return status;
153         }
154
155         /*
156          * For TCP we need to receive message acknowledgemenent.
157          */
158         if (host->client_type != RIEMANN_CLIENT_UDP)
159         {
160                 riemann_message_t *response;
161
162                 response = riemann_client_recv_message(host->client);
163
164                 if (response == NULL)
165                 {
166                         wrr_disconnect(host);
167                         return errno;
168                 }
169                 riemann_message_free(response);
170         }
171
172         return 0;
173 } /* }}} int wrr_send */
174
175 static int wrr_send(struct riemann_host *host, riemann_message_t *msg)
176 {
177     int status = 0;
178
179     pthread_mutex_lock (&host->lock);
180     status = wrr_send_nolock(host, msg);
181     pthread_mutex_unlock (&host->lock);
182     return status;
183 }
184
185 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
186                 notification_t const *n)
187 {
188         riemann_message_t *msg;
189         riemann_event_t *event;
190         char service_buffer[6 * DATA_MAX_NAME_LEN];
191         char const *severity;
192         notification_meta_t *meta;
193         size_t i;
194
195         switch (n->severity)
196         {
197                 case NOTIF_OKAY:        severity = "ok"; break;
198                 case NOTIF_WARNING:     severity = "warning"; break;
199                 case NOTIF_FAILURE:     severity = "critical"; break;
200                 default:                severity = "unknown";
201         }
202
203         format_name(service_buffer, sizeof(service_buffer),
204                     /* host = */ "", n->plugin, n->plugin_instance,
205                     n->type, n->type_instance);
206
207         event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host,
208                                      RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time),
209                                      RIEMANN_EVENT_FIELD_TAGS, "notification", NULL,
210                                      RIEMANN_EVENT_FIELD_STATE, severity,
211                                      RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
212                                      RIEMANN_EVENT_FIELD_NONE);
213
214         if (n->host[0] != 0)
215                 riemann_event_string_attribute_add(event, "host", n->host);
216         if (n->plugin[0] != 0)
217                 riemann_event_string_attribute_add(event, "plugin", n->plugin);
218         if (n->plugin_instance[0] != 0)
219                 riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
220
221         if (n->type[0] != 0)
222                 riemann_event_string_attribute_add(event, "type", n->type);
223         if (n->type_instance[0] != 0)
224                 riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
225
226         for (i = 0; i < riemann_attrs_num; i += 2)
227                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
228
229         for (i = 0; i < riemann_tags_num; i++)
230                 riemann_event_tag_add(event, riemann_tags[i]);
231
232         if (n->message[0] != 0)
233                 riemann_event_string_attribute_add(event, "description", n->message);
234
235         /* Pull in values from threshold and add extra attributes */
236         for (meta = n->meta; meta != NULL; meta = meta->next)
237         {
238                 if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
239                 {
240                         riemann_event_set(event,
241                                           RIEMANN_EVENT_FIELD_METRIC_D,
242                                           (double) meta->nm_value.nm_double,
243                                           RIEMANN_EVENT_FIELD_NONE);
244                         continue;
245                 }
246
247                 if (meta->type == NM_TYPE_STRING) {
248                         riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
249                         continue;
250                 }
251         }
252
253         msg = riemann_message_create_with_events(event, NULL);
254         if (msg == NULL)
255         {
256                 ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
257                 riemann_event_free (event);
258                 return (NULL);
259         }
260
261         DEBUG("write_riemann plugin: Successfully created message for notification: "
262               "host = \"%s\", service = \"%s\", state = \"%s\"",
263               event->host, event->service, event->state);
264         return (msg);
265 } /* }}} riemann_message_t *wrr_notification_to_message */
266
267 static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */
268                                            data_set_t const *ds,
269                                            value_list_t const *vl, size_t index,
270                                            gauge_t const *rates,
271                                            int status)
272 {
273         riemann_event_t *event;
274         char name_buffer[5 * DATA_MAX_NAME_LEN];
275         char service_buffer[6 * DATA_MAX_NAME_LEN];
276         size_t i;
277
278         event = riemann_event_new();
279         if (event == NULL)
280         {
281                 ERROR("write_riemann plugin: riemann_event_new() failed.");
282                 return (NULL);
283         }
284
285         format_name(name_buffer, sizeof(name_buffer),
286                     /* host = */ "", vl->plugin, vl->plugin_instance,
287                     vl->type, vl->type_instance);
288         if (host->always_append_ds || (ds->ds_num > 1))
289         {
290                 if (host->event_service_prefix == NULL)
291                         ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
292                                   &name_buffer[1], ds->ds[index].name);
293                 else
294                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
295                                   host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
296         }
297         else
298         {
299                 if (host->event_service_prefix == NULL)
300                         sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
301                 else
302                         ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
303                                   host->event_service_prefix, &name_buffer[1]);
304         }
305
306         riemann_event_set(event,
307                           RIEMANN_EVENT_FIELD_HOST, vl->host,
308                           RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
309                           RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
310                           RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
311                           "plugin", vl->plugin,
312                           "type", vl->type,
313                           "ds_name", ds->ds[index].name,
314                           NULL,
315                           RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
316                           RIEMANN_EVENT_FIELD_NONE);
317
318         if (host->check_thresholds) {
319                 const char *state = NULL;
320
321                 switch (status) {
322                         case STATE_OKAY:
323                                 state = "ok";
324                                 break;
325                         case STATE_ERROR:
326                                 state = "critical";
327                                 break;
328                         case STATE_WARNING:
329                                 state = "warning";
330                                 break;
331                         case STATE_MISSING:
332                                 state = "unknown";
333                                 break;
334                 }
335                 if (state)
336                         riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
337                                           RIEMANN_EVENT_FIELD_NONE);
338         }
339
340         if (vl->plugin_instance[0] != 0)
341                 riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
342         if (vl->type_instance[0] != 0)
343                 riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
344
345         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
346         {
347                 char ds_type[DATA_MAX_NAME_LEN];
348
349                 ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
350                           DS_TYPE_TO_STRING(ds->ds[index].type));
351                 riemann_event_string_attribute_add(event, "ds_type", ds_type);
352         }
353         else
354         {
355                 riemann_event_string_attribute_add(event, "ds_type",
356                                        DS_TYPE_TO_STRING(ds->ds[index].type));
357         }
358
359         {
360                 char ds_index[DATA_MAX_NAME_LEN];
361
362                 ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
363                 riemann_event_string_attribute_add(event, "ds_index", ds_index);
364         }
365
366         for (i = 0; i < riemann_attrs_num; i += 2)
367                 riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
368
369         for (i = 0; i < riemann_tags_num; i++)
370                 riemann_event_tag_add(event, riemann_tags[i]);
371
372         if (ds->ds[index].type == DS_TYPE_GAUGE)
373         {
374                 riemann_event_set(event,
375                                   RIEMANN_EVENT_FIELD_METRIC_D,
376                                   (double) vl->values[index].gauge,
377                                   RIEMANN_EVENT_FIELD_NONE);
378         }
379         else if (rates != NULL)
380         {
381                 riemann_event_set(event,
382                                   RIEMANN_EVENT_FIELD_METRIC_D,
383                                   (double) rates[index],
384                                   RIEMANN_EVENT_FIELD_NONE);
385         }
386         else
387         {
388                 int64_t metric;
389
390                 if (ds->ds[index].type == DS_TYPE_DERIVE)
391                         metric = (int64_t) vl->values[index].derive;
392                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
393                         metric = (int64_t) vl->values[index].absolute;
394                 else
395                         metric = (int64_t) vl->values[index].counter;
396
397                 riemann_event_set(event,
398                                   RIEMANN_EVENT_FIELD_METRIC_S64,
399                                   (int64_t) metric,
400                                   RIEMANN_EVENT_FIELD_NONE);
401         }
402
403         DEBUG("write_riemann plugin: Successfully created message for metric: "
404               "host = \"%s\", service = \"%s\"",
405               event->host, event->service);
406         return (event);
407 } /* }}} riemann_event_t *wrr_value_to_event */
408
409 static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
410                                                     data_set_t const *ds,
411                                                     value_list_t const *vl,
412                                                     int *statuses)
413 {
414         riemann_message_t *msg;
415         size_t i;
416         gauge_t *rates = NULL;
417
418         /* Initialize the Msg structure. */
419         msg = riemann_message_new();
420         if (msg == NULL)
421         {
422                 ERROR ("write_riemann plugin: riemann_message_new failed.");
423                 return (NULL);
424         }
425
426         if (host->store_rates)
427         {
428                 rates = uc_get_rate(ds, vl);
429                 if (rates == NULL)
430                 {
431                         ERROR("write_riemann plugin: uc_get_rate failed.");
432                         riemann_message_free(msg);
433                         return (NULL);
434                 }
435         }
436
437         for (i = 0; i < vl->values_len; i++)
438         {
439                 riemann_event_t *event;
440
441                 event = wrr_value_to_event(host, ds, vl,
442                                            (int) i, rates, statuses[i]);
443                 if (event == NULL)
444                 {
445                         riemann_message_free(msg);
446                         sfree(rates);
447                         return (NULL);
448                 }
449                 riemann_message_append_events(msg, event, NULL);
450         }
451
452         sfree(rates);
453         return (msg);
454 } /* }}} riemann_message_t *wrr_value_list_to_message */
455
456 /*
457  * Always call while holding host->lock !
458  */
459 static int wrr_batch_flush_nolock(cdtime_t timeout,
460                                   struct riemann_host *host)
461 {
462         cdtime_t    now;
463         int         status = 0;
464
465     now = cdtime();
466         if (timeout > 0) {
467                 if ((host->batch_init + timeout) > now) {
468                         return status;
469         }
470         }
471         wrr_send_nolock(host, host->batch_msg);
472         riemann_message_free(host->batch_msg);
473
474         host->batch_init = now;
475         host->batch_msg = NULL;
476         return status;
477 }
478
479 static int wrr_batch_flush(cdtime_t timeout,
480         const char *identifier __attribute__((unused)),
481         user_data_t *user_data)
482 {
483         struct riemann_host *host;
484         int status;
485
486         if (user_data == NULL)
487                 return (-EINVAL);
488
489         host = user_data->data;
490         pthread_mutex_lock(&host->lock);
491         status = wrr_batch_flush_nolock(timeout, host);
492         if (status != 0)
493         c_complain (LOG_ERR, &host->init_complaint,
494                     "write_riemann plugin: riemann_client_send failed with status %i",
495                     status);
496     else
497         c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent.");
498
499         pthread_mutex_unlock(&host->lock);
500         return status;
501 }
502
503 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
504                                     data_set_t const *ds,
505                                     value_list_t const *vl,
506                                     int *statuses)
507 {
508         riemann_message_t *msg;
509         size_t len;
510         int ret;
511     cdtime_t timeout;
512
513         msg = wrr_value_list_to_message(host, ds, vl, statuses);
514         if (msg == NULL)
515                 return -1;
516
517         pthread_mutex_lock(&host->lock);
518
519         if (host->batch_msg == NULL) {
520                 host->batch_msg = msg;
521         } else {
522                 int status;
523
524                 status = riemann_message_append_events_n(host->batch_msg,
525                                                          msg->n_events,
526                                                          msg->events);
527                 msg->n_events = 0;
528                 msg->events = NULL;
529
530                 riemann_message_free(msg);
531
532                 if (status != 0) {
533                         pthread_mutex_unlock(&host->lock);
534                         ERROR("write_riemann plugin: out of memory");
535                         return -1;
536                 }
537         }
538
539         len = riemann_message_get_packed_size(host->batch_msg);
540         ret = 0;
541         if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
542                 ret = wrr_batch_flush_nolock(0, host);
543         } else {
544         if (host->batch_timeout > 0) {
545             timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
546             ret = wrr_batch_flush_nolock(timeout, host);
547         }
548     }
549
550         pthread_mutex_unlock(&host->lock);
551         return ret;
552 } /* }}} riemann_message_t *wrr_batch_add_value_list */
553
554 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
555 {
556         int                      status;
557         struct riemann_host     *host = ud->data;
558         riemann_message_t       *msg;
559
560         if (!host->notifications)
561                 return 0;
562
563     /*
564      * Never batch for notifications, send them ASAP
565      */
566         msg = wrr_notification_to_message(host, n);
567         if (msg == NULL)
568                 return (-1);
569
570         status = wrr_send(host, msg);
571         if (status != 0)
572         c_complain (LOG_ERR, &host->init_complaint,
573                     "write_riemann plugin: riemann_client_send failed with status %i",
574                     status);
575     else
576         c_release (LOG_DEBUG, &host->init_complaint,
577                    "write_riemann plugin: riemann_client_send succeeded");
578
579         riemann_message_free(msg);
580         return (status);
581 } /* }}} int wrr_notification */
582
583 static int wrr_write(const data_set_t *ds, /* {{{ */
584               const value_list_t *vl,
585               user_data_t *ud)
586 {
587         int                      status = 0;
588         int                      statuses[vl->values_len];
589         struct riemann_host     *host = ud->data;
590         riemann_message_t       *msg;
591
592         if (host->check_thresholds) {
593                 status = write_riemann_threshold_check(ds, vl, statuses);
594     if (status != 0)
595       return status;
596   } else {
597     memset (statuses, 0, sizeof (statuses));
598   }
599
600   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
601       wrr_batch_add_value_list(host, ds, vl, statuses);
602   } else {
603     msg = wrr_value_list_to_message(host, ds, vl, statuses);
604     if (msg == NULL)
605       return (-1);
606
607     status = wrr_send(host, msg);
608
609     riemann_message_free(msg);
610   }
611   return status;
612 } /* }}} int wrr_write */
613
614 static void wrr_free(void *p) /* {{{ */
615 {
616   struct riemann_host   *host = p;
617
618   if (host == NULL)
619     return;
620
621   pthread_mutex_lock(&host->lock);
622
623   host->reference_count--;
624   if (host->reference_count > 0)
625     {
626       pthread_mutex_unlock(&host->lock);
627       return;
628     }
629
630   wrr_disconnect(host);
631
632   pthread_mutex_destroy(&host->lock);
633   sfree(host);
634 } /* }}} void wrr_free */
635
636 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
637 {
638   struct riemann_host   *host = NULL;
639   int                    status = 0;
640   int                    i;
641   oconfig_item_t                *child;
642   char                   callback_name[DATA_MAX_NAME_LEN];
643   user_data_t            ud;
644
645   if ((host = calloc(1, sizeof(*host))) == NULL) {
646     ERROR ("write_riemann plugin: calloc failed.");
647     return ENOMEM;
648   }
649   pthread_mutex_init(&host->lock, NULL);
650   C_COMPLAIN_INIT (&host->init_complaint);
651   host->reference_count = 1;
652   host->node = NULL;
653   host->port = 0;
654   host->notifications = 1;
655   host->check_thresholds = 0;
656   host->store_rates = 1;
657   host->always_append_ds = 0;
658   host->batch_mode = 1;
659   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
660   host->batch_init = cdtime();
661   host->batch_timeout = 0;
662   host->ttl_factor = RIEMANN_TTL_FACTOR;
663   host->client = NULL;
664   host->client_type = RIEMANN_CLIENT_TCP;
665   host->timeout.tv_sec = 0;
666   host->timeout.tv_usec = 0;
667
668   status = cf_util_get_string(ci, &host->name);
669   if (status != 0) {
670     WARNING("write_riemann plugin: Required host name is missing.");
671     wrr_free(host);
672     return -1;
673   }
674
675   for (i = 0; i < ci->children_num; i++) {
676     /*
677      * The code here could be simplified but makes room
678      * for easy adding of new options later on.
679      */
680     child = &ci->children[i];
681     status = 0;
682
683     if (strcasecmp("Host", child->key) == 0) {
684       status = cf_util_get_string(child, &host->node);
685       if (status != 0)
686         break;
687     } else if (strcasecmp("Notifications", child->key) == 0) {
688       status = cf_util_get_boolean(child, &host->notifications);
689       if (status != 0)
690         break;
691     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
692       status = cf_util_get_string(child, &host->event_service_prefix);
693       if (status != 0)
694         break;
695     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
696       status = cf_util_get_boolean(child, &host->check_thresholds);
697       if (status != 0)
698         break;
699     } else if (strcasecmp("Batch", child->key) == 0) {
700       status = cf_util_get_boolean(child, &host->batch_mode);
701       if (status != 0)
702         break;
703     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
704       status = cf_util_get_int(child, &host->batch_max);
705       if (status != 0)
706         break;
707     } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
708       status = cf_util_get_int(child, &host->batch_timeout);
709       if (status != 0)
710         break;
711     } else if (strcasecmp("Timeout", child->key) == 0) {
712       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
713       if (status != 0)
714         break;
715     } else if (strcasecmp("Port", child->key) == 0) {
716       host->port = cf_util_get_port_number(child);
717       if (host->port == -1) {
718         ERROR("write_riemann plugin: Invalid argument "
719               "configured for the \"Port\" "
720               "option.");
721         break;
722       }
723     } else if (strcasecmp("Protocol", child->key) == 0) {
724       char tmp[16];
725       status = cf_util_get_string_buffer(child,
726                                          tmp, sizeof(tmp));
727       if (status != 0)
728         {
729           ERROR("write_riemann plugin: cf_util_get_"
730                 "string_buffer failed with "
731                 "status %i.", status);
732           break;
733         }
734
735       if (strcasecmp("UDP", tmp) == 0)
736         host->client_type = RIEMANN_CLIENT_UDP;
737       else if (strcasecmp("TCP", tmp) == 0)
738         host->client_type = RIEMANN_CLIENT_TCP;
739       else if (strcasecmp("TLS", tmp) == 0)
740         host->client_type = RIEMANN_CLIENT_TLS;
741       else
742         WARNING("write_riemann plugin: The value "
743                 "\"%s\" is not valid for the "
744                 "\"Protocol\" option. Use "
745                 "either \"UDP\", \"TCP\" or \"TLS\".",
746                 tmp);
747     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
748       status = cf_util_get_string(child, &host->tls_ca_file);
749       if (status != 0)
750         {
751           ERROR("write_riemann plugin: cf_util_get_"
752                 "string_buffer failed with "
753                 "status %i.", status);
754           break;
755         }
756     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
757       status = cf_util_get_string(child, &host->tls_cert_file);
758       if (status != 0)
759         {
760           ERROR("write_riemann plugin: cf_util_get_"
761                 "string_buffer failed with "
762                 "status %i.", status);
763           break;
764         }
765     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
766       status = cf_util_get_string(child, &host->tls_key_file);
767       if (status != 0)
768         {
769           ERROR("write_riemann plugin: cf_util_get_"
770                 "string_buffer failed with "
771                 "status %i.", status);
772           break;
773         }
774     } else if (strcasecmp("StoreRates", child->key) == 0) {
775       status = cf_util_get_boolean(child, &host->store_rates);
776       if (status != 0)
777         break;
778     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
779       status = cf_util_get_boolean(child,
780                                    &host->always_append_ds);
781       if (status != 0)
782         break;
783     } else if (strcasecmp("TTLFactor", child->key) == 0) {
784       double tmp = NAN;
785       status = cf_util_get_double(child, &tmp);
786       if (status != 0)
787         break;
788       if (tmp >= 2.0) {
789         host->ttl_factor = tmp;
790       } else if (tmp >= 1.0) {
791         NOTICE("write_riemann plugin: The configured "
792                "TTLFactor is very small "
793                "(%.1f). A value of 2.0 or "
794                "greater is recommended.",
795                tmp);
796         host->ttl_factor = tmp;
797       } else if (tmp > 0.0) {
798         WARNING("write_riemann plugin: The configured "
799                 "TTLFactor is too small to be "
800                 "useful (%.1f). I'll use it "
801                 "since the user knows best, "
802                 "but under protest.",
803                 tmp);
804         host->ttl_factor = tmp;
805       } else { /* zero, negative and NAN */
806         ERROR("write_riemann plugin: The configured "
807               "TTLFactor is invalid (%.1f).",
808               tmp);
809       }
810     } else {
811       WARNING("write_riemann plugin: ignoring unknown config "
812               "option: \"%s\"", child->key);
813     }
814   }
815   if (status != 0) {
816     wrr_free(host);
817     return status;
818   }
819
820   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
821             host->name);
822   ud.data = host;
823   ud.free_func = wrr_free;
824
825   pthread_mutex_lock(&host->lock);
826
827   status = plugin_register_write(callback_name, wrr_write, &ud);
828
829   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
830     ud.free_func = NULL;
831     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
832   }
833   if (status != 0)
834     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
835             "failed with status %i.",
836             callback_name, status);
837   else /* success */
838     host->reference_count++;
839
840   status = plugin_register_notification(callback_name,
841                                         wrr_notification, &ud);
842   if (status != 0)
843     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
844             "failed with status %i.",
845             callback_name, status);
846   else /* success */
847     host->reference_count++;
848
849   if (host->reference_count <= 1)
850     {
851       /* Both callbacks failed => free memory.
852        * We need to unlock here, because riemann_free() will lock.
853        * This is not a race condition, because we're the only one
854        * holding a reference. */
855       pthread_mutex_unlock(&host->lock);
856       wrr_free(host);
857       return (-1);
858     }
859
860   host->reference_count--;
861   pthread_mutex_unlock(&host->lock);
862
863   return status;
864 } /* }}} int wrr_config_node */
865
866 static int wrr_config(oconfig_item_t *ci) /* {{{ */
867 {
868   int            i;
869   oconfig_item_t        *child;
870   int            status;
871
872   for (i = 0; i < ci->children_num; i++)  {
873     child = &ci->children[i];
874
875     if (strcasecmp("Node", child->key) == 0) {
876       wrr_config_node (child);
877     } else if (strcasecmp(child->key, "attribute") == 0) {
878       char *key = NULL;
879       char *val = NULL;
880
881       if (child->values_num != 2) {
882         WARNING("riemann attributes need both a key and a value.");
883         return (-1);
884       }
885       if (child->values[0].type != OCONFIG_TYPE_STRING ||
886           child->values[1].type != OCONFIG_TYPE_STRING) {
887         WARNING("riemann attribute needs string arguments.");
888         return (-1);
889       }
890       if ((key = strdup(child->values[0].value.string)) == NULL) {
891         WARNING("cannot allocate memory for attribute key.");
892         return (-1);
893       }
894       if ((val = strdup(child->values[1].value.string)) == NULL) {
895         WARNING("cannot allocate memory for attribute value.");
896         sfree(key);
897         return (-1);
898       }
899       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
900       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
901       DEBUG("write_riemann: got attr: %s => %s", key, val);
902       sfree(key);
903       sfree(val);
904     } else if (strcasecmp(child->key, "tag") == 0) {
905       char *tmp = NULL;
906       status = cf_util_get_string(child, &tmp);
907       if (status != 0)
908         continue;
909
910       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
911       DEBUG("write_riemann plugin: Got tag: %s", tmp);
912       sfree(tmp);
913     } else {
914       WARNING("write_riemann plugin: Ignoring unknown "
915               "configuration option \"%s\" at top level.",
916               child->key);
917     }
918   }
919   return (0);
920 } /* }}} int wrr_config */
921
922 void module_register(void)
923 {
924   plugin_register_complex_config("write_riemann", wrr_config);
925 }
926
927 /* vim: set sw=8 sts=8 ts=8 noet : */