Merge pull request #3329 from efuss/fix-3311
[collectd.git] / src / amqp.c
index c54a1e0..2077d57 100644 (file)
 
 #include "collectd.h"
 
-#include "common.h"
 #include "plugin.h"
-#include "utils_cmd_putval.h"
-#include "utils_format_graphite.h"
-#include "utils_format_json.h"
+#include "utils/cmds/putval.h"
+#include "utils/common/common.h"
+#include "utils/format_graphite/format_graphite.h"
+#include "utils/format_json/format_json.h"
 
 #include <amqp.h>
 #include <amqp_framing.h>
@@ -66,7 +66,7 @@ int amqp_socket_close(amqp_socket_t *);
  * Data types
  */
 struct camqp_config_s {
-  _Bool publish;
+  bool publish;
   char *name;
 
   char *host;
@@ -83,7 +83,7 @@ struct camqp_config_s {
 
   /* publish only */
   uint8_t delivery_mode;
-  _Bool store_rates;
+  bool store_rates;
   int format;
   /* publish & graphite format only */
   char *prefix;
@@ -94,8 +94,8 @@ struct camqp_config_s {
   /* subscribe only */
   char *exchange_type;
   char *queue;
-  _Bool queue_durable;
-  _Bool queue_auto_delete;
+  bool queue_durable;
+  bool queue_auto_delete;
 
   amqp_connection_state_t connection;
   pthread_mutex_t lock;
@@ -111,9 +111,9 @@ static const char *def_user = "guest";
 static const char *def_password = "guest";
 static const char *def_exchange = "amq.fanout";
 
-static pthread_t *subscriber_threads = NULL;
-static size_t subscriber_threads_num = 0;
-static _Bool subscriber_threads_running = 1;
+static pthread_t *subscriber_threads;
+static size_t subscriber_threads_num;
+static bool subscriber_threads_running = true;
 
 #define CONF(c, f) (((c)->f != NULL) ? (c)->f : def_##f)
 
@@ -164,28 +164,28 @@ static char *camqp_bytes_cstring(amqp_bytes_t *in) /* {{{ */
   char *ret;
 
   if ((in == NULL) || (in->bytes == NULL))
-    return (NULL);
+    return NULL;
 
   ret = malloc(in->len + 1);
   if (ret == NULL)
-    return (NULL);
+    return NULL;
 
   memcpy(ret, in->bytes, in->len);
   ret[in->len] = 0;
 
-  return (ret);
+  return ret;
 } /* }}} char *camqp_bytes_cstring */
 
-static _Bool camqp_is_error(camqp_config_t *conf) /* {{{ */
+static bool camqp_is_error(camqp_config_t *conf) /* {{{ */
 {
   amqp_rpc_reply_t r;
 
   r = amqp_get_rpc_reply(conf->connection);
   if (r.reply_type == AMQP_RESPONSE_NORMAL)
-    return (0);
+    return false;
 
-  return (1);
-} /* }}} _Bool camqp_is_error */
+  return true;
+} /* }}} bool camqp_is_error */
 
 static char *camqp_strerror(camqp_config_t *conf, /* {{{ */
                             char *buffer, size_t buffer_size) {
@@ -204,10 +204,10 @@ static char *camqp_strerror(camqp_config_t *conf, /* {{{ */
   case AMQP_RESPONSE_LIBRARY_EXCEPTION:
 #if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
     if (r.library_errno)
-      return (sstrerror(r.library_errno, buffer, buffer_size));
+      return sstrerror(r.library_errno, buffer, buffer_size);
 #else
     if (r.library_error)
-      return (sstrerror(r.library_error, buffer, buffer_size));
+      return sstrerror(r.library_error, buffer, buffer_size);
 #endif
     else
       sstrncpy(buffer, "End of stream", buffer_size);
@@ -236,7 +236,7 @@ static char *camqp_strerror(camqp_config_t *conf, /* {{{ */
     ssnprintf(buffer, buffer_size, "Unknown reply type %i", (int)r.reply_type);
   }
 
-  return (buffer);
+  return buffer;
 } /* }}} char *camqp_strerror */
 
 #if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
@@ -245,7 +245,7 @@ static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */
   amqp_exchange_declare_ok_t *ed_ret;
 
   if (conf->exchange_type == NULL)
-    return (0);
+    return 0;
 
   ed_ret = amqp_exchange_declare(
       conf->connection,
@@ -261,14 +261,14 @@ static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */
     ERROR("amqp plugin: amqp_exchange_declare failed: %s",
           camqp_strerror(conf, errbuf, sizeof(errbuf)));
     camqp_close_connection(conf);
-    return (-1);
+    return -1;
   }
 
   INFO("amqp plugin: Successfully created exchange \"%s\" "
        "with type \"%s\".",
        conf->exchange, conf->exchange_type);
 
-  return (0);
+  return 0;
 } /* }}} int camqp_create_exchange */
 #else
 static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */
@@ -278,7 +278,7 @@ static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */
   struct amqp_table_entry_t_ argument_table_entries[1];
 
   if (conf->exchange_type == NULL)
-    return (0);
+    return 0;
 
   /* Valid arguments: "auto_delete", "internal" */
   argument_table.num_entries = STATIC_ARRAY_SIZE(argument_table_entries);
@@ -304,14 +304,14 @@ static int camqp_create_exchange(camqp_config_t *conf) /* {{{ */
     ERROR("amqp plugin: amqp_exchange_declare failed: %s",
           camqp_strerror(conf, errbuf, sizeof(errbuf)));
     camqp_close_connection(conf);
-    return (-1);
+    return -1;
   }
 
   INFO("amqp plugin: Successfully created exchange \"%s\" "
        "with type \"%s\".",
        conf->exchange, conf->exchange_type);
 
