Merge pull request #417 from udoprog/udoprog/riemann
authorPierre-Yves Ritschard <pyr@spootnik.org>
Mon, 30 Sep 2013 08:58:42 +0000 (01:58 -0700)
committerPierre-Yves Ritschard <pyr@spootnik.org>
Mon, 30 Sep 2013 08:58:42 +0000 (01:58 -0700)
write_riemann: Add extra meta strings as attributes in notifications

1  2 
src/write_riemann.c

diff --combined src/write_riemann.c
@@@ -178,30 -178,32 +178,30 @@@ riemann_disconnect (struct riemann_hos
        return (0);
  }
  
 -static int
 -riemann_send(struct riemann_host *host, Msg const *msg)
 +static inline int
 +riemann_send_msg(struct riemann_host *host, const Msg *msg)
  {
 -      u_char *buffer;
 +      int status = 0;
 +      u_char *buffer = NULL;
        size_t  buffer_len;
 -      int status;
 -
 -      pthread_mutex_lock (&host->lock);
  
        status = riemann_connect (host);
 +
        if (status != 0)
 -      {
 -              pthread_mutex_unlock (&host->lock);
                return status;
 -      }
  
        buffer_len = msg__get_packed_size(msg);
 +
        if (host->use_tcp)
                buffer_len += 4;
  
        buffer = malloc (buffer_len);
 +
        if (buffer == NULL) {
 -              pthread_mutex_unlock (&host->lock);
                ERROR ("write_riemann plugin: malloc failed.");
                return ENOMEM;
        }
 +
        memset (buffer, 0, buffer_len);
  
        if (host->use_tcp)
        }
  
        status = (int) swrite (host->s, buffer, buffer_len);
 +
        if (status != 0)
        {
                char errbuf[1024];
  
 -              riemann_disconnect (host);
 -              pthread_mutex_unlock (&host->lock);
 -
                ERROR ("write_riemann plugin: Sending to Riemann at %s:%s failed: %s",
                                (host->node != NULL) ? host->node : RIEMANN_HOST,
                                (host->service != NULL) ? host->service : RIEMANN_PORT,
                                sstrerror (errno, errbuf, sizeof (errbuf)));
 +
                sfree (buffer);
                return -1;
        }
  
 -      pthread_mutex_unlock (&host->lock);
        sfree (buffer);
        return 0;
  }
  
 +static inline int
 +riemann_recv_ack(struct riemann_host *host)
 +{
 +      int status = 0;
 +      Msg *msg = NULL;
 +      uint32_t header;
 +
 +      status = (int) sread (host->s, &header, 4);
 +
 +      if (status != 0)
 +              return -1;
 +
 +      size_t size = ntohl(header);
 +
 +      // Buffer on the stack since acknowledges are typically small.
 +      u_char buffer[size];
 +      memset (buffer, 0, size);
 +
 +      status = (int) sread (host->s, buffer, size);
 +
 +      if (status != 0)
 +              return status;
 +
 +      msg = msg__unpack (NULL, size, buffer);
 +
 +      if (msg == NULL)
 +              return -1;
 +
 +      if (!msg->ok)
 +      {
 +              ERROR ("write_riemann plugin: Sending to Riemann at %s:%s acknowledgement message reported error: %s",
 +                              (host->node != NULL) ? host->node : RIEMANN_HOST,
 +                              (host->service != NULL) ? host->service : RIEMANN_PORT,
 +                              msg->error);
 +
 +              msg__free_unpacked(msg, NULL);
 +              return -1;
 +      }
 +
 +      msg__free_unpacked (msg, NULL);
 +      return 0;
 +}
 +
 +/**
 + * Function to send messages (Msg) to riemann.
 + *
 + * Acquires the host lock, disconnects on errors.
 + */
 +static int
 +riemann_send(struct riemann_host *host, Msg const *msg)
 +{
 +      int status = 0;
 +      pthread_mutex_lock (&host->lock);
 +
 +      status = riemann_send_msg(host, msg);
 +
 +      if (status != 0) {
 +              riemann_disconnect (host);
 +              pthread_mutex_unlock (&host->lock);
 +              return status;
 +      }
 +
 +      /*
 +       * For TCP we need to receive message acknowledgemenent.
 +       */
 +      if (host->use_tcp)
 +      {
 +              status = riemann_recv_ack(host);
 +
 +              if (status != 0)
 +              {
 +                      riemann_disconnect (host);
 +                      pthread_mutex_unlock (&host->lock);
 +                      return status;
 +              }
 +      }
 +
 +      pthread_mutex_unlock (&host->lock);
 +      return 0;
 +}
 +
  static int riemann_event_add_tag (Event *event, char const *tag) /* {{{ */
  {
        return (strarray_add (&event->tags, &event->n_tags, tag));
@@@ -430,15 -353,20 +430,20 @@@ static Msg *riemann_notification_to_pro
                        n->type, n->type_instance);
        event->service = strdup (&service_buffer[1]);
  
-       /* Pull in values from threshold */
+       /* Pull in values from threshold and add extra attributes */
        for (meta = n->meta; meta != NULL; meta = meta->next)
        {
-               if (strcasecmp ("CurrentValue", meta->name) != 0)
+               if (strcasecmp ("CurrentValue", meta->name) == 0 && meta->type == NM_TYPE_DOUBLE)
+               {
+                       event->metric_d = meta->nm_value.nm_double;
+                       event->has_metric_d = 1;
                        continue;
+               }
  
-               event->metric_d = meta->nm_value.nm_double;
-               event->has_metric_d = 1;
-               break;
+               if (meta->type == NM_TYPE_STRING) {
+                       riemann_event_add_attribute (event, meta->name, meta->nm_value.nm_string);
+                       continue;
+               }
        }
  
        DEBUG ("write_riemann plugin: Successfully created protobuf for notification: "