Merge branch 'collectd-5.6' into collectd-5.7
[collectd.git] / src / amqp.c
index 9578e61..c54a1e0 100644 (file)
@@ -570,9 +570,9 @@ static int camqp_read_body(camqp_config_t *conf, /* {{{ */
   } /* while (received < body_size) */
 
   if (strcasecmp("text/collectd", content_type) == 0) {
-    status = handle_putval(stderr, body);
+    status = cmd_handle_putval(stderr, body);
     if (status != 0)
-      ERROR("amqp plugin: handle_putval failed with status %i.", status);
+      ERROR("amqp plugin: cmd_handle_putval failed with status %i.", status);
     return (status);
   } else if (strcasecmp("application/json", content_type) == 0) {
     ERROR("amqp plugin: camqp_read_body: Parsing JSON data has not "
@@ -636,24 +636,20 @@ static void *camqp_subscribe_thread(void *user_data) /* {{{ */
 
     status = camqp_connect(conf);
     if (status != 0) {
-      struct timespec ts_interval;
       ERROR("amqp plugin: camqp_connect failed. "
             "Will sleep for %.3f seconds.",
             CDTIME_T_TO_DOUBLE(interval));
-      CDTIME_T_TO_TIMESPEC(interval, &ts_interval);
-      nanosleep(&ts_interval, /* remaining = */ NULL);
+      nanosleep(&CDTIME_T_TO_TIMESPEC(interval), /* remaining = */ NULL);
       continue;
     }
 
     status = amqp_simple_wait_frame(conf->connection, &frame);
     if (status < 0) {
-      struct timespec ts_interval;
       ERROR("amqp plugin: amqp_simple_wait_frame failed. "
             "Will sleep for %.3f seconds.",
             CDTIME_T_TO_DOUBLE(interval));
       camqp_close_connection(conf);
-      CDTIME_T_TO_TIMESPEC(interval, &ts_interval);
-      nanosleep(&ts_interval, /* remaining = */ NULL);
+      nanosleep(&CDTIME_T_TO_TIMESPEC(interval), /* remaining = */ NULL);
       continue;
     }
 
@@ -695,7 +691,7 @@ static int camqp_subscribe_init(camqp_config_t *conf) /* {{{ */
   memset(tmp, 0, sizeof(*tmp));
 
   status = plugin_thread_create(tmp, /* attr = */ NULL, camqp_subscribe_thread,
-                                conf);
+                                conf, "amqp subscribe");
   if (status != 0) {
     char errbuf[1024];
     ERROR("amqp plugin: pthread_create failed: %s",
@@ -777,9 +773,9 @@ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
   }
 
   if (conf->format == CAMQP_FORMAT_COMMAND) {
-    status = create_putval(buffer, sizeof(buffer), ds, vl);
+    status = cmd_create_putval(buffer, sizeof(buffer), ds, vl);
     if (status != 0) {
-      ERROR("amqp plugin: create_putval failed with status %i.", status);
+      ERROR("amqp plugin: cmd_create_putval failed with status %i.", status);
       return (status);
     }
   } else if (conf->format == CAMQP_FORMAT_JSON) {
@@ -935,6 +931,10 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
     else if ((strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0) && publish)
       status = cf_util_get_flag(child, &conf->graphite_flags,
                                 GRAPHITE_ALWAYS_APPEND_DS);
+    else if ((strcasecmp("GraphitePreserveSeparator", child->key) == 0) &&
+             publish)
+      status = cf_util_get_flag(child, &conf->graphite_flags,
+                                GRAPHITE_PRESERVE_SEPARATOR);
     else if ((strcasecmp("GraphitePrefix", child->key) == 0) && publish)
       status = cf_util_get_string(child, &conf->prefix);
     else if ((strcasecmp("GraphitePostfix", child->key) == 0) && publish)
@@ -980,11 +980,12 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
 
   if (publish) {
     char cbname[128];
-    user_data_t ud = {conf, camqp_config_free};
-
     ssnprintf(cbname, sizeof(cbname), "amqp/%s", conf->name);
 
-    status = plugin_register_write(cbname, camqp_write, &ud);
+    status = plugin_register_write(
+        cbname, camqp_write, &(user_data_t){
+                                 .data = conf, .free_func = camqp_config_free,
+                             });
     if (status != 0) {
       camqp_config_free(conf);
       return (status);