Merge branch 'collectd-5.1'
authorFlorian Forster <octo@collectd.org>
Sun, 23 Sep 2012 10:22:37 +0000 (12:22 +0200)
committerFlorian Forster <octo@collectd.org>
Sun, 23 Sep 2012 10:22:37 +0000 (12:22 +0200)
16 files changed:
AUTHORS
contrib/README
contrib/collectd.service [new file with mode: 0644]
src/Makefile.am
src/amqp.c
src/collectd.conf.in
src/collectd.conf.pod
src/memcached.c
src/oracle.c
src/redis.c
src/swap.c
src/tcpconns.c
src/utils_format_graphite.c [new file with mode: 0644]
src/utils_format_graphite.h [new file with mode: 0644]
src/utils_format_json.c
src/write_graphite.c

diff --git a/AUTHORS b/AUTHORS
index 90560e3..78dbad1 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
@@ -200,6 +200,9 @@ Sven Trenkel <collectd at semidefinite.de>
  - netapp plugin.
  - python plugin.
 
+Thomas Meson <zllak at hycik.org>
+ - Graphite support for the AMQP plugin.
+
 Tomasz Pala <gotar at pld-linux.org>
  - conntrack plugin.
 
index bc1fe9f..1ebf1f1 100644 (file)
@@ -101,3 +101,8 @@ solaris-smf
 -----------
   Manifest file for the Solaris SMF system and detailed information on how to
 register collectd as a service with this system.
+
+collectd.service
+----------------
+  Service file for systemd. Please ship this file as
+  /lib/systemd/system/collectd.service in any linux package of collectd.
diff --git a/contrib/collectd.service b/contrib/collectd.service
new file mode 100644 (file)
index 0000000..ee4d596
--- /dev/null
@@ -0,0 +1,15 @@
+[Unit]
+Description=statistics collection daemon
+Documentation=man:collectd(1)
+After=local-fs.target network.target
+Requires=local-fs.target network.target
+
+[Service]
+ExecStart=/usr/sbin/collectd -C /etc/collectd/collectd.conf -f
+Restart=always
+RestartSec=10
+StandardOutput=syslog
+StandardError=syslog
+
+[Install]
+WantedBy=multi-user.target
index 8fa0330..eef2a60 100644 (file)
@@ -123,6 +123,7 @@ if BUILD_PLUGIN_AMQP
 pkglib_LTLIBRARIES += amqp.la
 amqp_la_SOURCES = amqp.c \
                  utils_cmd_putval.c utils_cmd_putval.h \
+                 utils_format_graphite.c utils_format_graphite.h \
                  utils_format_json.c utils_format_json.h
 amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS)
 amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS)
@@ -1256,7 +1257,8 @@ endif
 if BUILD_PLUGIN_WRITE_GRAPHITE
 pkglib_LTLIBRARIES += write_graphite.la
 write_graphite_la_SOURCES = write_graphite.c \
-                       utils_format_json.c utils_format_json.h
+                        utils_format_graphite.c utils_format_graphite.h \
+                        utils_format_json.c utils_format_json.h
 write_graphite_la_LDFLAGS = -module -avoid-version
 collectd_LDADD += "-dlopen" write_graphite.la
 collectd_DEPENDENCIES += write_graphite.la
index 89284c8..876f7e9 100644 (file)
@@ -31,6 +31,7 @@
 #include "plugin.h"
 #include "utils_cmd_putval.h"
 #include "utils_format_json.h"
+#include "utils_format_graphite.h"
 
 #include <pthread.h>
 
@@ -42,8 +43,9 @@
 #define CAMQP_DM_VOLATILE   1
 #define CAMQP_DM_PERSISTENT 2
 
-#define CAMQP_FORMAT_COMMAND 1
-#define CAMQP_FORMAT_JSON    2
+#define CAMQP_FORMAT_COMMAND    1
+#define CAMQP_FORMAT_JSON       2
+#define CAMQP_FORMAT_GRAPHITE   3
 
 #define CAMQP_CHANNEL 1
 
@@ -68,6 +70,10 @@ struct camqp_config_s
     uint8_t delivery_mode;
     _Bool   store_rates;
     int     format;
+    /* publish & graphite format only */
+    char    *prefix;
+    char    *postfix;
+    char    escape_char;
 
     /* subscribe only */
     char   *exchange_type;
@@ -129,6 +135,9 @@ static void camqp_config_free (void *ptr) /* {{{ */
     sfree (conf->exchange_type);
     sfree (conf->queue);
     sfree (conf->routing_key);
+    sfree (conf->prefix);
+    sfree (conf->postfix);
+
 
     sfree (conf);
 } /* }}} void camqp_config_free */
@@ -699,6 +708,8 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
         props.content_type = amqp_cstring_bytes("text/collectd");
     else if (conf->format == CAMQP_FORMAT_JSON)
         props.content_type = amqp_cstring_bytes("application/json");
+    else if (conf->format == CAMQP_FORMAT_GRAPHITE)
+        props.content_type = amqp_cstring_bytes("text/graphite");
     else
         assert (23 == 42);
     props.delivery_mode = conf->delivery_mode;
@@ -777,6 +788,17 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
         format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
         format_json_finalize (buffer, &bfill, &bfree);
     }
