conditionally use rd_kafka_conf_set_log_cb, fixes #686
authorPierre-Yves Ritschard <pyr@spootnik.org>
Thu, 31 Jul 2014 07:58:06 +0000 (09:58 +0200)
committerPierre-Yves Ritschard <pyr@spootnik.org>
Thu, 31 Jul 2014 07:58:06 +0000 (09:58 +0200)
configure.ac
src/write_kafka.c

index 59f0571..6e04af1 100644 (file)
@@ -3623,6 +3623,7 @@ fi
 if test "x$with_librdkafka" = "xyes"
 then
        AC_CHECK_LIB(rdkafka, rd_kafka_new, [with_librdkafka="yes"], [with_librdkafka="no (Symbol 'rd_kafka_new' not found)"])
+  AC_CHECK_LIB(rdkafka, rd_kafka_conf_set_log_cb, [with_librdkafka_log="yes"], [with_librdkafka_log="no (Symbol 'rd_kafka_conf_set_log_cb not found)"])
 fi
 if test "x$with_librdkafka" = "xyes"
 then
@@ -3633,6 +3634,10 @@ then
        AC_SUBST(BUILD_WITH_LIBRDKAFKA_LDFLAGS)
        AC_SUBST(BUILD_WITH_LIBRDKAFKA_LIBS)
        AC_DEFINE(HAVE_LIBRDKAFKA, 1, [Define if librdkafka is present and usable.])
+  if test "x$with_librdkafka_log" = "xyes"
+  then
+        AC_DEFINE(HAVE_LIBRDKAFKA_LOG, 1, [Define if librdkafka log facility is present and usable.])
+  fi
 fi
 CPPFLAGS="$SAVE_CPPFLAGS"
 LDFLAGS="$SAVE_LDFLAGS"
index b74fe97..ff3176d 100644 (file)
@@ -60,6 +60,8 @@ struct kafka_topic_context {
 static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
 static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
                                int32_t, void *, void *);
+
+#ifdef HAVE_LIBRDKAFKA_LOG
 static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
 
 static void kafka_log(const rd_kafka_t *rkt, int level,
@@ -68,6 +70,8 @@ static void kafka_log(const rd_kafka_t *rkt, int level,
     plugin_log(level, "%s", msg);
 }
 
+#endif
+
 static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
                                const void *keydata, size_t keylen,
                                int32_t partition_cnt, void *p, void *m)
@@ -182,7 +186,12 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
     tctx->store_rates = 1;
     tctx->format = KAFKA_FORMAT_JSON;
 
+#ifdef HAVE_LIBRDKAFKA_LOG
+    /*
+     * Some versions of rdkafka do not allow setting a log callback.
+     */
     rd_kafka_conf_set_log_cb(conf, kafka_log);
+#endif
     if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                     errbuf, sizeof(errbuf))) == NULL) {
         sfree(tctx);