Merge branch 'collectd-5.4'
[collectd.git] / src / amqp.c
index bebaea7..1764129 100644 (file)
@@ -23,7 +23,7 @@
  *
  * Authors:
  *   Sebastien Pahl <sebastien.pahl at dotcloud.com>
- *   Florian Forster <octo at verplant.org>
+ *   Florian Forster <octo at collectd.org>
  **/
 
 #include "collectd.h"
 #ifdef HAVE_AMQP_SOCKET_H
 # include <amqp_socket.h>
 #endif
+#ifdef HAVE_AMQP_TCP_SOCKET
 #if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE
 /* rabbitmq-c does not currently ship amqp_socket.h
  * and, thus, does not define this function. */
 int amqp_socket_close(amqp_socket_t *);
 #endif
+#endif
 
 /* Defines for the delivery mode. I have no idea why they're not defined by the
  * library.. */
@@ -91,6 +93,8 @@ struct camqp_config_s
     /* subscribe only */
     char   *exchange_type;
     char   *queue;
+    _Bool   queue_durable;
+    _Bool   queue_auto_delete;
 
     amqp_connection_state_t connection;
     pthread_mutex_t lock;
@@ -326,9 +330,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
             ? amqp_cstring_bytes (conf->queue)
             : AMQP_EMPTY_BYTES,
             /* passive     = */ 0,
-            /* durable     = */ 0,
+            /* durable     = */ conf->queue_durable,
             /* exclusive   = */ 0,
-            /* auto_delete = */ 1,
+            /* auto_delete = */ conf->queue_auto_delete,
             /* arguments   = */ AMQP_EMPTY_TABLE);
     if (qd_ret == NULL)
     {
@@ -420,7 +424,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
     }
 
 #ifdef HAVE_AMQP_TCP_SOCKET
-# define CLOSE_SOCKET() amqp_socket_close (socket)
+# define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us */
     /* TODO: add support for SSL using amqp_ssl_socket_new
      *       and related functions */
     socket = amqp_tcp_socket_new (conf->connection);
@@ -439,13 +443,12 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         status *= -1;
         ERROR ("amqp plugin: amqp_socket_open failed: %s",
                 sstrerror (status, errbuf, sizeof (errbuf)));
-        CLOSE_SOCKET ();
         amqp_destroy_connection (conf->connection);
         conf->connection = NULL;
         return (status);
     }
 #else /* HAVE_AMQP_TCP_SOCKET */
-# define CLOSE_SOCKET close(sockfd)
+# define CLOSE_SOCKET() close(sockfd)
     /* this interface is deprecated as of rabbitmq-c 0.4 */
     sockfd = amqp_open_socket (CONF(conf, host), conf->port);
     if (sockfd < 0)
@@ -786,7 +789,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
 {
     camqp_config_t *conf = user_data->data;
     char routing_key[6 * DATA_MAX_NAME_LEN];
-    char buffer[4096];
+    char buffer[8192];
     int status;
 
     if ((ds == NULL) || (vl == NULL) || (conf == NULL))
@@ -922,6 +925,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     /* publish only */
     conf->delivery_mode = CAMQP_DM_VOLATILE;
     conf->store_rates = 0;
+    conf->graphite_flags = 0;
     /* publish & graphite only */
     conf->prefix = NULL;
     conf->postfix = NULL;
@@ -929,6 +933,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     /* subscribe only */
     conf->exchange_type = NULL;
     conf->queue = NULL;
+    conf->queue_durable = 0;
+    conf->queue_auto_delete = 1;
     /* general */
     conf->connection = NULL;
     pthread_mutex_init (&conf->lock, /* attr = */ NULL);
@@ -968,6 +974,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
             status = cf_util_get_string (child, &conf->exchange_type);
         else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
             status = cf_util_get_string (child, &conf->queue);
+        else if ((strcasecmp ("QueueDurable", child->key) == 0) && !publish)
+            status = cf_util_get_boolean (child, &conf->queue_durable);
+        else if ((strcasecmp ("QueueAutoDelete", child->key) == 0) && !publish)
+            status = cf_util_get_boolean (child, &conf->queue_auto_delete);
         else if (strcasecmp ("RoutingKey", child->key) == 0)
             status = cf_util_get_string (child, &conf->routing_key);
         else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
@@ -987,6 +997,12 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
         }
         else if ((strcasecmp ("Format", child->key) == 0) && publish)
             status = camqp_config_set_format (child, conf);
+        else if ((strcasecmp ("GraphiteSeparateInstances", child->key) == 0) && publish)
+            status = cf_util_get_flag (child, &conf->graphite_flags,
+                    GRAPHITE_SEPARATE_INSTANCES);
+        else if ((strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) && publish)
+            status = cf_util_get_flag (child, &conf->graphite_flags,
+                    GRAPHITE_ALWAYS_APPEND_DS);
         else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish)
             status = cf_util_get_string (child, &conf->prefix);
         else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)