#include "collectd.h"
-#include "common.h"
#include "plugin.h"
-#include "utils_cmd_putval.h"
-#include "utils_format_graphite.h"
-#include "utils_format_json.h"
+#include "utils/cmds/putval.h"
+#include "utils/common/common.h"
+#include "utils/format_graphite/format_graphite.h"
+#include "utils/format_json/format_json.h"
+#include "utils_random.h"
#include <errno.h>
#include <librdkafka/rdkafka.h>
#define KAFKA_FORMAT_GRAPHITE 2
uint8_t format;
unsigned int graphite_flags;
- _Bool store_rates;
+ bool store_rates;
rd_kafka_topic_conf_t *conf;
rd_kafka_topic_t *topic;
rd_kafka_conf_t *kafka_conf;
}
#endif
+static rd_kafka_resp_err_t kafka_error() {
+#if RD_KAFKA_VERSION >= 0x000b00ff
+ return rd_kafka_last_error();
+#else
+ return rd_kafka_errno2err(errno);
+#endif
+}
+
static uint32_t kafka_hash(const char *keydata, size_t keylen) {
uint32_t hash = 5381;
for (; keylen > 0; keylen--)
#define KAFKA_RANDOM_KEY_BUFFER \
(char[KAFKA_RANDOM_KEY_SIZE]) { "" }
static char *kafka_random_key(char buffer[static KAFKA_RANDOM_KEY_SIZE]) {
- ssnprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08lX", (unsigned long)mrand48());
+ snprintf(buffer, KAFKA_RANDOM_KEY_SIZE, "%08" PRIX32, cdrand_u());
return buffer;
}
rd_kafka_topic_conf_t *topic_conf;
if (ctx->kafka != NULL && ctx->topic != NULL)
- return (0);
+ return 0;
if (ctx->kafka == NULL) {
if ((conf = rd_kafka_conf_dup(ctx->kafka_conf)) == NULL) {
ERROR("write_kafka plugin: cannot duplicate kafka config");
- return (1);
+ return 1;
}
if ((ctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errbuf,
if ((ctx->topic = rd_kafka_topic_new(ctx->kafka, ctx->topic_name,
topic_conf)) == NULL) {
ERROR("write_kafka plugin: cannot create topic : %s\n",
- rd_kafka_err2str(rd_kafka_errno2err(errno)));
+ rd_kafka_err2str(kafka_error()));
return errno;
}
rd_kafka_topic_name(ctx->topic));
}
- return (0);
+ return 0;
} /* }}} int kafka_handle */
}
tctx->escape_char = '.';
- tctx->store_rates = 1;
+ tctx->store_rates = true;
tctx->format = KAFKA_FORMAT_JSON;
tctx->key = NULL;
status = cf_util_get_flag(child, &tctx->graphite_flags,
GRAPHITE_PRESERVE_SEPARATOR);
+ } else if (strcasecmp("GraphiteUseTags", child->key) == 0) {
+ status =
+ cf_util_get_flag(child, &tctx->graphite_flags, GRAPHITE_USE_TAGS);
+
} else if (strcasecmp("GraphitePrefix", child->key) == 0) {
status = cf_util_get_string(child, &tctx->prefix);
} else if (strcasecmp("GraphitePostfix", child->key) == 0) {
rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition);
rd_kafka_topic_conf_set_opaque(tctx->conf, tctx);
- ssnprintf(callback_name, sizeof(callback_name), "write_kafka/%s",
- tctx->topic_name);
+ snprintf(callback_name, sizeof(callback_name), "write_kafka/%s",
+ tctx->topic_name);
status = plugin_register_write(
callback_name, kafka_write,
}
if (conf != NULL)
rd_kafka_conf_destroy(conf);
- return (0);
+ return 0;
errout:
if (conf != NULL)
rd_kafka_conf_destroy(conf);