perl plugin: Fixes for #1706
[collectd.git] / src / perl.c
index 78e508a..c5b9d2a 100644 (file)
@@ -117,6 +117,9 @@ static XS (Collectd_call_by_name);
 typedef struct c_ithread_s {
        /* the thread's Perl interpreter */
        PerlInterpreter *interp;
+       _Bool running;  /* thread is inside pi */
+       _Bool shutdown;
+       pthread_t pthread;
 
        /* double linked list of threads */
        struct c_ithread_s *prev;
@@ -510,7 +513,6 @@ static int av2notification_meta (pTHX_ AV *array, notification_meta_t **meta)
                if (NULL == (tmp = hv_fetch (hash, "value", 5, 0))) {
                        log_warn ("av2notification_meta: Skipping invalid "
                                        "meta information.");
-                       free ((*m)->name);
                        free (*m);
                        continue;
                }
@@ -1009,11 +1011,24 @@ static int pplugin_call_all (pTHX_ int type, ...)
 {
        int retvals = 0;
 
+       _Bool old_running;
        va_list ap;
        int ret = 0;
 
        dSP;
 
+       c_ithread_t *t = (c_ithread_t *)pthread_getspecific(perl_thr_key);
+       if (t == NULL) /* thread destroyed ( c_ithread_destroy*() -> log_debug() ) */
+               return 0;
+
+       old_running = t->running;
+       t->running = 1;
+       
+       if (t->shutdown) {
+               t->running = old_running;
+               return 0;
+       }
+
        if ((type < 0) || (type >= PLUGIN_TYPES))
                return -1;
 
@@ -1144,6 +1159,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
        FREETMPS;
        LEAVE;
 
+       t->running = old_running;
        va_end (ap);
        return ret;
 } /* static int pplugin_call_all (int, ...) */
@@ -1203,7 +1219,10 @@ static void c_ithread_destructor (void *arg)
 
        /* the ithread no longer exists */
        if (NULL == t)
+       {
+               pthread_mutex_unlock (&perl_threads->mutex);
                return;
+       }
 
        c_ithread_destroy (ithread);
 
@@ -1249,6 +1268,9 @@ static c_ithread_t *c_ithread_create (PerlInterpreter *base)
                t->prev = perl_threads->tail;
        }
 
+       t->pthread = pthread_self();
+       t->running = 0;
+       t->shutdown = 0;
        perl_threads->tail = t;
 
        pthread_setspecific (perl_thr_key, (const void *)t);
@@ -1263,6 +1285,7 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
 {
        int retvals = 0;
 
+       _Bool old_running;
        va_list ap;
        int ret = 0;
 
@@ -1271,6 +1294,18 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
 
        dSP;
 
+       c_ithread_t *t = (c_ithread_t *)pthread_getspecific(perl_thr_key);
+       if (t == NULL) /* thread destroyed */
+               return 0;
+
+       old_running = t->running;
+       t->running = 1;
+
+       if (t->shutdown) {
+               t->running = old_running;
+               return 0;
+       }
+
        if ((type < 0) || (type >= FC_TYPES))
                return -1;
 
@@ -1394,6 +1429,7 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
        FREETMPS;
        LEAVE;
 
+       t->running = old_running;
        va_end (ap);
        return ret;
 } /* static int fc_call (int, int, pfc_user_data_t *, ...) */
@@ -1644,15 +1680,15 @@ static XS (Collectd_plugin_dispatch_values)
 
        values = ST (/* stack index = */ 0);
 
+       if (NULL == values)
+               XSRETURN_EMPTY;
+
        /* Make sure the argument is a hash reference. */
        if (! (SvROK (values) && (SVt_PVHV == SvTYPE (SvRV (values))))) {
                log_err ("Collectd::plugin_dispatch_values: Invalid values.");
                XSRETURN_EMPTY;
        }
 
-       if (NULL == values)
-               XSRETURN_EMPTY;
-
        ret = pplugin_dispatch_values (aTHX_ (HV *)SvRV (values));
 
        if (0 == ret)
