X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fperl.c;h=9eef6c1ad1c88c99798cb2b8e8857c77e42658eb;hb=db961f476426f5dd3ca1663ffc094f0fc7f6f8a2;hp=27214bb6568acc18796fe913be4d01fa3969d9ef;hpb=c8d167c5af263b6f56b39275fd5b281bdaf48b02;p=collectd.git diff --git a/src/perl.c b/src/perl.c index 27214bb6..9eef6c1a 100644 --- a/src/perl.c +++ b/src/perl.c @@ -29,14 +29,13 @@ * interface for collectd plugins written in perl. */ -/* do not automatically get the thread specific perl interpreter */ +/* do not automatically get the thread specific Perl interpreter */ #define PERL_NO_GET_CONTEXT #define DONT_POISON_SPRINTF_YET 1 #include "collectd.h" -#undef DONT_POISON_SPRINTF_YET -#include "configfile.h" +#undef DONT_POISON_SPRINTF_YET #if HAVE_STDBOOL_H # include @@ -63,8 +62,6 @@ #include "filter_chain.h" -#include - #if !defined(USE_ITHREADS) # error "Perl does not support ithreads!" #endif /* !defined(USE_ITHREADS) */ @@ -123,6 +120,9 @@ static XS (Collectd_call_by_name); typedef struct c_ithread_s { /* the thread's Perl interpreter */ PerlInterpreter *interp; + _Bool running; /* thread is inside Perl interpreter */ + _Bool shutdown; + pthread_t pthread; /* double linked list of threads */ struct c_ithread_s *prev; @@ -139,6 +139,7 @@ typedef struct { #endif /* COLLECT_DEBUG */ pthread_mutex_t mutex; + pthread_mutexattr_t mutexattr; } c_ithread_list_t; /* name / user_data for Perl matches / targets */ @@ -306,7 +307,6 @@ static int hv2data_source (pTHX_ HV *hash, data_source_t *ds) static size_t av2value (pTHX_ char *name, AV *array, value_t *value, size_t array_len) { const data_set_t *ds; - size_t i; if ((NULL == name) || (NULL == array) || (NULL == value) || (array_len == 0)) return 0; @@ -326,7 +326,7 @@ static size_t av2value (pTHX_ char *name, AV *array, value_t *value, size_t arra name, array_len, ds->ds_num); } - for (i = 0; i < ds->ds_num; ++i) { + for (size_t i = 0; i < ds->ds_num; ++i) { SV **tmp = av_fetch (array, i, 0); if (NULL != tmp) { @@ -425,7 +425,7 @@ static int hv2value_list (pTHX_ HV *hash, value_list_t *vl) static int av2data_set (pTHX_ AV *array, char *name, data_set_t *ds) { - int len, i; + int len; if ((NULL == array) || (NULL == name) || (NULL == ds)) return -1; @@ -440,7 +440,7 @@ static int av2data_set (pTHX_ AV *array, char *name, data_set_t *ds) ds->ds = smalloc ((len + 1) * sizeof (*ds->ds)); ds->ds_num = len + 1; - for (i = 0; i <= len; ++i) { + for (int i = 0; i <= len; ++i) { SV **elem = av_fetch (array, i, 0); if (NULL == elem) { @@ -484,9 +484,8 @@ static int av2notification_meta (pTHX_ AV *array, notification_meta_t **meta) notification_meta_t **m = meta; int len = av_len (array); - int i; - for (i = 0; i <= len; ++i) { + for (int i = 0; i <= len; ++i) { SV **tmp = av_fetch (array, i, 0); HV *hash; @@ -602,14 +601,12 @@ static int hv2notification (pTHX_ HV *hash, notification_t *n) static int data_set2av (pTHX_ data_set_t *ds, AV *array) { - size_t i; - if ((NULL == ds) || (NULL == array)) return -1; av_extend (array, ds->ds_num); - for (i = 0; i < ds->ds_num; ++i) { + for (size_t i = 0; i < ds->ds_num; ++i) { HV *source = newHV (); if (NULL == hv_store (source, "name", 4, @@ -709,7 +706,6 @@ static int value_list2hv (pTHX_ value_list_t *vl, data_set_t *ds, HV *hash) static int notification_meta2av (pTHX_ notification_meta_t *meta, AV *array) { int meta_num = 0; - int i; while (meta) { ++meta_num; @@ -718,7 +714,7 @@ static int notification_meta2av (pTHX_ notification_meta_t *meta, AV *array) av_extend (array, meta_num); - for (i = 0; NULL != meta; meta = meta->next, ++i) { + for (int i = 0; NULL != meta; meta = meta->next, ++i) { HV *m = newHV (); SV *value; @@ -805,8 +801,6 @@ static int notification2hv (pTHX_ notification_t *n, HV *hash) static int oconfig_item2hv (pTHX_ oconfig_item_t *ci, HV *hash) { - int i; - AV *values; AV *children; @@ -823,7 +817,7 @@ static int oconfig_item2hv (pTHX_ oconfig_item_t *ci, HV *hash) return -1; } - for (i = 0; i < ci->values_num; ++i) { + for (int i = 0; i < ci->values_num; ++i) { SV *value; switch (ci->values[i].type) { @@ -860,7 +854,7 @@ static int oconfig_item2hv (pTHX_ oconfig_item_t *ci, HV *hash) return -1; } - for (i = 0; i < ci->children_num; ++i) { + for (int i = 0; i < ci->children_num; ++i) { HV *child = newHV (); if (0 != oconfig_item2hv (aTHX_ ci->children + i, child)) { @@ -981,15 +975,13 @@ static int pplugin_write (pTHX_ const char *plugin, AV *data_set, HV *values) */ static int pplugin_dispatch_notification (pTHX_ HV *notif) { - notification_t n; + notification_t n = { 0 }; int ret; if (NULL == notif) return -1; - memset (&n, 0, sizeof (n)); - if (0 != hv2notification (aTHX_ notif, &n)) return -1; @@ -999,6 +991,32 @@ static int pplugin_dispatch_notification (pTHX_ HV *notif) } /* static int pplugin_dispatch_notification (HV *) */ /* + * Call perl sub with thread locking flags handled. + */ +static int call_pv_locked (pTHX_ const char* sub_name) +{ + _Bool old_running; + int ret; + + c_ithread_t *t = (c_ithread_t *)pthread_getspecific(perl_thr_key); + if (t == NULL) /* thread destroyed */ + return 0; + + old_running = t->running; + t->running = 1; + + if (t->shutdown) { + t->running = old_running; + return 0; + } + + ret = call_pv (sub_name, G_SCALAR); + + t->running = old_running; + return ret; +} /* static int call_pv_locked (pTHX, *sub_name) */ + +/* * Call all working functions of the given type. */ static int pplugin_call_all (pTHX_ int type, ...) @@ -1127,7 +1145,7 @@ static int pplugin_call_all (pTHX_ int type, ...) PUTBACK; - retvals = call_pv ("Collectd::plugin_call_all", G_SCALAR); + retvals = call_pv_locked (aTHX_ "Collectd::plugin_call_all"); SPAGAIN; if (0 < retvals) { @@ -1145,7 +1163,7 @@ static int pplugin_call_all (pTHX_ int type, ...) } /* static int pplugin_call_all (int, ...) */ /* - * collectd's perl interpreter based thread implementation. + * collectd's Perl interpreter based thread implementation. * * This has been inspired by Perl's ithreads introduced in version 5.6.0. */ @@ -1248,6 +1266,9 @@ static c_ithread_t *c_ithread_create (PerlInterpreter *base) t->prev = perl_threads->tail; } + t->pthread = pthread_self(); + t->running = 0; + t->shutdown = 0; perl_threads->tail = t; pthread_setspecific (perl_thr_key, (const void *)t); @@ -1368,7 +1389,7 @@ static int fc_call (pTHX_ int type, int cb_type, pfc_user_data_t *data, ...) PUTBACK; - retvals = call_pv ("Collectd::fc_call", G_SCALAR); + retvals = call_pv_locked (aTHX_ "Collectd::fc_call"); if ((FC_CB_EXEC == cb_type) && (meta != NULL)) { assert (pmeta != NULL); @@ -1908,6 +1929,7 @@ static XS (Collectd_call_by_name) static int perl_init (void) { + int status; dTHX; if (NULL == perl_threads) @@ -1925,7 +1947,19 @@ static int perl_init (void) log_debug ("perl_init: c_ithread: interp = %p (active threads: %i)", aTHX, perl_threads->number_of_threads); - return pplugin_call_all (aTHX_ PLUGIN_INIT); + + /* Lock the base thread to avoid race conditions with c_ithread_create(). + * See https://github.com/collectd/collectd/issues/9 and + * https://github.com/collectd/collectd/issues/1706 for details. + */ + assert (aTHX == perl_threads->head->interp); + pthread_mutex_lock (&perl_threads->mutex); + + status = pplugin_call_all (aTHX_ PLUGIN_INIT); + + pthread_mutex_unlock (&perl_threads->mutex); + + return status; } /* static int perl_init (void) */ static int perl_read (void) @@ -2010,7 +2044,9 @@ static void perl_log (int level, const char *msg, /* Lock the base thread if this is not called from one of the read threads * to avoid race conditions with c_ithread_create(). See - * https://github.com/collectd/collectd/issues/9 for details. */ + * https://github.com/collectd/collectd/issues/9 for details. + */ + if (aTHX == perl_threads->head->interp) pthread_mutex_lock (&perl_threads->mutex); @@ -2064,9 +2100,8 @@ static int perl_flush (cdtime_t timeout, const char *identifier, static int perl_shutdown (void) { - c_ithread_t *t = NULL; - - int ret = 0; + c_ithread_t *t; + int ret; dTHX; @@ -2076,8 +2111,6 @@ static int perl_shutdown (void) return 0; if (NULL == aTHX) { - t = NULL; - pthread_mutex_lock (&perl_threads->mutex); t = c_ithread_create (perl_threads->head->interp); pthread_mutex_unlock (&perl_threads->mutex); @@ -2101,17 +2134,31 @@ static int perl_shutdown (void) t = perl_threads->tail; while (NULL != t) { + struct timespec ts_wait; c_ithread_t *thr = t; /* the pointer has to be advanced before destroying * the thread as this will free the memory */ t = t->prev; + thr->shutdown = 1; + if (thr->running) { + /* Give some time to thread to exit from Perl interpreter */ + WARNING ("perl shutdown: Thread is running inside Perl. Waiting."); + ts_wait.tv_sec = 0; + ts_wait.tv_nsec = 500000; + nanosleep (&ts_wait, NULL); + } + if (thr->running) { + pthread_kill (thr->pthread, SIGTERM); + ERROR ("perl shutdown: Thread hangs inside Perl. Thread killed."); + } c_ithread_destroy (thr); } pthread_mutex_unlock (&perl_threads->mutex); pthread_mutex_destroy (&perl_threads->mutex); + pthread_mutexattr_destroy (&perl_threads->mutexattr); sfree (perl_threads); @@ -2183,21 +2230,19 @@ static void xs_init (pTHX) SV *tmp = NULL; char *file = __FILE__; - int i = 0; - dXSUB_SYS; /* enable usage of Perl modules using shared libraries */ newXS ("DynaLoader::boot_DynaLoader", boot_DynaLoader, file); /* register API */ - for (i = 0; NULL != api[i].f; ++i) + for (int i = 0; NULL != api[i].f; ++i) newXS (api[i].name, api[i].f, file); stash = gv_stashpv ("Collectd", 1); /* export "constants" */ - for (i = 0; '\0' != constants[i].name[0]; ++i) + for (int i = 0; '\0' != constants[i].name[0]; ++i) newCONSTSUB (stash, constants[i].name, newSViv (constants[i].value)); /* export global variables @@ -2206,7 +2251,7 @@ static void xs_init (pTHX) * accessing any such variable (this is basically the same as using * tie() in Perl) */ /* global strings */ - for (i = 0; '\0' != g_strings[i].name[0]; ++i) { + for (int i = 0; '\0' != g_strings[i].name[0]; ++i) { tmp = get_sv (g_strings[i].name, 1); sv_magicext (tmp, NULL, PERL_MAGIC_ext, &g_pv_vtbl, g_strings[i].var, 0); @@ -2231,9 +2276,7 @@ static int init_pi (int argc, char **argv) log_info ("Initializing Perl interpreter..."); #if COLLECT_DEBUG { - int i = 0; - - for (i = 0; i < argc; ++i) + for (int i = 0; i < argc; ++i) log_debug ("argv[%i] = \"%s\"", i, argv[i]); } #endif /* COLLECT_DEBUG */ @@ -2255,7 +2298,9 @@ static int init_pi (int argc, char **argv) perl_threads = smalloc (sizeof (*perl_threads)); memset (perl_threads, 0, sizeof (c_ithread_list_t)); - pthread_mutex_init (&perl_threads->mutex, NULL); + pthread_mutexattr_init(&perl_threads->mutexattr); + pthread_mutexattr_settype(&perl_threads->mutexattr, PTHREAD_MUTEX_RECURSIVE); + pthread_mutex_init (&perl_threads->mutex, &perl_threads->mutexattr); /* locking the mutex should not be necessary at this point * but let's just do it for the sake of completeness */ pthread_mutex_lock (&perl_threads->mutex); @@ -2336,7 +2381,7 @@ static int perl_config_loadplugin (pTHX_ oconfig_item_t *ci) aTHX = perl_threads->head->interp; - log_debug ("perl_config: loading perl plugin \"%s\"", value); + log_debug ("perl_config: Loading Perl plugin \"%s\"", value); load_module (PERL_LOADMOD_NOIMPORT, newSVpv (module_name, strlen (module_name)), Nullsv); return 0; @@ -2500,11 +2545,10 @@ static int perl_config_plugin (pTHX_ oconfig_item_t *ci) static int perl_config (oconfig_item_t *ci) { int status = 0; - int i = 0; dTHXa (NULL); - for (i = 0; i < ci->children_num; ++i) { + for (int i = 0; i < ci->children_num; ++i) { oconfig_item_t *c = ci->children + i; int current_status = 0;