X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Fmqtt.c;h=516448557f94e326f934729193c63cd094a6bd58;hp=b578b99bb93505f27a344b88d512cad2d97aa432;hb=7111bb6df7628edce3a8e538b386fbe27633a191;hpb=bdd27cb013de89b4149393212aef1bf45a221dba diff --git a/src/mqtt.c b/src/mqtt.c index b578b99b..51644855 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -160,17 +160,17 @@ static char *strip_prefix(char *topic) { num++; if (num < 2) - return (NULL); + return NULL; while (num > 2) { char *tmp = strchr(topic, '/'); if (tmp == NULL) - return (NULL); + return NULL; topic = tmp + 1; num--; } - return (topic); + return topic; } static void on_message( @@ -243,7 +243,7 @@ static int mqtt_reconnect(mqtt_client_conf_t *conf) { int status; if (conf->connected) - return (0); + return 0; status = mosquitto_reconnect(conf->mosq); if (status != MOSQ_ERR_SUCCESS) { @@ -251,7 +251,7 @@ static int mqtt_reconnect(mqtt_client_conf_t *conf) { ERROR("mqtt_connect_broker: mosquitto_connect failed: %s", (status == MOSQ_ERR_ERRNO) ? sstrerror(errno, errbuf, sizeof(errbuf)) : mosquitto_strerror(status)); - return (-1); + return -1; } conf->connected = 1; @@ -260,7 +260,7 @@ static int mqtt_reconnect(mqtt_client_conf_t *conf) { "mqtt plugin: successfully reconnected to broker \"%s:%d\"", conf->host, conf->port); - return (0); + return 0; } /* mqtt_reconnect */ /* must hold conf->lock when calling. */ @@ -284,7 +284,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) { #endif if (conf->mosq == NULL) { ERROR("mqtt plugin: mosquitto_new failed"); - return (-1); + return -1; } #if LIBMOSQUITTO_MAJOR != 0 @@ -297,7 +297,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) { mosquitto_strerror(status)); mosquitto_destroy(conf->mosq); conf->mosq = NULL; - return (-1); + return -1; } status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER, @@ -307,7 +307,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) { mosquitto_strerror(status)); mosquitto_destroy(conf->mosq); conf->mosq = NULL; - return (-1); + return -1; } status = mosquitto_tls_insecure_set(conf->mosq, false); @@ -316,7 +316,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) { mosquitto_strerror(status)); mosquitto_destroy(conf->mosq); conf->mosq = NULL; - return (-1); + return -1; } } #endif @@ -333,7 +333,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) { mosquitto_destroy(conf->mosq); conf->mosq = NULL; - return (-1); + return -1; } } @@ -353,7 +353,7 @@ static int mqtt_connect(mqtt_client_conf_t *conf) { mosquitto_destroy(conf->mosq); conf->mosq = NULL; - return (-1); + return -1; } if (!conf->publish) { @@ -369,12 +369,12 @@ static int mqtt_connect(mqtt_client_conf_t *conf) { mosquitto_disconnect(conf->mosq); mosquitto_destroy(conf->mosq); conf->mosq = NULL; - return (-1); + return -1; } } conf->connected = 1; - return (0); + return 0; } /* mqtt_connect */ static void *subscribers_thread(void *arg) { @@ -427,7 +427,7 @@ static int publish(mqtt_client_conf_t *conf, char const *topic, if (status != 0) { pthread_mutex_unlock(&conf->lock); ERROR("mqtt plugin: unable to reconnect to broker"); - return (status); + return status; } status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic, @@ -448,32 +448,38 @@ static int publish(mqtt_client_conf_t *conf, char const *topic, * measure; we will try to reconnect the next time we have to publish a * message */ conf->connected = 0; + mosquitto_disconnect(conf->mosq); pthread_mutex_unlock(&conf->lock); - return (-1); + return -1; } pthread_mutex_unlock(&conf->lock); - return (0); + return 0; } /* int publish */ static int format_topic(char *buf, size_t buf_len, data_set_t const *ds, value_list_t const *vl, mqtt_client_conf_t *conf) { char name[MQTT_MAX_TOPIC_SIZE]; int status; + char *c; if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0)) - return (FORMAT_VL(buf, buf_len, vl)); + return FORMAT_VL(buf, buf_len, vl); status = FORMAT_VL(name, sizeof(name), vl); if (status != 0) - return (status); + return status; - status = ssnprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name); + status = snprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name); if ((status < 0) || (((size_t)status) >= buf_len)) - return (ENOMEM); + return ENOMEM; - return (0); + while ((c = strchr(buf, '#')) || (c = strchr(buf, '+'))) { + *c = '_'; + } + + return 0; } /* int format_topic */ static int mqtt_write(const data_set_t *ds, const value_list_t *vl, @@ -484,28 +490,28 @@ static int mqtt_write(const data_set_t *ds, const value_list_t *vl, int status = 0; if ((user_data == NULL) || (user_data->data == NULL)) - return (EINVAL); + return EINVAL; conf = user_data->data; status = format_topic(topic, sizeof(topic), ds, vl, conf); if (status != 0) { ERROR("mqtt plugin: format_topic failed with status %d.", status); - return (status); + return status; } status = format_values(payload, sizeof(payload), ds, vl, conf->store_rates); if (status != 0) { ERROR("mqtt plugin: format_values failed with status %d.", status); - return (status); + return status; } status = publish(conf, topic, payload, strlen(payload) + 1); if (status != 0) { ERROR("mqtt plugin: publish failed: %s", mosquitto_strerror(status)); - return (status); + return status; } - return (status); + return status; } /* mqtt_write */ /* @@ -519,10 +525,10 @@ static int mqtt_write(const data_set_t *ds, const value_list_t *vl, * StoreRates true * Retain false * QoS 0 - * CACert "ca.pem" Enables TLS if set - * CertificateFile "client-cert.pem" optional - * CertificateKeyFile "client-key.pem" optional - * TLSProtocol "tlsv1.2" optional + * CACert "ca.pem" Enables TLS if set + * CertificateFile "client-cert.pem" optional + * CertificateKeyFile "client-key.pem" optional + * TLSProtocol "tlsv1.2" optional * */ static int mqtt_config_publisher(oconfig_item_t *ci) { @@ -533,7 +539,7 @@ static int mqtt_config_publisher(oconfig_item_t *ci) { conf = calloc(1, sizeof(*conf)); if (conf == NULL) { ERROR("mqtt plugin: calloc failed."); - return (-1); + return -1; } conf->publish = 1; @@ -541,7 +547,7 @@ static int mqtt_config_publisher(oconfig_item_t *ci) { status = cf_util_get_string(ci, &conf->name); if (status != 0) { mqtt_free(conf); - return (status); + return status; } conf->host = strdup(MQTT_DEFAULT_HOST); @@ -554,7 +560,7 @@ static int mqtt_config_publisher(oconfig_item_t *ci) { status = pthread_mutex_init(&conf->lock, NULL); if (status != 0) { mqtt_free(conf); - return (status); + return status; } C_COMPLAIN_INIT(&conf->complaint_cantpublish); @@ -602,11 +608,12 @@ static int mqtt_config_publisher(oconfig_item_t *ci) { ERROR("mqtt plugin: Unknown config option: %s", child->key); } - ssnprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name); - plugin_register_write(cb_name, mqtt_write, &(user_data_t){ - .data = conf, - }); - return (0); + snprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name); + plugin_register_write(cb_name, mqtt_write, + &(user_data_t){ + .data = conf, + }); + return 0; } /* mqtt_config_publisher */ /* @@ -617,6 +624,10 @@ static int mqtt_config_publisher(oconfig_item_t *ci) { * User "guest" * Password "secret" * Topic "collectd/#" + * CACert "ca.pem" Enables TLS if set + * CertificateFile "client-cert.pem" optional + * CertificateKeyFile "client-key.pem" optional + * TLSProtocol "tlsv1.2" optional * */ static int mqtt_config_subscriber(oconfig_item_t *ci) { @@ -627,7 +638,7 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) { conf = calloc(1, sizeof(*conf)); if (conf == NULL) { ERROR("mqtt plugin: calloc failed."); - return (-1); + return -1; } conf->publish = 0; @@ -635,7 +646,7 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) { status = cf_util_get_string(ci, &conf->name); if (status != 0) { mqtt_free(conf); - return (status); + return status; } conf->host = strdup(MQTT_DEFAULT_HOST); @@ -648,7 +659,7 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) { status = pthread_mutex_init(&conf->lock, NULL); if (status != 0) { mqtt_free(conf); - return (status); + return status; } C_COMPLAIN_INIT(&conf->complaint_cantpublish); @@ -680,6 +691,16 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) { cf_util_get_string(child, &conf->topic); else if (strcasecmp("CleanSession", child->key) == 0) cf_util_get_boolean(child, &conf->clean_session); + else if (strcasecmp("CACert", child->key) == 0) + cf_util_get_string(child, &conf->cacertificatefile); + else if (strcasecmp("CertificateFile", child->key) == 0) + cf_util_get_string(child, &conf->certificatefile); + else if (strcasecmp("CertificateKeyFile", child->key) == 0) + cf_util_get_string(child, &conf->certificatekeyfile); + else if (strcasecmp("TLSProtocol", child->key) == 0) + cf_util_get_string(child, &conf->tlsprotocol); + else if (strcasecmp("CipherSuite", child->key) == 0) + cf_util_get_string(child, &conf->ciphersuite); else ERROR("mqtt plugin: Unknown config option: %s", child->key); } @@ -688,13 +709,13 @@ static int mqtt_config_subscriber(oconfig_item_t *ci) { if (tmp == NULL) { ERROR("mqtt plugin: realloc failed."); mqtt_free(conf); - return (-1); + return -1; } subscribers = tmp; subscribers[subscribers_num] = conf; subscribers_num++; - return (0); + return 0; } /* mqtt_config_subscriber */ /* @@ -719,7 +740,7 @@ static int mqtt_config(oconfig_item_t *ci) { ERROR("mqtt plugin: Unknown config option: %s", child->key); } - return (0); + return 0; } /* int mqtt_config */ static int mqtt_init(void) { @@ -744,12 +765,10 @@ static int mqtt_init(void) { } } - return (0); + return 0; } /* mqtt_init */ void module_register(void) { plugin_register_complex_config("mqtt", mqtt_config); plugin_register_init("mqtt", mqtt_init); } /* void module_register */ - -/* vim: set sw=4 sts=4 et fdm=marker : */