src/Makefile: Don't unnecessarily set plugin specific CXXFLAGS.
[collectd.git] / src / perl.c
index 0c5ee0c..f90761a 100644 (file)
@@ -123,6 +123,9 @@ static XS (Collectd_call_by_name);
 typedef struct c_ithread_s {
        /* the thread's Perl interpreter */
        PerlInterpreter *interp;
+       _Bool running;  /* thread is inside pi */
+       _Bool shutdown;
+       pthread_t pthread;
 
        /* double linked list of threads */
        struct c_ithread_s *prev;
@@ -139,6 +142,7 @@ typedef struct {
 #endif /* COLLECT_DEBUG */
 
        pthread_mutex_t mutex;
+       pthread_mutexattr_t mutexattr;
 } c_ithread_list_t;
 
 /* name / user_data for Perl matches / targets */
@@ -437,7 +441,7 @@ static int av2data_set (pTHX_ AV *array, char *name, data_set_t *ds)
                return -1;
        }
 
-       ds->ds = (data_source_t *)smalloc ((len + 1) * sizeof (data_source_t));
+       ds->ds = smalloc ((len + 1) * sizeof (*ds->ds));
        ds->ds_num = len + 1;
 
        for (i = 0; i <= len; ++i) {
@@ -501,7 +505,7 @@ static int av2notification_meta (pTHX_ AV *array, notification_meta_t **meta)
 
                hash = (HV *)SvRV (*tmp);
 
-               *m = (notification_meta_t *)smalloc (sizeof (**m));
+               *m = smalloc (sizeof (**m));
 
                if (NULL == (tmp = hv_fetch (hash, "name", 4, 0))) {
                        log_warn ("av2notification_meta: Skipping invalid "
@@ -999,6 +1003,32 @@ static int pplugin_dispatch_notification (pTHX_ HV *notif)
 } /* static int pplugin_dispatch_notification (HV *) */
 
 /*
+ * Call perl sub with thread locking flags handled.
+ */
+static int call_pv_locked (pTHX_ const char* sub_name)
+{
+       _Bool old_running;
+       int ret;
+
+       c_ithread_t *t = (c_ithread_t *)pthread_getspecific(perl_thr_key);
+       if (t == NULL) /* thread destroyed */
+               return 0;
+
+       old_running = t->running;
+       t->running = 1;
+
+       if (t->shutdown) {
+               t->running = old_running;
+               return 0;
+       }
+
+       ret = call_pv (sub_name, G_SCALAR);
+
+       t->running = old_running;
+       return ret;
+} /* static int call_pv_locked (pTHX, *sub_name) */
+
+/*
  * Call all working functions of the given type.
  */
 static int pplugin_call_all (pTHX_ int type, ...)
@@ -1127,7 +1157,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
 
        PUTBACK;
 
-       retvals = call_pv ("Collectd::plugin_call_all", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ "Collectd::plugin_call_all");
 
        SPAGAIN;
        if (0 < retvals) {
@@ -1218,7 +1248,7 @@ static c_ithread_t *c_ithread_create (PerlInterpreter *base)
 
        assert (NULL != perl_threads);
 
-       t = (c_ithread_t *)smalloc (sizeof (c_ithread_t));
+       t = smalloc (sizeof (*t));
        memset (t, 0, sizeof (c_ithread_t));
 
        t->interp = (NULL == base)
@@ -1248,6 +1278,9 @@ static c_ithread_t *c_ithread_create (PerlInterpreter *base)
                t->prev = perl_threads->tail;
        }
 
+       t->pthread = pthread_self();
+       t->running = 0;
+       t->shutdown = 0;
        perl_threads->tail = t;
 
        pthread_setspecific (perl_thr_key, (const void *)t);
@@ -1368,7 +1401,7 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
 
        PUTBACK;
 
-       retvals = call_pv ("Collectd::fc_call", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ "Collectd::fc_call");
 
        if ((FC_CB_EXEC == cb_type) && (meta != NULL)) {
                assert (pmeta != NULL);
@@ -1428,7 +1461,7 @@ static int fc_create (int type, const oconfig_item_t *ci, void **user_data)
                return -1;
        }
 
-       data = (pfc_user_data_t *)smalloc (sizeof (*data));
+       data = smalloc (sizeof (*data));
        data->name      = sstrdup (ci->values[0].value.string);
        data->user_data = newSV (0);
 
@@ -1908,6 +1941,7 @@ static XS (Collectd_call_by_name)
 
 static int perl_init (void)
 {
+       int status;
        dTHX;
 
        if (NULL == perl_threads)
@@ -1925,7 +1959,19 @@ static int perl_init (void)
 
        log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       return pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+       /* Lock the base thread to avoid race conditions with c_ithread_create().
+        * See https://github.com/collectd/collectd/issues/9 and
+        *     https://github.com/collectd/collectd/issues/1706 for details.
+       */
+       assert (aTHX == perl_threads->head->interp);
+       pthread_mutex_lock (&perl_threads->mutex);
+
+       status = pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+       pthread_mutex_unlock (&perl_threads->mutex);
+
+       return status;
 } /* static int perl_init (void) */
 
 static int perl_read (void)
@@ -2010,7 +2056,9 @@ static void perl_log (int level, const char *msg,
 
        /* Lock the base thread if this is not called from one of the read threads
         * to avoid race conditions with c_ithread_create(). See
-        * https://github.com/collectd/collectd/issues/9 for details. */
+        * https://github.com/collectd/collectd/issues/9 for details.
+       */
+
        if (aTHX == perl_threads->head->interp)
                pthread_mutex_lock (&perl_threads->mutex);
 
@@ -2064,9 +2112,8 @@ static int perl_flush (cdtime_t timeout, const char *identifier,
 
 static int perl_shutdown (void)
 {
-       c_ithread_t *t = NULL;
-
-       int ret = 0;
+       c_ithread_t *t;
+       int ret;
 
        dTHX;
 
@@ -2076,8 +2123,6 @@ static int perl_shutdown (void)
                return 0;
 
        if (NULL == aTHX) {
-               c_ithread_t *t = NULL;
-
                pthread_mutex_lock (&perl_threads->mutex);
                t = c_ithread_create (perl_threads->head->interp);
                pthread_mutex_unlock (&perl_threads->mutex);
@@ -2101,17 +2146,31 @@ static int perl_shutdown (void)
        t = perl_threads->tail;
 
        while (NULL != t) {
+               struct timespec ts_wait;
                c_ithread_t *thr = t;
 
                /* the pointer has to be advanced before destroying
                 * the thread as this will free the memory */
                t = t->prev;
 
+               thr->shutdown = 1;
+               if (thr->running) {
+                       /* Give some time to thread to exit from pi */
+                       WARNING ("perl shutdown: thread is running inside perl. Waiting.");
+                       ts_wait.tv_sec = 0;
+                       ts_wait.tv_nsec = 500000;
+                       nanosleep (&ts_wait, NULL);
+               }
+               if (thr->running) {
+                       ERROR ("perl shutdown: thread hangs inside perl. Thread killed.");
+                       pthread_kill (thr->pthread, SIGTERM);
+               }
                c_ithread_destroy (thr);
        }
 
        pthread_mutex_unlock (&perl_threads->mutex);
        pthread_mutex_destroy (&perl_threads->mutex);
+       pthread_mutexattr_destroy (&perl_threads->mutexattr);
 
        sfree (perl_threads);
 
@@ -2252,10 +2311,12 @@ static int init_pi (int argc, char **argv)
 #endif
        PERL_SYS_INIT3 (&argc, &argv, &environ);
 
-       perl_threads = (c_ithread_list_t *)smalloc (sizeof (c_ithread_list_t));
+       perl_threads = smalloc (sizeof (*perl_threads));
        memset (perl_threads, 0, sizeof (c_ithread_list_t));
 
-       pthread_mutex_init (&perl_threads->mutex, NULL);
+       pthread_mutexattr_init(&perl_threads->mutexattr);
+       pthread_mutexattr_settype(&perl_threads->mutexattr, PTHREAD_MUTEX_RECURSIVE);
+       pthread_mutex_init (&perl_threads->mutex, &perl_threads->mutexattr);
        /* locking the mutex should not be necessary at this point
         * but let's just do it for the sake of completeness */
        pthread_mutex_lock (&perl_threads->mutex);
@@ -2382,7 +2443,7 @@ static int perl_config_enabledebugger (pTHX_ oconfig_item_t *ci)
 
        value = ci->values[0].value.string;
 
-       perl_argv = (char **)realloc (perl_argv,
+       perl_argv = realloc (perl_argv,
                        (++perl_argc + 1) * sizeof (char *));
 
        if (NULL == perl_argv) {
@@ -2394,7 +2455,7 @@ static int perl_config_enabledebugger (pTHX_ oconfig_item_t *ci)
                perl_argv[perl_argc - 1] = "-d";
        }
        else {
-               perl_argv[perl_argc - 1] = (char *)smalloc (strlen (value) + 4);
+               perl_argv[perl_argc - 1] = smalloc (strlen (value) + 4);
                sstrncpy (perl_argv[perl_argc - 1], "-d:", 4);
                sstrncpy (perl_argv[perl_argc - 1] + 3, value, strlen (value) + 1);
        }
@@ -2419,7 +2480,7 @@ static int perl_config_includedir (pTHX_ oconfig_item_t *ci)
        value = ci->values[0].value.string;
 
        if (NULL == aTHX) {
-               perl_argv = (char **)realloc (perl_argv,
+               perl_argv = realloc (perl_argv,
                                (++perl_argc + 1) * sizeof (char *));
 
                if (NULL == perl_argv) {
@@ -2427,7 +2488,7 @@ static int perl_config_includedir (pTHX_ oconfig_item_t *ci)
                        exit (3);
                }
 
-               perl_argv[perl_argc - 1] = (char *)smalloc (strlen (value) + 3);
+               perl_argv[perl_argc - 1] = smalloc (strlen (value) + 3);
                sstrncpy(perl_argv[perl_argc - 1], "-I", 3);
                sstrncpy(perl_argv[perl_argc - 1] + 2, value, strlen (value) + 1);
 
@@ -2545,7 +2606,7 @@ static int perl_config (oconfig_item_t *ci)
 void module_register (void)
 {
        perl_argc = 4;
-       perl_argv = (char **)smalloc ((perl_argc + 1) * sizeof (char *));
+       perl_argv = smalloc ((perl_argc + 1) * sizeof (*perl_argv));
 
        /* default options for the Perl interpreter */
        perl_argv[0] = "";