Merge pull request #3269 from bplessis/feature-add-snmpbulk
authorMatthias Runge <mrunge@redhat.com>
Sat, 2 Nov 2019 21:12:06 +0000 (22:12 +0100)
committerGitHub <noreply@github.com>
Sat, 2 Nov 2019 21:12:06 +0000 (22:12 +0100)
snmp plugin: add support for SNMP Bulk Transfer

30 files changed:
.travis.yml
AUTHORS
ChangeLog
Makefile.am
README
configure.ac
contrib/redhat/collectd.spec
src/check_uptime.c [new file with mode: 0644]
src/collectd.conf.in
src/collectd.conf.pod
src/collectdctl.c
src/connectivity.c [new file with mode: 0644]
src/contextswitch.c
src/daemon/plugin.c
src/daemon/plugin.h
src/daemon/utils_cache.c
src/daemon/utils_cache.h
src/exec.c
src/grpc.cc
src/java.c
src/lvm.c [deleted file]
src/network.c
src/procevent.c [new file with mode: 0644]
src/sysevent.c [new file with mode: 0644]
src/turbostat.c
src/types.db
src/utils/common/common.c
src/utils/common/common_test.c
src/zfs_arc.c
version-gen.sh

index 4ba4e2a..f715343 100644 (file)
@@ -12,17 +12,21 @@ env:
 matrix:
   include:
     - os: osx
-      osx_image: xcode10.1
+      osx_image: xcode11.2
       compiler: clang
+      jdk: openjdk10
       env:
         - CXX=clang++
         - PATH="/usr/local/opt/mysql-client/bin:$PATH"
+        - JAVA_HOME="/Library/Java/JavaVirtualMachines/openjdk-13.jdk/Contents/Home"
     - os: linux
       dist: xenial
       compiler: clang
+      jdk: openjdk10
     - os: linux
       dist: xenial
       compiler: gcc
+      jdk: openjdk10
 
 before_install:
   # When building the coverity_scan branch, allow only the first job to continue to avoid travis-ci/travis-ci#1975.
@@ -32,7 +36,10 @@ before_script: autoreconf -vif
 
 script:
   - if [[ "${TRAVIS_BRANCH}" == "coverity_scan" ]]; then exit 0; fi
+  - type pkg-config
+  - pkg-config --list-all | sort -u
   - ./configure
+  - cat config.log
   - make distcheck DISTCHECK_CONFIGURE_FLAGS="--disable-dependency-tracking --enable-debug"
 
 addons:
@@ -56,7 +63,6 @@ addons:
     - liblua50-dev
     - liblua5.1-0-dev
     - liblua5.2-dev
-    - liblvm2-dev
     - libmemcached-dev
     - libmicrohttpd-dev
     - libmnl-dev
@@ -78,7 +84,6 @@ addons:
     - libsensors4-dev
     - libsigrok-dev
     - libsnmp-dev
-    - libstatgrab-dev
     - libtokyocabinet-dev
     - libtokyotyrant-dev
     - libudev-dev
@@ -106,6 +111,7 @@ addons:
     packages:
     - curl
     - glib
+    - grpc
     - hiredis
     - libdbi
     - libmemcached
@@ -115,9 +121,10 @@ addons:
     - liboping
     - libpcap
     - librdkafka
-    - libstatgrab
     - libvirt
+    - libxml2
     - lua
+    - mongo-c-driver
     - mosquitto
     - mysql-client
     - net-snmp
diff --git a/AUTHORS b/AUTHORS
index d05b86f..d8548d1 100644 (file)
--- a/AUTHORS
+++ b/AUTHORS
@@ -56,11 +56,23 @@ Amit Gupta <amit.gupta221 at gmail.com>
 Andreas Henriksson <andreas at fatal.se>
  - libmnl support in the netlink plugin.
 
+Andrew Bays <abays at redhat.com>
+ - connectivity plugin.
+ - procevent plugin.
+ - sysevent plugin.
+ - connectivity plugin.
+
 Andy Parkins <andyp at fussylogic.co.uk>
  - battery plugin: sysfs code.
 
+Aneesh Puttur <aputtur at redhat.com>
+ - connectivity plugin.
+
 Andy Smith <ansmith at redhat.com>
  - AMQP 1.0 plugin.
+Aneesh Puttur <aputtur at redhat.com>
+ - connectivity plugin.
 
 Anthony Dewhurst <dewhurst at gmail.com>
  - zfs_arc plugin.
index 7ea5a07..e558a26 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,300 @@
+2019-10-01, Version 5.9.2
+       * syslog plugin: Don't fail if syslog loglevel doesn't match. Thanks to
+         Fabien Wernli. #3236 #3238
+       * collectd: Fix ssnprintf wrapper. Thanks to Fabien Wernli. #3237
+       * rdt plugin: Fix compile time issues. Thanks to Matthias Runge. #3245
+
+2019-07-24, Version 5.9.1
+       * collectd: redhat spec: fix build due to new upstream plugins. Thanks
+         to Fabien Wernli. #3175
+       * collectd: regex match: Fix unexpected match with empty meta data .
+         Thanks to Takuro Ashie. #3178
+       * collectd: Fix return value or loglevel for several plugins. Thanks to
+         Fabien Wernli. #3182
+       * collectd: Add standard include early or _FILE_OFFSET_BITS will have
+         definition … . Thanks to Dagobert Michelsen. #3193
+       * collectd: Use GCC-specific flags only when compiling with GCC. Thanks
+         to Dagobert Michelsen. #3195
+       * Use test_utils_proc_pids only when compiling the plugin that uses it.
+         Thanks to Dagobert Michelsen. #3197
+       * DNS plugin: Do not use headers from glibc. Thanks to Pavel Rochnyak.
+         #3156, #3145
+       * collectd: Add missing definitions for libnetsnmpagent. Thanks to
+         Dagobert Michelsen. #3203
+       * collectd: Move Makefile rules for pid_test inside conditional for
+         code. Thanks to Dagobert Michelsen. #3206
+       * collectd: Recover setlocale() call in src/daemon/collectd.c do_init().
+         Thanks to Pavel Rochnyak. #3214, #3181
+       * collectd: Add snprintf wrapper for GCC 8.2/3. Thanks to zebity. #3153,
+         #2895, #3038
+       * collectd: Fix bug that leads to CPPFLAGS gets overridden with CFLAGS
+         when libxmms is enabled. Thanks to Dagobert Michelsen. #3207
+       * Write_Riemann plugin: Copy MetaData to Riemann events in
+         write_riemann. Thanks to Romain Tartière. #3158
+       * virt plugin: Fix memory leak with libvirt MetadataXPath enabled.
+         Thanks to Pavel Rochnyak. #3225, #3230
+
+2019-06-13, Version 5.9.0
+       * Build System: configure.ac: option "--with-libxml2" has been added.
+         Thanks to Dimitrios Apostolou, Pavel Rochnyak. #2864
+       * Build System: configure.ac: run dpdk build tests only if pkgconfig
+         fails. Thanks to Luca Boccassi, Pavel Rochnyak. #3015
+       * Build System: The "df" plugin is now built when "getmntent_r()" is
+         available. Thanks to Florian Forster. #3095
+       * Build System: The ability to turn on collectd "debug" feature in RPMs
+         has been added. Thanks to dehotot. #2755
+       * collectd: A new "UNKNOWN" state as the initial state of metrics has
+         been added. Thanks to Luis Fernández Álvarez, Florian Forster. #2976
+       * collectd: Base port to Windows. Thanks to Sean Campbell. #2810
+       * collectd: Code ownership of five plugins has been handed out to folks
+         from Intel. Thanks to Florian Forster. #3053
+       * collectd: config parser: Improved error reporting on global options.
+         Thanks to Pavel Rochnyak. #2813
+       * collectd: daemon: make plugin_dispatch_multivalue() obey write queue
+         limits. Thanks to Adam Romanek. #2898
+       * collectd: Macros "STRERROR" and "STRERRNO" have been added. Thanks to
+         Florian Forster. #2519
+       * collectd: Plugin name field has been added to plugin context to
+         improve error reporting. Thanks to Pavel Rochnyak. #2821
+       * collectd-tg: Use "CLOCK_REALTIME" for collectd-tg times. Thanks to
+         Andrew Bays. #2837
+       * tree-wide: Don't initialize static pointers to NULL, use "bool" from
+         "stdbool.h" (instead of "_Bool"). Thanks to Ruben Kerkhof. #2771,
+         #2772
+       * tree-wide: Replace zu with "PRIsz" and llu with "PRIu64". Thanks to
+         Sean Campbell. #2512
+       * tree-wide: Use interval value from plugin context, do not set
+         "vl->interval" in plugins more. Thanks to Pavel Rochnyak. #2847
+       * tree-wide: Utilities and libraries have been moved to "src/utils/".
+         Thanks to Florian Forster. #2961
+       * AMPQ1 plugin: A new plugin to write to amqp1 protocol. Thanks to Andy
+         Smith. #2618
+       * Chrony plugin: Ignoring late responses has been added. Thanks to
+         Miroslav Lichvar, Pavel Rochnyak. #2896
+       * CPUFreq plugin: Read number of p-state transitions and time spent in
+         each p-state. Thanks to Sexton Rory. #2803
+       * cURL, cURL-XML plugins: Option "Interval" has been added. Thanks to
+         Pavel Rochnyak. #2847
+       * Disk plugin: Report number of in progress disk IO requests on FreeBSD.
+         Thanks to Nathan Huff. #2878
+       * Exec plugin: Dynamic allocation of grname buffer has been added.
+         Thanks to sreedi, Florian Forster. #2937
+       * GPU NVML plugin: New plugin to collect NVIDIA GPU stats. Thanks to
+         Evgeny Naumov. #2923
+       * gRPC plugin: The "VerifyPeer" option for servers has been added.
+         Thanks to Florian Forster. #2593
+       * Intel RDT plugin: Support for groups of PIDs has been added. Thanks to
+         Wojciech Andralojc, Mateusz Starzyk, Michal Aleksinski. #2891
+       * IPMI plugin: Config options "SELSensor" and "SELIgnoreSelected" have
+         been added. Thanks to Mariusz Szafranski. #2796
+       * Modbus plugin: Support for 64 bit vals has been added, support for
+         CDAB endian 32-bit modbus polls has been added. Thanks to Anthony
+         Vickers, PJ Bostley. #2670, #2660
+       * Modbus plugin: The "Scale" and "Shift" metrics have been added. Thanks
+         to cekstam. #2729
+       * Netlink plugin: Handle new counter from Linux kernel version 4.6+.
+         Thanks to Pavel Rochnyak. #2767
+       * Network plugin: Option "BindAddress" has been added. Thanks to Ofir
+         Hermesh. #2831
+       * Ping plugin: An "AddressFamily" configuration option has been added.
+         Thanks to 依云 lilydjwg. #2961
+       * OVS Stats plugin: Extended metrics "ovs-dpdk" have been added. Thanks
+         to Matteo Croce, Ryan McCabe. #3000
+       * OVS Stats plugin: Support of bond interface and a "InterfaceStats"
+         config option have been added. Thanks to Andrew Bays. #2880
+       * PCIe Errors plugin: New plugin to read "PCIe" errors. Thanks to Kamil
+         Wiatrowski. #2733
+       * Processes plugin: Support for Linux Delay Accounting has been added.
+         Thanks to Florian Forster. #2598
+       * Redis plugin: Keyspace "hitratio" metric has been added, metric
+         "operations_per_second" has been removed, an option for connecting via
+         UNIX socket has been added. Thanks to Pavel Rochnyak. #2838, #2845,
+         #2904
+       * RouterOS plugin: Support for temperature and voltage data has been
+         added, use MAC-address when Radio-name is missing. Thanks to Pavel
+         Rochnyak. #2851, #2854
+       * RRDCacheD plugin: Time resolution has been improved to microseconds.
+         Thanks to Brian T. O'Neill. #3065
+       * Sensors plugin: Checks for upper limit of "SENSORS_API_VERSION" have
+         been removed, support for libsensors older than 3.0.0 has been
+         dropped. Thanks to Pavel Rochnyak. #3013, #3014
+       * SNMP plugin: New options "PluginInstance", "TypeInstance",
+         "TypeInstanceOID", "PluginInstanceOID", "FilterOID", "FilterValues"
+         and "FilterIgnoreSelected" have been added. Thanks to Pavel Rochnyak.
+         #2817, #2819
+       * SNMP Agent plugin: Multiple key indexes to snmp table and other new
+         features have been added, refactoring, coverity scan issues have been
+         fixed. Thanks to Marcin Mozejko. #2702, #2844
+       * Swap plugin: Support for Linux 2.4 has been dropped. Thanks to Pavel
+         Rochnyak. #2979
+       * Turbostat plugin: Configuration option "RestoreAffinityPolicy" has
+         been added. Thanks to Pablo Llopis. #2627
+       * Turbostat plugin: New metrics "P-states", "Turboboost", "Platform
+         TDP", "Uncore bus ratio" have been added. Thanks to Sexton Rory. #2806
+       * Turbostat plugin: Support of reporting GPU power on SKL has been
+         added. Thanks to Gordon Kelly. #2605
+       * virt plugin: Allow read "Hostname" from libvirt metadata. Thanks to
+         Mehdi ABAAKOUK. #2807
+       * virt plugin: Block info statistics for disk devices have been added.
+         Thanks to Radoslaw Jablonski. #2874
+       * Wireless plugin: A "bitrate" metric has been added. Thanks to Florian
+         Forster. #2950
+       * Write Graphite, Write Kafka plugins: Support for Graphite 1.1+ tag has
+         been added. Thanks to Dan Cech. #2631
+       * Write Prometheus plugin: Option "Host" has been added. Thanks to Pavel
+         Rochnyak. #2969
+       * Write Stackdriver plugin: New plugin to write to Google Stackdriver
+         Monitoring. Thanks to Florian Forster. #2472
+       * Write Syslog plugin: "write_syslog" plugin writes values lists as
+         syslog messages. Thanks to Shirly Radco. #3019
+       * Build System: A warning that pkgdatadir and pkglibdir were previously
+         defined has been fixed, additional plugins have been enabled,
+         GNULIB_DIR has been added to LDFLAGS in configure.ac on Windows.
+         Thanks to Sean Campbell. #2907, #2885, #2882
+       * Build System: Including "utils/mount/mount.h" has been fixed. Thanks
+         to Florian Forster. #3097
+       * Build System: The amount of output from ./configure has been reduced,
+         rendering of collectd-lua(5) manpage has been fixed, don't hide errors
+         when creating manpage. Thanks to Ruben Kerkhof. #3086, #3088, #3092
+       * collectd: A bug in "c_avl_iterator_prev" has been fixed. Thanks to
+         volth. #2917
+       * collectd: A stringop compiler warning has been fixed. Thanks to Ruben
+         Kerkhof, Juan Osorio Robles. #3021
+       * collectd: An invalid memory access in the "strjoin()" function has
+         been fixed. Thanks to Florian Forster. #3063
+       * collectd: collectd binary has been refactored. Thanks to Sean
+         Campbell, Sebastian Harl. #2745
+       * collectd: collectdmon cannot exit command line options parse loop has
+         been fixed. Thanks to takahashi-tsc. #2774
+       * collectd: Endianness checks for AIX have been added, gcc issue on Mac
+         byteorder has been fixed, fallback for endianness conversion has been
+         added. Thanks to Dagobert Michelsen. #2761, #2741, #2717
+       * collectd: Handle failure of simple config callbacks. Thanks to Ruben
+         Kerkhof. #3085
+       * collectd: Include "kstat.h" if available to provide "kstat_ctl_t",
+         include "kstat.h" when available. Thanks to Dagobert Michelsen. #2716,
+         #2711
+       * collectd: Parsing option for avoiding making BaseDir has been fixed.
+         Thanks to Mariusz Białończyk. #2856
+       * collectd: Remove empty "cmd_listval_t" data structure and related
+         no-op code. Thanks to Pavel Rochnyak. #2779
+       * collectd: src/daemon/plugin.c: Refactor plugin_load_file(),
+         src/utils_format_json.c: Remove chatty debug messages. Thanks to
+         Florian Forster. #2558, #2938
+       * collectd: Stop poisoning function in debug mode. Thanks to Ruben
+         Kerkhof. #2804
+       * collectd: The number of allocations when parsing types.db has been
+         reduced. Thanks to Ruben Kerkhof. #3091
+       * collectd: The organization of the source repository has been improved.
+         Thanks to Florian Forster. #2961
+       * collectd: Typos have been fixed. Thanks to Florian Forster, Jakub
+         Jankowski, William Pursell. #2944, #2692, #2643
+       * tree-wide: cleanup: cf_util_get* instead of local copy in plugins,
+         prefixed error reporting. Thanks to Pavel Rochnyak. #2833
+       * tree-wide: Some style issues have been fixed. Thanks to Ruben Kerkhof.
+         #3022
+       * tree-wide: "sstrerror()" has been replaced with "STRERRNO". Thanks to
+         Pavel Rochnyak. #2735
+       * AMQP1 plugin: Potential memory leaks found via scan-build have been
+         fixed, a typo in error log message has been fixed, cleanups. Thanks to
+         Andy Smith, Andrew Bays, Ruben Kerkhof. #2802, #2876, #2797
+       * Barometer plugin: Support to "libi2c-4.0" has been added. Thanks to
+         Pavel Rochnyak. #2783
+       * DBI, Oracle, PostgreSQL plugins: Fixes and improvements. Thanks to
+         Pavel Rochnyak. #1705
+       * Disk plugin: "HAVE_UDEV_H" has been changed to "HAVE_LIBUDEV_H".
+         Thanks to Dylan Stephano-Shachter. #2668
+       * Disk plugin: In linux, reset the disk when it disappears from
+         "/proc/diskstats". Thanks to Nikita Kozlov, Pavel Rochnyak. #2551
+       * DPDK Events, DPDK Stats plugins: Buffer size for parsing lcores has
+         been increased, a deprecation warning has been fixed, runtime config
+         file path has been fixed. Thanks to Kevin Laatz. #2722, #2840, #2924
+       * DPDK Stats plugin: A compilation issue has been fixed. Thanks to
+         Volodymyr Mytnyk. #2524
+       * GPS plugin: Build with gpsd version 3.18 has been fixed. Thanks to
+         Baruch Siach. #2947
+       * Intel RDT plugin: Compiler warnings have been fixed. Thanks to Ruben
+         Kerkhof. #3104
+       * Log Logstash plugin: Non-portable struct initialization with "{}" has
+         been fixed. Thanks to Florian Forster. #2988
+       * LUA plugin: A memory leak has been fixed. Thanks to Ruben Kerkhof.
+         #3090
+       * MySQL plugin: Properly cleanup dropped MySQL connections. Thanks to
+         Dhrupad Bhardwaj. #2704
+       * Netlink plugin: Truncation warnings have been fixed. Thanks to Ruben
+         Kerkhof. #2777
+       * NFS plugin: Message "Unexpected number of fields for NFSv4 server
+         statistics: 62" has been fixed. Thanks to Yedidyah Bar David. #2076
+       * NFS plugin: Number of fields for "NFSv4" has been fixed. Thanks to
+         Jan-Philipp Litza. #2915
+       * Notify Email plugin: All notification parameters have been included
+         into email. Thanks to Pavel Rochnyak. #2834
+       * NTPd plugin: Don't treat normal peers as refclocks, skip "0.0.0.0"
+         hosts in ntpd plugin. Thanks to Pavel Rochnyak, Ivan Kurnosov. #2822,
+         #2376
+       * OAuth plugin: src/utils_oauth.c: Renew OAuth tokens 30 seconds before
+         they expire. Thanks to Florian Forster. #2970
+       * OVS Stats plugin: A macro to populate counters list has been added,
+         value of "OpenFlow" has been corrected. Thanks to Matteo Croce. #2966,
+         #2963
+       * OVS Stats plugin: Code style, cleanup and improvements. Thanks to
+         Pavel Rochnyak. #3011, #3012
+       * OVS Stats, OVS Events plugins: utils_ovs: Avoid potential access of
+         freed memory, fixes. Thanks to Ciara Loftus, Mark Kavanagh. #2801,
+         #2731
+       * Processes plugin: Compilation has been fixed when ps_delay() is not
+         used. Thanks to Pavel Rochnyak. #2610
+       * Python plugin: A compilation warning with Python 3.7 has been fixed.
+         Thanks to Manoj Srivastava. #3042
+       * Redis plugin: Bugfixes, extended error reporting, persistent
+         connections and parallel polling, ability to select db for queries has
+         been fixed. Thanks to Pavel Rochnyak, skob. #2826, #2789
+       * Router OS plugin: Unset radio-name showing up as "(null)" has been
+         fixed. Thanks to melak. #2740
+       * RRDCacheD plugin: Cleanup rrdcached plugin a bit. Thanks to Pavel
+         Rochnyak. #3080
+       * RRDTool plugin: Error reporting has been extended. Thanks to Pavel
+         Rochnyak. #2825
+       * Sensors plugin: Support for humidity sensors has been added. Thanks to
+         Sarah Fischmann. #2913
+       * Sensu, OVS Stat, Turbostat, virt, OAuth, Write Prometheus, Intel RDT
+         plugins: Compiler warnings have been fixed. Thanks to Ruben Kerkhof.
+         #3093, #3098, #3099, #3100, #3102, #3103, #3104
+       * virt plugin: Code "do {} while(0)" around macro has been removed.
+         Thanks to Florian Forster. #2579
+       * virt plugin: Compiler warnings, a segfault in libvirt, typo in error
+         messages have been fixed. Thanks to Antoine Naud, Ruben Kerkhof, sarah
+         niuxu18. #2808, #2919, #2957
+       * virt plugin: Optional "virDomainGetCPUStats()" has been removed from
+         main flow, cleanup. Thanks to Pavel Rochnyak. #2972, #2978
+       * virt plugin: Tracking of VM state changes has been fixed. Thanks to
+         Radoslaw Jablonski. #2701
+       * Write MongoDB plugin: Plugin dependencies have been fixed. Thanks to
+         Pavel Rochnyak. #3010
+       * Write Prometheus plugin: A compilation issue on Mac OS X has been
+         fixed. Thanks to Florian Forster. #3059
+       * Write Redis plugin: Bug ""max_set_duration" deletes unexpected data"
+         has been fixed. Thanks to takahashi-tsc. #2773
+       * Write Stackdriver plugin: Potential NULL dereference and error
+         reporting have been fixed. Thanks to Florian Forster. #2960
+       * collectd.conf(5): a typo has been fixed, the tail plugin's
+         documentation has been improved. Thanks to Ruben Kerkhof, Florian
+         Forster. #3087, #2994
+       * collectd.conf.pod: virt: "Instances" option has been documented, a
+         clarifying example has been added. Thanks to Pavel Rochnyak, Fabien
+         Wernli. #2990, #2903
+       * collectd-python: "Import" configuration option has been documented.
+         Thanks to Tyler Harper. #2985
+       * collectd-snmp.pod: Document thread usage correctly. Thanks to Nathan
+         Ward. #3078
+       * CONTRIBUTING.md: Improve wording around ChangeLog; fix example,
+         document the new change log requirement / behavior. Thanks to Florian
+         Forster. #3061, #3054
+       * docs/review_comments.md: document with frequent review comments has
+         been started. Thanks to Florian Forster. #2964
+       * README: Include compiler defenses suggestion, do not point users to
+         non-existing file. Thanks to Kevin Laatz, Ruben Kerkhof. #2721, #2045
+
 2018-10-23, Version 5.8.1
        * collectd: Fix "BaseDir" option. Thanks to Mariusz Białończyk and
          Pavel Rochnyak. #2857
index 2e16e5c..258603f 100644 (file)
@@ -136,6 +136,7 @@ noinst_LTLIBRARIES = \
        libheap.la \
        libignorelist.la \
        liblatency.la \
+       libllist.la \
        liblookup.la \
        libmetadata.la \
        libmount.la \
@@ -244,8 +245,6 @@ collectd_SOURCES = \
        src/daemon/utils_cache.h \
        src/daemon/utils_complain.c \
        src/daemon/utils_complain.h \
-       src/daemon/utils_llist.c \
-       src/daemon/utils_llist.h \
        src/daemon/utils_random.c \
        src/daemon/utils_random.h \
        src/daemon/utils_subst.c \
@@ -265,6 +264,7 @@ collectd_LDADD = \
        libavltree.la \
        libcommon.la \
        libheap.la \
+       libllist.la \
        liboconfig.la \
        -lm \
        $(COMMON_LIBS) \
@@ -393,6 +393,10 @@ libignorelist_la_SOURCES = \
        src/utils/ignorelist/ignorelist.c \
        src/utils/ignorelist/ignorelist.h
 
+libllist_la_SOURCES = \
+       src/daemon/utils_llist.c \
+       src/daemon/utils_llist.h
+
 libmetadata_la_SOURCES = \
        src/utils/metadata/meta_data.c \
        src/utils/metadata/meta_data.h
@@ -768,6 +772,21 @@ chrony_la_LDFLAGS = $(PLUGIN_LDFLAGS)
 chrony_la_LIBADD = -lm
 endif
 
+if BUILD_PLUGIN_CHECK_UPTIME
+pkglib_LTLIBRARIES += check_uptime.la
+check_uptime_la_SOURCES = src/check_uptime.c
+check_uptime_la_LDFLAGS = $(PLUGIN_LDFLAGS)
+endif
+
+if BUILD_PLUGIN_CONNECTIVITY
+pkglib_LTLIBRARIES += connectivity.la
+connectivity_la_SOURCES = src/connectivity.c
+connectivity_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBMNL_CFLAGS)
+connectivity_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS)
+connectivity_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS)
+connectivity_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) $(BUILD_WITH_LIBMNL_LIBS) libignorelist.la
+endif
+
 if BUILD_PLUGIN_CONNTRACK
 pkglib_LTLIBRARIES += conntrack.la
 conntrack_la_SOURCES = src/conntrack.c
@@ -1207,14 +1226,6 @@ lua_la_LDFLAGS = $(PLUGIN_LDFLAGS)
 lua_la_LIBADD = $(BUILD_WITH_LIBLUA_LIBS)
 endif
 
-if BUILD_PLUGIN_LVM
-pkglib_LTLIBRARIES += lvm.la
-lvm_la_SOURCES = src/lvm.c
-lvm_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBLVM2APP_CPPFLAGS)
-lvm_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBLVM2APP_LDFLAGS)
-lvm_la_LIBADD = $(BUILD_WITH_LIBLVM2APP_LIBS)
-endif
-
 if BUILD_PLUGIN_MADWIFI
 pkglib_LTLIBRARIES += madwifi.la
 madwifi_la_SOURCES = \
@@ -1405,6 +1416,12 @@ test_plugin_network_LDADD = \
        libplugin_mock.la \
        libmetadata.la \
        $(GCRYPT_LIBS)
+if BUILD_WITH_LIBSOCKET
+test_plugin_network_LDADD += -lsocket
+endif
+if BUILD_WITH_LIBNSL
+test_plugin_network_LDADD += -lnsl
+endif
 check_PROGRAMS += test_plugin_network
 endif
 
@@ -1646,6 +1663,14 @@ processes_la_LIBADD += libtaskstats.la
 endif
 endif
 
