X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp1.c;h=67c96b750256550c90fee2776a338b170695c058;hb=39049b56158161b4f9eeacdad8918bcf0f1f7e90;hp=4ba73596c1140f45a02d107dc0aab03e7406ff3f;hpb=9b2d6a2792ef579fca5c03a2076c1e05f4b93507;p=collectd.git diff --git a/src/amqp1.c b/src/amqp1.c index 4ba73596..67c96b75 100644 --- a/src/amqp1.c +++ b/src/amqp1.c @@ -26,12 +26,12 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" -#include "utils_cmd_putval.h" -#include "utils_deq.h" -#include "utils_format_graphite.h" -#include "utils_format_json.h" +#include "utils/cmds/putval.h" +#include "utils/common/common.h" +#include "utils/deq/deq.h" +#include "utils/format_graphite/format_graphite.h" +#include "utils/format_json/format_json.h" #include "utils_random.h" #include @@ -92,16 +92,16 @@ DEQ_DECLARE(cd_message_t, cd_message_list_t); /* * Globals */ -static pn_connection_t *conn = NULL; -static pn_link_t *sender = NULL; -static pn_proactor_t *proactor = NULL; +static pn_connection_t *conn; +static pn_link_t *sender; +static pn_proactor_t *proactor; static pthread_mutex_t send_lock; static cd_message_list_t out_messages; static uint64_t cd_tag = 1; -static uint64_t acknowledged = 0; -static amqp1_config_transport_t *transport = NULL; -static bool stopping = false; -static int event_thread_running = 0; +static uint64_t acknowledged; +static amqp1_config_transport_t *transport; +static bool stopping; +static bool event_thread_running; static pthread_t event_thread_id; /* @@ -135,7 +135,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ while (cdm) { DEQ_REMOVE_HEAD(out_messages); DEQ_INSERT_TAIL(to_send, cdm); - if (DEQ_SIZE(to_send) == link_credit) + if (DEQ_SIZE(to_send) == (size_t)link_credit) break; cdm = DEQ_HEAD(out_messages); } @@ -299,7 +299,7 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ cdm = DEQ_HEAD(out_messages); } - event_thread_running = 0; + event_thread_running = false; return NULL; } /* }}} void event_thread */ @@ -307,34 +307,36 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ static int encqueue(cd_message_t *cdm, amqp1_config_instance_t *instance) /* {{{ */ { - size_t bufsize = BUFSIZE; - pn_data_t *body; - pn_message_t *message; - int status = 0; - /* encode message */ - message = pn_message(); + pn_message_t *message = pn_message(); pn_message_set_address(message, instance->send_to); - body = pn_message_body(message); + pn_data_t *body = pn_message_body(message); pn_data_clear(body); pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start)); pn_data_exit(body); /* put_binary copies and stores so ok to use mbuf */ - cdm->mbuf.size = bufsize; - while ((status = pn_message_encode(message, (char *)cdm->mbuf.start, + cdm->mbuf.size = BUFSIZE; + + int status; + char *start; + while ((status = pn_message_encode(message, cdm->mbuf.start, &cdm->mbuf.size)) == PN_OVERFLOW) { - DEBUG("amqp1 plugin: increasing message buffer size %i", - (int)cdm->mbuf.size); + DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size); cdm->mbuf.size *= 2; - cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size); + start = realloc(cdm->mbuf.start, cdm->mbuf.size); + if (start == NULL) { + status = -1; + break; + } else { + cdm->mbuf.start = start; + } } if (status != 0) { ERROR("amqp1 plugin: error encoding message: %s", pn_error_text(pn_message_error(message))); pn_message_free(message); - cd_message_free(cdm); return -1; } @@ -345,7 +347,7 @@ static int encqueue(cd_message_t *cdm, pn_message_free(message); /* activate the sender */ - if (conn != NULL) { + if (conn) { pn_connection_wake(conn); } @@ -355,70 +357,99 @@ static int encqueue(cd_message_t *cdm, static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{ */ { - amqp1_config_instance_t *instance; int status = 0; size_t bfree = BUFSIZE; size_t bfill = 0; - cd_message_t *cdm; size_t bufsize = BUFSIZE; - if ((n == NULL) || (user_data == NULL)) + if (n == NULL || user_data == NULL) return EINVAL; - instance = user_data->data; + amqp1_config_instance_t *instance = user_data->data; if (instance->notify != true) { ERROR("amqp1 plugin: write notification failed"); } - cdm = (cd_message_t *)malloc(sizeof(cd_message_t)); + cd_message_t *cdm = malloc(sizeof(*cdm)); + if (cdm == NULL) { + ERROR("amqp1 plugin: notify failed"); + return -1; + } + DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); + char *start = malloc(bufsize); + if (start == NULL) { + ERROR("amqp1 plugin: malloc failed"); + free(cdm); + return -1; + } + cdm->mbuf.size = bufsize; + cdm->mbuf.start = start; cdm->instance = instance; switch (instance->format) { case AMQP1_FORMAT_JSON: - format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree); - status = format_json_notification((char *)cdm->mbuf.start, bufsize, n); + format_json_initialize(cdm->mbuf.start, &bfill, &bfree); + status = format_json_notification(cdm->mbuf.start, bufsize, n); if (status != 0) { ERROR("amqp1 plugin: formatting notification failed"); + cd_message_free(cdm); return status; } cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: notify format json failed"); + cd_message_free(cdm); + return -1; + } break; default: ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format); + cd_message_free(cdm); return -1; } /* encode message and place on outbound queue */ status = encqueue(cdm, instance); - + if (status != 0) { + ERROR("amqp1 plugin: notify enqueue failed"); + cd_message_free(cdm); + } return status; + } /* }}} int amqp1_notify */ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ user_data_t *user_data) { - amqp1_config_instance_t *instance; int status = 0; size_t bfree = BUFSIZE; size_t bfill = 0; - cd_message_t *cdm; size_t bufsize = BUFSIZE; - if ((ds == NULL) || (vl == NULL) || (transport == NULL) || - (user_data == NULL)) + if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL) return EINVAL; - instance = user_data->data; + amqp1_config_instance_t *instance = user_data->data; if (instance->notify != false) { ERROR("amqp1 plugin: write failed"); } - cdm = (cd_message_t *)malloc(sizeof(cd_message_t)); + cd_message_t *cdm = malloc(sizeof(*cdm)); + if (cdm == NULL) { + ERROR("amqp1 plugin: malloc failed."); + return -1; + } DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); + char *start = malloc(bufsize); + if (start == NULL) { + ERROR("amqp1 plugin: malloc failed."); + free(cdm); + return -1; + } + cdm->mbuf.size = bufsize; + cdm->mbuf.start = start; cdm->instance = instance; switch (instance->format) { @@ -426,16 +457,33 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl); if (status != 0) { ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status); + cd_message_free(cdm); return status; } cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: format cmd failed"); + cd_message_free(cdm); + return -1; + } break; case AMQP1_FORMAT_JSON: format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree); format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl, instance->store_rates); - format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree); + status = format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree); + if (status != 0) { + ERROR("amqp1 plugin: format_json_finalize failed with status %i.", + status); + cd_message_free(cdm); + return status; + } cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: format json failed"); + cd_message_free(cdm); + return -1; + } break; case AMQP1_FORMAT_GRAPHITE: status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl, @@ -443,19 +491,30 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ instance->escape_char, instance->graphite_flags); if (status != 0) { ERROR("amqp1 plugin: format_graphite failed with status %i.", status); + cd_message_free(cdm); return status; } cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: format graphite failed"); + cd_message_free(cdm); + return -1; + } break; default: ERROR("amqp1 plugin: Invalid write format (%i).", instance->format); + cd_message_free(cdm); return -1; } - /* encode message and place on outboud queue */ - encqueue(cdm, instance); + /* encode message and place on outbound queue */ + status = encqueue(cdm, instance); + if (status != 0) { + ERROR("amqp1 plugin: write enqueue failed"); + cd_message_free(cdm); + } + return status; - return 0; } /* }}} int amqp1_write */ static void amqp1_config_transport_free(void *ptr) /* {{{ */ @@ -467,6 +526,7 @@ static void amqp1_config_transport_free(void *ptr) /* {{{ */ sfree(transport->name); sfree(transport->host); + sfree(transport->port); sfree(transport->user); sfree(transport->password); sfree(transport->address); @@ -490,17 +550,13 @@ static void amqp1_config_instance_free(void *ptr) /* {{{ */ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ { - int status = 0; - char *key = NULL; - amqp1_config_instance_t *instance; - - instance = calloc(1, sizeof(*instance)); + amqp1_config_instance_t *instance = calloc(1, sizeof(*instance)); if (instance == NULL) { ERROR("amqp1 plugin: calloc failed."); return ENOMEM; } - status = cf_util_get_string(ci, &instance->name); + int status = cf_util_get_string(ci, &instance->name); if (status != 0) { sfree(instance); return status; @@ -514,11 +570,10 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ else if (strcasecmp("Notify", child->key) == 0) status = cf_util_get_boolean(child, &instance->notify); else if (strcasecmp("Format", child->key) == 0) { + char *key = NULL; status = cf_util_get_string(child, &key); if (status != 0) return status; - /* TODO: goto errout */ - // goto errout; assert(key != NULL); if (strcasecmp(key, "Command") == 0) { instance->format = AMQP1_FORMAT_COMMAND; @@ -558,7 +613,7 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ } else WARNING("amqp1 plugin: Ignoring unknown " "instance configuration option " - "\%s\".", + "\"%s\".", child->key); if (status != 0) break; @@ -569,29 +624,31 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ return status; } else { char tpname[DATA_MAX_NAME_LEN]; - status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); + status = ssnprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); if ((status < 0) || (size_t)status >= sizeof(tpname)) { ERROR("amqp1 plugin: Instance name would have been truncated."); return -1; } - status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", - transport->address, instance->name); + status = ssnprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", + transport->address, instance->name); if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) { ERROR("amqp1 plugin: send_to address would have been truncated."); return -1; } - if (instance->notify == true) { + if (instance->notify) { status = plugin_register_notification( tpname, amqp1_notify, &(user_data_t){ - .data = instance, .free_func = amqp1_config_instance_free, + .data = instance, + .free_func = amqp1_config_instance_free, }); } else { - status = plugin_register_write( - tpname, amqp1_write, - &(user_data_t){ - .data = instance, .free_func = amqp1_config_instance_free, - }); + status = + plugin_register_write(tpname, amqp1_write, + &(user_data_t){ + .data = instance, + .free_func = amqp1_config_instance_free, + }); } if (status != 0) { @@ -604,8 +661,6 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ { - int status = 0; - transport = calloc(1, sizeof(*transport)); if (transport == NULL) { ERROR("amqp1 plugin: calloc failed."); @@ -615,7 +670,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ /* Initialize transport configuration {{{ */ transport->retry_delay = 1; - status = cf_util_get_string(ci, &transport->name); + int status = cf_util_get_string(ci, &transport->name); if (status != 0) { sfree(transport); return status; @@ -641,7 +696,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ else WARNING("amqp1 plugin: Ignoring unknown " "transport configuration option " - "\%s\".", + "\"%s\".", child->key); if (status != 0) @@ -663,7 +718,7 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */ if (strcasecmp("Transport", child->key) == 0) amqp1_config_transport(child); else - WARNING("amqp1 plugin: Ignoring unknown config option \%s\".", + WARNING("amqp1 plugin: Ignoring unknown config option \"%s\".", child->key); } @@ -672,9 +727,6 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */ static int amqp1_init(void) /* {{{ */ { - int status; - char errbuf[1024]; - if (transport == NULL) { ERROR("amqp1: init failed, no transport configured"); return -1; @@ -683,14 +735,13 @@ static int amqp1_init(void) /* {{{ */ if (proactor == NULL) { pthread_mutex_init(&send_lock, /* attr = */ NULL); /* start_thread */ - status = + int status = plugin_thread_create(&event_thread_id, NULL /* no attributes */, event_thread, NULL /* no argument */, "handle"); if (status != 0) { - ERROR("amqp1 plugin: pthread_create failed: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO); } else { - event_thread_running = 1; + event_thread_running = true; } } return 0; @@ -701,7 +752,7 @@ static int amqp1_shutdown(void) /* {{{ */ stopping = true; /* Stop the proactor thread */ - if (event_thread_running == 1) { + if (event_thread_running) { DEBUG("amqp1 plugin: Shutting down proactor thread."); pn_connection_wake(conn); } @@ -710,7 +761,7 @@ static int amqp1_shutdown(void) /* {{{ */ DEBUG("amqp1 plugin: proactor thread exited."); - if (transport != NULL) { + if (transport) { amqp1_config_transport_free(transport); }