X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fnetwork.c;h=7e55986759595802700bcced5e966bd49146ebd8;hb=2f0bd3ba78bdbfb6a1a5a1252573e5815ce3fb0b;hp=39069aa751a551eef139b92e9ecd467bb8e5b27a;hpb=5b5f7f4e3dd28d9e41469c331f3f9f6a09e3147b;p=collectd.git diff --git a/src/network.c b/src/network.c index 39069aa7..7e559867 100644 --- a/src/network.c +++ b/src/network.c @@ -168,20 +168,27 @@ static pthread_mutex_t receive_list_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t receive_list_cond = PTHREAD_COND_INITIALIZER; static struct pollfd *listen_sockets = NULL; -static int listen_sockets_num = 0; - -static int listen_loop = 0; -static pthread_t receive_thread_id = 0; -static pthread_t dispatch_thread_id = 0; - -static char send_buffer[BUFF_SIZE]; -static char *send_buffer_ptr; -static int send_buffer_fill; -static value_list_t send_buffer_vl = VALUE_LIST_STATIC; -static char send_buffer_type[DATA_MAX_NAME_LEN]; -static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER; - -static c_avl_tree_t *cache_tree = NULL; +static int listen_sockets_num = 0; + +/* The receive and dispatch threads will run as long as `listen_loop' is set to + * zero. */ +static int listen_loop = 0; +static int receive_thread_running = 0; +static pthread_t receive_thread_id; +static int dispatch_thread_running = 0; +static pthread_t dispatch_thread_id; + +/* Buffer in which to-be-sent network packets are constructed. */ +static char send_buffer[BUFF_SIZE]; +static char *send_buffer_ptr; +static int send_buffer_fill; +static value_list_t send_buffer_vl = VALUE_LIST_STATIC; +static pthread_mutex_t send_buffer_lock = PTHREAD_MUTEX_INITIALIZER; + +/* In this cache we store all the values we received, so we can send out only + * those values which were *not* received via the network plugin, too. This is + * used for the `Forward false' option. */ +static c_avl_tree_t *cache_tree = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static time_t cache_flush_last = 0; static int cache_flush_interval = 1800; @@ -499,7 +506,7 @@ static int parse_part_values (void **ret_buffer, int *ret_buffer_len, exp_size = 3 * sizeof (uint16_t) + pkg_numval * (sizeof (uint8_t) + sizeof (value_t)); - if (buffer_len < exp_size) + if ((buffer_len < 0) || ((size_t) buffer_len < exp_size)) { WARNING ("network plugin: parse_part_values: " "Packet too short: " @@ -563,7 +570,7 @@ static int parse_part_number (void **ret_buffer, int *ret_buffer_len, uint16_t pkg_length; uint16_t pkg_type; - if (buffer_len < exp_size) + if ((buffer_len < 0) || ((size_t) buffer_len < exp_size)) { WARNING ("network plugin: parse_part_number: " "Packet too short: " @@ -603,7 +610,7 @@ static int parse_part_string (void **ret_buffer, int *ret_buffer_len, uint16_t pkg_length; uint16_t pkg_type; - if (buffer_len < header_size) + if ((buffer_len < 0) || ((size_t) buffer_len < header_size)) { WARNING ("network plugin: parse_part_string: " "Packet too short: " @@ -645,7 +652,8 @@ static int parse_part_string (void **ret_buffer, int *ret_buffer_len, /* Check that the package data fits into the output buffer. * The previous if-statement ensures that: * `pkg_length > header_size' */ - if ((pkg_length - header_size) > output_len) + if ((output_len < 0) + || ((size_t) output_len < ((size_t) pkg_length - header_size))) { WARNING ("network plugin: parse_part_string: " "Output buffer too small."); @@ -1241,7 +1249,7 @@ static int network_add_sending_socket (const char *node, const char *service) return (0); } /* int network_get_listen_socket */ -static void *dispatch_thread (void *arg) +static void *dispatch_thread (void __attribute__((unused)) *arg) { while (42) { @@ -1280,6 +1288,9 @@ static int network_receive (void) int i; int status; + receive_list_entry_t *private_list_head; + receive_list_entry_t *private_list_tail; + if (listen_sockets_num == 0) network_add_listen_socket (NULL, NULL); @@ -1289,6 +1300,9 @@ static int network_receive (void) return (-1); } + private_list_head = NULL; + private_list_tail = NULL; + while (listen_loop == 0) { status = poll (listen_sockets, listen_sockets_num, -1); @@ -1323,13 +1337,18 @@ static int network_receive (void) return (-1); } + /* TODO: Possible performance enhancement: Do not free + * these entries in the dispatch thread but put them in + * another list, so we don't have to allocate more and + * more of these structures. */ ent = malloc (sizeof (receive_list_entry_t)); if (ent == NULL) { ERROR ("network plugin: malloc failed."); return (-1); } - memset (ent, '\0', sizeof (receive_list_entry_t)); + memset (ent, 0, sizeof (receive_list_entry_t)); + ent->next = NULL; /* Hopefully this be optimized out by the compiler. It * might help prevent stupid bugs in the future though. @@ -1339,26 +1358,53 @@ static int network_receive (void) memcpy (ent->data, buffer, buffer_len); ent->data_len = buffer_len; - pthread_mutex_lock (&receive_list_lock); - if (receive_list_head == NULL) - { - receive_list_head = ent; - receive_list_tail = ent; - } + if (private_list_head == NULL) + private_list_head = ent; else + private_list_tail->next = ent; + private_list_tail = ent; + + /* Do not block here. Blocking here has led to + * insufficient performance in the past. */ + if (pthread_mutex_trylock (&receive_list_lock) == 0) { - receive_list_tail->next = ent; - receive_list_tail = ent; + if (receive_list_head == NULL) + receive_list_head = private_list_head; + else + receive_list_tail->next = private_list_head; + receive_list_tail = private_list_tail; + + private_list_head = NULL; + private_list_tail = NULL; + + pthread_cond_signal (&receive_list_cond); + pthread_mutex_unlock (&receive_list_lock); } - pthread_cond_signal (&receive_list_cond); - pthread_mutex_unlock (&receive_list_lock); } /* for (listen_sockets) */ } /* while (listen_loop == 0) */ + /* Make sure everything is dispatched before exiting. */ + if (private_list_head != NULL) + { + pthread_mutex_lock (&receive_list_lock); + + if (receive_list_head == NULL) + receive_list_head = private_list_head; + else + receive_list_tail->next = private_list_head; + receive_list_tail = private_list_tail; + + private_list_head = NULL; + private_list_tail = NULL; + + pthread_cond_signal (&receive_list_cond); + pthread_mutex_unlock (&receive_list_lock); + } + return (0); } -static void *receive_thread (void *arg) +static void *receive_thread (void __attribute__((unused)) *arg) { return (network_receive () ? (void *) 1 : (void *) 0); } /* void *receive_thread */ @@ -1393,7 +1439,7 @@ static void network_send_buffer (const char *buffer, int buffer_len) } /* void network_send_buffer */ static int add_to_buffer (char *buffer, int buffer_size, - value_list_t *vl_def, char *type_def, + value_list_t *vl_def, const data_set_t *ds, const value_list_t *vl) { char *buffer_orig = buffer; @@ -1439,12 +1485,12 @@ static int add_to_buffer (char *buffer, int buffer_size, sstrncpy (vl_def->plugin_instance, vl->plugin_instance, sizeof (vl_def->plugin_instance)); } - if (strcmp (type_def, vl->type) != 0) + if (strcmp (vl_def->type, vl->type) != 0) { if (write_part_string (&buffer, &buffer_size, TYPE_TYPE, vl->type, strlen (vl->type)) != 0) return (-1); - sstrncpy (type_def, vl->type, sizeof (type_def)); + sstrncpy (vl_def->type, ds->type, sizeof (vl_def->type)); } if (strcmp (vl_def->type_instance, vl->type_instance) != 0) @@ -1470,11 +1516,11 @@ static void flush_buffer (void) network_send_buffer (send_buffer, send_buffer_fill); send_buffer_ptr = send_buffer; send_buffer_fill = 0; - memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl)); - memset (send_buffer_type, '\0', sizeof (send_buffer_type)); + memset (&send_buffer_vl, 0, sizeof (send_buffer_vl)); } -static int network_write (const data_set_t *ds, const value_list_t *vl) +static int network_write (const data_set_t *ds, const value_list_t *vl, + user_data_t __attribute__((unused)) *user_data) { int status; @@ -1490,7 +1536,7 @@ static int network_write (const data_set_t *ds, const value_list_t *vl) status = add_to_buffer (send_buffer_ptr, sizeof (send_buffer) - send_buffer_fill, - &send_buffer_vl, send_buffer_type, + &send_buffer_vl, ds, vl); if (status >= 0) { @@ -1504,7 +1550,7 @@ static int network_write (const data_set_t *ds, const value_list_t *vl) status = add_to_buffer (send_buffer_ptr, sizeof (send_buffer) - send_buffer_fill, - &send_buffer_vl, send_buffer_type, + &send_buffer_vl, ds, vl); if (status >= 0) @@ -1548,7 +1594,10 @@ static int network_config (const char *key, const char *val) fields_num = strsplit (val_cpy, fields, 3); if ((fields_num != 1) && (fields_num != 2)) + { + sfree (val_cpy); return (1); + } else if (fields_num == 2) { if ((service = strchr (fields[1], '.')) != NULL) @@ -1561,6 +1610,8 @@ static int network_config (const char *key, const char *val) network_add_listen_socket (node, service); else network_add_sending_socket (node, service); + + sfree (val_cpy); } else if (strcasecmp ("TimeToLive", key) == 0) { @@ -1593,7 +1644,8 @@ static int network_config (const char *key, const char *val) return (0); } /* int network_config */ -static int network_notification (const notification_t *n) +static int network_notification (const notification_t *n, + user_data_t __attribute__((unused)) *user_data) { char buffer[BUFF_SIZE]; char *buffer_ptr = buffer; @@ -1669,16 +1721,23 @@ static int network_shutdown (void) listen_loop++; /* Kill the listening thread */ - if (receive_thread_id != (pthread_t) 0) + if (receive_thread_running != 0) { + INFO ("network plugin: Stopping receive thread."); pthread_kill (receive_thread_id, SIGTERM); pthread_join (receive_thread_id, NULL /* no return value */); - receive_thread_id = (pthread_t) 0; + memset (&receive_thread_id, 0, sizeof (receive_thread_id)); + receive_thread_running = 0; } /* Shutdown the dispatching thread */ - if (dispatch_thread_id != (pthread_t) 0) + if (dispatch_thread_running != 0) + { + INFO ("network plugin: Stopping dispatch thread."); + pthread_join (dispatch_thread_id, /* ret = */ NULL); pthread_cond_broadcast (&receive_list_cond); + dispatch_thread_running = 0; + } if (send_buffer_fill > 0) flush_buffer (); @@ -1721,8 +1780,7 @@ static int network_init (void) send_buffer_ptr = send_buffer; send_buffer_fill = 0; - memset (&send_buffer_vl, '\0', sizeof (send_buffer_vl)); - memset (send_buffer_type, '\0', sizeof (send_buffer_type)); + memset (&send_buffer_vl, 0, sizeof (send_buffer_vl)); cache_tree = c_avl_create ((int (*) (const void *, const void *)) strcmp); cache_flush_last = time (NULL); @@ -1730,14 +1788,21 @@ static int network_init (void) /* setup socket(s) and so on */ if (sending_sockets != NULL) { - plugin_register_write ("network", network_write); - plugin_register_notification ("network", network_notification); + plugin_register_write ("network", network_write, + /* user_data = */ NULL); + plugin_register_notification ("network", network_notification, + /* user_data = */ NULL); } - if ((listen_sockets_num != 0) && (receive_thread_id == 0)) + /* If no threads need to be started, return here. */ + if ((listen_sockets_num == 0) + || ((dispatch_thread_running != 0) + && (receive_thread_running != 0))) + return (0); + + if (dispatch_thread_running == 0) { int status; - status = pthread_create (&dispatch_thread_id, NULL /* no attributes */, dispatch_thread, @@ -1749,7 +1814,15 @@ static int network_init (void) sstrerror (errno, errbuf, sizeof (errbuf))); } + else + { + dispatch_thread_running = 1; + } + } + if (receive_thread_running == 0) + { + int status; status = pthread_create (&receive_thread_id, NULL /* no attributes */, receive_thread, @@ -1761,7 +1834,12 @@ static int network_init (void) sstrerror (errno, errbuf, sizeof (errbuf))); } + else + { + receive_thread_running = 1; + } } + return (0); } /* int network_init */ @@ -1772,7 +1850,9 @@ static int network_init (void) * just send the buffer if `flush' is called - if the requested value was in * there, good. If not, well, then there is nothing to flush.. -octo */ -static int network_flush (int timeout, const char *identifier) +static int network_flush (int timeout, + const char __attribute__((unused)) *identifier, + user_data_t __attribute__((unused)) *user_data) { pthread_mutex_lock (&send_buffer_lock); @@ -1792,5 +1872,6 @@ void module_register (void) plugin_register_config ("network", network_config, config_keys, config_keys_num); plugin_register_init ("network", network_init); - plugin_register_flush ("network", network_flush); + plugin_register_flush ("network", network_flush, + /* user_data = */ NULL); } /* void module_register */