Add a write_kafka output with similar properties to the amqp one.
[collectd.git] / src / write_kafka.c
1 /**
2  * collectd - src/write_kafka.c
3  *
4  * Copyright (C) 2014       Pierre-Yves Ritschard
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15  * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  *
18  * Authors:
19  *   Pierre-Yves Ritschard <pyr at spootnik.org>
20  */
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26 #include "utils_cache.h"
27 #include "utils_cmd_putval.h"
28 #include "utils_format_graphite.h"
29 #include "utils_format_json.h"
30 #include "utils_crc32.h"
31 #include "riemann.pb-c.h"
32
33 #include <sys/types.h>
34 #include <librdkafka/rdkafka.h>
35 #include <pthread.h>
36 #include <zlib.h>
37
38 struct kafka_topic_context {
39 #define KAFKA_FORMAT_COMMAND     1
40 #define KAFKA_FORMAT_GRAPHITE    2
41 #define KAFKA_FORMAT_JSON        3
42     u_int8_t                     format;
43     unsigned int                 graphite_flags;
44     _Bool                        store_rates;
45     rd_kafka_topic_conf_t       *conf;
46     rd_kafka_topic_t            *topic;
47     rd_kafka_t                  *kafka;
48     int                          has_key;
49     u_int32_t                    key;
50     char                        *prefix;
51     char                        *postfix;
52     char                         escape_char;
53     char                        *topic_name;
54 };
55
56 static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
57 static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
58                                int32_t, void *, void *);
59
60 static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
61                                const void *keydata, size_t keylen,
62                                int32_t partition_cnt, void *p, void *m)
63 {
64     u_int32_t key = *((u_int32_t *)keydata );
65
66     return key % partition_cnt;
67 }
68
69 static int kafka_write(const data_set_t *ds, /* {{{ */
70               const value_list_t *vl,
71               user_data_t *ud)
72 {
73         int                      status = 0;
74     u_int32_t    key;
75     char         buffer[8192];
76     size_t bfree = sizeof(buffer);
77     size_t bfill = 0;
78     size_t blen = 0;
79         struct kafka_topic_context      *ctx = ud->data;
80
81     if ((ds == NULL) || (vl == NULL) || (ctx == NULL))
82         return EINVAL;
83
84     bzero(buffer, sizeof(buffer));
85
86     switch (ctx->format) {
87     case KAFKA_FORMAT_COMMAND:
88         status = create_putval(buffer, sizeof(buffer), ds, vl);
89         if (status != 0) {
90             ERROR("write_kafka plugin: create_putval failed with status %i.",
91                   status);
92             return status;
93         }
94         blen = strlen(buffer);
95         break;
96     case KAFKA_FORMAT_JSON:
97
98         format_json_initialize(buffer, &bfill, &bfree);
99         format_json_value_list(buffer, &bfill, &bfree, ds, vl,
100                                ctx->store_rates);
101         format_json_finalize(buffer, &bfill, &bfree);
102         blen = strlen(buffer);
103         break;
104     case KAFKA_FORMAT_GRAPHITE:
105         status = format_graphite(buffer, sizeof(buffer), ds, vl,
106                                  ctx->prefix, ctx->postfix, ctx->escape_char,
107                                  ctx->graphite_flags);
108         if (status != 0) {
109             ERROR("write_kafka plugin: format_graphite failed with status %i.",
110                   status);
111             return status;
112         }
113         blen = strlen(buffer);
114         break;
115     default:
116         ERROR("write_kafka plugin: invalid format %i.", ctx->format);
117         return -1;
118     }
119
120     /*
121      * We partition our stream by metric name
122      */
123     if (ctx->has_key)
124         key = ctx->key;
125     else
126         key = rand();
127
128     rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
129                      RD_KAFKA_MSG_F_COPY, buffer, blen,
130                      &key, sizeof(key), NULL);
131
132         return status;
133 } /* }}} int kafka_write */
134
135 static void kafka_topic_context_free(void *p) /* {{{ */
136 {
137         struct kafka_topic_context *ctx = p;
138
139         if (ctx == NULL)
140                 return;
141
142     if (ctx->topic_name != NULL)
143         sfree(ctx->topic_name);
144     if (ctx->topic != NULL)
145         rd_kafka_topic_destroy(ctx->topic);
146     if (ctx->conf != NULL)
147         rd_kafka_topic_conf_destroy(ctx->conf);
148
149     sfree(ctx);
150 } /* }}} void kafka_topic_context_free */
151
152 static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ */
153 {
154     int                          status;
155     int                          i;
156     struct kafka_topic_context  *tctx;
157     char                        *key;
158     char                        *val;
159     char                         callback_name[DATA_MAX_NAME_LEN];
160     char                         errbuf[1024];
161     user_data_t                  ud;
162         oconfig_item_t              *child;
163     rd_kafka_conf_res_t          ret;
164
165         if ((tctx = calloc(1, sizeof (*tctx))) == NULL) {
166                 ERROR ("write_kafka plugin: calloc failed.");
167         return;
168         }
169
170     tctx->escape_char = '.';
171     tctx->store_rates = 1;
172
173     if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
174                                     errbuf, sizeof(errbuf))) == NULL) {
175         sfree(tctx);
176         ERROR("write_kafka plugin: cannot create kafka handle.");
177         return;
178     }
179     conf = NULL;
180
181     if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) {
182         rd_kafka_destroy(tctx->kafka);
183         sfree(tctx);
184         ERROR ("write_kafka plugin: cannot create topic configuration.");
185         return;
186     }
187
188     if (ci->values_num != 1) {
189         WARNING("kafka topic name needed.");
190         goto errout;
191     }
192
193     if (ci->values[0].type != OCONFIG_TYPE_STRING) {
194         WARNING("kafka topic needs a string argument.");
195         goto errout;
196     }
197
198     if ((tctx->topic_name = strdup(ci->values[0].value.string)) == NULL) {
199         ERROR("write_kafka plugin: cannot copy topic name.");
200         goto errout;
201     }
202
203         for (i = 0; i < ci->children_num; i++) {
204                 /*
205                  * The code here could be simplified but makes room
206                  * for easy adding of new options later on.
207                  */
208                 child = &ci->children[i];
209                 status = 0;
210
211                 if (strcasecmp ("Property", child->key) == 0) {
212                         if (child->values_num != 2) {
213                                 WARNING("kafka properties need both a key and a value.");
214                 goto errout;
215                         }
216                         if (child->values[0].type != OCONFIG_TYPE_STRING ||
217                             child->values[1].type != OCONFIG_TYPE_STRING) {
218                                 WARNING("kafka properties needs string arguments.");
219                 goto errout;
220                         }
221             key = child->values[0].value.string;
222             val = child->values[0].value.string;
223             ret = rd_kafka_topic_conf_set(tctx->conf,key, val,
224                                           errbuf, sizeof(errbuf));
225             if (ret != RD_KAFKA_CONF_OK) {
226                                 WARNING("cannot set kafka topic property %s to %s: %s.",
227                         key, val, errbuf);
228                 goto errout;
229                         }
230
231         } else if (strcasecmp ("Key", child->key) == 0)  {
232             char *tmp_buf = NULL;
233             status = cf_util_get_string(child, &tmp_buf);
234             if (status != 0) {
235                 WARNING("write_kafka plugin: invalid key supplied");
236                 break;
237             }
238
239             if (strcasecmp(tmp_buf, "Random") != 0) {
240                 tctx->has_key = 1;
241                 tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf));
242             }
243             sfree(tmp_buf);
244
245         } else if (strcasecmp ("Format", child->key) == 0) {
246             status = cf_util_get_string(child, &key);
247             if (status != 0)
248                 goto errout;
249
250             assert(key != NULL);
251
252             if (strcasecmp(key, "Command") == 0) {
253
254                 tctx->format = KAFKA_FORMAT_COMMAND;
255
256             } else if (strcasecmp(key, "Graphite") == 0) {
257                 tctx->format = KAFKA_FORMAT_GRAPHITE;
258
259             } else if (strcasecmp(key, "Json") == 0) {
260                 tctx->format = KAFKA_FORMAT_JSON;
261
262             } else {
263                 WARNING ("write_kafka plugin: Invalid format string: %s",
264                          key);
265             }
266             sfree(key);
267
268         } else if (strcasecmp ("StoreRates", child->key) == 0) {
269             status = cf_util_get_boolean (child, &tctx->store_rates);
270             (void) cf_util_get_flag (child, &tctx->graphite_flags,
271                                      GRAPHITE_STORE_RATES);
272
273         } else if (strcasecmp ("GraphiteSeparateInstances", child->key) == 0) {
274             status = cf_util_get_flag (child, &tctx->graphite_flags,
275                                        GRAPHITE_SEPARATE_INSTANCES);
276
277         } else if (strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) {
278             status = cf_util_get_flag (child, &tctx->graphite_flags,
279                                        GRAPHITE_ALWAYS_APPEND_DS);
280
281         } else if (strcasecmp ("GraphitePrefix", child->key) == 0) {
282             status = cf_util_get_string (child, &tctx->prefix);
283         } else if (strcasecmp ("GraphitePostfix", child->key) == 0) {
284             status = cf_util_get_string (child, &tctx->postfix);
285         } else if (strcasecmp ("GraphiteEscapeChar", child->key) == 0) {
286             char *tmp_buff = NULL;
287             status = cf_util_get_string (child, &tmp_buff);
288             if (strlen (tmp_buff) > 1)
289                 WARNING ("write_kafka plugin: The option \"GraphiteEscapeChar\" handles "
290                         "only one character. Others will be ignored.");
291             tctx->escape_char = tmp_buff[0];
292             sfree (tmp_buff);
293         } else {
294             WARNING ("write_kafka plugin: Invalid directive: %s.", child->key);
295         }
296
297         if (status != 0)
298             break;
299     }
300
301     rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition);
302     rd_kafka_topic_conf_set_opaque(tctx->conf, tctx);
303
304     if ((tctx->topic = rd_kafka_topic_new(tctx->kafka, tctx->topic_name,
305                                        tctx->conf)) == NULL) {
306         ERROR("write_kafka plugin: cannot create topic.");
307         goto errout;
308     }
309     tctx->conf = NULL;
310
311     ssnprintf(callback_name, sizeof(callback_name),
312               "write_kafka/%s", tctx->topic_name);
313
314     ud.data = tctx;
315     ud.free_func = kafka_topic_context_free;
316
317         status = plugin_register_write (callback_name, kafka_write, &ud);
318         if (status != 0) {
319                 WARNING ("write_kafka plugin: plugin_register_write (\"%s\") "
320                                 "failed with status %i.",
321                                 callback_name, status);
322         goto errout;
323     }
324     return;
325  errout:
326     if (conf != NULL)
327         rd_kafka_conf_destroy(conf);
328     if (tctx->kafka != NULL)
329         rd_kafka_destroy(tctx->kafka);
330     if (tctx->topic != NULL)
331         rd_kafka_topic_destroy(tctx->topic);
332     if (tctx->topic_name != NULL)
333         free(tctx->topic_name);
334     if (tctx->conf != NULL)
335         rd_kafka_topic_conf_destroy(tctx->conf);
336     sfree(tctx);
337 } /* }}} int kafka_config_topic */
338
339 static int kafka_config(oconfig_item_t *ci) /* {{{ */
340 {
341         int                          i;
342         oconfig_item_t              *child;
343     rd_kafka_conf_t             *conf;
344     rd_kafka_conf_t             *cloned;
345     rd_kafka_conf_res_t          ret;
346     char                         errbuf[1024];
347
348     if ((conf = rd_kafka_conf_new()) == NULL) {
349         WARNING("cannot allocate kafka configuration.");
350         return -1;
351     }
352
353         for (i = 0; i < ci->children_num; i++)  {
354                 child = &ci->children[i];
355
356                 if (strcasecmp("Topic", child->key) == 0) {
357             if ((cloned = rd_kafka_conf_dup(conf)) == NULL) {
358                 WARNING("write_kafka plugin: cannot allocate memory for kafka config");
359                 goto errout;
360             }
361                         kafka_config_topic (cloned, child);
362                 } else if (strcasecmp(child->key, "Property") == 0) {
363                         char *key = NULL;
364                         char *val = NULL;
365
366                         if (child->values_num != 2) {
367                                 WARNING("kafka properties need both a key and a value.");
368                 goto errout;
369                         }
370                         if (child->values[0].type != OCONFIG_TYPE_STRING ||
371                             child->values[1].type != OCONFIG_TYPE_STRING) {
372                                 WARNING("kafka properties needs string arguments.");
373                 goto errout;
374                         }
375                         if ((key = strdup(child->values[0].value.string)) == NULL) {
376                                 WARNING("cannot allocate memory for attribute key.");
377                 goto errout;
378                         }
379                         if ((val = strdup(child->values[1].value.string)) == NULL) {
380                                 WARNING("cannot allocate memory for attribute value.");
381                 goto errout;
382                         }
383             ret = rd_kafka_conf_set(conf, key, val, errbuf, sizeof(errbuf));
384             if (ret != RD_KAFKA_CONF_OK) {
385                 WARNING("cannot set kafka property %s to %s: %s",
386                         key, val, errbuf);
387                 goto errout;
388             }
389                         sfree(key);
390                         sfree(val);
391                 } else {
392                         WARNING ("write_kafka plugin: Ignoring unknown "
393                                  "configuration option \"%s\" at top level.",
394                                  child->key);
395                 }
396         }
397     if (conf != NULL)
398         rd_kafka_conf_destroy(conf);
399         return (0);
400  errout:
401     if (conf != NULL)
402         rd_kafka_conf_destroy(conf);
403     return -1;
404 } /* }}} int kafka_config */
405
406 void module_register(void)
407 {
408         plugin_register_complex_config ("write_kafka", kafka_config);
409 }
410
411 /* vim: set sw=8 sts=8 ts=8 noet : */