X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp1.c;h=4325f0016d0c2e01c538075ceaf13fb95e1233a5;hb=f43a473fd8ffa483bd7d74579a22886ab2df9101;hp=dcd17dd029d6fa6ab6f522bc1377a89a477e43d0;hpb=7feccc9a58e2c75b87f9f2883c9b4b5a0612938e;p=collectd.git diff --git a/src/amqp1.c b/src/amqp1.c index dcd17dd0..4325f001 100644 --- a/src/amqp1.c +++ b/src/amqp1.c @@ -26,16 +26,16 @@ #include "collectd.h" -#include "common.h" #include "plugin.h" -#include "utils_cmd_putval.h" -#include "utils_format_graphite.h" -#include "utils_format_json.h" +#include "utils/cmds/putval.h" +#include "utils/common/common.h" +#include "utils/deq/deq.h" +#include "utils/format_graphite/format_graphite.h" +#include "utils/format_json/format_json.h" #include "utils_random.h" -#include "utils_deq.h" -#include #include +#include #include #include #include @@ -44,85 +44,86 @@ #include #include -#include -#include #include #include +#include +#include #define BUFSIZE 8192 #define AMQP1_FORMAT_JSON 0 #define AMQP1_FORMAT_COMMAND 1 #define AMQP1_FORMAT_GRAPHITE 2 -typedef struct amqp1_config_transport_t { - DEQ_LINKS(struct amqp1_config_transport_t); - char *name; - char *host; - char *port; - char *user; - char *password; - char *address; +typedef struct amqp1_config_transport_s { + DEQ_LINKS(struct amqp1_config_transport_s); + char *name; + char *host; + char *port; + char *user; + char *password; + char *address; + int retry_delay; } amqp1_config_transport_t; -typedef struct amqp1_config_instance_t { - DEQ_LINKS(struct amqp1_config_instance_t); - char *name; - uint8_t format; - unsigned int graphite_flags; - _Bool store_rates; - char *prefix; - char *postfix; - char escape_char; - _Bool pre_settle; - char send_to[128]; +typedef struct amqp1_config_instance_s { + DEQ_LINKS(struct amqp1_config_instance_s); + char *name; + bool notify; + uint8_t format; + unsigned int graphite_flags; + bool store_rates; + char *prefix; + char *postfix; + char escape_char; + bool pre_settle; + char send_to[1024]; } amqp1_config_instance_t; DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t); -typedef struct cd_message_t { - DEQ_LINKS(struct cd_message_t); - pn_bytes_t mbuf; +typedef struct cd_message_s { + DEQ_LINKS(struct cd_message_s); + pn_rwbytes_t mbuf; amqp1_config_instance_t *instance; } cd_message_t; DEQ_DECLARE(cd_message_t, cd_message_list_t); -/* +/* * Globals */ -pn_connection_t *conn = NULL; -pn_session_t *ssn = NULL; -pn_link_t *sender = NULL; -pn_proactor_t *proactor = NULL; -pthread_mutex_t send_lock; -cd_message_list_t out_messages; -uint64_t cd_tag = 1; -uint64_t acknowledged = 0; -amqp1_config_transport_t *transport = NULL; -bool finished = false; - -static int event_thread_running = 0; +static pn_connection_t *conn; +static pn_link_t *sender; +static pn_proactor_t *proactor; +static pthread_mutex_t send_lock; +static cd_message_list_t out_messages; +static uint64_t cd_tag = 1; +static uint64_t acknowledged; +static amqp1_config_transport_t *transport; +static bool stopping; +static bool event_thread_running; static pthread_t event_thread_id; /* * Functions */ -static void cd_message_free(cd_message_t *cdm) -{ - if (cdm->mbuf.start) { - free((void *)cdm->mbuf.start); - } +static void cd_message_free(cd_message_t *cdm) { + free(cdm->mbuf.start); free(cdm); } /* }}} void cd_message_free */ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ { - uint64_t dtag; + uint64_t dtag; cd_message_list_t to_send; - cd_message_t *cdm; - int link_credit = pn_link_credit(link); - int event_count = 0; - pn_delivery_t *dlv; + cd_message_t *cdm; + int link_credit = pn_link_credit(link); + int event_count = 0; + pn_delivery_t *dlv; + + if (stopping) { + return 0; + } DEQ_INIT(to_send); @@ -134,7 +135,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ while (cdm) { DEQ_REMOVE_HEAD(out_messages); DEQ_INSERT_TAIL(to_send, cdm); - if (DEQ_SIZE(to_send) == link_credit) + if (DEQ_SIZE(to_send) == (size_t)link_credit) break; cdm = DEQ_HEAD(out_messages); } @@ -148,12 +149,12 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ while (cdm) { DEQ_REMOVE_HEAD(to_send); dtag++; - dlv = pn_delivery(link, pn_dtag((const char*)&dtag, sizeof(dtag))); + dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag))); pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size); pn_link_advance(link); if (cdm->instance->pre_settle == true) { pn_delivery_settle(dlv); - } + } event_count++; cd_message_free(cdm); cdm = DEQ_HEAD(to_send); @@ -165,10 +166,8 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */ { if (pn_condition_is_set(cond)) { - ERROR("amqp1 plugin: %s: %s: %s", - pn_event_type_name(pn_event_type(e)), - pn_condition_get_name(cond), - pn_condition_get_description(cond)); + ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); pn_connection_close(pn_event_connection(e)); conn = NULL; } @@ -179,11 +178,11 @@ static bool handle(pn_event_t *event) /* {{{ */ switch (pn_event_type(event)) { - case PN_CONNECTION_INIT:{ + case PN_CONNECTION_INIT: { conn = pn_event_connection(event); - pn_connection_set_container(conn, transport->address); + pn_connection_set_container(conn, transport->name); pn_connection_open(conn); - ssn = pn_session(conn); + pn_session_t *ssn = pn_session(conn); pn_session_open(ssn); sender = pn_sender(ssn, "cd-sender"); pn_link_set_snd_settle_mode(sender, PN_SND_MIXED); @@ -199,33 +198,36 @@ static bool handle(pn_event_t *event) /* {{{ */ case PN_DELIVERY: { /* acknowledgement from peer that a message was delivered */ - pn_delivery_t * dlv = pn_event_delivery(event); + pn_delivery_t *dlv = pn_event_delivery(event); if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) { + pn_delivery_settle(dlv); acknowledged++; } break; } case PN_CONNECTION_WAKE: { - if (!finished) { + if (!stopping) { amqp1_send_out_messages(sender); } break; } - + case PN_TRANSPORT_CLOSED: { check_condition(event, pn_transport_condition(pn_event_transport(event))); break; } case PN_CONNECTION_REMOTE_CLOSE: { - check_condition(event, pn_session_remote_condition(pn_event_session(event))); + check_condition(event, + pn_session_remote_condition(pn_event_session(event))); pn_connection_close(pn_event_connection(event)); break; } case PN_SESSION_REMOTE_CLOSE: { - check_condition(event, pn_session_remote_condition(pn_event_session(event))); + check_condition(event, + pn_session_remote_condition(pn_event_session(event))); pn_connection_close(pn_event_connection(event)); break; } @@ -235,54 +237,219 @@ static bool handle(pn_event_t *event) /* {{{ */ check_condition(event, pn_link_remote_condition(pn_event_link(event))); pn_connection_close(pn_event_connection(event)); break; - } + } case PN_PROACTOR_INACTIVE: { return false; } - default: break; + default: + break; } return true; } /* }}} bool handle */ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ { + char addr[PN_MAX_ADDR]; + cd_message_t *cdm; + + /* setup proactor */ + proactor = pn_proactor(); + pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port); - do { - pn_event_batch_t *events = pn_proactor_wait(proactor); - pn_event_t *e; - for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { - if (!handle(e)) { - finished = true; + while (!stopping) { + /* make connection */ + conn = pn_connection(); + if (transport->user != NULL) { + pn_connection_set_user(conn, transport->user); + pn_connection_set_password(conn, transport->password); + } + pn_proactor_connect(proactor, conn, addr); + + bool engine_running = true; + while (engine_running && !stopping) { + pn_event_batch_t *events = pn_proactor_wait(proactor); + pn_event_t *e; + while ((e = pn_event_batch_next(events))) { + engine_running = handle(e); + if (!engine_running) { + break; + } } + pn_proactor_done(proactor, events); + } + + pn_proactor_release_connection(conn); + + DEBUG("amqp1 plugin: retrying connection"); + int delay = transport->retry_delay; + while (delay-- > 0 && !stopping) { + sleep(1.0); } - pn_proactor_done(proactor, events); - } while (!finished); + } + + pn_proactor_disconnect(proactor, NULL); + + /* Free the remaining out_messages */ + cdm = DEQ_HEAD(out_messages); + while (cdm) { + DEQ_REMOVE_HEAD(out_messages); + cd_message_free(cdm); + cdm = DEQ_HEAD(out_messages); + } - event_thread_running = 0; + event_thread_running = false; return NULL; } /* }}} void event_thread */ -static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ - user_data_t *user_data) +static int encqueue(cd_message_t *cdm, + amqp1_config_instance_t *instance) /* {{{ */ +{ + /* encode message */ + pn_message_t *message = pn_message(); + pn_message_set_address(message, instance->send_to); + pn_data_t *body = pn_message_body(message); + pn_data_clear(body); + pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start)); + pn_data_exit(body); + + /* put_binary copies and stores so ok to use mbuf */ + cdm->mbuf.size = BUFSIZE; + + int status; + char *start; + while ((status = pn_message_encode(message, cdm->mbuf.start, + &cdm->mbuf.size)) == PN_OVERFLOW) { + DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size); + cdm->mbuf.size *= 2; + start = realloc(cdm->mbuf.start, cdm->mbuf.size); + if (start == NULL) { + status = -1; + break; + } else { + cdm->mbuf.start = start; + } + } + + if (status != 0) { + ERROR("amqp1 plugin: error encoding message: %s", + pn_error_text(pn_message_error(message))); + pn_message_free(message); + return -1; + } + + pthread_mutex_lock(&send_lock); + DEQ_INSERT_TAIL(out_messages, cdm); + pthread_mutex_unlock(&send_lock); + + pn_message_free(message); + + /* activate the sender */ + if (conn) { + pn_connection_wake(conn); + } + + return 0; +} /* }}} int encqueue */ + +static int amqp1_notify(notification_t const *n, + user_data_t *user_data) /* {{{ */ { + int status = 0; + size_t bfree = BUFSIZE; + size_t bfill = 0; + size_t bufsize = BUFSIZE; + + if (n == NULL || user_data == NULL) + return EINVAL; + amqp1_config_instance_t *instance = user_data->data; - int status = 0; - size_t bfree = BUFSIZE; - size_t bfill = 0; - cd_message_t *cdm; - size_t bufsize = BUFSIZE; - pn_data_t *body; - pn_message_t *message; - if ((ds == NULL) || (vl == NULL) || (transport == NULL)) + if (instance->notify != true) { + ERROR("amqp1 plugin: write notification failed"); + } + + cd_message_t *cdm = malloc(sizeof(*cdm)); + if (cdm == NULL) { + ERROR("amqp1 plugin: notify failed"); + return -1; + } + + DEQ_ITEM_INIT(cdm); + char *start = malloc(bufsize); + if (start == NULL) { + ERROR("amqp1 plugin: malloc failed"); + free(cdm); + return -1; + } + cdm->mbuf.size = bufsize; + cdm->mbuf.start = start; + cdm->instance = instance; + + switch (instance->format) { + case AMQP1_FORMAT_JSON: + format_json_initialize(cdm->mbuf.start, &bfill, &bfree); + status = format_json_notification(cdm->mbuf.start, bufsize, n); + if (status != 0) { + ERROR("amqp1 plugin: formatting notification failed"); + cd_message_free(cdm); + return status; + } + cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: notify format json failed"); + cd_message_free(cdm); + return -1; + } + break; + default: + ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format); + cd_message_free(cdm); + return -1; + } + + /* encode message and place on outbound queue */ + status = encqueue(cdm, instance); + if (status != 0) { + ERROR("amqp1 plugin: notify enqueue failed"); + cd_message_free(cdm); + } + return status; + +} /* }}} int amqp1_notify */ + +static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ + user_data_t *user_data) { + int status = 0; + size_t bfree = BUFSIZE; + size_t bfill = 0; + size_t bufsize = BUFSIZE; + + if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL) return EINVAL; - cdm = NEW(cd_message_t); + amqp1_config_instance_t *instance = user_data->data; + + if (instance->notify != false) { + ERROR("amqp1 plugin: write failed"); + } + + cd_message_t *cdm = malloc(sizeof(*cdm)); + if (cdm == NULL) { + ERROR("amqp1 plugin: malloc failed."); + return -1; + } DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize)); + char *start = malloc(bufsize); + if (start == NULL) { + ERROR("amqp1 plugin: malloc failed."); + free(cdm); + return -1; + } + cdm->mbuf.size = bufsize; + cdm->mbuf.start = start; cdm->instance = instance; switch (instance->format) { @@ -290,67 +457,76 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl); if (status != 0) { ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status); + cd_message_free(cdm); return status; } cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: format cmd failed"); + cd_message_free(cdm); + return -1; + } break; case AMQP1_FORMAT_JSON: format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree); format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl, - instance->store_rates); - format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree); + instance->store_rates); + status = format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree); + if (status != 0) { + ERROR("amqp1 plugin: format_json_finalize failed with status %i.", + status); + cd_message_free(cdm); + return status; + } cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: format json failed"); + cd_message_free(cdm); + return -1; + } break; case AMQP1_FORMAT_GRAPHITE: - status = - format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl, instance->prefix, - instance->postfix, instance->escape_char, instance->graphite_flags); + status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl, + instance->prefix, instance->postfix, + instance->escape_char, instance->graphite_flags); if (status != 0) { ERROR("amqp1 plugin: format_graphite failed with status %i.", status); + cd_message_free(cdm); return status; } cdm->mbuf.size = strlen(cdm->mbuf.start); + if (cdm->mbuf.size >= BUFSIZE) { + ERROR("amqp1 plugin: format graphite failed"); + cd_message_free(cdm); + return -1; + } break; default: - ERROR("amqp1 plugin: Invalid format (%i).", instance->format); + ERROR("amqp1 plugin: Invalid write format (%i).", instance->format); + cd_message_free(cdm); return -1; } - /* encode message */ - message = pn_message(); - pn_message_set_address(message, instance->send_to); - body = pn_message_body(message); - pn_data_clear(body); - pn_data_put_binary(body, cdm->mbuf); - pn_data_exit(body); - - /* put_binary copies and stores so ok to use mbuf */ - cdm->mbuf.size = bufsize; - pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size); - - pthread_mutex_lock(&send_lock); - DEQ_INSERT_TAIL(out_messages, cdm); - pthread_mutex_unlock(&send_lock); - - pn_message_free(message); - - /* activate the sender */ - if (conn != NULL) { - pn_connection_wake(conn); + /* encode message and place on outbound queue */ + status = encqueue(cdm, instance); + if (status != 0) { + ERROR("amqp1 plugin: write enqueue failed"); + cd_message_free(cdm); } + return status; - return 0; -} /* }}} int amqp_write1 */ +} /* }}} int amqp1_write */ static void amqp1_config_transport_free(void *ptr) /* {{{ */ { amqp1_config_transport_t *transport = ptr; - + if (transport == NULL) return; sfree(transport->name); sfree(transport->host); + sfree(transport->port); sfree(transport->user); sfree(transport->password); sfree(transport->address); @@ -361,7 +537,7 @@ static void amqp1_config_transport_free(void *ptr) /* {{{ */ static void amqp1_config_instance_free(void *ptr) /* {{{ */ { amqp1_config_instance_t *instance = ptr; - + if (instance == NULL) return; @@ -374,20 +550,13 @@ static void amqp1_config_instance_free(void *ptr) /* {{{ */ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ { - int status=0; - char *key = NULL; - amqp1_config_instance_t *instance; - - instance = calloc(1, sizeof(*instance)); + amqp1_config_instance_t *instance = calloc(1, sizeof(*instance)); if (instance == NULL) { ERROR("amqp1 plugin: calloc failed."); return ENOMEM; } - /* Initialize instance configuration {{{ */ - instance->name = NULL; - - status = cf_util_get_string(ci, &instance->name); + int status = cf_util_get_string(ci, &instance->name); if (status != 0) { sfree(instance); return status; @@ -398,12 +567,15 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ if (strcasecmp("PreSettle", child->key) == 0) status = cf_util_get_boolean(child, &instance->pre_settle); + else if (strcasecmp("Notify", child->key) == 0) + status = cf_util_get_boolean(child, &instance->notify); else if (strcasecmp("Format", child->key) == 0) { + char *key = NULL; status = cf_util_get_string(child, &key); - if (status != 0) - return status; - /* TODO: goto errout */ - // goto errout; + if (status != 0) { + amqp1_config_instance_free(instance); + return status; + } assert(key != NULL); if (strcasecmp(key, "Command") == 0) { instance->format = AMQP1_FORMAT_COMMAND; @@ -415,8 +587,7 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ WARNING("amqp1 plugin: Invalid format string: %s", key); } sfree(key); - } - else if (strcasecmp("StoreRates", child->key) == 0) + } else if (strcasecmp("StoreRates", child->key) == 0) status = cf_util_get_boolean(child, &instance->store_rates); else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0) status = cf_util_get_flag(child, &instance->graphite_flags, @@ -434,16 +605,18 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) { char *tmp_buff = NULL; status = cf_util_get_string(child, &tmp_buff); - if (strlen(tmp_buff) > 1) - WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles " - "only one character. Others will be ignored."); - instance->escape_char = tmp_buff[0]; + if (status == 0) { + if (strlen(tmp_buff) > 1) + WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + instance->escape_char = tmp_buff[0]; + } sfree(tmp_buff); - } - else + } else WARNING("amqp1 plugin: Ignoring unknown " "instance configuration option " - "\%s\".", child->key); + "\"%s\".", + child->key); if (status != 0) break; } @@ -452,12 +625,36 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ amqp1_config_instance_free(instance); return status; } else { - char tpname[128]; - snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); - snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", - transport->address,instance->name); - status = plugin_register_write(tpname, amqp1_write, &(user_data_t) { - .data = instance, .free_func = amqp1_config_instance_free, }); + char tpname[DATA_MAX_NAME_LEN]; + status = ssnprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); + if ((status < 0) || (size_t)status >= sizeof(tpname)) { + ERROR("amqp1 plugin: Instance name would have been truncated."); + amqp1_config_instance_free(instance); + return -1; + } + status = ssnprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", + transport->address, instance->name); + if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) { + ERROR("amqp1 plugin: send_to address would have been truncated."); + amqp1_config_instance_free(instance); + return -1; + } + if (instance->notify) { + status = plugin_register_notification( + tpname, amqp1_notify, + &(user_data_t){ + .data = instance, + .free_func = amqp1_config_instance_free, + }); + } else { + status = + plugin_register_write(tpname, amqp1_write, + &(user_data_t){ + .data = instance, + .free_func = amqp1_config_instance_free, + }); + } + if (status != 0) { amqp1_config_instance_free(instance); } @@ -465,11 +662,9 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ return status; } /* }}} int amqp1_config_instance */ - + static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ { - int status=0; - transport = calloc(1, sizeof(*transport)); if (transport == NULL) { ERROR("amqp1 plugin: calloc failed."); @@ -477,14 +672,14 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ } /* Initialize transport configuration {{{ */ - transport->name = NULL; + transport->retry_delay = 1; - status = cf_util_get_string(ci, &transport->name); + int status = cf_util_get_string(ci, &transport->name); if (status != 0) { sfree(transport); return status; } - + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; @@ -498,13 +693,16 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ status = cf_util_get_string(child, &transport->password); else if (strcasecmp("Address", child->key) == 0) status = cf_util_get_string(child, &transport->address); - else if (strcasecmp("Instance",child->key) == 0) + else if (strcasecmp("RetryDelay", child->key) == 0) + status = cf_util_get_int(child, &transport->retry_delay); + else if (strcasecmp("Instance", child->key) == 0) amqp1_config_instance(child); else WARNING("amqp1 plugin: Ignoring unknown " "transport configuration option " - "\%s\".", child->key); - + "\"%s\".", + child->key); + if (status != 0) break; } @@ -513,7 +711,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ amqp1_config_transport_free(transport); } return status; -} /* }}} int amqp1_config_transport */ +} /* }}} int amqp1_config_transport */ static int amqp1_config(oconfig_item_t *ci) /* {{{ */ { @@ -524,7 +722,7 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */ if (strcasecmp("Transport", child->key) == 0) amqp1_config_transport(child); else - WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".", + WARNING("amqp1 plugin: Ignoring unknown config option \"%s\".", child->key); } @@ -533,10 +731,6 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */ static int amqp1_init(void) /* {{{ */ { - char addr[PN_MAX_ADDR]; - int status; - char errbuf[1024]; - if (transport == NULL) { ERROR("amqp1: init failed, no transport configured"); return -1; @@ -544,64 +738,42 @@ static int amqp1_init(void) /* {{{ */ if (proactor == NULL) { pthread_mutex_init(&send_lock, /* attr = */ NULL); - proactor = pn_proactor(); - pn_proactor_addr(addr, sizeof(addr),transport->host,transport->port); - conn = pn_connection(); - if (transport->user != NULL) { - pn_connection_set_user(conn, transport->user); - pn_connection_set_password(conn, transport->password); - } - pn_proactor_connect(proactor, conn, addr); /* start_thread */ - status = plugin_thread_create(&event_thread_id, NULL /* no attributes */, - event_thread, NULL /* no argument */, - "handle"); + int status = + plugin_thread_create(&event_thread_id, NULL /* no attributes */, + event_thread, NULL /* no argument */, "handle"); if (status != 0) { - ERROR("amqp1: pthread_create failed: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO); } else { - event_thread_running = 1; + event_thread_running = true; } } return 0; } /* }}} int amqp1_init */ -static int amqp1_shutdown -(void) /* {{{ */ +static int amqp1_shutdown(void) /* {{{ */ { - cd_message_t *cdm; + stopping = true; /* Stop the proactor thread */ - if (event_thread_running != 0) { - finished=true; - /* activate the event thread */ + if (event_thread_running) { + DEBUG("amqp1 plugin: Shutting down proactor thread."); pn_connection_wake(conn); - pthread_join(event_thread_id, NULL /* no return value */); - memset(&event_thread_id, 0, sizeof(event_thread_id)); } + pthread_join(event_thread_id, NULL /* no return value */); + memset(&event_thread_id, 0, sizeof(event_thread_id)); - /* Free the remaining out_messages */ - cdm = DEQ_HEAD(out_messages); - while (cdm) { - DEQ_REMOVE_HEAD(out_messages); - cd_message_free(cdm); - cdm = DEQ_HEAD(out_messages); - } + DEBUG("amqp1 plugin: proactor thread exited."); - if (proactor != NULL) { - pn_proactor_free(proactor); - } - - if (transport != NULL) { + if (transport) { amqp1_config_transport_free(transport); } return 0; } /* }}} int amqp1_shutdown */ -void module_register(void) -{ +void module_register(void) { plugin_register_complex_config("amqp1", amqp1_config); plugin_register_init("amqp1", amqp1_init); - plugin_register_shutdown("amqp1",amqp1_shutdown); + plugin_register_shutdown("amqp1", amqp1_shutdown); } /* void module_register */