utils_vl_lookup: Fixed a race when creating user objects.
[collectd.git] / src / write_riemann.c
1 /**
2  * collectd - src/write_riemann.c
3  *
4  * Copyright (C) 2012,2013  Pierre-Yves Ritschard
5  * Copyright (C) 2013       Florian octo Forster
6  *
7  * Permission to use, copy, modify, and distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
16  * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
17  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  *
19  * Authors:
20  *   Pierre-Yves Ritschard <pyr at spootnik.org>
21  *   Florian octo Forster <octo at collectd.org>
22  */
23
24 #include "collectd.h"
25 #include "plugin.h"
26 #include "common.h"
27 #include "configfile.h"
28 #include "utils_cache.h"
29 #include "riemann.pb-c.h"
30
31 #include <sys/socket.h>
32 #include <arpa/inet.h>
33 #include <errno.h>
34 #include <netdb.h>
35 #include <inttypes.h>
36 #include <pthread.h>
37
38 #define RIEMANN_HOST            "localhost"
39 #define RIEMANN_PORT            "5555"
40
41 struct riemann_host {
42         char                    *name;
43 #define F_CONNECT                0x01
44         uint8_t                  flags;
45         pthread_mutex_t          lock;
46         _Bool                    store_rates;
47         _Bool                    always_append_ds;
48         char                    *node;
49         char                    *service;
50         _Bool                    use_tcp;
51         int                      s;
52
53         int                      reference_count;
54 };
55
56 static char     **riemann_tags;
57 static size_t     riemann_tags_num;
58
59 static void riemann_event_protobuf_free (Event *event) /* {{{ */
60 {
61         size_t i;
62
63         if (event == NULL)
64                 return;
65
66         sfree (event->state);
67         sfree (event->service);
68         sfree (event->host);
69         sfree (event->description);
70
71         strarray_free (event->tags, event->n_tags);
72         event->tags = NULL;
73         event->n_tags = 0;
74
75         for (i = 0; i < event->n_attributes; i++)
76         {
77                 sfree (event->attributes[i]->key);
78                 sfree (event->attributes[i]->value);
79                 sfree (event->attributes[i]);
80         }
81         sfree (event->attributes);
82         event->n_attributes = 0;
83
84         sfree (event);
85 } /* }}} void riemann_event_protobuf_free */
86
87 static void riemann_msg_protobuf_free(Msg *msg) /* {{{ */
88 {
89         size_t i;
90
91         if (msg == NULL)
92                 return;
93
94         for (i = 0; i < msg->n_events; i++)
95         {
96                 riemann_event_protobuf_free (msg->events[i]);
97                 msg->events[i] = NULL;
98         }
99
100         sfree (msg->events);
101         msg->n_events = 0;
102
103         sfree (msg);
104 } /* }}} void riemann_msg_protobuf_free */
105
106 /* host->lock must be held when calling this function. */
107 static int riemann_connect(struct riemann_host *host) /* {{{ */
108 {
109         int                      e;
110         struct addrinfo         *ai, *res, hints;
111         char const              *node;
112         char const              *service;
113
114         if (host->flags & F_CONNECT)
115                 return 0;
116
117         memset(&hints, 0, sizeof(hints));
118         memset(&service, 0, sizeof(service));
119         hints.ai_family = AF_UNSPEC;
120         hints.ai_socktype = host->use_tcp ? SOCK_STREAM : SOCK_DGRAM;
121 #ifdef AI_ADDRCONFIG
122         hints.ai_flags |= AI_ADDRCONFIG;
123 #endif
124
125         node = (host->node != NULL) ? host->node : RIEMANN_HOST;
126         service = (host->service != NULL) ? host->service : RIEMANN_PORT;
127
128         if ((e = getaddrinfo(node, service, &hints, &res)) != 0) {
129                 ERROR ("write_riemann plugin: Unable to resolve host \"%s\": %s",
130                         node, gai_strerror(e));
131                 return -1;
132         }
133
134         host->s = -1;
135         for (ai = res; ai != NULL; ai = ai->ai_next) {
136                 if ((host->s = socket(ai->ai_family,
137                                       ai->ai_socktype,
138                                       ai->ai_protocol)) == -1) {
139                         continue;
140                 }
141
142                 if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
143                         close(host->s);
144                         host->s = -1;
145                         continue;
146                 }
147
148                 host->flags |= F_CONNECT;
149                 DEBUG("write_riemann plugin: got a succesful connection for: %s:%s",
150                                 node, service);
151                 break;
152         }
153
154         freeaddrinfo(res);
155
156         if (host->s < 0) {
157                 WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%s",
158                                 node, service);
159                 return -1;
160         }
161         return 0;
162 } /* }}} int riemann_connect */
163
164 /* host->lock must be held when calling this function. */
165 static int riemann_disconnect (struct riemann_host *host) /* {{{ */
166 {
167         if ((host->flags & F_CONNECT) == 0)
168                 return (0);
169
170         close (host->s);
171         host->s = -1;
172         host->flags &= ~F_CONNECT;
173
174         return (0);
175 } /* }}} int riemann_disconnect */
176
177 static int riemann_send_msg (struct riemann_host *host, const Msg *msg) /* {{{ */
178 {
179         int status = 0;
180         u_char *buffer = NULL;
181         size_t  buffer_len;
182
183         status = riemann_connect (host);
184         if (status != 0)
185                 return status;
186
187         buffer_len = msg__get_packed_size(msg);
188
189         if (host->use_tcp)
190                 buffer_len += 4;
191
192         buffer = malloc (buffer_len);
193         if (buffer == NULL) {
194                 ERROR ("write_riemann plugin: malloc failed.");
195                 return ENOMEM;
196         }
197         memset (buffer, 0, buffer_len);
198
199         if (host->use_tcp)
200         {
201                 uint32_t length = htonl ((uint32_t) (buffer_len - 4));
202                 memcpy (buffer, &length, 4);
203                 msg__pack(msg, buffer + 4);
204         }
205         else
206         {
207                 msg__pack(msg, buffer);
208         }
209
210         status = (int) swrite (host->s, buffer, buffer_len);
211         if (status != 0)
212         {
213                 char errbuf[1024];
214                 ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
215                                 (host->node != NULL) ? host->node : RIEMANN_HOST,
216                                 (host->service != NULL) ? host->service : RIEMANN_PORT,
217                                 sstrerror (errno, errbuf, sizeof (errbuf)));
218                 sfree (buffer);
219                 return -1;
220         }
221
222         sfree (buffer);
223         return 0;
224 } /* }}} int riemann_send_msg */
225
226 static int riemann_recv_ack(struct riemann_host *host) /* {{{ */
227 {
228         int status = 0;
229         Msg *msg = NULL;
230         uint32_t header;
231
232         status = (int) sread (host->s, &header, 4);
233
234         if (status != 0)
235                 return -1;
236
237         size_t size = ntohl(header);
238
239         // Buffer on the stack since acknowledges are typically small.
240         u_char buffer[size];
241         memset (buffer, 0, size);
242
243         status = (int) sread (host->s, buffer, size);
244
245         if (status != 0)
246                 return status;
247
248         msg = msg__unpack (NULL, size, buffer);
249
250         if (msg == NULL)
251                 return -1;
252
253         if (!msg->ok)
254         {
255                 ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s",
256                                 (host->node != NULL) ? host->node : RIEMANN_HOST,
257                                 (host->service != NULL) ? host->service : RIEMANN_PORT,
258                                 msg->error);
259
260                 msg__free_unpacked(msg, NULL);
261                 return -1;
262         }
263
264         msg__free_unpacked (msg, NULL);
265         return 0;
266 } /* }}} int riemann_recv_ack */
267
268 /**
269  * Function to send messages (Msg) to riemann.
270  *
271  * Acquires the host lock, disconnects on errors.
272  */
273 static int riemann_send(struct riemann_host *host, Msg const *msg) /* {{{ */
274 {
275         int status = 0;
276         pthread_mutex_lock (&host->lock);
277
278         status = riemann_send_msg(host, msg);
279         if (status != 0) {
280                 riemann_disconnect (host);
281                 pthread_mutex_unlock (&host->lock);
282                 return status;
283         }
284
285         /*
286          * For TCP we need to receive message acknowledgemenent.
287          */
288         if (host->use_tcp)
289         {
290                 status = riemann_recv_ack(host);
291
292                 if (status != 0)
293                 {
294                         riemann_disconnect (host);
295                         pthread_mutex_unlock (&host->lock);
296                         return status;
297                 }
298         }
299
300         pthread_mutex_unlock (&host->lock);
301         return 0;
302 } /* }}} int riemann_send */
303
304 static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
305 {
306         return (strarray_add (&event->tags, &event->n_tags, tag));
307 } /* }}} int riemann_event_add_tag */
308
309 static int riemann_event_add_attribute(Event *event, /* {{{ */
310                 char const *key, char const *value)
311 {
312         Attribute **new_attributes;
313         Attribute *a;
314
315         new_attributes = realloc (event->attributes,
316                         sizeof (*event->attributes) * (event->n_attributes + 1));
317         if (new_attributes == NULL)
318         {
319                 ERROR ("write_riemann plugin: realloc failed.");
320                 return (ENOMEM);
321         }
322         event->attributes = new_attributes;
323
324         a = malloc (sizeof (*a));
325         if (a == NULL)
326         {
327                 ERROR ("write_riemann plugin: malloc failed.");
328                 return (ENOMEM);
329         }
330         attribute__init (a);
331
332         a->key = strdup (key);
333         if (value != NULL)
334                 a->value = strdup (value);
335
336         event->attributes[event->n_attributes] = a;
337         event->n_attributes++;
338
339         return (0);
340 } /* }}} int riemann_event_add_attribute */
341
342 static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ */
343                 notification_t const *n)
344 {
345         Msg *msg;
346         Event *event;
347         char service_buffer[6 * DATA_MAX_NAME_LEN];
348         char const *severity;
349         notification_meta_t *meta;
350         int i;
351
352         msg = malloc (sizeof (*msg));
353         if (msg == NULL)
354         {
355                 ERROR ("write_riemann plugin: malloc failed.");
356                 return (NULL);
357         }
358         memset (msg, 0, sizeof (*msg));
359         msg__init (msg);
360
361         msg->events = malloc (sizeof (*msg->events));
362         if (msg->events == NULL)
363         {
364                 ERROR ("write_riemann plugin: malloc failed.");
365                 sfree (msg);
366                 return (NULL);
367         }
368
369         event = malloc (sizeof (*event));
370         if (event == NULL)
371         {
372                 ERROR ("write_riemann plugin: malloc failed.");
373                 sfree (msg->events);
374                 sfree (msg);
375                 return (NULL);
376         }
377         memset (event, 0, sizeof (*event));
378         event__init (event);
379
380         msg->events[0] = event;
381         msg->n_events = 1;
382
383         event->host = strdup (n->host);
384         event->time = CDTIME_T_TO_TIME_T (n->time);
385         event->has_time = 1;
386
387         switch (n->severity)
388         {
389                 case NOTIF_OKAY:        severity = "ok"; break;
390                 case NOTIF_WARNING:     severity = "warning"; break;
391                 case NOTIF_FAILURE:     severity = "critical"; break;
392                 default:                severity = "unknown";
393         }
394         event->state = strdup (severity);
395
396         riemann_event_add_tag (event, "notification");
397         if (n->host[0] != 0)
398                 riemann_event_add_attribute (event, "host", n->host);
399         if (n->plugin[0] != 0)
400                 riemann_event_add_attribute (event, "plugin", n->plugin);
401         if (n->plugin_instance[0] != 0)
402                 riemann_event_add_attribute (event, "plugin_instance",
403                                 n->plugin_instance);
404
405         if (n->type[0] != 0)
406                 riemann_event_add_attribute (event, "type", n->type);
407         if (n->type_instance[0] != 0)
408                 riemann_event_add_attribute (event, "type_instance",
409                                 n->type_instance);
410
411         for (i = 0; i < riemann_tags_num; i++)
412                 riemann_event_add_tag (event, riemann_tags[i]);
413
414         format_name (service_buffer, sizeof (service_buffer),
415                         /* host = */ "", n->plugin, n->plugin_instance,
416                         n->type, n->type_instance);
417         event->service = strdup (&service_buffer[1]);
418
419         /* Pull in values from threshold */
420         for (meta = n->meta; meta != NULL; meta = meta->next)
421         {
422                 if (strcasecmp ("CurrentValue", meta->name) != 0)
423                         continue;
424
425                 event->metric_d = meta->nm_value.nm_double;
426                 event->has_metric_d = 1;
427                 break;
428         }
429
430         DEBUG ("write_riemann plugin: Successfully created protobuf for notification: "
431                         "host = \"%s\", service = \"%s\", state = \"%s\"",
432                         event->host, event->service, event->state);
433         return (msg);
434 } /* }}} Msg *riemann_notification_to_protobuf */
435
436 static Event *riemann_value_to_protobuf(struct riemann_host const *host, /* {{{ */
437                 data_set_t const *ds,
438                 value_list_t const *vl, size_t index,
439                 gauge_t const *rates)
440 {
441         Event *event;
442         char name_buffer[5 * DATA_MAX_NAME_LEN];
443         char service_buffer[6 * DATA_MAX_NAME_LEN];
444         int i;
445
446         event = malloc (sizeof (*event));
447         if (event == NULL)
448         {
449                 ERROR ("write_riemann plugin: malloc failed.");
450                 return (NULL);
451         }
452         memset (event, 0, sizeof (*event));
453         event__init (event);
454
455         event->host = strdup (vl->host);
456         event->time = CDTIME_T_TO_TIME_T (vl->time);
457         event->has_time = 1;
458         event->ttl = CDTIME_T_TO_TIME_T (2 * vl->interval);
459         event->has_ttl = 1;
460
461         riemann_event_add_attribute (event, "plugin", vl->plugin);
462         if (vl->plugin_instance[0] != 0)
463                 riemann_event_add_attribute (event, "plugin_instance",
464                                 vl->plugin_instance);
465
466         riemann_event_add_attribute (event, "type", vl->type);
467         if (vl->type_instance[0] != 0)
468                 riemann_event_add_attribute (event, "type_instance",
469                                 vl->type_instance);
470
471         if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
472         {
473                 char ds_type[DATA_MAX_NAME_LEN];
474
475                 ssnprintf (ds_type, sizeof (ds_type), "%s:rate",
476                                 DS_TYPE_TO_STRING(ds->ds[index].type));
477                 riemann_event_add_attribute (event, "ds_type", ds_type);
478         }
479         else
480         {
481                 riemann_event_add_attribute (event, "ds_type",
482                                 DS_TYPE_TO_STRING(ds->ds[index].type));
483         }
484         riemann_event_add_attribute (event, "ds_name", ds->ds[index].name);
485         {
486                 char ds_index[DATA_MAX_NAME_LEN];
487
488                 ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
489                 riemann_event_add_attribute (event, "ds_index", ds_index);
490         }
491
492         for (i = 0; i < riemann_tags_num; i++)
493                 riemann_event_add_tag (event, riemann_tags[i]);
494
495         if (ds->ds[index].type == DS_TYPE_GAUGE)
496         {
497                 event->has_metric_d = 1;
498                 event->metric_d = (double) vl->values[index].gauge;
499         }
500         else if (rates != NULL)
501         {
502                 event->has_metric_d = 1;
503                 event->metric_d = (double) rates[index];
504         }
505         else
506         {
507                 event->has_metric_sint64 = 1;
508                 if (ds->ds[index].type == DS_TYPE_DERIVE)
509                         event->metric_sint64 = (int64_t) vl->values[index].derive;
510                 else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
511                         event->metric_sint64 = (int64_t) vl->values[index].absolute;
512                 else
513                         event->metric_sint64 = (int64_t) vl->values[index].counter;
514         }
515
516         format_name (name_buffer, sizeof (name_buffer),
517                         /* host = */ "", vl->plugin, vl->plugin_instance,
518                         vl->type, vl->type_instance);
519         if (host->always_append_ds || (ds->ds_num > 1))
520                 ssnprintf (service_buffer, sizeof (service_buffer),
521                                 "%s/%s", &name_buffer[1], ds->ds[index].name);
522         else
523                 sstrncpy (service_buffer, &name_buffer[1],
524                                 sizeof (service_buffer));
525
526         event->service = strdup (service_buffer);
527
528         DEBUG ("write_riemann plugin: Successfully created protobuf for metric: "
529                         "host = \"%s\", service = \"%s\"",
530                         event->host, event->service);
531         return (event);
532 } /* }}} Event *riemann_value_to_protobuf */
533
534 static Msg *riemann_value_list_to_protobuf(struct riemann_host const *host, /* {{{ */
535                 data_set_t const *ds,
536                 value_list_t const *vl)
537 {
538         Msg *msg;
539         size_t i;
540         gauge_t *rates = NULL;
541
542         /* Initialize the Msg structure. */
543         msg = malloc (sizeof (*msg));
544         if (msg == NULL)
545         {
546                 ERROR ("write_riemann plugin: malloc failed.");
547                 return (NULL);
548         }
549         memset (msg, 0, sizeof (*msg));
550         msg__init (msg);
551
552         /* Set up events. First, the list of pointers. */
553         msg->n_events = (size_t) vl->values_len;
554         msg->events = calloc (msg->n_events, sizeof (*msg->events));
555         if (msg->events == NULL)
556         {
557                 ERROR ("write_riemann plugin: calloc failed.");
558                 riemann_msg_protobuf_free (msg);
559                 return (NULL);
560         }
561
562         if (host->store_rates)
563         {
564                 rates = uc_get_rate (ds, vl);
565                 if (rates == NULL)
566                 {
567                         ERROR ("write_riemann plugin: uc_get_rate failed.");
568                         riemann_msg_protobuf_free (msg);
569                         return (NULL);
570                 }
571         }
572
573         for (i = 0; i < msg->n_events; i++)
574         {
575                 msg->events[i] = riemann_value_to_protobuf (host, ds, vl,
576                                 (int) i, rates);
577                 if (msg->events[i] == NULL)
578                 {
579                         riemann_msg_protobuf_free (msg);
580                         sfree (rates);
581                         return (NULL);
582                 }
583         }
584
585         sfree (rates);
586         return (msg);
587 } /* }}} Msg *riemann_value_list_to_protobuf */
588
589 static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{ */
590 {
591         int                      status;
592         struct riemann_host     *host = ud->data;
593         Msg                     *msg;
594
595         msg = riemann_notification_to_protobuf (host, n);
596         if (msg == NULL)
597                 return (-1);
598
599         status = riemann_send (host, msg);
600         if (status != 0)
601                 ERROR ("write_riemann plugin: riemann_send failed with status %i",
602                                 status);
603
604         riemann_msg_protobuf_free (msg);
605         return (status);
606 } /* }}} int riemann_notification */
607
608 static int riemann_write(const data_set_t *ds, /* {{{ */
609               const value_list_t *vl,
610               user_data_t *ud)
611 {
612         int                      status;
613         struct riemann_host     *host = ud->data;
614         Msg                     *msg;
615
616         msg = riemann_value_list_to_protobuf (host, ds, vl);
617         if (msg == NULL)
618                 return (-1);
619
620         status = riemann_send (host, msg);
621         if (status != 0)
622                 ERROR ("write_riemann plugin: riemann_send failed with status %i",
623                                 status);
624
625         riemann_msg_protobuf_free (msg);
626         return status;
627 } /* }}} int riemann_write */
628
629 static void riemann_free(void *p) /* {{{ */
630 {
631         struct riemann_host     *host = p;
632
633         if (host == NULL)
634                 return;
635
636         pthread_mutex_lock (&host->lock);
637
638         host->reference_count--;
639         if (host->reference_count > 0)
640         {
641                 pthread_mutex_unlock (&host->lock);
642                 return;
643         }
644
645         riemann_disconnect (host);
646
647         sfree(host->service);
648         pthread_mutex_destroy (&host->lock);
649         sfree(host);
650 } /* }}} void riemann_free */
651
652 static int riemann_config_node(oconfig_item_t *ci) /* {{{ */
653 {
654         struct riemann_host     *host = NULL;
655         int                      status = 0;
656         int                      i;
657         oconfig_item_t          *child;
658         char                     callback_name[DATA_MAX_NAME_LEN];
659         user_data_t              ud;
660
661         if ((host = calloc(1, sizeof (*host))) == NULL) {
662                 ERROR ("write_riemann plugin: calloc failed.");
663                 return ENOMEM;
664         }
665         pthread_mutex_init (&host->lock, NULL);
666         host->reference_count = 1;
667         host->node = NULL;
668         host->service = NULL;
669         host->store_rates = 1;
670         host->always_append_ds = 0;
671         host->use_tcp = 0;
672
673         status = cf_util_get_string (ci, &host->name);
674         if (status != 0) {
675                 WARNING("write_riemann plugin: Required host name is missing.");
676                 riemann_free (host);
677                 return -1;
678         }
679
680         for (i = 0; i < ci->children_num; i++) {
681                 /*
682                  * The code here could be simplified but makes room
683                  * for easy adding of new options later on.
684                  */
685                 child = &ci->children[i];
686                 status = 0;
687
688                 if (strcasecmp ("Host", child->key) == 0) {
689                         status = cf_util_get_string (child, &host->node);
690                         if (status != 0)
691                                 break;
692                 } else if (strcasecmp ("Port", child->key) == 0) {
693                         status = cf_util_get_service (child, &host->service);
694                         if (status != 0) {
695                                 ERROR ("write_riemann plugin: Invalid argument "
696                                                 "configured for the \"Port\" "
697                                                 "option.");
698                                 break;
699                         }
700                 } else if (strcasecmp ("Protocol", child->key) == 0) {
701                         char tmp[16];
702                         status = cf_util_get_string_buffer (child,
703                                         tmp, sizeof (tmp));
704                         if (status != 0)
705                         {
706                                 ERROR ("write_riemann plugin: cf_util_get_"
707                                                 "string_buffer failed with "
708                                                 "status %i.", status);
709                                 break;
710                         }
711
712                         if (strcasecmp ("UDP", tmp) == 0)
713                                 host->use_tcp = 0;
714                         else if (strcasecmp ("TCP", tmp) == 0)
715                                 host->use_tcp = 1;
716                         else
717                                 WARNING ("write_riemann plugin: The value "
718                                                 "\"%s\" is not valid for the "
719                                                 "\"Protocol\" option. Use "
720                                                 "either \"UDP\" or \"TCP\".",
721                                                 tmp);
722                 } else if (strcasecmp ("StoreRates", child->key) == 0) {
723                         status = cf_util_get_boolean (child, &host->store_rates);
724                         if (status != 0)
725                                 break;
726                 } else if (strcasecmp ("AlwaysAppendDS", child->key) == 0) {
727                         status = cf_util_get_boolean (child,
728                                         &host->always_append_ds);
729                         if (status != 0)
730                                 break;
731                 } else {
732                         WARNING("write_riemann plugin: ignoring unknown config "
733                                 "option: \"%s\"", child->key);
734                 }
735         }
736         if (status != 0) {
737                 riemann_free (host);
738                 return status;
739         }
740
741         ssnprintf (callback_name, sizeof (callback_name), "write_riemann/%s",
742                         host->name);
743         ud.data = host;
744         ud.free_func = riemann_free;
745
746         pthread_mutex_lock (&host->lock);
747
748         status = plugin_register_write (callback_name, riemann_write, &ud);
749         if (status != 0)
750                 WARNING ("write_riemann plugin: plugin_register_write (\"%s\") "
751                                 "failed with status %i.",
752                                 callback_name, status);
753         else /* success */
754                 host->reference_count++;
755
756         status = plugin_register_notification (callback_name,
757                         riemann_notification, &ud);
758         if (status != 0)
759                 WARNING ("write_riemann plugin: plugin_register_notification (\"%s\") "
760                                 "failed with status %i.",
761                                 callback_name, status);
762         else /* success */
763                 host->reference_count++;
764
765         if (host->reference_count <= 1)
766         {
767                 /* Both callbacks failed => free memory.
768                  * We need to unlock here, because riemann_free() will lock.
769                  * This is not a race condition, because we're the only one
770                  * holding a reference. */
771                 pthread_mutex_unlock (&host->lock);
772                 riemann_free (host);
773                 return (-1);
774         }
775
776         host->reference_count--;
777         pthread_mutex_unlock (&host->lock);
778
779         return status;
780 } /* }}} int riemann_config_node */
781
782 static int riemann_config(oconfig_item_t *ci) /* {{{ */
783 {
784         int              i;
785         oconfig_item_t  *child;
786         int              status;
787
788         for (i = 0; i < ci->children_num; i++)  {
789                 child = &ci->children[i];
790
791                 if (strcasecmp("Node", child->key) == 0) {
792                         riemann_config_node (child);
793                 } else if (strcasecmp(child->key, "tag") == 0) {
794                         char *tmp = NULL;
795                         status = cf_util_get_string(child, &tmp);
796                         if (status != 0)
797                                 continue;
798
799                         strarray_add (&riemann_tags, &riemann_tags_num, tmp);
800                         DEBUG("write_riemann plugin: Got tag: %s", tmp);
801                         sfree (tmp);
802                 } else {
803                         WARNING ("write_riemann plugin: Ignoring unknown "
804                                  "configuration option \"%s\" at top level.",
805                                  child->key);
806                 }
807         }
808         return (0);
809 } /* }}} int riemann_config */
810
811 void module_register(void)
812 {
813         plugin_register_complex_config ("write_riemann", riemann_config);
814 }
815
816 /* vim: set sw=8 sts=8 ts=8 noet : */