struct callback_flush_data_s
{
time_t now;
+ time_t abs_timeout;
char **keys;
size_t keys_num;
};
ci = (cache_item_t *) value;
cfd = (callback_flush_data_t *) data;
- if (((cfd->now - ci->last_flush_time) >= config_write_interval)
+ if ((ci->last_flush_time <= cfd->abs_timeout)
+ && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+ && (ci->values_num > 0))
+ {
+ enqueue_cache_item (ci, TAIL);
+ }
+ else if ((do_shutdown != 0)
&& ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
&& (ci->values_num > 0))
{
return (FALSE);
} /* }}} gboolean tree_callback_flush */
+static int flush_old_values (int max_age)
+{
+ callback_flush_data_t cfd;
+ size_t k;
+
+ memset (&cfd, 0, sizeof (cfd));
+ /* Pass the current time as user data so that we don't need to call
+ * `time' for each node. */
+ cfd.now = time (NULL);
+ cfd.keys = NULL;
+ cfd.keys_num = 0;
+
+ if (max_age > 0)
+ cfd.abs_timeout = cfd.now - max_age;
+ else
+ cfd.abs_timeout = cfd.now + 1;
+
+ /* `tree_callback_flush' will return the keys of all values that haven't
+ * been touched in the last `config_flush_interval' seconds in `cfd'.
+ * The char*'s in this array point to the same memory as ci->file, so we
+ * don't need to free them separately. */
+ g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
+
+ for (k = 0; k < cfd.keys_num; k++)
+ {
+ cache_item_t *ci;
+
+ /* This must not fail. */
+ ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
+ assert (ci != NULL);
+
+ /* If we end up here with values available, something's seriously
+ * messed up. */
+ assert (ci->values_num == 0);
+
+ /* Remove the node from the tree */
+ g_tree_remove (cache_tree, cfd.keys[k]);
+ cfd.keys[k] = NULL;
+
+ /* Now free and clean up `ci'. */
+ free (ci->file);
+ ci->file = NULL;
+ free (ci);
+ ci = NULL;
+ } /* for (k = 0; k < cfd.keys_num; k++) */
+
+ if (cfd.keys != NULL)
+ {
+ free (cfd.keys);
+ cfd.keys = NULL;
+ }
+
+ return (0);
+} /* int flush_old_values */
+
static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
{
struct timeval now;
|| ((now.tv_sec == next_flush.tv_sec)
&& ((1000 * now.tv_usec) > next_flush.tv_nsec)))
{
- callback_flush_data_t cfd;
- size_t k;
-
- memset (&cfd, 0, sizeof (cfd));
- /* Pass the current time as user data so that we don't need to call
- * `time' for each node. */
- cfd.now = time (NULL);
- cfd.keys = NULL;
- cfd.keys_num = 0;
-
- /* `tree_callback_flush' will return the keys of all values that haven't
- * been touched in the last `config_flush_interval' seconds in `cfd'.
- * The char*'s in this array point to the same memory as ci->file, so we
- * don't need to free them separately. */
- g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
-
- for (k = 0; k < cfd.keys_num; k++)
- {
- /* This must not fail. */
- ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
- assert (ci != NULL);
-
- /* If we end up here with values available, something's seriously
- * messed up. */
- assert (ci->values_num == 0);
-
- /* Remove the node from the tree */
- g_tree_remove (cache_tree, cfd.keys[k]);
- cfd.keys[k] = NULL;
-
- /* Now free and clean up `ci'. */
- free (ci->file);
- ci->file = NULL;
- free (ci);
- ci = NULL;
- } /* for (k = 0; k < cfd.keys_num; k++) */
-
- if (cfd.keys != NULL)
- {
- free (cfd.keys);
- cfd.keys = NULL;
- }
+ /* Flush all values that haven't been written in the last
+ * `config_write_interval' seconds. */
+ flush_old_values (config_write_interval);
/* Determine the time of the next cache flush. */
while (next_flush.tv_sec < now.tv_sec)
}
}
+ /* We're about to shut down, so lets flush the entire tree. */
+ if ((do_shutdown != 0) && (cache_queue_head == NULL))
+ flush_old_values (/* max age = */ -1);
+
/* Check if a value has arrived. This may be NULL if we timed out or there
* was an interrupt such as a signal. */
if (cache_queue_head == NULL)
for (i = 0; i < values_num; i++)
free (values[i]);
- pthread_mutex_lock (&stats_lock);
- stats_updates_written++;
- stats_data_sets_written += values_num;
- pthread_mutex_unlock (&stats_lock);
+ if (status == 0)
+ {
+ pthread_mutex_lock (&stats_lock);
+ stats_updates_written++;
+ stats_data_sets_written += values_num;
+ pthread_mutex_unlock (&stats_lock);
+ }
pthread_mutex_lock (&cache_lock);
pthread_cond_broadcast (&flush_cond);
- } /* while (do_shutdown == 0) */
+
+ /* We're about to shut down, so lets flush the entire tree. */
+ if ((do_shutdown != 0) && (cache_queue_head == NULL))
+ flush_old_values (/* max age = */ -1);
+ } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
pthread_mutex_unlock (&cache_lock);
return (NULL);
ci = g_tree_lookup (cache_tree, file);
if (ci == NULL) /* {{{ */
{
+ struct stat statbuf;
+
+ memset (&statbuf, 0, sizeof (statbuf));
+ status = stat (file, &statbuf);
+ if (status != 0)
+ {
+ pthread_mutex_unlock (&cache_lock);
+ RRDD_LOG (LOG_ERR, "handle_request_update: stat (%s) failed.", file);
+
+ status = errno;
+ if (status == ENOENT)
+ snprintf (answer, sizeof (answer), "-1 No such file: %s", file);
+ else
+ snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
+ status);
+ RRDD_UPDATE_SEND;
+ return (0);
+ }
+ if (!S_ISREG (statbuf.st_mode))
+ {
+ pthread_mutex_unlock (&cache_lock);
+
+ snprintf (answer, sizeof (answer), "-1 Not a regular file: %s", file);
+ RRDD_UPDATE_SEND;
+ return (0);
+ }
+
ci = (cache_item_t *) malloc (sizeof (cache_item_t));
if (ci == NULL)
{
struct sockaddr_storage client_sa;
socklen_t client_sa_size;
pthread_t tid;
+ pthread_attr_t attr;
if (pollfds[i].revents == 0)
continue;
continue;
}
- status = pthread_create (&tid, /* attr = */ NULL, connection_thread_main,
+ pthread_attr_init (&attr);
+ pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+
+ status = pthread_create (&tid, &attr, connection_thread_main,
/* args = */ (void *) client_sd);
if (status != 0)
{