O(queue size). This improves performance of individual flushes when
there is a large number of files in the queue. As a result, we don't
hold the cache_lock as much.
Revamped enqueue_cache_item to take advantage of the new structure.
Renamed _wipe_ci_values to look nicer with other code.
--kevin
git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk/program@1555
a5681a0c-68f1-0310-ab6d-
d61299d08faa
+---+----+---+ +------+-----+ +---+----+---+
! File: foo ! ! File: bar ! ! File: qux !
! First: 101 ! ! First: 119 ! ! First: 180 !
+---+----+---+ +------+-----+ +---+----+---+
! File: foo ! ! File: bar ! ! File: qux !
! First: 101 ! ! First: 119 ! ! First: 180 !
- ! Next: ---+--->! Next: ---+---> ... --->! Next: - !
+ ! Next:&bar -+--->! Next:&... -+---> ... --->! Next:NULL !
+ | Prev:NULL !<---+-Prev:&foo !<--- ... ----+-Prev: &... !
+============+ +============+ +============+
! Time: 100 ! ! Time: 120 ! ! Time: 180 !
! Value: 10 ! ! Value: 0.1 ! ! Value: 2,2 !
+============+ +============+ +============+
! Time: 100 ! ! Time: 120 ! ! Time: 180 !
! Value: 10 ! ! Value: 0.1 ! ! Value: 2,2 !
#define CI_FLAGS_IN_QUEUE (1<<1)
int flags;
pthread_cond_t flushed;
#define CI_FLAGS_IN_QUEUE (1<<1)
int flags;
pthread_cond_t flushed;
return (0);
} /* }}} ssize_t swrite */
return (0);
} /* }}} ssize_t swrite */
-static void _wipe_ci_values(cache_item_t *ci, time_t when)
+static void wipe_ci_values(cache_item_t *ci, time_t when)
{
ci->values = NULL;
ci->values_num = 0;
{
ci->values = NULL;
ci->values_num = 0;
ci->last_flush_time = when;
if (config_write_jitter > 0)
ci->last_flush_time += (random() % config_write_jitter);
ci->last_flush_time = when;
if (config_write_jitter > 0)
ci->last_flush_time += (random() % config_write_jitter);
-
- ci->flags &= ~(CI_FLAGS_IN_QUEUE);
+/* remove_from_queue
+ * remove a "cache_item_t" item from the queue.
+ * must hold 'cache_lock' when calling this
+ */
+static void remove_from_queue(cache_item_t *ci) /* {{{ */
+{
+ if (ci == NULL) return;
+
+ if (ci->prev == NULL)
+ cache_queue_head = ci->next; /* reset head */
+ else
+ ci->prev->next = ci->next;
+
+ if (ci->next == NULL)
+ cache_queue_tail = ci->prev; /* reset the tail */
+ else
+ ci->next->prev = ci->prev;
+
+ ci->next = ci->prev = NULL;
+ ci->flags &= ~CI_FLAGS_IN_QUEUE;
+} /* }}} static void remove_from_queue */
+
/*
* enqueue_cache_item:
* `cache_lock' must be acquired before calling this function!
/*
* enqueue_cache_item:
* `cache_lock' must be acquired before calling this function!
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
queue_side_t side)
{
static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
queue_side_t side)
{
if (ci == NULL)
return (-1);
if (ci == NULL)
return (-1);
- if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
- {
- assert (ci->next == NULL);
- ci->next = cache_queue_head;
- cache_queue_head = ci;
-
- if (cache_queue_tail == NULL)
- cache_queue_tail = cache_queue_head;
+ if (cache_queue_head == ci)
+ return 0;
- did_insert = 1;
- }
- else if (cache_queue_head == ci)
- {
- /* do nothing */
- }
- else /* enqueued, but not first entry */
- {
- cache_item_t *prev;
+ /* remove from the double linked list */
+ if (ci->flags & CI_FLAGS_IN_QUEUE)
+ remove_from_queue(ci);
- /* find previous entry */
- for (prev = cache_queue_head; prev != NULL; prev = prev->next)
- if (prev->next == ci)
- break;
- assert (prev != NULL);
+ ci->prev = NULL;
+ ci->next = cache_queue_head;
+ if (ci->next != NULL)
+ ci->next->prev = ci;
+ cache_queue_head = ci;
- /* move to the front */
- prev->next = ci->next;
- ci->next = cache_queue_head;
- cache_queue_head = ci;
-
- /* check if we need to adapt the tail */
- if (cache_queue_tail == ci)
- cache_queue_tail = prev;
- }
+ if (cache_queue_tail == NULL)
+ cache_queue_tail = cache_queue_head;
}
else /* (side == TAIL) */
{
/* We don't move values back in the list.. */
}
else /* (side == TAIL) */
{
/* We don't move values back in the list.. */
- if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+ if (ci->flags & CI_FLAGS_IN_QUEUE)
return (0);
assert (ci->next == NULL);
return (0);
assert (ci->next == NULL);
+ assert (ci->prev == NULL);
+
+ ci->prev = cache_queue_tail;
if (cache_queue_tail == NULL)
cache_queue_head = ci;
else
cache_queue_tail->next = ci;
if (cache_queue_tail == NULL)
cache_queue_head = ci;
else
cache_queue_tail->next = ci;
}
ci->flags |= CI_FLAGS_IN_QUEUE;
}
ci->flags |= CI_FLAGS_IN_QUEUE;
- if (did_insert)
- {
- pthread_cond_broadcast(&cache_cond);
- pthread_mutex_lock (&stats_lock);
- stats_queue_length++;
- pthread_mutex_unlock (&stats_lock);
- }
+ pthread_cond_broadcast(&cache_cond);
+ pthread_mutex_lock (&stats_lock);
+ stats_queue_length++;
+ pthread_mutex_unlock (&stats_lock);
return (0);
} /* }}} int enqueue_cache_item */
return (0);
} /* }}} int enqueue_cache_item */
values = ci->values;
values_num = ci->values_num;
values = ci->values;
values_num = ci->values_num;
- _wipe_ci_values(ci, time(NULL));
-
- cache_queue_head = ci->next;
- if (cache_queue_head == NULL)
- cache_queue_tail = NULL;
- ci->next = NULL;
+ wipe_ci_values(ci, time(NULL));
+ remove_from_queue(ci);
pthread_mutex_lock (&stats_lock);
assert (stats_queue_length > 0);
pthread_mutex_lock (&stats_lock);
assert (stats_queue_length > 0);
- _wipe_ci_values(ci, now);
+ wipe_ci_values(ci, now);
ci->flags = CI_FLAGS_IN_TREE;
pthread_mutex_lock(&cache_lock);
ci->flags = CI_FLAGS_IN_TREE;
pthread_mutex_lock(&cache_lock);
- _wipe_ci_values(ci, time(NULL));
+ wipe_ci_values(ci, time(NULL));
+ remove_from_queue(ci);
pthread_mutex_unlock(&cache_lock);
return (0);
pthread_mutex_unlock(&cache_lock);
return (0);