perl plugin: Register perl plugins with use of 'userdata'.
authorPavel Rochnyack <pavel2000@ngs.ru>
Sat, 14 May 2016 11:23:48 +0000 (17:23 +0600)
committerPavel Rochnyack <pavel2000@ngs.ru>
Fri, 16 Sep 2016 04:46:26 +0000 (10:46 +0600)
bindings/perl/lib/Collectd.pm
src/perl.c

index c1adf44..576e5f4 100644 (file)
@@ -172,7 +172,6 @@ sub plugin_call_all {
        my $type = shift;
 
        my %plugins;
-       my $interval;
 
        our $cb_name = undef;
 
@@ -196,21 +195,9 @@ sub plugin_call_all {
                %plugins = %{$plugins[$type]};
        }
 
-       $interval = plugin_get_interval ();
-
        foreach my $plugin (keys %plugins) {
-               my $p = $plugins{$plugin};
-
-               my $status = 0;
-
-               if ($p->{'wait_left'} > 0) {
-                       $p->{'wait_left'} -= $interval;
-               }
-
-               next if ($p->{'wait_left'} > 0);
-
-               $cb_name = $p->{'cb_name'};
-               $status = call_by_name (@_);
+               $cb_name = $plugins{$plugin};
+               my $status = call_by_name (@_);
 
                if (! $status) {
                        my $err = undef;
@@ -230,23 +217,7 @@ sub plugin_call_all {
                }
 
                if ($status) {
-                       $p->{'wait_left'} = 0;
-                       $p->{'wait_time'} = $interval;
-               }
-               elsif (TYPE_READ == $type) {
-                       if ($p->{'wait_time'} < $interval) {
-                               $p->{'wait_time'} = $interval;
-                       }
-
-                       $p->{'wait_left'} = $p->{'wait_time'};
-                       $p->{'wait_time'} *= 2;
-
-                       if ($p->{'wait_time'} > 86400) {
-                               $p->{'wait_time'} = 86400;
-                       }
-
-                       WARNING ("${plugin}->read() failed with status $status. "
-                               . "Will suspend it for $p->{'wait_left'} seconds.");
+                       #NOOP
                }
                elsif (TYPE_INIT == $type) {
                        ERROR ("${plugin}->init() failed with status $status. "
@@ -309,21 +280,26 @@ sub plugin_register {
        }
        elsif ((TYPE_DATASET != $type) && (! ref $data)) {
                my $pkg = scalar caller;
-
-               my %p : shared;
-
                if ($data !~ m/^$pkg\:\:/) {
                        $data = $pkg . "::" . $data;
                }
-
-               %p = (
-                       wait_time => plugin_get_interval (),
-                       wait_left => 0,
-                       cb_name   => $data,
-               );
-
+               if (TYPE_READ == $type) {
+                       return plugin_register_read($name, $data);
+               }
+               if (TYPE_WRITE == $type) {
+                       return plugin_register_write($name, $data);
+               }
+               if (TYPE_LOG == $type) {
+                       return plugin_register_log($name, $data);
+               }
+               if (TYPE_NOTIF == $type) {
+                       return plugin_register_notification($name, $data);
+               }
+               if (TYPE_FLUSH == $type) {
+                       return plugin_register_flush($name, $data);
+               }
                lock %{$plugins[$type]};
-               $plugins[$type]->{$name} = \%p;
+               $plugins[$type]->{$name} = $data;
        }
        else {
                ERROR ("Collectd::plugin_register: Invalid data.");
@@ -351,6 +327,21 @@ sub plugin_unregister {
                lock %cf_callbacks;
                delete $cf_callbacks{$name};
        }
+       elsif (TYPE_READ == $type) {
+               return plugin_unregister_read ($name);
+       }
+       elsif (TYPE_WRITE == $type) {
+               return plugin_unregister_write($name);
+       }
+       elsif (TYPE_LOG == $type) {
+               return plugin_unregister_log ($name);
+       }
+       elsif (TYPE_NOTIF == $type) {
+               return plugin_unregister_notification($name);
+       }
+       elsif (TYPE_FLUSH == $type) {
+               return plugin_unregister_flush($name);
+       }
        elsif (defined $plugins[$type]) {
                lock %{$plugins[$type]};
                delete $plugins[$type]->{$name};
index 9eef6c1..9f7b869 100644 (file)
 /* this is defined in DynaLoader.a */
 void boot_DynaLoader (PerlInterpreter *, CV *);
 
+static XS (Collectd_plugin_register_read);
+static XS (Collectd_plugin_register_write);
+static XS (Collectd_plugin_register_log);
+static XS (Collectd_plugin_register_notification);
+static XS (Collectd_plugin_register_flush);
+static XS (Collectd_plugin_unregister_read);
+static XS (Collectd_plugin_unregister_write);
+static XS (Collectd_plugin_unregister_log);
+static XS (Collectd_plugin_unregister_notification);
+static XS (Collectd_plugin_unregister_flush);
 static XS (Collectd_plugin_register_ds);
 static XS (Collectd_plugin_unregister_ds);
 static XS (Collectd_plugin_dispatch_values);
@@ -113,6 +123,14 @@ static XS (Collectd_plugin_log);
 static XS (Collectd__fc_register);
 static XS (Collectd_call_by_name);
 
+static int perl_read (user_data_t *ud);
+static int perl_write (const data_set_t *ds, const value_list_t *vl,
+               user_data_t *user_data);
+static void perl_log (int level, const char *msg, user_data_t *user_data);
+static int perl_notify (const notification_t *notif, user_data_t *user_data);
+static int perl_flush (cdtime_t timeout, const char *identifier,
+               user_data_t *user_data);
+
 /*
  * private data types
  */
@@ -182,6 +200,18 @@ static struct {
        XS ((*f));
 } api[] =
 {
+       { "Collectd::plugin_register_read",       Collectd_plugin_register_read },
+       { "Collectd::plugin_register_write",      Collectd_plugin_register_write },
+       { "Collectd::plugin_register_log",        Collectd_plugin_register_log },
+       { "Collectd::plugin_register_notification",
+               Collectd_plugin_register_notification },
+       { "Collectd::plugin_register_flush",       Collectd_plugin_register_flush },
+       { "Collectd::plugin_unregister_read",     Collectd_plugin_unregister_read },
+       { "Collectd::plugin_unregister_write",    Collectd_plugin_unregister_write },
+       { "Collectd::plugin_unregister_log",      Collectd_plugin_unregister_log },
+       { "Collectd::plugin_unregister_notification",
+               Collectd_plugin_unregister_notification },
+       { "Collectd::plugin_unregister_flush",    Collectd_plugin_unregister_flush },
        { "Collectd::plugin_register_data_set",   Collectd_plugin_register_ds },
        { "Collectd::plugin_unregister_data_set", Collectd_plugin_unregister_ds },
        { "Collectd::plugin_dispatch_values",     Collectd_plugin_dispatch_values },
@@ -1010,7 +1040,7 @@ static int call_pv_locked (pTHX_ const char* sub_name)
                return 0;
        }
 
-       ret = call_pv (sub_name, G_SCALAR);
+       ret = call_pv (sub_name, G_SCALAR|G_EVAL);
 
        t->running = old_running;
        return ret;
@@ -1019,12 +1049,13 @@ static int call_pv_locked (pTHX_ const char* sub_name)
 /*
  * Call all working functions of the given type.
  */
-static int pplugin_call_all (pTHX_ int type, ...)
+static int pplugin_call (pTHX_ int type, ...)
 {
        int retvals = 0;
 
        va_list ap;
        int ret = 0;
+       char *subname;
 
        dSP;
 
@@ -1038,9 +1069,17 @@ static int pplugin_call_all (pTHX_ int type, ...)
 
        PUSHMARK (SP);
 
-       XPUSHs (sv_2mortal (newSViv ((IV)type)));
+       if (PLUGIN_READ == type) {
+               subname = va_arg(ap, char *);
+       }
+       else if (PLUGIN_WRITE == type) {
+               data_set_t   *ds;
+               value_list_t *vl;
 
-       if (PLUGIN_WRITE == type) {
+               AV *pds = newAV ();
+               HV *pvl = newHV ();
+
+               subname = va_arg(ap, char *);
                /*
                 * $_[0] = $plugin_type;
                 *
@@ -1066,12 +1105,6 @@ static int pplugin_call_all (pTHX_ int type, ...)
                 *   type_instance   => $type_instance
                 * };
                 */
-               data_set_t   *ds;
-               value_list_t *vl;
-
-               AV *pds = newAV ();
-               HV *pvl = newHV ();
-
                ds = va_arg (ap, data_set_t *);
                vl = va_arg (ap, value_list_t *);
 
@@ -1094,6 +1127,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
                XPUSHs (sv_2mortal (newRV_noinc ((SV *)pvl)));
        }
        else if (PLUGIN_LOG == type) {
+               subname = va_arg(ap, char *);
                /*
                 * $_[0] = $level;
                 *
@@ -1103,6 +1137,10 @@ static int pplugin_call_all (pTHX_ int type, ...)
                XPUSHs (sv_2mortal (newSVpv (va_arg (ap, char *), 0)));
        }
        else if (PLUGIN_NOTIF == type) {
+               notification_t *n;
+               HV *notif = newHV ();
+
+               subname = va_arg(ap, char *);
                /*
                 * $_[0] =
                 * {
@@ -1116,9 +1154,6 @@ static int pplugin_call_all (pTHX_ int type, ...)
                 *   type_instance   => $type_instance
                 * };
                 */
-               notification_t *n;
-               HV *notif = newHV ();
-
                n = va_arg (ap, notification_t *);
 
                if (-1 == notification2hv (aTHX_ n, notif)) {
@@ -1132,7 +1167,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
        }
        else if (PLUGIN_FLUSH == type) {
                cdtime_t timeout;
-
+               subname = va_arg(ap, char *);
                /*
                 * $_[0] = $timeout;
                 * $_[1] = $identifier;
@@ -1142,13 +1177,30 @@ static int pplugin_call_all (pTHX_ int type, ...)
                XPUSHs (sv_2mortal (newSVnv (CDTIME_T_TO_DOUBLE (timeout))));
                XPUSHs (sv_2mortal (newSVpv (va_arg (ap, char *), 0)));
        }
+       else if (PLUGIN_INIT == type) {
+               subname = "Collectd::plugin_call_all";
+               XPUSHs (sv_2mortal (newSViv ((IV)type)));
+       }
+       else if (PLUGIN_SHUTDOWN == type) {
+               subname = "Collectd::plugin_call_all";
+               XPUSHs (sv_2mortal (newSViv ((IV)type)));
+       }
+       else { /* Unknown type. Run 'plugin_call_all' and make compiler happy */
+               subname = "Collectd::plugin_call_all";
+               XPUSHs (sv_2mortal (newSViv ((IV)type)));
+       }
 
        PUTBACK;
 
-       retvals = call_pv_locked (aTHX_ "Collectd::plugin_call_all");
+       retvals = call_pv_locked (aTHX_ subname);
 
        SPAGAIN;
-       if (0 < retvals) {
+       if (SvTRUE(ERRSV)) {
+               if (PLUGIN_LOG != type)
+                       ERROR ("perl: %s error: %s", subname, SvPV_nolen(ERRSV));
+               ret = -1;
+       }
+       else if (0 < retvals) {
                SV *tmp = POPs;
                if (! SvTRUE (tmp))
                        ret = -1;
@@ -1160,7 +1212,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
 
        va_end (ap);
        return ret;
-} /* static int pplugin_call_all (int, ...) */
+} /* static int pplugin_call (int, ...) */
 
 /*
  * collectd's Perl interpreter based thread implementation.
@@ -1399,7 +1451,11 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
        }
 
        SPAGAIN;
-       if (0 < retvals) {
+       if (SvTRUE(ERRSV)) {
+               ERROR ("perl: Collectd::fc_call error: %s", SvPV_nolen(ERRSV));
+               ret = -1;
+       }
+       else if (0 < retvals) {
                SV *tmp = POPs;
 
                /* the exec callbacks return a status, while
@@ -1568,6 +1624,169 @@ static target_proc_t ptarget = {
  * Exported Perl API.
  */
 
+static void _plugin_register_generic_userdata (pTHX, int type, const char *desc)
+{
+       int ret   = 0;
+       user_data_t userdata;
+       char cb_name[DATA_MAX_NAME_LEN];
+
+       dXSARGS;
+
+       if (2 != items) {
+               log_err ("Usage: Collectd::plugin_register_%s(pluginname, subname)",
+                                       desc);
+               XSRETURN_EMPTY;
+       }
+
+       if (! SvOK (ST (0))) {
+               log_err ("Collectd::plugin_register_%s(pluginname, subname): "
+                        "Invalid pluginname", desc);
+               XSRETURN_EMPTY;
+       }
+       if (! SvOK (ST (1))) {
+               log_err ("Collectd::plugin_register_%s(pluginname, subname): "
+                        "Invalid subname", desc);
+               XSRETURN_EMPTY;
+       }
+
+       log_debug ("Collectd::plugin_register_%s: "
+                       "plugin = \"%s\", sub = \"%s\"",
+                       desc, SvPV_nolen (ST (0)), SvPV_nolen (ST (1)));
+
+       ssnprintf (cb_name, sizeof (cb_name), "perl/%s", SvPV_nolen (ST (0)));
+       memset(&userdata, 0, sizeof(userdata));
+       userdata.data = strdup(SvPV_nolen (ST (1)));
+       userdata.free_func = free;
+
+       if (PLUGIN_READ == type) {
+               ret = plugin_register_complex_read(
+                       "perl",                /* group */
+                       cb_name,
+                       perl_read,
+                       plugin_get_interval(), /* Default interval */
+                       &userdata);
+       }
+       else if (PLUGIN_WRITE == type) {
+               ret = plugin_register_write(cb_name, perl_write, &userdata);
+       }
+       else if (PLUGIN_LOG == type) {
+               ret = plugin_register_log(cb_name, perl_log, &userdata);
+       }
+       else if (PLUGIN_NOTIF == type) {
+               ret = plugin_register_notification(cb_name, perl_notify, &userdata);
+       }
+       else if (PLUGIN_FLUSH == type) {
+               ret = plugin_register_flush(cb_name, perl_flush, &userdata);
+       }
+       else {
+               ret = -1;
+       }
+
+       if (0 == ret)
+               XSRETURN_YES;
+       else {
+               free (userdata.data);
+               XSRETURN_EMPTY;
+       }
+} /* static void _plugin_register_generic_userdata ( ... ) */
+
+/*
+ * Collectd::plugin_register_TYPE (pluginname, subname).
+ *
+ * pluginname:
+ *   name of the perl plugin
+ *
+ * subname:
+ *   name of the plugin's subroutine that does the work
+ */
+
+static XS (Collectd_plugin_register_read) {
+       return _plugin_register_generic_userdata(aTHX, PLUGIN_READ, "read");
+}
+
+static XS (Collectd_plugin_register_write) {
+       return _plugin_register_generic_userdata(aTHX, PLUGIN_WRITE, "write");
+}
+
+static XS (Collectd_plugin_register_log) {
+       return _plugin_register_generic_userdata(aTHX, PLUGIN_LOG, "log");
+}
+
+static XS (Collectd_plugin_register_notification) {
+       return _plugin_register_generic_userdata(aTHX, PLUGIN_NOTIF, "notification");
+}
+
+static XS (Collectd_plugin_register_flush) {
+       return _plugin_register_generic_userdata(aTHX, PLUGIN_FLUSH, "flush");
+}
+
+typedef int perl_unregister_function_t(const char *name);
+
+static void _plugin_unregister_generic (pTHX,
+                               perl_unregister_function_t *unreg, const char *desc)
+{
+       char cb_name[DATA_MAX_NAME_LEN];
+
+       dXSARGS;
+
+       if (1 != items) {
+               log_err ("Usage: Collectd::plugin_unregister_%s(pluginname)", desc);
+               XSRETURN_EMPTY;
+       }
+
+       if (! SvOK (ST (0))) {
+               log_err ("Collectd::plugin_unregister_%s(pluginname): "
+                        "Invalid pluginname", desc);
+               XSRETURN_EMPTY;
+       }
+
+       log_debug ("Collectd::plugin_unregister_%s: plugin = \"%s\"",
+                       desc, SvPV_nolen (ST (0)));
+
+       ssnprintf (cb_name, sizeof (cb_name), "perl/%s", SvPV_nolen (ST (0)));
+
+       unreg(cb_name);
+
+       XSRETURN_EMPTY;
+
+       return;
+} /* static void _plugin_unregister_generic ( ... ) */
+
+/*
+ * Collectd::plugin_unregister_TYPE (pluginname).
+ *
+ * TYPE:
+ *   type of callback to be unregistered: read, write, log, notification, flush
+ *
+ * pluginname:
+ *   name of the perl plugin
+ */
+
+static XS (Collectd_plugin_unregister_read) {
+       return _plugin_unregister_generic(aTHX,
+                               plugin_unregister_read, "read");
+}
+
+static XS (Collectd_plugin_unregister_write) {
+       return _plugin_unregister_generic(aTHX,
+                               plugin_unregister_write, "write");
+}
+
+static XS (Collectd_plugin_unregister_log) {
+       return _plugin_unregister_generic(aTHX,
+                               plugin_unregister_log, "log");
+}
+
+static XS (Collectd_plugin_unregister_notification) {
+       return _plugin_unregister_generic(aTHX,
+                               plugin_unregister_notification, "notification");
+}
+
+static XS (Collectd_plugin_unregister_flush) {
+       return _plugin_unregister_generic(aTHX,
+                               plugin_unregister_flush, "flush");
+}
+
 /*
  * Collectd::plugin_register_data_set (type, dataset).
  *
@@ -1955,14 +2174,14 @@ static int perl_init (void)
        assert (aTHX == perl_threads->head->interp);
        pthread_mutex_lock (&perl_threads->mutex);
 
-       status = pplugin_call_all (aTHX_ PLUGIN_INIT);
+       status = pplugin_call (aTHX_ PLUGIN_INIT);
 
        pthread_mutex_unlock (&perl_threads->mutex);
 
        return status;
 } /* static int perl_init (void) */
 
-static int perl_read (void)
+static int perl_read (user_data_t *user_data)
 {
        dTHX;
 
@@ -1986,11 +2205,12 @@ static int perl_read (void)
 
        log_debug ("perl_read: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       return pplugin_call_all (aTHX_ PLUGIN_READ);
-} /* static int perl_read (void) */
+
+       return pplugin_call (aTHX_ PLUGIN_READ, user_data->data);
+} /* static int perl_read (user_data_t *user_data) */
 
 static int perl_write (const data_set_t *ds, const value_list_t *vl,
-               user_data_t __attribute__((unused)) *user_data)
+               user_data_t *user_data)
 {
        int status;
        dTHX;
@@ -2016,7 +2236,7 @@ static int perl_write (const data_set_t *ds, const value_list_t *vl,
 
        log_debug ("perl_write: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       status = pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl);
+       status = pplugin_call (aTHX_ PLUGIN_WRITE, user_data->data, ds, vl);
 
        if (aTHX == perl_threads->head->interp)
                pthread_mutex_unlock (&perl_threads->mutex);
@@ -2025,7 +2245,7 @@ static int perl_write (const data_set_t *ds, const value_list_t *vl,
 } /* static int perl_write (const data_set_t *, const value_list_t *) */
 
 static void perl_log (int level, const char *msg,
-               user_data_t __attribute__((unused)) *user_data)
+               user_data_t *user_data)
 {
        dTHX;
 
@@ -2050,7 +2270,7 @@ static void perl_log (int level, const char *msg,
        if (aTHX == perl_threads->head->interp)
                pthread_mutex_lock (&perl_threads->mutex);
 
-       pplugin_call_all (aTHX_ PLUGIN_LOG, level, msg);
+       pplugin_call (aTHX_ PLUGIN_LOG, user_data->data, level, msg);
 
        if (aTHX == perl_threads->head->interp)
                pthread_mutex_unlock (&perl_threads->mutex);
@@ -2058,8 +2278,7 @@ static void perl_log (int level, const char *msg,
        return;
 } /* static void perl_log (int, const char *) */
 
-static int perl_notify (const notification_t *notif,
-               user_data_t __attribute__((unused)) *user_data)
+static int perl_notify (const notification_t *notif, user_data_t *user_data)
 {
        dTHX;
 
@@ -2075,11 +2294,11 @@ static int perl_notify (const notification_t *notif,
 
                aTHX = t->interp;
        }
-       return pplugin_call_all (aTHX_ PLUGIN_NOTIF, notif);
+       return pplugin_call (aTHX_ PLUGIN_NOTIF, user_data->data, notif);
 } /* static int perl_notify (const notification_t *) */
 
 static int perl_flush (cdtime_t timeout, const char *identifier,
-               user_data_t __attribute__((unused)) *user_data)
+               user_data_t *user_data)
 {
        dTHX;
 
@@ -2095,7 +2314,7 @@ static int perl_flush (cdtime_t timeout, const char *identifier,
 
                aTHX = t->interp;
        }
-       return pplugin_call_all (aTHX_ PLUGIN_FLUSH, timeout, identifier);
+       return pplugin_call (aTHX_ PLUGIN_FLUSH, user_data->data, timeout, identifier);
 } /* static int perl_flush (const int) */
 
 static int perl_shutdown (void)
@@ -2106,6 +2325,7 @@ static int perl_shutdown (void)
        dTHX;
 
        plugin_unregister_complex_config ("perl");
+       plugin_unregister_read_group ("perl");
 
        if (NULL == perl_threads)
                return 0;
@@ -2121,14 +2341,9 @@ static int perl_shutdown (void)
        log_debug ("perl_shutdown: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
 
-       plugin_unregister_log ("perl");
-       plugin_unregister_notification ("perl");
        plugin_unregister_init ("perl");
-       plugin_unregister_read ("perl");
-       plugin_unregister_write ("perl");
-       plugin_unregister_flush ("perl");
 
-       ret = pplugin_call_all (aTHX_ PLUGIN_SHUTDOWN);
+       ret = pplugin_call (aTHX_ PLUGIN_SHUTDOWN);
 
        pthread_mutex_lock (&perl_threads->mutex);
        t = perl_threads->tail;
@@ -2153,6 +2368,10 @@ static int perl_shutdown (void)
                        pthread_kill (thr->pthread, SIGTERM);
                        ERROR ("perl shutdown: Thread hangs inside Perl. Thread killed.");
                }
+               /* Mark as running to avoid deadlock:
+                  c_ithread_destroy -> log_debug -> perl_log()
+               */
+               thr->running = 1; 
                c_ithread_destroy (thr);
        }
 
@@ -2338,15 +2557,7 @@ static int init_pi (int argc, char **argv)
 
        perl_run (aTHX);
 
-       plugin_register_log ("perl", perl_log, /* user_data = */ NULL);
-       plugin_register_notification ("perl", perl_notify,
-                       /* user_data = */ NULL);
        plugin_register_init ("perl", perl_init);
-
-       plugin_register_read ("perl", perl_read);
-
-       plugin_register_write ("perl", perl_write, /* user_data = */ NULL);
-       plugin_register_flush ("perl", perl_flush, /* user_data = */ NULL);
        plugin_register_shutdown ("perl", perl_shutdown);
        return 0;
 } /* static int init_pi (const char **, const int) */