mqtt, gps: add name parameter to plugin_thread_create()
[collectd.git] / src / mqtt.c
index 210d38c..32304f0 100644 (file)
  * Authors:
  *   Marc Falzon <marc at baha dot mu>
  *   Florian octo Forster <octo at collectd.org>
+ *   Jan-Piet Mens <jpmens at gmail.com>
  **/
 
 // Reference: http://mosquitto.org/api/files/mosquitto-h.html
 
 
 #include "collectd.h"
+
 #include "common.h"
 #include "plugin.h"
-#include "utils_cache.h"
 #include "utils_complain.h"
 
-#include <pthread.h>
-
 #include <mosquitto.h>
 
 #define MQTT_MAX_TOPIC_SIZE         1024
@@ -48,6 +47,9 @@
 #ifndef MQTT_KEEPALIVE
 # define MQTT_KEEPALIVE 60
 #endif
+#ifndef SSL_VERIFY_PEER
+# define SSL_VERIFY_PEER  1
+#endif
 
 
 /*
@@ -67,6 +69,11 @@ struct mqtt_client_conf
     char               *username;
     char               *password;
     int                 qos;
+    char                *cacertificatefile;
+    char                *certificatefile;
+    char                *certificatekeyfile;
+    char                *tlsprotocol;
+    char                *ciphersuite;
 
     /* For publishing */
     char               *topic_prefix;