+    else if (conf->format == CAMQP_FORMAT_GRAPHITE)
+    {
+        status = format_graphite (buffer, sizeof (buffer), ds, vl,
+                    conf->prefix, conf->postfix, conf->escape_char);
+        if (status != 0)
+        {
+            ERROR ("amqp plugin: format_graphite failed with status %i.",
+                    status);
+            return (status);
+        }
+    }
     else
     {
         ERROR ("amqp plugin: Invalid format (%i).", conf->format);
@@ -809,6 +831,8 @@ static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
         conf->format = CAMQP_FORMAT_COMMAND;
     else if (strcasecmp ("JSON", string) == 0)
         conf->format = CAMQP_FORMAT_JSON;
+    else if (strcasecmp ("Graphite", string) == 0)
+        conf->format = CAMQP_FORMAT_GRAPHITE;
     else
     {
         WARNING ("amqp plugin: Invalid format string: %s",
@@ -849,6 +873,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     /* publish only */
     conf->delivery_mode = CAMQP_DM_VOLATILE;
     conf->store_rates = 0;
+    /* publish & graphite only */
+    conf->prefix = NULL;
+    conf->postfix = NULL;
+    conf->escape_char = '_';
     /* subscribe only */
     conf->exchange_type = NULL;
     conf->queue = NULL;
@@ -906,6 +934,20 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             status = cf_util_get_boolean (child, &conf->store_rates);
         else if ((strcasecmp ("Format", child->key) == 0) && publish)
             status = camqp_config_set_format (child, conf);
+        else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish)
+            status = cf_util_get_string (child, &conf->prefix);
+        else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)
+            status = cf_util_get_string (child, &conf->postfix);
+        else if ((strcasecmp ("GraphiteEscapeChar", child->key) == 0) && publish)
+        {
+            char *tmp_buff = NULL;
+            status = cf_util_get_string (child, &tmp_buff);
+            if (strlen (tmp_buff) > 1)
+                WARNING ("amqp plugin: The option \"GraphiteEscapeChar\" handles "
+                        "only one character. Others will be ignored.");
+            conf->escape_char = tmp_buff[0];
+            sfree (tmp_buff);
+        }
         else
             WARNING ("amqp plugin: Ignoring unknown "
                     "configuration option \"%s\".", child->key);
index f3ef675..7f3e780 100644 (file)
 #</Plugin>
 
 #<Plugin memcached>
-#      Host "127.0.0.1"
-#      Port "11211"
+#      <Instance "local">
+#              Host "127.0.0.1"
+#              Port "11211"
+#      </Instance>
 #</Plugin>
 
 #<Plugin modbus>
 
 #<Plugin "swap">
 #      ReportByDevice false
+#      ReportBytes true
 #</Plugin>
 
 #<Plugin "table">
index c025f94..774d918 100644 (file)
@@ -220,6 +220,8 @@ possibly filtering or messages.
  #   Persistent false
  #   Format "command"
  #   StoreRates false
+ #   GraphitePrefix "collectd."
+ #   GraphiteEscapeChar "_"
    </Publish>
    
    # Receive values from an AMQP broker
@@ -320,6 +322,10 @@ If set to B<JSON>, the values are encoded in the I<JavaScript Object Notation>,
 an easy and straight forward exchange format. The C<Content-Type> header field
 will be set to C<application/json>.
 
+If set to B<Graphite>, values are encoded in the I<Graphite> format, which is
+"<metric> <value> <timestamp>\n". The C<Content-Type> header field will be set to
+C<text/graphite>.
+
 A subscribing client I<should> use the C<Content-Type> header field to
 determine how to decode the values. Currently, the I<AMQP plugin> itself can
 only decode the B<Command> format.
@@ -334,6 +340,25 @@ using the internal value cache.
 Please note that currently this option is only used if the B<Format> option has
 been set to B<JSON>.
 
+=item B<GraphitePrefix> (Publish and B<Format>=I<Graphite> only)
+
+A prefix can be added in the metric name when outputting in the I<Graphite> format.
+It's added before the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphitePostfix> (Publish and B<Format>=I<Graphite> only)
+
+A postfix can be added in the metric name when outputting in the I<Graphite> format.
+It's added after the I<Host> name.
+Metric name will be "<prefix><host><postfix><plugin><type><name>"
+
+=item B<GraphiteEscapeChar> (Publish and B<Format>=I<Graphite> only)
+
+Specify a character to replace dots (.) in the host part of the metric name.
+In I<Graphite> metric name, dots are used as separators between different
+metric parts (host, plugin, type).
+Default is "_" (I<Underscore>).
+
 =back
 
 =head2 Plugin C<apache>
@@ -1949,6 +1974,17 @@ The C<memcached plugin> connects to a memcached server and queries statistics
 about cache utilization, memory and bandwidth used.
 L<http://www.danga.com/memcached/>
 
+ <Plugin "memcached">
+   <Instance "name">
+     Host "memcache.example.com"
+     Port 11211
+   </Instance>
+ </Plugin>
+
+The plugin configuration consists of one or more B<Instance> blocks which
+specify one I<memcached> connection each. Within the B<Instance> blocks, the
+following options are allowed:
+
 =over 4
 
 =item B<Host> I<Hostname>
@@ -1959,6 +1995,11 @@ Hostname to connect to. Defaults to B<127.0.0.1>.
 
 TCP-Port to connect to. Defaults to B<11211>.
 
+=item B<Socket> I<Path>
+
+Connect to I<memcached> using the UNIX domain socket at I<Path>. If this
+setting is given, the B<Host> and B<Port> settings are ignored.
+
 =back
 
 =head2 Plugin C<modbus>
@@ -3278,6 +3319,11 @@ values submitted to the daemon. Other than that, that name is not used.
 Defines the "database alias" or "service name" to connect to. Usually, these
 names are defined in the file named C<$ORACLE_HOME/network/admin/tnsnames.ora>.
 
+=item B<Host> I<Host>
+
+Hostname to use when dispatching values for this database. Defaults to using
+the global hostname of the I<collectd> instance.
+
 =item B<Username> I<Username>
 
 Username used for authentication.
@@ -4066,6 +4112,10 @@ The B<Port> option is the TCP port on which the Redis instance accepts
 connections. Either a service name of a port number may be given. Please note
 that numerical port numbers must be given as a string, too.
 
+=item B<Password> I<Password>
+
+Use I<Password> to authenticate when connecting to I<Redis>.
+
 =item B<Timeout> I<Timeout in miliseconds>
 
 The B<Timeout> option set the socket timeout for node response. Since the Redis
@@ -4295,6 +4345,11 @@ and available space of each device will be reported separately.
 This option is only available if the I<Swap plugin> can read C</proc/swaps>
 (under Linux) or use the L<swapctl(2)> mechanism (under I<Solaris>).
 
+=item B<ReportBytes> B<false>|B<true>
+
+When enabled, the I<swap I/O> is reported in bytes. When disabled, the default,
+I<swap I/O> is reported in pages. This option is available under Linux only.
+
 =back
 
 =head2 Plugin C<syslog>
index 48fa713..a09f45e 100644 (file)
@@ -1,9 +1,10 @@
 /**
  * collectd - src/memcached.c, based on src/hddtemp.c
  * Copyright (C) 2007       Antony Dovgal
- * Copyright (C) 2007-2010  Florian Forster
+ * Copyright (C) 2007-2012  Florian Forster
  * Copyright (C) 2009       Doug MacEachern
  * Copyright (C) 2009       Franck Lombardi
+ * Copyright (C) 2012       Nicolas Szalay
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -24,6 +25,7 @@
  *   Florian octo Forster <octo at collectd.org>
  *   Doug MacEachern <dougm at hyperic.com>
  *   Franck Lombardi
+ *   Nicolas Szalay
  **/
 
 #include "collectd.h"
 #include "plugin.h"
 #include "configfile.h"
 
-# include <poll.h>
-# include <netdb.h>
-# include <sys/socket.h>
-# include <sys/un.h>
-# include <netinet/in.h>
-# include <netinet/tcp.h>
-
-/* Hack to work around the missing define in AIX */
-#ifndef MSG_DONTWAIT
-# define MSG_DONTWAIT MSG_NONBLOCK
-#endif
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
 
 #define MEMCACHED_DEF_HOST "127.0.0.1"
 #define MEMCACHED_DEF_PORT "11211"
 
-#define MEMCACHED_RETRY_COUNT 100
-
-static const char *config_keys[] =
+struct memcached_s
 {
-       "Socket",
-       "Host",
-       "Port"
+  char *name;
+  char *socket;
+  char *host;
+  char *port;
 };
-static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+typedef struct memcached_s memcached_t;
 
-static char *memcached_socket = NULL;
-static char *memcached_host = NULL;
-static char memcached_port[16];
+static _Bool memcached_have_instances = 0;
 
-static int memcached_query_daemon (char *buffer, int buffer_size) /* {{{ */
+static void memcached_free (memcached_t *st)
 {
-       int fd;
-       ssize_t status;
-       int buffer_fill;
-       int i = 0;
-
-       if (memcached_socket != NULL) {
-               struct sockaddr_un serv_addr;
-
-               memset (&serv_addr, 0, sizeof (serv_addr));
-               serv_addr.sun_family = AF_UNIX;
-               sstrncpy (serv_addr.sun_path, memcached_socket,
-                               sizeof (serv_addr.sun_path));
-
-               /* create our socket descriptor */
-               fd = socket (AF_UNIX, SOCK_STREAM, 0);
-               if (fd < 0) {
-                       char errbuf[1024];
-                       ERROR ("memcached: unix socket: %s", sstrerror (errno, errbuf,
-                                               sizeof (errbuf)));
-                       return -1;
-               }
-
-               /* connect to the memcached daemon */
-               status = (ssize_t) connect (fd, (struct sockaddr *) &serv_addr,
-                               sizeof (serv_addr));
-               if (status != 0) {
-                       shutdown (fd, SHUT_RDWR);
-                       close (fd);
-                       fd = -1;
-               }
-       }
-       else { /* if (memcached_socket == NULL) */
-               const char *host;
-               const char *port;
-
-               struct addrinfo  ai_hints;
-               struct addrinfo *ai_list, *ai_ptr;
-               int              ai_return = 0;
-
-               memset (&ai_hints, '\0', sizeof (ai_hints));
-               ai_hints.ai_flags    = 0;
+  if (st == NULL)
+    return;
+
+  sfree (st->name);
+  sfree (st->socket);
+  sfree (st->host);
+  sfree (st->port);
+}
+
+static int memcached_connect_unix (memcached_t *st)
+{
+  struct sockaddr_un serv_addr;
+  int fd;
+
+  memset (&serv_addr, 0, sizeof (serv_addr));
+  serv_addr.sun_family = AF_UNIX;
+  sstrncpy (serv_addr.sun_path, st->socket,
+      sizeof (serv_addr.sun_path));
+
+  /* create our socket descriptor */
+  fd = socket (AF_UNIX, SOCK_STREAM, 0);
+  if (fd < 0)
+  {
+    char errbuf[1024];
+    ERROR ("memcached plugin: memcached_connect_unix: socket(2) failed: %s",
+        sstrerror (errno, errbuf, sizeof (errbuf)));
+    return (-1);
+  }
+
+  return (fd);
+} /* int memcached_connect_unix */
+
+static int memcached_connect_inet (memcached_t *st)
+{
+  char *host;
+  char *port;
+
+  struct addrinfo  ai_hints;
+  struct addrinfo *ai_list, *ai_ptr;
+  int status;
+  int fd = -1;
+
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags    = 0;
 #ifdef AI_ADDRCONFIG
-               /*      ai_hints.ai_flags   |= AI_ADDRCONFIG; */
+  ai_hints.ai_flags   |= AI_ADDRCONFIG;
 #endif
-               ai_hints.ai_family   = AF_INET;
-               ai_hints.ai_socktype = SOCK_STREAM;
-               ai_hints.ai_protocol = 0;
-
-               host = memcached_host;
-               if (host == NULL) {
-                       host = MEMCACHED_DEF_HOST;
-               }
-
-               port = memcached_port;
-               if (strlen (port) == 0) {
-                       port = MEMCACHED_DEF_PORT;
-               }
-
-               if ((ai_return = getaddrinfo (host, port, &ai_hints, &ai_list)) != 0) {
-                       char errbuf[1024];
-                       ERROR ("memcached: getaddrinfo (%s, %s): %s",
-                                       host, port,
-                                       (ai_return == EAI_SYSTEM)
-                                       ? sstrerror (errno, errbuf, sizeof (errbuf))
-                                       : gai_strerror (ai_return));
-                       return -1;
-               }
-
-               fd = -1;
-               for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) {
-                       /* create our socket descriptor */
-                       fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
-                       if (fd < 0) {
-                               char errbuf[1024];
-                               ERROR ("memcached: socket: %s", sstrerror (errno, errbuf, sizeof (errbuf)));
-                               continue;
-                       }
-
-                       /* connect to the memcached daemon */
-                       status = (ssize_t) connect (fd, (struct sockaddr *) ai_ptr->ai_addr, ai_ptr->ai_addrlen);
-                       if (status != 0) {
-                               shutdown (fd, SHUT_RDWR);
-                               close (fd);
-                               fd = -1;
-                               continue;
-                       }
-
-                       /* A socket could be opened and connecting succeeded. We're
-                        * done. */
-                       break;
-               }
-
-               freeaddrinfo (ai_list);
-       }
-
-       if (fd < 0) {
-               ERROR ("memcached: Could not connect to daemon.");
-               return -1;
-       }
-
-       if (send(fd, "stats\r\n", sizeof("stats\r\n") - 1, MSG_DONTWAIT) != (sizeof("stats\r\n") - 1)) {
-               ERROR ("memcached: Could not send command to the memcached daemon.");
-               return -1;
-       }
-
-       {
-               struct pollfd p;
-               int status;
-
-               memset (&p, 0, sizeof (p));
-               p.fd = fd;
-               p.events = POLLIN | POLLERR | POLLHUP;
-               p.revents = 0;
-
-               status = poll (&p, /* nfds = */ 1,
-                               /* timeout = */ CDTIME_T_TO_MS (interval_g));
-               if (status <= 0)
-               {
-                       if (status == 0)
-                       {
-                               ERROR ("memcached: poll(2) timed out after %.3f seconds.",
-                                               CDTIME_T_TO_DOUBLE (interval_g));
-                       }
-                       else
-                       {
-                               char errbuf[1024];
-                               ERROR ("memcached: poll(2) failed: %s",
-                                               sstrerror (errno, errbuf, sizeof (errbuf)));
-                       }
-                       shutdown (fd, SHUT_RDWR);
-                       close (fd);
-                       return (-1);
-               }
-       }
-
-       /* receive data from the memcached daemon */
-       memset (buffer, '\0', buffer_size);
-
-       buffer_fill = 0;
-       while ((status = recv (fd, buffer + buffer_fill, buffer_size - buffer_fill, MSG_DONTWAIT)) != 0) {
-               if (i > MEMCACHED_RETRY_COUNT) {
-                       ERROR("recv() timed out");
-                       break;
-               }
-               i++;
-
-               if (status == -1) {
-                       char errbuf[1024];
-
-                       if (errno == EAGAIN) {
-                               continue;
-                       }
-
-                       ERROR ("memcached: Error reading from socket: %s",
-                                       sstrerror (errno, errbuf, sizeof (errbuf)));
-                       shutdown(fd, SHUT_RDWR);
-                       close (fd);
-                       return -1;
-               }
-               buffer_fill += status;
-
-               if (buffer_fill > 3 && buffer[buffer_fill-5] == 'E' && buffer[buffer_fill-4] == 'N' && buffer[buffer_fill-3] == 'D') {
-                       /* we got all the data */
-                       break;
-               }
-       }
-
-       if (buffer_fill >= buffer_size) {
-               buffer[buffer_size - 1] = '\0';
-               WARNING ("memcached: Message from memcached has been truncated.");
-       } else if (buffer_fill == 0) {
-               WARNING ("memcached: Peer has unexpectedly shut down the socket. "
-                               "Buffer: `%s'", buffer);
-               shutdown(fd, SHUT_RDWR);
-               close(fd);
-               return -1;
-       }
-
-       shutdown(fd, SHUT_RDWR);
-       close(fd);
-       return 0;
+  ai_hints.ai_family   = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_STREAM;
+  ai_hints.ai_protocol = 0;
+
+  host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST;
+  port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT;
+
+  ai_list = NULL;
+  status = getaddrinfo (host, port, &ai_hints, &ai_list);
+  if (status != 0)
+  {
+    char errbuf[1024];
+    ERROR ("memcached plugin: memcached_connect_inet: "
+        "getaddrinfo(%s,%s) failed: %s",
+        host, port,
+        (status == EAI_SYSTEM)
+        ? sstrerror (errno, errbuf, sizeof (errbuf))
+        : gai_strerror (status));
+    return (-1);
+  }
+
+  for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    /* create our socket descriptor */
+    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (fd < 0)
+    {
+      char errbuf[1024];
+      WARNING ("memcached plugin: memcached_connect_inet: "
+          "socket(2) failed: %s",
+          sstrerror (errno, errbuf, sizeof (errbuf)));
+      continue;
+    }
+
+    /* connect to the memcached daemon */
+    status = (int) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      shutdown (fd, SHUT_RDWR);
+      close (fd);
+      fd = -1;
+      continue;
+    }
+
+    /* A socket could be opened and connecting succeeded. We're done. */
+    break;
+  }
+
+  freeaddrinfo (ai_list);
+  return (fd);
+} /* int memcached_connect_inet */
+
+static int memcached_connect (memcached_t *st)
+{
+  if (st->socket != NULL)
+    return (memcached_connect_unix (st));
+  else
+    return (memcached_connect_inet (st));
 }
