From: Pierre-Yves Ritschard Date: Fri, 25 Jul 2014 14:07:04 +0000 (+0200) Subject: Merge pull request #660 from mfournier/logfile-log_level X-Git-Tag: collectd-5.5.0~289 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=70160d8f57c9a767ae2ca14e31c8687ecbb10db3;hp=88e327344e8cee8272776a28a030ffa84d201f8e;p=collectd.git Merge pull request #660 from mfournier/logfile-log_level Logfile log level --- diff --git a/configure.ac b/configure.ac index f1c7b8ab..f09887e4 100644 --- a/configure.ac +++ b/configure.ac @@ -981,7 +981,7 @@ if test "x$fp_layout_type" = "xunknown"; then uint8_t c[8]; double d; - d = 8.642135e130; + d = 8.642135e130; memcpy ((void *) &i0, (void *) &d, 8); i1 = i0; @@ -1036,7 +1036,7 @@ if test "x$fp_layout_type" = "xunknown"; then uint8_t c[8]; double d; - d = 8.642135e130; + d = 8.642135e130; memcpy ((void *) &i0, (void *) &d, 8); i1 = endianflip (i0); @@ -1085,7 +1085,7 @@ if test "x$fp_layout_type" = "xunknown"; then uint8_t c[8]; double d; - d = 8.642135e130; + d = 8.642135e130; memcpy ((void *) &i0, (void *) &d, 8); i1 = intswap (i0); @@ -1237,7 +1237,7 @@ AC_MSG_CHECKING([if have htonll defined]) have_htonll="yes" AC_DEFINE(HAVE_HTONLL, 1, [Define if the function htonll exists.]) ]) - + AC_MSG_RESULT([$have_htonll]) # Check for structures @@ -1380,7 +1380,7 @@ collectd additional packages:]) AM_CONDITIONAL([BUILD_FREEBSD],[test "x$x$ac_system" = "xFreeBSD"]) -AM_CONDITIONAL([BUILD_AIX],[test "x$x$ac_system" = "xAIX"]) +AM_CONDITIONAL([BUILD_AIX],[test "x$x$ac_system" = "xAIX"]) if test "x$ac_system" = "xAIX" then @@ -2794,7 +2794,7 @@ then else SAVE_CPPFLAGS="$CPPFLAGS" CPPFLAGS="$CPPFLAGS $with_snmp_cflags" - + AC_CHECK_HEADERS(net-snmp/net-snmp-config.h, [], [with_libnetsnmp="no (net-snmp/net-snmp-config.h not found)"]) CPPFLAGS="$SAVE_CPPFLAGS" @@ -3021,7 +3021,7 @@ if test "x$with_libowcapi" = "xyes" then SAVE_CPPFLAGS="$CPPFLAGS" CPPFLAGS="$with_libowcapi_cppflags" - + AC_CHECK_HEADERS(owcapi.h, [with_libowcapi="yes"], [with_libowcapi="no (owcapi.h not found)"]) CPPFLAGS="$SAVE_CPPFLAGS" @@ -3032,7 +3032,7 @@ then SAVE_CPPFLAGS="$CPPFLAGS" LDFLAGS="$with_libowcapi_libs" CPPFLAGS="$with_libowcapi_cppflags" - + AC_CHECK_LIB(owcapi, OW_get, [with_libowcapi="yes"], [with_libowcapi="no (libowcapi not found)"]) LDFLAGS="$SAVE_LDFLAGS" @@ -3597,6 +3597,49 @@ LDFLAGS="$SAVE_LDFLAGS" AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes") # }}} +# --with-librdkafka {{{ +AC_ARG_WITH(librdkafka, [AS_HELP_STRING([--with-librdkafka@<:@=PREFIX@:>@], [Path to librdkafka.])], +[ + if test "x$withval" = "xno" && test "x$withval" != "xyes" + then + with_librdkafka_cppflags="-I$withval/include" + with_librdkafka_ldflags="-L$withval/lib" + with_librdkafka="yes" + else + with_librdkafka="$withval" + fi +], +[ + with_librdkafka="yes" +]) +SAVE_CPPFLAGS="$CPPFLAGS" +SAVE_LDFLAGS="$LDFLAGS" + +if test "x$with_librdkafka" = "xyes" +then + AC_CHECK_HEADERS(librdkafka/rdkafka.h, [with_librdkafka="yes"], [with_librdkafka="no (librdkafka/rdkafka.h not found)"]) +fi + +if test "x$with_librdkafka" = "xyes" +then + AC_CHECK_LIB(rdkafka, rd_kafka_new, [with_librdkafka="yes"], [with_librdkafka="no (Symbol 'rd_kafka_new' not found)"]) +fi +if test "x$with_librdkafka" = "xyes" +then + BUILD_WITH_LIBRDKAFKA_CPPFLAGS="$with_librdkafka_cppflags" + BUILD_WITH_LIBRDKAFKA_LDFLAGS="$with_librdkafka_ldflags" + BUILD_WITH_LIBRDKAFKA_LIBS="-lrdkafka" + AC_SUBST(BUILD_WITH_LIBRDKAFKA_CPPFLAGS) + AC_SUBST(BUILD_WITH_LIBRDKAFKA_LDFLAGS) + AC_SUBST(BUILD_WITH_LIBRDKAFKA_LIBS) + AC_DEFINE(HAVE_LIBRDKAFKA, 1, [Define if librdkafka is present and usable.]) +fi +CPPFLAGS="$SAVE_CPPFLAGS" +LDFLAGS="$SAVE_LDFLAGS" +AM_CONDITIONAL(BUILD_WITH_LIBRDKAFKA, test "x$with_librdkafka" = "xyes") + +# }}} + # --with-librouteros {{{ AC_ARG_WITH(librouteros, [AS_HELP_STRING([--with-librouteros@<:@=PREFIX@:>@], [Path to librouteros.])], [ @@ -4039,7 +4082,7 @@ CPPFLAGS="$SAVE_CPPFLAGS" LDFLAGS="$SAVE_LDFLAGS" if test "x$with_libtokyotyrant" = "xyes" -then +then BUILD_WITH_LIBTOKYOTYRANT_CPPFLAGS="$with_libtokyotyrant_cppflags" BUILD_WITH_LIBTOKYOTYRANT_LDFLAGS="$with_libtokyotyrant_ldflags" BUILD_WITH_LIBTOKYOTYRANT_LIBS="$with_libtokyotyrant_libs" @@ -4050,6 +4093,67 @@ fi AM_CONDITIONAL(BUILD_WITH_LIBTOKYOTYRANT, test "x$with_libtokyotyrant" = "xyes") # }}} +# --with-libudev {{{ +with_libudev_cflags="" +with_libudev_ldflags="" +AC_ARG_WITH(libudev, [AS_HELP_STRING([--with-libudev@<:@=PREFIX@:>@], [Path to libudev.])], +[ + if test "x$withval" = "xno" + then + with_libudev="no" + else + with_libudev="yes" + if test "x$withval" != "xyes" + then + with_libudev_cflags="-I$withval/include" + with_libudev_ldflags="-L$withval/lib" + with_libudev="yes" + fi + fi +], +[ + if test "x$ac_system" = "xLinux" + then + with_libudev="yes" + else + with_libudev="no (Linux only library)" + fi +]) +if test "x$with_libudev" = "xyes" +then + SAVE_CPPFLAGS="$CPPFLAGS" + CPPFLAGS="$CPPFLAGS $with_libudev_cflags" + + AC_CHECK_HEADERS(libudev.h, [], [with_libudev="no (libudev.h not found)"]) + + CPPFLAGS="$SAVE_CPPFLAGS" +fi +if test "x$with_libudev" = "xyes" +then + SAVE_CPPFLAGS="$CPPFLAGS" + SAVE_LDFLAGS="$LDFLAGS" + CPPFLAGS="$CPPFLAGS $with_libudev_cflags" + LDFLAGS="$LDFLAGS $with_libudev_ldflags" + + AC_CHECK_LIB(udev, udev_new, + [ + AC_DEFINE(HAVE_LIBUDEV, 1, [Define to 1 if you have the udev library (-ludev).]) + ], + [with_libudev="no (libudev not found)"]) + + CPPFLAGS="$SAVE_CPPFLAGS" + LDFLAGS="$SAVE_LDFLAGS" +fi +if test "x$with_libudev" = "xyes" +then + BUILD_WITH_LIBUDEV_CFLAGS="$with_libudev_cflags" + BUILD_WITH_LIBUDEV_LDFLAGS="$with_libudev_ldflags" + AC_SUBST(BUILD_WITH_LIBUDEV_CFLAGS) + AC_SUBST(BUILD_WITH_LIBUDEV_LDFLAGS) +fi +AM_CONDITIONAL(BUILD_WITH_LIBUDEV, test "x$with_libudev" = "xyes") +# }}} + # --with-libupsclient {{{ with_libupsclient_config="" with_libupsclient_cflags="" @@ -5223,6 +5327,7 @@ AC_PLUGIN([vserver], [$plugin_vserver], [Linux VServer statistics]) AC_PLUGIN([wireless], [$plugin_wireless], [Wireless statistics]) AC_PLUGIN([write_graphite], [yes], [Graphite / Carbon output plugin]) AC_PLUGIN([write_http], [$with_libcurl], [HTTP output plugin]) +AC_PLUGIN([write_kafka], [$with_librdkafka], [Kafka output plugin]) AC_PLUGIN([write_mongodb], [$with_libmongoc], [MongoDB output plugin]) AC_PLUGIN([write_redis], [$with_libcredis], [Redis output plugin]) AC_PLUGIN([write_riemann], [$have_protoc_c], [Riemann output plugin]) @@ -5427,12 +5532,14 @@ Configuration: libpq . . . . . . . . $with_libpq libpthread . . . . . $with_libpthread librabbitmq . . . . . $with_librabbitmq + librdkafka . . . . . $with_librdkafka librouteros . . . . . $with_librouteros librrd . . . . . . . $with_librrd libsensors . . . . . $with_libsensors libsigrok . . . . . $with_libsigrok libstatgrab . . . . . $with_libstatgrab libtokyotyrant . . . $with_libtokyotyrant + libudev . . . . . . . $with_libudev libupsclient . . . . $with_libupsclient libvarnish . . . . . $with_libvarnish libvirt . . . . . . . $with_libvirt @@ -5567,6 +5674,7 @@ Configuration: wireless . . . . . . $enable_wireless write_graphite . . . $enable_write_graphite write_http . . . . . $enable_write_http + write_kafka . . . . . $enable_write_kafka write_mongodb . . . . $enable_write_mongodb write_redis . . . . . $enable_write_redis write_riemann . . . . $enable_write_riemann diff --git a/src/Makefile.am b/src/Makefile.am index a9d85823..5e8f76ae 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -39,7 +39,9 @@ collectd_SOURCES = collectd.c collectd.h \ utils_subst.c utils_subst.h \ utils_tail.c utils_tail.h \ utils_time.c utils_time.h \ - types_list.c types_list.h + types_list.c types_list.h \ + utils_threshold.c utils_threshold.h + collectd_CPPFLAGS = $(AM_CPPFLAGS) $(LTDLINCL) collectd_CFLAGS = $(AM_CFLAGS) @@ -133,9 +135,9 @@ collectd_tg_LDADD += libcollectdclient/libcollectdclient.la collectd_tg_DEPENDENCIES = libcollectdclient/libcollectdclient.la -pkglib_LTLIBRARIES = +pkglib_LTLIBRARIES = -BUILT_SOURCES = +BUILT_SOURCES = CLEANFILES = if BUILD_PLUGIN_AGGREGATION @@ -273,7 +275,7 @@ pkglib_LTLIBRARIES += cpu.la cpu_la_SOURCES = cpu.c cpu_la_CFLAGS = $(AM_CFLAGS) cpu_la_LDFLAGS = -module -avoid-version -cpu_la_LIBADD = +cpu_la_LIBADD = if BUILD_WITH_LIBKSTAT cpu_la_LIBADD += -lkstat endif @@ -371,7 +373,7 @@ pkglib_LTLIBRARIES += disk.la disk_la_SOURCES = disk.c disk_la_CFLAGS = $(AM_CFLAGS) disk_la_LDFLAGS = -module -avoid-version -disk_la_LIBADD = +disk_la_LIBADD = if BUILD_WITH_LIBKSTAT disk_la_LIBADD += -lkstat endif @@ -382,9 +384,12 @@ if BUILD_WITH_LIBIOKIT disk_la_LDFLAGS += -framework IOKit endif if BUILD_WITH_LIBSTATGRAB -disk_la_CFLAGS += $(BUILD_WITH_LIBSTATGRAB_CFLAGS) +disk_la_CFLAGS += $(BUILD_WITH_LIBSTATGRAB_CFLAGS) disk_la_LIBADD += $(BUILD_WITH_LIBSTATGRAB_LDFLAGS) endif +if BUILD_WITH_LIBUDEV +disk_la_LIBADD += -ludev +endif if BUILD_WITH_PERFSTAT disk_la_LIBADD += -lperfstat endif @@ -862,7 +867,7 @@ if BUILD_PLUGIN_OLSRD pkglib_LTLIBRARIES += olsrd.la olsrd_la_SOURCES = olsrd.c olsrd_la_LDFLAGS = -module -avoid-version -olsrd_la_LIBADD = +olsrd_la_LIBADD = if BUILD_WITH_LIBSOCKET olsrd_la_LIBADD += -lsocket endif @@ -1387,6 +1392,19 @@ endif collectd_DEPENDENCIES += write_http.la endif +if BUILD_PLUGIN_WRITE_KAFKA +pkglib_LTLIBRARIES += write_kafka.la +write_kafka_la_SOURCES = write_kafka.c \ + utils_format_graphite.c utils_format_graphite.h \ + utils_format_json.c utils_format_json.h \ + utils_cmd_putval.c utils_cmd_putval.h \ + utils_crc32.c utils_crc32.h +write_kafka_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRDKAFKA_LDFLAGS) +write_kafka_la_LIBADD = $(BUILD_WITH_LIBRDKAFKA_LIBS) +collectd_LDADD += "-dlopen" write_kafka.la +collectd_DEPENDENCIES += write_kafka.la +endif + if BUILD_PLUGIN_WRITE_MONGODB pkglib_LTLIBRARIES += write_mongodb.la write_mongodb_la_SOURCES = write_mongodb.c @@ -1409,7 +1427,7 @@ endif if BUILD_PLUGIN_WRITE_RIEMANN pkglib_LTLIBRARIES += write_riemann.la -write_riemann_la_SOURCES = write_riemann.c +write_riemann_la_SOURCES = write_riemann.c write_riemann_threshold.c nodist_write_riemann_la_SOURCES = riemann.pb-c.c riemann.pb-c.h write_riemann_la_LDFLAGS = -module -avoid-version write_riemann_la_LIBADD = -lprotobuf-c diff --git a/src/amqp.c b/src/amqp.c index 3f33ff78..bdc62b3b 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -743,7 +743,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ { camqp_config_t *conf = user_data->data; char routing_key[6 * DATA_MAX_NAME_LEN]; - char buffer[4096]; + char buffer[8192]; int status; if ((ds == NULL) || (vl == NULL) || (conf == NULL)) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 3e2ddeef..667cd527 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -391,6 +391,8 @@ # # Disk "/^[hs]d[a-f][0-9]?$/" # IgnoreSelected false +# UseBSDName false +# UdevNameAttr "DEVNAME" # # diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index a14a8f68..39dea7e5 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -1688,6 +1688,20 @@ collected. If at least one B option is given and no B or set to B, B matching disks will be collected. If B is set to B, all disks are collected B the ones matched. +=item B B|B + +Whether to use the device's "BSD Name", on MacEOSEX, instead of the +default major/minor numbers. Requires collectd to be built with Apple's +IOKitLib support. + +=item B I + +Attempt to override disk instance name with the value of a specified udev +attribute when built with B. If the attribute is not defined for the +given device, the default name is used. Example: + + UdevNameAttr "DM_NAME" + =back =head2 Plugin C @@ -6366,6 +6380,111 @@ number. =back +=head2 Plugin C + +The I will send values to a I topic, a distributed +queue. +Synopsis: + + + Property "metadata.broker.list" "broker1:9092,broker2:9092" + + Format JSON + + + +The following options are understood by the I: + +=over 4 + +=item EB IE + +The plugin's configuration consists of one or more B blocks. Each block +is given a unique I and specifies one kafka producer. +Inside the B block, the following per-topic options are +understood: + +=over 4 + +=item B I I + +Configure the named property for the current topic. Properties are +forwarded to the kafka producer library B. + +=item B I + +Use the specified string as a partioning key for the topic. Kafka breaks +topic into partitions and guarantees that for a given topology, the same +consumer will be used for a specific key. The special (case insensitive) +string B can be used to specify that an arbitrary partition should +be used. + +=item B B|B|B + +Selects the format in which messages are sent to the broker. If set to +B (the default), values are sent as C commands which are +identical to the syntax used by the I and I. + +If set to B, the values are encoded in the I, +an easy and straight forward exchange format. + +If set to B, values are encoded in the I format, which is +" \n". + +=item B B|B + +Determines whether or not C, C and C data sources +are converted to a I (i.e. a C value). If set to B (the +default), no conversion is performed. Otherwise the conversion is performed +using the internal value cache. + +Please note that currently this option is only used if the B option has +been set to B. + +=item B (B=I only) + +A prefix can be added in the metric name when outputting in the I format. +It's added before the I name. +Metric name will be "" + +=item B (B=I only) + +A postfix can be added in the metric name when outputting in the I format. +It's added after the I name. +Metric name will be "" + +=item B (B=I only) + +Specify a character to replace dots (.) in the host part of the metric name. +In I metric name, dots are used as separators between different +metric parts (host, plugin, type). +Default is "_" (I). + +=item B B|B + +If set to B, the plugin instance and type instance will be in their own +path component, for example C. If set to B (the +default), the plugin and plugin instance (and likewise the type and type +instance) are put into one component, for example C. + +=item B B|B + +If set to B (the default), convert counter values to rates. If set to +B counter values are stored as is, i.e. as an increasing integer number. + +This will be reflected in the C tag: If B is enabled, +converted values will have "rate" appended to the data source type, e.g. +C. + +=back + +=item B I I + +Configure the kafka producer through properties, you almost always will +want to set B to your Kafka broker list. + +=back + =head2 Plugin C The I will send values to I, a powerfull stream @@ -6438,6 +6557,17 @@ interval is multiplied to set the TTL. The default value is B<2.0>. Unless you know exactly what you're doing, you should only increase this setting from its default value. +=item B B|B + +If set to B, create riemann events for notifications. This is B +by default. When processing thresholds from write_riemann, it might prove +useful to avoid getting notification events. + +=item B B|B + +If set to B, attach state to events based on thresholds defined +in the B plugin. Defaults to B. + =back =item B I diff --git a/src/cpu.c b/src/cpu.c index 2e225fd1..79dc1507 100644 --- a/src/cpu.c +++ b/src/cpu.c @@ -241,10 +241,11 @@ static int cpu_states_grow (void) return -1; } - for (i = percents_cells; i < size; i++) + percents = tmp; + + for (i = percents_cells ; i < size; i++) memset(&percents[i], 0, sizeof(*percents)); - percents = tmp; percents_cells = size; return 0; } /* cpu_states_grow */ diff --git a/src/disk.c b/src/disk.c index 36d0a0c3..cdf2816d 100644 --- a/src/disk.c +++ b/src/disk.c @@ -128,11 +128,19 @@ static int pnumdisk; # error "No applicable input method." #endif +#if HAVE_LIBUDEV +#include + +static char *conf_udev_name_attr = NULL; +static struct udev *handle_udev; +#endif + static const char *config_keys[] = { "Disk", "UseBSDName", - "IgnoreSelected" + "IgnoreSelected", + "UdevNameAttr" }; static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); @@ -165,6 +173,21 @@ static int disk_config (const char *key, const char *value) "on Mach / Mac OS X and will be ignored."); #endif } + else if (strcasecmp ("UdevNameAttr", key) == 0) + { +#if HAVE_LIBUDEV + if (conf_udev_name_attr != NULL) + { + free (conf_udev_name_attr); + conf_udev_name_attr = NULL; + } + if ((conf_udev_name_attr = strdup (value)) == NULL) + return (1); +#else + WARNING ("disk plugin: The \"UdevNameAttr\" option is only supported " + "if collectd is built with libudev support"); +#endif + } else { return (-1); @@ -259,6 +282,34 @@ static counter_t disk_calc_time_incr (counter_t delta_time, counter_t delta_ops) } #endif +#if HAVE_LIBUDEV +/** + * Attempt to provide an rename disk instance from an assigned udev attribute. + * + * On success, it returns a strduped char* to the desired attribute value. + * Otherwise it returns NULL. + */ + +static char *disk_udev_attr_name (struct udev *udev, char *disk_name, const char *attr) +{ + struct udev_device *dev; + const char *prop; + char *output = NULL; + + dev = udev_device_new_from_subsystem_sysname (udev, "block", disk_name); + if (dev != NULL) + { + prop = udev_device_get_property_value (dev, attr); + if (prop) { + output = strdup (prop); + DEBUG ("disk plugin: renaming %s => %s", disk_name, output); + } + udev_device_unref (dev); + } + return output; +} +#endif + #if HAVE_IOKIT_IOKITLIB_H static signed long long dict_get_value (CFDictionaryRef dict, const char *key) { @@ -505,9 +556,15 @@ static int disk_read (void) fieldshift = 1; } +#if HAVE_LIBUDEV + handle_udev = udev_new(); +#endif + while (fgets (buffer, sizeof (buffer), fh) != NULL) { char *disk_name; + char *output_name; + char *alt_name; numfields = strsplit (buffer, fields, 32); @@ -659,25 +716,43 @@ static int disk_read (void) continue; } + output_name = disk_name; + +#if HAVE_LIBUDEV + alt_name = disk_udev_attr_name (handle_udev, disk_name, + conf_udev_name_attr); +#else + alt_name = NULL; +#endif + if (alt_name != NULL) + output_name = alt_name; + if ((ds->read_bytes != 0) || (ds->write_bytes != 0)) - disk_submit (disk_name, "disk_octets", + disk_submit (output_name, "disk_octets", ds->read_bytes, ds->write_bytes); if ((ds->read_ops != 0) || (ds->write_ops != 0)) - disk_submit (disk_name, "disk_ops", + disk_submit (output_name, "disk_ops", read_ops, write_ops); if ((ds->avg_read_time != 0) || (ds->avg_write_time != 0)) - disk_submit (disk_name, "disk_time", + disk_submit (output_name, "disk_time", ds->avg_read_time, ds->avg_write_time); if (is_disk) { - disk_submit (disk_name, "disk_merged", + disk_submit (output_name, "disk_merged", read_merged, write_merged); } /* if (is_disk) */ + + /* release udev-based alternate name, if allocated */ + free(alt_name); } /* while (fgets (buffer, sizeof (buffer), fh) != NULL) */ +#if HAVE_LIBUDEV + udev_unref(handle_udev); +#endif + fclose (fh); /* #endif defined(KERNEL_LINUX) */ diff --git a/src/threshold.c b/src/threshold.c index 7df4d616..922689d3 100644 --- a/src/threshold.c +++ b/src/threshold.c @@ -28,45 +28,12 @@ #include "plugin.h" #include "utils_avltree.h" #include "utils_cache.h" +#include "utils_threshold.h" #include #include /* - * Private data structures - * {{{ */ -#define UT_FLAG_INVERT 0x01 -#define UT_FLAG_PERSIST 0x02 -#define UT_FLAG_PERCENTAGE 0x04 -#define UT_FLAG_INTERESTING 0x08 -#define UT_FLAG_PERSIST_OK 0x10 -typedef struct threshold_s -{ - char host[DATA_MAX_NAME_LEN]; - char plugin[DATA_MAX_NAME_LEN]; - char plugin_instance[DATA_MAX_NAME_LEN]; - char type[DATA_MAX_NAME_LEN]; - char type_instance[DATA_MAX_NAME_LEN]; - char data_source[DATA_MAX_NAME_LEN]; - gauge_t warning_min; - gauge_t warning_max; - gauge_t failure_min; - gauge_t failure_max; - gauge_t hysteresis; - unsigned int flags; - int hits; - struct threshold_s *next; -} threshold_t; -/* }}} */ - -/* - * Private (static) variables - * {{{ */ -static c_avl_tree_t *threshold_tree = NULL; -static pthread_mutex_t threshold_lock = PTHREAD_MUTEX_INITIALIZER; -/* }}} */ - -/* * Threshold management * ==================== * The following functions add, delete, search, etc. configured thresholds to @@ -171,7 +138,7 @@ static int ut_threshold_add (const threshold_t *th) return (status); } /* }}} int ut_threshold_add */ -/* +/* * threshold_t *threshold_search * * Searches for a threshold configuration using all the possible variations of @@ -862,7 +829,7 @@ static int ut_check_one_threshold (const data_set_t *ds, * * Gets a list of matching thresholds and searches for the worst status by one * of the thresholds. Then reports that status using the ut_report_state - * function above. + * function above. * Returns zero on success and if no threshold has been configured. Returns * less than zero on failure. */ @@ -990,7 +957,7 @@ int ut_config (oconfig_item_t *ci) th.hits = 0; th.hysteresis = 0; th.flags = UT_FLAG_INTERESTING; /* interesting by default */ - + for (i = 0; i < ci->children_num; i++) { oconfig_item_t *option = ci->children + i; diff --git a/src/utils_crc32.c b/src/utils_crc32.c new file mode 100644 index 00000000..4c6d6941 --- /dev/null +++ b/src/utils_crc32.c @@ -0,0 +1,110 @@ +/* + * COPYRIGHT (C) 1986 Gary S. Brown. You may use this program, or + * code or tables extracted from it, as desired without restriction. + * + * First, the polynomial itself and its table of feedback terms. The + * polynomial is + * X^32+X^26+X^23+X^22+X^16+X^12+X^11+X^10+X^8+X^7+X^5+X^4+X^2+X^1+X^0 + * + * Note that we take it "backwards" and put the highest-order term in + * the lowest-order bit. The X^32 term is "implied"; the LSB is the + * X^31 term, etc. The X^0 term (usually shown as "+1") results in + * the MSB being 1 + * + * Note that the usual hardware shift register implementation, which + * is what we're using (we're merely optimizing it by doing eight-bit + * chunks at a time) shifts bits into the lowest-order term. In our + * implementation, that means shifting towards the right. Why do we + * do it this way? Because the calculated CRC must be transmitted in + * order from highest-order term to lowest-order term. UARTs transmit + * characters in order from LSB to MSB. By storing the CRC this way + * we hand it to the UART in the order low-byte to high-byte; the UART + * sends each low-bit to hight-bit; and the result is transmission bit + * by bit from highest- to lowest-order term without requiring any bit + * shuffling on our part. Reception works similarly + * + * The feedback terms table consists of 256, 32-bit entries. Notes + * + * The table can be generated at runtime if desired; code to do so + * is shown later. It might not be obvious, but the feedback + * terms simply represent the results of eight shift/xor opera + * tions for all combinations of data and CRC register values + * + * The values must be right-shifted by eight bits by the "updcrc + * logic; the shift must be unsigned (bring in zeroes). On some + * hardware you could probably optimize the shift in assembler by + * using byte-swap instructions + * polynomial $edb88320 + */ + +#include + +u_int32_t crc32_buffer(const u_char *, size_t); +static unsigned int crc32_tab[] = { + 0x00000000L, 0x77073096L, 0xee0e612cL, 0x990951baL, 0x076dc419L, + 0x706af48fL, 0xe963a535L, 0x9e6495a3L, 0x0edb8832L, 0x79dcb8a4L, + 0xe0d5e91eL, 0x97d2d988L, 0x09b64c2bL, 0x7eb17cbdL, 0xe7b82d07L, + 0x90bf1d91L, 0x1db71064L, 0x6ab020f2L, 0xf3b97148L, 0x84be41deL, + 0x1adad47dL, 0x6ddde4ebL, 0xf4d4b551L, 0x83d385c7L, 0x136c9856L, + 0x646ba8c0L, 0xfd62f97aL, 0x8a65c9ecL, 0x14015c4fL, 0x63066cd9L, + 0xfa0f3d63L, 0x8d080df5L, 0x3b6e20c8L, 0x4c69105eL, 0xd56041e4L, + 0xa2677172L, 0x3c03e4d1L, 0x4b04d447L, 0xd20d85fdL, 0xa50ab56bL, + 0x35b5a8faL, 0x42b2986cL, 0xdbbbc9d6L, 0xacbcf940L, 0x32d86ce3L, + 0x45df5c75L, 0xdcd60dcfL, 0xabd13d59L, 0x26d930acL, 0x51de003aL, + 0xc8d75180L, 0xbfd06116L, 0x21b4f4b5L, 0x56b3c423L, 0xcfba9599L, + 0xb8bda50fL, 0x2802b89eL, 0x5f058808L, 0xc60cd9b2L, 0xb10be924L, + 0x2f6f7c87L, 0x58684c11L, 0xc1611dabL, 0xb6662d3dL, 0x76dc4190L, + 0x01db7106L, 0x98d220bcL, 0xefd5102aL, 0x71b18589L, 0x06b6b51fL, + 0x9fbfe4a5L, 0xe8b8d433L, 0x7807c9a2L, 0x0f00f934L, 0x9609a88eL, + 0xe10e9818L, 0x7f6a0dbbL, 0x086d3d2dL, 0x91646c97L, 0xe6635c01L, + 0x6b6b51f4L, 0x1c6c6162L, 0x856530d8L, 0xf262004eL, 0x6c0695edL, + 0x1b01a57bL, 0x8208f4c1L, 0xf50fc457L, 0x65b0d9c6L, 0x12b7e950L, + 0x8bbeb8eaL, 0xfcb9887cL, 0x62dd1ddfL, 0x15da2d49L, 0x8cd37cf3L, + 0xfbd44c65L, 0x4db26158L, 0x3ab551ceL, 0xa3bc0074L, 0xd4bb30e2L, + 0x4adfa541L, 0x3dd895d7L, 0xa4d1c46dL, 0xd3d6f4fbL, 0x4369e96aL, + 0x346ed9fcL, 0xad678846L, 0xda60b8d0L, 0x44042d73L, 0x33031de5L, + 0xaa0a4c5fL, 0xdd0d7cc9L, 0x5005713cL, 0x270241aaL, 0xbe0b1010L, + 0xc90c2086L, 0x5768b525L, 0x206f85b3L, 0xb966d409L, 0xce61e49fL, + 0x5edef90eL, 0x29d9c998L, 0xb0d09822L, 0xc7d7a8b4L, 0x59b33d17L, + 0x2eb40d81L, 0xb7bd5c3bL, 0xc0ba6cadL, 0xedb88320L, 0x9abfb3b6L, + 0x03b6e20cL, 0x74b1d29aL, 0xead54739L, 0x9dd277afL, 0x04db2615L, + 0x73dc1683L, 0xe3630b12L, 0x94643b84L, 0x0d6d6a3eL, 0x7a6a5aa8L, + 0xe40ecf0bL, 0x9309ff9dL, 0x0a00ae27L, 0x7d079eb1L, 0xf00f9344L, + 0x8708a3d2L, 0x1e01f268L, 0x6906c2feL, 0xf762575dL, 0x806567cbL, + 0x196c3671L, 0x6e6b06e7L, 0xfed41b76L, 0x89d32be0L, 0x10da7a5aL, + 0x67dd4accL, 0xf9b9df6fL, 0x8ebeeff9L, 0x17b7be43L, 0x60b08ed5L, + 0xd6d6a3e8L, 0xa1d1937eL, 0x38d8c2c4L, 0x4fdff252L, 0xd1bb67f1L, + 0xa6bc5767L, 0x3fb506ddL, 0x48b2364bL, 0xd80d2bdaL, 0xaf0a1b4cL, + 0x36034af6L, 0x41047a60L, 0xdf60efc3L, 0xa867df55L, 0x316e8eefL, + 0x4669be79L, 0xcb61b38cL, 0xbc66831aL, 0x256fd2a0L, 0x5268e236L, + 0xcc0c7795L, 0xbb0b4703L, 0x220216b9L, 0x5505262fL, 0xc5ba3bbeL, + 0xb2bd0b28L, 0x2bb45a92L, 0x5cb36a04L, 0xc2d7ffa7L, 0xb5d0cf31L, + 0x2cd99e8bL, 0x5bdeae1dL, 0x9b64c2b0L, 0xec63f226L, 0x756aa39cL, + 0x026d930aL, 0x9c0906a9L, 0xeb0e363fL, 0x72076785L, 0x05005713L, + 0x95bf4a82L, 0xe2b87a14L, 0x7bb12baeL, 0x0cb61b38L, 0x92d28e9bL, + 0xe5d5be0dL, 0x7cdcefb7L, 0x0bdbdf21L, 0x86d3d2d4L, 0xf1d4e242L, + 0x68ddb3f8L, 0x1fda836eL, 0x81be16cdL, 0xf6b9265bL, 0x6fb077e1L, + 0x18b74777L, 0x88085ae6L, 0xff0f6a70L, 0x66063bcaL, 0x11010b5cL, + 0x8f659effL, 0xf862ae69L, 0x616bffd3L, 0x166ccf45L, 0xa00ae278L, + 0xd70dd2eeL, 0x4e048354L, 0x3903b3c2L, 0xa7672661L, 0xd06016f7L, + 0x4969474dL, 0x3e6e77dbL, 0xaed16a4aL, 0xd9d65adcL, 0x40df0b66L, + 0x37d83bf0L, 0xa9bcae53L, 0xdebb9ec5L, 0x47b2cf7fL, 0x30b5ffe9L, + 0xbdbdf21cL, 0xcabac28aL, 0x53b39330L, 0x24b4a3a6L, 0xbad03605L, + 0xcdd70693L, 0x54de5729L, 0x23d967bfL, 0xb3667a2eL, 0xc4614ab8L, + 0x5d681b02L, 0x2a6f2b94L, 0xb40bbe37L, 0xc30c8ea1L, 0x5a05df1bL, + 0x2d02ef8dL +}; + +/* Return a 32-bit CRC of the contents of the buffer. */ + +u_int32_t +crc32_buffer(const u_char *s, size_t len) +{ + size_t i; + u_int32_t ret; + + ret = 0; + for (i = 0; i < len; i++) + ret = crc32_tab[(ret ^ s[i]) & 0xff] ^ (ret >> 8); + return ret; +} diff --git a/src/utils_crc32.h b/src/utils_crc32.h new file mode 100644 index 00000000..b16409d1 --- /dev/null +++ b/src/utils_crc32.h @@ -0,0 +1,27 @@ +/** + * collectd - src/utils_crc32.h + * + * Copyright (C) 2014 Pierre-Yves Ritschard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER + * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING + * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * + * Authors: + * Pierre-Yves Ritschard + */ + +#ifndef UTILS_CRC32_H +#define UTILS_CRC32_H 1 + +u_int32_t crc32_buffer(const u_char *, size_t); + +#endif diff --git a/src/utils_threshold.c b/src/utils_threshold.c new file mode 100644 index 00000000..005c49c3 --- /dev/null +++ b/src/utils_threshold.c @@ -0,0 +1,34 @@ +/** + * collectd - src/utils_threshold.c + * Copyright (C) 2014 Pierre-Yves Ritschard + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Author: + * Pierre-Yves Ritschard + **/ + +#include "collectd.h" +#include "common.h" +#include "utils_avltree.h" +#include "utils_threshold.h" + +#include + +/* + * Exported symbols + * {{{ */ +c_avl_tree_t *threshold_tree = NULL; +pthread_mutex_t threshold_lock = PTHREAD_MUTEX_INITIALIZER; +/* }}} */ diff --git a/src/utils_threshold.h b/src/utils_threshold.h new file mode 100644 index 00000000..d1abf563 --- /dev/null +++ b/src/utils_threshold.h @@ -0,0 +1,53 @@ +/** + * collectd - src/utils_threshold.h + * Copyright (C) 2014 Pierre-Yves Ritschard + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Author: + * Pierre-Yves Ritschard + **/ + +#ifndef UTILS_THRESHOLD_H +#define UTILS_THRESHOLD_H 1 + +#define UT_FLAG_INVERT 0x01 +#define UT_FLAG_PERSIST 0x02 +#define UT_FLAG_PERCENTAGE 0x04 +#define UT_FLAG_INTERESTING 0x08 +#define UT_FLAG_PERSIST_OK 0x10 +typedef struct threshold_s +{ + char host[DATA_MAX_NAME_LEN]; + char plugin[DATA_MAX_NAME_LEN]; + char plugin_instance[DATA_MAX_NAME_LEN]; + char type[DATA_MAX_NAME_LEN]; + char type_instance[DATA_MAX_NAME_LEN]; + char data_source[DATA_MAX_NAME_LEN]; + gauge_t warning_min; + gauge_t warning_max; + gauge_t failure_min; + gauge_t failure_max; + gauge_t hysteresis; + unsigned int flags; + int hits; + struct threshold_s *next; +} threshold_t; + +extern c_avl_tree_t *threshold_tree; +extern pthread_mutex_t threshold_lock; + +#endif /* UTILS_THRESHOLD_H */ + +/* vim: set sw=2 sts=2 ts=8 : */ diff --git a/src/write_http.c b/src/write_http.c index ed8d8342..6dc7b364 100644 --- a/src/write_http.c +++ b/src/write_http.c @@ -49,6 +49,11 @@ struct wh_callback_s _Bool verify_peer; _Bool verify_host; char *cacert; + char *capath; + char *clientkey; + char *clientcert; + char *clientkeypass; + long sslversion; _Bool store_rates; #define WH_FORMAT_COMMAND 0 @@ -150,8 +155,20 @@ static int wh_callback_init (wh_callback_t *cb) /* {{{ */ curl_easy_setopt (cb->curl, CURLOPT_SSL_VERIFYPEER, (long) cb->verify_peer); curl_easy_setopt (cb->curl, CURLOPT_SSL_VERIFYHOST, cb->verify_host ? 2L : 0L); + curl_easy_setopt (cb->curl, CURLOPT_SSLVERSION, cb->sslversion); if (cb->cacert != NULL) curl_easy_setopt (cb->curl, CURLOPT_CAINFO, cb->cacert); + if (cb->capath != NULL) + curl_easy_setopt (cb->curl, CURLOPT_CAPATH, cb->capath); + + if (cb->clientkey != NULL && cb->clientcert != NULL) + { + curl_easy_setopt (cb->curl, CURLOPT_SSLKEY, cb->clientkey); + curl_easy_setopt (cb->curl, CURLOPT_SSLCERT, cb->clientcert); + + if (cb->clientkeypass != NULL) + curl_easy_setopt (cb->curl, CURLOPT_SSLKEYPASSWD, cb->clientkeypass); + } wh_reset_buffer (cb); @@ -269,6 +286,10 @@ static void wh_callback_free (void *data) /* {{{ */ sfree (cb->pass); sfree (cb->credentials); sfree (cb->cacert); + sfree (cb->capath); + sfree (cb->clientkey); + sfree (cb->clientcert); + sfree (cb->clientkeypass); sfree (cb); } /* }}} void wh_callback_free */ @@ -474,15 +495,10 @@ static int wh_config_url (oconfig_item_t *ci) /* {{{ */ return (-1); } memset (cb, 0, sizeof (*cb)); - cb->location = NULL; - cb->user = NULL; - cb->pass = NULL; - cb->credentials = NULL; cb->verify_peer = 1; cb->verify_host = 1; - cb->cacert = NULL; cb->format = WH_FORMAT_COMMAND; - cb->curl = NULL; + cb->sslversion = CURL_SSLVERSION_DEFAULT; pthread_mutex_init (&cb->send_lock, /* attr = */ NULL); @@ -504,6 +520,42 @@ static int wh_config_url (oconfig_item_t *ci) /* {{{ */ cf_util_get_boolean (child, &cb->verify_host); else if (strcasecmp ("CACert", child->key) == 0) cf_util_get_string (child, &cb->cacert); + else if (strcasecmp ("CAPath", child->key) == 0) + cf_util_get_string (child, &cb->capath); + else if (strcasecmp ("ClientKey", child->key) == 0) + cf_util_get_string (child, &cb->clientkey); + else if (strcasecmp ("ClientCert", child->key) == 0) + cf_util_get_string (child, &cb->clientcert); + else if (strcasecmp ("ClientKeyPass", child->key) == 0) + cf_util_get_string (child, &cb->clientkeypass); + else if (strcasecmp ("SSLVersion", child->key) == 0) + { + char *value = NULL; + + cf_util_get_string (child, &value); + + if (value == NULL || strcasecmp ("default", value) == 0) + cb->sslversion = CURL_SSLVERSION_DEFAULT; + else if (strcasecmp ("SSLv2", value) == 0) + cb->sslversion = CURL_SSLVERSION_SSLv2; + else if (strcasecmp ("SSLv3", value) == 0) + cb->sslversion = CURL_SSLVERSION_SSLv3; + else if (strcasecmp ("TLSv1", value) == 0) + cb->sslversion = CURL_SSLVERSION_TLSv1; +#if (LIBCURL_VERSION_MAJOR > 7) || (LIBCURL_VERSION_MAJOR == 7 && LIBCURL_VERSION_MINOR >= 34) + else if (strcasecmp ("TLSv1_0", value) == 0) + cb->sslversion = CURL_SSLVERSION_TLSv1_0; + else if (strcasecmp ("TLSv1_1", value) == 0) + cb->sslversion = CURL_SSLVERSION_TLSv1_1; + else if (strcasecmp ("TLSv1_2", value) == 0) + cb->sslversion = CURL_SSLVERSION_TLSv1_2; +#endif + else + ERROR ("write_http plugin: Invalid SSLVersion " + "option: %s.", value); + + sfree(value); + } else if (strcasecmp ("Format", child->key) == 0) config_set_format (cb, child); else if (strcasecmp ("StoreRates", child->key) == 0) diff --git a/src/write_kafka.c b/src/write_kafka.c new file mode 100644 index 00000000..97db4265 --- /dev/null +++ b/src/write_kafka.c @@ -0,0 +1,418 @@ +/** + * collectd - src/write_kafka.c + * + * Copyright (C) 2014 Pierre-Yves Ritschard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER + * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING + * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * + * Authors: + * Pierre-Yves Ritschard + */ + +#include "collectd.h" +#include "plugin.h" +#include "common.h" +#include "configfile.h" +#include "utils_cache.h" +#include "utils_cmd_putval.h" +#include "utils_format_graphite.h" +#include "utils_format_json.h" +#include "utils_crc32.h" + +#include +#include +#include +#include + +struct kafka_topic_context { +#define KAFKA_FORMAT_COMMAND 1 +#define KAFKA_FORMAT_GRAPHITE 2 +#define KAFKA_FORMAT_JSON 3 + u_int8_t format; + unsigned int graphite_flags; + _Bool store_rates; + rd_kafka_topic_conf_t *conf; + rd_kafka_topic_t *topic; + rd_kafka_t *kafka; + int has_key; + u_int32_t key; + char *prefix; + char *postfix; + char escape_char; + char *topic_name; +}; + +static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *); +static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t, + int32_t, void *, void *); +static void kafka_log(const rd_kafka_t *, int, const char *, const char *); + +static void kafka_log(const rd_kafka_t *rkt, int level, + const char *fac, const char *msg) +{ + plugin_log(level, "%s", msg); +} + +static int32_t kafka_partition(const rd_kafka_topic_t *rkt, + const void *keydata, size_t keylen, + int32_t partition_cnt, void *p, void *m) +{ + u_int32_t key = *((u_int32_t *)keydata ); + + return key % partition_cnt; +} + +static int kafka_write(const data_set_t *ds, /* {{{ */ + const value_list_t *vl, + user_data_t *ud) +{ + int status = 0; + u_int32_t key; + char buffer[8192]; + size_t bfree = sizeof(buffer); + size_t bfill = 0; + size_t blen = 0; + struct kafka_topic_context *ctx = ud->data; + + if ((ds == NULL) || (vl == NULL) || (ctx == NULL)) + return EINVAL; + + bzero(buffer, sizeof(buffer)); + + switch (ctx->format) { + case KAFKA_FORMAT_COMMAND: + status = create_putval(buffer, sizeof(buffer), ds, vl); + if (status != 0) { + ERROR("write_kafka plugin: create_putval failed with status %i.", + status); + return status; + } + blen = strlen(buffer); + break; + case KAFKA_FORMAT_JSON: + + format_json_initialize(buffer, &bfill, &bfree); + format_json_value_list(buffer, &bfill, &bfree, ds, vl, + ctx->store_rates); + format_json_finalize(buffer, &bfill, &bfree); + blen = strlen(buffer); + break; + case KAFKA_FORMAT_GRAPHITE: + status = format_graphite(buffer, sizeof(buffer), ds, vl, + ctx->prefix, ctx->postfix, ctx->escape_char, + ctx->graphite_flags); + if (status != 0) { + ERROR("write_kafka plugin: format_graphite failed with status %i.", + status); + return status; + } + blen = strlen(buffer); + break; + default: + ERROR("write_kafka plugin: invalid format %i.", ctx->format); + return -1; + } + + /* + * We partition our stream by metric name + */ + if (ctx->has_key) + key = ctx->key; + else + key = rand(); + + rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_COPY, buffer, blen, + &key, sizeof(key), NULL); + + return status; +} /* }}} int kafka_write */ + +static void kafka_topic_context_free(void *p) /* {{{ */ +{ + struct kafka_topic_context *ctx = p; + + if (ctx == NULL) + return; + + if (ctx->topic_name != NULL) + sfree(ctx->topic_name); + if (ctx->topic != NULL) + rd_kafka_topic_destroy(ctx->topic); + if (ctx->conf != NULL) + rd_kafka_topic_conf_destroy(ctx->conf); + + sfree(ctx); +} /* }}} void kafka_topic_context_free */ + +static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ */ +{ + int status; + int i; + struct kafka_topic_context *tctx; + char *key; + char *val; + char callback_name[DATA_MAX_NAME_LEN]; + char errbuf[1024]; + user_data_t ud; + oconfig_item_t *child; + rd_kafka_conf_res_t ret; + + if ((tctx = calloc(1, sizeof (*tctx))) == NULL) { + ERROR ("write_kafka plugin: calloc failed."); + return; + } + + tctx->escape_char = '.'; + tctx->store_rates = 1; + + rd_kafka_conf_set_log_cb(conf, kafka_log); + if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf, + errbuf, sizeof(errbuf))) == NULL) { + sfree(tctx); + ERROR("write_kafka plugin: cannot create kafka handle."); + return; + } + conf = NULL; + + if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) { + rd_kafka_destroy(tctx->kafka); + sfree(tctx); + ERROR ("write_kafka plugin: cannot create topic configuration."); + return; + } + + if (ci->values_num != 1) { + WARNING("kafka topic name needed."); + goto errout; + } + + if (ci->values[0].type != OCONFIG_TYPE_STRING) { + WARNING("kafka topic needs a string argument."); + goto errout; + } + + if ((tctx->topic_name = strdup(ci->values[0].value.string)) == NULL) { + ERROR("write_kafka plugin: cannot copy topic name."); + goto errout; + } + + for (i = 0; i < ci->children_num; i++) { + /* + * The code here could be simplified but makes room + * for easy adding of new options later on. + */ + child = &ci->children[i]; + status = 0; + + if (strcasecmp ("Property", child->key) == 0) { + if (child->values_num != 2) { + WARNING("kafka properties need both a key and a value."); + goto errout; + } + if (child->values[0].type != OCONFIG_TYPE_STRING || + child->values[1].type != OCONFIG_TYPE_STRING) { + WARNING("kafka properties needs string arguments."); + goto errout; + } + key = child->values[0].value.string; + val = child->values[0].value.string; + ret = rd_kafka_topic_conf_set(tctx->conf,key, val, + errbuf, sizeof(errbuf)); + if (ret != RD_KAFKA_CONF_OK) { + WARNING("cannot set kafka topic property %s to %s: %s.", + key, val, errbuf); + goto errout; + } + + } else if (strcasecmp ("Key", child->key) == 0) { + char *tmp_buf = NULL; + status = cf_util_get_string(child, &tmp_buf); + if (status != 0) { + WARNING("write_kafka plugin: invalid key supplied"); + break; + } + + if (strcasecmp(tmp_buf, "Random") != 0) { + tctx->has_key = 1; + tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf)); + } + sfree(tmp_buf); + + } else if (strcasecmp ("Format", child->key) == 0) { + status = cf_util_get_string(child, &key); + if (status != 0) + goto errout; + + assert(key != NULL); + + if (strcasecmp(key, "Command") == 0) { + + tctx->format = KAFKA_FORMAT_COMMAND; + + } else if (strcasecmp(key, "Graphite") == 0) { + tctx->format = KAFKA_FORMAT_GRAPHITE; + + } else if (strcasecmp(key, "Json") == 0) { + tctx->format = KAFKA_FORMAT_JSON; + + } else { + WARNING ("write_kafka plugin: Invalid format string: %s", + key); + } + sfree(key); + + } else if (strcasecmp ("StoreRates", child->key) == 0) { + status = cf_util_get_boolean (child, &tctx->store_rates); + (void) cf_util_get_flag (child, &tctx->graphite_flags, + GRAPHITE_STORE_RATES); + + } else if (strcasecmp ("GraphiteSeparateInstances", child->key) == 0) { + status = cf_util_get_flag (child, &tctx->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + + } else if (strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) { + status = cf_util_get_flag (child, &tctx->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); + + } else if (strcasecmp ("GraphitePrefix", child->key) == 0) { + status = cf_util_get_string (child, &tctx->prefix); + } else if (strcasecmp ("GraphitePostfix", child->key) == 0) { + status = cf_util_get_string (child, &tctx->postfix); + } else if (strcasecmp ("GraphiteEscapeChar", child->key) == 0) { + char *tmp_buff = NULL; + status = cf_util_get_string (child, &tmp_buff); + if (strlen (tmp_buff) > 1) + WARNING ("write_kafka plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + tctx->escape_char = tmp_buff[0]; + sfree (tmp_buff); + } else { + WARNING ("write_kafka plugin: Invalid directive: %s.", child->key); + } + + if (status != 0) + break; + } + + rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition); + rd_kafka_topic_conf_set_opaque(tctx->conf, tctx); + + if ((tctx->topic = rd_kafka_topic_new(tctx->kafka, tctx->topic_name, + tctx->conf)) == NULL) { + ERROR("write_kafka plugin: cannot create topic."); + goto errout; + } + tctx->conf = NULL; + + ssnprintf(callback_name, sizeof(callback_name), + "write_kafka/%s", tctx->topic_name); + + ud.data = tctx; + ud.free_func = kafka_topic_context_free; + + status = plugin_register_write (callback_name, kafka_write, &ud); + if (status != 0) { + WARNING ("write_kafka plugin: plugin_register_write (\"%s\") " + "failed with status %i.", + callback_name, status); + goto errout; + } + return; + errout: + if (conf != NULL) + rd_kafka_conf_destroy(conf); + if (tctx->kafka != NULL) + rd_kafka_destroy(tctx->kafka); + if (tctx->topic != NULL) + rd_kafka_topic_destroy(tctx->topic); + if (tctx->topic_name != NULL) + free(tctx->topic_name); + if (tctx->conf != NULL) + rd_kafka_topic_conf_destroy(tctx->conf); + sfree(tctx); +} /* }}} int kafka_config_topic */ + +static int kafka_config(oconfig_item_t *ci) /* {{{ */ +{ + int i; + oconfig_item_t *child; + rd_kafka_conf_t *conf; + rd_kafka_conf_t *cloned; + rd_kafka_conf_res_t ret; + char errbuf[1024]; + + if ((conf = rd_kafka_conf_new()) == NULL) { + WARNING("cannot allocate kafka configuration."); + return -1; + } + + for (i = 0; i < ci->children_num; i++) { + child = &ci->children[i]; + + if (strcasecmp("Topic", child->key) == 0) { + if ((cloned = rd_kafka_conf_dup(conf)) == NULL) { + WARNING("write_kafka plugin: cannot allocate memory for kafka config"); + goto errout; + } + kafka_config_topic (cloned, child); + } else if (strcasecmp(child->key, "Property") == 0) { + char *key = NULL; + char *val = NULL; + + if (child->values_num != 2) { + WARNING("kafka properties need both a key and a value."); + goto errout; + } + if (child->values[0].type != OCONFIG_TYPE_STRING || + child->values[1].type != OCONFIG_TYPE_STRING) { + WARNING("kafka properties needs string arguments."); + goto errout; + } + if ((key = strdup(child->values[0].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute key."); + goto errout; + } + if ((val = strdup(child->values[1].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute value."); + goto errout; + } + ret = rd_kafka_conf_set(conf, key, val, errbuf, sizeof(errbuf)); + if (ret != RD_KAFKA_CONF_OK) { + WARNING("cannot set kafka property %s to %s: %s", + key, val, errbuf); + goto errout; + } + sfree(key); + sfree(val); + } else { + WARNING ("write_kafka plugin: Ignoring unknown " + "configuration option \"%s\" at top level.", + child->key); + } + } + if (conf != NULL) + rd_kafka_conf_destroy(conf); + return (0); + errout: + if (conf != NULL) + rd_kafka_conf_destroy(conf); + return -1; +} /* }}} int kafka_config */ + +void module_register(void) +{ + plugin_register_complex_config ("write_kafka", kafka_config); +} + +/* vim: set sw=8 sts=8 ts=8 noet : */ diff --git a/src/write_riemann.c b/src/write_riemann.c index e85e9436..9d2b5398 100644 --- a/src/write_riemann.c +++ b/src/write_riemann.c @@ -39,11 +39,15 @@ #define RIEMANN_PORT "5555" #define RIEMANN_TTL_FACTOR 2.0 +int write_riemann_threshold_check(const data_set_t *, const value_list_t *, int *); + struct riemann_host { char *name; #define F_CONNECT 0x01 uint8_t flags; pthread_mutex_t lock; + _Bool notifications; + _Bool check_thresholds; _Bool store_rates; _Bool always_append_ds; char *node; @@ -453,7 +457,8 @@ static Msg *riemann_notification_to_protobuf (struct riemann_host *host, /* {{{ static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ */ data_set_t const *ds, value_list_t const *vl, size_t index, - gauge_t const *rates) + gauge_t const *rates, + int status) { Event *event; char name_buffer[5 * DATA_MAX_NAME_LEN]; @@ -474,6 +479,23 @@ static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ event->time = CDTIME_T_TO_TIME_T (vl->time); event->has_time = 1; + if (host->check_thresholds) { + switch (status) { + case STATE_OKAY: + event->state = strdup("ok"); + break; + case STATE_ERROR: + event->state = strdup("critical"); + break; + case STATE_WARNING: + event->state = strdup("warning"); + break; + case STATE_MISSING: + event->state = strdup("unknown"); + break; + } + } + ttl = CDTIME_T_TO_DOUBLE (vl->interval) * host->ttl_factor; event->ttl = (float) ttl; event->has_ttl = 1; @@ -557,8 +579,9 @@ static Event *riemann_value_to_protobuf (struct riemann_host const *host, /* {{{ } /* }}} Event *riemann_value_to_protobuf */ static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* {{{ */ - data_set_t const *ds, - value_list_t const *vl) + data_set_t const *ds, + value_list_t const *vl, + int *statuses) { Msg *msg; size_t i; @@ -598,7 +621,7 @@ static Msg *riemann_value_list_to_protobuf (struct riemann_host const *host, /* for (i = 0; i < msg->n_events; i++) { msg->events[i] = riemann_value_to_protobuf (host, ds, vl, - (int) i, rates); + (int) i, rates, statuses[i]); if (msg->events[i] == NULL) { riemann_msg_protobuf_free (msg); @@ -617,6 +640,9 @@ static int riemann_notification(const notification_t *n, user_data_t *ud) /* {{{ struct riemann_host *host = ud->data; Msg *msg; + if (!host->notifications) + return 0; + msg = riemann_notification_to_protobuf (host, n); if (msg == NULL) return (-1); @@ -635,10 +661,13 @@ static int riemann_write(const data_set_t *ds, /* {{{ */ user_data_t *ud) { int status; + int statuses[vl->values_len]; struct riemann_host *host = ud->data; Msg *msg; - msg = riemann_value_list_to_protobuf (host, ds, vl); + if (host->check_thresholds) + write_riemann_threshold_check(ds, vl, statuses); + msg = riemann_value_list_to_protobuf (host, ds, vl, statuses); if (msg == NULL) return (-1); @@ -691,6 +720,8 @@ static int riemann_config_node(oconfig_item_t *ci) /* {{{ */ host->reference_count = 1; host->node = NULL; host->service = NULL; + host->notifications = 1; + host->check_thresholds = 0; host->store_rates = 1; host->always_append_ds = 0; host->use_tcp = 0; @@ -715,6 +746,14 @@ static int riemann_config_node(oconfig_item_t *ci) /* {{{ */ status = cf_util_get_string (child, &host->node); if (status != 0) break; + } else if (strcasecmp ("Notifications", child->key) == 0) { + status = cf_util_get_boolean(child, &host->notifications); + if (status != 0) + break; + } else if (strcasecmp ("CheckThresholds", child->key) == 0) { + status = cf_util_get_boolean(child, &host->check_thresholds); + if (status != 0) + break; } else if (strcasecmp ("Port", child->key) == 0) { status = cf_util_get_service (child, &host->service); if (status != 0) { @@ -884,7 +923,7 @@ static int riemann_config(oconfig_item_t *ci) /* {{{ */ child->key); } } - return (0); + return 0; } /* }}} int riemann_config */ void module_register(void) diff --git a/src/write_riemann_threshold.c b/src/write_riemann_threshold.c new file mode 100644 index 00000000..ff96843b --- /dev/null +++ b/src/write_riemann_threshold.c @@ -0,0 +1,319 @@ +/** + * collectd - src/threshold.c + * Copyright (C) 2007-2010 Florian Forster + * Copyright (C) 2008-2009 Sebastian Harl + * Copyright (C) 2009 Andrés J. Díaz + * Copyright (C) 2014 Pierre-Yves Ritschard + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Author: + * Pierre-Yves Ritschard + * Florian octo Forster + * Sebastian Harl + * Andrés J. Díaz + **/ + +#include "collectd.h" +#include "common.h" +#include "plugin.h" +#include "utils_avltree.h" +#include "utils_cache.h" +#include "utils_threshold.h" + +#include +#include +#include + +/* + * Threshold management + * ==================== + * The following functions add, delete, search, etc. configured thresholds to + * the underlying AVL trees. + */ +/* + * threshold_t *threshold_get + * + * Retrieve one specific threshold configuration. For looking up a threshold + * matching a value_list_t, see "threshold_search" below. Returns NULL if the + * specified threshold doesn't exist. + */ +static threshold_t *threshold_get (const char *hostname, + const char *plugin, const char *plugin_instance, + const char *type, const char *type_instance) +{ /* {{{ */ + char name[6 * DATA_MAX_NAME_LEN]; + threshold_t *th = NULL; + + format_name (name, sizeof (name), + (hostname == NULL) ? "" : hostname, + (plugin == NULL) ? "" : plugin, plugin_instance, + (type == NULL) ? "" : type, type_instance); + name[sizeof (name) - 1] = '\0'; + + if (c_avl_get (threshold_tree, name, (void *) &th) == 0) + return (th); + else + return (NULL); +} /* }}} threshold_t *threshold_get */ + +/* + * threshold_t *threshold_search + * + * Searches for a threshold configuration using all the possible variations of + * "Host", "Plugin" and "Type" blocks. Returns NULL if no threshold could be + * found. + * XXX: This is likely the least efficient function in collectd. + */ +static threshold_t *threshold_search (const value_list_t *vl) +{ /* {{{ */ + threshold_t *th; + + if ((th = threshold_get (vl->host, vl->plugin, vl->plugin_instance, + vl->type, vl->type_instance)) != NULL) + return (th); + else if ((th = threshold_get (vl->host, vl->plugin, vl->plugin_instance, + vl->type, NULL)) != NULL) + return (th); + else if ((th = threshold_get (vl->host, vl->plugin, NULL, + vl->type, vl->type_instance)) != NULL) + return (th); + else if ((th = threshold_get (vl->host, vl->plugin, NULL, + vl->type, NULL)) != NULL) + return (th); + else if ((th = threshold_get (vl->host, "", NULL, + vl->type, vl->type_instance)) != NULL) + return (th); + else if ((th = threshold_get (vl->host, "", NULL, + vl->type, NULL)) != NULL) + return (th); + else if ((th = threshold_get ("", vl->plugin, vl->plugin_instance, + vl->type, vl->type_instance)) != NULL) + return (th); + else if ((th = threshold_get ("", vl->plugin, vl->plugin_instance, + vl->type, NULL)) != NULL) + return (th); + else if ((th = threshold_get ("", vl->plugin, NULL, + vl->type, vl->type_instance)) != NULL) + return (th); + else if ((th = threshold_get ("", vl->plugin, NULL, + vl->type, NULL)) != NULL) + return (th); + else if ((th = threshold_get ("", "", NULL, + vl->type, vl->type_instance)) != NULL) + return (th); + else if ((th = threshold_get ("", "", NULL, + vl->type, NULL)) != NULL) + return (th); + + return (NULL); +} /* }}} threshold_t *threshold_search */ + +/* + * int ut_check_one_data_source + * + * Checks one data source against the given threshold configuration. If the + * `DataSource' option is set in the threshold, and the name does NOT match, + * `okay' is returned. If the threshold does match, its failure and warning + * min and max values are checked and `failure' or `warning' is returned if + * appropriate. + * Does not fail. + */ +static int ut_check_one_data_source (const data_set_t *ds, + const value_list_t __attribute__((unused)) *vl, + const threshold_t *th, + const gauge_t *values, + int ds_index) +{ /* {{{ */ + const char *ds_name; + int is_warning = 0; + int is_failure = 0; + int prev_state = STATE_OKAY; + + /* check if this threshold applies to this data source */ + if (ds != NULL) + { + ds_name = ds->ds[ds_index].name; + if ((th->data_source[0] != 0) + && (strcmp (ds_name, th->data_source) != 0)) + return (STATE_OKAY); + } + + if ((th->flags & UT_FLAG_INVERT) != 0) + { + is_warning--; + is_failure--; + } + + /* XXX: This is an experimental code, not optimized, not fast, not reliable, + * and probably, do not work as you expect. Enjoy! :D */ + if ( (th->hysteresis > 0) && ((prev_state = uc_get_state(ds,vl)) != STATE_OKAY) ) + { + switch(prev_state) + { + case STATE_ERROR: + if ( (!isnan (th->failure_min) && ((th->failure_min + th->hysteresis) < values[ds_index])) || + (!isnan (th->failure_max) && ((th->failure_max - th->hysteresis) > values[ds_index])) ) + return (STATE_OKAY); + else + is_failure++; + case STATE_WARNING: + if ( (!isnan (th->warning_min) && ((th->warning_min + th->hysteresis) < values[ds_index])) || + (!isnan (th->warning_max) && ((th->warning_max - th->hysteresis) > values[ds_index])) ) + return (STATE_OKAY); + else + is_warning++; + } + } + else { /* no hysteresis */ + if ((!isnan (th->failure_min) && (th->failure_min > values[ds_index])) + || (!isnan (th->failure_max) && (th->failure_max < values[ds_index]))) + is_failure++; + + if ((!isnan (th->warning_min) && (th->warning_min > values[ds_index])) + || (!isnan (th->warning_max) && (th->warning_max < values[ds_index]))) + is_warning++; + } + + if (is_failure != 0) + return (STATE_ERROR); + + if (is_warning != 0) + return (STATE_WARNING); + + return (STATE_OKAY); +} /* }}} int ut_check_one_data_source */ + +/* + * int ut_check_one_threshold + * + * Checks all data sources of a value list against the given threshold, using + * the ut_check_one_data_source function above. Returns the worst status, + * which is `okay' if nothing has failed. + * Returns less than zero if the data set doesn't have any data sources. + */ +static int ut_check_one_threshold (const data_set_t *ds, + const value_list_t *vl, + const threshold_t *th, + const gauge_t *values, + int *statuses) +{ /* {{{ */ + int ret = -1; + int i; + int status; + gauge_t values_copy[ds->ds_num]; + + memcpy (values_copy, values, sizeof (values_copy)); + + if ((th->flags & UT_FLAG_PERCENTAGE) != 0) + { + int num = 0; + gauge_t sum=0.0; + + if (ds->ds_num == 1) + { + WARNING ("ut_check_one_threshold: The %s type has only one data " + "source, but you have configured to check this as a percentage. " + "That doesn't make much sense, because the percentage will always " + "be 100%%!", ds->type); + } + + /* Prepare `sum' and `num'. */ + for (i = 0; i < ds->ds_num; i++) + if (!isnan (values[i])) + { + num++; + sum += values[i]; + } + + if ((num == 0) /* All data sources are undefined. */ + || (sum == 0.0)) /* Sum is zero, cannot calculate percentage. */ + { + for (i = 0; i < ds->ds_num; i++) + values_copy[i] = NAN; + } + else /* We can actually calculate the percentage. */ + { + for (i = 0; i < ds->ds_num; i++) + values_copy[i] = 100.0 * values[i] / sum; + } + } /* if (UT_FLAG_PERCENTAGE) */ + + for (i = 0; i < ds->ds_num; i++) + { + status = ut_check_one_data_source (ds, vl, th, values_copy, i); + if (status != -1) { + ret = 0; + if (statuses[i] < status) + statuses[i] = status; + } + } /* for (ds->ds_num) */ + + return (ret); +} /* }}} int ut_check_one_threshold */ + +/* + * int ut_check_threshold + * + * Gets a list of matching thresholds and searches for the worst status by one + * of the thresholds. Then reports that status using the ut_report_state + * function above. + * Returns zero on success and if no threshold has been configured. Returns + * less than zero on failure. + */ +int write_riemann_threshold_check (const data_set_t *ds, const value_list_t *vl, + int *statuses) +{ /* {{{ */ + threshold_t *th; + gauge_t *values; + int status; + + memset(statuses, 0, vl->values_len * sizeof(*statuses)); + if (threshold_tree == NULL) + return 0; + + /* Is this lock really necessary? So far, thresholds are only inserted at + * startup. -octo */ + pthread_mutex_lock (&threshold_lock); + th = threshold_search (vl); + pthread_mutex_unlock (&threshold_lock); + if (th == NULL) + return (0); + + DEBUG ("ut_check_threshold: Found matching threshold(s)"); + + values = uc_get_rate (ds, vl); + if (values == NULL) + return (0); + + while (th != NULL) + { + status = ut_check_one_threshold (ds, vl, th, values, statuses); + if (status < 0) + { + ERROR ("ut_check_threshold: ut_check_one_threshold failed."); + sfree (values); + return (-1); + } + + th = th->next; + } /* while (th) */ + + sfree (values); + + return (0); +} /* }}} int ut_check_threshold */ + + +/* vim: set sw=2 ts=8 sts=2 tw=78 et fdm=marker : */