// Reference: http://mosquitto.org/api/files/mosquitto-h.html
-
#include "collectd.h"
#include "common.h"
#include <mosquitto.h>
-#define MQTT_MAX_TOPIC_SIZE 1024
-#define MQTT_MAX_MESSAGE_SIZE MQTT_MAX_TOPIC_SIZE + 1024
-#define MQTT_DEFAULT_HOST "localhost"
-#define MQTT_DEFAULT_PORT 1883
-#define MQTT_DEFAULT_TOPIC_PREFIX "collectd"
-#define MQTT_DEFAULT_TOPIC "collectd/#"
+#define MQTT_MAX_TOPIC_SIZE 1024
+#define MQTT_MAX_MESSAGE_SIZE MQTT_MAX_TOPIC_SIZE + 1024
+#define MQTT_DEFAULT_HOST "localhost"
+#define MQTT_DEFAULT_PORT 1883
+#define MQTT_DEFAULT_TOPIC_PREFIX "collectd"
+#define MQTT_DEFAULT_TOPIC "collectd/#"
#ifndef MQTT_KEEPALIVE
-# define MQTT_KEEPALIVE 60
+#define MQTT_KEEPALIVE 60
#endif
#ifndef SSL_VERIFY_PEER
-# define SSL_VERIFY_PEER 1
+#define SSL_VERIFY_PEER 1
#endif
-
/*
* Data types
*/
-struct mqtt_client_conf
-{
- _Bool publish;
- char *name;
-
- struct mosquitto *mosq;
- _Bool connected;
-
- char *host;
- int port;
- char *client_id;
- char *username;
- char *password;
- int qos;
- char *cacertificatefile;
- char *certificatefile;
- char *certificatekeyfile;
- char *tlsprotocol;
- char *ciphersuite;
-
- /* For publishing */
- char *topic_prefix;
- _Bool store_rates;
- _Bool retain;
-
- /* For subscribing */
- pthread_t thread;
- _Bool loop;
- char *topic;
- _Bool clean_session;
-
- c_complain_t complaint_cantpublish;
- pthread_mutex_t lock;
+struct mqtt_client_conf {
+ _Bool publish;
+ char *name;
+
+ struct mosquitto *mosq;
+ _Bool connected;
+
+ char *host;
+ int port;
+ char *client_id;
+ char *username;
+ char *password;
+ int qos;
+ char *cacertificatefile;
+ char *certificatefile;
+ char *certificatekeyfile;
+ char *tlsprotocol;
+ char *ciphersuite;
+
+ /* For publishing */
+ char *topic_prefix;
+ _Bool store_rates;
+ _Bool retain;
+
+ /* For subscribing */
+ pthread_t thread;
+ _Bool loop;
+ char *topic;
+ _Bool clean_session;
+
+ c_complain_t complaint_cantpublish;
+ pthread_mutex_t lock;
};
typedef struct mqtt_client_conf mqtt_client_conf_t;
* Functions
*/
#if LIBMOSQUITTO_MAJOR == 0
-static char const *mosquitto_strerror (int code)
-{
- switch (code)
- {
- case MOSQ_ERR_SUCCESS: return "MOSQ_ERR_SUCCESS";
- case MOSQ_ERR_NOMEM: return "MOSQ_ERR_NOMEM";
- case MOSQ_ERR_PROTOCOL: return "MOSQ_ERR_PROTOCOL";
- case MOSQ_ERR_INVAL: return "MOSQ_ERR_INVAL";
- case MOSQ_ERR_NO_CONN: return "MOSQ_ERR_NO_CONN";
- case MOSQ_ERR_CONN_REFUSED: return "MOSQ_ERR_CONN_REFUSED";
- case MOSQ_ERR_NOT_FOUND: return "MOSQ_ERR_NOT_FOUND";
- case MOSQ_ERR_CONN_LOST: return "MOSQ_ERR_CONN_LOST";
- case MOSQ_ERR_SSL: return "MOSQ_ERR_SSL";
- case MOSQ_ERR_PAYLOAD_SIZE: return "MOSQ_ERR_PAYLOAD_SIZE";
- case MOSQ_ERR_NOT_SUPPORTED: return "MOSQ_ERR_NOT_SUPPORTED";
- case MOSQ_ERR_AUTH: return "MOSQ_ERR_AUTH";
- case MOSQ_ERR_ACL_DENIED: return "MOSQ_ERR_ACL_DENIED";
- case MOSQ_ERR_UNKNOWN: return "MOSQ_ERR_UNKNOWN";
- case MOSQ_ERR_ERRNO: return "MOSQ_ERR_ERRNO";
- }
-
- return "UNKNOWN ERROR CODE";
+static char const *mosquitto_strerror(int code) {
+ switch (code) {
+ case MOSQ_ERR_SUCCESS:
+ return "MOSQ_ERR_SUCCESS";
+ case MOSQ_ERR_NOMEM:
+ return "MOSQ_ERR_NOMEM";
+ case MOSQ_ERR_PROTOCOL:
+ return "MOSQ_ERR_PROTOCOL";
+ case MOSQ_ERR_INVAL:
+ return "MOSQ_ERR_INVAL";
+ case MOSQ_ERR_NO_CONN:
+ return "MOSQ_ERR_NO_CONN";
+ case MOSQ_ERR_CONN_REFUSED:
+ return "MOSQ_ERR_CONN_REFUSED";
+ case MOSQ_ERR_NOT_FOUND:
+ return "MOSQ_ERR_NOT_FOUND";
+ case MOSQ_ERR_CONN_LOST:
+ return "MOSQ_ERR_CONN_LOST";
+ case MOSQ_ERR_SSL:
+ return "MOSQ_ERR_SSL";
+ case MOSQ_ERR_PAYLOAD_SIZE:
+ return "MOSQ_ERR_PAYLOAD_SIZE";
+ case MOSQ_ERR_NOT_SUPPORTED:
+ return "MOSQ_ERR_NOT_SUPPORTED";
+ case MOSQ_ERR_AUTH:
+ return "MOSQ_ERR_AUTH";
+ case MOSQ_ERR_ACL_DENIED:
+ return "MOSQ_ERR_ACL_DENIED";
+ case MOSQ_ERR_UNKNOWN:
+ return "MOSQ_ERR_UNKNOWN";
+ case MOSQ_ERR_ERRNO:
+ return "MOSQ_ERR_ERRNO";
+ }
+
+ return "UNKNOWN ERROR CODE";
}
#else
/* provided by libmosquitto */
#endif
-static void mqtt_free (mqtt_client_conf_t *conf)
-{
- if (conf == NULL)
- return;
-
- if (conf->connected)
- (void) mosquitto_disconnect (conf->mosq);
- conf->connected = 0;
- (void) mosquitto_destroy (conf->mosq);
-
- sfree (conf->host);
- sfree (conf->username);
- sfree (conf->password);
- sfree (conf->client_id);
- sfree (conf->topic_prefix);
- sfree (conf);
+static void mqtt_free(mqtt_client_conf_t *conf) {
+ if (conf == NULL)
+ return;
+
+ if (conf->connected)
+ (void)mosquitto_disconnect(conf->mosq);
+ conf->connected = 0;
+ (void)mosquitto_destroy(conf->mosq);
+
+ sfree(conf->host);
+ sfree(conf->username);
+ sfree(conf->password);
+ sfree(conf->client_id);
+ sfree(conf->topic_prefix);
+ sfree(conf);
}
-static char *strip_prefix (char *topic)
-{
- size_t num = 0;
+static char *strip_prefix(char *topic) {
+ size_t num = 0;
- for (size_t i = 0; topic[i] != 0; i++)
- if (topic[i] == '/')
- num++;
+ for (size_t i = 0; topic[i] != 0; i++)
+ if (topic[i] == '/')
+ num++;
- if (num < 2)
- return (NULL);
+ if (num < 2)
+ return (NULL);
- while (num > 2)
- {
- char *tmp = strchr (topic, '/');
- if (tmp == NULL)
- return (NULL);
- topic = tmp + 1;
- num--;
- }
+ while (num > 2) {
+ char *tmp = strchr(topic, '/');
+ if (tmp == NULL)
+ return (NULL);
+ topic = tmp + 1;
+ num--;
+ }
- return (topic);
+ return (topic);
}
-static void on_message (
+static void on_message(
#if LIBMOSQUITTO_MAJOR == 0
#else
- __attribute__((unused)) struct mosquitto *m,
+ __attribute__((unused)) struct mosquitto *m,
#endif
- __attribute__((unused)) void *arg,
- const struct mosquitto_message *msg)
-{
- value_list_t vl = VALUE_LIST_INIT;
- data_set_t const *ds;
- char *topic;
- char *name;
- char *payload;
- int status;
-
- if (msg->payloadlen <= 0) {
- DEBUG ("mqtt plugin: message has empty payload");
- return;
- }
-
- topic = strdup (msg->topic);
- name = strip_prefix (topic);
-
- status = parse_identifier_vl (name, &vl);
- if (status != 0)
- {
- ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic);
- sfree (topic);
- return;
- }
- sfree (topic);
-
- ds = plugin_get_ds (vl.type);
- if (ds == NULL)
- {
- ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type);
- return;
- }
-
- vl.values = calloc (ds->ds_num, sizeof (*vl.values));
- if (vl.values == NULL)
- {
- ERROR ("mqtt plugin: calloc failed.");
- return;
- }
- vl.values_len = ds->ds_num;
-
- payload = malloc (msg->payloadlen+1);
- if (payload == NULL)
- {
- ERROR ("mqtt plugin: malloc for payload buffer failed.");
- sfree (vl.values);
- return;
- }
- memmove (payload, msg->payload, msg->payloadlen);
- payload[msg->payloadlen] = 0;
-
- DEBUG ("mqtt plugin: payload = \"%s\"", payload);
- status = parse_values (payload, &vl, ds);
- if (status != 0)
- {
- ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload);
- sfree (payload);
- sfree (vl.values);
- return;
- }
- sfree (payload);
-
- plugin_dispatch_values (&vl);
- sfree (vl.values);
+ __attribute__((unused)) void *arg, const struct mosquitto_message *msg) {
+ value_list_t vl = VALUE_LIST_INIT;
+ data_set_t const *ds;
+ char *topic;
+ char *name;
+ char *payload;
+ int status;
+
+ if (msg->payloadlen <= 0) {
+ DEBUG("mqtt plugin: message has empty payload");
+ return;
+ }
+
+ topic = strdup(msg->topic);
+ name = strip_prefix(topic);
+
+ status = parse_identifier_vl(name, &vl);
+ if (status != 0) {
+ ERROR("mqtt plugin: Unable to parse topic \"%s\".", topic);
+ sfree(topic);
+ return;
+ }
+ sfree(topic);
+
+ ds = plugin_get_ds(vl.type);
+ if (ds == NULL) {
+ ERROR("mqtt plugin: Unknown type: \"%s\".", vl.type);
+ return;
+ }
+
+ vl.values = calloc(ds->ds_num, sizeof(*vl.values));
+ if (vl.values == NULL) {
+ ERROR("mqtt plugin: calloc failed.");
+ return;
+ }
+ vl.values_len = ds->ds_num;
+
+ payload = malloc(msg->payloadlen + 1);
+ if (payload == NULL) {
+ ERROR("mqtt plugin: malloc for payload buffer failed.");
+ sfree(vl.values);
+ return;
+ }
+ memmove(payload, msg->payload, msg->payloadlen);
+ payload[msg->payloadlen] = 0;
+
+ DEBUG("mqtt plugin: payload = \"%s\"", payload);
+ status = parse_values(payload, &vl, ds);
+ if (status != 0) {
+ ERROR("mqtt plugin: Unable to parse payload \"%s\".", payload);
+ sfree(payload);
+ sfree(vl.values);
+ return;
+ }
+ sfree(payload);
+
+ plugin_dispatch_values(&vl);
+ sfree(vl.values);
} /* void on_message */
/* must hold conf->lock when calling. */
-static int mqtt_reconnect (mqtt_client_conf_t *conf)
-{
- int status;
+static int mqtt_reconnect(mqtt_client_conf_t *conf) {
+ int status;
- if (conf->connected)
- return (0);
-
- status = mosquitto_reconnect (conf->mosq);
- if (status != MOSQ_ERR_SUCCESS)
- {
- char errbuf[1024];
- ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
- (status == MOSQ_ERR_ERRNO)
- ? sstrerror(errno, errbuf, sizeof (errbuf))
- : mosquitto_strerror (status));
- return (-1);
- }
+ if (conf->connected)
+ return (0);
+
+ status = mosquitto_reconnect(conf->mosq);
+ if (status != MOSQ_ERR_SUCCESS) {
+ char errbuf[1024];
+ ERROR("mqtt_connect_broker: mosquitto_connect failed: %s",
+ (status == MOSQ_ERR_ERRNO) ? sstrerror(errno, errbuf, sizeof(errbuf))
+ : mosquitto_strerror(status));
+ return (-1);
+ }
- conf->connected = 1;
+ conf->connected = 1;
- c_release (LOG_INFO,
- &conf->complaint_cantpublish,
+ c_release(LOG_INFO, &conf->complaint_cantpublish,
"mqtt plugin: successfully reconnected to broker \"%s:%d\"",
conf->host, conf->port);
- return (0);
+ return (0);
} /* mqtt_reconnect */
/* must hold conf->lock when calling. */
-static int mqtt_connect (mqtt_client_conf_t *conf)
-{
- char const *client_id;
- int status;
+static int mqtt_connect(mqtt_client_conf_t *conf) {
+ char const *client_id;
+ int status;
- if (conf->mosq != NULL)
- return mqtt_reconnect (conf);
+ if (conf->mosq != NULL)
+ return mqtt_reconnect(conf);
- if (conf->client_id)
- client_id = conf->client_id;
- else
- client_id = hostname_g;
+ if (conf->client_id)
+ client_id = conf->client_id;
+ else
+ client_id = hostname_g;
#if LIBMOSQUITTO_MAJOR == 0
- conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
+ conf->mosq = mosquitto_new(client_id, /* user data = */ conf);
#else
- conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf);
+ conf->mosq =
+ mosquitto_new(client_id, conf->clean_session, /* user data = */ conf);
#endif
- if (conf->mosq == NULL)
- {
- ERROR ("mqtt plugin: mosquitto_new failed");
- return (-1);
- }
+ if (conf->mosq == NULL) {
+ ERROR("mqtt plugin: mosquitto_new failed");
+ return (-1);
+ }
#if LIBMOSQUITTO_MAJOR != 0
- if (conf->cacertificatefile) {
- status = mosquitto_tls_set(conf->mosq, conf->cacertificatefile, NULL,
- conf->certificatefile, conf->certificatekeyfile, /* pw_callback */NULL);
- if (status != MOSQ_ERR_SUCCESS) {
- ERROR ("mqtt plugin: cannot mosquitto_tls_set: %s", mosquitto_strerror(status));
- mosquitto_destroy (conf->mosq);
- conf->mosq = NULL;
- return (-1);
- }
-
- status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER, conf->tlsprotocol, conf->ciphersuite);
- if (status != MOSQ_ERR_SUCCESS) {
- ERROR ("mqtt plugin: cannot mosquitto_tls_opts_set: %s", mosquitto_strerror(status));
- mosquitto_destroy (conf->mosq);
- conf->mosq = NULL;
- return (-1);
- }
-
- status = mosquitto_tls_insecure_set(conf->mosq, false);
- if (status != MOSQ_ERR_SUCCESS) {
- ERROR ("mqtt plugin: cannot mosquitto_tls_insecure_set: %s", mosquitto_strerror(status));
- mosquitto_destroy (conf->mosq);
- conf->mosq = NULL;
- return (-1);
- }
+ if (conf->cacertificatefile) {
+ status = mosquitto_tls_set(conf->mosq, conf->cacertificatefile, NULL,
+ conf->certificatefile, conf->certificatekeyfile,
+ /* pw_callback */ NULL);
+ if (status != MOSQ_ERR_SUCCESS) {
+ ERROR("mqtt plugin: cannot mosquitto_tls_set: %s",
+ mosquitto_strerror(status));
+ mosquitto_destroy(conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
}
+
+ status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER,
+ conf->tlsprotocol, conf->ciphersuite);
+ if (status != MOSQ_ERR_SUCCESS) {
+ ERROR("mqtt plugin: cannot mosquitto_tls_opts_set: %s",
+ mosquitto_strerror(status));
+ mosquitto_destroy(conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
+ }
+
+ status = mosquitto_tls_insecure_set(conf->mosq, false);
+ if (status != MOSQ_ERR_SUCCESS) {
+ ERROR("mqtt plugin: cannot mosquitto_tls_insecure_set: %s",
+ mosquitto_strerror(status));
+ mosquitto_destroy(conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
+ }
+ }
#endif
- if (conf->username && conf->password)
- {
- status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
- if (status != MOSQ_ERR_SUCCESS)
- {
- char errbuf[1024];
- ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
- (status == MOSQ_ERR_ERRNO)
- ? sstrerror (errno, errbuf, sizeof (errbuf))
- : mosquitto_strerror (status));
-
- mosquitto_destroy (conf->mosq);
- conf->mosq = NULL;
- return (-1);
- }
+ if (conf->username && conf->password) {
+ status =
+ mosquitto_username_pw_set(conf->mosq, conf->username, conf->password);
+ if (status != MOSQ_ERR_SUCCESS) {
+ char errbuf[1024];
+ ERROR("mqtt plugin: mosquitto_username_pw_set failed: %s",
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror(errno, errbuf, sizeof(errbuf))
+ : mosquitto_strerror(status));
+
+ mosquitto_destroy(conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
}
+ }
#if LIBMOSQUITTO_MAJOR == 0
- status = mosquitto_connect (conf->mosq, conf->host, conf->port,
- /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
+ status = mosquitto_connect(conf->mosq, conf->host, conf->port,
+ /* keepalive = */ MQTT_KEEPALIVE,
+ /* clean session = */ conf->clean_session);
#else
- status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
+ status =
+ mosquitto_connect(conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
#endif
- if (status != MOSQ_ERR_SUCCESS)
- {
- char errbuf[1024];
- ERROR ("mqtt plugin: mosquitto_connect failed: %s",
- (status == MOSQ_ERR_ERRNO)
- ? sstrerror (errno, errbuf, sizeof (errbuf))
- : mosquitto_strerror (status));
-
- mosquitto_destroy (conf->mosq);
- conf->mosq = NULL;
- return (-1);
+ if (status != MOSQ_ERR_SUCCESS) {
+ char errbuf[1024];
+ ERROR("mqtt plugin: mosquitto_connect failed: %s",
+ (status == MOSQ_ERR_ERRNO) ? sstrerror(errno, errbuf, sizeof(errbuf))
+ : mosquitto_strerror(status));
+
+ mosquitto_destroy(conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
+ }
+
+ if (!conf->publish) {
+ mosquitto_message_callback_set(conf->mosq, on_message);
+
+ status =
+ mosquitto_subscribe(conf->mosq,
+ /* message_id = */ NULL, conf->topic, conf->qos);
+ if (status != MOSQ_ERR_SUCCESS) {
+ ERROR("mqtt plugin: Subscribing to \"%s\" failed: %s", conf->topic,
+ mosquitto_strerror(status));
+
+ mosquitto_disconnect(conf->mosq);
+ mosquitto_destroy(conf->mosq);
+ conf->mosq = NULL;
+ return (-1);
}
+ }
- if (!conf->publish)
- {
- mosquitto_message_callback_set (conf->mosq, on_message);
-
- status = mosquitto_subscribe (conf->mosq,
- /* message_id = */ NULL,
- conf->topic, conf->qos);
- if (status != MOSQ_ERR_SUCCESS)
- {
- ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s",
- conf->topic, mosquitto_strerror (status));
-
- mosquitto_disconnect (conf->mosq);
- mosquitto_destroy (conf->mosq);
- conf->mosq = NULL;
- return (-1);
- }
- }
-
- conf->connected = 1;
- return (0);
+ conf->connected = 1;
+ return (0);
} /* mqtt_connect */
-static void *subscribers_thread (void *arg)
-{
- mqtt_client_conf_t *conf = arg;
- int status;
+static void *subscribers_thread(void *arg) {
+ mqtt_client_conf_t *conf = arg;
+ int status;
- conf->loop = 1;
+ conf->loop = 1;
- while (conf->loop)
- {
- status = mqtt_connect (conf);
- if (status != 0)
- {
- sleep (1);
- continue;
- }
+ while (conf->loop) {
+ status = mqtt_connect(conf);
+ if (status != 0) {
+ sleep(1);
+ continue;
+ }
- /* The documentation says "0" would map to the default (1000ms), but
- * that does not work on some versions. */
+/* The documentation says "0" would map to the default (1000ms), but
+ * that does not work on some versions. */
#if LIBMOSQUITTO_MAJOR == 0
- status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
+ status = mosquitto_loop(conf->mosq, /* timeout = */ 1000 /* ms */);
#else
- status = mosquitto_loop (conf->mosq,
- /* timeout[ms] = */ 1000,
- /* max_packets = */ 100);
+ status = mosquitto_loop(conf->mosq,
+ /* timeout[ms] = */ 1000,
+ /* max_packets = */ 100);
#endif
- if (status == MOSQ_ERR_CONN_LOST)
- {
- conf->connected = 0;
- continue;
- }
- else if (status != MOSQ_ERR_SUCCESS)
- {
- ERROR ("mqtt plugin: mosquitto_loop failed: %s",
- mosquitto_strerror (status));
- mosquitto_destroy (conf->mosq);
- conf->mosq = NULL;
- conf->connected = 0;
- continue;
- }
-
- DEBUG ("mqtt plugin: mosquitto_loop succeeded.");
- } /* while (conf->loop) */
-
- pthread_exit (0);
+ if (status == MOSQ_ERR_CONN_LOST) {
+ conf->connected = 0;
+ continue;
+ } else if (status != MOSQ_ERR_SUCCESS) {
+ ERROR("mqtt plugin: mosquitto_loop failed: %s",
+ mosquitto_strerror(status));
+ mosquitto_destroy(conf->mosq);
+ conf->mosq = NULL;
+ conf->connected = 0;
+ continue;
+ }
+
+ DEBUG("mqtt plugin: mosquitto_loop succeeded.");
+ } /* while (conf->loop) */
+
+ pthread_exit(0);
} /* void *subscribers_thread */
-static int publish (mqtt_client_conf_t *conf, char const *topic,
- void const *payload, size_t payload_len)
-{
- int status;
+static int publish(mqtt_client_conf_t *conf, char const *topic,
+ void const *payload, size_t payload_len) {
+ int status;
- pthread_mutex_lock (&conf->lock);
+ pthread_mutex_lock(&conf->lock);
- status = mqtt_connect (conf);
- if (status != 0) {
- pthread_mutex_unlock (&conf->lock);
- ERROR ("mqtt plugin: unable to reconnect to broker");
- return (status);
- }
+ status = mqtt_connect(conf);
+ if (status != 0) {
+ pthread_mutex_unlock(&conf->lock);
+ ERROR("mqtt plugin: unable to reconnect to broker");
+ return (status);
+ }
- status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
+ status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
#if LIBMOSQUITTO_MAJOR == 0
- (uint32_t) payload_len, payload,
+ (uint32_t)payload_len, payload,
#else
- (int) payload_len, payload,
+ (int)payload_len, payload,
#endif
- conf->qos, conf->retain);
- if (status != MOSQ_ERR_SUCCESS)
- {
- char errbuf[1024];
- c_complain (LOG_ERR,
- &conf->complaint_cantpublish,
- "mqtt plugin: mosquitto_publish failed: %s",
- (status == MOSQ_ERR_ERRNO)
- ? sstrerror(errno, errbuf, sizeof (errbuf))
- : mosquitto_strerror(status));
- /* Mark our connection "down" regardless of the error as a safety
- * measure; we will try to reconnect the next time we have to publish a
- * message */
- conf->connected = 0;
-
- pthread_mutex_unlock (&conf->lock);
- return (-1);
- }
+ conf->qos, conf->retain);
+ if (status != MOSQ_ERR_SUCCESS) {
+ char errbuf[1024];
+ c_complain(LOG_ERR, &conf->complaint_cantpublish,
+ "mqtt plugin: mosquitto_publish failed: %s",
+ (status == MOSQ_ERR_ERRNO)
+ ? sstrerror(errno, errbuf, sizeof(errbuf))
+ : mosquitto_strerror(status));
+ /* Mark our connection "down" regardless of the error as a safety
+ * measure; we will try to reconnect the next time we have to publish a
+ * message */
+ conf->connected = 0;
- pthread_mutex_unlock (&conf->lock);
- return (0);
+ pthread_mutex_unlock(&conf->lock);
+ return (-1);
+ }
+
+ pthread_mutex_unlock(&conf->lock);
+ return (0);
} /* int publish */
-static int format_topic (char *buf, size_t buf_len,
- data_set_t const *ds, value_list_t const *vl,
- mqtt_client_conf_t *conf)
-{
- char name[MQTT_MAX_TOPIC_SIZE];
- int status;
+static int format_topic(char *buf, size_t buf_len, data_set_t const *ds,
+ value_list_t const *vl, mqtt_client_conf_t *conf) {
+ char name[MQTT_MAX_TOPIC_SIZE];
+ int status;
- if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
- return (FORMAT_VL (buf, buf_len, vl));
+ if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
+ return (FORMAT_VL(buf, buf_len, vl));
- status = FORMAT_VL (name, sizeof (name), vl);
- if (status != 0)
- return (status);
+ status = FORMAT_VL(name, sizeof(name), vl);
+ if (status != 0)
+ return (status);
- status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name);
- if ((status < 0) || (((size_t) status) >= buf_len))
- return (ENOMEM);
+ status = ssnprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name);
+ if ((status < 0) || (((size_t)status) >= buf_len))
+ return (ENOMEM);
- return (0);
+ return (0);
} /* int format_topic */
-static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
- user_data_t *user_data)
-{
- mqtt_client_conf_t *conf;
- char topic[MQTT_MAX_TOPIC_SIZE];
- char payload[MQTT_MAX_MESSAGE_SIZE];
- int status = 0;
-
- if ((user_data == NULL) || (user_data->data == NULL))
- return (EINVAL);
- conf = user_data->data;
-
- status = format_topic (topic, sizeof (topic), ds, vl, conf);
- if (status != 0)
- {
- ERROR ("mqtt plugin: format_topic failed with status %d.", status);
- return (status);
- }
+static int mqtt_write(const data_set_t *ds, const value_list_t *vl,
+ user_data_t *user_data) {
+ mqtt_client_conf_t *conf;
+ char topic[MQTT_MAX_TOPIC_SIZE];
+ char payload[MQTT_MAX_MESSAGE_SIZE];
+ int status = 0;
- status = format_values (payload, sizeof (payload),
- ds, vl, conf->store_rates);
- if (status != 0)
- {
- ERROR ("mqtt plugin: format_values failed with status %d.", status);
- return (status);
- }
+ if ((user_data == NULL) || (user_data->data == NULL))
+ return (EINVAL);
+ conf = user_data->data;
- status = publish (conf, topic, payload, strlen (payload) + 1);
- if (status != 0)
- {
- ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status));
- return (status);
- }
+ status = format_topic(topic, sizeof(topic), ds, vl, conf);
+ if (status != 0) {
+ ERROR("mqtt plugin: format_topic failed with status %d.", status);
+ return (status);
+ }
+ status = format_values(payload, sizeof(payload), ds, vl, conf->store_rates);
+ if (status != 0) {
+ ERROR("mqtt plugin: format_values failed with status %d.", status);
return (status);
+ }
+
+ status = publish(conf, topic, payload, strlen(payload) + 1);
+ if (status != 0) {
+ ERROR("mqtt plugin: publish failed: %s", mosquitto_strerror(status));
+ return (status);
+ }
+
+ return (status);
} /* mqtt_write */
/*
* TLSProtocol "tlsv1.2" optional
* </Publish>
*/
-static int mqtt_config_publisher (oconfig_item_t *ci)
-{
- mqtt_client_conf_t *conf;
- char cb_name[1024];
- int status;
-
- conf = calloc (1, sizeof (*conf));
- if (conf == NULL)
- {
- ERROR ("mqtt plugin: calloc failed.");
- return (-1);
- }
- conf->publish = 1;
-
- conf->name = NULL;
- status = cf_util_get_string (ci, &conf->name);
- if (status != 0)
- {
- mqtt_free (conf);
- return (status);
- }
-
- conf->host = strdup (MQTT_DEFAULT_HOST);
- conf->port = MQTT_DEFAULT_PORT;
- conf->client_id = NULL;
- conf->qos = 0;
- conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
- conf->store_rates = 1;
-
- status = pthread_mutex_init (&conf->lock, NULL);
- if (status != 0)
- {
- mqtt_free (conf);
- return (status);
- }
-
- C_COMPLAIN_INIT (&conf->complaint_cantpublish);
-
- for (int i = 0; i < ci->children_num; i++)
- {
- oconfig_item_t *child = ci->children + i;
- if (strcasecmp ("Host", child->key) == 0)
- cf_util_get_string (child, &conf->host);
- else if (strcasecmp ("Port", child->key) == 0)
- {
- int tmp = cf_util_get_port_number (child);
- if (tmp < 0)
- ERROR ("mqtt plugin: Invalid port number.");
- else
- conf->port = tmp;
- }
- else if (strcasecmp ("ClientId", child->key) == 0)
- cf_util_get_string (child, &conf->client_id);
- else if (strcasecmp ("User", child->key) == 0)
- cf_util_get_string (child, &conf->username);
- else if (strcasecmp ("Password", child->key) == 0)
- cf_util_get_string (child, &conf->password);
- else if (strcasecmp ("QoS", child->key) == 0)
- {
- int tmp = -1;
- status = cf_util_get_int (child, &tmp);
- if ((status != 0) || (tmp < 0) || (tmp > 2))
- ERROR ("mqtt plugin: Not a valid QoS setting.");
- else
- conf->qos = tmp;
- }
- else if (strcasecmp ("Prefix", child->key) == 0)
- cf_util_get_string (child, &conf->topic_prefix);
- else if (strcasecmp ("StoreRates", child->key) == 0)
- cf_util_get_boolean (child, &conf->store_rates);
- else if (strcasecmp ("Retain", child->key) == 0)
- cf_util_get_boolean (child, &conf->retain);
- else if (strcasecmp ("CACert", child->key) == 0)
- cf_util_get_string (child, &conf->cacertificatefile);
- else if (strcasecmp ("CertificateFile", child->key) == 0)
- cf_util_get_string (child, &conf->certificatefile);
- else if (strcasecmp ("CertificateKeyFile", child->key) == 0)
- cf_util_get_string (child, &conf->certificatekeyfile);
- else if (strcasecmp ("TLSProtocol", child->key) == 0)
- cf_util_get_string (child, &conf->tlsprotocol);
- else if (strcasecmp ("CipherSuite", child->key) == 0)
- cf_util_get_string (child, &conf->ciphersuite);
- else
- ERROR ("mqtt plugin: Unknown config option: %s", child->key);
- }
+static int mqtt_config_publisher(oconfig_item_t *ci) {
+ mqtt_client_conf_t *conf;
+ char cb_name[1024];
+ int status;
+
+ conf = calloc(1, sizeof(*conf));
+ if (conf == NULL) {
+ ERROR("mqtt plugin: calloc failed.");
+ return (-1);
+ }
+ conf->publish = 1;
+
+ conf->name = NULL;
+ status = cf_util_get_string(ci, &conf->name);
+ if (status != 0) {
+ mqtt_free(conf);
+ return (status);
+ }
+
+ conf->host = strdup(MQTT_DEFAULT_HOST);
+ conf->port = MQTT_DEFAULT_PORT;
+ conf->client_id = NULL;
+ conf->qos = 0;
+ conf->topic_prefix = strdup(MQTT_DEFAULT_TOPIC_PREFIX);
+ conf->store_rates = 1;
+
+ status = pthread_mutex_init(&conf->lock, NULL);
+ if (status != 0) {
+ mqtt_free(conf);
+ return (status);
+ }
+
+ C_COMPLAIN_INIT(&conf->complaint_cantpublish);
+
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+ if (strcasecmp("Host", child->key) == 0)
+ cf_util_get_string(child, &conf->host);
+ else if (strcasecmp("Port", child->key) == 0) {
+ int tmp = cf_util_get_port_number(child);
+ if (tmp < 0)
+ ERROR("mqtt plugin: Invalid port number.");
+ else
+ conf->port = tmp;
+ } else if (strcasecmp("ClientId", child->key) == 0)
+ cf_util_get_string(child, &conf->client_id);
+ else if (strcasecmp("User", child->key) == 0)
+ cf_util_get_string(child, &conf->username);
+ else if (strcasecmp("Password", child->key) == 0)
+ cf_util_get_string(child, &conf->password);
+ else if (strcasecmp("QoS", child->key) == 0) {
+ int tmp = -1;
+ status = cf_util_get_int(child, &tmp);
+ if ((status != 0) || (tmp < 0) || (tmp > 2))
+ ERROR("mqtt plugin: Not a valid QoS setting.");
+ else
+ conf->qos = tmp;
+ } else if (strcasecmp("Prefix", child->key) == 0)
+ cf_util_get_string(child, &conf->topic_prefix);
+ else if (strcasecmp("StoreRates", child->key) == 0)
+ cf_util_get_boolean(child, &conf->store_rates);
+ else if (strcasecmp("Retain", child->key) == 0)
+ cf_util_get_boolean(child, &conf->retain);
+ else if (strcasecmp("CACert", child->key) == 0)
+ cf_util_get_string(child, &conf->cacertificatefile);
+ else if (strcasecmp("CertificateFile", child->key) == 0)
+ cf_util_get_string(child, &conf->certificatefile);
+ else if (strcasecmp("CertificateKeyFile", child->key) == 0)
+ cf_util_get_string(child, &conf->certificatekeyfile);
+ else if (strcasecmp("TLSProtocol", child->key) == 0)
+ cf_util_get_string(child, &conf->tlsprotocol);
+ else if (strcasecmp("CipherSuite", child->key) == 0)
+ cf_util_get_string(child, &conf->ciphersuite);
+ else
+ ERROR("mqtt plugin: Unknown config option: %s", child->key);
+ }
- ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name);
- user_data_t user_data = {
- .data = conf
- };
+ ssnprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name);
+ user_data_t user_data = {.data = conf};
- plugin_register_write (cb_name, mqtt_write, &user_data);
- return (0);
+ plugin_register_write(cb_name, mqtt_write, &user_data);
+ return (0);
} /* mqtt_config_publisher */
/*
* Topic "collectd/#"
* </Subscribe>
*/
-static int mqtt_config_subscriber (oconfig_item_t *ci)
-{
- mqtt_client_conf_t **tmp;
- mqtt_client_conf_t *conf;
- int status;
-
- conf = calloc (1, sizeof (*conf));
- if (conf == NULL)
- {
- ERROR ("mqtt plugin: calloc failed.");
- return (-1);
- }
- conf->publish = 0;
-
- conf->name = NULL;
- status = cf_util_get_string (ci, &conf->name);
- if (status != 0)
- {
- mqtt_free (conf);
- return (status);
- }
-
- conf->host = strdup (MQTT_DEFAULT_HOST);
- conf->port = MQTT_DEFAULT_PORT;
- conf->client_id = NULL;
- conf->qos = 2;
- conf->topic = strdup (MQTT_DEFAULT_TOPIC);
- conf->clean_session = 1;
-
- status = pthread_mutex_init (&conf->lock, NULL);
- if (status != 0)
- {
- mqtt_free (conf);
- return (status);
- }
-
- C_COMPLAIN_INIT (&conf->complaint_cantpublish);
-
- for (int i = 0; i < ci->children_num; i++)
- {
- oconfig_item_t *child = ci->children + i;
- if (strcasecmp ("Host", child->key) == 0)
- cf_util_get_string (child, &conf->host);
- else if (strcasecmp ("Port", child->key) == 0)
- {
- status = cf_util_get_port_number (child);
- if (status < 0)
- ERROR ("mqtt plugin: Invalid port number.");
- else
- conf->port = status;
- }
- else if (strcasecmp ("ClientId", child->key) == 0)
- cf_util_get_string (child, &conf->client_id);
- else if (strcasecmp ("User", child->key) == 0)
- cf_util_get_string (child, &conf->username);
- else if (strcasecmp ("Password", child->key) == 0)
- cf_util_get_string (child, &conf->password);
- else if (strcasecmp ("QoS", child->key) == 0)
- {
- int qos = -1;
- status = cf_util_get_int (child, &qos);
- if ((status != 0) || (qos < 0) || (qos > 2))
- ERROR ("mqtt plugin: Not a valid QoS setting.");
- else
- conf->qos = qos;
- }
- else if (strcasecmp ("Topic", child->key) == 0)
- cf_util_get_string (child, &conf->topic);
- else if (strcasecmp ("CleanSession", child->key) == 0)
- cf_util_get_boolean (child, &conf->clean_session);
- else
- ERROR ("mqtt plugin: Unknown config option: %s", child->key);
- }
-
- tmp = realloc (subscribers, sizeof (*subscribers) * (subscribers_num + 1) );
- if (tmp == NULL)
- {
- ERROR ("mqtt plugin: realloc failed.");
- mqtt_free (conf);
- return (-1);
- }
- subscribers = tmp;
- subscribers[subscribers_num] = conf;
- subscribers_num++;
-
- return (0);
+static int mqtt_config_subscriber(oconfig_item_t *ci) {
+ mqtt_client_conf_t **tmp;
+ mqtt_client_conf_t *conf;
+ int status;
+
+ conf = calloc(1, sizeof(*conf));
+ if (conf == NULL) {
+ ERROR("mqtt plugin: calloc failed.");
+ return (-1);
+ }
+ conf->publish = 0;
+
+ conf->name = NULL;
+ status = cf_util_get_string(ci, &conf->name);
+ if (status != 0) {
+ mqtt_free(conf);
+ return (status);
+ }
+
+ conf->host = strdup(MQTT_DEFAULT_HOST);
+ conf->port = MQTT_DEFAULT_PORT;
+ conf->client_id = NULL;
+ conf->qos = 2;
+ conf->topic = strdup(MQTT_DEFAULT_TOPIC);
+ conf->clean_session = 1;
+
+ status = pthread_mutex_init(&conf->lock, NULL);
+ if (status != 0) {
+ mqtt_free(conf);
+ return (status);
+ }
+
+ C_COMPLAIN_INIT(&conf->complaint_cantpublish);
+
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+ if (strcasecmp("Host", child->key) == 0)
+ cf_util_get_string(child, &conf->host);
+ else if (strcasecmp("Port", child->key) == 0) {
+ status = cf_util_get_port_number(child);
+ if (status < 0)
+ ERROR("mqtt plugin: Invalid port number.");
+ else
+ conf->port = status;
+ } else if (strcasecmp("ClientId", child->key) == 0)
+ cf_util_get_string(child, &conf->client_id);
+ else if (strcasecmp("User", child->key) == 0)
+ cf_util_get_string(child, &conf->username);
+ else if (strcasecmp("Password", child->key) == 0)
+ cf_util_get_string(child, &conf->password);
+ else if (strcasecmp("QoS", child->key) == 0) {
+ int qos = -1;
+ status = cf_util_get_int(child, &qos);
+ if ((status != 0) || (qos < 0) || (qos > 2))
+ ERROR("mqtt plugin: Not a valid QoS setting.");
+ else
+ conf->qos = qos;
+ } else if (strcasecmp("Topic", child->key) == 0)
+ cf_util_get_string(child, &conf->topic);
+ else if (strcasecmp("CleanSession", child->key) == 0)
+ cf_util_get_boolean(child, &conf->clean_session);
+ else
+ ERROR("mqtt plugin: Unknown config option: %s", child->key);
+ }
+
+ tmp = realloc(subscribers, sizeof(*subscribers) * (subscribers_num + 1));
+ if (tmp == NULL) {
+ ERROR("mqtt plugin: realloc failed.");
+ mqtt_free(conf);
+ return (-1);
+ }
+ subscribers = tmp;
+ subscribers[subscribers_num] = conf;
+ subscribers_num++;
+
+ return (0);
} /* mqtt_config_subscriber */
/*
* </Subscribe>
* </Plugin>
*/
-static int mqtt_config (oconfig_item_t *ci)
-{
- for (int i = 0; i < ci->children_num; i++)
- {
- oconfig_item_t *child = ci->children + i;
-
- if (strcasecmp ("Publish", child->key) == 0)
- mqtt_config_publisher (child);
- else if (strcasecmp ("Subscribe", child->key) == 0)
- mqtt_config_subscriber (child);
- else
- ERROR ("mqtt plugin: Unknown config option: %s", child->key);
- }
+static int mqtt_config(oconfig_item_t *ci) {
+ for (int i = 0; i < ci->children_num; i++) {
+ oconfig_item_t *child = ci->children + i;
+
+ if (strcasecmp("Publish", child->key) == 0)
+ mqtt_config_publisher(child);
+ else if (strcasecmp("Subscribe", child->key) == 0)
+ mqtt_config_subscriber(child);
+ else
+ ERROR("mqtt plugin: Unknown config option: %s", child->key);
+ }
- return (0);
+ return (0);
} /* int mqtt_config */
-static int mqtt_init (void)
-{
- mosquitto_lib_init ();
-
- for (size_t i = 0; i < subscribers_num; i++)
- {
- int status;
-
- if (subscribers[i]->loop)
- continue;
-
- status = plugin_thread_create (&subscribers[i]->thread,
- /* attrs = */ NULL,
- /* func = */ subscribers_thread,
- /* args = */ subscribers[i]);
- if (status != 0)
- {
- char errbuf[1024];
- ERROR ("mqtt plugin: pthread_create failed: %s",
- sstrerror (errno, errbuf, sizeof (errbuf)));
- continue;
- }
+static int mqtt_init(void) {
+ mosquitto_lib_init();
+
+ for (size_t i = 0; i < subscribers_num; i++) {
+ int status;
+
+ if (subscribers[i]->loop)
+ continue;
+
+ status = plugin_thread_create(&subscribers[i]->thread,
+ /* attrs = */ NULL,
+ /* func = */ subscribers_thread,
+ /* args = */ subscribers[i]);
+ if (status != 0) {
+ char errbuf[1024];
+ ERROR("mqtt plugin: pthread_create failed: %s",
+ sstrerror(errno, errbuf, sizeof(errbuf)));
+ continue;
}
+ }
- return (0);
+ return (0);
} /* mqtt_init */
-void module_register (void)
-{
- plugin_register_complex_config ("mqtt", mqtt_config);
- plugin_register_init ("mqtt", mqtt_init);
+void module_register(void) {
+ plugin_register_complex_config("mqtt", mqtt_config);
+ plugin_register_init("mqtt", mqtt_init);
} /* void module_register */
/* vim: set sw=4 sts=4 et fdm=marker : */