Merge branch 'collectd-4.10'
[collectd.git] / src / amqp.c
index 6be483e..f0abd44 100644 (file)
  *   Florian Forster <octo at verplant.org>
  **/
 
-#include <stdint.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <strings.h>
-#include <pthread.h>
-
 #include "collectd.h"
 #include "common.h"
 #include "plugin.h"
 #include "utils_cmd_putval.h"
 #include "utils_format_json.h"
 
+#include <pthread.h>
+
 #include <amqp.h>
 #include <amqp_framing.h>
 
@@ -58,7 +54,6 @@ struct camqp_config_s
 {
     _Bool   publish;
     char   *name;
-    int     format;
 
     char   *host;
     int     port;
@@ -72,6 +67,7 @@ struct camqp_config_s
     /* publish only */
     uint8_t delivery_mode;
     _Bool   store_rates;
+    int     format;
 
     /* subscribe only */
     char   *exchange_type;
@@ -220,6 +216,37 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
     return (buffer);
 } /* }}} char *camqp_strerror */
 
+static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
+{
+    amqp_exchange_declare_ok_t *ed_ret;
+
+    if (conf->exchange_type == NULL)
+        return (0);
+
+    ed_ret = amqp_exchange_declare (conf->connection,
+            /* channel     = */ CAMQP_CHANNEL,
+            /* exchange    = */ amqp_cstring_bytes (conf->exchange),
+            /* type        = */ amqp_cstring_bytes (conf->exchange_type),
+            /* passive     = */ 0,
+            /* durable     = */ 0,
+            /* auto_delete = */ 1,
+            /* arguments   = */ AMQP_EMPTY_TABLE);
+    if ((ed_ret == NULL) && camqp_is_error (conf))
+    {
+        char errbuf[1024];
+        ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
+                camqp_strerror (conf, errbuf, sizeof (errbuf)));
+        camqp_close_connection (conf);
+        return (-1);
+    }
+
+    INFO ("amqp plugin: Successfully created exchange \"%s\" "
+            "with type \"%s\".",
+            conf->exchange, conf->exchange_type);
+
+    return (0);
+} /* }}} int camqp_create_exchange */
+
 static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
 {
     amqp_queue_declare_ok_t *qd_ret;
@@ -261,29 +288,6 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
     {
         amqp_queue_bind_ok_t *qb_ret;
 
-        /* create the exchange */
-        if (conf->exchange_type != NULL)
-        {
-            amqp_exchange_declare_ok_t *ed_ret;
-
-            ed_ret = amqp_exchange_declare (conf->connection,
-                    /* channel     = */ CAMQP_CHANNEL,
-                    /* exchange    = */ amqp_cstring_bytes (conf->exchange),
-                    /* type        = */ amqp_cstring_bytes (conf->exchange_type),
-                    /* passive     = */ 0,
-                    /* durable     = */ 0,
-                    /* auto_delete = */ 1,
-                    /* arguments   = */ AMQP_EMPTY_TABLE);
-            if ((ed_ret == NULL) && camqp_is_error (conf))
-            {
-                char errbuf[1024];
-                ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
-                        camqp_strerror (conf, errbuf, sizeof (errbuf)));
-                camqp_close_connection (conf);
-                return (-1);
-            }
-        }
-
         assert (conf->queue != NULL);
         qb_ret = amqp_queue_bind (conf->connection,
                 /* channel     = */ CAMQP_CHANNEL,
@@ -386,12 +390,16 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
             "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
 
+    status = camqp_create_exchange (conf);
+    if (status != 0)
+        return (status);
+
     if (!conf->publish)
         return (camqp_setup_queue (conf));
     return (0);
 } /* }}} int camqp_connect */
 
-static int shutdown (void) /* {{{ */
+static int camqp_shutdown (void) /* {{{ */
 {
     size_t i;
 
@@ -414,13 +422,13 @@ static int shutdown (void) /* {{{ */
     DEBUG ("amqp plugin: All subscriber threads exited.");
 
     return (0);
-} /* }}} int shutdown */
+} /* }}} int camqp_shutdown */
 
 /*
  * Subscribing code
  */
 static int camqp_read_body (camqp_config_t *conf, /* {{{ */
-        size_t body_size)
+        size_t body_size, const char *content_type)
 {
     char body[body_size + 1];
     char *body_ptr;
@@ -464,7 +472,7 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */
         received += frame.payload.body_fragment.len;
     } /* while (received < body_size) */
 
-    if (conf->format == CAMQP_FORMAT_COMMAND)
+    if (strcasecmp ("text/collectd", content_type) == 0)
     {
         status = handle_putval (stderr, body);
         if (status != 0)
@@ -472,7 +480,7 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */
                     status);
         return (status);
     }
