X-Git-Url: https://git.octo.it/?a=blobdiff_plain;ds=sidebyside;f=src%2Fwrite_kafka.c;h=a2947d15b34fc84eedac2332a315128bf6a27c6b;hb=b63efaaa45f332044cda735e17d463d6e5b7e7a0;hp=3e683c8432c1911f381a9f3e67deeba48c1aff99;hpb=f2a6ed9f5064319ddc27292ab784e135bc955289;p=collectd.git diff --git a/src/write_kafka.c b/src/write_kafka.c index 3e683c84..a2947d15 100644 --- a/src/write_kafka.c +++ b/src/write_kafka.c @@ -61,7 +61,7 @@ static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *); static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t, int32_t, void *, void *); -#ifdef HAVE_LIBRDKAFKA_LOGGER +#if defined HAVE_LIBRDKAFKA_LOGGER || defined HAVE_LIBRDKAFKA_LOG_CB static void kafka_log(const rd_kafka_t *, int, const char *, const char *); static void kafka_log(const rd_kafka_t *rkt, int level, @@ -76,8 +76,13 @@ static int32_t kafka_partition(const rd_kafka_topic_t *rkt, int32_t partition_cnt, void *p, void *m) { u_int32_t key = *((u_int32_t *)keydata ); + u_int32_t target = key % partition_cnt; + int32_t i = partition_cnt; - return key % partition_cnt; + while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) { + target = (target + 1) % partition_cnt; + } + return target; } static int kafka_write(const data_set_t *ds, /* {{{ */ @@ -240,7 +245,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ goto errout; } key = child->values[0].value.string; - val = child->values[0].value.string; + val = child->values[1].value.string; ret = rd_kafka_topic_conf_set(tctx->conf,key, val, errbuf, sizeof(errbuf)); if (ret != RD_KAFKA_CONF_OK) {