X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_kafka.c;h=e1d2f1229a6684a340b4cf3da40b71f9a9ca09c4;hb=789b73ad5ce9a1f3d3b255be593813fbe814f9f1;hp=b74fe97ddc6ecde6d50bd3d94c981234191516b5;hpb=9679723b9f177055ef93d0b1bbb420cca7ac6ea4;p=collectd.git

diff --git a/src/write_kafka.c b/src/write_kafka.c
index b74fe97d..e1d2f122 100644
--- a/src/write_kafka.c
+++ b/src/write_kafka.c
@@ -60,6 +60,8 @@ struct kafka_topic_context {
 static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
 static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
                                int32_t, void *, void *);
+
+#if defined HAVE_LIBRDKAFKA_LOGGER || defined HAVE_LIBRDKAFKA_LOG_CB
 static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
 
 static void kafka_log(const rd_kafka_t *rkt, int level,
@@ -67,14 +69,20 @@ static void kafka_log(const rd_kafka_t *rkt, int level,
 {
     plugin_log(level, "%s", msg);
 }
+#endif
 
 static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
                                const void *keydata, size_t keylen,
                                int32_t partition_cnt, void *p, void *m)
 {
     u_int32_t key = *((u_int32_t *)keydata );
+    u_int32_t target = key % partition_cnt;
+    int32_t i = partition_cnt;
 
-    return key % partition_cnt;
+    while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) {
+        target = (target + 1) % partition_cnt;
+    }
+    return target;
 }
 
 static int kafka_write(const data_set_t *ds, /* {{{ */
@@ -182,13 +190,18 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
     tctx->store_rates = 1;
     tctx->format = KAFKA_FORMAT_JSON;
 
+#ifdef HAVE_LIBRDKAFKA_LOG_CB
     rd_kafka_conf_set_log_cb(conf, kafka_log);
+#endif
     if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                     errbuf, sizeof(errbuf))) == NULL) {
         sfree(tctx);
         ERROR("write_kafka plugin: cannot create kafka handle.");
         return;
     }
+#ifdef HAVE_LIBRDKAFKA_LOGGER
+    rd_kafka_conf_set_logger(tctx->kafka, kafka_log);
+#endif
     conf = NULL;
 
     if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) {
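
Note (not part of the diff above): a minimal standalone sketch of the partitioner fallback the diff introduces in kafka_partition(). It assumes a hypothetical partition_available() stub standing in for librdkafka's rd_kafka_topic_partition_available(), which the real callback consults; the scan starts at key % partition_cnt and walks forward until an available partition is found or partition_cnt - 1 candidates have been tried.

/*
 * Sketch only -- not collectd code. Demonstrates the "key modulo count,
 * then walk forward past unavailable partitions" logic added above.
 * partition_available() is a hypothetical stand-in for
 * rd_kafka_topic_partition_available(), so this runs without a broker.
 */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Pretend partitions 0 and 2 currently have no leader. */
static int partition_available(int32_t partition)
{
    return (partition != 0) && (partition != 2);
}

static int32_t pick_partition(uint32_t key, int32_t partition_cnt)
{
    int32_t target = (int32_t)(key % (uint32_t)partition_cnt);
    int32_t i = partition_cnt;

    /* Walk forward (wrapping around) until an available partition is
     * found, giving up after partition_cnt - 1 extra probes, exactly
     * like the while loop added in the diff. */
    while (--i > 0 && !partition_available(target))
        target = (target + 1) % partition_cnt;

    return target;
}

int main(void)
{
    /* Key 6 maps to 6 % 4 = 2, which is "down", so the scan lands on 3. */
    printf("key 6, 4 partitions -> %" PRId32 "\n", pick_partition(6, 4));
    /* Key 5 maps to 1, which is available, so it stays at 1. */
    printf("key 5, 4 partitions -> %" PRId32 "\n", pick_partition(5, 4));
    return 0;
}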