-/* }}} */
 
-static int memcached_config (const char *key, const char *value) /* {{{ */
+static int memcached_query_daemon (char *buffer, size_t buffer_size, memcached_t *st)
 {
-       if (strcasecmp (key, "Socket") == 0) {
-               if (memcached_socket != NULL) {
-                       free (memcached_socket);
-               }
-               memcached_socket = strdup (value);
-       } else if (strcasecmp (key, "Host") == 0) {
-               if (memcached_host != NULL) {
-                       free (memcached_host);
-               }
-               memcached_host = strdup (value);
-       } else if (strcasecmp (key, "Port") == 0) {
-               int port = (int) (atof (value));
-               if ((port > 0) && (port <= 65535)) {
-                       ssnprintf (memcached_port, sizeof (memcached_port), "%i", port);
-               } else {
-                       sstrncpy (memcached_port, value, sizeof (memcached_port));
-               }
-       } else {
-               return -1;
-       }
-
-       return 0;
+  int fd = -1;
+  int status;
+  size_t buffer_fill;
+
+  fd = memcached_connect (st);
+  if (fd < 0) {
+    ERROR ("memcached plugin: Instance \"%s\" could not connect to daemon.",
+        st->name);
+    return -1;
+  }
+
+  status = (int) swrite (fd, "stats\r\n", strlen ("stats\r\n"));
+  if (status != 0)
+  {
+    char errbuf[1024];
+    ERROR ("memcached plugin: write(2) failed: %s",
+        sstrerror (errno, errbuf, sizeof (errbuf)));
+    shutdown(fd, SHUT_RDWR);
+    close (fd);
+    return (-1);
+  }
+
+  /* receive data from the memcached daemon */
+  memset (buffer, 0, buffer_size);
+
+  buffer_fill = 0;
+  while ((status = (int) recv (fd, buffer + buffer_fill,
+          buffer_size - buffer_fill, /* flags = */ 0)) != 0)
+  {
+    char const end_token[5] = {'E', 'N', 'D', '\r', '\n'};
+    if (status < 0)
+    {
+      char errbuf[1024];
+
+      if ((errno == EAGAIN) || (errno == EINTR))
+          continue;
+
+      ERROR ("memcached: Error reading from socket: %s",
+          sstrerror (errno, errbuf, sizeof (errbuf)));
+      shutdown(fd, SHUT_RDWR);
+      close (fd);
+      return (-1);
+    }
+
+    buffer_fill += (size_t) status;
+    if (buffer_fill > buffer_size)
+    {
+      buffer_fill = buffer_size;
+      WARNING ("memcached plugin: Message was truncated.");
+      break;
+    }
+
+    /* If buffer ends in end_token, we have all the data. */
+    if (memcmp (buffer + buffer_fill - sizeof (end_token),
+          end_token, sizeof (end_token)) == 0)
+      break;
+  } /* while (recv) */
+
+  status = 0;
+  if (buffer_fill == 0)
+  {
+    WARNING ("memcached plugin: No data returned by memcached.");
+    status = -1;
+  }
+
+  shutdown(fd, SHUT_RDWR);
+  close(fd);
+  return (status);
+} /* int memcached_query_daemon */
+
+static void memcached_init_vl (value_list_t *vl, memcached_t const *st)
+{
+  sstrncpy (vl->plugin, "memcached", sizeof (vl->plugin));
+  if (strcmp (st->name, "__legacy__") == 0) /* legacy mode */
+  {
+    sstrncpy (vl->host, hostname_g, sizeof (vl->host));
+  }
+  else
+  {
+    if (st->socket != NULL)
+      sstrncpy (vl->host, hostname_g, sizeof (vl->host));
+    else
+      sstrncpy (vl->host,
+          (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST,
+          sizeof (vl->host));
+    sstrncpy (vl->plugin_instance, st->name, sizeof (vl->plugin_instance));
+  }
 }
-/* }}} */
 
 static void submit_derive (const char *type, const char *type_inst,
-               derive_t value) /* {{{ */
+    derive_t value, memcached_t *st)
 {
-       value_t values[1];
-       value_list_t vl = VALUE_LIST_INIT;
+  value_t values[1];
+  value_list_t vl = VALUE_LIST_INIT;
+  memcached_init_vl (&vl, st);
 
-       values[0].derive = value;
+  values[0].derive = value;
 
-       vl.values = values;
-       vl.values_len = 1;
-       sstrncpy (vl.host, hostname_g, sizeof (vl.host));
-       sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
-       sstrncpy (vl.type, type, sizeof (vl.type));
-       if (type_inst != NULL)
-               sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+  vl.values = values;
+  vl.values_len = 1;
+  sstrncpy (vl.type, type, sizeof (vl.type));
+  if (type_inst != NULL)
+    sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
 
-       plugin_dispatch_values (&vl);
-} /* void memcached_submit_cmd */
-/* }}} */
+  plugin_dispatch_values (&vl);
+}
 
 static void submit_derive2 (const char *type, const char *type_inst,
-               derive_t value0, derive_t value1) /* {{{ */
+    derive_t value0, derive_t value1, memcached_t *st)
 {
-       value_t values[2];
-       value_list_t vl = VALUE_LIST_INIT;
+  value_t values[2];
+  value_list_t vl = VALUE_LIST_INIT;
+  memcached_init_vl (&vl, st);
 
-       values[0].derive = value0;
-       values[1].derive = value1;
+  values[0].derive = value0;
+  values[1].derive = value1;
 
-       vl.values = values;
-       vl.values_len = 2;
-       sstrncpy (vl.host, hostname_g, sizeof (vl.host));
-       sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
-       sstrncpy (vl.type, type, sizeof (vl.type));
-       if (type_inst != NULL)
-               sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+  vl.values = values;
+  vl.values_len = 2;
+  sstrncpy (vl.type, type, sizeof (vl.type));
+  if (type_inst != NULL)
+    sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
 
-       plugin_dispatch_values (&vl);
-} /* void memcached_submit_cmd */
-/* }}} */
+  plugin_dispatch_values (&vl);
+}
 
 static void submit_gauge (const char *type, const char *type_inst,
-               gauge_t value) /* {{{ */
+    gauge_t value, memcached_t *st)
 {
-       value_t values[1];
-       value_list_t vl = VALUE_LIST_INIT;
+  value_t values[1];
+  value_list_t vl = VALUE_LIST_INIT;
+  memcached_init_vl (&vl, st);
 
-       values[0].gauge = value;
+  values[0].gauge = value;
 
-       vl.values = values;
-       vl.values_len = 1;
-       sstrncpy (vl.host, hostname_g, sizeof (vl.host));
-       sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
-       sstrncpy (vl.type, type, sizeof (vl.type));
-       if (type_inst != NULL)
-               sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+  vl.values = values;
+  vl.values_len = 1;
+  sstrncpy (vl.type, type, sizeof (vl.type));
+  if (type_inst != NULL)
+    sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
 
-       plugin_dispatch_values (&vl);
+  plugin_dispatch_values (&vl);
 }
-/* }}} */
 
 static void submit_gauge2 (const char *type, const char *type_inst,
-               gauge_t value0, gauge_t value1) /* {{{ */
+    gauge_t value0, gauge_t value1, memcached_t *st)
 {
-       value_t values[2];
-       value_list_t vl = VALUE_LIST_INIT;
+  value_t values[2];
+  value_list_t vl = VALUE_LIST_INIT;
+  memcached_init_vl (&vl, st);
 
-       values[0].gauge = value0;
-       values[1].gauge = value1;
+  values[0].gauge = value0;
+  values[1].gauge = value1;
 
-       vl.values = values;
-       vl.values_len = 2;
-       sstrncpy (vl.host, hostname_g, sizeof (vl.host));
-       sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin));
-       sstrncpy (vl.type, type, sizeof (vl.type));
-       if (type_inst != NULL)
-               sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
+  vl.values = values;
+  vl.values_len = 2;
+  sstrncpy (vl.type, type, sizeof (vl.type));
+  if (type_inst != NULL)
+    sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance));
 
