Merge branch 'collectd-5.5'
[collectd.git] / src / amqp.c
index 187582c..99dca90 100644 (file)
@@ -33,8 +33,6 @@
 #include "utils_format_json.h"
 #include "utils_format_graphite.h"
 
-#include <pthread.h>
-
 #include <amqp.h>
 #include <amqp_framing.h>
 
@@ -199,11 +197,11 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
     switch (r.reply_type)
     {
         case AMQP_RESPONSE_NORMAL:
-            sstrncpy (buffer, "Success", sizeof (buffer));
+            sstrncpy (buffer, "Success", buffer_size);
             break;
 
         case AMQP_RESPONSE_NONE:
-            sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
+            sstrncpy (buffer, "Missing RPC reply type", buffer_size);
             break;
 
         case AMQP_RESPONSE_LIBRARY_EXCEPTION:
@@ -215,7 +213,7 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
                 return (sstrerror (r.library_error, buffer, buffer_size));
 #endif
             else
-                sstrncpy (buffer, "End of stream", sizeof (buffer));
+                sstrncpy (buffer, "End of stream", buffer_size);
             break;
 
         case AMQP_RESPONSE_SERVER_EXCEPTION:
@@ -304,6 +302,10 @@ static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
             /* type        = */ amqp_cstring_bytes (conf->exchange_type),
             /* passive     = */ 0,
             /* durable     = */ 0,
+#if defined(AMQP_VERSION) && AMQP_VERSION >= 0x00060000
+            /* auto delete = */ 0,
+            /* internal    = */ 0,
+#endif
             /* arguments   = */ argument_table);
     if ((ed_ret == NULL) && camqp_is_error (conf))
     {
@@ -712,7 +714,7 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */
             continue;
         }
 
-        status = camqp_read_header (conf);
+        camqp_read_header (conf);
 
         amqp_maybe_release_buffers (conf->connection);
     } /* while (subscriber_threads_running) */
@@ -732,7 +734,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
     if (tmp == NULL)
     {
         ERROR ("amqp plugin: realloc failed.");
-        camqp_config_free (conf);
+        sfree (subscriber_threads);
         return (ENOMEM);
     }
     subscriber_threads = tmp;
@@ -746,7 +748,6 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
         char errbuf[1024];
         ERROR ("amqp plugin: pthread_create failed: %s",
                 sstrerror (status, errbuf, sizeof (errbuf)));
-        camqp_config_free (conf);
         return (status);
     }
 
@@ -921,15 +922,14 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     int status;
     int i;
 
-    conf = malloc (sizeof (*conf));
+    conf = calloc (1, sizeof (*conf));
     if (conf == NULL)
     {
-        ERROR ("amqp plugin: malloc failed.");
+        ERROR ("amqp plugin: calloc failed.");
         return (ENOMEM);
     }
 
     /* Initialize "conf" {{{ */
-    memset (conf, 0, sizeof (*conf));
     conf->publish = publish;
     conf->name = NULL;
     conf->format = CAMQP_FORMAT_COMMAND;
@@ -940,7 +940,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
     conf->password = NULL;
     conf->exchange = NULL;
     conf->routing_key = NULL;
-    conf->connection_retry_delay = 60;
+    conf->connection_retry_delay = 0;
 
     /* publish only */
     conf->delivery_mode = CAMQP_DM_VOLATILE;