-  return (0);
+  return 0;
 } /* }}} int camqp_create_exchange */
 #endif
 
@@ -320,20 +320,21 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */
   amqp_queue_declare_ok_t *qd_ret;
   amqp_basic_consume_ok_t *cm_ret;
 
-  qd_ret = amqp_queue_declare(conf->connection,
-                              /* channel     = */ CAMQP_CHANNEL,
-                              /* queue       = */ (conf->queue != NULL)
-                                  ? amqp_cstring_bytes(conf->queue)
-                                  : AMQP_EMPTY_BYTES,
-                              /* passive     = */ 0,
-                              /* durable     = */ conf->queue_durable,
-                              /* exclusive   = */ 0,
-                              /* auto_delete = */ conf->queue_auto_delete,
-                              /* arguments   = */ AMQP_EMPTY_TABLE);
+  qd_ret =
+      amqp_queue_declare(conf->connection,
+                         /* channel     = */ CAMQP_CHANNEL,
+                         /* queue       = */
+                         (conf->queue != NULL) ? amqp_cstring_bytes(conf->queue)
+                                               : AMQP_EMPTY_BYTES,
+                         /* passive     = */ 0,
+                         /* durable     = */ conf->queue_durable,
+                         /* exclusive   = */ 0,
+                         /* auto_delete = */ conf->queue_auto_delete,
+                         /* arguments   = */ AMQP_EMPTY_TABLE);
   if (qd_ret == NULL) {
     ERROR("amqp plugin: amqp_queue_declare failed.");
     camqp_close_connection(conf);
-    return (-1);
+    return -1;
   }
 
   if (conf->queue == NULL) {
@@ -341,7 +342,7 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */
     if (conf->queue == NULL) {
       ERROR("amqp plugin: camqp_bytes_cstring failed.");
       camqp_close_connection(conf);
-      return (-1);
+      return -1;
     }
 
     INFO("amqp plugin: Created queue \"%s\".", conf->queue);
@@ -353,21 +354,21 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */
     amqp_queue_bind_ok_t *qb_ret;
 
     assert(conf->queue != NULL);
-    qb_ret =
-        amqp_queue_bind(conf->connection,
-                        /* channel     = */ CAMQP_CHANNEL,
-                        /* queue       = */ amqp_cstring_bytes(conf->queue),
-                        /* exchange    = */ amqp_cstring_bytes(conf->exchange),
-                        /* routing_key = */ (conf->routing_key != NULL)
-                            ? amqp_cstring_bytes(conf->routing_key)
-                            : AMQP_EMPTY_BYTES,
-                        /* arguments   = */ AMQP_EMPTY_TABLE);
+    qb_ret = amqp_queue_bind(
+        conf->connection,
+        /* channel     = */ CAMQP_CHANNEL,
+        /* queue       = */ amqp_cstring_bytes(conf->queue),
+        /* exchange    = */ amqp_cstring_bytes(conf->exchange),
+        /* routing_key = */
+        (conf->routing_key != NULL) ? amqp_cstring_bytes(conf->routing_key)
+                                    : AMQP_EMPTY_BYTES,
+        /* arguments   = */ AMQP_EMPTY_TABLE);
     if ((qb_ret == NULL) && camqp_is_error(conf)) {
       char errbuf[1024];
       ERROR("amqp plugin: amqp_queue_bind failed: %s",
             camqp_strerror(conf, errbuf, sizeof(errbuf)));
       camqp_close_connection(conf);
-      return (-1);
+      return -1;
     }
 
     DEBUG("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".",
@@ -388,15 +389,15 @@ static int camqp_setup_queue(camqp_config_t *conf) /* {{{ */
     ERROR("amqp plugin: amqp_basic_consume failed: %s",
           camqp_strerror(conf, errbuf, sizeof(errbuf)));
     camqp_close_connection(conf);
-    return (-1);
+    return -1;
   }
 
-  return (0);
+  return 0;
 } /* }}} int camqp_setup_queue */
 
 static int camqp_connect(camqp_config_t *conf) /* {{{ */
 {
-  static time_t last_connect_time = 0;
+  static time_t last_connect_time;
 
   amqp_rpc_reply_t reply;
   int status;
@@ -407,14 +408,14 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
 #endif
 
   if (conf->connection != NULL)
-    return (0);
+    return 0;
 
   time_t now = time(NULL);
   if (now < (last_connect_time + conf->connection_retry_delay)) {
     DEBUG("amqp plugin: skipping connection retry, "
           "ConnectionRetryDelay: %d",
           conf->connection_retry_delay);
-    return (1);
+    return 1;
   } else {
     DEBUG("amqp plugin: retrying connection");
     last_connect_time = now;
@@ -423,12 +424,12 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
   conf->connection = amqp_new_connection();
   if (conf->connection == NULL) {
     ERROR("amqp plugin: amqp_new_connection failed.");
-    return (ENOMEM);
+    return ENOMEM;
   }
 
 #ifdef HAVE_AMQP_TCP_SOCKET
 #define CLOSE_SOCKET() /* amqp_destroy_connection() closes the socket for us   \
-                          */
+                        */
   /* TODO: add support for SSL using amqp_ssl_socket_new
    *       and related functions */
   socket = amqp_tcp_socket_new(conf->connection);
@@ -436,31 +437,27 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
     ERROR("amqp plugin: amqp_tcp_socket_new failed.");
     amqp_destroy_connection(conf->connection);
     conf->connection = NULL;
-    return (ENOMEM);
+    return ENOMEM;
   }
 
   status = amqp_socket_open(socket, CONF(conf, host), conf->port);
   if (status < 0) {
-    char errbuf[1024];
     status *= -1;
-    ERROR("amqp plugin: amqp_socket_open failed: %s",
-          sstrerror(status, errbuf, sizeof(errbuf)));
+    ERROR("amqp plugin: amqp_socket_open failed: %s", STRERROR(status));
     amqp_destroy_connection(conf->connection);
     conf->connection = NULL;
-    return (status);
+    return status;
   }
 #else /* HAVE_AMQP_TCP_SOCKET */
 #define CLOSE_SOCKET() close(sockfd)
   /* this interface is deprecated as of rabbitmq-c 0.4 */
   sockfd = amqp_open_socket(CONF(conf, host), conf->port);
   if (sockfd < 0) {
-    char errbuf[1024];
     status = (-1) * sockfd;
-    ERROR("amqp plugin: amqp_open_socket failed: %s",
-          sstrerror(status, errbuf, sizeof(errbuf)));
+    ERROR("amqp plugin: amqp_open_socket failed: %s", STRERROR(status));
     amqp_destroy_connection(conf->connection);
     conf->connection = NULL;
-    return (status);
+    return status;
   }
   amqp_set_sockfd(conf->connection, sockfd);
 #endif
@@ -477,7 +474,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
     amqp_destroy_connection(conf->connection);
     CLOSE_SOCKET();
     conf->connection = NULL;
-    return (1);
+    return 1;
   }
 
   amqp_channel_open(conf->connection, /* channel = */ 1);
