Merge branch 'collectd-5.5'
[collectd.git] / src / mqtt.c
index 98b1751..1b71d42 100644 (file)
@@ -90,6 +90,7 @@ static size_t subscribers_num = 0;
 /*
  * Functions
  */
+#if LIBMOSQUITTO_MAJOR == 0
 static char const *mosquitto_strerror (int code)
 {
     switch (code)
@@ -113,6 +114,9 @@ static char const *mosquitto_strerror (int code)
 
     return "UNKNOWN ERROR CODE";
 }
+#else
+/* provided by libmosquitto */
+#endif
 
 static void mqtt_free (mqtt_client_conf_t *conf)
 {
@@ -157,7 +161,12 @@ static char *strip_prefix (char *topic)
     return (topic);
 }
 
-static void on_message (__attribute__((unused)) void *arg,
+static void on_message (
+#if LIBMOSQUITTO_MAJOR == 0
+#else
+        __attribute__((unused)) struct mosquitto *m,
+#endif
+        __attribute__((unused)) void *arg,
         const struct mosquitto_message *msg)
 {
     value_list_t vl = VALUE_LIST_INIT;
@@ -167,7 +176,8 @@ static void on_message (__attribute__((unused)) void *arg,
     char *payload;
     int status;
 
-    if ((msg->payloadlen <= 0) || (msg->payload[msg->payloadlen - 1] != 0))
+    if ((msg->payloadlen <= 0)
+            || (((uint8_t *) msg->payload)[msg->payloadlen - 1] != 0))
         return;
 
     topic = strdup (msg->topic);
@@ -256,7 +266,11 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
     else
         client_id = hostname_g;
 
+#if LIBMOSQUITTO_MAJOR == 0
     conf->mosq = mosquitto_new (client_id, /* user data = */ conf);
+#else
+    conf->mosq = mosquitto_new (client_id, conf->clean_session, /* user data = */ conf);
+#endif
     if (conf->mosq == NULL)
     {
         ERROR ("mqtt plugin: mosquitto_new failed");
@@ -280,8 +294,12 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
         }
     }
 
+#if LIBMOSQUITTO_MAJOR == 0
     status = mosquitto_connect (conf->mosq, conf->host, conf->port,
             /* keepalive = */ MQTT_KEEPALIVE, /* clean session = */ conf->clean_session);
+#else
+    status = mosquitto_connect (conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
+#endif
     if (status != MOSQ_ERR_SUCCESS)
     {
         char errbuf[1024];
@@ -299,7 +317,8 @@ static int mqtt_connect (mqtt_client_conf_t *conf)
     {
         mosquitto_message_callback_set (conf->mosq, on_message);
 
-        status = mosquitto_subscribe (conf->mosq, /* mid = */ NULL,
+        status = mosquitto_subscribe (conf->mosq,
+                /* message_id = */ NULL,
                 conf->topic, conf->qos);
         if (status != MOSQ_ERR_SUCCESS)
         {
@@ -335,7 +354,13 @@ static void *subscribers_thread (void *arg)
 
         /* The documentation says "0" would map to the default (1000ms), but
          * that does not work on some versions. */
+#if LIBMOSQUITTO_MAJOR == 0
         status = mosquitto_loop (conf->mosq, /* timeout = */ 1000 /* ms */);
+#else
+        status = mosquitto_loop (conf->mosq,
+                /* timeout[ms] = */ 1000,
+                /* max_packets = */  100);
+#endif
         if (status == MOSQ_ERR_CONN_LOST)
         {
             conf->connected = 0;
@@ -371,21 +396,22 @@ static int publish (mqtt_client_conf_t *conf, char const *topic,
         return (status);
     }
 
-    status = mosquitto_publish(conf->mosq,
-            /* message id */ NULL,
-            topic,
+    status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
+#if LIBMOSQUITTO_MAJOR == 0
             (uint32_t) payload_len, payload,
-            /* qos */ conf->qos,
-            /* retain */ conf->retain);
+#else
+            (int) payload_len, payload,
+#endif
+            conf->qos, conf->retain);
     if (status != MOSQ_ERR_SUCCESS)
     {
         char errbuf[1024];
         c_complain (LOG_ERR,
-                &conf->complaint_cantpublish,
-                "plugin mqtt: mosquitto_publish failed: %s",
-                status == MOSQ_ERR_ERRNO ?
-                sstrerror(errno, errbuf, sizeof (errbuf)) :
-                mosquitto_strerror(status));
+            &conf->complaint_cantpublish,
+            "mqtt plugin: mosquitto_publish failed: %s",
+            (status == MOSQ_ERR_ERRNO)
+            ? sstrerror(errno, errbuf, sizeof (errbuf))
+            : mosquitto_strerror(status));
         /* Mark our connection "down" regardless of the error as a safety
          * measure; we will try to reconnect the next time we have to publish a
          * message */
@@ -473,6 +499,7 @@ static int mqtt_write (const data_set_t *ds, const value_list_t *vl,
 static int mqtt_config_publisher (oconfig_item_t *ci)
 {
     mqtt_client_conf_t *conf;
+    char cb_name[1024];
     user_data_t user_data;
     int status;
     int i;
@@ -496,7 +523,16 @@ static int mqtt_config_publisher (oconfig_item_t *ci)
     conf->host = strdup (MQTT_DEFAULT_HOST);
     conf->port = MQTT_DEFAULT_PORT;
     conf->client_id = NULL;
+    conf->qos = 0;
     conf->topic_prefix = strdup (MQTT_DEFAULT_TOPIC_PREFIX);
+    conf->store_rates = 1;
+
+    status = pthread_mutex_init (&conf->lock, NULL);
+    if (status != 0)
+    {
+      mqtt_free (conf);
+      return (status);
+    }
 
     C_COMPLAIN_INIT (&conf->complaint_cantpublish);
 
@@ -538,10 +574,11 @@ static int mqtt_config_publisher (oconfig_item_t *ci)
             ERROR ("mqtt plugin: Unknown config option: %s", child->key);
     }
 
+    ssnprintf (cb_name, sizeof (cb_name), "mqtt/%s", conf->name);
     memset (&user_data, 0, sizeof (user_data));
     user_data.data = conf;
 
-    plugin_register_write ("mqtt", mqtt_write, &user_data);
+    plugin_register_write (cb_name, mqtt_write, &user_data);
     return (0);
 } /* mqtt_config_publisher */
 
@@ -581,7 +618,16 @@ static int mqtt_config_subscriber (oconfig_item_t *ci)
     conf->host = strdup (MQTT_DEFAULT_HOST);
     conf->port = MQTT_DEFAULT_PORT;
     conf->client_id = NULL;
+    conf->qos = 2;
     conf->topic = strdup (MQTT_DEFAULT_TOPIC);
+    conf->clean_session = 1;
+
+    status = pthread_mutex_init (&conf->lock, NULL);
+    if (status != 0)
+    {
+      mqtt_free (conf);
+      return (status);
+    }
 
     C_COMPLAIN_INIT (&conf->complaint_cantpublish);