Merge pull request #774 from trenkel/master
[collectd.git] / src / write_kafka.c
index 97db426..3e683c8 100644 (file)
@@ -1,19 +1,24 @@
 /**
  * collectd - src/write_kafka.c
- *
  * Copyright (C) 2014       Pierre-Yves Ritschard
  *
- * Permission to use, copy, modify, and distribute this software for any
- * purpose with or without fee is hereby granted, provided that the above
- * copyright notice and this permission notice appear in all copies.
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
  *
- * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
- * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
- * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
  *
  * Authors:
  *   Pierre-Yves Ritschard <pyr at spootnik.org>
@@ -35,9 +40,9 @@
 #include <zlib.h>
 
 struct kafka_topic_context {
+#define KAFKA_FORMAT_JSON        0
 #define KAFKA_FORMAT_COMMAND     1
 #define KAFKA_FORMAT_GRAPHITE    2
-#define KAFKA_FORMAT_JSON        3
     u_int8_t                     format;
     unsigned int                 graphite_flags;
     _Bool                        store_rates;
@@ -55,6 +60,8 @@ struct kafka_topic_context {
 static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
 static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
                                int32_t, void *, void *);
+
+#ifdef HAVE_LIBRDKAFKA_LOGGER
 static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
 
 static void kafka_log(const rd_kafka_t *rkt, int level,
@@ -62,6 +69,7 @@ static void kafka_log(const rd_kafka_t *rkt, int level,
 {
     plugin_log(level, "%s", msg);
 }
+#endif
 
 static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
                                const void *keydata, size_t keylen,
@@ -160,7 +168,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
     int                          status;
     int                          i;
     struct kafka_topic_context  *tctx;
-    char                        *key;
+    char                        *key = NULL;
     char                        *val;
     char                         callback_name[DATA_MAX_NAME_LEN];
     char                         errbuf[1024];
@@ -175,14 +183,20 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
 
     tctx->escape_char = '.';
     tctx->store_rates = 1;
+    tctx->format = KAFKA_FORMAT_JSON;
 
+#ifdef HAVE_LIBRDKAFKA_LOG_CB
     rd_kafka_conf_set_log_cb(conf, kafka_log);
+#endif
     if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                     errbuf, sizeof(errbuf))) == NULL) {
         sfree(tctx);
         ERROR("write_kafka plugin: cannot create kafka handle.");
         return;
     }
+#ifdef HAVE_LIBRDKAFKA_LOGGER
+    rd_kafka_conf_set_logger(tctx->kafka, kafka_log);
+#endif
     conf = NULL;
 
     if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) {
@@ -257,7 +271,6 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
             assert(key != NULL);
 
             if (strcasecmp(key, "Command") == 0) {
-
                 tctx->format = KAFKA_FORMAT_COMMAND;
 
             } else if (strcasecmp(key, "Graphite") == 0) {
@@ -270,6 +283,7 @@ static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{
                 WARNING ("write_kafka plugin: Invalid format string: %s",
                          key);
             }
+
             sfree(key);
 
         } else if (strcasecmp ("StoreRates", child->key) == 0) {