mqtt, gps: add name parameter to plugin_thread_create()
[collectd.git] / src / mqtt.c
index 5c844a7..32304f0 100644 (file)
@@ -1,6 +1,7 @@
 /**
  * collectd - src/mqtt.c
- * Copyright (C) 2014       Marc Falzon <marc at baha dot mu>
+ * Copyright (C) 2014       Marc Falzon
+ * Copyright (C) 2014,2015  Florian octo Forster
  *
  * Permission is hereby granted, free of charge, to any person obtaining a
  * copy of this software and associated documentation files (the "Software"),
  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
  * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Marc Falzon <marc at baha dot mu>
+ *   Florian octo Forster <octo at collectd.org>
+ *   Jan-Piet Mens <jpmens at gmail.com>
  **/
 
 // Reference: http://mosquitto.org/api/files/mosquitto-h.html
 
 
 #include "collectd.h"
+
 #include "common.h"
 #include "plugin.h"
-#include "utils_cache.h"
 #include "utils_complain.h"
 
-#include <pthread.h>
-
 #include <mosquitto.h>
 
 #define MQTT_MAX_TOPIC_SIZE         1024
 #define MQTT_MAX_MESSAGE_SIZE       MQTT_MAX_TOPIC_SIZE + 1024
 #define MQTT_DEFAULT_HOST           "localhost"
 #define MQTT_DEFAULT_PORT           1883
-#define MQTT_DEFAULT_CLIENT_ID      "collectd"
 #define MQTT_DEFAULT_TOPIC_PREFIX   "collectd"
+#define MQTT_DEFAULT_TOPIC          "collectd/#"
+#ifndef MQTT_KEEPALIVE
+# define MQTT_KEEPALIVE 60
+#endif
+#ifndef SSL_VERIFY_PEER
+# define SSL_VERIFY_PEER  1
+#endif
+
 
 /*
  * Data types
  */
 struct mqtt_client_conf
 {
-    struct mosquitto    *mosq;
-    bool                connected;
-    char                *host;
+    _Bool               publish;
+    char               *name;
+
+    struct mosquitto   *mosq;
+    _Bool               connected;
+
+    char               *host;
     int                 port;
-    char                *client_id;
-    char                *topic_prefix;
+    char               *client_id;
+    char               *username;
+    char               *password;
+    int                 qos;
+    char                *cacertificatefile;
+    char                *certificatefile;
+    char                *certificatekeyfile;
+    char                *tlsprotocol;
+    char                *ciphersuite;
+
+    /* For publishing */
+    char               *topic_prefix;
+    _Bool               store_rates;
+    _Bool               retain;
+
+    /* For subscribing */
+    pthread_t           thread;
+    _Bool               loop;
+    char               *topic;
+    _Bool               clean_session;
+
     c_complain_t        complaint_cantpublish;
     pthread_mutex_t     lock;
 };
 typedef struct mqtt_client_conf mqtt_client_conf_t;
 
+static mqtt_client_conf_t **subscribers = NULL;
+static size_t subscribers_num = 0;
+
+/*
+ * Functions
+ */
+#if LIBMOSQUITTO_MAJOR == 0
 static char const *mosquitto_strerror (int code)
 {
     switch (code)
@@ -80,12 +121,125 @@ static char const *mosquitto_strerror (int code)
 
     return "UNKNOWN ERROR CODE";
 }
