Merge pull request #1875 from rubenk/remove-configfile-h-from-plugins
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
4  * Copyright (C) 2013       Florian octo Forster
5  * Copyright (C) 2015,2016  Gergely Nagy
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Authors:
26  *   Pierre-Yves Ritschard <pyr at spootnik.org>
27  *   Florian octo Forster <octo at collectd.org>
28  *   Gergely Nagy <algernon at madhouse-project.org>
29  */
30
31 #include "collectd.h"
32
33 #include "common.h"
34 #include "plugin.h"
35 #include "utils_cache.h"
36 #include "utils_complain.h"
37 #include "write_riemann_threshold.h"
38
39 #include <errno.h>
40 #include <riemann/riemann-client.h>
41
42 #define RIEMANN_HOST "localhost"
43 #define RIEMANN_PORT 5555
44 #define RIEMANN_TTL_FACTOR 2.0
45 #define RIEMANN_BATCH_MAX 8192
46
47 struct riemann_host {
48   c_complain_t init_complaint;
49   char *name;
50   char *event_service_prefix;
51   pthread_mutex_t lock;
52   _Bool batch_mode;
53   _Bool notifications;
54   _Bool check_thresholds;
55   _Bool store_rates;
56   _Bool always_append_ds;
57   char *node;
58   int port;
59   riemann_client_type_t client_type;
60   riemann_client_t *client;
61   double ttl_factor;
62   cdtime_t batch_init;
63   int batch_max;
64   int batch_timeout;
65   int reference_count;
66   riemann_message_t *batch_msg;
67   char *tls_ca_file;
68   char *tls_cert_file;
69   char *tls_key_file;
70   struct timeval timeout;
71 };
72
73 static char **riemann_tags;
74 static size_t riemann_tags_num;
75 static char **riemann_attrs;
76 static size_t riemann_attrs_num;
77
78 /* host->lock must be held when calling this function. */
79 static int wrr_connect(struct riemann_host *host) /* {{{ */
80 {
81   char const *node;
82   int port;
83
84   if (host->client)
85     return 0;
86
87   node = (host->node != NULL) ? host->node : RIEMANN_HOST;
88   port = (host->port) ? host->port : RIEMANN_PORT;
89
90   host->client = NULL;
91
92   host->client = riemann_client_create(
93       host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE,
94       host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE,
95       host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE,
96       host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE);
97   if (host->client == NULL) {
98     c_complain(LOG_ERR, &host->init_complaint,
99                "write_riemann plugin: Unable to connect to Riemann at %s:%d",
100                node, port);
101     return -1;
102   }
103   if (host->timeout.tv_sec != 0) {
104     if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
105       riemann_client_free(host->client);
106       host->client = NULL;
107       c_complain(LOG_ERR, &host->init_complaint,
108                  "write_riemann plugin: Unable to connect to Riemann at %s:%d",
109                  node, port);
110       return -1;
111     }
112   }
113
114   set_sock_opts(riemann_client_get_fd(host->client));
115
116   c_release(LOG_INFO, &host->init_complaint,
117             "write_riemann plugin: Successfully connected to %s:%d", node,
118             port);
119
120   return 0;
121 } /* }}} int wrr_connect */
122
123 /* host->lock must be held when calling this function. */
124 static int wrr_disconnect(struct riemann_host *host) /* {{{ */
125 {
126   if (!host->client)
127     return (0);
128
129   riemann_client_free(host->client);
130   host->client = NULL;
131
132   return (0);
133 } /* }}} int wrr_disconnect */
134
135 /**
136  * Function to send messages to riemann.
137  *
138  * Acquires the host lock, disconnects on errors.
139  */
140 static int wrr_send_nolock(struct riemann_host *host,
141                            riemann_message_t *msg) /* {{{ */
142 {
143   int status = 0;
144
145   status = wrr_connect(host);
146   if (status != 0) {
147     return status;
148   }
149
150   status = riemann_client_send_message(host->client, msg);
151   if (status != 0) {
152     wrr_disconnect(host);
153     return status;
154   }
155
156   /*
157    * For TCP we need to receive message acknowledgemenent.
158    */
159   if (host->client_type != RIEMANN_CLIENT_UDP) {
160     riemann_message_t *response;
161
162     response = riemann_client_recv_message(host->client);
163
164     if (response == NULL) {
165       wrr_disconnect(host);
166       return errno;
167     }
168     riemann_message_free(response);
169   }
170
171   return 0;
172 } /* }}} int wrr_send */
173
174 static int wrr_send(struct riemann_host *host, riemann_message_t *msg) {
175   int status = 0;
176
177   pthread_mutex_lock(&host->lock);
178   status = wrr_send_nolock(host, msg);
179   pthread_mutex_unlock(&host->lock);
180   return status;
181 }
182
183 static riemann_message_t *
184 wrr_notification_to_message(struct riemann_host *host, /* {{{ */
185                             notification_t const *n) {
186   riemann_message_t *msg;
187   riemann_event_t *event;
188   char service_buffer[6 * DATA_MAX_NAME_LEN];
189   char const *severity;
190
191   switch (n->severity) {
192   case NOTIF_OKAY:
193     severity = "ok";
194     break;
195   case NOTIF_WARNING:
196     severity = "warning";
197     break;
198   case NOTIF_FAILURE:
199     severity = "critical";
200     break;
201   default:
202     severity = "unknown";
203   }
204
205   format_name(service_buffer, sizeof(service_buffer),
206               /* host = */ "", n->plugin, n->plugin_instance, n->type,
207               n->type_instance);
208
209   event = riemann_event_create(
210       RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME,
211       (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS,
212       "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity,
213       RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
214       RIEMANN_EVENT_FIELD_NONE);
215
216   if (n->host[0] != 0)
217     riemann_event_string_attribute_add(event, "host", n->host);
218   if (n->plugin[0] != 0)
219     riemann_event_string_attribute_add(event, "plugin", n->plugin);
220   if (n->plugin_instance[0] != 0)
221     riemann_event_string_attribute_add(event, "plugin_instance",
222                                        n->plugin_instance);
223
224   if (n->type[0] != 0)
225     riemann_event_string_attribute_add(event, "type", n->type);
226   if (n->type_instance[0] != 0)
227     riemann_event_string_attribute_add(event, "type_instance",
228                                        n->type_instance);
229
230   for (size_t i = 0; i < riemann_attrs_num; i += 2)
231     riemann_event_string_attribute_add(event, riemann_attrs[i],
232                                        riemann_attrs[i + 1]);
233
234   for (size_t i = 0; i < riemann_tags_num; i++)
235     riemann_event_tag_add(event, riemann_tags[i]);
236
237   if (n->message[0] != 0)
238     riemann_event_string_attribute_add(event, "description", n->message);
239
240   /* Pull in values from threshold and add extra attributes */
241   for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
242     if (strcasecmp("CurrentValue", meta->name) == 0 &&
243         meta->type == NM_TYPE_DOUBLE) {
244       riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
245                         (double)meta->nm_value.nm_double,
246                         RIEMANN_EVENT_FIELD_NONE);
247       continue;
248     }
249
250     if (meta->type == NM_TYPE_STRING) {
251       riemann_event_string_attribute_add(event, meta->name,
252                                          meta->nm_value.nm_string);
253       continue;
254     }
255   }
256
257   msg = riemann_message_create_with_events(event, NULL);
258   if (msg == NULL) {
259     ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
260     riemann_event_free(event);
261     return (NULL);
262   }
263
264   DEBUG("write_riemann plugin: Successfully created message for notification: "
265         "host = \"%s\", service = \"%s\", state = \"%s\"",
266         event->host, event->service, event->state);
267   return (msg);
268 } /* }}} riemann_message_t *wrr_notification_to_message */
269
270 static riemann_event_t *
271 wrr_value_to_event(struct riemann_host const *host, /* {{{ */
272                    data_set_t const *ds, value_list_t const *vl, size_t index,
273                    gauge_t const *rates, int status) {
274   riemann_event_t *event;
275   char name_buffer[5 * DATA_MAX_NAME_LEN];
276   char service_buffer[6 * DATA_MAX_NAME_LEN];
277   size_t i;
278
279   event = riemann_event_new();
280   if (event == NULL) {
281     ERROR("write_riemann plugin: riemann_event_new() failed.");
282     return (NULL);
283   }
284
285   format_name(name_buffer, sizeof(name_buffer),
286               /* host = */ "", vl->plugin, vl->plugin_instance, vl->type,
287               vl->type_instance);
288   if (host->always_append_ds || (ds->ds_num > 1)) {
289     if (host->event_service_prefix == NULL)
290       ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
291                 &name_buffer[1], ds->ds[index].name);
292     else
293       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
294                 host->event_service_prefix, &name_buffer[1],
295                 ds->ds[index].name);
296   } else {
297     if (host->event_service_prefix == NULL)
298       sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
299     else
300       ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
301                 host->event_service_prefix, &name_buffer[1]);
302   }
303
304   riemann_event_set(
305       event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME,
306       (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL,
307       (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
308       RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type",
309       vl->type, "ds_name", ds->ds[index].name, NULL,
310       RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
311
312   if (host->check_thresholds) {
313     const char *state = NULL;
314
315     switch (status) {
316     case STATE_OKAY:
317       state = "ok";
318       break;
319     case STATE_ERROR:
320       state = "critical";
321       break;
322     case STATE_WARNING:
323       state = "warning";
324       break;
325     case STATE_MISSING:
326       state = "unknown";
327       break;
328     }
329     if (state)
330       riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
331                         RIEMANN_EVENT_FIELD_NONE);
332   }
333
334   if (vl->plugin_instance[0] != 0)
335     riemann_event_string_attribute_add(event, "plugin_instance",
336                                        vl->plugin_instance);
337   if (vl->type_instance[0] != 0)
338     riemann_event_string_attribute_add(event, "type_instance",
339                                        vl->type_instance);
340
341   if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
342     char ds_type[DATA_MAX_NAME_LEN];
343
344     ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
345               DS_TYPE_TO_STRING(ds->ds[index].type));
346     riemann_event_string_attribute_add(event, "ds_type", ds_type);
347   } else {
348     riemann_event_string_attribute_add(event, "ds_type",
349                                        DS_TYPE_TO_STRING(ds->ds[index].type));
350   }
351
352   {
353     char ds_index[DATA_MAX_NAME_LEN];
354
355     ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
356     riemann_event_string_attribute_add(event, "ds_index", ds_index);
357   }
358
359   for (i = 0; i < riemann_attrs_num; i += 2)
360     riemann_event_string_attribute_add(event, riemann_attrs[i],
361                                        riemann_attrs[i + 1]);
362
363   for (i = 0; i < riemann_tags_num; i++)
364     riemann_event_tag_add(event, riemann_tags[i]);
365
366   if (ds->ds[index].type == DS_TYPE_GAUGE) {
367     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
368                       (double)vl->values[index].gauge,
369                       RIEMANN_EVENT_FIELD_NONE);
370   } else if (rates != NULL) {
371     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index],
372                       RIEMANN_EVENT_FIELD_NONE);
373   } else {
374     int64_t metric;
375
376     if (ds->ds[index].type == DS_TYPE_DERIVE)
377       metric = (int64_t)vl->values[index].derive;
378     else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
379       metric = (int64_t)vl->values[index].absolute;
380     else
381       metric = (int64_t)vl->values[index].counter;
382
383     riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric,
384                       RIEMANN_EVENT_FIELD_NONE);
385   }
386
387   DEBUG("write_riemann plugin: Successfully created message for metric: "
388         "host = \"%s\", service = \"%s\"",
389         event->host, event->service);
390   return (event);
391 } /* }}} riemann_event_t *wrr_value_to_event */
392
393 static riemann_message_t *
394 wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
395                           data_set_t const *ds, value_list_t const *vl,
396                           int *statuses) {
397   riemann_message_t *msg;
398   size_t i;
399   gauge_t *rates = NULL;
400
401   /* Initialize the Msg structure. */
402   msg = riemann_message_new();
403   if (msg == NULL) {
404     ERROR("write_riemann plugin: riemann_message_new failed.");
405     return (NULL);
406   }
407
408   if (host->store_rates) {
409     rates = uc_get_rate(ds, vl);
410     if (rates == NULL) {
411       ERROR("write_riemann plugin: uc_get_rate failed.");
412       riemann_message_free(msg);
413       return (NULL);
414     }
415   }
416
417   for (i = 0; i < vl->values_len; i++) {
418     riemann_event_t *event;
419
420     event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]);
421     if (event == NULL) {
422       riemann_message_free(msg);
423       sfree(rates);
424       return (NULL);
425     }
426     riemann_message_append_events(msg, event, NULL);
427   }
428
429   sfree(rates);
430   return (msg);
431 } /* }}} riemann_message_t *wrr_value_list_to_message */
432
433 /*
434  * Always call while holding host->lock !
435  */
436 static int wrr_batch_flush_nolock(cdtime_t timeout, struct riemann_host *host) {
437   cdtime_t now;
438   int status = 0;
439
440   now = cdtime();
441   if (timeout > 0) {
442     if ((host->batch_init + timeout) > now) {
443       return status;
444     }
445   }
446   wrr_send_nolock(host, host->batch_msg);
447   riemann_message_free(host->batch_msg);
448
449   host->batch_init = now;
450   host->batch_msg = NULL;
451   return status;
452 }
453
454 static int wrr_batch_flush(cdtime_t timeout,
455                            const char *identifier __attribute__((unused)),
456                            user_data_t *user_data) {
457   struct riemann_host *host;
458   int status;
459
460   if (user_data == NULL)
461     return (-EINVAL);
462
463   host = user_data->data;
464   pthread_mutex_lock(&host->lock);
465   status = wrr_batch_flush_nolock(timeout, host);
466   if (status != 0)
467     c_complain(
468         LOG_ERR, &host->init_complaint,
469         "write_riemann plugin: riemann_client_send failed with status %i",
470         status);
471   else
472     c_release(LOG_DEBUG, &host->init_complaint,
473               "write_riemann plugin: batch sent.");
474
475   pthread_mutex_unlock(&host->lock);
476   return status;
477 }
478
479 static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
480                                     data_set_t const *ds,
481                                     value_list_t const *vl, int *statuses) {
482   riemann_message_t *msg;
483   size_t len;
484   int ret;
485   cdtime_t timeout;
486
487   msg = wrr_value_list_to_message(host, ds, vl, statuses);
488   if (msg == NULL)
489     return -1;
490
491   pthread_mutex_lock(&host->lock);
492
493   if (host->batch_msg == NULL) {
494     host->batch_msg = msg;
495   } else {
496     int status;
497
498     status = riemann_message_append_events_n(host->batch_msg, msg->n_events,
499                                              msg->events);
500     msg->n_events = 0;
501     msg->events = NULL;
502
503     riemann_message_free(msg);
504
505     if (status != 0) {
506       pthread_mutex_unlock(&host->lock);
507       ERROR("write_riemann plugin: out of memory");
508       return -1;
509     }
510   }
511
512   len = riemann_message_get_packed_size(host->batch_msg);
513   ret = 0;
514   if ((host->batch_max < 0) || (((size_t)host->batch_max) <= len)) {
515     ret = wrr_batch_flush_nolock(0, host);
516   } else {
517     if (host->batch_timeout > 0) {
518       timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
519       ret = wrr_batch_flush_nolock(timeout, host);
520     }
521   }
522
523   pthread_mutex_unlock(&host->lock);
524   return ret;
525 } /* }}} riemann_message_t *wrr_batch_add_value_list */
526
527 static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
528 {
529   int status;
530   struct riemann_host *host = ud->data;
531   riemann_message_t *msg;
532
533   if (!host->notifications)
534     return 0;
535
536   /*
537    * Never batch for notifications, send them ASAP
538    */
539   msg = wrr_notification_to_message(host, n);
540   if (msg == NULL)
541     return (-1);
542
543   status = wrr_send(host, msg);
544   if (status != 0)
545     c_complain(
546         LOG_ERR, &host->init_complaint,
547         "write_riemann plugin: riemann_client_send failed with status %i",
548         status);
549   else
550     c_release(LOG_DEBUG, &host->init_complaint,
551               "write_riemann plugin: riemann_client_send succeeded");
552
553   riemann_message_free(msg);
554   return (status);
555 } /* }}} int wrr_notification */
556
557 static int wrr_write(const data_set_t *ds, /* {{{ */
558                      const value_list_t *vl, user_data_t *ud) {
559   int status = 0;
560   int statuses[vl->values_len];
561   struct riemann_host *host = ud->data;
562   riemann_message_t *msg;
563
564   if (host->check_thresholds) {
565     status = write_riemann_threshold_check(ds, vl, statuses);
566     if (status != 0)
567       return status;
568   } else {
569     memset(statuses, 0, sizeof(statuses));
570   }
571
572   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
573     wrr_batch_add_value_list(host, ds, vl, statuses);
574   } else {
575     msg = wrr_value_list_to_message(host, ds, vl, statuses);
576     if (msg == NULL)
577       return (-1);
578
579     status = wrr_send(host, msg);
580
581     riemann_message_free(msg);
582   }
583   return status;
584 } /* }}} int wrr_write */
585
586 static void wrr_free(void *p) /* {{{ */
587 {
588   struct riemann_host *host = p;
589
590   if (host == NULL)
591     return;
592
593   pthread_mutex_lock(&host->lock);
594
595   host->reference_count--;
596   if (host->reference_count > 0) {
597     pthread_mutex_unlock(&host->lock);
598     return;
599   }
600
601   wrr_disconnect(host);
602
603   pthread_mutex_destroy(&host->lock);
604   sfree(host);
605 } /* }}} void wrr_free */
606
607 static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
608 {
609   struct riemann_host *host = NULL;
610   int status = 0;
611   int i;
612   oconfig_item_t *child;
613   char callback_name[DATA_MAX_NAME_LEN];
614   user_data_t ud;
615
616   if ((host = calloc(1, sizeof(*host))) == NULL) {
617     ERROR("write_riemann plugin: calloc failed.");
618     return ENOMEM;
619   }
620   pthread_mutex_init(&host->lock, NULL);
621   C_COMPLAIN_INIT(&host->init_complaint);
622   host->reference_count = 1;
623   host->node = NULL;
624   host->port = 0;
625   host->notifications = 1;
626   host->check_thresholds = 0;
627   host->store_rates = 1;
628   host->always_append_ds = 0;
629   host->batch_mode = 1;
630   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
631   host->batch_init = cdtime();
632   host->batch_timeout = 0;
633   host->ttl_factor = RIEMANN_TTL_FACTOR;
634   host->client = NULL;
635   host->client_type = RIEMANN_CLIENT_TCP;
636   host->timeout.tv_sec = 0;
637   host->timeout.tv_usec = 0;
638
639   status = cf_util_get_string(ci, &host->name);
640   if (status != 0) {
641     WARNING("write_riemann plugin: Required host name is missing.");
642     wrr_free(host);
643     return -1;
644   }
645
646   for (i = 0; i < ci->children_num; i++) {
647     /*
648      * The code here could be simplified but makes room
649      * for easy adding of new options later on.
650      */
651     child = &ci->children[i];
652     status = 0;
653
654     if (strcasecmp("Host", child->key) == 0) {
655       status = cf_util_get_string(child, &host->node);
656       if (status != 0)
657         break;
658     } else if (strcasecmp("Notifications", child->key) == 0) {
659       status = cf_util_get_boolean(child, &host->notifications);
660       if (status != 0)
661         break;
662     } else if (strcasecmp("EventServicePrefix", child->key) == 0) {
663       status = cf_util_get_string(child, &host->event_service_prefix);
664       if (status != 0)
665         break;
666     } else if (strcasecmp("CheckThresholds", child->key) == 0) {
667       status = cf_util_get_boolean(child, &host->check_thresholds);
668       if (status != 0)
669         break;
670     } else if (strcasecmp("Batch", child->key) == 0) {
671       status = cf_util_get_boolean(child, &host->batch_mode);
672       if (status != 0)
673         break;
674     } else if (strcasecmp("BatchMaxSize", child->key) == 0) {
675       status = cf_util_get_int(child, &host->batch_max);
676       if (status != 0)
677         break;
678     } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
679       status = cf_util_get_int(child, &host->batch_timeout);
680       if (status != 0)
681         break;
682     } else if (strcasecmp("Timeout", child->key) == 0) {
683       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
684       if (status != 0)
685         break;
686     } else if (strcasecmp("Port", child->key) == 0) {
687       host->port = cf_util_get_port_number(child);
688       if (host->port == -1) {
689         ERROR("write_riemann plugin: Invalid argument "
690               "configured for the \"Port\" "
691               "option.");
692         break;
693       }
694     } else if (strcasecmp("Protocol", child->key) == 0) {
695       char tmp[16];
696       status = cf_util_get_string_buffer(child, tmp, sizeof(tmp));
697       if (status != 0) {
698         ERROR("write_riemann plugin: cf_util_get_"
699               "string_buffer failed with "
700               "status %i.",
701               status);
702         break;
703       }
704
705       if (strcasecmp("UDP", tmp) == 0)
706         host->client_type = RIEMANN_CLIENT_UDP;
707       else if (strcasecmp("TCP", tmp) == 0)
708         host->client_type = RIEMANN_CLIENT_TCP;
709       else if (strcasecmp("TLS", tmp) == 0)
710         host->client_type = RIEMANN_CLIENT_TLS;
711       else
712         WARNING("write_riemann plugin: The value "
713                 "\"%s\" is not valid for the "
714                 "\"Protocol\" option. Use "
715                 "either \"UDP\", \"TCP\" or \"TLS\".",
716                 tmp);
717     } else if (strcasecmp("TLSCAFile", child->key) == 0) {
718       status = cf_util_get_string(child, &host->tls_ca_file);
719       if (status != 0) {
720         ERROR("write_riemann plugin: cf_util_get_"
721               "string_buffer failed with "
722               "status %i.",
723               status);
724         break;
725       }
726     } else if (strcasecmp("TLSCertFile", child->key) == 0) {
727       status = cf_util_get_string(child, &host->tls_cert_file);
728       if (status != 0) {
729         ERROR("write_riemann plugin: cf_util_get_"
730               "string_buffer failed with "
731               "status %i.",
732               status);
733         break;
734       }
735     } else if (strcasecmp("TLSKeyFile", child->key) == 0) {
736       status = cf_util_get_string(child, &host->tls_key_file);
737       if (status != 0) {
738         ERROR("write_riemann plugin: cf_util_get_"
739               "string_buffer failed with "
740               "status %i.",
741               status);
742         break;
743       }
744     } else if (strcasecmp("StoreRates", child->key) == 0) {
745       status = cf_util_get_boolean(child, &host->store_rates);
746       if (status != 0)
747         break;
748     } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) {
749       status = cf_util_get_boolean(child, &host->always_append_ds);
750       if (status != 0)
751         break;
752     } else if (strcasecmp("TTLFactor", child->key) == 0) {
753       double tmp = NAN;
754       status = cf_util_get_double(child, &tmp);
755       if (status != 0)
756         break;
757       if (tmp >= 2.0) {
758         host->ttl_factor = tmp;
759       } else if (tmp >= 1.0) {
760         NOTICE("write_riemann plugin: The configured "
761                "TTLFactor is very small "
762                "(%.1f). A value of 2.0 or "
763                "greater is recommended.",
764                tmp);
765         host->ttl_factor = tmp;
766       } else if (tmp > 0.0) {
767         WARNING("write_riemann plugin: The configured "
768                 "TTLFactor is too small to be "
769                 "useful (%.1f). I'll use it "
770                 "since the user knows best, "
771                 "but under protest.",
772                 tmp);
773         host->ttl_factor = tmp;
774       } else { /* zero, negative and NAN */
775         ERROR("write_riemann plugin: The configured "
776               "TTLFactor is invalid (%.1f).",
777               tmp);
778       }
779     } else {
780       WARNING("write_riemann plugin: ignoring unknown config "
781               "option: \"%s\"",
782               child->key);
783     }
784   }
785   if (status != 0) {
786     wrr_free(host);
787     return status;
788   }
789
790   ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s",
791             host->name);
792   ud.data = host;
793   ud.free_func = wrr_free;
794
795   pthread_mutex_lock(&host->lock);
796
797   status = plugin_register_write(callback_name, wrr_write, &ud);
798
799   if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) {
800     ud.free_func = NULL;
801     plugin_register_flush(callback_name, wrr_batch_flush, &ud);
802   }
803   if (status != 0)
804     WARNING("write_riemann plugin: plugin_register_write (\"%s\") "
805             "failed with status %i.",
806             callback_name, status);
807   else /* success */
808     host->reference_count++;
809
810   status = plugin_register_notification(callback_name, wrr_notification, &ud);
811   if (status != 0)
812     WARNING("write_riemann plugin: plugin_register_notification (\"%s\") "
813             "failed with status %i.",
814             callback_name, status);
815   else /* success */
816     host->reference_count++;
817
818   if (host->reference_count <= 1) {
819     /* Both callbacks failed => free memory.
820      * We need to unlock here, because riemann_free() will lock.
821      * This is not a race condition, because we're the only one
822      * holding a reference. */
823     pthread_mutex_unlock(&host->lock);
824     wrr_free(host);
825     return (-1);
826   }
827
828   host->reference_count--;
829   pthread_mutex_unlock(&host->lock);
830
831   return status;
832 } /* }}} int wrr_config_node */
833
834 static int wrr_config(oconfig_item_t *ci) /* {{{ */
835 {
836   int i;
837   oconfig_item_t *child;
838   int status;
839
840   for (i = 0; i < ci->children_num; i++) {
841     child = &ci->children[i];
842
843     if (strcasecmp("Node", child->key) == 0) {
844       wrr_config_node(child);
845     } else if (strcasecmp(child->key, "attribute") == 0) {
846       char *key = NULL;
847       char *val = NULL;
848
849       if (child->values_num != 2) {
850         WARNING("riemann attributes need both a key and a value.");
851         return (-1);
852       }
853       if (child->values[0].type != OCONFIG_TYPE_STRING ||
854           child->values[1].type != OCONFIG_TYPE_STRING) {
855         WARNING("riemann attribute needs string arguments.");
856         return (-1);
857       }
858       if ((key = strdup(child->values[0].value.string)) == NULL) {
859         WARNING("cannot allocate memory for attribute key.");
860         return (-1);
861       }
862       if ((val = strdup(child->values[1].value.string)) == NULL) {
863         WARNING("cannot allocate memory for attribute value.");
864         sfree(key);
865         return (-1);
866       }
867       strarray_add(&riemann_attrs, &riemann_attrs_num, key);
868       strarray_add(&riemann_attrs, &riemann_attrs_num, val);
869       DEBUG("write_riemann: got attr: %s => %s", key, val);
870       sfree(key);
871       sfree(val);
872     } else if (strcasecmp(child->key, "tag") == 0) {
873       char *tmp = NULL;
874       status = cf_util_get_string(child, &tmp);
875       if (status != 0)
876         continue;
877
878       strarray_add(&riemann_tags, &riemann_tags_num, tmp);
879       DEBUG("write_riemann plugin: Got tag: %s", tmp);
880       sfree(tmp);
881     } else {
882       WARNING("write_riemann plugin: Ignoring unknown "
883               "configuration option \"%s\" at top level.",
884               child->key);
885     }
886   }
887   return (0);
888 } /* }}} int wrr_config */
889
890 void module_register(void) {
891   plugin_register_complex_config("write_riemann", wrr_config);
892 }
893
894 /* vim: set sw=8 sts=8 ts=8 noet : */