X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp1.c;h=e60142b5e9c5afd04cde696a06dce1279408e901;hb=a74efda2fb10bdfb4af9e9e3e7a4b11dfddb6452;hp=abf4e476ec2c12fc98df83e9dbba5d2f1dafdb53;hpb=0a9dc5f8443de4fa54153ad4517e764b43f86b5e;p=collectd.git diff --git a/src/amqp1.c b/src/amqp1.c index abf4e476..e60142b5 100644 --- a/src/amqp1.c +++ b/src/amqp1.c @@ -101,7 +101,7 @@ static uint64_t cd_tag = 1; static uint64_t acknowledged; static amqp1_config_transport_t *transport; static bool stopping; -static int event_thread_running; +static bool event_thread_running; static pthread_t event_thread_id; /* @@ -299,7 +299,7 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ cdm = DEQ_HEAD(out_messages); } - event_thread_running = 0; + event_thread_running = false; return NULL; } /* }}} void event_thread */ @@ -307,27 +307,23 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ static int encqueue(cd_message_t *cdm, amqp1_config_instance_t *instance) /* {{{ */ { - size_t bufsize = BUFSIZE; - pn_data_t *body; - pn_message_t *message; - int status = 0; - /* encode message */ - message = pn_message(); + pn_message_t *message = pn_message(); pn_message_set_address(message, instance->send_to); - body = pn_message_body(message); + pn_data_t *body = pn_message_body(message); pn_data_clear(body); pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start)); pn_data_exit(body); /* put_binary copies and stores so ok to use mbuf */ - cdm->mbuf.size = bufsize; - while ((status = pn_message_encode(message, (char *)cdm->mbuf.start, + cdm->mbuf.size = BUFSIZE; + + int status; + while ((status = pn_message_encode(message, cdm->mbuf.start, &cdm->mbuf.size)) == PN_OVERFLOW) { - DEBUG("amqp1 plugin: increasing message buffer size %i", - (int)cdm->mbuf.size); + DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size); cdm->mbuf.size *= 2; - cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size); + cdm->mbuf.start = realloc(cdm->mbuf.start, cdm->mbuf.size); } if (status != 0) { @@ -345,7 +341,7 @@ static int encqueue(cd_message_t *cdm, pn_message_free(message); /* activate the sender */ - if (conn != NULL) { + if (conn) { pn_connection_wake(conn); } @@ -355,31 +351,28 @@ static int encqueue(cd_message_t *cdm, static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{ */ { - amqp1_config_instance_t *instance; - int status = 0; size_t bfree = BUFSIZE; size_t bfill = 0; - cd_message_t *cdm; size_t bufsize = BUFSIZE; - if ((n == NULL) || (user_data == NULL)) + if (n == NULL || user_data == NULL) return EINVAL; - instance = user_data->data; + amqp1_config_instance_t *instance = user_data->data; if (instance->notify != true) { ERROR("amqp1 plugin: write notification failed"); } - cdm = (cd_message_t *)malloc(sizeof(cd_message_t)); + cd_message_t *cdm = malloc(sizeof(*cdm)); DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); + cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize)); cdm->instance = instance; switch (instance->format) { case AMQP1_FORMAT_JSON: - format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree); - status = format_json_notification((char *)cdm->mbuf.start, bufsize, n); + format_json_initialize(cdm->mbuf.start, &bfill, &bfree); + int status = format_json_notification(cdm->mbuf.start, bufsize, n); if (status != 0) { ERROR("amqp1 plugin: formatting notification failed"); return status; @@ -392,33 +385,29 @@ static int amqp1_notify(notification_t const *n, } /* encode message and place on outbound queue */ - status = encqueue(cdm, instance); + return encqueue(cdm, instance); - return status; } /* }}} int amqp1_notify */ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ user_data_t *user_data) { - amqp1_config_instance_t *instance; int status = 0; size_t bfree = BUFSIZE; size_t bfill = 0; - cd_message_t *cdm; size_t bufsize = BUFSIZE; - if ((ds == NULL) || (vl == NULL) || (transport == NULL) || - (user_data == NULL)) + if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL) return EINVAL; - instance = user_data->data; + amqp1_config_instance_t *instance = user_data->data; if (instance->notify != false) { ERROR("amqp1 plugin: write failed"); } - cdm = (cd_message_t *)malloc(sizeof(cd_message_t)); + cd_message_t *cdm = malloc(sizeof(*cdm)); DEQ_ITEM_INIT(cdm); - cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); + cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize)); cdm->instance = instance; switch (instance->format) { @@ -452,10 +441,9 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ return -1; } - /* encode message and place on outboud queue */ - encqueue(cdm, instance); + /* encode message and place on outbound queue */ + return encqueue(cdm, instance); - return 0; } /* }}} int amqp1_write */ static void amqp1_config_transport_free(void *ptr) /* {{{ */ @@ -490,17 +478,13 @@ static void amqp1_config_instance_free(void *ptr) /* {{{ */ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ { - int status = 0; - char *key = NULL; - amqp1_config_instance_t *instance; - - instance = calloc(1, sizeof(*instance)); + amqp1_config_instance_t *instance = calloc(1, sizeof(*instance)); if (instance == NULL) { ERROR("amqp1 plugin: calloc failed."); return ENOMEM; } - status = cf_util_get_string(ci, &instance->name); + int status = cf_util_get_string(ci, &instance->name); if (status != 0) { sfree(instance); return status; @@ -514,11 +498,10 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ else if (strcasecmp("Notify", child->key) == 0) status = cf_util_get_boolean(child, &instance->notify); else if (strcasecmp("Format", child->key) == 0) { + char *key = NULL; status = cf_util_get_string(child, &key); if (status != 0) return status; - /* TODO: goto errout */ - // goto errout; assert(key != NULL); if (strcasecmp(key, "Command") == 0) { instance->format = AMQP1_FORMAT_COMMAND; @@ -580,7 +563,7 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ ERROR("amqp1 plugin: send_to address would have been truncated."); return -1; } - if (instance->notify == true) { + if (instance->notify) { status = plugin_register_notification( tpname, amqp1_notify, &(user_data_t){ @@ -604,8 +587,6 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ { - int status = 0; - transport = calloc(1, sizeof(*transport)); if (transport == NULL) { ERROR("amqp1 plugin: calloc failed."); @@ -615,7 +596,7 @@ static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ /* Initialize transport configuration {{{ */ transport->retry_delay = 1; - status = cf_util_get_string(ci, &transport->name); + int status = cf_util_get_string(ci, &transport->name); if (status != 0) { sfree(transport); return status; @@ -672,9 +653,6 @@ static int amqp1_config(oconfig_item_t *ci) /* {{{ */ static int amqp1_init(void) /* {{{ */ { - int status; - char errbuf[1024]; - if (transport == NULL) { ERROR("amqp1: init failed, no transport configured"); return -1; @@ -683,14 +661,13 @@ static int amqp1_init(void) /* {{{ */ if (proactor == NULL) { pthread_mutex_init(&send_lock, /* attr = */ NULL); /* start_thread */ - status = + int status = plugin_thread_create(&event_thread_id, NULL /* no attributes */, event_thread, NULL /* no argument */, "handle"); if (status != 0) { - ERROR("amqp1 plugin: pthread_create failed: %s", - sstrerror(errno, errbuf, sizeof(errbuf))); + ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO); } else { - event_thread_running = 1; + event_thread_running = true; } } return 0; @@ -701,7 +678,7 @@ static int amqp1_shutdown(void) /* {{{ */ stopping = true; /* Stop the proactor thread */ - if (event_thread_running == 1) { + if (event_thread_running) { DEBUG("amqp1 plugin: Shutting down proactor thread."); pn_connection_wake(conn); } @@ -710,7 +687,7 @@ static int amqp1_shutdown(void) /* {{{ */ DEBUG("amqp1 plugin: proactor thread exited."); - if (transport != NULL) { + if (transport) { amqp1_config_transport_free(transport); }