write_kafka plugin: Fix support for librdkafka 0.9.0.
[collectd.git] / src / write_kafka.c
1 /**
2  * collectd - src/write_kafka.c
3  * Copyright (C) 2014       Pierre-Yves Ritschard
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Pierre-Yves Ritschard <pyr at spootnik.org>
25  */
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "common.h"
31 #include "configfile.h"
32 #include "utils_cache.h"
33 #include "utils_cmd_putval.h"
34 #include "utils_format_graphite.h"
35 #include "utils_format_json.h"
36 #include "utils_crc32.h"
37
38 #include <stdint.h>
39 #include <librdkafka/rdkafka.h>
40 #include <pthread.h>
41 #include <zlib.h>
42 #include <errno.h>
43
44 struct kafka_topic_context {
45 #define KAFKA_FORMAT_JSON        0
46 #define KAFKA_FORMAT_COMMAND     1
47 #define KAFKA_FORMAT_GRAPHITE    2
48     uint8_t                     format;
49     unsigned int                 graphite_flags;
50     _Bool                        store_rates;
51     rd_kafka_topic_conf_t       *conf;
52     rd_kafka_topic_t            *topic;
53     rd_kafka_conf_t             *kafka_conf;
54     rd_kafka_t                  *kafka;
55     int                          has_key;
56     uint32_t                    key;
57     char                        *prefix;
58     char                        *postfix;
59     char                         escape_char;
60     char                        *topic_name;
61     pthread_mutex_t             lock;
62 };
63
64 static int kafka_handle(struct kafka_topic_context *);
65 static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
66 static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
67                                int32_t, void *, void *);
68
69 /* Version 0.9.0 of librdkafka deprecates rd_kafka_set_logger() in favor of
70  * rd_kafka_conf_set_log_cb(). This is to make sure we're not using the
71  * deprecated function. */
72 #ifdef HAVE_LIBRDKAFKA_LOG_CB
73 # undef HAVE_LIBRDKAFKA_LOGGER
74 #endif
75
76 #if defined(HAVE_LIBRDKAFKA_LOGGER) || defined(HAVE_LIBRDKAFKA_LOG_CB)
77 static void kafka_log(const rd_kafka_t *, int, const char *, const char *);
78
79 static void kafka_log(const rd_kafka_t *rkt, int level,
80                       const char *fac, const char *msg)
81 {
82     plugin_log(level, "%s", msg);
83 }
84 #endif
85
86 static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
87                                const void *keydata, size_t keylen,
88                                int32_t partition_cnt, void *p, void *m)
89 {
90     uint32_t key = *((uint32_t *)keydata );
91     uint32_t target = key % partition_cnt;
92     int32_t   i = partition_cnt;
93
94     while (--i > 0 && !rd_kafka_topic_partition_available(rkt, target)) {
95         target = (target + 1) % partition_cnt;
96     }
97     return target;
98 }
99
100 static int kafka_handle(struct kafka_topic_context *ctx) /* {{{ */
101 {
102     char                         errbuf[1024];
103     rd_kafka_conf_t             *conf;
104     rd_kafka_topic_conf_t       *topic_conf;
105
106     if (ctx->kafka != NULL && ctx->topic != NULL)
107         return(0);
108
109     if (ctx->kafka == NULL) {
110         if ((conf = rd_kafka_conf_dup(ctx->kafka_conf)) == NULL) {
111             ERROR("write_kafka plugin: cannot duplicate kafka config");
112             return(1);
113         }
114
115         if ((ctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
116                                     errbuf, sizeof(errbuf))) == NULL) {
117                 ERROR("write_kafka plugin: cannot create kafka handle.");
118                 return 1;
119         }
120
121         rd_kafka_conf_destroy(ctx->kafka_conf);
122         ctx->kafka_conf = NULL;
123
124         INFO ("write_kafka plugin: created KAFKA handle : %s", rd_kafka_name(ctx->kafka));
125
126 #ifdef HAVE_LIBRDKAFKA_LOGGER
127         rd_kafka_set_logger(ctx->kafka, kafka_log);
128 #endif
129     }
130
131     if (ctx->topic == NULL ) {
132         if ((topic_conf = rd_kafka_topic_conf_dup(ctx->conf)) == NULL) {
133             ERROR("write_kafka plugin: cannot duplicate kafka topic config");
134             return 1;
135         }
136
137         if ((ctx->topic = rd_kafka_topic_new(ctx->kafka, ctx->topic_name,
138                                                 topic_conf)) == NULL) {
139                 ERROR("write_kafka plugin: cannot create topic : %s\n", 
140                         rd_kafka_err2str(rd_kafka_errno2err(errno)));
141                 return errno;
142         }
143
144         rd_kafka_topic_conf_destroy(ctx->conf);
145         ctx->conf = NULL;
146
147         INFO ("write_kafka plugin: handle created for topic : %s", rd_kafka_topic_name(ctx->topic));
148     }
149
150     return(0);
151
152 } /* }}} int kafka_handle */
153
154 static int kafka_write(const data_set_t *ds, /* {{{ */
155               const value_list_t *vl,
156               user_data_t *ud)
157 {
158         int                      status = 0;
159     uint32_t    key;
160     char         buffer[8192];
161     size_t bfree = sizeof(buffer);
162     size_t bfill = 0;
163     size_t blen = 0;
164         struct kafka_topic_context      *ctx = ud->data;
165
166     if ((ds == NULL) || (vl == NULL) || (ctx == NULL))
167         return EINVAL;
168
169     pthread_mutex_lock (&ctx->lock);
170     status = kafka_handle(ctx);
171     pthread_mutex_unlock (&ctx->lock);
172     if( status != 0 )
173         return status;
174
175     bzero(buffer, sizeof(buffer));
176
177     switch (ctx->format) {
178     case KAFKA_FORMAT_COMMAND:
179         status = create_putval(buffer, sizeof(buffer), ds, vl);
180         if (status != 0) {
181             ERROR("write_kafka plugin: create_putval failed with status %i.",
182                   status);
183             return status;
184         }
185         blen = strlen(buffer);
186         break;
187     case KAFKA_FORMAT_JSON:
188
189         format_json_initialize(buffer, &bfill, &bfree);
190         format_json_value_list(buffer, &bfill, &bfree, ds, vl,
191                                ctx->store_rates);
192         format_json_finalize(buffer, &bfill, &bfree);
193         blen = strlen(buffer);
194         break;
195     case KAFKA_FORMAT_GRAPHITE:
196         status = format_graphite(buffer, sizeof(buffer), ds, vl,
197                                  ctx->prefix, ctx->postfix, ctx->escape_char,
198                                  ctx->graphite_flags);
199         if (status != 0) {
200             ERROR("write_kafka plugin: format_graphite failed with status %i.",
201                   status);
202             return status;
203         }
204         blen = strlen(buffer);
205         break;
206     default:
207         ERROR("write_kafka plugin: invalid format %i.", ctx->format);
208         return -1;
209     }
210
211     /*
212      * We partition our stream by metric name
213      */
214     if (ctx->has_key)
215         key = ctx->key;
216     else
217         key = rand();
218
219     rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
220                      RD_KAFKA_MSG_F_COPY, buffer, blen,
221                      &key, sizeof(key), NULL);
222
223         return status;
224 } /* }}} int kafka_write */
225
226 static void kafka_topic_context_free(void *p) /* {{{ */
227 {
228         struct kafka_topic_context *ctx = p;
229
230         if (ctx == NULL)
231                 return;
232
233     if (ctx->topic_name != NULL)
234         sfree(ctx->topic_name);
235     if (ctx->topic != NULL)
236         rd_kafka_topic_destroy(ctx->topic);
237     if (ctx->conf != NULL)
238         rd_kafka_topic_conf_destroy(ctx->conf);
239     if (ctx->kafka_conf != NULL)
240         rd_kafka_conf_destroy(ctx->kafka_conf);
241     if (ctx->kafka != NULL)
242         rd_kafka_destroy(ctx->kafka);
243
244     sfree(ctx);
245 } /* }}} void kafka_topic_context_free */
246
247 static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ */
248 {
249     int                          status;
250     int                          i;
251     struct kafka_topic_context  *tctx;
252     char                        *key = NULL;
253     char                        *val;
254     char                         callback_name[DATA_MAX_NAME_LEN];
255     char                         errbuf[1024];
256     user_data_t                  ud;
257         oconfig_item_t              *child;
258     rd_kafka_conf_res_t          ret;
259
260         if ((tctx = calloc(1, sizeof (*tctx))) == NULL) {
261                 ERROR ("write_kafka plugin: calloc failed.");
262         return;
263         }
264
265     tctx->escape_char = '.';
266     tctx->store_rates = 1;
267     tctx->format = KAFKA_FORMAT_JSON;
268
269     if ((tctx->kafka_conf = rd_kafka_conf_dup(conf)) == NULL) {
270         sfree(tctx);
271         ERROR("write_kafka plugin: cannot allocate memory for kafka config");
272         return;
273     }
274
275 #ifdef HAVE_LIBRDKAFKA_LOG_CB
276     rd_kafka_conf_set_log_cb(tctx->kafka_conf, kafka_log);
277 #endif
278
279     if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) {
280         rd_kafka_conf_destroy(tctx->kafka_conf);
281         sfree(tctx);
282         ERROR ("write_kafka plugin: cannot create topic configuration.");
283         return;
284     }
285
286     if (ci->values_num != 1) {
287         WARNING("kafka topic name needed.");
288         goto errout;
289     }
290
291     if (ci->values[0].type != OCONFIG_TYPE_STRING) {
292         WARNING("kafka topic needs a string argument.");
293         goto errout;
294     }
295
296     if ((tctx->topic_name = strdup(ci->values[0].value.string)) == NULL) {
297         ERROR("write_kafka plugin: cannot copy topic name.");
298         goto errout;
299     }
300
301         for (i = 0; i < ci->children_num; i++) {
302                 /*
303                  * The code here could be simplified but makes room
304                  * for easy adding of new options later on.
305                  */
306                 child = &ci->children[i];
307                 status = 0;
308
309                 if (strcasecmp ("Property", child->key) == 0) {
310                         if (child->values_num != 2) {
311                                 WARNING("kafka properties need both a key and a value.");
312                 goto errout;
313                         }
314                         if (child->values[0].type != OCONFIG_TYPE_STRING ||
315                             child->values[1].type != OCONFIG_TYPE_STRING) {
316                                 WARNING("kafka properties needs string arguments.");
317                 goto errout;
318                         }
319             key = child->values[0].value.string;
320             val = child->values[1].value.string;
321             ret = rd_kafka_topic_conf_set(tctx->conf,key, val,
322                                           errbuf, sizeof(errbuf));
323             if (ret != RD_KAFKA_CONF_OK) {
324                                 WARNING("cannot set kafka topic property %s to %s: %s.",
325                         key, val, errbuf);
326                 goto errout;
327                         }
328
329         } else if (strcasecmp ("Key", child->key) == 0)  {
330             char *tmp_buf = NULL;
331             status = cf_util_get_string(child, &tmp_buf);
332             if (status != 0) {
333                 WARNING("write_kafka plugin: invalid key supplied");
334                 break;
335             }
336
337             if (strcasecmp(tmp_buf, "Random") != 0) {
338                 tctx->has_key = 1;
339                 tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf));
340             }
341             sfree(tmp_buf);
342
343         } else if (strcasecmp ("Format", child->key) == 0) {
344             status = cf_util_get_string(child, &key);
345             if (status != 0)
346                 goto errout;
347
348             assert(key != NULL);
349
350             if (strcasecmp(key, "Command") == 0) {
351                 tctx->format = KAFKA_FORMAT_COMMAND;
352
353             } else if (strcasecmp(key, "Graphite") == 0) {
354                 tctx->format = KAFKA_FORMAT_GRAPHITE;
355
356             } else if (strcasecmp(key, "Json") == 0) {
357                 tctx->format = KAFKA_FORMAT_JSON;
358
359             } else {
360                 WARNING ("write_kafka plugin: Invalid format string: %s",
361                          key);
362             }
363
364             sfree(key);
365
366         } else if (strcasecmp ("StoreRates", child->key) == 0) {
367             status = cf_util_get_boolean (child, &tctx->store_rates);
368             (void) cf_util_get_flag (child, &tctx->graphite_flags,
369                                      GRAPHITE_STORE_RATES);
370
371         } else if (strcasecmp ("GraphiteSeparateInstances", child->key) == 0) {
372             status = cf_util_get_flag (child, &tctx->graphite_flags,
373                                        GRAPHITE_SEPARATE_INSTANCES);
374
375         } else if (strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) {
376             status = cf_util_get_flag (child, &tctx->graphite_flags,
377                                        GRAPHITE_ALWAYS_APPEND_DS);
378
379         } else if (strcasecmp ("GraphitePrefix", child->key) == 0) {
380             status = cf_util_get_string (child, &tctx->prefix);
381         } else if (strcasecmp ("GraphitePostfix", child->key) == 0) {
382             status = cf_util_get_string (child, &tctx->postfix);
383         } else if (strcasecmp ("GraphiteEscapeChar", child->key) == 0) {
384             char *tmp_buff = NULL;
385             status = cf_util_get_string (child, &tmp_buff);
386             if (strlen (tmp_buff) > 1)
387                 WARNING ("write_kafka plugin: The option \"GraphiteEscapeChar\" handles "
388                         "only one character. Others will be ignored.");
389             tctx->escape_char = tmp_buff[0];
390             sfree (tmp_buff);
391         } else {
392             WARNING ("write_kafka plugin: Invalid directive: %s.", child->key);
393         }
394
395         if (status != 0)
396             break;
397     }
398
399     rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition);
400     rd_kafka_topic_conf_set_opaque(tctx->conf, tctx);
401
402     ssnprintf(callback_name, sizeof(callback_name),
403               "write_kafka/%s", tctx->topic_name);
404
405     ud.data = tctx;
406     ud.free_func = kafka_topic_context_free;
407
408         status = plugin_register_write (callback_name, kafka_write, &ud);
409         if (status != 0) {
410                 WARNING ("write_kafka plugin: plugin_register_write (\"%s\") "
411                                 "failed with status %i.",
412                                 callback_name, status);
413         goto errout;
414     }
415
416     pthread_mutex_init (&tctx->lock, /* attr = */ NULL);
417
418     return;
419  errout:
420     if (tctx->topic_name != NULL)
421         free(tctx->topic_name);
422     if (tctx->conf != NULL)
423         rd_kafka_topic_conf_destroy(tctx->conf);
424     if (tctx->kafka_conf != NULL)
425                 rd_kafka_conf_destroy(tctx->kafka_conf);
426     sfree(tctx);
427 } /* }}} int kafka_config_topic */
428
429 static int kafka_config(oconfig_item_t *ci) /* {{{ */
430 {
431         int                          i;
432         oconfig_item_t              *child;
433     rd_kafka_conf_t             *conf;
434     rd_kafka_conf_res_t          ret;
435     char                         errbuf[1024];
436
437     if ((conf = rd_kafka_conf_new()) == NULL) {
438         WARNING("cannot allocate kafka configuration.");
439         return -1;
440     }
441         for (i = 0; i < ci->children_num; i++)  {
442                 child = &ci->children[i];
443
444                 if (strcasecmp("Topic", child->key) == 0) {
445                         kafka_config_topic (conf, child);
446                 } else if (strcasecmp(child->key, "Property") == 0) {
447                         char *key = NULL;
448                         char *val = NULL;
449
450                         if (child->values_num != 2) {
451                                 WARNING("kafka properties need both a key and a value.");
452                 goto errout;
453                         }
454                         if (child->values[0].type != OCONFIG_TYPE_STRING ||
455                             child->values[1].type != OCONFIG_TYPE_STRING) {
456                                 WARNING("kafka properties needs string arguments.");
457                 goto errout;
458                         }
459                         if ((key = strdup(child->values[0].value.string)) == NULL) {
460                                 WARNING("cannot allocate memory for attribute key.");
461                 goto errout;
462                         }
463                         if ((val = strdup(child->values[1].value.string)) == NULL) {
464                                 WARNING("cannot allocate memory for attribute value.");
465                 goto errout;
466                         }
467             ret = rd_kafka_conf_set(conf, key, val, errbuf, sizeof(errbuf));
468             if (ret != RD_KAFKA_CONF_OK) {
469                 WARNING("cannot set kafka property %s to %s: %s",
470                         key, val, errbuf);
471                 goto errout;
472             }
473                         sfree(key);
474                         sfree(val);
475                 } else {
476                         WARNING ("write_kafka plugin: Ignoring unknown "
477                                  "configuration option \"%s\" at top level.",
478                                  child->key);
479                 }
480         }
481     if (conf != NULL)
482         rd_kafka_conf_destroy(conf);
483     return (0);
484  errout:
485     if (conf != NULL)
486         rd_kafka_conf_destroy(conf);
487     return -1;
488 } /* }}} int kafka_config */
489
490 void module_register(void)
491 {
492         plugin_register_complex_config ("write_kafka", kafka_config);
493 }
494
495 /* vim: set sw=8 sts=8 ts=8 noet : */