+#else
+/* provided by libmosquitto */
+#endif
+
+static void mqtt_free (mqtt_client_conf_t *conf)
+{
+    if (conf == NULL)
+        return;
+
+    if (conf->connected)
+        (void) mosquitto_disconnect (conf->mosq);
+    conf->connected = 0;
+    (void) mosquitto_destroy (conf->mosq);
+
+    sfree (conf->host);
+    sfree (conf->username);
+    sfree (conf->password);
+    sfree (conf->client_id);
+    sfree (conf->topic_prefix);
+    sfree (conf);
+}
+
+static char *strip_prefix (char *topic)
+{
+    size_t num = 0;
+
+    for (size_t i = 0; topic[i] != 0; i++)
+        if (topic[i] == '/')
+            num++;
+
+    if (num < 2)
+        return (NULL);
+
+    while (num > 2)
+    {
+        char *tmp = strchr (topic, '/');
+        if (tmp == NULL)
+            return (NULL);
+        topic = tmp + 1;
+        num--;
+    }
+
+    return (topic);
+}
+
+static void on_message (
+#if LIBMOSQUITTO_MAJOR == 0
+#else
+        __attribute__((unused)) struct mosquitto *m,
+#endif
+        __attribute__((unused)) void *arg,
+        const struct mosquitto_message *msg)
+{
+    value_list_t vl = VALUE_LIST_INIT;
+    data_set_t const *ds;
+    char *topic;
+    char *name;
+    char *payload;
+    int status;
+
+    if (msg->payloadlen <= 0) {
+        DEBUG ("mqtt plugin: message has empty payload");
+        return;
+    }
+
+    topic = strdup (msg->topic);
+    name = strip_prefix (topic);
+
+    status = parse_identifier_vl (name, &vl);
+    if (status != 0)
+    {
+        ERROR ("mqtt plugin: Unable to parse topic \"%s\".", topic);
+        sfree (topic);
+        return;
+    }
+    sfree (topic);
+
+    ds = plugin_get_ds (vl.type);
+    if (ds == NULL)
+    {
+        ERROR ("mqtt plugin: Unknown type: \"%s\".", vl.type);
+        return;
+    }
+
+    vl.values = calloc (ds->ds_num, sizeof (*vl.values));
+    if (vl.values == NULL)
+    {
+        ERROR ("mqtt plugin: calloc failed.");
+        return;
+    }
+    vl.values_len = ds->ds_num;
+
+    payload = malloc (msg->payloadlen+1);
+    if (payload == NULL)
+    {
+        ERROR ("mqtt plugin: malloc for payload buffer failed.");
+        sfree (vl.values);
+        return;
+    }
+    memmove (payload, msg->payload, msg->payloadlen);
+    payload[msg->payloadlen] = 0;
+
+    DEBUG ("mqtt plugin: payload = \"%s\"", payload);
+    status = parse_values (payload, &vl, ds);
+    if (status != 0)
+    {
+        ERROR ("mqtt plugin: Unable to parse payload \"%s\".", payload);
+        sfree (payload);
+        sfree (vl.values);
+        return;
+    }
+    sfree (payload);
+
+    plugin_dispatch_values (&vl);
+    sfree (vl.values);
+} /* void on_message */
 
-/*
- * Functions
- */
 /* must hold conf->lock when calling. */
-static int mqtt_reconnect_broker (mqtt_client_conf_t *conf)
+static int mqtt_reconnect (mqtt_client_conf_t *conf)
 {
     int status;
 
@@ -95,57 +249,217 @@ static int mqtt_reconnect_broker (mqtt_client_conf_t *conf)
     status = mosquitto_reconnect (conf->mosq);
     if (status != MOSQ_ERR_SUCCESS)
     {
+        char errbuf[1024];
         ERROR ("mqtt_connect_broker: mosquitto_connect failed: %s",
-            (status == MOSQ_ERR_ERRNO ?
-                strerror(errno) : mosquitto_strerror (status)));
+                (status == MOSQ_ERR_ERRNO)
+                ? sstrerror(errno, errbuf, sizeof (errbuf))
+                : mosquitto_strerror (status));
         return (-1);
     }
 
-    conf->connected = true;
+    conf->connected = 1;
 
     c_release (LOG_INFO,
-        &conf->complaint_cantpublish,
-        "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
-        conf->host, conf->port);
+            &conf->complaint_cantpublish,
+            "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
+            conf->host, conf->port);
 
     return (0);
