Merge branch 'collectd-5.5' into collectd-5.6
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "plugin.h"
35 #include "utils_cache.h"
36 #include "utils_complain.h"
37 #include "write_riemann_threshold.h"
38
39 #include <errno.h>
40 #include <riemann/riemann-client.h>
41
42 #define RIEMANN_HOST "localhost"
43 #define RIEMANN_PORT 5555
44 #define RIEMANN_TTL_FACTOR 2.0
45 #define RIEMANN_BATCH_MAX 8192
46
47 struct riemann_host {
48   c_complain_t init_complaint;
49   char *name;
50   char *event_service_prefix;
51   pthread_mutex_t lock;
52   _Bool batch_mode;
53   _Bool notifications;
54   _Bool check_thresholds;
55   _Bool store_rates;
56   _Bool always_append_ds;
57   char *node;
58   int port;
59   riemann_client_type_t client_type;
60   riemann_client_t *client;
61   double ttl_factor;
62   cdtime_t batch_init;
63   int batch_max;
64   int batch_timeout;
65   int reference_count;
66   riemann_message_t *batch_msg;
67   char *tls_ca_file;
68   char *tls_cert_file;
69   char *tls_key_file;
70   struct timeval timeout;
71 };
72
73 static char **riemann_tags;
74 static size_t riemann_tags_num;
75 static char **riemann_attrs;
76 static size_t riemann_attrs_num;
77
78 /* host->lock must be held when calling this function. */
79 static int wrr_connect(struct riemann_host *host) /* {{{ */
80 {
81   char const *node;
82   int port;
83
84   if (host->client)
85     return 0;
86
87   node = (host->node != NULL) ? host->node : RIEMANN_HOST;
88   port = (host->port) ? host->port : RIEMANN_PORT;
89
90   host->client = NULL;
91
92   host->client = riemann_client_create(
93       host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE,
94       host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE,
95       host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE,
96       host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE);
97   if (host->client == NULL) {
98     c_complain(LOG_ERR, &host->init_complaint,
99                "write_riemann plugin: Unable to connect to Riemann at %s:%d",
100                node, port);
101     return -1;
102   }
103 #if RCC_VERSION_NUMBER >= 0x010800
104   if (host->timeout.tv_sec != 0) {
105     if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
106       riemann_client_free(host->client);
107       host->client = NULL;
108       c_complain(LOG_ERR, &host->init_complaint,
109                  "write_riemann plugin: Unable to connect to Riemann at %s:%d",
110                  node, port);
111       return -1;
112     }
113   }
114 #endif
115
116   set_sock_opts(riemann_client_get_fd(host->client));
117
118   c_release(LOG_INFO, &host->init_complaint,
119             "write_riemann plugin: Successfully connected to %s:%d", node,
120             port);
121
122   return 0;
123 } /* }}} int wrr_connect */
124
125 /* host->lock must be held when calling this function. */
126 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
127 {
128   if (!host->client)
129     return (0);
130
131   riemann_client_free(host->client);
132   host->client = NULL;
133
134   return (0);
135 } /* }}} int wrr_disconnect */
136
137 /**
138  * Function to send messages to riemann.
139  *
140  * Acquires the host lock, disconnects on errors.
141  */
142 static int wrr_send_nolock(struct riemann_host *host,
143                            riemann_message_t *msg) /* {{{ */
144 {
145   int status = 0;
146
147   status = wrr_connect(host);
148   if (status != 0) {
149     return status;
150   }
151
152   status = riemann_client_send_message(host->client, msg);
153   if (status != 0) {
154     wrr_disconnect(host);
155     return status;
156   }
157
158   /*
159    * For TCP we need to receive message acknowledgemenent.
160    */
161   if (host->client_type != RIEMANN_CLIENT_UDP) {
162     riemann_message_t *response;
163
164     response = riemann_client_recv_message(host->client);
165
166     if (response == NULL) {
167       wrr_disconnect(host);
168       return errno;
169     }
170     riemann_message_free(response);
171   }
172
173   return 0;
174 } /* }}} int wrr_send */
175
176 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) {
177   int status = 0;
178
179   pthread_mutex_lock(&host->lock);
180   status = wrr_send_nolock(host, msg);
181   pthread_mutex_unlock(&host->lock);
182   return status;
183 }
184
185 static riemann_message_t *
186 wrr_notification_to_message(struct riemann_host *host, /* {{{ */
187                             notification_t const *n) {
188   riemann_message_t *msg;
189   riemann_event_t *event;
190   char service_buffer[6 * DATA_MAX_NAME_LEN];
191   char const *severity;
192
193   switch (n->severity) {
194   case NOTIF_OKAY:
195     severity = "ok";
196     break;
197   case NOTIF_WARNING:
198     severity = "warning";
199     break;
200   case NOTIF_FAILURE:
201     severity = "critical";
202     break;
203   default:
204     severity = "unknown";
205   }
206
207   format_name(service_buffer, sizeof(service_buffer),
208               /* host = */ "", n->plugin, n->plugin_instance, n->type,
209               n->type_instance);
210
211   event = riemann_event_create(
212       RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME,
213       (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS,
214       "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity,
215       RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
216       RIEMANN_EVENT_FIELD_NONE);
217
218   if (n->host[0] != 0)
219     riemann_event_string_attribute_add(event, "host", n->host);
220   if (n->plugin[0] != 0)
221     riemann_event_string_attribute_add(event, "plugin", n->plugin);
222   if (n->plugin_instance[0] != 0)
223     riemann_event_string_attribute_add(event, "plugin_instance",
224                                        n->plugin_instance);
225
226   if (n->type[0] != 0)
227     riemann_event_string_attribute_add(event, "type", n->type);
228   if (n->type_instance[0] != 0)
229     riemann_event_string_attribute_add(event, "type_instance",
230                                        n->type_instance);
231
232   for (size_t i = 0; i < riemann_attrs_num; i += 2)
233     riemann_event_string_attribute_add(event, riemann_attrs[i],
234                                        riemann_attrs[i + 1]);
235
236   for (size_t i = 0; i < riemann_tags_num; i++)
237     riemann_event_tag_add(event, riemann_tags[i]);
238
239   if (n->message[0] != 0)
240     riemann_event_string_attribute_add(event, "description", n->message);
241
242   /* Pull in values from threshold and add extra attributes */
243   for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
244     if (strcasecmp("CurrentValue", meta->name) == 0 &&
245         meta->type == NM_TYPE_DOUBLE) {
246       riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
247                         (double)meta->nm_value.nm_double,
248                         RIEMANN_EVENT_FIELD_NONE);
249       continue;
250     }
251
252     if (meta->type == NM_TYPE_STRING) {
253       riemann_event_string_attribute_add(event, meta->name,
254                                          meta->nm_value.nm_string);
255       continue;
256     }
257   }
258
259   msg = riemann_message_create_with_events(event, NULL);
260   if (msg == NULL) {
261     ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
262     riemann_event_free(event);
263     return (NULL);
264   }
265
266   DEBUG("write_riemann plugin: Successfully created message for notification: "
267         "host = \"%s\", service = \"%s\", state = \"%s\"",
268         event->host, event->service, event->state);
269   return (msg);
270 } /* }}} riemann_message_t *wrr_notification_to_message */
271
272 static riemann_event_t *
273 wrr_value_to_event(struct riemann_host const *host, /* {{{ */
274                    data_set_t const *ds, value_list_t const *vl, size_t index,
275                    gauge_t const *rates, int status) {
276   riemann_event_t *event;
277   char name_buffer[5 * DATA_MAX_NAME_LEN];
278   char service_buffer[6 * DATA_MAX_NAME_LEN];
279   size_t i;
280
281   event = riemann_event_new();
282   if (event == NULL) {
283     ERROR("write_riemann plugin: riemann_event_new() failed.");
284     return (NULL);
285   }
286
287   format_name(name_buffer, sizeof(name_buffer),
288               /* host = */ "", vl->plugin, vl->plugin_instance, vl->type,
289               vl->type_instance);
290   if (host->always_append_ds || (ds->ds_num > 1)) {
291     if (host->event_service_prefix == NULL)
292       ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
293                 &name_buffer[1], ds->ds[index].name);
294     else
295       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
296                 host->event_service_prefix, &name_buffer[1],
297                 ds->ds[index].name);
298   } else {
299     if (host->event_service_prefix == NULL)
300       sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
301     else
302       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
303                 host->event_service_prefix, &name_buffer[1]);
304   }
305
306   riemann_event_set(
307       event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME,
308       (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL,
309       (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
310       RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type",
311       vl->type, "ds_name", ds->ds[index].name, NULL,
312       RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
313
314   if (host->check_thresholds) {
315     const char *state = NULL;
316
317     switch (status) {
318     case STATE_OKAY:
319       state = "ok";
320       break;
321     case STATE_ERROR:
322       state = "critical";
323       break;
324     case STATE_WARNING:
325       state = "warning";
326       break;
327     case STATE_MISSING:
328       state = "unknown";
329       break;
330     }
331     if (state)
332       riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
333                         RIEMANN_EVENT_FIELD_NONE);
334   }
335
336   if (vl->plugin_instance[0] != 0)
337     riemann_event_string_attribute_add(event, "plugin_instance",
338                                        vl->plugin_instance);
339   if (vl->type_instance[0] != 0)
340     riemann_event_string_attribute_add(event, "type_instance",
341                                        vl->type_instance);
342
343   if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
344     char ds_type[DATA_MAX_NAME_LEN];
345
346     ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
347               DS_TYPE_TO_STRING(ds->ds[index].type));
348     riemann_event_string_attribute_add(event, "ds_type", ds_type);
349   } else {
350     riemann_event_string_attribute_add(event, "ds_type",
351                                        DS_TYPE_TO_STRING(ds->ds[index].type));
352   }
353
354   {
355     char ds_index[DATA_MAX_NAME_LEN];
356
357     ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
358     riemann_event_string_attribute_add(event, "ds_index", ds_index);
359   }
360
361   for (i = 0; i < riemann_attrs_num; i += 2)
362     riemann_event_string_attribute_add(event, riemann_attrs[i],
363                                        riemann_attrs[i + 1]);
364
365   for (i = 0; i < riemann_tags_num; i++)
366     riemann_event_tag_add(event, riemann_tags[i]);
367
368   if (ds->ds[index].type == DS_TYPE_GAUGE) {
369     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
370                       (double)vl->values[index].gauge,
371                       RIEMANN_EVENT_FIELD_NONE);
372   } else if (rates != NULL) {
373     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index],
374                       RIEMANN_EVENT_FIELD_NONE);
375   } else {
376     int64_t metric;
377
378     if (ds->ds[index].type == DS_TYPE_DERIVE)
379       metric = (int64_t)vl->values[index].derive;
380     else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
381       metric = (int64_t)vl->values[index].absolute;
382     else
383       metric = (int64_t)vl->values[index].counter;
384
385     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric,
386                       RIEMANN_EVENT_FIELD_NONE);
387   }
388
389   DEBUG("write_riemann plugin: Successfully created message for metric: "
390         "host = \"%s\", service = \"%s\"",
391         event->host, event->service);
392   return (event);
393 } /* }}} riemann_event_t *wrr_value_to_event */
394
395 static riemann_message_t *
396 wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
397                           data_set_t const *ds, value_list_t const *vl,
398                           int *statuses) {
399   riemann_message_t *msg;
400   size_t i;
401   gauge_t *rates = NULL;
402
403   /* Initialize the Msg structure. */
404   msg = riemann_message_new();
405   if (msg == NULL) {
406     ERROR("write_riemann plugin: riemann_message_new failed.");
407     return (NULL);
408   }
409
410   if (host->store_rates) {
411     rates = uc_get_rate(ds, vl);
412     if (rates == NULL) {
413       ERROR("write_riemann plugin: uc_get_rate failed.");
414       riemann_message_free(msg);
415       return (NULL);
416     }
417   }
418
419   for (i = 0; i < vl->values_len; i++) {
420     riemann_event_t *event;
421
422     event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]);
423     if (event == NULL) {
424       riemann_message_free(msg);
425       sfree(rates);
426       return (NULL);
427     }
428     riemann_message_append_events(msg, event, NULL);
429   }
430
431   sfree(rates);
432   return (msg);
433 } /* }}} riemann_message_t *wrr_value_list_to_message */
434
435 /*
436  * Always call while holding host->lock !
437  */
438 static int wrr_batch_flush_nolock(cdtime_t timeout, struct riemann_host *host) {
439   cdtime_t now;
440   int status = 0;
441
442   now = cdtime();
443   if (timeout > 0) {
444     if ((host->batch_init + timeout) > now) {
445       return status;
446     }
447   }
448   wrr_send_nolock(host, host->batch_msg);
449   riemann_message_free(host->batch_msg);
450
451   host->batch_init = now;
452   host->batch_msg = NULL;
453   return status;
454 }
455
456 static int wrr_batch_flush(cdtime_t timeout,
457                            const char *identifier __attribute__((unused)),
458                            user_data_t *user_data) {
459   struct riemann_host *host;
460   int status;
461
462   if (user_data == NULL)
463     return (-EINVAL);
464
465   host = user_data->data;
466   pthread_mutex_lock(&host->lock);
467   status = wrr_batch_flush_nolock(timeout, host);
468   if (status != 0)
469     c_complain(
470         LOG_ERR, &host->init_complaint,
471         "write_riemann plugin: riemann_client_send failed with status %i",
472         status);
473   else
474     c_release(LOG_DEBUG, &host->init_complaint,
475               "write_riemann plugin: batch sent.");
476
477   pthread_mutex_unlock(&host->lock);
478   return status;
479 }
480
481 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
482                                     data_set_t const *ds,
483                                     value_list_t const *vl, int *statuses) {
484   riemann_message_t *msg;
485   size_t len;
486   int ret;
487   cdtime_t timeout;
488
489   msg = wrr_value_list_to_message(host, ds, vl, statuses);
490   if (msg == NULL)
491     return -1;
492
493   pthread_mutex_lock(&host->lock);
494
495   if (host->batch_msg == NULL) {
496     host->batch_msg = msg;
497   } else {
498     int status;
499
500     status = riemann_message_append_events_n(host->batch_msg, msg->n_events,
501                                              msg->events);
502     msg->n_events = 0;
503     msg->events = NULL;
504
505     riemann_message_free(msg);
506
507     if (status != 0) {
508       pthread_mutex_unlock(&host->lock);
509       ERROR("write_riemann plugin: out of memory");
510       return -1;
511     }
512   }
513
514   len = riemann_message_get_packed_size(host->batch_msg);
515   ret = 0;
516   if ((host->batch_max < 0) || (((size_t)host->batch_max) <= len)) {
517     ret = wrr_batch_flush_nolock(0, host);
518   } else {
519     if (host->batch_timeout > 0) {
520       timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
521       ret = wrr_batch_flush_nolock(timeout, host);
522     }
523   }
524
525   pthread_mutex_unlock(&host->lock);
526   return ret;
527 } /* }}} riemann_message_t *wrr_batch_add_value_list */
528
529 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
530 {
531   int status;
532   struct riemann_host *host = ud->data;
533   riemann_message_t *msg;
534
535   if (!host->notifications)
536     return 0;
537
538   /*
539    * Never batch for notifications, send them ASAP
540    */
541   msg = wrr_notification_to_message(host, n);
542   if (msg == NULL)
543     return (-1);
544
545   status = wrr_send(host, msg);
546   if (status != 0)
547     c_complain(
548         LOG_ERR, &host->init_complaint,
549         "write_riemann plugin: riemann_client_send failed with status %i",
550         status);
551   else
552     c_release(LOG_DEBUG, &host->init_complaint,
553               "write_riemann plugin: riemann_client_send succeeded");
554
555   riemann_message_free(msg);
556   return (status);
557 } /* }}} int wrr_notification */
558
559 static int wrr_write(const data_set_t *ds, /* {{{ */
560                      const value_list_t *vl, user_data_t *ud) {
561   int status = 0;
562   int statuses[vl->values_len];
563   struct riemann_host *host = ud->data;
564   riemann_message_t *msg;
565
566   if (host->check_thresholds) {
567     status = write_riemann_threshold_check(ds, vl, statuses);
568     if (status != 0)
569       return status;
570   } else {
571     memset(statuses, 0, sizeof(statuses));
572   }
573
574   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
575     wrr_batch_add_value_list(host, ds, vl, statuses);
576   } else {
577     msg = wrr_value_list_to_message(host, ds, vl, statuses);
578     if (msg == NULL)
579       return (-1);
580
581     status = wrr_send(host, msg);
582
583     riemann_message_free(msg);
584   }
585   return status;
586 } /* }}} int wrr_write */
587
588 static void wrr_free(void *p) /* {{{ */
589 {
590   struct riemann_host *host = p;
591
592   if (host == NULL)
593     return;
594
595   pthread_mutex_lock(&host->lock);
596
597   host->reference_count--;
598   if (host->reference_count > 0) {
599     pthread_mutex_unlock(&host->lock);
600     return;
601   }
602
603   wrr_disconnect(host);
604
605   pthread_mutex_destroy(&host->lock);
606   sfree(host);
607 } /* }}} void wrr_free */
608
609 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
610 {
611   struct riemann_host *host = NULL;
612   int status = 0;
613   int i;
614   oconfig_item_t *child;
615   char callback_name[DATA_MAX_NAME_LEN];
616
617   if ((host = calloc(1, sizeof(*host))) == NULL) {
618     ERROR("write_riemann plugin: calloc failed.");
619     return ENOMEM;
620   }
621   pthread_mutex_init(&host->lock, NULL);
622   C_COMPLAIN_INIT(&host->init_complaint);
623   host->reference_count = 1;
624   host->node = NULL;
625   host->port = 0;
626   host->notifications = 1;
627   host->check_thresholds = 0;
628   host->store_rates = 1;
629   host->always_append_ds = 0;
630   host->batch_mode = 1;
631   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
632   host->batch_init = cdtime();
633   host->batch_timeout = 0;
634   host->ttl_factor = RIEMANN_TTL_FACTOR;
635   host->client = NULL;
636   host->client_type = RIEMANN_CLIENT_TCP;
637   host->timeout.tv_sec = 0;
638   host->timeout.tv_usec = 0;
639
640   status = cf_util_get_string(ci, &host->name);
641   if (status != 0) {
642     WARNING("write_riemann plugin: Required host name is missing.");
643     wrr_free(host);
644     return -1;
645   }
646
647   for (i = 0; i < ci->children_num; i++) {
648     /*
649      * The code here could be simplified but makes room
650      * for easy adding of new options later on.
651      */
652     child = &ci->children[i];
653     status = 0;
654
655     if (strcasecmp("Host", child->key) == 0) {
656       status = cf_util_get_string(child, &host->node);
657       if (status != 0)
658         break;
659     } else if (strcasecmp("Notifications", child->key) == 0) {
660       status = cf_util_get_boolean(child, &host->notifications);
661       if (status != 0)
662         break;
663     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
664       status = cf_util_get_string(child, &host->event_service_prefix);
665       if (status != 0)
666         break;
667     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
668       status = cf_util_get_boolean(child, &host->check_thresholds);
669       if (status != 0)
670         break;
671     } else if (strcasecmp("Batch", child->key) == 0) {
672       status = cf_util_get_boolean(child, &host->batch_mode);
673       if (status != 0)
674         break;
675     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
676       status = cf_util_get_int(child, &host->batch_max);
677       if (status != 0)
678         break;
679     } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
680       status = cf_util_get_int(child, &host->batch_timeout);
681       if (status != 0)
682         break;
683     } else if (strcasecmp("Timeout", child->key) == 0) {
684 #if RCC_VERSION_NUMBER >= 0x010800
685       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
686       if (status != 0)
687         break;
688 #else
689       WARNING("write_riemann plugin: The Timeout option is not supported. Please upgrade the Riemann client to at least 1.8.0.");
690 #endif
691     } else if (strcasecmp("Port", child->key) == 0) {
692       host->port = cf_util_get_port_number(child);
693       if (host->port == -1) {
694         ERROR("write_riemann plugin: Invalid argument "
695               "configured for the \"Port\" "
696               "option.");
697         break;
698       }
699     } else if (strcasecmp("Protocol", child->key) == 0) {
700       char tmp[16];
701       status = cf_util_get_string_buffer(child, tmp, sizeof(tmp));
702       if (status != 0) {
703         ERROR("write_riemann plugin: cf_util_get_"
704               "string_buffer failed with "
705               "status %i.",
706               status);
707         break;
708       }
709
710       if (strcasecmp("UDP", tmp) == 0)
711         host->client_type = RIEMANN_CLIENT_UDP;
712       else if (strcasecmp("TCP", tmp) == 0)
713         host->client_type = RIEMANN_CLIENT_TCP;
714       else if (strcasecmp("TLS", tmp) == 0)
715         host->client_type = RIEMANN_CLIENT_TLS;
716       else
717         WARNING("write_riemann plugin: The value "
718                 "\"%s\" is not valid for the "
719                 "\"Protocol\" option. Use "
720                 "either \"UDP\", \"TCP\" or \"TLS\".",
721                 tmp);
722     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
723       status = cf_util_get_string(child, &host->tls_ca_file);
724       if (status != 0) {
725         ERROR("write_riemann plugin: cf_util_get_"
726               "string_buffer failed with "
727               "status %i.",
728               status);
729         break;
730       }
731     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
732       status = cf_util_get_string(child, &host->tls_cert_file);
733       if (status != 0) {
734         ERROR("write_riemann plugin: cf_util_get_"
735               "string_buffer failed with "
736               "status %i.",
737               status);
738         break;
739       }
740     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
741       status = cf_util_get_string(child, &host->tls_key_file);
742       if (status != 0) {
743         ERROR("write_riemann plugin: cf_util_get_"
744               "string_buffer failed with "
745               "status %i.",
746               status);
747         break;
748       }
749     } else if (strcasecmp("StoreRates", child->key) == 0) {
750       status = cf_util_get_boolean(child, &host->store_rates);
751       if (status != 0)
752         break;
753     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
754       status = cf_util_get_boolean(child, &host->always_append_ds);
755       if (status != 0)
756         break;
757     } else if (strcasecmp("TTLFactor", child->key) == 0) {
758       double tmp = NAN;
759       status = cf_util_get_double(child, &tmp);
760       if (status != 0)
761         break;
762       if (tmp >= 2.0) {
763         host->ttl_factor = tmp;
764       } else if (tmp >= 1.0) {
765         NOTICE("write_riemann plugin: The configured "
766                "TTLFactor is very small "
767                "(%.1f). A value of 2.0 or "
768                "greater is recommended.",
769                tmp);
770         host->ttl_factor = tmp;
771       } else if (tmp > 0.0) {
772         WARNING("write_riemann plugin: The configured "
773                 "TTLFactor is too small to be "
774                 "useful (%.1f). I'll use it "
775                 "since the user knows best, "
776                 "but under protest.",
777                 tmp);
778         host->ttl_factor = tmp;
779       } else { /* zero, negative and NAN */
780         ERROR("write_riemann plugin: The configured "
781               "TTLFactor is invalid (%.1f).",
782               tmp);
783       }
784     } else {
785       WARNING("write_riemann plugin: ignoring unknown config "
786               "option: \"%s\"",
787               child->key);
788     }
789   }
790   if (status != 0) {
791     wrr_free(host);
792     return status;
793   }
794
795   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
796             host->name);
797
798   user_data_t ud = {
799     .data = host,
800     .free_func = wrr_free
801   };
802
803   pthread_mutex_lock(&host->lock);
804
805   status = plugin_register_write(callback_name, wrr_write, &ud);
806
807   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
808     ud.free_func = NULL;
809     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
810   }
811   if (status != 0)
812     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
813             "failed with status %i.",
814             callback_name, status);
815   else /* success */
816     host->reference_count++;
817
818   status = plugin_register_notification(callback_name, wrr_notification, &ud);
819   if (status != 0)
820     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
821             "failed with status %i.",
822             callback_name, status);
823   else /* success */
824     host->reference_count++;
825
826   if (host->reference_count <= 1) {
827     /* Both callbacks failed => free memory.
828      * We need to unlock here, because riemann_free() will lock.
829      * This is not a race condition, because we're the only one
830      * holding a reference. */
831     pthread_mutex_unlock(&host->lock);
832     wrr_free(host);
833     return (-1);
834   }
835
836   host->reference_count--;
837   pthread_mutex_unlock(&host->lock);
838
839   return status;
840 } /* }}} int wrr_config_node */
841
842 static int wrr_config(oconfig_item_t *ci) /* {{{ */
843 {
844   int i;
845   oconfig_item_t *child;
846   int status;
847
848   for (i = 0; i < ci->children_num; i++) {
849     child = &ci->children[i];
850
851     if (strcasecmp("Node", child->key) == 0) {
852       wrr_config_node(child);
853     } else if (strcasecmp(child->key, "attribute") == 0) {
854       char *key = NULL;
855       char *val = NULL;
856
857       if (child->values_num != 2) {
858         WARNING("riemann attributes need both a key and a value.");
859         return (-1);
860       }
861       if (child->values[0].type != OCONFIG_TYPE_STRING ||
862           child->values[1].type != OCONFIG_TYPE_STRING) {
863         WARNING("riemann attribute needs string arguments.");
864         return (-1);
865       }
866       if ((key = strdup(child->values[0].value.string)) == NULL) {
867         WARNING("cannot allocate memory for attribute key.");
868         return (-1);
869       }
870       if ((val = strdup(child->values[1].value.string)) == NULL) {
871         WARNING("cannot allocate memory for attribute value.");
872         sfree(key);
873         return (-1);
874       }
875       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
876       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
877       DEBUG("write_riemann: got attr: %s => %s", key, val);
878       sfree(key);
879       sfree(val);
880     } else if (strcasecmp(child->key, "tag") == 0) {
881       char *tmp = NULL;
882       status = cf_util_get_string(child, &tmp);
883       if (status != 0)
884         continue;
885
886       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
887       DEBUG("write_riemann plugin: Got tag: %s", tmp);
888       sfree(tmp);
889     } else {
890       WARNING("write_riemann plugin: Ignoring unknown "
891               "configuration option \"%s\" at top level.",
892               child->key);
893     }
894   }
895   return (0);
896 } /* }}} int wrr_config */
897
898 void module_register(void) {
899   plugin_register_complex_config("write_riemann", wrr_config);
900 }
901
902 /* vim: set sw=8 sts=8 ts=8 noet : */