Merge pull request #1710 from rpv-tomsk/perl-plugin-fixes
authorSebastian Harl <sh@tokkee.org>
Mon, 30 May 2016 18:54:42 +0000 (20:54 +0200)
committerSebastian Harl <sh@tokkee.org>
Mon, 30 May 2016 18:54:42 +0000 (20:54 +0200)
perl plugin: Synchronize access to thread information.

Cf. #1706

1  2 
src/perl.c

diff --combined src/perl.c
@@@ -2,25 -2,20 +2,25 @@@
   * collectd - src/perl.c
   * Copyright (C) 2007-2009  Sebastian Harl
   *
 - * This program is free software; you can redistribute it and/or modify it
 - * under the terms of the GNU General Public License as published by the
 - * Free Software Foundation; only version 2 of the License is applicable.
 + * Permission is hereby granted, free of charge, to any person obtaining a
 + * copy of this software and associated documentation files (the "Software"),
 + * to deal in the Software without restriction, including without limitation
 + * the rights to use, copy, modify, merge, publish, distribute, sublicense,
 + * and/or sell copies of the Software, and to permit persons to whom the
 + * Software is furnished to do so, subject to the following conditions:
   *
 - * This program is distributed in the hope that it will be useful, but
 - * WITHOUT ANY WARRANTY; without even the implied warranty of
 - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 - * General Public License for more details.
 + * The above copyright notice and this permission notice shall be included in
 + * all copies or substantial portions of the Software.
   *
 - * You should have received a copy of the GNU General Public License along
 - * with this program; if not, write to the Free Software Foundation, Inc.,
 - * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
 + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 + * DEALINGS IN THE SOFTWARE.
   *
 - * Author:
 + * Authors:
   *   Sebastian Harl <sh at tokkee.org>
   **/
  
@@@ -46,7 -41,6 +46,7 @@@
  #include <perl.h>
  
  #if defined(COLLECT_DEBUG) && COLLECT_DEBUG && defined(__GNUC__) && __GNUC__
 +# undef sprintf
  # pragma GCC poison sprintf
  #endif
  
@@@ -123,6 -117,9 +123,9 @@@ static XS (Collectd_call_by_name)
  typedef struct c_ithread_s {
        /* the thread's Perl interpreter */
        PerlInterpreter *interp;
+       _Bool running;  /* thread is inside pi */
+       _Bool shutdown;
+       pthread_t pthread;
  
        /* double linked list of threads */
        struct c_ithread_s *prev;
@@@ -139,6 -136,7 +142,7 @@@ typedef struct 
  #endif /* COLLECT_DEBUG */
  
        pthread_mutex_t mutex;
+       pthread_mutexattr_t mutexattr;
  } c_ithread_list_t;
  
  /* name / user_data for Perl matches / targets */
