remove unneeded include
[collectd.git] / src / write_kafka.c
1 /**
2  * collectd - src/write_kafka.c
3  *
4  * Copyright (C) 2014       Pierre-Yves Ritschard
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
15  * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
16  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  *
18  * Authors:
19  *   Pierre-Yves Ritschard <pyr at spootnik.org>
20  */
21
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "configfile.h"
26 #include "utils_cache.h"
27 #include "utils_cmd_putval.h"
28 #include "utils_format_graphite.h"
29 #include "utils_format_json.h"
30 #include "utils_crc32.h"
31
32 #include <sys/types.h>
33 #include <librdkafka/rdkafka.h>
34 #include <pthread.h>
35 #include <zlib.h>
36
37 struct kafka_topic_context {
38 #define KAFKA_FORMAT_COMMAND     1
39 #define KAFKA_FORMAT_GRAPHITE    2
40 #define KAFKA_FORMAT_JSON        3
41     u_int8_t                     format;
42     unsigned int                 graphite_flags;
43     _Bool                        store_rates;
44     rd_kafka_topic_conf_t       *conf;
45     rd_kafka_topic_t            *topic;
46     rd_kafka_t                  *kafka;
47     int                          has_key;
48     u_int32_t                    key;
49     char                        *prefix;
50     char                        *postfix;
51     char                         escape_char;
52     char                        *topic_name;
53 };
54
55 static int kafka_write(const data_set_t *, const value_list_t *, user_data_t *);
56 static int32_t kafka_partition(const rd_kafka_topic_t *, const void *, size_t,
57                                int32_t, void *, void *);
58
59 static int32_t kafka_partition(const rd_kafka_topic_t *rkt,
60                                const void *keydata, size_t keylen,
61                                int32_t partition_cnt, void *p, void *m)
62 {
63     u_int32_t key = *((u_int32_t *)keydata );
64
65     return key % partition_cnt;
66 }
67
68 static int kafka_write(const data_set_t *ds, /* {{{ */
69               const value_list_t *vl,
70               user_data_t *ud)
71 {
72         int                      status = 0;
73     u_int32_t    key;
74     char         buffer[8192];
75     size_t bfree = sizeof(buffer);
76     size_t bfill = 0;
77     size_t blen = 0;
78         struct kafka_topic_context      *ctx = ud->data;
79
80     if ((ds == NULL) || (vl == NULL) || (ctx == NULL))
81         return EINVAL;
82
83     bzero(buffer, sizeof(buffer));
84
85     switch (ctx->format) {
86     case KAFKA_FORMAT_COMMAND:
87         status = create_putval(buffer, sizeof(buffer), ds, vl);
88         if (status != 0) {
89             ERROR("write_kafka plugin: create_putval failed with status %i.",
90                   status);
91             return status;
92         }
93         blen = strlen(buffer);
94         break;
95     case KAFKA_FORMAT_JSON:
96
97         format_json_initialize(buffer, &bfill, &bfree);
98         format_json_value_list(buffer, &bfill, &bfree, ds, vl,
99                                ctx->store_rates);
100         format_json_finalize(buffer, &bfill, &bfree);
101         blen = strlen(buffer);
102         break;
103     case KAFKA_FORMAT_GRAPHITE:
104         status = format_graphite(buffer, sizeof(buffer), ds, vl,
105                                  ctx->prefix, ctx->postfix, ctx->escape_char,
106                                  ctx->graphite_flags);
107         if (status != 0) {
108             ERROR("write_kafka plugin: format_graphite failed with status %i.",
109                   status);
110             return status;
111         }
112         blen = strlen(buffer);
113         break;
114     default:
115         ERROR("write_kafka plugin: invalid format %i.", ctx->format);
116         return -1;
117     }
118
119     /*
120      * We partition our stream by metric name
121      */
122     if (ctx->has_key)
123         key = ctx->key;
124     else
125         key = rand();
126
127     rd_kafka_produce(ctx->topic, RD_KAFKA_PARTITION_UA,
128                      RD_KAFKA_MSG_F_COPY, buffer, blen,
129                      &key, sizeof(key), NULL);
130
131         return status;
132 } /* }}} int kafka_write */
133
134 static void kafka_topic_context_free(void *p) /* {{{ */
135 {
136         struct kafka_topic_context *ctx = p;
137
138         if (ctx == NULL)
139                 return;
140
141     if (ctx->topic_name != NULL)
142         sfree(ctx->topic_name);
143     if (ctx->topic != NULL)
144         rd_kafka_topic_destroy(ctx->topic);
145     if (ctx->conf != NULL)
146         rd_kafka_topic_conf_destroy(ctx->conf);
147
148     sfree(ctx);
149 } /* }}} void kafka_topic_context_free */
150
151 static void kafka_config_topic(rd_kafka_conf_t *conf, oconfig_item_t *ci) /* {{{ */
152 {
153     int                          status;
154     int                          i;
155     struct kafka_topic_context  *tctx;
156     char                        *key;
157     char                        *val;
158     char                         callback_name[DATA_MAX_NAME_LEN];
159     char                         errbuf[1024];
160     user_data_t                  ud;
161         oconfig_item_t              *child;
162     rd_kafka_conf_res_t          ret;
163
164         if ((tctx = calloc(1, sizeof (*tctx))) == NULL) {
165                 ERROR ("write_kafka plugin: calloc failed.");
166         return;
167         }
168
169     tctx->escape_char = '.';
170     tctx->store_rates = 1;
171
172     if ((tctx->kafka = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
173                                     errbuf, sizeof(errbuf))) == NULL) {
174         sfree(tctx);
175         ERROR("write_kafka plugin: cannot create kafka handle.");
176         return;
177     }
178     conf = NULL;
179
180     if ((tctx->conf = rd_kafka_topic_conf_new()) == NULL) {
181         rd_kafka_destroy(tctx->kafka);
182         sfree(tctx);
183         ERROR ("write_kafka plugin: cannot create topic configuration.");
184         return;
185     }
186
187     if (ci->values_num != 1) {
188         WARNING("kafka topic name needed.");
189         goto errout;
190     }
191
192     if (ci->values[0].type != OCONFIG_TYPE_STRING) {
193         WARNING("kafka topic needs a string argument.");
194         goto errout;
195     }
196
197     if ((tctx->topic_name = strdup(ci->values[0].value.string)) == NULL) {
198         ERROR("write_kafka plugin: cannot copy topic name.");
199         goto errout;
200     }
201
202         for (i = 0; i < ci->children_num; i++) {
203                 /*
204                  * The code here could be simplified but makes room
205                  * for easy adding of new options later on.
206                  */
207                 child = &ci->children[i];
208                 status = 0;
209
210                 if (strcasecmp ("Property", child->key) == 0) {
211                         if (child->values_num != 2) {
212                                 WARNING("kafka properties need both a key and a value.");
213                 goto errout;
214                         }
215                         if (child->values[0].type != OCONFIG_TYPE_STRING ||
216                             child->values[1].type != OCONFIG_TYPE_STRING) {
217                                 WARNING("kafka properties needs string arguments.");
218                 goto errout;
219                         }
220             key = child->values[0].value.string;
221             val = child->values[0].value.string;
222             ret = rd_kafka_topic_conf_set(tctx->conf,key, val,
223                                           errbuf, sizeof(errbuf));
224             if (ret != RD_KAFKA_CONF_OK) {
225                                 WARNING("cannot set kafka topic property %s to %s: %s.",
226                         key, val, errbuf);
227                 goto errout;
228                         }
229
230         } else if (strcasecmp ("Key", child->key) == 0)  {
231             char *tmp_buf = NULL;
232             status = cf_util_get_string(child, &tmp_buf);
233             if (status != 0) {
234                 WARNING("write_kafka plugin: invalid key supplied");
235                 break;
236             }
237
238             if (strcasecmp(tmp_buf, "Random") != 0) {
239                 tctx->has_key = 1;
240                 tctx->key = crc32_buffer((u_char *)tmp_buf, strlen(tmp_buf));
241             }
242             sfree(tmp_buf);
243
244         } else if (strcasecmp ("Format", child->key) == 0) {
245             status = cf_util_get_string(child, &key);
246             if (status != 0)
247                 goto errout;
248
249             assert(key != NULL);
250
251             if (strcasecmp(key, "Command") == 0) {
252
253                 tctx->format = KAFKA_FORMAT_COMMAND;
254
255             } else if (strcasecmp(key, "Graphite") == 0) {
256                 tctx->format = KAFKA_FORMAT_GRAPHITE;
257
258             } else if (strcasecmp(key, "Json") == 0) {
259                 tctx->format = KAFKA_FORMAT_JSON;
260
261             } else {
262                 WARNING ("write_kafka plugin: Invalid format string: %s",
263                          key);
264             }
265             sfree(key);
266
267         } else if (strcasecmp ("StoreRates", child->key) == 0) {
268             status = cf_util_get_boolean (child, &tctx->store_rates);
269             (void) cf_util_get_flag (child, &tctx->graphite_flags,
270                                      GRAPHITE_STORE_RATES);
271
272         } else if (strcasecmp ("GraphiteSeparateInstances", child->key) == 0) {
273             status = cf_util_get_flag (child, &tctx->graphite_flags,
274                                        GRAPHITE_SEPARATE_INSTANCES);
275
276         } else if (strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) {
277             status = cf_util_get_flag (child, &tctx->graphite_flags,
278                                        GRAPHITE_ALWAYS_APPEND_DS);
279
280         } else if (strcasecmp ("GraphitePrefix", child->key) == 0) {
281             status = cf_util_get_string (child, &tctx->prefix);
282         } else if (strcasecmp ("GraphitePostfix", child->key) == 0) {
283             status = cf_util_get_string (child, &tctx->postfix);
284         } else if (strcasecmp ("GraphiteEscapeChar", child->key) == 0) {
285             char *tmp_buff = NULL;
286             status = cf_util_get_string (child, &tmp_buff);
287             if (strlen (tmp_buff) > 1)
288                 WARNING ("write_kafka plugin: The option \"GraphiteEscapeChar\" handles "
289                         "only one character. Others will be ignored.");
290             tctx->escape_char = tmp_buff[0];
291             sfree (tmp_buff);
292         } else {
293             WARNING ("write_kafka plugin: Invalid directive: %s.", child->key);
294         }
295
296         if (status != 0)
297             break;
298     }
299
300     rd_kafka_topic_conf_set_partitioner_cb(tctx->conf, kafka_partition);
301     rd_kafka_topic_conf_set_opaque(tctx->conf, tctx);
302
303     if ((tctx->topic = rd_kafka_topic_new(tctx->kafka, tctx->topic_name,
304                                        tctx->conf)) == NULL) {
305         ERROR("write_kafka plugin: cannot create topic.");
306         goto errout;
307     }
308     tctx->conf = NULL;
309
310     ssnprintf(callback_name, sizeof(callback_name),
311               "write_kafka/%s", tctx->topic_name);
312
313     ud.data = tctx;
314     ud.free_func = kafka_topic_context_free;
315
316         status = plugin_register_write (callback_name, kafka_write, &ud);
317         if (status != 0) {
318                 WARNING ("write_kafka plugin: plugin_register_write (\"%s\") "
319                                 "failed with status %i.",
320                                 callback_name, status);
321         goto errout;
322     }
323     return;
324  errout:
325     if (conf != NULL)
326         rd_kafka_conf_destroy(conf);
327     if (tctx->kafka != NULL)
328         rd_kafka_destroy(tctx->kafka);
329     if (tctx->topic != NULL)
330         rd_kafka_topic_destroy(tctx->topic);
331     if (tctx->topic_name != NULL)
332         free(tctx->topic_name);
333     if (tctx->conf != NULL)
334         rd_kafka_topic_conf_destroy(tctx->conf);
335     sfree(tctx);
336 } /* }}} int kafka_config_topic */
337
338 static int kafka_config(oconfig_item_t *ci) /* {{{ */
339 {
340         int                          i;
341         oconfig_item_t              *child;
342     rd_kafka_conf_t             *conf;
343     rd_kafka_conf_t             *cloned;
344     rd_kafka_conf_res_t          ret;
345     char                         errbuf[1024];
346
347     if ((conf = rd_kafka_conf_new()) == NULL) {
348         WARNING("cannot allocate kafka configuration.");
349         return -1;
350     }
351
352         for (i = 0; i < ci->children_num; i++)  {
353                 child = &ci->children[i];
354
355                 if (strcasecmp("Topic", child->key) == 0) {
356             if ((cloned = rd_kafka_conf_dup(conf)) == NULL) {
357                 WARNING("write_kafka plugin: cannot allocate memory for kafka config");
358                 goto errout;
359             }
360                         kafka_config_topic (cloned, child);
361                 } else if (strcasecmp(child->key, "Property") == 0) {
362                         char *key = NULL;
363                         char *val = NULL;
364
365                         if (child->values_num != 2) {
366                                 WARNING("kafka properties need both a key and a value.");
367                 goto errout;
368                         }
369                         if (child->values[0].type != OCONFIG_TYPE_STRING ||
370                             child->values[1].type != OCONFIG_TYPE_STRING) {
371                                 WARNING("kafka properties needs string arguments.");
372                 goto errout;
373                         }
374                         if ((key = strdup(child->values[0].value.string)) == NULL) {
375                                 WARNING("cannot allocate memory for attribute key.");
376                 goto errout;
377                         }
378                         if ((val = strdup(child->values[1].value.string)) == NULL) {
379                                 WARNING("cannot allocate memory for attribute value.");
380                 goto errout;
381                         }
382             ret = rd_kafka_conf_set(conf, key, val, errbuf, sizeof(errbuf));
383             if (ret != RD_KAFKA_CONF_OK) {
384                 WARNING("cannot set kafka property %s to %s: %s",
385                         key, val, errbuf);
386                 goto errout;
387             }
388                         sfree(key);
389                         sfree(val);
390                 } else {
391                         WARNING ("write_kafka plugin: Ignoring unknown "
392                                  "configuration option \"%s\" at top level.",
393                                  child->key);
394                 }
395         }
396     if (conf != NULL)
397         rd_kafka_conf_destroy(conf);
398         return (0);
399  errout:
400     if (conf != NULL)
401         rd_kafka_conf_destroy(conf);
402     return -1;
403 } /* }}} int kafka_config */
404
405 void module_register(void)
406 {
407         plugin_register_complex_config ("write_kafka", kafka_config);
408 }
409
410 /* vim: set sw=8 sts=8 ts=8 noet : */