+static int rrd_write_to_file (char *filename, char **values, int values_num)
+{
+ char **argv;
+ int argc;
+ int status;
+
+ if (values_num < 1)
+ return (0);
+
+ argc = values_num + 2;
+ argv = (char **) malloc ((argc + 1) * sizeof (char *));
+ if (argv == NULL)
+ return (-1);
+
+ argv[0] = "update";
+ argv[1] = filename;
+ memcpy (argv + 2, values, values_num * sizeof (char *));
+ argv[argc] = NULL;
+
+ DEBUG ("rrd_update (argc = %i, argv = %p)", argc, (void *) argv);
+
+ optind = 0; /* bug in librrd? */
+ rrd_clear_error ();
+ status = rrd_update (argc, argv);
+ if (status != 0)
+ {
+ WARNING ("rrd_update failed: %s: %s",
+ filename, rrd_get_error ());
+ status = -1;
+ }
+
+ sfree (argv);
+
+ return (status);
+} /* int rrd_write_cache_entry */
+
+static void *rrd_queue_thread (void *data)
+{
+ while (42)
+ {
+ rrd_queue_t *queue_entry;
+ rrd_cache_t *cache_entry;
+ char **values;
+ int values_num;
+ int i;
+
+ /* XXX: If you need to lock both, cache_lock and queue_lock, at
+ * the same time, ALWAYS lock `cache_lock' first! */
+
+ /* wait until an entry is available */
+ pthread_mutex_lock (&queue_lock);
+ while ((queue_head == NULL) && (do_shutdown == 0))
+ pthread_cond_wait (&queue_cond, &queue_lock);
+
+ /* We're in the shutdown phase */
+ if (queue_head == NULL)
+ {
+ pthread_mutex_unlock (&queue_lock);
+ break;
+ }
+
+ /* Dequeue the first entry */
+ queue_entry = queue_head;
+ if (queue_head == queue_tail)
+ queue_head = queue_tail = NULL;
+ else
+ queue_head = queue_head->next;
+
+ /* Unlock the queue again */
+ pthread_mutex_unlock (&queue_lock);
+
+ /* We now need the cache lock so the entry isn't updated while
+ * we make a copy of it's values */
+ pthread_mutex_lock (&cache_lock);
+
+ avl_get (cache, queue_entry->filename, (void *) &cache_entry);
+
+ values = cache_entry->values;
+ values_num = cache_entry->values_num;
+
+ cache_entry->values = NULL;
+ cache_entry->values_num = 0;
+ cache_entry->flags = FLAG_NONE;
+
+ pthread_mutex_unlock (&cache_lock);
+
+ /* Write the values to the RRD-file */
+ rrd_write_to_file (queue_entry->filename, values, values_num);
+
+ for (i = 0; i < values_num; i++)
+ {
+ sfree (values[i]);
+ }
+ sfree (values);
+ sfree (queue_entry->filename);
+ sfree (queue_entry);
+ } /* while (42) */
+
+ pthread_mutex_lock (&cache_lock);
+ avl_destroy (cache);
+ cache = NULL;
+ pthread_mutex_unlock (&cache_lock);
+
+ pthread_exit ((void *) 0);
+ return ((void *) 0);
+} /* void *rrd_queue_thread */
+
+static int rrd_queue_cache_entry (const char *filename)
+{
+ rrd_queue_t *queue_entry;
+
+ queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
+ if (queue_entry == NULL)
+ return (-1);
+
+ queue_entry->filename = strdup (filename);
+ if (queue_entry->filename == NULL)
+ {
+ free (queue_entry);
+ return (-1);
+ }
+
+ queue_entry->next = NULL;
+
+ pthread_mutex_lock (&queue_lock);
+ if (queue_tail == NULL)
+ queue_head = queue_entry;
+ else
+ queue_tail->next = queue_entry;
+ queue_tail = queue_entry;
+ pthread_cond_signal (&queue_cond);
+ pthread_mutex_unlock (&queue_lock);
+
+ DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename);
+
+ return (0);
+} /* int rrd_queue_cache_entry */
+
+static void rrd_cache_flush (int timeout)
+{
+ rrd_cache_t *rc;
+ time_t now;
+
+ char **keys = NULL;
+ int keys_num = 0;
+
+ char *key;
+ avl_iterator_t *iter;
+ int i;
+
+ DEBUG ("Flushing cache, timeout = %i", timeout);
+
+ now = time (NULL);
+
+ /* Build a list of entries to be flushed */
+ iter = avl_get_iterator (cache);
+ while (avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
+ {
+ DEBUG ("key = %s; age = %i;", key, now - rc->first_value);
+
+ if (rc->flags == FLAG_QUEUED)
+ continue;
+ else if ((now - rc->first_value) < timeout)
+ continue;
+ else if (rc->values_num > 0)
+ {
+ if (rrd_queue_cache_entry (key) == 0)
+ rc->flags = FLAG_QUEUED;
+ }
+ else /* ancient and no values -> waste of memory */
+ {
+ keys = (char **) realloc ((void *) keys,
+ (keys_num + 1) * sizeof (char *));
+ if (keys == NULL)
+ {
+ char errbuf[1024];
+ ERROR ("rrdtool plugin: "
+ "realloc failed: %s",
+ sstrerror (errno, errbuf,
+ sizeof (errbuf)));
+ avl_iterator_destroy (iter);
+ return;
+ }
+ keys[keys_num] = key;
+ keys_num++;
+ }
+ } /* while (avl_iterator_next) */
+ avl_iterator_destroy (iter);
+
+ for (i = 0; i < keys_num; i++)
+ {
+ if (avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
+ {
+ DEBUG ("avl_remove (%s) failed.", keys[i]);
+ continue;
+ }
+
+ assert (rc->values == NULL);
+ assert (rc->values_num == 0);
+
+ sfree (rc);
+ sfree (key);
+ keys[i] = NULL;
+ } /* for (i = 0..keys_num) */
+
+ free (keys);
+ DEBUG ("Flushed %i value(s)", keys_num);
+
+ cache_flush_last = now;
+} /* void rrd_cache_flush */
+
+static int rrd_cache_insert (const char *filename,
+ const char *value, time_t value_time)
+{
+ rrd_cache_t *rc = NULL;
+ int new_rc = 0;
+ char **values_new;
+
+ pthread_mutex_lock (&cache_lock);
+
+ avl_get (cache, filename, (void *) &rc);
+
+ if (rc == NULL)
+ {
+ rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
+ if (rc == NULL)
+ return (-1);
+ rc->values_num = 0;
+ rc->values = NULL;
+ rc->first_value = 0;
+ rc->last_value = 0;
+ rc->flags = FLAG_NONE;
+ new_rc = 1;
+ }
+
+ if (rc->last_value >= value_time)
+ {
+ pthread_mutex_unlock (&cache_lock);
+ WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
+ (unsigned int) rc->last_value,
+ (unsigned int) value_time);
+ return (-1);
+ }
+
+ values_new = (char **) realloc ((void *) rc->values,
+ (rc->values_num + 1) * sizeof (char *));
+ if (values_new == NULL)
+ {
+ char errbuf[1024];
+ void *cache_key = NULL;
+
+ sstrerror (errno, errbuf, sizeof (errbuf));
+
+ avl_remove (cache, filename, &cache_key, NULL);
+ pthread_mutex_unlock (&cache_lock);
+
+ ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
+
+ sfree (cache_key);
+ sfree (rc->values);
+ sfree (rc);
+ return (-1);
+ }
+ rc->values = values_new;
+
+ rc->values[rc->values_num] = strdup (value);
+ if (rc->values[rc->values_num] != NULL)
+ rc->values_num++;
+
+ if (rc->values_num == 1)
+ rc->first_value = value_time;
+ rc->last_value = value_time;
+
+ /* Insert if this is the first value */
+ if (new_rc == 1)
+ {
+ void *cache_key = strdup (filename);
+
+ if (cache_key == NULL)
+ {
+ char errbuf[1024];
+ sstrerror (errno, errbuf, sizeof (errbuf));
+
+ pthread_mutex_unlock (&cache_lock);
+
+ ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
+
+ sfree (rc->values[0]);
+ sfree (rc->values);
+ sfree (rc);
+ return (-1);
+ }
+
+ avl_insert (cache, cache_key, rc);
+ }
+
+ DEBUG ("rrd_cache_insert (%s, %s, %u) = %p", filename, value,
+ (unsigned int) value_time, (void *) rc);
+
+ if ((rc->last_value - rc->first_value) >= cache_timeout)
+ {
+ /* XXX: If you need to lock both, cache_lock and queue_lock, at
+ * the same time, ALWAYS lock `cache_lock' first! */
+ if (rc->flags != FLAG_QUEUED)
+ {
+ if (rrd_queue_cache_entry (filename) == 0)
+ rc->flags = FLAG_QUEUED;
+ }
+ else
+ {
+ DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
+ }
+ }
+
+ if ((cache_timeout > 0) &&
+ ((time (NULL) - cache_flush_last) > cache_flush_timeout))
+ rrd_cache_flush (cache_flush_timeout);
+
+
+ pthread_mutex_unlock (&cache_lock);
+
+ return (0);
+} /* int rrd_cache_insert */
+