support older versions of riemann-c-client
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "configfile.h"
35 #include "plugin.h"
36 #include "utils_cache.h"
37 #include "utils_complain.h"
38 #include "write_riemann_threshold.h"
39
40 #include <errno.h>
41 #include <riemann/riemann-client.h>
42
43 #define RIEMANN_HOST "localhost"
44 #define RIEMANN_PORT 5555
45 #define RIEMANN_TTL_FACTOR 2.0
46 #define RIEMANN_BATCH_MAX 8192
47
48 struct riemann_host {
49   c_complain_t init_complaint;
50   char *name;
51   char *event_service_prefix;
52   pthread_mutex_t lock;
53   _Bool batch_mode;
54   _Bool notifications;
55   _Bool check_thresholds;
56   _Bool store_rates;
57   _Bool always_append_ds;
58   char *node;
59   int port;
60   riemann_client_type_t client_type;
61   riemann_client_t *client;
62   double ttl_factor;
63   cdtime_t batch_init;
64   int batch_max;
65   int batch_timeout;
66   int reference_count;
67   riemann_message_t *batch_msg;
68   char *tls_ca_file;
69   char *tls_cert_file;
70   char *tls_key_file;
71   struct timeval timeout;
72 };
73
74 static char **riemann_tags;
75 static size_t riemann_tags_num;
76 static char **riemann_attrs;
77 static size_t riemann_attrs_num;
78
79 /* host->lock must be held when calling this function. */
80 static int wrr_connect(struct riemann_host *host) /* {{{ */
81 {
82   char const *node;
83   int port;
84
85   if (host->client)
86     return 0;
87
88   node = (host->node != NULL) ? host->node : RIEMANN_HOST;
89   port = (host->port) ? host->port : RIEMANN_PORT;
90
91   host->client = NULL;
92
93   host->client = riemann_client_create(
94       host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE,
95       host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE,
96       host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE,
97       host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE);
98   if (host->client == NULL) {
99     c_complain(LOG_ERR, &host->init_complaint,
100                "write_riemann plugin: Unable to connect to Riemann at %s:%d",
101                node, port);
102     return -1;
103   }
104 #if RCC_VERSION_NUMBER >= 0x010800
105   if (host->timeout.tv_sec != 0) {
106     if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
107       riemann_client_free(host->client);
108       host->client = NULL;
109       c_complain(LOG_ERR, &host->init_complaint,
110                  "write_riemann plugin: Unable to connect to Riemann at %s:%d",
111                  node, port);
112       return -1;
113     }
114   }
115 #endif
116
117   set_sock_opts(riemann_client_get_fd(host->client));
118
119   c_release(LOG_INFO, &host->init_complaint,
120             "write_riemann plugin: Successfully connected to %s:%d", node,
121             port);
122
123   return 0;
124 } /* }}} int wrr_connect */
125
126 /* host->lock must be held when calling this function. */
127 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
128 {
129   if (!host->client)
130     return (0);
131
132   riemann_client_free(host->client);
133   host->client = NULL;
134
135   return (0);
136 } /* }}} int wrr_disconnect */
137
138 /**
139  * Function to send messages to riemann.
140  *
141  * Acquires the host lock, disconnects on errors.
142  */
143 static int wrr_send_nolock(struct riemann_host *host,
144                            riemann_message_t *msg) /* {{{ */
145 {
146   int status = 0;
147
148   status = wrr_connect(host);
149   if (status != 0) {
150     return status;
151   }
152
153   status = riemann_client_send_message(host->client, msg);
154   if (status != 0) {
155     wrr_disconnect(host);
156     return status;
157   }
158
159   /*
160    * For TCP we need to receive message acknowledgemenent.
161    */
162   if (host->client_type != RIEMANN_CLIENT_UDP) {
163     riemann_message_t *response;
164
165     response = riemann_client_recv_message(host->client);
166
167     if (response == NULL) {
168       wrr_disconnect(host);
169       return errno;
170     }
171     riemann_message_free(response);
172   }
173
174   return 0;
175 } /* }}} int wrr_send */
176
177 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) {
178   int status = 0;
179
180   pthread_mutex_lock(&host->lock);
181   status = wrr_send_nolock(host, msg);
182   pthread_mutex_unlock(&host->lock);
183   return status;
184 }
185
186 static riemann_message_t *
187 wrr_notification_to_message(struct riemann_host *host, /* {{{ */
188                             notification_t const *n) {
189   riemann_message_t *msg;
190   riemann_event_t *event;
191   char service_buffer[6 * DATA_MAX_NAME_LEN];
192   char const *severity;
193
194   switch (n->severity) {
195   case NOTIF_OKAY:
196     severity = "ok";
197     break;
198   case NOTIF_WARNING:
199     severity = "warning";
200     break;
201   case NOTIF_FAILURE:
202     severity = "critical";
203     break;
204   default:
205     severity = "unknown";
206   }
207
208   format_name(service_buffer, sizeof(service_buffer),
209               /* host = */ "", n->plugin, n->plugin_instance, n->type,
210               n->type_instance);
211
212   event = riemann_event_create(
213       RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME,
214       (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS,
215       "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity,
216       RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
217       RIEMANN_EVENT_FIELD_NONE);
218
219   if (n->host[0] != 0)
220     riemann_event_string_attribute_add(event, "host", n->host);
221   if (n->plugin[0] != 0)
222     riemann_event_string_attribute_add(event, "plugin", n->plugin);
223   if (n->plugin_instance[0] != 0)
224     riemann_event_string_attribute_add(event, "plugin_instance",
225                                        n->plugin_instance);
226
227   if (n->type[0] != 0)
228     riemann_event_string_attribute_add(event, "type", n->type);
229   if (n->type_instance[0] != 0)
230     riemann_event_string_attribute_add(event, "type_instance",
231                                        n->type_instance);
232
233   for (size_t i = 0; i < riemann_attrs_num; i += 2)
234     riemann_event_string_attribute_add(event, riemann_attrs[i],
235                                        riemann_attrs[i + 1]);
236
237   for (size_t i = 0; i < riemann_tags_num; i++)
238     riemann_event_tag_add(event, riemann_tags[i]);
239
240   if (n->message[0] != 0)
241     riemann_event_string_attribute_add(event, "description", n->message);
242
243   /* Pull in values from threshold and add extra attributes */
244   for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
245     if (strcasecmp("CurrentValue", meta->name) == 0 &&
246         meta->type == NM_TYPE_DOUBLE) {
247       riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
248                         (double)meta->nm_value.nm_double,
249                         RIEMANN_EVENT_FIELD_NONE);
250       continue;
251     }
252
253     if (meta->type == NM_TYPE_STRING) {
254       riemann_event_string_attribute_add(event, meta->name,
255                                          meta->nm_value.nm_string);
256       continue;
257     }
258   }
259
260   msg = riemann_message_create_with_events(event, NULL);
261   if (msg == NULL) {
262     ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
263     riemann_event_free(event);
264     return (NULL);
265   }
266
267   DEBUG("write_riemann plugin: Successfully created message for notification: "
268         "host = \"%s\", service = \"%s\", state = \"%s\"",
269         event->host, event->service, event->state);
270   return (msg);
271 } /* }}} riemann_message_t *wrr_notification_to_message */
272
273 static riemann_event_t *
274 wrr_value_to_event(struct riemann_host const *host, /* {{{ */
275                    data_set_t const *ds, value_list_t const *vl, size_t index,
276                    gauge_t const *rates, int status) {
277   riemann_event_t *event;
278   char name_buffer[5 * DATA_MAX_NAME_LEN];
279   char service_buffer[6 * DATA_MAX_NAME_LEN];
280   size_t i;
281
282   event = riemann_event_new();
283   if (event == NULL) {
284     ERROR("write_riemann plugin: riemann_event_new() failed.");
285     return (NULL);
286   }
287
288   format_name(name_buffer, sizeof(name_buffer),
289               /* host = */ "", vl->plugin, vl->plugin_instance, vl->type,
290               vl->type_instance);
291   if (host->always_append_ds || (ds->ds_num > 1)) {
292     if (host->event_service_prefix == NULL)
293       ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
294                 &name_buffer[1], ds->ds[index].name);
295     else
296       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
297                 host->event_service_prefix, &name_buffer[1],
298                 ds->ds[index].name);
299   } else {
300     if (host->event_service_prefix == NULL)
301       sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
302     else
303       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
304                 host->event_service_prefix, &name_buffer[1]);
305   }
306
307   riemann_event_set(
308       event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME,
309       (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL,
310       (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
311       RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type",
312       vl->type, "ds_name", ds->ds[index].name, NULL,
313       RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
314
315   if (host->check_thresholds) {
316     const char *state = NULL;
317
318     switch (status) {
319     case STATE_OKAY:
320       state = "ok";
321       break;
322     case STATE_ERROR:
323       state = "critical";
324       break;
325     case STATE_WARNING:
326       state = "warning";
327       break;
328     case STATE_MISSING:
329       state = "unknown";
330       break;
331     }
332     if (state)
333       riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
334                         RIEMANN_EVENT_FIELD_NONE);
335   }
336
337   if (vl->plugin_instance[0] != 0)
338     riemann_event_string_attribute_add(event, "plugin_instance",
339                                        vl->plugin_instance);
340   if (vl->type_instance[0] != 0)
341     riemann_event_string_attribute_add(event, "type_instance",
342                                        vl->type_instance);
343
344   if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
345     char ds_type[DATA_MAX_NAME_LEN];
346
347     ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
348               DS_TYPE_TO_STRING(ds->ds[index].type));
349     riemann_event_string_attribute_add(event, "ds_type", ds_type);
350   } else {
351     riemann_event_string_attribute_add(event, "ds_type",
352                                        DS_TYPE_TO_STRING(ds->ds[index].type));
353   }
354
355   {
356     char ds_index[DATA_MAX_NAME_LEN];
357
358     ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
359     riemann_event_string_attribute_add(event, "ds_index", ds_index);
360   }
361
362   for (i = 0; i < riemann_attrs_num; i += 2)
363     riemann_event_string_attribute_add(event, riemann_attrs[i],
364                                        riemann_attrs[i + 1]);
365
366   for (i = 0; i < riemann_tags_num; i++)
367     riemann_event_tag_add(event, riemann_tags[i]);
368
369   if (ds->ds[index].type == DS_TYPE_GAUGE) {
370     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
371                       (double)vl->values[index].gauge,
372                       RIEMANN_EVENT_FIELD_NONE);
373   } else if (rates != NULL) {
374     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index],
375                       RIEMANN_EVENT_FIELD_NONE);
376   } else {
377     int64_t metric;
378
379     if (ds->ds[index].type == DS_TYPE_DERIVE)
380       metric = (int64_t)vl->values[index].derive;
381     else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
382       metric = (int64_t)vl->values[index].absolute;
383     else
384       metric = (int64_t)vl->values[index].counter;
385
386     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric,
387                       RIEMANN_EVENT_FIELD_NONE);
388   }
389
390   DEBUG("write_riemann plugin: Successfully created message for metric: "
391         "host = \"%s\", service = \"%s\"",
392         event->host, event->service);
393   return (event);
394 } /* }}} riemann_event_t *wrr_value_to_event */
395
396 static riemann_message_t *
397 wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
398                           data_set_t const *ds, value_list_t const *vl,
399                           int *statuses) {
400   riemann_message_t *msg;
401   size_t i;
402   gauge_t *rates = NULL;
403
404   /* Initialize the Msg structure. */
405   msg = riemann_message_new();
406   if (msg == NULL) {
407     ERROR("write_riemann plugin: riemann_message_new failed.");
408     return (NULL);
409   }
410
411   if (host->store_rates) {
412     rates = uc_get_rate(ds, vl);
413     if (rates == NULL) {
414       ERROR("write_riemann plugin: uc_get_rate failed.");
415       riemann_message_free(msg);
416       return (NULL);
417     }
418   }
419
420   for (i = 0; i < vl->values_len; i++) {
421     riemann_event_t *event;
422
423     event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]);
424     if (event == NULL) {
425       riemann_message_free(msg);
426       sfree(rates);
427       return (NULL);
428     }
429     riemann_message_append_events(msg, event, NULL);
430   }
431
432   sfree(rates);
433   return (msg);
434 } /* }}} riemann_message_t *wrr_value_list_to_message */
435
436 /*
437  * Always call while holding host->lock !
438  */
439 static int wrr_batch_flush_nolock(cdtime_t timeout, struct riemann_host *host) {
440   cdtime_t now;
441   int status = 0;
442
443   now = cdtime();
444   if (timeout > 0) {
445     if ((host->batch_init + timeout) > now) {
446       return status;
447     }
448   }
449   wrr_send_nolock(host, host->batch_msg);
450   riemann_message_free(host->batch_msg);
451
452   host->batch_init = now;
453   host->batch_msg = NULL;
454   return status;
455 }
456
457 static int wrr_batch_flush(cdtime_t timeout,
458                            const char *identifier __attribute__((unused)),
459                            user_data_t *user_data) {
460   struct riemann_host *host;
461   int status;
462
463   if (user_data == NULL)
464     return (-EINVAL);
465
466   host = user_data->data;
467   pthread_mutex_lock(&host->lock);
468   status = wrr_batch_flush_nolock(timeout, host);
469   if (status != 0)
470     c_complain(
471         LOG_ERR, &host->init_complaint,
472         "write_riemann plugin: riemann_client_send failed with status %i",
473         status);
474   else
475     c_release(LOG_DEBUG, &host->init_complaint,
476               "write_riemann plugin: batch sent.");
477
478   pthread_mutex_unlock(&host->lock);
479   return status;
480 }
481
482 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
483                                     data_set_t const *ds,
484                                     value_list_t const *vl, int *statuses) {
485   riemann_message_t *msg;
486   size_t len;
487   int ret;
488   cdtime_t timeout;
489
490   msg = wrr_value_list_to_message(host, ds, vl, statuses);
491   if (msg == NULL)
492     return -1;
493
494   pthread_mutex_lock(&host->lock);
495
496   if (host->batch_msg == NULL) {
497     host->batch_msg = msg;
498   } else {
499     int status;
500
501     status = riemann_message_append_events_n(host->batch_msg, msg->n_events,
502                                              msg->events);
503     msg->n_events = 0;
504     msg->events = NULL;
505
506     riemann_message_free(msg);
507
508     if (status != 0) {
509       pthread_mutex_unlock(&host->lock);
510       ERROR("write_riemann plugin: out of memory");
511       return -1;
512     }
513   }
514
515   len = riemann_message_get_packed_size(host->batch_msg);
516   ret = 0;
517   if ((host->batch_max < 0) || (((size_t)host->batch_max) <= len)) {
518     ret = wrr_batch_flush_nolock(0, host);
519   } else {
520     if (host->batch_timeout > 0) {
521       timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
522       ret = wrr_batch_flush_nolock(timeout, host);
523     }
524   }
525
526   pthread_mutex_unlock(&host->lock);
527   return ret;
528 } /* }}} riemann_message_t *wrr_batch_add_value_list */
529
530 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
531 {
532   int status;
533   struct riemann_host *host = ud->data;
534   riemann_message_t *msg;
535
536   if (!host->notifications)
537     return 0;
538
539   /*
540    * Never batch for notifications, send them ASAP
541    */
542   msg = wrr_notification_to_message(host, n);
543   if (msg == NULL)
544     return (-1);
545
546   status = wrr_send(host, msg);
547   if (status != 0)
548     c_complain(
549         LOG_ERR, &host->init_complaint,
550         "write_riemann plugin: riemann_client_send failed with status %i",
551         status);
552   else
553     c_release(LOG_DEBUG, &host->init_complaint,
554               "write_riemann plugin: riemann_client_send succeeded");
555
556   riemann_message_free(msg);
557   return (status);
558 } /* }}} int wrr_notification */
559
560 static int wrr_write(const data_set_t *ds, /* {{{ */
561                      const value_list_t *vl, user_data_t *ud) {
562   int status = 0;
563   int statuses[vl->values_len];
564   struct riemann_host *host = ud->data;
565   riemann_message_t *msg;
566
567   if (host->check_thresholds) {
568     status = write_riemann_threshold_check(ds, vl, statuses);
569     if (status != 0)
570       return status;
571   } else {
572     memset(statuses, 0, sizeof(statuses));
573   }
574
575   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
576     wrr_batch_add_value_list(host, ds, vl, statuses);
577   } else {
578     msg = wrr_value_list_to_message(host, ds, vl, statuses);
579     if (msg == NULL)
580       return (-1);
581
582     status = wrr_send(host, msg);
583
584     riemann_message_free(msg);
585   }
586   return status;
587 } /* }}} int wrr_write */
588
589 static void wrr_free(void *p) /* {{{ */
590 {
591   struct riemann_host *host = p;
592
593   if (host == NULL)
594     return;
595
596   pthread_mutex_lock(&host->lock);
597
598   host->reference_count--;
599   if (host->reference_count > 0) {
600     pthread_mutex_unlock(&host->lock);
601     return;
602   }
603
604   wrr_disconnect(host);
605
606   pthread_mutex_destroy(&host->lock);
607   sfree(host);
608 } /* }}} void wrr_free */
609
610 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
611 {
612   struct riemann_host *host = NULL;
613   int status = 0;
614   int i;
615   oconfig_item_t *child;
616   char callback_name[DATA_MAX_NAME_LEN];
617   user_data_t ud;
618
619   if ((host = calloc(1, sizeof(*host))) == NULL) {
620     ERROR("write_riemann plugin: calloc failed.");
621     return ENOMEM;
622   }
623   pthread_mutex_init(&host->lock, NULL);
624   C_COMPLAIN_INIT(&host->init_complaint);
625   host->reference_count = 1;
626   host->node = NULL;
627   host->port = 0;
628   host->notifications = 1;
629   host->check_thresholds = 0;
630   host->store_rates = 1;
631   host->always_append_ds = 0;
632   host->batch_mode = 1;
633   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
634   host->batch_init = cdtime();
635   host->batch_timeout = 0;
636   host->ttl_factor = RIEMANN_TTL_FACTOR;
637   host->client = NULL;
638   host->client_type = RIEMANN_CLIENT_TCP;
639   host->timeout.tv_sec = 0;
640   host->timeout.tv_usec = 0;
641
642   status = cf_util_get_string(ci, &host->name);
643   if (status != 0) {
644     WARNING("write_riemann plugin: Required host name is missing.");
645     wrr_free(host);
646     return -1;
647   }
648
649   for (i = 0; i < ci->children_num; i++) {
650     /*
651      * The code here could be simplified but makes room
652      * for easy adding of new options later on.
653      */
654     child = &ci->children[i];
655     status = 0;
656
657     if (strcasecmp("Host", child->key) == 0) {
658       status = cf_util_get_string(child, &host->node);
659       if (status != 0)
660         break;
661     } else if (strcasecmp("Notifications", child->key) == 0) {
662       status = cf_util_get_boolean(child, &host->notifications);
663       if (status != 0)
664         break;
665     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
666       status = cf_util_get_string(child, &host->event_service_prefix);
667       if (status != 0)
668         break;
669     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
670       status = cf_util_get_boolean(child, &host->check_thresholds);
671       if (status != 0)
672         break;
673     } else if (strcasecmp("Batch", child->key) == 0) {
674       status = cf_util_get_boolean(child, &host->batch_mode);
675       if (status != 0)
676         break;
677     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
678       status = cf_util_get_int(child, &host->batch_max);
679       if (status != 0)
680         break;
681     } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
682       status = cf_util_get_int(child, &host->batch_timeout);
683       if (status != 0)
684         break;
685     } else if (strcasecmp("Timeout", child->key) == 0) {
686 #if RCC_VERSION_NUMBER >= 0x010800
687       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
688       if (status != 0)
689         break;
690 #else
691       WARNING("write_riemann plugin: The Timeout option is not supported. Please upgrade the Riemann client to at least 1.8.0.");
692 #endif
693     } else if (strcasecmp("Port", child->key) == 0) {
694       host->port = cf_util_get_port_number(child);
695       if (host->port == -1) {
696         ERROR("write_riemann plugin: Invalid argument "
697               "configured for the \"Port\" "
698               "option.");
699         break;
700       }
701     } else if (strcasecmp("Protocol", child->key) == 0) {
702       char tmp[16];
703       status = cf_util_get_string_buffer(child, tmp, sizeof(tmp));
704       if (status != 0) {
705         ERROR("write_riemann plugin: cf_util_get_"
706               "string_buffer failed with "
707               "status %i.",
708               status);
709         break;
710       }
711
712       if (strcasecmp("UDP", tmp) == 0)
713         host->client_type = RIEMANN_CLIENT_UDP;
714       else if (strcasecmp("TCP", tmp) == 0)
715         host->client_type = RIEMANN_CLIENT_TCP;
716       else if (strcasecmp("TLS", tmp) == 0)
717         host->client_type = RIEMANN_CLIENT_TLS;
718       else
719         WARNING("write_riemann plugin: The value "
720                 "\"%s\" is not valid for the "
721                 "\"Protocol\" option. Use "
722                 "either \"UDP\", \"TCP\" or \"TLS\".",
723                 tmp);
724     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
725       status = cf_util_get_string(child, &host->tls_ca_file);
726       if (status != 0) {
727         ERROR("write_riemann plugin: cf_util_get_"
728               "string_buffer failed with "
729               "status %i.",
730               status);
731         break;
732       }
733     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
734       status = cf_util_get_string(child, &host->tls_cert_file);
735       if (status != 0) {
736         ERROR("write_riemann plugin: cf_util_get_"
737               "string_buffer failed with "
738               "status %i.",
739               status);
740         break;
741       }
742     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
743       status = cf_util_get_string(child, &host->tls_key_file);
744       if (status != 0) {
745         ERROR("write_riemann plugin: cf_util_get_"
746               "string_buffer failed with "
747               "status %i.",
748               status);
749         break;
750       }
751     } else if (strcasecmp("StoreRates", child->key) == 0) {
752       status = cf_util_get_boolean(child, &host->store_rates);
753       if (status != 0)
754         break;
755     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
756       status = cf_util_get_boolean(child, &host->always_append_ds);
757       if (status != 0)
758         break;
759     } else if (strcasecmp("TTLFactor", child->key) == 0) {
760       double tmp = NAN;
761       status = cf_util_get_double(child, &tmp);
762       if (status != 0)
763         break;
764       if (tmp >= 2.0) {
765         host->ttl_factor = tmp;
766       } else if (tmp >= 1.0) {
767         NOTICE("write_riemann plugin: The configured "
768                "TTLFactor is very small "
769                "(%.1f). A value of 2.0 or "
770                "greater is recommended.",
771                tmp);
772         host->ttl_factor = tmp;
773       } else if (tmp > 0.0) {
774         WARNING("write_riemann plugin: The configured "
775                 "TTLFactor is too small to be "
776                 "useful (%.1f). I'll use it "
777                 "since the user knows best, "
778                 "but under protest.",
779                 tmp);
780         host->ttl_factor = tmp;
781       } else { /* zero, negative and NAN */
782         ERROR("write_riemann plugin: The configured "
783               "TTLFactor is invalid (%.1f).",
784               tmp);
785       }
786     } else {
787       WARNING("write_riemann plugin: ignoring unknown config "
788               "option: \"%s\"",
789               child->key);
790     }
791   }
792   if (status != 0) {
793     wrr_free(host);
794     return status;
795   }
796
797   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
798             host->name);
799   ud.data = host;
800   ud.free_func = wrr_free;
801
802   pthread_mutex_lock(&host->lock);
803
804   status = plugin_register_write(callback_name, wrr_write, &ud);
805
806   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
807     ud.free_func = NULL;
808     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
809   }
810   if (status != 0)
811     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
812             "failed with status %i.",
813             callback_name, status);
814   else /* success */
815     host->reference_count++;
816
817   status = plugin_register_notification(callback_name, wrr_notification, &ud);
818   if (status != 0)
819     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
820             "failed with status %i.",
821             callback_name, status);
822   else /* success */
823     host->reference_count++;
824
825   if (host->reference_count <= 1) {
826     /* Both callbacks failed => free memory.
827      * We need to unlock here, because riemann_free() will lock.
828      * This is not a race condition, because we're the only one
829      * holding a reference. */
830     pthread_mutex_unlock(&host->lock);
831     wrr_free(host);
832     return (-1);
833   }
834
835   host->reference_count--;
836   pthread_mutex_unlock(&host->lock);
837
838   return status;
839 } /* }}} int wrr_config_node */
840
841 static int wrr_config(oconfig_item_t *ci) /* {{{ */
842 {
843   int i;
844   oconfig_item_t *child;
845   int status;
846
847   for (i = 0; i < ci->children_num; i++) {
848     child = &ci->children[i];
849
850     if (strcasecmp("Node", child->key) == 0) {
851       wrr_config_node(child);
852     } else if (strcasecmp(child->key, "attribute") == 0) {
853       char *key = NULL;
854       char *val = NULL;
855
856       if (child->values_num != 2) {
857         WARNING("riemann attributes need both a key and a value.");
858         return (-1);
859       }
860       if (child->values[0].type != OCONFIG_TYPE_STRING ||
861           child->values[1].type != OCONFIG_TYPE_STRING) {
862         WARNING("riemann attribute needs string arguments.");
863         return (-1);
864       }
865       if ((key = strdup(child->values[0].value.string)) == NULL) {
866         WARNING("cannot allocate memory for attribute key.");
867         return (-1);
868       }
869       if ((val = strdup(child->values[1].value.string)) == NULL) {
870         WARNING("cannot allocate memory for attribute value.");
871         sfree(key);
872         return (-1);
873       }
874       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
875       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
876       DEBUG("write_riemann: got attr: %s => %s", key, val);
877       sfree(key);
878       sfree(val);
879     } else if (strcasecmp(child->key, "tag") == 0) {
880       char *tmp = NULL;
881       status = cf_util_get_string(child, &tmp);
882       if (status != 0)
883         continue;
884
885       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
886       DEBUG("write_riemann plugin: Got tag: %s", tmp);
887       sfree(tmp);
888     } else {
889       WARNING("write_riemann plugin: Ignoring unknown "
890               "configuration option \"%s\" at top level.",
891               child->key);
892     }
893   }
894   return (0);
895 } /* }}} int wrr_config */
896
897 void module_register(void) {
898   plugin_register_complex_config("write_riemann", wrr_config);
899 }
900
901 /* vim: set sw=8 sts=8 ts=8 noet : */