From: Pierre-Yves Ritschard Date: Mon, 18 Apr 2016 08:10:47 +0000 (+0200) Subject: Merge branch 'master' into f/riemann/rcc X-Git-Tag: collectd-5.6.0~339^2 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=810e8f5d687514841ed0ea445764ccc05e8f5299;hp=8b3afb76805d0e0bbaf2b7ef3f37c07c1ebf0ffb;p=collectd.git Merge branch 'master' into f/riemann/rcc --- diff --git a/README b/README index 76b5487a..5f249454 100644 --- a/README +++ b/README @@ -767,8 +767,7 @@ Prerequisites * libprotobuf-c, protoc-c (optional) Used by the `pinba' plugin to generate a parser for the network packets - sent by the Pinba PHP extension, and by the `write_riemann' plugin to - generate events to be sent to a Riemann server. + sent by the Pinba PHP extension. * libpython (optional) @@ -843,6 +842,10 @@ Prerequisites `varnish' plugin. + * riemann-c-client (optional) + For the `write_riemann' plugin. + + Configuring / Compiling / Installing ------------------------------------ diff --git a/configure.ac b/configure.ac index 0eebfee1..de24293c 100644 --- a/configure.ac +++ b/configure.ac @@ -5231,6 +5231,10 @@ PKG_CHECK_MODULES([LIBNOTIFY], [libnotify], [with_libnotify="no (pkg-config doesn't know libnotify)"] ) +PKG_CHECK_MODULES([RIEMANN_C], [riemann-client >= 1.8.0], + [with_riemann_c="yes"], + [with_riemann_c="no (pbg-config doesn't know riemann-c-client)"]) + # Check for enabled/disabled features # @@ -5886,7 +5890,7 @@ AC_PLUGIN([write_kafka], [$with_librdkafka], [Kafka output plugin AC_PLUGIN([write_log], [yes], [Log output plugin]) AC_PLUGIN([write_mongodb], [$with_libmongoc], [MongoDB output plugin]) AC_PLUGIN([write_redis], [$with_libhiredis], [Redis output plugin]) -AC_PLUGIN([write_riemann], [$have_protoc_c], [Riemann output plugin]) +AC_PLUGIN([write_riemann], [$with_riemann_c], [Riemann output plugin]) AC_PLUGIN([write_sensu], [yes], [Sensu output plugin]) AC_PLUGIN([write_tsdb], [yes], [TSDB output plugin]) AC_PLUGIN([xmms], [$with_libxmms], [XMMS statistics]) @@ -6146,6 +6150,7 @@ Configuration: oracle . . . . . . . $with_oracle protobuf-c . . . . . $have_protoc_c python . . . . . . . $with_python + riemann-c-client . . $with_riemann_c Features: daemon mode . . . . . $enable_daemon diff --git a/src/Makefile.am b/src/Makefile.am index 3832bd24..1e3dfbaa 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -120,7 +120,7 @@ if BUILD_PLUGIN_AMQP pkglib_LTLIBRARIES += amqp.la amqp_la_SOURCES = amqp.c \ utils_cmd_putval.c utils_cmd_putval.h \ - utils_parse_option.c utils_parse_option.h \ + utils_parse_option.c utils_parse_option.h \ utils_format_graphite.c utils_format_graphite.h \ utils_format_json.c utils_format_json.h amqp_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBRABBITMQ_LDFLAGS) @@ -1261,9 +1261,8 @@ endif if BUILD_PLUGIN_WRITE_RIEMANN pkglib_LTLIBRARIES += write_riemann.la write_riemann_la_SOURCES = write_riemann.c write_riemann_threshold.c write_riemann_threshold.h -nodist_write_riemann_la_SOURCES = riemann.pb-c.c riemann.pb-c.h -write_riemann_la_LDFLAGS = $(PLUGIN_LDFLAGS) -write_riemann_la_LIBADD = -lprotobuf-c +write_riemann_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(RIEMANN_C_LIBS) +write_riemann_la_CFLAGS = $(AM_CFLAGS) $(RIEMANN_C_CFLAGS) endif if BUILD_PLUGIN_WRITE_SENSU @@ -1386,17 +1385,6 @@ pinba.pb-c.c pinba.pb-c.h: pinba.proto $(AM_V_PROTOC_C)protoc-c -I$(srcdir) --c_out . $(srcdir)/pinba.proto endif -# Protocol buffer for the "write_riemann" plugin. -EXTRA_DIST += riemann.proto -if BUILD_PLUGIN_WRITE_RIEMANN -CLEANFILES += riemann.pb-c.c riemann.pb-c.h - -BUILT_SOURCES += riemann.pb-c.c riemann.pb-c.h - -riemann.pb-c.c riemann.pb-c.h: riemann.proto - $(AM_V_PROTOC_C)protoc-c -I$(srcdir) --c_out . $(srcdir)/riemann.proto -endif - install-exec-hook: $(mkinstalldirs) $(DESTDIR)$(sysconfdir) if test -e $(DESTDIR)$(sysconfdir)/collectd.conf; \ diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 2da6fe40..3f2b2eff 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -2557,8 +2557,8 @@ at least one digit. =item B I|I -Interface name is not unique on Solaris (KSTAT), interface name is unique -only within a module/instance. Following tuple is considered unique: +Interface name is not unique on Solaris (KSTAT), interface name is unique +only within a module/instance. Following tuple is considered unique: (ks_module, ks_instance, ks_name) If this option is set to true, interface name contains above three fields separated by an underscore. For more info on KSTAT, visit @@ -4735,12 +4735,12 @@ I. =item B I -Name in the form of an LDAP distinguished name intended to be used for +Name in the form of an LDAP distinguished name intended to be used for authentication. Defaults to empty string to establish an anonymous authorization. =item B I -Password for simple bind authentication. If this option is not set, +Password for simple bind authentication. If this option is not set, unauthenticated bind operation is used. =item B B @@ -6910,13 +6910,13 @@ fails or if you want to disable this feature. =item B I|I Boolean enabling the collection of the temperature of each core. -This option should only be used if the automated detectionfails or +This option should only be used if the automated detectionfails or if you want to disable this feature. =item B I|I Boolean enabling the collection of the temperature of each package. -This option should only be used if the automated detectionfails or +This option should only be used if the automated detectionfails or if you want to disable this feature. =item B I @@ -7867,11 +7867,26 @@ Hostname or address to connect to. Defaults to C. Service name or port number to connect to. Defaults to C<5555>. -=item B B|B +=item B B|B|B Specify the protocol to use when communicating with I. Defaults to B. +=item B I + +When using the B protocol, path to a PEM certificate to present +to remote host. + +=item B I + +When using the B protocol, path to a PEM CA certificate to +use to validate the remote hosts's identity. + +=item B I + +When using the B protocol, path to a PEM private key associated +with the certificate defined by B. + =item B B|B If set to B and B is set to B, @@ -7891,6 +7906,11 @@ Defaults to true Maximum payload size for a riemann packet. Defaults to 8192 +=item B I + +Maximum amount of seconds to wait in between to batch flushes. +No timeout by default. + =item B B|B If set to B (the default), convert counter values to rates. If set to diff --git a/src/daemon/utils_complain.c b/src/daemon/utils_complain.c index 61936149..1cf6c3aa 100644 --- a/src/daemon/utils_complain.c +++ b/src/daemon/utils_complain.c @@ -102,4 +102,3 @@ void c_do_release (int level, c_complain_t *c, const char *format, ...) } /* c_release */ /* vim: set sw=4 ts=4 tw=78 noexpandtab : */ - diff --git a/src/riemann.proto b/src/riemann.proto deleted file mode 100644 index 3e946a3a..00000000 --- a/src/riemann.proto +++ /dev/null @@ -1,45 +0,0 @@ -option java_package = "com.aphyr.riemann"; -option java_outer_classname = "Proto"; - -message State { - optional int64 time = 1; - optional string state = 2; - optional string service = 3; - optional string host = 4; - optional string description = 5; - optional bool once = 6; - repeated string tags = 7; - optional float ttl = 8; -} - -message Event { - optional int64 time = 1; - optional string state = 2; - optional string service = 3; - optional string host = 4; - optional string description = 5; - repeated string tags = 7; - optional float ttl = 8; - repeated Attribute attributes = 9; - - optional sint64 metric_sint64 = 13; - optional double metric_d = 14; - optional float metric_f = 15; -} - -message Query { - optional string string = 1; -} - -message Msg { - optional bool ok = 2; - optional string error = 3; - repeated State states = 4; - optional Query query = 5; - repeated Event events = 6; -} - -message Attribute { - required string key = 1; - optional string value = 2; -} diff --git a/src/write_riemann.c b/src/write_riemann.c index 8191ae25..2936dfa4 100644 --- a/src/write_riemann.c +++ b/src/write_riemann.c @@ -2,6 +2,7 @@ * collectd - src/write_riemann.c * Copyright (C) 2012,2013 Pierre-Yves Ritschard * Copyright (C) 2013 Florian octo Forster + * Copyright (C) 2015,2016 Gergely Nagy * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -24,12 +25,11 @@ * Authors: * Pierre-Yves Ritschard * Florian octo Forster + * Gergely Nagy */ -#include +#include #include -#include -#include #include #include "collectd.h" @@ -37,19 +37,18 @@ #include "common.h" #include "configfile.h" #include "utils_cache.h" -#include "riemann.pb-c.h" +#include "utils_complain.h" #include "write_riemann_threshold.h" #define RIEMANN_HOST "localhost" -#define RIEMANN_PORT "5555" +#define RIEMANN_PORT 5555 #define RIEMANN_TTL_FACTOR 2.0 #define RIEMANN_BATCH_MAX 8192 struct riemann_host { + c_complain_t init_complaint; char *name; char *event_service_prefix; -#define F_CONNECT 0x01 - uint8_t flags; pthread_mutex_t lock; _Bool batch_mode; _Bool notifications; @@ -57,14 +56,19 @@ struct riemann_host { _Bool store_rates; _Bool always_append_ds; char *node; - char *service; - _Bool use_tcp; - int s; + int port; + riemann_client_type_t client_type; + riemann_client_t *client; double ttl_factor; - Msg *batch_msg; cdtime_t batch_init; int batch_max; + int batch_timeout; int reference_count; + riemann_message_t *batch_msg; + char *tls_ca_file; + char *tls_cert_file; + char *tls_key_file; + struct timeval timeout; }; static char **riemann_tags; @@ -72,331 +76,121 @@ static size_t riemann_tags_num; static char **riemann_attrs; static size_t riemann_attrs_num; -static void riemann_event_protobuf_free (Event *event) /* {{{ */ -{ - size_t i; - - if (event == NULL) - return; - - sfree (event->state); - sfree (event->service); - sfree (event->host); - sfree (event->description); - - strarray_free (event->tags, event->n_tags); - event->tags = NULL; - event->n_tags = 0; - - for (i = 0; i < event->n_attributes; i++) - { - sfree (event->attributes[i]->key); - sfree (event->attributes[i]->value); - sfree (event->attributes[i]); - } - sfree (event->attributes); - event->n_attributes = 0; - - sfree (event); -} /* }}} void riemann_event_protobuf_free */ - -static void riemann_msg_protobuf_free(Msg *msg) /* {{{ */ -{ - size_t i; - - if (msg == NULL) - return; - - for (i = 0; i < msg->n_events; i++) - { - riemann_event_protobuf_free (msg->events[i]); - msg->events[i] = NULL; - } - - sfree (msg->events); - msg->n_events = 0; - - sfree (msg); -} /* }}} void riemann_msg_protobuf_free */ - /* host->lock must be held when calling this function. */ -static int riemann_connect(struct riemann_host *host) /* {{{ */ +static int wrr_connect(struct riemann_host *host) /* {{{ */ { - int e; - struct addrinfo *ai, *res, hints; char const *node; - char const *service; + int port; - if (host->flags & F_CONNECT) + if (host->client) return 0; - memset(&hints, 0, sizeof(hints)); - memset(&service, 0, sizeof(service)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = host->use_tcp ? SOCK_STREAM : SOCK_DGRAM; -#ifdef AI_ADDRCONFIG - hints.ai_flags |= AI_ADDRCONFIG; -#endif - node = (host->node != NULL) ? host->node : RIEMANN_HOST; - service = (host->service != NULL) ? host->service : RIEMANN_PORT; - - if ((e = getaddrinfo(node, service, &hints, &res)) != 0) { - ERROR ("write_riemann plugin: Unable to resolve host \"%s\": %s", - node, gai_strerror(e)); + port = (host->port) ? host->port : RIEMANN_PORT; + + host->client = NULL; + + host->client = riemann_client_create(host->client_type, node, port, + RIEMANN_CLIENT_OPTION_TLS_CA_FILE, host->tls_ca_file, + RIEMANN_CLIENT_OPTION_TLS_CERT_FILE, host->tls_cert_file, + RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file, + RIEMANN_CLIENT_OPTION_NONE); + if (host->client == NULL) { + c_complain (LOG_ERR, &host->init_complaint, + "write_riemann plugin: Unable to connect to Riemann at %s:%d", + node, port); return -1; } - - host->s = -1; - for (ai = res; ai != NULL; ai = ai->ai_next) { - if ((host->s = socket(ai->ai_family, - ai->ai_socktype, - ai->ai_protocol)) == -1) { - continue; + if (host->timeout.tv_sec != 0) { + if (riemann_client_set_timeout(host->client, &host->timeout) != 0) { + riemann_client_free(host->client); + host->client = NULL; + c_complain (LOG_ERR, &host->init_complaint, + "write_riemann plugin: Unable to connect to Riemann at %s:%d", + node, port); + return -1; } - - if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) { - close(host->s); - host->s = -1; - continue; - } - - host->flags |= F_CONNECT; - DEBUG("write_riemann plugin: got a successful connection for: %s:%s", - node, service); - break; } - freeaddrinfo(res); + c_release (LOG_INFO, &host->init_complaint, + "write_riemann plugin: Successfully connected to %s:%d", + node, port); - if (host->s < 0) { - WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%s", - node, service); - return -1; - } return 0; -} /* }}} int riemann_connect */ +} /* }}} int wrr_connect */ /* host->lock must be held when calling this function. */ -static int riemann_disconnect (struct riemann_host *host) /* {{{ */ +static int wrr_disconnect(struct riemann_host *host) /* {{{ */ { - if ((host->flags & F_CONNECT) == 0) + if (!host->client) return (0); - close (host->s); - host->s = -1; - host->flags &= ~F_CONNECT; + riemann_client_free(host->client); + host->client = NULL; return (0); -} /* }}} int riemann_disconnect */ - -static int riemann_send_msg (struct riemann_host *host, const Msg *msg) /* {{{ */ -{ - int status = 0; - u_char *buffer = NULL; - size_t buffer_len; - - status = riemann_connect (host); - if (status != 0) - return status; - - buffer_len = msg__get_packed_size(msg); - - if (host->use_tcp) - buffer_len += 4; - - buffer = calloc (1, buffer_len); - if (buffer == NULL) { - ERROR ("write_riemann plugin: calloc failed."); - return ENOMEM; - } - - if (host->use_tcp) - { - uint32_t length = htonl ((uint32_t) (buffer_len - 4)); - memcpy (buffer, &length, 4); - msg__pack(msg, buffer + 4); - } - else - { - msg__pack(msg, buffer); - } - - status = (int) swrite (host->s, buffer, buffer_len); - if (status != 0) - { - char errbuf[1024]; - ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s", - (host->node != NULL) ? host->node : RIEMANN_HOST, - (host->service != NULL) ? host->service : RIEMANN_PORT, - sstrerror (errno, errbuf, sizeof (errbuf))); - sfree (buffer); - return -1; - } - - sfree (buffer); - return 0; -} /* }}} int riemann_send_msg */ - -static int riemann_recv_ack(struct riemann_host *host) /* {{{ */ -{ - int status = 0; - Msg *msg = NULL; - uint32_t header; - - status = (int) sread (host->s, &header, 4); - - if (status != 0) - return -1; - - size_t size = ntohl(header); - - // Buffer on the stack since acknowledges are typically small. - u_char buffer[size]; - memset (buffer, 0, size); - - status = (int) sread (host->s, buffer, size); - - if (status != 0) - return status; - - msg = msg__unpack (NULL, size, buffer); - - if (msg == NULL) - return -1; - - if (!msg->ok) - { - ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s", - (host->node != NULL) ? host->node : RIEMANN_HOST, - (host->service != NULL) ? host->service : RIEMANN_PORT, - msg->error); - - msg__free_unpacked(msg, NULL); - return -1; - } - - msg__free_unpacked (msg, NULL); - return 0; -} /* }}} int riemann_recv_ack */ +} /* }}} int wrr_disconnect */ /** - * Function to send messages (Msg) to riemann. + * Function to send messages to riemann. * * Acquires the host lock, disconnects on errors. */ -static int riemann_send(struct riemann_host *host, Msg const *msg) /* {{{ */ +static int wrr_send_nolock(struct riemann_host *host, riemann_message_t *msg) /* {{{ */ { int status = 0; - pthread_mutex_lock (&host->lock); - status = riemann_send_msg(host, msg); + status = wrr_connect(host); + if (status != 0) { + return status; + } + + status = riemann_client_send_message(host->client, msg); if (status != 0) { - riemann_disconnect (host); - pthread_mutex_unlock (&host->lock); + wrr_disconnect(host); return status; } /* * For TCP we need to receive message acknowledgemenent. */ - if (host->use_tcp) + if (host->client_type != RIEMANN_CLIENT_UDP) { - status = riemann_recv_ack(host); + riemann_message_t *response; - if (status != 0) + response = riemann_client_recv_message(host->client); + + if (response == NULL) { - riemann_disconnect (host); - pthread_mutex_unlock (&host->lock); - return status; + wrr_disconnect(host); + return errno; } + riemann_message_free(response); } - pthread_mutex_unlock (&host->lock); return 0; -} /* }}} int riemann_send */ - -static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */ -{ - return (strarray_add (&event->tags, &event->n_tags, tag)); -} /* }}} int riemann_event_add_tag */ +} /* }}} int wrr_send */ -static int riemann_event_add_attribute(Event *event, /* {{{ */ - char const *key, char const *value) +static int wrr_send(struct riemann_host *host, riemann_message_t *msg) { - Attribute **new_attributes; - Attribute *a; - - new_attributes = realloc (event->attributes, - sizeof (*event->attributes) * (event->n_attributes + 1)); - if (new_attributes == NULL) - { - ERROR ("write_riemann plugin: realloc failed."); - return (ENOMEM); - } - event->attributes = new_attributes; + int status = 0; - a = malloc (sizeof (*a)); - if (a == NULL) - { - ERROR ("write_riemann plugin: malloc failed."); - return (ENOMEM); - } - attribute__init (a); - - a->key = strdup (key); - if (value != NULL) - a->value = strdup (value); - - event->attributes[event->n_attributes] = a; - event->n_attributes++; - - return (0); -} /* }}} int riemann_event_add_attribute */ + pthread_mutex_lock (&host->lock); + status = wrr_send_nolock(host, msg); + pthread_mutex_unlock (&host->lock); + return status; +} -static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ */ +static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */ notification_t const *n) { - Msg *msg; - Event *event; + riemann_message_t *msg; + riemann_event_t *event; char service_buffer[6 * DATA_MAX_NAME_LEN]; char const *severity; notification_meta_t *meta; size_t i; - msg = calloc (1, sizeof (*msg)); - if (msg == NULL) - { - ERROR ("write_riemann plugin: calloc failed."); - return (NULL); - } - msg__init (msg); - - msg->events = malloc (sizeof (*msg->events)); - if (msg->events == NULL) - { - ERROR ("write_riemann plugin: malloc failed."); - sfree (msg); - return (NULL); - } - - event = calloc (1, sizeof (*event)); - if (event == NULL) - { - ERROR ("write_riemann plugin: calloc failed."); - sfree (msg->events); - sfree (msg); - return (NULL); - } - event__init (event); - - msg->events[0] = event; - msg->n_events = 1; - - event->host = strdup (n->host); - event->time = CDTIME_T_TO_TIME_T (n->time); - event->has_time = 1; - switch (n->severity) { case NOTIF_OKAY: severity = "ok"; break; @@ -404,352 +198,363 @@ static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ * case NOTIF_FAILURE: severity = "critical"; break; default: severity = "unknown"; } - event->state = strdup (severity); - riemann_event_add_tag (event, "notification"); + format_name(service_buffer, sizeof(service_buffer), + /* host = */ "", n->plugin, n->plugin_instance, + n->type, n->type_instance); + + event = riemann_event_create(RIEMANN_EVENT_FIELD_HOST, n->host, + RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(n->time), + RIEMANN_EVENT_FIELD_TAGS, "notification", NULL, + RIEMANN_EVENT_FIELD_STATE, severity, + RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1], + RIEMANN_EVENT_FIELD_NONE); + if (n->host[0] != 0) - riemann_event_add_attribute (event, "host", n->host); + riemann_event_string_attribute_add(event, "host", n->host); if (n->plugin[0] != 0) - riemann_event_add_attribute (event, "plugin", n->plugin); + riemann_event_string_attribute_add(event, "plugin", n->plugin); if (n->plugin_instance[0] != 0) - riemann_event_add_attribute (event, "plugin_instance", - n->plugin_instance); + riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance); if (n->type[0] != 0) - riemann_event_add_attribute (event, "type", n->type); + riemann_event_string_attribute_add(event, "type", n->type); if (n->type_instance[0] != 0) - riemann_event_add_attribute (event, "type_instance", - n->type_instance); + riemann_event_string_attribute_add(event, "type_instance", n->type_instance); for (i = 0; i < riemann_attrs_num; i += 2) - riemann_event_add_attribute(event, - riemann_attrs[i], - riemann_attrs[i +1]); + riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]); for (i = 0; i < riemann_tags_num; i++) - riemann_event_add_tag (event, riemann_tags[i]); - - format_name (service_buffer, sizeof (service_buffer), - /* host = */ "", n->plugin, n->plugin_instance, - n->type, n->type_instance); - event->service = strdup (&service_buffer[1]); + riemann_event_tag_add(event, riemann_tags[i]); if (n->message[0] != 0) - riemann_event_add_attribute (event, "description", n->message); + riemann_event_string_attribute_add(event, "description", n->message); /* Pull in values from threshold and add extra attributes */ for (meta = n->meta; meta != NULL; meta = meta->next) { - if (strcasecmp ("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE) + if (strcasecmp("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE) { - event->metric_d = meta->nm_value.nm_double; - event->has_metric_d = 1; + riemann_event_set(event, + RIEMANN_EVENT_FIELD_METRIC_D, + (double) meta->nm_value.nm_double, + RIEMANN_EVENT_FIELD_NONE); continue; } if (meta->type == NM_TYPE_STRING) { - riemann_event_add_attribute (event, meta->name, meta->nm_value.nm_string); + riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string); continue; } } - DEBUG ("write_riemann plugin: Successfully created protobuf for notification: " - "host = \"%s\", service = \"%s\", state = \"%s\"", - event->host, event->service, event->state); + msg = riemann_message_create_with_events(event, NULL); + if (msg == NULL) + { + ERROR("write_riemann plugin: riemann_message_create_with_events() failed."); + riemann_event_free (event); + return (NULL); + } + + DEBUG("write_riemann plugin: Successfully created message for notification: " + "host = \"%s\", service = \"%s\", state = \"%s\"", + event->host, event->service, event->state); return (msg); -} /* }}} Msg *riemann_notification_to_protobuf */ +} /* }}} riemann_message_t *wrr_notification_to_message */ -static Event *riemann_value_to_protobuf(struct riemann_host const *host, /* {{{ */ - data_set_t const *ds, - value_list_t const *vl, size_t index, - gauge_t const *rates, - int status) +static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {{{ */ + data_set_t const *ds, + value_list_t const *vl, size_t index, + gauge_t const *rates, + int status) { - Event *event; + riemann_event_t *event; char name_buffer[5 * DATA_MAX_NAME_LEN]; char service_buffer[6 * DATA_MAX_NAME_LEN]; - double ttl; size_t i; - event = calloc (1, sizeof (*event)); + event = riemann_event_new(); if (event == NULL) { - ERROR ("write_riemann plugin: calloc failed."); + ERROR("write_riemann plugin: riemann_event_new() failed."); return (NULL); } - event__init (event); - event->host = strdup (vl->host); - event->time = CDTIME_T_TO_TIME_T (vl->time); - event->has_time = 1; + format_name(name_buffer, sizeof(name_buffer), + /* host = */ "", vl->plugin, vl->plugin_instance, + vl->type, vl->type_instance); + if (host->always_append_ds || (ds->ds_num > 1)) + { + if (host->event_service_prefix == NULL) + ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s", + &name_buffer[1], ds->ds[index].name); + else + ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s", + host->event_service_prefix, &name_buffer[1], ds->ds[index].name); + } + else + { + if (host->event_service_prefix == NULL) + sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer)); + else + ssnprintf(service_buffer, sizeof(service_buffer), "%s%s", + host->event_service_prefix, &name_buffer[1]); + } + + riemann_event_set(event, + RIEMANN_EVENT_FIELD_HOST, vl->host, + RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time), + RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor, + RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, + "plugin", vl->plugin, + "type", vl->type, + "ds_name", ds->ds[index].name, + NULL, + RIEMANN_EVENT_FIELD_SERVICE, service_buffer, + RIEMANN_EVENT_FIELD_NONE); if (host->check_thresholds) { + const char *state = NULL; + switch (status) { case STATE_OKAY: - event->state = strdup("ok"); + state = "ok"; break; case STATE_ERROR: - event->state = strdup("critical"); + state = "critical"; break; case STATE_WARNING: - event->state = strdup("warning"); + state = "warning"; break; case STATE_MISSING: - event->state = strdup("unknown"); + state = "unknown"; break; } + if (state) + riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state, + RIEMANN_EVENT_FIELD_NONE); } - ttl = CDTIME_T_TO_DOUBLE (vl->interval) * host->ttl_factor; - event->ttl = (float) ttl; - event->has_ttl = 1; - - riemann_event_add_attribute (event, "plugin", vl->plugin); if (vl->plugin_instance[0] != 0) - riemann_event_add_attribute (event, "plugin_instance", - vl->plugin_instance); - - riemann_event_add_attribute (event, "type", vl->type); + riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance); if (vl->type_instance[0] != 0) - riemann_event_add_attribute (event, "type_instance", - vl->type_instance); + riemann_event_string_attribute_add(event, "type_instance", vl->type_instance); if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) { char ds_type[DATA_MAX_NAME_LEN]; - ssnprintf (ds_type, sizeof (ds_type), "%s:rate", - DS_TYPE_TO_STRING(ds->ds[index].type)); - riemann_event_add_attribute (event, "ds_type", ds_type); + ssnprintf(ds_type, sizeof(ds_type), "%s:rate", + DS_TYPE_TO_STRING(ds->ds[index].type)); + riemann_event_string_attribute_add(event, "ds_type", ds_type); } else { - riemann_event_add_attribute (event, "ds_type", - DS_TYPE_TO_STRING(ds->ds[index].type)); + riemann_event_string_attribute_add(event, "ds_type", + DS_TYPE_TO_STRING(ds->ds[index].type)); } - riemann_event_add_attribute (event, "ds_name", ds->ds[index].name); + { char ds_index[DATA_MAX_NAME_LEN]; - ssnprintf (ds_index, sizeof (ds_index), "%zu", index); - riemann_event_add_attribute (event, "ds_index", ds_index); + ssnprintf(ds_index, sizeof(ds_index), "%zu", index); + riemann_event_string_attribute_add(event, "ds_index", ds_index); } for (i = 0; i < riemann_attrs_num; i += 2) - riemann_event_add_attribute(event, - riemann_attrs[i], - riemann_attrs[i +1]); + riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]); for (i = 0; i < riemann_tags_num; i++) - riemann_event_add_tag (event, riemann_tags[i]); + riemann_event_tag_add(event, riemann_tags[i]); if (ds->ds[index].type == DS_TYPE_GAUGE) { - event->has_metric_d = 1; - event->metric_d = (double) vl->values[index].gauge; + riemann_event_set(event, + RIEMANN_EVENT_FIELD_METRIC_D, + (double) vl->values[index].gauge, + RIEMANN_EVENT_FIELD_NONE); } else if (rates != NULL) { - event->has_metric_d = 1; - event->metric_d = (double) rates[index]; + riemann_event_set(event, + RIEMANN_EVENT_FIELD_METRIC_D, + (double) rates[index], + RIEMANN_EVENT_FIELD_NONE); } else { - event->has_metric_sint64 = 1; + int64_t metric; + if (ds->ds[index].type == DS_TYPE_DERIVE) - event->metric_sint64 = (int64_t) vl->values[index].derive; + metric = (int64_t) vl->values[index].derive; else if (ds->ds[index].type == DS_TYPE_ABSOLUTE) - event->metric_sint64 = (int64_t) vl->values[index].absolute; + metric = (int64_t) vl->values[index].absolute; else - event->metric_sint64 = (int64_t) vl->values[index].counter; - } + metric = (int64_t) vl->values[index].counter; - format_name (name_buffer, sizeof (name_buffer), - /* host = */ "", vl->plugin, vl->plugin_instance, - vl->type, vl->type_instance); - if (host->always_append_ds || (ds->ds_num > 1)) - { - if (host->event_service_prefix == NULL) - ssnprintf (service_buffer, sizeof (service_buffer), "%s/%s", - &name_buffer[1], ds->ds[index].name); - else - ssnprintf (service_buffer, sizeof (service_buffer), "%s%s/%s", - host->event_service_prefix, &name_buffer[1], ds->ds[index].name); + riemann_event_set(event, + RIEMANN_EVENT_FIELD_METRIC_S64, + (int64_t) metric, + RIEMANN_EVENT_FIELD_NONE); } - else - { - if (host->event_service_prefix == NULL) - sstrncpy (service_buffer, &name_buffer[1], sizeof (service_buffer)); - else - ssnprintf (service_buffer, sizeof (service_buffer), "%s%s", - host->event_service_prefix, &name_buffer[1]); - } - - event->service = strdup (service_buffer); - DEBUG ("write_riemann plugin: Successfully created protobuf for metric: " - "host = \"%s\", service = \"%s\"", - event->host, event->service); + DEBUG("write_riemann plugin: Successfully created message for metric: " + "host = \"%s\", service = \"%s\"", + event->host, event->service); return (event); -} /* }}} Event *riemann_value_to_protobuf */ +} /* }}} riemann_event_t *wrr_value_to_event */ -static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */ - data_set_t const *ds, - value_list_t const *vl, - int *statuses) +static riemann_message_t *wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */ + data_set_t const *ds, + value_list_t const *vl, + int *statuses) { - Msg *msg; + riemann_message_t *msg; size_t i; gauge_t *rates = NULL; /* Initialize the Msg structure. */ - msg = calloc (1, sizeof (*msg)); + msg = riemann_message_new(); if (msg == NULL) { - ERROR ("write_riemann plugin: calloc failed."); - return (NULL); - } - msg__init (msg); - - /* Set up events. First, the list of pointers. */ - msg->n_events = vl->values_len; - msg->events = calloc (msg->n_events, sizeof (*msg->events)); - if (msg->events == NULL) - { - ERROR ("write_riemann plugin: calloc failed."); - riemann_msg_protobuf_free (msg); + ERROR ("write_riemann plugin: riemann_message_new failed."); return (NULL); } if (host->store_rates) { - rates = uc_get_rate (ds, vl); + rates = uc_get_rate(ds, vl); if (rates == NULL) { - ERROR ("write_riemann plugin: uc_get_rate failed."); - riemann_msg_protobuf_free (msg); + ERROR("write_riemann plugin: uc_get_rate failed."); + riemann_message_free(msg); return (NULL); } } - for (i = 0; i < msg->n_events; i++) + for (i = 0; i < vl->values_len; i++) { - msg->events[i] = riemann_value_to_protobuf (host, ds, vl, - (int) i, rates, statuses[i]); - if (msg->events[i] == NULL) + riemann_event_t *event; + + event = wrr_value_to_event(host, ds, vl, + (int) i, rates, statuses[i]); + if (event == NULL) { - riemann_msg_protobuf_free (msg); - sfree (rates); + riemann_message_free(msg); + sfree(rates); return (NULL); } + riemann_message_append_events(msg, event, NULL); } - sfree (rates); + sfree(rates); return (msg); -} /* }}} Msg *riemann_value_list_to_protobuf */ - +} /* }}} riemann_message_t *wrr_value_list_to_message */ /* * Always call while holding host->lock ! */ -static int riemann_batch_flush_nolock (cdtime_t timeout, - struct riemann_host *host) +static int wrr_batch_flush_nolock(cdtime_t timeout, + struct riemann_host *host) { - cdtime_t now; - int status = 0; - - if (timeout > 0) { - now = cdtime (); - if ((host->batch_init + timeout) > now) - return status; - } - riemann_send_msg(host, host->batch_msg); - riemann_msg_protobuf_free(host->batch_msg); + cdtime_t now; + int status = 0; - if (host->use_tcp && ((status = riemann_recv_ack(host)) != 0)) - riemann_disconnect (host); + now = cdtime(); + if (timeout > 0) { + if ((host->batch_init + timeout) > now) { + return status; + } + } + wrr_send_nolock(host, host->batch_msg); + riemann_message_free(host->batch_msg); - host->batch_init = cdtime(); - host->batch_msg = NULL; - return status; + host->batch_init = now; + host->batch_msg = NULL; + return status; } -static int riemann_batch_flush (cdtime_t timeout, +static int wrr_batch_flush(cdtime_t timeout, const char *identifier __attribute__((unused)), user_data_t *user_data) { - struct riemann_host *host; - int status; + struct riemann_host *host; + int status; - if (user_data == NULL) - return (-EINVAL); + if (user_data == NULL) + return (-EINVAL); - host = user_data->data; - pthread_mutex_lock (&host->lock); - status = riemann_batch_flush_nolock (timeout, host); - if (status != 0) - ERROR ("write_riemann plugin: riemann_send failed with status %i", - status); + host = user_data->data; + pthread_mutex_lock(&host->lock); + status = wrr_batch_flush_nolock(timeout, host); + if (status != 0) + c_complain (LOG_ERR, &host->init_complaint, + "write_riemann plugin: riemann_client_send failed with status %i", + status); + else + c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent."); - pthread_mutex_unlock(&host->lock); - return status; + pthread_mutex_unlock(&host->lock); + return status; } -static int riemann_batch_add_value_list (struct riemann_host *host, /* {{{ */ - data_set_t const *ds, - value_list_t const *vl, - int *statuses) +static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */ + data_set_t const *ds, + value_list_t const *vl, + int *statuses) { - size_t i; - Event **events; - Msg *msg; - size_t len; - int ret; + riemann_message_t *msg; + size_t len; + int ret; + cdtime_t timeout; - msg = riemann_value_list_to_protobuf (host, ds, vl, statuses); - if (msg == NULL) - return -1; + msg = wrr_value_list_to_message(host, ds, vl, statuses); + if (msg == NULL) + return -1; - pthread_mutex_lock(&host->lock); + pthread_mutex_lock(&host->lock); - if (host->batch_msg == NULL) { - host->batch_msg = msg; - } else { - len = msg->n_events + host->batch_msg->n_events; - events = realloc(host->batch_msg->events, - (len * sizeof(*host->batch_msg->events))); - if (events == NULL) { - pthread_mutex_unlock(&host->lock); - ERROR ("write_riemann plugin: out of memory"); - riemann_msg_protobuf_free (msg); - return -1; - } - host->batch_msg->events = events; + if (host->batch_msg == NULL) { + host->batch_msg = msg; + } else { + int status; - for (i = host->batch_msg->n_events; i < len; i++) - host->batch_msg->events[i] = msg->events[i - host->batch_msg->n_events]; + status = riemann_message_append_events_n(host->batch_msg, + msg->n_events, + msg->events); + msg->n_events = 0; + msg->events = NULL; - host->batch_msg->n_events = len; - sfree (msg->events); - msg->n_events = 0; - sfree (msg); - } + riemann_message_free(msg); - len = msg__get_packed_size(host->batch_msg); - ret = 0; - if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) { - ret = riemann_batch_flush_nolock(0, host); + if (status != 0) { + pthread_mutex_unlock(&host->lock); + ERROR("write_riemann plugin: out of memory"); + return -1; + } + } + + len = riemann_message_get_packed_size(host->batch_msg); + ret = 0; + if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) { + ret = wrr_batch_flush_nolock(0, host); + } else { + if (host->batch_timeout > 0) { + timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout); + ret = wrr_batch_flush_nolock(timeout, host); + } } - pthread_mutex_unlock(&host->lock); - return ret; -} /* }}} Msg *riemann_batch_add_value_list */ + pthread_mutex_unlock(&host->lock); + return ret; +} /* }}} riemann_message_t *wrr_batch_add_value_list */ -static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{ */ +static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */ { int status; struct riemann_host *host = ud->data; - Msg *msg; + riemann_message_t *msg; if (!host->notifications) return 0; @@ -757,322 +562,365 @@ static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{ /* * Never batch for notifications, send them ASAP */ - msg = riemann_notification_to_protobuf (host, n); + msg = wrr_notification_to_message(host, n); if (msg == NULL) return (-1); - status = riemann_send (host, msg); + status = wrr_send(host, msg); if (status != 0) - ERROR ("write_riemann plugin: riemann_send failed with status %i", - status); - - riemann_msg_protobuf_free (msg); + c_complain (LOG_ERR, &host->init_complaint, + "write_riemann plugin: riemann_client_send failed with status %i", + status); + else + c_release (LOG_DEBUG, &host->init_complaint, + "write_riemann plugin: riemann_client_send succeeded"); + + riemann_message_free(msg); return (status); -} /* }}} int riemann_notification */ +} /* }}} int wrr_notification */ -static int riemann_write(const data_set_t *ds, /* {{{ */ - const value_list_t *vl, - user_data_t *ud) +static int wrr_write(const data_set_t *ds, /* {{{ */ + const value_list_t *vl, + user_data_t *ud) { int status = 0; int statuses[vl->values_len]; struct riemann_host *host = ud->data; + riemann_message_t *msg; if (host->check_thresholds) { status = write_riemann_threshold_check(ds, vl, statuses); - if (status != 0) - return status; - } else { - memset (statuses, 0, sizeof (statuses)); - } - - if (host->use_tcp == 1 && host->batch_mode) { - riemann_batch_add_value_list (host, ds, vl, statuses); - } else { - Msg *msg = riemann_value_list_to_protobuf (host, ds, vl, statuses); - if (msg == NULL) - return (-1); - - status = riemann_send (host, msg); - if (status != 0) - ERROR ("write_riemann plugin: riemann_send failed with status %i", status); + if (status != 0) + return status; + } else { + memset (statuses, 0, sizeof (statuses)); + } + + if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) { + wrr_batch_add_value_list(host, ds, vl, statuses); + } else { + msg = wrr_value_list_to_message(host, ds, vl, statuses); + if (msg == NULL) + return (-1); - riemann_msg_protobuf_free (msg); - } + status = wrr_send(host, msg); - return status; -} /* }}} int riemann_write */ + riemann_message_free(msg); + } + return status; +} /* }}} int wrr_write */ -static void riemann_free(void *p) /* {{{ */ +static void wrr_free(void *p) /* {{{ */ { - struct riemann_host *host = p; + struct riemann_host *host = p; - if (host == NULL) - return; + if (host == NULL) + return; - pthread_mutex_lock (&host->lock); + pthread_mutex_lock(&host->lock); - host->reference_count--; - if (host->reference_count > 0) - { - pthread_mutex_unlock (&host->lock); - return; - } + host->reference_count--; + if (host->reference_count > 0) + { + pthread_mutex_unlock(&host->lock); + return; + } - riemann_disconnect (host); + wrr_disconnect(host); - sfree(host->service); - pthread_mutex_destroy (&host->lock); - sfree(host); -} /* }}} void riemann_free */ + pthread_mutex_destroy(&host->lock); + sfree(host); +} /* }}} void wrr_free */ -static int riemann_config_node(oconfig_item_t *ci) /* {{{ */ +static int wrr_config_node(oconfig_item_t *ci) /* {{{ */ { - struct riemann_host *host = NULL; - int status = 0; - int i; - oconfig_item_t *child; - char callback_name[DATA_MAX_NAME_LEN]; - user_data_t ud; - - if ((host = calloc(1, sizeof (*host))) == NULL) { - ERROR ("write_riemann plugin: calloc failed."); - return ENOMEM; - } - pthread_mutex_init (&host->lock, NULL); - host->reference_count = 1; - host->node = NULL; - host->service = NULL; - host->notifications = 1; - host->check_thresholds = 0; - host->store_rates = 1; - host->always_append_ds = 0; - host->use_tcp = 1; - host->batch_mode = 1; - host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */ - host->batch_init = cdtime(); - host->ttl_factor = RIEMANN_TTL_FACTOR; - - status = cf_util_get_string (ci, &host->name); - if (status != 0) { - WARNING("write_riemann plugin: Required host name is missing."); - riemann_free (host); - return -1; - } - - for (i = 0; i < ci->children_num; i++) { - /* - * The code here could be simplified but makes room - * for easy adding of new options later on. - */ - child = &ci->children[i]; - status = 0; - - if (strcasecmp ("Host", child->key) == 0) { - status = cf_util_get_string (child, &host->node); - if (status != 0) - break; - } else if (strcasecmp ("Notifications", child->key) == 0) { - status = cf_util_get_boolean(child, &host->notifications); - if (status != 0) - break; - } else if (strcasecmp ("EventServicePrefix", child->key) == 0) { - status = cf_util_get_string (child, &host->event_service_prefix); - if (status != 0) - break; - } else if (strcasecmp ("CheckThresholds", child->key) == 0) { - status = cf_util_get_boolean(child, &host->check_thresholds); - if (status != 0) - break; - } else if (strcasecmp ("Batch", child->key) == 0) { - status = cf_util_get_boolean(child, &host->batch_mode); - if (status != 0) - break; - } else if (strcasecmp("BatchMaxSize", child->key) == 0) { - status = cf_util_get_int(child, &host->batch_max); - if (status != 0) - break; - } else if (strcasecmp ("Port", child->key) == 0) { - status = cf_util_get_service (child, &host->service); - if (status != 0) { - ERROR ("write_riemann plugin: Invalid argument " - "configured for the \"Port\" " - "option."); - break; - } - } else if (strcasecmp ("Protocol", child->key) == 0) { - char tmp[16]; - status = cf_util_get_string_buffer (child, - tmp, sizeof (tmp)); - if (status != 0) - { - ERROR ("write_riemann plugin: cf_util_get_" - "string_buffer failed with " - "status %i.", status); - break; - } - - if (strcasecmp ("UDP", tmp) == 0) - host->use_tcp = 0; - else if (strcasecmp ("TCP", tmp) == 0) - host->use_tcp = 1; - else - WARNING ("write_riemann plugin: The value " - "\"%s\" is not valid for the " - "\"Protocol\" option. Use " - "either \"UDP\" or \"TCP\".", - tmp); - } else if (strcasecmp ("StoreRates", child->key) == 0) { - status = cf_util_get_boolean (child, &host->store_rates); - if (status != 0) - break; - } else if (strcasecmp ("AlwaysAppendDS", child->key) == 0) { - status = cf_util_get_boolean (child, - &host->always_append_ds); - if (status != 0) - break; - } else if (strcasecmp ("TTLFactor", child->key) == 0) { - double tmp = NAN; - status = cf_util_get_double (child, &tmp); - if (status != 0) - break; - if (tmp >= 2.0) { - host->ttl_factor = tmp; - } else if (tmp >= 1.0) { - NOTICE ("write_riemann plugin: The configured " - "TTLFactor is very small " - "(%.1f). A value of 2.0 or " - "greater is recommended.", - tmp); - host->ttl_factor = tmp; - } else if (tmp > 0.0) { - WARNING ("write_riemann plugin: The configured " - "TTLFactor is too small to be " - "useful (%.1f). I'll use it " - "since the user knows best, " - "but under protest.", - tmp); - host->ttl_factor = tmp; - } else { /* zero, negative and NAN */ - ERROR ("write_riemann plugin: The configured " - "TTLFactor is invalid (%.1f).", - tmp); - } - } else { - WARNING("write_riemann plugin: ignoring unknown config " - "option: \"%s\"", child->key); - } - } - if (status != 0) { - riemann_free (host); - return status; - } - - ssnprintf (callback_name, sizeof (callback_name), "write_riemann/%s", - host->name); - ud.data = host; - ud.free_func = riemann_free; - - pthread_mutex_lock (&host->lock); - - status = plugin_register_write (callback_name, riemann_write, &ud); + struct riemann_host *host = NULL; + int status = 0; + int i; + oconfig_item_t *child; + char callback_name[DATA_MAX_NAME_LEN]; + user_data_t ud; + + if ((host = calloc(1, sizeof(*host))) == NULL) { + ERROR ("write_riemann plugin: calloc failed."); + return ENOMEM; + } + pthread_mutex_init(&host->lock, NULL); + C_COMPLAIN_INIT (&host->init_complaint); + host->reference_count = 1; + host->node = NULL; + host->port = 0; + host->notifications = 1; + host->check_thresholds = 0; + host->store_rates = 1; + host->always_append_ds = 0; + host->batch_mode = 1; + host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */ + host->batch_init = cdtime(); + host->batch_timeout = 0; + host->ttl_factor = RIEMANN_TTL_FACTOR; + host->client = NULL; + host->client_type = RIEMANN_CLIENT_TCP; + host->timeout.tv_sec = 0; + host->timeout.tv_usec = 0; + + status = cf_util_get_string(ci, &host->name); + if (status != 0) { + WARNING("write_riemann plugin: Required host name is missing."); + wrr_free(host); + return -1; + } + + for (i = 0; i < ci->children_num; i++) { + /* + * The code here could be simplified but makes room + * for easy adding of new options later on. + */ + child = &ci->children[i]; + status = 0; + + if (strcasecmp("Host", child->key) == 0) { + status = cf_util_get_string(child, &host->node); + if (status != 0) + break; + } else if (strcasecmp("Notifications", child->key) == 0) { + status = cf_util_get_boolean(child, &host->notifications); + if (status != 0) + break; + } else if (strcasecmp("EventServicePrefix", child->key) == 0) { + status = cf_util_get_string(child, &host->event_service_prefix); + if (status != 0) + break; + } else if (strcasecmp("CheckThresholds", child->key) == 0) { + status = cf_util_get_boolean(child, &host->check_thresholds); + if (status != 0) + break; + } else if (strcasecmp("Batch", child->key) == 0) { + status = cf_util_get_boolean(child, &host->batch_mode); + if (status != 0) + break; + } else if (strcasecmp("BatchMaxSize", child->key) == 0) { + status = cf_util_get_int(child, &host->batch_max); + if (status != 0) + break; + } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) { + status = cf_util_get_int(child, &host->batch_timeout); + if (status != 0) + break; + } else if (strcasecmp("Timeout", child->key) == 0) { + status = cf_util_get_int(child, (int *)&host->timeout.tv_sec); + if (status != 0) + break; + } else if (strcasecmp("Port", child->key) == 0) { + host->port = cf_util_get_port_number(child); + if (host->port == -1) { + ERROR("write_riemann plugin: Invalid argument " + "configured for the \"Port\" " + "option."); + break; + } + } else if (strcasecmp("Protocol", child->key) == 0) { + char tmp[16]; + status = cf_util_get_string_buffer(child, + tmp, sizeof(tmp)); + if (status != 0) + { + ERROR("write_riemann plugin: cf_util_get_" + "string_buffer failed with " + "status %i.", status); + break; + } - if (host->use_tcp == 1 && host->batch_mode) { - ud.free_func = NULL; - plugin_register_flush(callback_name, riemann_batch_flush, &ud); + if (strcasecmp("UDP", tmp) == 0) + host->client_type = RIEMANN_CLIENT_UDP; + else if (strcasecmp("TCP", tmp) == 0) + host->client_type = RIEMANN_CLIENT_TCP; + else if (strcasecmp("TLS", tmp) == 0) + host->client_type = RIEMANN_CLIENT_TLS; + else + WARNING("write_riemann plugin: The value " + "\"%s\" is not valid for the " + "\"Protocol\" option. Use " + "either \"UDP\", \"TCP\" or \"TLS\".", + tmp); + } else if (strcasecmp("TLSCAFile", child->key) == 0) { + status = cf_util_get_string(child, &host->tls_ca_file); + if (status != 0) + { + ERROR("write_riemann plugin: cf_util_get_" + "string_buffer failed with " + "status %i.", status); + break; + } + } else if (strcasecmp("TLSCertFile", child->key) == 0) { + status = cf_util_get_string(child, &host->tls_cert_file); + if (status != 0) + { + ERROR("write_riemann plugin: cf_util_get_" + "string_buffer failed with " + "status %i.", status); + break; + } + } else if (strcasecmp("TLSKeyFile", child->key) == 0) { + status = cf_util_get_string(child, &host->tls_key_file); + if (status != 0) + { + ERROR("write_riemann plugin: cf_util_get_" + "string_buffer failed with " + "status %i.", status); + break; + } + } else if (strcasecmp("StoreRates", child->key) == 0) { + status = cf_util_get_boolean(child, &host->store_rates); + if (status != 0) + break; + } else if (strcasecmp("AlwaysAppendDS", child->key) == 0) { + status = cf_util_get_boolean(child, + &host->always_append_ds); + if (status != 0) + break; + } else if (strcasecmp("TTLFactor", child->key) == 0) { + double tmp = NAN; + status = cf_util_get_double(child, &tmp); + if (status != 0) + break; + if (tmp >= 2.0) { + host->ttl_factor = tmp; + } else if (tmp >= 1.0) { + NOTICE("write_riemann plugin: The configured " + "TTLFactor is very small " + "(%.1f). A value of 2.0 or " + "greater is recommended.", + tmp); + host->ttl_factor = tmp; + } else if (tmp > 0.0) { + WARNING("write_riemann plugin: The configured " + "TTLFactor is too small to be " + "useful (%.1f). I'll use it " + "since the user knows best, " + "but under protest.", + tmp); + host->ttl_factor = tmp; + } else { /* zero, negative and NAN */ + ERROR("write_riemann plugin: The configured " + "TTLFactor is invalid (%.1f).", + tmp); + } + } else { + WARNING("write_riemann plugin: ignoring unknown config " + "option: \"%s\"", child->key); + } + } + if (status != 0) { + wrr_free(host); + return status; + } + + ssnprintf(callback_name, sizeof(callback_name), "write_riemann/%s", + host->name); + ud.data = host; + ud.free_func = wrr_free; + + pthread_mutex_lock(&host->lock); + + status = plugin_register_write(callback_name, wrr_write, &ud); + + if (host->client_type != RIEMANN_CLIENT_UDP && host->batch_mode) { + ud.free_func = NULL; + plugin_register_flush(callback_name, wrr_batch_flush, &ud); + } + if (status != 0) + WARNING("write_riemann plugin: plugin_register_write (\"%s\") " + "failed with status %i.", + callback_name, status); + else /* success */ + host->reference_count++; + + status = plugin_register_notification(callback_name, + wrr_notification, &ud); + if (status != 0) + WARNING("write_riemann plugin: plugin_register_notification (\"%s\") " + "failed with status %i.", + callback_name, status); + else /* success */ + host->reference_count++; + + if (host->reference_count <= 1) + { + /* Both callbacks failed => free memory. + * We need to unlock here, because riemann_free() will lock. + * This is not a race condition, because we're the only one + * holding a reference. */ + pthread_mutex_unlock(&host->lock); + wrr_free(host); + return (-1); } - if (status != 0) - WARNING ("write_riemann plugin: plugin_register_write (\"%s\") " - "failed with status %i.", - callback_name, status); - else /* success */ - host->reference_count++; - - status = plugin_register_notification (callback_name, - riemann_notification, &ud); - if (status != 0) - WARNING ("write_riemann plugin: plugin_register_notification (\"%s\") " - "failed with status %i.", - callback_name, status); - else /* success */ - host->reference_count++; - - if (host->reference_count <= 1) - { - /* Both callbacks failed => free memory. - * We need to unlock here, because riemann_free() will lock. - * This is not a race condition, because we're the only one - * holding a reference. */ - pthread_mutex_unlock (&host->lock); - riemann_free (host); - return (-1); - } - host->reference_count--; - pthread_mutex_unlock (&host->lock); + host->reference_count--; + pthread_mutex_unlock(&host->lock); - return status; -} /* }}} int riemann_config_node */ + return status; +} /* }}} int wrr_config_node */ -static int riemann_config(oconfig_item_t *ci) /* {{{ */ +static int wrr_config(oconfig_item_t *ci) /* {{{ */ { - int i; - oconfig_item_t *child; - int status; - - for (i = 0; i < ci->children_num; i++) { - child = &ci->children[i]; - - if (strcasecmp("Node", child->key) == 0) { - riemann_config_node (child); - } else if (strcasecmp(child->key, "attribute") == 0) { - char *key = NULL; - char *val = NULL; - - if (child->values_num != 2) { - WARNING("riemann attributes need both a key and a value."); - return (-1); - } - if (child->values[0].type != OCONFIG_TYPE_STRING || - child->values[1].type != OCONFIG_TYPE_STRING) { - WARNING("riemann attribute needs string arguments."); - return (-1); - } - if ((key = strdup(child->values[0].value.string)) == NULL) { - WARNING("cannot allocate memory for attribute key."); - return (-1); - } - if ((val = strdup(child->values[1].value.string)) == NULL) { - WARNING("cannot allocate memory for attribute value."); - sfree (key); - return (-1); - } - strarray_add(&riemann_attrs, &riemann_attrs_num, key); - strarray_add(&riemann_attrs, &riemann_attrs_num, val); - DEBUG("write_riemann: got attr: %s => %s", key, val); - sfree(key); - sfree(val); - } else if (strcasecmp(child->key, "tag") == 0) { - char *tmp = NULL; - status = cf_util_get_string(child, &tmp); - if (status != 0) - continue; - - strarray_add (&riemann_tags, &riemann_tags_num, tmp); - DEBUG("write_riemann plugin: Got tag: %s", tmp); - sfree (tmp); - } else { - WARNING ("write_riemann plugin: Ignoring unknown " - "configuration option \"%s\" at top level.", - child->key); - } - } - return (0); -} /* }}} int riemann_config */ + int i; + oconfig_item_t *child; + int status; + + for (i = 0; i < ci->children_num; i++) { + child = &ci->children[i]; + + if (strcasecmp("Node", child->key) == 0) { + wrr_config_node (child); + } else if (strcasecmp(child->key, "attribute") == 0) { + char *key = NULL; + char *val = NULL; + + if (child->values_num != 2) { + WARNING("riemann attributes need both a key and a value."); + return (-1); + } + if (child->values[0].type != OCONFIG_TYPE_STRING || + child->values[1].type != OCONFIG_TYPE_STRING) { + WARNING("riemann attribute needs string arguments."); + return (-1); + } + if ((key = strdup(child->values[0].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute key."); + return (-1); + } + if ((val = strdup(child->values[1].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute value."); + sfree(key); + return (-1); + } + strarray_add(&riemann_attrs, &riemann_attrs_num, key); + strarray_add(&riemann_attrs, &riemann_attrs_num, val); + DEBUG("write_riemann: got attr: %s => %s", key, val); + sfree(key); + sfree(val); + } else if (strcasecmp(child->key, "tag") == 0) { + char *tmp = NULL; + status = cf_util_get_string(child, &tmp); + if (status != 0) + continue; + + strarray_add(&riemann_tags, &riemann_tags_num, tmp); + DEBUG("write_riemann plugin: Got tag: %s", tmp); + sfree(tmp); + } else { + WARNING("write_riemann plugin: Ignoring unknown " + "configuration option \"%s\" at top level.", + child->key); + } + } + return (0); +} /* }}} int wrr_config */ void module_register(void) { - plugin_register_complex_config ("write_riemann", riemann_config); + plugin_register_complex_config("write_riemann", wrr_config); } /* vim: set sw=8 sts=8 ts=8 noet : */