Merged branch 'collectd-4.10' into collectd-5.4.
[collectd.git] / src / perl.c
index f8a4822..d0c7e37 100644 (file)
@@ -24,7 +24,7 @@
  * interface for collectd plugins written in perl.
  */
 
-/* do not automatically get the thread specific perl interpreter */
+/* do not automatically get the thread specific Perl interpreter */
 #define PERL_NO_GET_CONTEXT
 
 #define DONT_POISON_SPRINTF_YET 1
@@ -102,6 +102,7 @@ void boot_DynaLoader (PerlInterpreter *, CV *);
 static XS (Collectd_plugin_register_ds);
 static XS (Collectd_plugin_unregister_ds);
 static XS (Collectd_plugin_dispatch_values);
+static XS (Collectd_plugin_get_interval);
 static XS (Collectd__plugin_write);
 static XS (Collectd__plugin_flush);
 static XS (Collectd_plugin_dispatch_notification);
@@ -116,6 +117,9 @@ static XS (Collectd_call_by_name);
 typedef struct c_ithread_s {
        /* the thread's Perl interpreter */
        PerlInterpreter *interp;
+       _Bool running;  /* thread is inside Perl interpreter */
+       _Bool shutdown;
+       pthread_t pthread;
 
        /* double linked list of threads */
        struct c_ithread_s *prev;
@@ -132,6 +136,7 @@ typedef struct {
 #endif /* COLLECT_DEBUG */
 
        pthread_mutex_t mutex;
+       pthread_mutexattr_t mutexattr;
 } c_ithread_list_t;
 
 /* name / user_data for Perl matches / targets */
@@ -177,6 +182,7 @@ static struct {
        { "Collectd::plugin_register_data_set",   Collectd_plugin_register_ds },
        { "Collectd::plugin_unregister_data_set", Collectd_plugin_unregister_ds },
        { "Collectd::plugin_dispatch_values",     Collectd_plugin_dispatch_values },
+       { "Collectd::plugin_get_interval",        Collectd_plugin_get_interval },
        { "Collectd::_plugin_write",              Collectd__plugin_write },
        { "Collectd::_plugin_flush",              Collectd__plugin_flush },
        { "Collectd::plugin_dispatch_notification",
@@ -508,7 +514,6 @@ static int av2notification_meta (pTHX_ AV *array, notification_meta_t **meta)
                if (NULL == (tmp = hv_fetch (hash, "value", 5, 0))) {
                        log_warn ("av2notification_meta: Skipping invalid "
                                        "meta information.");
-                       free ((*m)->name);
                        free (*m);
                        continue;
                }
@@ -1001,6 +1006,32 @@ static int pplugin_dispatch_notification (pTHX_ HV *notif)
 } /* static int pplugin_dispatch_notification (HV *) */
 
 /*
+ * Call perl sub with thread locking flags handled.
+ */
+static int call_pv_locked (pTHX_ const char* sub_name)
+{
+       _Bool old_running;
+       int ret;
+
+       c_ithread_t *t = (c_ithread_t *)pthread_getspecific(perl_thr_key);
+       if (t == NULL) /* thread destroyed */
+               return 0;
+
+       old_running = t->running;
+       t->running = 1;
+
+       if (t->shutdown) {
+               t->running = old_running;
+               return 0;
+       }
+
+       ret = call_pv (sub_name, G_SCALAR);
+
+       t->running = old_running;
+       return ret;
+} /* static int call_pv_locked (pTHX, *sub_name) */
+
+/*
  * Call all working functions of the given type.
  */
 static int pplugin_call_all (pTHX_ int type, ...)
@@ -1129,7 +1160,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
 
        PUTBACK;
 
-       retvals = call_pv ("Collectd::plugin_call_all", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ "Collectd::plugin_call_all");
 
        SPAGAIN;
        if (0 < retvals) {
@@ -1147,7 +1178,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
 } /* static int pplugin_call_all (int, ...) */
 
 /*
- * collectd's perl interpreter based thread implementation.
+ * collectd's Perl interpreter based thread implementation.
  *
  * This has been inspired by Perl's ithreads introduced in version 5.6.0.
  */
@@ -1201,7 +1232,10 @@ static void c_ithread_destructor (void *arg)
 
        /* the ithread no longer exists */
        if (NULL == t)
+       {
+               pthread_mutex_unlock (&perl_threads->mutex);
                return;
+       }
 
        c_ithread_destroy (ithread);
 
@@ -1247,6 +1281,9 @@ static c_ithread_t *c_ithread_create (PerlInterpreter *base)
                t->prev = perl_threads->tail;
        }
 
+       t->pthread = pthread_self();
+       t->running = 0;
+       t->shutdown = 0;
        perl_threads->tail = t;
 
        pthread_setspecific (perl_thr_key, (const void *)t);
@@ -1367,7 +1404,7 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
 
        PUTBACK;
 
-       retvals = call_pv ("Collectd::fc_call", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ "Collectd::fc_call");
 
        if ((FC_CB_EXEC == cb_type) && (meta != NULL)) {
                assert (pmeta != NULL);
@@ -1642,15 +1679,15 @@ static XS (Collectd_plugin_dispatch_values)
 
        values = ST (/* stack index = */ 0);
 
+       if (NULL == values)
+               XSRETURN_EMPTY;
+
        /* Make sure the argument is a hash reference. */
        if (! (SvROK (values) && (SVt_PVHV == SvTYPE (SvRV (values))))) {
                log_err ("Collectd::plugin_dispatch_values: Invalid values.");
                XSRETURN_EMPTY;
        }
 
-       if (NULL == values)
-               XSRETURN_EMPTY;
-
        ret = pplugin_dispatch_values (aTHX_ (HV *)SvRV (values));
 
        if (0 == ret)
@@ -1659,6 +1696,21 @@ static XS (Collectd_plugin_dispatch_values)
                XSRETURN_EMPTY;
 } /* static XS (Collectd_plugin_dispatch_values) */
 
+/*
+ * Collectd::plugin_get_interval ().
+ */
+static XS (Collectd_plugin_get_interval)
+{
+       dXSARGS;
+
+       /* make sure we don't get any unused variable warnings for 'items';
+        * don't abort, though */
+       if (items)
+               log_err ("Usage: Collectd::plugin_get_interval()");
+
+       XSRETURN_NV ((NV) CDTIME_T_TO_DOUBLE (plugin_get_interval ()));
+} /* static XS (Collectd_plugin_get_interval) */
+
 /* Collectd::plugin_write (plugin, ds, vl).
  *
  * plugin:
@@ -1892,6 +1944,7 @@ static XS (Collectd_call_by_name)
 
 static int perl_init (void)
 {
+       int status;
        dTHX;
 
        if (NULL == perl_threads)
@@ -1909,7 +1962,19 @@ static int perl_init (void)
 
        log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       return pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+       /* Lock the base thread to avoid race conditions with c_ithread_create().
+        * See https://github.com/collectd/collectd/issues/9 and
+        *     https://github.com/collectd/collectd/issues/1706 for details.
+       */
+       assert (aTHX == perl_threads->head->interp);
+       pthread_mutex_lock (&perl_threads->mutex);
+
+       status = pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+       pthread_mutex_unlock (&perl_threads->mutex);
+
+       return status;
 } /* static int perl_init (void) */
 
 static int perl_read (void)
