X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fmqtt.c;h=48c34edc036897e7853599bfc6b0a6e197be2419;hp=36e657786fb59818e5ebc751c36395551d73b32a;hb=ec51ddee94fa2ba1e01fe0e336ccc9c190a198ff;hpb=6026e3162e522b133d10596710527d24c2921b55 diff --git a/src/mqtt.c b/src/mqtt.c index 36e65778..48c34edc 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -54,11 +54,11 @@ * Data types */ struct mqtt_client_conf { - _Bool publish; + bool publish; char *name; struct mosquitto *mosq; - _Bool connected; + bool connected; char *host; int port; @@ -74,22 +74,22 @@ struct mqtt_client_conf { /* For publishing */ char *topic_prefix; - _Bool store_rates; - _Bool retain; + bool store_rates; + bool retain; /* For subscribing */ pthread_t thread; - _Bool loop; + bool loop; char *topic; - _Bool clean_session; + bool clean_session; c_complain_t complaint_cantpublish; pthread_mutex_t lock; }; typedef struct mqtt_client_conf mqtt_client_conf_t; -static mqtt_client_conf_t **subscribers = NULL; -static size_t subscribers_num = 0; +static mqtt_client_conf_t **subscribers; +static size_t subscribers_num; /* * Functions @@ -141,7 +141,7 @@ static void mqtt_free(mqtt_client_conf_t *conf) { if (conf->connected) (void)mosquitto_disconnect(conf->mosq); - conf->connected = 0; + conf->connected = false; (void)mosquitto_destroy(conf->mosq); sfree(conf->host); @@ -247,14 +247,12 @@ static int mqtt_reconnect(mqtt_client_conf_t *conf) { status = mosquitto_reconnect(conf->mosq); if (status != MOSQ_ERR_SUCCESS) { - char errbuf[1024]; ERROR("mqtt_connect_broker: mosquitto_connect failed: %s", - (status == MOSQ_ERR_ERRNO) ? sstrerror(errno, errbuf, sizeof(errbuf)) - : mosquitto_strerror(status)); + (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status)); return -1; } - conf->connected = 1; + conf->connected = true; c_release(LOG_INFO, &conf->complaint_cantpublish, "mqtt plugin: successfully reconnected to broker \"%s:%d\"", @@ -325,11 +323,8 @@ static int mqtt_connect(mqtt_client_conf_t *conf) { status = mosquitto_username_pw_set(conf->mosq, conf->username, conf->password); if (status != MOSQ_ERR_SUCCESS) { - char errbuf[1024]; ERROR("mqtt plugin: mosquitto_username_pw_set failed: %s", - (status == MOSQ_ERR_ERRNO) - ? sstrerror(errno, errbuf, sizeof(errbuf)) - : mosquitto_strerror(status)); + (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status)); mosquitto_destroy(conf->mosq); conf->mosq = NULL; @@ -346,10 +341,8 @@ static int mqtt_connect(mqtt_client_conf_t *conf) { mosquitto_connect(conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE); #endif if (status != MOSQ_ERR_SUCCESS) { - char errbuf[1024]; ERROR("mqtt plugin: mosquitto_connect failed: %s", - (status == MOSQ_ERR_ERRNO) ? sstrerror(errno, errbuf, sizeof(errbuf)) - : mosquitto_strerror(status)); + (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status)); mosquitto_destroy(conf->mosq); conf->mosq = NULL; @@ -373,7 +366,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) { } } - conf->connected = 1; + conf->connected = true; return 0; } /* mqtt_connect */ @@ -400,14 +393,14 @@ static void *subscribers_thread(void *arg) { /* max_packets = */ 100); #endif if (status == MOSQ_ERR_CONN_LOST) { - conf->connected = 0; + conf->connected = false; continue; } else if (status != MOSQ_ERR_SUCCESS) { ERROR("mqtt plugin: mosquitto_loop failed: %s", mosquitto_strerror(status)); mosquitto_destroy(conf->mosq); conf->mosq = NULL; - conf->connected = 0; + conf->connected = false; continue; } @@ -438,16 +431,14 @@ static int publish(mqtt_client_conf_t *conf, char const *topic, #endif conf->qos, conf->retain); if (status != MOSQ_ERR_SUCCESS) { - char errbuf[1024]; c_complain(LOG_ERR, &conf->complaint_cantpublish, "mqtt plugin: mosquitto_publish failed: %s", - (status == MOSQ_ERR_ERRNO) - ? sstrerror(errno, errbuf, sizeof(errbuf)) - : mosquitto_strerror(status)); + (status == MOSQ_ERR_ERRNO) ? STRERRNO + : mosquitto_strerror(status)); /* Mark our connection "down" regardless of the error as a safety * measure; we will try to reconnect the next time we have to publish a * message */ - conf->connected = 0; + conf->connected = false; mosquitto_disconnect(conf->mosq); pthread_mutex_unlock(&conf->lock); @@ -471,12 +462,12 @@ static int format_topic(char *buf, size_t buf_len, data_set_t const *ds, if (status != 0) return status; - status = ssnprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name); + status = snprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name); if ((status < 0) || (((size_t)status) >= buf_len)) return ENOMEM; - while((c = strchr(buf, '#')) || (c = strchr(buf, '+'))) { - *c = '_'; + while ((c = strchr(buf, '#')) || (c = strchr(buf, '+'))) { + *c = '_'; } return 0; @@ -525,10 +516,10 @@ static int mqtt_write(const data_set_t *ds, const value_list_t *vl, * StoreRates true * Retain false * QoS 0 - * CACert "ca.pem" Enables TLS if set - * CertificateFile "client-cert.pem" optional - * CertificateKeyFile "client-key.pem" optional - * TLSProtocol "tlsv1.2" optional + * CACert "ca.pem" Enables TLS if set + * CertificateFile "client-cert.pem" optional + * CertificateKeyFile "client-key.pem" optional + * TLSProtocol "tlsv1.2" optional * */ static int mqtt_config_publisher(oconfig_item_t *ci) { @@ -541,7 +532,7 @@ static int mqtt_config_publisher(oconfig_item_t *ci) { ERROR("mqtt plugin: calloc failed."); return -1; } - conf->publish = 1; + conf->publish = true; conf->name = NULL; status = cf_util_get_string(ci, &conf->name); @@ -555,7 +546,7 @@ static int mqtt_config_publisher(oconfig_item_t *ci) { conf->client_id = NULL; conf->qos = 0; conf->topic_prefix = strdup(MQTT_DEFAULT_TOPIC_PREFIX); - conf->store_rates = 1; + conf->store_rates = true; status = pthread_mutex_init(&conf->lock, NULL); if (status != 0) { @@ -608,10 +599,11 @@ static int mqtt_config_publisher(oconfig_item_t *ci) { ERROR("mqtt plugin: Unknown config option: %s", child->key); } - ssnprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name); - plugin_register_write(cb_name, mqtt_write, &(user_data_t){ - .data = conf, - }); + snprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name); + plugin_register_write(cb_name, mqtt_write, + &(user_data_t){ + .data = conf, + }); return 0; } /* mqtt_config_publisher */ @@ -623,6 +615,10 @@ static int mqtt_config_publisher(oconfig_item_t *ci) { * User "guest" * Password "secret" * Topic "collectd/#" + * CACert "ca.pem" Enables TLS if set + * CertificateFile "client-cert.pem" optional + * CertificateKeyFile "client-key.pem" optional + * TLSProtocol "tlsv1.2" optional * */ static int mqtt_config_subscriber(oconfig_item_t *ci) { @@ -635,7 +631,7 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) { ERROR("mqtt plugin: calloc failed."); return -1; } - conf->publish = 0; + conf->publish = false; conf->name = NULL; status = cf_util_get_string(ci, &conf->name); @@ -649,7 +645,7 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) { conf->client_id = NULL; conf->qos = 2; conf->topic = strdup(MQTT_DEFAULT_TOPIC); - conf->clean_session = 1; + conf->clean_session = true; status = pthread_mutex_init(&conf->lock, NULL); if (status != 0) { @@ -686,6 +682,16 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) { cf_util_get_string(child, &conf->topic); else if (strcasecmp("CleanSession", child->key) == 0) cf_util_get_boolean(child, &conf->clean_session); + else if (strcasecmp("CACert", child->key) == 0) + cf_util_get_string(child, &conf->cacertificatefile); + else if (strcasecmp("CertificateFile", child->key) == 0) + cf_util_get_string(child, &conf->certificatefile); + else if (strcasecmp("CertificateKeyFile", child->key) == 0) + cf_util_get_string(child, &conf->certificatekeyfile); + else if (strcasecmp("TLSProtocol", child->key) == 0) + cf_util_get_string(child, &conf->tlsprotocol); + else if (strcasecmp("CipherSuite", child->key) == 0) + cf_util_get_string(child, &conf->ciphersuite); else ERROR("mqtt plugin: Unknown config option: %s", child->key); } @@ -743,9 +749,7 @@ static int mqtt_init(void) { /* args = */ subscribers[i], /* name = */ "mqtt"); if (status != 0) { - char errbuf[1024]; - ERROR("mqtt plugin: pthread_create failed: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("mqtt plugin: pthread_create failed: %s", STRERRNO); continue; } }