@@ -489,7 +486,7 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
     amqp_destroy_connection(conf->connection);
     CLOSE_SOCKET();
     conf->connection = NULL;
-    return (1);
+    return 1;
   }
 
   INFO("amqp plugin: Successfully opened connection to vhost \"%s\" "
@@ -498,16 +495,16 @@ static int camqp_connect(camqp_config_t *conf) /* {{{ */
 
   status = camqp_create_exchange(conf);
   if (status != 0)
-    return (status);
+    return status;
 
   if (!conf->publish)
-    return (camqp_setup_queue(conf));
-  return (0);
+    return camqp_setup_queue(conf);
+  return 0;
 } /* }}} int camqp_connect */
 
 static int camqp_shutdown(void) /* {{{ */
 {
-  DEBUG("amqp plugin: Shutting down %zu subscriber threads.",
+  DEBUG("amqp plugin: Shutting down %" PRIsz " subscriber threads.",
         subscriber_threads_num);
 
   subscriber_threads_running = 0;
@@ -524,7 +521,7 @@ static int camqp_shutdown(void) /* {{{ */
 
   DEBUG("amqp plugin: All subscriber threads exited.");
 
-  return (0);
+  return 0;
 } /* }}} int camqp_shutdown */
 
 /*
@@ -545,22 +542,20 @@ static int camqp_read_body(camqp_config_t *conf, /* {{{ */
   while (received < body_size) {
     status = amqp_simple_wait_frame(conf->connection, &frame);
     if (status < 0) {
-      char errbuf[1024];
       status = (-1) * status;
-      ERROR("amqp plugin: amqp_simple_wait_frame failed: %s",
-            sstrerror(status, errbuf, sizeof(errbuf)));
+      ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status));
       camqp_close_connection(conf);
-      return (status);
+      return status;
     }
 
     if (frame.frame_type != AMQP_FRAME_BODY) {
       NOTICE("amqp plugin: Unexpected frame type: %#" PRIx8, frame.frame_type);
-      return (-1);
+      return -1;
     }
 
     if ((body_size - received) < frame.payload.body_fragment.len) {
       WARNING("amqp plugin: Body is larger than indicated by header.");
-      return (-1);
+      return -1;
     }
 
     memcpy(body_ptr, frame.payload.body_fragment.bytes,
@@ -573,19 +568,19 @@ static int camqp_read_body(camqp_config_t *conf, /* {{{ */
     status = cmd_handle_putval(stderr, body);
     if (status != 0)
       ERROR("amqp plugin: cmd_handle_putval failed with status %i.", status);
-    return (status);
+    return status;
   } else if (strcasecmp("application/json", content_type) == 0) {
     ERROR("amqp plugin: camqp_read_body: Parsing JSON data has not "
           "been implemented yet. FIXME!");
-    return (0);
+    return 0;
   } else {
     ERROR("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
           content_type);
-    return (EINVAL);
+    return EINVAL;
   }
 
   /* not reached */
-  return (0);
+  return 0;
 } /* }}} int camqp_read_body */
 
 static int camqp_read_header(camqp_config_t *conf) /* {{{ */