@@ -1929,6 +1994,11 @@ static int perl_read (void)
                aTHX = t->interp;
        }
 
+       /* Assert that we're not running as the base thread. Otherwise, we might
+        * run into concurrency issues with c_ithread_create(). See
+        * https://github.com/collectd/collectd/issues/9 for details. */
+       assert (aTHX != perl_threads->head->interp);
+
        log_debug ("perl_read: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
        return pplugin_call_all (aTHX_ PLUGIN_READ);
@@ -1937,6 +2007,7 @@ static int perl_read (void)
 static int perl_write (const data_set_t *ds, const value_list_t *vl,
                user_data_t __attribute__((unused)) *user_data)
 {
+       int status;
        dTHX;
 
        if (NULL == perl_threads)
@@ -1952,9 +2023,20 @@ static int perl_write (const data_set_t *ds, const value_list_t *vl,
                aTHX = t->interp;
        }
 
+       /* Lock the base thread if this is not called from one of the read threads
+        * to avoid race conditions with c_ithread_create(). See
+        * https://github.com/collectd/collectd/issues/9 for details. */
+       if (aTHX == perl_threads->head->interp)
+               pthread_mutex_lock (&perl_threads->mutex);
+
        log_debug ("perl_write: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       return pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl);
+       status = pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl);
+
+       if (aTHX == perl_threads->head->interp)
+               pthread_mutex_unlock (&perl_threads->mutex);
+
+       return status;
 } /* static int perl_write (const data_set_t *, const value_list_t *) */
 
 static void perl_log (int level, const char *msg,
@@ -1975,7 +2057,19 @@ static void perl_log (int level, const char *msg,
                aTHX = t->interp;
        }
 
+       /* Lock the base thread if this is not called from one of the read threads
+        * to avoid race conditions with c_ithread_create(). See
+        * https://github.com/collectd/collectd/issues/9 for details.
+       */
+
+       if (aTHX == perl_threads->head->interp)
+               pthread_mutex_lock (&perl_threads->mutex);
+
        pplugin_call_all (aTHX_ PLUGIN_LOG, level, msg);
+
+       if (aTHX == perl_threads->head->interp)
+               pthread_mutex_unlock (&perl_threads->mutex);
+
        return;
 } /* static void perl_log (int, const char *) */
 
@@ -2033,7 +2127,7 @@ static int perl_shutdown (void)
                return 0;
 
        if (NULL == aTHX) {
-               c_ithread_t *t = NULL;
+               t = NULL;
 
                pthread_mutex_lock (&perl_threads->mutex);
                t = c_ithread_create (perl_threads->head->interp);
@@ -2058,17 +2152,31 @@ static int perl_shutdown (void)
        t = perl_threads->tail;
 
        while (NULL != t) {
+               struct timespec ts_wait;
                c_ithread_t *thr = t;
 
                /* the pointer has to be advanced before destroying
                 * the thread as this will free the memory */
                t = t->prev;
 
+               thr->shutdown = 1;
+               if (thr->running) {
+                       /* Give some time to thread to exit from Perl interpreter */
+                       WARNING ("perl shutdown: Thread is running inside Perl. Waiting.");
+                       ts_wait.tv_sec = 0;
+                       ts_wait.tv_nsec = 500000;
+                       nanosleep (&ts_wait, NULL);
+               }
+               if (thr->running) {
+                       pthread_kill (thr->pthread, SIGTERM);
+                       ERROR ("perl shutdown: Thread hangs inside Perl. Thread killed.");
+               }
                c_ithread_destroy (thr);
        }
 
        pthread_mutex_unlock (&perl_threads->mutex);
        pthread_mutex_destroy (&perl_threads->mutex);
+       pthread_mutexattr_destroy (&perl_threads->mutexattr);
 
        sfree (perl_threads);
 
@@ -2103,23 +2211,20 @@ static int g_pv_set (pTHX_ SV *var, MAGIC *mg)
 
 static int g_interval_get (pTHX_ SV *var, MAGIC *mg)
 {
-       cdtime_t *interval = (cdtime_t *)mg->mg_ptr;
-       double nv;
-
-       nv = CDTIME_T_TO_DOUBLE (*interval);
-
-       sv_setnv (var, nv);
+       log_warn ("Accessing $interval_g is deprecated (and might not "
+                       "give the desired results) - plugin_get_interval() should "
+                       "be used instead.");
+       sv_setnv (var, CDTIME_T_TO_DOUBLE (interval_g));
        return 0;
 } /* static int g_interval_get (pTHX_ SV *, MAGIC *) */
 
 static int g_interval_set (pTHX_ SV *var, MAGIC *mg)
 {
-       cdtime_t *interval = (cdtime_t *)mg->mg_ptr;
-       double nv;
-
-       nv = (double)SvNV (var);
-
-       *interval = DOUBLE_TO_CDTIME_T (nv);
+       double nv = (double)SvNV (var);
+       log_warn ("Accessing $interval_g is deprecated (and might not "
+                       "give the desired results) - plugin_get_interval() should "
+                       "be used instead.");
+       interval_g = DOUBLE_TO_CDTIME_T (nv);
        return 0;
 } /* static int g_interval_set (pTHX_ SV *, MAGIC *) */
 
@@ -2175,7 +2280,7 @@ static void xs_init (pTHX)
        tmp = get_sv ("Collectd::interval_g", /* create = */ 1);
        sv_magicext (tmp, NULL, /* how = */ PERL_MAGIC_ext,
                        /* vtbl = */ &g_interval_vtbl,
-                       /* name = */ (char *) &interval_g, /* namelen = */ 0);
+                       /* name = */ NULL, /* namelen = */ 0);
 
        return;
 } /* static void xs_init (pTHX) */