-       plugin_dispatch_values (&vl);
+  plugin_dispatch_values (&vl);
 }
-/* }}} */
 
-static int memcached_read (void) /* {{{ */
+static int memcached_read (user_data_t *user_data)
 {
-       char buf[4096];
-       char *fields[3];
-       char *ptr;
-       char *line;
-       char *saveptr;
-       int fields_num;
-
-       gauge_t bytes_used = NAN;
-       gauge_t bytes_total = NAN;
-       gauge_t hits = NAN;
-       gauge_t gets = NAN;
-       derive_t rusage_user = 0;
-       derive_t rusage_syst = 0;
-       derive_t octets_rx = 0;
-       derive_t octets_tx = 0;
-
-       /* get data from daemon */
-       if (memcached_query_daemon (buf, sizeof (buf)) < 0) {
-               return -1;
-       }
+  char buf[4096];
+  char *fields[3];
+  char *ptr;
+  char *line;
+  char *saveptr;
+  int fields_num;
+
+  gauge_t bytes_used = NAN;
+  gauge_t bytes_total = NAN;
+  gauge_t hits = NAN;
+  gauge_t gets = NAN;
+  derive_t rusage_user = 0;
+  derive_t rusage_syst = 0;
+  derive_t octets_rx = 0;
+  derive_t octets_tx = 0;
+
+  memcached_t *st;
+  st = user_data->data;
+
+  /* get data from daemon */
+  if (memcached_query_daemon (buf, sizeof (buf), st) < 0) {
+    return -1;
+  }
 
 #define FIELD_IS(cnst) \
-       (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0))
-
-       ptr = buf;
-       saveptr = NULL;
-       while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL)
-       {
-               int name_len;
-
-               ptr = NULL;
-
-               fields_num = strsplit(line, fields, 3);
-               if (fields_num != 3)
-                       continue;
-
-               name_len = strlen(fields[1]);
-               if (name_len == 0)
-                       continue;
-
-               /*
-                * For an explanation on these fields please refer to
-                * <http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt>
-                */
-
-               /*
-                * CPU time consumed by the memcached process
-                */
-               if (FIELD_IS ("rusage_user"))
-               {
-                       rusage_user = atoll (fields[2]);
-               }
-               else if (FIELD_IS ("rusage_system"))
-               {
-                       rusage_syst = atoll(fields[2]);
-               }
-
-               /*
-                * Number of threads of this instance
-                */
-               else if (FIELD_IS ("threads"))
-               {
-                       submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2]));
-               }
-
-               /*
-                * Number of items stored
-                */
-               else if (FIELD_IS ("curr_items"))
-               {
-                       submit_gauge ("memcached_items", "current", atof (fields[2]));
-               }
-
-               /*
-                * Number of bytes used and available (total - used)
-                */
-               else if (FIELD_IS ("bytes"))
-               {
-                       bytes_used = atof (fields[2]);
-               }
-               else if (FIELD_IS ("limit_maxbytes"))
-               {
-                       bytes_total = atof(fields[2]);
-               }
-
-               /*
-                * Connections
-                */
-               else if (FIELD_IS ("curr_connections"))
-               {
-                       submit_gauge ("memcached_connections", "current", atof (fields[2]));
-               }
-
-               /*
-                * Commands
-                */
-               else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0))
-               {
-                       const char *name = fields[1] + 4;
-                       submit_derive ("memcached_command", name, atoll (fields[2]));
-                       if (strcmp (name, "get") == 0)
-                               gets = atof (fields[2]);
-               }
-
-               /*
-                * Operations on the cache, i. e. cache hits, cache misses and evictions of items
-                */
-               else if (FIELD_IS ("get_hits"))
-               {
-                       submit_derive ("memcached_ops", "hits", atoll (fields[2]));
-                       hits = atof (fields[2]);
-               }
-               else if (FIELD_IS ("get_misses"))
-               {
-                       submit_derive ("memcached_ops", "misses", atoll (fields[2]));
-               }
-               else if (FIELD_IS ("evictions"))
-               {
-                       submit_derive ("memcached_ops", "evictions", atoll (fields[2]));
-               }
-
-               /*
-                * Network traffic
-                */
-               else if (FIELD_IS ("bytes_read"))
-               {
-                       octets_rx = atoll (fields[2]);
-               }
-               else if (FIELD_IS ("bytes_written"))
-               {
-                       octets_tx = atoll (fields[2]);
-               }
-       } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
-
-       if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total))
-               submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used);
-
-       if ((rusage_user != 0) || (rusage_syst != 0))
-               submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst);
-
-       if ((octets_rx != 0) || (octets_tx != 0))
-               submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx);
-
-       if (!isnan (gets) && !isnan (hits))
-       {
-               gauge_t rate = NAN;
-
-               if (gets != 0.0)
-                       rate = 100.0 * hits / gets;
-
-               submit_gauge ("percent", "hitratio", rate);
-       }
-
-       return 0;
+  (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0))
+
+  ptr = buf;
+  saveptr = NULL;
+  while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL)
+  {
+    int name_len;
+
+    ptr = NULL;
+
+    fields_num = strsplit(line, fields, 3);
+    if (fields_num != 3)
+      continue;
+
+    name_len = strlen(fields[1]);
+    if (name_len == 0)
+      continue;
+
+    /*
+     * For an explanation on these fields please refer to
+     * <http://code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt>
+     */
+
+    /*
+     * CPU time consumed by the memcached process
+     */
+    if (FIELD_IS ("rusage_user"))
+    {
+      rusage_user = atoll (fields[2]);
+    }
+    else if (FIELD_IS ("rusage_system"))
+    {
+      rusage_syst = atoll(fields[2]);
+    }
+
+    /*
+     * Number of threads of this instance
+     */
+    else if (FIELD_IS ("threads"))
+    {
+      submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2]), st);
+    }
+
+    /*
+     * Number of items stored
+     */
+    else if (FIELD_IS ("curr_items"))
+    {
+      submit_gauge ("memcached_items", "current", atof (fields[2]), st);
+    }
+
+    /*
+     * Number of bytes used and available (total - used)
+     */
+    else if (FIELD_IS ("bytes"))
+    {
+      bytes_used = atof (fields[2]);
+    }
+    else if (FIELD_IS ("limit_maxbytes"))
+    {
+      bytes_total = atof(fields[2]);
+    }
+
+    /*
+     * Connections
+     */
+    else if (FIELD_IS ("curr_connections"))
+    {
+      submit_gauge ("memcached_connections", "current", atof (fields[2]), st);
+    }
+
+    /*
+     * Commands
+     */
+    else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0))
+    {
+      const char *name = fields[1] + 4;
+      submit_derive ("memcached_command", name, atoll (fields[2]), st);
+      if (strcmp (name, "get") == 0)
+        gets = atof (fields[2]);
+    }
+
+    /*
+     * Operations on the cache, i. e. cache hits, cache misses and evictions of items
+     */
+    else if (FIELD_IS ("get_hits"))
+    {
+      submit_derive ("memcached_ops", "hits", atoll (fields[2]), st);
+      hits = atof (fields[2]);
+    }
+    else if (FIELD_IS ("get_misses"))
+    {
+      submit_derive ("memcached_ops", "misses", atoll (fields[2]), st);
+    }
+    else if (FIELD_IS ("evictions"))
+    {
+      submit_derive ("memcached_ops", "evictions", atoll (fields[2]), st);
+    }
+
+    /*
+     * Network traffic
+     */
+    else if (FIELD_IS ("bytes_read"))
+    {
+      octets_rx = atoll (fields[2]);
+    }
+    else if (FIELD_IS ("bytes_written"))
+    {
+      octets_tx = atoll (fields[2]);
+    }
+  } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */
+
+  if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total))
+    submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used, st);
+
+  if ((rusage_user != 0) || (rusage_syst != 0))
+    submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst, st);
+
+  if ((octets_rx != 0) || (octets_tx != 0))
+    submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx, st);
+
+  if (!isnan (gets) && !isnan (hits))
+  {
+    gauge_t rate = NAN;
+
+    if (gets != 0.0)
+      rate = 100.0 * hits / gets;
+
+    submit_gauge ("percent", "hitratio", rate, st);
+  }
+
+  return 0;
+} /* int memcached_read */
+
+static int memcached_add_read_callback (memcached_t *st)
+{
+  user_data_t ud;
+  char callback_name[3*DATA_MAX_NAME_LEN];
+  int status;
+
+  memset (&ud, 0, sizeof (ud));
+  ud.data = st;
+  ud.free_func = (void *) memcached_free;
+
+  assert (st->name != NULL);
+  ssnprintf (callback_name, sizeof (callback_name), "memcached/%s", st->name);
+
+  status = plugin_register_complex_read (/* group = */ "memcached",
+      /* name      = */ callback_name,
+      /* callback  = */ memcached_read,
+      /* interval  = */ NULL,
+      /* user_data = */ &ud);
+  return (status);
+} /* int memcached_add_read_callback */
+
+/* Configuration handling functiions
+ * <Plugin memcached>
+ *   <Instance "instance_name">
+ *     Host foo.zomg.com
+ *     Port "1234"
+ *   </Instance>
+ * </Plugin>
+ */
+static int config_add_instance(oconfig_item_t *ci)
+{
+  memcached_t *st;
+  int i;
+  int status = 0;
+
+  /* Disable automatic generation of default instance in the init callback. */
+  memcached_have_instances = 1;
+
+  st = malloc (sizeof (*st));
+  if (st == NULL)
+  {
+    ERROR ("memcached plugin: malloc failed.");
+    return (-1);
+  }
+
+  memset (st, 0, sizeof (*st));
+  st->name = NULL;
+  st->socket = NULL;
+  st->host = NULL;
+  st->port = NULL;
+
+  if (strcasecmp (ci->key, "Plugin") == 0) /* default instance */
+    st->name = sstrdup ("__legacy__");
+  else /* <Instance /> block */
+    status = cf_util_get_string (ci, &st->name);
+  if (status != 0)
+  {
+    sfree (st);
+    return (status);
+  }
+  assert (st->name != NULL);
+
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Socket", child->key) == 0)
+      status = cf_util_get_string (child, &st->socket);
+    else if (strcasecmp ("Host", child->key) == 0)
+      status = cf_util_get_string (child, &st->host);
+    else if (strcasecmp ("Port", child->key) == 0)
+      status = cf_util_get_service (child, &st->port);
+    else
+    {
+      WARNING ("memcached plugin: Option `%s' not allowed here.",
+          child->key);
+      status = -1;
+    }
+
+    if (status != 0)
+      break;
+  }
+
+  if (status == 0)
+    status = memcached_add_read_callback (st);
+
+  if (status != 0)
+  {
+    memcached_free(st);
+    return (-1);
+  }
+
+  return (0);
 }