-} /* mqtt_reconnect_broker */
+} /* mqtt_reconnect */
+
+/* must hold conf->lock when calling. */
+static int mqtt_connect (mqtt_client_conf_t *conf)
+{
+    char const *client_id;
+    int status;
+
+    if (conf->mosq != NULL)
+        return mqtt_reconnect (conf);
+
+    if (conf->client_id)
+        client_id = conf->client_id;
+    else
+        client_id = hostname_g;
+
+#if LIBMOSQUITTO_MAJOR == 0
+    conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
+#else
+    conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf);
+#endif
+    if (conf->mosq == NULL)
+    {
+        ERROR ("mqtt plugin: mosquitto_new failed");
+        return (-1);
+    }
+
+#if LIBMOSQUITTO_MAJOR != 0
+    if (conf->cacertificatefile) {
+        status = mosquitto_tls_set(conf->mosq, conf->cacertificatefile, NULL,
+                                  conf->certificatefile, conf->certificatekeyfile, /* pw_callback */NULL);
+        if (status != MOSQ_ERR_SUCCESS) {
+            ERROR ("mqtt plugin: cannot mosquitto_tls_set: %s", mosquitto_strerror(status));
+            mosquitto_destroy (conf->mosq);
+            conf->mosq = NULL;
+            return (-1);
+        }
+
+        status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER, conf->tlsprotocol, conf->ciphersuite);
+        if (status != MOSQ_ERR_SUCCESS) {
+            ERROR ("mqtt plugin: cannot mosquitto_tls_opts_set: %s", mosquitto_strerror(status));
+            mosquitto_destroy (conf->mosq);
+            conf->mosq = NULL;
+            return (-1);
+        }
+
+        status = mosquitto_tls_insecure_set(conf->mosq, false);
+        if (status != MOSQ_ERR_SUCCESS) {
+            ERROR ("mqtt plugin: cannot mosquitto_tls_insecure_set: %s", mosquitto_strerror(status));
+            mosquitto_destroy (conf->mosq);
+            conf->mosq = NULL;
+            return (-1);
+        }
+    }
+#endif
+
+    if (conf->username && conf->password)
+    {
+        status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
+        if (status != MOSQ_ERR_SUCCESS)
+        {
+            char errbuf[1024];
+            ERROR ("mqtt plugin: mosquitto_username_pw_set failed: %s",
+                    (status == MOSQ_ERR_ERRNO)
+                    ? sstrerror (errno, errbuf, sizeof (errbuf))
+                    : mosquitto_strerror (status));
+
+            mosquitto_destroy (conf->mosq);
+            conf->mosq = NULL;
+            return (-1);
+        }
+    }
+
+#if LIBMOSQUITTO_MAJOR == 0
+    status = mosquitto_connect (conf->mosq, conf->host, conf->port,
+            /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
+#else
+    status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
+#endif
+    if (status != MOSQ_ERR_SUCCESS)
+    {
+        char errbuf[1024];
+        ERROR ("mqtt plugin: mosquitto_connect failed: %s",
+                (status == MOSQ_ERR_ERRNO)
+                ? sstrerror (errno, errbuf, sizeof (errbuf))
+                : mosquitto_strerror (status));
+
+        mosquitto_destroy (conf->mosq);
+        conf->mosq = NULL;
+        return (-1);
+    }
+
+    if (!conf->publish)
+    {
+        mosquitto_message_callback_set (conf->mosq, on_message);
+
+        status = mosquitto_subscribe (conf->mosq,
+                /* message_id = */ NULL,
+                conf->topic, conf->qos);
+        if (status != MOSQ_ERR_SUCCESS)
+        {
+            ERROR ("mqtt plugin: Subscribing to \"%s\" failed: %s",
+                    conf->topic, mosquitto_strerror (status));
+
+            mosquitto_disconnect (conf->mosq);
+            mosquitto_destroy (conf->mosq);
+            conf->mosq = NULL;
+            return (-1);
+        }
+    }
+
+    conf->connected = 1;
+    return (0);
+} /* mqtt_connect */
+
+static void *subscribers_thread (void *arg)
+{
+    mqtt_client_conf_t *conf = arg;
+    int status;
+
+    conf->loop = 1;
+
+    while (conf->loop)
+    {
+        status = mqtt_connect (conf);
+        if (status != 0)
+        {
+            sleep (1);
+            continue;
+        }
 
-static int mqtt_publish_message (mqtt_client_conf_t *conf, char *topic,
+        /* The documentation says "0" would map to the default (1000ms), but
+         * that does not work on some versions. */
+#if LIBMOSQUITTO_MAJOR == 0
+        status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
+#else
+        status = mosquitto_loop (conf->mosq,
+                /* timeout[ms] = */ 1000,
+                /* max_packets = */  100);
+#endif
+        if (status == MOSQ_ERR_CONN_LOST)
+        {
+            conf->connected = 0;
+            continue;
+        }
+        else if (status != MOSQ_ERR_SUCCESS)
+        {
+            ERROR ("mqtt plugin: mosquitto_loop failed: %s",
+                    mosquitto_strerror (status));
+            mosquitto_destroy (conf->mosq);
+            conf->mosq = NULL;
+            conf->connected = 0;
+            continue;
+        }
+
+        DEBUG ("mqtt plugin: mosquitto_loop succeeded.");
+    } /* while (conf->loop) */
+
+    pthread_exit (0);
+} /* void *subscribers_thread */
+
+static int publish (mqtt_client_conf_t *conf, char const *topic,
     void const *payload, size_t payload_len)
 {
-    char errbuf[1024];
     int status;
 
     pthread_mutex_lock (&conf->lock);
 
-    status = mqtt_reconnect_broker (conf);
+    status = mqtt_connect (conf);
     if (status != 0) {
         pthread_mutex_unlock (&conf->lock);
         ERROR ("mqtt plugin: unable to reconnect to broker");
         return (status);
     }
 
-    status = mosquitto_publish(conf->mosq,
-        /* message id */ NULL,
-        topic,
-        (int) payload_len,
-        payload,
-        /* qos */ 0,
-        /* retain */ false);
+    status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
+#if LIBMOSQUITTO_MAJOR == 0
+            (uint32_t) payload_len, payload,
+#else
+            (int) payload_len, payload,
+#endif
+            conf->qos, conf->retain);
     if (status != MOSQ_ERR_SUCCESS)
     {
+        char errbuf[1024];
         c_complain (LOG_ERR,
             &conf->complaint_cantpublish,
-            "plugin mqtt: mosquitto_publish failed: %s",
-            status == MOSQ_ERR_ERRNO ?
-            sstrerror(errno, errbuf, sizeof (errbuf)) :
-                mosquitto_strerror(status));
-        /*
-        Mark our connection "down" regardless of the error as a safety measure;
-        we will try to reconnect the next time we have to publish a message
-        */
-        conf->connected = false;
+            "mqtt plugin: mosquitto_publish failed: %s",
+            (status == MOSQ_ERR_ERRNO)
+            ? sstrerror(errno, errbuf, sizeof (errbuf))
+            : mosquitto_strerror(status));
+        /* Mark our connection "down" regardless of the error as a safety
+         * measure; we will try to reconnect the next time we have to publish a
+         * message */
+        conf->connected = 0;
 
         pthread_mutex_unlock (&conf->lock);
         return (-1);
@@ -153,244 +467,325 @@ static int mqtt_publish_message (mqtt_client_conf_t *conf, char *topic,
 
     pthread_mutex_unlock (&conf->lock);
     return (0);
-} /* mqtt_publish_message */
+} /* int publish */
 
