Turbostat: downcase plugin name in log messages
[collectd.git] / src / write_kafka.c
index ff3176d..a2947d1 100644 (file)
@@ -61,7 +61,7 @@ static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
 static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
                                int32_t, void *, void *);
 
-#ifdef HAVE_LIBRDKAFKA_LOG
+#if defined HAVE_LIBRDKAFKA_LOGGER || defined HAVE_LIBRDKAFKA_LOG_CB
 static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
 
 static void kafka_log(const rd_kafka_t *rkt, int level,
@@ -69,7 +69,6 @@ static void kafka_log(const rd_kafka_t *rkt, int level,
 {
     plugin_log(level, "%s", msg);
 }
-
 #endif
 
 static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
@@ -77,8 +76,13 @@ static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
                                int32_t partition_cnt, void *p, void *m)
 {
     u_int32_t key = *((u_int32_t *)keydata );
+    u_int32_t target = key % partition_cnt;
+    int32_t   i = partition_cnt;
 
-    return key % partition_cnt;
+    while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) {
+        target = (target + 1) % partition_cnt;
+    }
+    return target;
 }
 
 static int kafka_write(const data_set_t *ds, /* {{{ */
@@ -186,10 +190,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
     tctx->store_rates = 1;
     tctx->format = KAFKA_FORMAT_JSON;
 
-#ifdef HAVE_LIBRDKAFKA_LOG
-    /*
-     * Some versions of rdkafka do not allow setting a log callback.
-     */
+#ifdef HAVE_LIBRDKAFKA_LOG_CB
     rd_kafka_conf_set_log_cb(conf, kafka_log);
 #endif
     if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
@@ -198,6 +199,9 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
         ERROR("write_kafka plugin: cannot create kafka handle.");
         return;
     }
+#ifdef HAVE_LIBRDKAFKA_LOGGER
+    rd_kafka_conf_set_logger(tctx->kafka, kafka_log);
+#endif
     conf = NULL;
 
     if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) {
@@ -241,7 +245,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
                 goto errout;
                        }
             key = child->values[0].value.string;
-            val = child->values[0].value.string;
+            val = child->values[1].value.string;
             ret = rd_kafka_topic_conf_set(tctx->conf,key, val,
                                           errbuf, sizeof(errbuf));
             if (ret != RD_KAFKA_CONF_OK) {