From: Ruben Kerkhof Date: Thu, 24 May 2018 09:06:55 +0000 (+0200) Subject: Merge pull request #2618 from ajssmith/amqp1_dev1_branch X-Git-Url: https://git.octo.it/?p=collectd.git;a=commitdiff_plain;h=d486225f89ea52d8ed2b4242eba2ad94c409f837;hp=3f4d0cbd79ca46e1b98edb10ddee3a723d69a6bb Merge pull request #2618 from ajssmith/amqp1_dev1_branch Write amqp1 plugin --- diff --git a/AUTHORS b/AUTHORS index 4df743c5..409655ac 100644 --- a/AUTHORS +++ b/AUTHORS @@ -59,6 +59,9 @@ Andreas Henriksson Andy Parkins - battery plugin: sysfs code. +Andy Smith + - AMQP 1.0 plugin. + Anthony Dewhurst - zfs_arc plugin. @@ -286,7 +289,7 @@ Scott Sanders - Write-Graphite plugin. Sebastien Pahl - - AMQP plugin. + - AMQP 0.9 plugin. Serhiy Pshyk - intel_pmu plugin diff --git a/Makefile.am b/Makefile.am index 98b71ab4..c345904d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -550,6 +550,20 @@ amqp_la_LIBADD = \ libformat_json.la endif +if BUILD_PLUGIN_AMQP1 +pkglib_LTLIBRARIES += amqp1.la +amqp1_la_SOURCES = \ + src/amqp1.c \ + src/utils_deq.h +amqp1_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBQPIDPROTON_CPPFLAGS) +amqp1_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBQPIDPROTON_LDFLAGS) +amqp1_la_LIBADD = \ + $(BUILD_WITH_LIBQPIDPROTON_LIBS) \ + libcmds.la \ + libformat_graphite.la \ + libformat_json.la +endif + if BUILD_PLUGIN_APACHE pkglib_LTLIBRARIES += apache.la apache_la_SOURCES = src/apache.c diff --git a/README b/README index 43df03b8..2210b2b9 100644 --- a/README +++ b/README @@ -465,7 +465,11 @@ Features - amqp Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP) - server, such as RabbitMQ. + 0.9.1 server, such as RabbitMQ. + + - amqp1 + Sends JSON-encoded data to an Advanced Message Queuing Protocol (AMQP) + 1.0 server, such as Qpid Dispatch Router or Apache Artemis Broker. - csv Write to comma separated values (CSV) files. This needs lots of @@ -908,8 +912,14 @@ Prerequisites are supported. + * libqpid-proton (optional) + Used by the `amqp1' plugin for AMQP 1.0 connections, for example to + Qdrouterd. + + * librabbitmq (optional; also called “rabbitmq-c”) - Used by the `amqp' plugin for AMQP connections, for example to RabbitMQ. + Used by the `amqp' plugin for AMQP 0.9.1 connections, for example to + RabbitMQ. * librdkafka (optional; also called “rdkafka”) diff --git a/configure.ac b/configure.ac index 4be85bbf..1e31e218 100644 --- a/configure.ac +++ b/configure.ac @@ -4719,6 +4719,56 @@ if test "$with_libpython" != "xno"; then fi # }}} --with-libpython +# --with-libqpid_proton {{{ +AC_ARG_WITH([libqpid_proton], + [AS_HELP_STRING([--with-libqpid_proton@<:@=PREFIX@:>@], [Path to libqpid_proton.])], + [ + if test "x$withval" != "xno" && test "x$withval" != "xyes"; then + with_libqpid_proton_cppflags="-I$withval/include" + with_libqpid_proton_ldflags="-L$withval/lib" + with_libqpid_proton="yes" + else + with_libqpid_proton="$withval" + fi + ], + [with_libqpid_proton="yes"] +) + +if test "x$with_libqpid_proton" = "xyes"; then + SAVE_CPPFLAGS="$CPPFLAGS" + CPPFLAGS="$CPPFLAGS $with_libqpid_proton_cppflags" + + AC_CHECK_HEADERS([proton/proactor.h], + [with_libqpid_proton="yes"], + [with_libqpid_proton="no (proton/proactor.h not found)"] + ) + + CPPFLAGS="$SAVE_CPPFLAGS" +fi + +if test "x$with_libqpid_proton" = "xyes"; then + SAVE_LDFLAGS="$LDFLAGS" + LDFLAGS="$LDFLAGS $with_libqpid_proton_ldflags" + + AC_CHECK_LIB([qpid-proton], [pn_connection], + [with_libqpid_proton="yes"], + [with_libqpid_proton="no (Symbol 'pn_connection' not found)"]) + + LDFLAGS="$SAVE_LDFLAGS" +fi + +if test "x$with_libqpid_proton" = "xyes"; then + BUILD_WITH_LIBQPIDPROTON_CPPFLAGS="$with_libqpid_proton_cppflags" + BUILD_WITH_LIBQPIDPROTON_LDFLAGS="$with_libqpid_proton_ldflags" + BUILD_WITH_LIBQPIDPROTON_LIBS="-lqpid-proton" +fi + +AC_SUBST(BUILD_WITH_LIBQPIDPROTON_CPPFLAGS) +AC_SUBST(BUILD_WITH_LIBQPIDPROTON_LDFLAGS) +AC_SUBST(BUILD_WITH_LIBQPIDPROTON_LIBS) + +# }}} + # --with-librabbitmq {{{ AC_ARG_WITH([librabbitmq], [AS_HELP_STRING([--with-librabbitmq@<:@=PREFIX@:>@], [Path to librabbitmq.])], @@ -6544,6 +6594,7 @@ m4_divert_once([HELP_ENABLE], []) AC_PLUGIN([aggregation], [yes], [Aggregation plugin]) AC_PLUGIN([amqp], [$with_librabbitmq], [AMQP output plugin]) +AC_PLUGIN([amqp1], [$with_libqpid_proton], [AMQP 1.0 output plugin]) AC_PLUGIN([apache], [$with_libcurl], [Apache httpd statistics]) AC_PLUGIN([apcups], [yes], [Statistics of UPSes by APC]) AC_PLUGIN([apple_sensors], [$with_libiokit], [Apple hardware sensors]) @@ -6932,6 +6983,7 @@ AC_MSG_RESULT([ libpqos . . . . . . . $with_libpqos]) AC_MSG_RESULT([ libprotobuf . . . . . $with_libprotobuf]) AC_MSG_RESULT([ libprotobuf-c . . . . $with_libprotobuf_c]) AC_MSG_RESULT([ libpython . . . . . . $with_libpython]) +AC_MSG_RESULT([ libqpid-proton . . . $with_libqpid_proton]) AC_MSG_RESULT([ librabbitmq . . . . . $with_librabbitmq]) AC_MSG_RESULT([ libriemann-client . . $with_libriemann_client]) AC_MSG_RESULT([ librdkafka . . . . . $with_librdkafka]) @@ -6963,6 +7015,7 @@ AC_MSG_RESULT() AC_MSG_RESULT([ Modules:]) AC_MSG_RESULT([ aggregation . . . . . $enable_aggregation]) AC_MSG_RESULT([ amqp . . . . . . . $enable_amqp]) +AC_MSG_RESULT([ amqp1 . . . . . . . $enable_amqp1]) AC_MSG_RESULT([ apache . . . . . . . $enable_apache]) AC_MSG_RESULT([ apcups . . . . . . . $enable_apcups]) AC_MSG_RESULT([ apple_sensors . . . . $enable_apple_sensors]) diff --git a/contrib/redhat/collectd.spec b/contrib/redhat/collectd.spec index 752b0246..6f86b7e8 100644 --- a/contrib/redhat/collectd.spec +++ b/contrib/redhat/collectd.spec @@ -44,6 +44,7 @@ # plugins enabled by default %define with_aggregation 0%{!?_without_aggregation:1} %define with_amqp 0%{!?_without_amqp:1} +%define with_amqp1 0%{!?_without_amqp1:1} %define with_apache 0%{!?_without_apache:1} %define with_apcups 0%{!?_without_apcups:1} %define with_ascent 0%{!?_without_ascent:1} @@ -280,13 +281,24 @@ every 10 seconds by default. %if %{with_amqp} %package amqp -Summary: AMQP plugin for collectd +Summary: AMQP 0.9 plugin for collectd Group: System Environment/Daemons Requires: %{name}%{?_isa} = %{version}-%{release} BuildRequires: librabbitmq-devel %description amqp -The AMQP plugin transmits or receives values collected by collectd via the -Advanced Message Queuing Protocol (AMQP). +The AMQP 0.9 plugin transmits or receives values collected by collectd via the +Advanced Message Queuing Protocol v0.9 (AMQP). +%endif + +%if %{with_amqp1} +%package amqp1 +Summary: AMQP 1.0 plugin for collectd +Group: System Environment/Daemons +Requires: %{name}%{?_isa} = %{version}-%{release} +BuildRequires: qpid-proton-c-devel +%description amqp1 +The AMQP 1.0 plugin transmits or receives values collected by collectd via the +Advanced Message Queuing Protocol v1.0 (AMQP1). %endif %if %{with_apache} @@ -1018,6 +1030,12 @@ Collectd utilities %define _with_amqp --disable-amqp %endif +%if %{with_amqp1} +%define _with_amqp1 --enable-amqp1 +%else +%define _with_amqp1 --disable-amqp1 +%endif + %if %{with_apache} %define _with_apache --enable-apache %else @@ -1898,6 +1916,7 @@ Collectd utilities --enable-target_v5upgrade \ %{?_with_aggregation} \ %{?_with_amqp} \ + %{?_with_amqp1} \ %{?_with_apache} \ %{?_with_apcups} \ %{?_with_apple_sensors} \ @@ -2409,6 +2428,11 @@ fi %{_libdir}/%{name}/amqp.so %endif +%if %{with_amqp1} +%files amqp1 +%{_libdir}/%{name}/amqp1.so +%endif + %if %{with_apache} %files apache %{_libdir}/%{name}/apache.so diff --git a/src/amqp1.c b/src/amqp1.c new file mode 100644 index 00000000..4ba73596 --- /dev/null +++ b/src/amqp1.c @@ -0,0 +1,724 @@ +/** + * collectd - src/amqp1.c + * Copyright(c) 2017 Red Hat Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Andy Smith + */ + +#include "collectd.h" + +#include "common.h" +#include "plugin.h" +#include "utils_cmd_putval.h" +#include "utils_deq.h" +#include "utils_format_graphite.h" +#include "utils_format_json.h" +#include "utils_random.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#define BUFSIZE 8192 +#define AMQP1_FORMAT_JSON 0 +#define AMQP1_FORMAT_COMMAND 1 +#define AMQP1_FORMAT_GRAPHITE 2 + +typedef struct amqp1_config_transport_s { + DEQ_LINKS(struct amqp1_config_transport_s); + char *name; + char *host; + char *port; + char *user; + char *password; + char *address; + int retry_delay; +} amqp1_config_transport_t; + +typedef struct amqp1_config_instance_s { + DEQ_LINKS(struct amqp1_config_instance_s); + char *name; + bool notify; + uint8_t format; + unsigned int graphite_flags; + bool store_rates; + char *prefix; + char *postfix; + char escape_char; + bool pre_settle; + char send_to[1024]; +} amqp1_config_instance_t; + +DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t); + +typedef struct cd_message_s { + DEQ_LINKS(struct cd_message_s); + pn_rwbytes_t mbuf; + amqp1_config_instance_t *instance; +} cd_message_t; + +DEQ_DECLARE(cd_message_t, cd_message_list_t); + +/* + * Globals + */ +static pn_connection_t *conn = NULL; +static pn_link_t *sender = NULL; +static pn_proactor_t *proactor = NULL; +static pthread_mutex_t send_lock; +static cd_message_list_t out_messages; +static uint64_t cd_tag = 1; +static uint64_t acknowledged = 0; +static amqp1_config_transport_t *transport = NULL; +static bool stopping = false; +static int event_thread_running = 0; +static pthread_t event_thread_id; + +/* + * Functions + */ +static void cd_message_free(cd_message_t *cdm) { + free(cdm->mbuf.start); + free(cdm); +} /* }}} void cd_message_free */ + +static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */ +{ + uint64_t dtag; + cd_message_list_t to_send; + cd_message_t *cdm; + int link_credit = pn_link_credit(link); + int event_count = 0; + pn_delivery_t *dlv; + + if (stopping) { + return 0; + } + + DEQ_INIT(to_send); + + pthread_mutex_lock(&send_lock); + + if (link_credit > 0) { + dtag = cd_tag; + cdm = DEQ_HEAD(out_messages); + while (cdm) { + DEQ_REMOVE_HEAD(out_messages); + DEQ_INSERT_TAIL(to_send, cdm); + if (DEQ_SIZE(to_send) == link_credit) + break; + cdm = DEQ_HEAD(out_messages); + } + cd_tag += DEQ_SIZE(to_send); + } + + pthread_mutex_unlock(&send_lock); + + /* message is already formatted and encoded */ + cdm = DEQ_HEAD(to_send); + while (cdm) { + DEQ_REMOVE_HEAD(to_send); + dtag++; + dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag))); + pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size); + pn_link_advance(link); + if (cdm->instance->pre_settle == true) { + pn_delivery_settle(dlv); + } + event_count++; + cd_message_free(cdm); + cdm = DEQ_HEAD(to_send); + } + + return event_count; +} /* }}} int amqp1_send_out_messages */ + +static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */ +{ + if (pn_condition_is_set(cond)) { + ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)), + pn_condition_get_name(cond), pn_condition_get_description(cond)); + pn_connection_close(pn_event_connection(e)); + conn = NULL; + } +} /* }}} void check_condition */ + +static bool handle(pn_event_t *event) /* {{{ */ +{ + + switch (pn_event_type(event)) { + + case PN_CONNECTION_INIT: { + conn = pn_event_connection(event); + pn_connection_set_container(conn, transport->name); + pn_connection_open(conn); + pn_session_t *ssn = pn_session(conn); + pn_session_open(ssn); + sender = pn_sender(ssn, "cd-sender"); + pn_link_set_snd_settle_mode(sender, PN_SND_MIXED); + pn_link_open(sender); + break; + } + + case PN_LINK_FLOW: { + /* peer has given us credit, send outbound messages */ + amqp1_send_out_messages(sender); + break; + } + + case PN_DELIVERY: { + /* acknowledgement from peer that a message was delivered */ + pn_delivery_t *dlv = pn_event_delivery(event); + if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) { + pn_delivery_settle(dlv); + acknowledged++; + } + break; + } + + case PN_CONNECTION_WAKE: { + if (!stopping) { + amqp1_send_out_messages(sender); + } + break; + } + + case PN_TRANSPORT_CLOSED: { + check_condition(event, pn_transport_condition(pn_event_transport(event))); + break; + } + + case PN_CONNECTION_REMOTE_CLOSE: { + check_condition(event, + pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + } + + case PN_SESSION_REMOTE_CLOSE: { + check_condition(event, + pn_session_remote_condition(pn_event_session(event))); + pn_connection_close(pn_event_connection(event)); + break; + } + + case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: { + check_condition(event, pn_link_remote_condition(pn_event_link(event))); + pn_connection_close(pn_event_connection(event)); + break; + } + + case PN_PROACTOR_INACTIVE: { + return false; + } + + default: + break; + } + return true; +} /* }}} bool handle */ + +static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */ +{ + char addr[PN_MAX_ADDR]; + cd_message_t *cdm; + + /* setup proactor */ + proactor = pn_proactor(); + pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port); + + while (!stopping) { + /* make connection */ + conn = pn_connection(); + if (transport->user != NULL) { + pn_connection_set_user(conn, transport->user); + pn_connection_set_password(conn, transport->password); + } + pn_proactor_connect(proactor, conn, addr); + + bool engine_running = true; + while (engine_running && !stopping) { + pn_event_batch_t *events = pn_proactor_wait(proactor); + pn_event_t *e; + while ((e = pn_event_batch_next(events))) { + engine_running = handle(e); + if (!engine_running) { + break; + } + } + pn_proactor_done(proactor, events); + } + + pn_proactor_release_connection(conn); + + DEBUG("amqp1 plugin: retrying connection"); + int delay = transport->retry_delay; + while (delay-- > 0 && !stopping) { + sleep(1.0); + } + } + + pn_proactor_disconnect(proactor, NULL); + + /* Free the remaining out_messages */ + cdm = DEQ_HEAD(out_messages); + while (cdm) { + DEQ_REMOVE_HEAD(out_messages); + cd_message_free(cdm); + cdm = DEQ_HEAD(out_messages); + } + + event_thread_running = 0; + + return NULL; +} /* }}} void event_thread */ + +static int encqueue(cd_message_t *cdm, + amqp1_config_instance_t *instance) /* {{{ */ +{ + size_t bufsize = BUFSIZE; + pn_data_t *body; + pn_message_t *message; + int status = 0; + + /* encode message */ + message = pn_message(); + pn_message_set_address(message, instance->send_to); + body = pn_message_body(message); + pn_data_clear(body); + pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start)); + pn_data_exit(body); + + /* put_binary copies and stores so ok to use mbuf */ + cdm->mbuf.size = bufsize; + while ((status = pn_message_encode(message, (char *)cdm->mbuf.start, + &cdm->mbuf.size)) == PN_OVERFLOW) { + DEBUG("amqp1 plugin: increasing message buffer size %i", + (int)cdm->mbuf.size); + cdm->mbuf.size *= 2; + cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size); + } + + if (status != 0) { + ERROR("amqp1 plugin: error encoding message: %s", + pn_error_text(pn_message_error(message))); + pn_message_free(message); + cd_message_free(cdm); + return -1; + } + + pthread_mutex_lock(&send_lock); + DEQ_INSERT_TAIL(out_messages, cdm); + pthread_mutex_unlock(&send_lock); + + pn_message_free(message); + + /* activate the sender */ + if (conn != NULL) { + pn_connection_wake(conn); + } + + return 0; +} /* }}} int encqueue */ + +static int amqp1_notify(notification_t const *n, + user_data_t *user_data) /* {{{ */ +{ + amqp1_config_instance_t *instance; + int status = 0; + size_t bfree = BUFSIZE; + size_t bfill = 0; + cd_message_t *cdm; + size_t bufsize = BUFSIZE; + + if ((n == NULL) || (user_data == NULL)) + return EINVAL; + + instance = user_data->data; + + if (instance->notify != true) { + ERROR("amqp1 plugin: write notification failed"); + } + + cdm = (cd_message_t *)malloc(sizeof(cd_message_t)); + DEQ_ITEM_INIT(cdm); + cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); + cdm->instance = instance; + + switch (instance->format) { + case AMQP1_FORMAT_JSON: + format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree); + status = format_json_notification((char *)cdm->mbuf.start, bufsize, n); + if (status != 0) { + ERROR("amqp1 plugin: formatting notification failed"); + return status; + } + cdm->mbuf.size = strlen(cdm->mbuf.start); + break; + default: + ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format); + return -1; + } + + /* encode message and place on outbound queue */ + status = encqueue(cdm, instance); + + return status; +} /* }}} int amqp1_notify */ + +static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */ + user_data_t *user_data) { + amqp1_config_instance_t *instance; + int status = 0; + size_t bfree = BUFSIZE; + size_t bfill = 0; + cd_message_t *cdm; + size_t bufsize = BUFSIZE; + + if ((ds == NULL) || (vl == NULL) || (transport == NULL) || + (user_data == NULL)) + return EINVAL; + + instance = user_data->data; + + if (instance->notify != false) { + ERROR("amqp1 plugin: write failed"); + } + + cdm = (cd_message_t *)malloc(sizeof(cd_message_t)); + DEQ_ITEM_INIT(cdm); + cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize)); + cdm->instance = instance; + + switch (instance->format) { + case AMQP1_FORMAT_COMMAND: + status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl); + if (status != 0) { + ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status); + return status; + } + cdm->mbuf.size = strlen(cdm->mbuf.start); + break; + case AMQP1_FORMAT_JSON: + format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree); + format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl, + instance->store_rates); + format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree); + cdm->mbuf.size = strlen(cdm->mbuf.start); + break; + case AMQP1_FORMAT_GRAPHITE: + status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl, + instance->prefix, instance->postfix, + instance->escape_char, instance->graphite_flags); + if (status != 0) { + ERROR("amqp1 plugin: format_graphite failed with status %i.", status); + return status; + } + cdm->mbuf.size = strlen(cdm->mbuf.start); + break; + default: + ERROR("amqp1 plugin: Invalid write format (%i).", instance->format); + return -1; + } + + /* encode message and place on outboud queue */ + encqueue(cdm, instance); + + return 0; +} /* }}} int amqp1_write */ + +static void amqp1_config_transport_free(void *ptr) /* {{{ */ +{ + amqp1_config_transport_t *transport = ptr; + + if (transport == NULL) + return; + + sfree(transport->name); + sfree(transport->host); + sfree(transport->user); + sfree(transport->password); + sfree(transport->address); + + sfree(transport); +} /* }}} void amqp1_config_transport_free */ + +static void amqp1_config_instance_free(void *ptr) /* {{{ */ +{ + amqp1_config_instance_t *instance = ptr; + + if (instance == NULL) + return; + + sfree(instance->name); + sfree(instance->prefix); + sfree(instance->postfix); + + sfree(instance); +} /* }}} void amqp1_config_instance_free */ + +static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */ +{ + int status = 0; + char *key = NULL; + amqp1_config_instance_t *instance; + + instance = calloc(1, sizeof(*instance)); + if (instance == NULL) { + ERROR("amqp1 plugin: calloc failed."); + return ENOMEM; + } + + status = cf_util_get_string(ci, &instance->name); + if (status != 0) { + sfree(instance); + return status; + } + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("PreSettle", child->key) == 0) + status = cf_util_get_boolean(child, &instance->pre_settle); + else if (strcasecmp("Notify", child->key) == 0) + status = cf_util_get_boolean(child, &instance->notify); + else if (strcasecmp("Format", child->key) == 0) { + status = cf_util_get_string(child, &key); + if (status != 0) + return status; + /* TODO: goto errout */ + // goto errout; + assert(key != NULL); + if (strcasecmp(key, "Command") == 0) { + instance->format = AMQP1_FORMAT_COMMAND; + } else if (strcasecmp(key, "Graphite") == 0) { + instance->format = AMQP1_FORMAT_GRAPHITE; + } else if (strcasecmp(key, "JSON") == 0) { + instance->format = AMQP1_FORMAT_JSON; + } else { + WARNING("amqp1 plugin: Invalid format string: %s", key); + } + sfree(key); + } else if (strcasecmp("StoreRates", child->key) == 0) + status = cf_util_get_boolean(child, &instance->store_rates); + else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0) + status = cf_util_get_flag(child, &instance->graphite_flags, + GRAPHITE_SEPARATE_INSTANCES); + else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0) + status = cf_util_get_flag(child, &instance->graphite_flags, + GRAPHITE_ALWAYS_APPEND_DS); + else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0) + status = cf_util_get_flag(child, &instance->graphite_flags, + GRAPHITE_PRESERVE_SEPARATOR); + else if (strcasecmp("GraphitePrefix", child->key) == 0) + status = cf_util_get_string(child, &instance->prefix); + else if (strcasecmp("GraphitePostfix", child->key) == 0) + status = cf_util_get_string(child, &instance->postfix); + else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) { + char *tmp_buff = NULL; + status = cf_util_get_string(child, &tmp_buff); + if (status == 0) { + if (strlen(tmp_buff) > 1) + WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles " + "only one character. Others will be ignored."); + instance->escape_char = tmp_buff[0]; + } + sfree(tmp_buff); + } else + WARNING("amqp1 plugin: Ignoring unknown " + "instance configuration option " + "\%s\".", + child->key); + if (status != 0) + break; + } + + if (status != 0) { + amqp1_config_instance_free(instance); + return status; + } else { + char tpname[DATA_MAX_NAME_LEN]; + status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name); + if ((status < 0) || (size_t)status >= sizeof(tpname)) { + ERROR("amqp1 plugin: Instance name would have been truncated."); + return -1; + } + status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s", + transport->address, instance->name); + if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) { + ERROR("amqp1 plugin: send_to address would have been truncated."); + return -1; + } + if (instance->notify == true) { + status = plugin_register_notification( + tpname, amqp1_notify, + &(user_data_t){ + .data = instance, .free_func = amqp1_config_instance_free, + }); + } else { + status = plugin_register_write( + tpname, amqp1_write, + &(user_data_t){ + .data = instance, .free_func = amqp1_config_instance_free, + }); + } + + if (status != 0) { + amqp1_config_instance_free(instance); + } + } + + return status; +} /* }}} int amqp1_config_instance */ + +static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */ +{ + int status = 0; + + transport = calloc(1, sizeof(*transport)); + if (transport == NULL) { + ERROR("amqp1 plugin: calloc failed."); + return ENOMEM; + } + + /* Initialize transport configuration {{{ */ + transport->retry_delay = 1; + + status = cf_util_get_string(ci, &transport->name); + if (status != 0) { + sfree(transport); + return status; + } + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Host", child->key) == 0) + status = cf_util_get_string(child, &transport->host); + else if (strcasecmp("Port", child->key) == 0) + status = cf_util_get_string(child, &transport->port); + else if (strcasecmp("User", child->key) == 0) + status = cf_util_get_string(child, &transport->user); + else if (strcasecmp("Password", child->key) == 0) + status = cf_util_get_string(child, &transport->password); + else if (strcasecmp("Address", child->key) == 0) + status = cf_util_get_string(child, &transport->address); + else if (strcasecmp("RetryDelay", child->key) == 0) + status = cf_util_get_int(child, &transport->retry_delay); + else if (strcasecmp("Instance", child->key) == 0) + amqp1_config_instance(child); + else + WARNING("amqp1 plugin: Ignoring unknown " + "transport configuration option " + "\%s\".", + child->key); + + if (status != 0) + break; + } + + if (status != 0) { + amqp1_config_transport_free(transport); + } + return status; +} /* }}} int amqp1_config_transport */ + +static int amqp1_config(oconfig_item_t *ci) /* {{{ */ +{ + + for (int i = 0; i < ci->children_num; i++) { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp("Transport", child->key) == 0) + amqp1_config_transport(child); + else + WARNING("amqp1 plugin: Ignoring unknown config option \%s\".", + child->key); + } + + return 0; +} /* }}} int amqp1_config */ + +static int amqp1_init(void) /* {{{ */ +{ + int status; + char errbuf[1024]; + + if (transport == NULL) { + ERROR("amqp1: init failed, no transport configured"); + return -1; + } + + if (proactor == NULL) { + pthread_mutex_init(&send_lock, /* attr = */ NULL); + /* start_thread */ + status = + plugin_thread_create(&event_thread_id, NULL /* no attributes */, + event_thread, NULL /* no argument */, "handle"); + if (status != 0) { + ERROR("amqp1 plugin: pthread_create failed: %s", + sstrerror(errno, errbuf, sizeof(errbuf))); + } else { + event_thread_running = 1; + } + } + return 0; +} /* }}} int amqp1_init */ + +static int amqp1_shutdown(void) /* {{{ */ +{ + stopping = true; + + /* Stop the proactor thread */ + if (event_thread_running == 1) { + DEBUG("amqp1 plugin: Shutting down proactor thread."); + pn_connection_wake(conn); + } + pthread_join(event_thread_id, NULL /* no return value */); + memset(&event_thread_id, 0, sizeof(event_thread_id)); + + DEBUG("amqp1 plugin: proactor thread exited."); + + if (transport != NULL) { + amqp1_config_transport_free(transport); + } + + return 0; +} /* }}} int amqp1_shutdown */ + +void module_register(void) { + plugin_register_complex_config("amqp1", amqp1_config); + plugin_register_init("amqp1", amqp1_init); + plugin_register_shutdown("amqp1", amqp1_shutdown); +} /* void module_register */ diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 96c26303..30791bd6 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -90,6 +90,7 @@ #@BUILD_PLUGIN_AGGREGATION_TRUE@LoadPlugin aggregation #@BUILD_PLUGIN_AMQP_TRUE@LoadPlugin amqp +#@BUILD_PLUGIN_AMQP1_TRUE@LoadPlugin amqp1 #@BUILD_PLUGIN_APACHE_TRUE@LoadPlugin apache #@BUILD_PLUGIN_APCUPS_TRUE@LoadPlugin apcups #@BUILD_PLUGIN_APPLE_SENSORS_TRUE@LoadPlugin apple_sensors @@ -269,6 +270,29 @@ # # +# +# +# Host "localhost" +# Port "5672" +# User "guest" +# Password "guest" +# Address "collectd" +# RetryDelay 1 +# +# Format JSON +# PreSettle false +# +# +# Format JSON +# PreSettle true +# +# +# Format JSON +# PreSettle false +# +# +# + # # # URL "http://localhost/status?auto" diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 17bc680f..f2bffa60 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -530,9 +530,9 @@ are disabled by default. =head2 Plugin C The I can be used to communicate with other instances of -I or third party applications using an AMQP message broker. Values -are sent to or received from the broker, which handles routing, queueing and -possibly filtering out messages. +I or third party applications using an AMQP 0.9.1 message broker. +Values are sent to or received from the broker, which handles routing, +queueing and possibly filtering out messages. B @@ -738,6 +738,171 @@ is preserved, i.e. passed through. =back +=head2 Plugin C + +The I can be used to communicate with other instances of +I or third party applications using an AMQP 1.0 message +intermediary. Metric values or notifications are sent to the +messaging intermediary which may handle direct messaging or +queue based transfer. + +B + + + # Send values to an AMQP 1.0 intermediary + + Host "localhost" + Port "5672" + User "guest" + Password "guest" + Address "collectd" +# RetryDelay 1 + + Format "command" + PreSettle false + Notify false + # StoreRates false + # GraphitePrefix "collectd." + # GraphiteEscapeChar "_" + # GraphiteSeparateInstances false + # GraphiteAlwaysAppendDS false + # GraphitePreserveSeparator false + + + + +The plugin's configuration consists of a I that configures +communications to the AMQP 1.0 messaging bus and one or more I +corresponding to metric or event publishers to the messaging system. + +The address in the I block concatenated with the name given in the +I block starting tag will be used as the send-to address for +communications over the messaging link. + +The following options are accepted within each I block: + +=over 4 + +=item B I + +Hostname or IP-address of the AMQP 1.0 intermediary. Defaults to the +default behavior of the underlying communications library, +I, which is "localhost". + +=item B I + +Service name or port number on which the AMQP 1.0 intermediary accepts +connections. This argument must be a string, even if the numeric form +is used. Defaults to "5672". + +=item B I + +=item B I + +Credentials used to authenticate to the AMQP 1.0 intermediary. By +default "guest"/"guest" is used. + +=item B
I
+ +This option specifies the prefix for the send-to value in the message. +By default, "collectd" will be used. + +=item B I + +When the AMQP1 connection is lost, defines the time in seconds to wait +before attempting to reconnect. Defaults to 1, which implies attempt +to reconnect at 1 second intervals. + +=back + +The following options are accepted within each I block: + +=over 4 + +=item B B|B|B + +Selects the format in which messages are sent to the intermediary. If set to +B (the default), values are sent as C commands which are +identical to the syntax used by the I and I. In this +case, the C header field will be set to C. + +If set to B, the values are encoded in the I, +an easy and straight forward exchange format. The C header field +will be set to C. + +If set to B, values are encoded in the I format, which is +" \n". The C header field will be set to +C. + +A subscribing client I use the C header field to +determine how to decode the values. + +=item B B|B + +If set to B (the default), the plugin will wait for a message +acknowledgement from the messaging bus before sending the next +message. This indicates transfer of ownership to the messaging +system. If set to B, the plugin will not wait for a message +acknowledgement and the message may be dropped prior to transfer of +ownership. + +=item B B|B + +If set to B (the default), the plugin will service the +instance write call back as a value list. If set to B the +plugin will service the instance as a write notification callback +for alert formatting. + +=item B B|B + +Determines whether or not C, C and C data sources +are converted to a I (i.e. a C value). If set to B (the +default), no conversion is performed. Otherwise the conversion is performed +using the internal value cache. + +Please note that currently this option is only used if the B option has +been set to B. + +=item B + +A prefix can be added in the metric name when outputting in the I format. +It's added before the I name. +Metric name will be "" + +=item B + +A postfix can be added in the metric name when outputting in the I format. +It's added after the I name. +Metric name will be "" + +=item B + +Specify a character to replace dots (.) in the host part of the metric name. +In I metric name, dots are used as separators between different +metric parts (host, plugin, type). +Default is "_" (I). + +=item B B|B + +If set to B, the plugin instance and type instance will be in their own +path component, for example C. If set to B (the +default), the plugin and plugin instance (and likewise the type and type +instance) are put into one component, for example C. + +=item B B|B + +If set to B, append the name of the I (DS) to the "metric" +identifier. If set to B (the default), this is only done when there is +more than one DS. + +=item B B|B + +If set to B (the default) the C<.> (dot) character is replaced with +I. Otherwise, if set to B, the C<.> (dot) character +is preserved, i.e. passed through. + +=back + =head2 Plugin C To configure the C-plugin you first need to configure the Apache diff --git a/src/utils_deq.h b/src/utils_deq.h new file mode 100644 index 00000000..3182baae --- /dev/null +++ b/src/utils_deq.h @@ -0,0 +1,214 @@ +/** + * collectd - src/utils_deq.h + * Copyright(c) 2017 Red Hat Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + * + * Authors: + * Andy Smith + */ + +#ifndef utils_deq_h +#define utils_deq_h 1 + +#include +#include +#include + +#define CT_ASSERT(exp) \ + { assert(exp); } + +#define NEW(t) (t *)malloc(sizeof(t)) +#define NEW_ARRAY(t, n) (t *)malloc(sizeof(t) * (n)) +#define NEW_PTR_ARRAY(t, n) (t **)malloc(sizeof(t *) * (n)) + +#define ZERO(p) memset(p, 0, sizeof(*p)) + +#define DEQ_DECLARE(i, d) \ + typedef struct { \ + i *head; \ + i *tail; \ + i *scratch; \ + size_t size; \ + } d + +#define DEQ_LINKS_N(n, t) \ + t *prev##n; \ + t *next##n +#define DEQ_LINKS(t) DEQ_LINKS_N(, t) +#define DEQ_EMPTY \ + { 0, 0, 0, 0 } + +#define DEQ_INIT(d) \ + do { \ + (d).head = 0; \ + (d).tail = 0; \ + (d).scratch = 0; \ + (d).size = 0; \ + } while (0) +#define DEQ_IS_EMPTY(d) ((d).head == 0) +#define DEQ_ITEM_INIT_N(n, i) \ + do { \ + (i)->next##n = 0; \ + (i)->prev##n = 0; \ + } while (0) +#define DEQ_ITEM_INIT(i) DEQ_ITEM_INIT_N(, i) +#define DEQ_HEAD(d) ((d).head) +#define DEQ_TAIL(d) ((d).tail) +#define DEQ_SIZE(d) ((d).size) +#define DEQ_NEXT_N(n, i) (i)->next##n +#define DEQ_NEXT(i) DEQ_NEXT_N(, i) +#define DEQ_PREV_N(n, i) (i)->prev##n +#define DEQ_PREV(i) DEQ_PREV_N(, i) +#define DEQ_MOVE(d1, d2) \ + do { \ + d2 = d1; \ + DEQ_INIT(d1); \ + } while (0) +/** + *@pre ptr points to first element of deq + *@post ptr points to first element of deq that passes test, or 0. Test should + *involve ptr. + */ +#define DEQ_FIND_N(n, ptr, test) \ + while ((ptr) && !(test)) \ + ptr = DEQ_NEXT_N(n, ptr); +#define DEQ_FIND(ptr, test) DEQ_FIND_N(, ptr, test) + +#define DEQ_INSERT_HEAD_N(n, d, i) \ + do { \ + CT_ASSERT((i)->next##n == 0); \ + CT_ASSERT((i)->prev##n == 0); \ + if ((d).head) { \ + (i)->next##n = (d).head; \ + (d).head->prev##n = i; \ + } else { \ + (d).tail = i; \ + (i)->next##n = 0; \ + CT_ASSERT((d).size == 0); \ + } \ + (i)->prev##n = 0; \ + (d).head = i; \ + (d).size++; \ + } while (0) +#define DEQ_INSERT_HEAD(d, i) DEQ_INSERT_HEAD_N(, d, i) + +#define DEQ_INSERT_TAIL_N(n, d, i) \ + do { \ + CT_ASSERT((i)->next##n == 0); \ + CT_ASSERT((i)->prev##n == 0); \ + if ((d).tail) { \ + (i)->prev##n = (d).tail; \ + (d).tail->next##n = i; \ + } else { \ + (d).head = i; \ + (i)->prev##n = 0; \ + CT_ASSERT((d).size == 0); \ + } \ + (i)->next##n = 0; \ + (d).tail = i; \ + (d).size++; \ + } while (0) +#define DEQ_INSERT_TAIL(d, i) DEQ_INSERT_TAIL_N(, d, i) + +#define DEQ_REMOVE_HEAD_N(n, d) \ + do { \ + CT_ASSERT((d).head); \ + if ((d).head) { \ + (d).scratch = (d).head; \ + (d).head = (d).head->next##n; \ + if ((d).head == 0) { \ + (d).tail = 0; \ + CT_ASSERT((d).size == 1); \ + } else \ + (d).head->prev##n = 0; \ + (d).size--; \ + (d).scratch->next##n = 0; \ + (d).scratch->prev##n = 0; \ + } \ + } while (0) +#define DEQ_REMOVE_HEAD(d) DEQ_REMOVE_HEAD_N(, d) + +#define DEQ_REMOVE_TAIL_N(n, d) \ + do { \ + CT_ASSERT((d).tail); \ + if ((d).tail) { \ + (d).scratch = (d).tail; \ + (d).tail = (d).tail->prev##n; \ + if ((d).tail == 0) { \ + (d).head = 0; \ + CT_ASSERT((d).size == 1); \ + } else \ + (d).tail->next##n = 0; \ + (d).size--; \ + (d).scratch->next##n = 0; \ + (d).scratch->prev##n = 0; \ + } \ + } while (0) +#define DEQ_REMOVE_TAIL(d) DEQ_REMOVE_TAIL_N(, d) + +#define DEQ_INSERT_AFTER_N(n, d, i, a) \ + do { \ + CT_ASSERT((i)->next##n == 0); \ + CT_ASSERT((i)->prev##n == 0); \ + CT_ASSERT(a); \ + if ((a)->next##n) \ + (a)->next##n->prev##n = (i); \ + else \ + (d).tail = (i); \ + (i)->next##n = (a)->next##n; \ + (i)->prev##n = (a); \ + (a)->next##n = (i); \ + (d).size++; \ + } while (0) +#define DEQ_INSERT_AFTER(d, i, a) DEQ_INSERT_AFTER_N(, d, i, a) + +#define DEQ_REMOVE_N(n, d, i) \ + do { \ + if ((i)->next##n) \ + (i)->next##n->prev##n = (i)->prev##n; \ + else \ + (d).tail = (i)->prev##n; \ + if ((i)->prev##n) \ + (i)->prev##n->next##n = (i)->next##n; \ + else \ + (d).head = (i)->next##n; \ + CT_ASSERT((d).size > 0); \ + (d).size--; \ + (i)->next##n = 0; \ + (i)->prev##n = 0; \ + CT_ASSERT((d).size || (!(d).head && !(d).tail)); \ + } while (0) +#define DEQ_REMOVE(d, i) DEQ_REMOVE_N(, d, i) + +#define DEQ_APPEND_N(n, d1, d2) \ + do { \ + if (!(d1).head) \ + (d1) = (d2); \ + else if ((d2).head) { \ + (d1).tail->next##n = (d2).head; \ + (d2).head->prev##n = (d1).tail; \ + (d1).tail = (d2).tail; \ + (d1).size += (d2).size; \ + } \ + DEQ_INIT(d2); \ + } while (0) +#define DEQ_APPEND(d1, d2) DEQ_APPEND_N(, d1, d2) + +#endif