amqp, write_graphite and write_kafka plugins: Implement the "[Graphite]PreserveSepara...
[collectd.git] / src / amqp.c
index ec79430..b237ba3 100644 (file)
  **/
 
 #include "collectd.h"
+
 #include "common.h"
 #include "plugin.h"
 #include "utils_cmd_putval.h"
 #include "utils_format_json.h"
 #include "utils_format_graphite.h"
 
-#include <pthread.h>
-
 #include <amqp.h>
 #include <amqp_framing.h>
 
@@ -529,13 +528,11 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
 
 static int camqp_shutdown (void) /* {{{ */
 {
-    size_t i;
-
     DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
             subscriber_threads_num);
 
     subscriber_threads_running = 0;
-    for (i = 0; i < subscriber_threads_num; i++)
+    for (size_t i = 0; i < subscriber_threads_num; i++)
     {
         /* FIXME: Sending a signal is not very elegant here. Maybe find out how
          * to use a timeout in the thread and check for the variable in regular
@@ -602,9 +599,9 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */
 
     if (strcasecmp ("text/collectd", content_type) == 0)
     {
-        status = handle_putval (stderr, body);
+        status = cmd_handle_putval (stderr, body);
         if (status != 0)
-            ERROR ("amqp plugin: handle_putval failed with status %i.",
+            ERROR ("amqp plugin: cmd_handle_putval failed with status %i.",
                     status);
         return (status);
     }
@@ -680,25 +677,21 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */
         status = camqp_connect (conf);
         if (status != 0)
         {
-            struct timespec ts_interval;
             ERROR ("amqp plugin: camqp_connect failed. "
                     "Will sleep for %.3f seconds.",
                     CDTIME_T_TO_DOUBLE (interval));
-            CDTIME_T_TO_TIMESPEC (interval, &ts_interval);
-            nanosleep (&ts_interval, /* remaining = */ NULL);
+            nanosleep (&CDTIME_T_TO_TIMESPEC (interval), /* remaining = */ NULL);
             continue;
         }
 
         status = amqp_simple_wait_frame (conf->connection, &frame);
         if (status < 0)
         {
-            struct timespec ts_interval;
             ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
                     "Will sleep for %.3f seconds.",
                     CDTIME_T_TO_DOUBLE (interval));
             camqp_close_connection (conf);
-            CDTIME_T_TO_TIMESPEC (interval, &ts_interval);
-            nanosleep (&ts_interval, /* remaining = */ NULL);
+            nanosleep (&CDTIME_T_TO_TIMESPEC (interval), /* remaining = */ NULL);
             continue;
         }
 
@@ -736,7 +729,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
     if (tmp == NULL)
     {
         ERROR ("amqp plugin: realloc failed.");
-        camqp_config_free (conf);
+        sfree (subscriber_threads);
         return (ENOMEM);
     }
     subscriber_threads = tmp;
@@ -744,13 +737,12 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
     memset (tmp, 0, sizeof (*tmp));
 
     status = plugin_thread_create (tmp, /* attr = */ NULL,
-            camqp_subscribe_thread, conf);
+            camqp_subscribe_thread, conf, "amqp subscribe");
     if (status != 0)
     {
         char errbuf[1024];
         ERROR ("amqp plugin: pthread_create failed: %s",
                 sstrerror (status, errbuf, sizeof (errbuf)));
-        camqp_config_free (conf);
         return (status);
     }
 
@@ -766,17 +758,20 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
         const char *buffer, const char *routing_key)
 {
-    amqp_basic_properties_t props;
     int status;
 
     status = camqp_connect (conf);
     if (status != 0)
         return (status);
 
-    memset (&props, 0, sizeof (props));
-    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
-        | AMQP_BASIC_DELIVERY_MODE_FLAG
-        | AMQP_BASIC_APP_ID_FLAG;
+    amqp_basic_properties_t props = {
+        ._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
+            | AMQP_BASIC_DELIVERY_MODE_FLAG
+            | AMQP_BASIC_APP_ID_FLAG,
+        .delivery_mode = conf->delivery_mode,
+        .app_id = amqp_cstring_bytes("collectd")
+    };
+
     if (conf->format == CAMQP_FORMAT_COMMAND)
         props.content_type = amqp_cstring_bytes("text/collectd");
     else if (conf->format == CAMQP_FORMAT_JSON)
@@ -785,8 +780,6 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
         props.content_type = amqp_cstring_bytes("text/graphite");
     else
         assert (23 == 42);
-    props.delivery_mode = conf->delivery_mode;
-    props.app_id = amqp_cstring_bytes("collectd");
 
     status = amqp_basic_publish(conf->connection,
                 /* channel = */ 1,
@@ -817,15 +810,12 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
     if ((ds == NULL) || (vl == NULL) || (conf == NULL))
         return (EINVAL);
 
-    memset (buffer, 0, sizeof (buffer));
-
     if (conf->routing_key != NULL)
     {
         sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
     }
     else
     {
-        size_t i;
         ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
                 vl->host,
                 vl->plugin, vl->plugin_instance,
@@ -833,7 +823,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
 
         /* Switch slashes (the only character forbidden by collectd) and dots
          * (the separation character used by AMQP). */
-        for (i = 0; routing_key[i] != 0; i++)
+        for (size_t i = 0; routing_key[i] != 0; i++)
         {
             if (routing_key[i] == '.')
                 routing_key[i] = '/';
@@ -844,10 +834,10 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
 
     if (conf->format == CAMQP_FORMAT_COMMAND)
     {
-        status = create_putval (buffer, sizeof (buffer), ds, vl);
+        status = cmd_create_putval (buffer, sizeof (buffer), ds, vl);
         if (status != 0)
         {
-            ERROR ("amqp plugin: create_putval failed with status %i.",
+            ERROR ("amqp plugin: cmd_create_putval failed with status %i.",
                     status);
             return (status);
         }
@@ -923,7 +913,6 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
 {
     camqp_config_t *conf;
     int status;
-    int i;
 
     conf = calloc (1, sizeof (*conf));
     if (conf == NULL)
@@ -970,7 +959,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
         return (status);
     }
 
-    for (i = 0; i < ci->children_num; i++)
+    for (int i = 0; i < ci->children_num; i++)
     {
         oconfig_item_t *child = ci->children + i;
 
@@ -1026,6 +1015,9 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
         else if ((strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) && publish)
             status = cf_util_get_flag (child, &conf->graphite_flags,
                     GRAPHITE_ALWAYS_APPEND_DS);
+        else if ((strcasecmp ("GraphitePreserveSeparator", child->key) == 0) && publish)
+            status = cf_util_get_flag (child, &conf->graphite_flags,
+                    GRAPHITE_PRESERVE_SEPARATOR);
         else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish)
             status = cf_util_get_string (child, &conf->prefix);
         else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)
@@ -1077,11 +1069,13 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     if (publish)
     {
         char cbname[128];
-        user_data_t ud = { conf, camqp_config_free };
-
         ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
 
-        status = plugin_register_write (cbname, camqp_write, &ud);
+        status = plugin_register_write (cbname, camqp_write,
+                &(user_data_t) {
+                    .data = conf,
+                    .free_func = camqp_config_free,
+                });
         if (status != 0)
         {
             camqp_config_free (conf);
@@ -1103,9 +1097,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
 
 static int camqp_config (oconfig_item_t *ci) /* {{{ */
 {
-    int i;
-
-    for (i = 0; i < ci->children_num; i++)
+    for (int i = 0; i < ci->children_num; i++)
     {
         oconfig_item_t *child = ci->children + i;