@@ -597,31 +592,29 @@ static int camqp_read_header(camqp_config_t *conf) /* {{{ */
 
   status = amqp_simple_wait_frame(conf->connection, &frame);
   if (status < 0) {
-    char errbuf[1024];
     status = (-1) * status;
-    ERROR("amqp plugin: amqp_simple_wait_frame failed: %s",
-          sstrerror(status, errbuf, sizeof(errbuf)));
+    ERROR("amqp plugin: amqp_simple_wait_frame failed: %s", STRERROR(status));
     camqp_close_connection(conf);
-    return (status);
+    return status;
   }
 
   if (frame.frame_type != AMQP_FRAME_HEADER) {
     NOTICE("amqp plugin: Unexpected frame type: %#" PRIx8, frame.frame_type);
-    return (-1);
+    return -1;
   }
 
   properties = frame.payload.properties.decoded;
   content_type = camqp_bytes_cstring(&properties->content_type);
   if (content_type == NULL) {
     ERROR("amqp plugin: Unable to determine content type.");
-    return (-1);
+    return -1;
   }
 
   status = camqp_read_body(conf, (size_t)frame.payload.properties.body_size,
                            content_type);
 
   sfree(content_type);
-  return (status);
+  return status;
 } /* }}} int camqp_read_header */
 
 static void *camqp_subscribe_thread(void *user_data) /* {{{ */
