X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fperl.c;h=4daa7621e4bec0d0c89103aa6f94eb452a9d9e76;hb=82f6ebad0250c0b8cd0cdf7453fe427fd7b38135;hp=c0326943298837d42741afb9acb65276081fa6b2;hpb=34a35d78bb0ec655f9b8cf6f464fedbe2877470b;p=collectd.git diff --git a/src/perl.c b/src/perl.c index c0326943..4daa7621 100644 --- a/src/perl.c +++ b/src/perl.c @@ -1,6 +1,6 @@ -/** +/* * collectd - src/perl.c - * Copyright (C) 2007, 2008 Sebastian Harl + * Copyright (C) 2007-2009 Sebastian Harl * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -51,6 +51,8 @@ #include "plugin.h" #include "common.h" +#include "filter_chain.h" + #include #if !defined(USE_ITHREADS) @@ -74,6 +76,17 @@ #define PLUGIN_CONFIG 254 #define PLUGIN_DATASET 255 +#define FC_MATCH 0 +#define FC_TARGET 1 + +#define FC_TYPES 2 + +#define FC_CB_CREATE 0 +#define FC_CB_DESTROY 1 +#define FC_CB_EXEC 2 + +#define FC_CB_TYPES 3 + #define log_debug(...) DEBUG ("perl: " __VA_ARGS__) #define log_info(...) INFO ("perl: " __VA_ARGS__) #define log_warn(...) WARNING ("perl: " __VA_ARGS__) @@ -85,9 +98,11 @@ void boot_DynaLoader (PerlInterpreter *, CV *); static XS (Collectd_plugin_register_ds); static XS (Collectd_plugin_unregister_ds); static XS (Collectd_plugin_dispatch_values); +static XS (Collectd__plugin_write); static XS (Collectd__plugin_flush); static XS (Collectd_plugin_dispatch_notification); static XS (Collectd_plugin_log); +static XS (Collectd__fc_register); static XS (Collectd_call_by_name); /* @@ -115,6 +130,25 @@ typedef struct { pthread_mutex_t mutex; } c_ithread_list_t; +/* name / user_data for Perl matches / targets */ +typedef struct { + char *name; + SV *user_data; +} pfc_user_data_t; + +#define PFC_USER_DATA_FREE(data) \ + do { \ + sfree ((data)->name); \ + if (NULL != (data)->user_data) \ + sv_free ((data)->user_data); \ + sfree (data); \ + } while (0) + +/* + * Public variable + */ +extern char **environ; + /* * private variables */ @@ -139,10 +173,12 @@ static struct { { "Collectd::plugin_register_data_set", Collectd_plugin_register_ds }, { "Collectd::plugin_unregister_data_set", Collectd_plugin_unregister_ds }, { "Collectd::plugin_dispatch_values", Collectd_plugin_dispatch_values }, + { "Collectd::_plugin_write", Collectd__plugin_write }, { "Collectd::_plugin_flush", Collectd__plugin_flush }, { "Collectd::plugin_dispatch_notification", Collectd_plugin_dispatch_notification }, { "Collectd::plugin_log", Collectd_plugin_log }, + { "Collectd::_fc_register", Collectd__fc_register }, { "Collectd::call_by_name", Collectd_call_by_name }, { "", NULL } }; @@ -152,25 +188,37 @@ struct { int value; } constants[] = { - { "Collectd::TYPE_INIT", PLUGIN_INIT }, - { "Collectd::TYPE_READ", PLUGIN_READ }, - { "Collectd::TYPE_WRITE", PLUGIN_WRITE }, - { "Collectd::TYPE_SHUTDOWN", PLUGIN_SHUTDOWN }, - { "Collectd::TYPE_LOG", PLUGIN_LOG }, - { "Collectd::TYPE_NOTIF", PLUGIN_NOTIF }, - { "Collectd::TYPE_FLUSH", PLUGIN_FLUSH }, - { "Collectd::TYPE_CONFIG", PLUGIN_CONFIG }, - { "Collectd::TYPE_DATASET", PLUGIN_DATASET }, - { "Collectd::DS_TYPE_COUNTER", DS_TYPE_COUNTER }, - { "Collectd::DS_TYPE_GAUGE", DS_TYPE_GAUGE }, - { "Collectd::LOG_ERR", LOG_ERR }, - { "Collectd::LOG_WARNING", LOG_WARNING }, - { "Collectd::LOG_NOTICE", LOG_NOTICE }, - { "Collectd::LOG_INFO", LOG_INFO }, - { "Collectd::LOG_DEBUG", LOG_DEBUG }, - { "Collectd::NOTIF_FAILURE", NOTIF_FAILURE }, - { "Collectd::NOTIF_WARNING", NOTIF_WARNING }, - { "Collectd::NOTIF_OKAY", NOTIF_OKAY }, + { "Collectd::TYPE_INIT", PLUGIN_INIT }, + { "Collectd::TYPE_READ", PLUGIN_READ }, + { "Collectd::TYPE_WRITE", PLUGIN_WRITE }, + { "Collectd::TYPE_SHUTDOWN", PLUGIN_SHUTDOWN }, + { "Collectd::TYPE_LOG", PLUGIN_LOG }, + { "Collectd::TYPE_NOTIF", PLUGIN_NOTIF }, + { "Collectd::TYPE_FLUSH", PLUGIN_FLUSH }, + { "Collectd::TYPE_CONFIG", PLUGIN_CONFIG }, + { "Collectd::TYPE_DATASET", PLUGIN_DATASET }, + { "Collectd::DS_TYPE_COUNTER", DS_TYPE_COUNTER }, + { "Collectd::DS_TYPE_GAUGE", DS_TYPE_GAUGE }, + { "Collectd::DS_TYPE_DERIVE", DS_TYPE_DERIVE }, + { "Collectd::DS_TYPE_ABSOLUTE", DS_TYPE_ABSOLUTE }, + { "Collectd::LOG_ERR", LOG_ERR }, + { "Collectd::LOG_WARNING", LOG_WARNING }, + { "Collectd::LOG_NOTICE", LOG_NOTICE }, + { "Collectd::LOG_INFO", LOG_INFO }, + { "Collectd::LOG_DEBUG", LOG_DEBUG }, + { "Collectd::FC_MATCH", FC_MATCH }, + { "Collectd::FC_TARGET", FC_TARGET }, + { "Collectd::FC_CB_CREATE", FC_CB_CREATE }, + { "Collectd::FC_CB_DESTROY", FC_CB_DESTROY }, + { "Collectd::FC_CB_EXEC", FC_CB_EXEC }, + { "Collectd::FC_MATCH_NO_MATCH", FC_MATCH_NO_MATCH }, + { "Collectd::FC_MATCH_MATCHES", FC_MATCH_MATCHES }, + { "Collectd::FC_TARGET_CONTINUE", FC_TARGET_CONTINUE }, + { "Collectd::FC_TARGET_STOP", FC_TARGET_STOP }, + { "Collectd::FC_TARGET_RETURN", FC_TARGET_RETURN }, + { "Collectd::NOTIF_FAILURE", NOTIF_FAILURE }, + { "Collectd::NOTIF_WARNING", NOTIF_WARNING }, + { "Collectd::NOTIF_OKAY", NOTIF_OKAY }, { "", 0 } }; @@ -226,7 +274,10 @@ static int hv2data_source (pTHX_ HV *hash, data_source_t *ds) if (NULL != (tmp = hv_fetch (hash, "type", 4, 0))) { ds->type = SvIV (*tmp); - if ((DS_TYPE_COUNTER != ds->type) && (DS_TYPE_GAUGE != ds->type)) { + if ((DS_TYPE_COUNTER != ds->type) + && (DS_TYPE_GAUGE != ds->type) + && (DS_TYPE_DERIVE != ds->type) + && (DS_TYPE_ABSOLUTE != ds->type)) { log_err ("hv2data_source: Invalid DS type."); return -1; } @@ -279,8 +330,12 @@ static int av2value (pTHX_ char *name, AV *array, value_t *value, int len) if (NULL != tmp) { if (DS_TYPE_COUNTER == ds->ds[i].type) value[i].counter = SvIV (*tmp); - else + else if (DS_TYPE_GAUGE == ds->ds[i].type) value[i].gauge = SvNV (*tmp); + else if (DS_TYPE_DERIVE == ds->ds[i].type) + value[i].derive = SvIV (*tmp); + else if (DS_TYPE_ABSOLUTE == ds->ds[i].type) + value[i].absolute = SvIV (*tmp); } else { return -1; @@ -337,16 +392,16 @@ static int hv2value_list (pTHX_ HV *hash, value_list_t *vl) } } - if (NULL != (tmp = hv_fetch (hash, "time", 4, 0))) { + if (NULL != (tmp = hv_fetch (hash, "time", 4, 0))) vl->time = (time_t)SvIV (*tmp); - } - if (NULL != (tmp = hv_fetch (hash, "host", 4, 0))) { + if (NULL != (tmp = hv_fetch (hash, "interval", 8, 0))) + vl->interval = SvIV (*tmp); + + if (NULL != (tmp = hv_fetch (hash, "host", 4, 0))) sstrncpy (vl->host, SvPV_nolen (*tmp), sizeof (vl->host)); - } - else { + else sstrncpy (vl->host, hostname_g, sizeof (vl->host)); - } if (NULL != (tmp = hv_fetch (hash, "plugin", 6, 0))) sstrncpy (vl->plugin, SvPV_nolen (*tmp), sizeof (vl->plugin)); @@ -403,6 +458,139 @@ static int av2data_set (pTHX_ AV *array, char *name, data_set_t *ds) return 0; } /* static int av2data_set (pTHX_ AV *, data_set_t *) */ +/* + * notification: + * { + * severity => $severity, + * time => $time, + * message => $msg, + * host => $host, + * plugin => $plugin, + * type => $type, + * plugin_instance => $instance, + * type_instance => $type_instance, + * meta => [ { name => , value => }, ... ] + * } + */ +static int av2notification_meta (pTHX_ AV *array, notification_meta_t **meta) +{ + notification_meta_t **m = meta; + + int len = av_len (array); + int i; + + for (i = 0; i <= len; ++i) { + SV **tmp = av_fetch (array, i, 0); + HV *hash; + + if (NULL == tmp) + return -1; + + if (! (SvROK (*tmp) && (SVt_PVHV == SvTYPE (SvRV (*tmp))))) { + log_warn ("av2notification_meta: Skipping invalid " + "meta information."); + continue; + } + + hash = (HV *)SvRV (*tmp); + + *m = (notification_meta_t *)smalloc (sizeof (**m)); + + if (NULL == (tmp = hv_fetch (hash, "name", 4, 0))) { + log_warn ("av2notification_meta: Skipping invalid " + "meta information."); + free (*m); + continue; + } + sstrncpy ((*m)->name, SvPV_nolen (*tmp), sizeof ((*m)->name)); + + if (NULL == (tmp = hv_fetch (hash, "value", 5, 0))) { + log_warn ("av2notification_meta: Skipping invalid " + "meta information."); + free ((*m)->name); + free (*m); + continue; + } + + if (SvNOK (*tmp)) { + (*m)->nm_value.nm_double = SvNVX (*tmp); + (*m)->type = NM_TYPE_DOUBLE; + } + else if (SvUOK (*tmp)) { + (*m)->nm_value.nm_unsigned_int = SvUVX (*tmp); + (*m)->type = NM_TYPE_UNSIGNED_INT; + } + else if (SvIOK (*tmp)) { + (*m)->nm_value.nm_signed_int = SvIVX (*tmp); + (*m)->type = NM_TYPE_SIGNED_INT; + } + else { + (*m)->nm_value.nm_string = sstrdup (SvPV_nolen (*tmp)); + (*m)->type = NM_TYPE_STRING; + } + + (*m)->next = NULL; + m = &((*m)->next); + } + return 0; +} /* static int av2notification_meta (AV *, notification_meta_t *) */ + +static int hv2notification (pTHX_ HV *hash, notification_t *n) +{ + SV **tmp = NULL; + + if ((NULL == hash) || (NULL == n)) + return -1; + + if (NULL != (tmp = hv_fetch (hash, "severity", 8, 0))) + n->severity = SvIV (*tmp); + else + n->severity = NOTIF_FAILURE; + + if (NULL != (tmp = hv_fetch (hash, "time", 4, 0))) + n->time = (time_t)SvIV (*tmp); + else + n->time = time (NULL); + + if (NULL != (tmp = hv_fetch (hash, "message", 7, 0))) + sstrncpy (n->message, SvPV_nolen (*tmp), sizeof (n->message)); + + if (NULL != (tmp = hv_fetch (hash, "host", 4, 0))) + sstrncpy (n->host, SvPV_nolen (*tmp), sizeof (n->host)); + else + sstrncpy (n->host, hostname_g, sizeof (n->host)); + + if (NULL != (tmp = hv_fetch (hash, "plugin", 6, 0))) + sstrncpy (n->plugin, SvPV_nolen (*tmp), sizeof (n->plugin)); + + if (NULL != (tmp = hv_fetch (hash, "plugin_instance", 15, 0))) + sstrncpy (n->plugin_instance, SvPV_nolen (*tmp), + sizeof (n->plugin_instance)); + + if (NULL != (tmp = hv_fetch (hash, "type", 4, 0))) + sstrncpy (n->type, SvPV_nolen (*tmp), sizeof (n->type)); + + if (NULL != (tmp = hv_fetch (hash, "type_instance", 13, 0))) + sstrncpy (n->type_instance, SvPV_nolen (*tmp), + sizeof (n->type_instance)); + + n->meta = NULL; + while (NULL != (tmp = hv_fetch (hash, "meta", 4, 0))) { + if (! (SvROK (*tmp) && (SVt_PVAV == SvTYPE (SvRV (*tmp))))) { + log_warn ("hv2notification: Ignoring invalid meta information."); + break; + } + + if (0 != av2notification_meta (aTHX_ (AV *)SvRV (*tmp), &n->meta)) { + plugin_notification_meta_free (n->meta); + n->meta = NULL; + return -1; + } + break; + } + return 0; +} /* static int hv2notification (pTHX_ HV *, notification_t *) */ + static int data_set2av (pTHX_ data_set_t *ds, AV *array) { int i = 0; @@ -463,8 +651,12 @@ static int value_list2hv (pTHX_ value_list_t *vl, data_set_t *ds, HV *hash) if (DS_TYPE_COUNTER == ds->ds[i].type) val = newSViv (vl->values[i].counter); - else + else if (DS_TYPE_GAUGE == ds->ds[i].type) val = newSVnv (vl->values[i].gauge); + else if (DS_TYPE_DERIVE == ds->ds[i].type) + val = newSViv (vl->values[i].derive); + else if (DS_TYPE_ABSOLUTE == ds->ds[i].type) + val = newSViv (vl->values[i].absolute); if (NULL == av_store (values, i, val)) { av_undef (values); @@ -479,6 +671,9 @@ static int value_list2hv (pTHX_ value_list_t *vl, data_set_t *ds, HV *hash) if (NULL == hv_store (hash, "time", 4, newSViv (vl->time), 0)) return -1; + if (NULL == hv_store (hash, "interval", 8, newSViv (vl->interval), 0)) + return -1; + if ('\0' != vl->host[0]) if (NULL == hv_store (hash, "host", 4, newSVpv (vl->host, 0), 0)) return -1; @@ -503,6 +698,52 @@ static int value_list2hv (pTHX_ value_list_t *vl, data_set_t *ds, HV *hash) return 0; } /* static int value2av (value_list_t *, data_set_t *, HV *) */ +static int notification_meta2av (pTHX_ notification_meta_t *meta, AV *array) +{ + int meta_num = 0; + int i; + + while (meta) { + ++meta_num; + meta = meta->next; + } + + av_extend (array, meta_num); + + for (i = 0; NULL != meta; meta = meta->next, ++i) { + HV *m = newHV (); + SV *value; + + if (NULL == hv_store (m, "name", 4, newSVpv (meta->name, 0), 0)) + return -1; + + if (NM_TYPE_STRING == meta->type) + value = newSVpv (meta->nm_value.nm_string, 0); + else if (NM_TYPE_SIGNED_INT == meta->type) + value = newSViv (meta->nm_value.nm_signed_int); + else if (NM_TYPE_UNSIGNED_INT == meta->type) + value = newSVuv (meta->nm_value.nm_unsigned_int); + else if (NM_TYPE_DOUBLE == meta->type) + value = newSVnv (meta->nm_value.nm_double); + else if (NM_TYPE_BOOLEAN == meta->type) + value = meta->nm_value.nm_boolean ? &PL_sv_yes : &PL_sv_no; + else + return -1; + + if (NULL == hv_store (m, "value", 5, value, 0)) { + sv_free (value); + return -1; + } + + if (NULL == av_store (array, i, newRV_noinc ((SV *)m))) { + hv_clear (m); + hv_undef (m); + return -1; + } + } + return 0; +} /* static int notification_meta2av (notification_meta_t *, AV *) */ + static int notification2hv (pTHX_ notification_t *n, HV *hash) { if (NULL == hv_store (hash, "severity", 8, newSViv (n->severity), 0)) @@ -537,6 +778,17 @@ static int notification2hv (pTHX_ notification_t *n, HV *hash) if (NULL == hv_store (hash, "type_instance", 13, newSVpv (n->type_instance, 0), 0)) return -1; + + if (NULL != n->meta) { + AV *meta = newAV (); + if ((0 != notification_meta2av (aTHX_ n->meta, meta)) + || (NULL == hv_store (hash, "meta", 4, + newRV_noinc ((SV *)meta), 0))) { + av_clear (meta); + av_undef (meta); + return -1; + } + } return 0; } /* static int notification2hv (notification_t *, HV *) */ @@ -683,62 +935,56 @@ static int pplugin_dispatch_values (pTHX_ HV *values) } /* static int pplugin_dispatch_values (char *, HV *) */ /* - * Dispatch a notification. - * - * notification: - * { - * severity => $severity, - * time => $time, - * message => $msg, - * host => $host, - * plugin => $plugin, - * type => $type, - * plugin_instance => $instance, - * type_instance => $type_instance - * } + * Submit the values to a single write function. */ -static int pplugin_dispatch_notification (pTHX_ HV *notif) +static int pplugin_write (pTHX_ const char *plugin, AV *data_set, HV *values) { - notification_t n; + data_set_t ds; + value_list_t vl = VALUE_LIST_INIT; - SV **tmp = NULL; + int ret; - if (NULL == notif) + if (NULL == values) return -1; - memset (&n, 0, sizeof (n)); + if (0 != hv2value_list (aTHX_ values, &vl)) + return -1; - if (NULL != (tmp = hv_fetch (notif, "severity", 8, 0))) - n.severity = SvIV (*tmp); - else - n.severity = NOTIF_FAILURE; + if ((NULL != data_set) + && (0 != av2data_set (aTHX_ data_set, vl.type, &ds))) + return -1; - if (NULL != (tmp = hv_fetch (notif, "time", 4, 0))) - n.time = (time_t)SvIV (*tmp); - else - n.time = time (NULL); + ret = plugin_write (plugin, NULL == data_set ? NULL : &ds, &vl); + if (0 != ret) + log_warn ("Dispatching value to plugin \"%s\" failed with status %i.", + NULL == plugin ? "" : plugin, ret); + + if (NULL != data_set) + sfree (ds.ds); + sfree (vl.values); + return ret; +} /* static int pplugin_write (const char *plugin, HV *, HV *) */ - if (NULL != (tmp = hv_fetch (notif, "message", 7, 0))) - sstrncpy (n.message, SvPV_nolen (*tmp), sizeof (n.message)); +/* + * Dispatch a notification. + */ +static int pplugin_dispatch_notification (pTHX_ HV *notif) +{ + notification_t n; - if (NULL != (tmp = hv_fetch (notif, "host", 4, 0))) - sstrncpy (n.host, SvPV_nolen (*tmp), sizeof (n.host)); - else - sstrncpy (n.host, hostname_g, sizeof (n.host)); + int ret; - if (NULL != (tmp = hv_fetch (notif, "plugin", 6, 0))) - sstrncpy (n.plugin, SvPV_nolen (*tmp), sizeof (n.plugin)); + if (NULL == notif) + return -1; - if (NULL != (tmp = hv_fetch (notif, "plugin_instance", 15, 0))) - sstrncpy (n.plugin_instance, SvPV_nolen (*tmp), - sizeof (n.plugin_instance)); + memset (&n, 0, sizeof (n)); - if (NULL != (tmp = hv_fetch (notif, "type", 4, 0))) - sstrncpy (n.type, SvPV_nolen (*tmp), sizeof (n.type)); + if (0 != hv2notification (aTHX_ notif, &n)) + return -1; - if (NULL != (tmp = hv_fetch (notif, "type_instance", 13, 0))) - sstrncpy (n.type_instance, SvPV_nolen (*tmp), sizeof (n.type_instance)); - return plugin_dispatch_notification (&n); + ret = plugin_dispatch_notification (&n); + plugin_notification_meta_free (n.meta); + return ret; } /* static int pplugin_dispatch_notification (HV *) */ /* @@ -803,14 +1049,14 @@ static int pplugin_call_all (pTHX_ int type, ...) if (-1 == data_set2av (aTHX_ ds, pds)) { av_clear (pds); av_undef (pds); - pds = Nullav; + pds = (AV *)&PL_sv_undef; ret = -1; } if (-1 == value_list2hv (aTHX_ vl, ds, pvl)) { hv_clear (pvl); hv_undef (pvl); - pvl = Nullhv; + pvl = (HV *)&PL_sv_undef; ret = -1; } @@ -849,7 +1095,7 @@ static int pplugin_call_all (pTHX_ int type, ...) if (-1 == notification2hv (aTHX_ n, notif)) { hv_clear (notif); hv_undef (notif); - notif = Nullhv; + notif = (HV *)&PL_sv_undef; ret = -1; } @@ -884,92 +1130,491 @@ static int pplugin_call_all (pTHX_ int type, ...) } /* static int pplugin_call_all (int, ...) */ /* - * Exported Perl API. - */ - -/* - * Collectd::plugin_register_data_set (type, dataset). - * - * type: - * type of the dataset + * collectd's perl interpreter based thread implementation. * - * dataset: - * dataset to be registered + * This has been inspired by Perl's ithreads introduced in version 5.6.0. */ -static XS (Collectd_plugin_register_ds) + +/* must be called with perl_threads->mutex locked */ +static void c_ithread_destroy (c_ithread_t *ithread) { - SV *data = NULL; - int ret = 0; + dTHXa (ithread->interp); - dXSARGS; + assert (NULL != perl_threads); - if (2 != items) { - log_err ("Usage: Collectd::plugin_register_data_set(type, dataset)"); - XSRETURN_EMPTY; - } + PERL_SET_CONTEXT (aTHX); + log_debug ("Shutting down Perl interpreter %p...", aTHX); - log_debug ("Collectd::plugin_register_data_set: " - "type = \"%s\", dataset = \"%s\"", - SvPV_nolen (ST (0)), SvPV_nolen (ST (1))); +#if COLLECT_DEBUG + sv_report_used (); - data = ST (1); + --perl_threads->number_of_threads; +#endif /* COLLECT_DEBUG */ - if (SvROK (data) && (SVt_PVAV == SvTYPE (SvRV (data)))) { - ret = pplugin_register_data_set (aTHX_ SvPV_nolen (ST (0)), - (AV *)SvRV (data)); - } - else { - log_err ("Collectd::plugin_register_data_set: Invalid data."); - XSRETURN_EMPTY; - } + perl_destruct (aTHX); + perl_free (aTHX); - if (0 == ret) - XSRETURN_YES; + if (NULL == ithread->prev) + perl_threads->head = ithread->next; else - XSRETURN_EMPTY; -} /* static XS (Collectd_plugin_register_ds) */ + ithread->prev->next = ithread->next; -/* - * Collectd::plugin_unregister_data_set (type). - * - * type: - * type of the dataset - */ -static XS (Collectd_plugin_unregister_ds) -{ - dXSARGS; + if (NULL == ithread->next) + perl_threads->tail = ithread->prev; + else + ithread->next->prev = ithread->prev; - if (1 != items) { - log_err ("Usage: Collectd::plugin_unregister_data_set(type)"); - XSRETURN_EMPTY; - } + sfree (ithread); + return; +} /* static void c_ithread_destroy (c_ithread_t *) */ - log_debug ("Collectd::plugin_unregister_data_set: type = \"%s\"", - SvPV_nolen (ST (0))); +static void c_ithread_destructor (void *arg) +{ + c_ithread_t *ithread = (c_ithread_t *)arg; + c_ithread_t *t = NULL; - if (0 == pplugin_unregister_data_set (SvPV_nolen (ST (0)))) - XSRETURN_YES; - else - XSRETURN_EMPTY; -} /* static XS (Collectd_plugin_register_ds) */ + if (NULL == perl_threads) + return; -/* - * Collectd::plugin_dispatch_values (name, values). - * - * name: - * name of the plugin - * - * values: - * value list to submit - */ -static XS (Collectd_plugin_dispatch_values) -{ - SV *values = NULL; - int values_idx = 0; + pthread_mutex_lock (&perl_threads->mutex); - int ret = 0; + for (t = perl_threads->head; NULL != t; t = t->next) + if (t == ithread) + break; - dXSARGS; + /* the ithread no longer exists */ + if (NULL == t) + return; + + c_ithread_destroy (ithread); + + pthread_mutex_unlock (&perl_threads->mutex); + return; +} /* static void c_ithread_destructor (void *) */ + +/* must be called with perl_threads->mutex locked */ +static c_ithread_t *c_ithread_create (PerlInterpreter *base) +{ + c_ithread_t *t = NULL; + dTHXa (NULL); + + assert (NULL != perl_threads); + + t = (c_ithread_t *)smalloc (sizeof (c_ithread_t)); + memset (t, 0, sizeof (c_ithread_t)); + + t->interp = (NULL == base) + ? NULL + : perl_clone (base, CLONEf_KEEP_PTR_TABLE); + + aTHX = t->interp; + + if ((NULL != base) && (NULL != PL_endav)) { + av_clear (PL_endav); + av_undef (PL_endav); + PL_endav = Nullav; + } + +#if COLLECT_DEBUG + ++perl_threads->number_of_threads; +#endif /* COLLECT_DEBUG */ + + t->next = NULL; + + if (NULL == perl_threads->tail) { + perl_threads->head = t; + t->prev = NULL; + } + else { + perl_threads->tail->next = t; + t->prev = perl_threads->tail; + } + + perl_threads->tail = t; + + pthread_setspecific (perl_thr_key, (const void *)t); + return t; +} /* static c_ithread_t *c_ithread_create (PerlInterpreter *) */ + +/* + * Filter chains implementation. + */ + +static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...) +{ + int retvals = 0; + + va_list ap; + int ret = 0; + + notification_meta_t **meta = NULL; + AV *pmeta = NULL; + + dSP; + + if ((type < 0) || (type >= FC_TYPES)) + return -1; + + if ((cb_type < 0) || (cb_type >= FC_CB_TYPES)) + return -1; + + va_start (ap, data); + + ENTER; + SAVETMPS; + + PUSHMARK (SP); + + XPUSHs (sv_2mortal (newSViv ((IV)type))); + XPUSHs (sv_2mortal (newSVpv (data->name, 0))); + XPUSHs (sv_2mortal (newSViv ((IV)cb_type))); + + if (FC_CB_CREATE == cb_type) { + /* + * $_[0] = $ci; + * $_[1] = $user_data; + */ + oconfig_item_t *ci; + HV *config = newHV (); + + ci = va_arg (ap, oconfig_item_t *); + + if (0 != oconfig_item2hv (aTHX_ ci, config)) { + hv_clear (config); + hv_undef (config); + config = (HV *)&PL_sv_undef; + ret = -1; + } + + XPUSHs (sv_2mortal (newRV_noinc ((SV *)config))); + } + else if (FC_CB_DESTROY == cb_type) { + /* + * $_[1] = $user_data; + */ + + /* nothing to be done - the user data pointer + * is pushed onto the stack later */ + } + else if (FC_CB_EXEC == cb_type) { + /* + * $_[0] = $ds; + * $_[1] = $vl; + * $_[2] = $meta; + * $_[3] = $user_data; + */ + data_set_t *ds; + value_list_t *vl; + + AV *pds = newAV (); + HV *pvl = newHV (); + + ds = va_arg (ap, data_set_t *); + vl = va_arg (ap, value_list_t *); + meta = va_arg (ap, notification_meta_t **); + + if (0 != data_set2av (aTHX_ ds, pds)) { + av_clear (pds); + av_undef (pds); + pds = (AV *)&PL_sv_undef; + ret = -1; + } + + if (0 != value_list2hv (aTHX_ vl, ds, pvl)) { + hv_clear (pvl); + hv_undef (pvl); + pvl = (HV *)&PL_sv_undef; + ret = -1; + } + + if (NULL != meta) { + pmeta = newAV (); + + if (0 != notification_meta2av (aTHX_ *meta, pmeta)) { + av_clear (pmeta); + av_undef (pmeta); + pmeta = (AV *)&PL_sv_undef; + ret = -1; + } + } + else { + pmeta = (AV *)&PL_sv_undef; + } + + XPUSHs (sv_2mortal (newRV_noinc ((SV *)pds))); + XPUSHs (sv_2mortal (newRV_noinc ((SV *)pvl))); + XPUSHs (sv_2mortal (newRV_noinc ((SV *)pmeta))); + } + + XPUSHs (sv_2mortal (newRV_inc (data->user_data))); + + PUTBACK; + + retvals = call_pv ("Collectd::fc_call", G_SCALAR); + + if ((FC_CB_EXEC == cb_type) && (meta != NULL)) { + assert (pmeta != NULL); + + plugin_notification_meta_free (*meta); + av2notification_meta (aTHX_ pmeta, meta); + } + + SPAGAIN; + if (0 < retvals) { + SV *tmp = POPs; + + /* the exec callbacks return a status, while + * the others return a boolean value */ + if (FC_CB_EXEC == cb_type) + ret = SvIV (tmp); + else if (! SvTRUE (tmp)) + ret = -1; + } + + PUTBACK; + FREETMPS; + LEAVE; + + va_end (ap); + return ret; +} /* static int fc_call (int, int, pfc_user_data_t *, ...) */ + +static int fc_create (int type, const oconfig_item_t *ci, void **user_data) +{ + pfc_user_data_t *data; + + int ret = 0; + + dTHX; + + if (NULL == perl_threads) + return 0; + + if (NULL == aTHX) { + c_ithread_t *t = NULL; + + pthread_mutex_lock (&perl_threads->mutex); + t = c_ithread_create (perl_threads->head->interp); + pthread_mutex_unlock (&perl_threads->mutex); + + aTHX = t->interp; + } + + log_debug ("fc_create: c_ithread: interp = %p (active threads: %i)", + aTHX, perl_threads->number_of_threads); + + if ((1 != ci->values_num) + || (OCONFIG_TYPE_STRING != ci->values[0].type)) { + log_warn ("A \"%s\" block expects a single string argument.", + (FC_MATCH == type) ? "Match" : "Target"); + return -1; + } + + data = (pfc_user_data_t *)smalloc (sizeof (*data)); + data->name = sstrdup (ci->values[0].value.string); + data->user_data = newSV (0); + + ret = fc_call (aTHX_ type, FC_CB_CREATE, data, ci); + + if (0 != ret) + PFC_USER_DATA_FREE (data); + else + *user_data = data; + return ret; +} /* static int fc_create (int, const oconfig_item_t *, void **) */ + +static int fc_destroy (int type, void **user_data) +{ + pfc_user_data_t *data = *(pfc_user_data_t **)user_data; + + int ret = 0; + + dTHX; + + if ((NULL == perl_threads) || (NULL == data)) + return 0; + + if (NULL == aTHX) { + c_ithread_t *t = NULL; + + pthread_mutex_lock (&perl_threads->mutex); + t = c_ithread_create (perl_threads->head->interp); + pthread_mutex_unlock (&perl_threads->mutex); + + aTHX = t->interp; + } + + log_debug ("fc_destroy: c_ithread: interp = %p (active threads: %i)", + aTHX, perl_threads->number_of_threads); + + ret = fc_call (aTHX_ type, FC_CB_DESTROY, data); + + PFC_USER_DATA_FREE (data); + *user_data = NULL; + return ret; +} /* static int fc_destroy (int, void **) */ + +static int fc_exec (int type, const data_set_t *ds, const value_list_t *vl, + notification_meta_t **meta, void **user_data) +{ + pfc_user_data_t *data = *(pfc_user_data_t **)user_data; + + dTHX; + + if (NULL == perl_threads) + return 0; + + assert (NULL != data); + + if (NULL == aTHX) { + c_ithread_t *t = NULL; + + pthread_mutex_lock (&perl_threads->mutex); + t = c_ithread_create (perl_threads->head->interp); + pthread_mutex_unlock (&perl_threads->mutex); + + aTHX = t->interp; + } + + log_debug ("fc_exec: c_ithread: interp = %p (active threads: %i)", + aTHX, perl_threads->number_of_threads); + + return fc_call (aTHX_ type, FC_CB_EXEC, data, ds, vl, meta); +} /* static int fc_exec (int, const data_set_t *, const value_list_t *, + notification_meta_t **, void **) */ + +static int pmatch_create (const oconfig_item_t *ci, void **user_data) +{ + return fc_create (FC_MATCH, ci, user_data); +} /* static int pmatch_create (const oconfig_item_t *, void **) */ + +static int pmatch_destroy (void **user_data) +{ + return fc_destroy (FC_MATCH, user_data); +} /* static int pmatch_destroy (void **) */ + +static int pmatch_match (const data_set_t *ds, const value_list_t *vl, + notification_meta_t **meta, void **user_data) +{ + return fc_exec (FC_MATCH, ds, vl, meta, user_data); +} /* static int pmatch_match (const data_set_t *, const value_list_t *, + notification_meta_t **, void **) */ + +static match_proc_t pmatch = { + pmatch_create, pmatch_destroy, pmatch_match +}; + +static int ptarget_create (const oconfig_item_t *ci, void **user_data) +{ + return fc_create (FC_TARGET, ci, user_data); +} /* static int ptarget_create (const oconfig_item_t *, void **) */ + +static int ptarget_destroy (void **user_data) +{ + return fc_destroy (FC_TARGET, user_data); +} /* static int ptarget_destroy (void **) */ + +static int ptarget_invoke (const data_set_t *ds, value_list_t *vl, + notification_meta_t **meta, void **user_data) +{ + return fc_exec (FC_TARGET, ds, vl, meta, user_data); +} /* static int ptarget_invoke (const data_set_t *, value_list_t *, + notification_meta_t **, void **) */ + +static target_proc_t ptarget = { + ptarget_create, ptarget_destroy, ptarget_invoke +}; + +/* + * Exported Perl API. + */ + +/* + * Collectd::plugin_register_data_set (type, dataset). + * + * type: + * type of the dataset + * + * dataset: + * dataset to be registered + */ +static XS (Collectd_plugin_register_ds) +{ + SV *data = NULL; + int ret = 0; + + dXSARGS; + + log_warn ("Using plugin_register() to register new data-sets is " + "deprecated - add new entries to a custom types.db instead."); + + if (2 != items) { + log_err ("Usage: Collectd::plugin_register_data_set(type, dataset)"); + XSRETURN_EMPTY; + } + + log_debug ("Collectd::plugin_register_data_set: " + "type = \"%s\", dataset = \"%s\"", + SvPV_nolen (ST (0)), SvPV_nolen (ST (1))); + + data = ST (1); + + if (SvROK (data) && (SVt_PVAV == SvTYPE (SvRV (data)))) { + ret = pplugin_register_data_set (aTHX_ SvPV_nolen (ST (0)), + (AV *)SvRV (data)); + } + else { + log_err ("Collectd::plugin_register_data_set: Invalid data."); + XSRETURN_EMPTY; + } + + if (0 == ret) + XSRETURN_YES; + else + XSRETURN_EMPTY; +} /* static XS (Collectd_plugin_register_ds) */ + +/* + * Collectd::plugin_unregister_data_set (type). + * + * type: + * type of the dataset + */ +static XS (Collectd_plugin_unregister_ds) +{ + dXSARGS; + + if (1 != items) { + log_err ("Usage: Collectd::plugin_unregister_data_set(type)"); + XSRETURN_EMPTY; + } + + log_debug ("Collectd::plugin_unregister_data_set: type = \"%s\"", + SvPV_nolen (ST (0))); + + if (0 == pplugin_unregister_data_set (SvPV_nolen (ST (0)))) + XSRETURN_YES; + else + XSRETURN_EMPTY; +} /* static XS (Collectd_plugin_register_ds) */ + +/* + * Collectd::plugin_dispatch_values (name, values). + * + * name: + * name of the plugin + * + * values: + * value list to submit + */ +static XS (Collectd_plugin_dispatch_values) +{ + SV *values = NULL; + int values_idx = 0; + + int ret = 0; + + dXSARGS; if (2 == items) { log_warn ("Collectd::plugin_dispatch_values with two arguments " @@ -1008,6 +1653,65 @@ static XS (Collectd_plugin_dispatch_values) XSRETURN_EMPTY; } /* static XS (Collectd_plugin_dispatch_values) */ +/* Collectd::plugin_write (plugin, ds, vl). + * + * plugin: + * name of the plugin to call, may be 'undef' + * + * ds: + * data-set that describes the submitted values, may be 'undef' + * + * vl: + * value-list to be written + */ +static XS (Collectd__plugin_write) +{ + char *plugin; + SV *ds, *vl; + AV *ds_array; + + int ret; + + dXSARGS; + + if (3 != items) { + log_err ("Usage: Collectd::plugin_write(plugin, ds, vl)"); + XSRETURN_EMPTY; + } + + log_debug ("Collectd::plugin_write: plugin=\"%s\", ds=\"%s\", vl=\"%s\"", + SvPV_nolen (ST (0)), SvOK (ST (1)) ? SvPV_nolen (ST (1)) : "", + SvPV_nolen (ST (2))); + + if (! SvOK (ST (0))) + plugin = NULL; + else + plugin = SvPV_nolen (ST (0)); + + ds = ST (1); + if (SvROK (ds) && (SVt_PVAV == SvTYPE (SvRV (ds)))) + ds_array = (AV *)SvRV (ds); + else if (! SvOK (ds)) + ds_array = NULL; + else { + log_err ("Collectd::plugin_write: Invalid data-set."); + XSRETURN_EMPTY; + } + + vl = ST (2); + if (! (SvROK (vl) && (SVt_PVHV == SvTYPE (SvRV (vl))))) { + log_err ("Collectd::plugin_write: Invalid value-list."); + XSRETURN_EMPTY; + } + + ret = pplugin_write (aTHX_ plugin, ds_array, (HV *)SvRV (vl)); + + if (0 == ret) + XSRETURN_YES; + else + XSRETURN_EMPTY; +} /* static XS (Collectd__plugin_write) */ + /* * Collectd::_plugin_flush (plugin, timeout, identifier). * @@ -1111,6 +1815,43 @@ static XS (Collectd_plugin_log) } /* static XS (Collectd_plugin_log) */ /* + * Collectd::_fc_register (type, name) + * + * type: + * match | target + * + * name: + * name of the match + */ +static XS (Collectd__fc_register) +{ + int type; + char *name; + + int ret = 0; + + dXSARGS; + + if (2 != items) { + log_err ("Usage: Collectd::_fc_register(type, name)"); + XSRETURN_EMPTY; + } + + type = SvIV (ST (0)); + name = SvPV_nolen (ST (1)); + + if (FC_MATCH == type) + ret = fc_register_match (name, pmatch); + else if (FC_TARGET == type) + ret = fc_register_target (name, ptarget); + + if (0 == ret) + XSRETURN_YES; + else + XSRETURN_EMPTY; +} /* static XS (Collectd_fc_register) */ + +/* * Collectd::call_by_name (...). * * Call a Perl sub identified by its name passed through $Collectd::cb_name. @@ -1140,118 +1881,12 @@ static XS (Collectd_call_by_name) } /* static XS (Collectd_call_by_name) */ /* - * collectd's perl interpreter based thread implementation. - * - * This has been inspired by Perl's ithreads introduced in version 5.6.0. - */ - -/* must be called with perl_threads->mutex locked */ -static void c_ithread_destroy (c_ithread_t *ithread) -{ - dTHXa (ithread->interp); - - assert (NULL != perl_threads); - - PERL_SET_CONTEXT (aTHX); - log_debug ("Shutting down Perl interpreter %p...", aTHX); - -#if COLLECT_DEBUG - sv_report_used (); - - --perl_threads->number_of_threads; -#endif /* COLLECT_DEBUG */ - - perl_destruct (aTHX); - perl_free (aTHX); - - if (NULL == ithread->prev) - perl_threads->head = ithread->next; - else - ithread->prev->next = ithread->next; - - if (NULL == ithread->next) - perl_threads->tail = ithread->prev; - else - ithread->next->prev = ithread->prev; - - sfree (ithread); - return; -} /* static void c_ithread_destroy (c_ithread_t *) */ - -static void c_ithread_destructor (void *arg) -{ - c_ithread_t *ithread = (c_ithread_t *)arg; - c_ithread_t *t = NULL; - - if (NULL == perl_threads) - return; - - pthread_mutex_lock (&perl_threads->mutex); - - for (t = perl_threads->head; NULL != t; t = t->next) - if (t == ithread) - break; - - /* the ithread no longer exists */ - if (NULL == t) - return; - - c_ithread_destroy (ithread); - - pthread_mutex_unlock (&perl_threads->mutex); - return; -} /* static void c_ithread_destructor (void *) */ - -/* must be called with perl_threads->mutex locked */ -static c_ithread_t *c_ithread_create (PerlInterpreter *base) -{ - c_ithread_t *t = NULL; - dTHXa (NULL); - - assert (NULL != perl_threads); - - t = (c_ithread_t *)smalloc (sizeof (c_ithread_t)); - memset (t, 0, sizeof (c_ithread_t)); - - t->interp = (NULL == base) - ? NULL - : perl_clone (base, CLONEf_KEEP_PTR_TABLE); - - aTHX = t->interp; - - if ((NULL != base) && (NULL != PL_endav)) { - av_clear (PL_endav); - av_undef (PL_endav); - PL_endav = Nullav; - } - -#if COLLECT_DEBUG - ++perl_threads->number_of_threads; -#endif /* COLLECT_DEBUG */ - - t->next = NULL; - - if (NULL == perl_threads->tail) { - perl_threads->head = t; - t->prev = NULL; - } - else { - perl_threads->tail->next = t; - t->prev = perl_threads->tail; - } - - perl_threads->tail = t; - - pthread_setspecific (perl_thr_key, (const void *)t); - return t; -} /* static c_ithread_t *c_ithread_create (PerlInterpreter *) */ - -/* * Interface to collectd. */ static int perl_init (void) { + int status; dTHX; if (NULL == perl_threads) @@ -1269,7 +1904,19 @@ static int perl_init (void) log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)", aTHX, perl_threads->number_of_threads); - return pplugin_call_all (aTHX_ PLUGIN_INIT); + + /* Lock the base thread to avoid race conditions with c_ithread_create(). + * See https://github.com/collectd/collectd/issues/9 and + * https://github.com/collectd/collectd/issues/1706 for details. */ + + assert (aTHX == perl_threads->head->interp); + pthread_mutex_lock (&perl_threads->mutex); + + status = pplugin_call_all (aTHX_ PLUGIN_INIT); + + pthread_mutex_unlock (&perl_threads->mutex); + + return status; } /* static int perl_init (void) */ static int perl_read (void) @@ -1289,13 +1936,20 @@ static int perl_read (void) aTHX = t->interp; } + /* Assert that we're not running as the base thread. Otherwise, we might + * run into concurrency issues with c_ithread_create(). See + * https://github.com/collectd/collectd/issues/9 for details. */ + assert (aTHX != perl_threads->head->interp); + log_debug ("perl_read: c_ithread: interp = %p (active threads: %i)", aTHX, perl_threads->number_of_threads); return pplugin_call_all (aTHX_ PLUGIN_READ); } /* static int perl_read (void) */ -static int perl_write (const data_set_t *ds, const value_list_t *vl) +static int perl_write (const data_set_t *ds, const value_list_t *vl, + user_data_t __attribute__((unused)) *user_data) { + int status; dTHX; if (NULL == perl_threads) @@ -1311,12 +1965,24 @@ static int perl_write (const data_set_t *ds, const value_list_t *vl) aTHX = t->interp; } + /* Lock the base thread if this is not called from one of the read threads + * to avoid race conditions with c_ithread_create(). See + * https://github.com/collectd/collectd/issues/9 for details. */ + if (aTHX == perl_threads->head->interp) + pthread_mutex_lock (&perl_threads->mutex); + log_debug ("perl_write: c_ithread: interp = %p (active threads: %i)", aTHX, perl_threads->number_of_threads); - return pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl); + status = pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl); + + if (aTHX == perl_threads->head->interp) + pthread_mutex_unlock (&perl_threads->mutex); + + return status; } /* static int perl_write (const data_set_t *, const value_list_t *) */ -static void perl_log (int level, const char *msg) +static void perl_log (int level, const char *msg, + user_data_t __attribute__((unused)) *user_data) { dTHX; @@ -1333,11 +1999,22 @@ static void perl_log (int level, const char *msg) aTHX = t->interp; } + /* Lock the base thread if this is not called from one of the read threads + * to avoid race conditions with c_ithread_create(). See + * https://github.com/collectd/collectd/issues/9 for details. */ + if (aTHX == perl_threads->head->interp) + pthread_mutex_lock (&perl_threads->mutex); + pplugin_call_all (aTHX_ PLUGIN_LOG, level, msg); + + if (aTHX == perl_threads->head->interp) + pthread_mutex_unlock (&perl_threads->mutex); + return; } /* static void perl_log (int, const char *) */ -static int perl_notify (const notification_t *notif) +static int perl_notify (const notification_t *notif, + user_data_t __attribute__((unused)) *user_data) { dTHX; @@ -1356,7 +2033,8 @@ static int perl_notify (const notification_t *notif) return pplugin_call_all (aTHX_ PLUGIN_NOTIF, notif); } /* static int perl_notify (const notification_t *) */ -static int perl_flush (int timeout, const char *identifier) +static int perl_flush (int timeout, const char *identifier, + user_data_t __attribute__((unused)) *user_data) { dTHX; @@ -1471,8 +2149,18 @@ static int g_iv_set (pTHX_ SV *var, MAGIC *mg) return 0; } /* static int g_iv_set (pTHX_ SV *, MAGIC *) */ -static MGVTBL g_pv_vtbl = { g_pv_get, g_pv_set, NULL, NULL, NULL, NULL, NULL }; -static MGVTBL g_iv_vtbl = { g_iv_get, g_iv_set, NULL, NULL, NULL, NULL, NULL }; +static MGVTBL g_pv_vtbl = { + g_pv_get, g_pv_set, NULL, NULL, NULL, NULL, NULL +#if HAVE_PERL_STRUCT_MGVTBL_SVT_LOCAL + , NULL +#endif +}; +static MGVTBL g_iv_vtbl = { + g_iv_get, g_iv_set, NULL, NULL, NULL, NULL, NULL +#if HAVE_PERL_STRUCT_MGVTBL_SVT_LOCAL + , NULL +#endif +}; /* bootstrap the Collectd module */ static void xs_init (pTHX) @@ -1592,14 +2280,15 @@ static int init_pi (int argc, char **argv) perl_run (aTHX); - plugin_register_log ("perl", perl_log); - plugin_register_notification ("perl", perl_notify); + plugin_register_log ("perl", perl_log, /* user_data = */ NULL); + plugin_register_notification ("perl", perl_notify, + /* user_data = */ NULL); plugin_register_init ("perl", perl_init); plugin_register_read ("perl", perl_read); - plugin_register_write ("perl", perl_write); - plugin_register_flush ("perl", perl_flush); + plugin_register_write ("perl", perl_write, /* user_data = */ NULL); + plugin_register_flush ("perl", perl_flush, /* user_data = */ NULL); plugin_register_shutdown ("perl", perl_shutdown); return 0; } /* static int init_pi (const char **, const int) */ @@ -1765,7 +2454,7 @@ static int perl_config_plugin (pTHX_ oconfig_item_t *ci) hv_undef (config); log_err ("Unable to convert configuration to a Perl hash value."); - config = Nullhv; + config = (HV *)&PL_sv_undef; } ENTER;