#include "utils_cmd_putval.h"
#include "utils_format_graphite.h"
#include "utils_format_json.h"
+#include "utils_random.h"
#include <errno.h>
#include <librdkafka/rdkafka.h>
}
#endif
+static rd_kafka_resp_err_t kafka_error() {
+#if RD_KAFKA_VERSION >= 0x000b00ff
+ return rd_kafka_last_error();
+#else
+ return rd_kafka_errno2err(errno);
+#endif
+}
+
static uint32_t kafka_hash(const char *keydata, size_t keylen) {
uint32_t hash = 5381;
for (; keylen > 0; keylen--)
#define KAFKA_RANDOM_KEY_BUFFER \
(char[KAFKA_RANDOM_KEY_SIZE]) { "" }
static char *kafka_random_key(char buffer[static KAFKA_RANDOM_KEY_SIZE]) {
- ssnprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08lX", (unsigned long)mrand48());
+ ssnprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08" PRIX32, cdrand_u());
return buffer;
}
if ((ctx->topic = rd_kafka_topic_new(ctx->kafka, ctx->topic_name,
topic_conf)) == NULL) {
ERROR("write_kafka plugin: cannot create topic : %s\n",
- rd_kafka_err2str(rd_kafka_errno2err(errno)));
+ rd_kafka_err2str(kafka_error()));
return errno;
}