Merge branch 'collectd-5.7' into collectd-5.8
[collectd.git] / src / mqtt.c
index b578b99..5164485 100644 (file)
@@ -160,17 +160,17 @@ static char *strip_prefix(char *topic) {
       num++;
 
   if (num < 2)
-    return (NULL);
+    return NULL;
 
   while (num > 2) {
     char *tmp = strchr(topic, '/');
     if (tmp == NULL)
-      return (NULL);
+      return NULL;
     topic = tmp + 1;
     num--;
   }
 
-  return (topic);
+  return topic;
 }
 
 static void on_message(
@@ -243,7 +243,7 @@ static int mqtt_reconnect(mqtt_client_conf_t *conf) {
   int status;
 
   if (conf->connected)
-    return (0);
+    return 0;
 
   status = mosquitto_reconnect(conf->mosq);
   if (status != MOSQ_ERR_SUCCESS) {
@@ -251,7 +251,7 @@ static int mqtt_reconnect(mqtt_client_conf_t *conf) {
     ERROR("mqtt_connect_broker: mosquitto_connect failed: %s",
           (status == MOSQ_ERR_ERRNO) ? sstrerror(errno, errbuf, sizeof(errbuf))
                                      : mosquitto_strerror(status));
-    return (-1);
+    return -1;
   }
 
   conf->connected = 1;
@@ -260,7 +260,7 @@ static int mqtt_reconnect(mqtt_client_conf_t *conf) {
             "mqtt plugin: successfully reconnected to broker \"%s:%d\"",
             conf->host, conf->port);
 
-  return (0);
+  return 0;
 } /* mqtt_reconnect */
 
 /* must hold conf->lock when calling. */
@@ -284,7 +284,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) {
 #endif
   if (conf->mosq == NULL) {
     ERROR("mqtt plugin: mosquitto_new failed");
-    return (-1);
+    return -1;
   }
 
 #if LIBMOSQUITTO_MAJOR != 0
@@ -297,7 +297,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) {
             mosquitto_strerror(status));
       mosquitto_destroy(conf->mosq);
       conf->mosq = NULL;
-      return (-1);
+      return -1;
     }
 
     status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER,
@@ -307,7 +307,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) {
             mosquitto_strerror(status));
       mosquitto_destroy(conf->mosq);
       conf->mosq = NULL;
-      return (-1);
+      return -1;
     }
 
     status = mosquitto_tls_insecure_set(conf->mosq, false);
@@ -316,7 +316,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) {
             mosquitto_strerror(status));
       mosquitto_destroy(conf->mosq);
       conf->mosq = NULL;
-      return (-1);
+      return -1;
     }
   }
 #endif
@@ -333,7 +333,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) {
 
       mosquitto_destroy(conf->mosq);
       conf->mosq = NULL;
-      return (-1);
+      return -1;
     }
   }
 
@@ -353,7 +353,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) {
 
     mosquitto_destroy(conf->mosq);
     conf->mosq = NULL;
-    return (-1);
+    return -1;
   }
 
   if (!conf->publish) {
@@ -369,12 +369,12 @@ static int mqtt_connect(mqtt_client_conf_t *conf) {
       mosquitto_disconnect(conf->mosq);
       mosquitto_destroy(conf->mosq);
       conf->mosq = NULL;
-      return (-1);
+      return -1;
     }
   }
 
   conf->connected = 1;
