#include <zlib.h>
struct kafka_topic_context {
+#define KAFKA_FORMAT_JSON 0
#define KAFKA_FORMAT_COMMAND 1
#define KAFKA_FORMAT_GRAPHITE 2
-#define KAFKA_FORMAT_JSON 3
u_int8_t format;
unsigned int graphite_flags;
_Bool store_rates;
static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
int32_t, void *, void *);
+
+#ifdef HAVE_LIBRDKAFKA_LOGGER
static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
static void kafka_log(const rd_kafka_t *rkt, int level,
{
plugin_log(level, "%s", msg);
}
+#endif
static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
const void *keydata, size_t keylen,
int status;
int i;
struct kafka_topic_context *tctx;
- char *key;
+ char *key = NULL;
char *val;
char callback_name[DATA_MAX_NAME_LEN];
char errbuf[1024];
tctx->escape_char = '.';
tctx->store_rates = 1;
+ tctx->format = KAFKA_FORMAT_JSON;
+#ifdef HAVE_LIBRDKAFKA_LOG_CB
rd_kafka_conf_set_log_cb(conf, kafka_log);
+#endif
if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
errbuf, sizeof(errbuf))) == NULL) {
sfree(tctx);
ERROR("write_kafka plugin: cannot create kafka handle.");
return;
}
+#ifdef HAVE_LIBRDKAFKA_LOGGER
+ rd_kafka_conf_set_logger(tctx->kafka, kafka_log);
+#endif
conf = NULL;
if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) {
assert(key != NULL);
if (strcasecmp(key, "Command") == 0) {
-
tctx->format = KAFKA_FORMAT_COMMAND;
} else if (strcasecmp(key, "Graphite") == 0) {
WARNING ("write_kafka plugin: Invalid format string: %s",
key);
}
+
sfree(key);
} else if (strcasecmp ("StoreRates", child->key) == 0) {