X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=0e95e903a97063aef0a64358c112d60c75b250db;hp=7a7ae88696a9add76a304c792d9d6d88a601d87d;hb=eccb8f633c724cfff32a96bb036647006e2a5e56;hpb=2f0b984b0dfee0a8ab2cb1d41670f40a07ec5cdb diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 7a7ae88..0e95e90 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -175,7 +175,7 @@ struct cache_item_s { char *file; char **values; - int values_num; + size_t values_num; time_t last_flush_time; time_t last_update_stamp; #define CI_FLAGS_IN_TREE (1<<0) @@ -215,7 +215,11 @@ static uid_t daemon_uid; static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; -static int do_shutdown = 0; +enum { + RUNNING, /* normal operation */ + FLUSHING, /* flushing remaining values */ + SHUTDOWN /* shutting down */ +} state = RUNNING; static pthread_t *queue_threads; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; @@ -244,7 +248,7 @@ static size_t _config_base_dir_len = 0; static int config_write_base_only = 0; static listen_socket_t **config_listen_address_list = NULL; -static int config_listen_address_list_len = 0; +static size_t config_listen_address_list_len = 0; static uint64_t stats_queue_length = 0; static uint64_t stats_updates_received = 0; @@ -273,7 +277,7 @@ static int handle_request_help (HANDLER_PROTO); static void sig_common (const char *sig) /* {{{ */ { RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig); - do_shutdown++; + state = FLUSHING; pthread_cond_broadcast(&flush_cond); pthread_cond_broadcast(&queue_cond); } /* }}} void sig_common */ @@ -379,7 +383,13 @@ static int check_pidfile(void) } lseek(pid_fd, 0, SEEK_SET); - ftruncate(pid_fd, 0); + if (ftruncate(pid_fd, 0) == -1) + { + fprintf(stderr, + "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid); + close(pid_fd); + return -1; + } fprintf(stderr, "rrdcached: removed stale PID file (no rrdcached on pid %d)\n" @@ -642,7 +652,7 @@ static void *free_cache_item(cache_item_t *ci) /* {{{ */ remove_from_queue(ci); - for (int i=0; i < ci->values_num; i++) + for (size_t i=0; i < ci->values_num; i++) free(ci->values[i]); free (ci->values); @@ -650,6 +660,7 @@ static void *free_cache_item(cache_item_t *ci) /* {{{ */ /* in case anyone is waiting */ pthread_cond_broadcast(&ci->flushed); + pthread_cond_destroy(&ci->flushed); free (ci); @@ -732,33 +743,20 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ if (ci->flags & CI_FLAGS_IN_QUEUE) return FALSE; - if ((ci->last_flush_time <= cfd->abs_timeout) - && (ci->values_num > 0)) - { - enqueue_cache_item (ci, TAIL); - } - else if ((do_shutdown != 0) - && (ci->values_num > 0)) + if (ci->values_num > 0 + && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING)) { enqueue_cache_item (ci, TAIL); } else if (((cfd->now - ci->last_flush_time) >= config_flush_interval) && (ci->values_num <= 0)) { - char **temp; - - temp = (char **) rrd_realloc (cfd->keys, - sizeof (char *) * (cfd->keys_num + 1)); - if (temp == NULL) + assert ((char *) key == ci->file); + if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key)) { - RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed."); + RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed."); return (FALSE); } - cfd->keys = temp; - /* Make really sure this points to the _same_ place */ - assert ((char *) key == ci->file); - cfd->keys[cfd->keys_num] = (char *) key; - cfd->keys_num++; } return (FALSE); @@ -815,21 +813,22 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_lock(&cache_lock); - while (!do_shutdown) + while (state == RUNNING) { gettimeofday (&now, NULL); if ((now.tv_sec > next_flush.tv_sec) || ((now.tv_sec == next_flush.tv_sec) && ((1000 * now.tv_usec) > next_flush.tv_nsec))) { + RRDD_LOG(LOG_DEBUG, "flushing old values"); + + /* Determine the time of the next cache flush. */ + next_flush.tv_sec = now.tv_sec + config_flush_interval; + /* Flush all values that haven't been written in the last * `config_write_interval' seconds. */ flush_old_values (config_write_interval); - /* Determine the time of the next cache flush. */ - next_flush.tv_sec = - now.tv_sec + next_flush.tv_sec % config_flush_interval; - /* unlock the cache while we rotate so we don't block incoming * updates if the fsync() blocks on disk I/O */ pthread_mutex_unlock(&cache_lock); @@ -848,6 +847,8 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ if (config_flush_at_shutdown) flush_old_values (-1); /* flush everything */ + state = SHUTDOWN; + pthread_mutex_unlock(&cache_lock); return NULL; @@ -857,19 +858,18 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ { pthread_mutex_lock (&cache_lock); - while (!do_shutdown + while (state != SHUTDOWN || (cache_queue_head != NULL && config_flush_at_shutdown)) { cache_item_t *ci; char *file; char **values; - int values_num; + size_t values_num; int status; - int i; /* Now, check if there's something to store away. If not, wait until - * something comes in. if we are shutting down, do not wait around. */ - if (cache_queue_head == NULL && !do_shutdown) + * something comes in. */ + if (cache_queue_head == NULL) { status = pthread_cond_wait (&queue_cond, &cache_lock); if ((status != 0) && (status != ETIMEDOUT)) @@ -906,7 +906,7 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&cache_lock); rrd_clear_error (); - status = rrd_update_r (file, NULL, values_num, (void *) values); + status = rrd_update_r (file, NULL, (int) values_num, (void *) values); if (status != 0) { RRDD_LOG (LOG_NOTICE, "queue_thread_main: " @@ -915,13 +915,14 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } journal_write("wrote", file); - pthread_cond_broadcast(&ci->flushed); - for (i = 0; i < values_num; i++) - free (values[i]); - - free(values); - free(file); + /* Search again in the tree. It's possible someone issued a "FORGET" + * while we were writing the update values. */ + pthread_mutex_lock(&cache_lock); + ci = (cache_item_t *) g_tree_lookup(cache_tree, file); + if (ci) + pthread_cond_broadcast(&ci->flushed); + pthread_mutex_unlock(&cache_lock); if (status == 0) { @@ -931,6 +932,9 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&stats_lock); } + rrd_free_ptrs((void ***) &values, &values_num); + free(file); + pthread_mutex_lock (&cache_lock); } pthread_mutex_unlock (&cache_lock); @@ -1227,7 +1231,7 @@ static int handle_request_pending(HANDLER_PROTO) /* {{{ */ return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT)); } - for (int i=0; i < ci->values_num; i++) + for (size_t i=0; i < ci->values_num; i++) add_response_info(sock, "%s\n", ci->values[i]); pthread_mutex_unlock(&cache_lock); @@ -1312,6 +1316,7 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ if (ci == NULL) /* {{{ */ { struct stat statbuf; + cache_item_t *tmp; /* don't hold the lock while we setup; stat(2) might block */ pthread_mutex_unlock(&cache_lock); @@ -1359,7 +1364,20 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ pthread_cond_init(&ci->flushed, NULL); pthread_mutex_lock(&cache_lock); - g_tree_replace (cache_tree, (void *) ci->file, (void *) ci); + + /* another UPDATE might have added this entry in the meantime */ + tmp = g_tree_lookup (cache_tree, file); + if (tmp == NULL) + g_tree_replace (cache_tree, (void *) ci->file, (void *) ci); + else + { + free_cache_item (ci); + ci = tmp; + } + + /* state may have changed while we were unlocked */ + if (state == SHUTDOWN) + return -1; } /* }}} */ assert (ci != NULL); @@ -1369,7 +1387,6 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ while (buffer_size > 0) { - char **temp; char *value; time_t stamp; char *eostamp; @@ -1400,22 +1417,11 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ else ci->last_update_stamp = stamp; - temp = (char **) rrd_realloc (ci->values, - sizeof (char *) * (ci->values_num + 1)); - if (temp == NULL) + if (!rrd_add_strdup(&ci->values, &ci->values_num, value)) { - RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed."); + RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed."); continue; } - ci->values = temp; - - ci->values[ci->values_num] = strdup (value); - if (ci->values[ci->values_num] == NULL) - { - RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed."); - continue; - } - ci->values_num++; values_num++; } @@ -1445,7 +1451,6 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ */ static int handle_request_wrote (HANDLER_PROTO) /* {{{ */ { - int i; cache_item_t *ci; const char *file = buffer; @@ -1459,12 +1464,7 @@ static int handle_request_wrote (HANDLER_PROTO) /* {{{ */ } if (ci->values) - { - for (i=0; i < ci->values_num; i++) - free(ci->values[i]); - - free(ci->values); - } + rrd_free_ptrs((void ***) &ci->values, &ci->values_num); wipe_ci_values(ci, now); remove_from_queue(ci); @@ -2009,7 +2009,7 @@ static void *connection_thread_main (void *args) /* {{{ */ connection_threads_num++; pthread_mutex_unlock (&connection_threads_lock); - while (do_shutdown == 0) + while (state == RUNNING) { char *cmd; ssize_t cmd_len; @@ -2024,7 +2024,7 @@ static void *connection_thread_main (void *args) /* {{{ */ pollfd.revents = 0; status = poll (&pollfd, 1, /* timeout = */ 500); - if (do_shutdown) + if (state != RUNNING) break; else if (status == 0) /* timeout */ continue; @@ -2329,7 +2329,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ RRDD_LOG(LOG_INFO, "listening for connections"); - while (do_shutdown == 0) + while (state == RUNNING) { for (i = 0; i < pollfds_num; i++) { @@ -2339,7 +2339,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } status = poll (pollfds, pollfds_num, /* timeout = */ 1000); - if (do_shutdown) + if (state != RUNNING) break; else if (status == 0) /* timeout */ continue; @@ -2402,7 +2402,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ continue; } } /* for (pollfds_num) */ - } /* while (do_shutdown == 0) */ + } /* while (state == RUNNING) */ RRDD_LOG(LOG_INFO, "starting shutdown"); @@ -2434,13 +2434,11 @@ static int daemonize (void) /* {{{ */ /* open all the listen sockets */ if (config_listen_address_list_len > 0) { - for (int i = 0; i < config_listen_address_list_len; i++) - { + for (size_t i = 0; i < config_listen_address_list_len; i++) open_listen_socket (config_listen_address_list[i]); - free_listen_socket (config_listen_address_list[i]); - } - free(config_listen_address_list); + rrd_free_ptrs((void ***) &config_listen_address_list, + &config_listen_address_list_len); } else { @@ -2478,8 +2476,9 @@ static int daemonize (void) /* {{{ */ close (0); open ("/dev/null", O_RDWR); - dup (0); - dup (0); + if (dup(0) == -1 || dup(0) == -1){ + RRDD_LOG (LOG_ERR, "faild to run dup.\n"); + } } /* if (!stay_foreground) */ /* Change into the /tmp directory. */ @@ -2515,8 +2514,6 @@ error: static int cleanup (void) /* {{{ */ { - do_shutdown++; - pthread_cond_broadcast (&flush_cond); pthread_join (flush_thread, NULL); @@ -2564,7 +2561,6 @@ static int read_options (int argc, char **argv) /* {{{ */ case 'L': case 'l': { - listen_socket_t **temp; listen_socket_t *new; new = malloc(sizeof(listen_socket_t)); @@ -2575,20 +2571,15 @@ static int read_options (int argc, char **argv) /* {{{ */ } memset(new, 0, sizeof(listen_socket_t)); - temp = (listen_socket_t **) rrd_realloc (config_listen_address_list, - sizeof (listen_socket_t *) * (config_listen_address_list_len + 1)); - if (temp == NULL) - { - fprintf (stderr, "read_options: realloc failed.\n"); - return (2); - } - config_listen_address_list = temp; - strncpy(new->addr, optarg, sizeof(new->addr)-1); new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW; - temp[config_listen_address_list_len] = new; - config_listen_address_list_len++; + if (!rrd_add_ptr((void ***)&config_listen_address_list, + &config_listen_address_list_len, new)) + { + fprintf(stderr, "read_options: rrd_add_ptr failed.\n"); + return (2); + } } break;