@@ -671,7 +664,7 @@ static void *camqp_subscribe_thread(void *user_data) /* {{{ */
 
   camqp_config_free(conf);
   pthread_exit(NULL);
-  return (NULL);
+  return NULL;
 } /* }}} void *camqp_subscribe_thread */
 
 static int camqp_subscribe_init(camqp_config_t *conf) /* {{{ */
@@ -684,7 +677,7 @@ static int camqp_subscribe_init(camqp_config_t *conf) /* {{{ */
   if (tmp == NULL) {
     ERROR("amqp plugin: realloc failed.");
     sfree(subscriber_threads);
-    return (ENOMEM);
+    return ENOMEM;
   }
   subscriber_threads = tmp;
   tmp = subscriber_threads + subscriber_threads_num;
@@ -693,15 +686,13 @@ static int camqp_subscribe_init(camqp_config_t *conf) /* {{{ */
   status = plugin_thread_create(tmp, /* attr = */ NULL, camqp_subscribe_thread,
                                 conf, "amqp subscribe");
   if (status != 0) {
-    char errbuf[1024];
-    ERROR("amqp plugin: pthread_create failed: %s",
-          sstrerror(status, errbuf, sizeof(errbuf)));
-    return (status);
+    ERROR("amqp plugin: pthread_create failed: %s", STRERROR(status));
+    return status;
   }
 
   subscriber_threads_num++;
 
-  return (0);
+  return 0;
 } /* }}} int camqp_subscribe_init */
 
 /*
@@ -714,7 +705,7 @@ static int camqp_write_locked(camqp_config_t *conf, /* {{{ */
 
   status = camqp_connect(conf);
   if (status != 0)
-    return (status);
+    return status;
 
   amqp_basic_properties_t props = {._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
                                              AMQP_BASIC_DELIVERY_MODE_FLAG |
@@ -742,7 +733,7 @@ static int camqp_write_locked(camqp_config_t *conf, /* {{{ */
     camqp_close_connection(conf);
   }
 
-  return (status);
+  return status;
 } /* }}} int camqp_write_locked */
 
 static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
@@ -753,7 +744,7 @@ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
   int status;
 
   if ((ds == NULL) || (vl == NULL) || (conf == NULL))
-    return (EINVAL);
+    return EINVAL;
 
   if (conf->routing_key != NULL) {
     sstrncpy(routing_key, conf->routing_key, sizeof(routing_key));
@@ -776,7 +767,7 @@ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
     status = cmd_create_putval(buffer, sizeof(buffer), ds, vl);
     if (status != 0) {
       ERROR("amqp plugin: cmd_create_putval failed with status %i.", status);
-      return (status);
+      return status;
     }
   } else if (conf->format == CAMQP_FORMAT_JSON) {
     size_t bfree = sizeof(buffer);
@@ -791,18 +782,18 @@ static int camqp_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
                         conf->postfix, conf->escape_char, conf->graphite_flags);
     if (status != 0) {
       ERROR("amqp plugin: format_graphite failed with status %i.", status);
-      return (status);
+      return status;
     }
   } else {
     ERROR("amqp plugin: Invalid format (%i).", conf->format);
-    return (-1);
+    return -1;
   }
 
   pthread_mutex_lock(&conf->lock);
   status = camqp_write_locked(conf, buffer, routing_key);
   pthread_mutex_unlock(&conf->lock);
 
-  return (status);
+  return status;
 } /* }}} int camqp_write */
 
 /*
@@ -816,7 +807,7 @@ static int camqp_config_set_format(oconfig_item_t *ci, /* {{{ */
   string = NULL;
   status = cf_util_get_string(ci, &string);
   if (status != 0)
-    return (status);
+    return status;
 
   assert(string != NULL);
   if (strcasecmp("Command", string) == 0)
