X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=9bc3175cebaf0dd25c5a1188f4099e10426ccaac;hb=cd5c60931ef73c6c34be6dcf58538b069be17c58;hp=1764129faf4a57fdc622062156d28ffb322a4bef;hpb=e30c4b09f2ba06ed279d7ddfdc3714e1b3ab3a06;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index 1764129f..9bc3175c 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -33,8 +33,6 @@ #include "utils_format_json.h" #include "utils_format_graphite.h" -#include - #include #include @@ -80,6 +78,9 @@ struct camqp_config_s char *exchange; char *routing_key; + /* Number of seconds to wait before connection is retried */ + int connection_retry_delay; + /* publish only */ uint8_t delivery_mode; _Bool store_rates; @@ -196,11 +197,11 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ switch (r.reply_type) { case AMQP_RESPONSE_NORMAL: - sstrncpy (buffer, "Success", sizeof (buffer)); + sstrncpy (buffer, "Success", buffer_size); break; case AMQP_RESPONSE_NONE: - sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer)); + sstrncpy (buffer, "Missing RPC reply type", buffer_size); break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: @@ -212,7 +213,7 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ return (sstrerror (r.library_error, buffer, buffer_size)); #endif else - sstrncpy (buffer, "End of stream", sizeof (buffer)); + sstrncpy (buffer, "End of stream", buffer_size); break; case AMQP_RESPONSE_SERVER_EXCEPTION: @@ -301,6 +302,10 @@ static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ /* type = */ amqp_cstring_bytes (conf->exchange_type), /* passive = */ 0, /* durable = */ 0, +#if defined(AMQP_VERSION) && AMQP_VERSION >= 0x00060000 + /* auto delete = */ 0, + /* internal = */ 0, +#endif /* arguments = */ argument_table); if ((ed_ret == NULL) && camqp_is_error (conf)) { @@ -405,6 +410,8 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ static int camqp_connect (camqp_config_t *conf) /* {{{ */ { + static time_t last_connect_time = 0; + amqp_rpc_reply_t reply; int status; #ifdef HAVE_AMQP_TCP_SOCKET @@ -416,6 +423,19 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ if (conf->connection != NULL) return (0); + time_t now = time(NULL); + if (now < (last_connect_time + conf->connection_retry_delay)) + { + DEBUG("amqp plugin: skipping connection retry, " + "ConnectionRetryDelay: %d", conf->connection_retry_delay); + return(1); + } + else + { + DEBUG ("amqp plugin: retrying connection"); + last_connect_time = now; + } + conf->connection = amqp_new_connection (); if (conf->connection == NULL) { @@ -694,7 +714,7 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ continue; } - status = camqp_read_header (conf); + camqp_read_header (conf); amqp_maybe_release_buffers (conf->connection); } /* while (subscriber_threads_running) */ @@ -714,7 +734,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ if (tmp == NULL) { ERROR ("amqp plugin: realloc failed."); - camqp_config_free (conf); + sfree (subscriber_threads); return (ENOMEM); } subscriber_threads = tmp; @@ -728,7 +748,6 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ char errbuf[1024]; ERROR ("amqp plugin: pthread_create failed: %s", sstrerror (status, errbuf, sizeof (errbuf))); - camqp_config_free (conf); return (status); } @@ -744,17 +763,20 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ const char *buffer, const char *routing_key) { - amqp_basic_properties_t props; int status; status = camqp_connect (conf); if (status != 0) return (status); - memset (&props, 0, sizeof (props)); - props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG - | AMQP_BASIC_DELIVERY_MODE_FLAG - | AMQP_BASIC_APP_ID_FLAG; + amqp_basic_properties_t props = { + ._flags = AMQP_BASIC_CONTENT_TYPE_FLAG + | AMQP_BASIC_DELIVERY_MODE_FLAG + | AMQP_BASIC_APP_ID_FLAG, + .delivery_mode = conf->delivery_mode, + .app_id = amqp_cstring_bytes("collectd") + }; + if (conf->format == CAMQP_FORMAT_COMMAND) props.content_type = amqp_cstring_bytes("text/collectd"); else if (conf->format == CAMQP_FORMAT_JSON) @@ -763,8 +785,6 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ props.content_type = amqp_cstring_bytes("text/graphite"); else assert (23 == 42); - props.delivery_mode = conf->delivery_mode; - props.app_id = amqp_cstring_bytes("collectd"); status = amqp_basic_publish(conf->connection, /* channel = */ 1, @@ -795,8 +815,6 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ if ((ds == NULL) || (vl == NULL) || (conf == NULL)) return (EINVAL); - memset (buffer, 0, sizeof (buffer)); - if (conf->routing_key != NULL) { sstrncpy (routing_key, conf->routing_key, sizeof (routing_key)); @@ -903,15 +921,14 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ int status; int i; - conf = malloc (sizeof (*conf)); + conf = calloc (1, sizeof (*conf)); if (conf == NULL) { - ERROR ("amqp plugin: malloc failed."); + ERROR ("amqp plugin: calloc failed."); return (ENOMEM); } /* Initialize "conf" {{{ */ - memset (conf, 0, sizeof (*conf)); conf->publish = publish; conf->name = NULL; conf->format = CAMQP_FORMAT_COMMAND; @@ -922,6 +939,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->password = NULL; conf->exchange = NULL; conf->routing_key = NULL; + conf->connection_retry_delay = 0; + /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; @@ -1017,6 +1036,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->escape_char = tmp_buff[0]; sfree (tmp_buff); } + else if (strcasecmp ("ConnectionRetryDelay", child->key) == 0) + status = cf_util_get_int (child, &conf->connection_retry_delay); else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key);