From: Florian Forster Date: Fri, 7 Sep 2012 09:19:52 +0000 (+0200) Subject: Merge branch 'collectd-5.1' X-Git-Tag: collectd-5.2.0~62 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=59dded4cc1a2077ad11c6e9b507e15ad19e0e38c;hp=fda68e239798cea197a225ed5325f5bb3c2e70de;p=collectd.git Merge branch 'collectd-5.1' --- diff --git a/AUTHORS b/AUTHORS index 90560e34..78dbad14 100644 --- a/AUTHORS +++ b/AUTHORS @@ -200,6 +200,9 @@ Sven Trenkel - netapp plugin. - python plugin. +Thomas Meson + - Graphite support for the AMQP plugin. + Tomasz Pala - conntrack plugin. diff --git a/contrib/README b/contrib/README index bc1fe9ff..1ebf1f14 100644 --- a/contrib/README +++ b/contrib/README @@ -101,3 +101,8 @@ solaris-smf ----------- Manifest file for the Solaris SMF system and detailed information on how to register collectd as a service with this system. + +collectd.service +---------------- + Service file for systemd. Please ship this file as + /lib/systemd/system/collectd.service in any linux package of collectd. diff --git a/contrib/collectd.service b/contrib/collectd.service new file mode 100644 index 00000000..ee4d596d --- /dev/null +++ b/contrib/collectd.service @@ -0,0 +1,15 @@ +[Unit] +Description=statistics collection daemon +Documentation=man:collectd(1) +After=local-fs.target network.target +Requires=local-fs.target network.target + +[Service] +ExecStart=/usr/sbin/collectd -C /etc/collectd/collectd.conf -f +Restart=always +RestartSec=10 +StandardOutput=syslog +StandardError=syslog + +[Install] +WantedBy=multi-user.target diff --git a/src/Makefile.am b/src/Makefile.am index 8fa03301..eef2a609 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -123,6 +123,7 @@ if BUILD_PLUGIN_AMQP pkglib_LTLIBRARIES += amqp.la amqp_la_SOURCES = amqp.c \ utils_cmd_putval.c utils_cmd_putval.h \ + utils_format_graphite.c utils_format_graphite.h \ utils_format_json.c utils_format_json.h amqp_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRABBITMQ_LDFLAGS) amqp_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBRABBITMQ_CPPFLAGS) @@ -1256,7 +1257,8 @@ endif if BUILD_PLUGIN_WRITE_GRAPHITE pkglib_LTLIBRARIES += write_graphite.la write_graphite_la_SOURCES = write_graphite.c \ - utils_format_json.c utils_format_json.h + utils_format_graphite.c utils_format_graphite.h \ + utils_format_json.c utils_format_json.h write_graphite_la_LDFLAGS = -module -avoid-version collectd_LDADD += "-dlopen" write_graphite.la collectd_DEPENDENCIES += write_graphite.la diff --git a/src/amqp.c b/src/amqp.c index 89284c81..876f7e9e 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -31,6 +31,7 @@ #include "plugin.h" #include "utils_cmd_putval.h" #include "utils_format_json.h" +#include "utils_format_graphite.h" #include @@ -42,8 +43,9 @@ #define CAMQP_DM_VOLATILE 1 #define CAMQP_DM_PERSISTENT 2 -#define CAMQP_FORMAT_COMMAND 1 -#define CAMQP_FORMAT_JSON 2 +#define CAMQP_FORMAT_COMMAND 1 +#define CAMQP_FORMAT_JSON 2 +#define CAMQP_FORMAT_GRAPHITE 3 #define CAMQP_CHANNEL 1 @@ -68,6 +70,10 @@ struct camqp_config_s uint8_t delivery_mode; _Bool store_rates; int format; + /* publish & graphite format only */ + char *prefix; + char *postfix; + char escape_char; /* subscribe only */ char *exchange_type; @@ -129,6 +135,9 @@ static void camqp_config_free (void *ptr) /* {{{ */ sfree (conf->exchange_type); sfree (conf->queue); sfree (conf->routing_key); + sfree (conf->prefix); + sfree (conf->postfix); + sfree (conf); } /* }}} void camqp_config_free */ @@ -699,6 +708,8 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ props.content_type = amqp_cstring_bytes("text/collectd"); else if (conf->format == CAMQP_FORMAT_JSON) props.content_type = amqp_cstring_bytes("application/json"); + else if (conf->format == CAMQP_FORMAT_GRAPHITE) + props.content_type = amqp_cstring_bytes("text/graphite"); else assert (23 == 42); props.delivery_mode = conf->delivery_mode; @@ -777,6 +788,17 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); format_json_finalize (buffer, &bfill, &bfree); } + else if (conf->format == CAMQP_FORMAT_GRAPHITE) + { + status = format_graphite (buffer, sizeof (buffer), ds, vl, + conf->prefix, conf->postfix, conf->escape_char); + if (status != 0) + { + ERROR ("amqp plugin: format_graphite failed with status %i.", + status); + return (status); + } + } else { ERROR ("amqp plugin: Invalid format (%i).", conf->format); @@ -809,6 +831,8 @@ static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */ conf->format = CAMQP_FORMAT_COMMAND; else if (strcasecmp ("JSON", string) == 0) conf->format = CAMQP_FORMAT_JSON; + else if (strcasecmp ("Graphite", string) == 0) + conf->format = CAMQP_FORMAT_GRAPHITE; else { WARNING ("amqp plugin: Invalid format string: %s", @@ -849,6 +873,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + /* publish & graphite only */ + conf->prefix = NULL; + conf->postfix = NULL; + conf->escape_char = '_'; /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; @@ -906,6 +934,20 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ status = cf_util_get_boolean (child, &conf->store_rates); else if ((strcasecmp ("Format", child->key) == 0) && publish) status = camqp_config_set_format (child, conf); + else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) + status = cf_util_get_string (child, &conf->prefix); + else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish) + status = cf_util_get_string (child, &conf->postfix); + else if ((strcasecmp ("GraphiteEscapeChar", child->key) == 0) && publish) + { + char *tmp_buff = NULL; + status = cf_util_get_string (child, &tmp_buff); + if (strlen (tmp_buff) > 1) + WARNING ("amqp plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + conf->escape_char = tmp_buff[0]; + sfree (tmp_buff); + } else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key); diff --git a/src/collectd.conf.in b/src/collectd.conf.in index f3ef6759..8e537d60 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -465,8 +465,10 @@ # # -# Host "127.0.0.1" -# Port "11211" +# +# Host "127.0.0.1" +# Port "11211" +# # # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index c025f949..de9da5ac 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -220,6 +220,8 @@ possibly filtering or messages. # Persistent false # Format "command" # StoreRates false + # GraphitePrefix "collectd." + # GraphiteEscapeChar "_" # Receive values from an AMQP broker @@ -320,6 +322,10 @@ If set to B, the values are encoded in the I, an easy and straight forward exchange format. The C header field will be set to C. +If set to B, values are encoded in the I format, which is +" \n". The C header field will be set to +C. + A subscribing client I use the C header field to determine how to decode the values. Currently, the I itself can only decode the B format. @@ -334,6 +340,25 @@ using the internal value cache. Please note that currently this option is only used if the B option has been set to B. +=item B (Publish and B=I only) + +A prefix can be added in the metric name when outputting in the I format. +It's added before the I name. +Metric name will be "" + +=item B (Publish and B=I only) + +A postfix can be added in the metric name when outputting in the I format. +It's added after the I name. +Metric name will be "" + +=item B (Publish and B=I only) + +Specify a character to replace dots (.) in the host part of the metric name. +In I metric name, dots are used as separators between different +metric parts (host, plugin, type). +Default is "_" (I). + =back =head2 Plugin C @@ -1949,6 +1974,17 @@ The C connects to a memcached server and queries statistics about cache utilization, memory and bandwidth used. L + + + Host "memcache.example.com" + Port 11211 + + + +The plugin configuration consists of one or more B blocks which +specify one I connection each. Within the B blocks, the +following options are allowed: + =over 4 =item B I @@ -1959,6 +1995,11 @@ Hostname to connect to. Defaults to B<127.0.0.1>. TCP-Port to connect to. Defaults to B<11211>. +=item B I + +Connect to I using the UNIX domain socket at I. If this +setting is given, the B and B settings are ignored. + =back =head2 Plugin C @@ -3278,6 +3319,11 @@ values submitted to the daemon. Other than that, that name is not used. Defines the "database alias" or "service name" to connect to. Usually, these names are defined in the file named C<$ORACLE_HOME/network/admin/tnsnames.ora>. +=item B I + +Hostname to use when dispatching values for this database. Defaults to using +the global hostname of the I instance. + =item B I Username used for authentication. @@ -4066,6 +4112,10 @@ The B option is the TCP port on which the Redis instance accepts connections. Either a service name of a port number may be given. Please note that numerical port numbers must be given as a string, too. +=item B I + +Use I to authenticate when connecting to I. + =item B I The B option set the socket timeout for node response. Since the Redis diff --git a/src/memcached.c b/src/memcached.c index 48fa713b..a09f45ec 100644 --- a/src/memcached.c +++ b/src/memcached.c @@ -1,9 +1,10 @@ /** * collectd - src/memcached.c, based on src/hddtemp.c * Copyright (C) 2007 Antony Dovgal - * Copyright (C) 2007-2010 Florian Forster + * Copyright (C) 2007-2012 Florian Forster * Copyright (C) 2009 Doug MacEachern * Copyright (C) 2009 Franck Lombardi + * Copyright (C) 2012 Nicolas Szalay * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -24,6 +25,7 @@ * Florian octo Forster * Doug MacEachern * Franck Lombardi + * Nicolas Szalay **/ #include "collectd.h" @@ -31,502 +33,617 @@ #include "plugin.h" #include "configfile.h" -# include -# include -# include -# include -# include -# include - -/* Hack to work around the missing define in AIX */ -#ifndef MSG_DONTWAIT -# define MSG_DONTWAIT MSG_NONBLOCK -#endif +#include +#include +#include +#include +#include #define MEMCACHED_DEF_HOST "127.0.0.1" #define MEMCACHED_DEF_PORT "11211" -#define MEMCACHED_RETRY_COUNT 100 - -static const char *config_keys[] = +struct memcached_s { - "Socket", - "Host", - "Port" + char *name; + char *socket; + char *host; + char *port; }; -static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); +typedef struct memcached_s memcached_t; -static char *memcached_socket = NULL; -static char *memcached_host = NULL; -static char memcached_port[16]; +static _Bool memcached_have_instances = 0; -static int memcached_query_daemon (char *buffer, int buffer_size) /* {{{ */ +static void memcached_free (memcached_t *st) { - int fd; - ssize_t status; - int buffer_fill; - int i = 0; - - if (memcached_socket != NULL) { - struct sockaddr_un serv_addr; - - memset (&serv_addr, 0, sizeof (serv_addr)); - serv_addr.sun_family = AF_UNIX; - sstrncpy (serv_addr.sun_path, memcached_socket, - sizeof (serv_addr.sun_path)); - - /* create our socket descriptor */ - fd = socket (AF_UNIX, SOCK_STREAM, 0); - if (fd < 0) { - char errbuf[1024]; - ERROR ("memcached: unix socket: %s", sstrerror (errno, errbuf, - sizeof (errbuf))); - return -1; - } - - /* connect to the memcached daemon */ - status = (ssize_t) connect (fd, (struct sockaddr *) &serv_addr, - sizeof (serv_addr)); - if (status != 0) { - shutdown (fd, SHUT_RDWR); - close (fd); - fd = -1; - } - } - else { /* if (memcached_socket == NULL) */ - const char *host; - const char *port; - - struct addrinfo ai_hints; - struct addrinfo *ai_list, *ai_ptr; - int ai_return = 0; - - memset (&ai_hints, '\0', sizeof (ai_hints)); - ai_hints.ai_flags = 0; + if (st == NULL) + return; + + sfree (st->name); + sfree (st->socket); + sfree (st->host); + sfree (st->port); +} + +static int memcached_connect_unix (memcached_t *st) +{ + struct sockaddr_un serv_addr; + int fd; + + memset (&serv_addr, 0, sizeof (serv_addr)); + serv_addr.sun_family = AF_UNIX; + sstrncpy (serv_addr.sun_path, st->socket, + sizeof (serv_addr.sun_path)); + + /* create our socket descriptor */ + fd = socket (AF_UNIX, SOCK_STREAM, 0); + if (fd < 0) + { + char errbuf[1024]; + ERROR ("memcached plugin: memcached_connect_unix: socket(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (-1); + } + + return (fd); +} /* int memcached_connect_unix */ + +static int memcached_connect_inet (memcached_t *st) +{ + char *host; + char *port; + + struct addrinfo ai_hints; + struct addrinfo *ai_list, *ai_ptr; + int status; + int fd = -1; + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = 0; #ifdef AI_ADDRCONFIG - /* ai_hints.ai_flags |= AI_ADDRCONFIG; */ + ai_hints.ai_flags |= AI_ADDRCONFIG; #endif - ai_hints.ai_family = AF_INET; - ai_hints.ai_socktype = SOCK_STREAM; - ai_hints.ai_protocol = 0; - - host = memcached_host; - if (host == NULL) { - host = MEMCACHED_DEF_HOST; - } - - port = memcached_port; - if (strlen (port) == 0) { - port = MEMCACHED_DEF_PORT; - } - - if ((ai_return = getaddrinfo (host, port, &ai_hints, &ai_list)) != 0) { - char errbuf[1024]; - ERROR ("memcached: getaddrinfo (%s, %s): %s", - host, port, - (ai_return == EAI_SYSTEM) - ? sstrerror (errno, errbuf, sizeof (errbuf)) - : gai_strerror (ai_return)); - return -1; - } - - fd = -1; - for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) { - /* create our socket descriptor */ - fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); - if (fd < 0) { - char errbuf[1024]; - ERROR ("memcached: socket: %s", sstrerror (errno, errbuf, sizeof (errbuf))); - continue; - } - - /* connect to the memcached daemon */ - status = (ssize_t) connect (fd, (struct sockaddr *) ai_ptr->ai_addr, ai_ptr->ai_addrlen); - if (status != 0) { - shutdown (fd, SHUT_RDWR); - close (fd); - fd = -1; - continue; - } - - /* A socket could be opened and connecting succeeded. We're - * done. */ - break; - } - - freeaddrinfo (ai_list); - } - - if (fd < 0) { - ERROR ("memcached: Could not connect to daemon."); - return -1; - } - - if (send(fd, "stats\r\n", sizeof("stats\r\n") - 1, MSG_DONTWAIT) != (sizeof("stats\r\n") - 1)) { - ERROR ("memcached: Could not send command to the memcached daemon."); - return -1; - } - - { - struct pollfd p; - int status; - - memset (&p, 0, sizeof (p)); - p.fd = fd; - p.events = POLLIN | POLLERR | POLLHUP; - p.revents = 0; - - status = poll (&p, /* nfds = */ 1, - /* timeout = */ CDTIME_T_TO_MS (interval_g)); - if (status <= 0) - { - if (status == 0) - { - ERROR ("memcached: poll(2) timed out after %.3f seconds.", - CDTIME_T_TO_DOUBLE (interval_g)); - } - else - { - char errbuf[1024]; - ERROR ("memcached: poll(2) failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - } - shutdown (fd, SHUT_RDWR); - close (fd); - return (-1); - } - } - - /* receive data from the memcached daemon */ - memset (buffer, '\0', buffer_size); - - buffer_fill = 0; - while ((status = recv (fd, buffer + buffer_fill, buffer_size - buffer_fill, MSG_DONTWAIT)) != 0) { - if (i > MEMCACHED_RETRY_COUNT) { - ERROR("recv() timed out"); - break; - } - i++; - - if (status == -1) { - char errbuf[1024]; - - if (errno == EAGAIN) { - continue; - } - - ERROR ("memcached: Error reading from socket: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - shutdown(fd, SHUT_RDWR); - close (fd); - return -1; - } - buffer_fill += status; - - if (buffer_fill > 3 && buffer[buffer_fill-5] == 'E' && buffer[buffer_fill-4] == 'N' && buffer[buffer_fill-3] == 'D') { - /* we got all the data */ - break; - } - } - - if (buffer_fill >= buffer_size) { - buffer[buffer_size - 1] = '\0'; - WARNING ("memcached: Message from memcached has been truncated."); - } else if (buffer_fill == 0) { - WARNING ("memcached: Peer has unexpectedly shut down the socket. " - "Buffer: `%s'", buffer); - shutdown(fd, SHUT_RDWR); - close(fd); - return -1; - } - - shutdown(fd, SHUT_RDWR); - close(fd); - return 0; + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_STREAM; + ai_hints.ai_protocol = 0; + + host = (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST; + port = (st->port != NULL) ? st->port : MEMCACHED_DEF_PORT; + + ai_list = NULL; + status = getaddrinfo (host, port, &ai_hints, &ai_list); + if (status != 0) + { + char errbuf[1024]; + ERROR ("memcached plugin: memcached_connect_inet: " + "getaddrinfo(%s,%s) failed: %s", + host, port, + (status == EAI_SYSTEM) + ? sstrerror (errno, errbuf, sizeof (errbuf)) + : gai_strerror (status)); + return (-1); + } + + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + /* create our socket descriptor */ + fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); + if (fd < 0) + { + char errbuf[1024]; + WARNING ("memcached plugin: memcached_connect_inet: " + "socket(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + continue; + } + + /* connect to the memcached daemon */ + status = (int) connect (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + shutdown (fd, SHUT_RDWR); + close (fd); + fd = -1; + continue; + } + + /* A socket could be opened and connecting succeeded. We're done. */ + break; + } + + freeaddrinfo (ai_list); + return (fd); +} /* int memcached_connect_inet */ + +static int memcached_connect (memcached_t *st) +{ + if (st->socket != NULL) + return (memcached_connect_unix (st)); + else + return (memcached_connect_inet (st)); } -/* }}} */ -static int memcached_config (const char *key, const char *value) /* {{{ */ +static int memcached_query_daemon (char *buffer, size_t buffer_size, memcached_t *st) { - if (strcasecmp (key, "Socket") == 0) { - if (memcached_socket != NULL) { - free (memcached_socket); - } - memcached_socket = strdup (value); - } else if (strcasecmp (key, "Host") == 0) { - if (memcached_host != NULL) { - free (memcached_host); - } - memcached_host = strdup (value); - } else if (strcasecmp (key, "Port") == 0) { - int port = (int) (atof (value)); - if ((port > 0) && (port <= 65535)) { - ssnprintf (memcached_port, sizeof (memcached_port), "%i", port); - } else { - sstrncpy (memcached_port, value, sizeof (memcached_port)); - } - } else { - return -1; - } - - return 0; + int fd = -1; + int status; + size_t buffer_fill; + + fd = memcached_connect (st); + if (fd < 0) { + ERROR ("memcached plugin: Instance \"%s\" could not connect to daemon.", + st->name); + return -1; + } + + status = (int) swrite (fd, "stats\r\n", strlen ("stats\r\n")); + if (status != 0) + { + char errbuf[1024]; + ERROR ("memcached plugin: write(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + shutdown(fd, SHUT_RDWR); + close (fd); + return (-1); + } + + /* receive data from the memcached daemon */ + memset (buffer, 0, buffer_size); + + buffer_fill = 0; + while ((status = (int) recv (fd, buffer + buffer_fill, + buffer_size - buffer_fill, /* flags = */ 0)) != 0) + { + char const end_token[5] = {'E', 'N', 'D', '\r', '\n'}; + if (status < 0) + { + char errbuf[1024]; + + if ((errno == EAGAIN) || (errno == EINTR)) + continue; + + ERROR ("memcached: Error reading from socket: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + shutdown(fd, SHUT_RDWR); + close (fd); + return (-1); + } + + buffer_fill += (size_t) status; + if (buffer_fill > buffer_size) + { + buffer_fill = buffer_size; + WARNING ("memcached plugin: Message was truncated."); + break; + } + + /* If buffer ends in end_token, we have all the data. */ + if (memcmp (buffer + buffer_fill - sizeof (end_token), + end_token, sizeof (end_token)) == 0) + break; + } /* while (recv) */ + + status = 0; + if (buffer_fill == 0) + { + WARNING ("memcached plugin: No data returned by memcached."); + status = -1; + } + + shutdown(fd, SHUT_RDWR); + close(fd); + return (status); +} /* int memcached_query_daemon */ + +static void memcached_init_vl (value_list_t *vl, memcached_t const *st) +{ + sstrncpy (vl->plugin, "memcached", sizeof (vl->plugin)); + if (strcmp (st->name, "__legacy__") == 0) /* legacy mode */ + { + sstrncpy (vl->host, hostname_g, sizeof (vl->host)); + } + else + { + if (st->socket != NULL) + sstrncpy (vl->host, hostname_g, sizeof (vl->host)); + else + sstrncpy (vl->host, + (st->host != NULL) ? st->host : MEMCACHED_DEF_HOST, + sizeof (vl->host)); + sstrncpy (vl->plugin_instance, st->name, sizeof (vl->plugin_instance)); + } } -/* }}} */ static void submit_derive (const char *type, const char *type_inst, - derive_t value) /* {{{ */ + derive_t value, memcached_t *st) { - value_t values[1]; - value_list_t vl = VALUE_LIST_INIT; + value_t values[1]; + value_list_t vl = VALUE_LIST_INIT; + memcached_init_vl (&vl, st); - values[0].derive = value; + values[0].derive = value; - vl.values = values; - vl.values_len = 1; - sstrncpy (vl.host, hostname_g, sizeof (vl.host)); - sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin)); - sstrncpy (vl.type, type, sizeof (vl.type)); - if (type_inst != NULL) - sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance)); + vl.values = values; + vl.values_len = 1; + sstrncpy (vl.type, type, sizeof (vl.type)); + if (type_inst != NULL) + sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance)); - plugin_dispatch_values (&vl); -} /* void memcached_submit_cmd */ -/* }}} */ + plugin_dispatch_values (&vl); +} static void submit_derive2 (const char *type, const char *type_inst, - derive_t value0, derive_t value1) /* {{{ */ + derive_t value0, derive_t value1, memcached_t *st) { - value_t values[2]; - value_list_t vl = VALUE_LIST_INIT; + value_t values[2]; + value_list_t vl = VALUE_LIST_INIT; + memcached_init_vl (&vl, st); - values[0].derive = value0; - values[1].derive = value1; + values[0].derive = value0; + values[1].derive = value1; - vl.values = values; - vl.values_len = 2; - sstrncpy (vl.host, hostname_g, sizeof (vl.host)); - sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin)); - sstrncpy (vl.type, type, sizeof (vl.type)); - if (type_inst != NULL) - sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance)); + vl.values = values; + vl.values_len = 2; + sstrncpy (vl.type, type, sizeof (vl.type)); + if (type_inst != NULL) + sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance)); - plugin_dispatch_values (&vl); -} /* void memcached_submit_cmd */ -/* }}} */ + plugin_dispatch_values (&vl); +} static void submit_gauge (const char *type, const char *type_inst, - gauge_t value) /* {{{ */ + gauge_t value, memcached_t *st) { - value_t values[1]; - value_list_t vl = VALUE_LIST_INIT; + value_t values[1]; + value_list_t vl = VALUE_LIST_INIT; + memcached_init_vl (&vl, st); - values[0].gauge = value; + values[0].gauge = value; - vl.values = values; - vl.values_len = 1; - sstrncpy (vl.host, hostname_g, sizeof (vl.host)); - sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin)); - sstrncpy (vl.type, type, sizeof (vl.type)); - if (type_inst != NULL) - sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance)); + vl.values = values; + vl.values_len = 1; + sstrncpy (vl.type, type, sizeof (vl.type)); + if (type_inst != NULL) + sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance)); - plugin_dispatch_values (&vl); + plugin_dispatch_values (&vl); } -/* }}} */ static void submit_gauge2 (const char *type, const char *type_inst, - gauge_t value0, gauge_t value1) /* {{{ */ + gauge_t value0, gauge_t value1, memcached_t *st) { - value_t values[2]; - value_list_t vl = VALUE_LIST_INIT; + value_t values[2]; + value_list_t vl = VALUE_LIST_INIT; + memcached_init_vl (&vl, st); - values[0].gauge = value0; - values[1].gauge = value1; + values[0].gauge = value0; + values[1].gauge = value1; - vl.values = values; - vl.values_len = 2; - sstrncpy (vl.host, hostname_g, sizeof (vl.host)); - sstrncpy (vl.plugin, "memcached", sizeof (vl.plugin)); - sstrncpy (vl.type, type, sizeof (vl.type)); - if (type_inst != NULL) - sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance)); + vl.values = values; + vl.values_len = 2; + sstrncpy (vl.type, type, sizeof (vl.type)); + if (type_inst != NULL) + sstrncpy (vl.type_instance, type_inst, sizeof (vl.type_instance)); - plugin_dispatch_values (&vl); + plugin_dispatch_values (&vl); } -/* }}} */ -static int memcached_read (void) /* {{{ */ +static int memcached_read (user_data_t *user_data) { - char buf[4096]; - char *fields[3]; - char *ptr; - char *line; - char *saveptr; - int fields_num; - - gauge_t bytes_used = NAN; - gauge_t bytes_total = NAN; - gauge_t hits = NAN; - gauge_t gets = NAN; - derive_t rusage_user = 0; - derive_t rusage_syst = 0; - derive_t octets_rx = 0; - derive_t octets_tx = 0; - - /* get data from daemon */ - if (memcached_query_daemon (buf, sizeof (buf)) < 0) { - return -1; - } + char buf[4096]; + char *fields[3]; + char *ptr; + char *line; + char *saveptr; + int fields_num; + + gauge_t bytes_used = NAN; + gauge_t bytes_total = NAN; + gauge_t hits = NAN; + gauge_t gets = NAN; + derive_t rusage_user = 0; + derive_t rusage_syst = 0; + derive_t octets_rx = 0; + derive_t octets_tx = 0; + + memcached_t *st; + st = user_data->data; + + /* get data from daemon */ + if (memcached_query_daemon (buf, sizeof (buf), st) < 0) { + return -1; + } #define FIELD_IS(cnst) \ - (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0)) - - ptr = buf; - saveptr = NULL; - while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) - { - int name_len; - - ptr = NULL; - - fields_num = strsplit(line, fields, 3); - if (fields_num != 3) - continue; - - name_len = strlen(fields[1]); - if (name_len == 0) - continue; - - /* - * For an explanation on these fields please refer to - * - */ - - /* - * CPU time consumed by the memcached process - */ - if (FIELD_IS ("rusage_user")) - { - rusage_user = atoll (fields[2]); - } - else if (FIELD_IS ("rusage_system")) - { - rusage_syst = atoll(fields[2]); - } - - /* - * Number of threads of this instance - */ - else if (FIELD_IS ("threads")) - { - submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2])); - } - - /* - * Number of items stored - */ - else if (FIELD_IS ("curr_items")) - { - submit_gauge ("memcached_items", "current", atof (fields[2])); - } - - /* - * Number of bytes used and available (total - used) - */ - else if (FIELD_IS ("bytes")) - { - bytes_used = atof (fields[2]); - } - else if (FIELD_IS ("limit_maxbytes")) - { - bytes_total = atof(fields[2]); - } - - /* - * Connections - */ - else if (FIELD_IS ("curr_connections")) - { - submit_gauge ("memcached_connections", "current", atof (fields[2])); - } - - /* - * Commands - */ - else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0)) - { - const char *name = fields[1] + 4; - submit_derive ("memcached_command", name, atoll (fields[2])); - if (strcmp (name, "get") == 0) - gets = atof (fields[2]); - } - - /* - * Operations on the cache, i. e. cache hits, cache misses and evictions of items - */ - else if (FIELD_IS ("get_hits")) - { - submit_derive ("memcached_ops", "hits", atoll (fields[2])); - hits = atof (fields[2]); - } - else if (FIELD_IS ("get_misses")) - { - submit_derive ("memcached_ops", "misses", atoll (fields[2])); - } - else if (FIELD_IS ("evictions")) - { - submit_derive ("memcached_ops", "evictions", atoll (fields[2])); - } - - /* - * Network traffic - */ - else if (FIELD_IS ("bytes_read")) - { - octets_rx = atoll (fields[2]); - } - else if (FIELD_IS ("bytes_written")) - { - octets_tx = atoll (fields[2]); - } - } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */ - - if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total)) - submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used); - - if ((rusage_user != 0) || (rusage_syst != 0)) - submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst); - - if ((octets_rx != 0) || (octets_tx != 0)) - submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx); - - if (!isnan (gets) && !isnan (hits)) - { - gauge_t rate = NAN; - - if (gets != 0.0) - rate = 100.0 * hits / gets; - - submit_gauge ("percent", "hitratio", rate); - } - - return 0; + (((sizeof(cnst) - 1) == name_len) && (strcmp (cnst, fields[1]) == 0)) + + ptr = buf; + saveptr = NULL; + while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) + { + int name_len; + + ptr = NULL; + + fields_num = strsplit(line, fields, 3); + if (fields_num != 3) + continue; + + name_len = strlen(fields[1]); + if (name_len == 0) + continue; + + /* + * For an explanation on these fields please refer to + * + */ + + /* + * CPU time consumed by the memcached process + */ + if (FIELD_IS ("rusage_user")) + { + rusage_user = atoll (fields[2]); + } + else if (FIELD_IS ("rusage_system")) + { + rusage_syst = atoll(fields[2]); + } + + /* + * Number of threads of this instance + */ + else if (FIELD_IS ("threads")) + { + submit_gauge2 ("ps_count", NULL, NAN, atof (fields[2]), st); + } + + /* + * Number of items stored + */ + else if (FIELD_IS ("curr_items")) + { + submit_gauge ("memcached_items", "current", atof (fields[2]), st); + } + + /* + * Number of bytes used and available (total - used) + */ + else if (FIELD_IS ("bytes")) + { + bytes_used = atof (fields[2]); + } + else if (FIELD_IS ("limit_maxbytes")) + { + bytes_total = atof(fields[2]); + } + + /* + * Connections + */ + else if (FIELD_IS ("curr_connections")) + { + submit_gauge ("memcached_connections", "current", atof (fields[2]), st); + } + + /* + * Commands + */ + else if ((name_len > 4) && (strncmp (fields[1], "cmd_", 4) == 0)) + { + const char *name = fields[1] + 4; + submit_derive ("memcached_command", name, atoll (fields[2]), st); + if (strcmp (name, "get") == 0) + gets = atof (fields[2]); + } + + /* + * Operations on the cache, i. e. cache hits, cache misses and evictions of items + */ + else if (FIELD_IS ("get_hits")) + { + submit_derive ("memcached_ops", "hits", atoll (fields[2]), st); + hits = atof (fields[2]); + } + else if (FIELD_IS ("get_misses")) + { + submit_derive ("memcached_ops", "misses", atoll (fields[2]), st); + } + else if (FIELD_IS ("evictions")) + { + submit_derive ("memcached_ops", "evictions", atoll (fields[2]), st); + } + + /* + * Network traffic + */ + else if (FIELD_IS ("bytes_read")) + { + octets_rx = atoll (fields[2]); + } + else if (FIELD_IS ("bytes_written")) + { + octets_tx = atoll (fields[2]); + } + } /* while ((line = strtok_r (ptr, "\n\r", &saveptr)) != NULL) */ + + if (!isnan (bytes_used) && !isnan (bytes_total) && (bytes_used <= bytes_total)) + submit_gauge2 ("df", "cache", bytes_used, bytes_total - bytes_used, st); + + if ((rusage_user != 0) || (rusage_syst != 0)) + submit_derive2 ("ps_cputime", NULL, rusage_user, rusage_syst, st); + + if ((octets_rx != 0) || (octets_tx != 0)) + submit_derive2 ("memcached_octets", NULL, octets_rx, octets_tx, st); + + if (!isnan (gets) && !isnan (hits)) + { + gauge_t rate = NAN; + + if (gets != 0.0) + rate = 100.0 * hits / gets; + + submit_gauge ("percent", "hitratio", rate, st); + } + + return 0; +} /* int memcached_read */ + +static int memcached_add_read_callback (memcached_t *st) +{ + user_data_t ud; + char callback_name[3*DATA_MAX_NAME_LEN]; + int status; + + memset (&ud, 0, sizeof (ud)); + ud.data = st; + ud.free_func = (void *) memcached_free; + + assert (st->name != NULL); + ssnprintf (callback_name, sizeof (callback_name), "memcached/%s", st->name); + + status = plugin_register_complex_read (/* group = */ "memcached", + /* name = */ callback_name, + /* callback = */ memcached_read, + /* interval = */ NULL, + /* user_data = */ &ud); + return (status); +} /* int memcached_add_read_callback */ + +/* Configuration handling functiions + * + * + * Host foo.zomg.com + * Port "1234" + * + * + */ +static int config_add_instance(oconfig_item_t *ci) +{ + memcached_t *st; + int i; + int status = 0; + + /* Disable automatic generation of default instance in the init callback. */ + memcached_have_instances = 1; + + st = malloc (sizeof (*st)); + if (st == NULL) + { + ERROR ("memcached plugin: malloc failed."); + return (-1); + } + + memset (st, 0, sizeof (*st)); + st->name = NULL; + st->socket = NULL; + st->host = NULL; + st->port = NULL; + + if (strcasecmp (ci->key, "Plugin") == 0) /* default instance */ + st->name = sstrdup ("__legacy__"); + else /* block */ + status = cf_util_get_string (ci, &st->name); + if (status != 0) + { + sfree (st); + return (status); + } + assert (st->name != NULL); + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Socket", child->key) == 0) + status = cf_util_get_string (child, &st->socket); + else if (strcasecmp ("Host", child->key) == 0) + status = cf_util_get_string (child, &st->host); + else if (strcasecmp ("Port", child->key) == 0) + status = cf_util_get_service (child, &st->port); + else + { + WARNING ("memcached plugin: Option `%s' not allowed here.", + child->key); + status = -1; + } + + if (status != 0) + break; + } + + if (status == 0) + status = memcached_add_read_callback (st); + + if (status != 0) + { + memcached_free(st); + return (-1); + } + + return (0); } -/* }}} */ -void module_register (void) /* {{{ */ +static int memcached_config (oconfig_item_t *ci) { - plugin_register_config ("memcached", memcached_config, config_keys, config_keys_num); - plugin_register_read ("memcached", memcached_read); + int status = 0; + _Bool have_instance_block = 0; + int i; + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Instance", child->key) == 0) + { + config_add_instance (child); + have_instance_block = 1; + } + else if (!have_instance_block) + { + /* Non-instance option: Assume legacy configuration (without + * blocks) and call config_add_instance() with the block. */ + return (config_add_instance (ci)); + } + else + WARNING ("memcached plugin: The configuration option " + "\"%s\" is not allowed here. Did you " + "forget to add an block " + "around the configuration?", + child->key); + } /* for (ci->children) */ + + return (status); } -/* }}} */ - -/* - * Local variables: - * tab-width: 4 - * c-basic-offset: 4 - * End: - * vim600: sw=4 ts=4 fdm=marker noexpandtab - * vim<600: sw=4 ts=4 noexpandtab - */ +static int memcached_init (void) +{ + memcached_t *st; + int status; + + if (memcached_have_instances) + return (0); + + /* No instances were configured, lets start a default instance. */ + st = malloc (sizeof (*st)); + if (st == NULL) + return (ENOMEM); + memset (st, 0, sizeof (*st)); + st->name = sstrdup ("__legacy__"); + st->socket = NULL; + st->host = NULL; + st->port = NULL; + + status = memcached_add_read_callback (st); + if (status == 0) + memcached_have_instances = 1; + else + memcached_free (st); + + return (status); +} /* int memcached_init */ + +void module_register (void) +{ + plugin_register_complex_config ("memcached", memcached_config); + plugin_register_init ("memcached", memcached_init); +} diff --git a/src/oracle.c b/src/oracle.c index 80ae699c..ab0812b7 100644 --- a/src/oracle.c +++ b/src/oracle.c @@ -59,6 +59,7 @@ struct o_database_s { char *name; + char *host; char *connect_id; char *username; char *password; @@ -183,33 +184,6 @@ static void o_database_free (o_database_t *db) /* {{{ */ * */ -static int o_config_set_string (char **ret_string, /* {{{ */ - oconfig_item_t *ci) -{ - char *string; - - if ((ci->values_num != 1) - || (ci->values[0].type != OCONFIG_TYPE_STRING)) - { - WARNING ("oracle plugin: The `%s' config option " - "needs exactly one string argument.", ci->key); - return (-1); - } - - string = strdup (ci->values[0].value.string); - if (string == NULL) - { - ERROR ("oracle plugin: strdup failed."); - return (-1); - } - - if (*ret_string != NULL) - free (*ret_string); - *ret_string = string; - - return (0); -} /* }}} int o_config_set_string */ - static int o_config_add_database (oconfig_item_t *ci) /* {{{ */ { o_database_t *db; @@ -231,8 +205,13 @@ static int o_config_add_database (oconfig_item_t *ci) /* {{{ */ return (-1); } memset (db, 0, sizeof (*db)); + db->name = NULL; + db->host = NULL; + db->connect_id = NULL; + db->username = NULL; + db->password = NULL; - status = o_config_set_string (&db->name, ci); + status = cf_util_get_string (ci, &db->name); if (status != 0) { sfree (db); @@ -245,11 +224,13 @@ static int o_config_add_database (oconfig_item_t *ci) /* {{{ */ oconfig_item_t *child = ci->children + i; if (strcasecmp ("ConnectID", child->key) == 0) - status = o_config_set_string (&db->connect_id, child); + status = cf_util_get_string (child, &db->connect_id); + else if (strcasecmp ("Host", child->key) == 0) + status = cf_util_get_string (child, &db->host); else if (strcasecmp ("Username", child->key) == 0) - status = o_config_set_string (&db->username, child); + status = cf_util_get_string (child, &db->username); else if (strcasecmp ("Password", child->key) == 0) - status = o_config_set_string (&db->password, child); + status = cf_util_get_string (child, &db->password); else if (strcasecmp ("Query", child->key) == 0) status = udb_query_pick_from_list (child, queries, queries_num, &db->queries, &db->queries_num); @@ -613,7 +594,8 @@ static int o_read_database_query (o_database_t *db, /* {{{ */ } /* for (j = 1; j <= param_counter; j++) */ /* }}} End of the ``define'' stuff. */ - status = udb_query_prepare_result (q, prep_area, hostname_g, + status = udb_query_prepare_result (q, prep_area, + (db->host != NULL) ? db->host : hostname_g, /* plugin = */ "oracle", db->name, column_names, column_num, /* interval = */ 0); if (status != 0) diff --git a/src/redis.c b/src/redis.c index 86062d9c..439cf4b8 100644 --- a/src/redis.c +++ b/src/redis.c @@ -54,6 +54,7 @@ struct redis_node_s { char name[MAX_REDIS_NODE_NAME]; char host[HOST_NAME_MAX]; + char passwd[HOST_NAME_MAX]; int port; int timeout; @@ -136,6 +137,8 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */ } else if (strcasecmp ("Timeout", option->key) == 0) status = cf_util_get_int (option, &rn.timeout); + else if (strcasecmp ("Password", option->key) == 0) + status = cf_util_get_string_buffer (option, rn.passwd, sizeof (rn.passwd)); else WARNING ("redis plugin: Option `%s' not allowed inside a `Node' " "block. I'll ignore this option.", option->key); @@ -255,6 +258,18 @@ static int redis_read (void) /* {{{ */ continue; } + if (strlen (rn->passwd) > 0) + { + DEBUG ("redis plugin: authenticanting node `%s' passwd(%s).", rn->name, rn->passwd); + status = credis_auth(rh, rn->passwd); + if (status != 0) + { + WARNING ("redis plugin: unable to authenticate on node `%s'.", rn->name); + credis_close (rh); + continue; + } + } + memset (&info, 0, sizeof (info)); status = credis_info (rh, &info); if (status != 0) diff --git a/src/tcpconns.c b/src/tcpconns.c index 3c8fc728..4d90c41a 100644 --- a/src/tcpconns.c +++ b/src/tcpconns.c @@ -70,6 +70,13 @@ #endif #if KERNEL_LINUX +# include +/* sys/socket.h is necessary to compile when using netlink on older systems. */ +# include +# include +# include +# include +# include /* #endif KERNEL_LINUX */ #elif HAVE_SYSCTLBYNAME @@ -130,6 +137,11 @@ #endif /* KERNEL_AIX */ #if KERNEL_LINUX +struct nlreq { + struct nlmsghdr nlh; + struct inet_diag_req r; +}; + static const char *tcp_state[] = { "", /* 0 */ @@ -263,6 +275,17 @@ static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); static int port_collect_listening = 0; static port_entry_t *port_list_head = NULL; +static uint32_t sequence_number = 0; + +#if KERNEL_LINUX +enum +{ + SRC_DUNNO, + SRC_NETLINK, + SRC_PROC +} linux_source = SRC_DUNNO; +#endif + static void conn_submit_port_entry (port_entry_t *pe) { value_t values[1]; @@ -419,6 +442,140 @@ static int conn_handle_ports (uint16_t port_local, uint16_t port_remote, uint8_t } /* int conn_handle_ports */ #if KERNEL_LINUX +/* Returns zero on success, less than zero on socket error and greater than + * zero on other errors. */ +static int conn_read_netlink (void) +{ + int fd; + struct sockaddr_nl nladdr; + struct nlreq req; + struct msghdr msg; + struct iovec iov; + struct inet_diag_msg *r; + char buf[8192]; + + /* If this fails, it's likely a permission problem. We'll fall back to + * reading this information from files below. */ + fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_INET_DIAG); + if (fd < 0) + { + ERROR ("tcpconns plugin: conn_read_netlink: socket(AF_NETLINK, SOCK_RAW, " + "NETLINK_INET_DIAG) failed: %s", + sstrerror (errno, buf, sizeof (buf))); + return (-1); + } + + memset(&nladdr, 0, sizeof(nladdr)); + nladdr.nl_family = AF_NETLINK; + + memset(&req, 0, sizeof(req)); + req.nlh.nlmsg_len = sizeof(req); + req.nlh.nlmsg_type = TCPDIAG_GETSOCK; + /* NLM_F_ROOT: return the complete table instead of a single entry. + * NLM_F_MATCH: return all entries matching criteria (not implemented) + * NLM_F_REQUEST: must be set on all request messages */ + req.nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST; + req.nlh.nlmsg_pid = 0; + /* The sequence_number is used to track our messages. Since netlink is not + * reliable, we don't want to end up with a corrupt or incomplete old + * message in case the system is/was out of memory. */ + req.nlh.nlmsg_seq = ++sequence_number; + req.r.idiag_family = AF_INET; + req.r.idiag_states = 0xfff; + req.r.idiag_ext = 0; + + memset(&iov, 0, sizeof(iov)); + iov.iov_base = &req; + iov.iov_len = sizeof(req); + + memset(&msg, 0, sizeof(msg)); + msg.msg_name = (void*)&nladdr; + msg.msg_namelen = sizeof(nladdr); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + if (sendmsg (fd, &msg, 0) < 0) + { + ERROR ("tcpconns plugin: conn_read_netlink: sendmsg(2) failed: %s", + sstrerror (errno, buf, sizeof (buf))); + close (fd); + return (-1); + } + + iov.iov_base = buf; + iov.iov_len = sizeof(buf); + + while (1) + { + int status; + struct nlmsghdr *h; + + memset(&msg, 0, sizeof(msg)); + msg.msg_name = (void*)&nladdr; + msg.msg_namelen = sizeof(nladdr); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + status = recvmsg(fd, (void *) &msg, /* flags = */ 0); + if (status < 0) + { + if ((errno == EINTR) || (errno == EAGAIN)) + continue; + + ERROR ("tcpconns plugin: conn_read_netlink: recvmsg(2) failed: %s", + sstrerror (errno, buf, sizeof (buf))); + close (fd); + return (-1); + } + else if (status == 0) + { + close (fd); + DEBUG ("tcpconns plugin: conn_read_netlink: Unexpected zero-sized " + "reply from netlink socket."); + return (0); + } + + h = (struct nlmsghdr*)buf; + while (NLMSG_OK(h, status)) + { + if (h->nlmsg_seq != sequence_number) + { + h = NLMSG_NEXT(h, status); + continue; + } + + if (h->nlmsg_type == NLMSG_DONE) + { + close (fd); + return (0); + } + else if (h->nlmsg_type == NLMSG_ERROR) + { + struct nlmsgerr *msg_error; + + msg_error = NLMSG_DATA(h); + WARNING ("tcpconns plugin: conn_read_netlink: Received error %i.", + msg_error->error); + + close (fd); + return (1); + } + + r = NLMSG_DATA(h); + + /* This code does not (need to) distinguish between IPv4 and IPv6. */ + conn_handle_ports (ntohs(r->id.idiag_sport), + ntohs(r->id.idiag_dport), + r->idiag_state); + + h = NLMSG_NEXT(h, status); + } /* while (NLMSG_OK) */ + } /* while (1) */ + + /* Not reached because the while() loop above handles the exit condition. */ + return (0); +} /* int conn_read_netlink */ + static int conn_handle_line (char *buffer) { char *fields[32]; @@ -553,26 +710,55 @@ static int conn_init (void) static int conn_read (void) { - int errors_num = 0; + int status; conn_reset_port_entry (); - if (conn_read_file ("/proc/net/tcp") != 0) - errors_num++; - if (conn_read_file ("/proc/net/tcp6") != 0) - errors_num++; - - if (errors_num < 2) + if (linux_source == SRC_NETLINK) { - conn_submit_all (); + status = conn_read_netlink (); } - else + else if (linux_source == SRC_PROC) { - ERROR ("tcpconns plugin: Neither /proc/net/tcp nor /proc/net/tcp6 " - "coult be read."); - return (-1); + int errors_num = 0; + + if (conn_read_file ("/proc/net/tcp") != 0) + errors_num++; + if (conn_read_file ("/proc/net/tcp6") != 0) + errors_num++; + + if (errors_num < 2) + status = 0; + else + status = ENOENT; + } + else /* if (linux_source == SRC_DUNNO) */ + { + /* Try to use netlink for getting this data, it is _much_ faster on systems + * with a large amount of connections. */ + status = conn_read_netlink (); + if (status == 0) + { + INFO ("tcpconns plugin: Reading from netlink succeeded. " + "Will use the netlink method from now on."); + linux_source = SRC_NETLINK; + } + else + { + INFO ("tcpconns plugin: Reading from netlink failed. " + "Will read from /proc from now on."); + linux_source = SRC_PROC; + + /* return success here to avoid the "plugin failed" message. */ + return (0); + } } + if (status == 0) + conn_submit_all (); + else + return (status); + return (0); } /* int conn_read */ /* #endif KERNEL_LINUX */ diff --git a/src/utils_format_graphite.c b/src/utils_format_graphite.c new file mode 100644 index 00000000..49a59c50 --- /dev/null +++ b/src/utils_format_graphite.c @@ -0,0 +1,227 @@ +/** + * collectd - src/utils_format_graphite.c + * Copyright (C) 2012 Thomas Meson + * Copyright (C) 2012 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Thomas Meson + * Florian octo Forster + **/ + +#include "collectd.h" +#include "plugin.h" +#include "common.h" + +#include "utils_cache.h" +#include "utils_format_json.h" +#include "utils_parse_option.h" + +/* Utils functions to format data sets in graphite format. + * Largely taken from write_graphite.c as it remains the same formatting */ + +static int gr_format_values (char *ret, size_t ret_len, + int ds_num, const data_set_t *ds, const value_list_t *vl) +{ + size_t offset = 0; + int status; + + assert (0 == strcmp (ds->type, vl->type)); + + memset (ret, 0, ret_len); + +#define BUFFER_ADD(...) do { \ + status = ssnprintf (ret + offset, ret_len - offset, \ + __VA_ARGS__); \ + if (status < 1) \ + { \ + return (-1); \ + } \ + else if (((size_t) status) >= (ret_len - offset)) \ + { \ + return (-1); \ + } \ + else \ + offset += ((size_t) status); \ +} while (0) + + if (ds->ds[ds_num].type == DS_TYPE_GAUGE) + BUFFER_ADD ("%f", vl->values[ds_num].gauge); + else if (ds->ds[ds_num].type == DS_TYPE_COUNTER) + BUFFER_ADD ("%llu", vl->values[ds_num].counter); + else if (ds->ds[ds_num].type == DS_TYPE_DERIVE) + BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive); + else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE) + BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute); + else + { + ERROR ("gr_format_values plugin: Unknown data source type: %i", + ds->ds[ds_num].type); + return (-1); + } + +#undef BUFFER_ADD + + return (0); +} + +static void gr_copy_escape_part (char *dst, const char *src, size_t dst_len, + char escape_char) +{ + size_t i; + + memset (dst, 0, dst_len); + + if (src == NULL) + return; + + for (i = 0; i < dst_len; i++) + { + if (src[i] == 0) + { + dst[i] = 0; + break; + } + + if ((src[i] == '.') + || isspace ((int) src[i]) + || iscntrl ((int) src[i])) + dst[i] = escape_char; + else + dst[i] = src[i]; + } +} + +static int gr_format_name (char *ret, int ret_len, + const value_list_t *vl, + const char *ds_name, + char *prefix, + char *postfix, + char escape_char) +{ + char n_host[DATA_MAX_NAME_LEN]; + char n_plugin[DATA_MAX_NAME_LEN]; + char n_plugin_instance[DATA_MAX_NAME_LEN]; + char n_type[DATA_MAX_NAME_LEN]; + char n_type_instance[DATA_MAX_NAME_LEN]; + + char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1]; + char tmp_type[2 * DATA_MAX_NAME_LEN + 1]; + + if (prefix == NULL) + prefix = ""; + + if (postfix == NULL) + postfix = ""; + + gr_copy_escape_part (n_host, vl->host, + sizeof (n_host), escape_char); + gr_copy_escape_part (n_plugin, vl->plugin, + sizeof (n_plugin), escape_char); + gr_copy_escape_part (n_plugin_instance, vl->plugin_instance, + sizeof (n_plugin_instance), escape_char); + gr_copy_escape_part (n_type, vl->type, + sizeof (n_type), escape_char); + gr_copy_escape_part (n_type_instance, vl->type_instance, + sizeof (n_type_instance), escape_char); + + if (n_plugin_instance[0] != '\0') + ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s", + n_plugin, + '-', + n_plugin_instance); + else + sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin)); + + if (n_type_instance[0] != '\0') + ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s", + n_type, + '-', + n_type_instance); + else + sstrncpy (tmp_type, n_type, sizeof (tmp_type)); + + if (ds_name != NULL) + ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s", + prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name); + else + ssnprintf (ret, ret_len, "%s%s%s.%s.%s", + prefix, n_host, postfix, tmp_plugin, tmp_type); + + return (0); +} + +int format_graphite (char *buffer, size_t buffer_size, + const data_set_t *ds, const value_list_t *vl, char *prefix, + char *postfix, char escape_char) +{ + int status = 0; + int i; + int buffer_pos = 0; + + for (i = 0; i < ds->ds_num; i++) + { + const char *ds_name = NULL; + char key[10*DATA_MAX_NAME_LEN]; + char values[512]; + size_t message_len; + char message[1024]; + + ds_name = ds->ds[i].name; + + /* Copy the identifier to `key' and escape it. */ + status = gr_format_name (key, sizeof (key), vl, ds_name, + prefix, postfix, escape_char); + if (status != 0) + { + ERROR ("amqp plugin: error with gr_format_name"); + return (status); + } + + escape_string (key, sizeof (key)); + /* Convert the values to an ASCII representation and put that into + * `values'. */ + status = gr_format_values (values, sizeof (values), i, ds, vl); + if (status != 0) + { + ERROR ("format_graphite: error with gr_format_values"); + return (status); + } + + /* Compute the graphite command */ + message_len = (size_t) ssnprintf (message, sizeof (message), + "%s %s %u\r\n", + key, + values, + (unsigned int) CDTIME_T_TO_TIME_T (vl->time)); + if (message_len >= sizeof (message)) { + ERROR ("format_graphite: message buffer too small: " + "Need %zu bytes.", message_len + 1); + return (-ENOMEM); + } + + /* Append it in case we got multiple data set */ + if ((buffer_pos + message_len) >= buffer_size) + { + ERROR ("format_graphite: target buffer too small"); + return (-ENOMEM); + } + memcpy((void *) (buffer + buffer_pos), message, message_len); + buffer_pos += message_len; + } + return (status); +} /* int format_graphite */ + +/* vim: set sw=2 sts=2 et fdm=marker : */ diff --git a/src/utils_format_graphite.h b/src/utils_format_graphite.h new file mode 100644 index 00000000..a3c4d85c --- /dev/null +++ b/src/utils_format_graphite.h @@ -0,0 +1,33 @@ +/** + * collectd - src/utils_format_graphite.h + * Copyright (C) 2012 Thomas Meson + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Author: + * Thomas Meson + **/ + +#ifndef UTILS_FORMAT_GRAPHITE_H +#define UTILS_FORMAT_GRAPHITE_H 1 + +#include "collectd.h" +#include "plugin.h" + +int format_graphite (char *buffer, + size_t buffer_size, const data_set_t *ds, + const value_list_t *vl, const char *prefix, + const char *postfix, const char escape_char); + +#endif /* UTILS_FORMAT_GRAPHITE_H */ diff --git a/src/utils_format_json.c b/src/utils_format_json.c index 2a5526b2..bbc3dfdb 100644 --- a/src/utils_format_json.c +++ b/src/utils_format_json.c @@ -70,7 +70,7 @@ static int escape_string (char *buffer, size_t buffer_size, /* {{{ */ #undef BUFFER_ADD return (0); -} /* }}} int buffer_add_string */ +} /* }}} int escape_string */ static int values_to_json (char *buffer, size_t buffer_size, /* {{{ */ const data_set_t *ds, const value_list_t *vl, int store_rates) @@ -152,7 +152,7 @@ static int values_to_json (char *buffer, size_t buffer_size, /* {{{ */ } /* }}} int values_to_json */ static int dstypes_to_json (char *buffer, size_t buffer_size, /* {{{ */ - const data_set_t *ds, const value_list_t *vl) + const data_set_t *ds) { size_t offset = 0; int i; @@ -189,7 +189,7 @@ static int dstypes_to_json (char *buffer, size_t buffer_size, /* {{{ */ } /* }}} int dstypes_to_json */ static int dsnames_to_json (char *buffer, size_t buffer_size, /* {{{ */ - const data_set_t *ds, const value_list_t *vl) + const data_set_t *ds) { size_t offset = 0; int i; @@ -225,6 +225,86 @@ static int dsnames_to_json (char *buffer, size_t buffer_size, /* {{{ */ return (0); } /* }}} int dsnames_to_json */ +static int meta_data_to_json (char *buffer, size_t buffer_size, /* {{{ */ + meta_data_t *meta) +{ + size_t offset = 0; + char **keys = NULL; + int keys_num; + int status; + int i; + + memset (buffer, 0, buffer_size); + +#define BUFFER_ADD(...) do { \ + status = ssnprintf (buffer + offset, buffer_size - offset, \ + __VA_ARGS__); \ + if (status < 1) \ + return (-1); \ + else if (((size_t) status) >= (buffer_size - offset)) \ + return (-ENOMEM); \ + else \ + offset += ((size_t) status); \ +} while (0) + + keys_num = meta_data_toc (meta, &keys); + for (i = 0; i < keys_num; ++i) + { + int type; + char *key = keys[i]; + + type = meta_data_type (meta, key); + if (type == MD_TYPE_STRING) + { + char *value = NULL; + if (meta_data_get_string (meta, key, &value) == 0) + { + char temp[512] = ""; + escape_string (temp, sizeof (temp), value); + sfree (value); + BUFFER_ADD (",\"%s\":%s", key, temp); + } + } + else if (type == MD_TYPE_SIGNED_INT) + { + int64_t value = 0; + if (meta_data_get_signed_int (meta, key, &value) == 0) + BUFFER_ADD (",\"%s\":%"PRIi64, key, value); + } + else if (type == MD_TYPE_UNSIGNED_INT) + { + uint64_t value = 0; + if (meta_data_get_unsigned_int (meta, key, &value) == 0) + BUFFER_ADD (",\"%s\":%"PRIu64, key, value); + } + else if (type == MD_TYPE_DOUBLE) + { + double value = 0.0; + if (meta_data_get_double (meta, key, &value) == 0) + BUFFER_ADD (",\"%s\":%f", key, value); + } + else if (type == MD_TYPE_BOOLEAN) + { + _Bool value = 0; + if (meta_data_get_boolean (meta, key, &value) == 0) + BUFFER_ADD (",\"%s\":%s", key, value ? "true" : "false"); + } + + free (key); + } /* for (keys) */ + free (keys); + + if (offset <= 0) + return (ENOENT); + + buffer[0] = '{'; /* replace leading ',' */ + BUFFER_ADD ("}"); + +#undef BUFFER_ADD + + return (0); +} /* int meta_data_to_json */ + static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */ const data_set_t *ds, const value_list_t *vl, int store_rates) { @@ -254,12 +334,12 @@ static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */ return (status); BUFFER_ADD ("\"values\":%s", temp); - status = dstypes_to_json (temp, sizeof (temp), ds, vl); + status = dstypes_to_json (temp, sizeof (temp), ds); if (status != 0) return (status); BUFFER_ADD (",\"dstypes\":%s", temp); - status = dsnames_to_json (temp, sizeof (temp), ds, vl); + status = dsnames_to_json (temp, sizeof (temp), ds); if (status != 0) return (status); BUFFER_ADD (",\"dsnames\":%s", temp); @@ -280,6 +360,17 @@ static int value_list_to_json (char *buffer, size_t buffer_size, /* {{{ */ BUFFER_ADD_KEYVAL ("type", vl->type); BUFFER_ADD_KEYVAL ("type_instance", vl->type_instance); + if (vl->meta != NULL) + { + char meta_buffer[buffer_size]; + memset (meta_buffer, 0, sizeof (meta_buffer)); + status = meta_data_to_json (meta_buffer, sizeof (meta_buffer), vl->meta); + if (status != 0) + return (status); + + BUFFER_ADD (",\"meta\":%s", meta_buffer); + } /* if (vl->meta != NULL) */ + BUFFER_ADD ("}"); #undef BUFFER_ADD_KEYVAL diff --git a/src/write_graphite.c b/src/write_graphite.c index d6583a75..f3753da2 100644 --- a/src/write_graphite.c +++ b/src/write_graphite.c @@ -47,6 +47,7 @@ #include "utils_cache.h" #include "utils_parse_option.h" +#include "utils_format_graphite.h" /* Folks without pthread will need to disable this plugin. */ #include @@ -284,175 +285,12 @@ static int wg_flush (cdtime_t timeout, return (status); } -static int wg_format_values (char *ret, size_t ret_len, - int ds_num, const data_set_t *ds, const value_list_t *vl, - _Bool store_rates) -{ - size_t offset = 0; - int status; - gauge_t *rates = NULL; - - assert (0 == strcmp (ds->type, vl->type)); - - memset (ret, 0, ret_len); - -#define BUFFER_ADD(...) do { \ - status = ssnprintf (ret + offset, ret_len - offset, \ - __VA_ARGS__); \ - if (status < 1) \ - { \ - sfree (rates); \ - return (-1); \ - } \ - else if (((size_t) status) >= (ret_len - offset)) \ - { \ - sfree (rates); \ - return (-1); \ - } \ - else \ - offset += ((size_t) status); \ -} while (0) - - if (ds->ds[ds_num].type == DS_TYPE_GAUGE) - BUFFER_ADD ("%f", vl->values[ds_num].gauge); - else if (store_rates) - { - if (rates == NULL) - rates = uc_get_rate (ds, vl); - if (rates == NULL) - { - WARNING ("format_values: " - "uc_get_rate failed."); - return (-1); - } - BUFFER_ADD ("%g", rates[ds_num]); - } - else if (ds->ds[ds_num].type == DS_TYPE_COUNTER) - BUFFER_ADD ("%llu", vl->values[ds_num].counter); - else if (ds->ds[ds_num].type == DS_TYPE_DERIVE) - BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive); - else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE) - BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute); - else - { - ERROR ("format_values plugin: Unknown data source type: %i", - ds->ds[ds_num].type); - sfree (rates); - return (-1); - } - -#undef BUFFER_ADD - - sfree (rates); - return (0); -} - -static void wg_copy_escape_part (char *dst, const char *src, size_t dst_len, - char escape_char) -{ - size_t i; - - memset (dst, 0, dst_len); - - if (src == NULL) - return; - - for (i = 0; i < dst_len; i++) - { - if (src[i] == 0) - { - dst[i] = 0; - break; - } - - if ((src[i] == '.') - || isspace ((int) src[i]) - || iscntrl ((int) src[i])) - dst[i] = escape_char; - else - dst[i] = src[i]; - } -} - -static int wg_format_name (char *ret, int ret_len, - const value_list_t *vl, - const struct wg_callback *cb, - const char *ds_name) -{ - char n_host[DATA_MAX_NAME_LEN]; - char n_plugin[DATA_MAX_NAME_LEN]; - char n_plugin_instance[DATA_MAX_NAME_LEN]; - char n_type[DATA_MAX_NAME_LEN]; - char n_type_instance[DATA_MAX_NAME_LEN]; - - char *prefix; - char *postfix; - - char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1]; - char tmp_type[2 * DATA_MAX_NAME_LEN + 1]; - - prefix = cb->prefix; - if (prefix == NULL) - prefix = ""; - - postfix = cb->postfix; - if (postfix == NULL) - postfix = ""; - - wg_copy_escape_part (n_host, vl->host, - sizeof (n_host), cb->escape_char); - wg_copy_escape_part (n_plugin, vl->plugin, - sizeof (n_plugin), cb->escape_char); - wg_copy_escape_part (n_plugin_instance, vl->plugin_instance, - sizeof (n_plugin_instance), cb->escape_char); - wg_copy_escape_part (n_type, vl->type, - sizeof (n_type), cb->escape_char); - wg_copy_escape_part (n_type_instance, vl->type_instance, - sizeof (n_type_instance), cb->escape_char); - - if (n_plugin_instance[0] != '\0') - ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s", - n_plugin, - cb->separate_instances ? '.' : '-', - n_plugin_instance); - else - sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin)); - - if (n_type_instance[0] != '\0') - ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s", - n_type, - cb->separate_instances ? '.' : '-', - n_type_instance); - else - sstrncpy (tmp_type, n_type, sizeof (tmp_type)); - - if (ds_name != NULL) - ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s", - prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name); - else - ssnprintf (ret, ret_len, "%s%s%s.%s.%s", - prefix, n_host, postfix, tmp_plugin, tmp_type); - - return (0); -} - -static int wg_send_message (const char* key, const char* value, - cdtime_t time, struct wg_callback *cb) +static int wg_send_message (char const *message, struct wg_callback *cb) { int status; size_t message_len; - char message[1024]; - - message_len = (size_t) ssnprintf (message, sizeof (message), - "%s %s %u\r\n", - key, - value, - (unsigned int) CDTIME_T_TO_TIME_T (time)); - if (message_len >= sizeof (message)) { - ERROR ("write_graphite plugin: message buffer too small: " - "Need %zu bytes.", message_len + 1); - return (-1); - } + + message_len = strlen (message); pthread_mutex_lock (&cb->send_lock); @@ -502,10 +340,8 @@ static int wg_send_message (const char* key, const char* value, static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, struct wg_callback *cb) { - char key[10*DATA_MAX_NAME_LEN]; - char values[512]; - - int status, i; + char buffer[4096]; + int status; if (0 != strcmp (ds->type, vl->type)) { @@ -514,45 +350,22 @@ static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, return -1; } - for (i = 0; i < ds->ds_num; i++) - { - const char *ds_name = NULL; - - if (cb->always_append_ds || (ds->ds_num > 1)) - ds_name = ds->ds[i].name; - - /* Copy the identifier to `key' and escape it. */ - status = wg_format_name (key, sizeof (key), vl, cb, ds_name); - if (status != 0) - { - ERROR ("write_graphite plugin: error with format_name"); - return (status); - } - - escape_string (key, sizeof (key)); - /* Convert the values to an ASCII representation and put that into - * `values'. */ - status = wg_format_values (values, sizeof (values), i, ds, vl, - cb->store_rates); - if (status != 0) - { - ERROR ("write_graphite plugin: error with " - "wg_format_values"); - return (status); - } + memset (buffer, 0, sizeof (buffer)); + status = format_graphite (buffer, sizeof (buffer), ds, vl, + cb->prefix, cb->postfix, cb->escape_char); + if (status != 0) /* error message has been printed already. */ + return (status); - /* Send the message to graphite */ - status = wg_send_message (key, values, vl->time, cb); - if (status != 0) - { - ERROR ("write_graphite plugin: error with " - "wg_send_message"); - return (status); - } + wg_send_message (buffer, cb); + if (status != 0) + { + ERROR ("write_graphite plugin: wg_send_message failed " + "with status %i.", status); + return (status); } return (0); -} +} /* int wg_write_messages */ static int wg_write (const data_set_t *ds, const value_list_t *vl, user_data_t *user_data)