-static int mqtt_format_metric_value (char *buf, size_t buf_len,
-    const data_set_t *data_set, const value_list_t *vl, int ds_num)
+static int format_topic (char *buf, size_t buf_len,
+    data_set_t const *ds, value_list_t const *vl,
+    mqtt_client_conf_t *conf)
 {
-    gauge_t *rates = NULL;
-    gauge_t *value = NULL;
-    size_t metric_value_len;
-    int status = 0;
-
-    memset (buf, 0, buf_len);
-
-    if (data_set->ds[ds_num].type == DS_TYPE_GAUGE)
-        value = &vl->values[ds_num].gauge;
-    else {
-        rates = uc_get_rate (data_set, vl);
-        value = &rates[ds_num];
-    }
-
-    metric_value_len = ssnprintf (buf, buf_len, "%f", *value);
-
-    if (metric_value_len >= buf_len)
-        return (-ENOMEM);
+    char name[MQTT_MAX_TOPIC_SIZE];
+    int status;
 
-    if (rates)
-        sfree (rates);
+    if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
+        return (FORMAT_VL (buf, buf_len, vl));
 
-    return (status);
-} /* mqtt_format_metric_value */
+    status = FORMAT_VL (name, sizeof (name), vl);
+    if (status != 0)
+        return (status);
 
