X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=b237ba3dedcc1d451e81086e242bda88a038ec2b;hb=0a8741b9061f8df4a78a448c021612db06e17425;hp=99dca9014ac628254481ca9dc451eedc21ae564d;hpb=21ab7512825cf8177d5eee5101344b45d0854610;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index 99dca901..b237ba3d 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -27,6 +27,7 @@ **/ #include "collectd.h" + #include "common.h" #include "plugin.h" #include "utils_cmd_putval.h" @@ -527,13 +528,11 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ static int camqp_shutdown (void) /* {{{ */ { - size_t i; - DEBUG ("amqp plugin: Shutting down %zu subscriber threads.", subscriber_threads_num); subscriber_threads_running = 0; - for (i = 0; i < subscriber_threads_num; i++) + for (size_t i = 0; i < subscriber_threads_num; i++) { /* FIXME: Sending a signal is not very elegant here. Maybe find out how * to use a timeout in the thread and check for the variable in regular @@ -600,9 +599,9 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ if (strcasecmp ("text/collectd", content_type) == 0) { - status = handle_putval (stderr, body); + status = cmd_handle_putval (stderr, body); if (status != 0) - ERROR ("amqp plugin: handle_putval failed with status %i.", + ERROR ("amqp plugin: cmd_handle_putval failed with status %i.", status); return (status); } @@ -678,25 +677,21 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ status = camqp_connect (conf); if (status != 0) { - struct timespec ts_interval; ERROR ("amqp plugin: camqp_connect failed. " "Will sleep for %.3f seconds.", CDTIME_T_TO_DOUBLE (interval)); - CDTIME_T_TO_TIMESPEC (interval, &ts_interval); - nanosleep (&ts_interval, /* remaining = */ NULL); + nanosleep (&CDTIME_T_TO_TIMESPEC (interval), /* remaining = */ NULL); continue; } status = amqp_simple_wait_frame (conf->connection, &frame); if (status < 0) { - struct timespec ts_interval; ERROR ("amqp plugin: amqp_simple_wait_frame failed. " "Will sleep for %.3f seconds.", CDTIME_T_TO_DOUBLE (interval)); camqp_close_connection (conf); - CDTIME_T_TO_TIMESPEC (interval, &ts_interval); - nanosleep (&ts_interval, /* remaining = */ NULL); + nanosleep (&CDTIME_T_TO_TIMESPEC (interval), /* remaining = */ NULL); continue; } @@ -742,7 +737,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ memset (tmp, 0, sizeof (*tmp)); status = plugin_thread_create (tmp, /* attr = */ NULL, - camqp_subscribe_thread, conf); + camqp_subscribe_thread, conf, "amqp subscribe"); if (status != 0) { char errbuf[1024]; @@ -763,17 +758,20 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ const char *buffer, const char *routing_key) { - amqp_basic_properties_t props; int status; status = camqp_connect (conf); if (status != 0) return (status); - memset (&props, 0, sizeof (props)); - props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG - | AMQP_BASIC_DELIVERY_MODE_FLAG - | AMQP_BASIC_APP_ID_FLAG; + amqp_basic_properties_t props = { + ._flags = AMQP_BASIC_CONTENT_TYPE_FLAG + | AMQP_BASIC_DELIVERY_MODE_FLAG + | AMQP_BASIC_APP_ID_FLAG, + .delivery_mode = conf->delivery_mode, + .app_id = amqp_cstring_bytes("collectd") + }; + if (conf->format == CAMQP_FORMAT_COMMAND) props.content_type = amqp_cstring_bytes("text/collectd"); else if (conf->format == CAMQP_FORMAT_JSON) @@ -782,8 +780,6 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ props.content_type = amqp_cstring_bytes("text/graphite"); else assert (23 == 42); - props.delivery_mode = conf->delivery_mode; - props.app_id = amqp_cstring_bytes("collectd"); status = amqp_basic_publish(conf->connection, /* channel = */ 1, @@ -814,15 +810,12 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ if ((ds == NULL) || (vl == NULL) || (conf == NULL)) return (EINVAL); - memset (buffer, 0, sizeof (buffer)); - if (conf->routing_key != NULL) { sstrncpy (routing_key, conf->routing_key, sizeof (routing_key)); } else { - size_t i; ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s", vl->host, vl->plugin, vl->plugin_instance, @@ -830,7 +823,7 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ /* Switch slashes (the only character forbidden by collectd) and dots * (the separation character used by AMQP). */ - for (i = 0; routing_key[i] != 0; i++) + for (size_t i = 0; routing_key[i] != 0; i++) { if (routing_key[i] == '.') routing_key[i] = '/'; @@ -841,10 +834,10 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ if (conf->format == CAMQP_FORMAT_COMMAND) { - status = create_putval (buffer, sizeof (buffer), ds, vl); + status = cmd_create_putval (buffer, sizeof (buffer), ds, vl); if (status != 0) { - ERROR ("amqp plugin: create_putval failed with status %i.", + ERROR ("amqp plugin: cmd_create_putval failed with status %i.", status); return (status); } @@ -920,7 +913,6 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ { camqp_config_t *conf; int status; - int i; conf = calloc (1, sizeof (*conf)); if (conf == NULL) @@ -967,7 +959,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ return (status); } - for (i = 0; i < ci->children_num; i++) + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; @@ -1023,6 +1015,9 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ else if ((strcasecmp ("GraphiteAlwaysAppendDS", child->key) == 0) && publish) status = cf_util_get_flag (child, &conf->graphite_flags, GRAPHITE_ALWAYS_APPEND_DS); + else if ((strcasecmp ("GraphitePreserveSeparator", child->key) == 0) && publish) + status = cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_PRESERVE_SEPARATOR); else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) status = cf_util_get_string (child, &conf->prefix); else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish) @@ -1074,11 +1069,13 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ if (publish) { char cbname[128]; - user_data_t ud = { conf, camqp_config_free }; - ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name); - status = plugin_register_write (cbname, camqp_write, &ud); + status = plugin_register_write (cbname, camqp_write, + &(user_data_t) { + .data = conf, + .free_func = camqp_config_free, + }); if (status != 0) { camqp_config_free (conf); @@ -1100,9 +1097,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ static int camqp_config (oconfig_item_t *ci) /* {{{ */ { - int i; - - for (i = 0; i < ci->children_num; i++) + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i;