Added library link check and addressed review comments
[collectd.git] / src / amqp1.c
1 /**
2  * collectd - src/amqp1.c
3  * Copyright(c) 2017 Red Hat Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Andy Smith <ansmith@redhat.com>
25  */
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_cmd_putval.h"
32 #include "utils_deq.h"
33 #include "utils_format_graphite.h"
34 #include "utils_format_json.h"
35 #include "utils_random.h"
36
37 #include <proton/condition.h>
38 #include <proton/connection.h>
39 #include <proton/delivery.h>
40 #include <proton/link.h>
41 #include <proton/message.h>
42 #include <proton/proactor.h>
43 #include <proton/sasl.h>
44 #include <proton/session.h>
45 #include <proton/transport.h>
46
47 #include <errno.h>
48 #include <stdint.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51
52 #define BUFSIZE 8192
53 #define AMQP1_FORMAT_JSON 0
54 #define AMQP1_FORMAT_COMMAND 1
55 #define AMQP1_FORMAT_GRAPHITE 2
56
57 typedef struct amqp1_config_transport_s {
58   DEQ_LINKS(struct amqp1_config_transport_s);
59   char *name;
60   char *host;
61   char *port;
62   char *user;
63   char *password;
64   char *address;
65   int retry_delay;
66 } amqp1_config_transport_t;
67
68 typedef struct amqp1_config_instance_s {
69   DEQ_LINKS(struct amqp1_config_instance_s);
70   char *name;
71   bool notify;
72   uint8_t format;
73   unsigned int graphite_flags;
74   bool store_rates;
75   char *prefix;
76   char *postfix;
77   char escape_char;
78   bool pre_settle;
79   char send_to[1024];
80 } amqp1_config_instance_t;
81
82 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
83
84 typedef struct cd_message_s {
85   DEQ_LINKS(struct cd_message_s);
86   pn_rwbytes_t mbuf;
87   amqp1_config_instance_t *instance;
88 } cd_message_t;
89
90 DEQ_DECLARE(cd_message_t, cd_message_list_t);
91
92 /*
93  * Globals
94  */
95 static pn_connection_t *conn = NULL;
96 static pn_link_t *sender = NULL;
97 static pn_proactor_t *proactor = NULL;
98 static pthread_mutex_t send_lock;
99 static cd_message_list_t out_messages;
100 static uint64_t cd_tag = 1;
101 static uint64_t acknowledged = 0;
102 static amqp1_config_transport_t *transport = NULL;
103 static bool stopping = false;
104 static int event_thread_running = 0;
105 static pthread_t event_thread_id;
106
107 /*
108  * Functions
109  */
110 static void cd_message_free(cd_message_t *cdm) {
111   free(cdm->mbuf.start);
112   free(cdm);
113 } /* }}} void cd_message_free */
114
115 static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
116 {
117   uint64_t dtag;
118   cd_message_list_t to_send;
119   cd_message_t *cdm;
120   int link_credit = pn_link_credit(link);
121   int event_count = 0;
122   pn_delivery_t *dlv;
123
124   if (stopping) {
125     return 0;
126   }
127
128   DEQ_INIT(to_send);
129
130   pthread_mutex_lock(&send_lock);
131
132   if (link_credit > 0) {
133     dtag = cd_tag;
134     cdm = DEQ_HEAD(out_messages);
135     while (cdm) {
136       DEQ_REMOVE_HEAD(out_messages);
137       DEQ_INSERT_TAIL(to_send, cdm);
138       if (DEQ_SIZE(to_send) == link_credit)
139         break;
140       cdm = DEQ_HEAD(out_messages);
141     }
142     cd_tag += DEQ_SIZE(to_send);
143   }
144
145   pthread_mutex_unlock(&send_lock);
146
147   /* message is already formatted and encoded */
148   cdm = DEQ_HEAD(to_send);
149   while (cdm) {
150     DEQ_REMOVE_HEAD(to_send);
151     dtag++;
152     dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag)));
153     pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
154     pn_link_advance(link);
155     if (cdm->instance->pre_settle == true) {
156       pn_delivery_settle(dlv);
157     }
158     event_count++;
159     cd_message_free(cdm);
160     cdm = DEQ_HEAD(to_send);
161   }
162
163   return event_count;
164 } /* }}} int amqp1_send_out_messages */
165
166 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
167 {
168   if (pn_condition_is_set(cond)) {
169     ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)),
170           pn_condition_get_name(cond), pn_condition_get_description(cond));
171     pn_connection_close(pn_event_connection(e));
172     conn = NULL;
173   }
174 } /* }}} void check_condition */
175
176 static bool handle(pn_event_t *event) /* {{{ */
177 {
178
179   switch (pn_event_type(event)) {
180
181   case PN_CONNECTION_INIT: {
182     conn = pn_event_connection(event);
183     pn_connection_set_container(conn, transport->name);
184     pn_connection_open(conn);
185     pn_session_t *ssn = pn_session(conn);
186     pn_session_open(ssn);
187     sender = pn_sender(ssn, "cd-sender");
188     pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
189     pn_link_open(sender);
190     break;
191   }
192
193   case PN_LINK_FLOW: {
194     /* peer has given us credit, send outbound messages */
195     amqp1_send_out_messages(sender);
196     break;
197   }
198
199   case PN_DELIVERY: {
200     /* acknowledgement from peer that a message was delivered */
201     pn_delivery_t *dlv = pn_event_delivery(event);
202     if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
203       pn_delivery_settle(dlv);
204       acknowledged++;
205     }
206     break;
207   }
208
209   case PN_CONNECTION_WAKE: {
210     if (!stopping) {
211       amqp1_send_out_messages(sender);
212     }
213     break;
214   }
215
216   case PN_TRANSPORT_CLOSED: {
217     check_condition(event, pn_transport_condition(pn_event_transport(event)));
218     break;
219   }
220
221   case PN_CONNECTION_REMOTE_CLOSE: {
222     check_condition(event,
223                     pn_session_remote_condition(pn_event_session(event)));
224     pn_connection_close(pn_event_connection(event));
225     break;
226   }
227
228   case PN_SESSION_REMOTE_CLOSE: {
229     check_condition(event,
230                     pn_session_remote_condition(pn_event_session(event)));
231     pn_connection_close(pn_event_connection(event));
232     break;
233   }
234
235   case PN_LINK_REMOTE_CLOSE:
236   case PN_LINK_REMOTE_DETACH: {
237     check_condition(event, pn_link_remote_condition(pn_event_link(event)));
238     pn_connection_close(pn_event_connection(event));
239     break;
240   }
241
242   case PN_PROACTOR_INACTIVE: {
243     return false;
244   }
245
246   default:
247     break;
248   }
249   return true;
250 } /* }}} bool handle */
251
252 static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
253 {
254   char addr[PN_MAX_ADDR];
255   cd_message_t *cdm;
256
257   /* setup proactor */
258   proactor = pn_proactor();
259   pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
260
261   while (!stopping) {
262     /* make connection */
263     conn = pn_connection();
264     if (transport->user != NULL) {
265       pn_connection_set_user(conn, transport->user);
266       pn_connection_set_password(conn, transport->password);
267     }
268     pn_proactor_connect(proactor, conn, addr);
269
270     bool engine_running = true;
271     while (engine_running && !stopping) {
272       pn_event_batch_t *events = pn_proactor_wait(proactor);
273       pn_event_t *e;
274       while ((e = pn_event_batch_next(events))) {
275         engine_running = handle(e);
276         if (!engine_running) {
277           break;
278         }
279       }
280       pn_proactor_done(proactor, events);
281     }
282
283     pn_proactor_release_connection(conn);
284
285     DEBUG("amqp1 plugin: retrying connection");
286     int delay = transport->retry_delay;
287     while (delay-- > 0 && !stopping) {
288       sleep(1.0);
289     }
290   }
291
292   pn_proactor_disconnect(proactor, NULL);
293
294   /* Free the remaining out_messages */
295   cdm = DEQ_HEAD(out_messages);
296   while (cdm) {
297     DEQ_REMOVE_HEAD(out_messages);
298     cd_message_free(cdm);
299     cdm = DEQ_HEAD(out_messages);
300   }
301
302   event_thread_running = 0;
303
304   return NULL;
305 } /* }}} void event_thread */
306
307 static int encqueue(cd_message_t *cdm,
308                     amqp1_config_instance_t *instance) /* {{{ */
309 {
310   size_t bufsize = BUFSIZE;
311   pn_data_t *body;
312   pn_message_t *message;
313   int status = 0;
314
315   /* encode message */
316   message = pn_message();
317   pn_message_set_address(message, instance->send_to);
318   body = pn_message_body(message);
319   pn_data_clear(body);
320   pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
321   pn_data_exit(body);
322
323   /* put_binary copies and stores so ok to use mbuf */
324   cdm->mbuf.size = bufsize;
325   while ((status = pn_message_encode(message, (char *)cdm->mbuf.start,
326                                      &cdm->mbuf.size)) == PN_OVERFLOW) {
327     DEBUG("amqp1 plugin: increasing message buffer size %i",
328           (int)cdm->mbuf.size);
329     cdm->mbuf.size *= 2;
330     cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size);
331   }
332
333   if (status != 0) {
334     ERROR("amqp1 plugin: error encoding message: %s",
335           pn_error_text(pn_message_error(message)));
336     pn_message_free(message);
337     cd_message_free(cdm);
338     return -1;
339   }
340
341   pthread_mutex_lock(&send_lock);
342   DEQ_INSERT_TAIL(out_messages, cdm);
343   pthread_mutex_unlock(&send_lock);
344
345   pn_message_free(message);
346
347   /* activate the sender */
348   if (conn != NULL) {
349     pn_connection_wake(conn);
350   }
351
352   return 0;
353 } /* }}} int encqueue */
354
355 static int amqp1_notify(notification_t const *n,
356                         user_data_t *user_data) /* {{{ */
357 {
358   amqp1_config_instance_t *instance;
359   int status = 0;
360   size_t bfree = BUFSIZE;
361   size_t bfill = 0;
362   cd_message_t *cdm;
363   size_t bufsize = BUFSIZE;
364
365   if ((n == NULL) || (user_data == NULL))
366     return EINVAL;
367
368   instance = user_data->data;
369
370   if (instance->notify != true) {
371     ERROR("amqp1 plugin: write notification failed");
372   }
373
374   cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
375   DEQ_ITEM_INIT(cdm);
376   cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
377   cdm->instance = instance;
378
379   switch (instance->format) {
380   case AMQP1_FORMAT_JSON:
381     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
382     status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
383     if (status != 0) {
384       ERROR("amqp1 plugin: formatting notification failed");
385       return status;
386     }
387     cdm->mbuf.size = strlen(cdm->mbuf.start);
388     break;
389   default:
390     ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
391     return -1;
392   }
393
394   /* encode message and place on outbound queue */
395   status = encqueue(cdm, instance);
396
397   return status;
398 } /* }}} int amqp1_notify */
399
400 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
401                        user_data_t *user_data) {
402   amqp1_config_instance_t *instance;
403   int status = 0;
404   size_t bfree = BUFSIZE;
405   size_t bfill = 0;
406   cd_message_t *cdm;
407   size_t bufsize = BUFSIZE;
408
409   if ((ds == NULL) || (vl == NULL) || (transport == NULL) ||
410       (user_data == NULL))
411     return EINVAL;
412
413   instance = user_data->data;
414
415   if (instance->notify != false) {
416     ERROR("amqp1 plugin: write failed");
417   }
418
419   cdm = (cd_message_t *)malloc(sizeof(cd_message_t));
420   DEQ_ITEM_INIT(cdm);
421   cdm->mbuf = pn_rwbytes(bufsize, (char *)malloc(bufsize));
422   cdm->instance = instance;
423
424   switch (instance->format) {
425   case AMQP1_FORMAT_COMMAND:
426     status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
427     if (status != 0) {
428       ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
429       return status;
430     }
431     cdm->mbuf.size = strlen(cdm->mbuf.start);
432     break;
433   case AMQP1_FORMAT_JSON:
434     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
435     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
436                            instance->store_rates);
437     format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
438     cdm->mbuf.size = strlen(cdm->mbuf.start);
439     break;
440   case AMQP1_FORMAT_GRAPHITE:
441     status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
442                              instance->prefix, instance->postfix,
443                              instance->escape_char, instance->graphite_flags);
444     if (status != 0) {
445       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
446       return status;
447     }
448     cdm->mbuf.size = strlen(cdm->mbuf.start);
449     break;
450   default:
451     ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
452     return -1;
453   }
454
455   /* encode message and place on outboud queue */
456   encqueue(cdm, instance);
457
458   return 0;
459 } /* }}} int amqp1_write */
460
461 static void amqp1_config_transport_free(void *ptr) /* {{{ */
462 {
463   amqp1_config_transport_t *transport = ptr;
464
465   if (transport == NULL)
466     return;
467
468   sfree(transport->name);
469   sfree(transport->host);
470   sfree(transport->user);
471   sfree(transport->password);
472   sfree(transport->address);
473
474   sfree(transport);
475 } /* }}} void amqp1_config_transport_free */
476
477 static void amqp1_config_instance_free(void *ptr) /* {{{ */
478 {
479   amqp1_config_instance_t *instance = ptr;
480
481   if (instance == NULL)
482     return;
483
484   sfree(instance->name);
485   sfree(instance->prefix);
486   sfree(instance->postfix);
487
488   sfree(instance);
489 } /* }}} void amqp1_config_instance_free */
490
491 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
492 {
493   int status = 0;
494   char *key = NULL;
495   amqp1_config_instance_t *instance;
496
497   instance = calloc(1, sizeof(*instance));
498   if (instance == NULL) {
499     ERROR("amqp1 plugin: calloc failed.");
500     return ENOMEM;
501   }
502
503   status = cf_util_get_string(ci, &instance->name);
504   if (status != 0) {
505     sfree(instance);
506     return status;
507   }
508
509   for (int i = 0; i < ci->children_num; i++) {
510     oconfig_item_t *child = ci->children + i;
511
512     if (strcasecmp("PreSettle", child->key) == 0)
513       status = cf_util_get_boolean(child, &instance->pre_settle);
514     else if (strcasecmp("Notify", child->key) == 0)
515       status = cf_util_get_boolean(child, &instance->notify);
516     else if (strcasecmp("Format", child->key) == 0) {
517       status = cf_util_get_string(child, &key);
518       if (status != 0)
519         return status;
520       /* TODO: goto errout */
521       //          goto errout;
522       assert(key != NULL);
523       if (strcasecmp(key, "Command") == 0) {
524         instance->format = AMQP1_FORMAT_COMMAND;
525       } else if (strcasecmp(key, "Graphite") == 0) {
526         instance->format = AMQP1_FORMAT_GRAPHITE;
527       } else if (strcasecmp(key, "JSON") == 0) {
528         instance->format = AMQP1_FORMAT_JSON;
529       } else {
530         WARNING("amqp1 plugin: Invalid format string: %s", key);
531       }
532       sfree(key);
533     } else if (strcasecmp("StoreRates", child->key) == 0)
534       status = cf_util_get_boolean(child, &instance->store_rates);
535     else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
536       status = cf_util_get_flag(child, &instance->graphite_flags,
537                                 GRAPHITE_SEPARATE_INSTANCES);
538     else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
539       status = cf_util_get_flag(child, &instance->graphite_flags,
540                                 GRAPHITE_ALWAYS_APPEND_DS);
541     else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
542       status = cf_util_get_flag(child, &instance->graphite_flags,
543                                 GRAPHITE_PRESERVE_SEPARATOR);
544     else if (strcasecmp("GraphitePrefix", child->key) == 0)
545       status = cf_util_get_string(child, &instance->prefix);
546     else if (strcasecmp("GraphitePostfix", child->key) == 0)
547       status = cf_util_get_string(child, &instance->postfix);
548     else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
549       char *tmp_buff = NULL;
550       status = cf_util_get_string(child, &tmp_buff);
551       if (status == 0) {
552         if (strlen(tmp_buff) > 1)
553           WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
554                   "only one character. Others will be ignored.");
555         instance->escape_char = tmp_buff[0];
556       }
557       sfree(tmp_buff);
558     } else
559       WARNING("amqp1 plugin: Ignoring unknown "
560               "instance configuration option "
561               "\%s\".",
562               child->key);
563     if (status != 0)
564       break;
565   }
566
567   if (status != 0) {
568     amqp1_config_instance_free(instance);
569     return status;
570   } else {
571     char tpname[DATA_MAX_NAME_LEN];
572     status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
573     if ((status < 0) || (size_t)status >= sizeof(tpname)) {
574       ERROR("amqp1 plugin: Instance name would have been truncated.");
575       return -1;
576     }
577     status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
578                       transport->address, instance->name);
579     if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
580       ERROR("amqp1 plugin: send_to address would have been truncated.");
581       return -1;
582     }
583     if (instance->notify == true) {
584       status = plugin_register_notification(
585           tpname, amqp1_notify,
586           &(user_data_t){
587               .data = instance, .free_func = amqp1_config_instance_free,
588           });
589     } else {
590       status = plugin_register_write(
591           tpname, amqp1_write,
592           &(user_data_t){
593               .data = instance, .free_func = amqp1_config_instance_free,
594           });
595     }
596
597     if (status != 0) {
598       amqp1_config_instance_free(instance);
599     }
600   }
601
602   return status;
603 } /* }}} int amqp1_config_instance */
604
605 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
606 {
607   int status = 0;
608
609   transport = calloc(1, sizeof(*transport));
610   if (transport == NULL) {
611     ERROR("amqp1 plugin: calloc failed.");
612     return ENOMEM;
613   }
614
615   /* Initialize transport configuration {{{ */
616   transport->retry_delay = 1;
617
618   status = cf_util_get_string(ci, &transport->name);
619   if (status != 0) {
620     sfree(transport);
621     return status;
622   }
623
624   for (int i = 0; i < ci->children_num; i++) {
625     oconfig_item_t *child = ci->children + i;
626
627     if (strcasecmp("Host", child->key) == 0)
628       status = cf_util_get_string(child, &transport->host);
629     else if (strcasecmp("Port", child->key) == 0)
630       status = cf_util_get_string(child, &transport->port);
631     else if (strcasecmp("User", child->key) == 0)
632       status = cf_util_get_string(child, &transport->user);
633     else if (strcasecmp("Password", child->key) == 0)
634       status = cf_util_get_string(child, &transport->password);
635     else if (strcasecmp("Address", child->key) == 0)
636       status = cf_util_get_string(child, &transport->address);
637     else if (strcasecmp("RetryDelay", child->key) == 0)
638       status = cf_util_get_int(child, &transport->retry_delay);
639     else if (strcasecmp("Instance", child->key) == 0)
640       amqp1_config_instance(child);
641     else
642       WARNING("amqp1 plugin: Ignoring unknown "
643               "transport configuration option "
644               "\%s\".",
645               child->key);
646
647     if (status != 0)
648       break;
649   }
650
651   if (status != 0) {
652     amqp1_config_transport_free(transport);
653   }
654   return status;
655 } /* }}} int amqp1_config_transport */
656
657 static int amqp1_config(oconfig_item_t *ci) /* {{{ */
658 {
659
660   for (int i = 0; i < ci->children_num; i++) {
661     oconfig_item_t *child = ci->children + i;
662
663     if (strcasecmp("Transport", child->key) == 0)
664       amqp1_config_transport(child);
665     else
666       WARNING("amqp1 plugin: Ignoring unknown config option \%s\".",
667               child->key);
668   }
669
670   return 0;
671 } /* }}} int amqp1_config */
672
673 static int amqp1_init(void) /* {{{ */
674 {
675   int status;
676   char errbuf[1024];
677
678   if (transport == NULL) {
679     ERROR("amqp1: init failed, no transport configured");
680     return -1;
681   }
682
683   if (proactor == NULL) {
684     pthread_mutex_init(&send_lock, /* attr = */ NULL);
685     /* start_thread */
686     status =
687         plugin_thread_create(&event_thread_id, NULL /* no attributes */,
688                              event_thread, NULL /* no argument */, "handle");
689     if (status != 0) {
690       ERROR("amqp1 plugin: pthread_create failed: %s",
691             sstrerror(errno, errbuf, sizeof(errbuf)));
692     } else {
693       event_thread_running = 1;
694     }
695   }
696   return 0;
697 } /* }}} int amqp1_init */
698
699 static int amqp1_shutdown(void) /* {{{ */
700 {
701   stopping = true;
702
703   /* Stop the proactor thread */
704   if (event_thread_running == 1) {
705     DEBUG("amqp1 plugin: Shutting down proactor thread.");
706     pn_connection_wake(conn);
707   }
708   pthread_join(event_thread_id, NULL /* no return value */);
709   memset(&event_thread_id, 0, sizeof(event_thread_id));
710
711   DEBUG("amqp1 plugin: proactor thread exited.");
712
713   if (transport != NULL) {
714     amqp1_config_transport_free(transport);
715   }
716
717   return 0;
718 } /* }}} int amqp1_shutdown */
719
720 void module_register(void) {
721   plugin_register_complex_config("amqp1", amqp1_config);
722   plugin_register_init("amqp1", amqp1_init);
723   plugin_register_shutdown("amqp1", amqp1_shutdown);
724 } /* void module_register */