X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fnetwork.c;h=e58d1dc589f3530a1c4790103acaf95e7789b20d;hb=e4fe9b4d69c69d922cad1272104fdb8de5c24faf;hp=98cb8eba100a8ccf19bb89d9dc39ce81d3f651e9;hpb=77a69360da55e89f2ede7b5c8942703221bfde9a;p=collectd.git diff --git a/src/network.c b/src/network.c index 98cb8eba..e58d1dc5 100644 --- a/src/network.c +++ b/src/network.c @@ -22,6 +22,7 @@ * Aman Gupta **/ +#define _DEFAULT_SOURCE #define _BSD_SOURCE /* For struct ip_mreq */ #include "collectd.h" @@ -76,7 +77,9 @@ /* Re enable deprecation warnings */ # pragma GCC diagnostic warning "-Wdeprecated-declarations" # endif +# if GCRYPT_VERSION_NUMBER < 0x010600 GCRY_THREAD_OPTION_PTHREAD_IMPL; +# endif #endif #ifndef IPV6_ADD_MEMBERSHIP @@ -117,6 +120,8 @@ struct sockent_client gcry_cipher_hd_t cypher; unsigned char password_hash[32]; #endif + cdtime_t next_resolve_reconnect; + cdtime_t resolve_interval; }; struct sockent_server @@ -305,6 +310,7 @@ static pthread_t dispatch_thread_id; static char *send_buffer; static char *send_buffer_ptr; static int send_buffer_fill; +static cdtime_t send_buffer_last_update; static value_list_t send_buffer_vl = VALUE_LIST_STATIC; static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER; @@ -395,7 +401,7 @@ static _Bool check_send_notify_okay (const notification_t *n) /* {{{ */ { c_complain_once (LOG_ERR, &complain_forwarding, "network plugin: A notification has been received via the network " - "forwarding if enabled. Forwarding of notifications is currently " + "and forwarding is enabled. Forwarding of notifications is currently " "not supported, because there is not loop-deteciton available. " "Please contact the collectd mailing list if you need this " "feature."); @@ -501,8 +507,17 @@ static void network_init_gcrypt (void) /* {{{ */ if (gcry_control (GCRYCTL_ANY_INITIALIZATION_P)) return; - gcry_check_version (NULL); /* before calling any other functions */ + /* http://www.gnupg.org/documentation/manuals/gcrypt/Multi_002dThreading.html + * To ensure thread-safety, it's important to set GCRYCTL_SET_THREAD_CBS + * *before* initalizing Libgcrypt with gcry_check_version(), which itself must + * be called before any other gcry_* function. GCRYCTL_ANY_INITIALIZATION_P + * above doesn't count, as it doesn't implicitly initalize Libgcrypt. + * + * tl;dr: keep all these gry_* statements in this exact order please. */ +# if GCRYPT_VERSION_NUMBER < 0x010600 gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread); +# endif + gcry_check_version (NULL); gcry_control (GCRYCTL_INIT_SECMEM, 32768); gcry_control (GCRYCTL_INITIALIZATION_FINISHED); } /* }}} void network_init_gcrypt */ @@ -910,15 +925,19 @@ static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len, } /* int parse_part_number */ static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len, - char *output, int output_len) + char *output, size_t const output_len) { char *buffer = *ret_buffer; size_t buffer_len = *ret_buffer_len; uint16_t tmp16; - size_t header_size = 2 * sizeof (uint16_t); + size_t const header_size = 2 * sizeof (uint16_t); uint16_t pkg_length; + size_t payload_size; + + if (output_len <= 0) + return (EINVAL); if (buffer_len < header_size) { @@ -937,6 +956,7 @@ static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len, memcpy ((void *) &tmp16, buffer, sizeof (tmp16)); buffer += sizeof (tmp16); pkg_length = ntohs (tmp16); + payload_size = ((size_t) pkg_length) - header_size; /* Check that packet fits in the input buffer */ if (pkg_length > buffer_len) @@ -962,22 +982,24 @@ static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len, /* Check that the package data fits into the output buffer. * The previous if-statement ensures that: * `pkg_length > header_size' */ - if ((output_len < 0) - || ((size_t) output_len < ((size_t) pkg_length - header_size))) + if (output_len < payload_size) { WARNING ("network plugin: parse_part_string: " - "Output buffer too small."); + "Buffer too small: " + "Output buffer holds %zu bytes, " + "which is too small to hold the received " + "%zu byte string.", + output_len, payload_size); return (-1); } /* All sanity checks successfull, let's copy the data over */ - output_len = pkg_length - header_size; - memcpy ((void *) output, (void *) buffer, output_len); - buffer += output_len; + memcpy ((void *) output, (void *) buffer, payload_size); + buffer += payload_size; /* For some very weird reason '\0' doesn't do the trick on SPARC in * this statement. */ - if (output[output_len - 1] != 0) + if (output[payload_size - 1] != 0) { WARNING ("network plugin: parse_part_string: " "Received string does not end " @@ -1985,14 +2007,19 @@ static int network_bind_socket (int fd, const struct addrinfo *ai, const int int /* Initialize a sockent structure. `type' must be either `SOCKENT_TYPE_CLIENT' * or `SOCKENT_TYPE_SERVER' */ -static int sockent_init (sockent_t *se, int type) /* {{{ */ +static sockent_t *sockent_create (int type) /* {{{ */ { - if (se == NULL) - return (-1); + sockent_t *se; + + if ((type != SOCKENT_TYPE_CLIENT) && (type != SOCKENT_TYPE_SERVER)) + return (NULL); + se = malloc (sizeof (*se)); + if (se == NULL) + return (NULL); memset (se, 0, sizeof (*se)); - se->type = SOCKENT_TYPE_CLIENT; + se->type = type; se->node = NULL; se->service = NULL; se->interface = 0; @@ -2000,7 +2027,6 @@ static int sockent_init (sockent_t *se, int type) /* {{{ */ if (type == SOCKENT_TYPE_SERVER) { - se->type = SOCKENT_TYPE_SERVER; se->data.server.fd = NULL; #if HAVE_LIBGCRYPT se->data.server.security_level = SECURITY_LEVEL_NONE; @@ -2013,6 +2039,8 @@ static int sockent_init (sockent_t *se, int type) /* {{{ */ { se->data.client.fd = -1; se->data.client.addr = NULL; + se->data.client.resolve_interval = 0; + se->data.client.next_resolve_reconnect = 0; #if HAVE_LIBGCRYPT se->data.client.security_level = SECURITY_LEVEL_NONE; se->data.client.username = NULL; @@ -2021,23 +2049,11 @@ static int sockent_init (sockent_t *se, int type) /* {{{ */ #endif } - return (0); -} /* }}} int sockent_init */ + return (se); +} /* }}} sockent_t *sockent_create */ -/* Open the file descriptors for a initialized sockent structure. */ -static int sockent_open (sockent_t *se) /* {{{ */ +static int sockent_init_crypto (sockent_t *se) /* {{{ */ { - struct addrinfo ai_hints; - struct addrinfo *ai_list, *ai_ptr; - int ai_return; - - const char *node; - const char *service; - - if (se == NULL) - return (-1); - - /* Set up the security structures. */ #if HAVE_LIBGCRYPT /* {{{ */ if (se->type == SOCKENT_TYPE_CLIENT) { @@ -2088,13 +2104,150 @@ static int sockent_open (sockent_t *se) /* {{{ */ } #endif /* }}} HAVE_LIBGCRYPT */ + return (0); +} /* }}} int sockent_init_crypto */ + +static int sockent_client_disconnect (sockent_t *se) /* {{{ */ +{ + struct sockent_client *client; + + if ((se == NULL) || (se->type != SOCKENT_TYPE_CLIENT)) + return (EINVAL); + + client = &se->data.client; + if (client->fd >= 0) /* connected */ + { + close (client->fd); + client->fd = -1; + } + + sfree (client->addr); + client->addrlen = 0; + + return (0); +} /* }}} int sockent_client_disconnect */ + +static int sockent_client_connect (sockent_t *se) /* {{{ */ +{ + static c_complain_t complaint = C_COMPLAIN_INIT_STATIC; + + struct sockent_client *client; + struct addrinfo ai_hints; + struct addrinfo *ai_list = NULL, *ai_ptr; + int status; + _Bool reconnect = 0; + cdtime_t now; + + if ((se == NULL) || (se->type != SOCKENT_TYPE_CLIENT)) + return (EINVAL); + + client = &se->data.client; + + now = cdtime (); + if (client->resolve_interval != 0 && client->next_resolve_reconnect < now) { + DEBUG("network plugin: Reconnecting socket, resolve_interval = %lf, next_resolve_reconnect = %lf", + CDTIME_T_TO_DOUBLE(client->resolve_interval), CDTIME_T_TO_DOUBLE(client->next_resolve_reconnect)); + reconnect = 1; + } + + if (client->fd >= 0 && !reconnect) /* already connected and not stale*/ + return (0); + + memset (&ai_hints, 0, sizeof (ai_hints)); +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_DGRAM; + ai_hints.ai_protocol = IPPROTO_UDP; + + status = getaddrinfo (se->node, + (se->service != NULL) ? se->service : NET_DEFAULT_PORT, + &ai_hints, &ai_list); + if (status != 0) + { + c_complain (LOG_ERR, &complaint, + "network plugin: getaddrinfo (%s, %s) failed: %s", + (se->node == NULL) ? "(null)" : se->node, + (se->service == NULL) ? "(null)" : se->service, + gai_strerror (status)); + return (-1); + } + else + { + c_release (LOG_NOTICE, &complaint, + "network plugin: Successfully resolved \"%s\".", + se->node); + } + + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + if (client->fd >= 0) /* when we reconnect */ + sockent_client_disconnect(se); + + client->fd = socket (ai_ptr->ai_family, + ai_ptr->ai_socktype, + ai_ptr->ai_protocol); + if (client->fd < 0) + { + char errbuf[1024]; + ERROR ("network plugin: socket(2) failed: %s", + sstrerror (errno, errbuf, + sizeof (errbuf))); + continue; + } + + client->addr = malloc (sizeof (*client->addr)); + if (client->addr == NULL) + { + ERROR ("network plugin: malloc failed."); + close (client->fd); + client->fd = -1; + continue; + } + + memset (client->addr, 0, sizeof (*client->addr)); + assert (sizeof (*client->addr) >= ai_ptr->ai_addrlen); + memcpy (client->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + client->addrlen = ai_ptr->ai_addrlen; + + network_set_ttl (se, ai_ptr); + network_set_interface (se, ai_ptr); + + /* We don't open more than one write-socket per + * node/service pair.. */ + break; + } + + freeaddrinfo (ai_list); + if (client->fd < 0) + return (-1); + + if (client->resolve_interval > 0) + client->next_resolve_reconnect = now + client->resolve_interval; + return (0); +} /* }}} int sockent_client_connect */ + +/* Open the file descriptors for a initialized sockent structure. */ +static int sockent_server_listen (sockent_t *se) /* {{{ */ +{ + struct addrinfo ai_hints; + struct addrinfo *ai_list, *ai_ptr; + int status; + + const char *node; + const char *service; + + if (se == NULL) + return (-1); + node = se->node; service = se->service; if (service == NULL) service = NET_DEFAULT_PORT; - DEBUG ("network plugin: sockent_open: node = %s; service = %s;", + DEBUG ("network plugin: sockent_server_listen: node = %s; service = %s;", node, service); memset (&ai_hints, 0, sizeof (ai_hints)); @@ -2109,109 +2262,59 @@ static int sockent_open (sockent_t *se) /* {{{ */ ai_hints.ai_socktype = SOCK_DGRAM; ai_hints.ai_protocol = IPPROTO_UDP; - ai_return = getaddrinfo (node, service, &ai_hints, &ai_list); - if (ai_return != 0) + status = getaddrinfo (node, service, &ai_hints, &ai_list); + if (status != 0) { ERROR ("network plugin: getaddrinfo (%s, %s) failed: %s", (se->node == NULL) ? "(null)" : se->node, (se->service == NULL) ? "(null)" : se->service, - gai_strerror (ai_return)); + gai_strerror (status)); return (-1); } for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) { - int status; + int *tmp; - if (se->type == SOCKENT_TYPE_SERVER) /* {{{ */ + tmp = realloc (se->data.server.fd, + sizeof (*tmp) * (se->data.server.fd_num + 1)); + if (tmp == NULL) { - int *tmp; - - tmp = realloc (se->data.server.fd, - sizeof (*tmp) * (se->data.server.fd_num + 1)); - if (tmp == NULL) - { - ERROR ("network plugin: realloc failed."); - continue; - } - se->data.server.fd = tmp; - tmp = se->data.server.fd + se->data.server.fd_num; - - *tmp = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, - ai_ptr->ai_protocol); - if (*tmp < 0) - { - char errbuf[1024]; - ERROR ("network plugin: socket(2) failed: %s", - sstrerror (errno, errbuf, - sizeof (errbuf))); - continue; - } - - status = network_bind_socket (*tmp, ai_ptr, se->interface); - if (status != 0) - { - close (*tmp); - *tmp = -1; - continue; - } - - se->data.server.fd_num++; + ERROR ("network plugin: realloc failed."); continue; - } /* }}} if (se->type == SOCKENT_TYPE_SERVER) */ - else /* if (se->type == SOCKENT_TYPE_CLIENT) {{{ */ - { - se->data.client.fd = socket (ai_ptr->ai_family, - ai_ptr->ai_socktype, - ai_ptr->ai_protocol); - if (se->data.client.fd < 0) - { - char errbuf[1024]; - ERROR ("network plugin: socket(2) failed: %s", - sstrerror (errno, errbuf, - sizeof (errbuf))); - continue; - } - - se->data.client.addr = malloc (sizeof (*se->data.client.addr)); - if (se->data.client.addr == NULL) - { - ERROR ("network plugin: malloc failed."); - close (se->data.client.fd); - se->data.client.fd = -1; - continue; - } + } + se->data.server.fd = tmp; + tmp = se->data.server.fd + se->data.server.fd_num; - memset (se->data.client.addr, 0, sizeof (*se->data.client.addr)); - assert (sizeof (*se->data.client.addr) >= ai_ptr->ai_addrlen); - memcpy (se->data.client.addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen); - se->data.client.addrlen = ai_ptr->ai_addrlen; + *tmp = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, + ai_ptr->ai_protocol); + if (*tmp < 0) + { + char errbuf[1024]; + ERROR ("network plugin: socket(2) failed: %s", + sstrerror (errno, errbuf, + sizeof (errbuf))); + continue; + } - network_set_ttl (se, ai_ptr); - network_set_interface (se, ai_ptr); + status = network_bind_socket (*tmp, ai_ptr, se->interface); + if (status != 0) + { + close (*tmp); + *tmp = -1; + continue; + } - /* We don't open more than one write-socket per - * node/service pair.. */ - break; - } /* }}} if (se->type == SOCKENT_TYPE_CLIENT) */ + se->data.server.fd_num++; + continue; } /* for (ai_list) */ freeaddrinfo (ai_list); - /* Check if all went well. */ - if (se->type == SOCKENT_TYPE_SERVER) - { - if (se->data.server.fd_num <= 0) - return (-1); - } - else /* if (se->type == SOCKENT_TYPE_CLIENT) */ - { - if (se->data.client.fd < 0) - return (-1); - } - + if (se->data.server.fd_num <= 0) + return (-1); return (0); -} /* }}} int sockent_open */ +} /* }}} int sockent_server_listen */ /* Add a sockent to the global list of sockets */ static int sockent_add (sockent_t *se) /* {{{ */ @@ -2475,30 +2578,37 @@ static void network_init_buffer (void) memset (send_buffer, 0, network_config_packet_size); send_buffer_ptr = send_buffer; send_buffer_fill = 0; + send_buffer_last_update = 0; memset (&send_buffer_vl, 0, sizeof (send_buffer_vl)); } /* int network_init_buffer */ -static void networt_send_buffer_plain (const sockent_t *se, /* {{{ */ +static void networt_send_buffer_plain (sockent_t *se, /* {{{ */ const char *buffer, size_t buffer_size) { int status; while (42) { + status = sockent_client_connect (se); + if (status != 0) + return; + status = sendto (se->data.client.fd, buffer, buffer_size, - /* flags = */ 0, - (struct sockaddr *) se->data.client.addr, - se->data.client.addrlen); - if (status < 0) + /* flags = */ 0, + (struct sockaddr *) se->data.client.addr, + se->data.client.addrlen); + if (status < 0) { char errbuf[1024]; - if (errno == EINTR) + + if ((errno == EINTR) || (errno == EAGAIN)) continue; - ERROR ("network plugin: sendto failed: %s", - sstrerror (errno, errbuf, - sizeof (errbuf))); - break; + + ERROR ("network plugin: sendto failed: %s. Closing sending socket.", + sstrerror (errno, errbuf, sizeof (errbuf))); + sockent_client_disconnect (se); + return; } break; @@ -2511,7 +2621,7 @@ static void networt_send_buffer_plain (const sockent_t *se, /* {{{ */ buffer_offset += (s); \ } while (0) -static void networt_send_buffer_signed (const sockent_t *se, /* {{{ */ +static void networt_send_buffer_signed (sockent_t *se, /* {{{ */ const char *in_buffer, size_t in_buffer_size) { part_signature_sha256_t ps; @@ -2808,6 +2918,7 @@ static int network_write (const data_set_t *ds, const value_list_t *vl, /* status == bytes added to the buffer */ send_buffer_fill += status; send_buffer_ptr += status; + send_buffer_last_update = cdtime(); stats_values_sent++; } @@ -2897,6 +3008,10 @@ static int network_config_set_ttl (const oconfig_item_t *ci) /* {{{ */ tmp = (int) ci->values[0].value.number; if ((tmp > 0) && (tmp <= 255)) network_config_ttl = tmp; + else { + WARNING ("network plugin: The `TimeToLive' must be between 1 and 255."); + return (-1); + } return (0); } /* }}} int network_config_set_ttl */ @@ -3007,13 +3122,12 @@ static int network_config_add_listen (const oconfig_item_t *ci) /* {{{ */ return (-1); } - se = malloc (sizeof (*se)); + se = sockent_create (SOCKENT_TYPE_SERVER); if (se == NULL) { - ERROR ("network plugin: malloc failed."); + ERROR ("network plugin: sockent_create failed."); return (-1); } - sockent_init (se, SOCKENT_TYPE_SERVER); se->node = strdup (ci->values[0].value.string); if (ci->values_num >= 2) @@ -3053,10 +3167,18 @@ static int network_config_add_listen (const oconfig_item_t *ci) /* {{{ */ } #endif /* HAVE_LIBGCRYPT */ - status = sockent_open (se); + status = sockent_init_crypto (se); if (status != 0) { - ERROR ("network plugin: network_config_add_listen: sockent_open failed."); + ERROR ("network plugin: network_config_add_listen: sockent_init_crypto() failed."); + sockent_destroy (se); + return (-1); + } + + status = sockent_server_listen (se); + if (status != 0) + { + ERROR ("network plugin: network_config_add_server: sockent_server_listen failed."); sockent_destroy (se); return (-1); } @@ -3087,13 +3209,12 @@ static int network_config_add_server (const oconfig_item_t *ci) /* {{{ */ return (-1); } - se = malloc (sizeof (*se)); + se = sockent_create (SOCKENT_TYPE_CLIENT); if (se == NULL) { - ERROR ("network plugin: malloc failed."); + ERROR ("network plugin: sockent_create failed."); return (-1); } - sockent_init (se, SOCKENT_TYPE_CLIENT); se->node = strdup (ci->values[0].value.string); if (ci->values_num >= 2) @@ -3116,6 +3237,8 @@ static int network_config_add_server (const oconfig_item_t *ci) /* {{{ */ if (strcasecmp ("Interface", child->key) == 0) network_config_set_interface (child, &se->interface); + else if (strcasecmp ("ResolveInterval", child->key) == 0) + cf_util_get_cdtime(child, &se->data.client.resolve_interval); else { WARNING ("network plugin: Option `%s' is not allowed here.", @@ -3136,14 +3259,17 @@ static int network_config_add_server (const oconfig_item_t *ci) /* {{{ */ } #endif /* HAVE_LIBGCRYPT */ - status = sockent_open (se); + status = sockent_init_crypto (se); if (status != 0) { - ERROR ("network plugin: network_config_add_server: sockent_open failed."); + ERROR ("network plugin: network_config_add_server: sockent_init_crypto() failed."); sockent_destroy (se); return (-1); } + /* No call to sockent_client_connect() here -- it is called from + * networt_send_buffer_plain(). */ + status = sockent_add (se); if (status != 0) { @@ -3159,6 +3285,14 @@ static int network_config (oconfig_item_t *ci) /* {{{ */ { int i; + /* The options need to be applied first */ + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + if (strcasecmp ("TimeToLive", child->key) == 0) + network_config_set_ttl (child); + } + for (i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; @@ -3167,8 +3301,9 @@ static int network_config (oconfig_item_t *ci) /* {{{ */ network_config_add_listen (child); else if (strcasecmp ("Server", child->key) == 0) network_config_add_server (child); - else if (strcasecmp ("TimeToLive", child->key) == 0) - network_config_set_ttl (child); + else if (strcasecmp ("TimeToLive", child->key) == 0) { + /* Handled earlier */ + } else if (strcasecmp ("MaxPacketSize", child->key) == 0) network_config_set_buffer_size (child); else if (strcasecmp ("Forward", child->key) == 0) @@ -3261,6 +3396,8 @@ static int network_notification (const notification_t *n, static int network_shutdown (void) { + sockent_t *se; + listen_loop++; /* Kill the listening thread */ @@ -3291,7 +3428,9 @@ static int network_shutdown (void) sfree (send_buffer); - /* TODO: Close `sending_sockets' */ + for (se = sending_sockets; se != NULL; se = se->next) + sockent_client_disconnect (se); + sockent_destroy (sending_sockets); plugin_unregister_config ("network"); plugin_unregister_init ("network"); @@ -3469,15 +3608,25 @@ static int network_init (void) * just send the buffer if `flush' is called - if the requested value was in * there, good. If not, well, then there is nothing to flush.. -octo */ -static int network_flush (__attribute__((unused)) cdtime_t timeout, +static int network_flush (cdtime_t timeout, __attribute__((unused)) const char *identifier, __attribute__((unused)) user_data_t *user_data) { pthread_mutex_lock (&send_buffer_lock); if (send_buffer_fill > 0) - flush_buffer (); - + { + if (timeout > 0) + { + cdtime_t now = cdtime (); + if ((send_buffer_last_update + timeout) > now) + { + pthread_mutex_unlock (&send_buffer_lock); + return (0); + } + } + flush_buffer (); + } pthread_mutex_unlock (&send_buffer_lock); return (0);