amqp plugin: Fixed compilation when using rabbitmq-c < 0.4.
[collectd.git] / src / amqp.c
index c9e46c4..7de9d7f 100644 (file)
 #include <amqp.h>
 #include <amqp_framing.h>
 
+#ifdef HAVE_AMQP_TCP_SOCKET_H
+# include <amqp_tcp_socket.h>
+#endif
+#ifdef HAVE_AMQP_SOCKET_H
+# include <amqp_socket.h>
+#endif
+#ifdef HAVE_AMQP_TCP_SOCKET
+#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE
+/* rabbitmq-c does not currently ship amqp_socket.h
+ * and, thus, does not define this function. */
+int amqp_socket_close(amqp_socket_t *);
+#endif
+#endif
+
 /* Defines for the delivery mode. I have no idea why they're not defined by the
  * library.. */
 #define CAMQP_DM_VOLATILE   1
@@ -74,6 +88,7 @@ struct camqp_config_s
     char    *prefix;
     char    *postfix;
     char    escape_char;
+    unsigned int graphite_flags;
 
     /* subscribe only */
     char   *exchange_type;
@@ -389,8 +404,12 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
 static int camqp_connect (camqp_config_t *conf) /* {{{ */
 {
     amqp_rpc_reply_t reply;
-    int sockfd;
     int status;
+#ifdef HAVE_AMQP_TCP_SOCKET
+    amqp_socket_t *socket;
+#else
+    int sockfd;
+#endif
 
     if (conf->connection != NULL)
         return (0);
@@ -402,6 +421,34 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         return (ENOMEM);
     }
 
+#ifdef HAVE_AMQP_TCP_SOCKET
+# define CLOSE_SOCKET() amqp_socket_close (socket)
+    /* TODO: add support for SSL using amqp_ssl_socket_new
+     *       and related functions */
+    socket = amqp_tcp_socket_new (conf->connection);
+    if (! socket)
+    {
+        ERROR ("amqp plugin: amqp_tcp_socket_new failed.");
+        amqp_destroy_connection (conf->connection);
+        conf->connection = NULL;
+        return (ENOMEM);
+    }
+
+    status = amqp_socket_open (socket, CONF(conf, host), conf->port);
+    if (status < 0)
+    {
+        char errbuf[1024];
+        status *= -1;
+        ERROR ("amqp plugin: amqp_socket_open failed: %s",
+                sstrerror (status, errbuf, sizeof (errbuf)));
+        CLOSE_SOCKET ();
+        amqp_destroy_connection (conf->connection);
+        conf->connection = NULL;
+        return (status);
+    }
+#else /* HAVE_AMQP_TCP_SOCKET */
+# define CLOSE_SOCKET() close(sockfd)
+    /* this interface is deprecated as of rabbitmq-c 0.4 */
     sockfd = amqp_open_socket (CONF(conf, host), conf->port);
     if (sockfd < 0)
     {
@@ -414,6 +461,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         return (status);
     }
     amqp_set_sockfd (conf->connection, sockfd);
+#endif
 
     reply = amqp_login (conf->connection, CONF(conf, vhost),
             /* channel max = */      0,
@@ -426,7 +474,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
                 CONF(conf, vhost), CONF(conf, user));
         amqp_destroy_connection (conf->connection);
-        close (sockfd);
+        CLOSE_SOCKET ();
         conf->connection = NULL;
         return (1);
     }
@@ -439,7 +487,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */
         ERROR ("amqp plugin: amqp_channel_open failed.");
         amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
         amqp_destroy_connection (conf->connection);
-        close(sockfd);
+        CLOSE_SOCKET ();
         conf->connection = NULL;
         return (1);
     }
@@ -600,6 +648,8 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */
     camqp_config_t *conf = user_data;
     int status;
 
+    cdtime_t interval = plugin_get_interval ();
+
     while (subscriber_threads_running)
     {
         amqp_frame_t frame;
@@ -610,8 +660,8 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */
             struct timespec ts_interval;
             ERROR ("amqp plugin: camqp_connect failed. "
                     "Will sleep for %.3f seconds.",
-                    CDTIME_T_TO_DOUBLE (interval_g));
-            CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval);
+                    CDTIME_T_TO_DOUBLE (interval));
+            CDTIME_T_TO_TIMESPEC (interval, &ts_interval);
             nanosleep (&ts_interval, /* remaining = */ NULL);
             continue;
         }
@@ -622,9 +672,9 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */
             struct timespec ts_interval;
             ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
                     "Will sleep for %.3f seconds.",
-                    CDTIME_T_TO_DOUBLE (interval_g));
+                    CDTIME_T_TO_DOUBLE (interval));
             camqp_close_connection (conf);
-            CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval);
+            CDTIME_T_TO_TIMESPEC (interval, &ts_interval);
             nanosleep (&ts_interval, /* remaining = */ NULL);
             continue;
         }
@@ -670,7 +720,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
     tmp = subscriber_threads + subscriber_threads_num;
     memset (tmp, 0, sizeof (*tmp));
 
-    status = pthread_create (tmp, /* attr = */ NULL,
+    status = plugin_thread_create (tmp, /* attr = */ NULL,
             camqp_subscribe_thread, conf);
     if (status != 0)
     {
@@ -792,7 +842,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
     {
         status = format_graphite (buffer, sizeof (buffer), ds, vl,
                     conf->prefix, conf->postfix, conf->escape_char,
-                    conf->store_rates);
+                    conf->graphite_flags);
         if (status != 0)
         {
             ERROR ("amqp plugin: format_graphite failed with status %i.",
@@ -932,7 +982,11 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
                 conf->delivery_mode = CAMQP_DM_VOLATILE;
         }
         else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
+        {
             status = cf_util_get_boolean (child, &conf->store_rates);
+            (void) cf_util_get_flag (child, &conf->graphite_flags,
+                    GRAPHITE_STORE_RATES);
+        }
         else if ((strcasecmp ("Format", child->key) == 0) && publish)
             status = camqp_config_set_format (child, conf);
         else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish)