static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
int32_t, void *, void *);
+
+#ifdef HAVE_LIBRDKAFKA_LOG
static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
static void kafka_log(const rd_kafka_t *rkt, int level,
plugin_log(level, "%s", msg);
}
+#endif
+
static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
const void *keydata, size_t keylen,
int32_t partition_cnt, void *p, void *m)
tctx->store_rates = 1;
tctx->format = KAFKA_FORMAT_JSON;
+#ifdef HAVE_LIBRDKAFKA_LOG
+ /*
+ * Some versions of rdkafka do not allow setting a log callback.
+ */
rd_kafka_conf_set_log_cb(conf, kafka_log);
+#endif
if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errbuf, sizeof(errbuf))) == NULL) {
sfree(tctx);