From: Pierre-Yves Ritschard Date: Thu, 31 Jul 2014 13:12:38 +0000 (+0200) Subject: enable logging for older versions of librdkafka X-Git-Tag: collectd-5.5.0~240 X-Git-Url: https://git.octo.it/?a=commitdiff_plain;h=19d5c74dca00487c35b4e103f0d11e2a0d74012e;p=collectd.git enable logging for older versions of librdkafka --- diff --git a/configure.ac b/configure.ac index 6e04af16..2a22c20a 100644 --- a/configure.ac +++ b/configure.ac @@ -3623,7 +3623,8 @@ fi if test "x$with_librdkafka" = "xyes" then AC_CHECK_LIB(rdkafka, rd_kafka_new, [with_librdkafka="yes"], [with_librdkafka="no (Symbol 'rd_kafka_new' not found)"]) - AC_CHECK_LIB(rdkafka, rd_kafka_conf_set_log_cb, [with_librdkafka_log="yes"], [with_librdkafka_log="no (Symbol 'rd_kafka_conf_set_log_cb not found)"]) + AC_CHECK_LIB(rdkafka, rd_kafka_conf_set_log_cb, [with_librdkafka_log_cb="yes"], [with_librdkafka_log_cb="no"]) + AC_CHECK_LIB(rdkafka, rd_kafka_conf_set_logger, [with_librdkafka_logger="yes"], [with_librdkafka_logger="no"]) fi if test "x$with_librdkafka" = "xyes" then @@ -3634,9 +3635,13 @@ then AC_SUBST(BUILD_WITH_LIBRDKAFKA_LDFLAGS) AC_SUBST(BUILD_WITH_LIBRDKAFKA_LIBS) AC_DEFINE(HAVE_LIBRDKAFKA, 1, [Define if librdkafka is present and usable.]) - if test "x$with_librdkafka_log" = "xyes" + if test "x$with_librdkafka_log_cb" = "xyes" then - AC_DEFINE(HAVE_LIBRDKAFKA_LOG, 1, [Define if librdkafka log facility is present and usable.]) + AC_DEFINE(HAVE_LIBRDKAFKA_LOG_CB, 1, [Define if librdkafka log facility is present and usable.]) + fi + if test "x$with_librdkafka_logger" = "xyes" + then + AC_DEFINE(HAVE_LIBRDKAFKA_LOGGER, 1, [Define if librdkafka log facility is present and usable.]) fi fi CPPFLAGS="$SAVE_CPPFLAGS" diff --git a/src/write_kafka.c b/src/write_kafka.c index ff3176dd..2149ff18 100644 --- a/src/write_kafka.c +++ b/src/write_kafka.c @@ -61,7 +61,6 @@ static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *); static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t, int32_t, void *, void *); -#ifdef HAVE_LIBRDKAFKA_LOG static void kafka_log(const rd_kafka_t *, int, const char *, const char *); static void kafka_log(const rd_kafka_t *rkt, int level, @@ -70,8 +69,6 @@ static void kafka_log(const rd_kafka_t *rkt, int level, plugin_log(level, "%s", msg); } -#endif - static int32_t kafka_partition(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *p, void *m) @@ -186,10 +183,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ tctx->store_rates = 1; tctx->format = KAFKA_FORMAT_JSON; -#ifdef HAVE_LIBRDKAFKA_LOG - /* - * Some versions of rdkafka do not allow setting a log callback. - */ +#ifdef HAVE_LIBRDKAFKA_LOG_CB rd_kafka_conf_set_log_cb(conf, kafka_log); #endif if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf, @@ -198,6 +192,9 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ ERROR("write_kafka plugin: cannot create kafka handle."); return; } +#ifdef HAVE_LIBRDKAFKA_LOGGER + rd_kafka_conf_set_logger(tctx->kafka, kafka_log); +#endif conf = NULL; if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) {