X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp1.c;h=4ba73596c1140f45a02d107dc0aab03e7406ff3f;hb=d4cc32c4dddb01081c49a67d13ab4a737cda0ed0;hp=d7be877b734cfad541c70c6e44ef2141279cc658;hpb=e6df14cd399c5990bdd9cfa78ebcce5288d1be10;p=collectd.git diff --git a/src/amqp1.c b/src/amqp1.c index d7be877b..4ba73596 100644 --- a/src/amqp1.c +++ b/src/amqp1.c @@ -54,35 +54,36 @@ #define AMQP1_FORMAT_COMMAND 1 #define AMQP1_FORMAT_GRAPHITE 2 -typedef struct amqp1_config_transport_t { - DEQ_LINKS(struct amqp1_config_transport_t); +typedef struct amqp1_config_transport_s { + DEQ_LINKS(struct amqp1_config_transport_s); char *name; char *host; char *port; char *user; char *password; char *address; + int retry_delay; } amqp1_config_transport_t; -typedef struct amqp1_config_instance_t { - DEQ_LINKS(struct amqp1_config_instance_t); +typedef struct amqp1_config_instance_s { + DEQ_LINKS(struct amqp1_config_instance_s); char *name; - _Bool notify; + bool notify; uint8_t format; unsigned int graphite_flags; - _Bool store_rates; + bool store_rates; char *prefix; char *postfix; char escape_char; - _Bool pre_settle; - char send_to[128]; + bool pre_settle; + char send_to[1024]; } amqp1_config_instance_t; DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t); -typedef struct cd_message_t { - DEQ_LINKS(struct cd_message_t); - pn_bytes_t mbuf; +typedef struct cd_message_s { + DEQ_LINKS(struct cd_message_s); + pn_rwbytes_t mbuf; amqp1_config_instance_t *instance; } cd_message_t; @@ -91,17 +92,15 @@ DEQ_DECLARE(cd_message_t, cd_message_list_t); /* * Globals */ -pn_connection_t *conn = NULL; -pn_session_t *ssn = NULL; -pn_link_t *sender = NULL; -pn_proactor_t *proactor = NULL; -pthread_mutex_t send_lock; -cd_message_list_t out_messages; -uint64_t cd_tag = 1; -uint64_t acknowledged = 0; -amqp1_config_transport_t *transport = NULL; -bool finished = false; - +static pn_connection_t *conn = NULL; +static pn_link_t *sender = NULL; +static pn_proactor_t *proactor = NULL; +static pthread_mutex_t send_lock; +static cd_message_list_t out_messages; +static uint64_t cd_tag = 1; +static uint64_t acknowledged = 0; +static amqp1_config_transport_t *transport = NULL; +static bool stopping = false; static int event_thread_running = 0; static pthread_t event_thread_id; @@ -109,9 +108,7 @@ static pthread_t event_thread_id; * Functions */ static void cd_message_free(cd_message_t *cdm) { - if (cdm->mbuf.start) { - free((void *)cdm->mbuf.start); - } + free(cdm->mbuf.start); free(cdm); } /* }}} void cd_message_free */ @@ -124,6 +121,10 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ int event_count = 0; pn_delivery_t *dlv; + if (stopping) { + return 0; + } + DEQ_INIT(to_send); pthread_mutex_lock(&send_lock); @@ -179,9 +180,9 @@ static bool handle(pn_event_t *event) /* {{{ */ case PN_CONNECTION_INIT: { conn = pn_event_connection(event); - pn_connection_set_container(conn, transport->address); + pn_connection_set_container(conn, transport->name); pn_connection_open(conn); - ssn = pn_session(conn); + pn_session_t *ssn = pn_session(conn); pn_session_open(ssn); sender = pn_sender(ssn, "cd-sender"); pn_link_set_snd_settle_mode(sender, PN_SND_MIXED); @@ -206,7 +207,7 @@ static bool handle(pn_event_t *event) /* {{{ */ } case PN_CONNECTION_WAKE: { - if (!finished) { + if (!stopping) { amqp1_send_out_messages(sender); } break; @@ -250,41 +251,92 @@ static bool handle(pn_event_t *event) /* {{{ */ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ { + char addr[PN_MAX_ADDR]; + cd_message_t *cdm; - do { - pn_event_batch_t *events = pn_proactor_wait(proactor); - pn_event_t *e; - for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { - if (!handle(e)) { - finished = true; + /* setup proactor */ + proactor = pn_proactor(); + pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port); + + while (!stopping) { + /* make connection */ + conn = pn_connection(); + if (transport->user != NULL) { + pn_connection_set_user(conn, transport->user); + pn_connection_set_password(conn, transport->password); + } + pn_proactor_connect(proactor, conn, addr); + + bool engine_running = true; + while (engine_running && !stopping) { + pn_event_batch_t *events = pn_proactor_wait(proactor); + pn_event_t *e; + while ((e = pn_event_batch_next(events))) { + engine_running = handle(e); + if (!engine_running) { + break; + } } + pn_proactor_done(proactor, events); + } + + pn_proactor_release_connection(conn); + + DEBUG("amqp1 plugin: retrying connection"); + int delay = transport->retry_delay; + while (delay-- > 0 && !stopping) { + sleep(1.0); } - pn_proactor_done(proactor, events); - } while (!finished); + } + + pn_proactor_disconnect(proactor, NULL); + + /* Free the remaining out_messages */ + cdm = DEQ_HEAD(out_messages); + while (cdm) { + DEQ_REMOVE_HEAD(out_messages); + cd_message_free(cdm); + cdm = DEQ_HEAD(out_messages); + } event_thread_running = 0; return NULL; } /* }}} void event_thread */ -static void encqueue(cd_message_t *cdm, - amqp1_config_instance_t *instance) /* {{{ */ +static int encqueue(cd_message_t *cdm, + amqp1_config_instance_t *instance) /* {{{ */ { size_t bufsize = BUFSIZE; pn_data_t *body; pn_message_t *message; + int status = 0; /* encode message */ message = pn_message(); pn_message_set_address(message, instance->send_to); body = pn_message_body(message); pn_data_clear(body); - pn_data_put_binary(body, cdm->mbuf); + pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start)); pn_data_exit(body); /* put_binary copies and stores so ok to use mbuf */ cdm->mbuf.size = bufsize; - pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size); + while ((status = pn_message_encode(message, (char *)cdm->mbuf.start, + &cdm->mbuf.size)) == PN_OVERFLOW) { + DEBUG("amqp1 plugin: increasing message buffer size %i", + (int)cdm->mbuf.size); + cdm->mbuf.size *= 2; + cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size); + } + + if (status != 0) { + ERROR("amqp1 plugin: error encoding message: %s", + pn_error_text(pn_message_error(message))); + pn_message_free(message); + cd_message_free(cdm); + return -1; + } pthread_mutex_lock(&send_lock); DEQ_INSERT_TAIL(out_messages, cdm); @@ -297,7 +349,8 @@ static void encqueue(cd_message_t *cdm, pn_connection_wake(conn); } -} /* }}} void encqueue */ + return 0; +} /* }}} int encqueue */ static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{ */ @@ -318,9 +371,9 @@ static int amqp1_notify(notification_t const *n, ERROR("amqp1 plugin: write notification failed"); } - cdm = NEW(cd_message_t); + cdm = (cd_message_t *)malloc(sizeof(cd_message_t)); DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize)); + cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); cdm->instance = instance; switch (instance->format) { @@ -339,9 +392,9 @@ static int amqp1_notify(notification_t const *n, } /* encode message and place on outbound queue */ - encqueue(cdm, instance); + status = encqueue(cdm, instance); - return 0; + return status; } /* }}} int amqp1_notify */ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ @@ -363,9 +416,9 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ ERROR("amqp1 plugin: write failed"); } - cdm = NEW(cd_message_t); + cdm = (cd_message_t *)malloc(sizeof(cd_message_t)); DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize)); + cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); cdm->instance = instance; switch (instance->format) { @@ -447,9 +500,6 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ return ENOMEM; } - /* Initialize instance configuration {{{ */ - instance->name = NULL; - status = cf_util_get_string(ci, &instance->name); if (status != 0) { sfree(instance); @@ -498,10 +548,12 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) { char *tmp_buff = NULL; status = cf_util_get_string(child, &tmp_buff); - if (strlen(tmp_buff) > 1) - WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles " - "only one character. Others will be ignored."); - instance->escape_char = tmp_buff[0]; + if (status == 0) { + if (strlen(tmp_buff) > 1) + WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + instance->escape_char = tmp_buff[0]; + } sfree(tmp_buff); } else WARNING("amqp1 plugin: Ignoring unknown " @@ -516,10 +568,18 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ amqp1_config_instance_free(instance); return status; } else { - char tpname[128]; - snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); - snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", - transport->address, instance->name); + char tpname[DATA_MAX_NAME_LEN]; + status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); + if ((status < 0) || (size_t)status >= sizeof(tpname)) { + ERROR("amqp1 plugin: Instance name would have been truncated."); + return -1; + } + status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", + transport->address, instance->name); + if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) { + ERROR("amqp1 plugin: send_to address would have been truncated."); + return -1; + } if (instance->notify == true) { status = plugin_register_notification( tpname, amqp1_notify, @@ -553,7 +613,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ } /* Initialize transport configuration {{{ */ - transport->name = NULL; + transport->retry_delay = 1; status = cf_util_get_string(ci, &transport->name); if (status != 0) { @@ -574,6 +634,8 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ status = cf_util_get_string(child, &transport->password); else if (strcasecmp("Address", child->key) == 0) status = cf_util_get_string(child, &transport->address); + else if (strcasecmp("RetryDelay", child->key) == 0) + status = cf_util_get_int(child, &transport->retry_delay); else if (strcasecmp("Instance", child->key) == 0) amqp1_config_instance(child); else @@ -601,7 +663,7 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */ if (strcasecmp("Transport", child->key) == 0) amqp1_config_transport(child); else - WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".", + WARNING("amqp1 plugin: Ignoring unknown config option \%s\".", child->key); } @@ -610,7 +672,6 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */ static int amqp1_init(void) /* {{{ */ { - char addr[PN_MAX_ADDR]; int status; char errbuf[1024]; @@ -621,20 +682,12 @@ static int amqp1_init(void) /* {{{ */ if (proactor == NULL) { pthread_mutex_init(&send_lock, /* attr = */ NULL); - proactor = pn_proactor(); - pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port); - conn = pn_connection(); - if (transport->user != NULL) { - pn_connection_set_user(conn, transport->user); - pn_connection_set_password(conn, transport->password); - } - pn_proactor_connect(proactor, conn, addr); /* start_thread */ status = plugin_thread_create(&event_thread_id, NULL /* no attributes */, event_thread, NULL /* no argument */, "handle"); if (status != 0) { - ERROR("amqp1: pthread_create failed: %s", + ERROR("amqp1 plugin: pthread_create failed: %s", sstrerror(errno, errbuf, sizeof(errbuf))); } else { event_thread_running = 1; @@ -645,28 +698,17 @@ static int amqp1_init(void) /* {{{ */ static int amqp1_shutdown(void) /* {{{ */ { - cd_message_t *cdm; + stopping = true; /* Stop the proactor thread */ - if (event_thread_running != 0) { - finished = true; - /* activate the event thread */ + if (event_thread_running == 1) { + DEBUG("amqp1 plugin: Shutting down proactor thread."); pn_connection_wake(conn); - pthread_join(event_thread_id, NULL /* no return value */); - memset(&event_thread_id, 0, sizeof(event_thread_id)); - } - - /* Free the remaining out_messages */ - cdm = DEQ_HEAD(out_messages); - while (cdm) { - DEQ_REMOVE_HEAD(out_messages); - cd_message_free(cdm); - cdm = DEQ_HEAD(out_messages); } + pthread_join(event_thread_id, NULL /* no return value */); + memset(&event_thread_id, 0, sizeof(event_thread_id)); - if (proactor != NULL) { - pn_proactor_free(proactor); - } + DEBUG("amqp1 plugin: proactor thread exited."); if (transport != NULL) { amqp1_config_transport_free(transport);