@@@ -301,32 -299,33 +305,32 @@@ static int hv2data_source (pTHX_ HV *ha
        return 0;
  } /* static int hv2data_source (HV *, data_source_t *) */
  
 -static int av2value (pTHX_ char *name, AV *array, value_t *value, int len)
 +/* av2value converts at most "len" elements from "array" to "value". Returns the
 + * number of elements converted or zero on error. */
 +static size_t av2value (pTHX_ char *name, AV *array, value_t *value, size_t array_len)
  {
        const data_set_t *ds;
 +      size_t i;
  
 -      int i = 0;
 -
 -      if ((NULL == name) || (NULL == array) || (NULL == value))
 -              return -1;
 -
 -      if (av_len (array) < len - 1)
 -              len = av_len (array) + 1;
 -
 -      if (0 >= len)
 -              return -1;
 +      if ((NULL == name) || (NULL == array) || (NULL == value) || (array_len == 0))
 +              return 0;
  
        ds = plugin_get_ds (name);
        if (NULL == ds) {
                log_err ("av2value: Unknown dataset \"%s\"", name);
 -              return -1;
 +              return 0;
        }
  
 -      if (ds->ds_num < len) {
 -              log_warn ("av2value: Value length exceeds data set length.");
 -              len = ds->ds_num;
 +      if (array_len < ds->ds_num) {
 +              log_warn ("av2value: array does not contain enough elements for type \"%s\": got %zu, want %zu",
 +                              name, array_len, ds->ds_num);
 +              return 0;
 +      } else if (array_len > ds->ds_num) {
 +              log_warn ("av2value: array contains excess elements for type \"%s\": got %zu, want %zu",
 +                              name, array_len, ds->ds_num);
        }
  
 -      for (i = 0; i < len; ++i) {
 +      for (i = 0; i < ds->ds_num; ++i) {
                SV **tmp = av_fetch (array, i, 0);
  
                if (NULL != tmp) {
                                value[i].absolute = SvIV (*tmp);
                }
                else {
 -                      return -1;
 +                      return 0;
                }
        }
 -      return len;
 -} /* static int av2value (char *, AV *, value_t *, int) */
 +
 +      return ds->ds_num;
 +} /* static size_t av2value (char *, AV *, value_t *, size_t) */
  
  /*
   * value list:
@@@ -380,14 -378,16 +384,14 @@@ static int hv2value_list (pTHX_ HV *has
  
        {
                AV  *array = (AV *)SvRV (*tmp);
 -              int len    = av_len (array) + 1;
 -
 -              if (len <= 0)
 +              /* av_len returns the highest index, not the actual length. */
 +              size_t array_len = (size_t) (av_len (array) + 1);
 +              if (array_len == 0)
                        return -1;
  
 -              vl->values     = (value_t *)smalloc (len * sizeof (value_t));
 -              vl->values_len = av2value (aTHX_ vl->type, (AV *)SvRV (*tmp),
 -                              vl->values, len);
 -
 -              if (-1 == vl->values_len) {
 +              vl->values     = calloc (array_len, sizeof (*vl->values));
 +              vl->values_len = av2value (aTHX_ vl->type, (AV *)SvRV (*tmp), vl->values, array_len);
 +              if (vl->values_len == 0) {
                        sfree (vl->values);
                        return -1;
                }
@@@ -437,7 -437,7 +441,7 @@@ static int av2data_set (pTHX_ AV *array
                return -1;
        }
  
 -      ds->ds = (data_source_t *)smalloc ((len + 1) * sizeof (data_source_t));
 +      ds->ds = smalloc ((len + 1) * sizeof (*ds->ds));
        ds->ds_num = len + 1;
  
        for (i = 0; i <= len; ++i) {
@@@ -501,7 -501,7 +505,7 @@@ static int av2notification_meta (pTHX_ 
  
                hash = (HV *)SvRV (*tmp);
  
 -              *m = (notification_meta_t *)smalloc (sizeof (**m));
 +              *m = smalloc (sizeof (**m));
  
                if (NULL == (tmp = hv_fetch (hash, "name", 4, 0))) {
                        log_warn ("av2notification_meta: Skipping invalid "
@@@ -602,7 -602,7 +606,7 @@@ static int hv2notification (pTHX_ HV *h
  
  static int data_set2av (pTHX_ data_set_t *ds, AV *array)
  {
 -      int i = 0;
 +      size_t i;
  
        if ((NULL == ds) || (NULL == array))
                return -1;
  static int value_list2hv (pTHX_ value_list_t *vl, data_set_t *ds, HV *hash)
  {
        AV *values = NULL;
 -
 -      int i   = 0;
 -      int len = 0;
 +      size_t i;
  
        if ((NULL == vl) || (NULL == ds) || (NULL == hash))
                return -1;
  
 -      len = vl->values_len;
 -
 -      if (ds->ds_num < len) {
 -              log_warn ("value2av: Value length exceeds data set length.");
 -              len = ds->ds_num;
 -      }
 -
        values = newAV ();
 -      av_extend (values, len - 1);
 +      /* av_extend takes the last *index* to which the array should be extended. */
 +      av_extend (values, vl->values_len - 1);
  
 -      for (i = 0; i < len; ++i) {
 +      assert (ds->ds_num == vl->values_len);
 +      for (i = 0; i < vl->values_len; ++i) {
                SV *val = NULL;
  
                if (DS_TYPE_COUNTER == ds->ds[i].type)
@@@ -999,6 -1006,32 +1003,32 @@@ static int pplugin_dispatch_notificatio
  } /* static int pplugin_dispatch_notification (HV *) */
  
  /*
+  * Call perl sub with thread locking flags handled.
+  */
+ static int call_pv_locked (pTHX_ const char* sub_name)
+ {
+       _Bool old_running;
+       int ret;
+       c_ithread_t *t = (c_ithread_t *)pthread_getspecific(perl_thr_key);
+       if (t == NULL) /* thread destroyed */
+               return 0;
+       old_running = t->running;
+       t->running = 1;
+       if (t->shutdown) {
+               t->running = old_running;
+               return 0;
+       }
+       ret = call_pv (sub_name, G_SCALAR);
+       t->running = old_running;
+       return ret;
+ } /* static int call_pv_locked (pTHX, *sub_name) */
+ /*
   * Call all working functions of the given type.
   */
  static int pplugin_call_all (pTHX_ int type, ...)
  
        PUTBACK;
  
-       retvals = call_pv ("Collectd::plugin_call_all", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ "Collectd::plugin_call_all");
  
        SPAGAIN;
        if (0 < retvals) {
@@@ -1218,7 -1251,7 +1248,7 @@@ static c_ithread_t *c_ithread_create (P
  
        assert (NULL != perl_threads);
  
 -      t = (c_ithread_t *)smalloc (sizeof (c_ithread_t));
 +      t = smalloc (sizeof (*t));
        memset (t, 0, sizeof (c_ithread_t));
  
        t->interp = (NULL == base)
                t->prev = perl_threads->tail;
        }
  
+       t->pthread = pthread_self();
+       t->running = 0;
+       t->shutdown = 0;
        perl_threads->tail = t;
  
        pthread_setspecific (perl_thr_key, (const void *)t);
@@@ -1368,7 -1404,7 +1401,7 @@@ static int fc_call (pTHX_ int type, in
  
        PUTBACK;
  
-       retvals = call_pv ("Collectd::fc_call", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ "Collectd::fc_call");
  
        if ((FC_CB_EXEC == cb_type) && (meta != NULL)) {
                assert (pmeta != NULL);
@@@ -1428,7 -1464,7 +1461,7 @@@ static int fc_create (int type, const o
                return -1;
        }
  
 -      data = (pfc_user_data_t *)smalloc (sizeof (*data));
 +      data = smalloc (sizeof (*data));
        data->name      = sstrdup (ci->values[0].value.string);
        data->user_data = newSV (0);
  
@@@ -1908,6 -1944,7 +1941,7 @@@ static XS (Collectd_call_by_name
  
  static int perl_init (void)
  {
+       int status;
        dTHX;
  
        if (NULL == perl_threads)
  
        log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       return pplugin_call_all (aTHX_ PLUGIN_INIT);
+       /* Lock the base thread to avoid race conditions with c_ithread_create().
+        * See https://github.com/collectd/collectd/issues/9 and
+        *     https://github.com/collectd/collectd/issues/1706 for details.
+       */
+       assert (aTHX == perl_threads->head->interp);
+       pthread_mutex_lock (&perl_threads->mutex);
+       status = pplugin_call_all (aTHX_ PLUGIN_INIT);
+       pthread_mutex_unlock (&perl_threads->mutex);
+       return status;
  } /* static int perl_init (void) */
  
  static int perl_read (void)
@@@ -2010,7 -2059,9 +2056,9 @@@ static void perl_log (int level, const 
  
        /* Lock the base thread if this is not called from one of the read threads
         * to avoid race conditions with c_ithread_create(). See
-        * https://github.com/collectd/collectd/issues/9 for details. */
+        * https://github.com/collectd/collectd/issues/9 for details.
+       */
        if (aTHX == perl_threads->head->interp)
                pthread_mutex_lock (&perl_threads->mutex);
  
@@@ -2064,8 -2115,9 +2112,8 @@@ static int perl_flush (cdtime_t timeout
  
  static int perl_shutdown (void)
  {
 -      c_ithread_t *t = NULL;
 -
 -      int ret = 0;
 +      c_ithread_t *t;
 +      int ret;
  
        dTHX;
  
                return 0;
  
        if (NULL == aTHX) {
 -              t = NULL;
 -
                pthread_mutex_lock (&perl_threads->mutex);
                t = c_ithread_create (perl_threads->head->interp);
                pthread_mutex_unlock (&perl_threads->mutex);
        t = perl_threads->tail;
  
        while (NULL != t) {
+               struct timespec ts_wait;
                c_ithread_t *thr = t;
  
                /* the pointer has to be advanced before destroying
                 * the thread as this will free the memory */
                t = t->prev;
  
+               thr->shutdown = 1;
+               if (thr->running) {
+                       /* Give some time to thread to exit from pi */
+                       WARNING ("perl shutdown: thread is running inside perl. Waiting.");
+                       ts_wait.tv_sec = 0;
+                       ts_wait.tv_nsec = 500000;
+                       nanosleep (&ts_wait, NULL);
+               }
+               if (thr->running) {
+                       ERROR ("perl shutdown: thread hangs inside perl. Thread killed.");
+                       pthread_kill (thr->pthread, SIGTERM);
+               }
                c_ithread_destroy (thr);
        }
  
        pthread_mutex_unlock (&perl_threads->mutex);
        pthread_mutex_destroy (&perl_threads->mutex);
+       pthread_mutexattr_destroy (&perl_threads->mutexattr);
  
        sfree (perl_threads);
  
@@@ -2249,10 -2317,12 +2311,12 @@@ static int init_pi (int argc, char **ar
  #endif
        PERL_SYS_INIT3 (&argc, &argv, &environ);
  
 -      perl_threads = (c_ithread_list_t *)smalloc (sizeof (c_ithread_list_t));
 +      perl_threads = smalloc (sizeof (*perl_threads));
        memset (perl_threads, 0, sizeof (c_ithread_list_t));
  
-       pthread_mutex_init (&perl_threads->mutex, NULL);
+       pthread_mutexattr_init(&perl_threads->mutexattr);
+       pthread_mutexattr_settype(&perl_threads->mutexattr, PTHREAD_MUTEX_RECURSIVE);
+       pthread_mutex_init (&perl_threads->mutex, &perl_threads->mutexattr);
        /* locking the mutex should not be necessary at this point
         * but let's just do it for the sake of completeness */
        pthread_mutex_lock (&perl_threads->mutex);
@@@ -2379,7 -2449,7 +2443,7 @@@ static int perl_config_enabledebugger (
  
        value = ci->values[0].value.string;
  
 -      perl_argv = (char **)realloc (perl_argv,
 +      perl_argv = realloc (perl_argv,
                        (++perl_argc + 1) * sizeof (char *));
  
        if (NULL == perl_argv) {
                perl_argv[perl_argc - 1] = "-d";
        }
        else {
 -              perl_argv[perl_argc - 1] = (char *)smalloc (strlen (value) + 4);
 +              perl_argv[perl_argc - 1] = smalloc (strlen (value) + 4);
                sstrncpy (perl_argv[perl_argc - 1], "-d:", 4);
                sstrncpy (perl_argv[perl_argc - 1] + 3, value, strlen (value) + 1);
        }
@@@ -2416,7 -2486,7 +2480,7 @@@ static int perl_config_includedir (pTHX
        value = ci->values[0].value.string;
  
        if (NULL == aTHX) {
 -              perl_argv = (char **)realloc (perl_argv,
 +              perl_argv = realloc (perl_argv,
                                (++perl_argc + 1) * sizeof (char *));
  
                if (NULL == perl_argv) {
                        exit (3);
                }
  
 -              perl_argv[perl_argc - 1] = (char *)smalloc (strlen (value) + 3);
 +              perl_argv[perl_argc - 1] = smalloc (strlen (value) + 3);
                sstrncpy(perl_argv[perl_argc - 1], "-I", 3);
                sstrncpy(perl_argv[perl_argc - 1] + 2, value, strlen (value) + 1);
  
@@@ -2542,7 -2612,7 +2606,7 @@@ static int perl_config (oconfig_item_t 
  void module_register (void)
  {
        perl_argc = 4;
 -      perl_argv = (char **)smalloc ((perl_argc + 1) * sizeof (char *));
 +      perl_argv = smalloc ((perl_argc + 1) * sizeof (*perl_argv));
  
        /* default options for the Perl interpreter */
        perl_argv[0] = "";