+if BUILD_PLUGIN_PROCEVENT
+pkglib_LTLIBRARIES += procevent.la
+procevent_la_SOURCES = src/procevent.c
+procevent_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS)
+procevent_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS)
+procevent_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) libignorelist.la
+endif
+
 if BUILD_PLUGIN_PROTOCOLS
 pkglib_LTLIBRARIES += protocols.la
 protocols_la_SOURCES = src/protocols.c
@@ -1794,6 +1819,14 @@ synproxy_la_SOURCES = src/synproxy.c
 synproxy_la_LDFLAGS = $(PLUGIN_LDFLAGS)
 endif
 
+if BUILD_PLUGIN_SYSEVENT
+pkglib_LTLIBRARIES += sysevent.la
+sysevent_la_SOURCES = src/sysevent.c
+sysevent_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBYAJL_CPPFLAGS)
+sysevent_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBYAJL_LDFLAGS)
+sysevent_la_LIBADD = $(BUILD_WITH_LIBYAJL_LIBS) libignorelist.la
+endif
+
 if BUILD_PLUGIN_SYSLOG
 pkglib_LTLIBRARIES += syslog.la
 syslog_la_SOURCES = src/syslog.c
diff --git a/README b/README
index f28d499..c8be7d3 100644 (file)
--- a/README
+++ b/README
@@ -54,6 +54,9 @@ Features
     - chrony
       Chrony daemon statistics: Local clock drift, offset to peers, etc.
 
+    - connectivity
+      Event-based interface status.
+
     - conntrack
       Number of nf_conntrack entries.
 
@@ -204,10 +207,6 @@ Features
       collectd without the need to start a heavy interpreter every interval.
       See collectd-lua(5) for details.
 
-    - lvm
-      Size of “Logical Volumes” (LV) and “Volume Groups” (VG) of Linux'
-      “Logical Volume Manager” (LVM).
-
     - madwifi
       Queries very detailed usage statistics from wireless LAN adapters and
       interfaces that use the Atheros chipset and the MadWifi driver.
@@ -347,6 +346,9 @@ Features
     - processes
       Process counts: Number of running, sleeping, zombie, ... processes.
 
+    - procevent
+      Listens for process starts and exits via netlink.
+
     - protocols
       Counts various aspects of network protocols such as IP, TCP, UDP, etc.
 
@@ -391,6 +393,9 @@ Features
       Acts as a StatsD server, reading values sent over the network from StatsD
       clients and calculating rates and other aggregates out of these values.
 
+    - sysevent
+      Listens to rsyslog events and submits matched values.
+
     - swap
       Pages swapped out onto hard disk or whatever is called `swap' by the OS..
 
@@ -836,10 +841,6 @@ Prerequisites
     Used by the `lua' plugin. Currently, Lua 5.1 and later are supported.
     <https://www.lua.org/>
 
-  * liblvm2 (optional)
-    Used by the `lvm' plugin.
-    <ftp://sources.redhat.com/pub/lvm2/>
-
   * libmemcached (optional)
     Used by the `memcachec' plugin to connect to a memcache daemon.
     <http://tangent.org/552/libmemcached.html>