-/* }}} */
 
-void module_register (void) /* {{{ */
+static int memcached_config (oconfig_item_t *ci)
 {
-       plugin_register_config ("memcached", memcached_config, config_keys, config_keys_num);
-       plugin_register_read ("memcached", memcached_read);
+  int status = 0;
+  _Bool have_instance_block = 0;
+  int i;
+
+  for (i = 0; i < ci->children_num; i++)
+  {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp ("Instance", child->key) == 0)
+    {
+      config_add_instance (child);
+      have_instance_block = 1;
+    }
+    else if (!have_instance_block)
+    {
+      /* Non-instance option: Assume legacy configuration (without <Instance />
+       * blocks) and call config_add_instance() with the <Plugin /> block. */
+      return (config_add_instance (ci));
+    }
+    else
+      WARNING ("memcached plugin: The configuration option "
+          "\"%s\" is not allowed here. Did you "
+          "forget to add an <Instance /> block "
+          "around the configuration?",
+          child->key);
+  } /* for (ci->children) */
+
+  return (status);
 }
-/* }}} */
-
-/*
- * Local variables:
- * tab-width: 4
- * c-basic-offset: 4
- * End:
- * vim600: sw=4 ts=4 fdm=marker noexpandtab
- * vim<600: sw=4 ts=4 noexpandtab
- */
 
