X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=f0abd44b461a38c5c7fade0b1e96b92f11897ec7;hb=144b79a116c4d71e6172caad176db6f014c9b7fd;hp=1924ce735416b8890b57c17f6c63f2ab456ef46c;hpb=111f69addbf605ff94a4e3bf6c63163e8736c9a4;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index 1924ce73..f0abd44b 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -26,17 +26,14 @@ * Florian Forster **/ -#include -#include -#include -#include -#include - #include "collectd.h" #include "common.h" #include "plugin.h" +#include "utils_cmd_putval.h" #include "utils_format_json.h" +#include + #include #include @@ -45,6 +42,9 @@ #define CAMQP_DM_VOLATILE 1 #define CAMQP_DM_PERSISTENT 2 +#define CAMQP_FORMAT_COMMAND 1 +#define CAMQP_FORMAT_JSON 2 + #define CAMQP_CHANNEL 1 /* @@ -62,12 +62,16 @@ struct camqp_config_s char *password; char *exchange; - char *exchange_type; - char *queue; - char *routingkey; - uint8_t delivery_mode; + char *routing_key; + /* publish only */ + uint8_t delivery_mode; _Bool store_rates; + int format; + + /* subscribe only */ + char *exchange_type; + char *queue; amqp_connection_state_t connection; pthread_mutex_t lock; @@ -82,7 +86,6 @@ static const char *def_vhost = "/"; static const char *def_user = "guest"; static const char *def_password = "guest"; static const char *def_exchange = "amq.fanout"; -static const char *def_routingkey = "collectd"; static pthread_t *subscriber_threads = NULL; static size_t subscriber_threads_num = 0; @@ -125,7 +128,7 @@ static void camqp_config_free (void *ptr) /* {{{ */ sfree (conf->exchange); sfree (conf->exchange_type); sfree (conf->queue); - sfree (conf->routingkey); + sfree (conf->routing_key); sfree (conf); } /* }}} void camqp_config_free */ @@ -213,6 +216,37 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ return (buffer); } /* }}} char *camqp_strerror */ +static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ +{ + amqp_exchange_declare_ok_t *ed_ret; + + if (conf->exchange_type == NULL) + return (0); + + ed_ret = amqp_exchange_declare (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* exchange = */ amqp_cstring_bytes (conf->exchange), + /* type = */ amqp_cstring_bytes (conf->exchange_type), + /* passive = */ 0, + /* durable = */ 0, + /* auto_delete = */ 1, + /* arguments = */ AMQP_EMPTY_TABLE); + if ((ed_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_exchange_declare failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + INFO ("amqp plugin: Successfully created exchange \"%s\" " + "with type \"%s\".", + conf->exchange, conf->exchange_type); + + return (0); +} /* }}} int camqp_create_exchange */ + static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ { amqp_queue_declare_ok_t *qd_ret; @@ -254,42 +288,14 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ { amqp_queue_bind_ok_t *qb_ret; - /* create the exchange */ - if (conf->exchange_type != NULL) - { - amqp_exchange_declare_ok_t *ed_ret; - - ed_ret = amqp_exchange_declare (conf->connection, - /* channel = */ CAMQP_CHANNEL, - /* exchange = */ amqp_cstring_bytes (conf->exchange), - /* type = */ amqp_cstring_bytes (conf->exchange_type), - /* passive = */ 0, - /* durable = */ 0, - /* auto_delete = */ 1, - /* arguments = */ AMQP_EMPTY_TABLE); - if ((ed_ret == NULL) && camqp_is_error (conf)) - { - char errbuf[1024]; - ERROR ("amqp plugin: amqp_exchange_declare failed: %s", - camqp_strerror (conf, errbuf, sizeof (errbuf))); - camqp_close_connection (conf); - return (-1); - } - } - - DEBUG ("amqp plugin: queue = %s; exchange = %s; routing_key = %s;", - conf->queue, conf->exchange, CONF (conf, routingkey)); - assert (conf->queue != NULL); qb_ret = amqp_queue_bind (conf->connection, /* channel = */ CAMQP_CHANNEL, /* queue = */ amqp_cstring_bytes (conf->queue), /* exchange = */ amqp_cstring_bytes (conf->exchange), -#if 1 - /* routing_key = */ amqp_cstring_bytes (CONF (conf, routingkey)), -#else - /* routing_key = */ AMQP_EMPTY_BYTES, -#endif + /* routing_key = */ (conf->routing_key != NULL) + ? amqp_cstring_bytes (conf->routing_key) + : AMQP_EMPTY_BYTES, /* arguments = */ AMQP_EMPTY_TABLE); if ((qb_ret == NULL) && camqp_is_error (conf)) { @@ -299,6 +305,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ camqp_close_connection (conf); return (-1); } + + DEBUG ("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".", + conf->queue, conf->exchange); } /* if (conf->exchange != NULL) */ cm_ret = amqp_basic_consume (conf->connection, @@ -381,13 +390,45 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" " "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port); + status = camqp_create_exchange (conf); + if (status != 0) + return (status); + if (!conf->publish) return (camqp_setup_queue (conf)); return (0); } /* }}} int camqp_connect */ +static int camqp_shutdown (void) /* {{{ */ +{ + size_t i; + + DEBUG ("amqp plugin: Shutting down %zu subscriber threads.", + subscriber_threads_num); + + subscriber_threads_running = 0; + for (i = 0; i < subscriber_threads_num; i++) + { + /* FIXME: Sending a signal is not very elegant here. Maybe find out how + * to use a timeout in the thread and check for the variable in regular + * intervals. */ + pthread_kill (subscriber_threads[i], SIGTERM); + pthread_join (subscriber_threads[i], /* retval = */ NULL); + } + + subscriber_threads_num = 0; + sfree (subscriber_threads); + + DEBUG ("amqp plugin: All subscriber threads exited."); + + return (0); +} /* }}} int camqp_shutdown */ + +/* + * Subscribing code + */ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ - size_t body_size) + size_t body_size, const char *content_type) { char body[body_size + 1]; char *body_ptr; @@ -431,8 +472,28 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ received += frame.payload.body_fragment.len; } /* while (received < body_size) */ - DEBUG ("amqp plugin: camqp_read_body: body = %s", body); + if (strcasecmp ("text/collectd", content_type) == 0) + { + status = handle_putval (stderr, body); + if (status != 0) + ERROR ("amqp plugin: handle_putval failed with status %i.", + status); + return (status); + } + else if (strcasecmp ("application/json", content_type) == 0) + { + ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not " + "been implemented yet. FIXME!"); + return (0); + } + else + { + ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".", + content_type); + return (EINVAL); + } + /* not reached */ return (0); } /* }}} int camqp_read_body */ @@ -440,6 +501,8 @@ static int camqp_read_header (camqp_config_t *conf) /* {{{ */ { int status; amqp_frame_t frame; + amqp_basic_properties_t *properties; + char *content_type; status = amqp_simple_wait_frame (conf->connection, &frame); if (status < 0) @@ -459,7 +522,20 @@ static int camqp_read_header (camqp_config_t *conf) /* {{{ */ return (-1); } - return (camqp_read_body (conf, frame.payload.properties.body_size)); + properties = frame.payload.properties.decoded; + content_type = camqp_bytes_cstring (&properties->content_type); + if (content_type == NULL) + { + ERROR ("amqp plugin: Unable to determine content type."); + return (-1); + } + + status = camqp_read_body (conf, + (size_t) frame.payload.properties.body_size, + content_type); + + sfree (content_type); + return (status); } /* }}} int camqp_read_header */ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ @@ -546,8 +622,12 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ return (0); } /* }}} int camqp_subscribe_init */ +/* + * Publishing code + */ +/* XXX: You must hold "conf->lock" when calling this function! */ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ - const char *buffer) + const char *buffer, const char *routing_key) { amqp_basic_properties_t props; int status; @@ -560,14 +640,19 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_APP_ID_FLAG; - props.content_type = amqp_cstring_bytes("application/json"); + if (conf->format == CAMQP_FORMAT_COMMAND) + props.content_type = amqp_cstring_bytes("text/collectd"); + else if (conf->format == CAMQP_FORMAT_JSON) + props.content_type = amqp_cstring_bytes("application/json"); + else + assert (23 == 42); props.delivery_mode = conf->delivery_mode; props.app_id = amqp_cstring_bytes("collectd"); status = amqp_basic_publish(conf->connection, /* channel = */ 1, amqp_cstring_bytes(CONF(conf, exchange)), - amqp_cstring_bytes(CONF(conf, routingkey)), + amqp_cstring_bytes (routing_key), /* mandatory = */ 0, /* immediate = */ 0, &props, @@ -586,29 +671,100 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ user_data_t *user_data) { camqp_config_t *conf = user_data->data; + char routing_key[6 * DATA_MAX_NAME_LEN]; char buffer[4096]; - size_t bfree; - size_t bfill; int status; if ((ds == NULL) || (vl == NULL) || (conf == NULL)) return (EINVAL); memset (buffer, 0, sizeof (buffer)); - bfree = sizeof (buffer); - bfill = 0; - format_json_initialize (buffer, &bfill, &bfree); - format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); - format_json_finalize (buffer, &bfill, &bfree); + if (conf->routing_key != NULL) + { + sstrncpy (routing_key, conf->routing_key, sizeof (routing_key)); + } + else + { + size_t i; + ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s", + vl->host, + vl->plugin, vl->plugin_instance, + vl->type, vl->type_instance); + + /* Switch slashes (the only character forbidden by collectd) and dots + * (the separation character used by AMQP). */ + for (i = 0; routing_key[i] != 0; i++) + { + if (routing_key[i] == '.') + routing_key[i] = '/'; + else if (routing_key[i] == '/') + routing_key[i] = '.'; + } + } + + if (conf->format == CAMQP_FORMAT_COMMAND) + { + status = create_putval (buffer, sizeof (buffer), ds, vl); + if (status != 0) + { + ERROR ("amqp plugin: create_putval failed with status %i.", + status); + return (status); + } + } + else if (conf->format == CAMQP_FORMAT_JSON) + { + size_t bfree = sizeof (buffer); + size_t bfill = 0; + + format_json_initialize (buffer, &bfill, &bfree); + format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); + format_json_finalize (buffer, &bfill, &bfree); + } + else + { + ERROR ("amqp plugin: Invalid format (%i).", conf->format); + return (-1); + } pthread_mutex_lock (&conf->lock); - status = camqp_write_locked (conf, buffer); + status = camqp_write_locked (conf, buffer, routing_key); pthread_mutex_unlock (&conf->lock); return (status); } /* }}} int camqp_write */ +/* + * Config handling + */ +static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */ + camqp_config_t *conf) +{ + char *string; + int status; + + string = NULL; + status = cf_util_get_string (ci, &string); + if (status != 0) + return (status); + + assert (string != NULL); + if (strcasecmp ("Command", string) == 0) + conf->format = CAMQP_FORMAT_COMMAND; + else if (strcasecmp ("JSON", string) == 0) + conf->format = CAMQP_FORMAT_JSON; + else + { + WARNING ("amqp plugin: Invalid format string: %s", + string); + } + + free (string); + + return (0); +} /* }}} int config_set_string */ + static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ _Bool publish) { @@ -627,17 +783,21 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ memset (conf, 0, sizeof (*conf)); conf->publish = publish; conf->name = NULL; + conf->format = CAMQP_FORMAT_COMMAND; conf->host = NULL; conf->port = 5672; conf->vhost = NULL; conf->user = NULL; conf->password = NULL; conf->exchange = NULL; - conf->exchange_type = NULL; - conf->queue = NULL; - conf->routingkey = NULL; + conf->routing_key = NULL; + /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + /* subscribe only */ + conf->exchange_type = NULL; + conf->queue = NULL; + /* general */ conf->connection = NULL; pthread_mutex_init (&conf->lock, /* attr = */ NULL); /* }}} */ @@ -677,8 +837,8 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ else if ((strcasecmp ("Queue", child->key) == 0) && !publish) status = cf_util_get_string (child, &conf->queue); else if (strcasecmp ("RoutingKey", child->key) == 0) - status = cf_util_get_string (child, &conf->routingkey); - else if (strcasecmp ("Persistent", child->key) == 0) + status = cf_util_get_string (child, &conf->routing_key); + else if ((strcasecmp ("Persistent", child->key) == 0) && publish) { _Bool tmp = 0; status = cf_util_get_boolean (child, &tmp); @@ -687,8 +847,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ else conf->delivery_mode = CAMQP_DM_VOLATILE; } - else if (strcasecmp ("StoreRates", child->key) == 0) + else if ((strcasecmp ("StoreRates", child->key) == 0) && publish) status = cf_util_get_boolean (child, &conf->store_rates); + else if ((strcasecmp ("Format", child->key) == 0) && publish) + status = camqp_config_set_format (child, conf); else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key); @@ -697,15 +859,16 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ break; } /* for (i = 0; i < ci->children_num; i++) */ - if ((status == 0) && !publish && (conf->exchange == NULL)) + if ((status == 0) && (conf->exchange == NULL)) { - if (conf->routingkey != NULL) - WARNING ("amqp plugin: The option \"RoutingKey\" was given " - "without the \"Exchange\" option. It will be ignored."); - if (conf->exchange_type != NULL) WARNING ("amqp plugin: The option \"ExchangeType\" was given " "without the \"Exchange\" option. It will be ignored."); + + if (!publish && (conf->routing_key != NULL)) + WARNING ("amqp plugin: The option \"RoutingKey\" was given " + "without the \"Exchange\" option. It will be ignored."); + } if (status != 0) @@ -767,35 +930,10 @@ static int camqp_config (oconfig_item_t *ci) /* {{{ */ return (0); } /* }}} int camqp_config */ -static int shutdown (void) /* {{{ */ -{ - size_t i; - - DEBUG ("amqp plugin: Shutting down %zu subscriber threads.", - subscriber_threads_num); - - subscriber_threads_running = 0; - for (i = 0; i < subscriber_threads_num; i++) - { - /* FIXME: Sending a signal is not very elegant here. Maybe find out how - * to use a timeout in the thread and check for the variable in regular - * intervals. */ - pthread_kill (subscriber_threads[i], SIGTERM); - pthread_join (subscriber_threads[i], /* retval = */ NULL); - } - - subscriber_threads_num = 0; - sfree (subscriber_threads); - - DEBUG ("amqp plugin: All subscriber threads exited."); - - return (0); -} /* }}} int shutdown */ - void module_register (void) { plugin_register_complex_config ("amqp", camqp_config); - plugin_register_shutdown ("amqp", shutdown); + plugin_register_shutdown ("amqp", camqp_shutdown); } /* void module_register */ /* vim: set sw=4 sts=4 et fdm=marker : */