/**
* collectd - src/network.c
- * Copyright (C) 2005-2009 Florian octo Forster
+ * Copyright (C) 2005-2013 Florian octo Forster
* Copyright (C) 2009 Aman Gupta
*
* This program is free software; you can redistribute it and/or modify it
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* Authors:
- * Florian octo Forster <octo at verplant.org>
+ * Florian octo Forster <octo at collectd.org>
* Aman Gupta <aman at tmm1.net>
**/
#endif
#if HAVE_LIBGCRYPT
+# include <pthread.h>
+# if defined __APPLE__
+/* default xcode compiler throws warnings even when deprecated functionality
+ * is not used. -Werror breaks the build because of erroneous warnings.
+ * http://stackoverflow.com/questions/10556299/compiler-warnings-with-libgcrypt-v1-5-0/12830209#12830209
+ */
+# pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+# endif
+/* FreeBSD's copy of libgcrypt extends the existing GCRYPT_NO_DEPRECATED
+ * to properly hide all deprecated functionality.
+ * http://svnweb.freebsd.org/ports/head/security/libgcrypt/files/patch-src__gcrypt.h.in
+ */
+# define GCRYPT_NO_DEPRECATED
# include <gcrypt.h>
+# if defined __APPLE__
+/* Re enable deprecation warnings */
+# pragma GCC diagnostic warning "-Wdeprecated-declarations"
+# endif
+# if GCRYPT_VERSION_NUMBER < 0x010600
GCRY_THREAD_OPTION_PTHREAD_IMPL;
+# endif
#endif
#ifndef IPV6_ADD_MEMBERSHIP
* Private variables
*/
static int network_config_ttl = 0;
-static size_t network_config_packet_size = 1024;
+/* Ethernet - (IPv6 + UDP) = 1500 - (40 + 8) = 1452 */
+static size_t network_config_packet_size = 1452;
static int network_config_forward = 0;
static int network_config_stats = 0;
* example). Only if neither is true, the stats_lock is acquired. The counters
* are always read without holding a lock in the hope that writing 8 bytes to
* memory is an atomic operation. */
-static uint64_t stats_octets_rx = 0;
-static uint64_t stats_octets_tx = 0;
-static uint64_t stats_packets_rx = 0;
-static uint64_t stats_packets_tx = 0;
-static uint64_t stats_values_dispatched = 0;
-static uint64_t stats_values_not_dispatched = 0;
-static uint64_t stats_values_sent = 0;
-static uint64_t stats_values_not_sent = 0;
+static derive_t stats_octets_rx = 0;
+static derive_t stats_octets_tx = 0;
+static derive_t stats_packets_rx = 0;
+static derive_t stats_packets_tx = 0;
+static derive_t stats_values_dispatched = 0;
+static derive_t stats_values_not_dispatched = 0;
+static derive_t stats_values_sent = 0;
+static derive_t stats_values_not_sent = 0;
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
/*
/* This is a value we already sent. Don't allow it to be received again in
* order to avoid looping. */
if ((status == 0) && (time_sent >= ((uint64_t) vl->time)))
- return (false);
+ return (0);
- return (true);
+ return (1);
} /* }}} _Bool check_receive_okay */
static _Bool check_send_okay (const value_list_t *vl) /* {{{ */
{
- _Bool received = false;
+ _Bool received = 0;
int status;
if (network_config_forward != 0)
- return (true);
+ return (1);
if (vl->meta == NULL)
- return (true);
+ return (1);
status = meta_data_get_boolean (vl->meta, "network:received", &received);
if (status == -ENOENT)
- return (true);
+ return (1);
else if (status != 0)
{
ERROR ("network plugin: check_send_okay: meta_data_get_boolean failed "
"with status %i.", status);
- return (true);
+ return (1);
}
/* By default, only *send* value lists that were not *received* by the
{
c_complain_once (LOG_ERR, &complain_forwarding,
"network plugin: A notification has been received via the network "
- "forwarding if enabled. Forwarding of notifications is currently "
+ "and forwarding is enabled. Forwarding of notifications is currently "
"not supported, because there is not loop-deteciton available. "
"Please contact the collectd mailing list if you need this "
"feature.");
return (-ENOMEM);
}
- status = meta_data_add_boolean (vl->meta, "network:received", true);
+ status = meta_data_add_boolean (vl->meta, "network:received", 1);
if (status != 0)
{
ERROR ("network plugin: meta_data_add_boolean failed.");
}
}
- plugin_dispatch_values_secure (vl);
+ plugin_dispatch_values (vl);
stats_values_dispatched++;
meta_data_destroy (vl->meta);
} /* }}} int network_dispatch_notification */
#if HAVE_LIBGCRYPT
+static void network_init_gcrypt (void) /* {{{ */
+{
+ /* http://lists.gnupg.org/pipermail/gcrypt-devel/2003-August/000458.html
+ * Because you can't know in a library whether another library has
+ * already initialized the library */
+ if (gcry_control (GCRYCTL_ANY_INITIALIZATION_P))
+ return;
+
+ /* http://www.gnupg.org/documentation/manuals/gcrypt/Multi_002dThreading.html
+ * To ensure thread-safety, it's important to set GCRYCTL_SET_THREAD_CBS
+ * *before* initalizing Libgcrypt with gcry_check_version(), which itself must
+ * be called before any other gcry_* function. GCRYCTL_ANY_INITIALIZATION_P
+ * above doesn't count, as it doesn't implicitly initalize Libgcrypt.
+ *
+ * tl;dr: keep all these gry_* statements in this exact order please. */
+# if GCRYPT_VERSION_NUMBER < 0x010600
+ gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
+# endif
+ gcry_check_version (NULL);
+ gcry_control (GCRYCTL_INIT_SECMEM, 32768);
+ gcry_control (GCRYCTL_INITIALIZATION_FINISHED);
+} /* }}} void network_init_gcrypt */
+
static gcry_cipher_hd_t network_get_aes256_cypher (sockent_t *se, /* {{{ */
const void *iv, size_t iv_size, const char *username)
{
part_header_t pkg_head;
uint64_t pkg_value;
-
+
int offset;
packet_len = sizeof (pkg_head) + sizeof (pkg_value);
&tmp);
if (status == 0)
{
- vl.time = (time_t) tmp;
- n.time = (time_t) tmp;
+ vl.time = TIME_T_TO_CDTIME_T (tmp);
+ n.time = TIME_T_TO_CDTIME_T (tmp);
+ }
+ }
+ else if (pkg_type == TYPE_TIME_HR)
+ {
+ uint64_t tmp = 0;
+ status = parse_part_number (&buffer, &buffer_size,
+ &tmp);
+ if (status == 0)
+ {
+ vl.time = (cdtime_t) tmp;
+ n.time = (cdtime_t) tmp;
}
}
else if (pkg_type == TYPE_INTERVAL)
status = parse_part_number (&buffer, &buffer_size,
&tmp);
if (status == 0)
- vl.interval = (int) tmp;
+ vl.interval = TIME_T_TO_CDTIME_T (tmp);
+ }
+ else if (pkg_type == TYPE_INTERVAL_HR)
+ {
+ uint64_t tmp = 0;
+ status = parse_part_number (&buffer, &buffer_size,
+ &tmp);
+ if (status == 0)
+ vl.interval = (cdtime_t) tmp;
}
else if (pkg_type == TYPE_HOST)
{
sizeof (network_config_ttl)) != 0)
{
char errbuf[1024];
- ERROR ("setsockopt: %s",
+ ERROR ("network plugin: setsockopt (ipv4-ttl): %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
sizeof (network_config_ttl)) != 0)
{
char errbuf[1024];
- ERROR ("setsockopt: %s",
+ ERROR ("network plugin: setsockopt(ipv6-ttl): %s",
sstrerror (errno, errbuf,
sizeof (errbuf)));
return (-1);
&mreq, sizeof (mreq)) != 0)
{
char errbuf[1024];
- ERROR ("setsockopt: %s",
+ ERROR ("network plugin: setsockopt (ipv4-multicast-if): %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
sizeof (se->interface)) != 0)
{
char errbuf[1024];
- ERROR ("setsockopt: %s",
+ ERROR ("network plugin: setsockopt (ipv6-multicast-if): %s",
sstrerror (errno, errbuf,
sizeof (errbuf)));
return (-1);
sizeof(interface_name)) == -1 )
{
char errbuf[1024];
- ERROR ("setsockopt: %s",
+ ERROR ("network plugin: setsockopt (bind-if): %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
static int network_bind_socket (int fd, const struct addrinfo *ai, const int interface_idx)
{
+#if KERNEL_SOLARIS
+ char loop = 0;
+#else
int loop = 0;
+#endif
int yes = 1;
/* allow multiple sockets to use the same PORT number */
if (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR,
&yes, sizeof(yes)) == -1) {
char errbuf[1024];
- ERROR ("setsockopt: %s",
+ ERROR ("network plugin: setsockopt (reuseaddr): %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
&loop, sizeof (loop)) == -1)
{
char errbuf[1024];
- ERROR ("setsockopt: %s",
+ ERROR ("network plugin: setsockopt (multicast-loop): %s",
sstrerror (errno, errbuf,
sizeof (errbuf)));
return (-1);
&mreq, sizeof (mreq)) == -1)
{
char errbuf[1024];
- ERROR ("setsockopt: %s",
+ ERROR ("network plugin: setsockopt (add-membership): %s",
sstrerror (errno, errbuf,
sizeof (errbuf)));
return (-1);
&loop, sizeof (loop)) == -1)
{
char errbuf[1024];
- ERROR ("setsockopt: %s",
+ ERROR ("network plugin: setsockopt (ipv6-multicast-loop): %s",
sstrerror (errno, errbuf,
sizeof (errbuf)));
return (-1);
&mreq, sizeof (mreq)) == -1)
{
char errbuf[1024];
- ERROR ("setsockopt: %s",
+ ERROR ("network plugin: setsockopt (ipv6-add-membership): %s",
sstrerror (errno, errbuf,
sizeof (errbuf)));
return (-1);
sizeof(interface_name)) == -1 )
{
char errbuf[1024];
- ERROR ("setsockopt: %s",
+ ERROR ("network plugin: setsockopt (bind-if): %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
return (-1);
}
/* Initialize a sockent structure. `type' must be either `SOCKENT_TYPE_CLIENT'
* or `SOCKENT_TYPE_SERVER' */
-static int sockent_init (sockent_t *se, int type) /* {{{ */
+static sockent_t *sockent_create (int type) /* {{{ */
{
- if (se == NULL)
- return (-1);
+ sockent_t *se;
+
+ if ((type != SOCKENT_TYPE_CLIENT) && (type != SOCKENT_TYPE_SERVER))
+ return (NULL);
+ se = malloc (sizeof (*se));
+ if (se == NULL)
+ return (NULL);
memset (se, 0, sizeof (*se));
- se->type = SOCKENT_TYPE_CLIENT;
+ se->type = type;
se->node = NULL;
se->service = NULL;
se->interface = 0;
if (type == SOCKENT_TYPE_SERVER)
{
- se->type = SOCKENT_TYPE_SERVER;
se->data.server.fd = NULL;
+ se->data.server.fd_num = 0;
#if HAVE_LIBGCRYPT
se->data.server.security_level = SECURITY_LEVEL_NONE;
se->data.server.auth_file = NULL;
#endif
}
- return (0);
-} /* }}} int sockent_init */
+ return (se);
+} /* }}} sockent_t *sockent_create */
-/* Open the file descriptors for a initialized sockent structure. */
-static int sockent_open (sockent_t *se) /* {{{ */
+static int sockent_init_crypto (sockent_t *se) /* {{{ */
{
- struct addrinfo ai_hints;
- struct addrinfo *ai_list, *ai_ptr;
- int ai_return;
-
- const char *node;
- const char *service;
-
- if (se == NULL)
- return (-1);
-
- /* Set up the security structures. */
#if HAVE_LIBGCRYPT /* {{{ */
if (se->type == SOCKENT_TYPE_CLIENT)
{
if (se->data.client.security_level > SECURITY_LEVEL_NONE)
{
+ network_init_gcrypt ();
+
if ((se->data.client.username == NULL)
|| (se->data.client.password == NULL))
{
{
if (se->data.server.security_level > SECURITY_LEVEL_NONE)
{
+ network_init_gcrypt ();
+
if (se->data.server.auth_file == NULL)
{
ERROR ("network plugin: Server socket with "
}
#endif /* }}} HAVE_LIBGCRYPT */
+ return (0);
+} /* }}} int sockent_init_crypto */
+
+static int sockent_client_connect (sockent_t *se) /* {{{ */
+{
+ static c_complain_t complaint = C_COMPLAIN_INIT_STATIC;
+
+ struct sockent_client *client;
+ struct addrinfo ai_hints;
+ struct addrinfo *ai_list = NULL, *ai_ptr;
+ int status;
+
+ if ((se == NULL) || (se->type != SOCKENT_TYPE_CLIENT))
+ return (EINVAL);
+
+ client = &se->data.client;
+ if (client->fd >= 0) /* already connected */
+ return (0);
+
+ memset (&ai_hints, 0, sizeof (ai_hints));
+#ifdef AI_ADDRCONFIG
+ ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+ ai_hints.ai_family = AF_UNSPEC;
+ ai_hints.ai_socktype = SOCK_DGRAM;
+ ai_hints.ai_protocol = IPPROTO_UDP;
+
+ status = getaddrinfo (se->node,
+ (se->service != NULL) ? se->service : NET_DEFAULT_PORT,
+ &ai_hints, &ai_list);
+ if (status != 0)
+ {
+ c_complain (LOG_ERR, &complaint,
+ "network plugin: getaddrinfo (%s, %s) failed: %s",
+ (se->node == NULL) ? "(null)" : se->node,
+ (se->service == NULL) ? "(null)" : se->service,
+ gai_strerror (status));
+ return (-1);
+ }
+ else
+ {
+ c_release (LOG_NOTICE, &complaint,
+ "network plugin: Successfully resolved \"%s\".",
+ se->node);
+ }
+
+ for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+ {
+ client->fd = socket (ai_ptr->ai_family,
+ ai_ptr->ai_socktype,
+ ai_ptr->ai_protocol);
+ if (client->fd < 0)
+ {
+ char errbuf[1024];
+ ERROR ("network plugin: socket(2) failed: %s",
+ sstrerror (errno, errbuf,
+ sizeof (errbuf)));
+ continue;
+ }
+
+ client->addr = malloc (sizeof (*client->addr));
+ if (client->addr == NULL)
+ {
+ ERROR ("network plugin: malloc failed.");
+ close (client->fd);
+ client->fd = -1;
+ continue;
+ }
+
+ memset (client->addr, 0, sizeof (*client->addr));
+ assert (sizeof (*client->addr) >= ai_ptr->ai_addrlen);
+ memcpy (client->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+ client->addrlen = ai_ptr->ai_addrlen;
+
+ network_set_ttl (se, ai_ptr);
+ network_set_interface (se, ai_ptr);
+
+ /* We don't open more than one write-socket per
+ * node/service pair.. */
+ break;
+ }
+
+ freeaddrinfo (ai_list);
+ if (client->fd < 0)
+ return (-1);
+ return (0);
+} /* }}} int sockent_client_connect */
+
+static int sockent_client_disconnect (sockent_t *se) /* {{{ */
+{
+ struct sockent_client *client;
+
+ if ((se == NULL) || (se->type != SOCKENT_TYPE_CLIENT))
+ return (EINVAL);
+
+ client = &se->data.client;
+ if (client->fd >= 0) /* connected */
+ {
+ close (client->fd);
+ client->fd = -1;
+ }
+
+ sfree (client->addr);
+ client->addrlen = 0;
+
+ return (0);
+} /* }}} int sockent_client_disconnect */
+
+/* Open the file descriptors for a initialized sockent structure. */
+static int sockent_server_listen (sockent_t *se) /* {{{ */
+{
+ struct addrinfo ai_hints;
+ struct addrinfo *ai_list, *ai_ptr;
+ int status;
+
+ const char *node;
+ const char *service;
+
+ if (se == NULL)
+ return (-1);
+
+ assert (se->data.server.fd == NULL);
+ assert (se->data.server.fd_num == 0);
+
node = se->node;
service = se->service;
if (service == NULL)
service = NET_DEFAULT_PORT;
- DEBUG ("network plugin: sockent_open: node = %s; service = %s;",
+ DEBUG ("network plugin: sockent_server_listen: node = %s; service = %s;",
node, service);
memset (&ai_hints, 0, sizeof (ai_hints));
ai_hints.ai_socktype = SOCK_DGRAM;
ai_hints.ai_protocol = IPPROTO_UDP;
- ai_return = getaddrinfo (node, service, &ai_hints, &ai_list);
- if (ai_return != 0)
+ status = getaddrinfo (node, service, &ai_hints, &ai_list);
+ if (status != 0)
{
ERROR ("network plugin: getaddrinfo (%s, %s) failed: %s",
(se->node == NULL) ? "(null)" : se->node,
(se->service == NULL) ? "(null)" : se->service,
- gai_strerror (ai_return));
+ gai_strerror (status));
return (-1);
}
for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
{
- int status;
+ int *tmp;
- if (se->type == SOCKENT_TYPE_SERVER) /* {{{ */
+ tmp = realloc (se->data.server.fd,
+ sizeof (*tmp) * (se->data.server.fd_num + 1));
+ if (tmp == NULL)
{
- int *tmp;
-
- tmp = realloc (se->data.server.fd,
- sizeof (*tmp) * (se->data.server.fd_num + 1));
- if (tmp == NULL)
- {
- ERROR ("network plugin: realloc failed.");
- continue;
- }
- se->data.server.fd = tmp;
- tmp = se->data.server.fd + se->data.server.fd_num;
-
- *tmp = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
- ai_ptr->ai_protocol);
- if (*tmp < 0)
- {
- char errbuf[1024];
- ERROR ("network plugin: socket(2) failed: %s",
- sstrerror (errno, errbuf,
- sizeof (errbuf)));
- continue;
- }
-
- status = network_bind_socket (*tmp, ai_ptr, se->interface);
- if (status != 0)
- {
- close (*tmp);
- *tmp = -1;
- continue;
- }
-
- se->data.server.fd_num++;
+ ERROR ("network plugin: realloc failed.");
continue;
- } /* }}} if (se->type == SOCKENT_TYPE_SERVER) */
- else /* if (se->type == SOCKENT_TYPE_CLIENT) {{{ */
- {
- se->data.client.fd = socket (ai_ptr->ai_family,
- ai_ptr->ai_socktype,
- ai_ptr->ai_protocol);
- if (se->data.client.fd < 0)
- {
- char errbuf[1024];
- ERROR ("network plugin: socket(2) failed: %s",
- sstrerror (errno, errbuf,
- sizeof (errbuf)));
- continue;
- }
-
- se->data.client.addr = malloc (sizeof (*se->data.client.addr));
- if (se->data.client.addr == NULL)
- {
- ERROR ("network plugin: malloc failed.");
- close (se->data.client.fd);
- se->data.client.fd = -1;
- continue;
- }
+ }
+ se->data.server.fd = tmp;
+ tmp = se->data.server.fd + se->data.server.fd_num;
- memset (se->data.client.addr, 0, sizeof (*se->data.client.addr));
- assert (sizeof (*se->data.client.addr) >= ai_ptr->ai_addrlen);
- memcpy (se->data.client.addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
- se->data.client.addrlen = ai_ptr->ai_addrlen;
+ *tmp = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
+ ai_ptr->ai_protocol);
+ if (*tmp < 0)
+ {
+ char errbuf[1024];
+ ERROR ("network plugin: socket(2) failed: %s",
+ sstrerror (errno, errbuf,
+ sizeof (errbuf)));
+ continue;
+ }
- network_set_ttl (se, ai_ptr);
- network_set_interface (se, ai_ptr);
+ status = network_bind_socket (*tmp, ai_ptr, se->interface);
+ if (status != 0)
+ {
+ close (*tmp);
+ *tmp = -1;
+ continue;
+ }
- /* We don't open more than one write-socket per
- * node/service pair.. */
- break;
- } /* }}} if (se->type == SOCKENT_TYPE_CLIENT) */
+ se->data.server.fd_num++;
+ continue;
} /* for (ai_list) */
freeaddrinfo (ai_list);
- /* Check if all went well. */
- if (se->type == SOCKENT_TYPE_SERVER)
- {
- if (se->data.server.fd_num <= 0)
- return (-1);
- }
- else /* if (se->type == SOCKENT_TYPE_CLIENT) */
- {
- if (se->data.client.fd < 0)
- return (-1);
- }
-
+ if (se->data.server.fd_num <= 0)
+ return (-1);
return (0);
-} /* }}} int sockent_open */
+} /* }}} int sockent_server_listen */
/* Add a sockent to the global list of sockets */
static int sockent_add (sockent_t *se) /* {{{ */
int buffer_len;
int i;
- int status;
+ int status = 0;
receive_list_entry_t *private_list_head;
receive_list_entry_t *private_list_tail;
uint64_t private_list_length;
- assert (listen_sockets_num > 0);
+ assert (listen_sockets_num > 0);
private_list_head = NULL;
private_list_tail = NULL;
while (listen_loop == 0)
{
status = poll (listen_sockets_pollfd, listen_sockets_num, -1);
-
if (status <= 0)
{
char errbuf[1024];
if (errno == EINTR)
continue;
- ERROR ("poll failed: %s",
+ ERROR ("network plugin: poll(2) failed: %s",
sstrerror (errno, errbuf, sizeof (errbuf)));
- return (-1);
+ break;
}
for (i = 0; (i < listen_sockets_num) && (status > 0); i++)
if (buffer_len < 0)
{
char errbuf[1024];
- ERROR ("recv failed: %s",
- sstrerror (errno, errbuf,
- sizeof (errbuf)));
- return (-1);
+ status = (errno != 0) ? errno : -1;
+ ERROR ("network plugin: recv(2) failed: %s",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ break;
}
stats_octets_rx += ((uint64_t) buffer_len);
if (ent == NULL)
{
ERROR ("network plugin: malloc failed.");
- return (-1);
+ status = ENOMEM;
+ break;
}
memset (ent, 0, sizeof (receive_list_entry_t));
ent->data = malloc (network_config_packet_size);
{
sfree (ent);
ERROR ("network plugin: malloc failed.");
- return (-1);
+ status = ENOMEM;
+ break;
}
ent->fd = listen_sockets_pollfd[i].fd;
ent->next = NULL;
private_list_tail = NULL;
private_list_length = 0;
}
+
+ status = 0;
} /* for (listen_sockets_pollfd) */
+
+ if (status != 0)
+ break;
} /* while (listen_loop == 0) */
/* Make sure everything is dispatched before exiting. */
receive_list_tail = private_list_tail;
receive_list_length += private_list_length;
- private_list_head = NULL;
- private_list_tail = NULL;
- private_list_length = 0;
-
pthread_cond_signal (&receive_list_cond);
pthread_mutex_unlock (&receive_list_lock);
}
- return (0);
+ return (status);
} /* }}} int network_receive */
static void *receive_thread (void __attribute__((unused)) *arg)
memset (&send_buffer_vl, 0, sizeof (send_buffer_vl));
} /* int network_init_buffer */
-static void networt_send_buffer_plain (const sockent_t *se, /* {{{ */
+static void networt_send_buffer_plain (sockent_t *se, /* {{{ */
const char *buffer, size_t buffer_size)
{
int status;
while (42)
{
+ status = sockent_client_connect (se);
+ if (status != 0)
+ return;
+
status = sendto (se->data.client.fd, buffer, buffer_size,
- /* flags = */ 0,
- (struct sockaddr *) se->data.client.addr,
- se->data.client.addrlen);
- if (status < 0)
+ /* flags = */ 0,
+ (struct sockaddr *) se->data.client.addr,
+ se->data.client.addrlen);
+ if (status < 0)
{
char errbuf[1024];
- if (errno == EINTR)
+
+ if ((errno == EINTR) || (errno == EAGAIN))
continue;
- ERROR ("network plugin: sendto failed: %s",
- sstrerror (errno, errbuf,
- sizeof (errbuf)));
- break;
+
+ ERROR ("network plugin: sendto failed: %s. Closing sending socket.",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ sockent_client_disconnect (se);
+ return;
}
break;
buffer_offset += (s); \
} while (0)
-static void networt_send_buffer_signed (const sockent_t *se, /* {{{ */
+static void networt_send_buffer_signed (sockent_t *se, /* {{{ */
const char *in_buffer, size_t in_buffer_size)
{
part_signature_sha256_t ps;
if (vl_def->time != vl->time)
{
- if (write_part_number (&buffer, &buffer_size, TYPE_TIME,
+ if (write_part_number (&buffer, &buffer_size, TYPE_TIME_HR,
(uint64_t) vl->time))
return (-1);
vl_def->time = vl->time;
if (vl_def->interval != vl->interval)
{
- if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL,
+ if (write_part_number (&buffer, &buffer_size, TYPE_INTERVAL_HR,
(uint64_t) vl->interval))
return (-1);
vl_def->interval = vl->interval;
return (-1);
sstrncpy (vl_def->type_instance, vl->type_instance, sizeof (vl_def->type_instance));
}
-
+
if (write_part_values (&buffer, &buffer_size, ds, vl) != 0)
return (-1);
ERROR ("network plugin: Unable to append to the "
"buffer for some weird reason");
}
- else if (send_buffer_fill >= 1452 - 15)
+ else if ((network_config_packet_size - send_buffer_fill) < 15)
{
flush_buffer ();
}
tmp = (int) ci->values[0].value.number;
if ((tmp > 0) && (tmp <= 255))
network_config_ttl = tmp;
+ else {
+ WARNING ("network plugin: The `TimeToLive' must be between 1 and 255.");
+ return (-1);
+ }
return (0);
} /* }}} int network_config_set_ttl */
return (-1);
}
- se = malloc (sizeof (*se));
+ se = sockent_create (SOCKENT_TYPE_SERVER);
if (se == NULL)
{
- ERROR ("network plugin: malloc failed.");
+ ERROR ("network plugin: sockent_create failed.");
return (-1);
}
- sockent_init (se, SOCKENT_TYPE_SERVER);
se->node = strdup (ci->values[0].value.string);
if (ci->values_num >= 2)
}
#endif /* HAVE_LIBGCRYPT */
- status = sockent_open (se);
+ status = sockent_init_crypto (se);
+ if (status != 0)
+ {
+ ERROR ("network plugin: network_config_add_listen: sockent_init_crypto() failed.");
+ sockent_destroy (se);
+ return (-1);
+ }
+
+ status = sockent_server_listen (se);
if (status != 0)
{
- ERROR ("network plugin: network_config_add_listen: sockent_open failed.");
+ ERROR ("network plugin: network_config_add_server: sockent_server_listen failed.");
sockent_destroy (se);
return (-1);
}
return (-1);
}
- se = malloc (sizeof (*se));
+ se = sockent_create (SOCKENT_TYPE_CLIENT);
if (se == NULL)
{
- ERROR ("network plugin: malloc failed.");
+ ERROR ("network plugin: sockent_create failed.");
return (-1);
}
- sockent_init (se, SOCKENT_TYPE_CLIENT);
se->node = strdup (ci->values[0].value.string);
if (ci->values_num >= 2)
}
#endif /* HAVE_LIBGCRYPT */
- status = sockent_open (se);
+ status = sockent_init_crypto (se);
if (status != 0)
{
- ERROR ("network plugin: network_config_add_server: sockent_open failed.");
+ ERROR ("network plugin: network_config_add_server: sockent_init_crypto() failed.");
sockent_destroy (se);
return (-1);
}
+ /* No call to sockent_client_connect() here -- it is called from
+ * networt_send_buffer_plain(). */
+
status = sockent_add (se);
if (status != 0)
{
{
int i;
+ /* The options need to be applied first */
+ for (i = 0; i < ci->children_num; i++)
+ {
+ oconfig_item_t *child = ci->children + i;
+ if (strcasecmp ("TimeToLive", child->key) == 0)
+ network_config_set_ttl (child);
+ }
+
for (i = 0; i < ci->children_num; i++)
{
oconfig_item_t *child = ci->children + i;
network_config_add_listen (child);
else if (strcasecmp ("Server", child->key) == 0)
network_config_add_server (child);
- else if (strcasecmp ("TimeToLive", child->key) == 0)
- network_config_set_ttl (child);
+ else if (strcasecmp ("TimeToLive", child->key) == 0) {
+ /* Handled earlier */
+ }
else if (strcasecmp ("MaxPacketSize", child->key) == 0)
network_config_set_buffer_size (child);
else if (strcasecmp ("Forward", child->key) == 0)
network_config_set_boolean (child, &network_config_forward);
else if (strcasecmp ("ReportStats", child->key) == 0)
network_config_set_boolean (child, &network_config_stats);
- else if (strcasecmp ("CacheFlush", child->key) == 0)
- /* no op for backwards compatibility only */;
else
{
WARNING ("network plugin: Option `%s' is not allowed here.",
memset (buffer, 0, sizeof (buffer));
- status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME,
+ status = write_part_number (&buffer_ptr, &buffer_free, TYPE_TIME_HR,
(uint64_t) n->time);
if (status != 0)
return (-1);
static int network_shutdown (void)
{
+ sockent_t *se;
+
listen_loop++;
/* Kill the listening thread */
sfree (send_buffer);
- /* TODO: Close `sending_sockets' */
+ for (se = sending_sockets; se != NULL; se = se->next)
+ sockent_client_disconnect (se);
+ sockent_destroy (sending_sockets);
plugin_unregister_config ("network");
plugin_unregister_init ("network");
static int network_stats_read (void) /* {{{ */
{
- uint64_t copy_octets_rx;
- uint64_t copy_octets_tx;
- uint64_t copy_packets_rx;
- uint64_t copy_packets_tx;
- uint64_t copy_values_dispatched;
- uint64_t copy_values_not_dispatched;
- uint64_t copy_values_sent;
- uint64_t copy_values_not_sent;
- uint64_t copy_receive_list_length;
+ derive_t copy_octets_rx;
+ derive_t copy_octets_tx;
+ derive_t copy_packets_rx;
+ derive_t copy_packets_tx;
+ derive_t copy_values_dispatched;
+ derive_t copy_values_not_dispatched;
+ derive_t copy_values_sent;
+ derive_t copy_values_not_sent;
+ derive_t copy_receive_list_length;
value_list_t vl = VALUE_LIST_INIT;
value_t values[2];
vl.values = values;
vl.values_len = 2;
vl.time = 0;
- vl.interval = interval_g;
sstrncpy (vl.host, hostname_g, sizeof (vl.host));
sstrncpy (vl.plugin, "network", sizeof (vl.plugin));
/* Octets received / sent */
- vl.values[0].counter = (counter_t) copy_octets_rx;
- vl.values[1].counter = (counter_t) copy_octets_tx;
+ vl.values[0].derive = (derive_t) copy_octets_rx;
+ vl.values[1].derive = (derive_t) copy_octets_tx;
sstrncpy (vl.type, "if_octets", sizeof (vl.type));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
/* Packets received / send */
- vl.values[0].counter = (counter_t) copy_packets_rx;
- vl.values[1].counter = (counter_t) copy_packets_tx;
+ vl.values[0].derive = (derive_t) copy_packets_rx;
+ vl.values[1].derive = (derive_t) copy_packets_tx;
sstrncpy (vl.type, "if_packets", sizeof (vl.type));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
/* Values (not) dispatched and (not) send */
sstrncpy (vl.type, "total_values", sizeof (vl.type));
vl.values[0].derive = (derive_t) copy_values_dispatched;
sstrncpy (vl.type_instance, "dispatch-accepted",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
vl.values[0].derive = (derive_t) copy_values_not_dispatched;
sstrncpy (vl.type_instance, "dispatch-rejected",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
vl.values[0].derive = (derive_t) copy_values_sent;
sstrncpy (vl.type_instance, "send-accepted",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
vl.values[0].derive = (derive_t) copy_values_not_sent;
sstrncpy (vl.type_instance, "send-rejected",
sizeof (vl.type_instance));
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
/* Receive queue length */
vl.values[0].gauge = (gauge_t) copy_receive_list_length;
sstrncpy (vl.type, "queue_length", sizeof (vl.type));
vl.type_instance[0] = 0;
- plugin_dispatch_values_secure (&vl);
+ plugin_dispatch_values (&vl);
return (0);
} /* }}} int network_stats_read */
static int network_init (void)
{
- static _Bool have_init = false;
+ static _Bool have_init = 0;
/* Check if we were already initialized. If so, just return - there's
* nothing more to do (for now, that is). */
if (have_init)
return (0);
- have_init = true;
+ have_init = 1;
#if HAVE_LIBGCRYPT
- gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
- gcry_control (GCRYCTL_INIT_SECMEM, 32768, 0);
- gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
+ network_init_gcrypt ();
#endif
if (network_config_stats != 0)
if (dispatch_thread_running == 0)
{
int status;
- status = pthread_create (&dispatch_thread_id,
+ status = plugin_thread_create (&dispatch_thread_id,
NULL /* no attributes */,
dispatch_thread,
NULL /* no argument */);
if (receive_thread_running == 0)
{
int status;
- status = pthread_create (&receive_thread_id,
+ status = plugin_thread_create (&receive_thread_id,
NULL /* no attributes */,
receive_thread,
NULL /* no argument */);
return (0);
} /* int network_init */
-/*
+/*
* The flush option of the network plugin cannot flush individual identifiers.
* All the values are added to a buffer and sent when the buffer is full, the
* requested value may or may not be in there, it's not worth finding out. We
* just send the buffer if `flush' is called - if the requested value was in
* there, good. If not, well, then there is nothing to flush.. -octo
*/
-static int network_flush (int timeout,
- const char __attribute__((unused)) *identifier,
- user_data_t __attribute__((unused)) *user_data)
+static int network_flush (__attribute__((unused)) cdtime_t timeout,
+ __attribute__((unused)) const char *identifier,
+ __attribute__((unused)) user_data_t *user_data)
{
pthread_mutex_lock (&send_buffer_lock);