{
char *file;
char **values;
- int values_num;
+ size_t values_num;
time_t last_flush_time;
time_t last_update_stamp;
#define CI_FLAGS_IN_TREE (1<<0)
static listen_socket_t *listen_fds = NULL;
static size_t listen_fds_num = 0;
-static int do_shutdown = 0;
+enum {
+ RUNNING, /* normal operation */
+ FLUSHING, /* flushing remaining values */
+ SHUTDOWN /* shutting down */
+} state = RUNNING;
static pthread_t *queue_threads;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static int config_write_base_only = 0;
static listen_socket_t **config_listen_address_list = NULL;
-static int config_listen_address_list_len = 0;
+static size_t config_listen_address_list_len = 0;
static uint64_t stats_queue_length = 0;
static uint64_t stats_updates_received = 0;
static void sig_common (const char *sig) /* {{{ */
{
RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig);
- do_shutdown++;
+ state = FLUSHING;
pthread_cond_broadcast(&flush_cond);
pthread_cond_broadcast(&queue_cond);
} /* }}} void sig_common */
}
lseek(pid_fd, 0, SEEK_SET);
- ftruncate(pid_fd, 0);
+ if (ftruncate(pid_fd, 0) == -1)
+ {
+ fprintf(stderr,
+ "FATAL: Faild to truncate stale PID file. (pid %d)\n", pid);
+ close(pid_fd);
+ return -1;
+ }
fprintf(stderr,
"rrdcached: removed stale PID file (no rrdcached on pid %d)\n"
remove_from_queue(ci);
- for (int i=0; i < ci->values_num; i++)
+ for (size_t i=0; i < ci->values_num; i++)
free(ci->values[i]);
free (ci->values);
/* in case anyone is waiting */
pthread_cond_broadcast(&ci->flushed);
+ pthread_cond_destroy(&ci->flushed);
free (ci);
if (ci->flags & CI_FLAGS_IN_QUEUE)
return FALSE;
- if ((ci->last_flush_time <= cfd->abs_timeout)
- && (ci->values_num > 0))
- {
- enqueue_cache_item (ci, TAIL);
- }
- else if ((do_shutdown != 0)
- && (ci->values_num > 0))
+ if (ci->values_num > 0
+ && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING))
{
enqueue_cache_item (ci, TAIL);
}
else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
&& (ci->values_num <= 0))
{
- char **temp;
-
- temp = (char **) rrd_realloc (cfd->keys,
- sizeof (char *) * (cfd->keys_num + 1));
- if (temp == NULL)
+ assert ((char *) key == ci->file);
+ if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key))
{
- RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
+ RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed.");
return (FALSE);
}
- cfd->keys = temp;
- /* Make really sure this points to the _same_ place */
- assert ((char *) key == ci->file);
- cfd->keys[cfd->keys_num] = (char *) key;
- cfd->keys_num++;
}
return (FALSE);
pthread_mutex_lock(&cache_lock);
- while (!do_shutdown)
+ while (state == RUNNING)
{
gettimeofday (&now, NULL);
if ((now.tv_sec > next_flush.tv_sec)
|| ((now.tv_sec == next_flush.tv_sec)
&& ((1000 * now.tv_usec) > next_flush.tv_nsec)))
{
+ RRDD_LOG(LOG_DEBUG, "flushing old values");
+
+ /* Determine the time of the next cache flush. */
+ next_flush.tv_sec = now.tv_sec + config_flush_interval;
+
/* Flush all values that haven't been written in the last
* `config_write_interval' seconds. */
flush_old_values (config_write_interval);
- /* Determine the time of the next cache flush. */
- next_flush.tv_sec =
- now.tv_sec + next_flush.tv_sec % config_flush_interval;
-
/* unlock the cache while we rotate so we don't block incoming
* updates if the fsync() blocks on disk I/O */
pthread_mutex_unlock(&cache_lock);
if (config_flush_at_shutdown)
flush_old_values (-1); /* flush everything */
+ state = SHUTDOWN;
+
pthread_mutex_unlock(&cache_lock);
return NULL;
{
pthread_mutex_lock (&cache_lock);
- while (!do_shutdown
+ while (state != SHUTDOWN
|| (cache_queue_head != NULL && config_flush_at_shutdown))
{
cache_item_t *ci;
char *file;
char **values;
- int values_num;
+ size_t values_num;
int status;
- int i;
/* Now, check if there's something to store away. If not, wait until
- * something comes in. if we are shutting down, do not wait around. */
- if (cache_queue_head == NULL && !do_shutdown)
+ * something comes in. */
+ if (cache_queue_head == NULL)
{
status = pthread_cond_wait (&queue_cond, &cache_lock);
if ((status != 0) && (status != ETIMEDOUT))
pthread_mutex_unlock (&cache_lock);
rrd_clear_error ();
- status = rrd_update_r (file, NULL, values_num, (void *) values);
+ status = rrd_update_r (file, NULL, (int) values_num, (void *) values);
if (status != 0)
{
RRDD_LOG (LOG_NOTICE, "queue_thread_main: "
}
journal_write("wrote", file);
- pthread_cond_broadcast(&ci->flushed);
- for (i = 0; i < values_num; i++)
- free (values[i]);
-
- free(values);
- free(file);
+ /* Search again in the tree. It's possible someone issued a "FORGET"
+ * while we were writing the update values. */
+ pthread_mutex_lock(&cache_lock);
+ ci = (cache_item_t *) g_tree_lookup(cache_tree, file);
+ if (ci)
+ pthread_cond_broadcast(&ci->flushed);
+ pthread_mutex_unlock(&cache_lock);
if (status == 0)
{
pthread_mutex_unlock (&stats_lock);
}
+ rrd_free_ptrs((void ***) &values, &values_num);
+ free(file);
+
pthread_mutex_lock (&cache_lock);
}
pthread_mutex_unlock (&cache_lock);
return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT));
}
- for (int i=0; i < ci->values_num; i++)
+ for (size_t i=0; i < ci->values_num; i++)
add_response_info(sock, "%s\n", ci->values[i]);
pthread_mutex_unlock(&cache_lock);
if (ci == NULL) /* {{{ */
{
struct stat statbuf;
+ cache_item_t *tmp;
/* don't hold the lock while we setup; stat(2) might block */
pthread_mutex_unlock(&cache_lock);
pthread_cond_init(&ci->flushed, NULL);
pthread_mutex_lock(&cache_lock);
- g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
+
+ /* another UPDATE might have added this entry in the meantime */
+ tmp = g_tree_lookup (cache_tree, file);
+ if (tmp == NULL)
+ g_tree_replace (cache_tree, (void *) ci->file, (void *) ci);
+ else
+ {
+ free_cache_item (ci);
+ ci = tmp;
+ }
+
+ /* state may have changed while we were unlocked */
+ if (state == SHUTDOWN)
+ return -1;
} /* }}} */
assert (ci != NULL);
while (buffer_size > 0)
{
- char **temp;
char *value;
time_t stamp;
char *eostamp;
else
ci->last_update_stamp = stamp;
- temp = (char **) rrd_realloc (ci->values,
- sizeof (char *) * (ci->values_num + 1));
- if (temp == NULL)
+ if (!rrd_add_strdup(&ci->values, &ci->values_num, value))
{
- RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
+ RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed.");
continue;
}
- ci->values = temp;
-
- ci->values[ci->values_num] = strdup (value);
- if (ci->values[ci->values_num] == NULL)
- {
- RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
- continue;
- }
- ci->values_num++;
values_num++;
}
*/
static int handle_request_wrote (HANDLER_PROTO) /* {{{ */
{
- int i;
cache_item_t *ci;
const char *file = buffer;
}
if (ci->values)
- {
- for (i=0; i < ci->values_num; i++)
- free(ci->values[i]);
-
- free(ci->values);
- }
+ rrd_free_ptrs((void ***) &ci->values, &ci->values_num);
wipe_ci_values(ci, now);
remove_from_queue(ci);
connection_threads_num++;
pthread_mutex_unlock (&connection_threads_lock);
- while (do_shutdown == 0)
+ while (state == RUNNING)
{
char *cmd;
ssize_t cmd_len;
pollfd.revents = 0;
status = poll (&pollfd, 1, /* timeout = */ 500);
- if (do_shutdown)
+ if (state != RUNNING)
break;
else if (status == 0) /* timeout */
continue;
RRDD_LOG(LOG_INFO, "listening for connections");
- while (do_shutdown == 0)
+ while (state == RUNNING)
{
for (i = 0; i < pollfds_num; i++)
{
}
status = poll (pollfds, pollfds_num, /* timeout = */ 1000);
- if (do_shutdown)
+ if (state != RUNNING)
break;
else if (status == 0) /* timeout */
continue;
continue;
}
} /* for (pollfds_num) */
- } /* while (do_shutdown == 0) */
+ } /* while (state == RUNNING) */
RRDD_LOG(LOG_INFO, "starting shutdown");
/* open all the listen sockets */
if (config_listen_address_list_len > 0)
{
- for (int i = 0; i < config_listen_address_list_len; i++)
- {
+ for (size_t i = 0; i < config_listen_address_list_len; i++)
open_listen_socket (config_listen_address_list[i]);
- free_listen_socket (config_listen_address_list[i]);
- }
- free(config_listen_address_list);
+ rrd_free_ptrs((void ***) &config_listen_address_list,
+ &config_listen_address_list_len);
}
else
{
close (0);
open ("/dev/null", O_RDWR);
- dup (0);
- dup (0);
+ if (dup(0) == -1 || dup(0) == -1){
+ RRDD_LOG (LOG_ERR, "faild to run dup.\n");
+ }
} /* if (!stay_foreground) */
/* Change into the /tmp directory. */
static int cleanup (void) /* {{{ */
{
- do_shutdown++;
-
pthread_cond_broadcast (&flush_cond);
pthread_join (flush_thread, NULL);
case 'L':
case 'l':
{
- listen_socket_t **temp;
listen_socket_t *new;
new = malloc(sizeof(listen_socket_t));
}
memset(new, 0, sizeof(listen_socket_t));
- temp = (listen_socket_t **) rrd_realloc (config_listen_address_list,
- sizeof (listen_socket_t *) * (config_listen_address_list_len + 1));
- if (temp == NULL)
- {
- fprintf (stderr, "read_options: realloc failed.\n");
- return (2);
- }
- config_listen_address_list = temp;
-
strncpy(new->addr, optarg, sizeof(new->addr)-1);
new->privilege = (option == 'l') ? PRIV_HIGH : PRIV_LOW;
- temp[config_listen_address_list_len] = new;
- config_listen_address_list_len++;
+ if (!rrd_add_ptr((void ***)&config_listen_address_list,
+ &config_listen_address_list_len, new))
+ {
+ fprintf(stderr, "read_options: rrd_add_ptr failed.\n");
+ return (2);
+ }
}
break;