Merge branch 'collectd-5.5' into collectd-5.6
authorFlorian Forster <octo@collectd.org>
Mon, 7 Nov 2016 07:43:17 +0000 (08:43 +0100)
committerFlorian Forster <octo@collectd.org>
Mon, 7 Nov 2016 07:48:45 +0000 (08:48 +0100)
1  2 
configure.ac
src/apcups.c
src/modbus.c
src/write_kafka.c
src/write_riemann.c
src/write_riemann_threshold.c

diff --cc configure.ac
Simple merge
diff --cc src/apcups.c
@@@ -252,8 -263,8 +252,8 @@@ static int net_send (int *sockfd, cons
  }
  
  /* Get and print status from apcupsd NIS server */
 -static int apc_query_server (char *host, int port,
 +static int apc_query_server (char const *node, char const *service,
-               struct apc_detail_s *apcups_detail)
+               apc_detail_t *apcups_detail)
  {
        int     n;
        char    recvline[1024];
        return (0);
  }
  
 -static int apcups_config (const char *key, const char *value)
 +static int apcups_config (oconfig_item_t *ci)
  {
 -      if (strcasecmp (key, "host") == 0)
 -      {
 -              if (conf_host != NULL)
 -              {
 -                      free (conf_host);
 -                      conf_host = NULL;
 -              }
 -              if ((conf_host = strdup (value)) == NULL)
 -                      return (1);
 -      }
 -      else if (strcasecmp (key, "Port") == 0)
 +      _Bool persistent_conn_set = 0;
 +
 +      for (int i = 0; i < ci->children_num; i++)
        {
 -              int port_tmp = atoi (value);
 -              if (port_tmp < 1 || port_tmp > 65535)
 -              {
 -                      WARNING ("apcups plugin: Invalid port: %i", port_tmp);
 -                      return (1);
 +              oconfig_item_t *child = ci->children + i;
 +
 +              if (strcasecmp (child->key, "Host") == 0)
 +                      cf_util_get_string (child, &conf_node);
 +              else if (strcasecmp (child->key, "Port") == 0)
 +                      cf_util_get_service (child, &conf_service);
 +              else if (strcasecmp (child->key, "ReportSeconds") == 0)
 +                      cf_util_get_boolean (child, &conf_report_seconds);
 +              else if (strcasecmp (child->key, "PersistentConnection") == 0) {
 +                      cf_util_get_boolean (child, &conf_persistent_conn);
 +                      persistent_conn_set = 1;
                }
 -              conf_port = port_tmp;
 -      }
 -      else if (strcasecmp (key, "ReportSeconds") == 0)
 -      {
 -              if (IS_TRUE (value))
 -                      conf_report_seconds = 1;
                else
 -                      conf_report_seconds = 0;
 +                      ERROR ("apcups plugin: Unknown config option \"%s\".", child->key);
        }
 -      else
 -      {
 -              return (-1);
 +
 +      if (!persistent_conn_set) {
 +              double interval = CDTIME_T_TO_DOUBLE(plugin_get_interval());
 +              if (interval > APCUPS_SERVER_TIMEOUT) {
 +                      NOTICE ("apcups plugin: Plugin poll interval set to %.3f seconds. "
 +                              "Apcupsd NIS socket timeout is %.3f seconds, "
 +                              "PersistentConnection disabled by default.",
 +                              interval, APCUPS_SERVER_TIMEOUT);
 +                      conf_persistent_conn = 0;
 +              }
        }
 +
        return (0);
 -}
 +} /* int apcups_config */
  
- static void apc_submit_generic (const char *type, const char *type_inst, double value)
 -static void apc_submit_generic (char *type, char *type_inst, gauge_t value)
++static void apc_submit_generic (const char *type, const char *type_inst, gauge_t value)
  {
        value_t values[1];
        value_list_t vl = VALUE_LIST_INIT;
@@@ -446,33 -461,27 +450,27 @@@ static void apc_submit (apc_detail_t co
  
  static int apcups_read (void)
  {
-       struct apc_detail_s apcups_detail;
-       int status;
+       apc_detail_t apcups_detail = {
+               .linev    = NAN,
+               .outputv  = NAN,
+               .battv    = NAN,
+               .loadpct  = NAN,
+               .bcharge  = NAN,
+               .timeleft = NAN,
+               .itemp    = NAN,
+               .linefreq = NAN,
+       };
  
-       apcups_detail.linev    =   -1.0;
-       apcups_detail.outputv  =   -1.0;
-       apcups_detail.battv    =   -1.0;
-       apcups_detail.loadpct  =   -1.0;
-       apcups_detail.bcharge  =   -1.0;
-       apcups_detail.timeleft =    NAN;
-       apcups_detail.itemp    = -300.0;
-       apcups_detail.linefreq =   -1.0;
-       status = apc_query_server ((conf_node == NULL) ? APCUPS_DEFAULT_NODE : conf_node,
-                       (conf_service == NULL) ? APCUPS_DEFAULT_SERVICE : conf_service,
-                       &apcups_detail);
-       /*
-        * if we did not connect then do not bother submitting
-        * zeros. We want rrd files to have NAN.
-        */
 -      int status = apc_query_server (conf_host == NULL
 -                      ? APCUPS_DEFAULT_HOST
 -                      : conf_host,
 -                      conf_port, &apcups_detail);
++      int status = apc_query_server (conf_node == NULL
++                      ? APCUPS_DEFAULT_NODE
++                      : conf_node,
++                      conf_service, &apcups_detail);
        if (status != 0)
        {
-               DEBUG ("apcups plugin: apc_query_server (%s, %s) = %i",
-                               (conf_node == NULL) ? APCUPS_DEFAULT_NODE : conf_node,
-                               (conf_service == NULL) ? APCUPS_DEFAULT_SERVICE : conf_service,
-                               status);
-               return (-1);
 -              DEBUG ("apcups plugin: apc_query_server (%s, %i) = %i",
 -                              conf_host == NULL ? APCUPS_DEFAULT_HOST : conf_host,
 -                              conf_port, status);
++              DEBUG ("apcups plugin: apc_query_server (\"%s\", \"%s\") = %d",
++                              conf_node == NULL ? APCUPS_DEFAULT_NODE : conf_node,
++                              conf_service, status);
+               return (status);
        }
  
        apc_submit (&apcups_detail);
diff --cc src/modbus.c
Simple merge
Simple merge
  
  #include "collectd.h"
  
 -#include "plugin.h"
  #include "common.h"
 -#include "configfile.h"
 +#include "plugin.h"
  #include "utils_cache.h"
 -#include "riemann.pb-c.h"
 +#include "utils_complain.h"
  #include "write_riemann_threshold.h"
  
- #include <errno.h>
 -#include <sys/socket.h>
 -#include <arpa/inet.h>
 -#include <netdb.h>
 +#include <riemann/riemann-client.h>
  
 -#define RIEMANN_HOST          "localhost"
 -#define RIEMANN_PORT          "5555"
 -#define RIEMANN_TTL_FACTOR    2.0
 -#define RIEMANN_BATCH_MAX     8192
 +#define RIEMANN_HOST "localhost"
 +#define RIEMANN_PORT 5555
 +#define RIEMANN_TTL_FACTOR 2.0
 +#define RIEMANN_BATCH_MAX 8192
  
  struct riemann_host {
 -      char                    *name;
 -      char                    *event_service_prefix;
 -#define F_CONNECT      0x01
 -      uint8_t                  flags;
 -      pthread_mutex_t  lock;
 -    _Bool            batch_mode;
 -      _Bool            notifications;
 -      _Bool            check_thresholds;
 -      _Bool                    store_rates;
 -      _Bool                    always_append_ds;
 -      char                    *node;
 -      char                    *service;
 -      _Bool                    use_tcp;
 -      int                          s;
 -      double                   ttl_factor;
 -    Msg             *batch_msg;
 -    cdtime_t         batch_init;
 -    int              batch_max;
 -      int                          reference_count;
 +  c_complain_t init_complaint;
 +  char *name;
 +  char *event_service_prefix;
 +  pthread_mutex_t lock;
 +  _Bool batch_mode;
 +  _Bool notifications;
 +  _Bool check_thresholds;
 +  _Bool store_rates;
 +  _Bool always_append_ds;
 +  char *node;
 +  int port;
 +  riemann_client_type_t client_type;
 +  riemann_client_t *client;
 +  double ttl_factor;
 +  cdtime_t batch_init;
 +  int batch_max;
 +  int batch_timeout;
 +  int reference_count;
 +  riemann_message_t *batch_msg;
 +  char *tls_ca_file;
 +  char *tls_cert_file;
 +  char *tls_key_file;
 +  struct timeval timeout;
  };
  
 -static char   **riemann_tags;
 -static size_t   riemann_tags_num;
 -static char   **riemann_attrs;
 -static size_t     riemann_attrs_num;
 -
 -static void riemann_event_protobuf_free (Event *event) /* {{{ */
 -{
 -      size_t i;
 -
 -      if (event == NULL)
 -              return;
 -
 -      sfree (event->state);
 -      sfree (event->service);
 -      sfree (event->host);
 -      sfree (event->description);
 -
 -      strarray_free (event->tags, event->n_tags);
 -      event->tags = NULL;
 -      event->n_tags = 0;
 -
 -      for (i = 0; i < event->n_attributes; i++)
 -      {
 -              sfree (event->attributes[i]->key);
 -              sfree (event->attributes[i]->value);
 -              sfree (event->attributes[i]);
 -      }
 -      sfree (event->attributes);
 -      event->n_attributes = 0;
 -
 -      sfree (event);
 -} /* }}} void riemann_event_protobuf_free */
 -
 -static void riemann_msg_protobuf_free(Msg *msg) /* {{{ */
 -{
 -      size_t i;
 -
 -      if (msg == NULL)
 -              return;
 -
 -      for (i = 0; i < msg->n_events; i++)
 -      {
 -              riemann_event_protobuf_free (msg->events[i]);
 -              msg->events[i] = NULL;
 -      }
 -
 -      sfree (msg->events);
 -      msg->n_events = 0;
 -
 -      sfree (msg);
 -} /* }}} void riemann_msg_protobuf_free */
 +static char **riemann_tags;
 +static size_t riemann_tags_num;
 +static char **riemann_attrs;
 +static size_t riemann_attrs_num;
  
  /* host->lock must be held when calling this function. */
 -static int riemann_connect(struct riemann_host *host) /* {{{ */
 +static int wrr_connect(struct riemann_host *host) /* {{{ */
  {
 -      int                      e;
 -      struct addrinfo         *ai, *res, hints;
 -      char const              *node;
 -      char const              *service;
 -
 -      if (host->flags & F_CONNECT)
 -              return 0;
 -
 -      memset(&hints, 0, sizeof(hints));
 -      memset(&service, 0, sizeof(service));
 -      hints.ai_family = AF_UNSPEC;
 -      hints.ai_socktype = host->use_tcp ? SOCK_STREAM : SOCK_DGRAM;
 -#ifdef AI_ADDRCONFIG
 -      hints.ai_flags |= AI_ADDRCONFIG;
 +  char const *node;
 +  int port;
 +
 +  if (host->client)
 +    return 0;
 +
 +  node = (host->node != NULL) ? host->node : RIEMANN_HOST;
 +  port = (host->port) ? host->port : RIEMANN_PORT;
 +
 +  host->client = NULL;
 +
 +  host->client = riemann_client_create(
 +      host->client_type, node, port, RIEMANN_CLIENT_OPTION_TLS_CA_FILE,
 +      host->tls_ca_file, RIEMANN_CLIENT_OPTION_TLS_CERT_FILE,
 +      host->tls_cert_file, RIEMANN_CLIENT_OPTION_TLS_KEY_FILE,
 +      host->tls_key_file, RIEMANN_CLIENT_OPTION_NONE);
 +  if (host->client == NULL) {
 +    c_complain(LOG_ERR, &host->init_complaint,
 +               "write_riemann plugin: Unable to connect to Riemann at %s:%d",
 +               node, port);
 +    return -1;
 +  }
 +#if RCC_VERSION_NUMBER >= 0x010800
 +  if (host->timeout.tv_sec != 0) {
 +    if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
 +      riemann_client_free(host->client);
 +      host->client = NULL;
 +      c_complain(LOG_ERR, &host->init_complaint,
 +                 "write_riemann plugin: Unable to connect to Riemann at %s:%d",
 +                 node, port);
 +      return -1;
 +    }
 +  }
  #endif
  
 -      node = (host->node != NULL) ? host->node : RIEMANN_HOST;
 -      service = (host->service != NULL) ? host->service : RIEMANN_PORT;
 -
 -      if ((e = getaddrinfo(node, service, &hints, &res)) != 0) {
 -              ERROR ("write_riemann plugin: Unable to resolve host \"%s\": %s",
 -                      node, gai_strerror(e));
 -              return -1;
 -      }
 -
 -      host->s = -1;
 -      for (ai = res; ai != NULL; ai = ai->ai_next) {
 -              if ((host->s = socket(ai->ai_family,
 -                                    ai->ai_socktype,
 -                                    ai->ai_protocol)) == -1) {
 -                      continue;
 -              }
 -
 -              if (connect(host->s, ai->ai_addr, ai->ai_addrlen) != 0) {
 -                      close(host->s);
 -                      host->s = -1;
 -                      continue;
 -              }
 -
 -              host->flags |= F_CONNECT;
 -              DEBUG("write_riemann plugin: got a successful connection for: %s:%s",
 -                              node, service);
 -              break;
 -      }
 -
 -      freeaddrinfo(res);
 -
 -      if (host->s < 0) {
 -              WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%s",
 -                              node, service);
 -              return -1;
 -      }
 -      return 0;
 -} /* }}} int riemann_connect */
 +  set_sock_opts(riemann_client_get_fd(host->client));
 +
 +  c_release(LOG_INFO, &host->init_complaint,
 +            "write_riemann plugin: Successfully connected to %s:%d", node,
 +            port);
 +
 +  return 0;
 +} /* }}} int wrr_connect */
  
  /* host->lock must be held when calling this function. */
 -static int riemann_disconnect (struct riemann_host *host) /* {{{ */
 +static int wrr_disconnect(struct riemann_host *host) /* {{{ */
  {
 -      if ((host->flags & F_CONNECT) == 0)
 -              return (0);
 +  if (!host->client)
 +    return (0);
  
 -      close (host->s);
 -      host->s = -1;
 -      host->flags &= ~F_CONNECT;
 +  riemann_client_free(host->client);
 +  host->client = NULL;
  
 -      return (0);
 -} /* }}} int riemann_disconnect */
 +  return (0);
 +} /* }}} int wrr_disconnect */
  
 -static int riemann_send_msg (struct riemann_host *host, const Msg *msg) /* {{{ */
 -{
 -      int status = 0;
 -      u_char *buffer = NULL;
 -      size_t  buffer_len;
 -
 -      status = riemann_connect (host);
 -      if (status != 0)
 -              return status;
 -
 -      buffer_len = msg__get_packed_size(msg);
 -
 -      if (host->use_tcp)
 -              buffer_len += 4;
 -
 -      buffer = malloc (buffer_len);
 -      if (buffer == NULL) {
 -              ERROR ("write_riemann plugin: malloc failed.");
 -              return ENOMEM;
 -      }
 -      memset (buffer, 0, buffer_len);
 -
 -      if (host->use_tcp)
 -      {
 -              uint32_t length = htonl ((uint32_t) (buffer_len - 4));
 -              memcpy (buffer, &length, 4);
 -              msg__pack(msg, buffer + 4);
 -      }
 -      else
 -      {
 -              msg__pack(msg, buffer);
 -      }
 -
 -      status = (int) swrite (host->s, buffer, buffer_len);
 -      if (status != 0)
 -      {
 -              char errbuf[1024];
 -              ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
 -                              (host->node != NULL) ? host->node : RIEMANN_HOST,
 -                              (host->service != NULL) ? host->service : RIEMANN_PORT,
 -                              sstrerror (errno, errbuf, sizeof (errbuf)));
 -              sfree (buffer);
 -              return -1;
 -      }
 -
 -      sfree (buffer);
 -      return 0;
 -} /* }}} int riemann_send_msg */
 -
 -static int riemann_recv_ack(struct riemann_host *host) /* {{{ */
 +/**
 + * Function to send messages to riemann.
 + *
 + * Acquires the host lock, disconnects on errors.
 + */
 +static int wrr_send_nolock(struct riemann_host *host,
 +                           riemann_message_t *msg) /* {{{ */
  {
 -      int status = 0;
 -      Msg *msg = NULL;
 -      uint32_t header;
 +  int status = 0;
  
 -      status = (int) sread (host->s, &header, 4);
 +  status = wrr_connect(host);
 +  if (status != 0) {
 +    return status;
 +  }
  
 -      if (status != 0)
 -              return -1;
 +  status = riemann_client_send_message(host->client, msg);
 +  if (status != 0) {
 +    wrr_disconnect(host);
 +    return status;
 +  }
  
 -      size_t size = ntohl(header);
 +  /*
 +   * For TCP we need to receive message acknowledgemenent.
 +   */
 +  if (host->client_type != RIEMANN_CLIENT_UDP) {
 +    riemann_message_t *response;
  
 -      // Buffer on the stack since acknowledges are typically small.
 -      u_char buffer[size];
 -      memset (buffer, 0, size);
 +    response = riemann_client_recv_message(host->client);
  
 -      status = (int) sread (host->s, buffer, size);
 +    if (response == NULL) {
 +      wrr_disconnect(host);
 +      return errno;
 +    }
 +    riemann_message_free(response);
 +  }
  
 -      if (status != 0)
 -              return status;
 +  return 0;
 +} /* }}} int wrr_send */
  
 -      msg = msg__unpack (NULL, size, buffer);
 +static int wrr_send(struct riemann_host *host, riemann_message_t *msg) {
 +  int status = 0;
  
 -      if (msg == NULL)
 -              return -1;
 +  pthread_mutex_lock(&host->lock);
 +  status = wrr_send_nolock(host, msg);
 +  pthread_mutex_unlock(&host->lock);
 +  return status;
 +}
  
 -      if (!msg->ok)
 -      {
 -              ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s",
 -                              (host->node != NULL) ? host->node : RIEMANN_HOST,
 -                              (host->service != NULL) ? host->service : RIEMANN_PORT,
 -                              msg->error);
 +static riemann_message_t *
 +wrr_notification_to_message(struct riemann_host *host, /* {{{ */
 +                            notification_t const *n) {
 +  riemann_message_t *msg;
 +  riemann_event_t *event;
 +  char service_buffer[6 * DATA_MAX_NAME_LEN];
 +  char const *severity;
 +
 +  switch (n->severity) {
 +  case NOTIF_OKAY:
 +    severity = "ok";
 +    break;
 +  case NOTIF_WARNING:
 +    severity = "warning";
 +    break;
 +  case NOTIF_FAILURE:
 +    severity = "critical";
 +    break;
 +  default:
 +    severity = "unknown";
 +  }
 +
 +  format_name(service_buffer, sizeof(service_buffer),
 +              /* host = */ "", n->plugin, n->plugin_instance, n->type,
 +              n->type_instance);
 +
 +  event = riemann_event_create(
 +      RIEMANN_EVENT_FIELD_HOST, n->host, RIEMANN_EVENT_FIELD_TIME,
 +      (int64_t)CDTIME_T_TO_TIME_T(n->time), RIEMANN_EVENT_FIELD_TAGS,
 +      "notification", NULL, RIEMANN_EVENT_FIELD_STATE, severity,
 +      RIEMANN_EVENT_FIELD_SERVICE, &service_buffer[1],
 +      RIEMANN_EVENT_FIELD_NONE);
 +
 +  if (n->host[0] != 0)
 +    riemann_event_string_attribute_add(event, "host", n->host);
 +  if (n->plugin[0] != 0)
 +    riemann_event_string_attribute_add(event, "plugin", n->plugin);
 +  if (n->plugin_instance[0] != 0)
 +    riemann_event_string_attribute_add(event, "plugin_instance",
 +                                       n->plugin_instance);
 +
 +  if (n->type[0] != 0)
 +    riemann_event_string_attribute_add(event, "type", n->type);
 +  if (n->type_instance[0] != 0)
 +    riemann_event_string_attribute_add(event, "type_instance",
 +                                       n->type_instance);
 +
 +  for (size_t i = 0; i < riemann_attrs_num; i += 2)
 +    riemann_event_string_attribute_add(event, riemann_attrs[i],
 +                                       riemann_attrs[i + 1]);
 +
 +  for (size_t i = 0; i < riemann_tags_num; i++)
 +    riemann_event_tag_add(event, riemann_tags[i]);
 +
 +  if (n->message[0] != 0)
 +    riemann_event_string_attribute_add(event, "description", n->message);
 +
 +  /* Pull in values from threshold and add extra attributes */
 +  for (notification_meta_t *meta = n->meta; meta != NULL; meta = meta->next) {
 +    if (strcasecmp("CurrentValue", meta->name) == 0 &&
 +        meta->type == NM_TYPE_DOUBLE) {
 +      riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
 +                        (double)meta->nm_value.nm_double,
 +                        RIEMANN_EVENT_FIELD_NONE);
 +      continue;
 +    }
  
 -              msg__free_unpacked(msg, NULL);
 -              return -1;
 -      }
 +    if (meta->type == NM_TYPE_STRING) {
 +      riemann_event_string_attribute_add(event, meta->name,
 +                                         meta->nm_value.nm_string);
 +      continue;
 +    }
 +  }
 +
 +  msg = riemann_message_create_with_events(event, NULL);
 +  if (msg == NULL) {
 +    ERROR("write_riemann plugin: riemann_message_create_with_events() failed.");
 +    riemann_event_free(event);
 +    return (NULL);
 +  }
 +
 +  DEBUG("write_riemann plugin: Successfully created message for notification: "
 +        "host = \"%s\", service = \"%s\", state = \"%s\"",
 +        event->host, event->service, event->state);
 +  return (msg);
 +} /* }}} riemann_message_t *wrr_notification_to_message */
 +
 +static riemann_event_t *
 +wrr_value_to_event(struct riemann_host const *host, /* {{{ */
 +                   data_set_t const *ds, value_list_t const *vl, size_t index,
 +                   gauge_t const *rates, int status) {
 +  riemann_event_t *event;
 +  char name_buffer[5 * DATA_MAX_NAME_LEN];
 +  char service_buffer[6 * DATA_MAX_NAME_LEN];
 +  size_t i;
 +
 +  event = riemann_event_new();
 +  if (event == NULL) {
 +    ERROR("write_riemann plugin: riemann_event_new() failed.");
 +    return (NULL);
 +  }
 +
 +  format_name(name_buffer, sizeof(name_buffer),
 +              /* host = */ "", vl->plugin, vl->plugin_instance, vl->type,
 +              vl->type_instance);
 +  if (host->always_append_ds || (ds->ds_num > 1)) {
 +    if (host->event_service_prefix == NULL)
 +      ssnprintf(service_buffer, sizeof(service_buffer), "%s/%s",
 +                &name_buffer[1], ds->ds[index].name);
 +    else
 +      ssnprintf(service_buffer, sizeof(service_buffer), "%s%s/%s",
 +                host->event_service_prefix, &name_buffer[1],
 +                ds->ds[index].name);
 +  } else {
 +    if (host->event_service_prefix == NULL)
 +      sstrncpy(service_buffer, &name_buffer[1], sizeof(service_buffer));
 +    else
 +      ssnprintf(service_buffer, sizeof(service_buffer), "%s%s",
 +                host->event_service_prefix, &name_buffer[1]);
 +  }
 +
 +  riemann_event_set(
 +      event, RIEMANN_EVENT_FIELD_HOST, vl->host, RIEMANN_EVENT_FIELD_TIME,
 +      (int64_t)CDTIME_T_TO_TIME_T(vl->time), RIEMANN_EVENT_FIELD_TTL,
 +      (float)CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
 +      RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES, "plugin", vl->plugin, "type",
 +      vl->type, "ds_name", ds->ds[index].name, NULL,
 +      RIEMANN_EVENT_FIELD_SERVICE, service_buffer, RIEMANN_EVENT_FIELD_NONE);
 +
 +  if (host->check_thresholds) {
 +    const char *state = NULL;
 +
 +    switch (status) {
 +    case STATE_OKAY:
 +      state = "ok";
 +      break;
 +    case STATE_ERROR:
 +      state = "critical";
 +      break;
 +    case STATE_WARNING:
 +      state = "warning";
 +      break;
 +    case STATE_MISSING:
 +      state = "unknown";
 +      break;
 +    }
 +    if (state)
 +      riemann_event_set(event, RIEMANN_EVENT_FIELD_STATE, state,
 +                        RIEMANN_EVENT_FIELD_NONE);
 +  }
 +
 +  if (vl->plugin_instance[0] != 0)
 +    riemann_event_string_attribute_add(event, "plugin_instance",
 +                                       vl->plugin_instance);
 +  if (vl->type_instance[0] != 0)
 +    riemann_event_string_attribute_add(event, "type_instance",
 +                                       vl->type_instance);
 +
 +  if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL)) {
 +    char ds_type[DATA_MAX_NAME_LEN];
 +
 +    ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
 +              DS_TYPE_TO_STRING(ds->ds[index].type));
 +    riemann_event_string_attribute_add(event, "ds_type", ds_type);
 +  } else {
 +    riemann_event_string_attribute_add(event, "ds_type",
 +                                       DS_TYPE_TO_STRING(ds->ds[index].type));
 +  }
 +
 +  {
 +    char ds_index[DATA_MAX_NAME_LEN];
 +
 +    ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
 +    riemann_event_string_attribute_add(event, "ds_index", ds_index);
 +  }
 +
 +  for (i = 0; i < riemann_attrs_num; i += 2)
 +    riemann_event_string_attribute_add(event, riemann_attrs[i],
 +                                       riemann_attrs[i + 1]);
 +
 +  for (i = 0; i < riemann_tags_num; i++)
 +    riemann_event_tag_add(event, riemann_tags[i]);
 +
 +  if (ds->ds[index].type == DS_TYPE_GAUGE) {
 +    riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D,
 +                      (double)vl->values[index].gauge,
 +                      RIEMANN_EVENT_FIELD_NONE);
 +  } else if (rates != NULL) {
 +    riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_D, (double)rates[index],
 +                      RIEMANN_EVENT_FIELD_NONE);
 +  } else {
 +    int64_t metric;
 +
 +    if (ds->ds[index].type == DS_TYPE_DERIVE)
 +      metric = (int64_t)vl->values[index].derive;
 +    else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
 +      metric = (int64_t)vl->values[index].absolute;
 +    else
 +      metric = (int64_t)vl->values[index].counter;
 +
 +    riemann_event_set(event, RIEMANN_EVENT_FIELD_METRIC_S64, (int64_t)metric,
 +                      RIEMANN_EVENT_FIELD_NONE);
 +  }
 +
 +  DEBUG("write_riemann plugin: Successfully created message for metric: "
 +        "host = \"%s\", service = \"%s\"",
 +        event->host, event->service);
 +  return (event);
 +} /* }}} riemann_event_t *wrr_value_to_event */
 +
 +static riemann_message_t *
 +wrr_value_list_to_message(struct riemann_host const *host, /* {{{ */
 +                          data_set_t const *ds, value_list_t const *vl,
 +                          int *statuses) {
 +  riemann_message_t *msg;
 +  size_t i;
 +  gauge_t *rates = NULL;
 +
 +  /* Initialize the Msg structure. */
 +  msg = riemann_message_new();
 +  if (msg == NULL) {
 +    ERROR("write_riemann plugin: riemann_message_new failed.");
 +    return (NULL);
 +  }
 +
 +  if (host->store_rates) {
 +    rates = uc_get_rate(ds, vl);
 +    if (rates == NULL) {
 +      ERROR("write_riemann plugin: uc_get_rate failed.");
 +      riemann_message_free(msg);
 +      return (NULL);
 +    }
 +  }
  
 -      msg__free_unpacked (msg, NULL);
 -      return 0;
 -} /* }}} int riemann_recv_ack */
 +  for (i = 0; i < vl->values_len; i++) {
 +    riemann_event_t *event;
  
 -/**
 - * Function to send messages (Msg) to riemann.
 - *
 - * Acquires the host lock, disconnects on errors.
 - */
 -static int riemann_send(struct riemann_host *host, Msg const *msg) /* {{{ */
 -{
 -      int status = 0;
 -      pthread_mutex_lock (&host->lock);
 -
 -      status = riemann_send_msg(host, msg);
 -      if (status != 0) {
 -              riemann_disconnect (host);
 -              pthread_mutex_unlock (&host->lock);
 -              return status;
 -      }
 -
 -      /*
 -       * For TCP we need to receive message acknowledgemenent.
 -       */
 -      if (host->use_tcp)
 -      {
 -              status = riemann_recv_ack(host);
 -
 -              if (status != 0)
 -              {
 -                      riemann_disconnect (host);
 -                      pthread_mutex_unlock (&host->lock);
 -                      return status;
 -              }
 -      }
 -
 -      pthread_mutex_unlock (&host->lock);
 -      return 0;
 -} /* }}} int riemann_send */
 -
 -static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
 -{
 -      return (strarray_add (&event->tags, &event->n_tags, tag));
 -} /* }}} int riemann_event_add_tag */
 -
 -static int riemann_event_add_attribute(Event *event, /* {{{ */
 -              char const *key, char const *value)
 -{
 -      Attribute **new_attributes;
 -      Attribute *a;
 -
 -      new_attributes = realloc (event->attributes,
 -                      sizeof (*event->attributes) * (event->n_attributes + 1));
 -      if (new_attributes == NULL)
 -      {
 -              ERROR ("write_riemann plugin: realloc failed.");
 -              return (ENOMEM);
 -      }
 -      event->attributes = new_attributes;
 -
 -      a = malloc (sizeof (*a));
 -      if (a == NULL)
 -      {
 -              ERROR ("write_riemann plugin: malloc failed.");
 -              return (ENOMEM);
 -      }
 -      attribute__init (a);
 -
 -      a->key = strdup (key);
 -      if (value != NULL)
 -              a->value = strdup (value);
 -
 -      event->attributes[event->n_attributes] = a;
 -      event->n_attributes++;
 -
 -      return (0);
 -} /* }}} int riemann_event_add_attribute */
 -
 -static Msg *riemann_notification_to_protobuf(struct riemann_host *host, /* {{{ */
 -              notification_t const *n)
 -{
 -      Msg *msg;
 -      Event *event;
 -      char service_buffer[6 * DATA_MAX_NAME_LEN];
 -      char const *severity;
 -      notification_meta_t *meta;
 -      int i;
 -
 -      msg = malloc (sizeof (*msg));
 -      if (msg == NULL)
 -      {
 -              ERROR ("write_riemann plugin: malloc failed.");
 -              return (NULL);
 -      }
 -      memset (msg, 0, sizeof (*msg));
 -      msg__init (msg);
 -
 -      msg->events = malloc (sizeof (*msg->events));
 -      if (msg->events == NULL)
 -      {
 -              ERROR ("write_riemann plugin: malloc failed.");
 -              sfree (msg);
 -              return (NULL);
 -      }
 -
 -      event = malloc (sizeof (*event));
 -      if (event == NULL)
 -      {
 -              ERROR ("write_riemann plugin: malloc failed.");
 -              sfree (msg->events);
 -              sfree (msg);
 -              return (NULL);
 -      }
 -      memset (event, 0, sizeof (*event));
 -      event__init (event);
 -
 -      msg->events[0] = event;
 -      msg->n_events = 1;
 -
 -      event->host = strdup (n->host);
 -      event->time = CDTIME_T_TO_TIME_T (n->time);
 -      event->has_time = 1;
 -
 -      switch (n->severity)
 -      {
 -              case NOTIF_OKAY:        severity = "ok"; break;
 -              case NOTIF_WARNING:     severity = "warning"; break;
 -              case NOTIF_FAILURE:     severity = "critical"; break;
 -              default:                severity = "unknown";
 -      }
 -      event->state = strdup (severity);
 -
 -      riemann_event_add_tag (event, "notification");
 -      if (n->host[0] != 0)
 -              riemann_event_add_attribute (event, "host", n->host);
 -      if (n->plugin[0] != 0)
 -              riemann_event_add_attribute (event, "plugin", n->plugin);
 -      if (n->plugin_instance[0] != 0)
 -              riemann_event_add_attribute (event, "plugin_instance",
 -                              n->plugin_instance);
 -
 -      if (n->type[0] != 0)
 -              riemann_event_add_attribute (event, "type", n->type);
 -      if (n->type_instance[0] != 0)
 -              riemann_event_add_attribute (event, "type_instance",
 -                              n->type_instance);
 -
 -      for (i = 0; i < riemann_attrs_num; i += 2)
 -              riemann_event_add_attribute(event,
 -                                          riemann_attrs[i],
 -                                          riemann_attrs[i +1]);
 -
 -      for (i = 0; i < riemann_tags_num; i++)
 -              riemann_event_add_tag (event, riemann_tags[i]);
 -
 -      format_name (service_buffer, sizeof (service_buffer),
 -                      /* host = */ "", n->plugin, n->plugin_instance,
 -                      n->type, n->type_instance);
 -      event->service = strdup (&service_buffer[1]);
 -
 -      if (n->message[0] != 0)
 -              riemann_event_add_attribute (event, "description", n->message);
 -
 -      /* Pull in values from threshold and add extra attributes */
 -      for (meta = n->meta; meta != NULL; meta = meta->next)
 -      {
 -              if (strcasecmp ("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
 -              {
 -                      event->metric_d = meta->nm_value.nm_double;
 -                      event->has_metric_d = 1;
 -                      continue;
 -              }
 -
 -              if (meta->type == NM_TYPE_STRING) {
 -                      riemann_event_add_attribute (event, meta->name, meta->nm_value.nm_string);
 -                      continue;
 -              }
 -      }
 -
 -      DEBUG ("write_riemann plugin: Successfully created protobuf for notification: "
 -                      "host = \"%s\", service = \"%s\", state = \"%s\"",
 -                      event->host, event->service, event->state);
 -      return (msg);
 -} /* }}} Msg *riemann_notification_to_protobuf */
 -
 -static Event *riemann_value_to_protobuf(struct riemann_host const *host, /* {{{ */
 -              data_set_t const *ds,
 -              value_list_t const *vl, size_t index,
 -                                       gauge_t const *rates,
 -                                       int status)
 -{
 -      Event *event;
 -      char name_buffer[5 * DATA_MAX_NAME_LEN];
 -      char service_buffer[6 * DATA_MAX_NAME_LEN];
 -      double ttl;
 -      int i;
 -
 -      event = malloc (sizeof (*event));
 -      if (event == NULL)
 -      {
 -              ERROR ("write_riemann plugin: malloc failed.");
 -              return (NULL);
 -      }
 -      memset (event, 0, sizeof (*event));
 -      event__init (event);
 -
 -      event->host = strdup (vl->host);
 -      event->time = CDTIME_T_TO_TIME_T (vl->time);
 -      event->has_time = 1;
 -
 -      if (host->check_thresholds) {
 -              switch (status) {
 -                      case STATE_OKAY:
 -                              event->state = strdup("ok");
 -                              break;
 -                      case STATE_ERROR:
 -                              event->state = strdup("critical");
 -                              break;
 -                      case STATE_WARNING:
 -                              event->state = strdup("warning");
 -                              break;
 -                      case STATE_MISSING:
 -                              event->state = strdup("unknown");
 -                              break;
 -              }
 -      }
 -
 -      ttl = CDTIME_T_TO_DOUBLE (vl->interval) * host->ttl_factor;
 -      event->ttl = (float) ttl;
 -      event->has_ttl = 1;
 -
 -      riemann_event_add_attribute (event, "plugin", vl->plugin);
 -      if (vl->plugin_instance[0] != 0)
 -              riemann_event_add_attribute (event, "plugin_instance",
 -                              vl->plugin_instance);
 -
 -      riemann_event_add_attribute (event, "type", vl->type);
 -      if (vl->type_instance[0] != 0)
 -              riemann_event_add_attribute (event, "type_instance",
 -                              vl->type_instance);
 -
 -      if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
 -      {
 -              char ds_type[DATA_MAX_NAME_LEN];
 -
 -              ssnprintf (ds_type, sizeof (ds_type), "%s:rate",
 -                              DS_TYPE_TO_STRING(ds->ds[index].type));
 -              riemann_event_add_attribute (event, "ds_type", ds_type);
 -      }
 -      else
 -      {
 -              riemann_event_add_attribute (event, "ds_type",
 -                              DS_TYPE_TO_STRING(ds->ds[index].type));
 -      }
 -      riemann_event_add_attribute (event, "ds_name", ds->ds[index].name);
 -      {
 -              char ds_index[DATA_MAX_NAME_LEN];
 -
 -              ssnprintf (ds_index, sizeof (ds_index), "%zu", index);
 -              riemann_event_add_attribute (event, "ds_index", ds_index);
 -      }
 -
 -      for (i = 0; i < riemann_attrs_num; i += 2)
 -              riemann_event_add_attribute(event,
 -                                          riemann_attrs[i],
 -                                          riemann_attrs[i +1]);
 -
 -      for (i = 0; i < riemann_tags_num; i++)
 -              riemann_event_add_tag (event, riemann_tags[i]);
 -
 -      if (ds->ds[index].type == DS_TYPE_GAUGE)
 -      {
 -              event->has_metric_d = 1;
 -              event->metric_d = (double) vl->values[index].gauge;
 -      }
 -      else if (rates != NULL)
 -      {
 -              event->has_metric_d = 1;
 -              event->metric_d = (double) rates[index];
 -      }
 -      else
 -      {
 -              event->has_metric_sint64 = 1;
 -              if (ds->ds[index].type == DS_TYPE_DERIVE)
 -                      event->metric_sint64 = (int64_t) vl->values[index].derive;
 -              else if (ds->ds[index].type == DS_TYPE_ABSOLUTE)
 -                      event->metric_sint64 = (int64_t) vl->values[index].absolute;
 -              else
 -                      event->metric_sint64 = (int64_t) vl->values[index].counter;
 -      }
 -
 -      format_name (name_buffer, sizeof (name_buffer),
 -                      /* host = */ "", vl->plugin, vl->plugin_instance,
 -                      vl->type, vl->type_instance);
 -      if (host->always_append_ds || (ds->ds_num > 1))
 -      {
 -              if (host->event_service_prefix == NULL)
 -                      ssnprintf (service_buffer, sizeof (service_buffer), "%s/%s",
 -                                      &name_buffer[1], ds->ds[index].name);
 -              else
 -                      ssnprintf (service_buffer, sizeof (service_buffer), "%s%s/%s",
 -                                      host->event_service_prefix, &name_buffer[1], ds->ds[index].name);
 -      }
 -      else
 -      {
 -              if (host->event_service_prefix == NULL)
 -                      sstrncpy (service_buffer, &name_buffer[1], sizeof (service_buffer));
 -              else
 -                      ssnprintf (service_buffer, sizeof (service_buffer), "%s%s",
 -                                      host->event_service_prefix, &name_buffer[1]);
 -      }
 -
 -      event->service = strdup (service_buffer);
 -
 -      DEBUG ("write_riemann plugin: Successfully created protobuf for metric: "
 -                      "host = \"%s\", service = \"%s\"",
 -                      event->host, event->service);
 -      return (event);
 -} /* }}} Event *riemann_value_to_protobuf */
 -
 -static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */
 -                                          data_set_t const *ds,
 -                                          value_list_t const *vl,
 -                                          int *statuses)
 -{
 -      Msg *msg;
 -      size_t i;
 -      gauge_t *rates = NULL;
 -
 -      /* Initialize the Msg structure. */
 -      msg = malloc (sizeof (*msg));
 -      if (msg == NULL)
 -      {
 -              ERROR ("write_riemann plugin: malloc failed.");
 -              return (NULL);
 -      }
 -      memset (msg, 0, sizeof (*msg));
 -      msg__init (msg);
 -
 -      /* Set up events. First, the list of pointers. */
 -      msg->n_events = (size_t) vl->values_len;
 -      msg->events = calloc (msg->n_events, sizeof (*msg->events));
 -      if (msg->events == NULL)
 -      {
 -              ERROR ("write_riemann plugin: calloc failed.");
 -              riemann_msg_protobuf_free (msg);
 -              return (NULL);
 -      }
 -
 -      if (host->store_rates)
 -      {
 -              rates = uc_get_rate (ds, vl);
 -              if (rates == NULL)
 -              {
 -                      ERROR ("write_riemann plugin: uc_get_rate failed.");
 -                      riemann_msg_protobuf_free (msg);
 -                      return (NULL);
 -              }
 -      }
 -
 -      for (i = 0; i < msg->n_events; i++)
 -      {
 -              msg->events[i] = riemann_value_to_protobuf (host, ds, vl,
 -                                                          (int) i, rates, statuses[i]);
 -              if (msg->events[i] == NULL)
 -              {
 -                      riemann_msg_protobuf_free (msg);
 -                      sfree (rates);
 -                      return (NULL);
 -              }
 -      }
 -
 -      sfree (rates);
 -      return (msg);
 -} /* }}} Msg *riemann_value_list_to_protobuf */
 +    event = wrr_value_to_event(host, ds, vl, (int)i, rates, statuses[i]);
 +    if (event == NULL) {
 +      riemann_message_free(msg);
 +      sfree(rates);
 +      return (NULL);
 +    }
 +    riemann_message_append_events(msg, event, NULL);
 +  }
  
 +  sfree(rates);
 +  return (msg);
 +} /* }}} riemann_message_t *wrr_value_list_to_message */
  
  /*
   * Always call while holding host->lock !
Simple merge