collectd.spec: the dpdk is actually called dpdkstat...
[collectd.git] / src / perl.c
index 945e225..4a11d6c 100644 (file)
@@ -2,21 +2,27 @@
  * collectd - src/perl.c
  * Copyright (C) 2007-2009  Sebastian Harl
  *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; only version 2 of the License is applicable.
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
  *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License for more details.
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
  *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
  *
- * Author:
+ * Authors:
  *   Sebastian Harl <sh at tokkee.org>
+ *   Pavel Rochnyak <pavel2000 ngs.ru>
  **/
 
 /*
  * interface for collectd plugins written in perl.
  */
 
-/* do not automatically get the thread specific perl interpreter */
+/* do not automatically get the thread specific Perl interpreter */
 #define PERL_NO_GET_CONTEXT
 
 #define DONT_POISON_SPRINTF_YET 1
 #include "collectd.h"
-#undef DONT_POISON_SPRINTF_YET
 
-#include "configfile.h"
+#undef DONT_POISON_SPRINTF_YET
 
 #if HAVE_STDBOOL_H
 # include <stdbool.h>
@@ -41,6 +46,7 @@
 #include <perl.h>
 
 #if defined(COLLECT_DEBUG) && COLLECT_DEBUG && defined(__GNUC__) && __GNUC__
+# undef sprintf
 # pragma GCC poison sprintf
 #endif
 
@@ -57,8 +63,6 @@
 
 #include "filter_chain.h"
 
-#include <pthread.h>
-
 #if !defined(USE_ITHREADS)
 # error "Perl does not support ithreads!"
 #endif /* !defined(USE_ITHREADS) */
@@ -74,8 +78,9 @@
 #define PLUGIN_LOG      4
 #define PLUGIN_NOTIF    5
 #define PLUGIN_FLUSH    6
+#define PLUGIN_FLUSH_ALL 7  /* For collectd-5.6 only */
 
-#define PLUGIN_TYPES    7
+#define PLUGIN_TYPES    8
 
 #define PLUGIN_CONFIG   254
 #define PLUGIN_DATASET  255
 /* this is defined in DynaLoader.a */
 void boot_DynaLoader (PerlInterpreter *, CV *);
 
+static XS (Collectd_plugin_register_read);
+static XS (Collectd_plugin_register_write);
+static XS (Collectd_plugin_register_log);
+static XS (Collectd_plugin_register_notification);
+static XS (Collectd_plugin_register_flush);
+static XS (Collectd_plugin_unregister_read);
+static XS (Collectd_plugin_unregister_write);
+static XS (Collectd_plugin_unregister_log);
+static XS (Collectd_plugin_unregister_notification);
+static XS (Collectd_plugin_unregister_flush);
 static XS (Collectd_plugin_register_ds);
 static XS (Collectd_plugin_unregister_ds);
 static XS (Collectd_plugin_dispatch_values);
@@ -110,6 +125,14 @@ static XS (Collectd_plugin_log);
 static XS (Collectd__fc_register);
 static XS (Collectd_call_by_name);
 
+static int perl_read (user_data_t *ud);
+static int perl_write (const data_set_t *ds, const value_list_t *vl,
+               user_data_t *user_data);
+static void perl_log (int level, const char *msg, user_data_t *user_data);
+static int perl_notify (const notification_t *notif, user_data_t *user_data);
+static int perl_flush (cdtime_t timeout, const char *identifier,
+               user_data_t *user_data);
+
 /*
  * private data types
  */
