write_riemann: enable TCP keepalive on network socket
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "configfile.h"
35 #include "plugin.h"
36 #include "utils_cache.h"
37 #include "utils_complain.h"
38 #include "write_riemann_threshold.h"
39
40 #include <errno.h>
41 #include <riemann/riemann-client.h>
42
43 #define RIEMANN_HOST "localhost"
44 #define RIEMANN_PORT 5555
45 #define RIEMANN_TTL_FACTOR 2.0
46 #define RIEMANN_BATCH_MAX 8192
47
48 struct riemann_host {
49   c_complain_t init_complaint;
50   char *name;
51   char *event_service_prefix;
52   pthread_mutex_t lock;
53   _Bool batch_mode;
54   _Bool notifications;
55   _Bool check_thresholds;
56   _Bool store_rates;
57   _Bool always_append_ds;
58   char *node;
59   int port;
60   riemann_client_type_t client_type;
61   riemann_client_t *client;
62   double ttl_factor;
63   cdtime_t batch_init;
64   int batch_max;
65   int batch_timeout;
66   int reference_count;
67   riemann_message_t *batch_msg;
68   char *tls_ca_file;
69   char *tls_cert_file;
70   char *tls_key_file;
71   struct timeval timeout;
72 };
73
74 static char **riemann_tags;
75 static size_t riemann_tags_num;
76 static char **riemann_attrs;
77 static size_t riemann_attrs_num;
78
79 /* host->lock must be held when calling this function. */
80 static int wrr_connect(struct riemann_host *host) /* {{{ */
81 {
82   char const *node;
83   int port;
84
85   if (host->client)
86     return 0;
87
88   node = (host->node != NULL) ? host->node : RIEMANN_HOST;
89   port = (host->port) ? host->port : RIEMANN_PORT;
90
91   host->client = NULL;
92
93   host->client = riemann_client_create(
94       host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE,
95       host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE,
96       host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE,
97       host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE);
98   if (host->client == NULL) {
99     c_complain(LOG_ERR, &host->init_complaint,
100                "write_riemann plugin: Unable to connect to Riemann at %s:%d",
101                node, port);
102     return -1;
103   }
104   if (host->timeout.tv_sec != 0) {
105     if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
106       riemann_client_free(host->client);
107       host->client = NULL;
108       c_complain(LOG_ERR, &host->init_complaint,
109                  "write_riemann plugin: Unable to connect to Riemann at %s:%d",
110                  node, port);
111       return -1;
112     }
113   }
114
115   set_sock_opts(riemann_client_get_fd(host->client));
116
117   c_release(LOG_INFO, &host->init_complaint,
118             "write_riemann plugin: Successfully connected to %s:%d", node,
119             port);
120
121   return 0;
122 } /* }}} int wrr_connect */
123
124 /* host->lock must be held when calling this function. */
125 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
126 {
127   if (!host->client)
128     return (0);
129
130   riemann_client_free(host->client);
131   host->client = NULL;
132
133   return (0);
134 } /* }}} int wrr_disconnect */
135
136 /**
137  * Function to send messages to riemann.
138  *
139  * Acquires the host lock, disconnects on errors.
140  */
141 static int wrr_send_nolock(struct riemann_host *host,
142                            riemann_message_t *msg) /* {{{ */
143 {
144   int status = 0;
145
146   status = wrr_connect(host);
147   if (status != 0) {
148     return status;
149   }
150
151   status = riemann_client_send_message(host->client, msg);
152   if (status != 0) {
153     wrr_disconnect(host);
154     return status;
155   }
156
157   /*
158    * For TCP we need to receive message acknowledgemenent.
159    */
160   if (host->client_type != RIEMANN_CLIENT_UDP) {
161     riemann_message_t *response;
162
163     response = riemann_client_recv_message(host->client);
164
165     if (response == NULL) {
166       wrr_disconnect(host);
167       return errno;
168     }
169     riemann_message_free(response);
170   }
171
172   return 0;
173 } /* }}} int wrr_send */
174
175 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) {
176   int status = 0;
177
178   pthread_mutex_lock(&host->lock);
179   status = wrr_send_nolock(host, msg);
180   pthread_mutex_unlock(&host->lock);
181   return status;
182 }
183
184 static riemann_message_t *
185 wrr_notification_to_message(struct riemann_host *host, /* {{{ */
186                             notification_t const *n) {
187   riemann_message_t *msg;
188   riemann_event_t *event;
189   char service_buffer[6 * DATA_MAX_NAME_LEN];
190   char const *severity;
191
192   switch (n->severity) {
193   case NOTIF_OKAY:
194     severity = "ok";
195     break;
196   case NOTIF_WARNING:
197     severity = "warning";
198     break;
199   case NOTIF_FAILURE:
200     severity = "critical";
201     break;
202   default:
203     severity = "unknown";
204   }
205
206   format_name(service_buffer, sizeof(service_buffer),
207               /* host = */ "", n->plugin, n->plugin_instance, n->type,
208               n->type_instance);
209
210   event = riemann_event_create(
211       RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME,
212       (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS,
213       "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity,
214       RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
215       RIEMANN_EVENT_FIELD_NONE);
216
217   if (n->host[0] != 0)
218     riemann_event_string_attribute_add(event, "host", n->host);
219   if (n->plugin[0] != 0)
220     riemann_event_string_attribute_add(event, "plugin", n->plugin);
221   if (n->plugin_instance[0] != 0)
222     riemann_event_string_attribute_add(event, "plugin_instance",
223                                        n->plugin_instance);
224
225   if (n->type[0] != 0)
226     riemann_event_string_attribute_add(event, "type", n->type);
227   if (n->type_instance[0] != 0)
228     riemann_event_string_attribute_add(event, "type_instance",
229                                        n->type_instance);
230
231   for (size_t i = 0; i < riemann_attrs_num; i += 2)
232     riemann_event_string_attribute_add(event, riemann_attrs[i],
233                                        riemann_attrs[i + 1]);
234
235   for (size_t i = 0; i < riemann_tags_num; i++)
236     riemann_event_tag_add(event, riemann_tags[i]);
237
238   if (n->message[0] != 0)
239     riemann_event_string_attribute_add(event, "description", n->message);
240
241   /* Pull in values from threshold and add extra attributes */
242   for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
243     if (strcasecmp("CurrentValue", meta->name) == 0 &&
244         meta->type == NM_TYPE_DOUBLE) {
245       riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
246                         (double)meta->nm_value.nm_double,
247                         RIEMANN_EVENT_FIELD_NONE);
248       continue;
249     }
250
251     if (meta->type == NM_TYPE_STRING) {
252       riemann_event_string_attribute_add(event, meta->name,
253                                          meta->nm_value.nm_string);
254       continue;
255     }
256   }
257
258   msg = riemann_message_create_with_events(event, NULL);
259   if (msg == NULL) {
260     ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
261     riemann_event_free(event);
262     return (NULL);
263   }
264
265   DEBUG("write_riemann plugin: Successfully created message for notification: "
266         "host = \"%s\", service = \"%s\", state = \"%s\"",
267         event->host, event->service, event->state);
268   return (msg);
269 } /* }}} riemann_message_t *wrr_notification_to_message */
270
271 static riemann_event_t *
272 wrr_value_to_event(struct riemann_host const *host, /* {{{ */
273                    data_set_t const *ds, value_list_t const *vl, size_t index,
274                    gauge_t const *rates, int status) {
275   riemann_event_t *event;
276   char name_buffer[5 * DATA_MAX_NAME_LEN];
277   char service_buffer[6 * DATA_MAX_NAME_LEN];
278   size_t i;
279
280   event = riemann_event_new();
281   if (event == NULL) {
282     ERROR("write_riemann plugin: riemann_event_new() failed.");
283     return (NULL);
284   }
285
286   format_name(name_buffer, sizeof(name_buffer),
287               /* host = */ "", vl->plugin, vl->plugin_instance, vl->type,
288               vl->type_instance);
289   if (host->always_append_ds || (ds->ds_num > 1)) {
290     if (host->event_service_prefix == NULL)
291       ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
292                 &name_buffer[1], ds->ds[index].name);
293     else
294       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
295                 host->event_service_prefix, &name_buffer[1],
296                 ds->ds[index].name);
297   } else {
298     if (host->event_service_prefix == NULL)
299       sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
300     else
301       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
302                 host->event_service_prefix, &name_buffer[1]);
303   }
304
305   riemann_event_set(
306       event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME,
307       (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL,
308       (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
309       RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type",
310       vl->type, "ds_name", ds->ds[index].name, NULL,
311       RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
312
313   if (host->check_thresholds) {
314     const char *state = NULL;
315
316     switch (status) {
317     case STATE_OKAY:
318       state = "ok";
319       break;
320     case STATE_ERROR:
321       state = "critical";
322       break;
323     case STATE_WARNING:
324       state = "warning";
325       break;
326     case STATE_MISSING:
327       state = "unknown";
328       break;
329     }
330     if (state)
331       riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
332                         RIEMANN_EVENT_FIELD_NONE);
333   }
334
335   if (vl->plugin_instance[0] != 0)
336     riemann_event_string_attribute_add(event, "plugin_instance",
337                                        vl->plugin_instance);
338   if (vl->type_instance[0] != 0)
339     riemann_event_string_attribute_add(event, "type_instance",
340                                        vl->type_instance);
341
342   if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
343     char ds_type[DATA_MAX_NAME_LEN];
344
345     ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
346               DS_TYPE_TO_STRING(ds->ds[index].type));
347     riemann_event_string_attribute_add(event, "ds_type", ds_type);
348   } else {
349     riemann_event_string_attribute_add(event, "ds_type",
350                                        DS_TYPE_TO_STRING(ds->ds[index].type));
351   }
352
353   {
354     char ds_index[DATA_MAX_NAME_LEN];
355
356     ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
357     riemann_event_string_attribute_add(event, "ds_index", ds_index);
358   }
359
360   for (i = 0; i < riemann_attrs_num; i += 2)
361     riemann_event_string_attribute_add(event, riemann_attrs[i],
362                                        riemann_attrs[i + 1]);
363
364   for (i = 0; i < riemann_tags_num; i++)
365     riemann_event_tag_add(event, riemann_tags[i]);
366
367   if (ds->ds[index].type == DS_TYPE_GAUGE) {
368     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
369                       (double)vl->values[index].gauge,
370                       RIEMANN_EVENT_FIELD_NONE);
371   } else if (rates != NULL) {
372     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index],
373                       RIEMANN_EVENT_FIELD_NONE);
374   } else {
375     int64_t metric;
376
377     if (ds->ds[index].type == DS_TYPE_DERIVE)
378       metric = (int64_t)vl->values[index].derive;
379     else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
380       metric = (int64_t)vl->values[index].absolute;
381     else
382       metric = (int64_t)vl->values[index].counter;
383
384     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric,
385                       RIEMANN_EVENT_FIELD_NONE);
386   }
387
388   DEBUG("write_riemann plugin: Successfully created message for metric: "
389         "host = \"%s\", service = \"%s\"",
390         event->host, event->service);
391   return (event);
392 } /* }}} riemann_event_t *wrr_value_to_event */
393
394 static riemann_message_t *
395 wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
396                           data_set_t const *ds, value_list_t const *vl,
397                           int *statuses) {
398   riemann_message_t *msg;
399   size_t i;
400   gauge_t *rates = NULL;
401
402   /* Initialize the Msg structure. */
403   msg = riemann_message_new();
404   if (msg == NULL) {
405     ERROR("write_riemann plugin: riemann_message_new failed.");
406     return (NULL);
407   }
408
409   if (host->store_rates) {
410     rates = uc_get_rate(ds, vl);
411     if (rates == NULL) {
412       ERROR("write_riemann plugin: uc_get_rate failed.");
413       riemann_message_free(msg);
414       return (NULL);
415     }
416   }
417
418   for (i = 0; i < vl->values_len; i++) {
419     riemann_event_t *event;
420
421     event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]);
422     if (event == NULL) {
423       riemann_message_free(msg);
424       sfree(rates);
425       return (NULL);
426     }
427     riemann_message_append_events(msg, event, NULL);
428   }
429
430   sfree(rates);
431   return (msg);
432 } /* }}} riemann_message_t *wrr_value_list_to_message */
433
434 /*
435  * Always call while holding host->lock !
436  */
437 static int wrr_batch_flush_nolock(cdtime_t timeout, struct riemann_host *host) {
438   cdtime_t now;
439   int status = 0;
440
441   now = cdtime();
442   if (timeout > 0) {
443     if ((host->batch_init + timeout) > now) {
444       return status;
445     }
446   }
447   wrr_send_nolock(host, host->batch_msg);
448   riemann_message_free(host->batch_msg);
449
450   host->batch_init = now;
451   host->batch_msg = NULL;
452   return status;
453 }
454
455 static int wrr_batch_flush(cdtime_t timeout,
456                            const char *identifier __attribute__((unused)),
457                            user_data_t *user_data) {
458   struct riemann_host *host;
459   int status;
460
461   if (user_data == NULL)
462     return (-EINVAL);
463
464   host = user_data->data;
465   pthread_mutex_lock(&host->lock);
466   status = wrr_batch_flush_nolock(timeout, host);
467   if (status != 0)
468     c_complain(
469         LOG_ERR, &host->init_complaint,
470         "write_riemann plugin: riemann_client_send failed with status %i",
471         status);
472   else
473     c_release(LOG_DEBUG, &host->init_complaint,
474               "write_riemann plugin: batch sent.");
475
476   pthread_mutex_unlock(&host->lock);
477   return status;
478 }
479
480 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
481                                     data_set_t const *ds,
482                                     value_list_t const *vl, int *statuses) {
483   riemann_message_t *msg;
484   size_t len;
485   int ret;
486   cdtime_t timeout;
487
488   msg = wrr_value_list_to_message(host, ds, vl, statuses);
489   if (msg == NULL)
490     return -1;
491
492   pthread_mutex_lock(&host->lock);
493
494   if (host->batch_msg == NULL) {
495     host->batch_msg = msg;
496   } else {
497     int status;
498
499     status = riemann_message_append_events_n(host->batch_msg, msg->n_events,
500                                              msg->events);
501     msg->n_events = 0;
502     msg->events = NULL;
503
504     riemann_message_free(msg);
505
506     if (status != 0) {
507       pthread_mutex_unlock(&host->lock);
508       ERROR("write_riemann plugin: out of memory");
509       return -1;
510     }
511   }
512
513   len = riemann_message_get_packed_size(host->batch_msg);
514   ret = 0;
515   if ((host->batch_max < 0) || (((size_t)host->batch_max) <= len)) {
516     ret = wrr_batch_flush_nolock(0, host);
517   } else {
518     if (host->batch_timeout > 0) {
519       timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
520       ret = wrr_batch_flush_nolock(timeout, host);
521     }
522   }
523
524   pthread_mutex_unlock(&host->lock);
525   return ret;
526 } /* }}} riemann_message_t *wrr_batch_add_value_list */
527
528 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
529 {
530   int status;
531   struct riemann_host *host = ud->data;
532   riemann_message_t *msg;
533
534   if (!host->notifications)
535     return 0;
536
537   /*
538    * Never batch for notifications, send them ASAP
539    */
540   msg = wrr_notification_to_message(host, n);
541   if (msg == NULL)
542     return (-1);
543
544   status = wrr_send(host, msg);
545   if (status != 0)
546     c_complain(
547         LOG_ERR, &host->init_complaint,
548         "write_riemann plugin: riemann_client_send failed with status %i",
549         status);
550   else
551     c_release(LOG_DEBUG, &host->init_complaint,
552               "write_riemann plugin: riemann_client_send succeeded");
553
554   riemann_message_free(msg);
555   return (status);
556 } /* }}} int wrr_notification */
557
558 static int wrr_write(const data_set_t *ds, /* {{{ */
559                      const value_list_t *vl, user_data_t *ud) {
560   int status = 0;
561   int statuses[vl->values_len];
562   struct riemann_host *host = ud->data;
563   riemann_message_t *msg;
564
565   if (host->check_thresholds) {
566     status = write_riemann_threshold_check(ds, vl, statuses);
567     if (status != 0)
568       return status;
569   } else {
570     memset(statuses, 0, sizeof(statuses));
571   }
572
573   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
574     wrr_batch_add_value_list(host, ds, vl, statuses);
575   } else {
576     msg = wrr_value_list_to_message(host, ds, vl, statuses);
577     if (msg == NULL)
578       return (-1);
579
580     status = wrr_send(host, msg);
581
582     riemann_message_free(msg);
583   }
584   return status;
585 } /* }}} int wrr_write */
586
587 static void wrr_free(void *p) /* {{{ */
588 {
589   struct riemann_host *host = p;
590
591   if (host == NULL)
592     return;
593
594   pthread_mutex_lock(&host->lock);
595
596   host->reference_count--;
597   if (host->reference_count > 0) {
598     pthread_mutex_unlock(&host->lock);
599     return;
600   }
601
602   wrr_disconnect(host);
603
604   pthread_mutex_destroy(&host->lock);
605   sfree(host);
606 } /* }}} void wrr_free */
607
608 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
609 {
610   struct riemann_host *host = NULL;
611   int status = 0;
612   int i;
613   oconfig_item_t *child;
614   char callback_name[DATA_MAX_NAME_LEN];
615   user_data_t ud;
616
617   if ((host = calloc(1, sizeof(*host))) == NULL) {
618     ERROR("write_riemann plugin: calloc failed.");
619     return ENOMEM;
620   }
621   pthread_mutex_init(&host->lock, NULL);
622   C_COMPLAIN_INIT(&host->init_complaint);
623   host->reference_count = 1;
624   host->node = NULL;
625   host->port = 0;
626   host->notifications = 1;
627   host->check_thresholds = 0;
628   host->store_rates = 1;
629   host->always_append_ds = 0;
630   host->batch_mode = 1;
631   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
632   host->batch_init = cdtime();
633   host->batch_timeout = 0;
634   host->ttl_factor = RIEMANN_TTL_FACTOR;
635   host->client = NULL;
636   host->client_type = RIEMANN_CLIENT_TCP;
637   host->timeout.tv_sec = 0;
638   host->timeout.tv_usec = 0;
639
640   status = cf_util_get_string(ci, &host->name);
641   if (status != 0) {
642     WARNING("write_riemann plugin: Required host name is missing.");
643     wrr_free(host);
644     return -1;
645   }
646
647   for (i = 0; i < ci->children_num; i++) {
648     /*
649      * The code here could be simplified but makes room
650      * for easy adding of new options later on.
651      */
652     child = &ci->children[i];
653     status = 0;
654
655     if (strcasecmp("Host", child->key) == 0) {
656       status = cf_util_get_string(child, &host->node);
657       if (status != 0)
658         break;
659     } else if (strcasecmp("Notifications", child->key) == 0) {
660       status = cf_util_get_boolean(child, &host->notifications);
661       if (status != 0)
662         break;
663     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
664       status = cf_util_get_string(child, &host->event_service_prefix);
665       if (status != 0)
666         break;
667     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
668       status = cf_util_get_boolean(child, &host->check_thresholds);
669       if (status != 0)
670         break;
671     } else if (strcasecmp("Batch", child->key) == 0) {
672       status = cf_util_get_boolean(child, &host->batch_mode);
673       if (status != 0)
674         break;
675     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
676       status = cf_util_get_int(child, &host->batch_max);
677       if (status != 0)
678         break;
679     } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
680       status = cf_util_get_int(child, &host->batch_timeout);
681       if (status != 0)
682         break;
683     } else if (strcasecmp("Timeout", child->key) == 0) {
684       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
685       if (status != 0)
686         break;
687     } else if (strcasecmp("Port", child->key) == 0) {
688       host->port = cf_util_get_port_number(child);
689       if (host->port == -1) {
690         ERROR("write_riemann plugin: Invalid argument "
691               "configured for the \"Port\" "
692               "option.");
693         break;
694       }
695     } else if (strcasecmp("Protocol", child->key) == 0) {
696       char tmp[16];
697       status = cf_util_get_string_buffer(child, tmp, sizeof(tmp));
698       if (status != 0) {
699         ERROR("write_riemann plugin: cf_util_get_"
700               "string_buffer failed with "
701               "status %i.",
702               status);
703         break;
704       }
705
706       if (strcasecmp("UDP", tmp) == 0)
707         host->client_type = RIEMANN_CLIENT_UDP;
708       else if (strcasecmp("TCP", tmp) == 0)
709         host->client_type = RIEMANN_CLIENT_TCP;
710       else if (strcasecmp("TLS", tmp) == 0)
711         host->client_type = RIEMANN_CLIENT_TLS;
712       else
713         WARNING("write_riemann plugin: The value "
714                 "\"%s\" is not valid for the "
715                 "\"Protocol\" option. Use "
716                 "either \"UDP\", \"TCP\" or \"TLS\".",
717                 tmp);
718     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
719       status = cf_util_get_string(child, &host->tls_ca_file);
720       if (status != 0) {
721         ERROR("write_riemann plugin: cf_util_get_"
722               "string_buffer failed with "
723               "status %i.",
724               status);
725         break;
726       }
727     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
728       status = cf_util_get_string(child, &host->tls_cert_file);
729       if (status != 0) {
730         ERROR("write_riemann plugin: cf_util_get_"
731               "string_buffer failed with "
732               "status %i.",
733               status);
734         break;
735       }
736     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
737       status = cf_util_get_string(child, &host->tls_key_file);
738       if (status != 0) {
739         ERROR("write_riemann plugin: cf_util_get_"
740               "string_buffer failed with "
741               "status %i.",
742               status);
743         break;
744       }
745     } else if (strcasecmp("StoreRates", child->key) == 0) {
746       status = cf_util_get_boolean(child, &host->store_rates);
747       if (status != 0)
748         break;
749     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
750       status = cf_util_get_boolean(child, &host->always_append_ds);
751       if (status != 0)
752         break;
753     } else if (strcasecmp("TTLFactor", child->key) == 0) {
754       double tmp = NAN;
755       status = cf_util_get_double(child, &tmp);
756       if (status != 0)
757         break;
758       if (tmp >= 2.0) {
759         host->ttl_factor = tmp;
760       } else if (tmp >= 1.0) {
761         NOTICE("write_riemann plugin: The configured "
762                "TTLFactor is very small "
763                "(%.1f). A value of 2.0 or "
764                "greater is recommended.",
765                tmp);
766         host->ttl_factor = tmp;
767       } else if (tmp > 0.0) {
768         WARNING("write_riemann plugin: The configured "
769                 "TTLFactor is too small to be "
770                 "useful (%.1f). I'll use it "
771                 "since the user knows best, "
772                 "but under protest.",
773                 tmp);
774         host->ttl_factor = tmp;
775       } else { /* zero, negative and NAN */
776         ERROR("write_riemann plugin: The configured "
777               "TTLFactor is invalid (%.1f).",
778               tmp);
779       }
780     } else {
781       WARNING("write_riemann plugin: ignoring unknown config "
782               "option: \"%s\"",
783               child->key);
784     }
785   }
786   if (status != 0) {
787     wrr_free(host);
788     return status;
789   }
790
791   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
792             host->name);
793   ud.data = host;
794   ud.free_func = wrr_free;
795
796   pthread_mutex_lock(&host->lock);
797
798   status = plugin_register_write(callback_name, wrr_write, &ud);
799
800   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
801     ud.free_func = NULL;
802     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
803   }
804   if (status != 0)
805     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
806             "failed with status %i.",
807             callback_name, status);
808   else /* success */
809     host->reference_count++;
810
811   status = plugin_register_notification(callback_name, wrr_notification, &ud);
812   if (status != 0)
813     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
814             "failed with status %i.",
815             callback_name, status);
816   else /* success */
817     host->reference_count++;
818
819   if (host->reference_count <= 1) {
820     /* Both callbacks failed => free memory.
821      * We need to unlock here, because riemann_free() will lock.
822      * This is not a race condition, because we're the only one
823      * holding a reference. */
824     pthread_mutex_unlock(&host->lock);
825     wrr_free(host);
826     return (-1);
827   }
828
829   host->reference_count--;
830   pthread_mutex_unlock(&host->lock);
831
832   return status;
833 } /* }}} int wrr_config_node */
834
835 static int wrr_config(oconfig_item_t *ci) /* {{{ */
836 {
837   int i;
838   oconfig_item_t *child;
839   int status;
840
841   for (i = 0; i < ci->children_num; i++) {
842     child = &ci->children[i];
843
844     if (strcasecmp("Node", child->key) == 0) {
845       wrr_config_node(child);
846     } else if (strcasecmp(child->key, "attribute") == 0) {
847       char *key = NULL;
848       char *val = NULL;
849
850       if (child->values_num != 2) {
851         WARNING("riemann attributes need both a key and a value.");
852         return (-1);
853       }
854       if (child->values[0].type != OCONFIG_TYPE_STRING ||
855           child->values[1].type != OCONFIG_TYPE_STRING) {
856         WARNING("riemann attribute needs string arguments.");
857         return (-1);
858       }
859       if ((key = strdup(child->values[0].value.string)) == NULL) {
860         WARNING("cannot allocate memory for attribute key.");
861         return (-1);
862       }
863       if ((val = strdup(child->values[1].value.string)) == NULL) {
864         WARNING("cannot allocate memory for attribute value.");
865         sfree(key);
866         return (-1);
867       }
868       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
869       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
870       DEBUG("write_riemann: got attr: %s => %s", key, val);
871       sfree(key);
872       sfree(val);
873     } else if (strcasecmp(child->key, "tag") == 0) {
874       char *tmp = NULL;
875       status = cf_util_get_string(child, &tmp);
876       if (status != 0)
877         continue;
878
879       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
880       DEBUG("write_riemann plugin: Got tag: %s", tmp);
881       sfree(tmp);
882     } else {
883       WARNING("write_riemann plugin: Ignoring unknown "
884               "configuration option \"%s\" at top level.",
885               child->key);
886     }
887   }
888   return (0);
889 } /* }}} int wrr_config */
890
891 void module_register(void) {
892   plugin_register_complex_config("write_riemann", wrr_config);
893 }
894
895 /* vim: set sw=8 sts=8 ts=8 noet : */