Merge pull request #1743 from rubenk/apcups-coverity
[collectd.git] / src / write_kafka.c
index 736fddb..e881593 100644 (file)
@@ -36,7 +36,6 @@
 
 #include <stdint.h>
 #include <librdkafka/rdkafka.h>
-#include <pthread.h>
 #include <zlib.h>
 #include <errno.h>
 
@@ -51,8 +50,7 @@ struct kafka_topic_context {
     rd_kafka_topic_t            *topic;
     rd_kafka_conf_t             *kafka_conf;
     rd_kafka_t                  *kafka;
-    int                          has_key;
-    uint32_t                     key;
+    char                        *key;
     char                        *prefix;
     char                        *postfix;
     char                         escape_char;
@@ -115,7 +113,7 @@ static int kafka_handle(struct kafka_topic_context *ctx) /* {{{ */
 
         INFO ("write_kafka plugin: created KAFKA handle : %s", rd_kafka_name(ctx->kafka));
 
-#ifdef HAVE_LIBRDKAFKA_LOGGER
+#if defined(HAVE_LIBRDKAFKA_LOGGER) && !defined(HAVE_LIBRDKAFKA_LOG_CB)
         rd_kafka_set_logger(ctx->kafka, kafka_log);
 #endif
     }
@@ -148,7 +146,8 @@ static int kafka_write(const data_set_t *ds, /* {{{ */
           user_data_t *ud)
 {
     int      status = 0;
-    uint32_t key;
+    void    *key;
+    size_t   keylen = 0;
     char     buffer[8192];
     size_t   bfree = sizeof(buffer);
     size_t   bfill = 0;
@@ -199,17 +198,15 @@ static int kafka_write(const data_set_t *ds, /* {{{ */
         return -1;
     }
 
-    /*
-     * We partition our stream by metric name
-     */
-    if (ctx->has_key)
-        key = ctx->key;
+    key = ctx->key;
+    if (key != NULL)
+        keylen = strlen (key);
     else
-        key = rand();
+        keylen = 0;
 
     rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
                      RD_KAFKA_MSG_F_COPY, buffer, blen,
-                     &key, sizeof(key), NULL);
+                     key, keylen, NULL);
 
     return status;
 } /* }}} int kafka_write */
@@ -256,6 +253,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
     tctx->escape_char = '.';
     tctx->store_rates = 1;
     tctx->format = KAFKA_FORMAT_JSON;
+    tctx->key = NULL;
 
     if ((tctx->kafka_conf = rd_kafka_conf_dup(conf)) == NULL) {
         sfree(tctx);
@@ -318,19 +316,8 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
             }
 
         } else if (strcasecmp ("Key", child->key) == 0)  {
-            char *tmp_buf = NULL;
-            status = cf_util_get_string(child, &tmp_buf);
-            if (status != 0) {
-                WARNING("write_kafka plugin: invalid key supplied");
-                break;
-            }
-
-            if (strcasecmp(tmp_buf, "Random") != 0) {
-                tctx->has_key = 1;
-                tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf));
-            }
-            sfree(tmp_buf);
-
+            cf_util_get_string (child, &tctx->key);
+            assert (tctx->key != NULL);
         } else if (strcasecmp ("Format", child->key) == 0) {
             status = cf_util_get_string(child, &key);
             if (status != 0)
@@ -485,4 +472,3 @@ void module_register(void)
 {
     plugin_register_complex_config ("write_kafka", kafka_config);
 }
-