@@ -117,6 +140,9 @@ static XS (Collectd_call_by_name);
 typedef struct c_ithread_s {
        /* the thread's Perl interpreter */
        PerlInterpreter *interp;
+       _Bool running;  /* thread is inside Perl interpreter */
+       _Bool shutdown;
+       pthread_t pthread;
 
        /* double linked list of threads */
        struct c_ithread_s *prev;
@@ -133,6 +159,7 @@ typedef struct {
 #endif /* COLLECT_DEBUG */
 
        pthread_mutex_t mutex;
+       pthread_mutexattr_t mutexattr;
 } c_ithread_list_t;
 
 /* name / user_data for Perl matches / targets */
@@ -158,6 +185,8 @@ extern char **environ;
  * private variables
  */
 
+static _Bool register_legacy_flush = 1;
+
 /* if perl_threads != NULL perl_threads->head must
  * point to the "base" thread */
 static c_ithread_list_t *perl_threads = NULL;
@@ -175,6 +204,18 @@ static struct {
        XS ((*f));
 } api[] =
 {
+       { "Collectd::plugin_register_read",       Collectd_plugin_register_read },
+       { "Collectd::plugin_register_write",      Collectd_plugin_register_write },
+       { "Collectd::plugin_register_log",        Collectd_plugin_register_log },
+       { "Collectd::plugin_register_notification",
+               Collectd_plugin_register_notification },
+       { "Collectd::plugin_register_flush",       Collectd_plugin_register_flush },
+       { "Collectd::plugin_unregister_read",     Collectd_plugin_unregister_read },
+       { "Collectd::plugin_unregister_write",    Collectd_plugin_unregister_write },
+       { "Collectd::plugin_unregister_log",      Collectd_plugin_unregister_log },
+       { "Collectd::plugin_unregister_notification",
+               Collectd_plugin_unregister_notification },
+       { "Collectd::plugin_unregister_flush",    Collectd_plugin_unregister_flush },
        { "Collectd::plugin_register_data_set",   Collectd_plugin_register_ds },
        { "Collectd::plugin_unregister_data_set", Collectd_plugin_unregister_ds },
        { "Collectd::plugin_dispatch_values",     Collectd_plugin_dispatch_values },
@@ -295,33 +336,31 @@ static int hv2data_source (pTHX_ HV *hash, data_source_t *ds)
        return 0;
 } /* static int hv2data_source (HV *, data_source_t *) */
 
-static int av2value (pTHX_ char *name, AV *array, value_t *value, int len)
+/* av2value converts at most "len" elements from "array" to "value". Returns the
+ * number of elements converted or zero on error. */
+static size_t av2value (pTHX_ char *name, AV *array, value_t *value, size_t array_len)
 {
        const data_set_t *ds;
 
-       int i = 0;
-
-       if ((NULL == name) || (NULL == array) || (NULL == value))
-               return -1;
-
-       if (av_len (array) < len - 1)
-               len = av_len (array) + 1;
-
-       if (0 >= len)
-               return -1;
+       if ((NULL == name) || (NULL == array) || (NULL == value) || (array_len == 0))
+               return 0;
 
        ds = plugin_get_ds (name);
        if (NULL == ds) {
                log_err ("av2value: Unknown dataset \"%s\"", name);
-               return -1;
+               return 0;
        }
 
-       if (ds->ds_num < len) {
-               log_warn ("av2value: Value length exceeds data set length.");
-               len = ds->ds_num;
+       if (array_len < ds->ds_num) {
+               log_warn ("av2value: array does not contain enough elements for type \"%s\": got %zu, want %zu",
+                               name, array_len, ds->ds_num);
+               return 0;
+       } else if (array_len > ds->ds_num) {
+               log_warn ("av2value: array contains excess elements for type \"%s\": got %zu, want %zu",
+                               name, array_len, ds->ds_num);
        }
 
-       for (i = 0; i < len; ++i) {
+       for (size_t i = 0; i < ds->ds_num; ++i) {
                SV **tmp = av_fetch (array, i, 0);
 
                if (NULL != tmp) {
@@ -335,11 +374,12 @@ static int av2value (pTHX_ char *name, AV *array, value_t *value, int len)
                                value[i].absolute = SvIV (*tmp);
                }
                else {
-                       return -1;
+                       return 0;
                }
        }
-       return len;
-} /* static int av2value (char *, AV *, value_t *, int) */
+
+       return ds->ds_num;
+} /* static size_t av2value (char *, AV *, value_t *, size_t) */
 
 /*
  * value list:
@@ -374,16 +414,14 @@ static int hv2value_list (pTHX_ HV *hash, value_list_t *vl)
 
        {
                AV  *array = (AV *)SvRV (*tmp);
-               int len    = av_len (array) + 1;
-
-               if (len <= 0)
+               /* av_len returns the highest index, not the actual length. */
+               size_t array_len = (size_t) (av_len (array) + 1);
+               if (array_len == 0)
                        return -1;
 
-               vl->values     = (value_t *)smalloc (len * sizeof (value_t));
-               vl->values_len = av2value (aTHX_ vl->type, (AV *)SvRV (*tmp),
-                               vl->values, len);
-
-               if (-1 == vl->values_len) {
+               vl->values     = calloc (array_len, sizeof (*vl->values));
+               vl->values_len = av2value (aTHX_ vl->type, (AV *)SvRV (*tmp), vl->values, array_len);
+               if (vl->values_len == 0) {
                        sfree (vl->values);
                        return -1;
                }
@@ -421,7 +459,7 @@ static int hv2value_list (pTHX_ HV *hash, value_list_t *vl)
 
 static int av2data_set (pTHX_ AV *array, char *name, data_set_t *ds)
 {
-       int len, i;
+       int len;
 
        if ((NULL == array) || (NULL == name) || (NULL == ds))
                return -1;
@@ -433,10 +471,10 @@ static int av2data_set (pTHX_ AV *array, char *name, data_set_t *ds)
                return -1;
        }
 
-       ds->ds = (data_source_t *)smalloc ((len + 1) * sizeof (data_source_t));
+       ds->ds = smalloc ((len + 1) * sizeof (*ds->ds));
        ds->ds_num = len + 1;
 
-       for (i = 0; i <= len; ++i) {
+       for (int i = 0; i <= len; ++i) {
                SV **elem = av_fetch (array, i, 0);
 
                if (NULL == elem) {
@@ -480,9 +518,8 @@ static int av2notification_meta (pTHX_ AV *array, notification_meta_t **meta)
        notification_meta_t **m = meta;
 
        int len = av_len (array);
-       int i;
 
-       for (i = 0; i <= len; ++i) {
+       for (int i = 0; i <= len; ++i) {
                SV **tmp = av_fetch (array, i, 0);
                HV  *hash;
 
@@ -497,7 +534,7 @@ static int av2notification_meta (pTHX_ AV *array, notification_meta_t **meta)
 
                hash = (HV *)SvRV (*tmp);
 
-               *m = (notification_meta_t *)smalloc (sizeof (**m));
+               *m = smalloc (sizeof (**m));
 
                if (NULL == (tmp = hv_fetch (hash, "name", 4, 0))) {
                        log_warn ("av2notification_meta: Skipping invalid "
@@ -598,14 +635,12 @@ static int hv2notification (pTHX_ HV *hash, notification_t *n)
 
 static int data_set2av (pTHX_ data_set_t *ds, AV *array)
 {
-       int i = 0;
-
        if ((NULL == ds) || (NULL == array))
                return -1;
 
        av_extend (array, ds->ds_num);
 
-       for (i = 0; i < ds->ds_num; ++i) {
+       for (size_t i = 0; i < ds->ds_num; ++i) {
                HV *source = newHV ();
 
                if (NULL == hv_store (source, "name", 4,
@@ -634,24 +669,17 @@ static int data_set2av (pTHX_ data_set_t *ds, AV *array)
 static int value_list2hv (pTHX_ value_list_t *vl, data_set_t *ds, HV *hash)
 {
        AV *values = NULL;
-
-       int i   = 0;
-       int len = 0;
+       size_t i;
 
        if ((NULL == vl) || (NULL == ds) || (NULL == hash))
                return -1;
 
-       len = vl->values_len;
-
-       if (ds->ds_num < len) {
-               log_warn ("value2av: Value length exceeds data set length.");
-               len = ds->ds_num;
-       }
-
        values = newAV ();
-       av_extend (values, len - 1);
+       /* av_extend takes the last *index* to which the array should be extended. */
+       av_extend (values, vl->values_len - 1);
 
-       for (i = 0; i < len; ++i) {
+       assert (ds->ds_num == vl->values_len);
+       for (i = 0; i < vl->values_len; ++i) {
                SV *val = NULL;
 
                if (DS_TYPE_COUNTER == ds->ds[i].type)
@@ -712,7 +740,6 @@ static int value_list2hv (pTHX_ value_list_t *vl, data_set_t *ds, HV *hash)
 static int notification_meta2av (pTHX_ notification_meta_t *meta, AV *array)
 {
        int meta_num = 0;
-       int i;
 
        while (meta) {
                ++meta_num;
@@ -721,7 +748,7 @@ static int notification_meta2av (pTHX_ notification_meta_t *meta, AV *array)
 
        av_extend (array, meta_num);
 
-       for (i = 0; NULL != meta; meta = meta->next, ++i) {
+       for (int i = 0; NULL != meta; meta = meta->next, ++i) {
                HV *m = newHV ();
                SV *value;
 
@@ -808,8 +835,6 @@ static int notification2hv (pTHX_ notification_t *n, HV *hash)
 
 static int oconfig_item2hv (pTHX_ oconfig_item_t *ci, HV *hash)
 {
-       int i;
-
        AV *values;
        AV *children;
 
@@ -826,7 +851,7 @@ static int oconfig_item2hv (pTHX_ oconfig_item_t *ci, HV *hash)
                return -1;
        }
 
-       for (i = 0; i < ci->values_num; ++i) {
+       for (int i = 0; i < ci->values_num; ++i) {
                SV *value;
 
                switch (ci->values[i].type) {
@@ -863,7 +888,7 @@ static int oconfig_item2hv (pTHX_ oconfig_item_t *ci, HV *hash)
                return -1;
        }
 
-       for (i = 0; i < ci->children_num; ++i) {
+       for (int i = 0; i < ci->children_num; ++i) {
                HV *child = newHV ();
 
                if (0 != oconfig_item2hv (aTHX_ ci->children + i, child)) {
@@ -984,15 +1009,13 @@ static int pplugin_write (pTHX_ const char *plugin, AV *data_set, HV *values)
  */
 static int pplugin_dispatch_notification (pTHX_ HV *notif)
 {
-       notification_t n;
+       notification_t n = { 0 };
 
        int ret;
 
        if (NULL == notif)
                return -1;
 
-       memset (&n, 0, sizeof (n));
-
        if (0 != hv2notification (aTHX_ notif, &n))
                return -1;
 
@@ -1002,14 +1025,41 @@ static int pplugin_dispatch_notification (pTHX_ HV *notif)
 } /* static int pplugin_dispatch_notification (HV *) */
 
 /*
+ * Call perl sub with thread locking flags handled.
+ */
+static int call_pv_locked (pTHX_ const char* sub_name)
+{
+       _Bool old_running;
+       int ret;
+
+       c_ithread_t *t = (c_ithread_t *)pthread_getspecific(perl_thr_key);
+       if (t == NULL) /* thread destroyed */
+               return 0;
+
+       old_running = t->running;
+       t->running = 1;
+
+       if (t->shutdown) {
+               t->running = old_running;
+               return 0;
+       }
+
+       ret = call_pv (sub_name, G_SCALAR|G_EVAL);
+
+       t->running = old_running;
+       return ret;
+} /* static int call_pv_locked (pTHX, *sub_name) */
+
+/*
  * Call all working functions of the given type.
  */
-static int pplugin_call_all (pTHX_ int type, ...)
+static int pplugin_call (pTHX_ int type, ...)
 {
        int retvals = 0;
 
        va_list ap;
        int ret = 0;
+       char *subname;
 
        dSP;
 
@@ -1023,9 +1073,17 @@ static int pplugin_call_all (pTHX_ int type, ...)
 
        PUSHMARK (SP);
 
-       XPUSHs (sv_2mortal (newSViv ((IV)type)));
+       if (PLUGIN_READ == type) {
+               subname = va_arg(ap, char *);
+       }
+       else if (PLUGIN_WRITE == type) {
+               data_set_t   *ds;
+               value_list_t *vl;
+
+               AV *pds = newAV ();
+               HV *pvl = newHV ();
 
-       if (PLUGIN_WRITE == type) {
+               subname = va_arg(ap, char *);
                /*
                 * $_[0] = $plugin_type;
                 *
@@ -1051,12 +1109,6 @@ static int pplugin_call_all (pTHX_ int type, ...)
                 *   type_instance   => $type_instance
                 * };
                 */
-               data_set_t   *ds;
-               value_list_t *vl;
-
-               AV *pds = newAV ();
-               HV *pvl = newHV ();
-
                ds = va_arg (ap, data_set_t *);
                vl = va_arg (ap, value_list_t *);
 
@@ -1079,6 +1131,7 @@ static int pplugin_call_all (pTHX_ int type, ...)
                XPUSHs (sv_2mortal (newRV_noinc ((SV *)pvl)));
        }
        else if (PLUGIN_LOG == type) {
+               subname = va_arg(ap, char *);
                /*
                 * $_[0] = $level;
                 *
@@ -1088,6 +1141,10 @@ static int pplugin_call_all (pTHX_ int type, ...)
                XPUSHs (sv_2mortal (newSVpv (va_arg (ap, char *), 0)));
        }
        else if (PLUGIN_NOTIF == type) {
+               notification_t *n;
+               HV *notif = newHV ();
+
+               subname = va_arg(ap, char *);
                /*
                 * $_[0] =
                 * {
@@ -1101,9 +1158,6 @@ static int pplugin_call_all (pTHX_ int type, ...)
                 *   type_instance   => $type_instance
                 * };
                 */
-               notification_t *n;
-               HV *notif = newHV ();
-
                n = va_arg (ap, notification_t *);
 
                if (-1 == notification2hv (aTHX_ n, notif)) {
@@ -1117,23 +1171,53 @@ static int pplugin_call_all (pTHX_ int type, ...)
        }
        else if (PLUGIN_FLUSH == type) {
                cdtime_t timeout;
+               subname = va_arg(ap, char *);
+               /*
+                * $_[0] = $timeout;
+                * $_[1] = $identifier;
+                */
+               timeout = va_arg (ap, cdtime_t);
 
+               XPUSHs (sv_2mortal (newSVnv (CDTIME_T_TO_DOUBLE (timeout))));
+               XPUSHs (sv_2mortal (newSVpv (va_arg (ap, char *), 0)));
+       }
+       else if (PLUGIN_FLUSH_ALL == type) {
+               cdtime_t timeout;
+               subname = "Collectd::plugin_call_all";
                /*
                 * $_[0] = $timeout;
                 * $_[1] = $identifier;
                 */
                timeout = va_arg (ap, cdtime_t);
 
+               XPUSHs (sv_2mortal (newSViv ((IV)PLUGIN_FLUSH)));
                XPUSHs (sv_2mortal (newSVnv (CDTIME_T_TO_DOUBLE (timeout))));
                XPUSHs (sv_2mortal (newSVpv (va_arg (ap, char *), 0)));
        }
+       else if (PLUGIN_INIT == type) {
+               subname = "Collectd::plugin_call_all";
+               XPUSHs (sv_2mortal (newSViv ((IV)type)));
+       }
+       else if (PLUGIN_SHUTDOWN == type) {
+               subname = "Collectd::plugin_call_all";
+               XPUSHs (sv_2mortal (newSViv ((IV)type)));
+       }
+       else { /* Unknown type. Run 'plugin_call_all' and make compiler happy */
+               subname = "Collectd::plugin_call_all";
+               XPUSHs (sv_2mortal (newSViv ((IV)type)));
+       }
 
        PUTBACK;
 
-       retvals = call_pv ("Collectd::plugin_call_all", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ subname);
 
        SPAGAIN;
-       if (0 < retvals) {
+       if (SvTRUE(ERRSV)) {
+               if (PLUGIN_LOG != type)
+                       ERROR ("perl: %s error: %s", subname, SvPV_nolen(ERRSV));
+               ret = -1;
+       }
+       else if (0 < retvals) {
                SV *tmp = POPs;
                if (! SvTRUE (tmp))
                        ret = -1;
@@ -1145,10 +1229,10 @@ static int pplugin_call_all (pTHX_ int type, ...)
 
        va_end (ap);
        return ret;
-} /* static int pplugin_call_all (int, ...) */
+} /* static int pplugin_call (int, ...) */
 
 /*
- * collectd's perl interpreter based thread implementation.
+ * collectd's Perl interpreter based thread implementation.
  *
  * This has been inspired by Perl's ithreads introduced in version 5.6.0.
  */
@@ -1161,6 +1245,10 @@ static void c_ithread_destroy (c_ithread_t *ithread)
        assert (NULL != perl_threads);
 
        PERL_SET_CONTEXT (aTHX);
+       /* Mark as running to avoid deadlock:
+          c_ithread_destroy -> log_debug -> perl_log()
+       */
+       ithread->running = 1;
        log_debug ("Shutting down Perl interpreter %p...", aTHX);
 
 #if COLLECT_DEBUG
@@ -1221,7 +1309,7 @@ static c_ithread_t *c_ithread_create (PerlInterpreter *base)
 
        assert (NULL != perl_threads);
 
-       t = (c_ithread_t *)smalloc (sizeof (c_ithread_t));
+       t = smalloc (sizeof (*t));
        memset (t, 0, sizeof (c_ithread_t));
 
        t->interp = (NULL == base)
@@ -1251,6 +1339,9 @@ static c_ithread_t *c_ithread_create (PerlInterpreter *base)
                t->prev = perl_threads->tail;
        }
 
+       t->pthread = pthread_self();
+       t->running = 0;
+       t->shutdown = 0;
        perl_threads->tail = t;
 
        pthread_setspecific (perl_thr_key, (const void *)t);
@@ -1371,7 +1462,7 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
 
        PUTBACK;
 
-       retvals = call_pv ("Collectd::fc_call", G_SCALAR);
+       retvals = call_pv_locked (aTHX_ "Collectd::fc_call");
 
        if ((FC_CB_EXEC == cb_type) && (meta != NULL)) {
                assert (pmeta != NULL);
@@ -1381,7 +1472,11 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...)
        }
 
        SPAGAIN;
-       if (0 < retvals) {
+       if (SvTRUE(ERRSV)) {
+               ERROR ("perl: Collectd::fc_call error: %s", SvPV_nolen(ERRSV));
+               ret = -1;
+       }
+       else if (0 < retvals) {
                SV *tmp = POPs;
 
                /* the exec callbacks return a status, while
@@ -1431,7 +1526,7 @@ static int fc_create (int type, const oconfig_item_t *ci, void **user_data)
                return -1;
        }
 
-       data = (pfc_user_data_t *)smalloc (sizeof (*data));
+       data = smalloc (sizeof (*data));
        data->name      = sstrdup (ci->values[0].value.string);
        data->user_data = newSV (0);
 
@@ -1550,6 +1645,173 @@ static target_proc_t ptarget = {
  * Exported Perl API.
  */
 
+static void _plugin_register_generic_userdata (pTHX, int type, const char *desc)
+{
+       int ret   = 0;
+       user_data_t userdata;
+       char *pluginname;
+
+       dXSARGS;
+
+       if (2 != items) {
+               log_err ("Usage: Collectd::plugin_register_%s(pluginname, subname)",
+                                       desc);
+               XSRETURN_EMPTY;
+       }
+
+       if (! SvOK (ST (0))) {
+               log_err ("Collectd::plugin_register_%s(pluginname, subname): "
+                        "Invalid pluginname", desc);
+               XSRETURN_EMPTY;
+       }
+       if (! SvOK (ST (1))) {
+               log_err ("Collectd::plugin_register_%s(pluginname, subname): "
+                        "Invalid subname", desc);
+               XSRETURN_EMPTY;
+       }
+
+       /* Use pluginname as-is to allow flush a single perl plugin */
+       pluginname = SvPV_nolen (ST (0));
+
+       log_debug ("Collectd::plugin_register_%s: "
+                       "plugin = \"%s\", sub = \"%s\"",
+                       desc, pluginname, SvPV_nolen (ST (1)));
+
+       memset(&userdata, 0, sizeof(userdata));
+       userdata.data = strdup(SvPV_nolen (ST (1)));
+       userdata.free_func = free;
+
+       if (PLUGIN_READ == type) {
+               ret = plugin_register_complex_read(
+                       "perl",                /* group */
+                       pluginname,
+                       perl_read,
+                       plugin_get_interval(), /* Default interval */
+                       &userdata);
+       }
+       else if (PLUGIN_WRITE == type) {
+               ret = plugin_register_write(pluginname, perl_write, &userdata);
+       }
+       else if (PLUGIN_LOG == type) {
+               ret = plugin_register_log(pluginname, perl_log, &userdata);
+       }
+       else if (PLUGIN_NOTIF == type) {
+               ret = plugin_register_notification(pluginname, perl_notify, &userdata);
+       }
+       else if (PLUGIN_FLUSH == type) {
+               if (1 == register_legacy_flush) { /* For collectd-5.7 only, #1731 */
+                       register_legacy_flush = 0;
+                       ret = plugin_register_flush("perl", perl_flush, /* user_data = */ NULL);
+               }
+
+               if (0 == ret)
+                       ret = plugin_register_flush(pluginname, perl_flush, &userdata);
+       }
+       else {
+               ret = -1;
+       }
+
+       if (0 == ret)
+               XSRETURN_YES;
+       else {
+               free (userdata.data);
+               XSRETURN_EMPTY;
+       }
+} /* static void _plugin_register_generic_userdata ( ... ) */
+
+/*
+ * Collectd::plugin_register_TYPE (pluginname, subname).
+ *
+ * pluginname:
+ *   name of the perl plugin
+ *
+ * subname:
+ *   name of the plugin's subroutine that does the work
+ */
+
+static XS (Collectd_plugin_register_read) {
+       return _plugin_register_generic_userdata(aTHX, PLUGIN_READ, "read");
+}
+
+static XS (Collectd_plugin_register_write) {
+       return _plugin_register_generic_userdata(aTHX, PLUGIN_WRITE, "write");
+}
+
+static XS (Collectd_plugin_register_log) {
+       return _plugin_register_generic_userdata(aTHX, PLUGIN_LOG, "log");
+}
+
+static XS (Collectd_plugin_register_notification) {
+       return _plugin_register_generic_userdata(aTHX, PLUGIN_NOTIF, "notification");
+}
+
+static XS (Collectd_plugin_register_flush) {
+       return _plugin_register_generic_userdata(aTHX, PLUGIN_FLUSH, "flush");
+}
+
+typedef int perl_unregister_function_t(const char *name);
+
+static void _plugin_unregister_generic (pTHX,
+                               perl_unregister_function_t *unreg, const char *desc)
+{
+       dXSARGS;
+
+       if (1 != items) {
+               log_err ("Usage: Collectd::plugin_unregister_%s(pluginname)", desc);
+               XSRETURN_EMPTY;
+       }
+
+       if (! SvOK (ST (0))) {
+               log_err ("Collectd::plugin_unregister_%s(pluginname): "
+                        "Invalid pluginname", desc);
+               XSRETURN_EMPTY;
+       }
+
+       log_debug ("Collectd::plugin_unregister_%s: plugin = \"%s\"",
+                       desc, SvPV_nolen (ST (0)));
+
+       unreg(SvPV_nolen (ST (0)));
+
+       XSRETURN_EMPTY;
+
+       return;
+} /* static void _plugin_unregister_generic ( ... ) */
+
+/*
+ * Collectd::plugin_unregister_TYPE (pluginname).
+ *
+ * TYPE:
+ *   type of callback to be unregistered: read, write, log, notification, flush
+ *
+ * pluginname:
+ *   name of the perl plugin
+ */
+
+static XS (Collectd_plugin_unregister_read) {
+       return _plugin_unregister_generic(aTHX,
+                               plugin_unregister_read, "read");
+}
+
+static XS (Collectd_plugin_unregister_write) {
+       return _plugin_unregister_generic(aTHX,
+                               plugin_unregister_write, "write");
+}
+
+static XS (Collectd_plugin_unregister_log) {
+       return _plugin_unregister_generic(aTHX,
+                               plugin_unregister_log, "log");
+}
+
+static XS (Collectd_plugin_unregister_notification) {
+       return _plugin_unregister_generic(aTHX,
+                               plugin_unregister_notification, "notification");
+}
+
+static XS (Collectd_plugin_unregister_flush) {
+       return _plugin_unregister_generic(aTHX,
+                               plugin_unregister_flush, "flush");
+}
+
 /*
  * Collectd::plugin_register_data_set (type, dataset).
  *
@@ -1911,6 +2173,7 @@ static XS (Collectd_call_by_name)
 
 static int perl_init (void)
 {
+       int status;
        dTHX;
 
        if (NULL == perl_threads)
@@ -1928,10 +2191,22 @@ static int perl_init (void)
 
        log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       return pplugin_call_all (aTHX_ PLUGIN_INIT);
+
+       /* Lock the base thread to avoid race conditions with c_ithread_create().
+        * See https://github.com/collectd/collectd/issues/9 and
+        *     https://github.com/collectd/collectd/issues/1706 for details.
+       */
+       assert (aTHX == perl_threads->head->interp);
+       pthread_mutex_lock (&perl_threads->mutex);
+
+       status = pplugin_call (aTHX_ PLUGIN_INIT);
+
+       pthread_mutex_unlock (&perl_threads->mutex);
+
+       return status;
 } /* static int perl_init (void) */
 
-static int perl_read (void)
+static int perl_read (user_data_t *user_data)
 {
        dTHX;
 
@@ -1955,11 +2230,12 @@ static int perl_read (void)
 
        log_debug ("perl_read: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       return pplugin_call_all (aTHX_ PLUGIN_READ);
-} /* static int perl_read (void) */
+
+       return pplugin_call (aTHX_ PLUGIN_READ, user_data->data);
+} /* static int perl_read (user_data_t *user_data) */
 
 static int perl_write (const data_set_t *ds, const value_list_t *vl,
-               user_data_t __attribute__((unused)) *user_data)
+               user_data_t *user_data)
 {
        int status;
        dTHX;
@@ -1985,7 +2261,7 @@ static int perl_write (const data_set_t *ds, const value_list_t *vl,
 
        log_debug ("perl_write: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
-       status = pplugin_call_all (aTHX_ PLUGIN_WRITE, ds, vl);
+       status = pplugin_call (aTHX_ PLUGIN_WRITE, user_data->data, ds, vl);
 
        if (aTHX == perl_threads->head->interp)
                pthread_mutex_unlock (&perl_threads->mutex);
@@ -1994,7 +2270,7 @@ static int perl_write (const data_set_t *ds, const value_list_t *vl,
 } /* static int perl_write (const data_set_t *, const value_list_t *) */
 
 static void perl_log (int level, const char *msg,
-               user_data_t __attribute__((unused)) *user_data)
+               user_data_t *user_data)
 {
        dTHX;
 
@@ -2013,11 +2289,13 @@ static void perl_log (int level, const char *msg,
 
        /* Lock the base thread if this is not called from one of the read threads
         * to avoid race conditions with c_ithread_create(). See
-        * https://github.com/collectd/collectd/issues/9 for details. */
+        * https://github.com/collectd/collectd/issues/9 for details.
+       */
+
        if (aTHX == perl_threads->head->interp)
                pthread_mutex_lock (&perl_threads->mutex);
 
-       pplugin_call_all (aTHX_ PLUGIN_LOG, level, msg);
+       pplugin_call (aTHX_ PLUGIN_LOG, user_data->data, level, msg);
 
        if (aTHX == perl_threads->head->interp)
                pthread_mutex_unlock (&perl_threads->mutex);
@@ -2025,8 +2303,7 @@ static void perl_log (int level, const char *msg,
        return;
 } /* static void perl_log (int, const char *) */
 
-static int perl_notify (const notification_t *notif,
-               user_data_t __attribute__((unused)) *user_data)
+static int perl_notify (const notification_t *notif, user_data_t *user_data)
 {
        dTHX;
 
@@ -2042,11 +2319,11 @@ static int perl_notify (const notification_t *notif,
 
                aTHX = t->interp;
        }
-       return pplugin_call_all (aTHX_ PLUGIN_NOTIF, notif);
+       return pplugin_call (aTHX_ PLUGIN_NOTIF, user_data->data, notif);
 } /* static int perl_notify (const notification_t *) */
 
 static int perl_flush (cdtime_t timeout, const char *identifier,
-               user_data_t __attribute__((unused)) *user_data)
+               user_data_t *user_data)
 {
        dTHX;
 
@@ -2062,25 +2339,28 @@ static int perl_flush (cdtime_t timeout, const char *identifier,
 
                aTHX = t->interp;
        }
-       return pplugin_call_all (aTHX_ PLUGIN_FLUSH, timeout, identifier);
+
+       /* For collectd-5.6 only, #1731 */
+       if (user_data == NULL || user_data->data == NULL)
+               return pplugin_call (aTHX_ PLUGIN_FLUSH_ALL, timeout, identifier);
+
+       return pplugin_call (aTHX_ PLUGIN_FLUSH, user_data->data, timeout, identifier);
 } /* static int perl_flush (const int) */
 
 static int perl_shutdown (void)
 {
-       c_ithread_t *t = NULL;
-
-       int ret = 0;
+       c_ithread_t *t;
+       int ret;
 
        dTHX;
 
        plugin_unregister_complex_config ("perl");
+       plugin_unregister_read_group ("perl");
 
        if (NULL == perl_threads)
                return 0;
 
        if (NULL == aTHX) {
-               t = NULL;
-
                pthread_mutex_lock (&perl_threads->mutex);
                t = c_ithread_create (perl_threads->head->interp);
                pthread_mutex_unlock (&perl_threads->mutex);
@@ -2091,30 +2371,40 @@ static int perl_shutdown (void)
        log_debug ("perl_shutdown: c_ithread: interp = %p (active threads: %i)",
                        aTHX, perl_threads->number_of_threads);
 
-       plugin_unregister_log ("perl");
-       plugin_unregister_notification ("perl");
        plugin_unregister_init ("perl");
-       plugin_unregister_read ("perl");
-       plugin_unregister_write ("perl");
-       plugin_unregister_flush ("perl");
+       plugin_unregister_flush ("perl"); /* For collectd-5.6 only, #1731 */
 
-       ret = pplugin_call_all (aTHX_ PLUGIN_SHUTDOWN);
+       ret = pplugin_call (aTHX_ PLUGIN_SHUTDOWN);
 
        pthread_mutex_lock (&perl_threads->mutex);
        t = perl_threads->tail;
 
        while (NULL != t) {
+               struct timespec ts_wait;
                c_ithread_t *thr = t;
 
                /* the pointer has to be advanced before destroying
                 * the thread as this will free the memory */
                t = t->prev;
 
+               thr->shutdown = 1;
+               if (thr->running) {
+                       /* Give some time to thread to exit from Perl interpreter */
+                       WARNING ("perl shutdown: Thread is running inside Perl. Waiting.");
+                       ts_wait.tv_sec = 0;
+                       ts_wait.tv_nsec = 500000;
+                       nanosleep (&ts_wait, NULL);
+               }
+               if (thr->running) {
+                       pthread_kill (thr->pthread, SIGTERM);
+                       ERROR ("perl shutdown: Thread hangs inside Perl. Thread killed.");
+               }
                c_ithread_destroy (thr);
        }
 
        pthread_mutex_unlock (&perl_threads->mutex);
        pthread_mutex_destroy (&perl_threads->mutex);
+       pthread_mutexattr_destroy (&perl_threads->mutexattr);
 
        sfree (perl_threads);
 
@@ -2186,21 +2476,19 @@ static void xs_init (pTHX)
        SV   *tmp   = NULL;
        char *file  = __FILE__;
 
-       int i = 0;
-
        dXSUB_SYS;
 
        /* enable usage of Perl modules using shared libraries */
        newXS ("DynaLoader::boot_DynaLoader", boot_DynaLoader, file);
 
        /* register API */
-       for (i = 0; NULL != api[i].f; ++i)
+       for (int i = 0; NULL != api[i].f; ++i)
                newXS (api[i].name, api[i].f, file);
 
        stash = gv_stashpv ("Collectd", 1);
 
        /* export "constants" */
-       for (i = 0; '\0' != constants[i].name[0]; ++i)
+       for (int i = 0; '\0' != constants[i].name[0]; ++i)
                newCONSTSUB (stash, constants[i].name, newSViv (constants[i].value));
 
        /* export global variables
@@ -2209,7 +2497,7 @@ static void xs_init (pTHX)
         * accessing any such variable (this is basically the same as using
         * tie() in Perl) */
        /* global strings */
-       for (i = 0; '\0' != g_strings[i].name[0]; ++i) {
+       for (int i = 0; '\0' != g_strings[i].name[0]; ++i) {
                tmp = get_sv (g_strings[i].name, 1);
                sv_magicext (tmp, NULL, PERL_MAGIC_ext, &g_pv_vtbl,
                                g_strings[i].var, 0);
@@ -2234,9 +2522,7 @@ static int init_pi (int argc, char **argv)
        log_info ("Initializing Perl interpreter...");
 #if COLLECT_DEBUG
        {
-               int i = 0;
-
-               for (i = 0; i < argc; ++i)
+               for (int i = 0; i < argc; ++i)
                        log_debug ("argv[%i] = \"%s\"", i, argv[i]);
        }
 #endif /* COLLECT_DEBUG */
@@ -2255,10 +2541,12 @@ static int init_pi (int argc, char **argv)
 #endif
        PERL_SYS_INIT3 (&argc, &argv, &environ);
 
-       perl_threads = (c_ithread_list_t *)smalloc (sizeof (c_ithread_list_t));
+       perl_threads = smalloc (sizeof (*perl_threads));
        memset (perl_threads, 0, sizeof (c_ithread_list_t));
 
-       pthread_mutex_init (&perl_threads->mutex, NULL);
+       pthread_mutexattr_init(&perl_threads->mutexattr);
+       pthread_mutexattr_settype(&perl_threads->mutexattr, PTHREAD_MUTEX_RECURSIVE);
+       pthread_mutex_init (&perl_threads->mutex, &perl_threads->mutexattr);
        /* locking the mutex should not be necessary at this point
         * but let's just do it for the sake of completeness */
        pthread_mutex_lock (&perl_threads->mutex);
@@ -2296,15 +2584,7 @@ static int init_pi (int argc, char **argv)
 
        perl_run (aTHX);
 
-       plugin_register_log ("perl", perl_log, /* user_data = */ NULL);
-       plugin_register_notification ("perl", perl_notify,
-                       /* user_data = */ NULL);
        plugin_register_init ("perl", perl_init);
-
-       plugin_register_read ("perl", perl_read);
-
-       plugin_register_write ("perl", perl_write, /* user_data = */ NULL);
-       plugin_register_flush ("perl", perl_flush, /* user_data = */ NULL);
        plugin_register_shutdown ("perl", perl_shutdown);
        return 0;
 } /* static int init_pi (const char **, const int) */
@@ -2339,7 +2619,7 @@ static int perl_config_loadplugin (pTHX_ oconfig_item_t *ci)
 
        aTHX = perl_threads->head->interp;
 
-       log_debug ("perl_config: loading perl plugin \"%s\"", value);
+       log_debug ("perl_config: Loading Perl plugin \"%s\"", value);
        load_module (PERL_LOADMOD_NOIMPORT,
                        newSVpv (module_name, strlen (module_name)), Nullsv);
        return 0;
@@ -2385,7 +2665,7 @@ static int perl_config_enabledebugger (pTHX_ oconfig_item_t *ci)
 
        value = ci->values[0].value.string;
 
-       perl_argv = (char **)realloc (perl_argv,
+       perl_argv = realloc (perl_argv,
                        (++perl_argc + 1) * sizeof (char *));
 
        if (NULL == perl_argv) {
@@ -2397,7 +2677,7 @@ static int perl_config_enabledebugger (pTHX_ oconfig_item_t *ci)
                perl_argv[perl_argc - 1] = "-d";
        }
        else {
-               perl_argv[perl_argc - 1] = (char *)smalloc (strlen (value) + 4);
+               perl_argv[perl_argc - 1] = smalloc (strlen (value) + 4);
                sstrncpy (perl_argv[perl_argc - 1], "-d:", 4);
                sstrncpy (perl_argv[perl_argc - 1] + 3, value, strlen (value) + 1);
        }
@@ -2422,7 +2702,7 @@ static int perl_config_includedir (pTHX_ oconfig_item_t *ci)
        value = ci->values[0].value.string;
 
        if (NULL == aTHX) {
-               perl_argv = (char **)realloc (perl_argv,
+               perl_argv = realloc (perl_argv,
                                (++perl_argc + 1) * sizeof (char *));
 
                if (NULL == perl_argv) {
@@ -2430,7 +2710,7 @@ static int perl_config_includedir (pTHX_ oconfig_item_t *ci)
                        exit (3);
                }
 
-               perl_argv[perl_argc - 1] = (char *)smalloc (strlen (value) + 3);
+               perl_argv[perl_argc - 1] = smalloc (strlen (value) + 3);
                sstrncpy(perl_argv[perl_argc - 1], "-I", 3);
                sstrncpy(perl_argv[perl_argc - 1] + 2, value, strlen (value) + 1);
 
@@ -2503,11 +2783,10 @@ static int perl_config_plugin (pTHX_ oconfig_item_t *ci)
 static int perl_config (oconfig_item_t *ci)
 {
        int status = 0;
-       int i = 0;
 
        dTHXa (NULL);
 
-       for (i = 0; i < ci->children_num; ++i) {
+       for (int i = 0; i < ci->children_num; ++i) {
                oconfig_item_t *c = ci->children + i;
                int current_status = 0;
 
@@ -2527,6 +2806,8 @@ static int perl_config (oconfig_item_t *ci)
                        current_status = perl_config_includedir (aTHX_ c);
                else if (0 == strcasecmp (c->key, "Plugin"))
                        current_status = perl_config_plugin (aTHX_ c);
+               else if (0 == strcasecmp (c->key, "RegisterLegacyFlush"))
+                       cf_util_get_boolean (c, &register_legacy_flush);
                else
                {
                        log_warn ("Ignoring unknown config key \"%s\".", c->key);
@@ -2548,7 +2829,7 @@ static int perl_config (oconfig_item_t *ci)
 void module_register (void)
 {
        perl_argc = 4;
-       perl_argv = (char **)smalloc ((perl_argc + 1) * sizeof (char *));
+       perl_argv = smalloc ((perl_argc + 1) * sizeof (*perl_argv));
 
        /* default options for the Perl interpreter */
        perl_argv[0] = "";