@@ -2215,7 +2320,9 @@ static int init_pi (int argc, char **argv)
        perl_threads = (c_ithread_list_t *)smalloc (sizeof (c_ithread_list_t));
        memset (perl_threads, 0, sizeof (c_ithread_list_t));
 
-       pthread_mutex_init (&perl_threads->mutex, NULL);
+       pthread_mutexattr_init(&perl_threads->mutexattr);
+       pthread_mutexattr_settype(&perl_threads->mutexattr, PTHREAD_MUTEX_RECURSIVE);
+       pthread_mutex_init (&perl_threads->mutex, &perl_threads->mutexattr);
        /* locking the mutex should not be necessary at this point
         * but let's just do it for the sake of completeness */
        pthread_mutex_lock (&perl_threads->mutex);
@@ -2296,7 +2403,7 @@ static int perl_config_loadplugin (pTHX_ oconfig_item_t *ci)
 
        aTHX = perl_threads->head->interp;
 
-       log_debug ("perl_config: loading perl plugin \"%s\"", value);
+       log_debug ("perl_config: Loading Perl plugin \"%s\"", value);
        load_module (PERL_LOADMOD_NOIMPORT,
                        newSVpv (module_name, strlen (module_name)), Nullsv);
        return 0;
@@ -2469,7 +2576,10 @@ static int perl_config (oconfig_item_t *ci)
                int current_status = 0;
 
                if (NULL != perl_threads)
-                       aTHX = PERL_GET_CONTEXT;
+               {
+                       if ((aTHX = PERL_GET_CONTEXT) == NULL)
+                               return -1;
+               }
 
                if (0 == strcasecmp (c->key, "LoadPlugin"))
                        current_status = perl_config_loadplugin (aTHX_ c);