+ if (NULL == perl_threads->tail) {
+ perl_threads->head = t;
+ t->prev = NULL;
+ }
+ else {
+ perl_threads->tail->next = t;
+ t->prev = perl_threads->tail;
+ }
+
+ t->pthread = pthread_self();
+ t->running = 0;
+ t->shutdown = 0;
+ perl_threads->tail = t;
+
+ pthread_setspecific (perl_thr_key, (const void *)t);
+ return t;
+} /* static c_ithread_t *c_ithread_create (PerlInterpreter *) */
+
+/*
+ * Filter chains implementation.
+ */
+
+static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
+{
+ int retvals = 0;
+
+ va_list ap;
+ int ret = 0;
+
+ notification_meta_t **meta = NULL;
+ AV *pmeta = NULL;
+
+ dSP;
+
+ if ((type < 0) || (type >= FC_TYPES))
+ return -1;
+
+ if ((cb_type < 0) || (cb_type >= FC_CB_TYPES))
+ return -1;
+
+ va_start (ap, data);
+
+ ENTER;
+ SAVETMPS;
+
+ PUSHMARK (SP);
+
+ XPUSHs (sv_2mortal (newSViv ((IV)type)));
+ XPUSHs (sv_2mortal (newSVpv (data->name, 0)));
+ XPUSHs (sv_2mortal (newSViv ((IV)cb_type)));
+
+ if (FC_CB_CREATE == cb_type) {
+ /*
+ * $_[0] = $ci;
+ * $_[1] = $user_data;
+ */
+ oconfig_item_t *ci;
+ HV *config = newHV ();
+
+ ci = va_arg (ap, oconfig_item_t *);
+
+ if (0 != oconfig_item2hv (aTHX_ ci, config)) {
+ hv_clear (config);
+ hv_undef (config);
+ config = (HV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)config)));
+ }
+ else if (FC_CB_DESTROY == cb_type) {
+ /*
+ * $_[1] = $user_data;
+ */
+
+ /* nothing to be done - the user data pointer
+ * is pushed onto the stack later */
+ }
+ else if (FC_CB_EXEC == cb_type) {
+ /*
+ * $_[0] = $ds;
+ * $_[1] = $vl;
+ * $_[2] = $meta;
+ * $_[3] = $user_data;
+ */
+ data_set_t *ds;
+ value_list_t *vl;
+
+ AV *pds = newAV ();
+ HV *pvl = newHV ();
+
+ ds = va_arg (ap, data_set_t *);
+ vl = va_arg (ap, value_list_t *);
+ meta = va_arg (ap, notification_meta_t **);
+
+ if (0 != data_set2av (aTHX_ ds, pds)) {
+ av_clear (pds);
+ av_undef (pds);
+ pds = (AV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ if (0 != value_list2hv (aTHX_ vl, ds, pvl)) {
+ hv_clear (pvl);
+ hv_undef (pvl);
+ pvl = (HV *)&PL_sv_undef;
+ ret = -1;
+ }
+
+ if (NULL != meta) {
+ pmeta = newAV ();
+
+ if (0 != notification_meta2av (aTHX_ *meta, pmeta)) {
+ av_clear (pmeta);
+ av_undef (pmeta);
+ pmeta = (AV *)&PL_sv_undef;
+ ret = -1;
+ }
+ }
+ else {
+ pmeta = (AV *)&PL_sv_undef;
+ }
+
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pds)));
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pvl)));
+ XPUSHs (sv_2mortal (newRV_noinc ((SV *)pmeta)));
+ }
+
+ XPUSHs (sv_2mortal (newRV_inc (data->user_data)));
+
+ PUTBACK;
+
+ retvals = call_pv_locked (aTHX_ "Collectd::fc_call");
+
+ if ((FC_CB_EXEC == cb_type) && (meta != NULL)) {
+ assert (pmeta != NULL);
+
+ plugin_notification_meta_free (*meta);
+ av2notification_meta (aTHX_ pmeta, meta);
+ }
+
+ SPAGAIN;
+ if (0 < retvals) {
+ SV *tmp = POPs;
+
+ /* the exec callbacks return a status, while
+ * the others return a boolean value */
+ if (FC_CB_EXEC == cb_type)
+ ret = SvIV (tmp);
+ else if (! SvTRUE (tmp))
+ ret = -1;
+ }
+
+ PUTBACK;
+ FREETMPS;
+ LEAVE;
+
+ va_end (ap);
+ return ret;
+} /* static int fc_call (int, int, pfc_user_data_t *, ...) */
+
+static int fc_create (int type, const oconfig_item_t *ci, void **user_data)
+{
+ pfc_user_data_t *data;
+
+ int ret = 0;
+
+ dTHX;
+
+ if (NULL == perl_threads)
+ return 0;
+
+ if (NULL == aTHX) {
+ c_ithread_t *t = NULL;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+ t = c_ithread_create (perl_threads->head->interp);
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ aTHX = t->interp;
+ }
+
+ log_debug ("fc_create: c_ithread: interp = %p (active threads: %i)",
+ aTHX, perl_threads->number_of_threads);
+
+ if ((1 != ci->values_num)
+ || (OCONFIG_TYPE_STRING != ci->values[0].type)) {
+ log_warn ("A \"%s\" block expects a single string argument.",
+ (FC_MATCH == type) ? "Match" : "Target");
+ return -1;
+ }
+
+ data = (pfc_user_data_t *)smalloc (sizeof (*data));
+ data->name = sstrdup (ci->values[0].value.string);
+ data->user_data = newSV (0);
+
+ ret = fc_call (aTHX_ type, FC_CB_CREATE, data, ci);
+
+ if (0 != ret)
+ PFC_USER_DATA_FREE (data);
+ else
+ *user_data = data;
+ return ret;
+} /* static int fc_create (int, const oconfig_item_t *, void **) */
+
+static int fc_destroy (int type, void **user_data)
+{
+ pfc_user_data_t *data = *(pfc_user_data_t **)user_data;
+
+ int ret = 0;
+
+ dTHX;
+
+ if ((NULL == perl_threads) || (NULL == data))
+ return 0;
+
+ if (NULL == aTHX) {
+ c_ithread_t *t = NULL;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+ t = c_ithread_create (perl_threads->head->interp);
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ aTHX = t->interp;
+ }
+
+ log_debug ("fc_destroy: c_ithread: interp = %p (active threads: %i)",
+ aTHX, perl_threads->number_of_threads);
+
+ ret = fc_call (aTHX_ type, FC_CB_DESTROY, data);
+
+ PFC_USER_DATA_FREE (data);
+ *user_data = NULL;
+ return ret;
+} /* static int fc_destroy (int, void **) */
+
+static int fc_exec (int type, const data_set_t *ds, const value_list_t *vl,
+ notification_meta_t **meta, void **user_data)
+{
+ pfc_user_data_t *data = *(pfc_user_data_t **)user_data;
+
+ dTHX;
+
+ if (NULL == perl_threads)
+ return 0;
+
+ assert (NULL != data);
+
+ if (NULL == aTHX) {
+ c_ithread_t *t = NULL;
+
+ pthread_mutex_lock (&perl_threads->mutex);
+ t = c_ithread_create (perl_threads->head->interp);
+ pthread_mutex_unlock (&perl_threads->mutex);
+
+ aTHX = t->interp;
+ }
+
+ log_debug ("fc_exec: c_ithread: interp = %p (active threads: %i)",
+ aTHX, perl_threads->number_of_threads);
+
+ return fc_call (aTHX_ type, FC_CB_EXEC, data, ds, vl, meta);
+} /* static int fc_exec (int, const data_set_t *, const value_list_t *,
+ notification_meta_t **, void **) */
+
+static int pmatch_create (const oconfig_item_t *ci, void **user_data)
+{
+ return fc_create (FC_MATCH, ci, user_data);
+} /* static int pmatch_create (const oconfig_item_t *, void **) */
+
+static int pmatch_destroy (void **user_data)
+{
+ return fc_destroy (FC_MATCH, user_data);
+} /* static int pmatch_destroy (void **) */
+
+static int pmatch_match (const data_set_t *ds, const value_list_t *vl,
+ notification_meta_t **meta, void **user_data)
+{
+ return fc_exec (FC_MATCH, ds, vl, meta, user_data);
+} /* static int pmatch_match (const data_set_t *, const value_list_t *,
+ notification_meta_t **, void **) */
+
+static match_proc_t pmatch = {
+ pmatch_create, pmatch_destroy, pmatch_match
+};
+
+static int ptarget_create (const oconfig_item_t *ci, void **user_data)
+{
+ return fc_create (FC_TARGET, ci, user_data);
+} /* static int ptarget_create (const oconfig_item_t *, void **) */
+
+static int ptarget_destroy (void **user_data)
+{
+ return fc_destroy (FC_TARGET, user_data);
+} /* static int ptarget_destroy (void **) */
+
+static int ptarget_invoke (const data_set_t *ds, value_list_t *vl,
+ notification_meta_t **meta, void **user_data)
+{
+ return fc_exec (FC_TARGET, ds, vl, meta, user_data);
+} /* static int ptarget_invoke (const data_set_t *, value_list_t *,
+ notification_meta_t **, void **) */
+
+static target_proc_t ptarget = {
+ ptarget_create, ptarget_destroy, ptarget_invoke
+};
+
+/*
+ * Exported Perl API.
+ */
+
+/*
+ * Collectd::plugin_register_data_set (type, dataset).
+ *
+ * type:
+ * type of the dataset
+ *
+ * dataset:
+ * dataset to be registered
+ */
+static XS (Collectd_plugin_register_ds)
+{
+ SV *data = NULL;
+ int ret = 0;
+
+ dXSARGS;
+
+ log_warn ("Using plugin_register() to register new data-sets is "
+ "deprecated - add new entries to a custom types.db instead.");
+
+ if (2 != items) {
+ log_err ("Usage: Collectd::plugin_register_data_set(type, dataset)");
+ XSRETURN_EMPTY;
+ }
+
+ log_debug ("Collectd::plugin_register_data_set: "
+ "type = \"%s\", dataset = \"%s\"",
+ SvPV_nolen (ST (0)), SvPV_nolen (ST (1)));
+
+ data = ST (1);
+
+ if (SvROK (data) && (SVt_PVAV == SvTYPE (SvRV (data)))) {
+ ret = pplugin_register_data_set (aTHX_ SvPV_nolen (ST (0)),
+ (AV *)SvRV (data));
+ }
+ else {
+ log_err ("Collectd::plugin_register_data_set: Invalid data.");
+ XSRETURN_EMPTY;
+ }
+
+ if (0 == ret)
+ XSRETURN_YES;
+ else
+ XSRETURN_EMPTY;
+} /* static XS (Collectd_plugin_register_ds) */
+
+/*
+ * Collectd::plugin_unregister_data_set (type).
+ *
+ * type:
+ * type of the dataset
+ */
+static XS (Collectd_plugin_unregister_ds)
+{
+ dXSARGS;
+
+ if (1 != items) {
+ log_err ("Usage: Collectd::plugin_unregister_data_set(type)");
+ XSRETURN_EMPTY;
+ }
+
+ log_debug ("Collectd::plugin_unregister_data_set: type = \"%s\"",
+ SvPV_nolen (ST (0)));
+
+ if (0 == pplugin_unregister_data_set (SvPV_nolen (ST (0))))
+ XSRETURN_YES;
+ else
+ XSRETURN_EMPTY;
+} /* static XS (Collectd_plugin_register_ds) */
+
+/*
+ * Collectd::plugin_dispatch_values (name, values).
+ *
+ * name:
+ * name of the plugin
+ *
+ * values:
+ * value list to submit
+ */
+static XS (Collectd_plugin_dispatch_values)
+{
+ SV *values = NULL;
+ int values_idx = 0;
+
+ int ret = 0;
+
+ dXSARGS;
+
+ if (2 == items) {
+ log_warn ("Collectd::plugin_dispatch_values with two arguments "
+ "is deprecated - pass the type through values->{type}.");
+ values_idx = 1;
+ }
+ else if (1 != items) {
+ log_err ("Usage: Collectd::plugin_dispatch_values(values)");
+ XSRETURN_EMPTY;
+ }
+
+ log_debug ("Collectd::plugin_dispatch_values: values=\"%s\"",
+ SvPV_nolen (ST (values_idx)));
+
+ values = ST (values_idx);
+
+ if (! (SvROK (values) && (SVt_PVHV == SvTYPE (SvRV (values))))) {
+ log_err ("Collectd::plugin_dispatch_values: Invalid values.");
+ XSRETURN_EMPTY;
+ }
+
+ if (((2 == items) && (NULL == ST (0))) || (NULL == values))
+ XSRETURN_EMPTY;
+
+ if ((2 == items) && (NULL == hv_store ((HV *)SvRV (values), "type", 4,
+ newSVsv (ST (0)), 0))) {
+ log_err ("Collectd::plugin_dispatch_values: Could not store type.");
+ XSRETURN_EMPTY;
+ }
+
+ ret = pplugin_dispatch_values (aTHX_ (HV *)SvRV (values));
+
+ if (0 == ret)
+ XSRETURN_YES;
+ else
+ XSRETURN_EMPTY;
+} /* static XS (Collectd_plugin_dispatch_values) */
+
+/* Collectd::plugin_write (plugin, ds, vl).