X-Git-Url: https://git.octo.it/?p=collectd.git;a=blobdiff_plain;f=src%2Famqp.c;h=2077d57b3d5e870caa6e089456e646810c1e5ce7;hp=c54a1e09036dd289a7ba07819eced763c3edf971;hb=48efd3deb4c9139fd060ff3d289896e9031bcc7c;hpb=35a6c9c5fcd87cb78451a974c4d5b5707926845c diff --git a/src/amqp.c b/src/amqp.c index c54a1e09..2077d57b 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -28,11 +28,11 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" -#include "utils_cmd_putval.h" -#include "utils_format_graphite.h" -#include "utils_format_json.h" +#include "utils/cmds/putval.h" +#include "utils/common/common.h" +#include "utils/format_graphite/format_graphite.h" +#include "utils/format_json/format_json.h" #include #include @@ -66,7 +66,7 @@ int amqp_socket_close(amqp_socket_t *); * Data types */ struct camqp_config_s { - _Bool publish; + bool publish; char *name; char *host; @@ -83,7 +83,7 @@ struct camqp_config_s { /* publish only */ uint8_t delivery_mode; - _Bool store_rates; + bool store_rates; int format; /* publish & graphite format only */ char *prefix; @@ -94,8 +94,8 @@ struct camqp_config_s { /* subscribe only */ char *exchange_type; char *queue; - _Bool queue_durable; - _Bool queue_auto_delete; + bool queue_durable; + bool queue_auto_delete; amqp_connection_state_t connection; pthread_mutex_t lock; @@ -111,9 +111,9 @@ static const char *def_user = "guest"; static const char *def_password = "guest"; static const char *def_exchange = "amq.fanout"; -static pthread_t *subscriber_threads = NULL; -static size_t subscriber_threads_num = 0; -static _Bool subscriber_threads_running = 1; +static pthread_t *subscriber_threads; +static size_t subscriber_threads_num; +static bool subscriber_threads_running = true; #define CONF(c, f) (((c)->f != NULL) ? (c)->f : def_##f) @@ -164,28 +164,28 @@ static char *camqp_bytes_cstring(amqp_bytes_t *in) /* {{{ */ char *ret; if ((in == NULL) || (in->bytes == NULL)) - return (NULL); + return NULL; ret = malloc(in->len + 1); if (ret == NULL) - return (NULL); + return NULL; memcpy(ret, in->bytes, in->len); ret[in->len] = 0; - return (ret); + return ret; } /* }}} char *camqp_bytes_cstring */ -static _Bool camqp_is_error(camqp_config_t *conf) /* {{{ */ +static bool camqp_is_error(camqp_config_t *conf) /* {{{ */ { amqp_rpc_reply_t r; r = amqp_get_rpc_reply(conf->connection); if (r.reply_type == AMQP_RESPONSE_NORMAL) - return (0); + return false; - return (1); -} /* }}} _Bool camqp_is_error */ + return true; +} /* }}} bool camqp_is_error */ static char *camqp_strerror(camqp_config_t *conf, /* {{{ */ char *buffer, size_t buffer_size) { @@ -204,10 +204,10 @@ static char *camqp_strerror(camqp_config_t *conf, /* {{{ */ case AMQP_RESPONSE_LIBRARY_EXCEPTION: #if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO if (r.library_errno) - return (sstrerror(r.library_errno, buffer, buffer_size)); + return sstrerror(r.library_errno, buffer, buffer_size); #else if (r.library_error) - return (sstrerror(r.library_error, buffer, buffer_size)); + return sstrerror(r.library_error, buffer, buffer_size); #endif else sstrncpy(buffer, "End of stream", buffer_size); @@ -236,7 +236,7 @@ static char *camqp_strerror(camqp_config_t *conf, /* {{{ */ ssnprintf(buffer, buffer_size, "Unknown reply type %i", (int)r.reply_type); } - return (buffer); + return buffer; } /* }}} char *camqp_strerror */ #if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO @@ -245,7 +245,7 @@ static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */ amqp_exchange_declare_ok_t *ed_ret; if (conf->exchange_type == NULL) - return (0); + return 0; ed_ret = amqp_exchange_declare( conf->connection, @@ -261,14 +261,14 @@ static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */ ERROR("amqp plugin: amqp_exchange_declare failed: %s", camqp_strerror(conf, errbuf, sizeof(errbuf))); camqp_close_connection(conf); - return (-1); + return -1; } INFO("amqp plugin: Successfully created exchange \"%s\" " "with type \"%s\".", conf->exchange, conf->exchange_type); - return (0); + return 0; } /* }}} int camqp_create_exchange */ #else static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */ @@ -278,7 +278,7 @@ static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */ struct amqp_table_entry_t_ argument_table_entries[1]; if (conf->exchange_type == NULL) - return (0); + return 0; /* Valid arguments: "auto_delete", "internal" */ argument_table.num_entries = STATIC_ARRAY_SIZE(argument_table_entries); @@ -304,14 +304,14 @@ static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */ ERROR("amqp plugin: amqp_exchange_declare failed: %s", camqp_strerror(conf, errbuf, sizeof(errbuf))); camqp_close_connection(conf); - return (-1); + return -1; } INFO("amqp plugin: Successfully created exchange \"%s\" " "with type \"%s\".", conf->exchange, conf->exchange_type); - return (0); + return 0; } /* }}} int camqp_create_exchange */ #endif @@ -320,20 +320,21 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */ amqp_queue_declare_ok_t *qd_ret; amqp_basic_consume_ok_t *cm_ret; - qd_ret = amqp_queue_declare(conf->connection, - /* channel = */ CAMQP_CHANNEL, - /* queue = */ (conf->queue != NULL) - ? amqp_cstring_bytes(conf->queue) - : AMQP_EMPTY_BYTES, - /* passive = */ 0, - /* durable = */ conf->queue_durable, - /* exclusive = */ 0, - /* auto_delete = */ conf->queue_auto_delete, - /* arguments = */ AMQP_EMPTY_TABLE); + qd_ret = + amqp_queue_declare(conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ + (conf->queue != NULL) ? amqp_cstring_bytes(conf->queue) + : AMQP_EMPTY_BYTES, + /* passive = */ 0, + /* durable = */ conf->queue_durable, + /* exclusive = */ 0, + /* auto_delete = */ conf->queue_auto_delete, + /* arguments = */ AMQP_EMPTY_TABLE); if (qd_ret == NULL) { ERROR("amqp plugin: amqp_queue_declare failed."); camqp_close_connection(conf); - return (-1); + return -1; } if (conf->queue == NULL) { @@ -341,7 +342,7 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */ if (conf->queue == NULL) { ERROR("amqp plugin: camqp_bytes_cstring failed."); camqp_close_connection(conf); - return (-1); + return -1; } INFO("amqp plugin: Created queue \"%s\".", conf->queue); @@ -353,21 +354,21 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */ amqp_queue_bind_ok_t *qb_ret; assert(conf->queue != NULL); - qb_ret = - amqp_queue_bind(conf->connection, - /* channel = */ CAMQP_CHANNEL, - /* queue = */ amqp_cstring_bytes(conf->queue), - /* exchange = */ amqp_cstring_bytes(conf->exchange), - /* routing_key = */ (conf->routing_key != NULL) - ? amqp_cstring_bytes(conf->routing_key) - : AMQP_EMPTY_BYTES, - /* arguments = */ AMQP_EMPTY_TABLE); + qb_ret = amqp_queue_bind( + conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* queue = */ amqp_cstring_bytes(conf->queue), + /* exchange = */ amqp_cstring_bytes(conf->exchange), + /* routing_key = */ + (conf->routing_key != NULL) ? amqp_cstring_bytes(conf->routing_key) + : AMQP_EMPTY_BYTES, + /* arguments = */ AMQP_EMPTY_TABLE); if ((qb_ret == NULL) && camqp_is_error(conf)) { char errbuf[1024]; ERROR("amqp plugin: amqp_queue_bind failed: %s", camqp_strerror(conf, errbuf, sizeof(errbuf))); camqp_close_connection(conf); - return (-1); + return -1; } DEBUG("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".", @@ -388,15 +389,15 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */ ERROR("amqp plugin: amqp_basic_consume failed: %s", camqp_strerror(conf, errbuf, sizeof(errbuf))); camqp_close_connection(conf); - return (-1); + return -1; } - return (0); + return 0; } /* }}} int camqp_setup_queue */ static int camqp_connect(camqp_config_t *conf) /* {{{ */ { - static time_t last_connect_time = 0; + static time_t last_connect_time; amqp_rpc_reply_t reply; int status; @@ -407,14 +408,14 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ #endif if (conf->connection != NULL) - return (0); + return 0; time_t now = time(NULL); if (now < (last_connect_time + conf->connection_retry_delay)) { DEBUG("amqp plugin: skipping connection retry, " "ConnectionRetryDelay: %d", conf->connection_retry_delay); - return (1); + return 1; } else { DEBUG("amqp plugin: retrying connection"); last_connect_time = now; @@ -423,12 +424,12 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ conf->connection = amqp_new_connection(); if (conf->connection == NULL) { ERROR("amqp plugin: amqp_new_connection failed."); - return (ENOMEM); + return ENOMEM; } #ifdef HAVE_AMQP_TCP_SOCKET #define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us \ - */ + */ /* TODO: add support for SSL using amqp_ssl_socket_new * and related functions */ socket = amqp_tcp_socket_new(conf->connection); @@ -436,31 +437,27 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ ERROR("amqp plugin: amqp_tcp_socket_new failed."); amqp_destroy_connection(conf->connection); conf->connection = NULL; - return (ENOMEM); + return ENOMEM; } status = amqp_socket_open(socket, CONF(conf, host), conf->port); if (status < 0) { - char errbuf[1024]; status *= -1; - ERROR("amqp plugin: amqp_socket_open failed: %s", - sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("amqp plugin: amqp_socket_open failed: %s", STRERROR(status)); amqp_destroy_connection(conf->connection); conf->connection = NULL; - return (status); + return status; } #else /* HAVE_AMQP_TCP_SOCKET */ #define CLOSE_SOCKET() close(sockfd) /* this interface is deprecated as of rabbitmq-c 0.4 */ sockfd = amqp_open_socket(CONF(conf, host), conf->port); if (sockfd < 0) { - char errbuf[1024]; status = (-1) * sockfd; - ERROR("amqp plugin: amqp_open_socket failed: %s", - sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status)); amqp_destroy_connection(conf->connection); conf->connection = NULL; - return (status); + return status; } amqp_set_sockfd(conf->connection, sockfd); #endif @@ -477,7 +474,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ amqp_destroy_connection(conf->connection); CLOSE_SOCKET(); conf->connection = NULL; - return (1); + return 1; } amqp_channel_open(conf->connection, /* channel = */ 1); @@ -489,7 +486,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ amqp_destroy_connection(conf->connection); CLOSE_SOCKET(); conf->connection = NULL; - return (1); + return 1; } INFO("amqp plugin: Successfully opened connection to vhost \"%s\" " @@ -498,16 +495,16 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */ status = camqp_create_exchange(conf); if (status != 0) - return (status); + return status; if (!conf->publish) - return (camqp_setup_queue(conf)); - return (0); + return camqp_setup_queue(conf); + return 0; } /* }}} int camqp_connect */ static int camqp_shutdown(void) /* {{{ */ { - DEBUG("amqp plugin: Shutting down %zu subscriber threads.", + DEBUG("amqp plugin: Shutting down %" PRIsz " subscriber threads.", subscriber_threads_num); subscriber_threads_running = 0; @@ -524,7 +521,7 @@ static int camqp_shutdown(void) /* {{{ */ DEBUG("amqp plugin: All subscriber threads exited."); - return (0); + return 0; } /* }}} int camqp_shutdown */ /* @@ -545,22 +542,20 @@ static int camqp_read_body(camqp_config_t *conf, /* {{{ */ while (received < body_size) { status = amqp_simple_wait_frame(conf->connection, &frame); if (status < 0) { - char errbuf[1024]; status = (-1) * status; - ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", - sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status)); camqp_close_connection(conf); - return (status); + return status; } if (frame.frame_type != AMQP_FRAME_BODY) { NOTICE("amqp plugin: Unexpected frame type: %#" PRIx8, frame.frame_type); - return (-1); + return -1; } if ((body_size - received) < frame.payload.body_fragment.len) { WARNING("amqp plugin: Body is larger than indicated by header."); - return (-1); + return -1; } memcpy(body_ptr, frame.payload.body_fragment.bytes, @@ -573,19 +568,19 @@ static int camqp_read_body(camqp_config_t *conf, /* {{{ */ status = cmd_handle_putval(stderr, body); if (status != 0) ERROR("amqp plugin: cmd_handle_putval failed with status %i.", status); - return (status); + return status; } else if (strcasecmp("application/json", content_type) == 0) { ERROR("amqp plugin: camqp_read_body: Parsing JSON data has not " "been implemented yet. FIXME!"); - return (0); + return 0; } else { ERROR("amqp plugin: camqp_read_body: Unknown content type \"%s\".", content_type); - return (EINVAL); + return EINVAL; } /* not reached */ - return (0); + return 0; } /* }}} int camqp_read_body */ static int camqp_read_header(camqp_config_t *conf) /* {{{ */ @@ -597,31 +592,29 @@ static int camqp_read_header(camqp_config_t *conf) /* {{{ */ status = amqp_simple_wait_frame(conf->connection, &frame); if (status < 0) { - char errbuf[1024]; status = (-1) * status; - ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", - sstrerror(status, errbuf, sizeof(errbuf))); + ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status)); camqp_close_connection(conf); - return (status); + return status; } if (frame.frame_type != AMQP_FRAME_HEADER) { NOTICE("amqp plugin: Unexpected frame type: %#" PRIx8, frame.frame_type); - return (-1); + return -1; } properties = frame.payload.properties.decoded; content_type = camqp_bytes_cstring(&properties->content_type); if (content_type == NULL) { ERROR("amqp plugin: Unable to determine content type."); - return (-1); + return -1; } status = camqp_read_body(conf, (size_t)frame.payload.properties.body_size, content_type); sfree(content_type); - return (status); + return status; } /* }}} int camqp_read_header */ static void *camqp_subscribe_thread(void *user_data) /* {{{ */ @@ -671,7 +664,7 @@ static void *camqp_subscribe_thread(void *user_data) /* {{{ */ camqp_config_free(conf); pthread_exit(NULL); - return (NULL); + return NULL; } /* }}} void *camqp_subscribe_thread */ static int camqp_subscribe_init(camqp_config_t *conf) /* {{{ */ @@ -684,7 +677,7 @@ static int camqp_subscribe_init(camqp_config_t *conf) /* {{{ */ if (tmp == NULL) { ERROR("amqp plugin: realloc failed."); sfree(subscriber_threads); - return (ENOMEM); + return ENOMEM; } subscriber_threads = tmp; tmp = subscriber_threads + subscriber_threads_num; @@ -693,15 +686,13 @@ static int camqp_subscribe_init(camqp_config_t *conf) /* {{{ */ status = plugin_thread_create(tmp, /* attr = */ NULL, camqp_subscribe_thread, conf, "amqp subscribe"); if (status != 0) { - char errbuf[1024]; - ERROR("amqp plugin: pthread_create failed: %s", - sstrerror(status, errbuf, sizeof(errbuf))); - return (status); + ERROR("amqp plugin: pthread_create failed: %s", STRERROR(status)); + return status; } subscriber_threads_num++; - return (0); + return 0; } /* }}} int camqp_subscribe_init */ /* @@ -714,7 +705,7 @@ static int camqp_write_locked(camqp_config_t *conf, /* {{{ */ status = camqp_connect(conf); if (status != 0) - return (status); + return status; amqp_basic_properties_t props = {._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | @@ -742,7 +733,7 @@ static int camqp_write_locked(camqp_config_t *conf, /* {{{ */ camqp_close_connection(conf); } - return (status); + return status; } /* }}} int camqp_write_locked */ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ @@ -753,7 +744,7 @@ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ int status; if ((ds == NULL) || (vl == NULL) || (conf == NULL)) - return (EINVAL); + return EINVAL; if (conf->routing_key != NULL) { sstrncpy(routing_key, conf->routing_key, sizeof(routing_key)); @@ -776,7 +767,7 @@ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ status = cmd_create_putval(buffer, sizeof(buffer), ds, vl); if (status != 0) { ERROR("amqp plugin: cmd_create_putval failed with status %i.", status); - return (status); + return status; } } else if (conf->format == CAMQP_FORMAT_JSON) { size_t bfree = sizeof(buffer); @@ -791,18 +782,18 @@ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ conf->postfix, conf->escape_char, conf->graphite_flags); if (status != 0) { ERROR("amqp plugin: format_graphite failed with status %i.", status); - return (status); + return status; } } else { ERROR("amqp plugin: Invalid format (%i).", conf->format); - return (-1); + return -1; } pthread_mutex_lock(&conf->lock); status = camqp_write_locked(conf, buffer, routing_key); pthread_mutex_unlock(&conf->lock); - return (status); + return status; } /* }}} int camqp_write */ /* @@ -816,7 +807,7 @@ static int camqp_config_set_format(oconfig_item_t *ci, /* {{{ */ string = NULL; status = cf_util_get_string(ci, &string); if (status != 0) - return (status); + return status; assert(string != NULL); if (strcasecmp("Command", string) == 0) @@ -831,18 +822,18 @@ static int camqp_config_set_format(oconfig_item_t *ci, /* {{{ */ free(string); - return (0); + return 0; } /* }}} int config_set_string */ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ - _Bool publish) { + bool publish) { camqp_config_t *conf; int status; conf = calloc(1, sizeof(*conf)); if (conf == NULL) { ERROR("amqp plugin: calloc failed."); - return (ENOMEM); + return ENOMEM; } /* Initialize "conf" {{{ */ @@ -860,7 +851,7 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; - conf->store_rates = 0; + conf->store_rates = false; conf->graphite_flags = 0; /* publish & graphite only */ conf->prefix = NULL; @@ -869,8 +860,8 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; - conf->queue_durable = 0; - conf->queue_auto_delete = 1; + conf->queue_durable = false; + conf->queue_auto_delete = true; /* general */ conf->connection = NULL; pthread_mutex_init(&conf->lock, /* attr = */ NULL); @@ -879,7 +870,7 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ status = cf_util_get_string(ci, &conf->name); if (status != 0) { sfree(conf); - return (status); + return status; } for (int i = 0; i < ci->children_num; i++) { @@ -912,7 +903,7 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ else if (strcasecmp("RoutingKey", child->key) == 0) status = cf_util_get_string(child, &conf->routing_key); else if ((strcasecmp("Persistent", child->key) == 0) && publish) { - _Bool tmp = 0; + bool tmp = 0; status = cf_util_get_boolean(child, &tmp); if (tmp) conf->delivery_mode = CAMQP_DM_PERSISTENT; @@ -970,7 +961,7 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ if (status != 0) { camqp_config_free(conf); - return (status); + return status; } if (conf->exchange != NULL) { @@ -982,23 +973,24 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */ char cbname[128]; ssnprintf(cbname, sizeof(cbname), "amqp/%s", conf->name); - status = plugin_register_write( - cbname, camqp_write, &(user_data_t){ - .data = conf, .free_func = camqp_config_free, - }); + status = plugin_register_write(cbname, camqp_write, + &(user_data_t){ + .data = conf, + .free_func = camqp_config_free, + }); if (status != 0) { camqp_config_free(conf); - return (status); + return status; } } else { status = camqp_subscribe_init(conf); if (status != 0) { camqp_config_free(conf); - return (status); + return status; } } - return (0); + return 0; } /* }}} int camqp_config_connection */ static int camqp_config(oconfig_item_t *ci) /* {{{ */ @@ -1007,20 +999,18 @@ static int camqp_config(oconfig_item_t *ci) /* {{{ */ oconfig_item_t *child = ci->children + i; if (strcasecmp("Publish", child->key) == 0) - camqp_config_connection(child, /* publish = */ 1); + camqp_config_connection(child, /* publish = */ true); else if (strcasecmp("Subscribe", child->key) == 0) - camqp_config_connection(child, /* publish = */ 0); + camqp_config_connection(child, /* publish = */ false); else WARNING("amqp plugin: Ignoring unknown config option \"%s\".", child->key); } /* for (ci->children_num) */ - return (0); + return 0; } /* }}} int camqp_config */ void module_register(void) { plugin_register_complex_config("amqp", camqp_config); plugin_register_shutdown("amqp", camqp_shutdown); } /* void module_register */ - -/* vim: set sw=4 sts=4 et fdm=marker : */