X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Famqp.c;h=bebaea7c8e7eb4bdbbcff93684321a851fc94cbd;hb=1a477ecb462094ad9e13320a9234716ead038b9c;hp=6be483e27f7da935c49f2ba52f067215c12d6bd3;hpb=b96f58e73819f3928919dfa2da63829fcae92f86;p=collectd.git diff --git a/src/amqp.c b/src/amqp.c index 6be483e2..bebaea7c 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -1,7 +1,7 @@ /** * collectd - src/amqp.c - * Copyright (C) 2009 Sebastien Pahl - * Copyright (C) 2010 Florian Forster + * Copyright (C) 2009 Sebastien Pahl + * Copyright (C) 2010-2012 Florian Forster * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), @@ -26,28 +26,38 @@ * Florian Forster **/ -#include -#include -#include -#include -#include - #include "collectd.h" #include "common.h" #include "plugin.h" #include "utils_cmd_putval.h" #include "utils_format_json.h" +#include "utils_format_graphite.h" + +#include #include #include +#ifdef HAVE_AMQP_TCP_SOCKET_H +# include +#endif +#ifdef HAVE_AMQP_SOCKET_H +# include +#endif +#if defined HAVE_DECL_AMQP_SOCKET_CLOSE && !HAVE_DECL_AMQP_SOCKET_CLOSE +/* rabbitmq-c does not currently ship amqp_socket.h + * and, thus, does not define this function. */ +int amqp_socket_close(amqp_socket_t *); +#endif + /* Defines for the delivery mode. I have no idea why they're not defined by the * library.. */ #define CAMQP_DM_VOLATILE 1 #define CAMQP_DM_PERSISTENT 2 -#define CAMQP_FORMAT_COMMAND 1 -#define CAMQP_FORMAT_JSON 2 +#define CAMQP_FORMAT_COMMAND 1 +#define CAMQP_FORMAT_JSON 2 +#define CAMQP_FORMAT_GRAPHITE 3 #define CAMQP_CHANNEL 1 @@ -58,7 +68,6 @@ struct camqp_config_s { _Bool publish; char *name; - int format; char *host; int port; @@ -72,6 +81,12 @@ struct camqp_config_s /* publish only */ uint8_t delivery_mode; _Bool store_rates; + int format; + /* publish & graphite format only */ + char *prefix; + char *postfix; + char escape_char; + unsigned int graphite_flags; /* subscribe only */ char *exchange_type; @@ -133,6 +148,9 @@ static void camqp_config_free (void *ptr) /* {{{ */ sfree (conf->exchange_type); sfree (conf->queue); sfree (conf->routing_key); + sfree (conf->prefix); + sfree (conf->postfix); + sfree (conf); } /* }}} void camqp_config_free */ @@ -182,8 +200,13 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ break; case AMQP_RESPONSE_LIBRARY_EXCEPTION: +#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO if (r.library_errno) return (sstrerror (r.library_errno, buffer, buffer_size)); +#else + if (r.library_error) + return (sstrerror (r.library_error, buffer, buffer_size)); +#endif else sstrncpy (buffer, "End of stream", sizeof (buffer)); break; @@ -220,6 +243,78 @@ static char *camqp_strerror (camqp_config_t *conf, /* {{{ */ return (buffer); } /* }}} char *camqp_strerror */ +#if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO +static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ +{ + amqp_exchange_declare_ok_t *ed_ret; + + if (conf->exchange_type == NULL) + return (0); + + ed_ret = amqp_exchange_declare (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* exchange = */ amqp_cstring_bytes (conf->exchange), + /* type = */ amqp_cstring_bytes (conf->exchange_type), + /* passive = */ 0, + /* durable = */ 0, + /* auto_delete = */ 1, + /* arguments = */ AMQP_EMPTY_TABLE); + if ((ed_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_exchange_declare failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + INFO ("amqp plugin: Successfully created exchange \"%s\" " + "with type \"%s\".", + conf->exchange, conf->exchange_type); + + return (0); +} /* }}} int camqp_create_exchange */ +#else +static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */ +{ + amqp_exchange_declare_ok_t *ed_ret; + amqp_table_t argument_table; + struct amqp_table_entry_t_ argument_table_entries[1]; + + if (conf->exchange_type == NULL) + return (0); + + /* Valid arguments: "auto_delete", "internal" */ + argument_table.num_entries = STATIC_ARRAY_SIZE (argument_table_entries); + argument_table.entries = argument_table_entries; + argument_table_entries[0].key = amqp_cstring_bytes ("auto_delete"); + argument_table_entries[0].value.kind = AMQP_FIELD_KIND_BOOLEAN; + argument_table_entries[0].value.value.boolean = 1; + + ed_ret = amqp_exchange_declare (conf->connection, + /* channel = */ CAMQP_CHANNEL, + /* exchange = */ amqp_cstring_bytes (conf->exchange), + /* type = */ amqp_cstring_bytes (conf->exchange_type), + /* passive = */ 0, + /* durable = */ 0, + /* arguments = */ argument_table); + if ((ed_ret == NULL) && camqp_is_error (conf)) + { + char errbuf[1024]; + ERROR ("amqp plugin: amqp_exchange_declare failed: %s", + camqp_strerror (conf, errbuf, sizeof (errbuf))); + camqp_close_connection (conf); + return (-1); + } + + INFO ("amqp plugin: Successfully created exchange \"%s\" " + "with type \"%s\".", + conf->exchange, conf->exchange_type); + + return (0); +} /* }}} int camqp_create_exchange */ +#endif + static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ { amqp_queue_declare_ok_t *qd_ret; @@ -261,29 +356,6 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ { amqp_queue_bind_ok_t *qb_ret; - /* create the exchange */ - if (conf->exchange_type != NULL) - { - amqp_exchange_declare_ok_t *ed_ret; - - ed_ret = amqp_exchange_declare (conf->connection, - /* channel = */ CAMQP_CHANNEL, - /* exchange = */ amqp_cstring_bytes (conf->exchange), - /* type = */ amqp_cstring_bytes (conf->exchange_type), - /* passive = */ 0, - /* durable = */ 0, - /* auto_delete = */ 1, - /* arguments = */ AMQP_EMPTY_TABLE); - if ((ed_ret == NULL) && camqp_is_error (conf)) - { - char errbuf[1024]; - ERROR ("amqp plugin: amqp_exchange_declare failed: %s", - camqp_strerror (conf, errbuf, sizeof (errbuf))); - camqp_close_connection (conf); - return (-1); - } - } - assert (conf->queue != NULL); qb_ret = amqp_queue_bind (conf->connection, /* channel = */ CAMQP_CHANNEL, @@ -312,7 +384,9 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ /* consumer_tag = */ AMQP_EMPTY_BYTES, /* no_local = */ 0, /* no_ack = */ 1, - /* exclusive = */ 0); + /* exclusive = */ 0, + /* arguments = */ AMQP_EMPTY_TABLE + ); if ((cm_ret == NULL) && camqp_is_error (conf)) { char errbuf[1024]; @@ -328,8 +402,12 @@ static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */ static int camqp_connect (camqp_config_t *conf) /* {{{ */ { amqp_rpc_reply_t reply; - int sockfd; int status; +#ifdef HAVE_AMQP_TCP_SOCKET + amqp_socket_t *socket; +#else + int sockfd; +#endif if (conf->connection != NULL) return (0); @@ -341,6 +419,34 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (ENOMEM); } +#ifdef HAVE_AMQP_TCP_SOCKET +# define CLOSE_SOCKET() amqp_socket_close (socket) + /* TODO: add support for SSL using amqp_ssl_socket_new + * and related functions */ + socket = amqp_tcp_socket_new (conf->connection); + if (! socket) + { + ERROR ("amqp plugin: amqp_tcp_socket_new failed."); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (ENOMEM); + } + + status = amqp_socket_open (socket, CONF(conf, host), conf->port); + if (status < 0) + { + char errbuf[1024]; + status *= -1; + ERROR ("amqp plugin: amqp_socket_open failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + CLOSE_SOCKET (); + amqp_destroy_connection (conf->connection); + conf->connection = NULL; + return (status); + } +#else /* HAVE_AMQP_TCP_SOCKET */ +# define CLOSE_SOCKET close(sockfd) + /* this interface is deprecated as of rabbitmq-c 0.4 */ sockfd = amqp_open_socket (CONF(conf, host), conf->port); if (sockfd < 0) { @@ -353,6 +459,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ return (status); } amqp_set_sockfd (conf->connection, sockfd); +#endif reply = amqp_login (conf->connection, CONF(conf, vhost), /* channel max = */ 0, @@ -365,7 +472,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.", CONF(conf, vhost), CONF(conf, user)); amqp_destroy_connection (conf->connection); - close (sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); } @@ -378,7 +485,7 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ ERROR ("amqp plugin: amqp_channel_open failed."); amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS); amqp_destroy_connection (conf->connection); - close(sockfd); + CLOSE_SOCKET (); conf->connection = NULL; return (1); } @@ -386,12 +493,16 @@ static int camqp_connect (camqp_config_t *conf) /* {{{ */ INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" " "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port); + status = camqp_create_exchange (conf); + if (status != 0) + return (status); + if (!conf->publish) return (camqp_setup_queue (conf)); return (0); } /* }}} int camqp_connect */ -static int shutdown (void) /* {{{ */ +static int camqp_shutdown (void) /* {{{ */ { size_t i; @@ -414,13 +525,13 @@ static int shutdown (void) /* {{{ */ DEBUG ("amqp plugin: All subscriber threads exited."); return (0); -} /* }}} int shutdown */ +} /* }}} int camqp_shutdown */ /* * Subscribing code */ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ - size_t body_size) + size_t body_size, const char *content_type) { char body[body_size + 1]; char *body_ptr; @@ -464,7 +575,7 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ received += frame.payload.body_fragment.len; } /* while (received < body_size) */ - if (conf->format == CAMQP_FORMAT_COMMAND) + if (strcasecmp ("text/collectd", content_type) == 0) { status = handle_putval (stderr, body); if (status != 0) @@ -472,7 +583,7 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ status); return (status); } - else if (conf->format == CAMQP_FORMAT_JSON) + else if (strcasecmp ("application/json", content_type) == 0) { ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not " "been implemented yet. FIXME!"); @@ -480,8 +591,8 @@ static int camqp_read_body (camqp_config_t *conf, /* {{{ */ } else { - ERROR ("amqp plugin: camqp_read_body: Unknown format option (%i).", - conf->format); + ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".", + content_type); return (EINVAL); } @@ -493,6 +604,8 @@ static int camqp_read_header (camqp_config_t *conf) /* {{{ */ { int status; amqp_frame_t frame; + amqp_basic_properties_t *properties; + char *content_type; status = amqp_simple_wait_frame (conf->connection, &frame); if (status < 0) @@ -512,7 +625,20 @@ static int camqp_read_header (camqp_config_t *conf) /* {{{ */ return (-1); } - return (camqp_read_body (conf, frame.payload.properties.body_size)); + properties = frame.payload.properties.decoded; + content_type = camqp_bytes_cstring (&properties->content_type); + if (content_type == NULL) + { + ERROR ("amqp plugin: Unable to determine content type."); + return (-1); + } + + status = camqp_read_body (conf, + (size_t) frame.payload.properties.body_size, + content_type); + + sfree (content_type); + return (status); } /* }}} int camqp_read_header */ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ @@ -520,6 +646,8 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ camqp_config_t *conf = user_data; int status; + cdtime_t interval = plugin_get_interval (); + while (subscriber_threads_running) { amqp_frame_t frame; @@ -527,19 +655,25 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ status = camqp_connect (conf); if (status != 0) { + struct timespec ts_interval; ERROR ("amqp plugin: camqp_connect failed. " - "Will sleep for %i seconds.", interval_g); - sleep (interval_g); + "Will sleep for %.3f seconds.", + CDTIME_T_TO_DOUBLE (interval)); + CDTIME_T_TO_TIMESPEC (interval, &ts_interval); + nanosleep (&ts_interval, /* remaining = */ NULL); continue; } status = amqp_simple_wait_frame (conf->connection, &frame); if (status < 0) { + struct timespec ts_interval; ERROR ("amqp plugin: amqp_simple_wait_frame failed. " - "Will sleep for %i seconds.", interval_g); + "Will sleep for %.3f seconds.", + CDTIME_T_TO_DOUBLE (interval)); camqp_close_connection (conf); - sleep (interval_g); + CDTIME_T_TO_TIMESPEC (interval, &ts_interval); + nanosleep (&ts_interval, /* remaining = */ NULL); continue; } @@ -564,6 +698,7 @@ static void *camqp_subscribe_thread (void *user_data) /* {{{ */ camqp_config_free (conf); pthread_exit (NULL); + return (NULL); } /* }}} void *camqp_subscribe_thread */ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ @@ -583,7 +718,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ tmp = subscriber_threads + subscriber_threads_num; memset (tmp, 0, sizeof (*tmp)); - status = pthread_create (tmp, /* attr = */ NULL, + status = plugin_thread_create (tmp, /* attr = */ NULL, camqp_subscribe_thread, conf); if (status != 0) { @@ -602,6 +737,7 @@ static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */ /* * Publishing code */ +/* XXX: You must hold "conf->lock" when calling this function! */ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ const char *buffer, const char *routing_key) { @@ -616,7 +752,14 @@ static int camqp_write_locked (camqp_config_t *conf, /* {{{ */ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_APP_ID_FLAG; - props.content_type = amqp_cstring_bytes("application/json"); + if (conf->format == CAMQP_FORMAT_COMMAND) + props.content_type = amqp_cstring_bytes("text/collectd"); + else if (conf->format == CAMQP_FORMAT_JSON) + props.content_type = amqp_cstring_bytes("application/json"); + else if (conf->format == CAMQP_FORMAT_GRAPHITE) + props.content_type = amqp_cstring_bytes("text/graphite"); + else + assert (23 == 42); props.delivery_mode = conf->delivery_mode; props.app_id = amqp_cstring_bytes("collectd"); @@ -693,6 +836,18 @@ static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates); format_json_finalize (buffer, &bfill, &bfree); } + else if (conf->format == CAMQP_FORMAT_GRAPHITE) + { + status = format_graphite (buffer, sizeof (buffer), ds, vl, + conf->prefix, conf->postfix, conf->escape_char, + conf->graphite_flags); + if (status != 0) + { + ERROR ("amqp plugin: format_graphite failed with status %i.", + status); + return (status); + } + } else { ERROR ("amqp plugin: Invalid format (%i).", conf->format); @@ -725,6 +880,8 @@ static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */ conf->format = CAMQP_FORMAT_COMMAND; else if (strcasecmp ("JSON", string) == 0) conf->format = CAMQP_FORMAT_JSON; + else if (strcasecmp ("Graphite", string) == 0) + conf->format = CAMQP_FORMAT_GRAPHITE; else { WARNING ("amqp plugin: Invalid format string: %s", @@ -765,6 +922,10 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ /* publish only */ conf->delivery_mode = CAMQP_DM_VOLATILE; conf->store_rates = 0; + /* publish & graphite only */ + conf->prefix = NULL; + conf->postfix = NULL; + conf->escape_char = '_'; /* subscribe only */ conf->exchange_type = NULL; conf->queue = NULL; @@ -784,9 +945,7 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("Format", child->key) == 0) - status = camqp_config_set_format (child, conf); - else if (strcasecmp ("Host", child->key) == 0) + if (strcasecmp ("Host", child->key) == 0) status = cf_util_get_string (child, &conf->host); else if (strcasecmp ("Port", child->key) == 0) { @@ -821,7 +980,27 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ conf->delivery_mode = CAMQP_DM_VOLATILE; } else if ((strcasecmp ("StoreRates", child->key) == 0) && publish) + { status = cf_util_get_boolean (child, &conf->store_rates); + (void) cf_util_get_flag (child, &conf->graphite_flags, + GRAPHITE_STORE_RATES); + } + else if ((strcasecmp ("Format", child->key) == 0) && publish) + status = camqp_config_set_format (child, conf); + else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish) + status = cf_util_get_string (child, &conf->prefix); + else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish) + status = cf_util_get_string (child, &conf->postfix); + else if ((strcasecmp ("GraphiteEscapeChar", child->key) == 0) && publish) + { + char *tmp_buff = NULL; + status = cf_util_get_string (child, &tmp_buff); + if (strlen (tmp_buff) > 1) + WARNING ("amqp plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + conf->escape_char = tmp_buff[0]; + sfree (tmp_buff); + } else WARNING ("amqp plugin: Ignoring unknown " "configuration option \"%s\".", child->key); @@ -830,15 +1009,16 @@ static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */ break; } /* for (i = 0; i < ci->children_num; i++) */ - if ((status == 0) && !publish && (conf->exchange == NULL)) + if ((status == 0) && (conf->exchange == NULL)) { - if (conf->routing_key != NULL) - WARNING ("amqp plugin: The option \"RoutingKey\" was given " - "without the \"Exchange\" option. It will be ignored."); - if (conf->exchange_type != NULL) WARNING ("amqp plugin: The option \"ExchangeType\" was given " "without the \"Exchange\" option. It will be ignored."); + + if (!publish && (conf->routing_key != NULL)) + WARNING ("amqp plugin: The option \"RoutingKey\" was given " + "without the \"Exchange\" option. It will be ignored."); + } if (status != 0) @@ -903,7 +1083,7 @@ static int camqp_config (oconfig_item_t *ci) /* {{{ */ void module_register (void) { plugin_register_complex_config ("amqp", camqp_config); - plugin_register_shutdown ("amqp", shutdown); + plugin_register_shutdown ("amqp", camqp_shutdown); } /* void module_register */ /* vim: set sw=4 sts=4 et fdm=marker : */