collectd-tg: Fix sleep interval.
[collectd.git] / src / perl.c
index 27214bb..9eef6c1 100644 (file)
  * interface for collectd plugins written in perl.
  */
 
-/* do not automatically get the thread specific perl interpreter */
+/* do not automatically get the thread specific Perl interpreter */
 #define PERL_NO_GET_CONTEXT
 
 #define DONT_POISON_SPRINTF_YET 1
 #include "collectd.h"
-#undef DONT_POISON_SPRINTF_YET
 
-#include "configfile.h"
+#undef DONT_POISON_SPRINTF_YET
 
 #if HAVE_STDBOOL_H
 # include <stdbool.h>
@@ -63,8 +62,6 @@
 
 #include "filter_chain.h"
 
-#include <pthread.h>
-
 #if !defined(USE_ITHREADS)
 # error "Perl does not support ithreads!"
 #endif /* !defined(USE_ITHREADS) */
@@ -123,6 +120,9 @@ static XS (Collectd_call_by_name);
 typedef struct c_ithread_s {
        /* the thread's Perl interpreter */
        PerlInterpreter *interp;
+       _Bool running;  /* thread is inside Perl interpreter */
+       _Bool shutdown;
+       pthread_t pthread;
 
        /* double linked list of threads */
        struct c_ithread_s *prev;
@@ -139,6 +139,7 @@ typedef struct {
 #endif /* COLLECT_DEBUG */
 
        pthread_mutex_t mutex;
+       pthread_mutexattr_t mutexattr;
 } c_ithread_list_t;
 
 /* name / user_data for Perl matches / targets */
@@ -306,7 +307,6 @@ static int hv2data_source (pTHX_ HV *hash, data_source_t *ds)
 static size_t av2value (pTHX_ char *name, AV *array, value_t *value, size_t array_len)
 {
        const data_set_t *ds;
-       size_t i;
 
        if ((NULL == name) || (NULL == array) || (NULL == value) || (array_len == 0))
                return 0;
@@ -326,7 +326,7 @@ static size_t av2value (pTHX_ char *name, AV *array, value_t *value, size_t arra
                                name, array_len, ds->ds_num);
        }
 
-       for (i = 0; i < ds->ds_num; ++i) {
+       for (size_t i = 0; i < ds->ds_num; ++i) {
                SV **tmp = av_fetch (array, i, 0);
 
                if (NULL != tmp) {
@@ -425,7 +425,7 @@ static int hv2value_list (pTHX_ HV *hash, value_list_t *vl)
 
 static int av2data_set (pTHX_ AV *array, char *name, data_set_t *ds)
 {
-       int len, i;
+       int len;
 
        if ((NULL == array) || (NULL == name) || (NULL == ds))
                return -1;
@@ -440,7 +440,7 @@ static int av2data_set (pTHX_ AV *array, char *name, data_set_t *ds)
        ds->ds = smalloc ((len + 1) * sizeof (*ds->ds));
        ds->ds_num = len + 1;
 
-       for (i = 0; i <= len; ++i) {
+       for (int i = 0; i <= len; ++i) {
                SV **elem = av_fetch (array, i, 0);
 
                if (NULL == elem) {
@@ -484,9 +484,8 @@ static int av2notification_meta (pTHX_ AV *array, notification_meta_t **meta)
        notification_meta_t **m = meta;
 
        int len = av_len (array);
-       int i;
 
-       for (i = 0; i <= len; ++i) {
+       for (int i = 0; i <= len; ++i) {
                SV **tmp = av_fetch (array, i, 0);
                HV  *hash;
 
@@ -602,14 +601,12 @@ static int hv2notification (pTHX_ HV *hash, notification_t *n)
 
 static int data_set2av (pTHX_ data_set_t *ds, AV *array)
 {
-       size_t i;
-
        if ((NULL == ds) || (NULL == array))
                return -1;
 
        av_extend (array, ds->ds_num);
 
-       for (i = 0; i < ds->ds_num; ++i) {
+       for (size_t i = 0; i < ds->ds_num; ++i) {
                HV *source = newHV ();
 
                if (NULL == hv_store (source, "name", 4,
@@ -709,7 +706,6 @@ static int value_list2hv (pTHX_ value_list_t *vl, data_set_t *ds, HV *hash)
 static int notification_meta2av (pTHX_ notification_meta_t *meta, AV *array)
 {
        int meta_num = 0;
-       int i;
 
        while (meta) {
                ++meta_num;
@@ -718,7 +714,7 @@ static int notification_meta2av (pTHX_ notification_meta_t *meta, AV *array)
 
        av_extend (array, meta_num);
 
-       for (i = 0; NULL != meta; meta = meta->next, ++i) {
+       for (int i = 0; NULL != meta; meta = meta->next, ++i) {
                HV *m = newHV ();
                SV *value;
 
@@ -805,8 +801,6 @@ static int notification2hv (pTHX_ notification_t *n, HV *hash)
 
 static int oconfig_item2hv (pTHX_ oconfig_item_t *ci, HV *hash)
 {
-       int i;
-
        AV *values;
        AV *children;
 
@@ -823,7 +817,7 @@ static int oconfig_item2hv (pTHX_ oconfig_item_t *ci, HV *hash)
                return -1;
        }
 
-       for (i = 0; i < ci->values_num; ++i) {
+       for (int i = 0; i < ci->values_num; ++i) {
                SV *value;
 
                switch (ci->values[i].type) {
@@ -860,7 +854,7 @@ static int oconfig_item2hv (pTHX_ oconfig_item_t *ci, HV *hash)
                return -1;
        }
 
-       for (i = 0; i < ci->children_num; ++i) {
+       for (int i = 0; i < ci->children_num; ++i) {
                HV *child = newHV ();
 
                if (0 != oconfig_item2hv (aTHX_ ci->children + i, child)) {
@@ -981,15 +975,13 @@ static int pplugin_write (pTHX_ const char *plugin, AV *data_set, HV *values)
  */
 static int pplugin_dispatch_notification (pTHX_ HV *notif)
 {
-       notification_t n;
+       notification_t n = { 0 };
 
        int ret;
 
        if (NULL == notif)
                return -1;
 
-       memset (&n, 0, sizeof (n));
-
        if (0 != hv2notification (aTHX_ notif, &n))
                return -1;
 
@@ -999,6 +991,32 @@ static int pplugin_dispatch_notification (pTHX_ HV *notif)
 } /* static int pplugin_dispatch_notification (HV *) */
 
 /*
+ * Call perl sub with thread locking flags handled.
+ */
+static int call_pv_locked (pTHX_ const char* sub_name)
+{
+       _Bool old_running;
+       int ret;
+
+       c_ithread_t *t = (c_ithread_t *)pthread_getspecific(perl_thr_key);
+       if (t == NULL) /* thread destroyed */
+               return 0;
+
+       old_running = t->running;
+       t->running = 1;
+
+       if (t->shutdown) {
+               t->running = old_running;
+               return 0;
+       }
+
+       ret = call_pv (sub_name, G_SCALAR);
+
+       t->running = old_running;
+       return ret;
+} /* static int call_pv_locked (pTHX, *sub_name) */
+
+/*
  * Call all working functions of the given type.
  */
 static int pplugin_call_all (pTHX_ int type, ...)
@@ -1127,7 +1145,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
 
        PUTBACK;
 
-       retvals = call_pv ("Collectd::plugin_call_all", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ "Collectd::plugin_call_all");
 
        SPAGAIN;
        if (0 < retvals) {
@@ -1145,7 +1163,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
 } /* static int pplugin_call_all (int, ...) */
 
 /*
- * collectd's perl interpreter based thread implementation.
+ * collectd's Perl interpreter based thread implementation.
  *
  * This has been inspired by Perl's ithreads introduced in version 5.6.0.
  */
@@ -1248,6 +1266,9 @@ static c_ithread_t *c_ithread_create (PerlInterpreter *base)
                t->prev = perl_threads->tail;
        }
 
+       t->pthread = pthread_self();
+       t->running = 0;
+       t->shutdown = 0;
        perl_threads->tail = t;
 
        pthread_setspecific (perl_thr_key, (const void *)t);
@@ -1368,7 +1389,7 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
 
        PUTBACK;
 
-       retvals = call_pv ("Collectd::fc_call", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ "Collectd::fc_call");
 
        if ((FC_CB_EXEC == cb_type) && (meta != NULL)) {
                assert (pmeta != NULL);
@@ -1908,6 +1929,7 @@ static XS (Collectd_call_by_name)
 
 static int perl_init (void)
 {
+       int status;
        dTHX;
 
        if (NULL == perl_threads)
@@ -1925,7 +1947,19 @@ static int perl_init (void)
 
        log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       return pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+       /* Lock the base thread to avoid race conditions with c_ithread_create().
+        * See https://github.com/collectd/collectd/issues/9 and
+        *     https://github.com/collectd/collectd/issues/1706 for details.
+       */
+       assert (aTHX == perl_threads->head->interp);
+       pthread_mutex_lock (&perl_threads->mutex);
+
+       status = pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+       pthread_mutex_unlock (&perl_threads->mutex);
+
+       return status;
 } /* static int perl_init (void) */
 
 static int perl_read (void)
@@ -2010,7 +2044,9 @@ static void perl_log (int level, const char *msg,
 
        /* Lock the base thread if this is not called from one of the read threads
         * to avoid race conditions with c_ithread_create(). See
-        * https://github.com/collectd/collectd/issues/9 for details. */
+        * https://github.com/collectd/collectd/issues/9 for details.
+       */
+
        if (aTHX == perl_threads->head->interp)
                pthread_mutex_lock (&perl_threads->mutex);
 
@@ -2064,9 +2100,8 @@ static int perl_flush (cdtime_t timeout, const char *identifier,
 
 static int perl_shutdown (void)
 {
-       c_ithread_t *t = NULL;
-
-       int ret = 0;
+       c_ithread_t *t;
+       int ret;
 
        dTHX;
 
@@ -2076,8 +2111,6 @@ static int perl_shutdown (void)
                return 0;
 
        if (NULL == aTHX) {
-               t = NULL;
-
                pthread_mutex_lock (&perl_threads->mutex);
                t = c_ithread_create (perl_threads->head->interp);
                pthread_mutex_unlock (&perl_threads->mutex);
@@ -2101,17 +2134,31 @@ static int perl_shutdown (void)
        t = perl_threads->tail;
 
        while (NULL != t) {
+               struct timespec ts_wait;
                c_ithread_t *thr = t;
 
                /* the pointer has to be advanced before destroying
                 * the thread as this will free the memory */
                t = t->prev;
 
+               thr->shutdown = 1;
+               if (thr->running) {
+                       /* Give some time to thread to exit from Perl interpreter */
+                       WARNING ("perl shutdown: Thread is running inside Perl. Waiting.");
+                       ts_wait.tv_sec = 0;
+                       ts_wait.tv_nsec = 500000;
+                       nanosleep (&ts_wait, NULL);
+               }
+               if (thr->running) {
+                       pthread_kill (thr->pthread, SIGTERM);
+                       ERROR ("perl shutdown: Thread hangs inside Perl. Thread killed.");
+               }
                c_ithread_destroy (thr);
        }
 
        pthread_mutex_unlock (&perl_threads->mutex);
        pthread_mutex_destroy (&perl_threads->mutex);
+       pthread_mutexattr_destroy (&perl_threads->mutexattr);
 
        sfree (perl_threads);
 
@@ -2183,21 +2230,19 @@ static void xs_init (pTHX)
        SV   *tmp   = NULL;
        char *file  = __FILE__;
 
-       int i = 0;
-
        dXSUB_SYS;
 
        /* enable usage of Perl modules using shared libraries */
        newXS ("DynaLoader::boot_DynaLoader", boot_DynaLoader, file);
 
        /* register API */
-       for (i = 0; NULL != api[i].f; ++i)
+       for (int i = 0; NULL != api[i].f; ++i)
                newXS (api[i].name, api[i].f, file);
 
        stash = gv_stashpv ("Collectd", 1);
 
        /* export "constants" */
-       for (i = 0; '\0' != constants[i].name[0]; ++i)
+       for (int i = 0; '\0' != constants[i].name[0]; ++i)
                newCONSTSUB (stash, constants[i].name, newSViv (constants[i].value));
 
        /* export global variables
@@ -2206,7 +2251,7 @@ static void xs_init (pTHX)
         * accessing any such variable (this is basically the same as using
         * tie() in Perl) */
        /* global strings */
-       for (i = 0; '\0' != g_strings[i].name[0]; ++i) {
+       for (int i = 0; '\0' != g_strings[i].name[0]; ++i) {
                tmp = get_sv (g_strings[i].name, 1);
                sv_magicext (tmp, NULL, PERL_MAGIC_ext, &g_pv_vtbl,
                                g_strings[i].var, 0);
@@ -2231,9 +2276,7 @@ static int init_pi (int argc, char **argv)
        log_info ("Initializing Perl interpreter...");
 #if COLLECT_DEBUG
        {
-               int i = 0;
-
-               for (i = 0; i < argc; ++i)
+               for (int i = 0; i < argc; ++i)
                        log_debug ("argv[%i] = \"%s\"", i, argv[i]);
        }
 #endif /* COLLECT_DEBUG */
@@ -2255,7 +2298,9 @@ static int init_pi (int argc, char **argv)
        perl_threads = smalloc (sizeof (*perl_threads));
        memset (perl_threads, 0, sizeof (c_ithread_list_t));
 
-       pthread_mutex_init (&perl_threads->mutex, NULL);
+       pthread_mutexattr_init(&perl_threads->mutexattr);
+       pthread_mutexattr_settype(&perl_threads->mutexattr, PTHREAD_MUTEX_RECURSIVE);
+       pthread_mutex_init (&perl_threads->mutex, &perl_threads->mutexattr);
        /* locking the mutex should not be necessary at this point
         * but let's just do it for the sake of completeness */
        pthread_mutex_lock (&perl_threads->mutex);
@@ -2336,7 +2381,7 @@ static int perl_config_loadplugin (pTHX_ oconfig_item_t *ci)
 
        aTHX = perl_threads->head->interp;
 
-       log_debug ("perl_config: loading perl plugin \"%s\"", value);
+       log_debug ("perl_config: Loading Perl plugin \"%s\"", value);
        load_module (PERL_LOADMOD_NOIMPORT,
                        newSVpv (module_name, strlen (module_name)), Nullsv);
        return 0;
@@ -2500,11 +2545,10 @@ static int perl_config_plugin (pTHX_ oconfig_item_t *ci)
 static int perl_config (oconfig_item_t *ci)
 {
        int status = 0;
-       int i = 0;
 
        dTHXa (NULL);
 
-       for (i = 0; i < ci->children_num; ++i) {
+       for (int i = 0; i < ci->children_num; ++i) {
                oconfig_item_t *c = ci->children + i;
                int current_status = 0;