a44f4c829d5835a6c44e15566e3f1985707cc058
[collectd.git] / src / mqtt.c
1 /**
2  * collectd - src/mqtt.c
3  * Copyright (C) 2014       Marc Falzon
4  * Copyright (C) 2014,2015  Florian octo Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Marc Falzon <marc at baha dot mu>
26  *   Florian octo Forster <octo at collectd.org>
27  *   Jan-Piet Mens <jpmens at gmail.com>
28  **/
29
30 // Reference: http://mosquitto.org/api/files/mosquitto-h.html
31
32 #include "collectd.h"
33
34 #include "plugin.h"
35 #include "utils/common/common.h"
36 #include "utils_complain.h"
37
38 #include <mosquitto.h>
39
40 #define MQTT_MAX_TOPIC_SIZE 1024
41 #define MQTT_MAX_MESSAGE_SIZE MQTT_MAX_TOPIC_SIZE + 1024
42 #define MQTT_DEFAULT_HOST "localhost"
43 #define MQTT_DEFAULT_PORT 1883
44 #define MQTT_DEFAULT_TOPIC_PREFIX "collectd"
45 #define MQTT_DEFAULT_TOPIC "collectd/#"
46 #ifndef MQTT_KEEPALIVE
47 #define MQTT_KEEPALIVE 60
48 #endif
49 #ifndef SSL_VERIFY_PEER
50 #define SSL_VERIFY_PEER 1
51 #endif
52
53 /*
54  * Data types
55  */
56 struct mqtt_client_conf {
57   bool publish;
58   char *name;
59
60   struct mosquitto *mosq;
61   bool connected;
62
63   char *host;
64   int port;
65   char *client_id;
66   char *username;
67   char *password;
68   int qos;
69   char *cacertificatefile;
70   char *certificatefile;
71   char *certificatekeyfile;
72   char *tlsprotocol;
73   char *ciphersuite;
74
75   /* For publishing */
76   char *topic_prefix;
77   bool store_rates;
78   bool retain;
79
80   /* For subscribing */
81   pthread_t thread;
82   bool loop;
83   char *topic;
84   bool clean_session;
85
86   c_complain_t complaint_cantpublish;
87   pthread_mutex_t lock;
88 };
89 typedef struct mqtt_client_conf mqtt_client_conf_t;
90
91 static mqtt_client_conf_t **subscribers;
92 static size_t subscribers_num;
93
94 /*
95  * Functions
96  */
97 #if LIBMOSQUITTO_MAJOR == 0
98 static char const *mosquitto_strerror(int code) {
99   switch (code) {
100   case MOSQ_ERR_SUCCESS:
101     return "MOSQ_ERR_SUCCESS";
102   case MOSQ_ERR_NOMEM:
103     return "MOSQ_ERR_NOMEM";
104   case MOSQ_ERR_PROTOCOL:
105     return "MOSQ_ERR_PROTOCOL";
106   case MOSQ_ERR_INVAL:
107     return "MOSQ_ERR_INVAL";
108   case MOSQ_ERR_NO_CONN:
109     return "MOSQ_ERR_NO_CONN";
110   case MOSQ_ERR_CONN_REFUSED:
111     return "MOSQ_ERR_CONN_REFUSED";
112   case MOSQ_ERR_NOT_FOUND:
113     return "MOSQ_ERR_NOT_FOUND";
114   case MOSQ_ERR_CONN_LOST:
115     return "MOSQ_ERR_CONN_LOST";
116   case MOSQ_ERR_SSL:
117     return "MOSQ_ERR_SSL";
118   case MOSQ_ERR_PAYLOAD_SIZE:
119     return "MOSQ_ERR_PAYLOAD_SIZE";
120   case MOSQ_ERR_NOT_SUPPORTED:
121     return "MOSQ_ERR_NOT_SUPPORTED";
122   case MOSQ_ERR_AUTH:
123     return "MOSQ_ERR_AUTH";
124   case MOSQ_ERR_ACL_DENIED:
125     return "MOSQ_ERR_ACL_DENIED";
126   case MOSQ_ERR_UNKNOWN:
127     return "MOSQ_ERR_UNKNOWN";
128   case MOSQ_ERR_ERRNO:
129     return "MOSQ_ERR_ERRNO";
130   }
131
132   return "UNKNOWN ERROR CODE";
133 }
134 #else
135 /* provided by libmosquitto */
136 #endif
137
138 static void mqtt_free(mqtt_client_conf_t *conf) {
139   if (conf == NULL)
140     return;
141
142   if (conf->connected)
143     (void)mosquitto_disconnect(conf->mosq);
144   conf->connected = false;
145   (void)mosquitto_destroy(conf->mosq);
146
147   sfree(conf->host);
148   sfree(conf->username);
149   sfree(conf->password);
150   sfree(conf->client_id);
151   sfree(conf->topic_prefix);
152   sfree(conf);
153 }
154
155 static char *strip_prefix(char *topic) {
156   size_t num = 0;
157
158   for (size_t i = 0; topic[i] != 0; i++)
159     if (topic[i] == '/')
160       num++;
161
162   if (num < 2)
163     return NULL;
164
165   while (num > 2) {
166     char *tmp = strchr(topic, '/');
167     if (tmp == NULL)
168       return NULL;
169     topic = tmp + 1;
170     num--;
171   }
172
173   return topic;
174 }
175
176 static void on_message(
177 #if LIBMOSQUITTO_MAJOR == 0
178 #else
179     __attribute__((unused)) struct mosquitto *m,
180 #endif
181     __attribute__((unused)) void *arg, const struct mosquitto_message *msg) {
182   value_list_t vl = VALUE_LIST_INIT;
183   data_set_t const *ds;
184   char *topic;
185   char *name;
186   char *payload;
187   int status;
188
189   if (msg->payloadlen <= 0) {
190     DEBUG("mqtt plugin: message has empty payload");
191     return;
192   }
193
194   topic = strdup(msg->topic);
195   name = strip_prefix(topic);
196
197   status = parse_identifier_vl(name, &vl);
198   if (status != 0) {
199     ERROR("mqtt plugin: Unable to parse topic \"%s\".", topic);
200     sfree(topic);
201     return;
202   }
203   sfree(topic);
204
205   ds = plugin_get_ds(vl.type);
206   if (ds == NULL) {
207     ERROR("mqtt plugin: Unknown type: \"%s\".", vl.type);
208     return;
209   }
210
211   vl.values = calloc(ds->ds_num, sizeof(*vl.values));
212   if (vl.values == NULL) {
213     ERROR("mqtt plugin: calloc failed.");
214     return;
215   }
216   vl.values_len = ds->ds_num;
217
218   payload = malloc(msg->payloadlen + 1);
219   if (payload == NULL) {
220     ERROR("mqtt plugin: malloc for payload buffer failed.");
221     sfree(vl.values);
222     return;
223   }
224   memmove(payload, msg->payload, msg->payloadlen);
225   payload[msg->payloadlen] = 0;
226
227   DEBUG("mqtt plugin: payload = \"%s\"", payload);
228   status = parse_values(payload, &vl, ds);
229   if (status != 0) {
230     ERROR("mqtt plugin: Unable to parse payload \"%s\".", payload);
231     sfree(payload);
232     sfree(vl.values);
233     return;
234   }
235   sfree(payload);
236
237   plugin_dispatch_values(&vl);
238   sfree(vl.values);
239 } /* void on_message */
240
241 /* must hold conf->lock when calling. */
242 static int mqtt_reconnect(mqtt_client_conf_t *conf) {
243   int status;
244
245   if (conf->connected)
246     return 0;
247
248   status = mosquitto_reconnect(conf->mosq);
249   if (status != MOSQ_ERR_SUCCESS) {
250     ERROR("mqtt_connect_broker: mosquitto_connect failed: %s",
251           (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status));
252     return -1;
253   }
254
255   conf->connected = true;
256
257   c_release(LOG_INFO, &conf->complaint_cantpublish,
258             "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
259             conf->host, conf->port);
260
261   return 0;
262 } /* mqtt_reconnect */
263
264 /* must hold conf->lock when calling. */
265 static int mqtt_connect(mqtt_client_conf_t *conf) {
266   char const *client_id;
267   int status;
268
269   if (conf->mosq != NULL)
270     return mqtt_reconnect(conf);
271
272   if (conf->client_id)
273     client_id = conf->client_id;
274   else
275     client_id = hostname_g;
276
277 #if LIBMOSQUITTO_MAJOR == 0
278   conf->mosq = mosquitto_new(client_id, /* user data = */ conf);
279 #else
280   conf->mosq =
281       mosquitto_new(client_id, conf->clean_session, /* user data = */ conf);
282 #endif
283   if (conf->mosq == NULL) {
284     ERROR("mqtt plugin: mosquitto_new failed");
285     return -1;
286   }
287
288 #if LIBMOSQUITTO_MAJOR != 0
289   if (conf->cacertificatefile) {
290     status = mosquitto_tls_set(conf->mosq, conf->cacertificatefile, NULL,
291                                conf->certificatefile, conf->certificatekeyfile,
292                                /* pw_callback */ NULL);
293     if (status != MOSQ_ERR_SUCCESS) {
294       ERROR("mqtt plugin: cannot mosquitto_tls_set: %s",
295             mosquitto_strerror(status));
296       mosquitto_destroy(conf->mosq);
297       conf->mosq = NULL;
298       return -1;
299     }
300
301     status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER,
302                                     conf->tlsprotocol, conf->ciphersuite);
303     if (status != MOSQ_ERR_SUCCESS) {
304       ERROR("mqtt plugin: cannot mosquitto_tls_opts_set: %s",
305             mosquitto_strerror(status));
306       mosquitto_destroy(conf->mosq);
307       conf->mosq = NULL;
308       return -1;
309     }
310
311     status = mosquitto_tls_insecure_set(conf->mosq, false);
312     if (status != MOSQ_ERR_SUCCESS) {
313       ERROR("mqtt plugin: cannot mosquitto_tls_insecure_set: %s",
314             mosquitto_strerror(status));
315       mosquitto_destroy(conf->mosq);
316       conf->mosq = NULL;
317       return -1;
318     }
319   }
320 #endif
321
322   if (conf->username && conf->password) {
323     status =
324         mosquitto_username_pw_set(conf->mosq, conf->username, conf->password);
325     if (status != MOSQ_ERR_SUCCESS) {
326       ERROR("mqtt plugin: mosquitto_username_pw_set failed: %s",
327             (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status));
328
329       mosquitto_destroy(conf->mosq);
330       conf->mosq = NULL;
331       return -1;
332     }
333   }
334
335 #if LIBMOSQUITTO_MAJOR == 0
336   status = mosquitto_connect(conf->mosq, conf->host, conf->port,
337                              /* keepalive = */ MQTT_KEEPALIVE,
338                              /* clean session = */ conf->clean_session);
339 #else
340   status =
341       mosquitto_connect(conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
342 #endif
343   if (status != MOSQ_ERR_SUCCESS) {
344     ERROR("mqtt plugin: mosquitto_connect failed: %s",
345           (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status));
346
347     mosquitto_destroy(conf->mosq);
348     conf->mosq = NULL;
349     return -1;
350   }
351
352   if (!conf->publish) {
353     mosquitto_message_callback_set(conf->mosq, on_message);
354
355     status =
356         mosquitto_subscribe(conf->mosq,
357                             /* message_id = */ NULL, conf->topic, conf->qos);
358     if (status != MOSQ_ERR_SUCCESS) {
359       ERROR("mqtt plugin: Subscribing to \"%s\" failed: %s", conf->topic,
360             mosquitto_strerror(status));
361
362       mosquitto_disconnect(conf->mosq);
363       mosquitto_destroy(conf->mosq);
364       conf->mosq = NULL;
365       return -1;
366     }
367   }
368
369   conf->connected = true;
370   return 0;
371 } /* mqtt_connect */
372
373 static void *subscribers_thread(void *arg) {
374   mqtt_client_conf_t *conf = arg;
375   int status;
376
377   conf->loop = 1;
378
379   while (conf->loop) {
380     status = mqtt_connect(conf);
381     if (status != 0) {
382       sleep(1);
383       continue;
384     }
385
386 /* The documentation says "0" would map to the default (1000ms), but
387  * that does not work on some versions. */
388 #if LIBMOSQUITTO_MAJOR == 0
389     status = mosquitto_loop(conf->mosq, /* timeout = */ 1000 /* ms */);
390 #else
391     status = mosquitto_loop(conf->mosq,
392                             /* timeout[ms] = */ 1000,
393                             /* max_packets = */ 100);
394 #endif
395     if (status == MOSQ_ERR_CONN_LOST) {
396       conf->connected = false;
397       continue;
398     } else if (status != MOSQ_ERR_SUCCESS) {
399       ERROR("mqtt plugin: mosquitto_loop failed: %s",
400             mosquitto_strerror(status));
401       mosquitto_destroy(conf->mosq);
402       conf->mosq = NULL;
403       conf->connected = false;
404       continue;
405     }
406
407     DEBUG("mqtt plugin: mosquitto_loop succeeded.");
408   } /* while (conf->loop) */
409
410   pthread_exit(0);
411 } /* void *subscribers_thread */
412
413 static int publish(mqtt_client_conf_t *conf, char const *topic,
414                    void const *payload, size_t payload_len) {
415   int status;
416
417   pthread_mutex_lock(&conf->lock);
418
419   status = mqtt_connect(conf);
420   if (status != 0) {
421     pthread_mutex_unlock(&conf->lock);
422     ERROR("mqtt plugin: unable to reconnect to broker");
423     return status;
424   }
425
426   status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
427 #if LIBMOSQUITTO_MAJOR == 0
428                              (uint32_t)payload_len, payload,
429 #else
430                              (int)payload_len, payload,
431 #endif
432                              conf->qos, conf->retain);
433   if (status != MOSQ_ERR_SUCCESS) {
434     c_complain(LOG_ERR, &conf->complaint_cantpublish,
435                "mqtt plugin: mosquitto_publish failed: %s",
436                (status == MOSQ_ERR_ERRNO) ? STRERRNO
437                                           : mosquitto_strerror(status));
438     /* Mark our connection "down" regardless of the error as a safety
439      * measure; we will try to reconnect the next time we have to publish a
440      * message */
441     conf->connected = false;
442     mosquitto_disconnect(conf->mosq);
443
444     pthread_mutex_unlock(&conf->lock);
445     return -1;
446   }
447
448   pthread_mutex_unlock(&conf->lock);
449   return 0;
450 } /* int publish */
451
452 static int format_topic(char *buf, size_t buf_len, data_set_t const *ds,
453                         value_list_t const *vl, mqtt_client_conf_t *conf) {
454   char name[MQTT_MAX_TOPIC_SIZE];
455   int status;
456   char *c;
457
458   if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
459     return FORMAT_VL(buf, buf_len, vl);
460
461   status = FORMAT_VL(name, sizeof(name), vl);
462   if (status != 0)
463     return status;
464
465   status = ssnprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name);
466   if ((status < 0) || (((size_t)status) >= buf_len))
467     return ENOMEM;
468
469   while ((c = strchr(buf, '#')) || (c = strchr(buf, '+'))) {
470     *c = '_';
471   }
472
473   return 0;
474 } /* int format_topic */
475
476 static int mqtt_write(const data_set_t *ds, const value_list_t *vl,
477                       user_data_t *user_data) {
478   mqtt_client_conf_t *conf;
479   char topic[MQTT_MAX_TOPIC_SIZE];
480   char payload[MQTT_MAX_MESSAGE_SIZE];
481   int status = 0;
482
483   if ((user_data == NULL) || (user_data->data == NULL))
484     return EINVAL;
485   conf = user_data->data;
486
487   status = format_topic(topic, sizeof(topic), ds, vl, conf);
488   if (status != 0) {
489     ERROR("mqtt plugin: format_topic failed with status %d.", status);
490     return status;
491   }
492
493   status = format_values(payload, sizeof(payload), ds, vl, conf->store_rates);
494   if (status != 0) {
495     ERROR("mqtt plugin: format_values failed with status %d.", status);
496     return status;
497   }
498
499   status = publish(conf, topic, payload, strlen(payload) + 1);
500   if (status != 0) {
501     ERROR("mqtt plugin: publish failed: %s", mosquitto_strerror(status));
502     return status;
503   }
504
505   return status;
506 } /* mqtt_write */
507
508 /*
509  * <Publish "name">
510  *   Host "example.com"
511  *   Port 1883
512  *   ClientId "collectd"
513  *   User "guest"
514  *   Password "secret"
515  *   Prefix "collectd"
516  *   StoreRates true
517  *   Retain false
518  *   QoS 0
519  *   CACert "ca.pem"                      Enables TLS if set
520  *   CertificateFile "client-cert.pem"    optional
521  *   CertificateKeyFile "client-key.pem"  optional
522  *   TLSProtocol "tlsv1.2"                optional
523  * </Publish>
524  */
525 static int mqtt_config_publisher(oconfig_item_t *ci) {
526   mqtt_client_conf_t *conf;
527   char cb_name[1024];
528   int status;
529
530   conf = calloc(1, sizeof(*conf));
531   if (conf == NULL) {
532     ERROR("mqtt plugin: calloc failed.");
533     return -1;
534   }
535   conf->publish = true;
536
537   conf->name = NULL;
538   status = cf_util_get_string(ci, &conf->name);
539   if (status != 0) {
540     mqtt_free(conf);
541     return status;
542   }
543
544   conf->host = strdup(MQTT_DEFAULT_HOST);
545   conf->port = MQTT_DEFAULT_PORT;
546   conf->client_id = NULL;
547   conf->qos = 0;
548   conf->topic_prefix = strdup(MQTT_DEFAULT_TOPIC_PREFIX);
549   conf->store_rates = true;
550
551   status = pthread_mutex_init(&conf->lock, NULL);
552   if (status != 0) {
553     mqtt_free(conf);
554     return status;
555   }
556
557   C_COMPLAIN_INIT(&conf->complaint_cantpublish);
558
559   for (int i = 0; i < ci->children_num; i++) {
560     oconfig_item_t *child = ci->children + i;
561     if (strcasecmp("Host", child->key) == 0)
562       cf_util_get_string(child, &conf->host);
563     else if (strcasecmp("Port", child->key) == 0) {
564       int tmp = cf_util_get_port_number(child);
565       if (tmp < 0)
566         ERROR("mqtt plugin: Invalid port number.");
567       else
568         conf->port = tmp;
569     } else if (strcasecmp("ClientId", child->key) == 0)
570       cf_util_get_string(child, &conf->client_id);
571     else if (strcasecmp("User", child->key) == 0)
572       cf_util_get_string(child, &conf->username);
573     else if (strcasecmp("Password", child->key) == 0)
574       cf_util_get_string(child, &conf->password);
575     else if (strcasecmp("QoS", child->key) == 0) {
576       int tmp = -1;
577       status = cf_util_get_int(child, &tmp);
578       if ((status != 0) || (tmp < 0) || (tmp > 2))
579         ERROR("mqtt plugin: Not a valid QoS setting.");
580       else
581         conf->qos = tmp;
582     } else if (strcasecmp("Prefix", child->key) == 0)
583       cf_util_get_string(child, &conf->topic_prefix);
584     else if (strcasecmp("StoreRates", child->key) == 0)
585       cf_util_get_boolean(child, &conf->store_rates);
586     else if (strcasecmp("Retain", child->key) == 0)
587       cf_util_get_boolean(child, &conf->retain);
588     else if (strcasecmp("CACert", child->key) == 0)
589       cf_util_get_string(child, &conf->cacertificatefile);
590     else if (strcasecmp("CertificateFile", child->key) == 0)
591       cf_util_get_string(child, &conf->certificatefile);
592     else if (strcasecmp("CertificateKeyFile", child->key) == 0)
593       cf_util_get_string(child, &conf->certificatekeyfile);
594     else if (strcasecmp("TLSProtocol", child->key) == 0)
595       cf_util_get_string(child, &conf->tlsprotocol);
596     else if (strcasecmp("CipherSuite", child->key) == 0)
597       cf_util_get_string(child, &conf->ciphersuite);
598     else
599       ERROR("mqtt plugin: Unknown config option: %s", child->key);
600   }
601
602   ssnprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name);
603   plugin_register_write(cb_name, mqtt_write,
604                         &(user_data_t){
605                             .data = conf,
606                         });
607   return 0;
608 } /* mqtt_config_publisher */
609
610 /*
611  * <Subscribe "name">
612  *   Host "example.com"
613  *   Port 1883
614  *   ClientId "collectd"
615  *   User "guest"
616  *   Password "secret"
617  *   Topic "collectd/#"
618  *   CACert "ca.pem"                      Enables TLS if set
619  *   CertificateFile "client-cert.pem"    optional
620  *   CertificateKeyFile "client-key.pem"  optional
621  *   TLSProtocol "tlsv1.2"                optional
622  * </Subscribe>
623  */
624 static int mqtt_config_subscriber(oconfig_item_t *ci) {
625   mqtt_client_conf_t **tmp;
626   mqtt_client_conf_t *conf;
627   int status;
628
629   conf = calloc(1, sizeof(*conf));
630   if (conf == NULL) {
631     ERROR("mqtt plugin: calloc failed.");
632     return -1;
633   }
634   conf->publish = false;
635
636   conf->name = NULL;
637   status = cf_util_get_string(ci, &conf->name);
638   if (status != 0) {
639     mqtt_free(conf);
640     return status;
641   }
642
643   conf->host = strdup(MQTT_DEFAULT_HOST);
644   conf->port = MQTT_DEFAULT_PORT;
645   conf->client_id = NULL;
646   conf->qos = 2;
647   conf->topic = strdup(MQTT_DEFAULT_TOPIC);
648   conf->clean_session = true;
649
650   status = pthread_mutex_init(&conf->lock, NULL);
651   if (status != 0) {
652     mqtt_free(conf);
653     return status;
654   }
655
656   C_COMPLAIN_INIT(&conf->complaint_cantpublish);
657
658   for (int i = 0; i < ci->children_num; i++) {
659     oconfig_item_t *child = ci->children + i;
660     if (strcasecmp("Host", child->key) == 0)
661       cf_util_get_string(child, &conf->host);
662     else if (strcasecmp("Port", child->key) == 0) {
663       status = cf_util_get_port_number(child);
664       if (status < 0)
665         ERROR("mqtt plugin: Invalid port number.");
666       else
667         conf->port = status;
668     } else if (strcasecmp("ClientId", child->key) == 0)
669       cf_util_get_string(child, &conf->client_id);
670     else if (strcasecmp("User", child->key) == 0)
671       cf_util_get_string(child, &conf->username);
672     else if (strcasecmp("Password", child->key) == 0)
673       cf_util_get_string(child, &conf->password);
674     else if (strcasecmp("QoS", child->key) == 0) {
675       int qos = -1;
676       status = cf_util_get_int(child, &qos);
677       if ((status != 0) || (qos < 0) || (qos > 2))
678         ERROR("mqtt plugin: Not a valid QoS setting.");
679       else
680         conf->qos = qos;
681     } else if (strcasecmp("Topic", child->key) == 0)
682       cf_util_get_string(child, &conf->topic);
683     else if (strcasecmp("CleanSession", child->key) == 0)
684       cf_util_get_boolean(child, &conf->clean_session);
685     else if (strcasecmp("CACert", child->key) == 0)
686       cf_util_get_string(child, &conf->cacertificatefile);
687     else if (strcasecmp("CertificateFile", child->key) == 0)
688       cf_util_get_string(child, &conf->certificatefile);
689     else if (strcasecmp("CertificateKeyFile", child->key) == 0)
690       cf_util_get_string(child, &conf->certificatekeyfile);
691     else if (strcasecmp("TLSProtocol", child->key) == 0)
692       cf_util_get_string(child, &conf->tlsprotocol);
693     else if (strcasecmp("CipherSuite", child->key) == 0)
694       cf_util_get_string(child, &conf->ciphersuite);
695     else
696       ERROR("mqtt plugin: Unknown config option: %s", child->key);
697   }
698
699   tmp = realloc(subscribers, sizeof(*subscribers) * (subscribers_num + 1));
700   if (tmp == NULL) {
701     ERROR("mqtt plugin: realloc failed.");
702     mqtt_free(conf);
703     return -1;
704   }
705   subscribers = tmp;
706   subscribers[subscribers_num] = conf;
707   subscribers_num++;
708
709   return 0;
710 } /* mqtt_config_subscriber */
711
712 /*
713  * <Plugin mqtt>
714  *   <Publish "name">
715  *     # ...
716  *   </Publish>
717  *   <Subscribe "name">
718  *     # ...
719  *   </Subscribe>
720  * </Plugin>
721  */
722 static int mqtt_config(oconfig_item_t *ci) {
723   for (int i = 0; i < ci->children_num; i++) {
724     oconfig_item_t *child = ci->children + i;
725
726     if (strcasecmp("Publish", child->key) == 0)
727       mqtt_config_publisher(child);
728     else if (strcasecmp("Subscribe", child->key) == 0)
729       mqtt_config_subscriber(child);
730     else
731       ERROR("mqtt plugin: Unknown config option: %s", child->key);
732   }
733
734   return 0;
735 } /* int mqtt_config */
736
737 static int mqtt_init(void) {
738   mosquitto_lib_init();
739
740   for (size_t i = 0; i < subscribers_num; i++) {
741     int status;
742
743     if (subscribers[i]->loop)
744       continue;
745
746     status = plugin_thread_create(&subscribers[i]->thread,
747                                   /* attrs = */ NULL,
748                                   /* func  = */ subscribers_thread,
749                                   /* args  = */ subscribers[i],
750                                   /* name  = */ "mqtt");
751     if (status != 0) {
752       ERROR("mqtt plugin: pthread_create failed: %s", STRERRNO);
753       continue;
754     }
755   }
756
757   return 0;
758 } /* mqtt_init */
759
760 void module_register(void) {
761   plugin_register_complex_config("mqtt", mqtt_config);
762   plugin_register_init("mqtt", mqtt_init);
763 } /* void module_register */