X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_kafka.c;h=3e683c8432c1911f381a9f3e67deeba48c1aff99;hb=7cea19815ba24735e91dde1c08a889960b299b62;hp=97db42657055999fa114cd1c70ca9f882ba4325c;hpb=5ff0ee586df3ae4668f105057a5b8e6fc183f1a0;p=collectd.git diff --git a/src/write_kafka.c b/src/write_kafka.c index 97db4265..3e683c84 100644 --- a/src/write_kafka.c +++ b/src/write_kafka.c @@ -1,19 +1,24 @@ /** * collectd - src/write_kafka.c - * * Copyright (C) 2014 Pierre-Yves Ritschard * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER - * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING - * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. * * Authors: * Pierre-Yves Ritschard @@ -35,9 +40,9 @@ #include struct kafka_topic_context { +#define KAFKA_FORMAT_JSON 0 #define KAFKA_FORMAT_COMMAND 1 #define KAFKA_FORMAT_GRAPHITE 2 -#define KAFKA_FORMAT_JSON 3 u_int8_t format; unsigned int graphite_flags; _Bool store_rates; @@ -55,6 +60,8 @@ struct kafka_topic_context { static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *); static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t, int32_t, void *, void *); + +#ifdef HAVE_LIBRDKAFKA_LOGGER static void kafka_log(const rd_kafka_t *, int, const char *, const char *); static void kafka_log(const rd_kafka_t *rkt, int level, @@ -62,6 +69,7 @@ static void kafka_log(const rd_kafka_t *rkt, int level, { plugin_log(level, "%s", msg); } +#endif static int32_t kafka_partition(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, @@ -160,7 +168,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ int status; int i; struct kafka_topic_context *tctx; - char *key; + char *key = NULL; char *val; char callback_name[DATA_MAX_NAME_LEN]; char errbuf[1024]; @@ -175,14 +183,20 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ tctx->escape_char = '.'; tctx->store_rates = 1; + tctx->format = KAFKA_FORMAT_JSON; +#ifdef HAVE_LIBRDKAFKA_LOG_CB rd_kafka_conf_set_log_cb(conf, kafka_log); +#endif if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errbuf, sizeof(errbuf))) == NULL) { sfree(tctx); ERROR("write_kafka plugin: cannot create kafka handle."); return; } +#ifdef HAVE_LIBRDKAFKA_LOGGER + rd_kafka_conf_set_logger(tctx->kafka, kafka_log); +#endif conf = NULL; if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) { @@ -257,7 +271,6 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ assert(key != NULL); if (strcasecmp(key, "Command") == 0) { - tctx->format = KAFKA_FORMAT_COMMAND; } else if (strcasecmp(key, "Graphite") == 0) { @@ -270,6 +283,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ WARNING ("write_kafka plugin: Invalid format string: %s", key); } + sfree(key); } else if (strcasecmp ("StoreRates", child->key) == 0) {