@@ -138,11 +145,9 @@ static void mqtt_free (mqtt_client_conf_t *conf)
 
 static char *strip_prefix (char *topic)
 {
-    size_t num;
-    size_t i;
+    size_t num = 0;
 
-    num = 0;
-    for (i = 0; topic[i] != 0; i++)
+    for (size_t i = 0; topic[i] != 0; i++)
         if (topic[i] == '/')
             num++;
 
@@ -176,9 +181,10 @@ static void on_message (
     char *payload;
     int status;
 
-    if ((msg->payloadlen <= 0)
-            || (((uint8_t *) msg->payload)[msg->payloadlen - 1] != 0))
+    if (msg->payloadlen <= 0) {
+        DEBUG ("mqtt plugin: message has empty payload");
         return;
+    }
 
     topic = strdup (msg->topic);
     name = strip_prefix (topic);
@@ -207,7 +213,16 @@ static void on_message (
     }
     vl.values_len = ds->ds_num;
 
-    payload = strdup ((void *) msg->payload);
+    payload = malloc (msg->payloadlen+1);
+    if (payload == NULL)
+    {
+        ERROR ("mqtt plugin: malloc for payload buffer failed.");
+        sfree (vl.values);
+        return;
+    }
+    memmove (payload, msg->payload, msg->payloadlen);
+    payload[msg->payloadlen] = 0;
+
     DEBUG ("mqtt plugin: payload = \"%s\"", payload);
     status = parse_values (payload, &vl, ds);
     if (status != 0)
@@ -277,6 +292,35 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
         return (-1);
     }
 
+#if LIBMOSQUITTO_MAJOR != 0
+    if (conf->cacertificatefile) {
+        status = mosquitto_tls_set(conf->mosq, conf->cacertificatefile, NULL,
+                                  conf->certificatefile, conf->certificatekeyfile, /* pw_callback */NULL);
+        if (status != MOSQ_ERR_SUCCESS) {
+            ERROR ("mqtt plugin: cannot mosquitto_tls_set: %s", mosquitto_strerror(status));
+            mosquitto_destroy (conf->mosq);
+            conf->mosq = NULL;
+            return (-1);
+        }
+
+        status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER, conf->tlsprotocol, conf->ciphersuite);
+        if (status != MOSQ_ERR_SUCCESS) {
+            ERROR ("mqtt plugin: cannot mosquitto_tls_opts_set: %s", mosquitto_strerror(status));
+            mosquitto_destroy (conf->mosq);
+            conf->mosq = NULL;
+            return (-1);
+        }
+
+        status = mosquitto_tls_insecure_set(conf->mosq, false);
+        if (status != MOSQ_ERR_SUCCESS) {
+            ERROR ("mqtt plugin: cannot mosquitto_tls_insecure_set: %s", mosquitto_strerror(status));
+            mosquitto_destroy (conf->mosq);
+            conf->mosq = NULL;
+            return (-1);
+        }
+    }
+#endif
+
     if (conf->username && conf->password)
     {
         status = mosquitto_username_pw_set (conf->mosq, conf->username, conf->password);
@@ -407,11 +451,11 @@ static int publish (mqtt_client_conf_t *conf, char const *topic,
     {
         char errbuf[1024];
         c_complain (LOG_ERR,
-                &conf->complaint_cantpublish,
-                "plugin mqtt: mosquitto_publish failed: %s",
-                status == MOSQ_ERR_ERRNO ?
-                sstrerror(errno, errbuf, sizeof (errbuf)) :
-                mosquitto_strerror(status));
+            &conf->complaint_cantpublish,
+            "mqtt plugin: mosquitto_publish failed: %s",
+            (status == MOSQ_ERR_ERRNO)
+            ? sstrerror(errno, errbuf, sizeof (errbuf))
+            : mosquitto_strerror(status));
         /* Mark our connection "down" regardless of the error as a safety
          * measure; we will try to reconnect the next time we have to publish a
          * message */
@@ -494,20 +538,22 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
  *   StoreRates true
  *   Retain false
  *   QoS 0
+ *   CACert "ca.pem"                   Enables TLS if set
+ *   CertificateFile "client-cert.pem"         optional
+ *   CertificateKeyFile "client-key.pem"               optional
+ *   TLSProtocol "tlsv1.2"             optional
  * </Publish>
  */
 static int mqtt_config_publisher (oconfig_item_t *ci)
 {
     mqtt_client_conf_t *conf;
     char cb_name[1024];
-    user_data_t user_data;
     int status;
-    int i;
 
     conf = calloc (1, sizeof (*conf));
     if (conf == NULL)
     {
-        ERROR ("mqtt plugin: malloc failed.");
+        ERROR ("mqtt plugin: calloc failed.");
         return (-1);
     }
     conf->publish = 1;
@@ -527,9 +573,16 @@ static int mqtt_config_publisher (oconfig_item_t *ci)
     conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
     conf->store_rates = 1;
 
+    status = pthread_mutex_init (&conf->lock, NULL);
+    if (status != 0)
+    {
+      mqtt_free (conf);
+      return (status);
+    }
+
     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
 
-    for (i = 0; i < ci->children_num; i++)
+    for (int i = 0; i < ci->children_num; i++)
     {
         oconfig_item_t *child = ci->children + i;
         if (strcasecmp ("Host", child->key) == 0)
@@ -563,15 +616,24 @@ static int mqtt_config_publisher (oconfig_item_t *ci)
             cf_util_get_boolean (child, &conf->store_rates);
         else if (strcasecmp ("Retain", child->key) == 0)
             cf_util_get_boolean (child, &conf->retain);
+        else if (strcasecmp ("CACert", child->key) == 0)
+            cf_util_get_string (child, &conf->cacertificatefile);
+        else if (strcasecmp ("CertificateFile", child->key) == 0)
+            cf_util_get_string (child, &conf->certificatefile);
+        else if (strcasecmp ("CertificateKeyFile", child->key) == 0)
+            cf_util_get_string (child, &conf->certificatekeyfile);
+        else if (strcasecmp ("TLSProtocol", child->key) == 0)
+            cf_util_get_string (child, &conf->tlsprotocol);
+        else if (strcasecmp ("CipherSuite", child->key) == 0)
+            cf_util_get_string (child, &conf->ciphersuite);
         else
             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
     }
 
     ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name);
-    memset (&user_data, 0, sizeof (user_data));
-    user_data.data = conf;
-
-    plugin_register_write (cb_name, mqtt_write, &user_data);
+    plugin_register_write (cb_name, mqtt_write, &(user_data_t) {
+                .data = conf,
+            });
     return (0);
 } /* mqtt_config_publisher */
 
@@ -583,19 +645,18 @@ static int mqtt_config_publisher (oconfig_item_t *ci)
  *   User "guest"
  *   Password "secret"
  *   Topic "collectd/#"
- * </Publish>
+ * </Subscribe>
  */
 static int mqtt_config_subscriber (oconfig_item_t *ci)
 {
     mqtt_client_conf_t **tmp;
     mqtt_client_conf_t *conf;
     int status;
-    int i;
 
     conf = calloc (1, sizeof (*conf));
     if (conf == NULL)
     {
-        ERROR ("mqtt plugin: malloc failed.");
+        ERROR ("mqtt plugin: calloc failed.");
         return (-1);
     }
     conf->publish = 0;
@@ -615,20 +676,27 @@ static int mqtt_config_subscriber (oconfig_item_t *ci)
     conf->topic = strdup (MQTT_DEFAULT_TOPIC);
     conf->clean_session = 1;
 
+    status = pthread_mutex_init (&conf->lock, NULL);
+    if (status != 0)
+    {
+      mqtt_free (conf);
+      return (status);
+    }
+
     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
 
-    for (i = 0; i < ci->children_num; i++)
+    for (int i = 0; i < ci->children_num; i++)
     {
         oconfig_item_t *child = ci->children + i;
         if (strcasecmp ("Host", child->key) == 0)
             cf_util_get_string (child, &conf->host);
         else if (strcasecmp ("Port", child->key) == 0)
         {
-            int tmp = cf_util_get_port_number (child);
-            if (tmp < 0)
+            status = cf_util_get_port_number (child);
+            if (status < 0)
                 ERROR ("mqtt plugin: Invalid port number.");
             else
-                conf->port = tmp;
+                conf->port = status;
         }
         else if (strcasecmp ("ClientId", child->key) == 0)
             cf_util_get_string (child, &conf->client_id);
@@ -638,12 +706,12 @@ static int mqtt_config_subscriber (oconfig_item_t *ci)
             cf_util_get_string (child, &conf->password);
         else if (strcasecmp ("QoS", child->key) == 0)
         {
-            int tmp = -1;
-            status = cf_util_get_int (child, &tmp);
-            if ((status != 0) || (tmp < 0) || (tmp > 2))
+            int qos = -1;
+            status = cf_util_get_int (child, &qos);
+            if ((status != 0) || (qos < 0) || (qos > 2))
                 ERROR ("mqtt plugin: Not a valid QoS setting.");
             else
-                conf->qos = tmp;
+                conf->qos = qos;
         }
         else if (strcasecmp ("Topic", child->key) == 0)
             cf_util_get_string (child, &conf->topic);
@@ -653,7 +721,7 @@ static int mqtt_config_subscriber (oconfig_item_t *ci)
             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
     }
 
-    tmp = realloc (subscribers, sizeof (*subscribers) * subscribers_num);
+    tmp = realloc (subscribers, sizeof (*subscribers) * (subscribers_num + 1) );
     if (tmp == NULL)
     {
         ERROR ("mqtt plugin: realloc failed.");
@@ -679,9 +747,7 @@ static int mqtt_config_subscriber (oconfig_item_t *ci)
  */
 static int mqtt_config (oconfig_item_t *ci)
 {
-    int i;
-
-    for (i = 0; i < ci->children_num; i++)
+    for (int i = 0; i < ci->children_num; i++)
     {
         oconfig_item_t *child = ci->children + i;
 
@@ -698,11 +764,9 @@ static int mqtt_config (oconfig_item_t *ci)
 
 static int mqtt_init (void)
 {
-    size_t i;
-
     mosquitto_lib_init ();
 
-    for (i = 0; i < subscribers_num; i++)
+    for (size_t i = 0; i < subscribers_num; i++)
     {
         int status;
 
@@ -712,7 +776,8 @@ static int mqtt_init (void)
         status = plugin_thread_create (&subscribers[i]->thread,
                 /* attrs = */ NULL,
                 /* func  = */ subscribers_thread,
-                /* args  = */ subscribers[i]);
+                /* args  = */ subscribers[i],
+                /* name  = */ "mqtt");
         if (status != 0)
         {
             char errbuf[1024];