2eb11520dce1eccca31427c5707f46e4a02bd125
[collectd.git] / src / mqtt.c
1 /**
2  * collectd - src/mqtt.c
3  * Copyright (C) 2014       Marc Falzon
4  * Copyright (C) 2014,2015  Florian octo Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Marc Falzon <marc at baha dot mu>
26  *   Florian octo Forster <octo at collectd.org>
27  **/
28
29 // Reference: http://mosquitto.org/api/files/mosquitto-h.html
30
31
32 #include "collectd.h"
33 #include "common.h"
34 #include "plugin.h"
35 #include "utils_cache.h"
36 #include "utils_complain.h"
37
38 #include <pthread.h>
39
40 #include <mosquitto.h>
41
42 #define MQTT_MAX_TOPIC_SIZE         1024
43 #define MQTT_MAX_MESSAGE_SIZE       MQTT_MAX_TOPIC_SIZE + 1024
44 #define MQTT_DEFAULT_HOST           "localhost"
45 #define MQTT_DEFAULT_PORT           1883
46 #define MQTT_DEFAULT_TOPIC_PREFIX   "collectd"
47 #define MQTT_DEFAULT_TOPIC          "collectd/#"
48 #ifndef MQTT_KEEPALIVE
49 # define MQTT_KEEPALIVE 60
50 #endif
51
52
53 /*
54  * Data types
55  */
56 struct mqtt_client_conf
57 {
58     _Bool               publish;
59     char               *name;
60
61     struct mosquitto   *mosq;
62     _Bool               connected;
63
64     char               *host;
65     int                 port;
66     char               *client_id;
67     char               *username;
68     char               *password;
69     int                 qos;
70
71     /* For publishing */
72     char               *topic_prefix;
73     _Bool               store_rates;
74     _Bool               retain;
75
76     /* For subscribing */
77     pthread_t           thread;
78     _Bool               loop;
79     char               *topic;
80     _Bool               clean_session;
81
82     c_complain_t        complaint_cantpublish;
83     pthread_mutex_t     lock;
84 };
85 typedef struct mqtt_client_conf mqtt_client_conf_t;
86
87 static mqtt_client_conf_t **subscribers = NULL;
88 static size_t subscribers_num = 0;
89
90 /*
91  * Functions
92  */
93 #if LIBMOSQUITTO_MAJOR == 0
94 static char const *mosquitto_strerror (int code)
95 {
96     switch (code)
97     {
98         case MOSQ_ERR_SUCCESS: return "MOSQ_ERR_SUCCESS";
99         case MOSQ_ERR_NOMEM: return "MOSQ_ERR_NOMEM";
100         case MOSQ_ERR_PROTOCOL: return "MOSQ_ERR_PROTOCOL";
101         case MOSQ_ERR_INVAL: return "MOSQ_ERR_INVAL";
102         case MOSQ_ERR_NO_CONN: return "MOSQ_ERR_NO_CONN";
103         case MOSQ_ERR_CONN_REFUSED: return "MOSQ_ERR_CONN_REFUSED";
104         case MOSQ_ERR_NOT_FOUND: return "MOSQ_ERR_NOT_FOUND";
105         case MOSQ_ERR_CONN_LOST: return "MOSQ_ERR_CONN_LOST";
106         case MOSQ_ERR_SSL: return "MOSQ_ERR_SSL";
107         case MOSQ_ERR_PAYLOAD_SIZE: return "MOSQ_ERR_PAYLOAD_SIZE";
108         case MOSQ_ERR_NOT_SUPPORTED: return "MOSQ_ERR_NOT_SUPPORTED";
109         case MOSQ_ERR_AUTH: return "MOSQ_ERR_AUTH";
110         case MOSQ_ERR_ACL_DENIED: return "MOSQ_ERR_ACL_DENIED";
111         case MOSQ_ERR_UNKNOWN: return "MOSQ_ERR_UNKNOWN";
112         case MOSQ_ERR_ERRNO: return "MOSQ_ERR_ERRNO";
113     }
114
115     return "UNKNOWN ERROR CODE";
116 }
117 #else
118 /* provided by libmosquitto */
119 #endif
120
121 static void mqtt_free (mqtt_client_conf_t *conf)
122 {
123     if (conf == NULL)
124         return;
125
126     if (conf->connected)
127         (void) mosquitto_disconnect (conf->mosq);
128     conf->connected = 0;
129     (void) mosquitto_destroy (conf->mosq);
130
131     sfree (conf->host);
132     sfree (conf->username);
133     sfree (conf->password);
134     sfree (conf->client_id);
135     sfree (conf->topic_prefix);
136     sfree (conf);
137 }
138
139 static char *strip_prefix (char *topic)
140 {
141     size_t num;
142     size_t i;
143
144     num = 0;
145     for (i = 0; topic[i] != 0; i++)
146         if (topic[i] == '/')
147             num++;
148
149     if (num < 2)
150         return (NULL);
151
152     while (num > 2)
153     {
154         char *tmp = strchr (topic, '/');
155         if (tmp == NULL)
156             return (NULL);
157         topic = tmp + 1;
158         num--;
159     }
160
161     return (topic);
162 }
163
164 static void on_message (
165 #if LIBMOSQUITTO_MAJOR == 0
166 #else
167         __attribute__((unused)) struct mosquitto *m,
168 #endif
169         __attribute__((unused)) void *arg,
170         const struct mosquitto_message *msg)
171 {
172     value_list_t vl = VALUE_LIST_INIT;
173     data_set_t const *ds;
174     char *topic;
175     char *name;
176     char *payload;
177     int status;
178
179     if (msg->payloadlen <= 0) {
180         DEBUG ("mqtt plugin: message has empty payload");
181         return;
182     }
183
184     topic = strdup (msg->topic);
185     name = strip_prefix (topic);
186
187     status = parse_identifier_vl (name, &vl);
188     if (status != 0)
189     {
190         ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic);
191         sfree (topic);
192         return;
193     }
194     sfree (topic);
195
196     ds = plugin_get_ds (vl.type);
197     if (ds == NULL)
198     {
199         ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type);
200         return;
201     }
202
203     vl.values = calloc (ds->ds_num, sizeof (*vl.values));
204     if (vl.values == NULL)
205     {
206         ERROR ("mqtt plugin: calloc failed.");
207         return;
208     }
209     vl.values_len = ds->ds_num;
210
211     payload = malloc (msg->payloadlen+1);
212     if (payload == NULL)
213     {
214         ERROR ("mqtt plugin: malloc for payload buffer failed.");
215         return;
216     }
217     memmove (payload, msg->payload, msg->payloadlen);
218     payload[msg->payloadlen] = 0;
219
220     DEBUG ("mqtt plugin: payload = \"%s\"", payload);
221     status = parse_values (payload, &vl, ds);
222     if (status != 0)
223     {
224         ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload);
225         sfree (payload);
226         sfree (vl.values);
227         return;
228     }
229     sfree (payload);
230
231     plugin_dispatch_values (&vl);
232     sfree (vl.values);
233 } /* void on_message */
234
235 /* must hold conf->lock when calling. */
236 static int mqtt_reconnect (mqtt_client_conf_t *conf)
237 {
238     int status;
239
240     if (conf->connected)
241         return (0);
242
243     status = mosquitto_reconnect (conf->mosq);
244     if (status != MOSQ_ERR_SUCCESS)
245     {
246         char errbuf[1024];
247         ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
248                 (status == MOSQ_ERR_ERRNO)
249                 ? sstrerror(errno, errbuf, sizeof (errbuf))
250                 : mosquitto_strerror (status));
251         return (-1);
252     }
253
254     conf->connected = 1;
255
256     c_release (LOG_INFO,
257             &conf->complaint_cantpublish,
258             "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
259             conf->host, conf->port);
260
261     return (0);
262 } /* mqtt_reconnect */
263
264 /* must hold conf->lock when calling. */
265 static int mqtt_connect (mqtt_client_conf_t *conf)
266 {
267     char const *client_id;
268     int status;
269
270     if (conf->mosq != NULL)
271         return mqtt_reconnect (conf);
272
273     if (conf->client_id)
274         client_id = conf->client_id;
275     else
276         client_id = hostname_g;
277
278 #if LIBMOSQUITTO_MAJOR == 0
279     conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
280 #else
281     conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf);
282 #endif
283     if (conf->mosq == NULL)
284     {
285         ERROR ("mqtt plugin: mosquitto_new failed");
286         return (-1);
287     }
288
289     if (conf->username && conf->password)
290     {
291         status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
292         if (status != MOSQ_ERR_SUCCESS)
293         {
294             char errbuf[1024];
295             ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
296                     (status == MOSQ_ERR_ERRNO)
297                     ? sstrerror (errno, errbuf, sizeof (errbuf))
298                     : mosquitto_strerror (status));
299
300             mosquitto_destroy (conf->mosq);
301             conf->mosq = NULL;
302             return (-1);
303         }
304     }
305
306 #if LIBMOSQUITTO_MAJOR == 0
307     status = mosquitto_connect (conf->mosq, conf->host, conf->port,
308             /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
309 #else
310     status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
311 #endif
312     if (status != MOSQ_ERR_SUCCESS)
313     {
314         char errbuf[1024];
315         ERROR ("mqtt plugin: mosquitto_connect failed: %s",
316                 (status == MOSQ_ERR_ERRNO)
317                 ? sstrerror (errno, errbuf, sizeof (errbuf))
318                 : mosquitto_strerror (status));
319
320         mosquitto_destroy (conf->mosq);
321         conf->mosq = NULL;
322         return (-1);
323     }
324
325     if (!conf->publish)
326     {
327         mosquitto_message_callback_set (conf->mosq, on_message);
328
329         status = mosquitto_subscribe (conf->mosq,
330                 /* message_id = */ NULL,
331                 conf->topic, conf->qos);
332         if (status != MOSQ_ERR_SUCCESS)
333         {
334             ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s",
335                     conf->topic, mosquitto_strerror (status));
336
337             mosquitto_disconnect (conf->mosq);
338             mosquitto_destroy (conf->mosq);
339             conf->mosq = NULL;
340             return (-1);
341         }
342     }
343
344     conf->connected = 1;
345     return (0);
346 } /* mqtt_connect */
347
348 static void *subscribers_thread (void *arg)
349 {
350     mqtt_client_conf_t *conf = arg;
351     int status;
352
353     conf->loop = 1;
354
355     while (conf->loop)
356     {
357         status = mqtt_connect (conf);
358         if (status != 0)
359         {
360             sleep (1);
361             continue;
362         }
363
364         /* The documentation says "0" would map to the default (1000ms), but
365          * that does not work on some versions. */
366 #if LIBMOSQUITTO_MAJOR == 0
367         status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
368 #else
369         status = mosquitto_loop (conf->mosq,
370                 /* timeout[ms] = */ 1000,
371                 /* max_packets = */  100);
372 #endif
373         if (status == MOSQ_ERR_CONN_LOST)
374         {
375             conf->connected = 0;
376             continue;
377         }
378         else if (status != MOSQ_ERR_SUCCESS)
379         {
380             ERROR ("mqtt plugin: mosquitto_loop failed: %s",
381                     mosquitto_strerror (status));
382             mosquitto_destroy (conf->mosq);
383             conf->mosq = NULL;
384             conf->connected = 0;
385             continue;
386         }
387
388         DEBUG ("mqtt plugin: mosquitto_loop succeeded.");
389     } /* while (conf->loop) */
390
391     pthread_exit (0);
392 } /* void *subscribers_thread */
393
394 static int publish (mqtt_client_conf_t *conf, char const *topic,
395     void const *payload, size_t payload_len)
396 {
397     int status;
398
399     pthread_mutex_lock (&conf->lock);
400
401     status = mqtt_connect (conf);
402     if (status != 0) {
403         pthread_mutex_unlock (&conf->lock);
404         ERROR ("mqtt plugin: unable to reconnect to broker");
405         return (status);
406     }
407
408     status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
409 #if LIBMOSQUITTO_MAJOR == 0
410             (uint32_t) payload_len, payload,
411 #else
412             (int) payload_len, payload,
413 #endif
414             conf->qos, conf->retain);
415     if (status != MOSQ_ERR_SUCCESS)
416     {
417         char errbuf[1024];
418         c_complain (LOG_ERR,
419             &conf->complaint_cantpublish,
420             "mqtt plugin: mosquitto_publish failed: %s",
421             (status == MOSQ_ERR_ERRNO)
422             ? sstrerror(errno, errbuf, sizeof (errbuf))
423             : mosquitto_strerror(status));
424         /* Mark our connection "down" regardless of the error as a safety
425          * measure; we will try to reconnect the next time we have to publish a
426          * message */
427         conf->connected = 0;
428
429         pthread_mutex_unlock (&conf->lock);
430         return (-1);
431     }
432
433     pthread_mutex_unlock (&conf->lock);
434     return (0);
435 } /* int publish */
436
437 static int format_topic (char *buf, size_t buf_len,
438     data_set_t const *ds, value_list_t const *vl,
439     mqtt_client_conf_t *conf)
440 {
441     char name[MQTT_MAX_TOPIC_SIZE];
442     int status;
443
444     if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
445         return (FORMAT_VL (buf, buf_len, vl));
446
447     status = FORMAT_VL (name, sizeof (name), vl);
448     if (status != 0)
449         return (status);
450
451     status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name);
452     if ((status < 0) || (((size_t) status) >= buf_len))
453         return (ENOMEM);
454
455     return (0);
456 } /* int format_topic */
457
458 static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
459     user_data_t *user_data)
460 {
461     mqtt_client_conf_t *conf;
462     char topic[MQTT_MAX_TOPIC_SIZE];
463     char payload[MQTT_MAX_MESSAGE_SIZE];
464     int status = 0;
465
466     if ((user_data == NULL) || (user_data->data == NULL))
467         return (EINVAL);
468     conf = user_data->data;
469
470     status = format_topic (topic, sizeof (topic), ds, vl, conf);
471     if (status != 0)
472     {
473         ERROR ("mqtt plugin: format_topic failed with status %d.", status);
474         return (status);
475     }
476
477     status = format_values (payload, sizeof (payload),
478             ds, vl, conf->store_rates);
479     if (status != 0)
480     {
481         ERROR ("mqtt plugin: format_values failed with status %d.", status);
482         return (status);
483     }
484
485     status = publish (conf, topic, payload, strlen (payload) + 1);
486     if (status != 0)
487     {
488         ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status));
489         return (status);
490     }
491
492     return (status);
493 } /* mqtt_write */
494
495 /*
496  * <Publish "name">
497  *   Host "example.com"
498  *   Port 1883
499  *   ClientId "collectd"
500  *   User "guest"
501  *   Password "secret"
502  *   Prefix "collectd"
503  *   StoreRates true
504  *   Retain false
505  *   QoS 0
506  * </Publish>
507  */
508 static int mqtt_config_publisher (oconfig_item_t *ci)
509 {
510     mqtt_client_conf_t *conf;
511     char cb_name[1024];
512     user_data_t user_data;
513     int status;
514     int i;
515
516     conf = calloc (1, sizeof (*conf));
517     if (conf == NULL)
518     {
519         ERROR ("mqtt plugin: malloc failed.");
520         return (-1);
521     }
522     conf->publish = 1;
523
524     conf->name = NULL;
525     status = cf_util_get_string (ci, &conf->name);
526     if (status != 0)
527     {
528         mqtt_free (conf);
529         return (status);
530     }
531
532     conf->host = strdup (MQTT_DEFAULT_HOST);
533     conf->port = MQTT_DEFAULT_PORT;
534     conf->client_id = NULL;
535     conf->qos = 0;
536     conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
537     conf->store_rates = 1;
538
539     status = pthread_mutex_init (&conf->lock, NULL);
540     if (status != 0)
541     {
542       mqtt_free (conf);
543       return (status);
544     }
545
546     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
547
548     for (i = 0; i < ci->children_num; i++)
549     {
550         oconfig_item_t *child = ci->children + i;
551         if (strcasecmp ("Host", child->key) == 0)
552             cf_util_get_string (child, &conf->host);
553         else if (strcasecmp ("Port", child->key) == 0)
554         {
555             int tmp = cf_util_get_port_number (child);
556             if (tmp < 0)
557                 ERROR ("mqtt plugin: Invalid port number.");
558             else
559                 conf->port = tmp;
560         }
561         else if (strcasecmp ("ClientId", child->key) == 0)
562             cf_util_get_string (child, &conf->client_id);
563         else if (strcasecmp ("User", child->key) == 0)
564             cf_util_get_string (child, &conf->username);
565         else if (strcasecmp ("Password", child->key) == 0)
566             cf_util_get_string (child, &conf->password);
567         else if (strcasecmp ("QoS", child->key) == 0)
568         {
569             int tmp = -1;
570             status = cf_util_get_int (child, &tmp);
571             if ((status != 0) || (tmp < 0) || (tmp > 2))
572                 ERROR ("mqtt plugin: Not a valid QoS setting.");
573             else
574                 conf->qos = tmp;
575         }
576         else if (strcasecmp ("Prefix", child->key) == 0)
577             cf_util_get_string (child, &conf->topic_prefix);
578         else if (strcasecmp ("StoreRates", child->key) == 0)
579             cf_util_get_boolean (child, &conf->store_rates);
580         else if (strcasecmp ("Retain", child->key) == 0)
581             cf_util_get_boolean (child, &conf->retain);
582         else
583             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
584     }
585
586     ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name);
587     memset (&user_data, 0, sizeof (user_data));
588     user_data.data = conf;
589
590     plugin_register_write (cb_name, mqtt_write, &user_data);
591     return (0);
592 } /* mqtt_config_publisher */
593
594 /*
595  * <Subscribe "name">
596  *   Host "example.com"
597  *   Port 1883
598  *   ClientId "collectd"
599  *   User "guest"
600  *   Password "secret"
601  *   Topic "collectd/#"
602  * </Publish>
603  */
604 static int mqtt_config_subscriber (oconfig_item_t *ci)
605 {
606     mqtt_client_conf_t **tmp;
607     mqtt_client_conf_t *conf;
608     int status;
609     int i;
610
611     conf = calloc (1, sizeof (*conf));
612     if (conf == NULL)
613     {
614         ERROR ("mqtt plugin: malloc failed.");
615         return (-1);
616     }
617     conf->publish = 0;
618
619     conf->name = NULL;
620     status = cf_util_get_string (ci, &conf->name);
621     if (status != 0)
622     {
623         mqtt_free (conf);
624         return (status);
625     }
626
627     conf->host = strdup (MQTT_DEFAULT_HOST);
628     conf->port = MQTT_DEFAULT_PORT;
629     conf->client_id = NULL;
630     conf->qos = 2;
631     conf->topic = strdup (MQTT_DEFAULT_TOPIC);
632     conf->clean_session = 1;
633
634     status = pthread_mutex_init (&conf->lock, NULL);
635     if (status != 0)
636     {
637       mqtt_free (conf);
638       return (status);
639     }
640
641     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
642
643     for (i = 0; i < ci->children_num; i++)
644     {
645         oconfig_item_t *child = ci->children + i;
646         if (strcasecmp ("Host", child->key) == 0)
647             cf_util_get_string (child, &conf->host);
648         else if (strcasecmp ("Port", child->key) == 0)
649         {
650             int tmp = cf_util_get_port_number (child);
651             if (tmp < 0)
652                 ERROR ("mqtt plugin: Invalid port number.");
653             else
654                 conf->port = tmp;
655         }
656         else if (strcasecmp ("ClientId", child->key) == 0)
657             cf_util_get_string (child, &conf->client_id);
658         else if (strcasecmp ("User", child->key) == 0)
659             cf_util_get_string (child, &conf->username);
660         else if (strcasecmp ("Password", child->key) == 0)
661             cf_util_get_string (child, &conf->password);
662         else if (strcasecmp ("QoS", child->key) == 0)
663         {
664             int tmp = -1;
665             status = cf_util_get_int (child, &tmp);
666             if ((status != 0) || (tmp < 0) || (tmp > 2))
667                 ERROR ("mqtt plugin: Not a valid QoS setting.");
668             else
669                 conf->qos = tmp;
670         }
671         else if (strcasecmp ("Topic", child->key) == 0)
672             cf_util_get_string (child, &conf->topic);
673         else if (strcasecmp ("CleanSession", child->key) == 0)
674             cf_util_get_boolean (child, &conf->clean_session);
675         else
676             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
677     }
678
679     tmp = realloc (subscribers, sizeof (*subscribers) * subscribers_num);
680     if (tmp == NULL)
681     {
682         ERROR ("mqtt plugin: realloc failed.");
683         mqtt_free (conf);
684         return (-1);
685     }
686     subscribers = tmp;
687     subscribers[subscribers_num] = conf;
688     subscribers_num++;
689
690     return (0);
691 } /* mqtt_config_subscriber */
692
693 /*
694  * <Plugin mqtt>
695  *   <Publish "name">
696  *     # ...
697  *   </Publish>
698  *   <Subscribe "name">
699  *     # ...
700  *   </Subscribe>
701  * </Plugin>
702  */
703 static int mqtt_config (oconfig_item_t *ci)
704 {
705     int i;
706
707     for (i = 0; i < ci->children_num; i++)
708     {
709         oconfig_item_t *child = ci->children + i;
710
711         if (strcasecmp ("Publish", child->key) == 0)
712             mqtt_config_publisher (child);
713         else if (strcasecmp ("Subscribe", child->key) == 0)
714             mqtt_config_subscriber (child);
715         else
716             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
717     }
718
719     return (0);
720 } /* int mqtt_config */
721
722 static int mqtt_init (void)
723 {
724     size_t i;
725
726     mosquitto_lib_init ();
727
728     for (i = 0; i < subscribers_num; i++)
729     {
730         int status;
731
732         if (subscribers[i]->loop)
733             continue;
734
735         status = plugin_thread_create (&subscribers[i]->thread,
736                 /* attrs = */ NULL,
737                 /* func  = */ subscribers_thread,
738                 /* args  = */ subscribers[i]);
739         if (status != 0)
740         {
741             char errbuf[1024];
742             ERROR ("mqtt plugin: pthread_create failed: %s",
743                     sstrerror (errno, errbuf, sizeof (errbuf)));
744             continue;
745         }
746     }
747
748     return (0);
749 } /* mqtt_init */
750
751 void module_register (void)
752 {
753     plugin_register_complex_config ("mqtt", mqtt_config);
754     plugin_register_init ("mqtt", mqtt_init);
755 } /* void module_register */
756
757 /* vim: set sw=4 sts=4 et fdm=marker : */