-static int mqtt_format_message_topic (char *buf, size_t buf_len,
-    char const *prefix, const value_list_t *vl, const char *ds_name)
-{
-    size_t topic_buf_len;
-
-    memset (buf, 0, buf_len);
-
-    /*
-        MQTT message topic format:
-        [<prefix>/]<hostname>/<plugin>/<plugin instance>/<type>/<type instance>/<ds>/
-    */
-    topic_buf_len = (size_t) ssnprintf (buf, buf_len,
-        "%s/%s/%s/%s/%s/%s/%s",
-        prefix,
-        vl->host,
-        vl->plugin,
-        vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "(null)",
-        vl->type,
-        vl->type_instance[0] != '\0' ? vl->type_instance : "(null)",
-        ds_name);
-
-    if (topic_buf_len >= buf_len)
-    {
-        ERROR ("mqtt_format_message_topic: topic buffer too small: "
-                "Need %zu bytes.", topic_buf_len + 1);
-        return (-ENOMEM);
-    }
+    status = ssnprintf (buf, buf_len, "%s/%s", conf->topic_prefix, name);
+    if ((status < 0) || (((size_t) status) >= buf_len))
+        return (ENOMEM);
 
     return (0);
-} /* mqtt_format_message_topic */
+} /* int format_topic */
 
-static int mqtt_format_payload (char *buf, size_t buf_len,
-    const data_set_t *data_set, const value_list_t *vl, int ds_num)
+static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
+    user_data_t *user_data)
 {
-    char metric_path[10 * DATA_MAX_NAME_LEN];
-    char metric_value[512];
-    size_t payload_buf_len;
+    mqtt_client_conf_t *conf;
+    char topic[MQTT_MAX_TOPIC_SIZE];
+    char payload[MQTT_MAX_MESSAGE_SIZE];
     int status = 0;
 
-    memset (buf, 0, buf_len);
-
-    ssnprintf (metric_path, sizeof (metric_path),
-        "%s.%s%s%s.%s%s%s%s%s",
-        vl->host,
-        vl->plugin,
-        vl->plugin_instance[0] != '\0' ? "." : "",
-        vl->plugin_instance[0] != '\0' ? vl->plugin_instance : "",
-        vl->type,
-        vl->type_instance[0] != '\0' ? "." : "",
-        vl->type_instance[0] != '\0' ? vl->type_instance : "",
-        strcmp(data_set->ds[ds_num].name, "value") != 0 ? "." : "",
-        strcmp(data_set->ds[ds_num].name, "value") != 0 ?
-            data_set->ds[ds_num].name : "");
-
-    status = mqtt_format_metric_value (metric_value,
-        sizeof (metric_value),
-        data_set,
-        vl,
-        ds_num);
+    if ((user_data == NULL) || (user_data->data == NULL))
+        return (EINVAL);
+    conf = user_data->data;
 
+    status = format_topic (topic, sizeof (topic), ds, vl, conf);
     if (status != 0)
     {
-        ERROR ("mqtt_format_payload: error with mqtt_format_metric_value");
+        ERROR ("mqtt plugin: format_topic failed with status %d.", status);
         return (status);
     }
 
-    payload_buf_len = (size_t) ssnprintf (buf, buf_len,
-        "%s %s %u",
-        metric_path,
-        metric_value,
-        (unsigned int) CDTIME_T_TO_TIME_T (vl->time));
+    status = format_values (payload, sizeof (payload),
+            ds, vl, conf->store_rates);
+    if (status != 0)
+    {
+        ERROR ("mqtt plugin: format_values failed with status %d.", status);
+        return (status);
+    }
 
-    if (payload_buf_len >= buf_len)
+    status = publish (conf, topic, payload, strlen (payload) + 1);
+    if (status != 0)
     {
-        ERROR ("mqtt_format_payload: payload buffer too small: "
-                "Need %zu bytes.", payload_buf_len + 1);
-        return (-ENOMEM);
+        ERROR ("mqtt plugin: publish failed: %s", mosquitto_strerror (status));
+        return (status);
     }
 
     return (status);
-} /* mqtt_format_payload */
+} /* mqtt_write */
 
