Merge pull request #1758 from rubenk/README
[collectd.git] / src / mqtt.c
1 /**
2  * collectd - src/mqtt.c
3  * Copyright (C) 2014       Marc Falzon
4  * Copyright (C) 2014,2015  Florian octo Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Marc Falzon <marc at baha dot mu>
26  *   Florian octo Forster <octo at collectd.org>
27  *   Jan-Piet Mens <jpmens at gmail.com>
28  **/
29
30 // Reference: http://mosquitto.org/api/files/mosquitto-h.html
31
32
33 #include "collectd.h"
34 #include "common.h"
35 #include "plugin.h"
36 #include "utils_cache.h"
37 #include "utils_complain.h"
38
39 #include <mosquitto.h>
40
41 #define MQTT_MAX_TOPIC_SIZE         1024
42 #define MQTT_MAX_MESSAGE_SIZE       MQTT_MAX_TOPIC_SIZE + 1024
43 #define MQTT_DEFAULT_HOST           "localhost"
44 #define MQTT_DEFAULT_PORT           1883
45 #define MQTT_DEFAULT_TOPIC_PREFIX   "collectd"
46 #define MQTT_DEFAULT_TOPIC          "collectd/#"
47 #ifndef MQTT_KEEPALIVE
48 # define MQTT_KEEPALIVE 60
49 #endif
50 #ifndef SSL_VERIFY_PEER
51 # define SSL_VERIFY_PEER  1
52 #endif
53
54
55 /*
56  * Data types
57  */
58 struct mqtt_client_conf
59 {
60     _Bool               publish;
61     char               *name;
62
63     struct mosquitto   *mosq;
64     _Bool               connected;
65
66     char               *host;
67     int                 port;
68     char               *client_id;
69     char               *username;
70     char               *password;
71     int                 qos;
72     char                *cacertificatefile;
73     char                *certificatefile;
74     char                *certificatekeyfile;
75     char                *tlsprotocol;
76     char                *ciphersuite;
77
78     /* For publishing */
79     char               *topic_prefix;
80     _Bool               store_rates;
81     _Bool               retain;
82
83     /* For subscribing */
84     pthread_t           thread;
85     _Bool               loop;
86     char               *topic;
87     _Bool               clean_session;
88
89     c_complain_t        complaint_cantpublish;
90     pthread_mutex_t     lock;
91 };
92 typedef struct mqtt_client_conf mqtt_client_conf_t;
93
94 static mqtt_client_conf_t **subscribers = NULL;
95 static size_t subscribers_num = 0;
96
97 /*
98  * Functions
99  */
100 #if LIBMOSQUITTO_MAJOR == 0
101 static char const *mosquitto_strerror (int code)
102 {
103     switch (code)
104     {
105         case MOSQ_ERR_SUCCESS: return "MOSQ_ERR_SUCCESS";
106         case MOSQ_ERR_NOMEM: return "MOSQ_ERR_NOMEM";
107         case MOSQ_ERR_PROTOCOL: return "MOSQ_ERR_PROTOCOL";
108         case MOSQ_ERR_INVAL: return "MOSQ_ERR_INVAL";
109         case MOSQ_ERR_NO_CONN: return "MOSQ_ERR_NO_CONN";
110         case MOSQ_ERR_CONN_REFUSED: return "MOSQ_ERR_CONN_REFUSED";
111         case MOSQ_ERR_NOT_FOUND: return "MOSQ_ERR_NOT_FOUND";
112         case MOSQ_ERR_CONN_LOST: return "MOSQ_ERR_CONN_LOST";
113         case MOSQ_ERR_SSL: return "MOSQ_ERR_SSL";
114         case MOSQ_ERR_PAYLOAD_SIZE: return "MOSQ_ERR_PAYLOAD_SIZE";
115         case MOSQ_ERR_NOT_SUPPORTED: return "MOSQ_ERR_NOT_SUPPORTED";
116         case MOSQ_ERR_AUTH: return "MOSQ_ERR_AUTH";
117         case MOSQ_ERR_ACL_DENIED: return "MOSQ_ERR_ACL_DENIED";
118         case MOSQ_ERR_UNKNOWN: return "MOSQ_ERR_UNKNOWN";
119         case MOSQ_ERR_ERRNO: return "MOSQ_ERR_ERRNO";
120     }
121
122     return "UNKNOWN ERROR CODE";
123 }
124 #else
125 /* provided by libmosquitto */
126 #endif
127
128 static void mqtt_free (mqtt_client_conf_t *conf)
129 {
130     if (conf == NULL)
131         return;
132
133     if (conf->connected)
134         (void) mosquitto_disconnect (conf->mosq);
135     conf->connected = 0;
136     (void) mosquitto_destroy (conf->mosq);
137
138     sfree (conf->host);
139     sfree (conf->username);
140     sfree (conf->password);
141     sfree (conf->client_id);
142     sfree (conf->topic_prefix);
143     sfree (conf);
144 }
145
146 static char *strip_prefix (char *topic)
147 {
148     size_t num;
149     size_t i;
150
151     num = 0;
152     for (i = 0; topic[i] != 0; i++)
153         if (topic[i] == '/')
154             num++;
155
156     if (num < 2)
157         return (NULL);
158
159     while (num > 2)
160     {
161         char *tmp = strchr (topic, '/');
162         if (tmp == NULL)
163             return (NULL);
164         topic = tmp + 1;
165         num--;
166     }
167
168     return (topic);
169 }
170
171 static void on_message (
172 #if LIBMOSQUITTO_MAJOR == 0
173 #else
174         __attribute__((unused)) struct mosquitto *m,
175 #endif
176         __attribute__((unused)) void *arg,
177         const struct mosquitto_message *msg)
178 {
179     value_list_t vl = VALUE_LIST_INIT;
180     data_set_t const *ds;
181     char *topic;
182     char *name;
183     char *payload;
184     int status;
185
186     if (msg->payloadlen <= 0) {
187         DEBUG ("mqtt plugin: message has empty payload");
188         return;
189     }
190
191     topic = strdup (msg->topic);
192     name = strip_prefix (topic);
193
194     status = parse_identifier_vl (name, &vl);
195     if (status != 0)
196     {
197         ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic);
198         sfree (topic);
199         return;
200     }
201     sfree (topic);
202
203     ds = plugin_get_ds (vl.type);
204     if (ds == NULL)
205     {
206         ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type);
207         return;
208     }
209
210     vl.values = calloc (ds->ds_num, sizeof (*vl.values));
211     if (vl.values == NULL)
212     {
213         ERROR ("mqtt plugin: calloc failed.");
214         return;
215     }
216     vl.values_len = ds->ds_num;
217
218     payload = malloc (msg->payloadlen+1);
219     if (payload == NULL)
220     {
221         ERROR ("mqtt plugin: malloc for payload buffer failed.");
222         sfree (vl.values);
223         return;
224     }
225     memmove (payload, msg->payload, msg->payloadlen);
226     payload[msg->payloadlen] = 0;
227
228     DEBUG ("mqtt plugin: payload = \"%s\"", payload);
229     status = parse_values (payload, &vl, ds);
230     if (status != 0)
231     {
232         ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload);
233         sfree (payload);
234         sfree (vl.values);
235         return;
236     }
237     sfree (payload);
238
239     plugin_dispatch_values (&vl);
240     sfree (vl.values);
241 } /* void on_message */
242
243 /* must hold conf->lock when calling. */
244 static int mqtt_reconnect (mqtt_client_conf_t *conf)
245 {
246     int status;
247
248     if (conf->connected)
249         return (0);
250
251     status = mosquitto_reconnect (conf->mosq);
252     if (status != MOSQ_ERR_SUCCESS)
253     {
254         char errbuf[1024];
255         ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
256                 (status == MOSQ_ERR_ERRNO)
257                 ? sstrerror(errno, errbuf, sizeof (errbuf))
258                 : mosquitto_strerror (status));
259         return (-1);
260     }
261
262     conf->connected = 1;
263
264     c_release (LOG_INFO,
265             &conf->complaint_cantpublish,
266             "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
267             conf->host, conf->port);
268
269     return (0);
270 } /* mqtt_reconnect */
271
272 /* must hold conf->lock when calling. */
273 static int mqtt_connect (mqtt_client_conf_t *conf)
274 {
275     char const *client_id;
276     int status;
277
278     if (conf->mosq != NULL)
279         return mqtt_reconnect (conf);
280
281     if (conf->client_id)
282         client_id = conf->client_id;
283     else
284         client_id = hostname_g;
285
286 #if LIBMOSQUITTO_MAJOR == 0
287     conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
288 #else
289     conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf);
290 #endif
291     if (conf->mosq == NULL)
292     {
293         ERROR ("mqtt plugin: mosquitto_new failed");
294         return (-1);
295     }
296
297 #if LIBMOSQUITTO_MAJOR != 0
298     if (conf->cacertificatefile) {
299         status = mosquitto_tls_set(conf->mosq, conf->cacertificatefile, NULL,
300                                    conf->certificatefile, conf->certificatekeyfile, /* pw_callback */NULL);
301         if (status != MOSQ_ERR_SUCCESS) {
302             ERROR ("mqtt plugin: cannot mosquitto_tls_set: %s", mosquitto_strerror(status));
303             mosquitto_destroy (conf->mosq);
304             conf->mosq = NULL;
305             return (-1);
306         }
307
308         status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER, conf->tlsprotocol, conf->ciphersuite);
309         if (status != MOSQ_ERR_SUCCESS) {
310             ERROR ("mqtt plugin: cannot mosquitto_tls_opts_set: %s", mosquitto_strerror(status));
311             mosquitto_destroy (conf->mosq);
312             conf->mosq = NULL;
313             return (-1);
314         }
315
316         status = mosquitto_tls_insecure_set(conf->mosq, false);
317         if (status != MOSQ_ERR_SUCCESS) {
318             ERROR ("mqtt plugin: cannot mosquitto_tls_insecure_set: %s", mosquitto_strerror(status));
319             mosquitto_destroy (conf->mosq);
320             conf->mosq = NULL;
321             return (-1);
322         }
323     }
324 #endif
325
326     if (conf->username && conf->password)
327     {
328         status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
329         if (status != MOSQ_ERR_SUCCESS)
330         {
331             char errbuf[1024];
332             ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
333                     (status == MOSQ_ERR_ERRNO)
334                     ? sstrerror (errno, errbuf, sizeof (errbuf))
335                     : mosquitto_strerror (status));
336
337             mosquitto_destroy (conf->mosq);
338             conf->mosq = NULL;
339             return (-1);
340         }
341     }
342
343 #if LIBMOSQUITTO_MAJOR == 0
344     status = mosquitto_connect (conf->mosq, conf->host, conf->port,
345             /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
346 #else
347     status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
348 #endif
349     if (status != MOSQ_ERR_SUCCESS)
350     {
351         char errbuf[1024];
352         ERROR ("mqtt plugin: mosquitto_connect failed: %s",
353                 (status == MOSQ_ERR_ERRNO)
354                 ? sstrerror (errno, errbuf, sizeof (errbuf))
355                 : mosquitto_strerror (status));
356
357         mosquitto_destroy (conf->mosq);
358         conf->mosq = NULL;
359         return (-1);
360     }
361
362     if (!conf->publish)
363     {
364         mosquitto_message_callback_set (conf->mosq, on_message);
365
366         status = mosquitto_subscribe (conf->mosq,
367                 /* message_id = */ NULL,
368                 conf->topic, conf->qos);
369         if (status != MOSQ_ERR_SUCCESS)
370         {
371             ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s",
372                     conf->topic, mosquitto_strerror (status));
373
374             mosquitto_disconnect (conf->mosq);
375             mosquitto_destroy (conf->mosq);
376             conf->mosq = NULL;
377             return (-1);
378         }
379     }
380
381     conf->connected = 1;
382     return (0);
383 } /* mqtt_connect */
384
385 static void *subscribers_thread (void *arg)
386 {
387     mqtt_client_conf_t *conf = arg;
388     int status;
389
390     conf->loop = 1;
391
392     while (conf->loop)
393     {
394         status = mqtt_connect (conf);
395         if (status != 0)
396         {
397             sleep (1);
398             continue;
399         }
400
401         /* The documentation says "0" would map to the default (1000ms), but
402          * that does not work on some versions. */
403 #if LIBMOSQUITTO_MAJOR == 0
404         status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
405 #else
406         status = mosquitto_loop (conf->mosq,
407                 /* timeout[ms] = */ 1000,
408                 /* max_packets = */  100);
409 #endif
410         if (status == MOSQ_ERR_CONN_LOST)
411         {
412             conf->connected = 0;
413             continue;
414         }
415         else if (status != MOSQ_ERR_SUCCESS)
416         {
417             ERROR ("mqtt plugin: mosquitto_loop failed: %s",
418                     mosquitto_strerror (status));
419             mosquitto_destroy (conf->mosq);
420             conf->mosq = NULL;
421             conf->connected = 0;
422             continue;
423         }
424
425         DEBUG ("mqtt plugin: mosquitto_loop succeeded.");
426     } /* while (conf->loop) */
427
428     pthread_exit (0);
429 } /* void *subscribers_thread */
430
431 static int publish (mqtt_client_conf_t *conf, char const *topic,
432     void const *payload, size_t payload_len)
433 {
434     int status;
435
436     pthread_mutex_lock (&conf->lock);
437
438     status = mqtt_connect (conf);
439     if (status != 0) {
440         pthread_mutex_unlock (&conf->lock);
441         ERROR ("mqtt plugin: unable to reconnect to broker");
442         return (status);
443     }
444
445     status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
446 #if LIBMOSQUITTO_MAJOR == 0
447             (uint32_t) payload_len, payload,
448 #else
449             (int) payload_len, payload,
450 #endif
451             conf->qos, conf->retain);
452     if (status != MOSQ_ERR_SUCCESS)
453     {
454         char errbuf[1024];
455         c_complain (LOG_ERR,
456             &conf->complaint_cantpublish,
457             "mqtt plugin: mosquitto_publish failed: %s",
458             (status == MOSQ_ERR_ERRNO)
459             ? sstrerror(errno, errbuf, sizeof (errbuf))
460             : mosquitto_strerror(status));
461         /* Mark our connection "down" regardless of the error as a safety
462          * measure; we will try to reconnect the next time we have to publish a
463          * message */
464         conf->connected = 0;
465
466         pthread_mutex_unlock (&conf->lock);
467         return (-1);
468     }
469
470     pthread_mutex_unlock (&conf->lock);
471     return (0);
472 } /* int publish */
473
474 static int format_topic (char *buf, size_t buf_len,
475     data_set_t const *ds, value_list_t const *vl,
476     mqtt_client_conf_t *conf)
477 {
478     char name[MQTT_MAX_TOPIC_SIZE];
479     int status;
480
481     if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
482         return (FORMAT_VL (buf, buf_len, vl));
483
484     status = FORMAT_VL (name, sizeof (name), vl);
485     if (status != 0)
486         return (status);
487
488     status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name);
489     if ((status < 0) || (((size_t) status) >= buf_len))
490         return (ENOMEM);
491
492     return (0);
493 } /* int format_topic */
494
495 static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
496     user_data_t *user_data)
497 {
498     mqtt_client_conf_t *conf;
499     char topic[MQTT_MAX_TOPIC_SIZE];
500     char payload[MQTT_MAX_MESSAGE_SIZE];
501     int status = 0;
502
503     if ((user_data == NULL) || (user_data->data == NULL))
504         return (EINVAL);
505     conf = user_data->data;
506
507     status = format_topic (topic, sizeof (topic), ds, vl, conf);
508     if (status != 0)
509     {
510         ERROR ("mqtt plugin: format_topic failed with status %d.", status);
511         return (status);
512     }
513
514     status = format_values (payload, sizeof (payload),
515             ds, vl, conf->store_rates);
516     if (status != 0)
517     {
518         ERROR ("mqtt plugin: format_values failed with status %d.", status);
519         return (status);
520     }
521
522     status = publish (conf, topic, payload, strlen (payload) + 1);
523     if (status != 0)
524     {
525         ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status));
526         return (status);
527     }
528
529     return (status);
530 } /* mqtt_write */
531
532 /*
533  * <Publish "name">
534  *   Host "example.com"
535  *   Port 1883
536  *   ClientId "collectd"
537  *   User "guest"
538  *   Password "secret"
539  *   Prefix "collectd"
540  *   StoreRates true
541  *   Retain false
542  *   QoS 0
543  *   CACert "ca.pem"                    Enables TLS if set
544  *   CertificateFile "client-cert.pem"          optional
545  *   CertificateKeyFile "client-key.pem"                optional
546  *   TLSProtocol "tlsv1.2"              optional
547  * </Publish>
548  */
549 static int mqtt_config_publisher (oconfig_item_t *ci)
550 {
551     mqtt_client_conf_t *conf;
552     char cb_name[1024];
553     user_data_t user_data;
554     int status;
555     int i;
556
557     conf = calloc (1, sizeof (*conf));
558     if (conf == NULL)
559     {
560         ERROR ("mqtt plugin: calloc failed.");
561         return (-1);
562     }
563     conf->publish = 1;
564
565     conf->name = NULL;
566     status = cf_util_get_string (ci, &conf->name);
567     if (status != 0)
568     {
569         mqtt_free (conf);
570         return (status);
571     }
572
573     conf->host = strdup (MQTT_DEFAULT_HOST);
574     conf->port = MQTT_DEFAULT_PORT;
575     conf->client_id = NULL;
576     conf->qos = 0;
577     conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
578     conf->store_rates = 1;
579
580     status = pthread_mutex_init (&conf->lock, NULL);
581     if (status != 0)
582     {
583       mqtt_free (conf);
584       return (status);
585     }
586
587     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
588
589     for (i = 0; i < ci->children_num; i++)
590     {
591         oconfig_item_t *child = ci->children + i;
592         if (strcasecmp ("Host", child->key) == 0)
593             cf_util_get_string (child, &conf->host);
594         else if (strcasecmp ("Port", child->key) == 0)
595         {
596             int tmp = cf_util_get_port_number (child);
597             if (tmp < 0)
598                 ERROR ("mqtt plugin: Invalid port number.");
599             else
600                 conf->port = tmp;
601         }
602         else if (strcasecmp ("ClientId", child->key) == 0)
603             cf_util_get_string (child, &conf->client_id);
604         else if (strcasecmp ("User", child->key) == 0)
605             cf_util_get_string (child, &conf->username);
606         else if (strcasecmp ("Password", child->key) == 0)
607             cf_util_get_string (child, &conf->password);
608         else if (strcasecmp ("QoS", child->key) == 0)
609         {
610             int tmp = -1;
611             status = cf_util_get_int (child, &tmp);
612             if ((status != 0) || (tmp < 0) || (tmp > 2))
613                 ERROR ("mqtt plugin: Not a valid QoS setting.");
614             else
615                 conf->qos = tmp;
616         }
617         else if (strcasecmp ("Prefix", child->key) == 0)
618             cf_util_get_string (child, &conf->topic_prefix);
619         else if (strcasecmp ("StoreRates", child->key) == 0)
620             cf_util_get_boolean (child, &conf->store_rates);
621         else if (strcasecmp ("Retain", child->key) == 0)
622             cf_util_get_boolean (child, &conf->retain);
623         else if (strcasecmp ("CACert", child->key) == 0)
624             cf_util_get_string (child, &conf->cacertificatefile);
625         else if (strcasecmp ("CertificateFile", child->key) == 0)
626             cf_util_get_string (child, &conf->certificatefile);
627         else if (strcasecmp ("CertificateKeyFile", child->key) == 0)
628             cf_util_get_string (child, &conf->certificatekeyfile);
629         else if (strcasecmp ("TLSProtocol", child->key) == 0)
630             cf_util_get_string (child, &conf->tlsprotocol);
631         else if (strcasecmp ("CipherSuite", child->key) == 0)
632             cf_util_get_string (child, &conf->ciphersuite);
633         else
634             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
635     }
636
637     ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name);
638     memset (&user_data, 0, sizeof (user_data));
639     user_data.data = conf;
640
641     plugin_register_write (cb_name, mqtt_write, &user_data);
642     return (0);
643 } /* mqtt_config_publisher */
644
645 /*
646  * <Subscribe "name">
647  *   Host "example.com"
648  *   Port 1883
649  *   ClientId "collectd"
650  *   User "guest"
651  *   Password "secret"
652  *   Topic "collectd/#"
653  * </Subscribe>
654  */
655 static int mqtt_config_subscriber (oconfig_item_t *ci)
656 {
657     mqtt_client_conf_t **tmp;
658     mqtt_client_conf_t *conf;
659     int status;
660     int i;
661
662     conf = calloc (1, sizeof (*conf));
663     if (conf == NULL)
664     {
665         ERROR ("mqtt plugin: calloc failed.");
666         return (-1);
667     }
668     conf->publish = 0;
669
670     conf->name = NULL;
671     status = cf_util_get_string (ci, &conf->name);
672     if (status != 0)
673     {
674         mqtt_free (conf);
675         return (status);
676     }
677
678     conf->host = strdup (MQTT_DEFAULT_HOST);
679     conf->port = MQTT_DEFAULT_PORT;
680     conf->client_id = NULL;
681     conf->qos = 2;
682     conf->topic = strdup (MQTT_DEFAULT_TOPIC);
683     conf->clean_session = 1;
684
685     status = pthread_mutex_init (&conf->lock, NULL);
686     if (status != 0)
687     {
688       mqtt_free (conf);
689       return (status);
690     }
691
692     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
693
694     for (i = 0; i < ci->children_num; i++)
695     {
696         oconfig_item_t *child = ci->children + i;
697         if (strcasecmp ("Host", child->key) == 0)
698             cf_util_get_string (child, &conf->host);
699         else if (strcasecmp ("Port", child->key) == 0)
700         {
701             status = cf_util_get_port_number (child);
702             if (status < 0)
703                 ERROR ("mqtt plugin: Invalid port number.");
704             else
705                 conf->port = status;
706         }
707         else if (strcasecmp ("ClientId", child->key) == 0)
708             cf_util_get_string (child, &conf->client_id);
709         else if (strcasecmp ("User", child->key) == 0)
710             cf_util_get_string (child, &conf->username);
711         else if (strcasecmp ("Password", child->key) == 0)
712             cf_util_get_string (child, &conf->password);
713         else if (strcasecmp ("QoS", child->key) == 0)
714         {
715             int qos = -1;
716             status = cf_util_get_int (child, &qos);
717             if ((status != 0) || (qos < 0) || (qos > 2))
718                 ERROR ("mqtt plugin: Not a valid QoS setting.");
719             else
720                 conf->qos = qos;
721         }
722         else if (strcasecmp ("Topic", child->key) == 0)
723             cf_util_get_string (child, &conf->topic);
724         else if (strcasecmp ("CleanSession", child->key) == 0)
725             cf_util_get_boolean (child, &conf->clean_session);
726         else
727             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
728     }
729
730     tmp = realloc (subscribers, sizeof (*subscribers) * (subscribers_num + 1) );
731     if (tmp == NULL)
732     {
733         ERROR ("mqtt plugin: realloc failed.");
734         mqtt_free (conf);
735         return (-1);
736     }
737     subscribers = tmp;
738     subscribers[subscribers_num] = conf;
739     subscribers_num++;
740
741     return (0);
742 } /* mqtt_config_subscriber */
743
744 /*
745  * <Plugin mqtt>
746  *   <Publish "name">
747  *     # ...
748  *   </Publish>
749  *   <Subscribe "name">
750  *     # ...
751  *   </Subscribe>
752  * </Plugin>
753  */
754 static int mqtt_config (oconfig_item_t *ci)
755 {
756     int i;
757
758     for (i = 0; i < ci->children_num; i++)
759     {
760         oconfig_item_t *child = ci->children + i;
761
762         if (strcasecmp ("Publish", child->key) == 0)
763             mqtt_config_publisher (child);
764         else if (strcasecmp ("Subscribe", child->key) == 0)
765             mqtt_config_subscriber (child);
766         else
767             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
768     }
769
770     return (0);
771 } /* int mqtt_config */
772
773 static int mqtt_init (void)
774 {
775     size_t i;
776
777     mosquitto_lib_init ();
778
779     for (i = 0; i < subscribers_num; i++)
780     {
781         int status;
782
783         if (subscribers[i]->loop)
784             continue;
785
786         status = plugin_thread_create (&subscribers[i]->thread,
787                 /* attrs = */ NULL,
788                 /* func  = */ subscribers_thread,
789                 /* args  = */ subscribers[i]);
790         if (status != 0)
791         {
792             char errbuf[1024];
793             ERROR ("mqtt plugin: pthread_create failed: %s",
794                     sstrerror (errno, errbuf, sizeof (errbuf)));
795             continue;
796         }
797     }
798
799     return (0);
800 } /* mqtt_init */
801
802 void module_register (void)
803 {
804     plugin_register_complex_config ("mqtt", mqtt_config);
805     plugin_register_init ("mqtt", mqtt_init);
806 } /* void module_register */
807
808 /* vim: set sw=4 sts=4 et fdm=marker : */