X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_kafka.c;h=04e67b92fc82f407530e8fa088edf2e7ef38a40f;hb=7c9d772c992647fcba64a96800c146eb9f1647f8;hp=b1c8bf94a2a716184797434e6c157ecf6fb6a2d3;hpb=be126043c2be20399d7670fe194645292018bde0;p=collectd.git diff --git a/src/write_kafka.c b/src/write_kafka.c index b1c8bf94..04e67b92 100644 --- a/src/write_kafka.c +++ b/src/write_kafka.c @@ -43,7 +43,7 @@ struct kafka_topic_context { #define KAFKA_FORMAT_GRAPHITE 2 uint8_t format; unsigned int graphite_flags; - _Bool store_rates; + bool store_rates; rd_kafka_topic_conf_t *conf; rd_kafka_topic_t *topic; rd_kafka_conf_t *kafka_conf; @@ -77,6 +77,14 @@ static void kafka_log(const rd_kafka_t *rkt, int level, const char *fac, } #endif +static rd_kafka_resp_err_t kafka_error() { +#if RD_KAFKA_VERSION >= 0x000b00ff + return rd_kafka_last_error(); +#else + return rd_kafka_errno2err(errno); +#endif +} + static uint32_t kafka_hash(const char *keydata, size_t keylen) { uint32_t hash = 5381; for (; keylen > 0; keylen--) @@ -147,7 +155,7 @@ static int kafka_handle(struct kafka_topic_context *ctx) /* {{{ */ if ((ctx->topic = rd_kafka_topic_new(ctx->kafka, ctx->topic_name, topic_conf)) == NULL) { ERROR("write_kafka plugin: cannot create topic : %s\n", - rd_kafka_err2str(rd_kafka_errno2err(errno))); + rd_kafka_err2str(kafka_error())); return errno; } @@ -265,7 +273,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, } tctx->escape_char = '.'; - tctx->store_rates = 1; + tctx->store_rates = true; tctx->format = KAFKA_FORMAT_JSON; tctx->key = NULL; @@ -375,6 +383,10 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, status = cf_util_get_flag(child, &tctx->graphite_flags, GRAPHITE_PRESERVE_SEPARATOR); + } else if (strcasecmp("GraphiteUseTags", child->key) == 0) { + status = + cf_util_get_flag(child, &tctx->graphite_flags, GRAPHITE_USE_TAGS); + } else if (strcasecmp("GraphitePrefix", child->key) == 0) { status = cf_util_get_string(child, &tctx->prefix); } else if (strcasecmp("GraphitePostfix", child->key) == 0) { @@ -399,7 +411,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, rd_kafka_topic_conf_set_opaque(tctx->conf, tctx); snprintf(callback_name, sizeof(callback_name), "write_kafka/%s", - tctx->topic_name); + tctx->topic_name); status = plugin_register_write( callback_name, kafka_write,