num++;
if (num < 2)
- return (NULL);
+ return NULL;
while (num > 2) {
char *tmp = strchr(topic, '/');
if (tmp == NULL)
- return (NULL);
+ return NULL;
topic = tmp + 1;
num--;
}
- return (topic);
+ return topic;
}
static void on_message(
int status;
if (conf->connected)
- return (0);
+ return 0;
status = mosquitto_reconnect(conf->mosq);
if (status != MOSQ_ERR_SUCCESS) {
- char errbuf[1024];
ERROR("mqtt_connect_broker: mosquitto_connect failed: %s",
- (status == MOSQ_ERR_ERRNO) ? sstrerror(errno, errbuf, sizeof(errbuf))
- : mosquitto_strerror(status));
- return (-1);
+ (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status));
+ return -1;
}
conf->connected = 1;
"mqtt plugin: successfully reconnected to broker \"%s:%d\"",
conf->host, conf->port);
- return (0);
+ return 0;
} /* mqtt_reconnect */
/* must hold conf->lock when calling. */
#endif
if (conf->mosq == NULL) {
ERROR("mqtt plugin: mosquitto_new failed");
- return (-1);
+ return -1;
}
#if LIBMOSQUITTO_MAJOR != 0
mosquitto_strerror(status));
mosquitto_destroy(conf->mosq);
conf->mosq = NULL;
- return (-1);
+ return -1;
}
status = mosquitto_tls_opts_set(conf->mosq, SSL_VERIFY_PEER,
mosquitto_strerror(status));
mosquitto_destroy(conf->mosq);
conf->mosq = NULL;
- return (-1);
+ return -1;
}
status = mosquitto_tls_insecure_set(conf->mosq, false);
mosquitto_strerror(status));
mosquitto_destroy(conf->mosq);
conf->mosq = NULL;
- return (-1);
+ return -1;
}
}
#endif
status =
mosquitto_username_pw_set(conf->mosq, conf->username, conf->password);
if (status != MOSQ_ERR_SUCCESS) {
- char errbuf[1024];
ERROR("mqtt plugin: mosquitto_username_pw_set failed: %s",
- (status == MOSQ_ERR_ERRNO)
- ? sstrerror(errno, errbuf, sizeof(errbuf))
- : mosquitto_strerror(status));
+ (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status));
mosquitto_destroy(conf->mosq);
conf->mosq = NULL;
- return (-1);
+ return -1;
}
}
mosquitto_connect(conf->mosq, conf->host, conf->port, MQTT_KEEPALIVE);
#endif
if (status != MOSQ_ERR_SUCCESS) {
- char errbuf[1024];
ERROR("mqtt plugin: mosquitto_connect failed: %s",
- (status == MOSQ_ERR_ERRNO) ? sstrerror(errno, errbuf, sizeof(errbuf))
- : mosquitto_strerror(status));
+ (status == MOSQ_ERR_ERRNO) ? STRERRNO : mosquitto_strerror(status));
mosquitto_destroy(conf->mosq);
conf->mosq = NULL;
- return (-1);
+ return -1;
}
if (!conf->publish) {
mosquitto_disconnect(conf->mosq);
mosquitto_destroy(conf->mosq);
conf->mosq = NULL;
- return (-1);
+ return -1;
}
}
conf->connected = 1;
- return (0);
+ return 0;
} /* mqtt_connect */
static void *subscribers_thread(void *arg) {
if (status != 0) {
pthread_mutex_unlock(&conf->lock);
ERROR("mqtt plugin: unable to reconnect to broker");
- return (status);
+ return status;
}
status = mosquitto_publish(conf->mosq, /* message_id */ NULL, topic,
#endif
conf->qos, conf->retain);
if (status != MOSQ_ERR_SUCCESS) {
- char errbuf[1024];
c_complain(LOG_ERR, &conf->complaint_cantpublish,
"mqtt plugin: mosquitto_publish failed: %s",
- (status == MOSQ_ERR_ERRNO)
- ? sstrerror(errno, errbuf, sizeof(errbuf))
- : mosquitto_strerror(status));
+ (status == MOSQ_ERR_ERRNO) ? STRERRNO
+ : mosquitto_strerror(status));
/* Mark our connection "down" regardless of the error as a safety
* measure; we will try to reconnect the next time we have to publish a
* message */
mosquitto_disconnect(conf->mosq);
pthread_mutex_unlock(&conf->lock);
- return (-1);
+ return -1;
}
pthread_mutex_unlock(&conf->lock);
- return (0);
+ return 0;
} /* int publish */
static int format_topic(char *buf, size_t buf_len, data_set_t const *ds,
char *c;
if ((conf->topic_prefix == NULL) || (conf->topic_prefix[0] == 0))
- return (FORMAT_VL(buf, buf_len, vl));
+ return FORMAT_VL(buf, buf_len, vl);
status = FORMAT_VL(name, sizeof(name), vl);
if (status != 0)
- return (status);
+ return status;
- status = ssnprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name);
+ status = snprintf(buf, buf_len, "%s/%s", conf->topic_prefix, name);
if ((status < 0) || (((size_t)status) >= buf_len))
- return (ENOMEM);
+ return ENOMEM;
- while((c = strchr(buf, '#')) || (c = strchr(buf, '+'))) {
- *c = '_';
+ while ((c = strchr(buf, '#')) || (c = strchr(buf, '+'))) {
+ *c = '_';
}
- return (0);
+ return 0;
} /* int format_topic */
static int mqtt_write(const data_set_t *ds, const value_list_t *vl,
int status = 0;
if ((user_data == NULL) || (user_data->data == NULL))
- return (EINVAL);
+ return EINVAL;
conf = user_data->data;
status = format_topic(topic, sizeof(topic), ds, vl, conf);
if (status != 0) {
ERROR("mqtt plugin: format_topic failed with status %d.", status);
- return (status);
+ return status;
}
status = format_values(payload, sizeof(payload), ds, vl, conf->store_rates);
if (status != 0) {
ERROR("mqtt plugin: format_values failed with status %d.", status);
- return (status);
+ return status;
}
status = publish(conf, topic, payload, strlen(payload) + 1);
if (status != 0) {
ERROR("mqtt plugin: publish failed: %s", mosquitto_strerror(status));
- return (status);
+ return status;
}
- return (status);
+ return status;
} /* mqtt_write */
/*
* StoreRates true
* Retain false
* QoS 0
- * CACert "ca.pem" Enables TLS if set
- * CertificateFile "client-cert.pem" optional
- * CertificateKeyFile "client-key.pem" optional
- * TLSProtocol "tlsv1.2" optional
+ * CACert "ca.pem" Enables TLS if set
+ * CertificateFile "client-cert.pem" optional
+ * CertificateKeyFile "client-key.pem" optional
+ * TLSProtocol "tlsv1.2" optional
* </Publish>
*/
static int mqtt_config_publisher(oconfig_item_t *ci) {
conf = calloc(1, sizeof(*conf));
if (conf == NULL) {
ERROR("mqtt plugin: calloc failed.");
- return (-1);
+ return -1;
}
conf->publish = 1;
status = cf_util_get_string(ci, &conf->name);
if (status != 0) {
mqtt_free(conf);
- return (status);
+ return status;
}
conf->host = strdup(MQTT_DEFAULT_HOST);
status = pthread_mutex_init(&conf->lock, NULL);
if (status != 0) {
mqtt_free(conf);
- return (status);
+ return status;
}
C_COMPLAIN_INIT(&conf->complaint_cantpublish);
ERROR("mqtt plugin: Unknown config option: %s", child->key);
}
- ssnprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name);
- plugin_register_write(cb_name, mqtt_write, &(user_data_t){
- .data = conf,
- });
- return (0);
+ snprintf(cb_name, sizeof(cb_name), "mqtt/%s", conf->name);
+ plugin_register_write(cb_name, mqtt_write,
+ &(user_data_t){
+ .data = conf,
+ });
+ return 0;
} /* mqtt_config_publisher */
/*
* User "guest"
* Password "secret"
* Topic "collectd/#"
+ * CACert "ca.pem" Enables TLS if set
+ * CertificateFile "client-cert.pem" optional
+ * CertificateKeyFile "client-key.pem" optional
+ * TLSProtocol "tlsv1.2" optional
* </Subscribe>
*/
static int mqtt_config_subscriber(oconfig_item_t *ci) {
conf = calloc(1, sizeof(*conf));
if (conf == NULL) {
ERROR("mqtt plugin: calloc failed.");
- return (-1);
+ return -1;
}
conf->publish = 0;
status = cf_util_get_string(ci, &conf->name);
if (status != 0) {
mqtt_free(conf);
- return (status);
+ return status;
}
conf->host = strdup(MQTT_DEFAULT_HOST);
status = pthread_mutex_init(&conf->lock, NULL);
if (status != 0) {
mqtt_free(conf);
- return (status);
+ return status;
}
C_COMPLAIN_INIT(&conf->complaint_cantpublish);
cf_util_get_string(child, &conf->topic);
else if (strcasecmp("CleanSession", child->key) == 0)
cf_util_get_boolean(child, &conf->clean_session);
+ else if (strcasecmp("CACert", child->key) == 0)
+ cf_util_get_string(child, &conf->cacertificatefile);
+ else if (strcasecmp("CertificateFile", child->key) == 0)
+ cf_util_get_string(child, &conf->certificatefile);
+ else if (strcasecmp("CertificateKeyFile", child->key) == 0)
+ cf_util_get_string(child, &conf->certificatekeyfile);
+ else if (strcasecmp("TLSProtocol", child->key) == 0)
+ cf_util_get_string(child, &conf->tlsprotocol);
+ else if (strcasecmp("CipherSuite", child->key) == 0)
+ cf_util_get_string(child, &conf->ciphersuite);
else
ERROR("mqtt plugin: Unknown config option: %s", child->key);
}
if (tmp == NULL) {
ERROR("mqtt plugin: realloc failed.");
mqtt_free(conf);
- return (-1);
+ return -1;
}
subscribers = tmp;
subscribers[subscribers_num] = conf;
subscribers_num++;
- return (0);
+ return 0;
} /* mqtt_config_subscriber */
/*
ERROR("mqtt plugin: Unknown config option: %s", child->key);
}
- return (0);
+ return 0;
} /* int mqtt_config */
static int mqtt_init(void) {
/* args = */ subscribers[i],
/* name = */ "mqtt");
if (status != 0) {
- char errbuf[1024];
- ERROR("mqtt plugin: pthread_create failed: %s",
- sstrerror(errno, errbuf, sizeof(errbuf)));
+ ERROR("mqtt plugin: pthread_create failed: %s", STRERRNO);
continue;
}
}
- return (0);
+ return 0;
} /* mqtt_init */
void module_register(void) {