/**
* collectd - src/perl.c
- * Copyright (C) 2007, 2008 Sebastian Harl
+ * Copyright (C) 2007-2009 Sebastian Harl
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
} while (0)
/*
+ * Public variable
+ */
+extern char **environ;
+
+/*
* private variables
*/
{ "Collectd::TYPE_DATASET", PLUGIN_DATASET },
{ "Collectd::DS_TYPE_COUNTER", DS_TYPE_COUNTER },
{ "Collectd::DS_TYPE_GAUGE", DS_TYPE_GAUGE },
+ { "Collectd::DS_TYPE_DERIVE", DS_TYPE_DERIVE },
+ { "Collectd::DS_TYPE_ABSOLUTE", DS_TYPE_ABSOLUTE },
{ "Collectd::LOG_ERR", LOG_ERR },
{ "Collectd::LOG_WARNING", LOG_WARNING },
{ "Collectd::LOG_NOTICE", LOG_NOTICE },
if (NULL != (tmp = hv_fetch (hash, "type", 4, 0))) {
ds->type = SvIV (*tmp);
- if ((DS_TYPE_COUNTER != ds->type) && (DS_TYPE_GAUGE != ds->type)) {
+ if ((DS_TYPE_COUNTER != ds->type)
+ && (DS_TYPE_GAUGE != ds->type)
+ && (DS_TYPE_DERIVE != ds->type)
+ && (DS_TYPE_ABSOLUTE != ds->type)) {
log_err ("hv2data_source: Invalid DS type.");
return -1;
}
if (NULL != tmp) {
if (DS_TYPE_COUNTER == ds->ds[i].type)
value[i].counter = SvIV (*tmp);
- else
+ else if (DS_TYPE_GAUGE == ds->ds[i].type)
value[i].gauge = SvNV (*tmp);
+ else if (DS_TYPE_DERIVE == ds->ds[i].type)
+ value[i].derive = SvIV (*tmp);
+ else if (DS_TYPE_ABSOLUTE == ds->ds[i].type)
+ value[i].absolute = SvIV (*tmp);
}
else {
return -1;
if (DS_TYPE_COUNTER == ds->ds[i].type)
val = newSViv (vl->values[i].counter);
- else
+ else if (DS_TYPE_GAUGE == ds->ds[i].type)
val = newSVnv (vl->values[i].gauge);
+ else if (DS_TYPE_DERIVE == ds->ds[i].type)
+ val = newSViv (vl->values[i].derive);
+ else if (DS_TYPE_ABSOLUTE == ds->ds[i].type)
+ val = newSViv (vl->values[i].absolute);
if (NULL == av_store (values, i, val)) {
av_undef (values);
aTHX = t->interp;
}
+ /* Assert that we're not running as the base thread. Otherwise, we might
+ * run into concurrency issues with c_ithread_create(). See
+ * https://github.com/collectd/collectd/issues/9 for details. */
+ assert (aTHX != perl_threads->head->interp);
+
log_debug ("perl_read: c_ithread: interp = %p (active threads: %i)",
aTHX, perl_threads->number_of_threads);
return pplugin_call_all (aTHX_ PLUGIN_READ);
static int perl_write (const data_set_t *ds, const value_list_t *vl,
user_data_t __attribute__((unused)) *user_data)
{
+ int status;
dTHX;
if (NULL == perl_threads)
aTHX = t->interp;
}
+ /* Lock the base thread if this is not called from one of the read threads
+ * to avoid race conditions with c_ithread_create(). See
+ * https://github.com/collectd/collectd/issues/9 for details. */
+ if (aTHX == perl_threads->head->interp)
+ pthread_mutex_lock (&perl_threads->mutex);
+
log_debug ("perl_write: c_ithread: interp = %p (active threads: %i)",
aTHX, perl_threads->number_of_threads);
- return pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl);
+ status = pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl);
+
+ if (aTHX == perl_threads->head->interp)
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ return status;
} /* static int perl_write (const data_set_t *, const value_list_t *) */
-static void perl_log (int level, const char *msg)
+static void perl_log (int level, const char *msg,
+ user_data_t __attribute__((unused)) *user_data)
{
dTHX;
aTHX = t->interp;
}
+ /* Lock the base thread if this is not called from one of the read threads
+ * to avoid race conditions with c_ithread_create(). See
+ * https://github.com/collectd/collectd/issues/9 for details. */
+ if (aTHX == perl_threads->head->interp)
+ pthread_mutex_lock (&perl_threads->mutex);
+
pplugin_call_all (aTHX_ PLUGIN_LOG, level, msg);
+
+ if (aTHX == perl_threads->head->interp)
+ pthread_mutex_unlock (&perl_threads->mutex);
+
return;
} /* static void perl_log (int, const char *) */
-static int perl_notify (const notification_t *notif)
+static int perl_notify (const notification_t *notif,
+ user_data_t __attribute__((unused)) *user_data)
{
dTHX;
return pplugin_call_all (aTHX_ PLUGIN_NOTIF, notif);
} /* static int perl_notify (const notification_t *) */
-static int perl_flush (int timeout, const char *identifier)
+static int perl_flush (int timeout, const char *identifier,
+ user_data_t __attribute__((unused)) *user_data)
{
dTHX;
perl_run (aTHX);
- plugin_register_log ("perl", perl_log);
- plugin_register_notification ("perl", perl_notify);
+ plugin_register_log ("perl", perl_log, /* user_data = */ NULL);
+ plugin_register_notification ("perl", perl_notify,
+ /* user_data = */ NULL);
plugin_register_init ("perl", perl_init);
plugin_register_read ("perl", perl_read);
plugin_register_write ("perl", perl_write, /* user_data = */ NULL);
- plugin_register_flush ("perl", perl_flush);
+ plugin_register_flush ("perl", perl_flush, /* user_data = */ NULL);
plugin_register_shutdown ("perl", perl_shutdown);
return 0;
} /* static int init_pi (const char **, const int) */