-static int mqtt_write (const data_set_t *data_set, const value_list_t *vl,
-    user_data_t *user_data)
+/*
+ * <Publish "name">
+ *   Host "example.com"
+ *   Port 1883
+ *   ClientId "collectd"
+ *   User "guest"
+ *   Password "secret"
+ *   Prefix "collectd"
+ *   StoreRates true
+ *   Retain false
+ *   QoS 0
+ *   CACert "ca.pem"                   Enables TLS if set
+ *   CertificateFile "client-cert.pem"         optional
+ *   CertificateKeyFile "client-key.pem"               optional
+ *   TLSProtocol "tlsv1.2"             optional
+ * </Publish>
+ */
+static int mqtt_config_publisher (oconfig_item_t *ci)
 {
-    struct mqtt_client_conf *conf;
-    char msg_topic[MQTT_MAX_TOPIC_SIZE];
-    char msg_payload[MQTT_MAX_MESSAGE_SIZE];
-    int status = 0;
-    int i;
-
-    if (user_data == NULL)
-        return (EINVAL);
-
-    conf = user_data->data;
+    mqtt_client_conf_t *conf;
+    char cb_name[1024];
+    int status;
 
-    if (!conf->connected)
+    conf = calloc (1, sizeof (*conf));
+    if (conf == NULL)
     {
-        status = mqtt_reconnect_broker (conf);
+        ERROR ("mqtt plugin: calloc failed.");
+        return (-1);
+    }
+    conf->publish = 1;
 
-        if (status != 0) {
-            ERROR ("plugin mqtt: unable to reconnect to broker");
-            return (status);
-        }
+    conf->name = NULL;
+    status = cf_util_get_string (ci, &conf->name);
+    if (status != 0)
+    {
+        mqtt_free (conf);
+        return (status);
     }
 
-    for (i = 0; i < data_set->ds_num; i++)
+    conf->host = strdup (MQTT_DEFAULT_HOST);
+    conf->port = MQTT_DEFAULT_PORT;
+    conf->client_id = NULL;
+    conf->qos = 0;
+    conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
+    conf->store_rates = 1;
+
+    status = pthread_mutex_init (&conf->lock, NULL);
+    if (status != 0)
     {
-        status = mqtt_format_message_topic (msg_topic, sizeof (msg_topic),
-            conf->topic_prefix, vl, data_set->ds[i].name);
-        if (status != 0)
-        {
-            ERROR ("plugin mqtt: error with mqtt_format_message_topic");
-            return (status);
-        }
+      mqtt_free (conf);
+      return (status);
+    }
 
-        status = mqtt_format_payload (msg_payload,
-            sizeof (msg_payload),
-            data_set,
-            vl,
-            i);
+    C_COMPLAIN_INIT (&conf->complaint_cantpublish);
 
-        if (status != 0)
+    for (int i = 0; i < ci->children_num; i++)
+    {
+        oconfig_item_t *child = ci->children + i;
+        if (strcasecmp ("Host", child->key) == 0)
+            cf_util_get_string (child, &conf->host);
+        else if (strcasecmp ("Port", child->key) == 0)
         {
-            ERROR ("mqtt_write: error with mqtt_format_payload");
-            return (status);
+            int tmp = cf_util_get_port_number (child);
+            if (tmp < 0)
+                ERROR ("mqtt plugin: Invalid port number.");
+            else
+                conf->port = tmp;
         }
-
-        status = mqtt_publish_message (conf,
-            msg_topic,
-            msg_payload,
-            sizeof (msg_payload));
-        if (status != 0)
+        else if (strcasecmp ("ClientId", child->key) == 0)
+            cf_util_get_string (child, &conf->client_id);
+        else if (strcasecmp ("User", child->key) == 0)
+            cf_util_get_string (child, &conf->username);
+        else if (strcasecmp ("Password", child->key) == 0)
+            cf_util_get_string (child, &conf->password);
+        else if (strcasecmp ("QoS", child->key) == 0)
         {
-            ERROR ("plugin mqtt: unable to publish message");
-            return (status);
+            int tmp = -1;
+            status = cf_util_get_int (child, &tmp);
+            if ((status != 0) || (tmp < 0) || (tmp > 2))
+                ERROR ("mqtt plugin: Not a valid QoS setting.");
+            else
+                conf->qos = tmp;
         }
-
-        DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_write[%02X]\x1B[0m "
-            "published message: topic=%s payload=%s",
-            (unsigned)pthread_self(),
-            msg_topic,
-            msg_payload);
+        else if (strcasecmp ("Prefix", child->key) == 0)
+            cf_util_get_string (child, &conf->topic_prefix);
+        else if (strcasecmp ("StoreRates", child->key) == 0)
+            cf_util_get_boolean (child, &conf->store_rates);
+        else if (strcasecmp ("Retain", child->key) == 0)
+            cf_util_get_boolean (child, &conf->retain);
+        else if (strcasecmp ("CACert", child->key) == 0)
+            cf_util_get_string (child, &conf->cacertificatefile);
+        else if (strcasecmp ("CertificateFile", child->key) == 0)
+            cf_util_get_string (child, &conf->certificatefile);
+        else if (strcasecmp ("CertificateKeyFile", child->key) == 0)
+            cf_util_get_string (child, &conf->certificatekeyfile);
+        else if (strcasecmp ("TLSProtocol", child->key) == 0)
+            cf_util_get_string (child, &conf->tlsprotocol);
+        else if (strcasecmp ("CipherSuite", child->key) == 0)
+            cf_util_get_string (child, &conf->ciphersuite);
+        else
+            ERROR ("mqtt plugin: Unknown config option: %s", child->key);
     }
 
-    return (status);
-} /* mqtt_write */
+    ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name);
+    plugin_register_write (cb_name, mqtt_write, &(user_data_t) {
+                .data = conf,
+            });
+    return (0);
+} /* mqtt_config_publisher */
 
