X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fnetwork.c;h=f379a5c44e82e688371b08fcf2153596fb86374f;hb=82f6ebad0250c0b8cd0cdf7453fe427fd7b38135;hp=f4b87579dfda0c28b91ddb9a121b40103b478bf1;hpb=0d5c879672770e3b8a740727fb223a6febdeaa27;p=collectd.git diff --git a/src/network.c b/src/network.c index f4b87579..f379a5c4 100644 --- a/src/network.c +++ b/src/network.c @@ -1,23 +1,24 @@ /** * collectd - src/network.c - * Copyright (C) 2005-2009 Florian octo Forster + * Copyright (C) 2005-2013 Florian octo Forster * Copyright (C) 2009 Aman Gupta * * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; only version 2 of the License is applicable. + * under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; only version 2.1 of the License is + * applicable. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * General Public License for more details. + * Lesser General Public License for more details. * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * * Authors: - * Florian octo Forster + * Florian octo Forster * Aman Gupta **/ @@ -30,6 +31,7 @@ #include "utils_fbhash.h" #include "utils_avltree.h" #include "utils_cache.h" +#include "utils_complain.h" #include "network.h" @@ -51,9 +53,29 @@ #if HAVE_POLL_H # include #endif +#if HAVE_NET_IF_H +# include +#endif #if HAVE_LIBGCRYPT +# include +# if defined __APPLE__ +/* default xcode compiler throws warnings even when deprecated functionality + * is not used. -Werror breaks the build because of erroneous warnings. + * http://stackoverflow.com/questions/10556299/compiler-warnings-with-libgcrypt-v1-5-0/12830209#12830209 + */ +# pragma GCC diagnostic ignored "-Wdeprecated-declarations" +# endif +/* FreeBSD's copy of libgcrypt extends the existing GCRYPT_NO_DEPRECATED + * to properly hide all deprecated functionality. + * http://svnweb.freebsd.org/ports/head/security/libgcrypt/files/patch-src__gcrypt.h.in + */ +# define GCRYPT_NO_DEPRECATED # include +# if defined __APPLE__ +/* Re enable deprecation warnings */ +# pragma GCC diagnostic warning "-Wdeprecated-declarations" +# endif GCRY_THREAD_OPTION_PTHREAD_IMPL; #endif @@ -117,6 +139,7 @@ typedef struct sockent char *node; char *service; + int interface; union { @@ -255,6 +278,7 @@ typedef struct receive_list_entry_s receive_list_entry_t; static int network_config_ttl = 0; static size_t network_config_packet_size = 1024; static int network_config_forward = 0; +static int network_config_stats = 0; static sockent_t *sending_sockets = NULL; @@ -262,6 +286,7 @@ static receive_list_entry_t *receive_list_head = NULL; static receive_list_entry_t *receive_list_tail = NULL; static pthread_mutex_t receive_list_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t receive_list_cond = PTHREAD_COND_INITIALIZER; +static uint64_t receive_list_length = 0; static sockent_t *listen_sockets = NULL; static struct pollfd *listen_sockets_pollfd = NULL; @@ -282,6 +307,22 @@ static int send_buffer_fill; static value_list_t send_buffer_vl = VALUE_LIST_STATIC; static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER; +/* XXX: These counters are incremented from one place only. The spot in which + * the values are incremented is either only reachable by one thread (the + * dispatch thread, for example) or locked by some lock (send_buffer_lock for + * example). Only if neither is true, the stats_lock is acquired. The counters + * are always read without holding a lock in the hope that writing 8 bytes to + * memory is an atomic operation. */ +static uint64_t stats_octets_rx = 0; +static uint64_t stats_octets_tx = 0; +static uint64_t stats_packets_rx = 0; +static uint64_t stats_packets_tx = 0; +static uint64_t stats_values_dispatched = 0; +static uint64_t stats_values_not_dispatched = 0; +static uint64_t stats_values_sent = 0; +static uint64_t stats_values_not_sent = 0; +static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; + /* * Private functions */ @@ -327,7 +368,45 @@ static _Bool check_send_okay (const value_list_t *vl) /* {{{ */ return (!received); } /* }}} _Bool check_send_okay */ -static int network_dispatch_values (value_list_t *vl) /* {{{ */ +static _Bool check_notify_received (const notification_t *n) /* {{{ */ +{ + notification_meta_t *ptr; + + for (ptr = n->meta; ptr != NULL; ptr = ptr->next) + if ((strcmp ("network:received", ptr->name) == 0) + && (ptr->type == NM_TYPE_BOOLEAN)) + return ((_Bool) ptr->nm_value.nm_boolean); + + return (0); +} /* }}} _Bool check_notify_received */ + +static _Bool check_send_notify_okay (const notification_t *n) /* {{{ */ +{ + static c_complain_t complain_forwarding = C_COMPLAIN_INIT_STATIC; + _Bool received = 0; + + if (n->meta == NULL) + return (1); + + received = check_notify_received (n); + + if (network_config_forward && received) + { + c_complain_once (LOG_ERR, &complain_forwarding, + "network plugin: A notification has been received via the network " + "forwarding if enabled. Forwarding of notifications is currently " + "not supported, because there is not loop-deteciton available. " + "Please contact the collectd mailing list if you need this " + "feature."); + } + + /* By default, only *send* value lists that were not *received* by the + * network plugin. */ + return (!received); +} /* }}} _Bool check_send_notify_okay */ + +static int network_dispatch_values (value_list_t *vl, /* {{{ */ + const char *username) { int status; @@ -340,12 +419,13 @@ static int network_dispatch_values (value_list_t *vl) /* {{{ */ if (!check_receive_okay (vl)) { #if COLLECT_DEBUG - char name[6*DATA_MAX_NAME_LEN]; - FORMAT_VL (name, sizeof (name), vl); - name[sizeof (name) - 1] = 0; - DEBUG ("network plugin: network_dispatch_values: " - "NOT dispatching %s.", name); + char name[6*DATA_MAX_NAME_LEN]; + FORMAT_VL (name, sizeof (name), vl); + name[sizeof (name) - 1] = 0; + DEBUG ("network plugin: network_dispatch_values: " + "NOT dispatching %s.", name); #endif + stats_values_not_dispatched++; return (0); } @@ -367,7 +447,20 @@ static int network_dispatch_values (value_list_t *vl) /* {{{ */ return (status); } - plugin_dispatch_values (vl); + if (username != NULL) + { + status = meta_data_add_string (vl->meta, "network:username", username); + if (status != 0) + { + ERROR ("network plugin: meta_data_add_string failed."); + meta_data_destroy (vl->meta); + vl->meta = NULL; + return (status); + } + } + + plugin_dispatch_values_secure (vl); + stats_values_dispatched++; meta_data_destroy (vl->meta); vl->meta = NULL; @@ -375,7 +468,51 @@ static int network_dispatch_values (value_list_t *vl) /* {{{ */ return (0); } /* }}} int network_dispatch_values */ +static int network_dispatch_notification (notification_t *n) /* {{{ */ +{ + int status; + + assert (n->meta == NULL); + + status = plugin_notification_meta_add_boolean (n, "network:received", 1); + if (status != 0) + { + ERROR ("network plugin: plugin_notification_meta_add_boolean failed."); + plugin_notification_meta_free (n->meta); + n->meta = NULL; + return (status); + } + + status = plugin_dispatch_notification (n); + + plugin_notification_meta_free (n->meta); + n->meta = NULL; + + return (status); +} /* }}} int network_dispatch_notification */ + #if HAVE_LIBGCRYPT +static void network_init_gcrypt (void) /* {{{ */ +{ + /* http://lists.gnupg.org/pipermail/gcrypt-devel/2003-August/000458.html + * Because you can't know in a library whether another library has + * already initialized the library */ + if (gcry_control (GCRYCTL_ANY_INITIALIZATION_P)) + return; + + /* http://www.gnupg.org/documentation/manuals/gcrypt/Multi_002dThreading.html + * To ensure thread-safety, it's important to set GCRYCTL_SET_THREAD_CBS + * *before* initalizing Libgcrypt with gcry_check_version(), which itself must + * be called before any other gcry_* function. GCRYCTL_ANY_INITIALIZATION_P + * above doesn't count, as it doesn't implicitly initalize Libgcrypt. + * + * tl;dr: keep all these gry_* statements in this exact order please. */ + gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread); + gcry_check_version (NULL); + gcry_control (GCRYCTL_INIT_SECMEM, 32768); + gcry_control (GCRYCTL_INITIALIZATION_FINISHED); +} /* }}} void network_init_gcrypt */ + static gcry_cipher_hd_t network_get_aes256_cypher (sockent_t *se, /* {{{ */ const void *iv, size_t iv_size, const char *username) { @@ -561,7 +698,7 @@ static int write_part_number (char **ret_buffer, int *ret_buffer_len, part_header_t pkg_head; uint64_t pkg_value; - + int offset; packet_len = sizeof (pkg_head) + sizeof (pkg_value); @@ -665,7 +802,7 @@ static int parse_part_values (void **ret_buffer, size_t *ret_buffer_len, exp_size = 3 * sizeof (uint16_t) + pkg_numval * (sizeof (uint8_t) + sizeof (value_t)); - if ((buffer_len < 0) || (buffer_len < exp_size)) + if (buffer_len < exp_size) { WARNING ("network plugin: parse_part_values: " "Packet too short: " @@ -719,11 +856,11 @@ static int parse_part_values (void **ret_buffer, size_t *ret_buffer_len, break; default: - sfree (pkg_types); - sfree (pkg_values); NOTICE ("network plugin: parse_part_values: " "Don't know how to handle data source type %"PRIu8, pkg_types[i]); + sfree (pkg_types); + sfree (pkg_values); return (-1); } /* switch (pkg_types[i]) */ } @@ -749,9 +886,8 @@ static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len, size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t); uint16_t pkg_length; - uint16_t pkg_type; - if ((buffer_len < 0) || ((size_t) buffer_len < exp_size)) + if (buffer_len < exp_size) { WARNING ("network plugin: parse_part_number: " "Packet too short: " @@ -763,7 +899,7 @@ static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len, memcpy ((void *) &tmp16, buffer, sizeof (tmp16)); buffer += sizeof (tmp16); - pkg_type = ntohs (tmp16); + /* pkg_type = ntohs (tmp16); */ memcpy ((void *) &tmp16, buffer, sizeof (tmp16)); buffer += sizeof (tmp16); @@ -789,9 +925,8 @@ static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len, size_t header_size = 2 * sizeof (uint16_t); uint16_t pkg_length; - uint16_t pkg_type; - if ((buffer_len < 0) || (buffer_len < header_size)) + if (buffer_len < header_size) { WARNING ("network plugin: parse_part_string: " "Packet too short: " @@ -803,7 +938,7 @@ static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len, memcpy ((void *) &tmp16, buffer, sizeof (tmp16)); buffer += sizeof (tmp16); - pkg_type = ntohs (tmp16); + /* pkg_type = ntohs (tmp16); */ memcpy ((void *) &tmp16, buffer, sizeof (tmp16)); buffer += sizeof (tmp16); @@ -867,7 +1002,8 @@ static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len, #define PP_SIGNED 0x01 #define PP_ENCRYPTED 0x02 static int parse_packet (sockent_t *se, - void *buffer, size_t buffer_size, int flags); + void *buffer, size_t buffer_size, int flags, + const char *username); #define BUFFER_READ(p,s) do { \ memcpy ((p), buffer + buffer_offset, (s)); \ @@ -878,6 +1014,8 @@ static int parse_packet (sockent_t *se, static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */ void **ret_buffer, size_t *ret_buffer_len, int flags) { + static c_complain_t complain_no_users = C_COMPLAIN_INIT_STATIC; + char *buffer; size_t buffer_len; size_t buffer_offset; @@ -899,8 +1037,9 @@ static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */ if (se->data.server.userdb == NULL) { - NOTICE ("network plugin: Received signed network packet but can't verify " - "it because no user DB has been configured. Will accept it."); + c_complain (LOG_NOTICE, &complain_no_users, + "network plugin: Received signed network packet but can't verify it " + "because no user DB has been configured. Will accept it."); return (0); } @@ -962,6 +1101,8 @@ static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */ { ERROR ("network plugin: gcry_md_setkey failed: %s", gcry_strerror (err)); gcry_md_close (hd); + sfree (secret); + sfree (pss.username); return (-1); } @@ -983,9 +1124,6 @@ static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */ gcry_md_close (hd); hd = NULL; - sfree (secret); - sfree (pss.username); - if (memcmp (pss.hash, hash, sizeof (pss.hash)) != 0) { WARNING ("network plugin: Verifying HMAC-SHA-256 signature failed: " @@ -994,9 +1132,12 @@ static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */ else { parse_packet (se, buffer + buffer_offset, buffer_len - buffer_offset, - flags | PP_SIGNED); + flags | PP_SIGNED, pss.username); } + sfree (secret); + sfree (pss.username); + *ret_buffer = buffer + buffer_len; *ret_buffer_len = 0; @@ -1040,7 +1181,8 @@ static int parse_part_sign_sha256 (sockent_t *se, /* {{{ */ warning_has_been_printed = 1; } - parse_packet (se, buffer + part_len, buffer_size - part_len, flags); + parse_packet (se, buffer + part_len, buffer_size - part_len, flags, + /* username = */ NULL); *ret_buffer = buffer + buffer_size; *ret_buffer_size = 0; @@ -1119,7 +1261,10 @@ static int parse_part_encr_aes256 (sockent_t *se, /* {{{ */ cypher = network_get_aes256_cypher (se, pea.iv, sizeof (pea.iv), pea.username); if (cypher == NULL) + { + sfree (pea.username); return (-1); + } payload_len = part_size - (PART_ENCRYPTION_AES256_SIZE + username_len); assert (payload_len > 0); @@ -1131,6 +1276,7 @@ static int parse_part_encr_aes256 (sockent_t *se, /* {{{ */ /* in = */ NULL, /* in len = */ 0); if (err != 0) { + sfree (pea.username); ERROR ("network plugin: gcry_cipher_decrypt returned: %s", gcry_strerror (err)); return (-1); @@ -1149,17 +1295,22 @@ static int parse_part_encr_aes256 (sockent_t *se, /* {{{ */ buffer + buffer_offset, payload_len); if (memcmp (hash, pea.hash, sizeof (hash)) != 0) { + sfree (pea.username); ERROR ("network plugin: Decryption failed: Checksum mismatch."); return (-1); } parse_packet (se, buffer + buffer_offset, payload_len, - flags | PP_ENCRYPTED); + flags | PP_ENCRYPTED, pea.username); + + /* XXX: Free pea.username?!? */ /* Update return values */ *ret_buffer = buffer + part_size; *ret_buffer_len = buffer_len - part_size; + sfree (pea.username); + return (0); } /* }}} int parse_part_encr_aes256 */ /* #endif HAVE_LIBGCRYPT */ @@ -1214,7 +1365,8 @@ static int parse_part_encr_aes256 (sockent_t *se, /* {{{ */ #undef BUFFER_READ static int parse_packet (sockent_t *se, /* {{{ */ - void *buffer, size_t buffer_size, int flags) + void *buffer, size_t buffer_size, int flags, + const char *username) { int status; @@ -1314,7 +1466,7 @@ static int parse_packet (sockent_t *se, /* {{{ */ if (status != 0) break; - network_dispatch_values (&vl); + network_dispatch_values (&vl, username); sfree (vl.values); } @@ -1410,7 +1562,7 @@ static int parse_packet (sockent_t *se, /* {{{ */ } else { - plugin_dispatch_notification (&n); + network_dispatch_notification (&n); } } else if (pkg_type == TYPE_SEVERITY) @@ -1528,10 +1680,10 @@ static int network_set_ttl (const sockent_t *se, const struct addrinfo *ai) if (setsockopt (se->data.client.fd, IPPROTO_IP, optname, &network_config_ttl, - sizeof (network_config_ttl)) == -1) + sizeof (network_config_ttl)) != 0) { char errbuf[1024]; - ERROR ("setsockopt: %s", + ERROR ("network plugin: setsockopt (ipv4-ttl): %s", sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } @@ -1549,10 +1701,10 @@ static int network_set_ttl (const sockent_t *se, const struct addrinfo *ai) if (setsockopt (se->data.client.fd, IPPROTO_IPV6, optname, &network_config_ttl, - sizeof (network_config_ttl)) == -1) + sizeof (network_config_ttl)) != 0) { char errbuf[1024]; - ERROR ("setsockopt: %s", + ERROR ("network plugin: setsockopt(ipv6-ttl): %s", sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); @@ -1562,16 +1714,124 @@ static int network_set_ttl (const sockent_t *se, const struct addrinfo *ai) return (0); } /* int network_set_ttl */ -static int network_bind_socket (int fd, const struct addrinfo *ai) +static int network_set_interface (const sockent_t *se, const struct addrinfo *ai) /* {{{ */ { + DEBUG ("network plugin: network_set_interface: interface index = %i;", + se->interface); + + assert (se->type == SOCKENT_TYPE_CLIENT); + + if (ai->ai_family == AF_INET) + { + struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr; + + if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr))) + { +#if HAVE_STRUCT_IP_MREQN_IMR_IFINDEX + /* If possible, use the "ip_mreqn" structure which has + * an "interface index" member. Using the interface + * index is preferred here, because of its similarity + * to the way IPv6 handles this. Unfortunately, it + * appears not to be portable. */ + struct ip_mreqn mreq; + + memset (&mreq, 0, sizeof (mreq)); + mreq.imr_multiaddr.s_addr = addr->sin_addr.s_addr; + mreq.imr_address.s_addr = ntohl (INADDR_ANY); + mreq.imr_ifindex = se->interface; +#else + struct ip_mreq mreq; + + memset (&mreq, 0, sizeof (mreq)); + mreq.imr_multiaddr.s_addr = addr->sin_addr.s_addr; + mreq.imr_interface.s_addr = ntohl (INADDR_ANY); +#endif + + if (setsockopt (se->data.client.fd, IPPROTO_IP, IP_MULTICAST_IF, + &mreq, sizeof (mreq)) != 0) + { + char errbuf[1024]; + ERROR ("network plugin: setsockopt (ipv4-multicast-if): %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (-1); + } + + return (0); + } + } + else if (ai->ai_family == AF_INET6) + { + struct sockaddr_in6 *addr = (struct sockaddr_in6 *) ai->ai_addr; + + if (IN6_IS_ADDR_MULTICAST (&addr->sin6_addr)) + { + if (setsockopt (se->data.client.fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, + &se->interface, + sizeof (se->interface)) != 0) + { + char errbuf[1024]; + ERROR ("network plugin: setsockopt (ipv6-multicast-if): %s", + sstrerror (errno, errbuf, + sizeof (errbuf))); + return (-1); + } + + return (0); + } + } + + /* else: Not a multicast interface. */ + if (se->interface != 0) + { +#if defined(HAVE_IF_INDEXTONAME) && HAVE_IF_INDEXTONAME && defined(SO_BINDTODEVICE) + char interface_name[IFNAMSIZ]; + + if (if_indextoname (se->interface, interface_name) == NULL) + return (-1); + + DEBUG ("network plugin: Binding socket to interface %s", interface_name); + + if (setsockopt (se->data.client.fd, SOL_SOCKET, SO_BINDTODEVICE, + interface_name, + sizeof(interface_name)) == -1 ) + { + char errbuf[1024]; + ERROR ("network plugin: setsockopt (bind-if): %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (-1); + } +/* #endif HAVE_IF_INDEXTONAME && SO_BINDTODEVICE */ + +#else + WARNING ("network plugin: Cannot set the interface on a unicast " + "socket because " +# if !defined(SO_BINDTODEVICE) + "the \"SO_BINDTODEVICE\" socket option " +# else + "the \"if_indextoname\" function " +# endif + "is not available on your system."); +#endif + + } + + return (0); +} /* }}} network_set_interface */ + +static int network_bind_socket (int fd, const struct addrinfo *ai, const int interface_idx) +{ +#if KERNEL_SOLARIS + char loop = 0; +#else int loop = 0; +#endif int yes = 1; /* allow multiple sockets to use the same PORT number */ if (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1) { char errbuf[1024]; - ERROR ("setsockopt: %s", + ERROR ("network plugin: setsockopt (reuseaddr): %s", sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } @@ -1591,18 +1851,30 @@ static int network_bind_socket (int fd, const struct addrinfo *ai) struct sockaddr_in *addr = (struct sockaddr_in *) ai->ai_addr; if (IN_MULTICAST (ntohl (addr->sin_addr.s_addr))) { +#if HAVE_STRUCT_IP_MREQN_IMR_IFINDEX + struct ip_mreqn mreq; +#else struct ip_mreq mreq; +#endif DEBUG ("fd = %i; IPv4 multicast address found", fd); mreq.imr_multiaddr.s_addr = addr->sin_addr.s_addr; - mreq.imr_interface.s_addr = htonl (INADDR_ANY); +#if HAVE_STRUCT_IP_MREQN_IMR_IFINDEX + /* Set the interface using the interface index if + * possible (available). Unfortunately, the struct + * ip_mreqn is not portable. */ + mreq.imr_address.s_addr = ntohl (INADDR_ANY); + mreq.imr_ifindex = interface_idx; +#else + mreq.imr_interface.s_addr = ntohl (INADDR_ANY); +#endif if (setsockopt (fd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof (loop)) == -1) { char errbuf[1024]; - ERROR ("setsockopt: %s", + ERROR ("network plugin: setsockopt (multicast-loop): %s", sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); @@ -1612,11 +1884,13 @@ static int network_bind_socket (int fd, const struct addrinfo *ai) &mreq, sizeof (mreq)) == -1) { char errbuf[1024]; - ERROR ("setsockopt: %s", + ERROR ("network plugin: setsockopt (add-membership): %s", sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } + + return (0); } } else if (ai->ai_family == AF_INET6) @@ -1642,13 +1916,13 @@ static int network_bind_socket (int fd, const struct addrinfo *ai) * single interface; programs running on * multihomed hosts may need to join the same * group on more than one interface.*/ - mreq.ipv6mr_interface = 0; + mreq.ipv6mr_interface = interface_idx; if (setsockopt (fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &loop, sizeof (loop)) == -1) { char errbuf[1024]; - ERROR ("setsockopt: %s", + ERROR ("network plugin: setsockopt (ipv6-multicast-loop): %s", sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); @@ -1658,13 +1932,40 @@ static int network_bind_socket (int fd, const struct addrinfo *ai) &mreq, sizeof (mreq)) == -1) { char errbuf[1024]; - ERROR ("setsockopt: %s", + ERROR ("network plugin: setsockopt (ipv6-add-membership): %s", sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } + + return (0); + } + } + +#if defined(HAVE_IF_INDEXTONAME) && HAVE_IF_INDEXTONAME && defined(SO_BINDTODEVICE) + /* if a specific interface was set, bind the socket to it. But to avoid + * possible problems with multicast routing, only do that for non-multicast + * addresses */ + if (interface_idx != 0) + { + char interface_name[IFNAMSIZ]; + + if (if_indextoname (interface_idx, interface_name) == NULL) + return (-1); + + DEBUG ("fd = %i; Binding socket to interface %s", fd, interface_name); + + if (setsockopt (fd, SOL_SOCKET, SO_BINDTODEVICE, + interface_name, + sizeof(interface_name)) == -1 ) + { + char errbuf[1024]; + ERROR ("network plugin: setsockopt (bind-if): %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (-1); } } +#endif /* HAVE_IF_INDEXTONAME && SO_BINDTODEVICE */ return (0); } /* int network_bind_socket */ @@ -1681,6 +1982,7 @@ static int sockent_init (sockent_t *se, int type) /* {{{ */ se->type = SOCKENT_TYPE_CLIENT; se->node = NULL; se->service = NULL; + se->interface = 0; se->next = NULL; if (type == SOCKENT_TYPE_SERVER) @@ -1728,6 +2030,8 @@ static int sockent_open (sockent_t *se) /* {{{ */ { if (se->data.client.security_level > SECURITY_LEVEL_NONE) { + network_init_gcrypt (); + if ((se->data.client.username == NULL) || (se->data.client.password == NULL)) { @@ -1746,6 +2050,8 @@ static int sockent_open (sockent_t *se) /* {{{ */ { if (se->data.server.security_level > SECURITY_LEVEL_NONE) { + network_init_gcrypt (); + if (se->data.server.auth_file == NULL) { ERROR ("network plugin: Server socket with " @@ -1829,7 +2135,7 @@ static int sockent_open (sockent_t *se) /* {{{ */ continue; } - status = network_bind_socket (*tmp, ai_ptr); + status = network_bind_socket (*tmp, ai_ptr, se->interface); if (status != 0) { close (*tmp); @@ -1869,6 +2175,7 @@ static int sockent_open (sockent_t *se) /* {{{ */ se->data.client.addrlen = ai_ptr->ai_addrlen; network_set_ttl (se, ai_ptr); + network_set_interface (se, ai_ptr); /* We don't open more than one write-socket per * node/service pair.. */ @@ -1968,6 +2275,7 @@ static void *dispatch_thread (void __attribute__((unused)) *arg) /* {{{ */ ent = receive_list_head; if (ent != NULL) receive_list_head = ent->next; + receive_list_length--; pthread_mutex_unlock (&receive_list_lock); /* Check whether we are supposed to exit. We do NOT check `listen_loop' @@ -2001,7 +2309,8 @@ static void *dispatch_thread (void __attribute__((unused)) *arg) /* {{{ */ continue; } - parse_packet (se, ent->data, ent->data_len, /* flags = */ 0); + parse_packet (se, ent->data, ent->data_len, /* flags = */ 0, + /* username = */ NULL); sfree (ent->data); sfree (ent); } /* while (42) */ @@ -2019,11 +2328,13 @@ static int network_receive (void) /* {{{ */ receive_list_entry_t *private_list_head; receive_list_entry_t *private_list_tail; + uint64_t private_list_length; assert (listen_sockets_num > 0); private_list_head = NULL; private_list_tail = NULL; + private_list_length = 0; while (listen_loop == 0) { @@ -2060,6 +2371,9 @@ static int network_receive (void) /* {{{ */ return (-1); } + stats_octets_rx += ((uint64_t) buffer_len); + stats_packets_rx++; + /* TODO: Possible performance enhancement: Do not free * these entries in the dispatch thread but put them in * another list, so we don't have to allocate more and @@ -2074,6 +2388,7 @@ static int network_receive (void) /* {{{ */ ent->data = malloc (network_config_packet_size); if (ent->data == NULL) { + sfree (ent); ERROR ("network plugin: malloc failed."); return (-1); } @@ -2088,22 +2403,28 @@ static int network_receive (void) /* {{{ */ else private_list_tail->next = ent; private_list_tail = ent; + private_list_length++; /* Do not block here. Blocking here has led to * insufficient performance in the past. */ if (pthread_mutex_trylock (&receive_list_lock) == 0) { + assert (((receive_list_head == NULL) && (receive_list_length == 0)) + || ((receive_list_head != NULL) && (receive_list_length != 0))); + if (receive_list_head == NULL) receive_list_head = private_list_head; else receive_list_tail->next = private_list_head; receive_list_tail = private_list_tail; - - private_list_head = NULL; - private_list_tail = NULL; + receive_list_length += private_list_length; pthread_cond_signal (&receive_list_cond); pthread_mutex_unlock (&receive_list_lock); + + private_list_head = NULL; + private_list_tail = NULL; + private_list_length = 0; } } /* for (listen_sockets_pollfd) */ } /* while (listen_loop == 0) */ @@ -2118,9 +2439,11 @@ static int network_receive (void) /* {{{ */ else receive_list_tail->next = private_list_head; receive_list_tail = private_list_tail; + receive_list_length += private_list_length; private_list_head = NULL; private_list_tail = NULL; + private_list_length = 0; pthread_cond_signal (&receive_list_cond); pthread_mutex_unlock (&receive_list_lock); @@ -2416,7 +2739,7 @@ static int add_to_buffer (char *buffer, int buffer_size, /* {{{ */ return (-1); sstrncpy (vl_def->type_instance, vl->type_instance, sizeof (vl_def->type_instance)); } - + if (write_part_values (&buffer, &buffer_size, ds, vl) != 0) return (-1); @@ -2429,6 +2752,10 @@ static void flush_buffer (void) send_buffer_fill); network_send_buffer (send_buffer, (size_t) send_buffer_fill); + + stats_octets_tx += ((uint64_t) send_buffer_fill); + stats_packets_tx++; + network_init_buffer (); } @@ -2446,6 +2773,11 @@ static int network_write (const data_set_t *ds, const value_list_t *vl, DEBUG ("network plugin: network_write: " "NOT sending %s.", name); #endif + /* Counter is not protected by another lock and may be reached by + * multiple threads */ + pthread_mutex_lock (&stats_lock); + stats_values_not_sent++; + pthread_mutex_unlock (&stats_lock); return (0); } @@ -2463,6 +2795,8 @@ static int network_write (const data_set_t *ds, const value_list_t *vl, /* status == bytes added to the buffer */ send_buffer_fill += status; send_buffer_ptr += status; + + stats_values_sent++; } else { @@ -2477,6 +2811,8 @@ static int network_write (const data_set_t *ds, const value_list_t *vl, { send_buffer_fill += status; send_buffer_ptr += status; + + stats_values_sent++; } } @@ -2552,6 +2888,25 @@ static int network_config_set_ttl (const oconfig_item_t *ci) /* {{{ */ return (0); } /* }}} int network_config_set_ttl */ +static int network_config_set_interface (const oconfig_item_t *ci, /* {{{ */ + int *interface) +{ + if ((ci->values_num != 1) + || (ci->values[0].type != OCONFIG_TYPE_STRING)) + { + WARNING ("network plugin: The `Interface' config option needs exactly " + "one string argument."); + return (-1); + } + + if (interface == NULL) + return (-1); + + *interface = if_nametoindex (ci->values[0].value.string); + + return (0); +} /* }}} int network_config_set_interface */ + static int network_config_set_buffer_size (const oconfig_item_t *ci) /* {{{ */ { int tmp; @@ -2663,6 +3018,10 @@ static int network_config_add_listen (const oconfig_item_t *ci) /* {{{ */ &se->data.server.security_level); else #endif /* HAVE_LIBGCRYPT */ + if (strcasecmp ("Interface", child->key) == 0) + network_config_set_interface (child, + &se->interface); + else { WARNING ("network plugin: Option `%s' is not allowed here.", child->key); @@ -2741,6 +3100,10 @@ static int network_config_add_server (const oconfig_item_t *ci) /* {{{ */ &se->data.client.security_level); else #endif /* HAVE_LIBGCRYPT */ + if (strcasecmp ("Interface", child->key) == 0) + network_config_set_interface (child, + &se->interface); + else { WARNING ("network plugin: Option `%s' is not allowed here.", child->key); @@ -2797,6 +3160,8 @@ static int network_config (oconfig_item_t *ci) /* {{{ */ network_config_set_buffer_size (child); else if (strcasecmp ("Forward", child->key) == 0) network_config_set_boolean (child, &network_config_forward); + else if (strcasecmp ("ReportStats", child->key) == 0) + network_config_set_boolean (child, &network_config_stats); else if (strcasecmp ("CacheFlush", child->key) == 0) /* no op for backwards compatibility only */; else @@ -2810,15 +3175,17 @@ static int network_config (oconfig_item_t *ci) /* {{{ */ } /* }}} int network_config */ static int network_notification (const notification_t *n, - user_data_t __attribute__((unused)) *user_data) + user_data_t __attribute__((unused)) *user_data) { char buffer[network_config_packet_size]; char *buffer_ptr = buffer; int buffer_free = sizeof (buffer); int status; - memset (buffer, '\0', sizeof (buffer)); + if (!check_send_notify_okay (n)) + return (0); + memset (buffer, 0, sizeof (buffer)); status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME, (uint64_t) n->time); @@ -2833,7 +3200,7 @@ static int network_notification (const notification_t *n, if (strlen (n->host) > 0) { status = write_part_string (&buffer_ptr, &buffer_free, TYPE_HOST, - n->host, strlen (n->host)); + n->host, strlen (n->host)); if (status != 0) return (-1); } @@ -2841,7 +3208,7 @@ static int network_notification (const notification_t *n, if (strlen (n->plugin) > 0) { status = write_part_string (&buffer_ptr, &buffer_free, TYPE_PLUGIN, - n->plugin, strlen (n->plugin)); + n->plugin, strlen (n->plugin)); if (status != 0) return (-1); } @@ -2849,8 +3216,8 @@ static int network_notification (const notification_t *n, if (strlen (n->plugin_instance) > 0) { status = write_part_string (&buffer_ptr, &buffer_free, - TYPE_PLUGIN_INSTANCE, - n->plugin_instance, strlen (n->plugin_instance)); + TYPE_PLUGIN_INSTANCE, + n->plugin_instance, strlen (n->plugin_instance)); if (status != 0) return (-1); } @@ -2858,7 +3225,7 @@ static int network_notification (const notification_t *n, if (strlen (n->type) > 0) { status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE, - n->type, strlen (n->type)); + n->type, strlen (n->type)); if (status != 0) return (-1); } @@ -2866,7 +3233,7 @@ static int network_notification (const notification_t *n, if (strlen (n->type_instance) > 0) { status = write_part_string (&buffer_ptr, &buffer_free, TYPE_TYPE_INSTANCE, - n->type_instance, strlen (n->type_instance)); + n->type_instance, strlen (n->type_instance)); if (status != 0) return (-1); } @@ -2923,6 +3290,83 @@ static int network_shutdown (void) return (0); } /* int network_shutdown */ +static int network_stats_read (void) /* {{{ */ +{ + uint64_t copy_octets_rx; + uint64_t copy_octets_tx; + uint64_t copy_packets_rx; + uint64_t copy_packets_tx; + uint64_t copy_values_dispatched; + uint64_t copy_values_not_dispatched; + uint64_t copy_values_sent; + uint64_t copy_values_not_sent; + uint64_t copy_receive_list_length; + value_list_t vl = VALUE_LIST_INIT; + value_t values[2]; + + copy_octets_rx = stats_octets_rx; + copy_octets_tx = stats_octets_tx; + copy_packets_rx = stats_packets_rx; + copy_packets_tx = stats_packets_tx; + copy_values_dispatched = stats_values_dispatched; + copy_values_not_dispatched = stats_values_not_dispatched; + copy_values_sent = stats_values_sent; + copy_values_not_sent = stats_values_not_sent; + copy_receive_list_length = receive_list_length; + + /* Initialize `vl' */ + vl.values = values; + vl.values_len = 2; + vl.time = 0; + vl.interval = interval_g; + sstrncpy (vl.host, hostname_g, sizeof (vl.host)); + sstrncpy (vl.plugin, "network", sizeof (vl.plugin)); + + /* Octets received / sent */ + vl.values[0].counter = (counter_t) copy_octets_rx; + vl.values[1].counter = (counter_t) copy_octets_tx; + sstrncpy (vl.type, "if_octets", sizeof (vl.type)); + plugin_dispatch_values_secure (&vl); + + /* Packets received / send */ + vl.values[0].counter = (counter_t) copy_packets_rx; + vl.values[1].counter = (counter_t) copy_packets_tx; + sstrncpy (vl.type, "if_packets", sizeof (vl.type)); + plugin_dispatch_values_secure (&vl); + + /* Values (not) dispatched and (not) send */ + sstrncpy (vl.type, "total_values", sizeof (vl.type)); + vl.values_len = 1; + + vl.values[0].derive = (derive_t) copy_values_dispatched; + sstrncpy (vl.type_instance, "dispatch-accepted", + sizeof (vl.type_instance)); + plugin_dispatch_values_secure (&vl); + + vl.values[0].derive = (derive_t) copy_values_not_dispatched; + sstrncpy (vl.type_instance, "dispatch-rejected", + sizeof (vl.type_instance)); + plugin_dispatch_values_secure (&vl); + + vl.values[0].derive = (derive_t) copy_values_sent; + sstrncpy (vl.type_instance, "send-accepted", + sizeof (vl.type_instance)); + plugin_dispatch_values_secure (&vl); + + vl.values[0].derive = (derive_t) copy_values_not_sent; + sstrncpy (vl.type_instance, "send-rejected", + sizeof (vl.type_instance)); + plugin_dispatch_values_secure (&vl); + + /* Receive queue length */ + vl.values[0].gauge = (gauge_t) copy_receive_list_length; + sstrncpy (vl.type, "queue_length", sizeof (vl.type)); + vl.type_instance[0] = 0; + plugin_dispatch_values_secure (&vl); + + return (0); +} /* }}} int network_stats_read */ + static int network_init (void) { static _Bool have_init = false; @@ -2934,11 +3378,12 @@ static int network_init (void) have_init = true; #if HAVE_LIBGCRYPT - gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread); - gcry_control (GCRYCTL_INIT_SECMEM, 32768, 0); - gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0); + network_init_gcrypt (); #endif + if (network_config_stats != 0) + plugin_register_read ("network", network_stats_read); + plugin_register_shutdown ("network", network_shutdown); send_buffer = malloc (network_config_packet_size); @@ -3007,7 +3452,7 @@ static int network_init (void) return (0); } /* int network_init */ -/* +/* * The flush option of the network plugin cannot flush individual identifiers. * All the values are added to a buffer and sent when the buffer is full, the * requested value may or may not be in there, it's not worth finding out. We