From: Pierre-Yves Ritschard Date: Fri, 25 Jul 2014 13:34:16 +0000 (+0200) Subject: Merge pull request #670 from pyr/feature-write-kafka X-Git-Tag: collectd-5.5.0~290 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=5ff0ee586df3ae4668f105057a5b8e6fc183f1a0;hp=-c;p=collectd.git Merge pull request #670 from pyr/feature-write-kafka Add a write_kafka output with similar properties to the amqp one. --- 5ff0ee586df3ae4668f105057a5b8e6fc183f1a0 diff --combined configure.ac index 53cfdd6a,0ba3bf1e..f09887e4 --- a/configure.ac +++ b/configure.ac @@@ -981,7 -981,7 +981,7 @@@ if test "x$fp_layout_type" = "xunknown" uint8_t c[8]; double d; - d = 8.642135e130; + d = 8.642135e130; memcpy ((void *) &i0, (void *) &d, 8); i1 = i0; @@@ -1036,7 -1036,7 +1036,7 @@@ if test "x$fp_layout_type" = "xunknown" uint8_t c[8]; double d; - d = 8.642135e130; + d = 8.642135e130; memcpy ((void *) &i0, (void *) &d, 8); i1 = endianflip (i0); @@@ -1085,7 -1085,7 +1085,7 @@@ if test "x$fp_layout_type" = "xunknown" uint8_t c[8]; double d; - d = 8.642135e130; + d = 8.642135e130; memcpy ((void *) &i0, (void *) &d, 8); i1 = intswap (i0); @@@ -1237,7 -1237,7 +1237,7 @@@ AC_MSG_CHECKING([if have htonll defined have_htonll="yes" AC_DEFINE(HAVE_HTONLL, 1, [Define if the function htonll exists.]) ]) - + AC_MSG_RESULT([$have_htonll]) # Check for structures @@@ -1380,7 -1380,7 +1380,7 @@@ collectd additional packages:] AM_CONDITIONAL([BUILD_FREEBSD],[test "x$x$ac_system" = "xFreeBSD"]) - AM_CONDITIONAL([BUILD_AIX],[test "x$x$ac_system" = "xAIX"]) + AM_CONDITIONAL([BUILD_AIX],[test "x$x$ac_system" = "xAIX"]) if test "x$ac_system" = "xAIX" then @@@ -2794,7 -2794,7 +2794,7 @@@ the else SAVE_CPPFLAGS="$CPPFLAGS" CPPFLAGS="$CPPFLAGS $with_snmp_cflags" - + AC_CHECK_HEADERS(net-snmp/net-snmp-config.h, [], [with_libnetsnmp="no (net-snmp/net-snmp-config.h not found)"]) CPPFLAGS="$SAVE_CPPFLAGS" @@@ -3021,7 -3021,7 +3021,7 @@@ if test "x$with_libowcapi" = "xyes then SAVE_CPPFLAGS="$CPPFLAGS" CPPFLAGS="$with_libowcapi_cppflags" - + AC_CHECK_HEADERS(owcapi.h, [with_libowcapi="yes"], [with_libowcapi="no (owcapi.h not found)"]) CPPFLAGS="$SAVE_CPPFLAGS" @@@ -3032,7 -3032,7 +3032,7 @@@ the SAVE_CPPFLAGS="$CPPFLAGS" LDFLAGS="$with_libowcapi_libs" CPPFLAGS="$with_libowcapi_cppflags" - + AC_CHECK_LIB(owcapi, OW_get, [with_libowcapi="yes"], [with_libowcapi="no (libowcapi not found)"]) LDFLAGS="$SAVE_LDFLAGS" @@@ -3597,6 -3597,49 +3597,49 @@@ LDFLAGS="$SAVE_LDFLAGS AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes") # }}} + # --with-librdkafka {{{ + AC_ARG_WITH(librdkafka, [AS_HELP_STRING([--with-librdkafka@<:@=PREFIX@:>@], [Path to librdkafka.])], + [ + if test "x$withval" = "xno" && test "x$withval" != "xyes" + then + with_librdkafka_cppflags="-I$withval/include" + with_librdkafka_ldflags="-L$withval/lib" + with_librdkafka="yes" + else + with_librdkafka="$withval" + fi + ], + [ + with_librdkafka="yes" + ]) + SAVE_CPPFLAGS="$CPPFLAGS" + SAVE_LDFLAGS="$LDFLAGS" + + if test "x$with_librdkafka" = "xyes" + then + AC_CHECK_HEADERS(librdkafka/rdkafka.h, [with_librdkafka="yes"], [with_librdkafka="no (librdkafka/rdkafka.h not found)"]) + fi + + if test "x$with_librdkafka" = "xyes" + then + AC_CHECK_LIB(rdkafka, rd_kafka_new, [with_librdkafka="yes"], [with_librdkafka="no (Symbol 'rd_kafka_new' not found)"]) + fi + if test "x$with_librdkafka" = "xyes" + then + BUILD_WITH_LIBRDKAFKA_CPPFLAGS="$with_librdkafka_cppflags" + BUILD_WITH_LIBRDKAFKA_LDFLAGS="$with_librdkafka_ldflags" + BUILD_WITH_LIBRDKAFKA_LIBS="-lrdkafka" + AC_SUBST(BUILD_WITH_LIBRDKAFKA_CPPFLAGS) + AC_SUBST(BUILD_WITH_LIBRDKAFKA_LDFLAGS) + AC_SUBST(BUILD_WITH_LIBRDKAFKA_LIBS) + AC_DEFINE(HAVE_LIBRDKAFKA, 1, [Define if librdkafka is present and usable.]) + fi + CPPFLAGS="$SAVE_CPPFLAGS" + LDFLAGS="$SAVE_LDFLAGS" + AM_CONDITIONAL(BUILD_WITH_LIBRDKAFKA, test "x$with_librdkafka" = "xyes") + + # }}} + # --with-librouteros {{{ AC_ARG_WITH(librouteros, [AS_HELP_STRING([--with-librouteros@<:@=PREFIX@:>@], [Path to librouteros.])], [ @@@ -4039,7 -4082,7 +4082,7 @@@ CPPFLAGS="$SAVE_CPPFLAGS LDFLAGS="$SAVE_LDFLAGS" if test "x$with_libtokyotyrant" = "xyes" - then + then BUILD_WITH_LIBTOKYOTYRANT_CPPFLAGS="$with_libtokyotyrant_cppflags" BUILD_WITH_LIBTOKYOTYRANT_LDFLAGS="$with_libtokyotyrant_ldflags" BUILD_WITH_LIBTOKYOTYRANT_LIBS="$with_libtokyotyrant_libs" @@@ -4050,67 -4093,6 +4093,67 @@@ f AM_CONDITIONAL(BUILD_WITH_LIBTOKYOTYRANT, test "x$with_libtokyotyrant" = "xyes") # }}} +# --with-libudev {{{ +with_libudev_cflags="" +with_libudev_ldflags="" +AC_ARG_WITH(libudev, [AS_HELP_STRING([--with-libudev@<:@=PREFIX@:>@], [Path to libudev.])], +[ + if test "x$withval" = "xno" + then + with_libudev="no" + else + with_libudev="yes" + if test "x$withval" != "xyes" + then + with_libudev_cflags="-I$withval/include" + with_libudev_ldflags="-L$withval/lib" + with_libudev="yes" + fi + fi +], +[ + if test "x$ac_system" = "xLinux" + then + with_libudev="yes" + else + with_libudev="no (Linux only library)" + fi +]) +if test "x$with_libudev" = "xyes" +then + SAVE_CPPFLAGS="$CPPFLAGS" + CPPFLAGS="$CPPFLAGS $with_libudev_cflags" + + AC_CHECK_HEADERS(libudev.h, [], [with_libudev="no (libudev.h not found)"]) + + CPPFLAGS="$SAVE_CPPFLAGS" +fi +if test "x$with_libudev" = "xyes" +then + SAVE_CPPFLAGS="$CPPFLAGS" + SAVE_LDFLAGS="$LDFLAGS" + CPPFLAGS="$CPPFLAGS $with_libudev_cflags" + LDFLAGS="$LDFLAGS $with_libudev_ldflags" + + AC_CHECK_LIB(udev, udev_new, + [ + AC_DEFINE(HAVE_LIBUDEV, 1, [Define to 1 if you have the udev library (-ludev).]) + ], + [with_libudev="no (libudev not found)"]) + + CPPFLAGS="$SAVE_CPPFLAGS" + LDFLAGS="$SAVE_LDFLAGS" +fi +if test "x$with_libudev" = "xyes" +then + BUILD_WITH_LIBUDEV_CFLAGS="$with_libudev_cflags" + BUILD_WITH_LIBUDEV_LDFLAGS="$with_libudev_ldflags" + AC_SUBST(BUILD_WITH_LIBUDEV_CFLAGS) + AC_SUBST(BUILD_WITH_LIBUDEV_LDFLAGS) +fi +AM_CONDITIONAL(BUILD_WITH_LIBUDEV, test "x$with_libudev" = "xyes") +# }}} + # --with-libupsclient {{{ with_libupsclient_config="" with_libupsclient_cflags="" @@@ -5284,6 -5266,7 +5327,7 @@@ AC_PLUGIN([vserver], [$plugin_vserv AC_PLUGIN([wireless], [$plugin_wireless], [Wireless statistics]) AC_PLUGIN([write_graphite], [yes], [Graphite / Carbon output plugin]) AC_PLUGIN([write_http], [$with_libcurl], [HTTP output plugin]) + AC_PLUGIN([write_kafka], [$with_librdkafka], [Kafka output plugin]) AC_PLUGIN([write_mongodb], [$with_libmongoc], [MongoDB output plugin]) AC_PLUGIN([write_redis], [$with_libcredis], [Redis output plugin]) AC_PLUGIN([write_riemann], [$have_protoc_c], [Riemann output plugin]) @@@ -5488,13 -5471,13 +5532,14 @@@ Configuration libpq . . . . . . . . $with_libpq libpthread . . . . . $with_libpthread librabbitmq . . . . . $with_librabbitmq + librdkafka . . . . . $with_librdkafka librouteros . . . . . $with_librouteros librrd . . . . . . . $with_librrd libsensors . . . . . $with_libsensors libsigrok . . . . . $with_libsigrok libstatgrab . . . . . $with_libstatgrab libtokyotyrant . . . $with_libtokyotyrant + libudev . . . . . . . $with_libudev libupsclient . . . . $with_libupsclient libvarnish . . . . . $with_libvarnish libvirt . . . . . . . $with_libvirt @@@ -5629,6 -5612,7 +5674,7 @@@ wireless . . . . . . $enable_wireless write_graphite . . . $enable_write_graphite write_http . . . . . $enable_write_http + write_kafka . . . . . $enable_write_kafka write_mongodb . . . . $enable_write_mongodb write_redis . . . . . $enable_write_redis write_riemann . . . . $enable_write_riemann diff --combined src/Makefile.am index 566260a8,5959c64f..5e8f76ae --- a/src/Makefile.am +++ b/src/Makefile.am @@@ -39,9 -39,7 +39,9 @@@ collectd_SOURCES = collectd.c collectd. utils_subst.c utils_subst.h \ utils_tail.c utils_tail.h \ utils_time.c utils_time.h \ - types_list.c types_list.h + types_list.c types_list.h \ + utils_threshold.c utils_threshold.h + collectd_CPPFLAGS = $(AM_CPPFLAGS) $(LTDLINCL) collectd_CFLAGS = $(AM_CFLAGS) @@@ -387,9 -385,6 +387,9 @@@ if BUILD_WITH_LIBSTATGRA disk_la_CFLAGS += $(BUILD_WITH_LIBSTATGRAB_CFLAGS) disk_la_LIBADD += $(BUILD_WITH_LIBSTATGRAB_LDFLAGS) endif +if BUILD_WITH_LIBUDEV +disk_la_LIBADD += -ludev +endif if BUILD_WITH_PERFSTAT disk_la_LIBADD += -lperfstat endif @@@ -1392,6 -1387,19 +1392,19 @@@ endi collectd_DEPENDENCIES += write_http.la endif + if BUILD_PLUGIN_WRITE_KAFKA + pkglib_LTLIBRARIES += write_kafka.la + write_kafka_la_SOURCES = write_kafka.c \ + utils_format_graphite.c utils_format_graphite.h \ + utils_format_json.c utils_format_json.h \ + utils_cmd_putval.c utils_cmd_putval.h \ + utils_crc32.c utils_crc32.h + write_kafka_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRDKAFKA_LDFLAGS) + write_kafka_la_LIBADD = $(BUILD_WITH_LIBRDKAFKA_LIBS) + collectd_LDADD += "-dlopen" write_kafka.la + collectd_DEPENDENCIES += write_kafka.la + endif + if BUILD_PLUGIN_WRITE_MONGODB pkglib_LTLIBRARIES += write_mongodb.la write_mongodb_la_SOURCES = write_mongodb.c @@@ -1414,7 -1422,7 +1427,7 @@@ endi if BUILD_PLUGIN_WRITE_RIEMANN pkglib_LTLIBRARIES += write_riemann.la -write_riemann_la_SOURCES = write_riemann.c +write_riemann_la_SOURCES = write_riemann.c write_riemann_threshold.c nodist_write_riemann_la_SOURCES = riemann.pb-c.c riemann.pb-c.h write_riemann_la_LDFLAGS = -module -avoid-version write_riemann_la_LIBADD = -lprotobuf-c diff --combined src/collectd.conf.pod index 5eaf37f9,fd921e23..39dea7e5 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@@ -1688,20 -1688,6 +1688,20 @@@ collected. If at least one B opti set to B, B matching disks will be collected. If B is set to B, all disks are collected B the ones matched. +=item B B|B + +Whether to use the device's "BSD Name", on MacEOSEX, instead of the +default major/minor numbers. Requires collectd to be built with Apple's +IOKitLib support. + +=item B I + +Attempt to override disk instance name with the value of a specified udev +attribute when built with B. If the attribute is not defined for the +given device, the default name is used. Example: + + UdevNameAttr "DM_NAME" + =back =head2 Plugin C @@@ -6380,6 -6366,111 +6380,111 @@@ number =back + =head2 Plugin C + + The I will send values to a I topic, a distributed + queue. + Synopsis: + + + Property "metadata.broker.list" "broker1:9092,broker2:9092" + + Format JSON + + + + The following options are understood by the I: + + =over 4 + + =item EB IE + + The plugin's configuration consists of one or more B blocks. Each block + is given a unique I and specifies one kafka producer. + Inside the B block, the following per-topic options are + understood: + + =over 4 + + =item B I I + + Configure the named property for the current topic. Properties are + forwarded to the kafka producer library B. + + =item B I + + Use the specified string as a partioning key for the topic. Kafka breaks + topic into partitions and guarantees that for a given topology, the same + consumer will be used for a specific key. The special (case insensitive) + string B can be used to specify that an arbitrary partition should + be used. + + =item B B|B|B + + Selects the format in which messages are sent to the broker. If set to + B (the default), values are sent as C commands which are + identical to the syntax used by the I and I. + + If set to B, the values are encoded in the I, + an easy and straight forward exchange format. + + If set to B, values are encoded in the I format, which is + " \n". + + =item B B|B + + Determines whether or not C, C and C data sources + are converted to a I (i.e. a C value). If set to B (the + default), no conversion is performed. Otherwise the conversion is performed + using the internal value cache. + + Please note that currently this option is only used if the B option has + been set to B. + + =item B (B=I only) + + A prefix can be added in the metric name when outputting in the I format. + It's added before the I name. + Metric name will be "" + + =item B (B=I only) + + A postfix can be added in the metric name when outputting in the I format. + It's added after the I name. + Metric name will be "" + + =item B (B=I only) + + Specify a character to replace dots (.) in the host part of the metric name. + In I metric name, dots are used as separators between different + metric parts (host, plugin, type). + Default is "_" (I). + + =item B B|B + + If set to B, the plugin instance and type instance will be in their own + path component, for example C. If set to B (the + default), the plugin and plugin instance (and likewise the type and type + instance) are put into one component, for example C. + + =item B B|B + + If set to B (the default), convert counter values to rates. If set to + B counter values are stored as is, i.e. as an increasing integer number. + + This will be reflected in the C tag: If B is enabled, + converted values will have "rate" appended to the data source type, e.g. + C. + + =back + + =item B I I + + Configure the kafka producer through properties, you almost always will + want to set B to your Kafka broker list. + + =back + =head2 Plugin C The I will send values to I, a powerfull stream @@@ -6452,17 -6543,6 +6557,17 @@@ interval is multiplied to set the TTL. know exactly what you're doing, you should only increase this setting from its default value. +=item B B|B + +If set to B, create riemann events for notifications. This is B +by default. When processing thresholds from write_riemann, it might prove +useful to avoid getting notification events. + +=item B B|B + +If set to B, attach state to events based on thresholds defined +in the B plugin. Defaults to B. + =back =item B I