rd_kafka_topic_t *topic;
rd_kafka_conf_t *kafka_conf;
rd_kafka_t *kafka;
- int has_key;
- uint32_t key;
+ char *key;
char *prefix;
char *postfix;
char escape_char;
INFO ("write_kafka plugin: created KAFKA handle : %s", rd_kafka_name(ctx->kafka));
-#ifdef HAVE_LIBRDKAFKA_LOGGER
+#if defined(HAVE_LIBRDKAFKA_LOGGER) && !defined(HAVE_LIBRDKAFKA_LOG_CB)
rd_kafka_set_logger(ctx->kafka, kafka_log);
#endif
}
user_data_t *ud)
{
int status = 0;
- uint32_t key;
+ void *key;
+ size_t keylen = 0;
char buffer[8192];
size_t bfree = sizeof(buffer);
size_t bfill = 0;
return -1;
}
- /*
- * We partition our stream by metric name
- */
- if (ctx->has_key)
- key = ctx->key;
+ key = ctx->key;
+ if (key != NULL)
+ keylen = strlen (key);
else
- key = rand();
+ keylen = 0;
rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY, buffer, blen,
- &key, sizeof(key), NULL);
+ key, keylen, NULL);
return status;
} /* }}} int kafka_write */
tctx->escape_char = '.';
tctx->store_rates = 1;
tctx->format = KAFKA_FORMAT_JSON;
+ tctx->key = NULL;
if ((tctx->kafka_conf = rd_kafka_conf_dup(conf)) == NULL) {
sfree(tctx);
}
} else if (strcasecmp ("Key", child->key) == 0) {
- char *tmp_buf = NULL;
- status = cf_util_get_string(child, &tmp_buf);
- if (status != 0) {
- WARNING("write_kafka plugin: invalid key supplied");
- break;
- }
-
- if (strcasecmp(tmp_buf, "Random") != 0) {
- tctx->has_key = 1;
- tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf));
- }
- sfree(tmp_buf);
-
+ cf_util_get_string (child, &tctx->key);
+ assert (tctx->key != NULL);
} else if (strcasecmp ("Format", child->key) == 0) {
status = cf_util_get_string(child, &key);
if (status != 0)
{
plugin_register_complex_config ("write_kafka", kafka_config);
}
-