From 7feccc9a58e2c75b87f9f2883c9b4b5a0612938e Mon Sep 17 00:00:00 2001 From: Andrew Smith Date: Thu, 10 Aug 2017 10:00:30 -0400 Subject: [PATCH] initial commit for amqp1 plugin --- AUTHORS | 5 +- Makefile.am | 12 + README | 14 +- configure.ac | 44 ++++ src/amqp1.c | 607 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/collectd.conf.in | 23 ++ src/collectd.conf.pod | 154 ++++++++++++- src/utils_deq.h | 180 +++++++++++++++ 8 files changed, 1033 insertions(+), 6 deletions(-) create mode 100644 src/amqp1.c create mode 100644 src/utils_deq.h diff --git a/AUTHORS b/AUTHORS index 4df743c5..409655ac 100644 --- a/AUTHORS +++ b/AUTHORS @@ -59,6 +59,9 @@ Andreas Henriksson Andy Parkins - battery plugin: sysfs code. +Andy Smith + - AMQP 1.0 plugin. + Anthony Dewhurst - zfs_arc plugin. @@ -286,7 +289,7 @@ Scott Sanders - Write-Graphite plugin. Sebastien Pahl - - AMQP plugin. + - AMQP 0.9 plugin. Serhiy Pshyk - intel_pmu plugin diff --git a/Makefile.am b/Makefile.am index e37716f6..facbc976 100644 --- a/Makefile.am +++ b/Makefile.am @@ -542,6 +542,18 @@ amqp_la_LIBADD = \ libformat_json.la endif +if BUILD_PLUGIN_AMQP1 +pkglib_LTLIBRARIES += amqp1.la +amqp1_la_SOURCES = src/amqp1.c +amqp1_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBQPIDPROTON_CPPFLAGS) +amqp1_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBQPIDPROTON_LDFLAGS) +amqp1_la_LIBADD = \ + $(BUILD_WITH_LIBQPIDPROTON_LIBS) \ + libcmds.la \ + libformat_graphite.la \ + libformat_json.la +endif + if BUILD_PLUGIN_APACHE pkglib_LTLIBRARIES += apache.la apache_la_SOURCES = src/apache.c diff --git a/README b/README index ca86c84d..0f378e56 100644 --- a/README +++ b/README @@ -459,7 +459,11 @@ Features - amqp Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP) - server, such as RabbitMQ. + 0.9.1 server, such as RabbitMQ. + + - amqp1 + Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP) + 1.0 server, such as Qpid Dispatch Router or Apache Artemis Broker. - csv Write to comma separated values (CSV) files. This needs lots of @@ -902,8 +906,14 @@ Prerequisites are supported. + * libqpid-proton (optional) + Used by the `amqp1' plugin for AMQP 1.0 connections, for example to + Qdrouterd. + + * librabbitmq (optional; also called “rabbitmq-c”) - Used by the `amqp' plugin for AMQP connections, for example to RabbitMQ. + Used by the `amqp' plugin for AMQP 0.9.1 connections, for example to + RabbitMQ. * librdkafka (optional; also called “rdkafka”) diff --git a/configure.ac b/configure.ac index 352a7b66..73436288 100644 --- a/configure.ac +++ b/configure.ac @@ -4686,6 +4686,47 @@ if test "$with_libpython" != "xno"; then fi # }}} --with-libpython +# --with-libqpid_proton {{{ +AC_ARG_WITH([libqpid_proton], + [AS_HELP_STRING([--with-libqpid_proton@<:@=PREFIX@:>@], [Path to libqpid_proton.])], + [ + if test "x$withval" != "xno" && test "x$withval" != "xyes"; then + with_libqpid_proton_cppflags="-I$withval/include" + with_libqpid_proton_ldflags="-L$withval/lib" + with_libqpid_proton="yes" + else + with_libqpid_proton="$withval" + fi + ], + [with_libqpid_proton="yes"] +) + +if test "x$with_libqpid_proton" = "xyes"; then + SAVE_CPPFLAGS="$CPPFLAGS" + CPPFLAGS="$CPPFLAGS $with_libqpid_proton_cppflags" + + AC_CHECK_HEADERS([proton/proactor.h], + [with_libqpid_proton="yes"], + [with_libqpid_proton="no (proton/proactor.h not found)"] + ) + + CPPFLAGS="$SAVE_CPPFLAGS" +fi + +# add library checks for function calls + +if test "x$with_libqpid_proton" = "xyes"; then + BUILD_WITH_LIBQPIDPROTON_CPPFLAGS="$with_libqpid_proton_cppflags" + BUILD_WITH_LIBQPIDPROTON_LDFLAGS="$with_libqpid_proton_ldflags" + BUILD_WITH_LIBQPIDPROTON_LIBS="-lqpid-proton" +fi + +AC_SUBST(BUILD_WITH_LIBQPIDPROTON_CPPFLAGS) +AC_SUBST(BUILD_WITH_LIBQPIDPROTON_LDFLAGS) +AC_SUBST(BUILD_WITH_LIBQPIDPROTON_LIBS) + +# }}} + # --with-librabbitmq {{{ AC_ARG_WITH([librabbitmq], [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [Path to librabbitmq.])], @@ -6511,6 +6552,7 @@ m4_divert_once([HELP_ENABLE], []) AC_PLUGIN([aggregation], [yes], [Aggregation plugin]) AC_PLUGIN([amqp], [$with_librabbitmq], [AMQP output plugin]) +AC_PLUGIN([amqp1], [$with_libqpid_proton], [AMQP 1.0 output plugin]) AC_PLUGIN([apache], [$with_libcurl], [Apache httpd statistics]) AC_PLUGIN([apcups], [yes], [Statistics of UPSes by APC]) AC_PLUGIN([apple_sensors], [$with_libiokit], [Apple hardware sensors]) @@ -6899,6 +6941,7 @@ AC_MSG_RESULT([ libpqos . . . . . . . $with_libpqos]) AC_MSG_RESULT([ libprotobuf . . . . . $with_libprotobuf]) AC_MSG_RESULT([ libprotobuf-c . . . . $with_libprotobuf_c]) AC_MSG_RESULT([ libpython . . . . . . $with_libpython]) +AC_MSG_RESULT([ libqpid-proton . . . $with_libqpid_proton]) AC_MSG_RESULT([ librabbitmq . . . . . $with_librabbitmq]) AC_MSG_RESULT([ libriemann-client . . $with_libriemann_client]) AC_MSG_RESULT([ librdkafka . . . . . $with_librdkafka]) @@ -6930,6 +6973,7 @@ AC_MSG_RESULT() AC_MSG_RESULT([ Modules:]) AC_MSG_RESULT([ aggregation . . . . . $enable_aggregation]) AC_MSG_RESULT([ amqp . . . . . . . $enable_amqp]) +AC_MSG_RESULT([ amqp1 . . . . . . . $enable_amqp1]) AC_MSG_RESULT([ apache . . . . . . . $enable_apache]) AC_MSG_RESULT([ apcups . . . . . . . $enable_apcups]) AC_MSG_RESULT([ apple_sensors . . . . $enable_apple_sensors]) diff --git a/src/amqp1.c b/src/amqp1.c new file mode 100644 index 00000000..dcd17dd0 --- /dev/null +++ b/src/amqp1.c @@ -0,0 +1,607 @@ +/** + * collectd - src/amqp1.c + * Copyright(c) 2017 Red Hat Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Andy Smith + */ + +#include "collectd.h" + +#include "common.h" +#include "plugin.h" +#include "utils_cmd_putval.h" +#include "utils_format_graphite.h" +#include "utils_format_json.h" +#include "utils_random.h" +#include "utils_deq.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#define BUFSIZE 8192 +#define AMQP1_FORMAT_JSON 0 +#define AMQP1_FORMAT_COMMAND 1 +#define AMQP1_FORMAT_GRAPHITE 2 + +typedef struct amqp1_config_transport_t { + DEQ_LINKS(struct amqp1_config_transport_t); + char *name; + char *host; + char *port; + char *user; + char *password; + char *address; +} amqp1_config_transport_t; + +typedef struct amqp1_config_instance_t { + DEQ_LINKS(struct amqp1_config_instance_t); + char *name; + uint8_t format; + unsigned int graphite_flags; + _Bool store_rates; + char *prefix; + char *postfix; + char escape_char; + _Bool pre_settle; + char send_to[128]; +} amqp1_config_instance_t; + +DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t); + +typedef struct cd_message_t { + DEQ_LINKS(struct cd_message_t); + pn_bytes_t mbuf; + amqp1_config_instance_t *instance; +} cd_message_t; + +DEQ_DECLARE(cd_message_t, cd_message_list_t); + +/* + * Globals + */ +pn_connection_t *conn = NULL; +pn_session_t *ssn = NULL; +pn_link_t *sender = NULL; +pn_proactor_t *proactor = NULL; +pthread_mutex_t send_lock; +cd_message_list_t out_messages; +uint64_t cd_tag = 1; +uint64_t acknowledged = 0; +amqp1_config_transport_t *transport = NULL; +bool finished = false; + +static int event_thread_running = 0; +static pthread_t event_thread_id; + +/* + * Functions + */ +static void cd_message_free(cd_message_t *cdm) +{ + if (cdm->mbuf.start) { + free((void *)cdm->mbuf.start); + } + free(cdm); +} /* }}} void cd_message_free */ + +static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ +{ + uint64_t dtag; + cd_message_list_t to_send; + cd_message_t *cdm; + int link_credit = pn_link_credit(link); + int event_count = 0; + pn_delivery_t *dlv; + + DEQ_INIT(to_send); + + pthread_mutex_lock(&send_lock); + + if (link_credit > 0) { + dtag = cd_tag; + cdm = DEQ_HEAD(out_messages); + while (cdm) { + DEQ_REMOVE_HEAD(out_messages); + DEQ_INSERT_TAIL(to_send, cdm); + if (DEQ_SIZE(to_send) == link_credit) + break; + cdm = DEQ_HEAD(out_messages); + } + cd_tag += DEQ_SIZE(to_send); + } + + pthread_mutex_unlock(&send_lock); + + /* message is already formatted and encoded */ + cdm = DEQ_HEAD(to_send); + while (cdm) { + DEQ_REMOVE_HEAD(to_send); + dtag++; + dlv = pn_delivery(link, pn_dtag((const char*)&dtag, sizeof(dtag))); + pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size); + pn_link_advance(link); + if (cdm->instance->pre_settle == true) { + pn_delivery_settle(dlv); + } + event_count++; + cd_message_free(cdm); + cdm = DEQ_HEAD(to_send); + } + + return event_count; +} /* }}} int amqp1_send_out_messages */ + +static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */ +{ + if (pn_condition_is_set(cond)) { + ERROR("amqp1 plugin: %s: %s: %s", + pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), + pn_condition_get_description(cond)); + pn_connection_close(pn_event_connection(e)); + conn = NULL; + } +} /* }}} void check_condition */ + +static bool handle(pn_event_t *event) /* {{{ */ +{ + + switch (pn_event_type(event)) { + + case PN_CONNECTION_INIT:{ + conn = pn_event_connection(event); + pn_connection_set_container(conn, transport->address); + pn_connection_open(conn); + ssn = pn_session(conn); + pn_session_open(ssn); + sender = pn_sender(ssn, "cd-sender"); + pn_link_set_snd_settle_mode(sender, PN_SND_MIXED); + pn_link_open(sender); + break; + } + + case PN_LINK_FLOW: { + /* peer has given us credit, send outbound messages */ + amqp1_send_out_messages(sender); + break; + } + + case PN_DELIVERY: { + /* acknowledgement from peer that a message was delivered */ + pn_delivery_t * dlv = pn_event_delivery(event); + if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) { + acknowledged++; + } + break; + } + + case PN_CONNECTION_WAKE: { + if (!finished) { + amqp1_send_out_messages(sender); + } + break; + } + + case PN_TRANSPORT_CLOSED: { + check_condition(event, pn_transport_condition(pn_event_transport(event))); + break; + } + + case PN_CONNECTION_REMOTE_CLOSE: { + check_condition(event, pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + } + + case PN_SESSION_REMOTE_CLOSE: { + check_condition(event, pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + } + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: { + check_condition(event, pn_link_remote_condition(pn_event_link(event))); + pn_connection_close(pn_event_connection(event)); + break; + } + + case PN_PROACTOR_INACTIVE: { + return false; + } + + default: break; + } + return true; +} /* }}} bool handle */ + +static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ +{ + + do { + pn_event_batch_t *events = pn_proactor_wait(proactor); + pn_event_t *e; + for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) { + if (!handle(e)) { + finished = true; + } + } + pn_proactor_done(proactor, events); + } while (!finished); + + event_thread_running = 0; + + return NULL; +} /* }}} void event_thread */ + +static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ + user_data_t *user_data) +{ + amqp1_config_instance_t *instance = user_data->data; + int status = 0; + size_t bfree = BUFSIZE; + size_t bfill = 0; + cd_message_t *cdm; + size_t bufsize = BUFSIZE; + pn_data_t *body; + pn_message_t *message; + + if ((ds == NULL) || (vl == NULL) || (transport == NULL)) + return EINVAL; + + cdm = NEW(cd_message_t); + DEQ_ITEM_INIT(cdm); + cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize)); + cdm->instance = instance; + + switch (instance->format) { + case AMQP1_FORMAT_COMMAND: + status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl); + if (status != 0) { + ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status); + return status; + } + cdm->mbuf.size = strlen(cdm->mbuf.start); + break; + case AMQP1_FORMAT_JSON: + format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree); + format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl, + instance->store_rates); + format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree); + cdm->mbuf.size = strlen(cdm->mbuf.start); + break; + case AMQP1_FORMAT_GRAPHITE: + status = + format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl, instance->prefix, + instance->postfix, instance->escape_char, instance->graphite_flags); + if (status != 0) { + ERROR("amqp1 plugin: format_graphite failed with status %i.", status); + return status; + } + cdm->mbuf.size = strlen(cdm->mbuf.start); + break; + default: + ERROR("amqp1 plugin: Invalid format (%i).", instance->format); + return -1; + } + + /* encode message */ + message = pn_message(); + pn_message_set_address(message, instance->send_to); + body = pn_message_body(message); + pn_data_clear(body); + pn_data_put_binary(body, cdm->mbuf); + pn_data_exit(body); + + /* put_binary copies and stores so ok to use mbuf */ + cdm->mbuf.size = bufsize; + pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size); + + pthread_mutex_lock(&send_lock); + DEQ_INSERT_TAIL(out_messages, cdm); + pthread_mutex_unlock(&send_lock); + + pn_message_free(message); + + /* activate the sender */ + if (conn != NULL) { + pn_connection_wake(conn); + } + + return 0; +} /* }}} int amqp_write1 */ + +static void amqp1_config_transport_free(void *ptr) /* {{{ */ +{ + amqp1_config_transport_t *transport = ptr; + + if (transport == NULL) + return; + + sfree(transport->name); + sfree(transport->host); + sfree(transport->user); + sfree(transport->password); + sfree(transport->address); + + sfree(transport); +} /* }}} void amqp1_config_transport_free */ + +static void amqp1_config_instance_free(void *ptr) /* {{{ */ +{ + amqp1_config_instance_t *instance = ptr; + + if (instance == NULL) + return; + + sfree(instance->name); + sfree(instance->prefix); + sfree(instance->postfix); + + sfree(instance); +} /* }}} void amqp1_config_instance_free */ + +static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ +{ + int status=0; + char *key = NULL; + amqp1_config_instance_t *instance; + + instance = calloc(1, sizeof(*instance)); + if (instance == NULL) { + ERROR("amqp1 plugin: calloc failed."); + return ENOMEM; + } + + /* Initialize instance configuration {{{ */ + instance->name = NULL; + + status = cf_util_get_string(ci, &instance->name); + if (status != 0) { + sfree(instance); + return status; + } + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("PreSettle", child->key) == 0) + status = cf_util_get_boolean(child, &instance->pre_settle); + else if (strcasecmp("Format", child->key) == 0) { + status = cf_util_get_string(child, &key); + if (status != 0) + return status; + /* TODO: goto errout */ + // goto errout; + assert(key != NULL); + if (strcasecmp(key, "Command") == 0) { + instance->format = AMQP1_FORMAT_COMMAND; + } else if (strcasecmp(key, "Graphite") == 0) { + instance->format = AMQP1_FORMAT_GRAPHITE; + } else if (strcasecmp(key, "JSON") == 0) { + instance->format = AMQP1_FORMAT_JSON; + } else { + WARNING("amqp1 plugin: Invalid format string: %s", key); + } + sfree(key); + } + else if (strcasecmp("StoreRates", child->key) == 0) + status = cf_util_get_boolean(child, &instance->store_rates); + else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0) + status = cf_util_get_flag(child, &instance->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0) + status = cf_util_get_flag(child, &instance->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); + else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0) + status = cf_util_get_flag(child, &instance->graphite_flags, + GRAPHITE_PRESERVE_SEPARATOR); + else if (strcasecmp("GraphitePrefix", child->key) == 0) + status = cf_util_get_string(child, &instance->prefix); + else if (strcasecmp("GraphitePostfix", child->key) == 0) + status = cf_util_get_string(child, &instance->postfix); + else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) { + char *tmp_buff = NULL; + status = cf_util_get_string(child, &tmp_buff); + if (strlen(tmp_buff) > 1) + WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + instance->escape_char = tmp_buff[0]; + sfree(tmp_buff); + } + else + WARNING("amqp1 plugin: Ignoring unknown " + "instance configuration option " + "\%s\".", child->key); + if (status != 0) + break; + } + + if (status != 0) { + amqp1_config_instance_free(instance); + return status; + } else { + char tpname[128]; + snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); + snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", + transport->address,instance->name); + status = plugin_register_write(tpname, amqp1_write, &(user_data_t) { + .data = instance, .free_func = amqp1_config_instance_free, }); + if (status != 0) { + amqp1_config_instance_free(instance); + } + } + + return status; +} /* }}} int amqp1_config_instance */ + +static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ +{ + int status=0; + + transport = calloc(1, sizeof(*transport)); + if (transport == NULL) { + ERROR("amqp1 plugin: calloc failed."); + return ENOMEM; + } + + /* Initialize transport configuration {{{ */ + transport->name = NULL; + + status = cf_util_get_string(ci, &transport->name); + if (status != 0) { + sfree(transport); + return status; + } + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Host", child->key) == 0) + status = cf_util_get_string(child, &transport->host); + else if (strcasecmp("Port", child->key) == 0) + status = cf_util_get_string(child, &transport->port); + else if (strcasecmp("User", child->key) == 0) + status = cf_util_get_string(child, &transport->user); + else if (strcasecmp("Password", child->key) == 0) + status = cf_util_get_string(child, &transport->password); + else if (strcasecmp("Address", child->key) == 0) + status = cf_util_get_string(child, &transport->address); + else if (strcasecmp("Instance",child->key) == 0) + amqp1_config_instance(child); + else + WARNING("amqp1 plugin: Ignoring unknown " + "transport configuration option " + "\%s\".", child->key); + + if (status != 0) + break; + } + + if (status != 0) { + amqp1_config_transport_free(transport); + } + return status; +} /* }}} int amqp1_config_transport */ + +static int amqp1_config(oconfig_item_t *ci) /* {{{ */ +{ + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Transport", child->key) == 0) + amqp1_config_transport(child); + else + WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".", + child->key); + } + + return 0; +} /* }}} int amqp1_config */ + +static int amqp1_init(void) /* {{{ */ +{ + char addr[PN_MAX_ADDR]; + int status; + char errbuf[1024]; + + if (transport == NULL) { + ERROR("amqp1: init failed, no transport configured"); + return -1; + } + + if (proactor == NULL) { + pthread_mutex_init(&send_lock, /* attr = */ NULL); + proactor = pn_proactor(); + pn_proactor_addr(addr, sizeof(addr),transport->host,transport->port); + conn = pn_connection(); + if (transport->user != NULL) { + pn_connection_set_user(conn, transport->user); + pn_connection_set_password(conn, transport->password); + } + pn_proactor_connect(proactor, conn, addr); + /* start_thread */ + status = plugin_thread_create(&event_thread_id, NULL /* no attributes */, + event_thread, NULL /* no argument */, + "handle"); + if (status != 0) { + ERROR("amqp1: pthread_create failed: %s", + sstrerror(errno, errbuf, sizeof(errbuf))); + } else { + event_thread_running = 1; + } + } + return 0; +} /* }}} int amqp1_init */ + +static int amqp1_shutdown +(void) /* {{{ */ +{ + cd_message_t *cdm; + + /* Stop the proactor thread */ + if (event_thread_running != 0) { + finished=true; + /* activate the event thread */ + pn_connection_wake(conn); + pthread_join(event_thread_id, NULL /* no return value */); + memset(&event_thread_id, 0, sizeof(event_thread_id)); + } + + /* Free the remaining out_messages */ + cdm = DEQ_HEAD(out_messages); + while (cdm) { + DEQ_REMOVE_HEAD(out_messages); + cd_message_free(cdm); + cdm = DEQ_HEAD(out_messages); + } + + if (proactor != NULL) { + pn_proactor_free(proactor); + } + + if (transport != NULL) { + amqp1_config_transport_free(transport); + } + + return 0; +} /* }}} int amqp1_shutdown */ + +void module_register(void) +{ + plugin_register_complex_config("amqp1", amqp1_config); + plugin_register_init("amqp1", amqp1_init); + plugin_register_shutdown("amqp1",amqp1_shutdown); +} /* void module_register */ diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 6ec61f32..ce1c00bb 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -90,6 +90,7 @@ #@BUILD_PLUGIN_AGGREGATION_TRUE@LoadPlugin aggregation #@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp +#@BUILD_PLUGIN_AMQP1_TRUE@LoadPlugin amqp1 #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups #@BUILD_PLUGIN_APPLE_SENSORS_TRUE@LoadPlugin apple_sensors @@ -269,6 +270,28 @@ # # +# +# +# Host "localhost" +# Port "5672" +# User "guest" +# Password "guest" +# Address "collectd" +# +# Format JSON +# PreSettle false +# +# +# Format JSON +# PreSettle true +# +# +# Format JSON +# PreSettle false +# +# +# + # # # URL "http://localhost/status?auto" diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index dfd785a2..2dca2f53 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -530,9 +530,9 @@ are disabled by default. =head2 Plugin C The I can be used to communicate with other instances of -I or third party applications using an AMQP message broker. Values -are sent to or received from the broker, which handles routing, queueing and -possibly filtering out messages. +I or third party applications using an AMQP 0.9.1 message broker. +Values are sent to or received from the broker, which handles routing, +queueing and possibly filtering out messages. B @@ -738,6 +738,154 @@ is preserved, i.e. passed through. =back +=head2 Plugin C + +The I can be used to communicate with other instances of +I or third party applications using an AMQP 1.0 message +intermediary. Values are sent to the messaging intermediary which +may handle direct messaging or queue based transfer. + +B + + + # Send values to an AMQP 1.0 intermediary + + Host "localhost" + Port "5672" + User "guest" + Password "guest" + Address "collectd" + + Format "command" + PreSettle false + # StoreRates false + # GraphitePrefix "collectd." + # GraphiteEscapeChar "_" + # GraphiteSeparateInstances false + # GraphiteAlwaysAppendDS false + # GraphitePreserveSeparator false + + + + +The plugin's configuration consists of a I that configures +communications to the AMQP 1.0 messaging bus and one or more I +corresponding to publishers to the messaging system. The address in +the I block concatenated with the name given int the +I block starting tag will be used as the send-to address for +communications over the messaging link. + +The following options are accepted within each I block: + +=over 4 + +=item B I + +Hostname or IP-address of the AMQP 1.0 intermediary. Defaults to the +default behavior of the underlying communications library, +I, which is "localhost". + +=item B I + +Service name or port number on which the AMQP 1.0 intermediary accepts +connections. This argument must be a string, even if the numeric form +is used. Defaults to "5672". + +=item B I + +=item B I + +Credentials used to authenticate to the AMQP 1.0 intermediary. By +default "guest"/"guest" is used. + +=item B
I
+ +This option specifies the prefix for the send-to value in the message. +By default, "collectd" will be used. + +=back + +The following options are accepted within each I block: + +=over 4 + +=item B B|B|B + +Selects the format in which messages are sent to the intermediary. If set to +B (the default), values are sent as C commands which are +identical to the syntax used by the I and I. In this +case, the C header field will be set to C. + +If set to B, the values are encoded in the I, +an easy and straight forward exchange format. The C header field +will be set to C. + +If set to B, values are encoded in the I format, which is +" \n". The C header field will be set to +C. + +A subscribing client I use the C header field to +determine how to decode the values. + +=item B B|B + +If set to B (the default), the plugin will wait for a message +acknowledgement from the messaging bus before sending the next +message. This indicates transfer of ownership to the messaging +system. If set to B, the plugin will not wait for a message +acknowledgement and the message may be dropped prior to transfer of +ownership. + +=item B B|B + +Determines whether or not C, C and C data sources +are converted to a I (i.e. a C value). If set to B (the +default), no conversion is performed. Otherwise the conversion is performed +using the internal value cache. + +Please note that currently this option is only used if the B option has +been set to B. + +=item B + +A prefix can be added in the metric name when outputting in the I format. +It's added before the I name. +Metric name will be "" + +=item B + +A postfix can be added in the metric name when outputting in the I format. +It's added after the I name. +Metric name will be "" + +=item B + +Specify a character to replace dots (.) in the host part of the metric name. +In I metric name, dots are used as separators between different +metric parts (host, plugin, type). +Default is "_" (I). + +=item B B|B + +If set to B, the plugin instance and type instance will be in their own +path component, for example C. If set to B (the +default), the plugin and plugin instance (and likewise the type and type +instance) are put into one component, for example C. + +=item B B|B + +If set to B, append the name of the I (DS) to the "metric" +identifier. If set to B (the default), this is only done when there is +more than one DS. + +=item B B|B + +If set to B (the default) the C<.> (dot) character is replaced with +I. Otherwise, if set to B, the C<.> (dot) character +is preserved, i.e. passed through. + +=back + =head2 Plugin C To configure the C-plugin you first need to configure the Apache diff --git a/src/utils_deq.h b/src/utils_deq.h new file mode 100644 index 00000000..feb7df9e --- /dev/null +++ b/src/utils_deq.h @@ -0,0 +1,180 @@ +#ifndef utils_deq_h +#define utils_deq_h 1 + +#include +#include +#include + +#define CT_ASSERT(exp) { assert(exp); } + +#define NEW(t) (t*) malloc(sizeof(t)) +#define NEW_ARRAY(t,n) (t*) malloc(sizeof(t)*(n)) +#define NEW_PTR_ARRAY(t,n) (t**) malloc(sizeof(t*)*(n)) + +// +// If available, use aligned_alloc for cache-line-aligned allocations. Otherwise +// fall back to plain malloc. +// +#define NEW_CACHE_ALIGNED(t,p) \ +do { \ + if (posix_memalign((void*) &(p), 64, (sizeof(t) + (sizeof(t) % 64 ? 64 - (sizeof(t) % 64) : 0))) != 0) (p) = 0; \ +} while (0) + +#define ALLOC_CACHE_ALIGNED(s,p) \ +do { \ + if (posix_memalign((void*) &(p), 64, (s + (s % 64 ? 64 - (s % 64) : 0))) != 0) (p) = 0; \ +} while (0) + +#define ZERO(p) memset(p, 0, sizeof(*p)) + +#define DEQ_DECLARE(i,d) typedef struct { \ + i *head; \ + i *tail; \ + i *scratch; \ + size_t size; \ + } d + +#define DEQ_LINKS_N(n,t) t *prev##n; t *next##n +#define DEQ_LINKS(t) DEQ_LINKS_N(,t) +#define DEQ_EMPTY {0,0,0,0} + +#define DEQ_INIT(d) do { (d).head = 0; (d).tail = 0; (d).scratch = 0; (d).size = 0; } while (0) +#define DEQ_IS_EMPTY(d) ((d).head == 0) +#define DEQ_ITEM_INIT_N(n,i) do { (i)->next##n = 0; (i)->prev##n = 0; } while(0) +#define DEQ_ITEM_INIT(i) DEQ_ITEM_INIT_N(,i) +#define DEQ_HEAD(d) ((d).head) +#define DEQ_TAIL(d) ((d).tail) +#define DEQ_SIZE(d) ((d).size) +#define DEQ_NEXT_N(n,i) (i)->next##n +#define DEQ_NEXT(i) DEQ_NEXT_N(,i) +#define DEQ_PREV_N(n,i) (i)->prev##n +#define DEQ_PREV(i) DEQ_PREV_N(,i) +#define DEQ_MOVE(d1,d2) do {d2 = d1; DEQ_INIT(d1);} while (0) +/** + *@pre ptr points to first element of deq + *@post ptr points to first element of deq that passes test, or 0. Test should involve ptr. + */ +#define DEQ_FIND_N(n,ptr,test) while((ptr) && !(test)) ptr = DEQ_NEXT_N(n,ptr); +#define DEQ_FIND(ptr,test) DEQ_FIND_N(,ptr,test) + +#define DEQ_INSERT_HEAD_N(n,d,i) \ +do { \ + CT_ASSERT((i)->next##n == 0); \ + CT_ASSERT((i)->prev##n == 0); \ + if ((d).head) { \ + (i)->next##n = (d).head; \ + (d).head->prev##n = i; \ + } else { \ + (d).tail = i; \ + (i)->next##n = 0; \ + CT_ASSERT((d).size == 0); \ + } \ + (i)->prev##n = 0; \ + (d).head = i; \ + (d).size++; \ +} while (0) +#define DEQ_INSERT_HEAD(d,i) DEQ_INSERT_HEAD_N(,d,i) + +#define DEQ_INSERT_TAIL_N(n,d,i) \ +do { \ + CT_ASSERT((i)->next##n == 0); \ + CT_ASSERT((i)->prev##n == 0); \ + if ((d).tail) { \ + (i)->prev##n = (d).tail; \ + (d).tail->next##n = i; \ + } else { \ + (d).head = i; \ + (i)->prev##n = 0; \ + CT_ASSERT((d).size == 0); \ + } \ + (i)->next##n = 0; \ + (d).tail = i; \ + (d).size++; \ +} while (0) +#define DEQ_INSERT_TAIL(d,i) DEQ_INSERT_TAIL_N(,d,i) + +#define DEQ_REMOVE_HEAD_N(n,d) \ +do { \ + CT_ASSERT((d).head); \ + if ((d).head) { \ + (d).scratch = (d).head; \ + (d).head = (d).head->next##n; \ + if ((d).head == 0) { \ + (d).tail = 0; \ + CT_ASSERT((d).size == 1); \ + } else \ + (d).head->prev##n = 0; \ + (d).size--; \ + (d).scratch->next##n = 0; \ + (d).scratch->prev##n = 0; \ + } \ +} while (0) +#define DEQ_REMOVE_HEAD(d) DEQ_REMOVE_HEAD_N(,d) + +#define DEQ_REMOVE_TAIL_N(n,d) \ +do { \ + CT_ASSERT((d).tail); \ + if ((d).tail) { \ + (d).scratch = (d).tail; \ + (d).tail = (d).tail->prev##n; \ + if ((d).tail == 0) { \ + (d).head = 0; \ + CT_ASSERT((d).size == 1); \ + } else \ + (d).tail->next##n = 0; \ + (d).size--; \ + (d).scratch->next##n = 0; \ + (d).scratch->prev##n = 0; \ + } \ +} while (0) +#define DEQ_REMOVE_TAIL(d) DEQ_REMOVE_TAIL_N(,d) + +#define DEQ_INSERT_AFTER_N(n,d,i,a) \ +do { \ + CT_ASSERT((i)->next##n == 0); \ + CT_ASSERT((i)->prev##n == 0); \ + CT_ASSERT(a); \ + if ((a)->next##n) \ + (a)->next##n->prev##n = (i); \ + else \ + (d).tail = (i); \ + (i)->next##n = (a)->next##n; \ + (i)->prev##n = (a); \ + (a)->next##n = (i); \ + (d).size++; \ +} while (0) +#define DEQ_INSERT_AFTER(d,i,a) DEQ_INSERT_AFTER_N(,d,i,a) + +#define DEQ_REMOVE_N(n,d,i) \ +do { \ + if ((i)->next##n) \ + (i)->next##n->prev##n = (i)->prev##n; \ + else \ + (d).tail = (i)->prev##n; \ + if ((i)->prev##n) \ + (i)->prev##n->next##n = (i)->next##n; \ + else \ + (d).head = (i)->next##n; \ + CT_ASSERT((d).size > 0); \ + (d).size--; \ + (i)->next##n = 0; \ + (i)->prev##n = 0; \ + CT_ASSERT((d).size || (!(d).head && !(d).tail)); \ +} while (0) +#define DEQ_REMOVE(d,i) DEQ_REMOVE_N(,d,i) + +#define DEQ_APPEND_N(n,d1,d2) \ +do { \ + if (!(d1).head) \ + (d1) = (d2); \ + else if ((d2).head) { \ + (d1).tail->next##n = (d2).head; \ + (d2).head->prev##n = (d1).tail; \ + (d1).tail = (d2).tail; \ + (d1).size += (d2).size; \ + } \ + DEQ_INIT(d2); \ +} while (0) +#define DEQ_APPEND(d1,d2) DEQ_APPEND_N(,d1,d2) + +#endif -- 2.11.0