From 630e3bba83de49555006037eafb291a1ce0b3c12 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Ritschard Date: Tue, 22 Jul 2014 09:39:03 +0200 Subject: [PATCH] Add a write_kafka output with similar properties to the amqp one. --- configure.ac | 64 +++++++-- src/Makefile.am | 25 +++- src/utils_crc32.c | 110 +++++++++++++++ src/utils_crc32.h | 27 ++++ src/write_kafka.c | 411 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 622 insertions(+), 15 deletions(-) create mode 100644 src/utils_crc32.c create mode 100644 src/utils_crc32.h create mode 100644 src/write_kafka.c diff --git a/configure.ac b/configure.ac index f1c7b8ab..0ba3bf1e 100644 --- a/configure.ac +++ b/configure.ac @@ -981,7 +981,7 @@ if test "x$fp_layout_type" = "xunknown"; then uint8_t c[8]; double d; - d = 8.642135e130; + d = 8.642135e130; memcpy ((void *) &i0, (void *) &d, 8); i1 = i0; @@ -1036,7 +1036,7 @@ if test "x$fp_layout_type" = "xunknown"; then uint8_t c[8]; double d; - d = 8.642135e130; + d = 8.642135e130; memcpy ((void *) &i0, (void *) &d, 8); i1 = endianflip (i0); @@ -1085,7 +1085,7 @@ if test "x$fp_layout_type" = "xunknown"; then uint8_t c[8]; double d; - d = 8.642135e130; + d = 8.642135e130; memcpy ((void *) &i0, (void *) &d, 8); i1 = intswap (i0); @@ -1237,7 +1237,7 @@ AC_MSG_CHECKING([if have htonll defined]) have_htonll="yes" AC_DEFINE(HAVE_HTONLL, 1, [Define if the function htonll exists.]) ]) - + AC_MSG_RESULT([$have_htonll]) # Check for structures @@ -1380,7 +1380,7 @@ collectd additional packages:]) AM_CONDITIONAL([BUILD_FREEBSD],[test "x$x$ac_system" = "xFreeBSD"]) -AM_CONDITIONAL([BUILD_AIX],[test "x$x$ac_system" = "xAIX"]) +AM_CONDITIONAL([BUILD_AIX],[test "x$x$ac_system" = "xAIX"]) if test "x$ac_system" = "xAIX" then @@ -2794,7 +2794,7 @@ then else SAVE_CPPFLAGS="$CPPFLAGS" CPPFLAGS="$CPPFLAGS $with_snmp_cflags" - + AC_CHECK_HEADERS(net-snmp/net-snmp-config.h, [], [with_libnetsnmp="no (net-snmp/net-snmp-config.h not found)"]) CPPFLAGS="$SAVE_CPPFLAGS" @@ -3021,7 +3021,7 @@ if test "x$with_libowcapi" = "xyes" then SAVE_CPPFLAGS="$CPPFLAGS" CPPFLAGS="$with_libowcapi_cppflags" - + AC_CHECK_HEADERS(owcapi.h, [with_libowcapi="yes"], [with_libowcapi="no (owcapi.h not found)"]) CPPFLAGS="$SAVE_CPPFLAGS" @@ -3032,7 +3032,7 @@ then SAVE_CPPFLAGS="$CPPFLAGS" LDFLAGS="$with_libowcapi_libs" CPPFLAGS="$with_libowcapi_cppflags" - + AC_CHECK_LIB(owcapi, OW_get, [with_libowcapi="yes"], [with_libowcapi="no (libowcapi not found)"]) LDFLAGS="$SAVE_LDFLAGS" @@ -3597,6 +3597,49 @@ LDFLAGS="$SAVE_LDFLAGS" AM_CONDITIONAL(BUILD_WITH_LIBRABBITMQ, test "x$with_librabbitmq" = "xyes") # }}} +# --with-librdkafka {{{ +AC_ARG_WITH(librdkafka, [AS_HELP_STRING([--with-librdkafka@<:@=PREFIX@:>@], [Path to librdkafka.])], +[ + if test "x$withval" = "xno" && test "x$withval" != "xyes" + then + with_librdkafka_cppflags="-I$withval/include" + with_librdkafka_ldflags="-L$withval/lib" + with_librdkafka="yes" + else + with_librdkafka="$withval" + fi +], +[ + with_librdkafka="yes" +]) +SAVE_CPPFLAGS="$CPPFLAGS" +SAVE_LDFLAGS="$LDFLAGS" + +if test "x$with_librdkafka" = "xyes" +then + AC_CHECK_HEADERS(librdkafka/rdkafka.h, [with_librdkafka="yes"], [with_librdkafka="no (librdkafka/rdkafka.h not found)"]) +fi + +if test "x$with_librdkafka" = "xyes" +then + AC_CHECK_LIB(rdkafka, rd_kafka_new, [with_librdkafka="yes"], [with_librdkafka="no (Symbol 'rd_kafka_new' not found)"]) +fi +if test "x$with_librdkafka" = "xyes" +then + BUILD_WITH_LIBRDKAFKA_CPPFLAGS="$with_librdkafka_cppflags" + BUILD_WITH_LIBRDKAFKA_LDFLAGS="$with_librdkafka_ldflags" + BUILD_WITH_LIBRDKAFKA_LIBS="-lrdkafka" + AC_SUBST(BUILD_WITH_LIBRDKAFKA_CPPFLAGS) + AC_SUBST(BUILD_WITH_LIBRDKAFKA_LDFLAGS) + AC_SUBST(BUILD_WITH_LIBRDKAFKA_LIBS) + AC_DEFINE(HAVE_LIBRDKAFKA, 1, [Define if librdkafka is present and usable.]) +fi +CPPFLAGS="$SAVE_CPPFLAGS" +LDFLAGS="$SAVE_LDFLAGS" +AM_CONDITIONAL(BUILD_WITH_LIBRDKAFKA, test "x$with_librdkafka" = "xyes") + +# }}} + # --with-librouteros {{{ AC_ARG_WITH(librouteros, [AS_HELP_STRING([--with-librouteros@<:@=PREFIX@:>@], [Path to librouteros.])], [ @@ -4039,7 +4082,7 @@ CPPFLAGS="$SAVE_CPPFLAGS" LDFLAGS="$SAVE_LDFLAGS" if test "x$with_libtokyotyrant" = "xyes" -then +then BUILD_WITH_LIBTOKYOTYRANT_CPPFLAGS="$with_libtokyotyrant_cppflags" BUILD_WITH_LIBTOKYOTYRANT_LDFLAGS="$with_libtokyotyrant_ldflags" BUILD_WITH_LIBTOKYOTYRANT_LIBS="$with_libtokyotyrant_libs" @@ -5223,6 +5266,7 @@ AC_PLUGIN([vserver], [$plugin_vserver], [Linux VServer statistics]) AC_PLUGIN([wireless], [$plugin_wireless], [Wireless statistics]) AC_PLUGIN([write_graphite], [yes], [Graphite / Carbon output plugin]) AC_PLUGIN([write_http], [$with_libcurl], [HTTP output plugin]) +AC_PLUGIN([write_kafka], [$with_librdkafka], [Kafka output plugin]) AC_PLUGIN([write_mongodb], [$with_libmongoc], [MongoDB output plugin]) AC_PLUGIN([write_redis], [$with_libcredis], [Redis output plugin]) AC_PLUGIN([write_riemann], [$have_protoc_c], [Riemann output plugin]) @@ -5427,6 +5471,7 @@ Configuration: libpq . . . . . . . . $with_libpq libpthread . . . . . $with_libpthread librabbitmq . . . . . $with_librabbitmq + librdkafka . . . . . $with_librdkafka librouteros . . . . . $with_librouteros librrd . . . . . . . $with_librrd libsensors . . . . . $with_libsensors @@ -5567,6 +5612,7 @@ Configuration: wireless . . . . . . $enable_wireless write_graphite . . . $enable_write_graphite write_http . . . . . $enable_write_http + write_kafka . . . . . $enable_write_kafka write_mongodb . . . . $enable_write_mongodb write_redis . . . . . $enable_write_redis write_riemann . . . . $enable_write_riemann diff --git a/src/Makefile.am b/src/Makefile.am index a9d85823..5959c64f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -133,9 +133,9 @@ collectd_tg_LDADD += libcollectdclient/libcollectdclient.la collectd_tg_DEPENDENCIES = libcollectdclient/libcollectdclient.la -pkglib_LTLIBRARIES = +pkglib_LTLIBRARIES = -BUILT_SOURCES = +BUILT_SOURCES = CLEANFILES = if BUILD_PLUGIN_AGGREGATION @@ -273,7 +273,7 @@ pkglib_LTLIBRARIES += cpu.la cpu_la_SOURCES = cpu.c cpu_la_CFLAGS = $(AM_CFLAGS) cpu_la_LDFLAGS = -module -avoid-version -cpu_la_LIBADD = +cpu_la_LIBADD = if BUILD_WITH_LIBKSTAT cpu_la_LIBADD += -lkstat endif @@ -371,7 +371,7 @@ pkglib_LTLIBRARIES += disk.la disk_la_SOURCES = disk.c disk_la_CFLAGS = $(AM_CFLAGS) disk_la_LDFLAGS = -module -avoid-version -disk_la_LIBADD = +disk_la_LIBADD = if BUILD_WITH_LIBKSTAT disk_la_LIBADD += -lkstat endif @@ -382,7 +382,7 @@ if BUILD_WITH_LIBIOKIT disk_la_LDFLAGS += -framework IOKit endif if BUILD_WITH_LIBSTATGRAB -disk_la_CFLAGS += $(BUILD_WITH_LIBSTATGRAB_CFLAGS) +disk_la_CFLAGS += $(BUILD_WITH_LIBSTATGRAB_CFLAGS) disk_la_LIBADD += $(BUILD_WITH_LIBSTATGRAB_LDFLAGS) endif if BUILD_WITH_PERFSTAT @@ -862,7 +862,7 @@ if BUILD_PLUGIN_OLSRD pkglib_LTLIBRARIES += olsrd.la olsrd_la_SOURCES = olsrd.c olsrd_la_LDFLAGS = -module -avoid-version -olsrd_la_LIBADD = +olsrd_la_LIBADD = if BUILD_WITH_LIBSOCKET olsrd_la_LIBADD += -lsocket endif @@ -1387,6 +1387,19 @@ endif collectd_DEPENDENCIES += write_http.la endif +if BUILD_PLUGIN_WRITE_KAFKA +pkglib_LTLIBRARIES += write_kafka.la +write_kafka_la_SOURCES = write_kafka.c \ + utils_format_graphite.c utils_format_graphite.h \ + utils_format_json.c utils_format_json.h \ + utils_cmd_putval.c utils_cmd_putval.h \ + utils_crc32.c utils_crc32.h +write_kafka_la_LDFLAGS = -module -avoid-version $(BUILD_WITH_LIBRDKAFKA_LDFLAGS) +write_kafka_la_LIBADD = $(BUILD_WITH_LIBRDKAFKA_LIBS) +collectd_LDADD += "-dlopen" write_kafka.la +collectd_DEPENDENCIES += write_kafka.la +endif + if BUILD_PLUGIN_WRITE_MONGODB pkglib_LTLIBRARIES += write_mongodb.la write_mongodb_la_SOURCES = write_mongodb.c diff --git a/src/utils_crc32.c b/src/utils_crc32.c new file mode 100644 index 00000000..4c6d6941 --- /dev/null +++ b/src/utils_crc32.c @@ -0,0 +1,110 @@ +/* + * COPYRIGHT (C) 1986 Gary S. Brown. You may use this program, or + * code or tables extracted from it, as desired without restriction. + * + * First, the polynomial itself and its table of feedback terms. The + * polynomial is + * X^32+X^26+X^23+X^22+X^16+X^12+X^11+X^10+X^8+X^7+X^5+X^4+X^2+X^1+X^0 + * + * Note that we take it "backwards" and put the highest-order term in + * the lowest-order bit. The X^32 term is "implied"; the LSB is the + * X^31 term, etc. The X^0 term (usually shown as "+1") results in + * the MSB being 1 + * + * Note that the usual hardware shift register implementation, which + * is what we're using (we're merely optimizing it by doing eight-bit + * chunks at a time) shifts bits into the lowest-order term. In our + * implementation, that means shifting towards the right. Why do we + * do it this way? Because the calculated CRC must be transmitted in + * order from highest-order term to lowest-order term. UARTs transmit + * characters in order from LSB to MSB. By storing the CRC this way + * we hand it to the UART in the order low-byte to high-byte; the UART + * sends each low-bit to hight-bit; and the result is transmission bit + * by bit from highest- to lowest-order term without requiring any bit + * shuffling on our part. Reception works similarly + * + * The feedback terms table consists of 256, 32-bit entries. Notes + * + * The table can be generated at runtime if desired; code to do so + * is shown later. It might not be obvious, but the feedback + * terms simply represent the results of eight shift/xor opera + * tions for all combinations of data and CRC register values + * + * The values must be right-shifted by eight bits by the "updcrc + * logic; the shift must be unsigned (bring in zeroes). On some + * hardware you could probably optimize the shift in assembler by + * using byte-swap instructions + * polynomial $edb88320 + */ + +#include + +u_int32_t crc32_buffer(const u_char *, size_t); +static unsigned int crc32_tab[] = { + 0x00000000L, 0x77073096L, 0xee0e612cL, 0x990951baL, 0x076dc419L, + 0x706af48fL, 0xe963a535L, 0x9e6495a3L, 0x0edb8832L, 0x79dcb8a4L, + 0xe0d5e91eL, 0x97d2d988L, 0x09b64c2bL, 0x7eb17cbdL, 0xe7b82d07L, + 0x90bf1d91L, 0x1db71064L, 0x6ab020f2L, 0xf3b97148L, 0x84be41deL, + 0x1adad47dL, 0x6ddde4ebL, 0xf4d4b551L, 0x83d385c7L, 0x136c9856L, + 0x646ba8c0L, 0xfd62f97aL, 0x8a65c9ecL, 0x14015c4fL, 0x63066cd9L, + 0xfa0f3d63L, 0x8d080df5L, 0x3b6e20c8L, 0x4c69105eL, 0xd56041e4L, + 0xa2677172L, 0x3c03e4d1L, 0x4b04d447L, 0xd20d85fdL, 0xa50ab56bL, + 0x35b5a8faL, 0x42b2986cL, 0xdbbbc9d6L, 0xacbcf940L, 0x32d86ce3L, + 0x45df5c75L, 0xdcd60dcfL, 0xabd13d59L, 0x26d930acL, 0x51de003aL, + 0xc8d75180L, 0xbfd06116L, 0x21b4f4b5L, 0x56b3c423L, 0xcfba9599L, + 0xb8bda50fL, 0x2802b89eL, 0x5f058808L, 0xc60cd9b2L, 0xb10be924L, + 0x2f6f7c87L, 0x58684c11L, 0xc1611dabL, 0xb6662d3dL, 0x76dc4190L, + 0x01db7106L, 0x98d220bcL, 0xefd5102aL, 0x71b18589L, 0x06b6b51fL, + 0x9fbfe4a5L, 0xe8b8d433L, 0x7807c9a2L, 0x0f00f934L, 0x9609a88eL, + 0xe10e9818L, 0x7f6a0dbbL, 0x086d3d2dL, 0x91646c97L, 0xe6635c01L, + 0x6b6b51f4L, 0x1c6c6162L, 0x856530d8L, 0xf262004eL, 0x6c0695edL, + 0x1b01a57bL, 0x8208f4c1L, 0xf50fc457L, 0x65b0d9c6L, 0x12b7e950L, + 0x8bbeb8eaL, 0xfcb9887cL, 0x62dd1ddfL, 0x15da2d49L, 0x8cd37cf3L, + 0xfbd44c65L, 0x4db26158L, 0x3ab551ceL, 0xa3bc0074L, 0xd4bb30e2L, + 0x4adfa541L, 0x3dd895d7L, 0xa4d1c46dL, 0xd3d6f4fbL, 0x4369e96aL, + 0x346ed9fcL, 0xad678846L, 0xda60b8d0L, 0x44042d73L, 0x33031de5L, + 0xaa0a4c5fL, 0xdd0d7cc9L, 0x5005713cL, 0x270241aaL, 0xbe0b1010L, + 0xc90c2086L, 0x5768b525L, 0x206f85b3L, 0xb966d409L, 0xce61e49fL, + 0x5edef90eL, 0x29d9c998L, 0xb0d09822L, 0xc7d7a8b4L, 0x59b33d17L, + 0x2eb40d81L, 0xb7bd5c3bL, 0xc0ba6cadL, 0xedb88320L, 0x9abfb3b6L, + 0x03b6e20cL, 0x74b1d29aL, 0xead54739L, 0x9dd277afL, 0x04db2615L, + 0x73dc1683L, 0xe3630b12L, 0x94643b84L, 0x0d6d6a3eL, 0x7a6a5aa8L, + 0xe40ecf0bL, 0x9309ff9dL, 0x0a00ae27L, 0x7d079eb1L, 0xf00f9344L, + 0x8708a3d2L, 0x1e01f268L, 0x6906c2feL, 0xf762575dL, 0x806567cbL, + 0x196c3671L, 0x6e6b06e7L, 0xfed41b76L, 0x89d32be0L, 0x10da7a5aL, + 0x67dd4accL, 0xf9b9df6fL, 0x8ebeeff9L, 0x17b7be43L, 0x60b08ed5L, + 0xd6d6a3e8L, 0xa1d1937eL, 0x38d8c2c4L, 0x4fdff252L, 0xd1bb67f1L, + 0xa6bc5767L, 0x3fb506ddL, 0x48b2364bL, 0xd80d2bdaL, 0xaf0a1b4cL, + 0x36034af6L, 0x41047a60L, 0xdf60efc3L, 0xa867df55L, 0x316e8eefL, + 0x4669be79L, 0xcb61b38cL, 0xbc66831aL, 0x256fd2a0L, 0x5268e236L, + 0xcc0c7795L, 0xbb0b4703L, 0x220216b9L, 0x5505262fL, 0xc5ba3bbeL, + 0xb2bd0b28L, 0x2bb45a92L, 0x5cb36a04L, 0xc2d7ffa7L, 0xb5d0cf31L, + 0x2cd99e8bL, 0x5bdeae1dL, 0x9b64c2b0L, 0xec63f226L, 0x756aa39cL, + 0x026d930aL, 0x9c0906a9L, 0xeb0e363fL, 0x72076785L, 0x05005713L, + 0x95bf4a82L, 0xe2b87a14L, 0x7bb12baeL, 0x0cb61b38L, 0x92d28e9bL, + 0xe5d5be0dL, 0x7cdcefb7L, 0x0bdbdf21L, 0x86d3d2d4L, 0xf1d4e242L, + 0x68ddb3f8L, 0x1fda836eL, 0x81be16cdL, 0xf6b9265bL, 0x6fb077e1L, + 0x18b74777L, 0x88085ae6L, 0xff0f6a70L, 0x66063bcaL, 0x11010b5cL, + 0x8f659effL, 0xf862ae69L, 0x616bffd3L, 0x166ccf45L, 0xa00ae278L, + 0xd70dd2eeL, 0x4e048354L, 0x3903b3c2L, 0xa7672661L, 0xd06016f7L, + 0x4969474dL, 0x3e6e77dbL, 0xaed16a4aL, 0xd9d65adcL, 0x40df0b66L, + 0x37d83bf0L, 0xa9bcae53L, 0xdebb9ec5L, 0x47b2cf7fL, 0x30b5ffe9L, + 0xbdbdf21cL, 0xcabac28aL, 0x53b39330L, 0x24b4a3a6L, 0xbad03605L, + 0xcdd70693L, 0x54de5729L, 0x23d967bfL, 0xb3667a2eL, 0xc4614ab8L, + 0x5d681b02L, 0x2a6f2b94L, 0xb40bbe37L, 0xc30c8ea1L, 0x5a05df1bL, + 0x2d02ef8dL +}; + +/* Return a 32-bit CRC of the contents of the buffer. */ + +u_int32_t +crc32_buffer(const u_char *s, size_t len) +{ + size_t i; + u_int32_t ret; + + ret = 0; + for (i = 0; i < len; i++) + ret = crc32_tab[(ret ^ s[i]) & 0xff] ^ (ret >> 8); + return ret; +} diff --git a/src/utils_crc32.h b/src/utils_crc32.h new file mode 100644 index 00000000..b16409d1 --- /dev/null +++ b/src/utils_crc32.h @@ -0,0 +1,27 @@ +/** + * collectd - src/utils_crc32.h + * + * Copyright (C) 2014 Pierre-Yves Ritschard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER + * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING + * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * + * Authors: + * Pierre-Yves Ritschard + */ + +#ifndef UTILS_CRC32_H +#define UTILS_CRC32_H 1 + +u_int32_t crc32_buffer(const u_char *, size_t); + +#endif diff --git a/src/write_kafka.c b/src/write_kafka.c new file mode 100644 index 00000000..ec9009eb --- /dev/null +++ b/src/write_kafka.c @@ -0,0 +1,411 @@ +/** + * collectd - src/write_kafka.c + * + * Copyright (C) 2014 Pierre-Yves Ritschard + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER + * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING + * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * + * Authors: + * Pierre-Yves Ritschard + */ + +#include "collectd.h" +#include "plugin.h" +#include "common.h" +#include "configfile.h" +#include "utils_cache.h" +#include "utils_cmd_putval.h" +#include "utils_format_graphite.h" +#include "utils_format_json.h" +#include "utils_crc32.h" +#include "riemann.pb-c.h" + +#include +#include +#include +#include + +struct kafka_topic_context { +#define KAFKA_FORMAT_COMMAND 1 +#define KAFKA_FORMAT_GRAPHITE 2 +#define KAFKA_FORMAT_JSON 3 + u_int8_t format; + unsigned int graphite_flags; + _Bool store_rates; + rd_kafka_topic_conf_t *conf; + rd_kafka_topic_t *topic; + rd_kafka_t *kafka; + int has_key; + u_int32_t key; + char *prefix; + char *postfix; + char escape_char; + char *topic_name; +}; + +static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *); +static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t, + int32_t, void *, void *); + +static int32_t kafka_partition(const rd_kafka_topic_t *rkt, + const void *keydata, size_t keylen, + int32_t partition_cnt, void *p, void *m) +{ + u_int32_t key = *((u_int32_t *)keydata ); + + return key % partition_cnt; +} + +static int kafka_write(const data_set_t *ds, /* {{{ */ + const value_list_t *vl, + user_data_t *ud) +{ + int status = 0; + u_int32_t key; + char buffer[8192]; + size_t bfree = sizeof(buffer); + size_t bfill = 0; + size_t blen = 0; + struct kafka_topic_context *ctx = ud->data; + + if ((ds == NULL) || (vl == NULL) || (ctx == NULL)) + return EINVAL; + + bzero(buffer, sizeof(buffer)); + + switch (ctx->format) { + case KAFKA_FORMAT_COMMAND: + status = create_putval(buffer, sizeof(buffer), ds, vl); + if (status != 0) { + ERROR("write_kafka plugin: create_putval failed with status %i.", + status); + return status; + } + blen = strlen(buffer); + break; + case KAFKA_FORMAT_JSON: + + format_json_initialize(buffer, &bfill, &bfree); + format_json_value_list(buffer, &bfill, &bfree, ds, vl, + ctx->store_rates); + format_json_finalize(buffer, &bfill, &bfree); + blen = strlen(buffer); + break; + case KAFKA_FORMAT_GRAPHITE: + status = format_graphite(buffer, sizeof(buffer), ds, vl, + ctx->prefix, ctx->postfix, ctx->escape_char, + ctx->graphite_flags); + if (status != 0) { + ERROR("write_kafka plugin: format_graphite failed with status %i.", + status); + return status; + } + blen = strlen(buffer); + break; + default: + ERROR("write_kafka plugin: invalid format %i.", ctx->format); + return -1; + } + + /* + * We partition our stream by metric name + */ + if (ctx->has_key) + key = ctx->key; + else + key = rand(); + + rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_COPY, buffer, blen, + &key, sizeof(key), NULL); + + return status; +} /* }}} int kafka_write */ + +static void kafka_topic_context_free(void *p) /* {{{ */ +{ + struct kafka_topic_context *ctx = p; + + if (ctx == NULL) + return; + + if (ctx->topic_name != NULL) + sfree(ctx->topic_name); + if (ctx->topic != NULL) + rd_kafka_topic_destroy(ctx->topic); + if (ctx->conf != NULL) + rd_kafka_topic_conf_destroy(ctx->conf); + + sfree(ctx); +} /* }}} void kafka_topic_context_free */ + +static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ */ +{ + int status; + int i; + struct kafka_topic_context *tctx; + char *key; + char *val; + char callback_name[DATA_MAX_NAME_LEN]; + char errbuf[1024]; + user_data_t ud; + oconfig_item_t *child; + rd_kafka_conf_res_t ret; + + if ((tctx = calloc(1, sizeof (*tctx))) == NULL) { + ERROR ("write_kafka plugin: calloc failed."); + return; + } + + tctx->escape_char = '.'; + tctx->store_rates = 1; + + if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf, + errbuf, sizeof(errbuf))) == NULL) { + sfree(tctx); + ERROR("write_kafka plugin: cannot create kafka handle."); + return; + } + conf = NULL; + + if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) { + rd_kafka_destroy(tctx->kafka); + sfree(tctx); + ERROR ("write_kafka plugin: cannot create topic configuration."); + return; + } + + if (ci->values_num != 1) { + WARNING("kafka topic name needed."); + goto errout; + } + + if (ci->values[0].type != OCONFIG_TYPE_STRING) { + WARNING("kafka topic needs a string argument."); + goto errout; + } + + if ((tctx->topic_name = strdup(ci->values[0].value.string)) == NULL) { + ERROR("write_kafka plugin: cannot copy topic name."); + goto errout; + } + + for (i = 0; i < ci->children_num; i++) { + /* + * The code here could be simplified but makes room + * for easy adding of new options later on. + */ + child = &ci->children[i]; + status = 0; + + if (strcasecmp ("Property", child->key) == 0) { + if (child->values_num != 2) { + WARNING("kafka properties need both a key and a value."); + goto errout; + } + if (child->values[0].type != OCONFIG_TYPE_STRING || + child->values[1].type != OCONFIG_TYPE_STRING) { + WARNING("kafka properties needs string arguments."); + goto errout; + } + key = child->values[0].value.string; + val = child->values[0].value.string; + ret = rd_kafka_topic_conf_set(tctx->conf,key, val, + errbuf, sizeof(errbuf)); + if (ret != RD_KAFKA_CONF_OK) { + WARNING("cannot set kafka topic property %s to %s: %s.", + key, val, errbuf); + goto errout; + } + + } else if (strcasecmp ("Key", child->key) == 0) { + char *tmp_buf = NULL; + status = cf_util_get_string(child, &tmp_buf); + if (status != 0) { + WARNING("write_kafka plugin: invalid key supplied"); + break; + } + + if (strcasecmp(tmp_buf, "Random") != 0) { + tctx->has_key = 1; + tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf)); + } + sfree(tmp_buf); + + } else if (strcasecmp ("Format", child->key) == 0) { + status = cf_util_get_string(child, &key); + if (status != 0) + goto errout; + + assert(key != NULL); + + if (strcasecmp(key, "Command") == 0) { + + tctx->format = KAFKA_FORMAT_COMMAND; + + } else if (strcasecmp(key, "Graphite") == 0) { + tctx->format = KAFKA_FORMAT_GRAPHITE; + + } else if (strcasecmp(key, "Json") == 0) { + tctx->format = KAFKA_FORMAT_JSON; + + } else { + WARNING ("write_kafka plugin: Invalid format string: %s", + key); + } + sfree(key); + + } else if (strcasecmp ("StoreRates", child->key) == 0) { + status = cf_util_get_boolean (child, &tctx->store_rates); + (void) cf_util_get_flag (child, &tctx->graphite_flags, + GRAPHITE_STORE_RATES); + + } else if (strcasecmp ("GraphiteSeparateInstances", child->key) == 0) { + status = cf_util_get_flag (child, &tctx->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + + } else if (strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) { + status = cf_util_get_flag (child, &tctx->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); + + } else if (strcasecmp ("GraphitePrefix", child->key) == 0) { + status = cf_util_get_string (child, &tctx->prefix); + } else if (strcasecmp ("GraphitePostfix", child->key) == 0) { + status = cf_util_get_string (child, &tctx->postfix); + } else if (strcasecmp ("GraphiteEscapeChar", child->key) == 0) { + char *tmp_buff = NULL; + status = cf_util_get_string (child, &tmp_buff); + if (strlen (tmp_buff) > 1) + WARNING ("write_kafka plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + tctx->escape_char = tmp_buff[0]; + sfree (tmp_buff); + } else { + WARNING ("write_kafka plugin: Invalid directive: %s.", child->key); + } + + if (status != 0) + break; + } + + rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition); + rd_kafka_topic_conf_set_opaque(tctx->conf, tctx); + + if ((tctx->topic = rd_kafka_topic_new(tctx->kafka, tctx->topic_name, + tctx->conf)) == NULL) { + ERROR("write_kafka plugin: cannot create topic."); + goto errout; + } + tctx->conf = NULL; + + ssnprintf(callback_name, sizeof(callback_name), + "write_kafka/%s", tctx->topic_name); + + ud.data = tctx; + ud.free_func = kafka_topic_context_free; + + status = plugin_register_write (callback_name, kafka_write, &ud); + if (status != 0) { + WARNING ("write_kafka plugin: plugin_register_write (\"%s\") " + "failed with status %i.", + callback_name, status); + goto errout; + } + return; + errout: + if (conf != NULL) + rd_kafka_conf_destroy(conf); + if (tctx->kafka != NULL) + rd_kafka_destroy(tctx->kafka); + if (tctx->topic != NULL) + rd_kafka_topic_destroy(tctx->topic); + if (tctx->topic_name != NULL) + free(tctx->topic_name); + if (tctx->conf != NULL) + rd_kafka_topic_conf_destroy(tctx->conf); + sfree(tctx); +} /* }}} int kafka_config_topic */ + +static int kafka_config(oconfig_item_t *ci) /* {{{ */ +{ + int i; + oconfig_item_t *child; + rd_kafka_conf_t *conf; + rd_kafka_conf_t *cloned; + rd_kafka_conf_res_t ret; + char errbuf[1024]; + + if ((conf = rd_kafka_conf_new()) == NULL) { + WARNING("cannot allocate kafka configuration."); + return -1; + } + + for (i = 0; i < ci->children_num; i++) { + child = &ci->children[i]; + + if (strcasecmp("Topic", child->key) == 0) { + if ((cloned = rd_kafka_conf_dup(conf)) == NULL) { + WARNING("write_kafka plugin: cannot allocate memory for kafka config"); + goto errout; + } + kafka_config_topic (cloned, child); + } else if (strcasecmp(child->key, "Property") == 0) { + char *key = NULL; + char *val = NULL; + + if (child->values_num != 2) { + WARNING("kafka properties need both a key and a value."); + goto errout; + } + if (child->values[0].type != OCONFIG_TYPE_STRING || + child->values[1].type != OCONFIG_TYPE_STRING) { + WARNING("kafka properties needs string arguments."); + goto errout; + } + if ((key = strdup(child->values[0].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute key."); + goto errout; + } + if ((val = strdup(child->values[1].value.string)) == NULL) { + WARNING("cannot allocate memory for attribute value."); + goto errout; + } + ret = rd_kafka_conf_set(conf, key, val, errbuf, sizeof(errbuf)); + if (ret != RD_KAFKA_CONF_OK) { + WARNING("cannot set kafka property %s to %s: %s", + key, val, errbuf); + goto errout; + } + sfree(key); + sfree(val); + } else { + WARNING ("write_kafka plugin: Ignoring unknown " + "configuration option \"%s\" at top level.", + child->key); + } + } + if (conf != NULL) + rd_kafka_conf_destroy(conf); + return (0); + errout: + if (conf != NULL) + rd_kafka_conf_destroy(conf); + return -1; +} /* }}} int kafka_config */ + +void module_register(void) +{ + plugin_register_complex_config ("write_kafka", kafka_config); +} + +/* vim: set sw=8 sts=8 ts=8 noet : */ -- 2.11.0