X-Git-Url: https://git.octo.it/?p=rrdtool.git;a=blobdiff_plain;f=src%2Frrd_daemon.c;h=0e95e903a97063aef0a64358c112d60c75b250db;hp=8b1fc9e79059b05aed3c4581decfd33cfad17405;hb=eccb8f633c724cfff32a96bb036647006e2a5e56;hpb=e00381f4655492dfcd8bb8c12065eb62229b20c1 diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 8b1fc9e..0e95e90 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -215,7 +215,11 @@ static uid_t daemon_uid; static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; -static int do_shutdown = 0; +enum { + RUNNING, /* normal operation */ + FLUSHING, /* flushing remaining values */ + SHUTDOWN /* shutting down */ +} state = RUNNING; static pthread_t *queue_threads; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; @@ -273,7 +277,7 @@ static int handle_request_help (HANDLER_PROTO); static void sig_common (const char *sig) /* {{{ */ { RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig); - do_shutdown++; + state = FLUSHING; pthread_cond_broadcast(&flush_cond); pthread_cond_broadcast(&queue_cond); } /* }}} void sig_common */ @@ -739,13 +743,8 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ if (ci->flags & CI_FLAGS_IN_QUEUE) return FALSE; - if ((ci->last_flush_time <= cfd->abs_timeout) - && (ci->values_num > 0)) - { - enqueue_cache_item (ci, TAIL); - } - else if ((do_shutdown != 0) - && (ci->values_num > 0)) + if (ci->values_num > 0 + && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING)) { enqueue_cache_item (ci, TAIL); } @@ -814,21 +813,22 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_lock(&cache_lock); - while (!do_shutdown) + while (state == RUNNING) { gettimeofday (&now, NULL); if ((now.tv_sec > next_flush.tv_sec) || ((now.tv_sec == next_flush.tv_sec) && ((1000 * now.tv_usec) > next_flush.tv_nsec))) { + RRDD_LOG(LOG_DEBUG, "flushing old values"); + + /* Determine the time of the next cache flush. */ + next_flush.tv_sec = now.tv_sec + config_flush_interval; + /* Flush all values that haven't been written in the last * `config_write_interval' seconds. */ flush_old_values (config_write_interval); - /* Determine the time of the next cache flush. */ - next_flush.tv_sec = - now.tv_sec + next_flush.tv_sec % config_flush_interval; - /* unlock the cache while we rotate so we don't block incoming * updates if the fsync() blocks on disk I/O */ pthread_mutex_unlock(&cache_lock); @@ -847,6 +847,8 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ if (config_flush_at_shutdown) flush_old_values (-1); /* flush everything */ + state = SHUTDOWN; + pthread_mutex_unlock(&cache_lock); return NULL; @@ -856,7 +858,7 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ { pthread_mutex_lock (&cache_lock); - while (!do_shutdown + while (state != SHUTDOWN || (cache_queue_head != NULL && config_flush_at_shutdown)) { cache_item_t *ci; @@ -866,8 +868,8 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ int status; /* Now, check if there's something to store away. If not, wait until - * something comes in. if we are shutting down, do not wait around. */ - if (cache_queue_head == NULL && !do_shutdown) + * something comes in. */ + if (cache_queue_head == NULL) { status = pthread_cond_wait (&queue_cond, &cache_lock); if ((status != 0) && (status != ETIMEDOUT)) @@ -922,9 +924,6 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_cond_broadcast(&ci->flushed); pthread_mutex_unlock(&cache_lock); - rrd_free_ptrs((void ***) &values, &values_num); - free(file); - if (status == 0) { pthread_mutex_lock (&stats_lock); @@ -933,6 +932,9 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&stats_lock); } + rrd_free_ptrs((void ***) &values, &values_num); + free(file); + pthread_mutex_lock (&cache_lock); } pthread_mutex_unlock (&cache_lock); @@ -1372,6 +1374,10 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ free_cache_item (ci); ci = tmp; } + + /* state may have changed while we were unlocked */ + if (state == SHUTDOWN) + return -1; } /* }}} */ assert (ci != NULL); @@ -2003,7 +2009,7 @@ static void *connection_thread_main (void *args) /* {{{ */ connection_threads_num++; pthread_mutex_unlock (&connection_threads_lock); - while (do_shutdown == 0) + while (state == RUNNING) { char *cmd; ssize_t cmd_len; @@ -2018,7 +2024,7 @@ static void *connection_thread_main (void *args) /* {{{ */ pollfd.revents = 0; status = poll (&pollfd, 1, /* timeout = */ 500); - if (do_shutdown) + if (state != RUNNING) break; else if (status == 0) /* timeout */ continue; @@ -2323,7 +2329,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ RRDD_LOG(LOG_INFO, "listening for connections"); - while (do_shutdown == 0) + while (state == RUNNING) { for (i = 0; i < pollfds_num; i++) { @@ -2333,7 +2339,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } status = poll (pollfds, pollfds_num, /* timeout = */ 1000); - if (do_shutdown) + if (state != RUNNING) break; else if (status == 0) /* timeout */ continue; @@ -2396,7 +2402,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ continue; } } /* for (pollfds_num) */ - } /* while (do_shutdown == 0) */ + } /* while (state == RUNNING) */ RRDD_LOG(LOG_INFO, "starting shutdown"); @@ -2508,8 +2514,6 @@ error: static int cleanup (void) /* {{{ */ { - do_shutdown++; - pthread_cond_broadcast (&flush_cond); pthread_join (flush_thread, NULL);