index 18314e2..9fc40b1 100644 (file)
@@ -743,10 +743,10 @@ test_cxx_flags() {
 #
 AC_CHECK_FUNCS_ONCE([ \
     asprintf \
+    execvpe \
     getpwnam \
     getpwnam_r \
     if_indextoname \
-    setenv \
     setgroups \
     setlocale
   ]
@@ -858,6 +858,17 @@ AC_CHECK_FUNCS([socket],
 AM_CONDITIONAL([BUILD_WITH_LIBSOCKET], [test "x$socket_needs_socket" = "xyes"])
 AM_CONDITIONAL([BUILD_WITH_GNULIB], [test "x$socket_needs_gnulib" = "xyes"])
 
+AC_CHECK_FUNCS([inet_ntop],
+  [],
+  [
+    AC_CHECK_LIB([nsl], [inet_ntop],
+      [inet_ntop_needs_nsl="yes"],
+      [AC_MSG_ERROR([cannot find inet_ntop() in libnsl])]
+    )
+  ]
+)
+AM_CONDITIONAL([BUILD_WITH_LIBNSL], [test "x$inet_ntop_needs_nsl" = "xyes"])
+
 clock_gettime_needs_posix4="no"
 AC_CHECK_FUNCS([clock_gettime],
   [have_clock_gettime="yes"],
@@ -2711,6 +2722,7 @@ if test "x$withval" != "xno"; then
   else
     AC_MSG_RESULT([no])
     with_libgrpcpp="no (requires C++11 support)"
+    with_libprotobuf="no (<google/protobuf/util/time_util.h> requires C++11 support)"
   fi
 fi
 
@@ -3158,67 +3170,6 @@ AC_SUBST(BUILD_WITH_LIBLUA_CFLAGS)
 AC_SUBST(BUILD_WITH_LIBLUA_LIBS)
 # }}}
 
-# --with-liblvm2app {{{
-AC_ARG_WITH([liblvm2app],
-  [AS_HELP_STRING([--with-liblvm2app@<:@=PREFIX@:>@], [Path to liblvm2app.])],
-  [
-    if test "x$withval" = "xno"; then
-      with_liblvm2app="no"
-    else
-      with_liblvm2app="yes"
-      if test "x$withval" != "xyes"; then
-        with_liblvm2app_cppflags="-I$withval/include"
-        with_liblvm2app_ldflags="-L$withval/lib"
-      fi
-    fi
-  ],
-  [
-    if test "x$ac_system" = "xLinux"; then
-      with_liblvm2app="yes"
-    else
-      with_liblvm2app="no (Linux only library)"
-    fi
-  ]
-)
-
-if test "x$with_liblvm2app" = "xyes"; then
-  SAVE_CPPFLAGS="$CPPFLAGS"
-  CPPFLAGS="$CPPFLAGS $with_liblvm2app_cppflags"
-
-  AC_CHECK_HEADERS([lvm2app.h],
-    [with_liblvm2app="yes"],
-    [with_liblvm2app="no (lvm2app.h not found)"]
-  )
-
-  CPPFLAGS="$SAVE_CPPFLAGS"
-fi
-
-if test "x$with_liblvm2app" = "xyes"; then
-  SAVE_CPPFLAGS="$CPPFLAGS"
-  SAVE_LDFLAGS="$LDFLAGS"
-  CPPFLAGS="$CPPFLAGS $with_liblvm2app_cppflags"
-  LDFLAGS="$LDFLAGS $with_liblvm2app_ldflags"
-
-  AC_CHECK_LIB([lvm2app], [lvm_lv_get_property],
-    [with_liblvm2app="yes"],
-    [with_liblvm2app="no (Symbol 'lvm_lv_get_property' not found)"]
-  )
-
-  CPPFLAGS="$SAVE_CPPFLAGS"
-  LDFLAGS="$SAVE_LDFLAGS"
-fi
-
-if test "x$with_liblvm2app" = "xyes"; then
-  BUILD_WITH_LIBLVM2APP_CPPFLAGS="$with_liblvm2app_cppflags"
-  BUILD_WITH_LIBLVM2APP_LDFLAGS="$with_liblvm2app_ldflags"
-  BUILD_WITH_LIBLVM2APP_LIBS="-llvm2app"
-fi
-
-AC_SUBST([BUILD_WITH_LIBLVM2APP_CPPFLAGS])
-AC_SUBST([BUILD_WITH_LIBLVM2APP_LDFLAGS])
-AC_SUBST([BUILD_WITH_LIBLVM2APP_LIBS])
-# }}}
-
 # --with-libmemcached {{{
 AC_ARG_WITH([libmemcached],
   [AS_HELP_STRING([--with-libmemcached@<:@=PREFIX@:>@], [Path to libmemcached.])],
@@ -4618,7 +4569,7 @@ if test "x$withval" != "xno"; then
   AC_CHECK_LIB([protobuf], [main],
     [
       SAVE_CPPFLAGS="$CPPFLAGS"
-      CPPFLAGS="$with_libprotobuf_cppflags $PROTOBUF_CFLAGS"
+      CPPFLAGS="-std=c++11 $with_libprotobuf_cppflags $PROTOBUF_CFLAGS"
       if test "x$PROTOBUF_LIBS" = "x"
       then
         PROTOBUF_LIBS="-lprotobuf"
@@ -5448,7 +5399,7 @@ AC_ARG_WITH([libtokyotyrant],
       with_libtokyotyrant="$withval"
     else
       with_libtokyotyrant_cppflags="-I$withval/include"
-      with_libtokyotyrant_ldflags="-L$withval/include"
+      with_libtokyotyrant_ldflags="-L$withval/lib"
       with_libtokyotyrant_libs="-ltokyotyrant"
       with_libtokyotyrant="yes"
     fi
@@ -5567,7 +5518,7 @@ AC_ARG_WITH([libupsclient],
     else if test "x$withval" = "xyes"; then
       with_libupsclient="use_pkgconfig"
     else
-      if test -x "$withval"; then
+      if test -f "$withval" && test -x "$withval"; then
         with_libupsclient_config="$withval"
         with_libupsclient="use_libupsclient_config"
       else if test -x "$withval/bin/libupsclient-config"; then
@@ -6362,6 +6313,7 @@ plugin_battery="no"
 plugin_bind="no"
 plugin_ceph="no"
 plugin_cgroups="no"
+plugin_connectivity="no"
 plugin_conntrack="no"
 plugin_contextswitch="no"
 plugin_cpu="no"
@@ -6402,12 +6354,14 @@ plugin_pcie_errors="no"
 plugin_perl="no"
 plugin_pinba="no"
 plugin_processes="no"
+plugin_procevent="no"
 plugin_protocols="no"
 plugin_python="no"
 plugin_serial="no"
 plugin_smart="no"
 plugin_swap="no"
 plugin_synproxy="no"
+plugin_sysevent="no"
 plugin_tape="no"
 plugin_tcpconns="no"
 plugin_ted="no"
@@ -6444,7 +6398,6 @@ if test "x$ac_system" = "xLinux"; then
   plugin_ipc="yes"
   plugin_irq="yes"
   plugin_load="yes"
-  plugin_lvm="yes"
   plugin_mcelog="yes"
   plugin_memory="yes"
   plugin_nfs="yes"
@@ -6477,6 +6430,11 @@ if test "x$ac_system" = "xLinux"; then
   if test "x$with_libyajl" = "xyes" && test "x$with_libyajl2" = "xyes"; then
     plugin_ovs_events="yes"
     plugin_ovs_stats="yes"
+    plugin_procevent="yes"
+
+    if test "x$with_libmnl" = "xyes"; then
+      plugin_connectivity="yes"
+    fi
   fi
 
   if test "x$have_pci_regs_h" = "xyes"; then
@@ -6590,6 +6548,7 @@ fi
 
 if test "x$with_libyajl" = "xyes"; then
   plugin_ceph="yes"
+  plugin_sysevent="yes"
 fi
 
 if test "x$have_processor_info" = "xyes"; then
@@ -6786,6 +6745,8 @@ AC_PLUGIN([bind],                [$plugin_bind],              [ISC Bind nameserv
 AC_PLUGIN([ceph],                [$plugin_ceph],              [Ceph daemon statistics])
 AC_PLUGIN([cgroups],             [$plugin_cgroups],           [CGroups CPU usage accounting])
 AC_PLUGIN([chrony],              [yes],                       [Chrony statistics])
+AC_PLUGIN([check_uptime],        [yes],                       [Notify about uptime reset])
+AC_PLUGIN([connectivity],        [$plugin_connectivity],      [Network interface up/down events])
 AC_PLUGIN([conntrack],           [$plugin_conntrack],         [nf_conntrack statistics])
 AC_PLUGIN([contextswitch],       [$plugin_contextswitch],     [context switch statistics])
 AC_PLUGIN([cpu],                 [$plugin_cpu],               [CPU usage statistics])
@@ -6829,7 +6790,6 @@ AC_PLUGIN([log_logstash],        [$plugin_log_logstash],      [Logstash json_eve
 AC_PLUGIN([logfile],             [yes],                       [File logging plugin])
 AC_PLUGIN([lpar],                [$with_perfstat],            [AIX logical partitions statistics])
 AC_PLUGIN([lua],                 [$with_liblua],              [Lua plugin])
-AC_PLUGIN([lvm],                 [$with_liblvm2app],          [LVM statistics])
 AC_PLUGIN([madwifi],             [$have_linux_wireless_h],    [Madwifi wireless statistics])
 AC_PLUGIN([match_empty_counter], [yes],                       [The empty counter match])
 AC_PLUGIN([match_hashed],        [yes],                       [The hashed match])
@@ -6874,6 +6834,7 @@ AC_PLUGIN([ping],                [$with_liboping],            [Network latency s
 AC_PLUGIN([postgresql],          [$with_libpq],               [PostgreSQL database statistics])
 AC_PLUGIN([powerdns],            [yes],                       [PowerDNS statistics])
 AC_PLUGIN([processes],           [$plugin_processes],         [Process statistics])
+AC_PLUGIN([procevent],           [$plugin_procevent],         [Process event (start, stop) statistics])
 AC_PLUGIN([protocols],           [$plugin_protocols],         [Protocol (IP, TCP, ...) statistics])
 AC_PLUGIN([python],              [$plugin_python],            [Embed a Python interpreter])
 AC_PLUGIN([redis],               [$with_libhiredis],          [Redis plugin])
@@ -6889,6 +6850,7 @@ AC_PLUGIN([snmp_agent],          [$with_libnetsnmpagent],     [SNMP agent plugin
 AC_PLUGIN([statsd],              [yes],                       [StatsD plugin])
 AC_PLUGIN([swap],                [$plugin_swap],              [Swap usage statistics])
 AC_PLUGIN([synproxy],            [$plugin_synproxy],          [Synproxy stats plugin])
+AC_PLUGIN([sysevent],            [$plugin_sysevent],          [rsyslog events])
 AC_PLUGIN([syslog],              [$have_syslog],              [Syslog logging plugin])
 AC_PLUGIN([table],               [yes],                       [Parsing of tabular data])
 AC_PLUGIN([tail],                [yes],                       [Parsing of logfiles])
@@ -7146,7 +7108,6 @@ AC_MSG_RESULT([    libkstat  . . . . . . $with_kstat])
 AC_MSG_RESULT([    libkvm  . . . . . . . $with_libkvm])
 AC_MSG_RESULT([    libldap . . . . . . . $with_libldap])
 AC_MSG_RESULT([    liblua  . . . . . . . $with_liblua])
-AC_MSG_RESULT([    liblvm2app  . . . . . $with_liblvm2app])
 AC_MSG_RESULT([    libmemcached  . . . . $with_libmemcached])
 AC_MSG_RESULT([    libmicrohttpd . . . . $with_libmicrohttpd])
 AC_MSG_RESULT([    libmnl  . . . . . . . $with_libmnl])
@@ -7215,6 +7176,8 @@ AC_MSG_RESULT([    bind  . . . . . . . . $enable_bind])
 AC_MSG_RESULT([    ceph  . . . . . . . . $enable_ceph])
 AC_MSG_RESULT([    cgroups . . . . . . . $enable_cgroups])
 AC_MSG_RESULT([    chrony. . . . . . . . $enable_chrony])
+AC_MSG_RESULT([    check_uptime. . . . . $enable_check_uptime])
+AC_MSG_RESULT([    connectivity. . . . . $enable_connectivity])
 AC_MSG_RESULT([    conntrack . . . . . . $enable_conntrack])
 AC_MSG_RESULT([    contextswitch . . . . $enable_contextswitch])
 AC_MSG_RESULT([    cpu . . . . . . . . . $enable_cpu])
@@ -7258,7 +7221,6 @@ AC_MSG_RESULT([    logfile . . . . . . . $enable_logfile])
 AC_MSG_RESULT([    log_logstash  . . . . $enable_log_logstash])
 AC_MSG_RESULT([    lpar  . . . . . . . . $enable_lpar])
 AC_MSG_RESULT([    lua . . . . . . . . . $enable_lua])
-AC_MSG_RESULT([    lvm . . . . . . . . . $enable_lvm])
 AC_MSG_RESULT([    madwifi . . . . . . . $enable_madwifi])
 AC_MSG_RESULT([    match_empty_counter . $enable_match_empty_counter])
 AC_MSG_RESULT([    match_hashed  . . . . $enable_match_hashed])
@@ -7302,6 +7264,7 @@ AC_MSG_RESULT([    ping  . . . . . . . . $enable_ping])
 AC_MSG_RESULT([    postgresql  . . . . . $enable_postgresql])
 AC_MSG_RESULT([    powerdns  . . . . . . $enable_powerdns])
 AC_MSG_RESULT([    processes . . . . . . $enable_processes])
+AC_MSG_RESULT([    procevent . . . . . . $enable_procevent])
 AC_MSG_RESULT([    protocols . . . . . . $enable_protocols])
 AC_MSG_RESULT([    python  . . . . . . . $enable_python])
 AC_MSG_RESULT([    redis . . . . . . . . $enable_redis])
@@ -7317,6 +7280,7 @@ AC_MSG_RESULT([    snmp_agent  . . . . . $enable_snmp_agent])
 AC_MSG_RESULT([    statsd  . . . . . . . $enable_statsd])
 AC_MSG_RESULT([    swap  . . . . . . . . $enable_swap])
 AC_MSG_RESULT([    synproxy  . . . . . . $enable_synproxy])
+AC_MSG_RESULT([    sysevent. . . . . . . $enable_sysevent])
 AC_MSG_RESULT([    syslog  . . . . . . . $enable_syslog])
 AC_MSG_RESULT([    table . . . . . . . . $enable_table])
 AC_MSG_RESULT([    tail_csv  . . . . . . $enable_tail_csv])
index 246fcb5..864ef1b 100644 (file)
@@ -53,6 +53,7 @@
 %define with_ceph 0%{!?_without_ceph:1}
 %define with_cgroups 0%{!?_without_cgroups:1}
 %define with_chrony 0%{!?_without_chrony:1}
+%define with_connectivity 0%{!?_without_connectivity:1}
 %define with_conntrack 0%{!?_without_conntrack:1}
 %define with_contextswitch 0%{!?_without_contextswitch:1}
 %define with_cpu 0%{!?_without_cpu:1}
@@ -89,7 +90,6 @@
 %define with_log_logstash 0%{!?_without_log_logstash:1}
 %define with_logfile 0%{!?_without_logfile:1}
 %define with_lua 0%{!?_without_lua:1}
-%define with_lvm 0%{!?_without_lvm:1}
 %define with_madwifi 0%{!?_without_madwifi:1}
 %define with_mbmon 0%{!?_without_mbmon:1}
 %define with_mcelog 0%{!?_without_mcelog:1}
 %define with_postgresql 0%{!?_without_postgresql:1}
 %define with_powerdns 0%{!?_without_powerdns:1}
 %define with_processes 0%{!?_without_processes:1}
+%define with_procevent 0%{!?_without_procevent:1}
 %define with_protocols 0%{!?_without_protocols:1}
 %define with_python 0%{!?_without_python:1}
 %define with_redis 0%{!?_without_redis:1}
 %define with_statsd 0%{!?_without_statsd:1}
 %define with_swap 0%{!?_without_swap:1}
 %define with_synproxy 0%{!?_without_synproxy:0}
+%define with_sysevent 0%{!?_without_sysevent:1}
 %define with_syslog 0%{!?_without_syslog:1}
 %define with_table 0%{!?_without_table:1}
 %define with_tail 0%{!?_without_tail:1}
 %define with_gmond 0
 %define with_iptables 0
 %define with_ipvs 0
-%define with_lvm 0
 %define with_modbus 0
 %define with_netlink 0
 %define with_redis 0
 
 # Plugins not buildable on RHEL < 7
 %if 0%{?rhel} && 0%{?rhel} < 7
+%define with_connectivity 0
 %define with_cpusleep 0
 %define with_gps 0
 %define with_mqtt 0
 %define with_ovs_events 0
 %define with_ovs_stats 0
+%define with_procevent 0
 %define with_redis 0
 %define with_rrdcached 0
+%define with_sysevent 0
 %define with_write_redis 0
 %define with_write_riemann 0
 %define with_xmms 0
 
 Summary:       Statistics collection and monitoring daemon
 Name:          collectd
-Version:       5.9.0
-Release:       1%{?dist}
+Version:       5.9.2
+Release:       2%{?dist}
 URL:           https://collectd.org
 Source:                https://collectd.org/files/%{name}-%{version}.tar.bz2
 License:       GPLv2
@@ -380,6 +384,16 @@ Requires:      %{name}%{?_isa} = %{version}-%{release}
 Chrony plugin for collectd
 %endif
 
+%if %{with_connectivity}
+%package connectivity
+Summary:       Connectivity plugin for collectd
+Group:         System Environment/Daemons
+Requires:      %{name}%{?_isa} = %{version}-%{release}
+BuildRequires: libmnl-devel, yajl-devel
+%description connectivity
+Monitors network interface up/down status via netlink library.
+%endif
+
 %if %{with_curl}
 %package curl
 Summary:       Curl plugin for collectd
@@ -574,17 +588,6 @@ The Lua plugin embeds a Lua interpreter into collectd and exposes the
 application programming interface (API) to Lua scripts.
 %endif
 
-%if %{with_lvm}
-%package lvm
-Summary:       LVM plugin for collectd
-Group:         System Environment/Daemons
-Requires:      %{name}%{?_isa} = %{version}-%{release}
-BuildRequires: lvm2-devel
-%description lvm
-This plugin collects size of “Logical Volumes” (LV) and “Volume Groups” (VG)
-of Linux' “Logical Volume Manager” (LVM).
-%endif
-
 %if %{with_mcelog}
 %package mcelog
 Summary:       Mcelog plugin for collectd
@@ -792,6 +795,16 @@ The PostgreSQL plugin connects to and executes SQL statements on a PostgreSQL
 database.
 %endif
 
+%if %{with_procevent}
+%package procevent
+Summary:       Processes event plugin for collectd
+Group:         System Environment/Daemons
+Requires:      %{name}%{?_isa} = %{version}-%{release}
+BuildRequires: yajl-devel
+%description procevent
+Monitors process starts/stops via netlink library.
+%endif
+
 %if %{with_python}
 %package python
 Summary:       Python plugin for collectd
@@ -891,6 +904,16 @@ BuildRequires:     net-snmp-devel
 This plugin for collectd to support AgentX integration.
 %endif
 
+%if %{with_sysevent}
+%package sysevent
+Summary:       Rsyslog event plugin for collectd
+Group:         System Environment/Daemons
+Requires:      %{name}%{?_isa} = %{version}-%{release}
+BuildRequires: yajl-devel
+%description sysevent
+Monitors rsyslog for system events.
+%endif
+
 %if %{with_varnish}
 %package varnish
 Summary:       Varnish plugin for collectd
@@ -1136,6 +1159,12 @@ Collectd utilities
 %define _with_chrony --disable-chrony
 %endif
 
+%if %{with_connectivity}
+%define _with_connectivity --enable-connectivity
+%else
+%define _with_connectivity --disable-connectivity
+%endif
+
 %if %{with_conntrack}
 %define _with_conntrack --enable-conntrack
 %else
@@ -1400,12 +1429,6 @@ Collectd utilities
 %define _with_lua --disable-lua
 %endif
 
-%if %{with_lvm}
-%define _with_lvm --enable-lvm
-%else
-%define _with_lvm --disable-lvm
-%endif
-
 %if %{with_madwifi}
 %define _with_madwifi --enable-madwifi
 %else
@@ -1634,6 +1657,12 @@ Collectd utilities
 %define _with_processes --disable-processes
 %endif
 
+%if %{with_procevent}
+%define _with_procevent --enable-procevent
+%else
+%define _with_procevent --disable-procevent
+%endif
+
 %if %{with_protocols}
 %define _with_protocols --enable-protocols
 %else
@@ -1729,6 +1758,12 @@ Collectd utilities
 %define _with_synproxy --disable-synproxy
 %endif
 
+%if %{with_sysevent}
+%define _with_sysevent --enable-sysevent
+%else
+%define _with_sysevent --disable-sysevent
+%endif
+
 %if %{with_syslog}
 %define _with_syslog --enable-syslog
 %else
@@ -1992,6 +2027,7 @@ Collectd utilities
        %{?_with_ceph} \
        %{?_with_cgroups} \
        %{?_with_chrony} \
+       %{?_with_connectivity} \
        %{?_with_conntrack} \
        %{?_with_contextswitch} \
        %{?_with_cpufreq} \
@@ -2034,7 +2070,6 @@ Collectd utilities
        %{?_with_logfile} \
        %{?_with_lpar} \
        %{?_with_lua} \
-       %{?_with_lvm} \
        %{?_with_madwifi} \
        %{?_with_mbmon} \
        %{?_with_mcelog} \
@@ -2073,6 +2108,7 @@ Collectd utilities
        %{?_with_postgresql} \
        %{?_with_powerdns} \
        %{?_with_processes} \
+       %{?_with_procevent} \
        %{?_with_protocols} \
        %{?_with_python} \
        %{?_with_redis} \
@@ -2088,6 +2124,7 @@ Collectd utilities
        %{?_with_statsd} \
        %{?_with_swap} \
        %{?_with_synproxy} \
+       %{?_with_sysevent} \
        %{?_with_syslog} \
        %{?_with_table} \
        %{?_with_tail_csv} \
@@ -2539,6 +2576,11 @@ fi
 %{_libdir}/%{name}/chrony.so
 %endif
 
+%if %{with_connectivity}
+%files connectivity
+%{_libdir}/%{name}/connectivity.so
+%endif
+
 %if %{with_curl}
 %files curl
 %{_libdir}/%{name}/curl.so
@@ -2648,11 +2690,6 @@ fi
 %{_libdir}/%{name}/lua.so
 %endif
 
-%if %{with_lvm}
-%files lvm
-%{_libdir}/%{name}/lvm.so
-%endif
-
 %if %{with_memcachec}
 %files memcachec
 %{_libdir}/%{name}/memcachec.so
@@ -2748,6 +2785,11 @@ fi
 %{_libdir}/%{name}/postgresql.so
 %endif
 
+%if %{with_procevent}
+%files procevent
+%{_libdir}/%{name}/procevent.so
+%endif
+
 %if %{with_python}
 %files python
 %{_mandir}/man5/collectd-python*
@@ -2795,6 +2837,11 @@ fi
 %{_libdir}/%{name}/snmp_agent.so
 %endif
 
+%if %{with_sysevent}
+%files sysevent
+%{_libdir}/%{name}/sysevent.so
+%endif
+
 %if %{with_varnish}
 %files varnish
 %{_libdir}/%{name}/varnish.so
@@ -2857,6 +2904,9 @@ fi
 %doc contrib/
 
 %changelog
+* Mon Oct 14 2019 Ruben Kerkhof <ruben@rubenkerkhof.com> - 5.9.2-2
+- Remove lvm plugin, liblvmapp has been deprecated upstream
+
 * Fri Jun 14 2019 Fabien Wernli <rpmbuild@faxmodem.org> - 5.9.0-1
 - add code for write_stackdriver (disabled for now)
 - add code for gpu_nvidia (disabled for now)
diff --git a/src/check_uptime.c b/src/check_uptime.c
new file mode 100644 (file)
index 0000000..33363b5
--- /dev/null
@@ -0,0 +1,273 @@
+/**
+ * collectd - src/check_uptime.c
+ * Copyright (C) 2007-2019  Florian Forster
+ * Copyright (C) 2019  Pavel V. Rochnyack
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Author:
+ *   Florian octo Forster <octo at collectd.org>
+ *   Pavel Rochnyak <pavel2000 ngs.ru>
+ **/
+
+#include "collectd.h"
+#include "plugin.h"
+#include "utils/avltree/avltree.h"
+#include "utils/common/common.h"
+#include "utils_cache.h"
+
+/* Types are registered only in `config` phase, so access is not protected by
+ * locks */
+c_avl_tree_t *types_tree = NULL;
+
+static int format_uptime(unsigned long uptime_sec, char *buf, size_t bufsize) {
+
+  unsigned int uptime_days = uptime_sec / 24 / 3600;
+  uptime_sec -= uptime_days * 24 * 3600;
+  unsigned int uptime_hours = uptime_sec / 3600;
+  uptime_sec -= uptime_hours * 3600;
+  unsigned int uptime_mins = uptime_sec / 60;
+  uptime_sec -= uptime_mins * 60;
+
+  int ret = 0;
+  if (uptime_days) {
+    ret += snprintf(buf + ret, bufsize - ret, " %u day(s)", uptime_days);
+  }
+  if (uptime_days || uptime_hours) {
+    ret += snprintf(buf + ret, bufsize - ret, " %u hour(s)", uptime_hours);
+  }
+  if (uptime_days || uptime_hours || uptime_mins) {
+    ret += snprintf(buf + ret, bufsize - ret, " %u min", uptime_mins);
+  }
+  ret += snprintf(buf + ret, bufsize - ret, " %lu sec.", uptime_sec);
+  return ret;
+}
+
+static int cu_notify(enum cache_event_type_e event_type, const value_list_t *vl,
+                     gauge_t old_uptime, gauge_t new_uptime) {
+  notification_t n;
+  NOTIFICATION_INIT_VL(&n, vl);
+
+  int status;
+  char *buf = n.message;
+  size_t bufsize = sizeof(n.message);
+
+  n.time = vl->time;
+
+  const char *service = "Service";
+  if (strcmp(vl->plugin, "uptime") == 0)
+    service = "Host";
+
+  switch (event_type) {
+  case CE_VALUE_NEW:
+    n.severity = NOTIF_OKAY;
+    status = snprintf(buf, bufsize, "%s is running.", service);
+    buf += status;
+    bufsize -= status;
+    break;
+  case CE_VALUE_UPDATE:
+    n.severity = NOTIF_WARNING;
+    status = snprintf(buf, bufsize, "%s just restarted.", service);
+    buf += status;
+    bufsize -= status;
+    break;
+  case CE_VALUE_EXPIRED:
+    n.severity = NOTIF_FAILURE;
+    status = snprintf(buf, bufsize, "%s is unreachable.", service);
+    buf += status;
+    bufsize -= status;
+    break;
+  }
+
+  if (!isnan(old_uptime)) {
+    status = snprintf(buf, bufsize, " Uptime was:");
+    buf += status;
+    bufsize -= status;
+
+    status = format_uptime(old_uptime, buf, bufsize);
+    buf += status;
+    bufsize -= status;
+
+    plugin_notification_meta_add_double(&n, "LastValue", old_uptime);
+  }
+
+  if (!isnan(new_uptime)) {
+    status = snprintf(buf, bufsize, " Uptime now:");
+    buf += status;
+    bufsize -= status;
+
+    status = format_uptime(new_uptime, buf, bufsize);
+    buf += status;
+    bufsize -= status;
+
+    plugin_notification_meta_add_double(&n, "CurrentValue", new_uptime);
+  }
+
+  plugin_dispatch_notification(&n);
+
+  plugin_notification_meta_free(n.meta);
+  return 0;
+}
+
+static int cu_cache_event(cache_event_t *event,
+                          __attribute__((unused)) user_data_t *ud) {
+  gauge_t values_history[2];
+
+  /* For CE_VALUE_EXPIRED */
+  int ret;
+  value_t *values;
+  size_t values_num;
+  gauge_t old_uptime = NAN;
+
+  switch (event->type) {
+  case CE_VALUE_NEW:
+    DEBUG("check_uptime: CE_VALUE_NEW, %s", event->value_list_name);
+    if (c_avl_get(types_tree, event->value_list->type, NULL) == 0) {
+      event->ret = 1;
+      assert(event->value_list->values_len > 0);
+      cu_notify(CE_VALUE_NEW, event->value_list, NAN /* old */,
+                event->value_list->values[0].gauge /* new */);
+    }
+    break;
+  case CE_VALUE_UPDATE:
+    DEBUG("check_uptime: CE_VALUE_UPDATE, %s", event->value_list_name);
+    if (uc_get_history_by_name(event->value_list_name, values_history, 2, 1)) {
+      ERROR("check_uptime plugin: Failed to get value history for %s.",
+            event->value_list_name);
+    } else {
+      if (!isnan(values_history[0]) && !isnan(values_history[1]) &&
+          values_history[0] < values_history[1]) {
+        cu_notify(CE_VALUE_UPDATE, event->value_list,
+                  values_history[1] /* old */, values_history[0] /* new */);
+      }
+    }
+    break;
+  case CE_VALUE_EXPIRED:
+    DEBUG("check_uptime: CE_VALUE_EXPIRED, %s", event->value_list_name);
+    ret = uc_get_value_by_name(event->value_list_name, &values, &values_num);
+    if (ret == 0) {
+      old_uptime = values[0].gauge;
+      sfree(values);
+    }
+
+    cu_notify(CE_VALUE_EXPIRED, event->value_list, old_uptime, NAN /* new */);
+    break;
+  }
+  return 0;
+}
+
+static int cu_config(oconfig_item_t *ci) {
+  if (types_tree == NULL) {
+    types_tree = c_avl_create((int (*)(const void *, const void *))strcmp);
+    if (types_tree == NULL) {
+      ERROR("check_uptime plugin: c_avl_create failed.");
+      return -1;
+    }
+  }
+
+  for (int i = 0; i < ci->children_num; ++i) {
+    oconfig_item_t *child = ci->children + i;
+    if (strcasecmp("Type", child->key) == 0) {
+      if ((child->values_num != 1) ||
+          (child->values[0].type != OCONFIG_TYPE_STRING)) {
+        WARNING("check_uptime plugin: The `Type' option needs exactly one "
+                "string argument.");
+        return -1;
+      }
+      char *type = child->values[0].value.string;
+
+      if (c_avl_get(types_tree, type, NULL) == 0) {
+        ERROR("check_uptime plugin: Type `%s' already added.", type);
+        return -1;
+      }
+
+      char *type_copy = strdup(type);
+      if (type_copy == NULL) {
+        ERROR("check_uptime plugin: strdup failed.");
+        return -1;
+      }
+
+      int status = c_avl_insert(types_tree, type_copy, NULL);
+      if (status != 0) {
+        ERROR("check_uptime plugin: c_avl_insert failed.");
+        sfree(type_copy);
+        return -1;
+      }
+    } else
+      WARNING("check_uptime plugin: Ignore unknown config option `%s'.",
+              child->key);
+  }
+
+  return 0;
+}
+
+static int cu_init(void) {
+  if (types_tree == NULL) {
+    types_tree = c_avl_create((int (*)(const void *, const void *))strcmp);
+    if (types_tree == NULL) {
+      ERROR("check_uptime plugin: c_avl_create failed.");
+      return -1;
+    }
+    /* Default configuration */
+    char *type = strdup("uptime");
+    if (type == NULL) {
+      ERROR("check_uptime plugin: strdup failed.");
+      return -1;
+    }
+    int status = c_avl_insert(types_tree, type, NULL);
+    if (status != 0) {
+      ERROR("check_uptime plugin: c_avl_insert failed.");
+      sfree(type);
+      return -1;
+    }
+  }
+
+  int ret = 0;
+  char *type;
+  void *val;
+  c_avl_iterator_t *iter = c_avl_get_iterator(types_tree);
+  while (c_avl_iterator_next(iter, (void *)&type, (void *)&val) == 0) {
+    data_set_t const *ds = plugin_get_ds(type);
+    if (ds == NULL) {
+      ERROR("check_uptime plugin: Failed to look up type \"%s\".", type);
+      ret = -1;
+      continue;
+    }
+    if (ds->ds_num != 1) {
+      ERROR("check_uptime plugin: The type \"%s\" has %" PRIsz " data sources. "
+            "Only types with a single GAUGE data source are supported.",
+            ds->type, ds->ds_num);
+      ret = -1;
+      continue;
+    }
+    if (ds->ds[0].type != DS_TYPE_GAUGE) {
+      ERROR("check_uptime plugin: The type \"%s\" has wrong data source type. "
+            "Only types with a single GAUGE data source are supported.",
+            ds->type);
+      ret = -1;
+      continue;
+    }
+  }
+  c_avl_iterator_destroy(iter);
+
+  if (ret == 0)
+    plugin_register_cache_event("check_uptime", cu_cache_event, NULL);
+
+  return ret;
+}
+
+void module_register(void) {
+  plugin_register_complex_config("check_uptime", cu_config);
+  plugin_register_init("check_uptime", cu_init);
+}
index 136681d..63db8b1 100644 (file)
 #@BUILD_PLUGIN_CEPH_TRUE@LoadPlugin ceph
 #@BUILD_PLUGIN_CGROUPS_TRUE@LoadPlugin cgroups
 #@BUILD_PLUGIN_CHRONY_TRUE@LoadPlugin chrony
+#@BUILD_PLUGIN_CHECK_UPTIME_TRUE@LoadPlugin check_uptime
+#@BUILD_PLUGIN_CONNECTIVITY_TRUE@LoadPlugin connectivity
 #@BUILD_PLUGIN_CONNTRACK_TRUE@LoadPlugin conntrack
 #@BUILD_PLUGIN_CONTEXTSWITCH_TRUE@LoadPlugin contextswitch
 @BUILD_PLUGIN_CPU_TRUE@@BUILD_PLUGIN_CPU_TRUE@LoadPlugin cpu
 @BUILD_PLUGIN_LOAD_TRUE@@BUILD_PLUGIN_LOAD_TRUE@LoadPlugin load
 #@BUILD_PLUGIN_LPAR_TRUE@LoadPlugin lpar
 #@BUILD_PLUGIN_LUA_TRUE@LoadPlugin lua
-#@BUILD_PLUGIN_LVM_TRUE@LoadPlugin lvm
 #@BUILD_PLUGIN_MADWIFI_TRUE@LoadPlugin madwifi
 #@BUILD_PLUGIN_MBMON_TRUE@LoadPlugin mbmon
 #@BUILD_PLUGIN_MCELOG_TRUE@LoadPlugin mcelog
 #@BUILD_PLUGIN_POSTGRESQL_TRUE@LoadPlugin postgresql
 #@BUILD_PLUGIN_POWERDNS_TRUE@LoadPlugin powerdns
 #@BUILD_PLUGIN_PROCESSES_TRUE@LoadPlugin processes
+#@BUILD_PLUGIN_PROCEVENT_TRUE@LoadPlugin procevent
 #@BUILD_PLUGIN_PROTOCOLS_TRUE@LoadPlugin protocols
 #@BUILD_PLUGIN_PYTHON_TRUE@LoadPlugin python
 #@BUILD_PLUGIN_REDIS_TRUE@LoadPlugin redis
 #@BUILD_PLUGIN_SNMP_AGENT_TRUE@LoadPlugin snmp_agent
 #@BUILD_PLUGIN_STATSD_TRUE@LoadPlugin statsd
 #@BUILD_PLUGIN_SWAP_TRUE@LoadPlugin swap
+#@BUILD_PLUGIN_SYSEVENT_TRUE@LoadPlugin sysevent
 #@BUILD_PLUGIN_TABLE_TRUE@LoadPlugin table
 #@BUILD_PLUGIN_TAIL_TRUE@LoadPlugin tail
 #@BUILD_PLUGIN_TAIL_CSV_TRUE@LoadPlugin tail_csv
 #      Timeout "2"
 #</Plugin>
 
+#<Plugin connectivity>
+#  Interface eth0
+#</Plugin>
+
 #<Plugin cgroups>
 #  CGroup "libvirt"
 #  IgnoreSelected false
 #      </Process>
 #</Plugin>
 
+#<Plugin "procevent">
+#  BufferLength 10
+#  ProcessRegex "/^ovs.*$/" 
+#  Process tuned
+#</Plugin>
+
 #<Plugin protocols>
 #      Value "/^Tcp:/"
 #      IgnoreSelected false
 #      ReportIO true
 #</Plugin>
 
+#<Plugin sysevent>
+#       Listen "127.0.0.1" "6666"
+#       BufferSize 1024
+#       BufferLength 10
+#       RegexFilter "regex"
+#</Plugin>
+
 #<Plugin table>
 #      <Table "/proc/slabinfo">
 #              #Plugin "table"
index ed49195..cda1002 100644 (file)
@@ -1548,6 +1548,35 @@ at all, B<all> cgroups are selected.
 
 =back
 
+=head2 Plugin C<check_uptime>
+
+The I<check_uptime plugin> designed to check and notify about host or service
+status based on I<uptime> metric.
+
+When new metric of I<uptime> type appears in cache, OK notification is sent.
+When new value for metric is less than previous value, WARNING notification is
+sent about host/service restart.
+When no new updates comes for metric and cache entry expires, then FAILURE
+notification is sent about unreachable host or service.
+
+By default (when no explicit configuration), plugin checks for I<uptime> metric.
+
+B<Synopsis:>
+
+ <Plugin "check_uptime">
+   Type "uptime"
+   Type "my_uptime_type"
+ </Plugin>
+
+=over 4
+
+=item B<Type> I<Type>
+
+Metric type to check for status/values. The type should consist single GAUGE
+data source.
+
+=back
+
 =head2 Plugin C<chrony>
 
 The C<chrony> plugin collects ntp data from a B<chronyd> server, such as clock
@@ -1574,6 +1603,47 @@ Connection timeout in seconds. Defaults to B<2>.
 
 =back
 
+=head2 Plugin Connectivity
+
+connectivity - Documentation of collectd's C<connectivity plugin>
+
+
+  LoadPlugin connectivity
+  # ...
+  <Plugin connectivity>
+    Interface eth0
+  </Plugin>
+
+The C<connectivity plugin> queries interface status using netlink (man 7 netlink) which provides information about network interfaces via the NETLINK_ROUTE family (man 7 rtnetlink). The plugin translates the value it receives to collectd's internal format and, depending on the write plugins you have loaded, it may be written to disk or submitted to another instance.
+The plugin listens to interfaces enumerated within the plugin configuration (see below).  If no interfaces are listed, then the default is for all interfaces to be monitored.
+
+This example shows C<connectivity plugin> monitoring all interfaces.
+LoadPlugin connectivity
+<Plugin connectivity>
+</Plugin>
+
+This example shows C<connectivity plugin> monitoring 2 interfaces, "eth0" and "eth1".
+LoadPlugin connectivity
+<Plugin connectivity>
+  Interface eth0
+  Interface eth1
+</Plugin>
+
+This example shows C<connectivity plugin> monitoring all interfaces except "eth1".
+LoadPlugin connectivity
+<Plugin connectivity>
+  Interface eth1
+  IgnoreSelected true
+</Plugin>
+=over 4
+
+=item B<Interface> I<interface_name>
+
+interface(s) to monitor connect to. 
+
+=back
+
 =head2 Plugin C<conntrack>
 
 This plugin collects IP conntrack statistics.
@@ -4758,7 +4828,7 @@ Hostname of the database server. Defaults to B<localhost>.
 
 Username to use when connecting to the database. The user does not have to be
 granted any privileges (which is synonym to granting the C<USAGE> privilege),
-unless you want to collectd replication statistics (see B<MasterStats> and
+unless you want to collect replication statistics (see B<MasterStats> and
 B<SlaveStats> below). In this case, the user needs the C<REPLICATION CLIENT>
 (or C<SUPER>) privileges. Else, any existing MySQL user will do.
 
@@ -4808,9 +4878,10 @@ or SQL threads are not running. Defaults to B<false>.
 
 =item B<WsrepStats> I<true|false>
 
- Enable the collection of wsrep plugin statistics, used in Master-Master
- replication setups like in MySQL Galera/Percona XtraDB Cluster.
- User needs only privileges to execute 'SHOW GLOBAL STATUS'
+Enable the collection of wsrep plugin statistics, used in Master-Master
+replication setups like in MySQL Galera/Percona XtraDB Cluster.
+User needs only privileges to execute 'SHOW GLOBAL STATUS'.
+Defaults to B<false>.
 
 =item B<ConnectTimeout> I<Seconds>
 
@@ -7314,6 +7385,40 @@ reporting the corresponding processes only. Outside of B<Process> and
 B<ProcessMatch> blocks these options set the default value for subsequent
 matches.
 
+=head2 Plugin C<procevent>
+The I<procevent> plugin monitors when processes start (EXEC) and stop (EXIT).
+B<Synopsis:>
+  <Plugin procevent>
+    BufferLength 10
+    Process "name"
+    ProcessRegex "regex"
+  </Plugin>
+B<Options:>
+=over 4
+=item B<BufferLength> I<length>
+Maximum number of process events that can be stored in plugin's ring buffer.
+By default, this is set to 10.  Once an event has been read, its location
+becomes available for storing a new event.
+=item B<Process> I<name>
+Enumerate a process name to monitor.  All processes that match this exact
+name will be monitored for EXECs and EXITs.
+
+=item B<ProcessRegex> I<regex>
+Enumerate a process pattern to monitor.  All processes that match this 
+regular expression will be monitored for EXECs and EXITs.
+=back
+
 =head2 Plugin C<protocols>
 
 Collects a lot of information about various network protocols, such as I<IP>,
@@ -8247,6 +8352,70 @@ or is not reliable.
 
 =back
 
+=head2 Plugin C<sysevent>
+The I<sysevent> plugin monitors rsyslog messages.
+B<Synopsis:>
+  <Plugin sysevent>
+    Listen "192.168.0.2" "6666"
+    BufferSize 1024
+    BufferLength 10
+    RegexFilter "regex"
+  </Plugin>
+
+  rsyslog should be configured such that it sends data to the IP and port you
+  include in the plugin configuration.  For example, given the configuration
+  above, something like this would be set in /etc/rsyslog.conf:
+
+    if $programname != 'collectd' then
+    *.* @192.168.0.2:6666
+
+  This plugin is designed to consume JSON rsyslog data, so a more complete
+  rsyslog configuration would look like so (where we define a JSON template
+  and use it when sending data to our IP and port):
+
+    $template ls_json,"{%timestamp:::date-rfc3339,jsonf:@timestamp%, \
+    %source:::jsonf:@source_host%,\"@source\":\"syslog://%fromhost-ip:::json%\", \
+    \"@message\":\"%timestamp% %app-name%:%msg:::json%\",\"@fields\": \
+    {%syslogfacility-text:::jsonf:facility%,%syslogseverity:::jsonf:severity-num%, \
+    %syslogseverity-text:::jsonf:severity%,%programname:::jsonf:program%, \
+    %procid:::jsonf:processid%}}"
+
+    if $programname != 'collectd' then
+    *.* @192.168.0.2:6666;ls_json
+
+  Please note that these rsyslog.conf examples are *not* complete, as rsyslog
+  requires more than these options in the configuration file.  These examples 
+  are meant to demonstration the proper remote logging and JSON format syntax.
+
+B<Options:>
+=over 4
+=item B<Listen> I<host> I<port>
+Listen on this IP on this port for incoming rsyslog messages.
+
+=item B<BufferSize> I<length>
+Maximum allowed size for incoming rsyslog messages.  Messages that exceed 
+this number will be truncated to this size.  Default is 4096 bytes.
+
+=item B<BufferLength> I<length>
+Maximum number of rsyslog events that can be stored in plugin's ring buffer.
+By default, this is set to 10.  Once an event has been read, its location
+becomes available for storing a new event.
+
+=item B<RegexFilter> I<regex>
+Enumerate a regex filter to apply to all incoming rsyslog messages.  If a
+message matches this filter, it will be published.
+=back
+
 =head2 Plugin C<syslog>
 
 =over 4
index df83b50..3974fd0 100644 (file)
@@ -81,8 +81,7 @@
 extern char *optarg;
 extern int optind;
 
-/* ssnprintf returns zero on success, one if truncation occurred
-   and a negative integer onerror. */
+/* _ssnprintf returns result from vsnprintf (consistent with snprintf) */
 static int _ssnprintf(char *str, size_t sz, const char *format, ...) {
   va_list ap;
   va_start(ap, format);
@@ -91,10 +90,7 @@ static int _ssnprintf(char *str, size_t sz, const char *format, ...) {
 
   va_end(ap);
 
-  if (ret < 0) {
-    return ret;
-  }
-  return (size_t)ret >= sz;
+  return ret;
 } /* int _ssnprintf */
 
 __attribute__((noreturn)) static void exit_usage(const char *name, int status) {
diff --git a/src/connectivity.c b/src/connectivity.c
new file mode 100644 (file)
index 0000000..45b65aa
--- /dev/null
@@ -0,0 +1,1032 @@
+/**
+ * collectd - src/connectivity.c
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Red Hat NFVPE
+ *     Andrew Bays <abays at redhat.com>
+ *     Aneesh Puttur <aputtur at redhat.com>
+ **/
+
+#include "collectd.h"
+
+#include "plugin.h"
+#include "utils/common/common.h"
+#include "utils/ignorelist/ignorelist.h"
+#include "utils_complain.h"
+
+#include <asm/types.h>
+#include <errno.h>
+#include <net/if.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <libmnl/libmnl.h>
+#include <linux/netlink.h>
+#include <linux/rtnetlink.h>
+
+#include <yajl/yajl_common.h>
+#include <yajl/yajl_gen.h>
+#if HAVE_YAJL_YAJL_VERSION_H
+#include <yajl/yajl_version.h>
+#endif
+#if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
+#define HAVE_YAJL_V2 1
+#endif
+
+#define MYPROTO NETLINK_ROUTE
+
+#define LINK_STATE_DOWN 0
+#define LINK_STATE_UP 1
+#define LINK_STATE_UNKNOWN 2
+
+#define CONNECTIVITY_DOMAIN_FIELD "domain"
+#define CONNECTIVITY_DOMAIN_VALUE "stateChange"
+#define CONNECTIVITY_EVENT_ID_FIELD "eventId"
+#define CONNECTIVITY_EVENT_NAME_FIELD "eventName"
+#define CONNECTIVITY_EVENT_NAME_DOWN_VALUE "down"
+#define CONNECTIVITY_EVENT_NAME_UP_VALUE "up"
+#define CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
+#define CONNECTIVITY_PRIORITY_FIELD "priority"
+#define CONNECTIVITY_PRIORITY_VALUE "high"
+#define CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
+#define CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE "collectd connectivity plugin"
+#define CONNECTIVITY_SEQUENCE_FIELD "sequence"
+#define CONNECTIVITY_SEQUENCE_VALUE "0"
+#define CONNECTIVITY_SOURCE_NAME_FIELD "sourceName"
+#define CONNECTIVITY_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
+#define CONNECTIVITY_VERSION_FIELD "version"
+#define CONNECTIVITY_VERSION_VALUE "1.0"
+
+#define CONNECTIVITY_NEW_STATE_FIELD "newState"
+#define CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE "outOfService"
+#define CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE "inService"
+#define CONNECTIVITY_OLD_STATE_FIELD "oldState"
+#define CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE "outOfService"
+#define CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE "inService"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD "stateChangeFields"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD                         \
+  "stateChangeFieldsVersion"
+#define CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE "1.0"
+#define CONNECTIVITY_STATE_INTERFACE_FIELD "stateInterface"
+
+/*
+ * Private data types
+ */
+
+struct interface_list_s {
+  char *interface;
+
+  uint32_t status;
+  uint32_t prev_status;
+  uint32_t sent;
+  cdtime_t timestamp;
+
+  struct interface_list_s *next;
+};
+typedef struct interface_list_s interface_list_t;
+
+/*
+ * Private variables
+ */
+
+static ignorelist_t *ignorelist = NULL;
+
+static interface_list_t *interface_list_head = NULL;
+static int monitor_all_interfaces = 1;
+
+static int connectivity_netlink_thread_loop = 0;
+static int connectivity_netlink_thread_error = 0;
+static pthread_t connectivity_netlink_thread_id;
+static int connectivity_dequeue_thread_loop = 0;
+static pthread_t connectivity_dequeue_thread_id;
+static pthread_mutex_t connectivity_threads_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t connectivity_data_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t connectivity_cond = PTHREAD_COND_INITIALIZER;
+static int nl_sock = -1;
+static int event_id = 0;
+static int statuses_to_send = 0;
+
+static const char *config_keys[] = {"Interface", "IgnoreSelected"};
+static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
+
+/*
+ * Private functions
+ */
+
+static int gen_message_payload(int state, int old_state, const char *interface,
+                               cdtime_t timestamp, char **buf) {
+  const unsigned char *buf2;
+  yajl_gen g;
+  char json_str[DATA_MAX_NAME_LEN];
+
+#if !defined(HAVE_YAJL_V2)
+  yajl_gen_config conf = {0};
+#endif
+
+#if HAVE_YAJL_V2
+  size_t len;
+  g = yajl_gen_alloc(NULL);
+  yajl_gen_config(g, yajl_gen_beautify, 0);
+#else
+  unsigned int len;
+  g = yajl_gen_alloc(&conf, NULL);
+#endif
+
+  yajl_gen_clear(g);
+
+  // *** BEGIN common event header ***
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // domain
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_FIELD,
+                      strlen(CONNECTIVITY_DOMAIN_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_DOMAIN_VALUE,
+                      strlen(CONNECTIVITY_DOMAIN_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // eventId
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_ID_FIELD,
+                      strlen(CONNECTIVITY_EVENT_ID_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  event_id = event_id + 1;
+  if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
+    goto err;
+  }
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // eventName
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_EVENT_NAME_FIELD,
+                      strlen(CONNECTIVITY_EVENT_NAME_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (snprintf(json_str, sizeof(json_str), "interface %s %s", interface,
+               (state == 0 ? CONNECTIVITY_EVENT_NAME_DOWN_VALUE
+                           : CONNECTIVITY_EVENT_NAME_UP_VALUE)) < 0) {
+    goto err;
+  }
+
+  if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
+      yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // lastEpochMicrosec
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD,
+                      strlen(CONNECTIVITY_LAST_EPOCH_MICROSEC_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+               CDTIME_T_TO_US(cdtime())) < 0) {
+    goto err;
+  }
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // priority
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_FIELD,
+                      strlen(CONNECTIVITY_PRIORITY_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_PRIORITY_VALUE,
+                      strlen(CONNECTIVITY_PRIORITY_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // reportingEntityName
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD,
+                      strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE,
+                      strlen(CONNECTIVITY_REPORTING_ENTITY_NAME_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // sequence
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SEQUENCE_FIELD,
+                      strlen(CONNECTIVITY_SEQUENCE_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, CONNECTIVITY_SEQUENCE_VALUE,
+                      strlen(CONNECTIVITY_SEQUENCE_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // sourceName
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_SOURCE_NAME_FIELD,
+                      strlen(CONNECTIVITY_SOURCE_NAME_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // startEpochMicrosec
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_START_EPOCH_MICROSEC_FIELD,
+                      strlen(CONNECTIVITY_START_EPOCH_MICROSEC_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+               CDTIME_T_TO_US(timestamp)) < 0) {
+    goto err;
+  }
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // version
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_VERSION_FIELD,
+                      strlen(CONNECTIVITY_VERSION_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, CONNECTIVITY_VERSION_VALUE,
+                      strlen(CONNECTIVITY_VERSION_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // *** END common event header ***
+
+  // *** BEGIN state change fields ***
+
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD,
+                      strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // newState
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_NEW_STATE_FIELD,
+                      strlen(CONNECTIVITY_NEW_STATE_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  int new_state_len =
+      (state == 0 ? strlen(CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE)
+                  : strlen(CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE));
+
+  if (yajl_gen_string(g,
+                      (u_char *)(state == 0
+                                     ? CONNECTIVITY_NEW_STATE_FIELD_DOWN_VALUE
+                                     : CONNECTIVITY_NEW_STATE_FIELD_UP_VALUE),
+                      new_state_len) != yajl_gen_status_ok)
+    goto err;
+
+  // oldState
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_OLD_STATE_FIELD,
+                      strlen(CONNECTIVITY_OLD_STATE_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  int old_state_len =
+      (old_state == 0 ? strlen(CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE)
+                      : strlen(CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE));
+
+  if (yajl_gen_string(g,
+                      (u_char *)(old_state == 0
+                                     ? CONNECTIVITY_OLD_STATE_FIELD_DOWN_VALUE
+                                     : CONNECTIVITY_OLD_STATE_FIELD_UP_VALUE),
+                      old_state_len) != yajl_gen_status_ok)
+    goto err;
+
+  // stateChangeFieldsVersion
+  if (yajl_gen_string(g,
+                      (u_char *)CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD,
+                      strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE,
+                      strlen(CONNECTIVITY_STATE_CHANGE_FIELDS_VERSION_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // stateInterface
+  if (yajl_gen_string(g, (u_char *)CONNECTIVITY_STATE_INTERFACE_FIELD,
+                      strlen(CONNECTIVITY_STATE_INTERFACE_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)interface, strlen(interface)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // close state change and header fields
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
+      yajl_gen_map_close(g) != yajl_gen_status_ok)
+    goto err;
+
+  // *** END state change fields ***
+
+  if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
+    goto err;
+
+  *buf = strdup((char *)buf2);
+
+  if (*buf == NULL) {
+    ERROR("connectivity plugin: strdup failed during gen_message_payload: %s",
+          STRERRNO);
+    goto err;
+  }
+
+  yajl_gen_free(g);
+
+  return 0;
+
+err:
+  yajl_gen_free(g);
+  ERROR("connectivity plugin: gen_message_payload failed to generate JSON");
+  return -1;
+}
+
+static interface_list_t *add_interface(const char *interface, int status,
+                                       int prev_status) {
+  interface_list_t *il = calloc(1, sizeof(*il));
+
+  if (il == NULL) {
+    ERROR("connectivity plugin: calloc failed during add_interface: %s",
+          STRERRNO);
+    return NULL;
+  }
+
+  char *interface2 = strdup(interface);
+  if (interface2 == NULL) {
+    sfree(il);
+    ERROR("connectivity plugin: strdup failed during add_interface: %s",
+          STRERRNO);
+    return NULL;
+  }
+
+  il->interface = interface2;
+  il->status = status;
+  il->prev_status = prev_status;
+  il->timestamp = cdtime();
+  il->sent = 0;
+  il->next = interface_list_head;
+  interface_list_head = il;
+
+  DEBUG("connectivity plugin: added interface %s", interface2);
+
+  return il;
+}
+
+static int connectivity_link_state(struct nlmsghdr *msg) {
+  pthread_mutex_lock(&connectivity_data_lock);
+
+  struct nlattr *attr;
+  struct ifinfomsg *ifi = mnl_nlmsg_get_payload(msg);
+
+  /* Scan attribute list for device name. */
+  mnl_attr_for_each(attr, msg, sizeof(*ifi)) {
+    if (mnl_attr_get_type(attr) != IFLA_IFNAME)
+      continue;
+
+    if (mnl_attr_validate(attr, MNL_TYPE_STRING) < 0) {
+      ERROR("connectivity plugin: connectivity_link_state: IFLA_IFNAME "
+            "mnl_attr_validate "
+            "failed.");
+      pthread_mutex_unlock(&connectivity_data_lock);
+      return MNL_CB_ERROR;
+    }
+
+    const char *dev = mnl_attr_get_str(attr);
+
+    // Check the list of interfaces we should monitor, if we've chosen
+    // a subset.  If we don't care about this one, abort.
+    if (ignorelist_match(ignorelist, dev) != 0) {
+      DEBUG("connectivity plugin: Ignoring link state change for unmonitored "
+            "interface: %s",
+            dev);
+      break;
+    }
+
+    interface_list_t *il = NULL;
+
+    for (il = interface_list_head; il != NULL; il = il->next)
+      if (strcmp(dev, il->interface) == 0)
+        break;
+
+    if (il == NULL) {
+      // We haven't encountered this interface yet, so add it to the linked list
+      il = add_interface(dev, LINK_STATE_UNKNOWN, LINK_STATE_UNKNOWN);
+
+      if (il == NULL) {
+        ERROR("connectivity plugin: unable to add interface %s during "
+              "connectivity_link_state",
+              dev);
+        return MNL_CB_ERROR;
+      }
+    }
+
+    uint32_t prev_status = il->status;
+    il->status =
+        ((ifi->ifi_flags & IFF_RUNNING) ? LINK_STATE_UP : LINK_STATE_DOWN);
+    il->timestamp = cdtime();
+
+    // If the new status is different than the previous status,
+    // store the previous status and set sent to zero, and set the
+    // global flag to indicate there are statuses to dispatch
+    if (il->status != prev_status) {
+      il->prev_status = prev_status;
+      il->sent = 0;
+      statuses_to_send = 1;
+    }
+
+    DEBUG("connectivity plugin (%llu): Interface %s status is now %s",
+          (unsigned long long)il->timestamp, dev,
+          ((ifi->ifi_flags & IFF_RUNNING) ? "UP" : "DOWN"));
+
+    // no need to loop again, we found the interface name attr
+    // (otherwise the first if-statement in the loop would
+    // have moved us on with 'continue')
+    break;
+  }
+
+  pthread_mutex_unlock(&connectivity_data_lock);
+
+  return 0;
+}
+
+static int msg_handler(struct nlmsghdr *msg) {
+  // We are only interested in RTM_NEWLINK messages
+  if (msg->nlmsg_type != RTM_NEWLINK) {
+    return 0;
+  }
+  return connectivity_link_state(msg);
+}
+
+static int read_event(int (*msg_handler)(struct nlmsghdr *)) {
+  int ret = 0;
+  int recv_flags = MSG_DONTWAIT;
+
+  if (nl_sock == -1 || msg_handler == NULL)
+    return EINVAL;
+
+  while (42) {
+    pthread_mutex_lock(&connectivity_threads_lock);
+
+    if (connectivity_netlink_thread_loop <= 0) {
+      pthread_mutex_unlock(&connectivity_threads_lock);
+      return ret;
+    }
+
+    pthread_mutex_unlock(&connectivity_threads_lock);
+
+    char buf[4096];
+    int status = recv(nl_sock, buf, sizeof(buf), recv_flags);
+
+    if (status < 0) {
+
+      // If there were no more messages to drain from the socket,
+      // then signal the dequeue thread and allow it to dispatch
+      // any saved interface status changes.  Then continue, but
+      // block and wait for new messages
+      if (errno == EWOULDBLOCK || errno == EAGAIN) {
+        pthread_cond_signal(&connectivity_cond);
+
+        recv_flags = 0;
+        continue;
+      }
+
+      if (errno == EINTR) {
+        // Interrupt, so just continue and try again
+        continue;
+      }
+
+      /* Anything else is an error */
+      ERROR("connectivity plugin: read_event: Error recv: %d", status);
+      return status;
+    }
+
+    // Message received successfully, so we'll stop blocking on the
+    // receive call for now (until we get a "would block" error, which
+    // will be handled above)
+    recv_flags = MSG_DONTWAIT;
+
+    if (status == 0) {
+      DEBUG("connectivity plugin: read_event: EOF");
+    }
+
+    /* We need to handle more than one message per 'recvmsg' */
+    for (struct nlmsghdr *h = (struct nlmsghdr *)buf;
+         NLMSG_OK(h, (unsigned int)status); h = NLMSG_NEXT(h, status)) {
+      /* Finish reading */
+      if (h->nlmsg_type == NLMSG_DONE)
+        return ret;
+
+      /* Message is some kind of error */
+      if (h->nlmsg_type == NLMSG_ERROR) {
+        struct nlmsgerr *l_err = (struct nlmsgerr *)NLMSG_DATA(h);
+        ERROR("connectivity plugin: read_event: Message is an error: %d",
+              l_err->error);
+        return -1; // Error
+      }
+
+      /* Call message handler */
+      if (msg_handler) {
+        ret = (*msg_handler)(h);
+        if (ret < 0) {
+          ERROR("connectivity plugin: read_event: Message handler error %d",
+                ret);
+          return ret;
+        }
+      } else {
+        ERROR("connectivity plugin: read_event: Error NULL message handler");
+        return -1;
+      }
+    }
+  }
+
+  return ret;
+}
+
+static void connectivity_dispatch_notification(const char *interface,
+                                               gauge_t value, gauge_t old_value,
+                                               cdtime_t timestamp) {
+
+  notification_t n = {
+      .severity = (value == LINK_STATE_UP ? NOTIF_OKAY : NOTIF_FAILURE),
+      .time = cdtime(),
+      .plugin = "connectivity",
+      .type = "gauge",
+      .type_instance = "interface_status",
+  };
+
+  sstrncpy(n.host, hostname_g, sizeof(n.host));
+  sstrncpy(n.plugin_instance, interface, sizeof(n.plugin_instance));
+
+  char *buf = NULL;
+
+  gen_message_payload(value, old_value, interface, timestamp, &buf);
+
+  int status = plugin_notification_meta_add_string(&n, "ves", buf);
+
+  if (status < 0) {
+    sfree(buf);
+    ERROR("connectivity plugin: unable to set notification VES metadata: %s",
+          STRERRNO);
+    return;
+  }
+
+  DEBUG("connectivity plugin: notification VES metadata: %s",
+        n.meta->nm_value.nm_string);
+
+  DEBUG("connectivity plugin: dispatching state %d for interface %s",
+        (int)value, interface);
+
+  plugin_dispatch_notification(&n);
+  plugin_notification_meta_free(n.meta);
+
+  // strdup'd in gen_message_payload
+  if (buf != NULL)
+    sfree(buf);
+}
+
+// NOTE: Caller MUST hold connectivity_data_lock when calling this function
+static void send_interface_status() {
+  for (interface_list_t *il = interface_list_head; il != NULL;
+       il = il->next) /* {{{ */
+  {
+    uint32_t status = il->status;
+    uint32_t prev_status = il->prev_status;
+    uint32_t sent = il->sent;
+
+    if (status != prev_status && sent == 0) {
+      connectivity_dispatch_notification(il->interface, status, prev_status,
+                                         il->timestamp);
+      il->sent = 1;
+    }
+  } /* }}} for (il = interface_list_head; il != NULL; il = il->next) */
+
+  statuses_to_send = 0;
+}
+
+static void read_interface_status() /* {{{ */
+{
+  pthread_mutex_lock(&connectivity_data_lock);
+
+  // If we don't have any interface statuses to dispatch,
+  // then we wait until signalled
+  if (!statuses_to_send)
+    pthread_cond_wait(&connectivity_cond, &connectivity_data_lock);
+
+  send_interface_status();
+
+  pthread_mutex_unlock(&connectivity_data_lock);
+} /* }}} int *read_interface_status */
+
+static void *connectivity_netlink_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&connectivity_threads_lock);
+
+  while (connectivity_netlink_thread_loop > 0) {
+    pthread_mutex_unlock(&connectivity_threads_lock);
+
+    int status = read_event(msg_handler);
+
+    pthread_mutex_lock(&connectivity_threads_lock);
+
+    if (status < 0) {
+      connectivity_netlink_thread_error = 1;
+      break;
+    }
+  } /* while (connectivity_netlink_thread_loop > 0) */
+
+  pthread_mutex_unlock(&connectivity_threads_lock);
+
+  return (void *)0;
+} /* }}} void *connectivity_netlink_thread */
+
+static void *connectivity_dequeue_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&connectivity_threads_lock);
+
+  while (connectivity_dequeue_thread_loop > 0) {
+    pthread_mutex_unlock(&connectivity_threads_lock);
+
+    read_interface_status();
+
+    pthread_mutex_lock(&connectivity_threads_lock);
+  } /* while (connectivity_dequeue_thread_loop > 0) */
+
+  pthread_mutex_unlock(&connectivity_threads_lock);
+
+  return ((void *)0);
+} /* }}} void *connectivity_dequeue_thread */
+
+static int nl_connect() {
+  struct sockaddr_nl sa_nl = {
+      .nl_family = AF_NETLINK,
+      .nl_groups = RTMGRP_LINK,
+      .nl_pid = getpid(),
+  };
+
+  nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_ROUTE);
+  if (nl_sock == -1) {
+    ERROR("connectivity plugin: socket open failed: %s", STRERRNO);
+    return -1;
+  }
+
+  int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
+  if (rc == -1) {
+    ERROR("connectivity plugin: socket bind failed: %s", STRERRNO);
+    close(nl_sock);
+    nl_sock = -1;
+    return -1;
+  }
+
+  return 0;
+}
+
+static int start_netlink_thread(void) /* {{{ */
+{
+  pthread_mutex_lock(&connectivity_threads_lock);
+
+  if (connectivity_netlink_thread_loop != 0) {
+    pthread_mutex_unlock(&connectivity_threads_lock);
+    return 0;
+  }
+
+  connectivity_netlink_thread_loop = 1;
+  connectivity_netlink_thread_error = 0;
+
+  int status;
+
+  if (nl_sock == -1) {
+    status = nl_connect();
+
+    if (status != 0) {
+      pthread_mutex_unlock(&connectivity_threads_lock);
+      return status;
+    }
+  }
+
+  status = plugin_thread_create(&connectivity_netlink_thread_id,
+                                /* attr = */ NULL, connectivity_netlink_thread,
+                                /* arg = */ (void *)0, "connectivity");
+  if (status != 0) {
+    connectivity_netlink_thread_loop = 0;
+    ERROR("connectivity plugin: Starting thread failed.");
+    pthread_mutex_unlock(&connectivity_threads_lock);
+
+    int status2 = close(nl_sock);
+
+    if (status2 != 0) {
+      ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
+            status2, STRERRNO);
+    }
+
+    nl_sock = -1;
+
+    return -1;
+  }
+
+  pthread_mutex_unlock(&connectivity_threads_lock);
+
+  return status;
+}
+
+static int start_dequeue_thread(void) /* {{{ */
+{
+  pthread_mutex_lock(&connectivity_threads_lock);
+
+  if (connectivity_dequeue_thread_loop != 0) {
+    pthread_mutex_unlock(&connectivity_threads_lock);
+    return 0;
+  }
+
+  connectivity_dequeue_thread_loop = 1;
+
+  int status =
+      plugin_thread_create(&connectivity_dequeue_thread_id,
+                           /* attr = */ NULL, connectivity_dequeue_thread,
+                           /* arg = */ (void *)0, "connectivity");
+  if (status != 0) {
+    connectivity_dequeue_thread_loop = 0;
+    ERROR("connectivity plugin: Starting dequeue thread failed.");
+    pthread_mutex_unlock(&connectivity_threads_lock);
+    return -1;
+  }
+
+  pthread_mutex_unlock(&connectivity_threads_lock);
+
+  return status;
+} /* }}} int start_dequeue_thread */
+
+static int start_threads(void) /* {{{ */
+{
+  int status = start_netlink_thread();
+  int status2 = start_dequeue_thread();
+
+  if (status != 0)
+    return status;
+  else
+    return status2;
+} /* }}} int start_threads */
+
+static int stop_netlink_thread(int shutdown) /* {{{ */
+{
+  int socket_status;
+
+  if (nl_sock != -1) {
+    socket_status = close(nl_sock);
+    if (socket_status != 0) {
+      ERROR("connectivity plugin: failed to close socket %d: %d (%s)", nl_sock,
+            socket_status, STRERRNO);
+    }
+
+    nl_sock = -1;
+  } else
+    socket_status = 0;
+
+  pthread_mutex_lock(&connectivity_threads_lock);
+
+  if (connectivity_netlink_thread_loop == 0) {
+    pthread_mutex_unlock(&connectivity_threads_lock);
+    // Thread has already been terminated, nothing more to attempt
+    return socket_status;
+  }
+
+  // Set thread termination status
+  connectivity_netlink_thread_loop = 0;
+  pthread_mutex_unlock(&connectivity_threads_lock);
+
+  // Let threads waiting on access to the interface list know to move
+  // on such that they'll see the thread's termination status
+  pthread_cond_broadcast(&connectivity_cond);
+
+  int thread_status;
+
+  if (shutdown == 1) {
+    // Since the thread is blocking, calling pthread_join
+    // doesn't actually succeed in stopping it.  It will stick around
+    // until a NETLINK message is received on the socket (at which
+    // it will realize that "connectivity_netlink_thread_loop" is 0 and will
+    // break out of the read loop and be allowed to die).  This is
+    // fine when the process isn't supposed to be exiting, but in
+    // the case of a process shutdown, we don't want to have an
+    // idle thread hanging around.  Calling pthread_cancel here in
+    // the case of a shutdown is just assures that the thread is
+    // gone and that the process has been fully terminated.
+
+    DEBUG("connectivity plugin: Canceling netlink thread for process shutdown");
+
+    thread_status = pthread_cancel(connectivity_netlink_thread_id);
+
+    if (thread_status != 0 && thread_status != ESRCH) {
+      ERROR("connectivity plugin: Unable to cancel netlink thread: %d",
+            thread_status);
+      thread_status = -1;
+    } else
+      thread_status = 0;
+  } else {
+    thread_status =
+        pthread_join(connectivity_netlink_thread_id, /* return = */ NULL);
+    if (thread_status != 0 && thread_status != ESRCH) {
+      ERROR("connectivity plugin: Stopping netlink thread failed: %d",
+            thread_status);
+      thread_status = -1;
+    } else
+      thread_status = 0;
+  }
+
+  pthread_mutex_lock(&connectivity_threads_lock);
+  memset(&connectivity_netlink_thread_id, 0,
+         sizeof(connectivity_netlink_thread_id));
+  connectivity_netlink_thread_error = 0;
+  pthread_mutex_unlock(&connectivity_threads_lock);
+
+  DEBUG("connectivity plugin: Finished requesting stop of netlink thread");
+
+  if (socket_status != 0)
+    return socket_status;
+  else
+    return thread_status;
+}
+
+static int stop_dequeue_thread() /* {{{ */
+{
+  pthread_mutex_lock(&connectivity_threads_lock);
+
+  if (connectivity_dequeue_thread_loop == 0) {
+    pthread_mutex_unlock(&connectivity_threads_lock);
+    return -1;
+  }
+
+  // Set thread termination status
+  connectivity_dequeue_thread_loop = 0;
+  pthread_mutex_unlock(&connectivity_threads_lock);
+
+  // Let threads waiting on access to the interface list know to move
+  // on such that they'll see the threads termination status
+  pthread_cond_broadcast(&connectivity_cond);
+
+  // Calling pthread_cancel here just assures that the thread is
+  // gone and that the process has been fully terminated.
+
+  DEBUG("connectivity plugin: Canceling dequeue thread for process shutdown");
+
+  int status = pthread_cancel(connectivity_dequeue_thread_id);
+
+  if (status != 0 && status != ESRCH) {
+    ERROR("connectivity plugin: Unable to cancel dequeue thread: %d", status);
+    status = -1;
+  } else
+    status = 0;
+
+  pthread_mutex_lock(&connectivity_threads_lock);
+  memset(&connectivity_dequeue_thread_id, 0,
+         sizeof(connectivity_dequeue_thread_id));
+  pthread_mutex_unlock(&connectivity_threads_lock);
+
+  DEBUG("connectivity plugin: Finished requesting stop of dequeue thread");
+
+  return status;
+} /* }}} int stop_dequeue_thread */
+
+static int stop_threads() /* {{{ */
+{
+  int status = stop_netlink_thread(1);
+  int status2 = stop_dequeue_thread();
+
+  if (status != 0)
+    return status;
+  else
+    return status2;
+} /* }}} int stop_threads */
+
+static int connectivity_init(void) /* {{{ */
+{
+  if (monitor_all_interfaces) {
+    NOTICE("connectivity plugin: No interfaces have been selected, so all will "
+           "be monitored");
+  }
+
+  return start_threads();
+} /* }}} int connectivity_init */
+
+static int connectivity_config(const char *key, const char *value) /* {{{ */
+{
+  if (ignorelist == NULL) {
+    ignorelist = ignorelist_create(/* invert = */ 1);
+
+    if (ignorelist == NULL)
+      return -1;
+  }
+
+  if (strcasecmp(key, "Interface") == 0) {
+    ignorelist_add(ignorelist, value);
+    monitor_all_interfaces = 0;
+  } else if (strcasecmp(key, "IgnoreSelected") == 0) {
+    int invert = 1;
+    if (IS_TRUE(value))
+      invert = 0;
+    ignorelist_set_invert(ignorelist, invert);
+  } else {
+    return -1;
+  }
+
+  return 0;
+} /* }}} int connectivity_config */
+
+static int connectivity_read(void) /* {{{ */
+{
+  pthread_mutex_lock(&connectivity_threads_lock);
+
+  if (connectivity_netlink_thread_error != 0) {
+
+    pthread_mutex_unlock(&connectivity_threads_lock);
+
+    ERROR("connectivity plugin: The netlink thread had a problem. Restarting "
+          "it.");
+
+    stop_netlink_thread(0);
+
+    for (interface_list_t *il = interface_list_head; il != NULL;
+         il = il->next) {
+      il->status = LINK_STATE_UNKNOWN;
+      il->prev_status = LINK_STATE_UNKNOWN;
+      il->sent = 0;
+    }
+
+    start_netlink_thread();
+
+    return -1;
+  } /* if (connectivity_netlink_thread_error != 0) */
+
+  pthread_mutex_unlock(&connectivity_threads_lock);
+
+  return 0;
+} /* }}} int connectivity_read */
+
+static int connectivity_shutdown(void) /* {{{ */
+{
+  DEBUG("connectivity plugin: Shutting down thread.");
+
+  int status = stop_threads();
+
+  interface_list_t *il = interface_list_head;
+  while (il != NULL) {
+    interface_list_t *il_next;
+
+    il_next = il->next;
+
+    sfree(il->interface);
+    sfree(il);
+
+    il = il_next;
+  }
+
+  ignorelist_free(ignorelist);
+
+  return status;
+} /* }}} int connectivity_shutdown */
+
+void module_register(void) {
+  plugin_register_config("connectivity", connectivity_config, config_keys,
+                         config_keys_num);
+  plugin_register_init("connectivity", connectivity_init);
+  plugin_register_read("connectivity", connectivity_read);
+  plugin_register_shutdown("connectivity", connectivity_shutdown);
+} /* void module_register */
index cf3d3da..d23d071 100644 (file)
 #include "plugin.h"
 #include "utils/common/common.h"
 
-#ifdef HAVE_SYS_SYSCTL_H
+#if defined(HAVE_SYSCTLBYNAME) && defined(HAVE_SYS_SYSCTL_H)
 #include <sys/sysctl.h>
-#endif
-
-#if HAVE_SYSCTLBYNAME
 /* no global variables */
 /* #endif HAVE_SYSCTLBYNAME */
 
index daddf68..52cb0a4 100644 (file)
@@ -85,6 +85,14 @@ struct read_func_s {
 };
 typedef struct read_func_s read_func_t;
 
+struct cache_event_func_s {
+  plugin_cache_event_cb callback;
+  char *name;
+  user_data_t user_data;
+  plugin_ctx_t plugin_ctx;
+};
+typedef struct cache_event_func_s cache_event_func_t;
+
 struct write_queue_s;
 typedef struct write_queue_s write_queue_t;
 struct write_queue_s {
@@ -112,6 +120,9 @@ static llist_t *list_shutdown;
 static llist_t *list_log;
 static llist_t *list_notification;
 
+static size_t list_cache_event_num;
+static cache_event_func_t list_cache_event[32];
+
 static fc_chain_t *pre_cache_chain;
 static fc_chain_t *post_cache_chain;
 
@@ -263,8 +274,6 @@ static void destroy_read_heap(void) /* {{{ */
 
 static int register_callback(llist_t **list, /* {{{ */
                              const char *name, callback_func_t *cf) {
-  llentry_t *le;
-  char *key;
 
   if (*list == NULL) {
     *list = llist_create();
@@ -276,14 +285,14 @@ static int register_callback(llist_t **list, /* {{{ */
     }
   }
 
-  key = strdup(name);
+  char *key = strdup(name);
   if (key == NULL) {
     ERROR("plugin: register_callback: strdup failed.");
     destroy_callback(cf);
     return -1;
   }
 
-  le = llist_search(*list, name);
+  llentry_t *le = llist_search(*list, name);
   if (le == NULL) {
     le = llentry_create(key, cf);
     if (le == NULL) {
@@ -296,9 +305,7 @@ static int register_callback(llist_t **list, /* {{{ */
 
     llist_append(*list, le);
   } else {
-    callback_func_t *old_cf;
-
-    old_cf = le->value;
+    callback_func_t *old_cf = le->value;
     le->value = cf;
 
     P_WARNING("register_callback: "
@@ -1312,6 +1319,60 @@ EXPORT int plugin_register_missing(const char *name, plugin_missing_cb callback,
   return create_register_callback(&list_missing, name, (void *)callback, ud);
 } /* int plugin_register_missing */
 
+EXPORT int plugin_register_cache_event(const char *name,
+                                       plugin_cache_event_cb callback,
+                                       user_data_t const *ud) {
+
+  if (name == NULL || callback == NULL)
+    return EINVAL;
+
+  char *name_copy = strdup(name);
+  if (name_copy == NULL) {
+    P_ERROR("plugin_register_cache_event: strdup failed.");
+    free_userdata(ud);
+    return ENOMEM;
+  }
+
+  if (list_cache_event_num >= 32) {
+    P_ERROR("plugin_register_cache_event: Too much cache event callbacks tried "
+            "to be registered.");
+    free_userdata(ud);
+    return ENOMEM;
+  }
+
+  for (size_t i = 0; i < list_cache_event_num; i++) {
+    cache_event_func_t *cef = &list_cache_event[i];
+    if (!cef->callback)
+      continue;
+
+    if (strcmp(name, cef->name) == 0) {
+      P_ERROR("plugin_register_cache_event: a callback named `%s' already "
+              "registered!",
+              name);
+      free_userdata(ud);
+      return -1;
+    }
+  }
+
+  user_data_t user_data;
+  if (ud == NULL) {
+    user_data = (user_data_t){
+        .data = NULL, .free_func = NULL,
+    };
+  } else {
+    user_data = *ud;
+  }
+
+  list_cache_event[list_cache_event_num] =
+      (cache_event_func_t){.callback = callback,
+                           .name = name_copy,
+                           .user_data = user_data,
+                           .plugin_ctx = plugin_get_ctx()};
+  list_cache_event_num++;
+
+  return 0;
+} /* int plugin_register_cache_event */
+
 EXPORT int plugin_register_shutdown(const char *name, int (*callback)(void)) {
   return create_register_callback(&list_shutdown, name, (void *)callback, NULL);
 } /* int plugin_register_shutdown */
@@ -1513,6 +1574,32 @@ EXPORT int plugin_unregister_missing(const char *name) {
   return plugin_unregister(list_missing, name);
 }
 
+EXPORT int plugin_unregister_cache_event(const char *name) {
+  for (size_t i = 0; i < list_cache_event_num; i++) {
+    cache_event_func_t *cef = &list_cache_event[i];
+    if (!cef->callback)
+      continue;
+    if (strcmp(name, cef->name) == 0) {
+      /* Mark callback as inactive, so mask in cache entries remains actual */
+      cef->callback = NULL;
+      sfree(cef->name);
+      free_userdata(&cef->user_data);
+    }
+  }
+  return 0;
+}
+
+static void destroy_cache_event_callbacks() {
+  for (size_t i = 0; i < list_cache_event_num; i++) {
+    cache_event_func_t *cef = &list_cache_event[i];
+    if (!cef->callback)
+      continue;
+    cef->callback = NULL;
+    sfree(cef->name);
+    free_userdata(&cef->user_data);
+  }
+}
+
 EXPORT int plugin_unregister_shutdown(const char *name) {
   return plugin_unregister(list_shutdown, name);
 }
@@ -1857,6 +1944,7 @@ EXPORT int plugin_shutdown_all(void) {
    * the data isn't freed twice. */
   destroy_all_callbacks(&list_flush);
   destroy_all_callbacks(&list_missing);
+  destroy_cache_event_callbacks();
   destroy_all_callbacks(&list_write);
 
   destroy_all_callbacks(&list_notification);
@@ -1897,6 +1985,82 @@ EXPORT int plugin_dispatch_missing(const value_list_t *vl) /* {{{ */
   return 0;
 } /* int }}} plugin_dispatch_missing */
 
+void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
+                                 unsigned long callbacks_mask, const char *name,
+                                 const value_list_t *vl) {
+  switch (event_type) {
+  case CE_VALUE_NEW:
+    callbacks_mask = 0;
+    for (size_t i = 0; i < list_cache_event_num; i++) {
+      cache_event_func_t *cef = &list_cache_event[i];
+      plugin_cache_event_cb callback = cef->callback;
+
+      if (!callback)
+        continue;
+
+      cache_event_t event = (cache_event_t){.type = event_type,
+                                            .value_list = vl,
+                                            .value_list_name = name,
+                                            .ret = 0};
+
+      plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
+      int status = (*callback)(&event, &cef->user_data);
+      plugin_set_ctx(old_ctx);
+
+      if (status != 0) {
+        ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
+              "%i for event NEW.",
+              cef->name, status);
+      } else {
+        if (event.ret) {
+          DEBUG(
+              "plugin_dispatch_cache_event: Callback \"%s\" subscribed to %s.",
+              cef->name, name);
+          callbacks_mask |= (1 << (i));
+        } else {
+          DEBUG("plugin_dispatch_cache_event: Callback \"%s\" ignores %s.",
+                cef->name, name);
+        }
+      }
+    }
+
+    if (callbacks_mask)
+      uc_set_callbacks_mask(name, callbacks_mask);
+
+    break;
+  case CE_VALUE_UPDATE:
+  case CE_VALUE_EXPIRED:
+    for (size_t i = 0; i < list_cache_event_num; i++) {
+      cache_event_func_t *cef = &list_cache_event[i];
+      plugin_cache_event_cb callback = cef->callback;
+
+      if (!callback)
+        continue;
+
+      if (callbacks_mask && (1 << (i)) == 0)
+        continue;
+
+      cache_event_t event = (cache_event_t){.type = event_type,
+                                            .value_list = vl,
+                                            .value_list_name = name,
+                                            .ret = 0};
+
+      plugin_ctx_t old_ctx = plugin_set_ctx(cef->plugin_ctx);
+      int status = (*callback)(&event, &cef->user_data);
+      plugin_set_ctx(old_ctx);
+
+      if (status != 0) {
+        ERROR("plugin_dispatch_cache_event: Callback \"%s\" failed with status "
+              "%i for event %s.",
+              cef->name, status,
+              ((event_type == CE_VALUE_UPDATE) ? "UPDATE" : "EXPIRED"));
+      }
+    }
+    break;
+  }
+  return;
+}
+
 static int plugin_dispatch_values_internal(value_list_t *vl) {
   int status;
   static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC;
index 3c30158..af3693d 100644 (file)
@@ -171,6 +171,15 @@ struct user_data_s {
 };
 typedef struct user_data_s user_data_t;
 
+enum cache_event_type_e { CE_VALUE_NEW, CE_VALUE_UPDATE, CE_VALUE_EXPIRED };
+
+typedef struct cache_event_s {
+  enum cache_event_type_e type;
+  const value_list_t *value_list;
+  const char *value_list_name;
+  int ret;
+} cache_event_t;
+
 struct plugin_ctx_s {
   char *name;
   cdtime_t interval;
@@ -192,6 +201,11 @@ typedef int (*plugin_flush_cb)(cdtime_t timeout, const char *identifier,
  * callbacks should be called, greater than zero if no more callbacks should be
  * called. */
 typedef int (*plugin_missing_cb)(const value_list_t *, user_data_t *);
+/* "cache event" callback. CE_VALUE_NEW events are sent to all registered
+ * callbacks. Callback should check if it interested in further CE_VALUE_UPDATE
+ * and CE_VALUE_EXPIRED events for metric and set event->ret = 1 if so.
+ */
+typedef int (*plugin_cache_event_cb)(cache_event_t *, user_data_t *);
 typedef void (*plugin_log_cb)(int severity, const char *message, user_data_t *);
 typedef int (*plugin_shutdown_cb)(void);
 typedef int (*plugin_notification_cb)(const notification_t *, user_data_t *);
@@ -295,6 +309,9 @@ int plugin_register_flush(const char *name, plugin_flush_cb callback,
                           user_data_t const *user_data);
 int plugin_register_missing(const char *name, plugin_missing_cb callback,
                             user_data_t const *user_data);
+int plugin_register_cache_event(const char *name,
+                                plugin_cache_event_cb callback,
+                                user_data_t const *ud);
 int plugin_register_shutdown(const char *name, plugin_shutdown_cb callback);
 int plugin_register_data_set(const data_set_t *ds);
 int plugin_register_log(const char *name, plugin_log_cb callback,
@@ -311,6 +328,7 @@ int plugin_unregister_read_group(const char *group);
 int plugin_unregister_write(const char *name);
 int plugin_unregister_flush(const char *name);
 int plugin_unregister_missing(const char *name);
+int plugin_unregister_cache_event(const char *name);
 int plugin_unregister_shutdown(const char *name);
 int plugin_unregister_data_set(const char *name);
 int plugin_unregister_log(const char *name);
@@ -381,6 +399,9 @@ __attribute__((sentinel)) int plugin_dispatch_multivalue(value_list_t const *vl,
                                                          int store_type, ...);
 
 int plugin_dispatch_missing(const value_list_t *vl);
+void plugin_dispatch_cache_event(enum cache_event_type_e event_type,
+                                 unsigned long callbacks_mask, const char *name,
+                                 const value_list_t *vl);
 
 int plugin_dispatch_notification(const notification_t *notif);
 
index 351c3a0..672b01f 100644 (file)
@@ -67,6 +67,7 @@ typedef struct cache_entry_s {
   size_t history_length;
 
   meta_data_t *meta;
+  unsigned long callbacks_mask;
 } cache_entry_t;
 
 struct uc_iter_s {
@@ -140,18 +141,15 @@ static void uc_check_range(const data_set_t *ds, cache_entry_t *ce) {
 
 static int uc_insert(const data_set_t *ds, const value_list_t *vl,
                      const char *key) {
-  char *key_copy;
-  cache_entry_t *ce;
-
   /* `cache_lock' has been locked by `uc_update' */
 
-  key_copy = strdup(key);
+  char *key_copy = strdup(key);
   if (key_copy == NULL) {
     ERROR("uc_insert: strdup failed.");
     return -1;
   }
 
-  ce = cache_alloc(ds->ds_num);
+  cache_entry_t *ce = cache_alloc(ds->ds_num);
   if (ce == NULL) {
     sfree(key_copy);
     ERROR("uc_insert: cache_alloc (%" PRIsz ") failed.", ds->ds_num);
@@ -230,6 +228,7 @@ int uc_check_timeout(void) {
     char *key;
     cdtime_t time;
     cdtime_t interval;
+    unsigned long callbacks_mask;
   } *expired = NULL;
   size_t expired_num = 0;
 
@@ -255,6 +254,7 @@ int uc_check_timeout(void) {
     expired[expired_num].key = strdup(key);
     expired[expired_num].time = ce->last_time;
     expired[expired_num].interval = ce->interval;
+    expired[expired_num].callbacks_mask = ce->callbacks_mask;
 
     if (expired[expired_num].key == NULL) {
       ERROR("uc_check_timeout: strdup failed.");
@@ -290,6 +290,10 @@ int uc_check_timeout(void) {
     }
 
     plugin_dispatch_missing(&vl);
+
+    if (expired[i].callbacks_mask)
+      plugin_dispatch_cache_event(CE_VALUE_EXPIRED, expired[i].callbacks_mask,
+                                  expired[i].key, &vl);
   } /* for (i = 0; i < expired_num; i++) */
 
   /* Now actually remove all the values from the cache. We don't re-evaluate
@@ -319,8 +323,6 @@ int uc_check_timeout(void) {
 
 int uc_update(const data_set_t *ds, const value_list_t *vl) {
   char name[6 * DATA_MAX_NAME_LEN];
-  cache_entry_t *ce = NULL;
-  int status;
 
   if (FORMAT_VL(name, sizeof(name), vl) != 0) {
     ERROR("uc_update: FORMAT_VL failed.");
@@ -329,11 +331,16 @@ int uc_update(const data_set_t *ds, const value_list_t *vl) {
 
   pthread_mutex_lock(&cache_lock);
 
-  status = c_avl_get(cache_tree, name, (void *)&ce);
+  cache_entry_t *ce = NULL;
+  int status = c_avl_get(cache_tree, name, (void *)&ce);
   if (status != 0) /* entry does not yet exist */
   {
     status = uc_insert(ds, vl, name);
     pthread_mutex_unlock(&cache_lock);
+
+    if (status == 0)
+      plugin_dispatch_cache_event(CE_VALUE_NEW, 0 /* mask */, name, vl);
+
     return status;
   }
 
@@ -408,11 +415,32 @@ int uc_update(const data_set_t *ds, const value_list_t *vl) {
   ce->last_update = cdtime();
   ce->interval = vl->interval;
 
+  /* Check if cache entry has registered callbacks */
+  unsigned long callbacks_mask = ce->callbacks_mask;
+
   pthread_mutex_unlock(&cache_lock);
 
+  if (callbacks_mask)
+    plugin_dispatch_cache_event(CE_VALUE_UPDATE, callbacks_mask, name, vl);
+
   return 0;
 } /* int uc_update */
 
+int uc_set_callbacks_mask(const char *name, unsigned long mask) {
+  pthread_mutex_lock(&cache_lock);
+  cache_entry_t *ce = NULL;
+  int status = c_avl_get(cache_tree, name, (void *)&ce);
+  if (status != 0) { /* Ouch, just created entry disappeared ?! */
+    ERROR("uc_set_callbacks_mask: Couldn't find %s entry!", name);
+    pthread_mutex_unlock(&cache_lock);
+    return -1;
+  }
+  DEBUG("uc_set_callbacks_mask: set mask for \"%s\" to %lu.", name, mask);
+  ce->callbacks_mask = mask;
+  pthread_mutex_unlock(&cache_lock);
+  return 0;
+}
+
 int uc_get_rate_by_name(const char *name, gauge_t **ret_values,
                         size_t *ret_values_num) {
   gauge_t *ret = NULL;
index d3ea936..a069221 100644 (file)
@@ -56,6 +56,8 @@ int uc_get_hits(const data_set_t *ds, const value_list_t *vl);
 int uc_set_hits(const data_set_t *ds, const value_list_t *vl, int hits);
 int uc_inc_hits(const data_set_t *ds, const value_list_t *vl, int step);
 
+int uc_set_callbacks_mask(const char *name, unsigned long callbacks_mask);
+
 int uc_get_history(const data_set_t *ds, const value_list_t *vl,
                    gauge_t *ret_history, size_t num_steps, size_t num_ds);
 int uc_get_history_by_name(const char *name, gauge_t *ret_history,
index 9574f2c..f8707d4 100644 (file)
@@ -26,6 +26,9 @@
 #define _DEFAULT_SOURCE
 #define _BSD_SOURCE /* For setgroups */
 
+/* _GNU_SOURCE is needed in Linux to use execvpe */
+#define _GNU_SOURCE
+
 #include "collectd.h"
 
 #include "plugin.h"
@@ -43,6 +46,8 @@
 #include <sys/capability.h>
 #endif
 
+extern char **environ;
+
 #define PL_NORMAL 0x01
 #define PL_NOTIF_ACTION 0x02
 
@@ -245,49 +250,9 @@ static int exec_config(oconfig_item_t *ci) /* {{{ */
   return 0;
 } /* int exec_config }}} */
 
-#if !defined(HAVE_SETENV)
-static char env_interval[64];
-// max hostname len is 255, so this should be enough
-static char env_hostname[300];
-#endif
-
-static void set_environment(void) /* {{{ */
-{
-#ifdef HAVE_SETENV
-  char buffer[1024];
-
-  snprintf(buffer, sizeof(buffer), "%.3f",
-           CDTIME_T_TO_DOUBLE(plugin_get_interval()));
-  setenv("COLLECTD_INTERVAL", buffer, /* overwrite = */ 1);
-
-  sstrncpy(buffer, hostname_g, sizeof(buffer));
-  setenv("COLLECTD_HOSTNAME", buffer, /* overwrite = */ 1);
-#else
-  snprintf(env_interval, sizeof(env_interval), "COLLECTD_INTERVAL=%.3f",
-           CDTIME_T_TO_DOUBLE(plugin_get_interval()));
-  putenv(env_interval);
-
-  snprintf(env_hostname, sizeof(env_hostname), "COLLECTD_HOSTNAME=%s",
-           hostname_g);
-  putenv(env_hostname);
-#endif
-} /* }}} void set_environment */
-
-static void unset_environment(void) /* {{{ */
-{
-#ifdef HAVE_SETENV
-  unsetenv("COLLECTD_INTERVAL");
-  unsetenv("COLLECTD_HOSTNAME");
-#else
-  snprintf(env_interval, sizeof(env_interval), "COLLECTD_INTERVAL");
-  putenv(env_interval);
-  snprintf(env_hostname, sizeof(env_hostname), "COLLECTD_HOSTNAME");
-  putenv(env_hostname);
-#endif
-} /* }}} void unset_environment */
-
-__attribute__((noreturn)) static void exec_child(program_list_t *pl, int uid,
-                                                 int gid, int egid) /* {{{ */
+__attribute__((noreturn)) static void exec_child(program_list_t *pl,
+                                                 char **envp, int uid, int gid,
+                                                 int egid) /* {{{ */
 {
   int status;
 
@@ -328,7 +293,12 @@ __attribute__((noreturn)) static void exec_child(program_list_t *pl, int uid,
     exit(-1);
   }
 
+#ifdef HAVE_EXECVPE
+  execvpe(pl->exec, pl->argv, envp);
+#else
+  environ = envp;
   execvp(pl->exec, pl->argv);
+#endif
 
   ERROR("exec plugin: Failed to execute ``%s'': %s", pl->exec, STRERRNO);
   exit(-1);
@@ -486,17 +456,42 @@ static int fork_child(program_list_t *pl, int *fd_in, int *fd_out,
     goto failed;
   }
 
-  set_environment();
+  double interval = CDTIME_T_TO_DOUBLE(plugin_get_interval());
 
   pid = fork();
   if (pid < 0) {
     ERROR("exec plugin: fork failed: %s", STRERRNO);
     goto failed;
   } else if (pid == 0) {
-    int fd_num;
+    char interval_buf[128];
+    snprintf(interval_buf, sizeof(interval_buf), "COLLECTD_INTERVAL=%.3f",
+             interval);
+
+    /* max hostname len is 255, so this should be enough */
+    char hostname_buf[300];
+    snprintf(hostname_buf, sizeof(hostname_buf), "COLLECTD_HOSTNAME=%s",
+             hostname_g);
+
+    size_t env_size = 0;
+    while (environ[env_size] != NULL) {
+      ++env_size;
+    }
+
+    /* Copy the environment variables */
+    char *envp[env_size + 3];
+    size_t envp_idx;
+    for (envp_idx = 0; environ[envp_idx] != NULL && envp_idx < env_size;
+         ++envp_idx) {
+      envp[envp_idx] = environ[envp_idx];
+    }
+
+    /* Add the collectd environment variables */
+    envp[envp_idx++] = interval_buf;
+    envp[envp_idx++] = hostname_buf;
+    envp[envp_idx++] = NULL;
 
     /* Close all file descriptors but the pipe end we need. */
-    fd_num = getdtablesize();
+    int fd_num = getdtablesize();
     for (int fd = 0; fd < fd_num; fd++) {
       if ((fd == fd_pipe_in[0]) || (fd == fd_pipe_out[1]) ||
           (fd == fd_pipe_err[1]))
@@ -525,12 +520,10 @@ static int fork_child(program_list_t *pl, int *fd_in, int *fd_out,
     /* Unblock all signals */
     reset_signal_mask();
 
-    exec_child(pl, uid, gid, egid);
+    exec_child(pl, envp, uid, gid, egid);
     /* does not return */
   }
 
-  unset_environment();
-
   close(fd_pipe_in[0]);
   close(fd_pipe_out[1]);
   close(fd_pipe_err[1]);
@@ -553,8 +546,6 @@ static int fork_child(program_list_t *pl, int *fd_in, int *fd_out,
   return pid;
 
 failed:
-  unset_environment();
-
   close_pipe(fd_pipe_in);
   close_pipe(fd_pipe_out);
   close_pipe(fd_pipe_err);
@@ -919,6 +910,11 @@ static int exec_shutdown(void) /* {{{ */
       INFO("exec plugin: Sent SIGTERM to %hu", (unsigned short int)pl->pid);
     }
 
+    for (int i = 0; pl->argv[i] != NULL; i++) {
+      sfree(pl->argv[i]);
+    }
+    sfree(pl->argv);
+    sfree(pl->exec);
     sfree(pl->user);
     sfree(pl);
 
index 1e9cb20..d9577a4 100644 (file)
@@ -747,7 +747,8 @@ static int c_grpc_config_server(oconfig_item_t *ci) {
 
   auto callback_name = grpc::string("grpc/") + addr;
   user_data_t ud = {
-      .data = client, .free_func = c_grpc_destroy_write_callback,
+      .data = client,
+      .free_func = c_grpc_destroy_write_callback,
   };
 
   plugin_register_write(callback_name.c_str(), c_grpc_write, &ud);
index 41f4909..4da73bb 100644 (file)
@@ -2076,7 +2076,7 @@ static int cjni_config_load_plugin(oconfig_item_t *ci) /* {{{ */
     class->object = NULL;
   if (class->object == NULL) {
     ERROR("java plugin: cjni_config_load_plugin: "
-          "Could create a new `%s' object.",
+          "Could not create a new `%s' object.",
           class->name);
     cjni_thread_detach();
     free(class->name);
diff --git a/src/lvm.c b/src/lvm.c
deleted file mode 100644 (file)
index 3077c93..0000000
--- a/src/lvm.c
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * collectd - src/lvm.c
- * Copyright (C) 2013       Chad Malfait
- * Copyright (C) 2014       Carnegie Mellon University
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; only version 2 of the License is applicable.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
- *
- * Authors:
- *   Chad Malfait <malfaitc at yahoo.com>
- *   Benjamin Gilbert <bgilbert at backtick.net>
- **/
-
-#include "collectd.h"
-
-#include "plugin.h"
-#include "utils/common/common.h"
-
-#include <lvm2app.h>
-
-#ifdef HAVE_SYS_CAPABILITY_H
-#include <sys/capability.h>
-#endif /* HAVE_SYS_CAPABILITY_H */
-
-#define NO_VALUE UINT64_MAX
-#define PERCENT_SCALE_FACTOR 1e-8
-
-static uint64_t get_lv_property_int(lv_t lv, char const *property) {
-  lvm_property_value_t v;
-
-  v = lvm_lv_get_property(lv, property);
-  if (!v.is_valid || !v.is_integer)
-    return NO_VALUE;
-  /* May be NO_VALUE if @property does not apply to this LV */
-  return v.value.integer;
-}
-
-static char const *get_lv_property_string(lv_t lv, char const *property) {
-  lvm_property_value_t v;
-
-  v = lvm_lv_get_property(lv, property);
-  if (!v.is_valid || !v.is_string)
-    return NULL;
-  return v.value.string;
-}
-
-static void lvm_submit(char const *plugin_instance, char const *type_instance,
-                       uint64_t ivalue) {
-  value_list_t vl = VALUE_LIST_INIT;
-
-  vl.values = &(value_t){.gauge = (gauge_t)ivalue};
-  vl.values_len = 1;
-
-  sstrncpy(vl.plugin, "lvm", sizeof(vl.plugin));
-  sstrncpy(vl.plugin_instance, plugin_instance, sizeof(vl.plugin_instance));
-  sstrncpy(vl.type, "df_complex", sizeof(vl.type));
-  sstrncpy(vl.type_instance, type_instance, sizeof(vl.type_instance));
-
-  plugin_dispatch_values(&vl);
-}
-
-static void report_lv_utilization(lv_t lv, char const *vg_name,
-                                  char const *lv_name, uint64_t lv_size,
-                                  char const *used_percent_property) {
-  uint64_t used_percent_unscaled;
-  uint64_t used_bytes;
-  char plugin_instance[DATA_MAX_NAME_LEN];
-
-  used_percent_unscaled = get_lv_property_int(lv, used_percent_property);
-  if (used_percent_unscaled == NO_VALUE)
-    return;
-  used_bytes = lv_size * (used_percent_unscaled * PERCENT_SCALE_FACTOR);
-
-  ssnprintf(plugin_instance, sizeof(plugin_instance), "%s-%s", vg_name,
-            lv_name);
-  lvm_submit(plugin_instance, "used", used_bytes);
-  lvm_submit(plugin_instance, "free", lv_size - used_bytes);
-}
-
-static void report_thin_pool_utilization(lv_t lv, char const *vg_name,
-                                         uint64_t lv_size) {
-  char const *data_lv;
-  char const *metadata_lv;
-  uint64_t metadata_size;
-
-  data_lv = get_lv_property_string(lv, "data_lv");
-  metadata_lv = get_lv_property_string(lv, "metadata_lv");
-  metadata_size = get_lv_property_int(lv, "lv_metadata_size");
-  if (data_lv == NULL || metadata_lv == NULL || metadata_size == NO_VALUE)
-    return;
-
-  report_lv_utilization(lv, vg_name, data_lv, lv_size, "data_percent");
-  report_lv_utilization(lv, vg_name, metadata_lv, metadata_size,
-                        "metadata_percent");
-}
-
-static void vg_read(vg_t vg, char const *vg_name) {
-  struct dm_list *lvs;
-  struct lvm_lv_list *lvl;
-  char const *name;
-  char const *attrs;
-  uint64_t size;
-
-  lvm_submit(vg_name, "free", lvm_vg_get_free_size(vg));
-
-  lvs = lvm_vg_list_lvs(vg);
-  if (!lvs) {
-    /* no VGs are defined, which is not an error per se */
-    return;
-  }
-
-  dm_list_iterate_items(lvl, lvs) {
-    name = lvm_lv_get_name(lvl->lv);
-    attrs = get_lv_property_string(lvl->lv, "lv_attr");
-    size = lvm_lv_get_size(lvl->lv);
-    if (name == NULL || attrs == NULL || size == NO_VALUE)
-      continue;
-
-    /* Condition on volume type.  We want the reported sizes in the
-       volume group to sum to the size of the volume group, so we ignore
-       virtual volumes.  */
-    switch (attrs[0]) {
-    case 's':
-    case 'S':
-      /* Snapshot.  Also report used/free space. */
-      report_lv_utilization(lvl->lv, vg_name, name, size, "data_percent");
-      break;
-    case 't':
-      /* Thin pool virtual volume.  We report the underlying data
-         and metadata volumes, not this one.  Report used/free
-         space, then ignore. */
-      report_thin_pool_utilization(lvl->lv, vg_name, size);
-      continue;
-    case 'v':
-      /* Virtual volume.  Ignore. */
-      continue;
-    case 'V':
-      /* Thin volume or thin snapshot.  Ignore. */
-      continue;
-    }
-    lvm_submit(vg_name, name, size);
-  }
-}
-
-static int lvm_read(void) {
-  lvm_t lvm;
-  struct dm_list *vg_names;
-  struct lvm_str_list *name_list;
-
-  lvm = lvm_init(NULL);
-  if (!lvm) {
-    ERROR("lvm plugin: lvm_init failed.");
-    return -1;
-  }
-
-  vg_names = lvm_list_vg_names(lvm);
-  if (!vg_names) {
-    ERROR("lvm plugin lvm_list_vg_name failed %s", lvm_errmsg(lvm));
-    lvm_quit(lvm);
-    return -1;
-  }
-
-  dm_list_iterate_items(name_list, vg_names) {
-    vg_t vg;
-
-    vg = lvm_vg_open(lvm, name_list->str, "r", 0);
-    if (!vg) {
-      ERROR("lvm plugin: lvm_vg_open (%s) failed: %s", name_list->str,
-            lvm_errmsg(lvm));
-      continue;
-    }
-
-    vg_read(vg, name_list->str);
-    lvm_vg_close(vg);
-  }
-
-  lvm_quit(lvm);
-  return 0;
-} /*lvm_read */
-
-static int c_lvm_init(void) {
-#if defined(HAVE_SYS_CAPABILITY_H) && defined(CAP_SYS_ADMIN)
-  if (check_capability(CAP_SYS_ADMIN) != 0) {
-    if (getuid() == 0)
-      WARNING("lvm plugin: Running collectd as root, but the "
-              "CAP_SYS_ADMIN capability is missing. The plugin's read "
-              "function will probably fail. Is your init system dropping "
-              "capabilities?");
-    else
-      WARNING("lvm plugin: collectd doesn't have the CAP_SYS_ADMIN "
-              "capability. If you don't want to run collectd as root, try "
-              "running \"setcap cap_sys_admin=ep\" on the collectd binary.");
-  }
-#endif
-  return 0;
-}
-
-void module_register(void) {
-  plugin_register_init("lvm", c_lvm_init);
-  plugin_register_read("lvm", lvm_read);
-} /* void module_register */
index 8acf84b..a1a6e3d 100644 (file)
@@ -142,6 +142,7 @@ typedef struct sockent {
   } data;
 
   struct sockent *next;
+  pthread_mutex_t lock;
 } sockent_t;
 
 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
@@ -1540,6 +1541,7 @@ static void sockent_destroy(sockent_t *se) /* {{{ */
 
     sfree(se->node);
     sfree(se->service);
+    pthread_mutex_destroy(&se->lock);
 
     if (se->type == SOCKENT_TYPE_CLIENT)
       free_sockent_client(&se->data.client);
@@ -1858,6 +1860,7 @@ static sockent_t *sockent_create(int type) /* {{{ */
   se->service = NULL;
   se->interface = 0;
   se->next = NULL;
+  pthread_mutex_init(&se->lock, NULL);
 
   if (type == SOCKENT_TYPE_SERVER) {
     se->data.server.fd = NULL;
@@ -1949,6 +1952,8 @@ static int sockent_client_disconnect(sockent_t *se) /* {{{ */
     client->fd = -1;
   }
 
+  DEBUG("network plugin: free (se = %p, addr = %p);", (void *)se,
+        (void *)client->addr);
   sfree(client->addr);
   client->addrlen = 0;
 
@@ -2020,6 +2025,8 @@ static int sockent_client_connect(sockent_t *se) /* {{{ */
       client->fd = -1;
       continue;
     }
+    DEBUG("network plugin: alloc (se = %p, addr = %p);", (void *)se,
+          (void *)client->addr);
 
     assert(sizeof(*client->addr) >= ai_ptr->ai_addrlen);
     memcpy(client->addr, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
@@ -2541,6 +2548,7 @@ static void network_send_buffer(char *buffer, size_t buffer_len) /* {{{ */
         buffer_len);
 
   for (sockent_t *se = sending_sockets; se != NULL; se = se->next) {
+    pthread_mutex_lock(&se->lock);
 #if HAVE_GCRYPT_H
     if (se->data.client.security_level == SECURITY_LEVEL_ENCRYPT)
       network_send_buffer_encrypted(se, buffer, buffer_len);
@@ -2549,6 +2557,7 @@ static void network_send_buffer(char *buffer, size_t buffer_len) /* {{{ */
     else /* if (se->data.client.security_level == SECURITY_LEVEL_NONE) */
 #endif   /* HAVE_GCRYPT_H */
       network_send_buffer_plain(se, buffer, buffer_len);
+    pthread_mutex_unlock(&se->lock);
   } /* for (sending_sockets) */
 } /* }}} void network_send_buffer */
 
diff --git a/src/procevent.c b/src/procevent.c
new file mode 100644 (file)
index 0000000..ab000db
--- /dev/null
@@ -0,0 +1,1303 @@
+/**
+ * collectd - src/procevent.c
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Red Hat NFVPE
+ *     Andrew Bays <abays at redhat.com>
+ **/
+
+#include "collectd.h"
+
+#include "plugin.h"
+#include "utils/common/common.h"
+#include "utils/ignorelist/ignorelist.h"
+#include "utils_complain.h"
+
+#include <errno.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <dirent.h>
+#include <linux/cn_proc.h>
+#include <linux/connector.h>
+#include <linux/netlink.h>
+#include <linux/rtnetlink.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <yajl/yajl_common.h>
+#include <yajl/yajl_gen.h>
+#if HAVE_YAJL_YAJL_VERSION_H
+#include <yajl/yajl_version.h>
+#endif
+#if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
+#define HAVE_YAJL_V2 1
+#endif
+
+#define PROCEVENT_EXITED 0
+#define PROCEVENT_STARTED 1
+#define PROCEVENT_FIELDS 3 // pid, status, timestamp
+#define BUFSIZE 512
+#define PROCDIR "/proc"
+#define RBUF_PROC_ID_INDEX 0
+#define RBUF_PROC_STATUS_INDEX 1
+#define RBUF_TIME_INDEX 2
+
+#define PROCEVENT_DOMAIN_FIELD "domain"
+#define PROCEVENT_DOMAIN_VALUE "fault"
+#define PROCEVENT_EVENT_ID_FIELD "eventId"
+#define PROCEVENT_EVENT_NAME_FIELD "eventName"
+#define PROCEVENT_EVENT_NAME_DOWN_VALUE "down"
+#define PROCEVENT_EVENT_NAME_UP_VALUE "up"
+#define PROCEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
+#define PROCEVENT_PRIORITY_FIELD "priority"
+#define PROCEVENT_PRIORITY_VALUE "high"
+#define PROCEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
+#define PROCEVENT_REPORTING_ENTITY_NAME_VALUE "collectd procevent plugin"
+#define PROCEVENT_SEQUENCE_FIELD "sequence"
+#define PROCEVENT_SEQUENCE_VALUE "0"
+#define PROCEVENT_SOURCE_NAME_FIELD "sourceName"
+#define PROCEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
+#define PROCEVENT_VERSION_FIELD "version"
+#define PROCEVENT_VERSION_VALUE "1.0"
+
+#define PROCEVENT_ALARM_CONDITION_FIELD "alarmCondition"
+#define PROCEVENT_ALARM_INTERFACE_A_FIELD "alarmInterfaceA"
+#define PROCEVENT_EVENT_SEVERITY_FIELD "eventSeverity"
+#define PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE "CRITICAL"
+#define PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE "NORMAL"
+#define PROCEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
+#define PROCEVENT_EVENT_SOURCE_TYPE_VALUE "process"
+#define PROCEVENT_FAULT_FIELDS_FIELD "faultFields"
+#define PROCEVENT_FAULT_FIELDS_VERSION_FIELD "faultFieldsVersion"
+#define PROCEVENT_FAULT_FIELDS_VERSION_VALUE "1.0"
+#define PROCEVENT_SPECIFIC_PROBLEM_FIELD "specificProblem"
+#define PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE "down"
+#define PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE "up"
+#define PROCEVENT_VF_STATUS_FIELD "vfStatus"
+#define PROCEVENT_VF_STATUS_CRITICAL_VALUE "Ready to terminate"
+#define PROCEVENT_VF_STATUS_NORMAL_VALUE "Active"
+
+/*
+ * Private data types
+ */
+
+typedef struct {
+  int head;
+  int tail;
+  int maxLen;
+  cdtime_t **buffer;
+} circbuf_t;
+
+struct processlist_s {
+  char *process;
+
+  long pid;
+  int32_t last_status;
+
+  struct processlist_s *next;
+};
+typedef struct processlist_s processlist_t;
+
+/*
+ * Private variables
+ */
+static ignorelist_t *ignorelist = NULL;
+
+static int procevent_netlink_thread_loop = 0;
+static int procevent_netlink_thread_error = 0;
+static pthread_t procevent_netlink_thread_id;
+static int procevent_dequeue_thread_loop = 0;
+static pthread_t procevent_dequeue_thread_id;
+static pthread_mutex_t procevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t procevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t procevent_cond = PTHREAD_COND_INITIALIZER;
+static int nl_sock = -1;
+static int buffer_length;
+static circbuf_t ring;
+static processlist_t *processlist_head = NULL;
+static int event_id = 0;
+
+static const char *config_keys[] = {"BufferLength", "Process", "ProcessRegex"};
+static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
+
+/*
+ * Private functions
+ */
+
+static int gen_message_payload(int state, long pid, char *process,
+                               cdtime_t timestamp, char **buf) {
+  const unsigned char *buf2;
+  yajl_gen g;
+  char json_str[DATA_MAX_NAME_LEN];
+
+#if !defined(HAVE_YAJL_V2)
+  yajl_gen_config conf = {0};
+#endif
+
+#if HAVE_YAJL_V2
+  size_t len;
+  g = yajl_gen_alloc(NULL);
+  yajl_gen_config(g, yajl_gen_beautify, 0);
+#else
+  unsigned int len;
+  g = yajl_gen_alloc(&conf, NULL);
+#endif
+
+  yajl_gen_clear(g);
+
+  // *** BEGIN common event header ***
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // domain
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_FIELD,
+                      strlen(PROCEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_DOMAIN_VALUE,
+                      strlen(PROCEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // eventId
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_ID_FIELD,
+                      strlen(PROCEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  event_id = event_id + 1;
+  if (snprintf(json_str, sizeof(json_str), "%d", event_id) < 0) {
+    goto err;
+  }
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // eventName
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_NAME_FIELD,
+                      strlen(PROCEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid,
+               (state == 0 ? PROCEVENT_EVENT_NAME_DOWN_VALUE
+                           : PROCEVENT_EVENT_NAME_UP_VALUE)) < 0) {
+    goto err;
+  }
+
+  if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
+      yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // lastEpochMicrosec
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_LAST_EPOCH_MICROSEC_FIELD,
+                      strlen(PROCEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+               CDTIME_T_TO_US(cdtime())) < 0) {
+    goto err;
+  }
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // priority
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_FIELD,
+                      strlen(PROCEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_PRIORITY_VALUE,
+                      strlen(PROCEVENT_PRIORITY_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // reportingEntityName
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_FIELD,
+                      strlen(PROCEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_REPORTING_ENTITY_NAME_VALUE,
+                      strlen(PROCEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // sequence
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_SEQUENCE_FIELD,
+                      strlen(PROCEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, PROCEVENT_SEQUENCE_VALUE,
+                      strlen(PROCEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // sourceName
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_SOURCE_NAME_FIELD,
+                      strlen(PROCEVENT_SOURCE_NAME_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // startEpochMicrosec
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_START_EPOCH_MICROSEC_FIELD,
+                      strlen(PROCEVENT_START_EPOCH_MICROSEC_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (snprintf(json_str, sizeof(json_str), "%" PRIu64,
+               CDTIME_T_TO_US(timestamp)) < 0) {
+    goto err;
+  }
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // version
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_VERSION_FIELD,
+                      strlen(PROCEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, PROCEVENT_VERSION_VALUE,
+                      strlen(PROCEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // *** END common event header ***
+
+  // *** BEGIN fault fields ***
+
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_FIELD,
+                      strlen(PROCEVENT_FAULT_FIELDS_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // alarmCondition
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_CONDITION_FIELD,
+                      strlen(PROCEVENT_ALARM_CONDITION_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (snprintf(json_str, sizeof(json_str), "process %s (%ld) state change",
+               process, pid) < 0) {
+    goto err;
+  }
+
+  if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
+      yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // alarmInterfaceA
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_ALARM_INTERFACE_A_FIELD,
+                      strlen(PROCEVENT_ALARM_INTERFACE_A_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // eventSeverity
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SEVERITY_FIELD,
+                      strlen(PROCEVENT_EVENT_SEVERITY_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(
+          g,
+          (u_char *)(state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
+                                : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE),
+          strlen((state == 0 ? PROCEVENT_EVENT_SEVERITY_CRITICAL_VALUE
+                             : PROCEVENT_EVENT_SEVERITY_NORMAL_VALUE))) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // eventSourceType
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_FIELD,
+                      strlen(PROCEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_EVENT_SOURCE_TYPE_VALUE,
+                      strlen(PROCEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // faultFieldsVersion
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_FAULT_FIELDS_VERSION_FIELD,
+                      strlen(PROCEVENT_FAULT_FIELDS_VERSION_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, PROCEVENT_FAULT_FIELDS_VERSION_VALUE,
+                      strlen(PROCEVENT_FAULT_FIELDS_VERSION_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // specificProblem
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_SPECIFIC_PROBLEM_FIELD,
+                      strlen(PROCEVENT_SPECIFIC_PROBLEM_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (snprintf(json_str, sizeof(json_str), "process %s (%ld) %s", process, pid,
+               (state == 0 ? PROCEVENT_SPECIFIC_PROBLEM_DOWN_VALUE
+                           : PROCEVENT_SPECIFIC_PROBLEM_UP_VALUE)) < 0) {
+    goto err;
+  }
+
+  if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
+      yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // vfStatus
+  if (yajl_gen_string(g, (u_char *)PROCEVENT_VF_STATUS_FIELD,
+                      strlen(PROCEVENT_VF_STATUS_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(
+          g,
+          (u_char *)(state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
+                                : PROCEVENT_VF_STATUS_NORMAL_VALUE),
+          strlen((state == 0 ? PROCEVENT_VF_STATUS_CRITICAL_VALUE
+                             : PROCEVENT_VF_STATUS_NORMAL_VALUE))) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // *** END fault fields ***
+
+  // close fault and header fields
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
+      yajl_gen_map_close(g) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
+    goto err;
+
+  *buf = strdup((char *)buf2);
+
+  if (*buf == NULL) {
+    ERROR("procevent plugin: strdup failed during gen_message_payload: %s",
+          STRERRNO);
+    goto err;
+  }
+
+  yajl_gen_free(g);
+
+  return 0;
+
+err:
+  yajl_gen_free(g);
+  ERROR("procevent plugin: gen_message_payload failed to generate JSON");
+  return -1;
+}
+
+// Does /proc/<pid>/comm contain a process name we are interested in?
+// NOTE: Caller MUST hold procevent_data_lock when calling this function
+static processlist_t *process_check(long pid) {
+  char file[BUFSIZE];
+
+  int len = snprintf(file, sizeof(file), PROCDIR "/%ld/comm", pid);
+
+  if ((len < 0) || (len >= BUFSIZE)) {
+    WARNING("procevent process_check: process name too large");
+    return NULL;
+  }
+
+  FILE *fh;
+
+  if (NULL == (fh = fopen(file, "r"))) {
+    // No /proc/<pid>/comm for this pid, just ignore
+    DEBUG("procevent plugin: no comm file available for pid %ld", pid);
+    return NULL;
+  }
+
+  char buffer[BUFSIZE];
+  int retval = fscanf(fh, "%[^\n]", buffer);
+
+  if (retval < 0) {
+    WARNING("procevent process_check: unable to read comm file for pid %ld",
+            pid);
+    fclose(fh);
+    return NULL;
+  }
+
+  // Now that we have the process name in the buffer, check if we are
+  // even interested in it
+  if (ignorelist_match(ignorelist, buffer) != 0) {
+    DEBUG("procevent process_check: ignoring process %s (%ld)", buffer, pid);
+    fclose(fh);
+    return NULL;
+  }
+
+  if (fh != NULL) {
+    fclose(fh);
+    fh = NULL;
+  }
+
+  //
+  // Go through the processlist linked list and look for the process name
+  // in /proc/<pid>/comm.  If found:
+  // 1. If pl->pid is -1, then set pl->pid to <pid> (and return that object)
+  // 2. If pl->pid is not -1, then another <process name> process was already
+  //    found.  If <pid> == pl->pid, this is an old match, so do nothing.
+  //    If the <pid> is different, however, make a new processlist_t and
+  //    associate <pid> with it (with the same process name as the existing).
+  //
+
+  processlist_t *match = NULL;
+
+  for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
+
+    int is_match = (strcmp(buffer, pl->process) == 0 ? 1 : 0);
+
+    if (is_match == 1) {
+      DEBUG("procevent plugin: process %ld name match for %s", pid, buffer);
+
+      if (pl->pid == pid) {
+        // this is a match, and we've already stored the exact pid/name combo
+        DEBUG("procevent plugin: found exact match with name %s, PID %ld for "
+              "incoming PID %ld",
+              pl->process, pl->pid, pid);
+        match = pl;
+        break;
+      } else if (pl->pid == -1) {
+        // this is a match, and we've found a candidate processlist_t to store
+        // this new pid/name combo
+        DEBUG("procevent plugin: reusing pl object with PID %ld for incoming "
+              "PID %ld",
+              pl->pid, pid);
+        pl->pid = pid;
+        match = pl;
+        break;
+      } else if (pl->pid != -1) {
+        // this is a match, but another instance of this process has already
+        // claimed this pid/name combo,
+        // so keep looking
+        DEBUG("procevent plugin: found pl object with matching name for "
+              "incoming PID %ld, but object is in use by PID %ld",
+              pid, pl->pid);
+        match = pl;
+        continue;
+      }
+    }
+  }
+
+  if (match == NULL ||
+      (match != NULL && match->pid != -1 && match->pid != pid)) {
+    // if there wasn't an existing match, OR
+    // if there was a match but the associated processlist_t object already
+    // contained a pid/name combo,
+    // then make a new one and add it to the linked list
+
+    DEBUG("procevent plugin: allocating new processlist_t object for PID %ld "
+          "(%s)",
+          pid, buffer);
+
+    processlist_t *pl2 = calloc(1, sizeof(*pl2));
+    if (pl2 == NULL) {
+      ERROR("procevent plugin: calloc failed during process_check: %s",
+            STRERRNO);
+      return NULL;
+    }
+
+    char *process = strdup(buffer);
+    if (process == NULL) {
+      sfree(pl2);
+      ERROR("procevent plugin: strdup failed during process_check: %s",
+            STRERRNO);
+      return NULL;
+    }
+
+    pl2->process = process;
+    pl2->pid = pid;
+    pl2->next = processlist_head;
+    processlist_head = pl2;
+
+    match = pl2;
+  }
+
+  return match;
+}
+
+// Does our map have this PID or name?
+// NOTE: Caller MUST hold procevent_data_lock when calling this function
+static processlist_t *process_map_check(long pid, char *process) {
+  for (processlist_t *pl = processlist_head; pl != NULL; pl = pl->next) {
+    int match_pid = 0;
+
+    if (pid > 0) {
+      if (pl->pid == pid)
+        match_pid = 1;
+    }
+
+    int match_process = 0;
+
+    if (process != NULL) {
+      if (strcmp(pl->process, process) == 0)
+        match_process = 1;
+    }
+
+    int match = 0;
+
+    if ((pid > 0 && process == NULL && match_pid == 1) ||
+        (pid < 0 && process != NULL && match_process == 1) ||
+        (pid > 0 && process != NULL && match_pid == 1 && match_process == 1)) {
+      match = 1;
+    }
+
+    if (match == 1) {
+      return pl;
+    }
+  }
+
+  return NULL;
+}
+
+static int process_map_refresh(void) {
+  errno = 0;
+  DIR *proc = opendir(PROCDIR);
+
+  if (proc == NULL) {
+    ERROR("procevent plugin: fopen (%s): %s", PROCDIR, STRERRNO);
+    return -1;
+  }
+
+  while (42) {
+    errno = 0;
+    struct dirent *dent = readdir(proc);
+    if (dent == NULL) {
+      if (errno == 0) /* end of directory */
+        break;
+
+      ERROR("procevent plugin: failed to read directory %s: %s", PROCDIR,
+            STRERRNO);
+      closedir(proc);
+      return -1;
+    }
+
+    if (dent->d_name[0] == '.')
+      continue;
+
+    char file[BUFSIZE];
+
+    int len = snprintf(file, sizeof(file), PROCDIR "/%s", dent->d_name);
+    if ((len < 0) || (len >= BUFSIZE))
+      continue;
+
+    struct stat statbuf;
+
+    int status = stat(file, &statbuf);
+    if (status != 0) {
+      WARNING("procevent plugin: stat (%s) failed: %s", file, STRERRNO);
+      continue;
+    }
+
+    if (!S_ISDIR(statbuf.st_mode))
+      continue;
+
+    len = snprintf(file, sizeof(file), PROCDIR "/%s/comm", dent->d_name);
+    if ((len < 0) || (len >= BUFSIZE))
+      continue;
+
+    int not_number = 0;
+
+    for (int i = 0; i < strlen(dent->d_name); i++) {
+      if (!isdigit(dent->d_name[i])) {
+        not_number = 1;
+        break;
+      }
+    }
+
+    if (not_number != 0)
+      continue;
+
+    // Check if we need to store this pid/name combo in our processlist_t linked
+    // list
+    int this_pid = atoi(dent->d_name);
+    pthread_mutex_lock(&procevent_data_lock);
+    processlist_t *pl = process_check(this_pid);
+    pthread_mutex_unlock(&procevent_data_lock);
+
+    if (pl != NULL)
+      DEBUG("procevent plugin: process map refreshed for PID %d and name %s",
+            this_pid, pl->process);
+  }
+
+  closedir(proc);
+
+  return 0;
+}
+
+static int nl_connect() {
+  struct sockaddr_nl sa_nl = {
+      .nl_family = AF_NETLINK,
+      .nl_groups = CN_IDX_PROC,
+      .nl_pid = getpid(),
+  };
+
+  nl_sock = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
+  if (nl_sock == -1) {
+    ERROR("procevent plugin: socket open failed: %d", errno);
+    return -1;
+  }
+
+  int rc = bind(nl_sock, (struct sockaddr *)&sa_nl, sizeof(sa_nl));
+  if (rc == -1) {
+    ERROR("procevent plugin: socket bind failed: %d", errno);
+    close(nl_sock);
+    nl_sock = -1;
+    return -1;
+  }
+
+  return 0;
+}
+
+static int set_proc_ev_listen(bool enable) {
+  struct __attribute__((aligned(NLMSG_ALIGNTO))) {
+    struct nlmsghdr nl_hdr;
+    struct __attribute__((__packed__)) {
+      struct cn_msg cn_msg;
+      enum proc_cn_mcast_op cn_mcast;
+    };
+  } nlcn_msg;
+
+  memset(&nlcn_msg, 0, sizeof(nlcn_msg));
+  nlcn_msg.nl_hdr.nlmsg_len = sizeof(nlcn_msg);
+  nlcn_msg.nl_hdr.nlmsg_pid = getpid();
+  nlcn_msg.nl_hdr.nlmsg_type = NLMSG_DONE;
+
+  nlcn_msg.cn_msg.id.idx = CN_IDX_PROC;
+  nlcn_msg.cn_msg.id.val = CN_VAL_PROC;
+  nlcn_msg.cn_msg.len = sizeof(enum proc_cn_mcast_op);
+
+  nlcn_msg.cn_mcast = enable ? PROC_CN_MCAST_LISTEN : PROC_CN_MCAST_IGNORE;
+
+  int rc = send(nl_sock, &nlcn_msg, sizeof(nlcn_msg), 0);
+  if (rc == -1) {
+    ERROR("procevent plugin: subscribing to netlink process events failed: %d",
+          errno);
+    return -1;
+  }
+
+  return 0;
+}
+
+// Read from netlink socket and write to ring buffer
+static int read_event() {
+  int recv_flags = MSG_DONTWAIT;
+  struct __attribute__((aligned(NLMSG_ALIGNTO))) {
+    struct nlmsghdr nl_hdr;
+    struct __attribute__((__packed__)) {
+      struct cn_msg cn_msg;
+      struct proc_event proc_ev;
+    };
+  } nlcn_msg;
+
+  if (nl_sock == -1)
+    return 0;
+
+  while (42) {
+    pthread_mutex_lock(&procevent_thread_lock);
+
+    if (procevent_netlink_thread_loop <= 0) {
+      pthread_mutex_unlock(&procevent_thread_lock);
+      return 0;
+    }
+
+    pthread_mutex_unlock(&procevent_thread_lock);
+
+    int status = recv(nl_sock, &nlcn_msg, sizeof(nlcn_msg), recv_flags);
+
+    if (status == 0) {
+      return 0;
+    } else if (status < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        pthread_mutex_lock(&procevent_data_lock);
+
+        // There was nothing more to receive for now, so...
+        // If ring head does not equal ring tail, then there is data
+        // in the ring buffer for the dequeue thread to read, so
+        // signal it
+        if (ring.head != ring.tail)
+          pthread_cond_signal(&procevent_cond);
+
+        pthread_mutex_unlock(&procevent_data_lock);
+
+        // Since there was nothing to receive, set recv to block and
+        // try again
+        recv_flags = 0;
+        continue;
+      } else if (errno != EINTR) {
+        ERROR("procevent plugin: socket receive error: %d", errno);
+        return -1;
+      } else {
+        // Interrupt, so just continue and try again
+        continue;
+      }
+    }
+
+    // We successfully received a message, so don't block on the next
+    // read in case there are more (and if there aren't, it will be
+    // handled above in the EWOULDBLOCK error-checking)
+    recv_flags = MSG_DONTWAIT;
+
+    int proc_id = -1;
+    int proc_status = -1;
+
+    switch (nlcn_msg.proc_ev.what) {
+    case PROC_EVENT_EXEC:
+      proc_status = PROCEVENT_STARTED;
+      proc_id = nlcn_msg.proc_ev.event_data.exec.process_pid;
+      break;
+    case PROC_EVENT_EXIT:
+      proc_id = nlcn_msg.proc_ev.event_data.exit.process_pid;
+      proc_status = PROCEVENT_EXITED;
+      break;
+    default:
+      // Otherwise not of interest
+      break;
+    }
+
+    // If we're interested in this process status event, place the event
+    // in the ring buffer for consumption by the dequeue (dispatch) thread.
+
+    if (proc_status != -1) {
+      pthread_mutex_lock(&procevent_data_lock);
+
+      int next = ring.head + 1;
+      if (next >= ring.maxLen)
+        next = 0;
+
+      if (next == ring.tail) {
+        // Buffer is full, signal the dequeue thread to process the buffer
+        // and clean it out, and then sleep
+        WARNING("procevent plugin: ring buffer full");
+
+        pthread_cond_signal(&procevent_cond);
+        pthread_mutex_unlock(&procevent_data_lock);
+
+        usleep(1000);
+        continue;
+      } else {
+        DEBUG("procevent plugin: Process %d status is now %s at %llu", proc_id,
+              (proc_status == PROCEVENT_EXITED ? "EXITED" : "STARTED"),
+              (unsigned long long)cdtime());
+
+        ring.buffer[ring.head][RBUF_PROC_ID_INDEX] = proc_id;
+        ring.buffer[ring.head][RBUF_PROC_STATUS_INDEX] = proc_status;
+        ring.buffer[ring.head][RBUF_TIME_INDEX] = cdtime();
+
+        ring.head = next;
+      }
+
+      pthread_mutex_unlock(&procevent_data_lock);
+    }
+  }
+
+  return 0;
+}
+
+static void procevent_dispatch_notification(long pid, gauge_t value,
+                                            char *process, cdtime_t timestamp) {
+
+  notification_t n = {
+      .severity = (value == 1 ? NOTIF_OKAY : NOTIF_FAILURE),
+      .time = cdtime(),
+      .plugin = "procevent",
+      .type = "gauge",
+      .type_instance = "process_status",
+  };
+
+  sstrncpy(n.host, hostname_g, sizeof(n.host));
+  sstrncpy(n.plugin_instance, process, sizeof(n.plugin_instance));
+
+  char *buf = NULL;
+  gen_message_payload(value, pid, process, timestamp, &buf);
+
+  int status = plugin_notification_meta_add_string(&n, "ves", buf);
+
+  if (status < 0) {
+    sfree(buf);
+    ERROR("procevent plugin: unable to set notification VES metadata: %s",
+          STRERRNO);
+    return;
+  }
+
+  DEBUG("procevent plugin: notification VES metadata: %s",
+        n.meta->nm_value.nm_string);
+
+  DEBUG("procevent plugin: dispatching state %d for PID %ld (%s)", (int)value,
+        pid, process);
+
+  plugin_dispatch_notification(&n);
+  plugin_notification_meta_free(n.meta);
+
+  // strdup'd in gen_message_payload
+  if (buf != NULL)
+    sfree(buf);
+}
+
+// Read from ring buffer and dispatch to write plugins
+static void read_ring_buffer() {
+  pthread_mutex_lock(&procevent_data_lock);
+
+  // If there's currently nothing to read from the buffer,
+  // then wait
+  if (ring.head == ring.tail)
+    pthread_cond_wait(&procevent_cond, &procevent_data_lock);
+
+  while (ring.head != ring.tail) {
+    int next = ring.tail + 1;
+
+    if (next >= ring.maxLen)
+      next = 0;
+
+    if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] == PROCEVENT_EXITED) {
+      processlist_t *pl = process_map_check(ring.buffer[ring.tail][0], NULL);
+
+      if (pl != NULL) {
+        // This process is of interest to us, so publish its EXITED status
+        procevent_dispatch_notification(
+            ring.buffer[ring.tail][RBUF_PROC_ID_INDEX],
+            ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process,
+            ring.buffer[ring.tail][RBUF_TIME_INDEX]);
+        DEBUG(
+            "procevent plugin: PID %ld (%s) EXITED, removing PID from process "
+            "list",
+            pl->pid, pl->process);
+        pl->pid = -1;
+        pl->last_status = -1;
+      }
+    } else if (ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX] ==
+               PROCEVENT_STARTED) {
+      // a new process has started, so check if we should monitor it
+      processlist_t *pl = process_check(ring.buffer[ring.tail][0]);
+
+      // If we had already seen this process name and pid combo before,
+      // and the last message was a "process started" message, don't send
+      // the notfication again
+
+      if (pl != NULL && pl->last_status != PROCEVENT_STARTED) {
+        // This process is of interest to us, so publish its STARTED status
+        procevent_dispatch_notification(
+            ring.buffer[ring.tail][RBUF_PROC_ID_INDEX],
+            ring.buffer[ring.tail][RBUF_PROC_STATUS_INDEX], pl->process,
+            ring.buffer[ring.tail][RBUF_TIME_INDEX]);
+
+        pl->last_status = PROCEVENT_STARTED;
+
+        DEBUG("procevent plugin: PID %ld (%s) STARTED, adding PID to process "
+              "list",
+              pl->pid, pl->process);
+      }
+    }
+
+    ring.tail = next;
+  }
+
+  pthread_mutex_unlock(&procevent_data_lock);
+}
+
+// Entry point for thread responsible for listening
+// to netlink socket and writing data to ring buffer
+static void *procevent_netlink_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  while (procevent_netlink_thread_loop > 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+
+    int status = read_event();
+
+    pthread_mutex_lock(&procevent_thread_lock);
+
+    if (status < 0) {
+      procevent_netlink_thread_error = 1;
+      break;
+    }
+  } /* while (procevent_netlink_thread_loop > 0) */
+
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  return (void *)0;
+} /* }}} void *procevent_netlink_thread */
+
+// Entry point for thread responsible for reading from
+// ring buffer and dispatching notifications
+static void *procevent_dequeue_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  while (procevent_dequeue_thread_loop > 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+
+    read_ring_buffer();
+
+    pthread_mutex_lock(&procevent_thread_lock);
+  } /* while (procevent_dequeue_thread_loop > 0) */
+
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  return (void *)0;
+} /* }}} void *procevent_dequeue_thread */
+
+static int start_netlink_thread(void) /* {{{ */
+{
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  if (procevent_netlink_thread_loop != 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+    return 0;
+  }
+
+  int status;
+
+  if (nl_sock == -1) {
+    status = nl_connect();
+
+    if (status != 0) {
+      pthread_mutex_unlock(&procevent_thread_lock);
+      return status;
+    }
+
+    status = set_proc_ev_listen(true);
+    if (status == -1) {
+      pthread_mutex_unlock(&procevent_thread_lock);
+      return status;
+    }
+  }
+
+  DEBUG("procevent plugin: socket created and bound");
+
+  procevent_netlink_thread_loop = 1;
+  procevent_netlink_thread_error = 0;
+
+  status = plugin_thread_create(&procevent_netlink_thread_id, /* attr = */ NULL,
+                                procevent_netlink_thread,
+                                /* arg = */ (void *)0, "procevent");
+  if (status != 0) {
+    procevent_netlink_thread_loop = 0;
+    ERROR("procevent plugin: Starting netlink thread failed.");
+    pthread_mutex_unlock(&procevent_thread_lock);
+
+    int status2 = close(nl_sock);
+
+    if (status2 != 0) {
+      ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
+            status2, STRERRNO);
+    }
+
+    nl_sock = -1;
+
+    return -1;
+  }
+
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  return status;
+} /* }}} int start_netlink_thread */
+
+static int start_dequeue_thread(void) /* {{{ */
+{
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  if (procevent_dequeue_thread_loop != 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+    return 0;
+  }
+
+  procevent_dequeue_thread_loop = 1;
+
+  int status = plugin_thread_create(&procevent_dequeue_thread_id,
+                                    /* attr = */ NULL, procevent_dequeue_thread,
+                                    /* arg = */ (void *)0, "procevent");
+  if (status != 0) {
+    procevent_dequeue_thread_loop = 0;
+    ERROR("procevent plugin: Starting dequeue thread failed.");
+    pthread_mutex_unlock(&procevent_thread_lock);
+    return -1;
+  }
+
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  return status;
+} /* }}} int start_dequeue_thread */
+
+static int start_threads(void) /* {{{ */
+{
+  int status = start_netlink_thread();
+  int status2 = start_dequeue_thread();
+
+  if (status != 0)
+    return status;
+  else
+    return status2;
+} /* }}} int start_threads */
+
+static int stop_netlink_thread(int shutdown) /* {{{ */
+{
+  int socket_status;
+
+  if (nl_sock != -1) {
+    socket_status = close(nl_sock);
+    if (socket_status != 0) {
+      ERROR("procevent plugin: failed to close socket %d: %d (%s)", nl_sock,
+            socket_status, strerror(errno));
+    }
+
+    nl_sock = -1;
+  } else
+    socket_status = 0;
+
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  if (procevent_netlink_thread_loop == 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+    return -1;
+  }
+
+  // Set thread termination status
+  procevent_netlink_thread_loop = 0;
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  // Let threads waiting on access to the data know to move
+  // on such that they'll see the thread's termination status
+  pthread_cond_broadcast(&procevent_cond);
+
+  int thread_status;
+
+  if (shutdown == 1) {
+    // Calling pthread_cancel here in
+    // the case of a shutdown just assures that the thread is
+    // gone and that the process has been fully terminated.
+
+    DEBUG("procevent plugin: Canceling netlink thread for process shutdown");
+
+    thread_status = pthread_cancel(procevent_netlink_thread_id);
+
+    if (thread_status != 0 && thread_status != ESRCH) {
+      ERROR("procevent plugin: Unable to cancel netlink thread: %d",
+            thread_status);
+      thread_status = -1;
+    } else
+      thread_status = 0;
+  } else {
+    thread_status =
+        pthread_join(procevent_netlink_thread_id, /* return = */ NULL);
+    if (thread_status != 0 && thread_status != ESRCH) {
+      ERROR("procevent plugin: Stopping netlink thread failed.");
+      thread_status = -1;
+    } else
+      thread_status = 0;
+  }
+
+  pthread_mutex_lock(&procevent_thread_lock);
+  memset(&procevent_netlink_thread_id, 0, sizeof(procevent_netlink_thread_id));
+  procevent_netlink_thread_error = 0;
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  DEBUG("procevent plugin: Finished requesting stop of netlink thread");
+
+  if (socket_status != 0)
+    return socket_status;
+  else
+    return thread_status;
+} /* }}} int stop_netlink_thread */
+
+static int stop_dequeue_thread() /* {{{ */
+{
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  if (procevent_dequeue_thread_loop == 0) {
+    pthread_mutex_unlock(&procevent_thread_lock);
+    return -1;
+  }
+
+  procevent_dequeue_thread_loop = 0;
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  pthread_cond_broadcast(&procevent_cond);
+
+  // Calling pthread_cancel here just assures that the thread is
+  // gone and that the process has been fully terminated.
+
+  DEBUG("procevent plugin: Canceling dequeue thread for process shutdown");
+
+  int status = pthread_cancel(procevent_dequeue_thread_id);
+
+  if (status != 0 && status != ESRCH) {
+    ERROR("procevent plugin: Unable to cancel dequeue thread: %d", status);
+    status = -1;
+  } else
+    status = 0;
+
+  pthread_mutex_lock(&procevent_thread_lock);
+  memset(&procevent_dequeue_thread_id, 0, sizeof(procevent_dequeue_thread_id));
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  DEBUG("procevent plugin: Finished requesting stop of dequeue thread");
+
+  return status;
+} /* }}} int stop_dequeue_thread */
+
+static int stop_threads() /* {{{ */
+{
+  int status = stop_netlink_thread(1);
+  int status2 = stop_dequeue_thread();
+
+  if (status != 0)
+    return status;
+  else
+    return status2;
+} /* }}} int stop_threads */
+
+static int procevent_init(void) /* {{{ */
+{
+  ring.head = 0;
+  ring.tail = 0;
+  ring.maxLen = buffer_length;
+  ring.buffer = (cdtime_t **)calloc(buffer_length, sizeof(cdtime_t *));
+
+  for (int i = 0; i < buffer_length; i++) {
+    ring.buffer[i] = (cdtime_t *)calloc(PROCEVENT_FIELDS, sizeof(cdtime_t));
+  }
+
+  int status = process_map_refresh();
+
+  if (status == -1) {
+    ERROR("procevent plugin: Initial process mapping failed.");
+    return -1;
+  }
+
+  if (ignorelist == NULL) {
+    NOTICE("procevent plugin: No processes have been configured.");
+    return -1;
+  }
+
+  return start_threads();
+} /* }}} int procevent_init */
+
+static int procevent_config(const char *key, const char *value) /* {{{ */
+{
+  if (ignorelist == NULL)
+    ignorelist = ignorelist_create(/* invert = */ 1);
+
+  if (ignorelist == NULL) {
+    return -1;
+  }
+
+  if (strcasecmp(key, "BufferLength") == 0) {
+    buffer_length = atoi(value);
+  } else if (strcasecmp(key, "Process") == 0) {
+    ignorelist_add(ignorelist, value);
+  } else if (strcasecmp(key, "ProcessRegex") == 0) {
+#if HAVE_REGEX_H
+    int status = ignorelist_add(ignorelist, value);
+
+    if (status != 0) {
+      ERROR("procevent plugin: invalid regular expression: %s", value);
+      return 1;
+    }
+#else
+    WARNING("procevent plugin: The plugin has been compiled without support "
+            "for the \"ProcessRegex\" option.");
+#endif
+  } else {
+    return -1;
+  }
+
+  return 0;
+} /* }}} int procevent_config */
+
+static int procevent_read(void) /* {{{ */
+{
+  pthread_mutex_lock(&procevent_thread_lock);
+
+  if (procevent_netlink_thread_error != 0) {
+
+    pthread_mutex_unlock(&procevent_thread_lock);
+
+    ERROR("procevent plugin: The netlink thread had a problem. Restarting it.");
+
+    stop_netlink_thread(0);
+
+    start_netlink_thread();
+
+    return -1;
+  } /* if (procevent_netlink_thread_error != 0) */
+
+  pthread_mutex_unlock(&procevent_thread_lock);
+
+  return 0;
+} /* }}} int procevent_read */
+
+static int procevent_shutdown(void) /* {{{ */
+{
+  DEBUG("procevent plugin: Shutting down threads.");
+
+  int status = stop_threads();
+
+  for (int i = 0; i < buffer_length; i++) {
+    free(ring.buffer[i]);
+  }
+
+  free(ring.buffer);
+
+  processlist_t *pl = processlist_head;
+  while (pl != NULL) {
+    processlist_t *pl_next;
+
+    pl_next = pl->next;
+
+    sfree(pl->process);
+    sfree(pl);
+
+    pl = pl_next;
+  }
+
+  ignorelist_free(ignorelist);
+
+  return status;
+} /* }}} int procevent_shutdown */
+
+void module_register(void) {
+  plugin_register_config("procevent", procevent_config, config_keys,
+                         config_keys_num);
+  plugin_register_init("procevent", procevent_init);
+  plugin_register_read("procevent", procevent_read);
+  plugin_register_shutdown("procevent", procevent_shutdown);
+} /* void module_register */
diff --git a/src/sysevent.c b/src/sysevent.c
new file mode 100644 (file)
index 0000000..7f9aa9f
--- /dev/null
@@ -0,0 +1,1148 @@
+/**
+ * collectd - src/sysevent.c
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Red Hat NFVPE
+ *     Andrew Bays <abays at redhat.com>
+ **/
+
+#include "collectd.h"
+
+#include "plugin.h"
+#include "utils/common/common.h"
+#include "utils/ignorelist/ignorelist.h"
+#include "utils_complain.h"
+
+#include <errno.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <pthread.h>
+#include <regex.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <yajl/yajl_common.h>
+#include <yajl/yajl_gen.h>
+
+#if HAVE_YAJL_YAJL_VERSION_H
+#include <yajl/yajl_version.h>
+#endif
+#if defined(YAJL_MAJOR) && (YAJL_MAJOR > 1)
+#include <yajl/yajl_tree.h>
+#define HAVE_YAJL_V2 1
+#endif
+
+#define SYSEVENT_DOMAIN_FIELD "domain"
+#define SYSEVENT_DOMAIN_VALUE "syslog"
+#define SYSEVENT_EVENT_ID_FIELD "eventId"
+#define SYSEVENT_EVENT_NAME_FIELD "eventName"
+#define SYSEVENT_EVENT_NAME_VALUE "syslog message"
+#define SYSEVENT_LAST_EPOCH_MICROSEC_FIELD "lastEpochMicrosec"
+#define SYSEVENT_PRIORITY_FIELD "priority"
+#define SYSEVENT_PRIORITY_VALUE_HIGH "high"
+#define SYSEVENT_PRIORITY_VALUE_LOW "low"
+#define SYSEVENT_PRIORITY_VALUE_MEDIUM "medium"
+#define SYSEVENT_PRIORITY_VALUE_NORMAL "normal"
+#define SYSEVENT_PRIORITY_VALUE_UNKNOWN "unknown"
+#define SYSEVENT_REPORTING_ENTITY_NAME_FIELD "reportingEntityName"
+#define SYSEVENT_REPORTING_ENTITY_NAME_VALUE "collectd sysevent plugin"
+#define SYSEVENT_SEQUENCE_FIELD "sequence"
+#define SYSEVENT_SEQUENCE_VALUE "0"
+#define SYSEVENT_SOURCE_NAME_FIELD "sourceName"
+#define SYSEVENT_SOURCE_NAME_VALUE "syslog"
+#define SYSEVENT_START_EPOCH_MICROSEC_FIELD "startEpochMicrosec"
+#define SYSEVENT_VERSION_FIELD "version"
+#define SYSEVENT_VERSION_VALUE "1.0"
+
+#define SYSEVENT_EVENT_SOURCE_HOST_FIELD "eventSourceHost"
+#define SYSEVENT_EVENT_SOURCE_TYPE_FIELD "eventSourceType"
+#define SYSEVENT_EVENT_SOURCE_TYPE_VALUE "host"
+#define SYSEVENT_SYSLOG_FIELDS_FIELD "syslogFields"
+#define SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD "syslogFieldsVersion"
+#define SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE "1.0"
+#define SYSEVENT_SYSLOG_MSG_FIELD "syslogMsg"
+#define SYSEVENT_SYSLOG_PROC_FIELD "syslogProc"
+#define SYSEVENT_SYSLOG_SEV_FIELD "syslogSev"
+#define SYSEVENT_SYSLOG_TAG_FIELD "syslogTag"
+#define SYSEVENT_SYSLOG_TAG_VALUE "NILVALUE"
+
+/*
+ * Private data types
+ */
+
+typedef struct {
+  int head;
+  int tail;
+  int maxLen;
+  char **buffer;
+  cdtime_t *timestamp;
+} circbuf_t;
+
+/*
+ * Private variables
+ */
+
+static ignorelist_t *ignorelist = NULL;
+
+static int sysevent_socket_thread_loop = 0;
+static int sysevent_socket_thread_error = 0;
+static pthread_t sysevent_socket_thread_id;
+static int sysevent_dequeue_thread_loop = 0;
+static pthread_t sysevent_dequeue_thread_id;
+static pthread_mutex_t sysevent_thread_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_mutex_t sysevent_data_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t sysevent_cond = PTHREAD_COND_INITIALIZER;
+static int sock = -1;
+static int event_id = 0;
+static circbuf_t ring;
+
+static char *listen_ip;
+static char *listen_port;
+static int listen_buffer_size = 4096;
+static int buffer_length = 10;
+
+static int monitor_all_messages = 1;
+
+#if HAVE_YAJL_V2
+static const char *rsyslog_keys[3] = {"@timestamp", "@source_host", "@message"};
+static const char *rsyslog_field_keys[5] = {
+    "facility", "severity", "severity-num", "program", "processid"};
+#endif
+
+/*
+ * Private functions
+ */
+
+static int gen_message_payload(const char *msg, char *sev, int sev_num,
+                               char *process, char *host, cdtime_t timestamp,
+                               char **buf) {
+  const unsigned char *buf2;
+  yajl_gen g;
+  char json_str[DATA_MAX_NAME_LEN];
+
+#if !defined(HAVE_YAJL_V2)
+  yajl_gen_config conf = {0};
+#endif
+
+#if HAVE_YAJL_V2
+  size_t len;
+  g = yajl_gen_alloc(NULL);
+  yajl_gen_config(g, yajl_gen_beautify, 0);
+#else
+  unsigned int len;
+  g = yajl_gen_alloc(&conf, NULL);
+#endif
+
+  yajl_gen_clear(g);
+
+  // *** BEGIN common event header ***
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // domain
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_FIELD,
+                      strlen(SYSEVENT_DOMAIN_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_DOMAIN_VALUE,
+                      strlen(SYSEVENT_DOMAIN_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // eventId
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_ID_FIELD,
+                      strlen(SYSEVENT_EVENT_ID_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  event_id = event_id + 1;
+  snprintf(json_str, sizeof(json_str), "%d", event_id);
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // eventName
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_NAME_FIELD,
+                      strlen(SYSEVENT_EVENT_NAME_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  snprintf(json_str, sizeof(json_str), "host %s rsyslog message", host);
+
+  if (yajl_gen_string(g, (u_char *)json_str, strlen(json_str)) !=
+      yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // lastEpochMicrosec
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_LAST_EPOCH_MICROSEC_FIELD,
+                      strlen(SYSEVENT_LAST_EPOCH_MICROSEC_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(cdtime()));
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // priority
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_FIELD,
+                      strlen(SYSEVENT_PRIORITY_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  switch (sev_num) {
+  case 4:
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_MEDIUM,
+                        strlen(SYSEVENT_PRIORITY_VALUE_MEDIUM)) !=
+        yajl_gen_status_ok)
+      goto err;
+    break;
+  case 5:
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_NORMAL,
+                        strlen(SYSEVENT_PRIORITY_VALUE_NORMAL)) !=
+        yajl_gen_status_ok)
+      goto err;
+    break;
+  case 6:
+  case 7:
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_LOW,
+                        strlen(SYSEVENT_PRIORITY_VALUE_LOW)) !=
+        yajl_gen_status_ok)
+      goto err;
+    break;
+  default:
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_PRIORITY_VALUE_UNKNOWN,
+                        strlen(SYSEVENT_PRIORITY_VALUE_UNKNOWN)) !=
+        yajl_gen_status_ok)
+      goto err;
+    break;
+  }
+
+  // reportingEntityName
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_FIELD,
+                      strlen(SYSEVENT_REPORTING_ENTITY_NAME_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_REPORTING_ENTITY_NAME_VALUE,
+                      strlen(SYSEVENT_REPORTING_ENTITY_NAME_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // sequence
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SEQUENCE_FIELD,
+                      strlen(SYSEVENT_SEQUENCE_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, SYSEVENT_SEQUENCE_VALUE,
+                      strlen(SYSEVENT_SEQUENCE_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // sourceName
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_FIELD,
+                      strlen(SYSEVENT_SOURCE_NAME_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SOURCE_NAME_VALUE,
+                      strlen(SYSEVENT_SOURCE_NAME_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // startEpochMicrosec
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_START_EPOCH_MICROSEC_FIELD,
+                      strlen(SYSEVENT_START_EPOCH_MICROSEC_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  snprintf(json_str, sizeof(json_str), "%" PRIu64, CDTIME_T_TO_US(timestamp));
+
+  if (yajl_gen_number(g, json_str, strlen(json_str)) != yajl_gen_status_ok) {
+    goto err;
+  }
+
+  // version
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_VERSION_FIELD,
+                      strlen(SYSEVENT_VERSION_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, SYSEVENT_VERSION_VALUE,
+                      strlen(SYSEVENT_VERSION_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // *** END common event header ***
+
+  // *** BEGIN syslog fields ***
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_FIELD,
+                      strlen(SYSEVENT_SYSLOG_FIELDS_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_map_open(g) != yajl_gen_status_ok)
+    goto err;
+
+  // eventSourceHost
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_HOST_FIELD,
+                      strlen(SYSEVENT_EVENT_SOURCE_HOST_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)host, strlen(host)) != yajl_gen_status_ok)
+    goto err;
+
+  // eventSourceType
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_FIELD,
+                      strlen(SYSEVENT_EVENT_SOURCE_TYPE_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_EVENT_SOURCE_TYPE_VALUE,
+                      strlen(SYSEVENT_EVENT_SOURCE_TYPE_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // syslogFieldsVersion
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD,
+                      strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_FIELD)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_number(g, SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE,
+                      strlen(SYSEVENT_SYSLOG_FIELDS_VERSION_VALUE)) !=
+      yajl_gen_status_ok)
+    goto err;
+
+  // syslogMsg
+  if (msg != NULL) {
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_MSG_FIELD,
+                        strlen(SYSEVENT_SYSLOG_MSG_FIELD)) !=
+        yajl_gen_status_ok)
+      goto err;
+
+    if (yajl_gen_string(g, (u_char *)msg, strlen(msg)) != yajl_gen_status_ok)
+      goto err;
+  }
+
+  // syslogProc
+  if (process != NULL) {
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_PROC_FIELD,
+                        strlen(SYSEVENT_SYSLOG_PROC_FIELD)) !=
+        yajl_gen_status_ok)
+      goto err;
+
+    if (yajl_gen_string(g, (u_char *)process, strlen(process)) !=
+        yajl_gen_status_ok)
+      goto err;
+  }
+
+  // syslogSev
+  if (sev != NULL) {
+    if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_SEV_FIELD,
+                        strlen(SYSEVENT_SYSLOG_SEV_FIELD)) !=
+        yajl_gen_status_ok)
+      goto err;
+
+    if (yajl_gen_string(g, (u_char *)sev, strlen(sev)) != yajl_gen_status_ok)
+      goto err;
+  }
+
+  // syslogTag
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_FIELD,
+                      strlen(SYSEVENT_SYSLOG_TAG_FIELD)) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_string(g, (u_char *)SYSEVENT_SYSLOG_TAG_VALUE,
+                      strlen(SYSEVENT_SYSLOG_TAG_VALUE)) != yajl_gen_status_ok)
+    goto err;
+
+  // *** END syslog fields ***
+
+  // close syslog and header fields
+  if (yajl_gen_map_close(g) != yajl_gen_status_ok ||
+      yajl_gen_map_close(g) != yajl_gen_status_ok)
+    goto err;
+
+  if (yajl_gen_get_buf(g, &buf2, &len) != yajl_gen_status_ok)
+    goto err;
+
+  *buf = strdup((char *)buf2);
+
+  if (*buf == NULL) {
+    ERROR("sysevent plugin: gen_message_payload strdup failed");
+    goto err;
+  }
+
+  yajl_gen_free(g);
+
+  return 0;
+
+err:
+  yajl_gen_free(g);
+  ERROR("sysevent plugin: gen_message_payload failed to generate JSON");
+  return -1;
+}
+
+static int read_socket() {
+  int recv_flags = MSG_DONTWAIT;
+
+  while (42) {
+    struct sockaddr_storage src_addr;
+    socklen_t src_addr_len = sizeof(src_addr);
+
+    char buffer[listen_buffer_size];
+    memset(buffer, '\0', listen_buffer_size);
+
+    ssize_t count = recvfrom(sock, buffer, sizeof(buffer), recv_flags,
+                             (struct sockaddr *)&src_addr, &src_addr_len);
+
+    if (count < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        pthread_mutex_lock(&sysevent_data_lock);
+
+        // There was nothing more to receive for now, so...
+        // If ring head does not equal ring tail, there is data
+        // in the ring buffer for the dequeue thread to read, so
+        // signal it
+        if (ring.head != ring.tail)
+          pthread_cond_signal(&sysevent_cond);
+
+        pthread_mutex_unlock(&sysevent_data_lock);
+
+        // Since there was nothing to receive, set recv to block and
+        // try again
+        recv_flags = 0;
+        continue;
+      } else if (errno != EINTR) {
+        ERROR("sysevent plugin: failed to receive data: %s", STRERRNO);
+        return -1;
+      } else {
+        // Interrupt, so continue and try again
+        continue;
+      }
+    }
+
+    if (count >= sizeof(buffer)) {
+      WARNING("sysevent plugin: datagram too large for buffer: truncated");
+    }
+
+    // We successfully received a message, so don't block on the next
+    // read in case there are more (and if there aren't, it will be
+    // handled above in the EWOULDBLOCK error-checking)
+    recv_flags = MSG_DONTWAIT;
+
+    // 1. Acquire data lock
+    // 2. Push to buffer if there is room, otherwise raise warning
+    //    and allow dequeue thread to take over
+
+    pthread_mutex_lock(&sysevent_data_lock);
+
+    int next = ring.head + 1;
+    if (next >= ring.maxLen)
+      next = 0;
+
+    if (next == ring.tail) {
+      // Buffer is full, signal the dequeue thread to process the buffer
+      // and clean it out, and then sleep
+      WARNING("sysevent plugin: ring buffer full");
+
+      pthread_cond_signal(&sysevent_cond);
+      pthread_mutex_unlock(&sysevent_data_lock);
+
+      usleep(1000);
+      continue;
+    } else {
+      DEBUG("sysevent plugin: writing %s", buffer);
+
+      sstrncpy(ring.buffer[ring.head], buffer, sizeof(buffer));
+      ring.timestamp[ring.head] = cdtime();
+      ring.head = next;
+    }
+
+    pthread_mutex_unlock(&sysevent_data_lock);
+  }
+}
+
+static void sysevent_dispatch_notification(const char *message,
+#if HAVE_YAJL_V2
+                                           yajl_val *node,
+#endif
+                                           cdtime_t timestamp) {
+  char *buf = NULL;
+
+  notification_t n = {
+      .severity = NOTIF_OKAY,
+      .time = cdtime(),
+      .plugin = "sysevent",
+      .type = "gauge",
+  };
+
+#if HAVE_YAJL_V2
+  if (node != NULL) {
+    // If we have a parsed-JSON node to work with, use that
+    // msg
+    const char *msg_path[] = {rsyslog_keys[2], (const char *)0};
+    yajl_val msg_v = yajl_tree_get(*node, msg_path, yajl_t_string);
+
+    char msg[listen_buffer_size];
+
+    if (msg_v != NULL) {
+      memset(msg, '\0', listen_buffer_size);
+      snprintf(msg, listen_buffer_size, "%s%c", YAJL_GET_STRING(msg_v), '\0');
+    }
+
+    // severity
+    const char *severity_path[] = {"@fields", rsyslog_field_keys[1],
+                                   (const char *)0};
+    yajl_val severity_v = yajl_tree_get(*node, severity_path, yajl_t_string);
+
+    char severity[listen_buffer_size];
+
+    if (severity_v != NULL) {
+      memset(severity, '\0', listen_buffer_size);
+      snprintf(severity, listen_buffer_size, "%s%c",
+               YAJL_GET_STRING(severity_v), '\0');
+    }
+
+    // sev_num
+    const char *sev_num_str_path[] = {"@fields", rsyslog_field_keys[2],
+                                      (const char *)0};
+    yajl_val sev_num_str_v =
+        yajl_tree_get(*node, sev_num_str_path, yajl_t_string);
+
+    char sev_num_str[listen_buffer_size];
+    int sev_num = -1;
+
+    if (sev_num_str_v != NULL) {
+      memset(sev_num_str, '\0', listen_buffer_size);
+      snprintf(sev_num_str, listen_buffer_size, "%s%c",
+               YAJL_GET_STRING(sev_num_str_v), '\0');
+
+      sev_num = atoi(sev_num_str);
+
+      if (sev_num < 4)
+        n.severity = NOTIF_FAILURE;
+    }
+
+    // process
+    const char *process_path[] = {"@fields", rsyslog_field_keys[3],
+                                  (const char *)0};
+    yajl_val process_v = yajl_tree_get(*node, process_path, yajl_t_string);
+
+    char process[listen_buffer_size];
+
+    if (process_v != NULL) {
+      memset(process, '\0', listen_buffer_size);
+      snprintf(process, listen_buffer_size, "%s%c", YAJL_GET_STRING(process_v),
+               '\0');
+    }
+
+    // hostname
+    const char *hostname_path[] = {rsyslog_keys[1], (const char *)0};
+    yajl_val hostname_v = yajl_tree_get(*node, hostname_path, yajl_t_string);
+
+    char hostname_str[listen_buffer_size];
+
+    if (hostname_v != NULL) {
+      memset(hostname_str, '\0', listen_buffer_size);
+      snprintf(hostname_str, listen_buffer_size, "%s%c",
+               YAJL_GET_STRING(hostname_v), '\0');
+    }
+
+    gen_message_payload(
+        (msg_v != NULL ? msg : NULL), (severity_v != NULL ? severity : NULL),
+        (sev_num_str_v != NULL ? sev_num : -1),
+        (process_v != NULL ? process : NULL),
+        (hostname_v != NULL ? hostname_str : hostname_g), timestamp, &buf);
+  } else {
+    // Data was not sent in JSON format, so just treat the whole log entry
+    // as the message (and we'll be unable to acquire certain data, so the
+    // payload
+    // generated below will be less informative)
+
+    gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
+  }
+#else
+  gen_message_payload(message, NULL, -1, NULL, hostname_g, timestamp, &buf);
+#endif
+
+  sstrncpy(n.host, hostname_g, sizeof(n.host));
+
+  int status = plugin_notification_meta_add_string(&n, "ves", buf);
+
+  if (status < 0) {
+    sfree(buf);
+    ERROR("sysevent plugin: unable to set notification VES metadata: %s",
+          STRERRNO);
+    return;
+  }
+
+  DEBUG("sysevent plugin: notification VES metadata: %s",
+        n.meta->nm_value.nm_string);
+
+  DEBUG("sysevent plugin: dispatching message");
+
+  plugin_dispatch_notification(&n);
+  plugin_notification_meta_free(n.meta);
+
+  // strdup'd in gen_message_payload
+  if (buf != NULL)
+    sfree(buf);
+}
+
+static void read_ring_buffer() {
+  pthread_mutex_lock(&sysevent_data_lock);
+
+  // If there's currently nothing to read from the buffer,
+  // then wait
+  if (ring.head == ring.tail)
+    pthread_cond_wait(&sysevent_cond, &sysevent_data_lock);
+
+  while (ring.head != ring.tail) {
+    int next = ring.tail + 1;
+
+    if (next >= ring.maxLen)
+      next = 0;
+
+    DEBUG("sysevent plugin: reading from ring buffer: %s",
+          ring.buffer[ring.tail]);
+
+    cdtime_t timestamp = ring.timestamp[ring.tail];
+    char *match_str = NULL;
+
+#if HAVE_YAJL_V2
+    // Try to parse JSON, and if it fails, fall back to plain string
+    char errbuf[1024];
+    errbuf[0] = 0;
+    yajl_val node = yajl_tree_parse((const char *)ring.buffer[ring.tail],
+                                    errbuf, sizeof(errbuf));
+
+    if (node != NULL) {
+      // JSON rsyslog data
+
+      // If we have any regex filters, we need to see if the message portion of
+      // the data matches any of them (otherwise we're not interested)
+      if (monitor_all_messages == 0) {
+        const char *path[] = {"@message", (const char *)0};
+        yajl_val v = yajl_tree_get(node, path, yajl_t_string);
+
+        char json_val[listen_buffer_size];
+        memset(json_val, '\0', listen_buffer_size);
+
+        snprintf(json_val, listen_buffer_size, "%s%c", YAJL_GET_STRING(v),
+                 '\0');
+
+        match_str = (char *)&json_val;
+      }
+    } else {
+      // non-JSON rsyslog data
+
+      // If we have any regex filters, we need to see if the message data
+      // matches any of them (otherwise we're not interested)
+      if (monitor_all_messages == 0)
+        match_str = ring.buffer[ring.tail];
+    }
+#else
+    // If we have any regex filters, we need to see if the message data
+    // matches any of them (otherwise we're not interested)
+    if (monitor_all_messages == 0)
+      match_str = ring.buffer[ring.tail];
+#endif
+
+    int is_match = 1;
+
+    // If we care about matching, do that comparison here
+    if (match_str != NULL) {
+      if (ignorelist_match(ignorelist, match_str) != 0)
+        is_match = 0;
+      else
+        DEBUG("sysevent plugin: regex filter match");
+    }
+
+#if HAVE_YAJL_V2
+    if (is_match == 1 && node != NULL) {
+      sysevent_dispatch_notification(NULL, &node, timestamp);
+      yajl_tree_free(node);
+    } else if (is_match == 1)
+      sysevent_dispatch_notification(ring.buffer[ring.tail], NULL, timestamp);
+#else
+    if (is_match == 1)
+      sysevent_dispatch_notification(ring.buffer[ring.tail], timestamp);
+#endif
+
+    ring.tail = next;
+  }
+
+  pthread_mutex_unlock(&sysevent_data_lock);
+}
+
+static void *sysevent_socket_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  while (sysevent_socket_thread_loop > 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+
+    if (sock == -1)
+      return (void *)0;
+
+    int status = read_socket();
+
+    pthread_mutex_lock(&sysevent_thread_lock);
+
+    if (status < 0) {
+      WARNING("sysevent plugin: problem with socket thread (status: %d)",
+              status);
+      sysevent_socket_thread_error = 1;
+      break;
+    }
+  } /* while (sysevent_socket_thread_loop > 0) */
+
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  return (void *)0;
+} /* }}} void *sysevent_socket_thread */
+
+// Entry point for thread responsible for reading from
+// ring buffer and dispatching notifications
+static void *sysevent_dequeue_thread(void *arg) /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  while (sysevent_dequeue_thread_loop > 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+
+    read_ring_buffer();
+
+    pthread_mutex_lock(&sysevent_thread_lock);
+  } /* while (sysevent_dequeue_thread_loop > 0) */
+
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  return (void *)0;
+} /* }}} void *sysevent_dequeue_thread */
+
+static int start_socket_thread(void) /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  if (sysevent_socket_thread_loop != 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return 0;
+  }
+
+  sysevent_socket_thread_loop = 1;
+  sysevent_socket_thread_error = 0;
+
+  DEBUG("sysevent plugin: starting socket thread");
+
+  int status = plugin_thread_create(&sysevent_socket_thread_id,
+                                    /* attr = */ NULL, sysevent_socket_thread,
+                                    /* arg = */ (void *)0, "sysevent");
+  if (status != 0) {
+    sysevent_socket_thread_loop = 0;
+    ERROR("sysevent plugin: starting socket thread failed.");
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return -1;
+  }
+
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  return 0;
+} /* }}} int start_socket_thread */
+
+static int start_dequeue_thread(void) /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  if (sysevent_dequeue_thread_loop != 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return 0;
+  }
+
+  sysevent_dequeue_thread_loop = 1;
+
+  int status = plugin_thread_create(&sysevent_dequeue_thread_id,
+                                    /* attr = */ NULL, sysevent_dequeue_thread,
+                                    /* arg = */ (void *)0, "ssyevent");
+  if (status != 0) {
+    sysevent_dequeue_thread_loop = 0;
+    ERROR("sysevent plugin: Starting dequeue thread failed.");
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return -1;
+  }
+
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  return status;
+} /* }}} int start_dequeue_thread */
+
+static int start_threads(void) /* {{{ */
+{
+  int status = start_socket_thread();
+  int status2 = start_dequeue_thread();
+
+  if (status != 0)
+    return status;
+  else
+    return status2;
+} /* }}} int start_threads */
+
+static int stop_socket_thread(int shutdown) /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  if (sysevent_socket_thread_loop == 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return -1;
+  }
+
+  sysevent_socket_thread_loop = 0;
+  pthread_cond_broadcast(&sysevent_cond);
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  int status;
+
+  if (shutdown == 1) {
+    // Since the thread is blocking, calling pthread_join
+    // doesn't actually succeed in stopping it.  It will stick around
+    // until a message is received on the socket (at which
+    // it will realize that "sysevent_socket_thread_loop" is 0 and will
+    // break out of the read loop and be allowed to die).  This is
+    // fine when the process isn't supposed to be exiting, but in
+    // the case of a process shutdown, we don't want to have an
+    // idle thread hanging around.  Calling pthread_cancel here in
+    // the case of a shutdown is just assures that the thread is
+    // gone and that the process has been fully terminated.
+
+    DEBUG("sysevent plugin: Canceling socket thread for process shutdown");
+
+    status = pthread_cancel(sysevent_socket_thread_id);
+
+    if (status != 0 && status != ESRCH) {
+      ERROR("sysevent plugin: Unable to cancel socket thread: %d (%s)", status,
+            STRERRNO);
+      status = -1;
+    } else
+      status = 0;
+  } else {
+    status = pthread_join(sysevent_socket_thread_id, /* return = */ NULL);
+    if (status != 0 && status != ESRCH) {
+      ERROR("sysevent plugin: Stopping socket thread failed.");
+      status = -1;
+    } else
+      status = 0;
+  }
+
+  pthread_mutex_lock(&sysevent_thread_lock);
+  memset(&sysevent_socket_thread_id, 0, sizeof(sysevent_socket_thread_id));
+  sysevent_socket_thread_error = 0;
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  DEBUG("sysevent plugin: Finished requesting stop of socket thread");
+
+  return status;
+} /* }}} int stop_socket_thread */
+
+static int stop_dequeue_thread() /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  if (sysevent_dequeue_thread_loop == 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+    return -1;
+  }
+
+  sysevent_dequeue_thread_loop = 0;
+  pthread_cond_broadcast(&sysevent_cond);
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  // Since the thread is blocking, calling pthread_join
+  // doesn't actually succeed in stopping it.  It will stick around
+  // until a message is received on the socket (at which
+  // it will realize that "sysevent_dequeue_thread_loop" is 0 and will
+  // break out of the read loop and be allowed to die).  Since this
+  // function is called when the processing is exiting, we don't want to
+  // have an idle thread hanging around.  Calling pthread_cancel here
+  // just assures that the thread is gone and that the process has been
+  // fully terminated.
+
+  DEBUG("sysevent plugin: Canceling dequeue thread for process shutdown");
+
+  int status = pthread_cancel(sysevent_dequeue_thread_id);
+
+  if (status != 0 && status != ESRCH) {
+    ERROR("sysevent plugin: Unable to cancel dequeue thread: %d (%s)", status,
+          STRERRNO);
+    status = -1;
+  } else
+    status = 0;
+
+  pthread_mutex_lock(&sysevent_thread_lock);
+  memset(&sysevent_dequeue_thread_id, 0, sizeof(sysevent_dequeue_thread_id));
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  DEBUG("sysevent plugin: Finished requesting stop of dequeue thread");
+
+  return status;
+} /* }}} int stop_dequeue_thread */
+
+static int stop_threads() /* {{{ */
+{
+  int status = stop_socket_thread(1);
+  int status2 = stop_dequeue_thread();
+
+  if (status != 0)
+    return status;
+  else
+    return status2;
+} /* }}} int stop_threads */
+
+static int sysevent_init(void) /* {{{ */
+{
+  ring.head = 0;
+  ring.tail = 0;
+  ring.maxLen = buffer_length;
+  ring.buffer = (char **)calloc(buffer_length, sizeof(char *));
+
+  if (ring.buffer == NULL) {
+    ERROR("sysevent plugin: sysevent_init ring buffer calloc failed");
+    return -1;
+  }
+
+  for (int i = 0; i < buffer_length; i++) {
+    ring.buffer[i] = calloc(1, listen_buffer_size);
+
+    if (ring.buffer[i] == NULL) {
+      ERROR("sysevent plugin: sysevent_init ring buffer entry calloc failed");
+      return -1;
+    }
+  }
+
+  ring.timestamp = (cdtime_t *)calloc(buffer_length, sizeof(cdtime_t));
+
+  if (ring.timestamp == NULL) {
+    ERROR("sysevent plugin: sysevent_init ring buffer timestamp calloc failed");
+    return -1;
+  }
+
+  if (sock == -1) {
+    struct addrinfo hints = {
+        .ai_family = AF_UNSPEC,
+        .ai_socktype = SOCK_DGRAM,
+        .ai_protocol = 0,
+        .ai_flags = AI_PASSIVE | AI_ADDRCONFIG,
+    };
+    struct addrinfo *res = 0;
+
+    int err = getaddrinfo(listen_ip, listen_port, &hints, &res);
+
+    if (err != 0) {
+      ERROR("sysevent plugin: failed to resolve local socket address (err=%d)",
+            err);
+      freeaddrinfo(res);
+      return -1;
+    }
+
+    sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+    if (sock == -1) {
+      ERROR("sysevent plugin: failed to open socket: %s", STRERRNO);
+      freeaddrinfo(res);
+      return -1;
+    }
+
+    if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
+      ERROR("sysevent plugin: failed to bind socket: %s", STRERRNO);
+      freeaddrinfo(res);
+      sock = -1;
+      return -1;
+    }
+
+    freeaddrinfo(res);
+  }
+
+  DEBUG("sysevent plugin: socket created and bound");
+
+  return start_threads();
+} /* }}} int sysevent_init */
+
+static int sysevent_config_add_listen(const oconfig_item_t *ci) /* {{{ */
+{
+  if (ci->values_num != 2 || ci->values[0].type != OCONFIG_TYPE_STRING ||
+      ci->values[1].type != OCONFIG_TYPE_STRING) {
+    ERROR("sysevent plugin: The `%s' config option needs "
+          "two string arguments (ip and port).",
+          ci->key);
+    return -1;
+  }
+
+  listen_ip = strdup(ci->values[0].value.string);
+  listen_port = strdup(ci->values[1].value.string);
+
+  return 0;
+}
+
+static int sysevent_config_add_buffer_size(const oconfig_item_t *ci) /* {{{ */
+{
+  int tmp = 0;
+
+  if (cf_util_get_int(ci, &tmp) != 0)
+    return -1;
+  else if ((tmp >= 1024) && (tmp <= 65535))
+    listen_buffer_size = tmp;
+  else {
+    WARNING(
+        "sysevent plugin: The `BufferSize' must be between 1024 and 65535.");
+    return -1;
+  }
+
+  return 0;
+}
+
+static int sysevent_config_add_buffer_length(const oconfig_item_t *ci) /* {{{ */
+{
+  int tmp = 0;
+
+  if (cf_util_get_int(ci, &tmp) != 0)
+    return -1;
+  else if ((tmp >= 3) && (tmp <= 4096))
+    buffer_length = tmp;
+  else {
+    WARNING("sysevent plugin: The `Bufferlength' must be between 3 and 4096.");
+    return -1;
+  }
+
+  return 0;
+}
+
+static int sysevent_config_add_regex_filter(const oconfig_item_t *ci) /* {{{ */
+{
+  if (ci->values_num != 1 || ci->values[0].type != OCONFIG_TYPE_STRING) {
+    ERROR("sysevent plugin: The `%s' config option needs "
+          "one string argument, a regular expression.",
+          ci->key);
+    return -1;
+  }
+
+#if HAVE_REGEX_H
+  if (ignorelist == NULL)
+    ignorelist = ignorelist_create(/* invert = */ 1);
+
+  int status = ignorelist_add(ignorelist, ci->values[0].value.string);
+
+  if (status != 0) {
+    ERROR("sysevent plugin: invalid regular expression: %s",
+          ci->values[0].value.string);
+    return 1;
+  }
+
+  monitor_all_messages = 0;
+#else
+  WARNING("sysevent plugin: The plugin has been compiled without support "
+          "for the \"RegexFilter\" option.");
+#endif
+
+  return 0;
+}
+
+static int sysevent_config(oconfig_item_t *ci) /* {{{ */
+{
+  for (int i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("Listen", child->key) == 0)
+      sysevent_config_add_listen(child);
+    else if (strcasecmp("BufferSize", child->key) == 0)
+      sysevent_config_add_buffer_size(child);
+    else if (strcasecmp("BufferLength", child->key) == 0)
+      sysevent_config_add_buffer_length(child);
+    else if (strcasecmp("RegexFilter", child->key) == 0)
+      sysevent_config_add_regex_filter(child);
+    else {
+      WARNING("sysevent plugin: Option `%s' is not allowed here.", child->key);
+    }
+  }
+
+  return 0;
+} /* }}} int sysevent_config */
+
+static int sysevent_read(void) /* {{{ */
+{
+  pthread_mutex_lock(&sysevent_thread_lock);
+
+  if (sysevent_socket_thread_error != 0) {
+    pthread_mutex_unlock(&sysevent_thread_lock);
+
+    ERROR("sysevent plugin: The sysevent socket thread had a problem (%d). "
+          "Restarting it.",
+          sysevent_socket_thread_error);
+
+    stop_threads();
+
+    start_threads();
+
+    return -1;
+  } /* if (sysevent_socket_thread_error != 0) */
+
+  pthread_mutex_unlock(&sysevent_thread_lock);
+
+  return 0;
+} /* }}} int sysevent_read */
+
+static int sysevent_shutdown(void) /* {{{ */
+{
+  DEBUG("sysevent plugin: Shutting down thread.");
+
+  int status = stop_threads();
+  int status2 = 0;
+
+  if (sock != -1) {
+    status2 = close(sock);
+    if (status2 != 0) {
+      ERROR("sysevent plugin: failed to close socket %d: %d (%s)", sock, status,
+            STRERRNO);
+    }
+
+    sock = -1;
+  }
+
+  free(listen_ip);
+  free(listen_port);
+
+  for (int i = 0; i < buffer_length; i++) {
+    free(ring.buffer[i]);
+  }
+
+  free(ring.buffer);
+  free(ring.timestamp);
+
+  if (status != 0)
+    return status;
+  else
+    return status2;
+} /* }}} int sysevent_shutdown */
+
+void module_register(void) {
+  plugin_register_complex_config("sysevent", sysevent_config);
+  plugin_register_init("sysevent", sysevent_init);
+  plugin_register_read("sysevent", sysevent_read);
+  plugin_register_shutdown("sysevent", sysevent_shutdown);
+} /* void module_register */
index 9465ddc..19a5111 100644 (file)
@@ -987,6 +987,8 @@ static int __attribute__((warn_unused_result)) probe_cpu(void) {
     /* Ivy Bridge */
     case 0x3A: /* IVB */
     case 0x3E: /* IVB Xeon */
+    case 0x55: /* SKX,CLX Xeon */
+    case 0x6A: /* ICX Xeon */
       do_smi = true;
       do_core_cstate = (1 << 3) | (1 << 6) | (1 << 7);
       do_pkg_cstate = (1 << 2) | (1 << 3) | (1 << 6) | (1 << 7);
@@ -1042,6 +1044,8 @@ static int __attribute__((warn_unused_result)) probe_cpu(void) {
       break;
     case 0x2D: /* SNB Xeon */
     case 0x3E: /* IVB Xeon */
+    case 0x55: /* SKX,CLX Xeon */
+    case 0x6A: /* ICX Xeon */
       do_rapl = RAPL_PKG | RAPL_CORES | RAPL_DRAM;
       do_power_fields = TURBO_PLATFORM | PSTATES_PLATFORM;
       break;
index 69f59b0..6c08936 100644 (file)
@@ -242,6 +242,7 @@ spam_score              value:GAUGE:U:U
 spl                     value:GAUGE:U:U
 swap                    value:GAUGE:0:1099511627776
 swap_io                 value:DERIVE:0:U
+sysevent                value:GAUGE:0:1
 tcp_connections         value:GAUGE:0:4294967295
 tdp                     value:GAUGE:U:U
 temperature             value:GAUGE:U:U
index 2d961d4..aad767e 100644 (file)
@@ -89,8 +89,7 @@ char *sstrncpy(char *dest, const char *src, size_t n) {
   return dest;
 } /* char *sstrncpy */
 
-/* ssnprintf returns zero on success, one if truncation occurred
-   and a negative integer onerror. */
+/* ssnprintf returns result from vsnprintf conistent with snprintf */
 int ssnprintf(char *str, size_t sz, const char *format, ...) {
   va_list ap;
   va_start(ap, format);
@@ -99,10 +98,7 @@ int ssnprintf(char *str, size_t sz, const char *format, ...) {
 
   va_end(ap);
 
-  if (ret < 0) {
-    return ret;
-  }
-  return (size_t)ret >= sz;
+  return ret;
 } /* int ssnprintf */
 
 char *ssnprintf_alloc(char const *format, ...) /* {{{ */
index 4f15c16..b8b3b4f 100644 (file)
@@ -196,9 +196,9 @@ DEF_TEST(escape_slashes) {
   };
 
   for (size_t i = 0; i < STATIC_ARRAY_SIZE(cases); i++) {
-    char buffer[32];
+    char buffer[32] = {0};
 
-    strncpy(buffer, cases[i].str, sizeof(buffer));
+    strncpy(buffer, cases[i].str, sizeof(buffer) - 1);
     OK(escape_slashes(buffer, sizeof(buffer)) == 0);
     EXPECT_EQ_STR(cases[i].want, buffer);
   }
@@ -221,9 +221,9 @@ DEF_TEST(escape_string) {
   };
 
   for (size_t i = 0; i < STATIC_ARRAY_SIZE(cases); i++) {
-    char buffer[16];
+    char buffer[16] = {0};
 
-    strncpy(buffer, cases[i].str, sizeof(buffer));
+    strncpy(buffer, cases[i].str, sizeof(buffer) - 1);
     OK(escape_string(buffer, sizeof(buffer)) == 0);
     EXPECT_EQ_STR(cases[i].want, buffer);
   }
@@ -232,33 +232,33 @@ DEF_TEST(escape_string) {
 }
 
 DEF_TEST(strunescape) {
-  char buffer[16];
+  char buffer[32] = {0};
   int status;
 
-  strncpy(buffer, "foo\\tbar", sizeof(buffer));
+  strncpy(buffer, "foo\\tbar", sizeof(buffer) - 1);
   status = strunescape(buffer, sizeof(buffer));
   OK(status == 0);
   EXPECT_EQ_STR("foo\tbar", buffer);
 
-  strncpy(buffer, "\\tfoo\\r\\n", sizeof(buffer));
+  strncpy(buffer, "\\tfoo\\r\\n", sizeof(buffer) - 1);
   status = strunescape(buffer, sizeof(buffer));
   OK(status == 0);
   EXPECT_EQ_STR("\tfoo\r\n", buffer);
 
-  strncpy(buffer, "With \\\"quotes\\\"", sizeof(buffer));
+  strncpy(buffer, "With \\\"quotes\\\"", sizeof(buffer) - 1);
   status = strunescape(buffer, sizeof(buffer));
   OK(status == 0);
   EXPECT_EQ_STR("With \"quotes\"", buffer);
 
   /* Backslash before null byte */
-  strncpy(buffer, "\\tbackslash end\\", sizeof(buffer));
+  strncpy(buffer, "\\tbackslash end\\", sizeof(buffer) - 1);
   status = strunescape(buffer, sizeof(buffer));
   OK(status != 0);
   EXPECT_EQ_STR("\tbackslash end", buffer);
   return 0;
 
   /* Backslash at buffer end */
-  strncpy(buffer, "\\t3\\56", sizeof(buffer));
+  strncpy(buffer, "\\t3\\56", sizeof(buffer) - 1);
   status = strunescape(buffer, 4);
   OK(status != 0);
   OK(buffer[0] == '\t');
index be93b9d..3364697 100644 (file)
@@ -268,10 +268,17 @@ static int za_read(void) {
   za_read_gauge(ksp, "mfu_size", "cache_size", "mfu_size");
   za_read_gauge(ksp, "mru_ghost_size", "cache_size", "mru_ghost_size");
   za_read_gauge(ksp, "mru_size", "cache_size", "mru_size");
-  za_read_gauge(ksp, "other_size", "cache_size", "other_size");
   za_read_gauge(ksp, "p", "cache_size", "p");
   za_read_gauge(ksp, "size", "cache_size", "arc");
 
+  /* The "other_size" value was replaced by more specific values in ZFS on Linux
+   * version 0.7.0 (commit 25458cb)
+   */
+  if (za_read_gauge(ksp, "dbuf_size", "cache_size", "dbuf_size") != 0 ||
+      za_read_gauge(ksp, "dnode_size", "cache_size", "dnode_size") != 0 ||
+      za_read_gauge(ksp, "bonus_size", "cache_size", "bonus_size") != 0)
+    za_read_gauge(ksp, "other_size", "cache_size", "other_size");
+
   /* The "l2_size" value has disappeared from Solaris some time in
    * early 2013, and has only reappeared recently in Solaris 11.2.
    * Stop trying if we ever fail to read it, so we don't spam the log.
index 048b5a2..76bd065 100755 (executable)
@@ -1,6 +1,6 @@
 #!/bin/sh
 
-DEFAULT_VERSION="5.8.1.git"
+DEFAULT_VERSION="5.9.2.git"
 
 if [ -d .git ]; then
        VERSION="`git describe --dirty=+ --abbrev=7 2> /dev/null | grep collectd | sed -e 's/^collectd-//' -e 's/-/./g'`"