2 * collectd - src/mqtt.c
3 * Copyright (C) 2014 Marc Falzon
4 * Copyright (C) 2014,2015 Florian octo Forster
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
25 * Marc Falzon <marc at baha dot mu>
26 * Florian octo Forster <octo at collectd.org>
27 * Jan-Piet Mens <jpmens at gmail.com>
30 // Reference: http://mosquitto.org/api/files/mosquitto-h.html
35 #include "utils/common/common.h"
36 #include "utils_complain.h"
38 #include <mosquitto.h>
40 #define MQTT_MAX_TOPIC_SIZE 1024
41 #define MQTT_MAX_MESSAGE_SIZE MQTT_MAX_TOPIC_SIZE + 1024
42 #define MQTT_DEFAULT_HOST "localhost"
43 #define MQTT_DEFAULT_PORT 1883
44 #define MQTT_DEFAULT_TOPIC_PREFIX "collectd"
45 #define MQTT_DEFAULT_TOPIC "collectd/#"
46 #ifndef MQTT_KEEPALIVE
47 #define MQTT_KEEPALIVE 60
49 #ifndef SSL_VERIFY_PEER
50 #define SSL_VERIFY_PEER 1
56 struct mqtt_client_conf {
60 struct mosquitto *mosq;
69 char *cacertificatefile;
70 char *certificatefile;
71 char *certificatekeyfile;
86 c_complain_t complaint_cantpublish;
89 typedef struct mqtt_client_conf mqtt_client_conf_t;
91 static mqtt_client_conf_t **subscribers;
92 static size_t subscribers_num;
97 #if LIBMOSQUITTO_MAJOR == 0
98 static char const *mosquitto_strerror(int code) {
100 case MOSQ_ERR_SUCCESS:
101 return "MOSQ_ERR_SUCCESS";
103 return "MOSQ_ERR_NOMEM";
104 case MOSQ_ERR_PROTOCOL:
105 return "MOSQ_ERR_PROTOCOL";
107 return "MOSQ_ERR_INVAL";
108 case MOSQ_ERR_NO_CONN:
109 return "MOSQ_ERR_NO_CONN";
110 case MOSQ_ERR_CONN_REFUSED:
111 return "MOSQ_ERR_CONN_REFUSED";
112 case MOSQ_ERR_NOT_FOUND:
113 return "MOSQ_ERR_NOT_FOUND";
114 case MOSQ_ERR_CONN_LOST:
115 return "MOSQ_ERR_CONN_LOST";
117 return "MOSQ_ERR_SSL";
118 case MOSQ_ERR_PAYLOAD_SIZE:
119 return "MOSQ_ERR_PAYLOAD_SIZE";
120 case MOSQ_ERR_NOT_SUPPORTED:
121 return "MOSQ_ERR_NOT_SUPPORTED";
123 return "MOSQ_ERR_AUTH";
124 case MOSQ_ERR_ACL_DENIED:
125 return "MOSQ_ERR_ACL_DENIED";
126 case MOSQ_ERR_UNKNOWN:
127 return "MOSQ_ERR_UNKNOWN";
129 return "MOSQ_ERR_ERRNO";
132 return "UNKNOWN ERROR CODE";
135 /* provided by libmosquitto */
138 static void mqtt_free(mqtt_client_conf_t *conf) {
143 (void)mosquitto_disconnect(conf->mosq);
144 conf->connected = false;
145 (void)mosquitto_destroy(conf->mosq);
148 sfree(conf->username);
149 sfree(conf->password);
150 sfree(conf->client_id);
151 sfree(conf->topic_prefix);
155 static char *strip_prefix(char *topic) {
158 for (size_t i = 0; topic[i] != 0; i++)
166 char *tmp = strchr(topic, '/');
176 static void on_message(
177 #if LIBMOSQUITTO_MAJOR == 0
179 __attribute__((unused)) struct mosquitto *m,
181 __attribute__((unused)) void *arg, const struct mosquitto_message *msg) {
182 value_list_t vl = VALUE_LIST_INIT;
183 data_set_t const *ds;
189 if (msg->payloadlen <= 0) {
190 DEBUG("mqtt plugin: message has empty payload");
194 topic = strdup(msg->topic);
195 name = strip_prefix(topic);
197 status = parse_identifier_vl(name, &vl);
199 ERROR("mqtt plugin: Unable to parse topic \"%s\".", topic);
205 ds = plugin_get_ds(vl.type);
207 ERROR("mqtt plugin: Unknown type: \"%s\".", vl.type);
211 vl.values = calloc(ds->ds_num, sizeof(*vl.values));
212 if (vl.values == NULL) {
213 ERROR("mqtt plugin: calloc failed.");
216 vl.values_len = ds->ds_num;
218 payload = malloc(msg->payloadlen + 1);
219 if (payload == NULL) {
220 ERROR("mqtt plugin: malloc for payload buffer failed.");
224 memmove(payload, msg->payload, msg->payloadlen);
225 payload[msg->payloadlen] = 0;
227 DEBUG("mqtt plugin: payload = \"%s\"", payload);
228 status = parse_values(payload, &vl, ds);
230 ERROR("mqtt plugin: Unable to parse payload \"%s\".", payload);
237 plugin_dispatch_values(&vl);
239 } /* void on_message */
241 /* must hold conf->lock when calling. */
242 static int mqtt_reconnect(mqtt_client_conf_t *conf) {
248 status = mosquitto_reconnect(conf->mosq);
249 if (status != MOSQ_ERR_SUCCESS) {
250 ERROR("mqtt_connect_broker: mosquitto_connect failed: %s",
251 (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status));
255 conf->connected = true;
257 c_release(LOG_INFO, &conf->complaint_cantpublish,
258 "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
259 conf->host, conf->port);
262 } /* mqtt_reconnect */
264 /* must hold conf->lock when calling. */
265 static int mqtt_connect(mqtt_client_conf_t *conf) {
266 char const *client_id;
269 if (conf->mosq != NULL)
270 return mqtt_reconnect(conf);
273 client_id = conf->client_id;
275 client_id = hostname_g;
277 #if LIBMOSQUITTO_MAJOR == 0
278 conf->mosq = mosquitto_new(client_id, /* user data = */ conf);
281 mosquitto_new(client_id, conf->clean_session, /* user data = */ conf);
283 if (conf->mosq == NULL) {
284 ERROR("mqtt plugin: mosquitto_new failed");
288 #if LIBMOSQUITTO_MAJOR != 0
289 if (conf->cacertificatefile) {
290 status = mosquitto_tls_set(conf->mosq, conf->cacertificatefile, NULL,
291 conf->certificatefile, conf->certificatekeyfile,
292 /* pw_callback */ NULL);
293 if (status != MOSQ_ERR_SUCCESS) {
294 ERROR("mqtt plugin: cannot mosquitto_tls_set: %s",
295 mosquitto_strerror(status));
296 mosquitto_destroy(conf->mosq);
301 status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER,
302 conf->tlsprotocol, conf->ciphersuite);
303 if (status != MOSQ_ERR_SUCCESS) {
304 ERROR("mqtt plugin: cannot mosquitto_tls_opts_set: %s",
305 mosquitto_strerror(status));
306 mosquitto_destroy(conf->mosq);
311 status = mosquitto_tls_insecure_set(conf->mosq, false);
312 if (status != MOSQ_ERR_SUCCESS) {
313 ERROR("mqtt plugin: cannot mosquitto_tls_insecure_set: %s",
314 mosquitto_strerror(status));
315 mosquitto_destroy(conf->mosq);
322 if (conf->username && conf->password) {
324 mosquitto_username_pw_set(conf->mosq, conf->username, conf->password);
325 if (status != MOSQ_ERR_SUCCESS) {
326 ERROR("mqtt plugin: mosquitto_username_pw_set failed: %s",
327 (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status));
329 mosquitto_destroy(conf->mosq);
335 #if LIBMOSQUITTO_MAJOR == 0
336 status = mosquitto_connect(conf->mosq, conf->host, conf->port,
337 /* keepalive = */ MQTT_KEEPALIVE,
338 /* clean session = */ conf->clean_session);
341 mosquitto_connect(conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
343 if (status != MOSQ_ERR_SUCCESS) {
344 ERROR("mqtt plugin: mosquitto_connect failed: %s",
345 (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status));
347 mosquitto_destroy(conf->mosq);
352 if (!conf->publish) {
353 mosquitto_message_callback_set(conf->mosq, on_message);
356 mosquitto_subscribe(conf->mosq,
357 /* message_id = */ NULL, conf->topic, conf->qos);
358 if (status != MOSQ_ERR_SUCCESS) {
359 ERROR("mqtt plugin: Subscribing to \"%s\" failed: %s", conf->topic,
360 mosquitto_strerror(status));
362 mosquitto_disconnect(conf->mosq);
363 mosquitto_destroy(conf->mosq);
369 conf->connected = true;
373 static void *subscribers_thread(void *arg) {
374 mqtt_client_conf_t *conf = arg;
380 status = mqtt_connect(conf);
386 /* The documentation says "0" would map to the default (1000ms), but
387 * that does not work on some versions. */
388 #if LIBMOSQUITTO_MAJOR == 0
389 status = mosquitto_loop(conf->mosq, /* timeout = */ 1000 /* ms */);
391 status = mosquitto_loop(conf->mosq,
392 /* timeout[ms] = */ 1000,
393 /* max_packets = */ 100);
395 if (status == MOSQ_ERR_CONN_LOST) {
396 conf->connected = false;
398 } else if (status != MOSQ_ERR_SUCCESS) {
399 ERROR("mqtt plugin: mosquitto_loop failed: %s",
400 mosquitto_strerror(status));
401 mosquitto_destroy(conf->mosq);
403 conf->connected = false;
407 DEBUG("mqtt plugin: mosquitto_loop succeeded.");
408 } /* while (conf->loop) */
411 } /* void *subscribers_thread */
413 static int publish(mqtt_client_conf_t *conf, char const *topic,
414 void const *payload, size_t payload_len) {
417 pthread_mutex_lock(&conf->lock);
419 status = mqtt_connect(conf);
421 pthread_mutex_unlock(&conf->lock);
422 ERROR("mqtt plugin: unable to reconnect to broker");
426 status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
427 #if LIBMOSQUITTO_MAJOR == 0
428 (uint32_t)payload_len, payload,
430 (int)payload_len, payload,
432 conf->qos, conf->retain);
433 if (status != MOSQ_ERR_SUCCESS) {
434 c_complain(LOG_ERR, &conf->complaint_cantpublish,
435 "mqtt plugin: mosquitto_publish failed: %s",
436 (status == MOSQ_ERR_ERRNO) ? STRERRNO
437 : mosquitto_strerror(status));
438 /* Mark our connection "down" regardless of the error as a safety
439 * measure; we will try to reconnect the next time we have to publish a
441 conf->connected = false;
442 mosquitto_disconnect(conf->mosq);
444 pthread_mutex_unlock(&conf->lock);
448 pthread_mutex_unlock(&conf->lock);
452 static int format_topic(char *buf, size_t buf_len, data_set_t const *ds,
453 value_list_t const *vl, mqtt_client_conf_t *conf) {
454 char name[MQTT_MAX_TOPIC_SIZE];
458 if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
459 return FORMAT_VL(buf, buf_len, vl);
461 status = FORMAT_VL(name, sizeof(name), vl);
465 status = ssnprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name);
466 if ((status < 0) || (((size_t)status) >= buf_len))
469 while ((c = strchr(buf, '#')) || (c = strchr(buf, '+'))) {
474 } /* int format_topic */
476 static int mqtt_write(const data_set_t *ds, const value_list_t *vl,
477 user_data_t *user_data) {
478 mqtt_client_conf_t *conf;
479 char topic[MQTT_MAX_TOPIC_SIZE];
480 char payload[MQTT_MAX_MESSAGE_SIZE];
483 if ((user_data == NULL) || (user_data->data == NULL))
485 conf = user_data->data;
487 status = format_topic(topic, sizeof(topic), ds, vl, conf);
489 ERROR("mqtt plugin: format_topic failed with status %d.", status);
493 status = format_values(payload, sizeof(payload), ds, vl, conf->store_rates);
495 ERROR("mqtt plugin: format_values failed with status %d.", status);
499 status = publish(conf, topic, payload, strlen(payload) + 1);
501 ERROR("mqtt plugin: publish failed: %s", mosquitto_strerror(status));
512 * ClientId "collectd"
519 * CACert "ca.pem" Enables TLS if set
520 * CertificateFile "client-cert.pem" optional
521 * CertificateKeyFile "client-key.pem" optional
522 * TLSProtocol "tlsv1.2" optional
525 static int mqtt_config_publisher(oconfig_item_t *ci) {
526 mqtt_client_conf_t *conf;
530 conf = calloc(1, sizeof(*conf));
532 ERROR("mqtt plugin: calloc failed.");
535 conf->publish = true;
538 status = cf_util_get_string(ci, &conf->name);
544 conf->host = strdup(MQTT_DEFAULT_HOST);
545 conf->port = MQTT_DEFAULT_PORT;
546 conf->client_id = NULL;
548 conf->topic_prefix = strdup(MQTT_DEFAULT_TOPIC_PREFIX);
549 conf->store_rates = true;
551 status = pthread_mutex_init(&conf->lock, NULL);
557 C_COMPLAIN_INIT(&conf->complaint_cantpublish);
559 for (int i = 0; i < ci->children_num; i++) {
560 oconfig_item_t *child = ci->children + i;
561 if (strcasecmp("Host", child->key) == 0)
562 cf_util_get_string(child, &conf->host);
563 else if (strcasecmp("Port", child->key) == 0) {
564 int tmp = cf_util_get_port_number(child);
566 ERROR("mqtt plugin: Invalid port number.");
569 } else if (strcasecmp("ClientId", child->key) == 0)
570 cf_util_get_string(child, &conf->client_id);
571 else if (strcasecmp("User", child->key) == 0)
572 cf_util_get_string(child, &conf->username);
573 else if (strcasecmp("Password", child->key) == 0)
574 cf_util_get_string(child, &conf->password);
575 else if (strcasecmp("QoS", child->key) == 0) {
577 status = cf_util_get_int(child, &tmp);
578 if ((status != 0) || (tmp < 0) || (tmp > 2))
579 ERROR("mqtt plugin: Not a valid QoS setting.");
582 } else if (strcasecmp("Prefix", child->key) == 0)
583 cf_util_get_string(child, &conf->topic_prefix);
584 else if (strcasecmp("StoreRates", child->key) == 0)
585 cf_util_get_boolean(child, &conf->store_rates);
586 else if (strcasecmp("Retain", child->key) == 0)
587 cf_util_get_boolean(child, &conf->retain);
588 else if (strcasecmp("CACert", child->key) == 0)
589 cf_util_get_string(child, &conf->cacertificatefile);
590 else if (strcasecmp("CertificateFile", child->key) == 0)
591 cf_util_get_string(child, &conf->certificatefile);
592 else if (strcasecmp("CertificateKeyFile", child->key) == 0)
593 cf_util_get_string(child, &conf->certificatekeyfile);
594 else if (strcasecmp("TLSProtocol", child->key) == 0)
595 cf_util_get_string(child, &conf->tlsprotocol);
596 else if (strcasecmp("CipherSuite", child->key) == 0)
597 cf_util_get_string(child, &conf->ciphersuite);
599 ERROR("mqtt plugin: Unknown config option: %s", child->key);
602 ssnprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name);
603 plugin_register_write(cb_name, mqtt_write,
608 } /* mqtt_config_publisher */
614 * ClientId "collectd"
618 * CACert "ca.pem" Enables TLS if set
619 * CertificateFile "client-cert.pem" optional
620 * CertificateKeyFile "client-key.pem" optional
621 * TLSProtocol "tlsv1.2" optional
624 static int mqtt_config_subscriber(oconfig_item_t *ci) {
625 mqtt_client_conf_t **tmp;
626 mqtt_client_conf_t *conf;
629 conf = calloc(1, sizeof(*conf));
631 ERROR("mqtt plugin: calloc failed.");
634 conf->publish = false;
637 status = cf_util_get_string(ci, &conf->name);
643 conf->host = strdup(MQTT_DEFAULT_HOST);
644 conf->port = MQTT_DEFAULT_PORT;
645 conf->client_id = NULL;
647 conf->topic = strdup(MQTT_DEFAULT_TOPIC);
648 conf->clean_session = true;
650 status = pthread_mutex_init(&conf->lock, NULL);
656 C_COMPLAIN_INIT(&conf->complaint_cantpublish);
658 for (int i = 0; i < ci->children_num; i++) {
659 oconfig_item_t *child = ci->children + i;
660 if (strcasecmp("Host", child->key) == 0)
661 cf_util_get_string(child, &conf->host);
662 else if (strcasecmp("Port", child->key) == 0) {
663 status = cf_util_get_port_number(child);
665 ERROR("mqtt plugin: Invalid port number.");
668 } else if (strcasecmp("ClientId", child->key) == 0)
669 cf_util_get_string(child, &conf->client_id);
670 else if (strcasecmp("User", child->key) == 0)
671 cf_util_get_string(child, &conf->username);
672 else if (strcasecmp("Password", child->key) == 0)
673 cf_util_get_string(child, &conf->password);
674 else if (strcasecmp("QoS", child->key) == 0) {
676 status = cf_util_get_int(child, &qos);
677 if ((status != 0) || (qos < 0) || (qos > 2))
678 ERROR("mqtt plugin: Not a valid QoS setting.");
681 } else if (strcasecmp("Topic", child->key) == 0)
682 cf_util_get_string(child, &conf->topic);
683 else if (strcasecmp("CleanSession", child->key) == 0)
684 cf_util_get_boolean(child, &conf->clean_session);
685 else if (strcasecmp("CACert", child->key) == 0)
686 cf_util_get_string(child, &conf->cacertificatefile);
687 else if (strcasecmp("CertificateFile", child->key) == 0)
688 cf_util_get_string(child, &conf->certificatefile);
689 else if (strcasecmp("CertificateKeyFile", child->key) == 0)
690 cf_util_get_string(child, &conf->certificatekeyfile);
691 else if (strcasecmp("TLSProtocol", child->key) == 0)
692 cf_util_get_string(child, &conf->tlsprotocol);
693 else if (strcasecmp("CipherSuite", child->key) == 0)
694 cf_util_get_string(child, &conf->ciphersuite);
696 ERROR("mqtt plugin: Unknown config option: %s", child->key);
699 tmp = realloc(subscribers, sizeof(*subscribers) * (subscribers_num + 1));
701 ERROR("mqtt plugin: realloc failed.");
706 subscribers[subscribers_num] = conf;
710 } /* mqtt_config_subscriber */
722 static int mqtt_config(oconfig_item_t *ci) {
723 for (int i = 0; i < ci->children_num; i++) {
724 oconfig_item_t *child = ci->children + i;
726 if (strcasecmp("Publish", child->key) == 0)
727 mqtt_config_publisher(child);
728 else if (strcasecmp("Subscribe", child->key) == 0)
729 mqtt_config_subscriber(child);
731 ERROR("mqtt plugin: Unknown config option: %s", child->key);
735 } /* int mqtt_config */
737 static int mqtt_init(void) {
738 mosquitto_lib_init();
740 for (size_t i = 0; i < subscribers_num; i++) {
743 if (subscribers[i]->loop)
746 status = plugin_thread_create(&subscribers[i]->thread,
748 /* func = */ subscribers_thread,
749 /* args = */ subscribers[i],
750 /* name = */ "mqtt");
752 ERROR("mqtt plugin: pthread_create failed: %s", STRERRNO);
760 void module_register(void) {
761 plugin_register_complex_config("mqtt", mqtt_config);
762 plugin_register_init("mqtt", mqtt_init);
763 } /* void module_register */