From 108b6a44ac3e33a0a745b914d61126a5c7048ac0 Mon Sep 17 00:00:00 2001 From: Pavel Rochnyack Date: Sat, 14 May 2016 17:23:48 +0600 Subject: [PATCH] perl plugin: Register perl plugins with use of 'userdata'. --- bindings/perl/lib/Collectd.pm | 77 +++++------ src/perl.c | 301 +++++++++++++++++++++++++++++++++++------- 2 files changed, 290 insertions(+), 88 deletions(-) diff --git a/bindings/perl/lib/Collectd.pm b/bindings/perl/lib/Collectd.pm index c1adf442..576e5f43 100644 --- a/bindings/perl/lib/Collectd.pm +++ b/bindings/perl/lib/Collectd.pm @@ -172,7 +172,6 @@ sub plugin_call_all { my $type = shift; my %plugins; - my $interval; our $cb_name = undef; @@ -196,21 +195,9 @@ sub plugin_call_all { %plugins = %{$plugins[$type]}; } - $interval = plugin_get_interval (); - foreach my $plugin (keys %plugins) { - my $p = $plugins{$plugin}; - - my $status = 0; - - if ($p->{'wait_left'} > 0) { - $p->{'wait_left'} -= $interval; - } - - next if ($p->{'wait_left'} > 0); - - $cb_name = $p->{'cb_name'}; - $status = call_by_name (@_); + $cb_name = $plugins{$plugin}; + my $status = call_by_name (@_); if (! $status) { my $err = undef; @@ -230,23 +217,7 @@ sub plugin_call_all { } if ($status) { - $p->{'wait_left'} = 0; - $p->{'wait_time'} = $interval; - } - elsif (TYPE_READ == $type) { - if ($p->{'wait_time'} < $interval) { - $p->{'wait_time'} = $interval; - } - - $p->{'wait_left'} = $p->{'wait_time'}; - $p->{'wait_time'} *= 2; - - if ($p->{'wait_time'} > 86400) { - $p->{'wait_time'} = 86400; - } - - WARNING ("${plugin}->read() failed with status $status. " - . "Will suspend it for $p->{'wait_left'} seconds."); + #NOOP } elsif (TYPE_INIT == $type) { ERROR ("${plugin}->init() failed with status $status. " @@ -309,21 +280,26 @@ sub plugin_register { } elsif ((TYPE_DATASET != $type) && (! ref $data)) { my $pkg = scalar caller; - - my %p : shared; - if ($data !~ m/^$pkg\:\:/) { $data = $pkg . "::" . $data; } - - %p = ( - wait_time => plugin_get_interval (), - wait_left => 0, - cb_name => $data, - ); - + if (TYPE_READ == $type) { + return plugin_register_read($name, $data); + } + if (TYPE_WRITE == $type) { + return plugin_register_write($name, $data); + } + if (TYPE_LOG == $type) { + return plugin_register_log($name, $data); + } + if (TYPE_NOTIF == $type) { + return plugin_register_notification($name, $data); + } + if (TYPE_FLUSH == $type) { + return plugin_register_flush($name, $data); + } lock %{$plugins[$type]}; - $plugins[$type]->{$name} = \%p; + $plugins[$type]->{$name} = $data; } else { ERROR ("Collectd::plugin_register: Invalid data."); @@ -351,6 +327,21 @@ sub plugin_unregister { lock %cf_callbacks; delete $cf_callbacks{$name}; } + elsif (TYPE_READ == $type) { + return plugin_unregister_read ($name); + } + elsif (TYPE_WRITE == $type) { + return plugin_unregister_write($name); + } + elsif (TYPE_LOG == $type) { + return plugin_unregister_log ($name); + } + elsif (TYPE_NOTIF == $type) { + return plugin_unregister_notification($name); + } + elsif (TYPE_FLUSH == $type) { + return plugin_unregister_flush($name); + } elsif (defined $plugins[$type]) { lock %{$plugins[$type]}; delete $plugins[$type]->{$name}; diff --git a/src/perl.c b/src/perl.c index 9eef6c1a..9f7b869f 100644 --- a/src/perl.c +++ b/src/perl.c @@ -102,6 +102,16 @@ /* this is defined in DynaLoader.a */ void boot_DynaLoader (PerlInterpreter *, CV *); +static XS (Collectd_plugin_register_read); +static XS (Collectd_plugin_register_write); +static XS (Collectd_plugin_register_log); +static XS (Collectd_plugin_register_notification); +static XS (Collectd_plugin_register_flush); +static XS (Collectd_plugin_unregister_read); +static XS (Collectd_plugin_unregister_write); +static XS (Collectd_plugin_unregister_log); +static XS (Collectd_plugin_unregister_notification); +static XS (Collectd_plugin_unregister_flush); static XS (Collectd_plugin_register_ds); static XS (Collectd_plugin_unregister_ds); static XS (Collectd_plugin_dispatch_values); @@ -113,6 +123,14 @@ static XS (Collectd_plugin_log); static XS (Collectd__fc_register); static XS (Collectd_call_by_name); +static int perl_read (user_data_t *ud); +static int perl_write (const data_set_t *ds, const value_list_t *vl, + user_data_t *user_data); +static void perl_log (int level, const char *msg, user_data_t *user_data); +static int perl_notify (const notification_t *notif, user_data_t *user_data); +static int perl_flush (cdtime_t timeout, const char *identifier, + user_data_t *user_data); + /* * private data types */ @@ -182,6 +200,18 @@ static struct { XS ((*f)); } api[] = { + { "Collectd::plugin_register_read", Collectd_plugin_register_read }, + { "Collectd::plugin_register_write", Collectd_plugin_register_write }, + { "Collectd::plugin_register_log", Collectd_plugin_register_log }, + { "Collectd::plugin_register_notification", + Collectd_plugin_register_notification }, + { "Collectd::plugin_register_flush", Collectd_plugin_register_flush }, + { "Collectd::plugin_unregister_read", Collectd_plugin_unregister_read }, + { "Collectd::plugin_unregister_write", Collectd_plugin_unregister_write }, + { "Collectd::plugin_unregister_log", Collectd_plugin_unregister_log }, + { "Collectd::plugin_unregister_notification", + Collectd_plugin_unregister_notification }, + { "Collectd::plugin_unregister_flush", Collectd_plugin_unregister_flush }, { "Collectd::plugin_register_data_set", Collectd_plugin_register_ds }, { "Collectd::plugin_unregister_data_set", Collectd_plugin_unregister_ds }, { "Collectd::plugin_dispatch_values", Collectd_plugin_dispatch_values }, @@ -1010,7 +1040,7 @@ static int call_pv_locked (pTHX_ const char* sub_name) return 0; } - ret = call_pv (sub_name, G_SCALAR); + ret = call_pv (sub_name, G_SCALAR|G_EVAL); t->running = old_running; return ret; @@ -1019,12 +1049,13 @@ static int call_pv_locked (pTHX_ const char* sub_name) /* * Call all working functions of the given type. */ -static int pplugin_call_all (pTHX_ int type, ...) +static int pplugin_call (pTHX_ int type, ...) { int retvals = 0; va_list ap; int ret = 0; + char *subname; dSP; @@ -1038,9 +1069,17 @@ static int pplugin_call_all (pTHX_ int type, ...) PUSHMARK (SP); - XPUSHs (sv_2mortal (newSViv ((IV)type))); + if (PLUGIN_READ == type) { + subname = va_arg(ap, char *); + } + else if (PLUGIN_WRITE == type) { + data_set_t *ds; + value_list_t *vl; - if (PLUGIN_WRITE == type) { + AV *pds = newAV (); + HV *pvl = newHV (); + + subname = va_arg(ap, char *); /* * $_[0] = $plugin_type; * @@ -1066,12 +1105,6 @@ static int pplugin_call_all (pTHX_ int type, ...) * type_instance => $type_instance * }; */ - data_set_t *ds; - value_list_t *vl; - - AV *pds = newAV (); - HV *pvl = newHV (); - ds = va_arg (ap, data_set_t *); vl = va_arg (ap, value_list_t *); @@ -1094,6 +1127,7 @@ static int pplugin_call_all (pTHX_ int type, ...) XPUSHs (sv_2mortal (newRV_noinc ((SV *)pvl))); } else if (PLUGIN_LOG == type) { + subname = va_arg(ap, char *); /* * $_[0] = $level; * @@ -1103,6 +1137,10 @@ static int pplugin_call_all (pTHX_ int type, ...) XPUSHs (sv_2mortal (newSVpv (va_arg (ap, char *), 0))); } else if (PLUGIN_NOTIF == type) { + notification_t *n; + HV *notif = newHV (); + + subname = va_arg(ap, char *); /* * $_[0] = * { @@ -1116,9 +1154,6 @@ static int pplugin_call_all (pTHX_ int type, ...) * type_instance => $type_instance * }; */ - notification_t *n; - HV *notif = newHV (); - n = va_arg (ap, notification_t *); if (-1 == notification2hv (aTHX_ n, notif)) { @@ -1132,7 +1167,7 @@ static int pplugin_call_all (pTHX_ int type, ...) } else if (PLUGIN_FLUSH == type) { cdtime_t timeout; - + subname = va_arg(ap, char *); /* * $_[0] = $timeout; * $_[1] = $identifier; @@ -1142,13 +1177,30 @@ static int pplugin_call_all (pTHX_ int type, ...) XPUSHs (sv_2mortal (newSVnv (CDTIME_T_TO_DOUBLE (timeout)))); XPUSHs (sv_2mortal (newSVpv (va_arg (ap, char *), 0))); } + else if (PLUGIN_INIT == type) { + subname = "Collectd::plugin_call_all"; + XPUSHs (sv_2mortal (newSViv ((IV)type))); + } + else if (PLUGIN_SHUTDOWN == type) { + subname = "Collectd::plugin_call_all"; + XPUSHs (sv_2mortal (newSViv ((IV)type))); + } + else { /* Unknown type. Run 'plugin_call_all' and make compiler happy */ + subname = "Collectd::plugin_call_all"; + XPUSHs (sv_2mortal (newSViv ((IV)type))); + } PUTBACK; - retvals = call_pv_locked (aTHX_ "Collectd::plugin_call_all"); + retvals = call_pv_locked (aTHX_ subname); SPAGAIN; - if (0 < retvals) { + if (SvTRUE(ERRSV)) { + if (PLUGIN_LOG != type) + ERROR ("perl: %s error: %s", subname, SvPV_nolen(ERRSV)); + ret = -1; + } + else if (0 < retvals) { SV *tmp = POPs; if (! SvTRUE (tmp)) ret = -1; @@ -1160,7 +1212,7 @@ static int pplugin_call_all (pTHX_ int type, ...) va_end (ap); return ret; -} /* static int pplugin_call_all (int, ...) */ +} /* static int pplugin_call (int, ...) */ /* * collectd's Perl interpreter based thread implementation. @@ -1399,7 +1451,11 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...) } SPAGAIN; - if (0 < retvals) { + if (SvTRUE(ERRSV)) { + ERROR ("perl: Collectd::fc_call error: %s", SvPV_nolen(ERRSV)); + ret = -1; + } + else if (0 < retvals) { SV *tmp = POPs; /* the exec callbacks return a status, while @@ -1568,6 +1624,169 @@ static target_proc_t ptarget = { * Exported Perl API. */ +static void _plugin_register_generic_userdata (pTHX, int type, const char *desc) +{ + int ret = 0; + user_data_t userdata; + char cb_name[DATA_MAX_NAME_LEN]; + + dXSARGS; + + if (2 != items) { + log_err ("Usage: Collectd::plugin_register_%s(pluginname, subname)", + desc); + XSRETURN_EMPTY; + } + + if (! SvOK (ST (0))) { + log_err ("Collectd::plugin_register_%s(pluginname, subname): " + "Invalid pluginname", desc); + XSRETURN_EMPTY; + } + if (! SvOK (ST (1))) { + log_err ("Collectd::plugin_register_%s(pluginname, subname): " + "Invalid subname", desc); + XSRETURN_EMPTY; + } + + log_debug ("Collectd::plugin_register_%s: " + "plugin = \"%s\", sub = \"%s\"", + desc, SvPV_nolen (ST (0)), SvPV_nolen (ST (1))); + + ssnprintf (cb_name, sizeof (cb_name), "perl/%s", SvPV_nolen (ST (0))); + memset(&userdata, 0, sizeof(userdata)); + userdata.data = strdup(SvPV_nolen (ST (1))); + userdata.free_func = free; + + if (PLUGIN_READ == type) { + ret = plugin_register_complex_read( + "perl", /* group */ + cb_name, + perl_read, + plugin_get_interval(), /* Default interval */ + &userdata); + } + else if (PLUGIN_WRITE == type) { + ret = plugin_register_write(cb_name, perl_write, &userdata); + } + else if (PLUGIN_LOG == type) { + ret = plugin_register_log(cb_name, perl_log, &userdata); + } + else if (PLUGIN_NOTIF == type) { + ret = plugin_register_notification(cb_name, perl_notify, &userdata); + } + else if (PLUGIN_FLUSH == type) { + ret = plugin_register_flush(cb_name, perl_flush, &userdata); + } + else { + ret = -1; + } + + if (0 == ret) + XSRETURN_YES; + else { + free (userdata.data); + XSRETURN_EMPTY; + } +} /* static void _plugin_register_generic_userdata ( ... ) */ + +/* + * Collectd::plugin_register_TYPE (pluginname, subname). + * + * pluginname: + * name of the perl plugin + * + * subname: + * name of the plugin's subroutine that does the work + */ + +static XS (Collectd_plugin_register_read) { + return _plugin_register_generic_userdata(aTHX, PLUGIN_READ, "read"); +} + +static XS (Collectd_plugin_register_write) { + return _plugin_register_generic_userdata(aTHX, PLUGIN_WRITE, "write"); +} + +static XS (Collectd_plugin_register_log) { + return _plugin_register_generic_userdata(aTHX, PLUGIN_LOG, "log"); +} + +static XS (Collectd_plugin_register_notification) { + return _plugin_register_generic_userdata(aTHX, PLUGIN_NOTIF, "notification"); +} + +static XS (Collectd_plugin_register_flush) { + return _plugin_register_generic_userdata(aTHX, PLUGIN_FLUSH, "flush"); +} + +typedef int perl_unregister_function_t(const char *name); + +static void _plugin_unregister_generic (pTHX, + perl_unregister_function_t *unreg, const char *desc) +{ + char cb_name[DATA_MAX_NAME_LEN]; + + dXSARGS; + + if (1 != items) { + log_err ("Usage: Collectd::plugin_unregister_%s(pluginname)", desc); + XSRETURN_EMPTY; + } + + if (! SvOK (ST (0))) { + log_err ("Collectd::plugin_unregister_%s(pluginname): " + "Invalid pluginname", desc); + XSRETURN_EMPTY; + } + + log_debug ("Collectd::plugin_unregister_%s: plugin = \"%s\"", + desc, SvPV_nolen (ST (0))); + + ssnprintf (cb_name, sizeof (cb_name), "perl/%s", SvPV_nolen (ST (0))); + + unreg(cb_name); + + XSRETURN_EMPTY; + + return; +} /* static void _plugin_unregister_generic ( ... ) */ + +/* + * Collectd::plugin_unregister_TYPE (pluginname). + * + * TYPE: + * type of callback to be unregistered: read, write, log, notification, flush + * + * pluginname: + * name of the perl plugin + */ + +static XS (Collectd_plugin_unregister_read) { + return _plugin_unregister_generic(aTHX, + plugin_unregister_read, "read"); +} + +static XS (Collectd_plugin_unregister_write) { + return _plugin_unregister_generic(aTHX, + plugin_unregister_write, "write"); +} + +static XS (Collectd_plugin_unregister_log) { + return _plugin_unregister_generic(aTHX, + plugin_unregister_log, "log"); +} + +static XS (Collectd_plugin_unregister_notification) { + return _plugin_unregister_generic(aTHX, + plugin_unregister_notification, "notification"); +} + +static XS (Collectd_plugin_unregister_flush) { + return _plugin_unregister_generic(aTHX, + plugin_unregister_flush, "flush"); +} + /* * Collectd::plugin_register_data_set (type, dataset). * @@ -1955,14 +2174,14 @@ static int perl_init (void) assert (aTHX == perl_threads->head->interp); pthread_mutex_lock (&perl_threads->mutex); - status = pplugin_call_all (aTHX_ PLUGIN_INIT); + status = pplugin_call (aTHX_ PLUGIN_INIT); pthread_mutex_unlock (&perl_threads->mutex); return status; } /* static int perl_init (void) */ -static int perl_read (void) +static int perl_read (user_data_t *user_data) { dTHX; @@ -1986,11 +2205,12 @@ static int perl_read (void) log_debug ("perl_read: c_ithread: interp = %p (active threads: %i)", aTHX, perl_threads->number_of_threads); - return pplugin_call_all (aTHX_ PLUGIN_READ); -} /* static int perl_read (void) */ + + return pplugin_call (aTHX_ PLUGIN_READ, user_data->data); +} /* static int perl_read (user_data_t *user_data) */ static int perl_write (const data_set_t *ds, const value_list_t *vl, - user_data_t __attribute__((unused)) *user_data) + user_data_t *user_data) { int status; dTHX; @@ -2016,7 +2236,7 @@ static int perl_write (const data_set_t *ds, const value_list_t *vl, log_debug ("perl_write: c_ithread: interp = %p (active threads: %i)", aTHX, perl_threads->number_of_threads); - status = pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl); + status = pplugin_call (aTHX_ PLUGIN_WRITE, user_data->data, ds, vl); if (aTHX == perl_threads->head->interp) pthread_mutex_unlock (&perl_threads->mutex); @@ -2025,7 +2245,7 @@ static int perl_write (const data_set_t *ds, const value_list_t *vl, } /* static int perl_write (const data_set_t *, const value_list_t *) */ static void perl_log (int level, const char *msg, - user_data_t __attribute__((unused)) *user_data) + user_data_t *user_data) { dTHX; @@ -2050,7 +2270,7 @@ static void perl_log (int level, const char *msg, if (aTHX == perl_threads->head->interp) pthread_mutex_lock (&perl_threads->mutex); - pplugin_call_all (aTHX_ PLUGIN_LOG, level, msg); + pplugin_call (aTHX_ PLUGIN_LOG, user_data->data, level, msg); if (aTHX == perl_threads->head->interp) pthread_mutex_unlock (&perl_threads->mutex); @@ -2058,8 +2278,7 @@ static void perl_log (int level, const char *msg, return; } /* static void perl_log (int, const char *) */ -static int perl_notify (const notification_t *notif, - user_data_t __attribute__((unused)) *user_data) +static int perl_notify (const notification_t *notif, user_data_t *user_data) { dTHX; @@ -2075,11 +2294,11 @@ static int perl_notify (const notification_t *notif, aTHX = t->interp; } - return pplugin_call_all (aTHX_ PLUGIN_NOTIF, notif); + return pplugin_call (aTHX_ PLUGIN_NOTIF, user_data->data, notif); } /* static int perl_notify (const notification_t *) */ static int perl_flush (cdtime_t timeout, const char *identifier, - user_data_t __attribute__((unused)) *user_data) + user_data_t *user_data) { dTHX; @@ -2095,7 +2314,7 @@ static int perl_flush (cdtime_t timeout, const char *identifier, aTHX = t->interp; } - return pplugin_call_all (aTHX_ PLUGIN_FLUSH, timeout, identifier); + return pplugin_call (aTHX_ PLUGIN_FLUSH, user_data->data, timeout, identifier); } /* static int perl_flush (const int) */ static int perl_shutdown (void) @@ -2106,6 +2325,7 @@ static int perl_shutdown (void) dTHX; plugin_unregister_complex_config ("perl"); + plugin_unregister_read_group ("perl"); if (NULL == perl_threads) return 0; @@ -2121,14 +2341,9 @@ static int perl_shutdown (void) log_debug ("perl_shutdown: c_ithread: interp = %p (active threads: %i)", aTHX, perl_threads->number_of_threads); - plugin_unregister_log ("perl"); - plugin_unregister_notification ("perl"); plugin_unregister_init ("perl"); - plugin_unregister_read ("perl"); - plugin_unregister_write ("perl"); - plugin_unregister_flush ("perl"); - ret = pplugin_call_all (aTHX_ PLUGIN_SHUTDOWN); + ret = pplugin_call (aTHX_ PLUGIN_SHUTDOWN); pthread_mutex_lock (&perl_threads->mutex); t = perl_threads->tail; @@ -2153,6 +2368,10 @@ static int perl_shutdown (void) pthread_kill (thr->pthread, SIGTERM); ERROR ("perl shutdown: Thread hangs inside Perl. Thread killed."); } + /* Mark as running to avoid deadlock: + c_ithread_destroy -> log_debug -> perl_log() + */ + thr->running = 1; c_ithread_destroy (thr); } @@ -2338,15 +2557,7 @@ static int init_pi (int argc, char **argv) perl_run (aTHX); - plugin_register_log ("perl", perl_log, /* user_data = */ NULL); - plugin_register_notification ("perl", perl_notify, - /* user_data = */ NULL); plugin_register_init ("perl", perl_init); - - plugin_register_read ("perl", perl_read); - - plugin_register_write ("perl", perl_write, /* user_data = */ NULL); - plugin_register_flush ("perl", perl_flush, /* user_data = */ NULL); plugin_register_shutdown ("perl", perl_shutdown); return 0; } /* static int init_pi (const char **, const int) */ -- 2.11.0