From: Matthias Runge Date: Fri, 13 Sep 2019 18:44:36 +0000 (+0200) Subject: Merge pull request #3159 from rpv-tomsk/collectd-usercache X-Git-Url: https://git.octo.it/?p=collectd.git;a=commitdiff_plain;h=bf1c2612bc0405c895f754ebfbb24484122c7cfa;hp=-c Merge pull request #3159 from rpv-tomsk/collectd-usercache check_uptime: New plugin, based on new cache_event callback. --- bf1c2612bc0405c895f754ebfbb24484122c7cfa diff --combined Makefile.am index 0a383143,b9a57a8d..5470b9b3 --- a/Makefile.am +++ b/Makefile.am @@@ -159,7 -159,8 +159,7 @@@ check_PROGRAMS = test_utils_time \ test_utils_vl_lookup \ test_libcollectd_network_parse \ - test_utils_config_cores \ - test_utils_proc_pids + test_utils_config_cores TESTS = $(check_PROGRAMS) @@@ -276,7 -277,7 +276,7 @@@ collectd_LDFLAGS += -ldl -Wl,--out-impl else collectd_SOURCES += src/daemon/cmd.c endif - + if BUILD_FEATURE_DAEMON collectd_CPPFLAGS += -DPIDFILE='"${localstatedir}/run/${PACKAGE_NAME}.pid"' endif @@@ -376,6 -377,11 +376,6 @@@ test_utils_config_cores_SOURCES = src/testing.h test_utils_config_cores_LDADD = libplugin_mock.la -test_utils_proc_pids_SOURCES = \ - src/utils/proc_pids/proc_pids_test.c \ - src/testing.h -test_utils_proc_pids_LDADD = libplugin_mock.la - libavltree_la_SOURCES = \ src/utils/avltree/avltree.c \ src/utils/avltree/avltree.h @@@ -768,6 -774,12 +768,12 @@@ chrony_la_LDFLAGS = $(PLUGIN_LDFLAGS chrony_la_LIBADD = -lm endif + if BUILD_PLUGIN_CHECK_UPTIME + pkglib_LTLIBRARIES += check_uptime.la + check_uptime_la_SOURCES = src/check_uptime.c + check_uptime_la_LDFLAGS = $(PLUGIN_LDFLAGS) + endif + if BUILD_PLUGIN_CONNTRACK pkglib_LTLIBRARIES += conntrack.la conntrack_la_SOURCES = src/conntrack.c @@@ -1084,13 -1096,6 +1090,13 @@@ test_plugin_intel_rdt_LDFLAGS = $(PLUGI test_plugin_intel_rdt_LDADD = liboconfig.la libplugin_mock.la check_PROGRAMS += test_plugin_intel_rdt TESTS += test_plugin_intel_rdt + +test_utils_proc_pids_SOURCES = \ + src/utils/proc_pids/proc_pids_test.c \ + src/testing.h +test_utils_proc_pids_LDADD = libplugin_mock.la +check_PROGRAMS += test_utils_proc_pids +TESTS += test_utils_proc_pids endif if BUILD_PLUGIN_INTERFACE @@@ -1391,21 -1396,6 +1397,21 @@@ network_la_CPPFLAGS += $(GCRYPT_CPPFLAG network_la_LDFLAGS += $(GCRYPT_LDFLAGS) network_la_LIBADD += $(GCRYPT_LIBS) endif + +test_plugin_network_SOURCES = \ + src/network_test.c \ + src/utils_fbhash.c \ + src/daemon/configfile.c \ + src/daemon/types_list.c +test_plugin_network_CPPFLAGS = $(AM_CPPFLAGS) $(GCRYPT_CPPFLAGS) +test_plugin_network_LDFLAGS = $(PLUGIN_LDFLAGS) $(GCRYPT_LDFLAGS) +test_plugin_network_LDADD = \ + libavltree.la \ + liboconfig.la \ + libplugin_mock.la \ + libmetadata.la \ + $(GCRYPT_LIBS) +check_PROGRAMS += test_plugin_network endif if BUILD_PLUGIN_NFS @@@ -1646,14 -1636,6 +1652,14 @@@ processes_la_LIBADD += libtaskstats.l endif endif +if BUILD_PLUGIN_PROCEVENT +pkglib_LTLIBRARIES += procevent.la +procevent_la_SOURCES = src/procevent.c +procevent_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS) +procevent_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS) +procevent_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) libignorelist.la +endif + if BUILD_PLUGIN_PROTOCOLS pkglib_LTLIBRARIES += protocols.la protocols_la_SOURCES = src/protocols.c @@@ -1802,14 -1784,6 +1808,14 @@@ synproxy_la_SOURCES = src/synproxy. synproxy_la_LDFLAGS = $(PLUGIN_LDFLAGS) endif +if BUILD_PLUGIN_SYSEVENT +pkglib_LTLIBRARIES += sysevent.la +sysevent_la_SOURCES = src/sysevent.c +sysevent_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS) +sysevent_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS) +sysevent_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) libignorelist.la +endif + if BUILD_PLUGIN_SYSLOG pkglib_LTLIBRARIES += syslog.la syslog_la_SOURCES = src/syslog.c diff --combined configure.ac index 2faa4ee8,18633890..9ff6408f --- a/configure.ac +++ b/configure.ac @@@ -165,7 -165,6 +165,7 @@@ AC_CHECK_HEADERS_ONCE([ kstat.h \ kvm.h \ libgen.h \ + locale.h \ mntent.h \ mnttab.h \ netdb.h \ @@@ -636,6 -635,16 +636,6 @@@ f # }}} -# For the dns plugin -AC_CHECK_HEADERS([arpa/nameser.h]) -AC_CHECK_HEADERS([arpa/nameser_compat.h], [], [], - [[ - #if HAVE_ARPA_NAMESER_H - # include - #endif - ]] -) - AC_CHECK_HEADERS([net/if_arp.h], [], [], [[ #if HAVE_SYS_SOCKET_H @@@ -743,21 -752,33 +743,21 @@@ test_cxx_flags() # AC_CHECK_FUNCS_ONCE([ \ asprintf \ - closelog \ - getaddrinfo \ - getgrnam_r \ - getnameinfo \ getpwnam \ getpwnam_r \ - gettimeofday \ if_indextoname \ - openlog \ - regcomp \ - regerror \ - regexec \ - regfree \ - select \ setenv \ setgroups \ - strcasecmp \ - strdup \ - strncasecmp \ - sysconf + setlocale ] ) AC_FUNC_STRERROR_R -SAVE_CFLAGS="$CFLAGS" -CFLAGS="-Wall -Werror" +if test "x$GCC" = "xyes"; then + SAVE_CFLAGS="$CFLAGS" + CFLAGS="$CFLAGS -Wall -Werror" +fi SAVE_LDFLAGS="$LDFLAGS" LDFLAGS="" if test "x$ac_system" = "xWindows"; then @@@ -3847,10 -3868,7 +3847,10 @@@ if test "x$with_libnetsnmp" = "xyes"; t SAVE_CPPFLAGS="$CPPFLAGS" SAVE_LDFLAGS="$LDFLAGS" SAVE_LIBS="$LIBS" - CPPFLAGS="$CPPFLAGS $with_libnetsnmp_cppflags -Wall -Werror" + CPPFLAGS="$CPPFLAGS $with_libnetsnmp_cppflags" + if test "x$GCC" = "xyes"; then + CPPFLAGS="$CPPFLAGS -Wall -Werror" + fi LDFLAGS="$LDFLAGS $with_libnetsnmp_ldflags" LIBS="$LIBS -lnetsnmp" @@@ -3950,13 -3968,7 +3950,13 @@@ if test "x$with_libnetsnmpagent" = "xye ) AC_CHECK_LIB([netsnmpagent], [init_agent], - [with_libnetsnmpagent="yes"], + [ + # libnetsnmp can be built without mib loading support + AC_CHECK_LIB([netsnmp], [get_tree], + [with_libnetsnmpagent="yes"], + [with_libnetsnmpagent="no (libnetsnmp doesn't support mib loading)"] + ) + ], [with_libnetsnmpagent="no (libnetsnmpagent not found)"], [$libnetsnmphelpers] ) @@@ -3965,8 -3977,6 +3965,8 @@@ fi if test "x$with_libnetsnmpagent" = "xyes"; then + BUILD_WITH_LIBNETSNMPAGENT_CPPFLAGS="$with_libnetsnmpagent_cppflags" + BUILD_WITH_LIBNETSNMPAGENT_LDFLAGS="$with_libnetsnmpagent_ldflags" BUILD_WITH_LIBNETSNMPAGENT_LIBS="-lnetsnmpagent $libnetsnmphelpers" fi @@@ -4321,10 -4331,7 +4321,10 @@@ if test "x$with_libperl" = "xyes"; the # (see issues #41 and #42) SAVE_CFLAGS="$CFLAGS" SAVE_LIBS="$LIBS" - CFLAGS="$CFLAGS $PERL_CFLAGS -Wall -Werror" + CFLAGS="$CFLAGS $PERL_CFLAGS" + if test "x$GCC" = "xyes"; then + CFLAGS="$CFLAGS -Wall -Werror" + fi LIBS="$LIBS $PERL_LIBS" AC_CACHE_CHECK([for broken Perl_load_module()], @@@ -4764,7 -4771,7 +4764,7 @@@ if test "$PYTHON_CONFIG" != ""; the if test $? -ne 0; then with_libpython="no" fi - LIBPYTHON_LIBS="`${PYTHON_CONFIG} --libs`" + LIBPYTHON_LIBS="`${PYTHON_CONFIG} --libs --embed`" || LIBPYTHON_LIBS="`${PYTHON_CONFIG} --libs`" if test $? -ne 0; then with_libpython="no" fi @@@ -5762,7 -5769,7 +5762,7 @@@ if test "x$with_libxmms" = "xyes"; the fi if test "x$with_libxmms" = "xyes"; then - SAVE_CPPFLAGS="$CFLAGS" + SAVE_CPPFLAGS="$CPPFLAGS" CPPFLAGS="$with_xmms_cflags" AC_CHECK_HEADER([xmmsctrl.h], @@@ -6402,14 -6409,12 +6402,14 @@@ plugin_pcie_errors="no plugin_perl="no" plugin_pinba="no" plugin_processes="no" +plugin_procevent="no" plugin_protocols="no" plugin_python="no" plugin_serial="no" plugin_smart="no" plugin_swap="no" plugin_synproxy="no" +plugin_sysevent="no" plugin_tape="no" plugin_tcpconns="no" plugin_ted="no" @@@ -6479,7 -6484,6 +6479,7 @@@ if test "x$ac_system" = "xLinux"; the if test "x$with_libyajl" = "xyes" && test "x$with_libyajl2" = "xyes"; then plugin_ovs_events="yes" plugin_ovs_stats="yes" + plugin_procevent="yes" fi if test "x$have_pci_regs_h" = "xyes"; then @@@ -6593,7 -6597,6 +6593,7 @@@ f if test "x$with_libyajl" = "xyes"; then plugin_ceph="yes" + plugin_sysevent="yes" fi if test "x$have_processor_info" = "xyes"; then @@@ -6790,6 -6793,7 +6790,7 @@@ AC_PLUGIN([bind], [$plug AC_PLUGIN([ceph], [$plugin_ceph], [Ceph daemon statistics]) AC_PLUGIN([cgroups], [$plugin_cgroups], [CGroups CPU usage accounting]) AC_PLUGIN([chrony], [yes], [Chrony statistics]) + AC_PLUGIN([check_uptime], [yes], [Notify about uptime reset]) AC_PLUGIN([conntrack], [$plugin_conntrack], [nf_conntrack statistics]) AC_PLUGIN([contextswitch], [$plugin_contextswitch], [context switch statistics]) AC_PLUGIN([cpu], [$plugin_cpu], [CPU usage statistics]) @@@ -6878,7 -6882,6 +6879,7 @@@ AC_PLUGIN([ping], [$with AC_PLUGIN([postgresql], [$with_libpq], [PostgreSQL database statistics]) AC_PLUGIN([powerdns], [yes], [PowerDNS statistics]) AC_PLUGIN([processes], [$plugin_processes], [Process statistics]) +AC_PLUGIN([procevent], [$plugin_procevent], [Process event (start, stop) statistics]) AC_PLUGIN([protocols], [$plugin_protocols], [Protocol (IP, TCP, ...) statistics]) AC_PLUGIN([python], [$plugin_python], [Embed a Python interpreter]) AC_PLUGIN([redis], [$with_libhiredis], [Redis plugin]) @@@ -6894,7 -6897,6 +6895,7 @@@ AC_PLUGIN([snmp_agent], [$with AC_PLUGIN([statsd], [yes], [StatsD plugin]) AC_PLUGIN([swap], [$plugin_swap], [Swap usage statistics]) AC_PLUGIN([synproxy], [$plugin_synproxy], [Synproxy stats plugin]) +AC_PLUGIN([sysevent], [$plugin_sysevent], [rsyslog events]) AC_PLUGIN([syslog], [$have_syslog], [Syslog logging plugin]) AC_PLUGIN([table], [yes], [Parsing of tabular data]) AC_PLUGIN([tail], [yes], [Parsing of logfiles]) @@@ -7075,13 -7077,11 +7076,13 @@@ AC_SUBST([LCC_VERSION_STRING] AC_CONFIG_FILES([src/libcollectdclient/collectd/lcc_features.h]) -AM_CFLAGS="-Wall" -AM_CXXFLAGS="-Wall" -if test "x$enable_werror" != "xno"; then - AM_CFLAGS="$AM_CFLAGS -Werror" - AM_CXXFLAGS="$AM_CXXFLAGS -Werror" +if test "x$GCC" = "xyes"; then + AM_CFLAGS="-Wall" + AM_CXXFLAGS="-Wall" + if test "x$enable_werror" != "xno"; then + AM_CFLAGS="$AM_CFLAGS -Werror" + AM_CXXFLAGS="$AM_CXXFLAGS -Werror" + fi fi AC_SUBST([AM_CFLAGS]) @@@ -7221,6 -7221,7 +7222,7 @@@ AC_MSG_RESULT([ bind . . . . . . . AC_MSG_RESULT([ ceph . . . . . . . . $enable_ceph]) AC_MSG_RESULT([ cgroups . . . . . . . $enable_cgroups]) AC_MSG_RESULT([ chrony. . . . . . . . $enable_chrony]) + AC_MSG_RESULT([ check_uptime. . . . . $enable_check_uptime]) AC_MSG_RESULT([ conntrack . . . . . . $enable_conntrack]) AC_MSG_RESULT([ contextswitch . . . . $enable_contextswitch]) AC_MSG_RESULT([ cpu . . . . . . . . . $enable_cpu]) @@@ -7308,7 -7309,6 +7310,7 @@@ AC_MSG_RESULT([ ping . . . . . . . AC_MSG_RESULT([ postgresql . . . . . $enable_postgresql]) AC_MSG_RESULT([ powerdns . . . . . . $enable_powerdns]) AC_MSG_RESULT([ processes . . . . . . $enable_processes]) +AC_MSG_RESULT([ procevent . . . . . . $enable_procevent]) AC_MSG_RESULT([ protocols . . . . . . $enable_protocols]) AC_MSG_RESULT([ python . . . . . . . $enable_python]) AC_MSG_RESULT([ redis . . . . . . . . $enable_redis]) @@@ -7324,7 -7324,6 +7326,7 @@@ AC_MSG_RESULT([ snmp_agent . . . . AC_MSG_RESULT([ statsd . . . . . . . $enable_statsd]) AC_MSG_RESULT([ swap . . . . . . . . $enable_swap]) AC_MSG_RESULT([ synproxy . . . . . . $enable_synproxy]) +AC_MSG_RESULT([ sysevent. . . . . . . $enable_sysevent]) AC_MSG_RESULT([ syslog . . . . . . . $enable_syslog]) AC_MSG_RESULT([ table . . . . . . . . $enable_table]) AC_MSG_RESULT([ tail_csv . . . . . . $enable_tail_csv]) diff --combined src/collectd.conf.in index ec77c2fa,40111553..f8c48080 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@@ -102,6 -102,7 +102,7 @@@ #@BUILD_PLUGIN_CEPH_TRUE@LoadPlugin ceph #@BUILD_PLUGIN_CGROUPS_TRUE@LoadPlugin cgroups #@BUILD_PLUGIN_CHRONY_TRUE@LoadPlugin chrony + #@BUILD_PLUGIN_CHECK_UPTIME_TRUE@LoadPlugin check_uptime #@BUILD_PLUGIN_CONNTRACK_TRUE@LoadPlugin conntrack #@BUILD_PLUGIN_CONTEXTSWITCH_TRUE@LoadPlugin contextswitch @BUILD_PLUGIN_CPU_TRUE@@BUILD_PLUGIN_CPU_TRUE@LoadPlugin cpu @@@ -180,7 -181,6 +181,7 @@@ #@BUILD_PLUGIN_POSTGRESQL_TRUE@LoadPlugin postgresql #@BUILD_PLUGIN_POWERDNS_TRUE@LoadPlugin powerdns #@BUILD_PLUGIN_PROCESSES_TRUE@LoadPlugin processes +#@BUILD_PLUGIN_PROCEVENT_TRUE@LoadPlugin procevent #@BUILD_PLUGIN_PROTOCOLS_TRUE@LoadPlugin protocols #@BUILD_PLUGIN_PYTHON_TRUE@LoadPlugin python #@BUILD_PLUGIN_REDIS_TRUE@LoadPlugin redis @@@ -195,7 -195,6 +196,7 @@@ #@BUILD_PLUGIN_SNMP_AGENT_TRUE@LoadPlugin snmp_agent #@BUILD_PLUGIN_STATSD_TRUE@LoadPlugin statsd #@BUILD_PLUGIN_SWAP_TRUE@LoadPlugin swap +#@BUILD_PLUGIN_SYSEVENT_TRUE@LoadPlugin sysevent #@BUILD_PLUGIN_TABLE_TRUE@LoadPlugin table #@BUILD_PLUGIN_TAIL_TRUE@LoadPlugin tail #@BUILD_PLUGIN_TAIL_CSV_TRUE@LoadPlugin tail_csv @@@ -1272,12 -1271,6 +1273,12 @@@ # # +# +# BufferLength 10 +# ProcessRegex "/^ovs.*$/" +# Process tuned +# + # # Value "/^Tcp:/" # IgnoreSelected false @@@ -1473,13 -1466,6 +1474,13 @@@ # ReportIO true # +# +# Listen "127.0.0.1" "6666" +# BufferSize 1024 +# BufferLength 10 +# RegexFilter "regex" +# + # # # #Plugin "table" @@@ -1699,7 -1685,7 +1700,7 @@@ # InterfaceFormat name # PluginInstanceFormat name # Instances 1 -# ExtraStats "cpu_util disk disk_err domain_state fs_info job_stats_background pcpu perf vcpupin disk_physical disk_allocation disk_capacity" +# ExtraStats "cpu_util disk disk_err domain_state fs_info job_stats_background pcpu perf vcpu vcpupin disk_physical disk_allocation disk_capacity memory" # PersistentNotification false # diff --combined src/collectd.conf.pod index c6b6ab4a,1190895c..8dff63c1 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@@ -1548,6 -1548,35 +1548,35 @@@ at all, B cgroups are selected =back + =head2 Plugin C + + The I designed to check and notify about host or service + status based on I metric. + + When new metric of I type appears in cache, OK notification is sent. + When new value for metric is less than previous value, WARNING notification is + sent about host/service restart. + When no new updates comes for metric and cache entry expires, then FAILURE + notification is sent about unreachable host or service. + + By default (when no explicit configuration), plugin checks for I metric. + + B + + + Type "uptime" + Type "my_uptime_type" + + + =over 4 + + =item B I + + Metric type to check for status/values. The type should consist single GAUGE + data source. + + =back + =head2 Plugin C The C plugin collects ntp data from a B server, such as clock @@@ -7314,40 -7343,6 +7343,40 @@@ reporting the corresponding processes o B blocks these options set the default value for subsequent matches. +=head2 Plugin C + +The I plugin monitors when processes start (EXEC) and stop (EXIT). + +B + + + BufferLength 10 + Process "name" + ProcessRegex "regex" + + +B + +=over 4 + +=item B I + +Maximum number of process events that can be stored in plugin's ring buffer. +By default, this is set to 10. Once an event has been read, its location +becomes available for storing a new event. + +=item B I + +Enumerate a process name to monitor. All processes that match this exact +name will be monitored for EXECs and EXITs. + +=item B I + +Enumerate a process pattern to monitor. All processes that match this +regular expression will be monitored for EXECs and EXITs. + +=back + =head2 Plugin C Collects a lot of information about various network protocols, such as I, @@@ -8281,70 -8276,6 +8310,70 @@@ or is not reliable =back +=head2 Plugin C + +The I plugin monitors rsyslog messages. + +B + + + Listen "192.168.0.2" "6666" + BufferSize 1024 + BufferLength 10 + RegexFilter "regex" + + + rsyslog should be configured such that it sends data to the IP and port you + include in the plugin configuration. For example, given the configuration + above, something like this would be set in /etc/rsyslog.conf: + + if $programname != 'collectd' then + *.* @192.168.0.2:6666 + + This plugin is designed to consume JSON rsyslog data, so a more complete + rsyslog configuration would look like so (where we define a JSON template + and use it when sending data to our IP and port): + + $template ls_json,"{%timestamp:::date-rfc3339,jsonf:@timestamp%, \ + %source:::jsonf:@source_host%,\"@source\":\"syslog://%fromhost-ip:::json%\", \ + \"@message\":\"%timestamp% %app-name%:%msg:::json%\",\"@fields\": \ + {%syslogfacility-text:::jsonf:facility%,%syslogseverity:::jsonf:severity-num%, \ + %syslogseverity-text:::jsonf:severity%,%programname:::jsonf:program%, \ + %procid:::jsonf:processid%}}" + + if $programname != 'collectd' then + *.* @192.168.0.2:6666;ls_json + + Please note that these rsyslog.conf examples are *not* complete, as rsyslog + requires more than these options in the configuration file. These examples + are meant to demonstration the proper remote logging and JSON format syntax. + +B + +=over 4 + +=item B I I + +Listen on this IP on this port for incoming rsyslog messages. + +=item B I + +Maximum allowed size for incoming rsyslog messages. Messages that exceed +this number will be truncated to this size. Default is 4096 bytes. + +=item B I + +Maximum number of rsyslog events that can be stored in plugin's ring buffer. +By default, this is set to 10. Once an event has been read, its location +becomes available for storing a new event. + +=item B I + +Enumerate a regex filter to apply to all incoming rsyslog messages. If a +message matches this filter, it will be published. + +=back + =head2 Plugin C =over 4 @@@ -9618,9 -9549,6 +9647,9 @@@ Requires libvirt API version I<1.2.9> o a domain. Only one type of job statistics can be collected at the same time. Requires libvirt API version I<1.2.9> or later. +=item B: report statistics about memory usage details, provided +by libvirt virDomainMemoryStats() function. + =item B: report the physical user/system cpu time consumed by the hypervisor, per-vm. Requires libvirt API version I<0.9.11> or later. @@@ -9629,8 -9557,6 +9658,8 @@@ metrics they must be enabled for domai libvirt API version I<1.3.3> or later. B: I metrics can't be collected if I plugin is enabled. +=item B: report domain virtual CPUs utilisation. + =item B: report pinning of domain VCPUs to host physical CPUs. =item B: report 'disk_physical' statistic for disk device. diff --combined src/daemon/plugin.c index daddf684,10a20648..52cb0a4b --- a/src/daemon/plugin.c +++ b/src/daemon/plugin.c @@@ -85,6 -85,14 +85,14 @@@ struct read_func_s }; typedef struct read_func_s read_func_t; + struct cache_event_func_s { + plugin_cache_event_cb callback; + char *name; + user_data_t user_data; + plugin_ctx_t plugin_ctx; + }; + typedef struct cache_event_func_s cache_event_func_t; + struct write_queue_s; typedef struct write_queue_s write_queue_t; struct write_queue_s { @@@ -112,6 -120,9 +120,9 @@@ static llist_t *list_shutdown static llist_t *list_log; static llist_t *list_notification; + static size_t list_cache_event_num; + static cache_event_func_t list_cache_event[32]; + static fc_chain_t *pre_cache_chain; static fc_chain_t *post_cache_chain; @@@ -263,8 -274,6 +274,6 @@@ static void destroy_read_heap(void) /* static int register_callback(llist_t **list, /* {{{ */ const char *name, callback_func_t *cf) { - llentry_t *le; - char *key; if (*list == NULL) { *list = llist_create(); @@@ -276,14 -285,14 +285,14 @@@ } } - key = strdup(name); + char *key = strdup(name); if (key == NULL) { ERROR("plugin: register_callback: strdup failed."); destroy_callback(cf); return -1; } - le = llist_search(*list, name); + llentry_t *le = llist_search(*list, name); if (le == NULL) { le = llentry_create(key, cf); if (le == NULL) { @@@ -296,9 -305,7 +305,7 @@@ llist_append(*list, le); } else { - callback_func_t *old_cf; - - old_cf = le->value; + callback_func_t *old_cf = le->value; le->value = cf; P_WARNING("register_callback: " @@@ -366,8 -373,7 +373,8 @@@ static int create_register_callback(lli cf->cf_callback = callback; if (ud == NULL) { cf->cf_udata = (user_data_t){ - .data = NULL, .free_func = NULL, + .data = NULL, + .free_func = NULL, }; } else { cf->cf_udata = *ud; @@@ -647,8 -653,7 +654,8 @@@ static void start_read_threads(size_t n } char name[THREAD_NAME_MAX]; - snprintf(name, sizeof(name), "reader#%" PRIu64, (uint64_t)read_threads_num); + ssnprintf(name, sizeof(name), "reader#%" PRIu64, + (uint64_t)read_threads_num); set_thread_name(read_threads[read_threads_num], name); read_threads_num++; @@@ -837,8 -842,8 +844,8 @@@ static void start_write_threads(size_t } char name[THREAD_NAME_MAX]; - snprintf(name, sizeof(name), "writer#%" PRIu64, - (uint64_t)write_threads_num); + ssnprintf(name, sizeof(name), "writer#%" PRIu64, + (uint64_t)write_threads_num); set_thread_name(write_threads[write_threads_num], name); write_threads_num++; @@@ -907,13 -912,15 +914,13 @@@ void plugin_set_dir(const char *dir) ERROR("plugin_set_dir: strdup(\"%s\") failed", dir); } -static bool plugin_is_loaded(char const *name) { - int status; - +bool plugin_is_loaded(char const *name) { if (plugins_loaded == NULL) plugins_loaded = c_avl_create((int (*)(const void *, const void *))strcasecmp); assert(plugins_loaded != NULL); - status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL); + int status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL); return status == 0; } @@@ -1294,10 -1301,8 +1301,10 @@@ EXPORT int plugin_register_flush(const /* name = */ flush_name, /* callback = */ plugin_flush_timeout_callback, /* interval = */ ctx.flush_interval, - /* user data = */ &(user_data_t){ - .data = cb, .free_func = plugin_flush_timeout_callback_free, + /* user data = */ + &(user_data_t){ + .data = cb, + .free_func = plugin_flush_timeout_callback_free, }); sfree(flush_name); @@@ -1312,6 -1317,60 +1319,60 @@@ EXPORT int plugin_register_missing(cons return create_register_callback(&list_missing, name, (void *)callback, ud); } /* int plugin_register_missing */ + EXPORT int plugin_register_cache_event(const char *name, + plugin_cache_event_cb callback, + user_data_t const *ud) { + + if (name == NULL || callback == NULL) + return EINVAL; + + char *name_copy = strdup(name); + if (name_copy == NULL) { + P_ERROR("plugin_register_cache_event: strdup failed."); + free_userdata(ud); + return ENOMEM; + } + + if (list_cache_event_num >= 32) { + P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried " + "to be registered."); + free_userdata(ud); + return ENOMEM; + } + + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + + if (strcmp(name, cef->name) == 0) { + P_ERROR("plugin_register_cache_event: a callback named `%s' already " + "registered!", + name); + free_userdata(ud); + return -1; + } + } + + user_data_t user_data; + if (ud == NULL) { + user_data = (user_data_t){ + .data = NULL, .free_func = NULL, + }; + } else { + user_data = *ud; + } + + list_cache_event[list_cache_event_num] = + (cache_event_func_t){.callback = callback, + .name = name_copy, + .user_data = user_data, + .plugin_ctx = plugin_get_ctx()}; + list_cache_event_num++; + + return 0; + } /* int plugin_register_cache_event */ + EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) { return create_register_callback(&list_shutdown, name, (void *)callback, NULL); } /* int plugin_register_shutdown */ @@@ -1513,6 -1572,32 +1574,32 @@@ EXPORT int plugin_unregister_missing(co return plugin_unregister(list_missing, name); } + EXPORT int plugin_unregister_cache_event(const char *name) { + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + if (strcmp(name, cef->name) == 0) { + /* Mark callback as inactive, so mask in cache entries remains actual */ + cef->callback = NULL; + sfree(cef->name); + free_userdata(&cef->user_data); + } + } + return 0; + } + + static void destroy_cache_event_callbacks() { + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + if (!cef->callback) + continue; + cef->callback = NULL; + sfree(cef->name); + free_userdata(&cef->user_data); + } + } + EXPORT int plugin_unregister_shutdown(const char *name) { return plugin_unregister(list_shutdown, name); } @@@ -1857,6 -1942,7 +1944,7 @@@ EXPORT int plugin_shutdown_all(void) * the data isn't freed twice. */ destroy_all_callbacks(&list_flush); destroy_all_callbacks(&list_missing); + destroy_cache_event_callbacks(); destroy_all_callbacks(&list_write); destroy_all_callbacks(&list_notification); @@@ -1897,6 -1983,82 +1985,82 @@@ EXPORT int plugin_dispatch_missing(cons return 0; } /* int }}} plugin_dispatch_missing */ + void plugin_dispatch_cache_event(enum cache_event_type_e event_type, + unsigned long callbacks_mask, const char *name, + const value_list_t *vl) { + switch (event_type) { + case CE_VALUE_NEW: + callbacks_mask = 0; + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + plugin_cache_event_cb callback = cef->callback; + + if (!callback) + continue; + + cache_event_t event = (cache_event_t){.type = event_type, + .value_list = vl, + .value_list_name = name, + .ret = 0}; + + plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx); + int status = (*callback)(&event, &cef->user_data); + plugin_set_ctx(old_ctx); + + if (status != 0) { + ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status " + "%i for event NEW.", + cef->name, status); + } else { + if (event.ret) { + DEBUG( + "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.", + cef->name, name); + callbacks_mask |= (1 << (i)); + } else { + DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.", + cef->name, name); + } + } + } + + if (callbacks_mask) + uc_set_callbacks_mask(name, callbacks_mask); + + break; + case CE_VALUE_UPDATE: + case CE_VALUE_EXPIRED: + for (size_t i = 0; i < list_cache_event_num; i++) { + cache_event_func_t *cef = &list_cache_event[i]; + plugin_cache_event_cb callback = cef->callback; + + if (!callback) + continue; + + if (callbacks_mask && (1 << (i)) == 0) + continue; + + cache_event_t event = (cache_event_t){.type = event_type, + .value_list = vl, + .value_list_name = name, + .ret = 0}; + + plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx); + int status = (*callback)(&event, &cef->user_data); + plugin_set_ctx(old_ctx); + + if (status != 0) { + ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status " + "%i for event %s.", + cef->name, status, + ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED")); + } + } + break; + } + return; + } + static int plugin_dispatch_values_internal(value_list_t *vl) { int status; static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC; diff --combined src/daemon/plugin.h index 3c301586,c3534e85..af3693dd --- a/src/daemon/plugin.h +++ b/src/daemon/plugin.h @@@ -171,6 -171,15 +171,15 @@@ struct user_data_s }; typedef struct user_data_s user_data_t; + enum cache_event_type_e { CE_VALUE_NEW, CE_VALUE_UPDATE, CE_VALUE_EXPIRED }; + + typedef struct cache_event_s { + enum cache_event_type_e type; + const value_list_t *value_list; + const char *value_list_name; + int ret; + } cache_event_t; + struct plugin_ctx_s { char *name; cdtime_t interval; @@@ -192,6 -201,11 +201,11 @@@ typedef int (*plugin_flush_cb)(cdtime_ * callbacks should be called, greater than zero if no more callbacks should be * called. */ typedef int (*plugin_missing_cb)(const value_list_t *, user_data_t *); + /* "cache event" callback. CE_VALUE_NEW events are sent to all registered + * callbacks. Callback should check if it interested in further CE_VALUE_UPDATE + * and CE_VALUE_EXPIRED events for metric and set event->ret = 1 if so. + */ + typedef int (*plugin_cache_event_cb)(cache_event_t *, user_data_t *); typedef void (*plugin_log_cb)(int severity, const char *message, user_data_t *); typedef int (*plugin_shutdown_cb)(void); typedef int (*plugin_notification_cb)(const notification_t *, user_data_t *); @@@ -233,7 -247,6 +247,7 @@@ void plugin_set_dir(const char *dir) * this case. */ int plugin_load(const char *name, bool global); +bool plugin_is_loaded(char const *name); int plugin_init_all(void); void plugin_read_all(void); @@@ -295,6 -308,9 +309,9 @@@ int plugin_register_flush(const char *n user_data_t const *user_data); int plugin_register_missing(const char *name, plugin_missing_cb callback, user_data_t const *user_data); + int plugin_register_cache_event(const char *name, + plugin_cache_event_cb callback, + user_data_t const *ud); int plugin_register_shutdown(const char *name, plugin_shutdown_cb callback); int plugin_register_data_set(const data_set_t *ds); int plugin_register_log(const char *name, plugin_log_cb callback, @@@ -311,6 -327,7 +328,7 @@@ int plugin_unregister_read_group(const int plugin_unregister_write(const char *name); int plugin_unregister_flush(const char *name); int plugin_unregister_missing(const char *name); + int plugin_unregister_cache_event(const char *name); int plugin_unregister_shutdown(const char *name); int plugin_unregister_data_set(const char *name); int plugin_unregister_log(const char *name); @@@ -381,6 -398,9 +399,9 @@@ __attribute__((sentinel)) int plugin_di int store_type, ...); int plugin_dispatch_missing(const value_list_t *vl); + void plugin_dispatch_cache_event(enum cache_event_type_e event_type, + unsigned long callbacks_mask, const char *name, + const value_list_t *vl); int plugin_dispatch_notification(const notification_t *notif); diff --combined src/daemon/utils_cache.c index 351c3a0f,722fa2da..672b01f1 --- a/src/daemon/utils_cache.c +++ b/src/daemon/utils_cache.c @@@ -67,6 -67,7 +67,7 @@@ typedef struct cache_entry_s size_t history_length; meta_data_t *meta; + unsigned long callbacks_mask; } cache_entry_t; struct uc_iter_s { @@@ -140,18 -141,15 +141,15 @@@ static void uc_check_range(const data_s static int uc_insert(const data_set_t *ds, const value_list_t *vl, const char *key) { - char *key_copy; - cache_entry_t *ce; - /* `cache_lock' has been locked by `uc_update' */ - key_copy = strdup(key); + char *key_copy = strdup(key); if (key_copy == NULL) { ERROR("uc_insert: strdup failed."); return -1; } - ce = cache_alloc(ds->ds_num); + cache_entry_t *ce = cache_alloc(ds->ds_num); if (ce == NULL) { sfree(key_copy); ERROR("uc_insert: cache_alloc (%" PRIsz ") failed.", ds->ds_num); @@@ -203,10 -201,6 +201,10 @@@ ce->interval = vl->interval; ce->state = STATE_UNKNOWN; + if (vl->meta != NULL) { + ce->meta = meta_data_clone(vl->meta); + } + if (c_avl_insert(cache_tree, key_copy, ce) != 0) { sfree(key_copy); ERROR("uc_insert: c_avl_insert failed."); @@@ -230,6 -224,7 +228,7 @@@ int uc_check_timeout(void) char *key; cdtime_t time; cdtime_t interval; + unsigned long callbacks_mask; } *expired = NULL; size_t expired_num = 0; @@@ -255,6 -250,7 +254,7 @@@ expired[expired_num].key = strdup(key); expired[expired_num].time = ce->last_time; expired[expired_num].interval = ce->interval; + expired[expired_num].callbacks_mask = ce->callbacks_mask; if (expired[expired_num].key == NULL) { ERROR("uc_check_timeout: strdup failed."); @@@ -279,8 -275,7 +279,8 @@@ * plugin calls the cache interface. */ for (size_t i = 0; i < expired_num; i++) { value_list_t vl = { - .time = expired[i].time, .interval = expired[i].interval, + .time = expired[i].time, + .interval = expired[i].interval, }; if (parse_identifier_vl(expired[i].key, &vl) != 0) { @@@ -290,6 -285,10 +290,10 @@@ } plugin_dispatch_missing(&vl); + + if (expired[i].callbacks_mask) + plugin_dispatch_cache_event(CE_VALUE_EXPIRED, expired[i].callbacks_mask, + expired[i].key, &vl); } /* for (i = 0; i < expired_num; i++) */ /* Now actually remove all the values from the cache. We don't re-evaluate @@@ -319,8 -318,6 +323,6 @@@ int uc_update(const data_set_t *ds, const value_list_t *vl) { char name[6 * DATA_MAX_NAME_LEN]; - cache_entry_t *ce = NULL; - int status; if (FORMAT_VL(name, sizeof(name), vl) != 0) { ERROR("uc_update: FORMAT_VL failed."); @@@ -329,11 -326,16 +331,16 @@@ pthread_mutex_lock(&cache_lock); - status = c_avl_get(cache_tree, name, (void *)&ce); + cache_entry_t *ce = NULL; + int status = c_avl_get(cache_tree, name, (void *)&ce); if (status != 0) /* entry does not yet exist */ { status = uc_insert(ds, vl, name); pthread_mutex_unlock(&cache_lock); + + if (status == 0) + plugin_dispatch_cache_event(CE_VALUE_NEW, 0 /* mask */, name, vl); + return status; } @@@ -408,11 -410,32 +415,32 @@@ ce->last_update = cdtime(); ce->interval = vl->interval; + /* Check if cache entry has registered callbacks */ + unsigned long callbacks_mask = ce->callbacks_mask; + pthread_mutex_unlock(&cache_lock); + if (callbacks_mask) + plugin_dispatch_cache_event(CE_VALUE_UPDATE, callbacks_mask, name, vl); + return 0; } /* int uc_update */ + int uc_set_callbacks_mask(const char *name, unsigned long mask) { + pthread_mutex_lock(&cache_lock); + cache_entry_t *ce = NULL; + int status = c_avl_get(cache_tree, name, (void *)&ce); + if (status != 0) { /* Ouch, just created entry disappeared ?! */ + ERROR("uc_set_callbacks_mask: Couldn't find %s entry!", name); + pthread_mutex_unlock(&cache_lock); + return -1; + } + DEBUG("uc_set_callbacks_mask: set mask for \"%s\" to %lu.", name, mask); + ce->callbacks_mask = mask; + pthread_mutex_unlock(&cache_lock); + return 0; + } + int uc_get_rate_by_name(const char *name, gauge_t **ret_values, size_t *ret_values_num) { gauge_t *ret = NULL; @@@ -894,12 -917,13 +922,12 @@@ int uc_iterator_get_values(uc_iter_t *i if ((iter == NULL) || (iter->entry == NULL) || (ret_values == NULL) || (ret_num == NULL)) return -1; - *ret_values = calloc(iter->entry->values_num, sizeof(*iter->entry->values_raw)); if (*ret_values == NULL) return -1; for (size_t i = 0; i < iter->entry->values_num; ++i) - *ret_values[i] = iter->entry->values_raw[i]; + (*ret_values)[i] = iter->entry->values_raw[i]; *ret_num = iter->entry->values_num;