configfile: Log errors when trying to access unknown global options.
[collectd.git] / src / amqp.c
index 1e23a56..99dca90 100644 (file)
@@ -23,7 +23,7 @@
  *
  * Authors:
  *   Sebastien Pahl <sebastien.pahl at dotcloud.com>
- *   Florian Forster <octo at verplant.org>
+ *   Florian Forster <octo at collectd.org>
  **/
 
 #include "collectd.h"
@@ -33,8 +33,6 @@
 #include "utils_format_json.h"
 #include "utils_format_graphite.h"
 
-#include <pthread.h>
-
 #include <amqp.h>
 #include <amqp_framing.h>
 
@@ -80,6 +78,9 @@ struct camqp_config_s
     char   *exchange;
     char   *routing_key;
 
+    /* Number of seconds to wait before connection is retried */
+    int     connection_retry_delay;
+
     /* publish only */
     uint8_t delivery_mode;
     _Bool   store_rates;
@@ -93,6 +94,8 @@ struct camqp_config_s
     /* subscribe only */
     char   *exchange_type;
     char   *queue;
+    _Bool   queue_durable;
+    _Bool   queue_auto_delete;
 
     amqp_connection_state_t connection;
     pthread_mutex_t lock;
@@ -332,9 +335,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
             ? amqp_cstring_bytes (conf->queue)
             : AMQP_EMPTY_BYTES,
             /* passive     = */ 0,
-            /* durable     = */ 0,
+            /* durable     = */ conf->queue_durable,
             /* exclusive   = */ 0,
-            /* auto_delete = */ 1,
+            /* auto_delete = */ conf->queue_auto_delete,
             /* arguments   = */ AMQP_EMPTY_TABLE);
     if (qd_ret == NULL)
     {
@@ -407,6 +410,8 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
 
 static int camqp_connect (camqp_config_t *conf) /* {{{ */
 {
+    static time_t last_connect_time = 0;
+
     amqp_rpc_reply_t reply;
     int status;
 #ifdef HAVE_AMQP_TCP_SOCKET
@@ -418,6 +423,19 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
     if (conf->connection != NULL)
         return (0);
 
+    time_t now = time(NULL);
+    if (now < (last_connect_time + conf->connection_retry_delay))
+    {
+        DEBUG("amqp plugin: skipping connection retry, "
+            "ConnectionRetryDelay: %d", conf->connection_retry_delay);
+        return(1);
+    }
+    else
+    {
+        DEBUG ("amqp plugin: retrying connection");
+        last_connect_time = now;
+    }
+
     conf->connection = amqp_new_connection ();
     if (conf->connection == NULL)
     {
@@ -716,7 +734,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
     if (tmp == NULL)
     {
         ERROR ("amqp plugin: realloc failed.");
-        camqp_config_free (conf);
+        sfree (subscriber_threads);
         return (ENOMEM);
     }
     subscriber_threads = tmp;
@@ -730,7 +748,6 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
         char errbuf[1024];
         ERROR ("amqp plugin: pthread_create failed: %s",
                 sstrerror (status, errbuf, sizeof (errbuf)));
-        camqp_config_free (conf);
         return (status);
     }
 
@@ -791,7 +808,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
 {
     camqp_config_t *conf = user_data->data;
     char routing_key[6 * DATA_MAX_NAME_LEN];
-    char buffer[4096];
+    char buffer[8192];
     int status;
 
     if ((ds == NULL) || (vl == NULL) || (conf == NULL))
@@ -905,15 +922,14 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     int status;
     int i;
 
-    conf = malloc (sizeof (*conf));
+    conf = calloc (1, sizeof (*conf));
     if (conf == NULL)
     {
-        ERROR ("amqp plugin: malloc failed.");
+        ERROR ("amqp plugin: calloc failed.");
         return (ENOMEM);
     }
 
     /* Initialize "conf" {{{ */
-    memset (conf, 0, sizeof (*conf));
     conf->publish = publish;
     conf->name = NULL;
     conf->format = CAMQP_FORMAT_COMMAND;
@@ -924,6 +940,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     conf->password = NULL;
     conf->exchange = NULL;
     conf->routing_key = NULL;
+    conf->connection_retry_delay = 0;
+
     /* publish only */
     conf->delivery_mode = CAMQP_DM_VOLATILE;
     conf->store_rates = 0;
@@ -935,6 +953,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     /* subscribe only */
     conf->exchange_type = NULL;
     conf->queue = NULL;
+    conf->queue_durable = 0;
+    conf->queue_auto_delete = 1;
     /* general */
     conf->connection = NULL;
     pthread_mutex_init (&conf->lock, /* attr = */ NULL);
@@ -974,6 +994,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             status = cf_util_get_string (child, &conf->exchange_type);
         else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
             status = cf_util_get_string (child, &conf->queue);
+        else if ((strcasecmp ("QueueDurable", child->key) == 0) && !publish)
+            status = cf_util_get_boolean (child, &conf->queue_durable);
+        else if ((strcasecmp ("QueueAutoDelete", child->key) == 0) && !publish)
+            status = cf_util_get_boolean (child, &conf->queue_auto_delete);
         else if (strcasecmp ("RoutingKey", child->key) == 0)
             status = cf_util_get_string (child, &conf->routing_key);
         else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
@@ -1013,6 +1037,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             conf->escape_char = tmp_buff[0];
             sfree (tmp_buff);
         }
+        else if (strcasecmp ("ConnectionRetryDelay", child->key) == 0)
+            status = cf_util_get_int (child, &conf->connection_retry_delay);
         else
             WARNING ("amqp plugin: Ignoring unknown "
                     "configuration option \"%s\".", child->key);