perl plugin: Be more consistent about capitalization.
[collectd.git] / src / perl.c
index b6e7b22..19b0cb2 100644 (file)
@@ -24,7 +24,7 @@
  * interface for collectd plugins written in perl.
  */
 
-/* do not automatically get the thread specific perl interpreter */
+/* do not automatically get the thread specific Perl interpreter */
 #define PERL_NO_GET_CONTEXT
 
 #define DONT_POISON_SPRINTF_YET 1
@@ -112,6 +112,9 @@ static XS (Collectd_call_by_name);
 typedef struct c_ithread_s {
        /* the thread's Perl interpreter */
        PerlInterpreter *interp;
+       _Bool running;  /* thread is inside Perl interpreter */
+       _Bool shutdown;
+       pthread_t pthread;
 
        /* double linked list of threads */
        struct c_ithread_s *prev;
@@ -128,6 +131,7 @@ typedef struct {
 #endif /* COLLECT_DEBUG */
 
        pthread_mutex_t mutex;
+       pthread_mutexattr_t mutexattr;
 } c_ithread_list_t;
 
 /* name / user_data for Perl matches / targets */
@@ -145,6 +149,11 @@ typedef struct {
        } while (0)
 
 /*
+ * Public variable
+ */
+extern char **environ;
+
+/*
  * private variables
  */
 
@@ -194,6 +203,8 @@ struct {
        { "Collectd::TYPE_DATASET",       PLUGIN_DATASET },
        { "Collectd::DS_TYPE_COUNTER",    DS_TYPE_COUNTER },
        { "Collectd::DS_TYPE_GAUGE",      DS_TYPE_GAUGE },
+       { "Collectd::DS_TYPE_DERIVE",     DS_TYPE_DERIVE },
+       { "Collectd::DS_TYPE_ABSOLUTE",   DS_TYPE_ABSOLUTE },
        { "Collectd::LOG_ERR",            LOG_ERR },
        { "Collectd::LOG_WARNING",        LOG_WARNING },
        { "Collectd::LOG_NOTICE",         LOG_NOTICE },
@@ -267,7 +278,10 @@ static int hv2data_source (pTHX_ HV *hash, data_source_t *ds)
        if (NULL != (tmp = hv_fetch (hash, "type", 4, 0))) {
                ds->type = SvIV (*tmp);
 
-               if ((DS_TYPE_COUNTER != ds->type) && (DS_TYPE_GAUGE != ds->type)) {
+               if ((DS_TYPE_COUNTER != ds->type)
+                               && (DS_TYPE_GAUGE != ds->type)
+                               && (DS_TYPE_DERIVE != ds->type)
+                               && (DS_TYPE_ABSOLUTE != ds->type)) {
                        log_err ("hv2data_source: Invalid DS type.");
                        return -1;
                }
@@ -320,8 +334,12 @@ static int av2value (pTHX_ char *name, AV *array, value_t *value, int len)
                if (NULL != tmp) {
                        if (DS_TYPE_COUNTER == ds->ds[i].type)
                                value[i].counter = SvIV (*tmp);
-                       else
+                       else if (DS_TYPE_GAUGE == ds->ds[i].type)
                                value[i].gauge = SvNV (*tmp);
+                       else if (DS_TYPE_DERIVE == ds->ds[i].type)
+                               value[i].derive = SvIV (*tmp);
+                       else if (DS_TYPE_ABSOLUTE == ds->ds[i].type)
+                               value[i].absolute = SvIV (*tmp);
                }
                else {
                        return -1;
@@ -637,8 +655,12 @@ static int value_list2hv (pTHX_ value_list_t *vl, data_set_t *ds, HV *hash)
 
                if (DS_TYPE_COUNTER == ds->ds[i].type)
                        val = newSViv (vl->values[i].counter);
-               else
+               else if (DS_TYPE_GAUGE == ds->ds[i].type)
                        val = newSVnv (vl->values[i].gauge);
+               else if (DS_TYPE_DERIVE == ds->ds[i].type)
+                       val = newSViv (vl->values[i].derive);
+               else if (DS_TYPE_ABSOLUTE == ds->ds[i].type)
+                       val = newSViv (vl->values[i].absolute);
 
                if (NULL == av_store (values, i, val)) {
                        av_undef (values);
@@ -970,6 +992,32 @@ static int pplugin_dispatch_notification (pTHX_ HV *notif)
 } /* static int pplugin_dispatch_notification (HV *) */
 
 /*
+ * Call perl sub with thread locking flags handled.
+ */
+static int call_pv_locked (pTHX_ const char* sub_name)
+{
+       _Bool old_running;
+       int ret;
+
+       c_ithread_t *t = (c_ithread_t *)pthread_getspecific(perl_thr_key);
+       if (t == NULL) /* thread destroyed */
+               return 0;
+
+       old_running = t->running;
+       t->running = 1;
+
+       if (t->shutdown) {
+               t->running = old_running;
+               return 0;
+       }
+
+       ret = call_pv (sub_name, G_SCALAR);
+
+       t->running = old_running;
+       return ret;
+} /* static int call_pv_locked (pTHX, *sub_name) */
+
+/*
  * Call all working functions of the given type.
  */
 static int pplugin_call_all (pTHX_ int type, ...)
@@ -1094,7 +1142,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
 
        PUTBACK;
 
-       retvals = call_pv ("Collectd::plugin_call_all", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ "Collectd::plugin_call_all");
 
        SPAGAIN;
        if (0 < retvals) {
@@ -1112,7 +1160,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
 } /* static int pplugin_call_all (int, ...) */
 
 /*
- * collectd's perl interpreter based thread implementation.
+ * collectd's Perl interpreter based thread implementation.
  *
  * This has been inspired by Perl's ithreads introduced in version 5.6.0.
  */
@@ -1212,6 +1260,9 @@ static c_ithread_t *c_ithread_create (PerlInterpreter *base)
                t->prev = perl_threads->tail;
        }
 
+       t->pthread = pthread_self();
+       t->running = 0;
+       t->shutdown = 0;
        perl_threads->tail = t;
 
        pthread_setspecific (perl_thr_key, (const void *)t);
@@ -1332,7 +1383,7 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
 
        PUTBACK;
 
-       retvals = call_pv ("Collectd::fc_call", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ "Collectd::fc_call");
 
        if ((FC_CB_EXEC == cb_type) && (meta != NULL)) {
                assert (pmeta != NULL);
@@ -1868,6 +1919,7 @@ static XS (Collectd_call_by_name)
 
 static int perl_init (void)
 {
+       int status;
        dTHX;
 
        if (NULL == perl_threads)
@@ -1885,7 +1937,19 @@ static int perl_init (void)
 
        log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       return pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+       /* Lock the base thread to avoid race conditions with c_ithread_create().
+        * See https://github.com/collectd/collectd/issues/9 and
+        *     https://github.com/collectd/collectd/issues/1706 for details.
+       */
+       assert (aTHX == perl_threads->head->interp);
+       pthread_mutex_lock (&perl_threads->mutex);
+
+       status = pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+       pthread_mutex_unlock (&perl_threads->mutex);
+
+       return status;
 } /* static int perl_init (void) */
 
 static int perl_read (void)
@@ -1905,6 +1969,11 @@ static int perl_read (void)
                aTHX = t->interp;
        }
 
+       /* Assert that we're not running as the base thread. Otherwise, we might
+        * run into concurrency issues with c_ithread_create(). See
+        * https://github.com/collectd/collectd/issues/9 for details. */
+       assert (aTHX != perl_threads->head->interp);
+
        log_debug ("perl_read: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
        return pplugin_call_all (aTHX_ PLUGIN_READ);
@@ -1913,6 +1982,7 @@ static int perl_read (void)
 static int perl_write (const data_set_t *ds, const value_list_t *vl,
                user_data_t __attribute__((unused)) *user_data)
 {
+       int status;
        dTHX;
 
        if (NULL == perl_threads)
@@ -1928,9 +1998,20 @@ static int perl_write (const data_set_t *ds, const value_list_t *vl,
                aTHX = t->interp;
        }
 
+       /* Lock the base thread if this is not called from one of the read threads
+        * to avoid race conditions with c_ithread_create(). See
+        * https://github.com/collectd/collectd/issues/9 for details. */
+       if (aTHX == perl_threads->head->interp)
+               pthread_mutex_lock (&perl_threads->mutex);
+
        log_debug ("perl_write: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       return pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl);
+       status = pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl);
+
+       if (aTHX == perl_threads->head->interp)
+               pthread_mutex_unlock (&perl_threads->mutex);
+
+       return status;
 } /* static int perl_write (const data_set_t *, const value_list_t *) */
 
 static void perl_log (int level, const char *msg,
@@ -1951,7 +2032,19 @@ static void perl_log (int level, const char *msg,
                aTHX = t->interp;
        }
 
+       /* Lock the base thread if this is not called from one of the read threads
+        * to avoid race conditions with c_ithread_create(). See
+        * https://github.com/collectd/collectd/issues/9 for details.
+       */
+
+       if (aTHX == perl_threads->head->interp)
+               pthread_mutex_lock (&perl_threads->mutex);
+
        pplugin_call_all (aTHX_ PLUGIN_LOG, level, msg);
+
+       if (aTHX == perl_threads->head->interp)
+               pthread_mutex_unlock (&perl_threads->mutex);
+
        return;
 } /* static void perl_log (int, const char *) */
 
@@ -2034,17 +2127,31 @@ static int perl_shutdown (void)
        t = perl_threads->tail;
 
        while (NULL != t) {
+               struct timespec ts_wait;
                c_ithread_t *thr = t;
 
                /* the pointer has to be advanced before destroying
                 * the thread as this will free the memory */
                t = t->prev;
 
+               thr->shutdown = 1;
+               if (thr->running) {
+                       /* Give some time to thread to exit from Perl interpreter */
+                       WARNING ("perl shutdown: Thread is running inside Perl. Waiting.");
+                       ts_wait.tv_sec = 0;
+                       ts_wait.tv_nsec = 500000;
+                       nanosleep (&ts_wait, NULL);
+               }
+               if (thr->running) {
+                       pthread_kill (thr->pthread, SIGTERM);
+                       ERROR ("perl shutdown: Thread hangs inside Perl. Thread killed.");
+               }
                c_ithread_destroy (thr);
        }
 
        pthread_mutex_unlock (&perl_threads->mutex);
        pthread_mutex_destroy (&perl_threads->mutex);
+       pthread_mutexattr_destroy (&perl_threads->mutexattr);
 
        sfree (perl_threads);
 
@@ -2184,7 +2291,9 @@ static int init_pi (int argc, char **argv)
        perl_threads = (c_ithread_list_t *)smalloc (sizeof (c_ithread_list_t));
        memset (perl_threads, 0, sizeof (c_ithread_list_t));
 
-       pthread_mutex_init (&perl_threads->mutex, NULL);
+       pthread_mutexattr_init(&perl_threads->mutexattr);
+       pthread_mutexattr_settype(&perl_threads->mutexattr, PTHREAD_MUTEX_RECURSIVE);
+       pthread_mutex_init (&perl_threads->mutex, &perl_threads->mutexattr);
        /* locking the mutex should not be necessary at this point
         * but let's just do it for the sake of completeness */
        pthread_mutex_lock (&perl_threads->mutex);
@@ -2265,7 +2374,7 @@ static int perl_config_loadplugin (pTHX_ oconfig_item_t *ci)
 
        aTHX = perl_threads->head->interp;
 
-       log_debug ("perl_config: loading perl plugin \"%s\"", value);
+       log_debug ("perl_config: Loading Perl plugin \"%s\"", value);
        load_module (PERL_LOADMOD_NOIMPORT,
                        newSVpv (module_name, strlen (module_name)), Nullsv);
        return 0;