X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=1764129faf4a57fdc622062156d28ffb322a4bef;hb=103f05e098865196fc5f28df51e99b64fd6b5202;hp=c9e46c45df0cac7ef0e0cc4f3bdd2e1bda8ad96e;hpb=1126617bebe569dffe6b7a5d3f0c400134b41d65;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index c9e46c45..1764129f 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -23,7 +23,7 @@ * * Authors: * Sebastien Pahl - * Florian Forster + * Florian Forster **/ #include "collectd.h" @@ -38,6 +38,20 @@ #include #include +#ifdef HAVE_AMQP_TCP_SOCKET_H +# include +#endif +#ifdef HAVE_AMQP_SOCKET_H +# include +#endif +#ifdef HAVE_AMQP_TCP_SOCKET +#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE +/* rabbitmq-c does not currently ship amqp_socket.h + * and, thus, does not define this function. */ +int amqp_socket_close(amqp_socket_t *); +#endif +#endif + /* Defines for the delivery mode. I have no idea why they're not defined by the * library.. */ #define CAMQP_DM_VOLATILE 1 @@ -74,10 +88,13 @@ struct camqp_config_s char *prefix; char *postfix; char escape_char; + unsigned int graphite_flags; /* subscribe only */ char *exchange_type; char *queue; + _Bool queue_durable; + _Bool queue_auto_delete; amqp_connection_state_t connection; pthread_mutex_t lock; @@ -313,9 +330,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ ? amqp_cstring_bytes (conf->queue) : AMQP_EMPTY_BYTES, /* passive = */ 0, - /* durable = */ 0, + /* durable = */ conf->queue_durable, /* exclusive = */ 0, - /* auto_delete = */ 1, + /* auto_delete = */ conf->queue_auto_delete, /* arguments = */ AMQP_EMPTY_TABLE); if (qd_ret == NULL) { @@ -389,8 +406,12 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ static int camqp_connect (camqp_config_t *conf) /* {{{ */ { amqp_rpc_reply_t reply; - int sockfd; int status; +#ifdef HAVE_AMQP_TCP_SOCKET + amqp_socket_t *socket; +#else + int sockfd; +#endif if (conf->connection != NULL) return (0); @@ -402,6 +423,33 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (ENOMEM); } +#ifdef HAVE_AMQP_TCP_SOCKET +# define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us */ + /* TODO: add support for SSL using amqp_ssl_socket_new + * and related functions */ + socket = amqp_tcp_socket_new (conf->connection); + if (! socket) + { + ERROR ("amqp plugin: amqp_tcp_socket_new failed."); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (ENOMEM); + } + + status = amqp_socket_open (socket, CONF(conf, host), conf->port); + if (status < 0) + { + char errbuf[1024]; + status *= -1; + ERROR ("amqp plugin: amqp_socket_open failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (status); + } +#else /* HAVE_AMQP_TCP_SOCKET */ +# define CLOSE_SOCKET() close(sockfd) + /* this interface is deprecated as of rabbitmq-c 0.4 */ sockfd = amqp_open_socket (CONF(conf, host), conf->port); if (sockfd < 0) { @@ -414,6 +462,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (status); } amqp_set_sockfd (conf->connection, sockfd); +#endif reply = amqp_login (conf->connection, CONF(conf, vhost), /* channel max = */ 0, @@ -426,7 +475,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", CONF(conf, vhost), CONF(conf, user)); amqp_destroy_connection (conf->connection); - close (sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); } @@ -439,7 +488,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_channel_open failed."); amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); amqp_destroy_connection (conf->connection); - close(sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); } @@ -600,6 +649,8 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ camqp_config_t *conf = user_data; int status; + cdtime_t interval = plugin_get_interval (); + while (subscriber_threads_running) { amqp_frame_t frame; @@ -610,8 +661,8 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ struct timespec ts_interval; ERROR ("amqp plugin: camqp_connect failed. " "Will sleep for %.3f seconds.", - CDTIME_T_TO_DOUBLE (interval_g)); - CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval); + CDTIME_T_TO_DOUBLE (interval)); + CDTIME_T_TO_TIMESPEC (interval, &ts_interval); nanosleep (&ts_interval, /* remaining = */ NULL); continue; } @@ -622,9 +673,9 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ struct timespec ts_interval; ERROR ("amqp plugin: amqp_simple_wait_frame failed. " "Will sleep for %.3f seconds.", - CDTIME_T_TO_DOUBLE (interval_g)); + CDTIME_T_TO_DOUBLE (interval)); camqp_close_connection (conf); - CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval); + CDTIME_T_TO_TIMESPEC (interval, &ts_interval); nanosleep (&ts_interval, /* remaining = */ NULL); continue; } @@ -670,7 +721,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ tmp = subscriber_threads + subscriber_threads_num; memset (tmp, 0, sizeof (*tmp)); - status = pthread_create (tmp, /* attr = */ NULL, + status = plugin_thread_create (tmp, /* attr = */ NULL, camqp_subscribe_thread, conf); if (status != 0) { @@ -738,7 +789,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ { camqp_config_t *conf = user_data->data; char routing_key[6 * DATA_MAX_NAME_LEN]; - char buffer[4096]; + char buffer[8192]; int status; if ((ds == NULL) || (vl == NULL) || (conf == NULL)) @@ -792,7 +843,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ { status = format_graphite (buffer, sizeof (buffer), ds, vl, conf->prefix, conf->postfix, conf->escape_char, - conf->store_rates); + conf->graphite_flags); if (status != 0) { ERROR ("amqp plugin: format_graphite failed with status %i.", @@ -874,6 +925,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + conf->graphite_flags = 0; /* publish & graphite only */ conf->prefix = NULL; conf->postfix = NULL; @@ -881,6 +933,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; + conf->queue_durable = 0; + conf->queue_auto_delete = 1; /* general */ conf->connection = NULL; pthread_mutex_init (&conf->lock, /* attr = */ NULL); @@ -920,6 +974,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ status = cf_util_get_string (child, &conf->exchange_type); else if ((strcasecmp ("Queue", child->key) == 0) && !publish) status = cf_util_get_string (child, &conf->queue); + else if ((strcasecmp ("QueueDurable", child->key) == 0) && !publish) + status = cf_util_get_boolean (child, &conf->queue_durable); + else if ((strcasecmp ("QueueAutoDelete", child->key) == 0) && !publish) + status = cf_util_get_boolean (child, &conf->queue_auto_delete); else if (strcasecmp ("RoutingKey", child->key) == 0) status = cf_util_get_string (child, &conf->routing_key); else if ((strcasecmp ("Persistent", child->key) == 0) && publish) @@ -932,9 +990,19 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->delivery_mode = CAMQP_DM_VOLATILE; } else if ((strcasecmp ("StoreRates", child->key) == 0) && publish) + { status = cf_util_get_boolean (child, &conf->store_rates); + (void) cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_STORE_RATES); + } else if ((strcasecmp ("Format", child->key) == 0) && publish) status = camqp_config_set_format (child, conf); + else if ((strcasecmp ("GraphiteSeparateInstances", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + else if ((strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) status = cf_util_get_string (child, &conf->prefix); else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)