X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_kafka.c;h=e1d2f1229a6684a340b4cf3da40b71f9a9ca09c4;hb=3ed71d2774fcacdf0c88b3a6d81bd826a6893b32;hp=ba76d71f6c727e4f1b19a5514e74e1e6eb36926e;hpb=2188e501bf6c31737ad81a8ca0f70aa7e6c8f31b;p=collectd.git diff --git a/src/write_kafka.c b/src/write_kafka.c index ba76d71f..e1d2f122 100644 --- a/src/write_kafka.c +++ b/src/write_kafka.c @@ -76,8 +76,13 @@ static int32_t kafka_partition(const rd_kafka_topic_t *rkt, int32_t partition_cnt, void *p, void *m) { u_int32_t key = *((u_int32_t *)keydata ); + u_int32_t target = key % partition_cnt; + int32_t i = partition_cnt; - return key % partition_cnt; + while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) { + target = (target + 1) % partition_cnt; + } + return target; } static int kafka_write(const data_set_t *ds, /* {{{ */