@@ -831,18 +822,18 @@ static int camqp_config_set_format(oconfig_item_t *ci, /* {{{ */
 
   free(string);
 
-  return (0);
+  return 0;
 } /* }}} int config_set_string */
 
 static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
-                                   _Bool publish) {
+                                   bool publish) {
   camqp_config_t *conf;
   int status;
 
   conf = calloc(1, sizeof(*conf));
   if (conf == NULL) {
     ERROR("amqp plugin: calloc failed.");
-    return (ENOMEM);
+    return ENOMEM;
   }
 
   /* Initialize "conf" {{{ */
@@ -860,7 +851,7 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
 
   /* publish only */
   conf->delivery_mode = CAMQP_DM_VOLATILE;
-  conf->store_rates = 0;
+  conf->store_rates = false;
   conf->graphite_flags = 0;
   /* publish & graphite only */
   conf->prefix = NULL;
@@ -869,8 +860,8 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
   /* subscribe only */
   conf->exchange_type = NULL;
   conf->queue = NULL;
-  conf->queue_durable = 0;
-  conf->queue_auto_delete = 1;
+  conf->queue_durable = false;
+  conf->queue_auto_delete = true;
   /* general */
   conf->connection = NULL;
   pthread_mutex_init(&conf->lock, /* attr = */ NULL);
@@ -879,7 +870,7 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
   status = cf_util_get_string(ci, &conf->name);
   if (status != 0) {
     sfree(conf);
-    return (status);
+    return status;
   }
 
   for (int i = 0; i < ci->children_num; i++) {
@@ -912,7 +903,7 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
     else if (strcasecmp("RoutingKey", child->key) == 0)
       status = cf_util_get_string(child, &conf->routing_key);
     else if ((strcasecmp("Persistent", child->key) == 0) && publish) {
-      _Bool tmp = 0;
+      bool tmp = 0;
       status = cf_util_get_boolean(child, &tmp);
       if (tmp)
         conf->delivery_mode = CAMQP_DM_PERSISTENT;
@@ -970,7 +961,7 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
 
   if (status != 0) {
     camqp_config_free(conf);
-    return (status);
+    return status;
   }
 
   if (conf->exchange != NULL) {
@@ -982,23 +973,24 @@ static int camqp_config_connection(oconfig_item_t *ci, /* {{{ */
     char cbname[128];
     ssnprintf(cbname, sizeof(cbname), "amqp/%s", conf->name);
 
-    status = plugin_register_write(
-        cbname, camqp_write, &(user_data_t){
-                                 .data = conf, .free_func = camqp_config_free,
-                             });
+    status = plugin_register_write(cbname, camqp_write,
+                                   &(user_data_t){
+                                       .data = conf,
+                                       .free_func = camqp_config_free,
+                                   });
     if (status != 0) {
       camqp_config_free(conf);
-      return (status);
+      return status;
     }
   } else {
     status = camqp_subscribe_init(conf);
     if (status != 0) {
       camqp_config_free(conf);
-      return (status);
+      return status;
     }
   }
 
-  return (0);
+  return 0;
 } /* }}} int camqp_config_connection */
 
 static int camqp_config(oconfig_item_t *ci) /* {{{ */
@@ -1007,20 +999,18 @@ static int camqp_config(oconfig_item_t *ci) /* {{{ */
     oconfig_item_t *child = ci->children + i;
 
     if (strcasecmp("Publish", child->key) == 0)
-      camqp_config_connection(child, /* publish = */ 1);
+      camqp_config_connection(child, /* publish = */ true);
     else if (strcasecmp("Subscribe", child->key) == 0)
-      camqp_config_connection(child, /* publish = */ 0);
+      camqp_config_connection(child, /* publish = */ false);
     else
       WARNING("amqp plugin: Ignoring unknown config option \"%s\".",
               child->key);
   } /* for (ci->children_num) */
 
-  return (0);
+  return 0;
 } /* }}} int camqp_config */
 
 void module_register(void) {
   plugin_register_complex_config("amqp", camqp_config);
   plugin_register_shutdown("amqp", camqp_shutdown);
 } /* void module_register */
-
-/* vim: set sw=4 sts=4 et fdm=marker : */