X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_kafka.c;h=eaaf171262bd0b06ef07e0ddb8e32056bb9d873b;hb=f0481f16ff200ef93c6ea4487a7bc3a59c8f52cb;hp=b6e89618ec9ddbf51738556171ace8b278944130;hpb=f664b944f774e4d1e5d5c562eaf0bb207c6a7edf;p=collectd.git diff --git a/src/write_kafka.c b/src/write_kafka.c index b6e89618..eaaf1712 100644 --- a/src/write_kafka.c +++ b/src/write_kafka.c @@ -25,6 +25,7 @@ */ #include "collectd.h" + #include "plugin.h" #include "common.h" #include "configfile.h" @@ -34,7 +35,7 @@ #include "utils_format_json.h" #include "utils_crc32.h" -#include +#include #include #include #include @@ -44,7 +45,7 @@ struct kafka_topic_context { #define KAFKA_FORMAT_JSON 0 #define KAFKA_FORMAT_COMMAND 1 #define KAFKA_FORMAT_GRAPHITE 2 - u_int8_t format; + uint8_t format; unsigned int graphite_flags; _Bool store_rates; rd_kafka_topic_conf_t *conf; @@ -52,7 +53,7 @@ struct kafka_topic_context { rd_kafka_conf_t *kafka_conf; rd_kafka_t *kafka; int has_key; - u_int32_t key; + uint32_t key; char *prefix; char *postfix; char escape_char; @@ -65,7 +66,14 @@ static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *); static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t, int32_t, void *, void *); -#if defined HAVE_LIBRDKAFKA_LOGGER || defined HAVE_LIBRDKAFKA_LOG_CB +/* Version 0.9.0 of librdkafka deprecates rd_kafka_set_logger() in favor of + * rd_kafka_conf_set_log_cb(). This is to make sure we're not using the + * deprecated function. */ +#ifdef HAVE_LIBRDKAFKA_LOG_CB +# undef HAVE_LIBRDKAFKA_LOGGER +#endif + +#if defined(HAVE_LIBRDKAFKA_LOGGER) || defined(HAVE_LIBRDKAFKA_LOG_CB) static void kafka_log(const rd_kafka_t *, int, const char *, const char *); static void kafka_log(const rd_kafka_t *rkt, int level, @@ -79,8 +87,8 @@ static int32_t kafka_partition(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *p, void *m) { - u_int32_t key = *((u_int32_t *)keydata ); - u_int32_t target = key % partition_cnt; + uint32_t key = *((uint32_t *)keydata ); + uint32_t target = key % partition_cnt; int32_t i = partition_cnt; while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) { @@ -148,7 +156,7 @@ static int kafka_write(const data_set_t *ds, /* {{{ */ user_data_t *ud) { int status = 0; - u_int32_t key; + uint32_t key; char buffer[8192]; size_t bfree = sizeof(buffer); size_t bfill = 0; @@ -472,7 +480,7 @@ static int kafka_config(oconfig_item_t *ci) /* {{{ */ } if (conf != NULL) rd_kafka_conf_destroy(conf); - return (0); + return (0); errout: if (conf != NULL) rd_kafka_conf_destroy(conf);