Updates for rpm spec and review feedback
authorAndrew Smith <ansmith@redhat.com>
Sun, 6 May 2018 18:00:03 +0000 (14:00 -0400)
committerAndrew Smith <ansmith@redhat.com>
Sun, 6 May 2018 18:00:03 +0000 (14:00 -0400)
Makefile.am
contrib/redhat/collectd.spec
src/amqp1.c
src/utils_deq.h

index facbc97..95a0369 100644 (file)
@@ -544,7 +544,9 @@ endif
 
 if BUILD_PLUGIN_AMQP1
 pkglib_LTLIBRARIES += amqp1.la
-amqp1_la_SOURCES = src/amqp1.c
+amqp1_la_SOURCES = \
+       src/amqp1.c \
+       src/utils_deq.h
 amqp1_la_CPPFLAGS = $(AM_CPPFLAGS) $(BUILD_WITH_LIBQPIDPROTON_CPPFLAGS)
 amqp1_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBQPIDPROTON_LDFLAGS)
 amqp1_la_LIBADD = \
index d84b457..6fc8e51 100644 (file)
@@ -41,6 +41,7 @@
 # plugins enabled by default
 %define with_aggregation 0%{!?_without_aggregation:1}
 %define with_amqp 0%{!?_without_amqp:1}
+%define with_amqp1 0%{!?_without_amqp1:1}
 %define with_apache 0%{!?_without_apache:1}
 %define with_apcups 0%{!?_without_apcups:1}
 %define with_ascent 0%{!?_without_ascent:1}
@@ -277,13 +278,24 @@ every 10 seconds by default.
 
 %if %{with_amqp}
 %package amqp
-Summary:       AMQP plugin for collectd
+Summary:       AMQP 0.9 plugin for collectd
 Group:         System Environment/Daemons
 Requires:      %{name}%{?_isa} = %{version}-%{release}
 BuildRequires: librabbitmq-devel
 %description amqp
-The AMQP plugin transmits or receives values collected by collectd via the
-Advanced Message Queuing Protocol (AMQP).
+The AMQP 0.9 plugin transmits or receives values collected by collectd via the
+Advanced Message Queuing Protocol v0.9 (AMQP).
+%endif
+
+%if %{with_amqp1}
+%package amqp1
+Summary:       AMQP 1.0 plugin for collectd
+Group:         System Environment/Daemons
+Requires:      %{name}%{?_isa} = %{version}-%{release}
+BuildRequires: qpid-proton-c-devel
+%description amqp1
+The AMQP 1.0 plugin transmits or receives values collected by collectd via the
+Advanced Message Queuing Protocol v1.0 (AMQP1).
 %endif
 
 %if %{with_apache}
@@ -1015,6 +1027,12 @@ Collectd utilities
 %define _with_amqp --disable-amqp
 %endif
 
+%if %{with_amqp1}
+%define _with_amqp1 --enable-amqp1
+%else
+%define _with_amqp1 --disable-amqp1
+%endif
+
 %if %{with_apache}
 %define _with_apache --enable-apache
 %else
@@ -1888,6 +1906,7 @@ Collectd utilities
        --enable-target_v5upgrade \
        %{?_with_aggregation} \
        %{?_with_amqp} \
+       %{?_with_amqp1} \
        %{?_with_apache} \
        %{?_with_apcups} \
        %{?_with_apple_sensors} \
@@ -2399,6 +2418,11 @@ fi
 %{_libdir}/%{name}/amqp.so
 %endif
 
+%if %{with_amqp1}
+%files amqp1
+%{_libdir}/%{name}/amqp1.so
+%endif
+
 %if %{with_apache}
 %files apache
 %{_libdir}/%{name}/apache.so