+static int memcached_init (void)
+{
+  memcached_t *st;
+  int status;
+
+  if (memcached_have_instances)
+    return (0);
+
+  /* No instances were configured, lets start a default instance. */
+  st = malloc (sizeof (*st));
+  if (st == NULL)
+    return (ENOMEM);
+  memset (st, 0, sizeof (*st));
+  st->name = sstrdup ("__legacy__");
+  st->socket = NULL;
+  st->host = NULL;
+  st->port = NULL;
+
+  status = memcached_add_read_callback (st);
+  if (status == 0)
+    memcached_have_instances = 1;
+  else
+    memcached_free (st);
+
+  return (status);
+} /* int memcached_init */
+
+void module_register (void)
+{
+  plugin_register_complex_config ("memcached", memcached_config);
+  plugin_register_init ("memcached", memcached_init);
+}
index 80ae699..ab0812b 100644 (file)
@@ -59,6 +59,7 @@
 struct o_database_s
 {
   char *name;
+  char *host;
   char *connect_id;
   char *username;
   char *password;
@@ -183,33 +184,6 @@ static void o_database_free (o_database_t *db) /* {{{ */
  * </Plugin>
  */
 
-static int o_config_set_string (char **ret_string, /* {{{ */
-    oconfig_item_t *ci)
-{
-  char *string;
-
-  if ((ci->values_num != 1)
-      || (ci->values[0].type != OCONFIG_TYPE_STRING))
-  {
-    WARNING ("oracle plugin: The `%s' config option "
-        "needs exactly one string argument.", ci->key);
-    return (-1);
-  }
-
-  string = strdup (ci->values[0].value.string);
-  if (string == NULL)
-  {
-    ERROR ("oracle plugin: strdup failed.");
-    return (-1);
-  }
-
-  if (*ret_string != NULL)
-    free (*ret_string);
-  *ret_string = string;
-
-  return (0);
-} /* }}} int o_config_set_string */
-
 static int o_config_add_database (oconfig_item_t *ci) /* {{{ */
 {
   o_database_t *db;
@@ -231,8 +205,13 @@ static int o_config_add_database (oconfig_item_t *ci) /* {{{ */
     return (-1);
   }
   memset (db, 0, sizeof (*db));
+  db->name = NULL;
+  db->host = NULL;
+  db->connect_id = NULL;
+  db->username = NULL;
+  db->password = NULL;
 
-  status = o_config_set_string (&db->name, ci);
+  status = cf_util_get_string (ci, &db->name);
   if (status != 0)
   {
     sfree (db);
@@ -245,11 +224,13 @@ static int o_config_add_database (oconfig_item_t *ci) /* {{{ */
     oconfig_item_t *child = ci->children + i;
 
     if (strcasecmp ("ConnectID", child->key) == 0)
-      status = o_config_set_string (&db->connect_id, child);
+      status = cf_util_get_string (child, &db->connect_id);
+    else if (strcasecmp ("Host", child->key) == 0)
+      status = cf_util_get_string (child, &db->host);
     else if (strcasecmp ("Username", child->key) == 0)
-      status = o_config_set_string (&db->username, child);
+      status = cf_util_get_string (child, &db->username);
     else if (strcasecmp ("Password", child->key) == 0)
-      status = o_config_set_string (&db->password, child);
+      status = cf_util_get_string (child, &db->password);
     else if (strcasecmp ("Query", child->key) == 0)
       status = udb_query_pick_from_list (child, queries, queries_num,
           &db->queries, &db->queries_num);
@@ -613,7 +594,8 @@ static int o_read_database_query (o_database_t *db, /* {{{ */
   } /* for (j = 1; j <= param_counter; j++) */
   /* }}} End of the ``define'' stuff. */
 
-  status = udb_query_prepare_result (q, prep_area, hostname_g,
+  status = udb_query_prepare_result (q, prep_area,
+      (db->host != NULL) ? db->host : hostname_g,
       /* plugin = */ "oracle", db->name, column_names, column_num,
       /* interval = */ 0);
   if (status != 0)
index 86062d9..439cf4b 100644 (file)
@@ -54,6 +54,7 @@ struct redis_node_s
 {
   char name[MAX_REDIS_NODE_NAME];
   char host[HOST_NAME_MAX];
+  char passwd[HOST_NAME_MAX];
   int port;
   int timeout;
 
@@ -136,6 +137,8 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */
     }
     else if (strcasecmp ("Timeout", option->key) == 0)
       status = cf_util_get_int (option, &rn.timeout);
+    else if (strcasecmp ("Password", option->key) == 0)
+      status = cf_util_get_string_buffer (option, rn.passwd, sizeof (rn.passwd));
     else
       WARNING ("redis plugin: Option `%s' not allowed inside a `Node' "
           "block. I'll ignore this option.", option->key);
@@ -255,6 +258,18 @@ static int redis_read (void) /* {{{ */
       continue;
     }
 
+    if (strlen (rn->passwd) > 0)
+    {
+      DEBUG ("redis plugin: authenticanting node `%s' passwd(%s).", rn->name, rn->passwd);
+      status = credis_auth(rh, rn->passwd);
+      if (status != 0)
+      {
+        WARNING ("redis plugin: unable to authenticate on node `%s'.", rn->name);
+        credis_close (rh);
+        continue;
+      }
+    }
+
     memset (&info, 0, sizeof (info));
     status = credis_info (rh, &info);
     if (status != 0)
index 397969e..c7b634b 100644 (file)
@@ -1,6 +1,6 @@
 /**
  * collectd - src/swap.c
- * Copyright (C) 2005-2010  Florian octo Forster
+ * Copyright (C) 2005-2012  Florian octo Forster
  * Copyright (C) 2009       Stefan Völkel
  * Copyright (C) 2009       Manuel Sanmartin
  * Copyright (C) 2010       Aurélien Reynaud
 #define MAX(x,y) ((x) > (y) ? (x) : (y))
 
 #if KERNEL_LINUX
-# define SWAP_HAVE_CONFIG 1
-/* No global variables */
+# define SWAP_HAVE_REPORT_BY_DEVICE 1
+static derive_t pagesize;
+static _Bool report_bytes = 0;
+static _Bool report_by_device = 0;
 /* #endif KERNEL_LINUX */
 
 #elif HAVE_SWAPCTL && HAVE_SWAPCTL_TWO_ARGS
-# define SWAP_HAVE_CONFIG 1
+# define SWAP_HAVE_REPORT_BY_DEVICE 1
 static derive_t pagesize;
+static _Bool report_by_device = 0;
 /* #endif HAVE_SWAPCTL && HAVE_SWAPCTL_TWO_ARGS */
 
 #elif defined(VM_SWAPUSAGE)
@@ -101,23 +104,37 @@ static perfstat_memory_total_t pmemory;
 # error "No applicable input method."
 #endif /* HAVE_LIBSTATGRAB */
 
-#if SWAP_HAVE_CONFIG
 static const char *config_keys[] =
 {
+       "ReportBytes",
        "ReportByDevice"
 };
 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 
-static _Bool report_by_device = 0;
-
 static int swap_config (const char *key, const char *value) /* {{{ */
 {
-       if (strcasecmp ("ReportByDevice", key) == 0)
+       if (strcasecmp ("ReportBytes", key) == 0)
        {
+#if KERNEL_LINUX
+               report_bytes = IS_TRUE (value) ? 1 : 0;
+#else
+               WARNING ("swap plugin: The \"ReportBytes\" option is only "
+                               "valid under Linux. "
+                               "The option is going to be ignored.");
+#endif
+       }
+       else if (strcasecmp ("ReportByDevice", key) == 0)
+       {
+#if SWAP_HAVE_REPORT_BY_DEVICE
                if (IS_TRUE (value))
                        report_by_device = 1;
                else
                        report_by_device = 0;
+#else
+               WARNING ("swap plugin: The \"ReportByDevice\" option is not "
+                               "supported on this platform. "
+                               "The option is going to be ignored.");
+#endif /* SWAP_HAVE_REPORT_BY_DEVICE */
        }
        else
        {
@@ -126,12 +143,11 @@ static int swap_config (const char *key, const char *value) /* {{{ */
 
        return (0);
 } /* }}} int swap_config */
-#endif /* SWAP_HAVE_CONFIG */
 
 static int swap_init (void) /* {{{ */
 {
 #if KERNEL_LINUX
-       /* No init stuff */
+       pagesize = (derive_t) sysconf (_SC_PAGESIZE);
 /* #endif KERNEL_LINUX */
 
 #elif HAVE_SWAPCTL && HAVE_SWAPCTL_TWO_ARGS
@@ -406,6 +422,12 @@ static int swap_read_io (void) /* {{{ */
        if (have_data != 0x03)
                return (ENOENT);
 
+       if (report_bytes)
+       {
+               swap_in = swap_in * pagesize;
+               swap_out = swap_out * pagesize;
+       }
+
        swap_submit_derive (NULL, "in",  swap_in);
        swap_submit_derive (NULL, "out", swap_out);
 
@@ -773,9 +795,8 @@ static int swap_read (void) /* {{{ */
 
 void module_register (void)
 {
-#if SWAP_HAVE_CONFIG
-       plugin_register_config ("swap", swap_config, config_keys, config_keys_num);
-#endif
+       plugin_register_config ("swap", swap_config,
+                       config_keys, config_keys_num);
        plugin_register_init ("swap", swap_init);
        plugin_register_read ("swap", swap_read);
 } /* void module_register */
index 3c8fc72..4d90c41 100644 (file)
 #endif
 
 #if KERNEL_LINUX
+# include <asm/types.h>
+/* sys/socket.h is necessary to compile when using netlink on older systems. */
+# include <sys/socket.h>
+# include <linux/netlink.h>
+# include <linux/inet_diag.h>
+# include <sys/socket.h>
+# include <arpa/inet.h>
 /* #endif KERNEL_LINUX */
 
 #elif HAVE_SYSCTLBYNAME
 #endif /* KERNEL_AIX */
 
 #if KERNEL_LINUX
+struct nlreq {
+  struct nlmsghdr nlh;
+  struct inet_diag_req r;
+};
+
 static const char *tcp_state[] =
 {
   "", /* 0 */
@@ -263,6 +275,17 @@ static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 static int port_collect_listening = 0;
 static port_entry_t *port_list_head = NULL;
 
+static uint32_t sequence_number = 0;
+
+#if KERNEL_LINUX
+enum
+{
+  SRC_DUNNO,
+  SRC_NETLINK,
+  SRC_PROC
+} linux_source = SRC_DUNNO;
+#endif
+
 static void conn_submit_port_entry (port_entry_t *pe)
 {
   value_t values[1];
@@ -419,6 +442,140 @@ static int conn_handle_ports (uint16_t port_local, uint16_t port_remote, uint8_t
 } /* int conn_handle_ports */
 
 #if KERNEL_LINUX
+/* Returns zero on success, less than zero on socket error and greater than
+ * zero on other errors. */
+static int conn_read_netlink (void)
+{
+  int fd;
+  struct sockaddr_nl nladdr;
+  struct nlreq req;
+  struct msghdr msg;
+  struct iovec iov;
+  struct inet_diag_msg *r;
+  char buf[8192];
+
+  /* If this fails, it's likely a permission problem. We'll fall back to
+   * reading this information from files below. */
+  fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_INET_DIAG);
+  if (fd < 0)
+  {
+    ERROR ("tcpconns plugin: conn_read_netlink: socket(AF_NETLINK, SOCK_RAW, "
+       "NETLINK_INET_DIAG) failed: %s",
+       sstrerror (errno, buf, sizeof (buf)));
+    return (-1);
+  }
+
+  memset(&nladdr, 0, sizeof(nladdr));
+  nladdr.nl_family = AF_NETLINK;
+
+  memset(&req, 0, sizeof(req));
+  req.nlh.nlmsg_len = sizeof(req);
+  req.nlh.nlmsg_type = TCPDIAG_GETSOCK;
+  /* NLM_F_ROOT: return the complete table instead of a single entry.
+   * NLM_F_MATCH: return all entries matching criteria (not implemented)
+   * NLM_F_REQUEST: must be set on all request messages */
+  req.nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST;
+  req.nlh.nlmsg_pid = 0;
+  /* The sequence_number is used to track our messages. Since netlink is not
+   * reliable, we don't want to end up with a corrupt or incomplete old
+   * message in case the system is/was out of memory. */
+  req.nlh.nlmsg_seq = ++sequence_number;
+  req.r.idiag_family = AF_INET;
+  req.r.idiag_states = 0xfff;
+  req.r.idiag_ext = 0;
+
+  memset(&iov, 0, sizeof(iov));
+  iov.iov_base = &req;
+  iov.iov_len = sizeof(req);
+
+  memset(&msg, 0, sizeof(msg));
+  msg.msg_name = (void*)&nladdr;
+  msg.msg_namelen = sizeof(nladdr);
+  msg.msg_iov = &iov;
+  msg.msg_iovlen = 1;
+
+  if (sendmsg (fd, &msg, 0) < 0)
+  {
+    ERROR ("tcpconns plugin: conn_read_netlink: sendmsg(2) failed: %s",
+       sstrerror (errno, buf, sizeof (buf)));
+    close (fd);
+    return (-1);
+  }
+
+  iov.iov_base = buf;
+  iov.iov_len = sizeof(buf);
+
+  while (1)
+  {
+    int status;
+    struct nlmsghdr *h;
+
+    memset(&msg, 0, sizeof(msg));
+    msg.msg_name = (void*)&nladdr;
+    msg.msg_namelen = sizeof(nladdr);
+    msg.msg_iov = &iov;
+    msg.msg_iovlen = 1;
+
+    status = recvmsg(fd, (void *) &msg, /* flags = */ 0);
+    if (status < 0)
+    {
+      if ((errno == EINTR) || (errno == EAGAIN))
+        continue;
+
+      ERROR ("tcpconns plugin: conn_read_netlink: recvmsg(2) failed: %s",
+         sstrerror (errno, buf, sizeof (buf)));
+      close (fd);
+      return (-1);
+    }
+    else if (status == 0)
+    {
+      close (fd);
+      DEBUG ("tcpconns plugin: conn_read_netlink: Unexpected zero-sized "
+         "reply from netlink socket.");
+      return (0);
+    }
+
+    h = (struct nlmsghdr*)buf;
+    while (NLMSG_OK(h, status))
+    {
+      if (h->nlmsg_seq != sequence_number)
+      {
+       h = NLMSG_NEXT(h, status);
+       continue;
+      }
+
+      if (h->nlmsg_type == NLMSG_DONE)
+      {
+       close (fd);
+       return (0);
+      }
+      else if (h->nlmsg_type == NLMSG_ERROR)
+      {
+       struct nlmsgerr *msg_error;
+
+       msg_error = NLMSG_DATA(h);
+       WARNING ("tcpconns plugin: conn_read_netlink: Received error %i.",
+           msg_error->error);
+
+       close (fd);
+       return (1);
+      }
+
+      r = NLMSG_DATA(h);
+
+      /* This code does not (need to) distinguish between IPv4 and IPv6. */
+      conn_handle_ports (ntohs(r->id.idiag_sport),
+         ntohs(r->id.idiag_dport),
+         r->idiag_state);
+
+      h = NLMSG_NEXT(h, status);
+    } /* while (NLMSG_OK) */
+  } /* while (1) */
+
+  /* Not reached because the while() loop above handles the exit condition. */
+  return (0);
+} /* int conn_read_netlink */
+
 static int conn_handle_line (char *buffer)
 {
   char *fields[32];
@@ -553,26 +710,55 @@ static int conn_init (void)
 
 static int conn_read (void)
 {
-  int errors_num = 0;
+  int status;
 
   conn_reset_port_entry ();
 
-  if (conn_read_file ("/proc/net/tcp") != 0)
-    errors_num++;
-  if (conn_read_file ("/proc/net/tcp6") != 0)
-    errors_num++;
-
-  if (errors_num < 2)
+  if (linux_source == SRC_NETLINK)
   {
-    conn_submit_all ();
+    status = conn_read_netlink ();
   }
-  else
+  else if (linux_source == SRC_PROC)
   {
-    ERROR ("tcpconns plugin: Neither /proc/net/tcp nor /proc/net/tcp6 "
-       "coult be read.");
-    return (-1);
+    int errors_num = 0;
+
+    if (conn_read_file ("/proc/net/tcp") != 0)
+      errors_num++;
+    if (conn_read_file ("/proc/net/tcp6") != 0)
+      errors_num++;
+
+    if (errors_num < 2)
+      status = 0;
+    else
+      status = ENOENT;
+  }
+  else /* if (linux_source == SRC_DUNNO) */
+  {
+    /* Try to use netlink for getting this data, it is _much_ faster on systems
+     * with a large amount of connections. */
+    status = conn_read_netlink ();
+    if (status == 0)
+    {
+      INFO ("tcpconns plugin: Reading from netlink succeeded. "
+         "Will use the netlink method from now on.");
+      linux_source = SRC_NETLINK;
+    }
+    else
+    {
+      INFO ("tcpconns plugin: Reading from netlink failed. "
+         "Will read from /proc from now on.");
+      linux_source = SRC_PROC;
+
+      /* return success here to avoid the "plugin failed" message. */
+      return (0);
+    }
   }
 
+  if (status == 0)
+    conn_submit_all ();
+  else
+    return (status);
+
   return (0);
 } /* int conn_read */
 /* #endif KERNEL_LINUX */
diff --git a/src/utils_format_graphite.c b/src/utils_format_graphite.c
new file mode 100644 (file)
index 0000000..49a59c5
--- /dev/null
@@ -0,0 +1,227 @@
+/**
+ * collectd - src/utils_format_graphite.c
+ * Copyright (C) 2012  Thomas Meson
+ * Copyright (C) 2012  Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Thomas Meson <zllak at hycik.org>
+ *   Florian octo Forster <octo at collectd.org>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+
+#include "utils_cache.h"
+#include "utils_format_json.h"
+#include "utils_parse_option.h"
+
+/* Utils functions to format data sets in graphite format.
+ * Largely taken from write_graphite.c as it remains the same formatting */
+
+static int gr_format_values (char *ret, size_t ret_len,
+        int ds_num, const data_set_t *ds, const value_list_t *vl)
+{
+    size_t offset = 0;
+    int status;
+
+    assert (0 == strcmp (ds->type, vl->type));
+
+    memset (ret, 0, ret_len);
+
+#define BUFFER_ADD(...) do { \
+    status = ssnprintf (ret + offset, ret_len - offset, \
+            __VA_ARGS__); \
+    if (status < 1) \
+    { \
+        return (-1); \
+    } \
+    else if (((size_t) status) >= (ret_len - offset)) \
+    { \
+        return (-1); \
+    } \
+    else \
+    offset += ((size_t) status); \
+} while (0)
+
+    if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
+        BUFFER_ADD ("%f", vl->values[ds_num].gauge);
+    else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
+        BUFFER_ADD ("%llu", vl->values[ds_num].counter);
+    else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
+        BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive);
+    else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
+        BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute);
+    else
+    {
+        ERROR ("gr_format_values plugin: Unknown data source type: %i",
+                ds->ds[ds_num].type);
+        return (-1);
+    }
+
+#undef BUFFER_ADD
+
+    return (0);
+}
+
+static void gr_copy_escape_part (char *dst, const char *src, size_t dst_len,
+    char escape_char)
+{
+    size_t i;
+
+    memset (dst, 0, dst_len);
+
+    if (src == NULL)
+        return;
+
+    for (i = 0; i < dst_len; i++)
+    {
+        if (src[i] == 0)
+        {
+            dst[i] = 0;
+            break;
+        }
+
+        if ((src[i] == '.')
+                || isspace ((int) src[i])
+                || iscntrl ((int) src[i]))
+            dst[i] = escape_char;
+        else
+            dst[i] = src[i];
+    }
+}
+
+static int gr_format_name (char *ret, int ret_len,
+        const value_list_t *vl,
+        const char *ds_name,
+        char *prefix,
+        char *postfix,
+        char escape_char)
+{
+    char n_host[DATA_MAX_NAME_LEN];
+    char n_plugin[DATA_MAX_NAME_LEN];
+    char n_plugin_instance[DATA_MAX_NAME_LEN];
+    char n_type[DATA_MAX_NAME_LEN];
+    char n_type_instance[DATA_MAX_NAME_LEN];
+
+    char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1];
+    char tmp_type[2 * DATA_MAX_NAME_LEN + 1];
+
+    if (prefix == NULL)
+        prefix = "";
+
+    if (postfix == NULL)
+        postfix = "";
+
+    gr_copy_escape_part (n_host, vl->host,
+            sizeof (n_host), escape_char);
+    gr_copy_escape_part (n_plugin, vl->plugin,
+            sizeof (n_plugin), escape_char);
+    gr_copy_escape_part (n_plugin_instance, vl->plugin_instance,
+            sizeof (n_plugin_instance), escape_char);
+    gr_copy_escape_part (n_type, vl->type,
+            sizeof (n_type), escape_char);
+    gr_copy_escape_part (n_type_instance, vl->type_instance,
+            sizeof (n_type_instance), escape_char);
+
+    if (n_plugin_instance[0] != '\0')
+        ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s",
+            n_plugin,
+            '-',
+            n_plugin_instance);
+    else
+        sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin));
+
+    if (n_type_instance[0] != '\0')
+        ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s",
+            n_type,
+            '-',
+            n_type_instance);
+    else
+        sstrncpy (tmp_type, n_type, sizeof (tmp_type));
+
+    if (ds_name != NULL)
+        ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s",
+            prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name);
+    else
+        ssnprintf (ret, ret_len, "%s%s%s.%s.%s",
+            prefix, n_host, postfix, tmp_plugin, tmp_type);
+
+    return (0);
+}
+
+int format_graphite (char *buffer, size_t buffer_size,
+    const data_set_t *ds, const value_list_t *vl, char *prefix,
+    char *postfix, char escape_char)
+{
+    int status = 0;
+    int i;
+    int buffer_pos = 0;
+
+    for (i = 0; i < ds->ds_num; i++)
+    {
+        const char *ds_name = NULL;
+        char        key[10*DATA_MAX_NAME_LEN];
+        char        values[512];
+        size_t      message_len;
+        char        message[1024];
+
+        ds_name = ds->ds[i].name;
+
+        /* Copy the identifier to `key' and escape it. */
+        status = gr_format_name (key, sizeof (key), vl, ds_name,
+                    prefix, postfix, escape_char);
+        if (status != 0)
+        {
+            ERROR ("amqp plugin: error with gr_format_name");
+            return (status);
+        }
+
+        escape_string (key, sizeof (key));
+        /* Convert the values to an ASCII representation and put that into
+         * `values'. */
+        status = gr_format_values (values, sizeof (values), i, ds, vl);
+        if (status != 0)
+        {
+            ERROR ("format_graphite: error with gr_format_values");
+            return (status);
+        }
+
+        /* Compute the graphite command */
+        message_len = (size_t) ssnprintf (message, sizeof (message),
+                "%s %s %u\r\n",
+                key,
+                values,
+                (unsigned int) CDTIME_T_TO_TIME_T (vl->time));
+        if (message_len >= sizeof (message)) {
+            ERROR ("format_graphite: message buffer too small: "
+                    "Need %zu bytes.", message_len + 1);
+            return (-ENOMEM);
+        }
+
+        /* Append it in case we got multiple data set */
+        if ((buffer_pos + message_len) >= buffer_size)
+        {
+            ERROR ("format_graphite: target buffer too small");
+            return (-ENOMEM);
+        }
+        memcpy((void *) (buffer + buffer_pos), message, message_len);
+        buffer_pos += message_len;
+    }
+    return (status);
+} /* int format_graphite */
+
+/* vim: set sw=2 sts=2 et fdm=marker : */
diff --git a/src/utils_format_graphite.h b/src/utils_format_graphite.h
new file mode 100644 (file)
index 0000000..a3c4d85
--- /dev/null
@@ -0,0 +1,33 @@
+/**
+ * collectd - src/utils_format_graphite.h
+ * Copyright (C) 2012  Thomas Meson
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Author:
+ *   Thomas Meson <zllak at hycik.org>
+ **/
+
+#ifndef UTILS_FORMAT_GRAPHITE_H
+#define UTILS_FORMAT_GRAPHITE_H 1
+
+#include "collectd.h"
+#include "plugin.h"
+
+int format_graphite (char *buffer,
+    size_t buffer_size, const data_set_t *ds,
+    const value_list_t *vl, const char *prefix,
+    const char *postfix, const char escape_char);
+
+#endif /* UTILS_FORMAT_GRAPHITE_H */
index 2a5526b..bbc3dfd 100644 (file)
@@ -70,7 +70,7 @@ static int escape_string (char *buffer, size_t buffer_size, /* {{{ */
 #undef BUFFER_ADD
 
   return (0);
-} /* }}} int buffer_add_string */
+} /* }}} int escape_string */
 
 static int values_to_json (char *buffer, size_t buffer_size, /* {{{ */
                 const data_set_t *ds, const value_list_t *vl, int store_rates)
