src/Makefile: Don't unnecessarily set plugin specific CXXFLAGS.
[collectd.git] / src / write_kafka.c
index a5977ab..10ae5a5 100644 (file)
@@ -51,8 +51,7 @@ struct kafka_topic_context {
     rd_kafka_topic_t            *topic;
     rd_kafka_conf_t             *kafka_conf;
     rd_kafka_t                  *kafka;
-    int                          has_key;
-    uint32_t                     key;
+    char                        *key;
     char                        *prefix;
     char                        *postfix;
     char                         escape_char;
@@ -115,7 +114,7 @@ static int kafka_handle(struct kafka_topic_context *ctx) /* {{{ */
 
         INFO ("write_kafka plugin: created KAFKA handle : %s", rd_kafka_name(ctx->kafka));
 
-#ifdef HAVE_LIBRDKAFKA_LOGGER
+#if defined(HAVE_LIBRDKAFKA_LOGGER) && !defined(HAVE_LIBRDKAFKA_LOG_CB)
         rd_kafka_set_logger(ctx->kafka, kafka_log);
 #endif
     }
@@ -148,7 +147,8 @@ static int kafka_write(const data_set_t *ds, /* {{{ */
           user_data_t *ud)
 {
     int      status = 0;
-    uint32_t key;
+    void    *key;
+    size_t   keylen = 0;
     char     buffer[8192];
     size_t   bfree = sizeof(buffer);
     size_t   bfill = 0;
@@ -199,17 +199,15 @@ static int kafka_write(const data_set_t *ds, /* {{{ */
         return -1;
     }
 
-    /*
-     * We partition our stream by metric name
-     */
-    if (ctx->has_key)
-        key = ctx->key;
+    key = ctx->key;
+    if (key != NULL)
+        keylen = strlen (key);
     else
-        key = rand();
+        keylen = 0;
 
     rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
                      RD_KAFKA_MSG_F_COPY, buffer, blen,
-                     &key, sizeof(key), NULL);
+                     key, keylen, NULL);
 
     return status;
 } /* }}} int kafka_write */
@@ -256,6 +254,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
     tctx->escape_char = '.';
     tctx->store_rates = 1;
     tctx->format = KAFKA_FORMAT_JSON;
+    tctx->key = NULL;
 
     if ((tctx->kafka_conf = rd_kafka_conf_dup(conf)) == NULL) {
         sfree(tctx);
@@ -318,19 +317,8 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
             }
 
         } else if (strcasecmp ("Key", child->key) == 0)  {
-            char *tmp_buf = NULL;
-            status = cf_util_get_string(child, &tmp_buf);
-            if (status != 0) {
-                WARNING("write_kafka plugin: invalid key supplied");
-                break;
-            }
-
-            if (strcasecmp(tmp_buf, "Random") != 0) {
-                tctx->has_key = 1;
-                tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf));
-            }
-            sfree(tmp_buf);
-
+            cf_util_get_string (child, &tctx->key);
+            assert (tctx->key != NULL);
         } else if (strcasecmp ("Format", child->key) == 0) {
             status = cf_util_get_string(child, &key);
             if (status != 0)
@@ -453,12 +441,15 @@ static int kafka_config(oconfig_item_t *ci) /* {{{ */
             }
             if ((val = strdup(child->values[1].value.string)) == NULL) {
                 WARNING("cannot allocate memory for attribute value.");
+                sfree(key);
                 goto errout;
             }
             ret = rd_kafka_conf_set(conf, key, val, errbuf, sizeof(errbuf));
             if (ret != RD_KAFKA_CONF_OK) {
                 WARNING("cannot set kafka property %s to %s: %s",
                         key, val, errbuf);
+                sfree(key);
+                sfree(val);
                 goto errout;
             }
             sfree(key);
@@ -482,4 +473,3 @@ void module_register(void)
 {
     plugin_register_complex_config ("write_kafka", kafka_config);
 }
-