-  return (0);
+  return 0;
 } /* mqtt_connect */
 
 static void *subscribers_thread(void *arg) {
@@ -427,7 +427,7 @@ static int publish(mqtt_client_conf_t *conf, char const *topic,
   if (status != 0) {
     pthread_mutex_unlock(&conf->lock);
     ERROR("mqtt plugin: unable to reconnect to broker");
-    return (status);
+    return status;
   }
 
   status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
@@ -448,32 +448,38 @@ static int publish(mqtt_client_conf_t *conf, char const *topic,
      * measure; we will try to reconnect the next time we have to publish a
      * message */
     conf->connected = 0;
+    mosquitto_disconnect(conf->mosq);
 
     pthread_mutex_unlock(&conf->lock);
-    return (-1);
+    return -1;
   }
 
   pthread_mutex_unlock(&conf->lock);
-  return (0);
+  return 0;
 } /* int publish */
 
 static int format_topic(char *buf, size_t buf_len, data_set_t const *ds,
                         value_list_t const *vl, mqtt_client_conf_t *conf) {
   char name[MQTT_MAX_TOPIC_SIZE];
   int status;
+  char *c;
 
   if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
-    return (FORMAT_VL(buf, buf_len, vl));
+    return FORMAT_VL(buf, buf_len, vl);
 
   status = FORMAT_VL(name, sizeof(name), vl);
   if (status != 0)
-    return (status);
+    return status;
 
-  status = ssnprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name);
+  status = snprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name);
   if ((status < 0) || (((size_t)status) >= buf_len))
-    return (ENOMEM);
+    return ENOMEM;
 
