amqp plugin: Document the lock required to hold when calling "camqp_write_locked".
[collectd.git] / src / amqp.c
index 7b9f41b..f0abd44 100644 (file)
  *   Florian Forster <octo at verplant.org>
  **/
 
-#include <stdint.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <strings.h>
-#include <pthread.h>
-
 #include "collectd.h"
 #include "common.h"
 #include "plugin.h"
 #include "utils_cmd_putval.h"
 #include "utils_format_json.h"
 
+#include <pthread.h>
+
 #include <amqp.h>
 #include <amqp_framing.h>
 
@@ -220,6 +216,37 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
     return (buffer);
 } /* }}} char *camqp_strerror */
 
+static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
+{
+    amqp_exchange_declare_ok_t *ed_ret;
+
+    if (conf->exchange_type == NULL)
+        return (0);
+
+    ed_ret = amqp_exchange_declare (conf->connection,
+            /* channel     = */ CAMQP_CHANNEL,
+            /* exchange    = */ amqp_cstring_bytes (conf->exchange),
+            /* type        = */ amqp_cstring_bytes (conf->exchange_type),
+            /* passive     = */ 0,
+            /* durable     = */ 0,
+            /* auto_delete = */ 1,
+            /* arguments   = */ AMQP_EMPTY_TABLE);
+    if ((ed_ret == NULL) && camqp_is_error (conf))
+    {
+        char errbuf[1024];
+        ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
+                camqp_strerror (conf, errbuf, sizeof (errbuf)));
+        camqp_close_connection (conf);
+        return (-1);
+    }
+
+    INFO ("amqp plugin: Successfully created exchange \"%s\" "
+            "with type \"%s\".",
+            conf->exchange, conf->exchange_type);
+
+    return (0);
+} /* }}} int camqp_create_exchange */
+
 static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
 {
     amqp_queue_declare_ok_t *qd_ret;
@@ -261,29 +288,6 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
     {
         amqp_queue_bind_ok_t *qb_ret;
 
-        /* create the exchange */
-        if (conf->exchange_type != NULL)
-        {
-            amqp_exchange_declare_ok_t *ed_ret;
-
-            ed_ret = amqp_exchange_declare (conf->connection,
-                    /* channel     = */ CAMQP_CHANNEL,
-                    /* exchange    = */ amqp_cstring_bytes (conf->exchange),
-                    /* type        = */ amqp_cstring_bytes (conf->exchange_type),
-                    /* passive     = */ 0,
-                    /* durable     = */ 0,
-                    /* auto_delete = */ 1,
-                    /* arguments   = */ AMQP_EMPTY_TABLE);
-            if ((ed_ret == NULL) && camqp_is_error (conf))
-            {
-                char errbuf[1024];
-                ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
-                        camqp_strerror (conf, errbuf, sizeof (errbuf)));
-                camqp_close_connection (conf);
-                return (-1);
-            }
-        }
-
         assert (conf->queue != NULL);
         qb_ret = amqp_queue_bind (conf->connection,
                 /* channel     = */ CAMQP_CHANNEL,
@@ -386,12 +390,16 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
             "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
 
+    status = camqp_create_exchange (conf);
+    if (status != 0)
+        return (status);
+
     if (!conf->publish)
         return (camqp_setup_queue (conf));
     return (0);
 } /* }}} int camqp_connect */
 
-static int shutdown (void) /* {{{ */
+static int camqp_shutdown (void) /* {{{ */
 {
     size_t i;
 
@@ -414,7 +422,7 @@ static int shutdown (void) /* {{{ */
     DEBUG ("amqp plugin: All subscriber threads exited.");
 
     return (0);
-} /* }}} int shutdown */
+} /* }}} int camqp_shutdown */
 
 /*
  * Subscribing code
@@ -617,6 +625,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
 /*
  * Publishing code
  */
+/* XXX: You must hold "conf->lock" when calling this function! */
 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
         const char *buffer, const char *routing_key)
 {
@@ -850,15 +859,16 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             break;
     } /* for (i = 0; i < ci->children_num; i++) */
 
-    if ((status == 0) && !publish && (conf->exchange == NULL))
+    if ((status == 0) && (conf->exchange == NULL))
     {
-        if (conf->routing_key != NULL)
-            WARNING ("amqp plugin: The option \"RoutingKey\" was given "
-                    "without the \"Exchange\" option. It will be ignored.");
-
         if (conf->exchange_type != NULL)
             WARNING ("amqp plugin: The option \"ExchangeType\" was given "
                     "without the \"Exchange\" option. It will be ignored.");
+
+        if (!publish && (conf->routing_key != NULL))
+            WARNING ("amqp plugin: The option \"RoutingKey\" was given "
+                    "without the \"Exchange\" option. It will be ignored.");
+
     }
 
     if (status != 0)
@@ -923,7 +933,7 @@ static int camqp_config (oconfig_item_t *ci) /* {{{ */
 void module_register (void)
 {
     plugin_register_complex_config ("amqp", camqp_config);
-    plugin_register_shutdown ("amqp", shutdown);
+    plugin_register_shutdown ("amqp", camqp_shutdown);
 } /* void module_register */
 
 /* vim: set sw=4 sts=4 et fdm=marker : */