@@ -1909,6 +1945,7 @@ static XS (Collectd_call_by_name)
 
 static int perl_init (void)
 {
+       int status;
        dTHX;
 
        if (NULL == perl_threads)
@@ -1926,7 +1963,20 @@ static int perl_init (void)
 
        log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       return pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+       /* Lock the base thread to avoid race conditions with c_ithread_create().
+        * See https://github.com/collectd/collectd/issues/9 and
+        *     https://github.com/collectd/collectd/issues/1706 for details.
+        * Locking here requires additional check in perl_log() to avoid deadlock.
+       */
+       assert (aTHX == perl_threads->head->interp);
+       pthread_mutex_lock (&perl_threads->mutex);
+
+       status = pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+       pthread_mutex_unlock (&perl_threads->mutex);
+
+       return status;
 } /* static int perl_init (void) */
 
 static int perl_read (void)
@@ -1995,6 +2045,7 @@ static void perl_log (int level, const char *msg,
                user_data_t __attribute__((unused)) *user_data)
 {
        dTHX;
+       int locked = 0;
 
        if (NULL == perl_threads)
                return;
@@ -2011,13 +2062,19 @@ static void perl_log (int level, const char *msg,
 
        /* Lock the base thread if this is not called from one of the read threads
         * to avoid race conditions with c_ithread_create(). See
-        * https://github.com/collectd/collectd/issues/9 for details. */
-       if (aTHX == perl_threads->head->interp)
+        * https://github.com/collectd/collectd/issues/9 for details.
+        * Additionally check, if we are called from perl interpreter.
+        * Maybe PTHREAD_MUTEX_RECURSIVE mutex type will be more appropriate?
+       */
+
+       if (aTHX == perl_threads->head->interp && !perl_threads->head->running) {
                pthread_mutex_lock (&perl_threads->mutex);
+               locked = 1;
+       }
 
        pplugin_call_all (aTHX_ PLUGIN_LOG, level, msg);
 
-       if (aTHX == perl_threads->head->interp)
+       if (locked)
                pthread_mutex_unlock (&perl_threads->mutex);
 
        return;
@@ -2077,7 +2134,7 @@ static int perl_shutdown (void)
                return 0;
 
        if (NULL == aTHX) {
-               c_ithread_t *t = NULL;
+               t = NULL;
 
                pthread_mutex_lock (&perl_threads->mutex);
                t = c_ithread_create (perl_threads->head->interp);
@@ -2102,12 +2159,30 @@ static int perl_shutdown (void)
        t = perl_threads->tail;
 
        while (NULL != t) {
+               struct timespec ts_wait;
                c_ithread_t *thr = t;
 
                /* the pointer has to be advanced before destroying
                 * the thread as this will free the memory */
                t = t->prev;
 
+               thr->shutdown = 1;
+               if (thr->running) {
+                       /* Give some time to thread to exit from pi */
+                       WARNING ("perl shutdown: thread is running inside perl. Waiting.");
+                       ts_wait.tv_sec = 0;
+                       ts_wait.tv_nsec = 500000;
+                       nanosleep (&ts_wait, NULL);
+               }
+               if (thr->running) {
+                       /* This will crash collectd process later due to PERL_SYS_TERM() */
+                       //ERROR ("perl shutdown: thread hangs inside perl. "
+                       //       "Skipped perl interpreter destroy.");
+                       //continue;
+                       
+                       ERROR ("perl shutdown: thread hangs inside perl. Thread killed.");
+                       pthread_kill (thr->pthread, SIGTERM);
+               }
                c_ithread_destroy (thr);
        }
 
@@ -2510,7 +2585,10 @@ static int perl_config (oconfig_item_t *ci)
                int current_status = 0;
 
                if (NULL != perl_threads)
-                       aTHX = PERL_GET_CONTEXT;
+               {
+                       if ((aTHX = PERL_GET_CONTEXT) == NULL)
+                               return -1;
+               }
 
                if (0 == strcasecmp (c->key, "LoadPlugin"))
                        current_status = perl_config_loadplugin (aTHX_ c);