-    else if (conf->format == CAMQP_FORMAT_JSON)
+    else if (strcasecmp ("application/json", content_type) == 0)
     {
         ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not "
                 "been implemented yet. FIXME!");
@@ -480,8 +488,8 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */
     }
     else
     {
-        ERROR ("amqp plugin: camqp_read_body: Unknown format option (%i).",
-                conf->format);
+        ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
+                content_type);
         return (EINVAL);
     }
 
@@ -493,6 +501,8 @@ static int camqp_read_header (camqp_config_t *conf) /* {{{ */
 {
     int status;
     amqp_frame_t frame;
+    amqp_basic_properties_t *properties;
+    char *content_type;
 
     status = amqp_simple_wait_frame (conf->connection, &frame);
     if (status < 0)
@@ -512,7 +522,20 @@ static int camqp_read_header (camqp_config_t *conf) /* {{{ */
         return (-1);
     }
 
-    return (camqp_read_body (conf, frame.payload.properties.body_size));
+    properties = frame.payload.properties.decoded;
+    content_type = camqp_bytes_cstring (&properties->content_type);
+    if (content_type == NULL)
+    {
+        ERROR ("amqp plugin: Unable to determine content type.");
+        return (-1);
+    }
+
+    status = camqp_read_body (conf,
+            (size_t) frame.payload.properties.body_size,
+            content_type);
+
+    sfree (content_type);
+    return (status);
 } /* }}} int camqp_read_header */
 
 static void *camqp_subscribe_thread (void *user_data) /* {{{ */
@@ -602,6 +625,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
 /*
  * Publishing code
  */
+/* XXX: You must hold "conf->lock" when calling this function! */
 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
         const char *buffer, const char *routing_key)
 {
@@ -616,7 +640,12 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
         | AMQP_BASIC_DELIVERY_MODE_FLAG
         | AMQP_BASIC_APP_ID_FLAG;
-    props.content_type = amqp_cstring_bytes("application/json");
+    if (conf->format == CAMQP_FORMAT_COMMAND)
+        props.content_type = amqp_cstring_bytes("text/collectd");
+    else if (conf->format == CAMQP_FORMAT_JSON)
+        props.content_type = amqp_cstring_bytes("application/json");
+    else
+        assert (23 == 42);
     props.delivery_mode = conf->delivery_mode;
     props.app_id = amqp_cstring_bytes("collectd");
 
@@ -784,9 +813,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     {
         oconfig_item_t *child = ci->children + i;
 
-        if (strcasecmp ("Format", child->key) == 0)
-            status = camqp_config_set_format (child, conf);
-        else if (strcasecmp ("Host", child->key) == 0)
+        if (strcasecmp ("Host", child->key) == 0)
             status = cf_util_get_string (child, &conf->host);
         else if (strcasecmp ("Port", child->key) == 0)
         {
@@ -822,6 +849,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
         }
         else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
             status = cf_util_get_boolean (child, &conf->store_rates);
+        else if ((strcasecmp ("Format", child->key) == 0) && publish)
+            status = camqp_config_set_format (child, conf);
         else
             WARNING ("amqp plugin: Ignoring unknown "
                     "configuration option \"%s\".", child->key);
@@ -830,15 +859,16 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             break;
     } /* for (i = 0; i < ci->children_num; i++) */
 
-    if ((status == 0) && !publish && (conf->exchange == NULL))
+    if ((status == 0) && (conf->exchange == NULL))
     {
-        if (conf->routing_key != NULL)
-            WARNING ("amqp plugin: The option \"RoutingKey\" was given "
-                    "without the \"Exchange\" option. It will be ignored.");
-
         if (conf->exchange_type != NULL)
             WARNING ("amqp plugin: The option \"ExchangeType\" was given "
                     "without the \"Exchange\" option. It will be ignored.");
+
+        if (!publish && (conf->routing_key != NULL))
+            WARNING ("amqp plugin: The option \"RoutingKey\" was given "
+                    "without the \"Exchange\" option. It will be ignored.");
+
     }
 
     if (status != 0)
@@ -903,7 +933,7 @@ static int camqp_config (oconfig_item_t *ci) /* {{{ */
 void module_register (void)
 {
     plugin_register_complex_config ("amqp", camqp_config);
-    plugin_register_shutdown ("amqp", shutdown);
+    plugin_register_shutdown ("amqp", camqp_shutdown);
 } /* void module_register */
 
 /* vim: set sw=4 sts=4 et fdm=marker : */