index 3397f52..5a5d2b8 100644 (file)
@@ -62,7 +62,7 @@ typedef struct amqp1_config_transport_t {
   char *user;
   char *password;
   char *address;
-  int  retry_delay;
+  int retry_delay;
 } amqp1_config_transport_t;
 
 typedef struct amqp1_config_instance_t {
@@ -76,14 +76,14 @@ typedef struct amqp1_config_instance_t {
   char *postfix;
   char escape_char;
   _Bool pre_settle;
-  char send_to[128];
+  char send_to[1024];
 } amqp1_config_instance_t;
 
 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
 
 typedef struct cd_message_t {
   DEQ_LINKS(struct cd_message_t);
-  pn_bytes_t mbuf;
+  pn_rwbytes_t mbuf;
   amqp1_config_instance_t *instance;
 } cd_message_t;
 
@@ -124,7 +124,7 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
   int event_count = 0;
   pn_delivery_t *dlv;
 
-  if (stopping){
+  if (stopping) {
     return 0;
   }
 
@@ -166,7 +166,6 @@ static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
   return event_count;
 } /* }}} int amqp1_send_out_messages */
 
-
 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
 {
   if (pn_condition_is_set(cond)) {
@@ -184,7 +183,7 @@ static bool handle(pn_event_t *event) /* {{{ */
 
   case PN_CONNECTION_INIT: {
     conn = pn_event_connection(event);
-    pn_connection_set_container(conn, transport->address);
+    pn_connection_set_container(conn, transport->name);
     pn_connection_open(conn);
     pn_session_t *ssn = pn_session(conn);
     pn_session_open(ssn);
@@ -275,7 +274,7 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
     while (engine_running && !stopping) {
       pn_event_batch_t *events = pn_proactor_wait(proactor);
       pn_event_t *e;
-      while (( e = pn_event_batch_next(events))){
+      while ((e = pn_event_batch_next(events))) {
         engine_running = handle(e);
         if (!engine_running) {
           break;
@@ -308,24 +307,39 @@ static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
   return NULL;
 } /* }}} void event_thread */
 
-static void encqueue(cd_message_t *cdm,
-                     amqp1_config_instance_t *instance) /* {{{ */
+static int encqueue(cd_message_t *cdm,
+                    amqp1_config_instance_t *instance) /* {{{ */
 {
   size_t bufsize = BUFSIZE;
   pn_data_t *body;
   pn_message_t *message;
+  int status = 0;
 
   /* encode message */
   message = pn_message();
   pn_message_set_address(message, instance->send_to);
   body = pn_message_body(message);
   pn_data_clear(body);
-  pn_data_put_binary(body, cdm->mbuf);
+  pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
   pn_data_exit(body);
 
   /* put_binary copies and stores so ok to use mbuf */
   cdm->mbuf.size = bufsize;
-  pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
+  while ((status = pn_message_encode(message, (char *)cdm->mbuf.start,
+                                     &cdm->mbuf.size)) == PN_OVERFLOW) {
+    DEBUG("amqp1 plugin: increasing message buffer size %i",
+          (int)cdm->mbuf.size);
+    cdm->mbuf.size *= 2;
+    cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size);
+  }
+
+  if (status != 0) {
+    ERROR("amqp1 plugin: error encoding message: %s",
+          pn_error_text(pn_message_error(message)));
+    pn_message_free(message);
+    cd_message_free(cdm);
+    return -1;
+  }
 
   pthread_mutex_lock(&send_lock);
   DEQ_INSERT_TAIL(out_messages, cdm);
@@ -338,7 +352,8 @@ static void encqueue(cd_message_t *cdm,
     pn_connection_wake(conn);
   }
 
-} /* }}} void encqueue */
+  return 0;
+} /* }}} int encqueue */
 
 static int amqp1_notify(notification_t const *n,
                         user_data_t *user_data) /* {{{ */
@@ -361,7 +376,7 @@ static int amqp1_notify(notification_t const *n,
 
   cdm = NEW(cd_message_t);
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
+  cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
   cdm->instance = instance;
 
   switch (instance->format) {
@@ -380,9 +395,9 @@ static int amqp1_notify(notification_t const *n,
   }
 
   /* encode message and place on outbound queue */
-  encqueue(cdm, instance);
+  status = encqueue(cdm, instance);
 
-  return 0;
+  return status;
 } /* }}} int amqp1_notify */
 
 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
@@ -406,7 +421,7 @@ static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
 
   cdm = NEW(cd_message_t);
   DEQ_ITEM_INIT(cdm);
-  cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
+  cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
   cdm->instance = instance;
 
   switch (instance->format) {
@@ -557,10 +572,19 @@ static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
     amqp1_config_instance_free(instance);
     return status;
   } else {
-    char tpname[128];
-    snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
-    snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
-             transport->address, instance->name);
+    char tpname[1024];
+    int status;
+    status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(tpname)) {
+      ERROR("amqp1 plugin: Instance name would have been truncated.");
+      return -1;
+    }
+    status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
+                      transport->address, instance->name);
+    if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
+      ERROR("amqp1 plugin: send_to address would have been truncated.");
+      return -1;
+    }
     if (instance->notify == true) {
       status = plugin_register_notification(
           tpname, amqp1_notify,
index 150864b..3182baa 100644 (file)
@@ -1,3 +1,29 @@
+/**
+ * collectd - src/utils_deq.h
+ * Copyright(c) 2017 Red Hat Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Authors:
+ *   Andy Smith <ansmith@redhat.com>
+ */
+
 #ifndef utils_deq_h
 #define utils_deq_h 1
 
 #define NEW_ARRAY(t, n) (t *)malloc(sizeof(t) * (n))
 #define NEW_PTR_ARRAY(t, n) (t **)malloc(sizeof(t *) * (n))
 
-//
-// If available, use aligned_alloc for cache-line-aligned allocations. Otherwise
-// fall back to plain malloc.
-//
-#define NEW_CACHE_ALIGNED(t, p)                                                \
-  do {                                                                         \
-    if (posix_memalign(                                                        \
-            (void *)&(p), 64,                                                  \
-            (sizeof(t) + (sizeof(t) % 64 ? 64 - (sizeof(t) % 64) : 0))) != 0)  \
-      (p) = 0;                                                                 \
-  } while (0)
-
-#define ALLOC_CACHE_ALIGNED(s, p)                                              \
-  do {                                                                         \
-    if (posix_memalign((void *)&(p), 64,                                       \
-                       (s + (s % 64 ? 64 - (s % 64) : 0))) != 0)               \
-      (p) = 0;                                                                 \
-  } while (0)
-
 #define ZERO(p) memset(p, 0, sizeof(*p))
 
 #define DEQ_DECLARE(i, d)                                                      \