+static void plugin_value_list_free (value_list_t *vl) /* {{{ */
+{
+ if (vl == NULL)
+ return;
+
+ meta_data_destroy (vl->meta);
+ sfree (vl->values);
+ sfree (vl);
+} /* }}} void plugin_value_list_free */
+
+static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */
+{
+ value_list_t *vl;
+
+ if (vl_orig == NULL)
+ return (NULL);
+
+ vl = malloc (sizeof (*vl));
+ if (vl == NULL)
+ return (NULL);
+ memcpy (vl, vl_orig, sizeof (*vl));
+
+ vl->values = calloc (vl_orig->values_len, sizeof (*vl->values));
+ if (vl->values == NULL)
+ {
+ plugin_value_list_free (vl);
+ return (NULL);
+ }
+ memcpy (vl->values, vl_orig->values,
+ vl_orig->values_len * sizeof (*vl->values));
+
+ vl->meta = meta_data_clone (vl->meta);
+ if ((vl_orig->meta != NULL) && (vl->meta == NULL))
+ {
+ plugin_value_list_free (vl);
+ return (NULL);
+ }
+
+ if (vl->time == 0)
+ vl->time = cdtime ();
+
+ /* Fill in the interval from the thread context, if it is zero. */
+ if (vl->interval == 0)
+ {
+ plugin_ctx_t ctx = plugin_get_ctx ();
+
+ if (ctx.interval != 0)
+ vl->interval = ctx.interval;
+ else
+ {
+ char name[6 * DATA_MAX_NAME_LEN];
+ FORMAT_VL (name, sizeof (name), vl);
+ ERROR ("plugin_value_list_clone: Unable to determine "
+ "interval from context for "
+ "value list \"%s\". "
+ "This indicates a broken plugin. "
+ "Please report this problem to the "
+ "collectd mailing list or at "
+ "<http://collectd.org/bugs/>.", name);
+ vl->interval = cf_get_default_interval ();
+ }
+ }
+
+ return (vl);
+} /* }}} value_list_t *plugin_value_list_clone */
+
+static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */
+{
+ write_queue_t *q;
+
+ q = malloc (sizeof (*q));
+ if (q == NULL)
+ return (ENOMEM);
+ q->next = NULL;
+
+ q->vl = plugin_value_list_clone (vl);
+ if (q->vl == NULL)
+ {
+ sfree (q);
+ return (ENOMEM);
+ }
+
+ /* Store context of caller (read plugin); otherwise, it would not be
+ * available to the write plugins when actually dispatching the
+ * value-list later on. */
+ q->ctx = plugin_get_ctx ();
+
+ pthread_mutex_lock (&write_lock);
+
+ if (write_queue_tail == NULL)
+ {
+ write_queue_head = q;
+ write_queue_tail = q;
+ write_queue_length = 1;
+ }
+ else
+ {
+ write_queue_tail->next = q;
+ write_queue_tail = q;
+ write_queue_length += 1;
+ }
+
+ pthread_cond_signal (&write_cond);
+ pthread_mutex_unlock (&write_lock);
+
+ return (0);
+} /* }}} int plugin_write_enqueue */
+
+static value_list_t *plugin_write_dequeue (void) /* {{{ */
+{
+ write_queue_t *q;
+ value_list_t *vl;
+
+ pthread_mutex_lock (&write_lock);
+
+ while (write_loop && (write_queue_head == NULL))
+ pthread_cond_wait (&write_cond, &write_lock);
+
+ if (write_queue_head == NULL)
+ {
+ pthread_mutex_unlock (&write_lock);
+ return (NULL);
+ }
+
+ q = write_queue_head;
+ write_queue_head = q->next;
+ write_queue_length -= 1;
+ if (write_queue_head == NULL) {
+ write_queue_tail = NULL;
+ assert(0 == write_queue_length);
+ }
+
+ pthread_mutex_unlock (&write_lock);
+
+ (void) plugin_set_ctx (q->ctx);
+
+ vl = q->vl;
+ sfree (q);
+ return (vl);
+} /* }}} value_list_t *plugin_write_dequeue */
+
+static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */
+{
+ while (write_loop)
+ {
+ value_list_t *vl = plugin_write_dequeue ();
+ if (vl == NULL)
+ continue;
+
+ plugin_dispatch_values_internal (vl);
+
+ plugin_value_list_free (vl);
+ }
+
+ pthread_exit (NULL);
+ return ((void *) 0);
+} /* }}} void *plugin_write_thread */
+
+static void start_write_threads (size_t num) /* {{{ */
+{
+ size_t i;
+
+ if (write_threads != NULL)
+ return;
+
+ write_threads = (pthread_t *) calloc (num, sizeof (pthread_t));
+ if (write_threads == NULL)
+ {
+ ERROR ("plugin: start_write_threads: calloc failed.");
+ return;
+ }
+
+ write_threads_num = 0;
+ for (i = 0; i < num; i++)
+ {
+ int status;
+
+ status = pthread_create (write_threads + write_threads_num,
+ /* attr = */ NULL,
+ plugin_write_thread,
+ /* arg = */ NULL);
+ if (status != 0)
+ {
+ char errbuf[1024];
+ ERROR ("plugin: start_write_threads: pthread_create failed "
+ "with status %i (%s).", status,
+ sstrerror (status, errbuf, sizeof (errbuf)));
+ return;
+ }
+
+ write_threads_num++;
+ } /* for (i) */
+} /* }}} void start_write_threads */
+
+static void stop_write_threads (void) /* {{{ */
+{
+ write_queue_t *q;
+ int i;
+
+ if (write_threads == NULL)
+ return;
+
+ INFO ("collectd: Stopping %zu write threads.", write_threads_num);
+
+ pthread_mutex_lock (&write_lock);
+ write_loop = 0;
+ DEBUG ("plugin: stop_write_threads: Signalling `write_cond'");
+ pthread_cond_broadcast (&write_cond);
+ pthread_mutex_unlock (&write_lock);
+
+ for (i = 0; i < write_threads_num; i++)
+ {
+ if (pthread_join (write_threads[i], NULL) != 0)
+ {
+ ERROR ("plugin: stop_write_threads: pthread_join failed.");
+ }
+ write_threads[i] = (pthread_t) 0;
+ }
+ sfree (write_threads);
+ write_threads_num = 0;
+
+ pthread_mutex_lock (&write_lock);
+ i = 0;
+ for (q = write_queue_head; q != NULL; )
+ {
+ write_queue_t *q1 = q;
+ plugin_value_list_free (q->vl);
+ q = q->next;
+ sfree (q1);
+ i++;
+ }
+ write_queue_head = NULL;
+ write_queue_tail = NULL;
+ write_queue_length = 0;
+ pthread_mutex_unlock (&write_lock);
+
+ if (i > 0)
+ {
+ WARNING ("plugin: %i value list%s left after shutting down "
+ "the write threads.",
+ i, (i == 1) ? " was" : "s were");
+ }
+} /* }}} void stop_write_threads */
+