* Aman Gupta <aman at tmm1.net>
**/
+#define _DEFAULT_SOURCE
#define _BSD_SOURCE /* For struct ip_mreq */
#include "collectd.h"
/* Re enable deprecation warnings */
# pragma GCC diagnostic warning "-Wdeprecated-declarations"
# endif
+# if GCRYPT_VERSION_NUMBER < 0x010600
GCRY_THREAD_OPTION_PTHREAD_IMPL;
+# endif
#endif
#ifndef IPV6_ADD_MEMBERSHIP
gcry_cipher_hd_t cypher;
unsigned char password_hash[32];
#endif
+ cdtime_t next_resolve_reconnect;
+ cdtime_t resolve_interval;
};
struct sockent_server
{
c_complain_once (LOG_ERR, &complain_forwarding,
"network plugin: A notification has been received via the network "
- "forwarding if enabled. Forwarding of notifications is currently "
+ "and forwarding is enabled. Forwarding of notifications is currently "
"not supported, because there is not loop-deteciton available. "
"Please contact the collectd mailing list if you need this "
"feature.");
if (gcry_control (GCRYCTL_ANY_INITIALIZATION_P))
return;
- gcry_check_version (NULL); /* before calling any other functions */
+ /* http://www.gnupg.org/documentation/manuals/gcrypt/Multi_002dThreading.html
+ * To ensure thread-safety, it's important to set GCRYCTL_SET_THREAD_CBS
+ * *before* initalizing Libgcrypt with gcry_check_version(), which itself must
+ * be called before any other gcry_* function. GCRYCTL_ANY_INITIALIZATION_P
+ * above doesn't count, as it doesn't implicitly initalize Libgcrypt.
+ *
+ * tl;dr: keep all these gry_* statements in this exact order please. */
+# if GCRYPT_VERSION_NUMBER < 0x010600
gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
+# endif
+ gcry_check_version (NULL);
gcry_control (GCRYCTL_INIT_SECMEM, 32768);
gcry_control (GCRYCTL_INITIALIZATION_FINISHED);
} /* }}} void network_init_gcrypt */
} /* int parse_part_number */
static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
- char *output, int output_len)
+ char *output, size_t const output_len)
{
char *buffer = *ret_buffer;
size_t buffer_len = *ret_buffer_len;
uint16_t tmp16;
- size_t header_size = 2 * sizeof (uint16_t);
+ size_t const header_size = 2 * sizeof (uint16_t);
uint16_t pkg_length;
+ size_t payload_size;
+
+ if (output_len <= 0)
+ return (EINVAL);
if (buffer_len < header_size)
{
memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
buffer += sizeof (tmp16);
pkg_length = ntohs (tmp16);
+ payload_size = ((size_t) pkg_length) - header_size;
/* Check that packet fits in the input buffer */
if (pkg_length > buffer_len)
/* Check that the package data fits into the output buffer.
* The previous if-statement ensures that:
* `pkg_length > header_size' */
- if ((output_len < 0)
- || ((size_t) output_len < ((size_t) pkg_length - header_size)))
+ if (output_len < payload_size)
{
WARNING ("network plugin: parse_part_string: "
- "Output buffer too small.");
+ "Buffer too small: "
+ "Output buffer holds %zu bytes, "
+ "which is too small to hold the received "
+ "%zu byte string.",
+ output_len, payload_size);
return (-1);
}
/* All sanity checks successfull, let's copy the data over */
- output_len = pkg_length - header_size;
- memcpy ((void *) output, (void *) buffer, output_len);
- buffer += output_len;
+ memcpy ((void *) output, (void *) buffer, payload_size);
+ buffer += payload_size;
/* For some very weird reason '\0' doesn't do the trick on SPARC in
* this statement. */
- if (output[output_len - 1] != 0)
+ if (output[payload_size - 1] != 0)
{
WARNING ("network plugin: parse_part_string: "
"Received string does not end "
/* Initialize a sockent structure. `type' must be either `SOCKENT_TYPE_CLIENT'
* or `SOCKENT_TYPE_SERVER' */
-static int sockent_init (sockent_t *se, int type) /* {{{ */
+static sockent_t *sockent_create (int type) /* {{{ */
{
- if (se == NULL)
- return (-1);
+ sockent_t *se;
+
+ if ((type != SOCKENT_TYPE_CLIENT) && (type != SOCKENT_TYPE_SERVER))
+ return (NULL);
+ se = malloc (sizeof (*se));
+ if (se == NULL)
+ return (NULL);
memset (se, 0, sizeof (*se));
- se->type = SOCKENT_TYPE_CLIENT;
+ se->type = type;
se->node = NULL;
se->service = NULL;
se->interface = 0;
if (type == SOCKENT_TYPE_SERVER)
{
- se->type = SOCKENT_TYPE_SERVER;
se->data.server.fd = NULL;
#if HAVE_LIBGCRYPT
se->data.server.security_level = SECURITY_LEVEL_NONE;
{
se->data.client.fd = -1;
se->data.client.addr = NULL;
+ se->data.client.resolve_interval = 0;
+ se->data.client.next_resolve_reconnect = 0;
#if HAVE_LIBGCRYPT
se->data.client.security_level = SECURITY_LEVEL_NONE;
se->data.client.username = NULL;
#endif
}
- return (0);
-} /* }}} int sockent_init */
+ return (se);
+} /* }}} sockent_t *sockent_create */
-/* Open the file descriptors for a initialized sockent structure. */
-static int sockent_open (sockent_t *se) /* {{{ */
+static int sockent_init_crypto (sockent_t *se) /* {{{ */
{
- struct addrinfo ai_hints;
- struct addrinfo *ai_list, *ai_ptr;
- int ai_return;
-
- const char *node;
- const char *service;
-
- if (se == NULL)
- return (-1);
-
- /* Set up the security structures. */
#if HAVE_LIBGCRYPT /* {{{ */
if (se->type == SOCKENT_TYPE_CLIENT)
{
}
#endif /* }}} HAVE_LIBGCRYPT */
+ return (0);
+} /* }}} int sockent_init_crypto */
+
+static int sockent_client_disconnect (sockent_t *se) /* {{{ */
+{
+ struct sockent_client *client;
+
+ if ((se == NULL) || (se->type != SOCKENT_TYPE_CLIENT))
+ return (EINVAL);
+
+ client = &se->data.client;
+ if (client->fd >= 0) /* connected */
+ {
+ close (client->fd);
+ client->fd = -1;
+ }
+
+ sfree (client->addr);
+ client->addrlen = 0;
+
+ return (0);
+} /* }}} int sockent_client_disconnect */
+
+static int sockent_client_connect (sockent_t *se) /* {{{ */
+{
+ static c_complain_t complaint = C_COMPLAIN_INIT_STATIC;
+
+ struct sockent_client *client;
+ struct addrinfo ai_hints;
+ struct addrinfo *ai_list = NULL, *ai_ptr;
+ int status;
+ _Bool reconnect = 0;
+ cdtime_t now;
+
+ if ((se == NULL) || (se->type != SOCKENT_TYPE_CLIENT))
+ return (EINVAL);
+
+ client = &se->data.client;
+
+ now = cdtime ();
+ if (client->resolve_interval != 0 && client->next_resolve_reconnect < now) {
+ DEBUG("network plugin: Reconnecting socket, resolve_interval = %lf, next_resolve_reconnect = %lf",
+ CDTIME_T_TO_DOUBLE(client->resolve_interval), CDTIME_T_TO_DOUBLE(client->next_resolve_reconnect));
+ reconnect = 1;
+ }
+
+ if (client->fd >= 0 && !reconnect) /* already connected and not stale*/
+ return (0);
+
+ memset (&ai_hints, 0, sizeof (ai_hints));
+#ifdef AI_ADDRCONFIG
+ ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+ ai_hints.ai_family = AF_UNSPEC;
+ ai_hints.ai_socktype = SOCK_DGRAM;
+ ai_hints.ai_protocol = IPPROTO_UDP;
+
+ status = getaddrinfo (se->node,
+ (se->service != NULL) ? se->service : NET_DEFAULT_PORT,
+ &ai_hints, &ai_list);
+ if (status != 0)
+ {
+ c_complain (LOG_ERR, &complaint,
+ "network plugin: getaddrinfo (%s, %s) failed: %s",
+ (se->node == NULL) ? "(null)" : se->node,
+ (se->service == NULL) ? "(null)" : se->service,
+ gai_strerror (status));
+ return (-1);
+ }
+ else
+ {
+ c_release (LOG_NOTICE, &complaint,
+ "network plugin: Successfully resolved \"%s\".",
+ se->node);
+ }
+
+ for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+ {
+ if (client->fd >= 0) /* when we reconnect */
+ sockent_client_disconnect(se);
+
+ client->fd = socket (ai_ptr->ai_family,
+ ai_ptr->ai_socktype,
+ ai_ptr->ai_protocol);
+ if (client->fd < 0)
+ {
+ char errbuf[1024];
+ ERROR ("network plugin: socket(2) failed: %s",
+ sstrerror (errno, errbuf,
+ sizeof (errbuf)));
+ continue;
+ }
+
+ client->addr = malloc (sizeof (*client->addr));
+ if (client->addr == NULL)
+ {
+ ERROR ("network plugin: malloc failed.");
+ close (client->fd);
+ client->fd = -1;
+ continue;
+ }
+
+ memset (client->addr, 0, sizeof (*client->addr));
+ assert (sizeof (*client->addr) >= ai_ptr->ai_addrlen);
+ memcpy (client->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+ client->addrlen = ai_ptr->ai_addrlen;
+
+ network_set_ttl (se, ai_ptr);
+ network_set_interface (se, ai_ptr);
+
+ /* We don't open more than one write-socket per
+ * node/service pair.. */
+ break;
+ }
+
+ freeaddrinfo (ai_list);
+ if (client->fd < 0)
+ return (-1);
+
+ if (client->resolve_interval > 0)
+ client->next_resolve_reconnect = now + client->resolve_interval;
+ return (0);
+} /* }}} int sockent_client_connect */
+
+/* Open the file descriptors for a initialized sockent structure. */
+static int sockent_server_listen (sockent_t *se) /* {{{ */
+{
+ struct addrinfo ai_hints;
+ struct addrinfo *ai_list, *ai_ptr;
+ int status;
+
+ const char *node;
+ const char *service;
+
+ if (se == NULL)
+ return (-1);
+
node = se->node;
service = se->service;
if (service == NULL)
service = NET_DEFAULT_PORT;
- DEBUG ("network plugin: sockent_open: node = %s; service = %s;",
+ DEBUG ("network plugin: sockent_server_listen: node = %s; service = %s;",
node, service);
memset (&ai_hints, 0, sizeof (ai_hints));
ai_hints.ai_socktype = SOCK_DGRAM;
ai_hints.ai_protocol = IPPROTO_UDP;
- ai_return = getaddrinfo (node, service, &ai_hints, &ai_list);
- if (ai_return != 0)
+ status = getaddrinfo (node, service, &ai_hints, &ai_list);
+ if (status != 0)
{
ERROR ("network plugin: getaddrinfo (%s, %s) failed: %s",
(se->node == NULL) ? "(null)" : se->node,
(se->service == NULL) ? "(null)" : se->service,
- gai_strerror (ai_return));
+ gai_strerror (status));
return (-1);
}
for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
{
- int status;
+ int *tmp;
- if (se->type == SOCKENT_TYPE_SERVER) /* {{{ */
+ tmp = realloc (se->data.server.fd,
+ sizeof (*tmp) * (se->data.server.fd_num + 1));
+ if (tmp == NULL)
{
- int *tmp;
-
- tmp = realloc (se->data.server.fd,
- sizeof (*tmp) * (se->data.server.fd_num + 1));
- if (tmp == NULL)
- {
- ERROR ("network plugin: realloc failed.");
- continue;
- }
- se->data.server.fd = tmp;
- tmp = se->data.server.fd + se->data.server.fd_num;
-
- *tmp = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
- ai_ptr->ai_protocol);
- if (*tmp < 0)
- {
- char errbuf[1024];
- ERROR ("network plugin: socket(2) failed: %s",
- sstrerror (errno, errbuf,
- sizeof (errbuf)));
- continue;
- }
-
- status = network_bind_socket (*tmp, ai_ptr, se->interface);
- if (status != 0)
- {
- close (*tmp);
- *tmp = -1;
- continue;
- }
-
- se->data.server.fd_num++;
+ ERROR ("network plugin: realloc failed.");
continue;
- } /* }}} if (se->type == SOCKENT_TYPE_SERVER) */
- else /* if (se->type == SOCKENT_TYPE_CLIENT) {{{ */
- {
- se->data.client.fd = socket (ai_ptr->ai_family,
- ai_ptr->ai_socktype,
- ai_ptr->ai_protocol);
- if (se->data.client.fd < 0)
- {
- char errbuf[1024];
- ERROR ("network plugin: socket(2) failed: %s",
- sstrerror (errno, errbuf,
- sizeof (errbuf)));
- continue;
- }
-
- se->data.client.addr = malloc (sizeof (*se->data.client.addr));
- if (se->data.client.addr == NULL)
- {
- ERROR ("network plugin: malloc failed.");
- close (se->data.client.fd);
- se->data.client.fd = -1;
- continue;
- }
+ }
+ se->data.server.fd = tmp;
+ tmp = se->data.server.fd + se->data.server.fd_num;
- memset (se->data.client.addr, 0, sizeof (*se->data.client.addr));
- assert (sizeof (*se->data.client.addr) >= ai_ptr->ai_addrlen);
- memcpy (se->data.client.addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
- se->data.client.addrlen = ai_ptr->ai_addrlen;
+ *tmp = socket (ai_ptr->ai_family, ai_ptr->ai_socktype,
+ ai_ptr->ai_protocol);
+ if (*tmp < 0)
+ {
+ char errbuf[1024];
+ ERROR ("network plugin: socket(2) failed: %s",
+ sstrerror (errno, errbuf,
+ sizeof (errbuf)));
+ continue;
+ }
- network_set_ttl (se, ai_ptr);
- network_set_interface (se, ai_ptr);
+ status = network_bind_socket (*tmp, ai_ptr, se->interface);
+ if (status != 0)
+ {
+ close (*tmp);
+ *tmp = -1;
+ continue;
+ }
- /* We don't open more than one write-socket per
- * node/service pair.. */
- break;
- } /* }}} if (se->type == SOCKENT_TYPE_CLIENT) */
+ se->data.server.fd_num++;
+ continue;
} /* for (ai_list) */
freeaddrinfo (ai_list);
- /* Check if all went well. */
- if (se->type == SOCKENT_TYPE_SERVER)
- {
- if (se->data.server.fd_num <= 0)
- return (-1);
- }
- else /* if (se->type == SOCKENT_TYPE_CLIENT) */
- {
- if (se->data.client.fd < 0)
- return (-1);
- }
-
+ if (se->data.server.fd_num <= 0)
+ return (-1);
return (0);
-} /* }}} int sockent_open */
+} /* }}} int sockent_server_listen */
/* Add a sockent to the global list of sockets */
static int sockent_add (sockent_t *se) /* {{{ */
memset (&send_buffer_vl, 0, sizeof (send_buffer_vl));
} /* int network_init_buffer */
-static void networt_send_buffer_plain (const sockent_t *se, /* {{{ */
+static void networt_send_buffer_plain (sockent_t *se, /* {{{ */
const char *buffer, size_t buffer_size)
{
int status;
while (42)
{
+ status = sockent_client_connect (se);
+ if (status != 0)
+ return;
+
status = sendto (se->data.client.fd, buffer, buffer_size,
- /* flags = */ 0,
- (struct sockaddr *) se->data.client.addr,
- se->data.client.addrlen);
- if (status < 0)
+ /* flags = */ 0,
+ (struct sockaddr *) se->data.client.addr,
+ se->data.client.addrlen);
+ if (status < 0)
{
char errbuf[1024];
- if (errno == EINTR)
+
+ if ((errno == EINTR) || (errno == EAGAIN))
continue;
- ERROR ("network plugin: sendto failed: %s",
- sstrerror (errno, errbuf,
- sizeof (errbuf)));
- break;
+
+ ERROR ("network plugin: sendto failed: %s. Closing sending socket.",
+ sstrerror (errno, errbuf, sizeof (errbuf)));
+ sockent_client_disconnect (se);
+ return;
}
break;
buffer_offset += (s); \
} while (0)
-static void networt_send_buffer_signed (const sockent_t *se, /* {{{ */
+static void networt_send_buffer_signed (sockent_t *se, /* {{{ */
const char *in_buffer, size_t in_buffer_size)
{
part_signature_sha256_t ps;
network_config_ttl = tmp;
else {
WARNING ("network plugin: The `TimeToLive' must be between 1 and 255.");
- return (-1);
+ return (-1);
}
return (0);
return (-1);
}
- se = malloc (sizeof (*se));
+ se = sockent_create (SOCKENT_TYPE_SERVER);
if (se == NULL)
{
- ERROR ("network plugin: malloc failed.");
+ ERROR ("network plugin: sockent_create failed.");
return (-1);
}
- sockent_init (se, SOCKENT_TYPE_SERVER);
se->node = strdup (ci->values[0].value.string);
if (ci->values_num >= 2)
}
#endif /* HAVE_LIBGCRYPT */
- status = sockent_open (se);
+ status = sockent_init_crypto (se);
+ if (status != 0)
+ {
+ ERROR ("network plugin: network_config_add_listen: sockent_init_crypto() failed.");
+ sockent_destroy (se);
+ return (-1);
+ }
+
+ status = sockent_server_listen (se);
if (status != 0)
{
- ERROR ("network plugin: network_config_add_listen: sockent_open failed.");
+ ERROR ("network plugin: network_config_add_server: sockent_server_listen failed.");
sockent_destroy (se);
return (-1);
}
return (-1);
}
- se = malloc (sizeof (*se));
+ se = sockent_create (SOCKENT_TYPE_CLIENT);
if (se == NULL)
{
- ERROR ("network plugin: malloc failed.");
+ ERROR ("network plugin: sockent_create failed.");
return (-1);
}
- sockent_init (se, SOCKENT_TYPE_CLIENT);
se->node = strdup (ci->values[0].value.string);
if (ci->values_num >= 2)
if (strcasecmp ("Interface", child->key) == 0)
network_config_set_interface (child,
&se->interface);
+ else if (strcasecmp ("ResolveInterval", child->key) == 0)
+ cf_util_get_cdtime(child, &se->data.client.resolve_interval);
else
{
WARNING ("network plugin: Option `%s' is not allowed here.",
}
#endif /* HAVE_LIBGCRYPT */
- status = sockent_open (se);
+ status = sockent_init_crypto (se);
if (status != 0)
{
- ERROR ("network plugin: network_config_add_server: sockent_open failed.");
+ ERROR ("network plugin: network_config_add_server: sockent_init_crypto() failed.");
sockent_destroy (se);
return (-1);
}
+ /* No call to sockent_client_connect() here -- it is called from
+ * networt_send_buffer_plain(). */
+
status = sockent_add (se);
if (status != 0)
{
static int network_shutdown (void)
{
+ sockent_t *se;
+
listen_loop++;
/* Kill the listening thread */
sfree (send_buffer);
- /* TODO: Close `sending_sockets' */
+ for (se = sending_sockets; se != NULL; se = se->next)
+ sockent_client_disconnect (se);
+ sockent_destroy (sending_sockets);
plugin_unregister_config ("network");
plugin_unregister_init ("network");