-#include "utils_cmd_putval.h"
-#include "utils_format_graphite.h"
-#include "utils_format_json.h"
+#include "utils/cmds/putval.h"
+#include "utils/common/common.h"
+#include "utils/format_graphite/format_graphite.h"
+#include "utils/format_json/format_json.h"
static uint32_t kafka_hash(const char *keydata, size_t keylen) {
uint32_t hash = 5381;
for (; keylen > 0; keylen--)
static uint32_t kafka_hash(const char *keydata, size_t keylen) {
uint32_t hash = 5381;
for (; keylen > 0; keylen--)
#define KAFKA_RANDOM_KEY_BUFFER \
(char[KAFKA_RANDOM_KEY_SIZE]) { "" }
static char *kafka_random_key(char buffer[static KAFKA_RANDOM_KEY_SIZE]) {
#define KAFKA_RANDOM_KEY_BUFFER \
(char[KAFKA_RANDOM_KEY_SIZE]) { "" }
static char *kafka_random_key(char buffer[static KAFKA_RANDOM_KEY_SIZE]) {
- snprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08" PRIX32, cdrand_u());
+ ssnprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08" PRIX32, cdrand_u());
if ((ctx->topic = rd_kafka_topic_new(ctx->kafka, ctx->topic_name,
topic_conf)) == NULL) {
ERROR("write_kafka plugin: cannot create topic : %s\n",
if ((ctx->topic = rd_kafka_topic_new(ctx->kafka, ctx->topic_name,
topic_conf)) == NULL) {
ERROR("write_kafka plugin: cannot create topic : %s\n",
status = cf_util_get_flag(child, &tctx->graphite_flags,
GRAPHITE_PRESERVE_SEPARATOR);
status = cf_util_get_flag(child, &tctx->graphite_flags,
GRAPHITE_PRESERVE_SEPARATOR);
} else if (strcasecmp("GraphitePrefix", child->key) == 0) {
status = cf_util_get_string(child, &tctx->prefix);
} else if (strcasecmp("GraphitePostfix", child->key) == 0) {
} else if (strcasecmp("GraphitePrefix", child->key) == 0) {
status = cf_util_get_string(child, &tctx->prefix);
} else if (strcasecmp("GraphitePostfix", child->key) == 0) {
rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition);
rd_kafka_topic_conf_set_opaque(tctx->conf, tctx);
rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition);
rd_kafka_topic_conf_set_opaque(tctx->conf, tctx);
- snprintf(callback_name, sizeof(callback_name), "write_kafka/%s",
- tctx->topic_name);
+ ssnprintf(callback_name, sizeof(callback_name), "write_kafka/%s",
+ tctx->topic_name);
status = plugin_register_write(
callback_name, kafka_write,
status = plugin_register_write(
callback_name, kafka_write,