Move utils_parse_option.[ch] out of the daemon/ directory.
[collectd.git] / src / write_kafka.c
index b74fe97..3e683c8 100644 (file)
@@ -60,6 +60,8 @@ struct kafka_topic_context {
 static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
 static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
                                int32_t, void *, void *);
+
+#ifdef HAVE_LIBRDKAFKA_LOGGER
 static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
 
 static void kafka_log(const rd_kafka_t *rkt, int level,
@@ -67,6 +69,7 @@ static void kafka_log(const rd_kafka_t *rkt, int level,
 {
     plugin_log(level, "%s", msg);
 }
+#endif
 
 static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
                                const void *keydata, size_t keylen,
@@ -182,13 +185,18 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
     tctx->store_rates = 1;
     tctx->format = KAFKA_FORMAT_JSON;
 
+#ifdef HAVE_LIBRDKAFKA_LOG_CB
     rd_kafka_conf_set_log_cb(conf, kafka_log);
+#endif
     if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                     errbuf, sizeof(errbuf))) == NULL) {
         sfree(tctx);
         ERROR("write_kafka plugin: cannot create kafka handle.");
         return;
     }
+#ifdef HAVE_LIBRDKAFKA_LOGGER
+    rd_kafka_conf_set_logger(tctx->kafka, kafka_log);
+#endif
     conf = NULL;
 
     if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) {