check_uptime: New plugin, based on new cache_event callback.
test_utils_time \
test_utils_vl_lookup \
test_libcollectd_network_parse \
- test_utils_config_cores \
- test_utils_proc_pids
+ test_utils_config_cores
TESTS = $(check_PROGRAMS)
else
collectd_SOURCES += src/daemon/cmd.c
endif
-
+
if BUILD_FEATURE_DAEMON
collectd_CPPFLAGS += -DPIDFILE='"${localstatedir}/run/${PACKAGE_NAME}.pid"'
endif
src/testing.h
test_utils_config_cores_LDADD = libplugin_mock.la
-test_utils_proc_pids_SOURCES = \
- src/utils/proc_pids/proc_pids_test.c \
- src/testing.h
-test_utils_proc_pids_LDADD = libplugin_mock.la
-
libavltree_la_SOURCES = \
src/utils/avltree/avltree.c \
src/utils/avltree/avltree.h
chrony_la_LIBADD = -lm
endif
+ if BUILD_PLUGIN_CHECK_UPTIME
+ pkglib_LTLIBRARIES += check_uptime.la
+ check_uptime_la_SOURCES = src/check_uptime.c
+ check_uptime_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+ endif
+
if BUILD_PLUGIN_CONNTRACK
pkglib_LTLIBRARIES += conntrack.la
conntrack_la_SOURCES = src/conntrack.c
test_plugin_intel_rdt_LDADD = liboconfig.la libplugin_mock.la
check_PROGRAMS += test_plugin_intel_rdt
TESTS += test_plugin_intel_rdt
+
+test_utils_proc_pids_SOURCES = \
+ src/utils/proc_pids/proc_pids_test.c \
+ src/testing.h
+test_utils_proc_pids_LDADD = libplugin_mock.la
+check_PROGRAMS += test_utils_proc_pids
+TESTS += test_utils_proc_pids
endif
if BUILD_PLUGIN_INTERFACE
network_la_LDFLAGS += $(GCRYPT_LDFLAGS)
network_la_LIBADD += $(GCRYPT_LIBS)
endif
+
+test_plugin_network_SOURCES = \
+ src/network_test.c \
+ src/utils_fbhash.c \
+ src/daemon/configfile.c \
+ src/daemon/types_list.c
+test_plugin_network_CPPFLAGS = $(AM_CPPFLAGS) $(GCRYPT_CPPFLAGS)
+test_plugin_network_LDFLAGS = $(PLUGIN_LDFLAGS) $(GCRYPT_LDFLAGS)
+test_plugin_network_LDADD = \
+ libavltree.la \
+ liboconfig.la \
+ libplugin_mock.la \
+ libmetadata.la \
+ $(GCRYPT_LIBS)
+check_PROGRAMS += test_plugin_network
endif
if BUILD_PLUGIN_NFS
endif
endif
+if BUILD_PLUGIN_PROCEVENT
+pkglib_LTLIBRARIES += procevent.la
+procevent_la_SOURCES = src/procevent.c
+procevent_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS)
+procevent_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS)
+procevent_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) libignorelist.la
+endif
+
if BUILD_PLUGIN_PROTOCOLS
pkglib_LTLIBRARIES += protocols.la
protocols_la_SOURCES = src/protocols.c
synproxy_la_LDFLAGS = $(PLUGIN_LDFLAGS)
endif
+if BUILD_PLUGIN_SYSEVENT
+pkglib_LTLIBRARIES += sysevent.la
+sysevent_la_SOURCES = src/sysevent.c
+sysevent_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS)
+sysevent_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS)
+sysevent_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) libignorelist.la
+endif
+
if BUILD_PLUGIN_SYSLOG
pkglib_LTLIBRARIES += syslog.la
syslog_la_SOURCES = src/syslog.c
kstat.h \
kvm.h \
libgen.h \
+ locale.h \
mntent.h \
mnttab.h \
netdb.h \
# }}}
-# For the dns plugin
-AC_CHECK_HEADERS([arpa/nameser.h])
-AC_CHECK_HEADERS([arpa/nameser_compat.h], [], [],
- [[
- #if HAVE_ARPA_NAMESER_H
- # include <arpa/nameser.h>
- #endif
- ]]
-)
-
AC_CHECK_HEADERS([net/if_arp.h], [], [],
[[
#if HAVE_SYS_SOCKET_H
#
AC_CHECK_FUNCS_ONCE([ \
asprintf \
- closelog \
- getaddrinfo \
- getgrnam_r \
- getnameinfo \
getpwnam \
getpwnam_r \
- gettimeofday \
if_indextoname \
- openlog \
- regcomp \
- regerror \
- regexec \
- regfree \
- select \
setenv \
setgroups \
- strcasecmp \
- strdup \
- strncasecmp \
- sysconf
+ setlocale
]
)
AC_FUNC_STRERROR_R
-SAVE_CFLAGS="$CFLAGS"
-CFLAGS="-Wall -Werror"
+if test "x$GCC" = "xyes"; then
+ SAVE_CFLAGS="$CFLAGS"
+ CFLAGS="$CFLAGS -Wall -Werror"
+fi
SAVE_LDFLAGS="$LDFLAGS"
LDFLAGS=""
if test "x$ac_system" = "xWindows"; then
SAVE_CPPFLAGS="$CPPFLAGS"
SAVE_LDFLAGS="$LDFLAGS"
SAVE_LIBS="$LIBS"
- CPPFLAGS="$CPPFLAGS $with_libnetsnmp_cppflags -Wall -Werror"
+ CPPFLAGS="$CPPFLAGS $with_libnetsnmp_cppflags"
+ if test "x$GCC" = "xyes"; then
+ CPPFLAGS="$CPPFLAGS -Wall -Werror"
+ fi
LDFLAGS="$LDFLAGS $with_libnetsnmp_ldflags"
LIBS="$LIBS -lnetsnmp"
)
AC_CHECK_LIB([netsnmpagent], [init_agent],
- [with_libnetsnmpagent="yes"],
+ [
+ # libnetsnmp can be built without mib loading support
+ AC_CHECK_LIB([netsnmp], [get_tree],
+ [with_libnetsnmpagent="yes"],
+ [with_libnetsnmpagent="no (libnetsnmp doesn't support mib loading)"]
+ )
+ ],
[with_libnetsnmpagent="no (libnetsnmpagent not found)"],
[$libnetsnmphelpers]
)
fi
if test "x$with_libnetsnmpagent" = "xyes"; then
+ BUILD_WITH_LIBNETSNMPAGENT_CPPFLAGS="$with_libnetsnmpagent_cppflags"
+ BUILD_WITH_LIBNETSNMPAGENT_LDFLAGS="$with_libnetsnmpagent_ldflags"
BUILD_WITH_LIBNETSNMPAGENT_LIBS="-lnetsnmpagent $libnetsnmphelpers"
fi
# (see issues #41 and #42)
SAVE_CFLAGS="$CFLAGS"
SAVE_LIBS="$LIBS"
- CFLAGS="$CFLAGS $PERL_CFLAGS -Wall -Werror"
+ CFLAGS="$CFLAGS $PERL_CFLAGS"
+ if test "x$GCC" = "xyes"; then
+ CFLAGS="$CFLAGS -Wall -Werror"
+ fi
LIBS="$LIBS $PERL_LIBS"
AC_CACHE_CHECK([for broken Perl_load_module()],
if test $? -ne 0; then
with_libpython="no"
fi
- LIBPYTHON_LIBS="`${PYTHON_CONFIG} --libs`"
+ LIBPYTHON_LIBS="`${PYTHON_CONFIG} --libs --embed`" || LIBPYTHON_LIBS="`${PYTHON_CONFIG} --libs`"
if test $? -ne 0; then
with_libpython="no"
fi
fi
if test "x$with_libxmms" = "xyes"; then
- SAVE_CPPFLAGS="$CFLAGS"
+ SAVE_CPPFLAGS="$CPPFLAGS"
CPPFLAGS="$with_xmms_cflags"
AC_CHECK_HEADER([xmmsctrl.h],
plugin_perl="no"
plugin_pinba="no"
plugin_processes="no"
+plugin_procevent="no"
plugin_protocols="no"
plugin_python="no"
plugin_serial="no"
plugin_smart="no"
plugin_swap="no"
plugin_synproxy="no"
+plugin_sysevent="no"
plugin_tape="no"
plugin_tcpconns="no"
plugin_ted="no"
if test "x$with_libyajl" = "xyes" && test "x$with_libyajl2" = "xyes"; then
plugin_ovs_events="yes"
plugin_ovs_stats="yes"
+ plugin_procevent="yes"
fi
if test "x$have_pci_regs_h" = "xyes"; then
if test "x$with_libyajl" = "xyes"; then
plugin_ceph="yes"
+ plugin_sysevent="yes"
fi
if test "x$have_processor_info" = "xyes"; then
AC_PLUGIN([ceph], [$plugin_ceph], [Ceph daemon statistics])
AC_PLUGIN([cgroups], [$plugin_cgroups], [CGroups CPU usage accounting])
AC_PLUGIN([chrony], [yes], [Chrony statistics])
+ AC_PLUGIN([check_uptime], [yes], [Notify about uptime reset])
AC_PLUGIN([conntrack], [$plugin_conntrack], [nf_conntrack statistics])
AC_PLUGIN([contextswitch], [$plugin_contextswitch], [context switch statistics])
AC_PLUGIN([cpu], [$plugin_cpu], [CPU usage statistics])
AC_PLUGIN([postgresql], [$with_libpq], [PostgreSQL database statistics])
AC_PLUGIN([powerdns], [yes], [PowerDNS statistics])
AC_PLUGIN([processes], [$plugin_processes], [Process statistics])
+AC_PLUGIN([procevent], [$plugin_procevent], [Process event (start, stop) statistics])
AC_PLUGIN([protocols], [$plugin_protocols], [Protocol (IP, TCP, ...) statistics])
AC_PLUGIN([python], [$plugin_python], [Embed a Python interpreter])
AC_PLUGIN([redis], [$with_libhiredis], [Redis plugin])
AC_PLUGIN([statsd], [yes], [StatsD plugin])
AC_PLUGIN([swap], [$plugin_swap], [Swap usage statistics])
AC_PLUGIN([synproxy], [$plugin_synproxy], [Synproxy stats plugin])
+AC_PLUGIN([sysevent], [$plugin_sysevent], [rsyslog events])
AC_PLUGIN([syslog], [$have_syslog], [Syslog logging plugin])
AC_PLUGIN([table], [yes], [Parsing of tabular data])
AC_PLUGIN([tail], [yes], [Parsing of logfiles])
AC_CONFIG_FILES([src/libcollectdclient/collectd/lcc_features.h])
-AM_CFLAGS="-Wall"
-AM_CXXFLAGS="-Wall"
-if test "x$enable_werror" != "xno"; then
- AM_CFLAGS="$AM_CFLAGS -Werror"
- AM_CXXFLAGS="$AM_CXXFLAGS -Werror"
+if test "x$GCC" = "xyes"; then
+ AM_CFLAGS="-Wall"
+ AM_CXXFLAGS="-Wall"
+ if test "x$enable_werror" != "xno"; then
+ AM_CFLAGS="$AM_CFLAGS -Werror"
+ AM_CXXFLAGS="$AM_CXXFLAGS -Werror"
+ fi
fi
AC_SUBST([AM_CFLAGS])
AC_MSG_RESULT([ ceph . . . . . . . . $enable_ceph])
AC_MSG_RESULT([ cgroups . . . . . . . $enable_cgroups])
AC_MSG_RESULT([ chrony. . . . . . . . $enable_chrony])
+ AC_MSG_RESULT([ check_uptime. . . . . $enable_check_uptime])
AC_MSG_RESULT([ conntrack . . . . . . $enable_conntrack])
AC_MSG_RESULT([ contextswitch . . . . $enable_contextswitch])
AC_MSG_RESULT([ cpu . . . . . . . . . $enable_cpu])
AC_MSG_RESULT([ postgresql . . . . . $enable_postgresql])
AC_MSG_RESULT([ powerdns . . . . . . $enable_powerdns])
AC_MSG_RESULT([ processes . . . . . . $enable_processes])
+AC_MSG_RESULT([ procevent . . . . . . $enable_procevent])
AC_MSG_RESULT([ protocols . . . . . . $enable_protocols])
AC_MSG_RESULT([ python . . . . . . . $enable_python])
AC_MSG_RESULT([ redis . . . . . . . . $enable_redis])
AC_MSG_RESULT([ statsd . . . . . . . $enable_statsd])
AC_MSG_RESULT([ swap . . . . . . . . $enable_swap])
AC_MSG_RESULT([ synproxy . . . . . . $enable_synproxy])
+AC_MSG_RESULT([ sysevent. . . . . . . $enable_sysevent])
AC_MSG_RESULT([ syslog . . . . . . . $enable_syslog])
AC_MSG_RESULT([ table . . . . . . . . $enable_table])
AC_MSG_RESULT([ tail_csv . . . . . . $enable_tail_csv])
#@BUILD_PLUGIN_CEPH_TRUE@LoadPlugin ceph
#@BUILD_PLUGIN_CGROUPS_TRUE@LoadPlugin cgroups
#@BUILD_PLUGIN_CHRONY_TRUE@LoadPlugin chrony
+ #@BUILD_PLUGIN_CHECK_UPTIME_TRUE@LoadPlugin check_uptime
#@BUILD_PLUGIN_CONNTRACK_TRUE@LoadPlugin conntrack
#@BUILD_PLUGIN_CONTEXTSWITCH_TRUE@LoadPlugin contextswitch
@BUILD_PLUGIN_CPU_TRUE@@BUILD_PLUGIN_CPU_TRUE@LoadPlugin cpu
#@BUILD_PLUGIN_POSTGRESQL_TRUE@LoadPlugin postgresql
#@BUILD_PLUGIN_POWERDNS_TRUE@LoadPlugin powerdns
#@BUILD_PLUGIN_PROCESSES_TRUE@LoadPlugin processes
+#@BUILD_PLUGIN_PROCEVENT_TRUE@LoadPlugin procevent
#@BUILD_PLUGIN_PROTOCOLS_TRUE@LoadPlugin protocols
#@BUILD_PLUGIN_PYTHON_TRUE@LoadPlugin python
#@BUILD_PLUGIN_REDIS_TRUE@LoadPlugin redis
#@BUILD_PLUGIN_SNMP_AGENT_TRUE@LoadPlugin snmp_agent
#@BUILD_PLUGIN_STATSD_TRUE@LoadPlugin statsd
#@BUILD_PLUGIN_SWAP_TRUE@LoadPlugin swap
+#@BUILD_PLUGIN_SYSEVENT_TRUE@LoadPlugin sysevent
#@BUILD_PLUGIN_TABLE_TRUE@LoadPlugin table
#@BUILD_PLUGIN_TAIL_TRUE@LoadPlugin tail
#@BUILD_PLUGIN_TAIL_CSV_TRUE@LoadPlugin tail_csv
# </Process>
#</Plugin>
+#<Plugin "procevent">
+# BufferLength 10
+# ProcessRegex "/^ovs.*$/"
+# Process tuned
+#</Plugin>
+
#<Plugin protocols>
# Value "/^Tcp:/"
# IgnoreSelected false
# ReportIO true
#</Plugin>
+#<Plugin sysevent>
+# Listen "127.0.0.1" "6666"
+# BufferSize 1024
+# BufferLength 10
+# RegexFilter "regex"
+#</Plugin>
+
#<Plugin table>
# <Table "/proc/slabinfo">
# #Plugin "table"
# InterfaceFormat name
# PluginInstanceFormat name
# Instances 1
-# ExtraStats "cpu_util disk disk_err domain_state fs_info job_stats_background pcpu perf vcpupin disk_physical disk_allocation disk_capacity"
+# ExtraStats "cpu_util disk disk_err domain_state fs_info job_stats_background pcpu perf vcpu vcpupin disk_physical disk_allocation disk_capacity memory"
# PersistentNotification false
#</Plugin>
=back
+ =head2 Plugin C<check_uptime>
+
+ The I<check_uptime plugin> designed to check and notify about host or service
+ status based on I<uptime> metric.
+
+ When new metric of I<uptime> type appears in cache, OK notification is sent.
+ When new value for metric is less than previous value, WARNING notification is
+ sent about host/service restart.
+ When no new updates comes for metric and cache entry expires, then FAILURE
+ notification is sent about unreachable host or service.
+
+ By default (when no explicit configuration), plugin checks for I<uptime> metric.
+
+ B<Synopsis:>
+
+ <Plugin "check_uptime">
+ Type "uptime"
+ Type "my_uptime_type"
+ </Plugin>
+
+ =over 4
+
+ =item B<Type> I<Type>
+
+ Metric type to check for status/values. The type should consist single GAUGE
+ data source.
+
+ =back
+
=head2 Plugin C<chrony>
The C<chrony> plugin collects ntp data from a B<chronyd> server, such as clock
B<ProcessMatch> blocks these options set the default value for subsequent
matches.
+=head2 Plugin C<procevent>
+
+The I<procevent> plugin monitors when processes start (EXEC) and stop (EXIT).
+
+B<Synopsis:>
+
+ <Plugin procevent>
+ BufferLength 10
+ Process "name"
+ ProcessRegex "regex"
+ </Plugin>
+
+B<Options:>
+
+=over 4
+
+=item B<BufferLength> I<length>
+
+Maximum number of process events that can be stored in plugin's ring buffer.
+By default, this is set to 10. Once an event has been read, its location
+becomes available for storing a new event.
+
+=item B<Process> I<name>
+
+Enumerate a process name to monitor. All processes that match this exact
+name will be monitored for EXECs and EXITs.
+
+=item B<ProcessRegex> I<regex>
+
+Enumerate a process pattern to monitor. All processes that match this
+regular expression will be monitored for EXECs and EXITs.
+
+=back
+
=head2 Plugin C<protocols>
Collects a lot of information about various network protocols, such as I<IP>,
=back
+=head2 Plugin C<sysevent>
+
+The I<sysevent> plugin monitors rsyslog messages.
+
+B<Synopsis:>
+
+ <Plugin sysevent>
+ Listen "192.168.0.2" "6666"
+ BufferSize 1024
+ BufferLength 10
+ RegexFilter "regex"
+ </Plugin>
+
+ rsyslog should be configured such that it sends data to the IP and port you
+ include in the plugin configuration. For example, given the configuration
+ above, something like this would be set in /etc/rsyslog.conf:
+
+ if $programname != 'collectd' then
+ *.* @192.168.0.2:6666
+
+ This plugin is designed to consume JSON rsyslog data, so a more complete
+ rsyslog configuration would look like so (where we define a JSON template
+ and use it when sending data to our IP and port):
+
+ $template ls_json,"{%timestamp:::date-rfc3339,jsonf:@timestamp%, \
+ %source:::jsonf:@source_host%,\"@source\":\"syslog://%fromhost-ip:::json%\", \
+ \"@message\":\"%timestamp% %app-name%:%msg:::json%\",\"@fields\": \
+ {%syslogfacility-text:::jsonf:facility%,%syslogseverity:::jsonf:severity-num%, \
+ %syslogseverity-text:::jsonf:severity%,%programname:::jsonf:program%, \
+ %procid:::jsonf:processid%}}"
+
+ if $programname != 'collectd' then
+ *.* @192.168.0.2:6666;ls_json
+
+ Please note that these rsyslog.conf examples are *not* complete, as rsyslog
+ requires more than these options in the configuration file. These examples
+ are meant to demonstration the proper remote logging and JSON format syntax.
+
+B<Options:>
+
+=over 4
+
+=item B<Listen> I<host> I<port>
+
+Listen on this IP on this port for incoming rsyslog messages.
+
+=item B<BufferSize> I<length>
+
+Maximum allowed size for incoming rsyslog messages. Messages that exceed
+this number will be truncated to this size. Default is 4096 bytes.
+
+=item B<BufferLength> I<length>
+
+Maximum number of rsyslog events that can be stored in plugin's ring buffer.
+By default, this is set to 10. Once an event has been read, its location
+becomes available for storing a new event.
+
+=item B<RegexFilter> I<regex>
+
+Enumerate a regex filter to apply to all incoming rsyslog messages. If a
+message matches this filter, it will be published.
+
+=back
+
=head2 Plugin C<syslog>
=over 4
a domain. Only one type of job statistics can be collected at the same time.
Requires libvirt API version I<1.2.9> or later.
+=item B<memory>: report statistics about memory usage details, provided
+by libvirt virDomainMemoryStats() function.
+
=item B<pcpu>: report the physical user/system cpu time consumed by the hypervisor, per-vm.
Requires libvirt API version I<0.9.11> or later.
libvirt API version I<1.3.3> or later.
B<Note>: I<perf> metrics can't be collected if I<intel_rdt> plugin is enabled.
+=item B<vcpu>: report domain virtual CPUs utilisation.
+
=item B<vcpupin>: report pinning of domain VCPUs to host physical CPUs.
=item B<disk_physical>: report 'disk_physical' statistic for disk device.
};
typedef struct read_func_s read_func_t;
+ struct cache_event_func_s {
+ plugin_cache_event_cb callback;
+ char *name;
+ user_data_t user_data;
+ plugin_ctx_t plugin_ctx;
+ };
+ typedef struct cache_event_func_s cache_event_func_t;
+
struct write_queue_s;
typedef struct write_queue_s write_queue_t;
struct write_queue_s {
static llist_t *list_log;
static llist_t *list_notification;
+ static size_t list_cache_event_num;
+ static cache_event_func_t list_cache_event[32];
+
static fc_chain_t *pre_cache_chain;
static fc_chain_t *post_cache_chain;
static int register_callback(llist_t **list, /* {{{ */
const char *name, callback_func_t *cf) {
- llentry_t *le;
- char *key;
if (*list == NULL) {
*list = llist_create();
}
}
- key = strdup(name);
+ char *key = strdup(name);
if (key == NULL) {
ERROR("plugin: register_callback: strdup failed.");
destroy_callback(cf);
return -1;
}
- le = llist_search(*list, name);
+ llentry_t *le = llist_search(*list, name);
if (le == NULL) {
le = llentry_create(key, cf);
if (le == NULL) {
llist_append(*list, le);
} else {
- callback_func_t *old_cf;
-
- old_cf = le->value;
+ callback_func_t *old_cf = le->value;
le->value = cf;
P_WARNING("register_callback: "
cf->cf_callback = callback;
if (ud == NULL) {
cf->cf_udata = (user_data_t){
- .data = NULL, .free_func = NULL,
+ .data = NULL,
+ .free_func = NULL,
};
} else {
cf->cf_udata = *ud;
}
char name[THREAD_NAME_MAX];
- snprintf(name, sizeof(name), "reader#%" PRIu64, (uint64_t)read_threads_num);
+ ssnprintf(name, sizeof(name), "reader#%" PRIu64,
+ (uint64_t)read_threads_num);
set_thread_name(read_threads[read_threads_num], name);
read_threads_num++;
}
char name[THREAD_NAME_MAX];
- snprintf(name, sizeof(name), "writer#%" PRIu64,
- (uint64_t)write_threads_num);
+ ssnprintf(name, sizeof(name), "writer#%" PRIu64,
+ (uint64_t)write_threads_num);
set_thread_name(write_threads[write_threads_num], name);
write_threads_num++;
ERROR("plugin_set_dir: strdup(\"%s\") failed", dir);
}
-static bool plugin_is_loaded(char const *name) {
- int status;
-
+bool plugin_is_loaded(char const *name) {
if (plugins_loaded == NULL)
plugins_loaded =
c_avl_create((int (*)(const void *, const void *))strcasecmp);
assert(plugins_loaded != NULL);
- status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
+ int status = c_avl_get(plugins_loaded, name, /* ret_value = */ NULL);
return status == 0;
}
/* name = */ flush_name,
/* callback = */ plugin_flush_timeout_callback,
/* interval = */ ctx.flush_interval,
- /* user data = */ &(user_data_t){
- .data = cb, .free_func = plugin_flush_timeout_callback_free,
+ /* user data = */
+ &(user_data_t){
+ .data = cb,
+ .free_func = plugin_flush_timeout_callback_free,
});
sfree(flush_name);
return create_register_callback(&list_missing, name, (void *)callback, ud);
} /* int plugin_register_missing */
+ EXPORT int plugin_register_cache_event(const char *name,
+ plugin_cache_event_cb callback,
+ user_data_t const *ud) {
+
+ if (name == NULL || callback == NULL)
+ return EINVAL;
+
+ char *name_copy = strdup(name);
+ if (name_copy == NULL) {
+ P_ERROR("plugin_register_cache_event: strdup failed.");
+ free_userdata(ud);
+ return ENOMEM;
+ }
+
+ if (list_cache_event_num >= 32) {
+ P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried "
+ "to be registered.");
+ free_userdata(ud);
+ return ENOMEM;
+ }
+
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ if (!cef->callback)
+ continue;
+
+ if (strcmp(name, cef->name) == 0) {
+ P_ERROR("plugin_register_cache_event: a callback named `%s' already "
+ "registered!",
+ name);
+ free_userdata(ud);
+ return -1;
+ }
+ }
+
+ user_data_t user_data;
+ if (ud == NULL) {
+ user_data = (user_data_t){
+ .data = NULL, .free_func = NULL,
+ };
+ } else {
+ user_data = *ud;
+ }
+
+ list_cache_event[list_cache_event_num] =
+ (cache_event_func_t){.callback = callback,
+ .name = name_copy,
+ .user_data = user_data,
+ .plugin_ctx = plugin_get_ctx()};
+ list_cache_event_num++;
+
+ return 0;
+ } /* int plugin_register_cache_event */
+
EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) {
return create_register_callback(&list_shutdown, name, (void *)callback, NULL);
} /* int plugin_register_shutdown */
return plugin_unregister(list_missing, name);
}
+ EXPORT int plugin_unregister_cache_event(const char *name) {
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ if (!cef->callback)
+ continue;
+ if (strcmp(name, cef->name) == 0) {
+ /* Mark callback as inactive, so mask in cache entries remains actual */
+ cef->callback = NULL;
+ sfree(cef->name);
+ free_userdata(&cef->user_data);
+ }
+ }
+ return 0;
+ }
+
+ static void destroy_cache_event_callbacks() {
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ if (!cef->callback)
+ continue;
+ cef->callback = NULL;
+ sfree(cef->name);
+ free_userdata(&cef->user_data);
+ }
+ }
+
EXPORT int plugin_unregister_shutdown(const char *name) {
return plugin_unregister(list_shutdown, name);
}
* the data isn't freed twice. */
destroy_all_callbacks(&list_flush);
destroy_all_callbacks(&list_missing);
+ destroy_cache_event_callbacks();
destroy_all_callbacks(&list_write);
destroy_all_callbacks(&list_notification);
return 0;
} /* int }}} plugin_dispatch_missing */
+ void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
+ unsigned long callbacks_mask, const char *name,
+ const value_list_t *vl) {
+ switch (event_type) {
+ case CE_VALUE_NEW:
+ callbacks_mask = 0;
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ plugin_cache_event_cb callback = cef->callback;
+
+ if (!callback)
+ continue;
+
+ cache_event_t event = (cache_event_t){.type = event_type,
+ .value_list = vl,
+ .value_list_name = name,
+ .ret = 0};
+
+ plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
+ int status = (*callback)(&event, &cef->user_data);
+ plugin_set_ctx(old_ctx);
+
+ if (status != 0) {
+ ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
+ "%i for event NEW.",
+ cef->name, status);
+ } else {
+ if (event.ret) {
+ DEBUG(
+ "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.",
+ cef->name, name);
+ callbacks_mask |= (1 << (i));
+ } else {
+ DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.",
+ cef->name, name);
+ }
+ }
+ }
+
+ if (callbacks_mask)
+ uc_set_callbacks_mask(name, callbacks_mask);
+
+ break;
+ case CE_VALUE_UPDATE:
+ case CE_VALUE_EXPIRED:
+ for (size_t i = 0; i < list_cache_event_num; i++) {
+ cache_event_func_t *cef = &list_cache_event[i];
+ plugin_cache_event_cb callback = cef->callback;
+
+ if (!callback)
+ continue;
+
+ if (callbacks_mask && (1 << (i)) == 0)
+ continue;
+
+ cache_event_t event = (cache_event_t){.type = event_type,
+ .value_list = vl,
+ .value_list_name = name,
+ .ret = 0};
+
+ plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
+ int status = (*callback)(&event, &cef->user_data);
+ plugin_set_ctx(old_ctx);
+
+ if (status != 0) {
+ ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
+ "%i for event %s.",
+ cef->name, status,
+ ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED"));
+ }
+ }
+ break;
+ }
+ return;
+ }
+
static int plugin_dispatch_values_internal(value_list_t *vl) {
int status;
static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
};
typedef struct user_data_s user_data_t;
+ enum cache_event_type_e { CE_VALUE_NEW, CE_VALUE_UPDATE, CE_VALUE_EXPIRED };
+
+ typedef struct cache_event_s {
+ enum cache_event_type_e type;
+ const value_list_t *value_list;
+ const char *value_list_name;
+ int ret;
+ } cache_event_t;
+
struct plugin_ctx_s {
char *name;
cdtime_t interval;
* callbacks should be called, greater than zero if no more callbacks should be
* called. */
typedef int (*plugin_missing_cb)(const value_list_t *, user_data_t *);
+ /* "cache event" callback. CE_VALUE_NEW events are sent to all registered
+ * callbacks. Callback should check if it interested in further CE_VALUE_UPDATE
+ * and CE_VALUE_EXPIRED events for metric and set event->ret = 1 if so.
+ */
+ typedef int (*plugin_cache_event_cb)(cache_event_t *, user_data_t *);
typedef void (*plugin_log_cb)(int severity, const char *message, user_data_t *);
typedef int (*plugin_shutdown_cb)(void);
typedef int (*plugin_notification_cb)(const notification_t *, user_data_t *);
* this case.
*/
int plugin_load(const char *name, bool global);
+bool plugin_is_loaded(char const *name);
int plugin_init_all(void);
void plugin_read_all(void);
user_data_t const *user_data);
int plugin_register_missing(const char *name, plugin_missing_cb callback,
user_data_t const *user_data);
+ int plugin_register_cache_event(const char *name,
+ plugin_cache_event_cb callback,
+ user_data_t const *ud);
int plugin_register_shutdown(const char *name, plugin_shutdown_cb callback);
int plugin_register_data_set(const data_set_t *ds);
int plugin_register_log(const char *name, plugin_log_cb callback,
int plugin_unregister_write(const char *name);
int plugin_unregister_flush(const char *name);
int plugin_unregister_missing(const char *name);
+ int plugin_unregister_cache_event(const char *name);
int plugin_unregister_shutdown(const char *name);
int plugin_unregister_data_set(const char *name);
int plugin_unregister_log(const char *name);
int store_type, ...);
int plugin_dispatch_missing(const value_list_t *vl);
+ void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
+ unsigned long callbacks_mask, const char *name,
+ const value_list_t *vl);
int plugin_dispatch_notification(const notification_t *notif);
size_t history_length;
meta_data_t *meta;
+ unsigned long callbacks_mask;
} cache_entry_t;
struct uc_iter_s {
static int uc_insert(const data_set_t *ds, const value_list_t *vl,
const char *key) {
- char *key_copy;
- cache_entry_t *ce;
-
/* `cache_lock' has been locked by `uc_update' */
- key_copy = strdup(key);
+ char *key_copy = strdup(key);
if (key_copy == NULL) {
ERROR("uc_insert: strdup failed.");
return -1;
}
- ce = cache_alloc(ds->ds_num);
+ cache_entry_t *ce = cache_alloc(ds->ds_num);
if (ce == NULL) {
sfree(key_copy);
ERROR("uc_insert: cache_alloc (%" PRIsz ") failed.", ds->ds_num);
ce->interval = vl->interval;
ce->state = STATE_UNKNOWN;
+ if (vl->meta != NULL) {
+ ce->meta = meta_data_clone(vl->meta);
+ }
+
if (c_avl_insert(cache_tree, key_copy, ce) != 0) {
sfree(key_copy);
ERROR("uc_insert: c_avl_insert failed.");
char *key;
cdtime_t time;
cdtime_t interval;
+ unsigned long callbacks_mask;
} *expired = NULL;
size_t expired_num = 0;
expired[expired_num].key = strdup(key);
expired[expired_num].time = ce->last_time;
expired[expired_num].interval = ce->interval;
+ expired[expired_num].callbacks_mask = ce->callbacks_mask;
if (expired[expired_num].key == NULL) {
ERROR("uc_check_timeout: strdup failed.");
* plugin calls the cache interface. */
for (size_t i = 0; i < expired_num; i++) {
value_list_t vl = {
- .time = expired[i].time, .interval = expired[i].interval,
+ .time = expired[i].time,
+ .interval = expired[i].interval,
};
if (parse_identifier_vl(expired[i].key, &vl) != 0) {
}
plugin_dispatch_missing(&vl);
+
+ if (expired[i].callbacks_mask)
+ plugin_dispatch_cache_event(CE_VALUE_EXPIRED, expired[i].callbacks_mask,
+ expired[i].key, &vl);
} /* for (i = 0; i < expired_num; i++) */
/* Now actually remove all the values from the cache. We don't re-evaluate
int uc_update(const data_set_t *ds, const value_list_t *vl) {
char name[6 * DATA_MAX_NAME_LEN];
- cache_entry_t *ce = NULL;
- int status;
if (FORMAT_VL(name, sizeof(name), vl) != 0) {
ERROR("uc_update: FORMAT_VL failed.");
pthread_mutex_lock(&cache_lock);
- status = c_avl_get(cache_tree, name, (void *)&ce);
+ cache_entry_t *ce = NULL;
+ int status = c_avl_get(cache_tree, name, (void *)&ce);
if (status != 0) /* entry does not yet exist */
{
status = uc_insert(ds, vl, name);
pthread_mutex_unlock(&cache_lock);
+
+ if (status == 0)
+ plugin_dispatch_cache_event(CE_VALUE_NEW, 0 /* mask */, name, vl);
+
return status;
}
ce->last_update = cdtime();
ce->interval = vl->interval;
+ /* Check if cache entry has registered callbacks */
+ unsigned long callbacks_mask = ce->callbacks_mask;
+
pthread_mutex_unlock(&cache_lock);
+ if (callbacks_mask)
+ plugin_dispatch_cache_event(CE_VALUE_UPDATE, callbacks_mask, name, vl);
+
return 0;
} /* int uc_update */
+ int uc_set_callbacks_mask(const char *name, unsigned long mask) {
+ pthread_mutex_lock(&cache_lock);
+ cache_entry_t *ce = NULL;
+ int status = c_avl_get(cache_tree, name, (void *)&ce);
+ if (status != 0) { /* Ouch, just created entry disappeared ?! */
+ ERROR("uc_set_callbacks_mask: Couldn't find %s entry!", name);
+ pthread_mutex_unlock(&cache_lock);
+ return -1;
+ }
+ DEBUG("uc_set_callbacks_mask: set mask for \"%s\" to %lu.", name, mask);
+ ce->callbacks_mask = mask;
+ pthread_mutex_unlock(&cache_lock);
+ return 0;
+ }
+
int uc_get_rate_by_name(const char *name, gauge_t **ret_values,
size_t *ret_values_num) {
gauge_t *ret = NULL;
if ((iter == NULL) || (iter->entry == NULL) || (ret_values == NULL) ||
(ret_num == NULL))
return -1;
-
*ret_values =
calloc(iter->entry->values_num, sizeof(*iter->entry->values_raw));
if (*ret_values == NULL)
return -1;
for (size_t i = 0; i < iter->entry->values_num; ++i)
- *ret_values[i] = iter->entry->values_raw[i];
+ (*ret_values)[i] = iter->entry->values_raw[i];
*ret_num = iter->entry->values_num;