+ ds = va_arg (ap, data_set_t *);
+ vl = va_arg (ap, value_list_t *);
+
+ if (-1 == data_set2av (aTHX_ ds, pds)) {
+ av_clear (pds);
+ av_undef (pds);
+ pds = (AV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ if (-1 == value_list2hv (aTHX_ vl, ds, pvl)) {
+ hv_clear (pvl);
+ hv_undef (pvl);
+ pvl = (HV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ XPUSHs (sv_2mortal (newSVpv (ds->type, 0)));
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pds)));
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pvl)));
+ }
+ else if (PLUGIN_LOG == type) {
+ /*
+ * $_[0] = $level;
+ *
+ * $_[1] = $message;
+ */
+ XPUSHs (sv_2mortal (newSViv (va_arg (ap, int))));
+ XPUSHs (sv_2mortal (newSVpv (va_arg (ap, char *), 0)));
+ }
+ else if (PLUGIN_NOTIF == type) {
+ /*
+ * $_[0] =
+ * {
+ * severity => $severity,
+ * time => $time,
+ * message => $msg,
+ * host => $host,
+ * plugin => $plugin,
+ * type => $type,
+ * plugin_instance => $instance,
+ * type_instance => $type_instance
+ * };
+ */
+ notification_t *n;
+ HV *notif = newHV ();
+
+ n = va_arg (ap, notification_t *);
+
+ if (-1 == notification2hv (aTHX_ n, notif)) {
+ hv_clear (notif);
+ hv_undef (notif);
+ notif = (HV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)notif)));
+ }
+ else if (PLUGIN_FLUSH == type) {
+ /*
+ * $_[0] = $timeout;
+ * $_[1] = $identifier;
+ */
+ XPUSHs (sv_2mortal (newSViv (va_arg (ap, int))));
+ XPUSHs (sv_2mortal (newSVpv (va_arg (ap, char *), 0)));
+ }
+
+ PUTBACK;
+
+ retvals = call_pv ("Collectd::plugin_call_all", G_SCALAR);
+
+ SPAGAIN;
+ if (0 < retvals) {
+ SV *tmp = POPs;
+ if (! SvTRUE (tmp))
+ ret = -1;
+ }
+
+ PUTBACK;
+ FREETMPS;
+ LEAVE;
+
+ va_end (ap);
+ return ret;
+} /* static int pplugin_call_all (int, ...) */
+
+/*
+ * collectd's perl interpreter based thread implementation.
+ *
+ * This has been inspired by Perl's ithreads introduced in version 5.6.0.
+ */
+
+/* must be called with perl_threads->mutex locked */
+static void c_ithread_destroy (c_ithread_t *ithread)
+{
+ dTHXa (ithread->interp);
+
+ assert (NULL != perl_threads);
+
+ PERL_SET_CONTEXT (aTHX);
+ log_debug ("Shutting down Perl interpreter %p...", aTHX);
+
+#if COLLECT_DEBUG
+ sv_report_used ();
+
+ --perl_threads->number_of_threads;
+#endif /* COLLECT_DEBUG */
+
+ perl_destruct (aTHX);
+ perl_free (aTHX);
+
+ if (NULL == ithread->prev)
+ perl_threads->head = ithread->next;
+ else
+ ithread->prev->next = ithread->next;
+
+ if (NULL == ithread->next)
+ perl_threads->tail = ithread->prev;
+ else
+ ithread->next->prev = ithread->prev;
+
+ sfree (ithread);
+ return;
+} /* static void c_ithread_destroy (c_ithread_t *) */
+
+static void c_ithread_destructor (void *arg)
+{
+ c_ithread_t *ithread = (c_ithread_t *)arg;
+ c_ithread_t *t = NULL;
+
+ if (NULL == perl_threads)
+ return;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+
+ for (t = perl_threads->head; NULL != t; t = t->next)
+ if (t == ithread)
+ break;
+
+ /* the ithread no longer exists */
+ if (NULL == t)
+ return;
+
+ c_ithread_destroy (ithread);
+
+ pthread_mutex_unlock (&perl_threads->mutex);
+ return;
+} /* static void c_ithread_destructor (void *) */
+
+/* must be called with perl_threads->mutex locked */
+static c_ithread_t *c_ithread_create (PerlInterpreter *base)
+{
+ c_ithread_t *t = NULL;
+ dTHXa (NULL);
+
+ assert (NULL != perl_threads);
+
+ t = (c_ithread_t *)smalloc (sizeof (c_ithread_t));
+ memset (t, 0, sizeof (c_ithread_t));
+
+ t->interp = (NULL == base)
+ ? NULL
+ : perl_clone (base, CLONEf_KEEP_PTR_TABLE);
+
+ aTHX = t->interp;
+
+ if ((NULL != base) && (NULL != PL_endav)) {
+ av_clear (PL_endav);
+ av_undef (PL_endav);
+ PL_endav = Nullav;
+ }
+
+#if COLLECT_DEBUG
+ ++perl_threads->number_of_threads;
+#endif /* COLLECT_DEBUG */
+
+ t->next = NULL;
+
+ if (NULL == perl_threads->tail) {
+ perl_threads->head = t;
+ t->prev = NULL;
+ }
+ else {
+ perl_threads->tail->next = t;
+ t->prev = perl_threads->tail;
+ }
+
+ perl_threads->tail = t;
+
+ pthread_setspecific (perl_thr_key, (const void *)t);
+ return t;
+} /* static c_ithread_t *c_ithread_create (PerlInterpreter *) */
+
+/*
+ * Filter chains implementation.
+ */
+
+static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
+{
+ int retvals = 0;
+
+ va_list ap;
+ int ret = 0;
+
+ notification_meta_t **meta = NULL;
+ AV *pmeta = NULL;
+
+ dSP;
+
+ if ((type < 0) || (type >= FC_TYPES))
+ return -1;
+
+ if ((cb_type < 0) || (cb_type >= FC_CB_TYPES))
+ return -1;
+
+ va_start (ap, data);
+
+ ENTER;
+ SAVETMPS;
+
+ PUSHMARK (SP);
+
+ XPUSHs (sv_2mortal (newSViv ((IV)type)));
+ XPUSHs (sv_2mortal (newSVpv (data->name, 0)));
+ XPUSHs (sv_2mortal (newSViv ((IV)cb_type)));
+
+ if (FC_CB_CREATE == cb_type) {
+ /*
+ * $_[0] = $ci;
+ * $_[1] = $user_data;
+ */
+ oconfig_item_t *ci;
+ HV *config = newHV ();
+
+ ci = va_arg (ap, oconfig_item_t *);
+
+ if (0 != oconfig_item2hv (aTHX_ ci, config)) {
+ hv_clear (config);
+ hv_undef (config);
+ config = (HV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)config)));
+ }
+ else if (FC_CB_DESTROY == cb_type) {
+ /*
+ * $_[1] = $user_data;
+ */
+
+ /* nothing to be done - the user data pointer
+ * is pushed onto the stack later */
+ }
+ else if (FC_CB_EXEC == cb_type) {
+ /*
+ * $_[0] = $ds;
+ * $_[1] = $vl;
+ * $_[2] = $meta;
+ * $_[3] = $user_data;
+ */
+ data_set_t *ds;
+ value_list_t *vl;
+
+ AV *pds = newAV ();
+ HV *pvl = newHV ();
+
+ ds = va_arg (ap, data_set_t *);
+ vl = va_arg (ap, value_list_t *);
+ meta = va_arg (ap, notification_meta_t **);
+
+ if (0 != data_set2av (aTHX_ ds, pds)) {
+ av_clear (pds);
+ av_undef (pds);
+ pds = (AV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ if (0 != value_list2hv (aTHX_ vl, ds, pvl)) {
+ hv_clear (pvl);
+ hv_undef (pvl);
+ pvl = (HV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ if (NULL != meta) {
+ pmeta = newAV ();
+
+ if (0 != notification_meta2av (aTHX_ *meta, pmeta)) {
+ av_clear (pmeta);
+ av_undef (pmeta);
+ pmeta = (AV *)&PL_sv_undef;
+ ret = -1;
+ }
+ }
+ else {
+ pmeta = (AV *)&PL_sv_undef;
+ }
+
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pds)));
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pvl)));
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pmeta)));
+ }
+
+ XPUSHs (sv_2mortal (newRV_inc (data->user_data)));
+
+ PUTBACK;
+
+ retvals = call_pv ("Collectd::fc_call", G_SCALAR);
+
+ if ((FC_CB_EXEC == cb_type) && (meta != NULL)) {
+ assert (pmeta != NULL);
+
+ plugin_notification_meta_free (*meta);
+ av2notification_meta (aTHX_ pmeta, meta);
+ }
+
+ SPAGAIN;
+ if (0 < retvals) {
+ SV *tmp = POPs;
+
+ /* the exec callbacks return a status, while
+ * the others return a boolean value */
+ if (FC_CB_EXEC == cb_type)
+ ret = SvIV (tmp);
+ else if (! SvTRUE (tmp))
+ ret = -1;
+ }
+
+ PUTBACK;
+ FREETMPS;
+ LEAVE;
+
+ va_end (ap);
+ return ret;
+} /* static int fc_call (int, int, pfc_user_data_t *, ...) */
+
+static int fc_create (int type, const oconfig_item_t *ci, void **user_data)
+{
+ pfc_user_data_t *data;
+
+ int ret = 0;
+
+ dTHX;
+
+ if (NULL == perl_threads)
+ return 0;
+
+ if (NULL == aTHX) {
+ c_ithread_t *t = NULL;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+ t = c_ithread_create (perl_threads->head->interp);
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ aTHX = t->interp;
+ }
+
+ log_debug ("fc_create: c_ithread: interp = %p (active threads: %i)",
+ aTHX, perl_threads->number_of_threads);
+
+ if ((1 != ci->values_num)
+ || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
+ log_warn ("A \"%s\" block expects a single string argument.",
+ (FC_MATCH == type) ? "Match" : "Target");
+ return -1;
+ }
+
+ data = (pfc_user_data_t *)smalloc (sizeof (*data));
+ data->name = sstrdup (ci->values[0].value.string);
+ data->user_data = newSV (0);
+
+ ret = fc_call (aTHX_ type, FC_CB_CREATE, data, ci);
+
+ if (0 != ret)
+ PFC_USER_DATA_FREE (data);
+ else
+ *user_data = data;
+ return ret;
+} /* static int fc_create (int, const oconfig_item_t *, void **) */
+
+static int fc_destroy (int type, void **user_data)
+{
+ pfc_user_data_t *data = *(pfc_user_data_t **)user_data;
+
+ int ret = 0;
+
+ dTHX;
+
+ if ((NULL == perl_threads) || (NULL == data))
+ return 0;
+
+ if (NULL == aTHX) {
+ c_ithread_t *t = NULL;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+ t = c_ithread_create (perl_threads->head->interp);
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ aTHX = t->interp;
+ }
+
+ log_debug ("fc_destroy: c_ithread: interp = %p (active threads: %i)",
+ aTHX, perl_threads->number_of_threads);
+
+ ret = fc_call (aTHX_ type, FC_CB_DESTROY, data);
+
+ PFC_USER_DATA_FREE (data);
+ *user_data = NULL;
+ return ret;
+} /* static int fc_destroy (int, void **) */
+
+static int fc_exec (int type, const data_set_t *ds, const value_list_t *vl,
+ notification_meta_t **meta, void **user_data)
+{
+ pfc_user_data_t *data = *(pfc_user_data_t **)user_data;
+
+ dTHX;
+
+ if (NULL == perl_threads)
+ return 0;
+
+ assert (NULL != data);