-  return (0);
+  while ((c = strchr(buf, '#')) || (c = strchr(buf, '+'))) {
+    *c = '_';
+  }
+
+  return 0;
 } /* int format_topic */
 
 static int mqtt_write(const data_set_t *ds, const value_list_t *vl,
@@ -484,28 +490,28 @@ static int mqtt_write(const data_set_t *ds, const value_list_t *vl,
   int status = 0;
 
   if ((user_data == NULL) || (user_data->data == NULL))
-    return (EINVAL);
+    return EINVAL;
   conf = user_data->data;
 
   status = format_topic(topic, sizeof(topic), ds, vl, conf);
   if (status != 0) {
     ERROR("mqtt plugin: format_topic failed with status %d.", status);
-    return (status);
+    return status;
   }
 
   status = format_values(payload, sizeof(payload), ds, vl, conf->store_rates);
   if (status != 0) {
     ERROR("mqtt plugin: format_values failed with status %d.", status);
-    return (status);
+    return status;
   }
 
   status = publish(conf, topic, payload, strlen(payload) + 1);
   if (status != 0) {
     ERROR("mqtt plugin: publish failed: %s", mosquitto_strerror(status));
-    return (status);
+    return status;
   }
 
-  return (status);
+  return status;
 } /* mqtt_write */
 
 /*
@@ -519,10 +525,10 @@ static int mqtt_write(const data_set_t *ds, const value_list_t *vl,
  *   StoreRates true
  *   Retain false
  *   QoS 0
- *   CACert "ca.pem"                   Enables TLS if set
- *   CertificateFile "client-cert.pem"         optional
- *   CertificateKeyFile "client-key.pem"               optional
- *   TLSProtocol "tlsv1.2"             optional
+ *   CACert "ca.pem"                      Enables TLS if set
+ *   CertificateFile "client-cert.pem"   optional
+ *   CertificateKeyFile "client-key.pem"  optional
+ *   TLSProtocol "tlsv1.2"                optional
  * </Publish>
  */
 static int mqtt_config_publisher(oconfig_item_t *ci) {
@@ -533,7 +539,7 @@ static int mqtt_config_publisher(oconfig_item_t *ci) {
   conf = calloc(1, sizeof(*conf));
   if (conf == NULL) {
     ERROR("mqtt plugin: calloc failed.");
-    return (-1);
+    return -1;
   }
   conf->publish = 1;
 
@@ -541,7 +547,7 @@ static int mqtt_config_publisher(oconfig_item_t *ci) {
   status = cf_util_get_string(ci, &conf->name);
   if (status != 0) {
     mqtt_free(conf);
-    return (status);
+    return status;
   }
 
   conf->host = strdup(MQTT_DEFAULT_HOST);
@@ -554,7 +560,7 @@ static int mqtt_config_publisher(oconfig_item_t *ci) {
   status = pthread_mutex_init(&conf->lock, NULL);
   if (status != 0) {
     mqtt_free(conf);
-    return (status);
+    return status;
   }
 
   C_COMPLAIN_INIT(&conf->complaint_cantpublish);
@@ -602,11 +608,12 @@ static int mqtt_config_publisher(oconfig_item_t *ci) {
       ERROR("mqtt plugin: Unknown config option: %s", child->key);
   }
 
-  ssnprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name);
-  plugin_register_write(cb_name, mqtt_write, &(user_data_t){
-                                                 .data = conf,
-                                             });
-  return (0);
+  snprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name);
+  plugin_register_write(cb_name, mqtt_write,
+                        &(user_data_t){
+                            .data = conf,
+                        });
+  return 0;
 } /* mqtt_config_publisher */
 
 /*
@@ -617,6 +624,10 @@ static int mqtt_config_publisher(oconfig_item_t *ci) {
  *   User "guest"
  *   Password "secret"
  *   Topic "collectd/#"
+ *   CACert "ca.pem"                      Enables TLS if set
+ *   CertificateFile "client-cert.pem"   optional
+ *   CertificateKeyFile "client-key.pem"  optional
+ *   TLSProtocol "tlsv1.2"                optional
  * </Subscribe>
  */
 static int mqtt_config_subscriber(oconfig_item_t *ci) {
@@ -627,7 +638,7 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) {
   conf = calloc(1, sizeof(*conf));
   if (conf == NULL) {
     ERROR("mqtt plugin: calloc failed.");
-    return (-1);
+    return -1;
   }
   conf->publish = 0;
 
@@ -635,7 +646,7 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) {
   status = cf_util_get_string(ci, &conf->name);
   if (status != 0) {
     mqtt_free(conf);
-    return (status);
+    return status;
   }
 
   conf->host = strdup(MQTT_DEFAULT_HOST);
@@ -648,7 +659,7 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) {
   status = pthread_mutex_init(&conf->lock, NULL);
   if (status != 0) {
     mqtt_free(conf);
-    return (status);
+    return status;
   }
 
   C_COMPLAIN_INIT(&conf->complaint_cantpublish);
@@ -680,6 +691,16 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) {
       cf_util_get_string(child, &conf->topic);
     else if (strcasecmp("CleanSession", child->key) == 0)
       cf_util_get_boolean(child, &conf->clean_session);
+    else if (strcasecmp("CACert", child->key) == 0)
+      cf_util_get_string(child, &conf->cacertificatefile);
+    else if (strcasecmp("CertificateFile", child->key) == 0)
+      cf_util_get_string(child, &conf->certificatefile);
+    else if (strcasecmp("CertificateKeyFile", child->key) == 0)
+      cf_util_get_string(child, &conf->certificatekeyfile);
+    else if (strcasecmp("TLSProtocol", child->key) == 0)
+      cf_util_get_string(child, &conf->tlsprotocol);
+    else if (strcasecmp("CipherSuite", child->key) == 0)
+      cf_util_get_string(child, &conf->ciphersuite);
     else
       ERROR("mqtt plugin: Unknown config option: %s", child->key);
   }
@@ -688,13 +709,13 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) {
   if (tmp == NULL) {
     ERROR("mqtt plugin: realloc failed.");
     mqtt_free(conf);
-    return (-1);
+    return -1;
   }
   subscribers = tmp;
   subscribers[subscribers_num] = conf;
   subscribers_num++;
 
-  return (0);
+  return 0;
 } /* mqtt_config_subscriber */
 
 /*
@@ -719,7 +740,7 @@ static int mqtt_config(oconfig_item_t *ci) {
       ERROR("mqtt plugin: Unknown config option: %s", child->key);
   }
 
-  return (0);
+  return 0;
 } /* int mqtt_config */
 
 static int mqtt_init(void) {
@@ -744,12 +765,10 @@ static int mqtt_init(void) {
     }
   }
 
-  return (0);
+  return 0;
 } /* mqtt_init */
 
 void module_register(void) {
   plugin_register_complex_config("mqtt", mqtt_config);
   plugin_register_init("mqtt", mqtt_init);
 } /* void module_register */
-
-/* vim: set sw=4 sts=4 et fdm=marker : */