-static int mqtt_config (oconfig_item_t *ci)
+/*
+ * <Subscribe "name">
+ *   Host "example.com"
+ *   Port 1883
+ *   ClientId "collectd"
+ *   User "guest"
+ *   Password "secret"
+ *   Topic "collectd/#"
+ * </Subscribe>
+ */
+static int mqtt_config_subscriber (oconfig_item_t *ci)
 {
-    struct mqtt_client_conf *conf;
-    user_data_t user_data;
-    char errbuf[1024];
+    mqtt_client_conf_t **tmp;
+    mqtt_client_conf_t *conf;
     int status;
 
-    DEBUG ("\x1B[36m[debug]\x1B[0m\x1B[37m mqtt_config[%02X]\x1B[0m ",
-        (unsigned)pthread_self());
-
-    conf = malloc (sizeof (*conf));
+    conf = calloc (1, sizeof (*conf));
     if (conf == NULL)
     {
-        ERROR ("write_mqtt plugin: malloc failed.");
+        ERROR ("mqtt plugin: calloc failed.");
         return (-1);
     }
+    conf->publish = 0;
 
-    memset (conf, 0, sizeof (*conf));
+    conf->name = NULL;
+    status = cf_util_get_string (ci, &conf->name);
+    if (status != 0)
+    {
+        mqtt_free (conf);
+        return (status);
+    }
 
-    conf->connected = false;
-    conf->host = MQTT_DEFAULT_HOST;
+    conf->host = strdup (MQTT_DEFAULT_HOST);
     conf->port = MQTT_DEFAULT_PORT;
-    conf->client_id = MQTT_DEFAULT_CLIENT_ID;
-    conf->topic_prefix = MQTT_DEFAULT_TOPIC_PREFIX;
-    C_COMPLAIN_INIT (&conf->complaint_cantpublish);
+    conf->client_id = NULL;
+    conf->qos = 2;
+    conf->topic = strdup (MQTT_DEFAULT_TOPIC);
+    conf->clean_session = 1;
 
-    memset (&user_data, 0, sizeof (user_data));
-    user_data.data = conf;
+    status = pthread_mutex_init (&conf->lock, NULL);
+    if (status != 0)
+    {
+      mqtt_free (conf);
+      return (status);
+    }
 
-    conf->mosq = mosquitto_new (conf->client_id, /* user data = */ conf);
-    if (conf->mosq == NULL)
+    C_COMPLAIN_INIT (&conf->complaint_cantpublish);
+
+    for (int i = 0; i < ci->children_num; i++)
     {
-        ERROR ("mqtt plugin: mosquitto_new failed");
-        free (conf);
-        return (-1);
+        oconfig_item_t *child = ci->children + i;
+        if (strcasecmp ("Host", child->key) == 0)
+            cf_util_get_string (child, &conf->host);
+        else if (strcasecmp ("Port", child->key) == 0)
+        {
+            status = cf_util_get_port_number (child);
+            if (status < 0)
+                ERROR ("mqtt plugin: Invalid port number.");
+            else
+                conf->port = status;
+        }
+        else if (strcasecmp ("ClientId", child->key) == 0)
+            cf_util_get_string (child, &conf->client_id);
+        else if (strcasecmp ("User", child->key) == 0)
+            cf_util_get_string (child, &conf->username);
+        else if (strcasecmp ("Password", child->key) == 0)
+            cf_util_get_string (child, &conf->password);
+        else if (strcasecmp ("QoS", child->key) == 0)
+        {
+            int qos = -1;
+            status = cf_util_get_int (child, &qos);
+            if ((status != 0) || (qos < 0) || (qos > 2))
+                ERROR ("mqtt plugin: Not a valid QoS setting.");
+            else
+                conf->qos = qos;
+        }
+        else if (strcasecmp ("Topic", child->key) == 0)
+            cf_util_get_string (child, &conf->topic);
+        else if (strcasecmp ("CleanSession", child->key) == 0)
+            cf_util_get_boolean (child, &conf->clean_session);
+        else
+            ERROR ("mqtt plugin: Unknown config option: %s", child->key);
     }
 
-    status = mosquitto_connect (conf->mosq, conf->host, conf->port,
-            /* keepalive = */ 10, /* clean session = */ 1);
-    if (status != MOSQ_ERR_SUCCESS) {
-        ERROR ("mqtt_config: mosquitto_connect failed: %s",
-            (status == MOSQ_ERR_ERRNO ?
-                sstrerror(errno, errbuf, sizeof (errbuf)) :
-                mosquitto_strerror (status)));
+    tmp = realloc (subscribers, sizeof (*subscribers) * (subscribers_num + 1) );
+    if (tmp == NULL)
+    {
+        ERROR ("mqtt plugin: realloc failed.");
+        mqtt_free (conf);
         return (-1);
     }
+    subscribers = tmp;
+    subscribers[subscribers_num] = conf;
+    subscribers_num++;
 
-    DEBUG ("mqtt plugin: successfully connected to broker \"%s:%d\"",
-        conf->host, conf->port);
-
-    conf->connected = true;
+    return (0);
+} /* mqtt_config_subscriber */
 
-    plugin_register_write ("mqtt", mqtt_write, &user_data);
+/*
+ * <Plugin mqtt>
+ *   <Publish "name">
+ *     # ...
+ *   </Publish>
+ *   <Subscribe "name">
+ *     # ...
+ *   </Subscribe>
+ * </Plugin>
+ */
+static int mqtt_config (oconfig_item_t *ci)
+{
+    for (int i = 0; i < ci->children_num; i++)
+    {
+        oconfig_item_t *child = ci->children + i;
+
+        if (strcasecmp ("Publish", child->key) == 0)
+            mqtt_config_publisher (child);
+        else if (strcasecmp ("Subscribe", child->key) == 0)
+            mqtt_config_subscriber (child);
+        else
+            ERROR ("mqtt plugin: Unknown config option: %s", child->key);
+    }
 
     return (0);
-} /* mqtt_config */
+} /* int mqtt_config */
 
 static int mqtt_init (void)
 {
-    mosquitto_lib_init();
+    mosquitto_lib_init ();
+
+    for (size_t i = 0; i < subscribers_num; i++)
+    {
+        int status;
+
+        if (subscribers[i]->loop)
+            continue;
+
+        status = plugin_thread_create (&subscribers[i]->thread,
+                /* attrs = */ NULL,
+                /* func  = */ subscribers_thread,
+                /* args  = */ subscribers[i],
+                /* name  = */ "mqtt");
+        if (status != 0)
+        {
+            char errbuf[1024];
+            ERROR ("mqtt plugin: pthread_create failed: %s",
+                    sstrerror (errno, errbuf, sizeof (errbuf)));
+            continue;
+        }
+    }
 
     return (0);
 } /* mqtt_init */