@@ -152,7 +152,7 @@ static int values_to_json (char *buffer, size_t buffer_size, /* {{{ */
 } /* }}} int values_to_json */
 
 static int dstypes_to_json (char *buffer, size_t buffer_size, /* {{{ */
-                const data_set_t *ds, const value_list_t *vl)
+                const data_set_t *ds)
 {
   size_t offset = 0;
   int i;
@@ -189,7 +189,7 @@ static int dstypes_to_json (char *buffer, size_t buffer_size, /* {{{ */
 } /* }}} int dstypes_to_json */
 
 static int dsnames_to_json (char *buffer, size_t buffer_size, /* {{{ */
-                const data_set_t *ds, const value_list_t *vl)
+                const data_set_t *ds)
 {
   size_t offset = 0;
   int i;
@@ -225,6 +225,86 @@ static int dsnames_to_json (char *buffer, size_t buffer_size, /* {{{ */
   return (0);
 } /* }}} int dsnames_to_json */
 
+static int meta_data_to_json (char *buffer, size_t buffer_size, /* {{{ */
+    meta_data_t *meta)
+{
+  size_t offset = 0;
+  char **keys = NULL;
+  int keys_num;
+  int status;
+  int i;
+
+  memset (buffer, 0, buffer_size);
+
+#define BUFFER_ADD(...) do { \
+  status = ssnprintf (buffer + offset, buffer_size - offset, \
+      __VA_ARGS__); \
+  if (status < 1) \
+    return (-1); \
+  else if (((size_t) status) >= (buffer_size - offset)) \
+    return (-ENOMEM); \
+  else \
+    offset += ((size_t) status); \
+} while (0)
+
+  keys_num = meta_data_toc (meta, &keys);
+  for (i = 0; i < keys_num; ++i)
+  {
+    int type;
+    char *key = keys[i];
+
+    type = meta_data_type (meta, key);
+    if (type == MD_TYPE_STRING)
+    {
+      char *value = NULL;
+      if (meta_data_get_string (meta, key, &value) == 0)
+      {
+        char temp[512] = "";
+        escape_string (temp, sizeof (temp), value);
+        sfree (value);
+        BUFFER_ADD (",\"%s\":%s", key, temp);
+      }
+    }
+    else if (type == MD_TYPE_SIGNED_INT)
+    {
+      int64_t value = 0;
+      if (meta_data_get_signed_int (meta, key, &value) == 0)
+        BUFFER_ADD (",\"%s\":%"PRIi64, key, value);
+    }
+    else if (type == MD_TYPE_UNSIGNED_INT)
+    {
+      uint64_t value = 0;
+      if (meta_data_get_unsigned_int (meta, key, &value) == 0)
+        BUFFER_ADD (",\"%s\":%"PRIu64, key, value);
+    }
+    else if (type == MD_TYPE_DOUBLE)
+    {
+      double value = 0.0;
+      if (meta_data_get_double (meta, key, &value) == 0)
+        BUFFER_ADD (",\"%s\":%f", key, value);
+    }
+    else if (type == MD_TYPE_BOOLEAN)
+    {
+      _Bool value = 0;
+      if (meta_data_get_boolean (meta, key, &value) == 0)
+        BUFFER_ADD (",\"%s\":%s", key, value ? "true" : "false");
+    }
+
+    free (key);
+  } /* for (keys) */
+  free (keys);
+
+  if (offset <= 0)
+    return (ENOENT);
+
+  buffer[0] = '{'; /* replace leading ',' */
+  BUFFER_ADD ("}");
+
+#undef BUFFER_ADD
+
+  return (0);
+} /* int meta_data_to_json */
+
 static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */
                 const data_set_t *ds, const value_list_t *vl, int store_rates)
 {
@@ -254,12 +334,12 @@ static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */
     return (status);
   BUFFER_ADD ("\"values\":%s", temp);
 
-  status = dstypes_to_json (temp, sizeof (temp), ds, vl);
+  status = dstypes_to_json (temp, sizeof (temp), ds);
   if (status != 0)
     return (status);
   BUFFER_ADD (",\"dstypes\":%s", temp);
 
-  status = dsnames_to_json (temp, sizeof (temp), ds, vl);
+  status = dsnames_to_json (temp, sizeof (temp), ds);
   if (status != 0)
     return (status);
   BUFFER_ADD (",\"dsnames\":%s", temp);
@@ -280,6 +360,17 @@ static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */
   BUFFER_ADD_KEYVAL ("type", vl->type);
   BUFFER_ADD_KEYVAL ("type_instance", vl->type_instance);
 
+  if (vl->meta != NULL)
+  {
+    char meta_buffer[buffer_size];
+    memset (meta_buffer, 0, sizeof (meta_buffer));
+    status = meta_data_to_json (meta_buffer, sizeof (meta_buffer), vl->meta);
+    if (status != 0)
+      return (status);
+
+    BUFFER_ADD (",\"meta\":%s", meta_buffer);
+  } /* if (vl->meta != NULL) */
+
   BUFFER_ADD ("}");
 
 #undef BUFFER_ADD_KEYVAL
index d6583a7..f3753da 100644 (file)
@@ -47,6 +47,7 @@
 
 #include "utils_cache.h"
 #include "utils_parse_option.h"
+#include "utils_format_graphite.h"
 
 /* Folks without pthread will need to disable this plugin. */
 #include <pthread.h>
@@ -284,175 +285,12 @@ static int wg_flush (cdtime_t timeout,
     return (status);
 }
 
-static int wg_format_values (char *ret, size_t ret_len,
-        int ds_num, const data_set_t *ds, const value_list_t *vl,
-        _Bool store_rates)
-{
-    size_t offset = 0;
-    int status;
-    gauge_t *rates = NULL;
-
-    assert (0 == strcmp (ds->type, vl->type));
-
-    memset (ret, 0, ret_len);
-
-#define BUFFER_ADD(...) do { \
-    status = ssnprintf (ret + offset, ret_len - offset, \
-            __VA_ARGS__); \
-    if (status < 1) \
-    { \
-        sfree (rates); \
-        return (-1); \
-    } \
-    else if (((size_t) status) >= (ret_len - offset)) \
-    { \
-        sfree (rates); \
-        return (-1); \
-    } \
-    else \
-    offset += ((size_t) status); \
-} while (0)
-
-    if (ds->ds[ds_num].type == DS_TYPE_GAUGE)
-        BUFFER_ADD ("%f", vl->values[ds_num].gauge);
-    else if (store_rates)
-    {
-        if (rates == NULL)
-            rates = uc_get_rate (ds, vl);
-        if (rates == NULL)
-        {
-            WARNING ("format_values: "
-                    "uc_get_rate failed.");
-            return (-1);
-        }
-        BUFFER_ADD ("%g", rates[ds_num]);
-    }
-    else if (ds->ds[ds_num].type == DS_TYPE_COUNTER)
-        BUFFER_ADD ("%llu", vl->values[ds_num].counter);
-    else if (ds->ds[ds_num].type == DS_TYPE_DERIVE)
-        BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive);
-    else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE)
-        BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute);
-    else
-    {
-        ERROR ("format_values plugin: Unknown data source type: %i",
-                ds->ds[ds_num].type);
-        sfree (rates);
-        return (-1);
-    }
-
-#undef BUFFER_ADD
-
-    sfree (rates);
-    return (0);
-}
-
-static void wg_copy_escape_part (char *dst, const char *src, size_t dst_len,
-    char escape_char)
-{
-    size_t i;
-
-    memset (dst, 0, dst_len);
-
-    if (src == NULL)
-        return;
-
-    for (i = 0; i < dst_len; i++)
-    {
-        if (src[i] == 0)
-        {
-            dst[i] = 0;
-            break;
-        }
-
-        if ((src[i] == '.')
-                || isspace ((int) src[i])
-                || iscntrl ((int) src[i]))
-            dst[i] = escape_char;
-        else
-            dst[i] = src[i];
-    }
-}
-
-static int wg_format_name (char *ret, int ret_len,
-        const value_list_t *vl,
-        const struct wg_callback *cb,
-        const char *ds_name)
-{
-    char n_host[DATA_MAX_NAME_LEN];
-    char n_plugin[DATA_MAX_NAME_LEN];
-    char n_plugin_instance[DATA_MAX_NAME_LEN];
-    char n_type[DATA_MAX_NAME_LEN];
-    char n_type_instance[DATA_MAX_NAME_LEN];
-
-    char *prefix;
-    char *postfix;
-
-    char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1];
-    char tmp_type[2 * DATA_MAX_NAME_LEN + 1];
-
-    prefix = cb->prefix;
-    if (prefix == NULL)
-        prefix = "";
-
-    postfix = cb->postfix;
-    if (postfix == NULL)
-        postfix = "";
-
-    wg_copy_escape_part (n_host, vl->host,
-            sizeof (n_host), cb->escape_char);
-    wg_copy_escape_part (n_plugin, vl->plugin,
-            sizeof (n_plugin), cb->escape_char);
-    wg_copy_escape_part (n_plugin_instance, vl->plugin_instance,
-            sizeof (n_plugin_instance), cb->escape_char);
-    wg_copy_escape_part (n_type, vl->type,
-            sizeof (n_type), cb->escape_char);
-    wg_copy_escape_part (n_type_instance, vl->type_instance,
-            sizeof (n_type_instance), cb->escape_char);
-
-    if (n_plugin_instance[0] != '\0')
-        ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s",
-            n_plugin,
-            cb->separate_instances ? '.' : '-',
-            n_plugin_instance);
-    else
-        sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin));
-
-    if (n_type_instance[0] != '\0')
-        ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s",
-            n_type,
-            cb->separate_instances ? '.' : '-',
-            n_type_instance);
-    else
-        sstrncpy (tmp_type, n_type, sizeof (tmp_type));
-
-    if (ds_name != NULL)
-        ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s",
-            prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name);
-    else
-        ssnprintf (ret, ret_len, "%s%s%s.%s.%s",
-            prefix, n_host, postfix, tmp_plugin, tmp_type);
-
-    return (0);
-}
-
-static int wg_send_message (const char* key, const char* value,
-        cdtime_t time, struct wg_callback *cb)
+static int wg_send_message (char const *message, struct wg_callback *cb)
 {
     int status;
     size_t message_len;
-    char message[1024];
-
-    message_len = (size_t) ssnprintf (message, sizeof (message),
-            "%s %s %u\r\n",
-            key,
-            value,
-            (unsigned int) CDTIME_T_TO_TIME_T (time));
-    if (message_len >= sizeof (message)) {
-        ERROR ("write_graphite plugin: message buffer too small: "
-                "Need %zu bytes.", message_len + 1);
-        return (-1);
-    }
+
+    message_len = strlen (message);
 
     pthread_mutex_lock (&cb->send_lock);
 
@@ -502,10 +340,8 @@ static int wg_send_message (const char* key, const char* value,
 static int wg_write_messages (const data_set_t *ds, const value_list_t *vl,
         struct wg_callback *cb)
 {
-    char key[10*DATA_MAX_NAME_LEN];
-    char values[512];
-
-    int status, i;
+    char buffer[4096];
+    int status;
 
     if (0 != strcmp (ds->type, vl->type))
     {
@@ -514,45 +350,22 @@ static int wg_write_messages (const data_set_t *ds, const value_list_t *vl,
         return -1;
     }
 
-    for (i = 0; i < ds->ds_num; i++)
-    {
-        const char *ds_name = NULL;
-
-        if (cb->always_append_ds || (ds->ds_num > 1))
-            ds_name = ds->ds[i].name;
-
-        /* Copy the identifier to `key' and escape it. */
-        status = wg_format_name (key, sizeof (key), vl, cb, ds_name);
-        if (status != 0)
-        {
-            ERROR ("write_graphite plugin: error with format_name");
-            return (status);
-        }
-
-        escape_string (key, sizeof (key));
-        /* Convert the values to an ASCII representation and put that into
-         * `values'. */
-        status = wg_format_values (values, sizeof (values), i, ds, vl,
-                    cb->store_rates);
-        if (status != 0)
-        {
-            ERROR ("write_graphite plugin: error with "
-                    "wg_format_values");
-            return (status);
-        }
+    memset (buffer, 0, sizeof (buffer));
+    status = format_graphite (buffer, sizeof (buffer), ds, vl,
+            cb->prefix, cb->postfix, cb->escape_char);
+    if (status != 0) /* error message has been printed already. */
+        return (status);
 
-        /* Send the message to graphite */
-        status = wg_send_message (key, values, vl->time, cb);
-        if (status != 0)
-        {
-            ERROR ("write_graphite plugin: error with "
-                    "wg_send_message");
-            return (status);
-        }
+    wg_send_message (buffer, cb);
+    if (status != 0)
+    {
+        ERROR ("write_graphite plugin: wg_send_message failed "
+                "with status %i.", status);
+        return (status);
     }
 
     return (0);
-}
+} /* int wg_write_messages */
 
 static int wg_write (const data_set_t *ds, const value_list_t *vl,
         user_data_t *user_data)