mqtt plugin: Add proof-of-concept subscriber code.
[collectd.git] / src / mqtt.c
1 /**
2  * collectd - src/mqtt.c
3  * Copyright (C) 2014       Marc Falzon <marc at baha dot mu>
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  **/
23
24 // Reference: http://mosquitto.org/api/files/mosquitto-h.html
25
26
27 #include "collectd.h"
28 #include "common.h"
29 #include "plugin.h"
30 #include "utils_cache.h"
31 #include "utils_complain.h"
32
33 #include <pthread.h>
34
35 #include <mosquitto.h>
36
37 #define MQTT_MAX_TOPIC_SIZE         1024
38 #define MQTT_MAX_MESSAGE_SIZE       MQTT_MAX_TOPIC_SIZE + 1024
39 #define MQTT_DEFAULT_HOST           "localhost"
40 #define MQTT_DEFAULT_PORT           1883
41 #define MQTT_DEFAULT_TOPIC_PREFIX   "collectd"
42 #define MQTT_DEFAULT_TOPIC          "collectd/#"
43
44 /*
45  * Data types
46  */
47 struct mqtt_client_conf
48 {
49     _Bool               publish;
50     char               *name;
51
52     struct mosquitto   *mosq;
53     _Bool               connected;
54
55     char               *host;
56     int                 port;
57     char               *client_id;
58     char               *username;
59     char               *password;
60     int                 qos;
61
62     /* For publishing */
63     char               *topic_prefix;
64     _Bool               store_rates;
65     _Bool               retain;
66
67     /* For subscribing */
68     pthread_t           thread;
69     _Bool               loop;
70     char               *topic;
71     _Bool               clean_session;
72
73     c_complain_t        complaint_cantpublish;
74     pthread_mutex_t     lock;
75 };
76 typedef struct mqtt_client_conf mqtt_client_conf_t;
77
78 static mqtt_client_conf_t **subscribers = NULL;
79 static size_t subscribers_num = 0;
80
81 /*
82  * Functions
83  */
84 static char const *mosquitto_strerror (int code)
85 {
86     switch (code)
87     {
88         case MOSQ_ERR_SUCCESS: return "MOSQ_ERR_SUCCESS";
89         case MOSQ_ERR_NOMEM: return "MOSQ_ERR_NOMEM";
90         case MOSQ_ERR_PROTOCOL: return "MOSQ_ERR_PROTOCOL";
91         case MOSQ_ERR_INVAL: return "MOSQ_ERR_INVAL";
92         case MOSQ_ERR_NO_CONN: return "MOSQ_ERR_NO_CONN";
93         case MOSQ_ERR_CONN_REFUSED: return "MOSQ_ERR_CONN_REFUSED";
94         case MOSQ_ERR_NOT_FOUND: return "MOSQ_ERR_NOT_FOUND";
95         case MOSQ_ERR_CONN_LOST: return "MOSQ_ERR_CONN_LOST";
96         case MOSQ_ERR_SSL: return "MOSQ_ERR_SSL";
97         case MOSQ_ERR_PAYLOAD_SIZE: return "MOSQ_ERR_PAYLOAD_SIZE";
98         case MOSQ_ERR_NOT_SUPPORTED: return "MOSQ_ERR_NOT_SUPPORTED";
99         case MOSQ_ERR_AUTH: return "MOSQ_ERR_AUTH";
100         case MOSQ_ERR_ACL_DENIED: return "MOSQ_ERR_ACL_DENIED";
101         case MOSQ_ERR_UNKNOWN: return "MOSQ_ERR_UNKNOWN";
102         case MOSQ_ERR_ERRNO: return "MOSQ_ERR_ERRNO";
103     }
104
105     return "UNKNOWN ERROR CODE";
106 }
107
108 static void mqtt_free (mqtt_client_conf_t *conf)
109 {
110     if (conf == NULL)
111         return;
112
113     if (conf->connected)
114         (void) mosquitto_disconnect (conf->mosq);
115     conf->connected = 0;
116     (void) mosquitto_destroy (conf->mosq);
117
118     sfree (conf->host);
119     sfree (conf->username);
120     sfree (conf->password);
121     sfree (conf->client_id);
122     sfree (conf->topic_prefix);
123     sfree (conf);
124 }
125
126 static char *strip_prefix (char *topic)
127 {
128     size_t num;
129     size_t i;
130
131     num = 0;
132     for (i = 0; topic[i] != 0; i++)
133         if (topic[i] == '/')
134             num++;
135
136     if (num < 2)
137         return (NULL);
138
139     while (num > 2)
140     {
141         char *tmp = strchr (topic, '/');
142         if (tmp == NULL)
143             return (NULL);
144         topic = tmp + 1;
145         num--;
146     }
147
148     return (topic);
149 }
150
151 static void on_message (__attribute__((unused)) void *arg,
152         const struct mosquitto_message *msg)
153 {
154     value_list_t vl = VALUE_LIST_INIT;
155     data_set_t const *ds;
156     char *topic;
157     char *name;
158     char *payload;
159     int status;
160
161     if ((msg->payloadlen <= 0) || (msg->payload[msg->payloadlen - 1] != 0))
162         return;
163
164     topic = strdup (msg->topic);
165     name = strip_prefix (topic);
166
167     status = parse_identifier_vl (name, &vl);
168     if (status != 0)
169     {
170         ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic);
171         sfree (topic);
172         return;
173     }
174     sfree (topic);
175
176     ds = plugin_get_ds (vl.type);
177     if (ds == NULL)
178     {
179         ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type);
180         return;
181     }
182
183     vl.values = calloc (ds->ds_num, sizeof (*vl.values));
184     if (vl.values == NULL)
185     {
186         ERROR ("mqtt plugin: calloc failed.");
187         return;
188     }
189     vl.values_len = ds->ds_num;
190
191     payload = strdup ((void *) msg->payload);
192     DEBUG ("mqtt plugin: payload = \"%s\"", payload);
193     status = parse_values (payload, &vl, ds);
194     if (status != 0)
195     {
196         ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload);
197         sfree (payload);
198         sfree (vl.values);
199         return;
200     }
201     sfree (payload);
202
203     plugin_dispatch_values (&vl);
204     sfree (vl.values);
205 } /* void on_message */
206
207 /* must hold conf->lock when calling. */
208 static int mqtt_reconnect (mqtt_client_conf_t *conf)
209 {
210     int status;
211
212     if (conf->connected)
213         return (0);
214
215     status = mosquitto_reconnect (conf->mosq);
216     if (status != MOSQ_ERR_SUCCESS)
217     {
218         char errbuf[1024];
219         ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
220                 (status == MOSQ_ERR_ERRNO)
221                 ? sstrerror(errno, errbuf, sizeof (errbuf))
222                 : mosquitto_strerror (status));
223         return (-1);
224     }
225
226     conf->connected = 1;
227
228     c_release (LOG_INFO,
229             &conf->complaint_cantpublish,
230             "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
231             conf->host, conf->port);
232
233     return (0);
234 } /* mqtt_reconnect */
235
236 /* must hold conf->lock when calling. */
237 static int mqtt_connect (mqtt_client_conf_t *conf)
238 {
239     char const *client_id;
240     int status;
241
242     if (conf->mosq != NULL)
243         return mqtt_reconnect (conf);
244
245     if (conf->client_id)
246         client_id = conf->client_id;
247     else
248         client_id = hostname_g;
249
250     conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
251     if (conf->mosq == NULL)
252     {
253         ERROR ("mqtt plugin: mosquitto_new failed");
254         return (-1);
255     }
256
257     if (conf->username && conf->password)
258     {
259         status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
260         if (status != MOSQ_ERR_SUCCESS)
261         {
262             char errbuf[1024];
263             ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
264                     (status == MOSQ_ERR_ERRNO)
265                     ? sstrerror (errno, errbuf, sizeof (errbuf))
266                     : mosquitto_strerror (status));
267
268             mosquitto_destroy (conf->mosq);
269             conf->mosq = NULL;
270             return (-1);
271         }
272     }
273
274     status = mosquitto_connect (conf->mosq, conf->host, conf->port,
275             /* keepalive = */ 10, /* clean session = */ conf->clean_session);
276     if (status != MOSQ_ERR_SUCCESS)
277     {
278         char errbuf[1024];
279         ERROR ("mqtt plugin: mosquitto_connect failed: %s",
280                 (status == MOSQ_ERR_ERRNO)
281                 ? sstrerror (errno, errbuf, sizeof (errbuf))
282                 : mosquitto_strerror (status));
283
284         mosquitto_destroy (conf->mosq);
285         conf->mosq = NULL;
286         return (-1);
287     }
288
289     if (!conf->publish)
290     {
291         mosquitto_message_callback_set (conf->mosq, on_message);
292
293         status = mosquitto_subscribe (conf->mosq, /* mid = */ NULL,
294                 conf->topic, conf->qos);
295         if (status != MOSQ_ERR_SUCCESS)
296         {
297             ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s",
298                     conf->topic, mosquitto_strerror (status));
299
300             mosquitto_disconnect (conf->mosq);
301             mosquitto_destroy (conf->mosq);
302             conf->mosq = NULL;
303             return (-1);
304         }
305     }
306
307     conf->connected = 1;
308     return (0);
309 } /* mqtt_connect */
310
311 static void *subscribers_thread (void *arg)
312 {
313     mqtt_client_conf_t *conf = arg;
314     int status;
315
316     conf->loop = 1;
317
318     while (conf->loop)
319     {
320         status = mqtt_connect (conf);
321         if (status != 0)
322         {
323             sleep (1);
324             continue;
325         }
326
327         /* The documentation says "0" would map to the default (1000ms), but
328          * that does not work on some versions. */
329         status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
330         if (status == MOSQ_ERR_CONN_LOST)
331         {
332             conf->connected = 0;
333             continue;
334         }
335         else if (status != MOSQ_ERR_SUCCESS)
336         {
337             ERROR ("mqtt plugin: mosquitto_loop failed: %s",
338                     mosquitto_strerror (status));
339             mosquitto_destroy (conf->mosq);
340             conf->mosq = NULL;
341             conf->connected = 0;
342             continue;
343         }
344
345         DEBUG ("mqtt plugin: mosquitto_loop succeeded.");
346     } /* while (conf->loop) */
347
348     pthread_exit (0);
349 } /* void *subscribers_thread */
350
351 static int publish (mqtt_client_conf_t *conf, char const *topic,
352     void const *payload, size_t payload_len)
353 {
354     int status;
355
356     pthread_mutex_lock (&conf->lock);
357
358     status = mqtt_connect (conf);
359     if (status != 0) {
360         pthread_mutex_unlock (&conf->lock);
361         ERROR ("mqtt plugin: unable to reconnect to broker");
362         return (status);
363     }
364
365     status = mosquitto_publish(conf->mosq,
366             /* message id */ NULL,
367             topic,
368             (uint32_t) payload_len, payload,
369             /* qos */ conf->qos,
370             /* retain */ conf->retain);
371     if (status != MOSQ_ERR_SUCCESS)
372     {
373         char errbuf[1024];
374         c_complain (LOG_ERR,
375                 &conf->complaint_cantpublish,
376                 "plugin mqtt: mosquitto_publish failed: %s",
377                 status == MOSQ_ERR_ERRNO ?
378                 sstrerror(errno, errbuf, sizeof (errbuf)) :
379                 mosquitto_strerror(status));
380         /* Mark our connection "down" regardless of the error as a safety
381          * measure; we will try to reconnect the next time we have to publish a
382          * message */
383         conf->connected = 0;
384
385         pthread_mutex_unlock (&conf->lock);
386         return (-1);
387     }
388
389     pthread_mutex_unlock (&conf->lock);
390     return (0);
391 } /* int publish */
392
393 static int format_topic (char *buf, size_t buf_len,
394     data_set_t const *ds, value_list_t const *vl,
395     mqtt_client_conf_t *conf)
396 {
397     char name[MQTT_MAX_TOPIC_SIZE];
398     int status;
399
400     if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
401         return (FORMAT_VL (buf, buf_len, vl));
402
403     status = FORMAT_VL (name, sizeof (name), vl);
404     if (status != 0)
405         return (status);
406
407     status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name);
408     if ((status < 0) || (((size_t) status) >= buf_len))
409         return (ENOMEM);
410
411     return (0);
412 } /* int format_topic */
413
414 static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
415     user_data_t *user_data)
416 {
417     mqtt_client_conf_t *conf;
418     char topic[MQTT_MAX_TOPIC_SIZE];
419     char payload[MQTT_MAX_MESSAGE_SIZE];
420     int status = 0;
421
422     if ((user_data == NULL) || (user_data->data == NULL))
423         return (EINVAL);
424     conf = user_data->data;
425
426     status = format_topic (topic, sizeof (topic), ds, vl, conf);
427     if (status != 0)
428     {
429         ERROR ("mqtt plugin: format_topic failed with status %d.", status);
430         return (status);
431     }
432
433     status = format_values (payload, sizeof (payload),
434             ds, vl, conf->store_rates);
435     if (status != 0)
436     {
437         ERROR ("mqtt plugin: format_values failed with status %d.", status);
438         return (status);
439     }
440
441     status = publish (conf, topic, payload, strlen (payload) + 1);
442     if (status != 0)
443     {
444         ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status));
445         return (status);
446     }
447
448     return (status);
449 } /* mqtt_write */
450
451 /*
452  * <Publish "name">
453  *   Host "example.com"
454  *   Port 1883
455  *   ClientId "collectd"
456  *   User "guest"
457  *   Password "secret"
458  *   Prefix "collectd"
459  *   StoreRates true
460  *   Retain false
461  *   QoS 0
462  * </Publish>
463  */
464 static int mqtt_config_publisher (oconfig_item_t *ci)
465 {
466     mqtt_client_conf_t *conf;
467     user_data_t user_data;
468     int status;
469     int i;
470
471     conf = calloc (1, sizeof (*conf));
472     if (conf == NULL)
473     {
474         ERROR ("mqtt plugin: malloc failed.");
475         return (-1);
476     }
477     conf->publish = 1;
478
479     conf->name = NULL;
480     status = cf_util_get_string (ci, &conf->name);
481     if (status != 0)
482     {
483         mqtt_free (conf);
484         return (status);
485     }
486
487     conf->host = strdup (MQTT_DEFAULT_HOST);
488     conf->port = MQTT_DEFAULT_PORT;
489     conf->client_id = NULL;
490     conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
491
492     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
493
494     for (i = 0; i < ci->children_num; i++)
495     {
496         oconfig_item_t *child = ci->children + i;
497         if (strcasecmp ("Host", child->key) == 0)
498             cf_util_get_string (child, &conf->host);
499         else if (strcasecmp ("Port", child->key) == 0)
500         {
501             int tmp = cf_util_get_port_number (child);
502             if (tmp < 0)
503                 ERROR ("mqtt plugin: Invalid port number.");
504             else
505                 conf->port = tmp;
506         }
507         else if (strcasecmp ("ClientId", child->key) == 0)
508             cf_util_get_string (child, &conf->client_id);
509         else if (strcasecmp ("User", child->key) == 0)
510             cf_util_get_string (child, &conf->username);
511         else if (strcasecmp ("Password", child->key) == 0)
512             cf_util_get_string (child, &conf->password);
513         else if (strcasecmp ("QoS", child->key) == 0)
514         {
515             int tmp = -1;
516             status = cf_util_get_int (child, &tmp);
517             if ((status != 0) || (tmp < 0) || (tmp > 2))
518                 ERROR ("mqtt plugin: Not a valid QoS setting.");
519             else
520                 conf->qos = tmp;
521         }
522         else if (strcasecmp ("Prefix", child->key) == 0)
523             cf_util_get_string (child, &conf->topic_prefix);
524         else if (strcasecmp ("StoreRates", child->key) == 0)
525             cf_util_get_boolean (child, &conf->store_rates);
526         else if (strcasecmp ("Retain", child->key) == 0)
527             cf_util_get_boolean (child, &conf->retain);
528         else
529             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
530     }
531
532     memset (&user_data, 0, sizeof (user_data));
533     user_data.data = conf;
534
535     plugin_register_write ("mqtt", mqtt_write, &user_data);
536     return (0);
537 } /* mqtt_config_publisher */
538
539 /*
540  * <Subscribe "name">
541  *   Host "example.com"
542  *   Port 1883
543  *   ClientId "collectd"
544  *   User "guest"
545  *   Password "secret"
546  *   Topic "collectd/#"
547  * </Publish>
548  */
549 static int mqtt_config_subscriber (oconfig_item_t *ci)
550 {
551     mqtt_client_conf_t **tmp;
552     mqtt_client_conf_t *conf;
553     int status;
554     int i;
555
556     conf = calloc (1, sizeof (*conf));
557     if (conf == NULL)
558     {
559         ERROR ("mqtt plugin: malloc failed.");
560         return (-1);
561     }
562     conf->publish = 0;
563
564     conf->name = NULL;
565     status = cf_util_get_string (ci, &conf->name);
566     if (status != 0)
567     {
568         mqtt_free (conf);
569         return (status);
570     }
571
572     conf->host = strdup (MQTT_DEFAULT_HOST);
573     conf->port = MQTT_DEFAULT_PORT;
574     conf->client_id = NULL;
575     conf->topic = strdup (MQTT_DEFAULT_TOPIC);
576
577     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
578
579     for (i = 0; i < ci->children_num; i++)
580     {
581         oconfig_item_t *child = ci->children + i;
582         if (strcasecmp ("Host", child->key) == 0)
583             cf_util_get_string (child, &conf->host);
584         else if (strcasecmp ("Port", child->key) == 0)
585         {
586             int tmp = cf_util_get_port_number (child);
587             if (tmp < 0)
588                 ERROR ("mqtt plugin: Invalid port number.");
589             else
590                 conf->port = tmp;
591         }
592         else if (strcasecmp ("ClientId", child->key) == 0)
593             cf_util_get_string (child, &conf->client_id);
594         else if (strcasecmp ("User", child->key) == 0)
595             cf_util_get_string (child, &conf->username);
596         else if (strcasecmp ("Password", child->key) == 0)
597             cf_util_get_string (child, &conf->password);
598         else if (strcasecmp ("QoS", child->key) == 0)
599         {
600             int tmp = -1;
601             status = cf_util_get_int (child, &tmp);
602             if ((status != 0) || (tmp < 0) || (tmp > 2))
603                 ERROR ("mqtt plugin: Not a valid QoS setting.");
604             else
605                 conf->qos = tmp;
606         }
607         else if (strcasecmp ("Topic", child->key) == 0)
608             cf_util_get_string (child, &conf->topic);
609         else if (strcasecmp ("CleanSession", child->key) == 0)
610             cf_util_get_boolean (child, &conf->clean_session);
611         else
612             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
613     }
614
615     tmp = realloc (subscribers, sizeof (*subscribers) * subscribers_num);
616     if (tmp == NULL)
617     {
618         ERROR ("mqtt plugin: realloc failed.");
619         mqtt_free (conf);
620         return (-1);
621     }
622     subscribers = tmp;
623     subscribers[subscribers_num] = conf;
624     subscribers_num++;
625
626     return (0);
627 } /* mqtt_config_subscriber */
628
629 /*
630  * <Plugin mqtt>
631  *   <Publish "name">
632  *     # ...
633  *   </Publish>
634  *   <Subscribe "name">
635  *     # ...
636  *   </Subscribe>
637  * </Plugin>
638  */
639 static int mqtt_config (oconfig_item_t *ci)
640 {
641     int i;
642
643     for (i = 0; i < ci->children_num; i++)
644     {
645         oconfig_item_t *child = ci->children + i;
646
647         if (strcasecmp ("Publish", child->key) == 0)
648             mqtt_config_publisher (child);
649         else if (strcasecmp ("Subscribe", child->key) == 0)
650             mqtt_config_subscriber (child);
651         else
652             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
653     }
654
655     return (0);
656 } /* int mqtt_config */
657
658 static int mqtt_init (void)
659 {
660     size_t i;
661
662     mosquitto_lib_init ();
663
664     for (i = 0; i < subscribers_num; i++)
665     {
666         int status;
667
668         if (subscribers[i]->loop)
669             continue;
670
671         status = plugin_thread_create (&subscribers[i]->thread,
672                 /* attrs = */ NULL,
673                 /* func  = */ subscribers_thread,
674                 /* args  = */ subscribers[i]);
675         if (status != 0)
676         {
677             char errbuf[1024];
678             ERROR ("mqtt plugin: pthread_create failed: %s",
679                     sstrerror (errno, errbuf, sizeof (errbuf)));
680             continue;
681         }
682     }
683
684     return (0);
685 } /* mqtt_init */
686
687 void module_register (void)
688 {
689     plugin_register_complex_config ("mqtt", mqtt_config);
690     plugin_register_init ("mqtt", mqtt_init);
691 } /* void module_register */
692
693 /* vim: set sw=4 sts=4 et fdm=marker : */