Merge pull request #2837 from abays/fix-collectd-tg-dtime
[collectd.git] / src / amqp1.c
1 /**
2  * collectd - src/amqp1.c
3  * Copyright(c) 2017 Red Hat Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Andy Smith <ansmith@redhat.com>
25  */
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_cmd_putval.h"
32 #include "utils_deq.h"
33 #include "utils_format_graphite.h"
34 #include "utils_format_json.h"
35 #include "utils_random.h"
36
37 #include <proton/condition.h>
38 #include <proton/connection.h>
39 #include <proton/delivery.h>
40 #include <proton/link.h>
41 #include <proton/message.h>
42 #include <proton/proactor.h>
43 #include <proton/sasl.h>
44 #include <proton/session.h>
45 #include <proton/transport.h>
46
47 #include <errno.h>
48 #include <stdint.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51
52 #define BUFSIZE 8192
53 #define AMQP1_FORMAT_JSON 0
54 #define AMQP1_FORMAT_COMMAND 1
55 #define AMQP1_FORMAT_GRAPHITE 2
56
57 typedef struct amqp1_config_transport_s {
58   DEQ_LINKS(struct amqp1_config_transport_s);
59   char *name;
60   char *host;
61   char *port;
62   char *user;
63   char *password;
64   char *address;
65   int retry_delay;
66 } amqp1_config_transport_t;
67
68 typedef struct amqp1_config_instance_s {
69   DEQ_LINKS(struct amqp1_config_instance_s);
70   char *name;
71   bool notify;
72   uint8_t format;
73   unsigned int graphite_flags;
74   bool store_rates;
75   char *prefix;
76   char *postfix;
77   char escape_char;
78   bool pre_settle;
79   char send_to[1024];
80 } amqp1_config_instance_t;
81
82 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
83
84 typedef struct cd_message_s {
85   DEQ_LINKS(struct cd_message_s);
86   pn_rwbytes_t mbuf;
87   amqp1_config_instance_t *instance;
88 } cd_message_t;
89
90 DEQ_DECLARE(cd_message_t, cd_message_list_t);
91
92 /*
93  * Globals
94  */
95 static pn_connection_t *conn;
96 static pn_link_t *sender;
97 static pn_proactor_t *proactor;
98 static pthread_mutex_t send_lock;
99 static cd_message_list_t out_messages;
100 static uint64_t cd_tag = 1;
101 static uint64_t acknowledged;
102 static amqp1_config_transport_t *transport;
103 static bool stopping;
104 static bool event_thread_running;
105 static pthread_t event_thread_id;
106
107 /*
108  * Functions
109  */
110 static void cd_message_free(cd_message_t *cdm) {
111   free(cdm->mbuf.start);
112   free(cdm);
113 } /* }}} void cd_message_free */
114
115 static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
116 {
117   uint64_t dtag;
118   cd_message_list_t to_send;
119   cd_message_t *cdm;
120   int link_credit = pn_link_credit(link);
121   int event_count = 0;
122   pn_delivery_t *dlv;
123
124   if (stopping) {
125     return 0;
126   }
127
128   DEQ_INIT(to_send);
129
130   pthread_mutex_lock(&send_lock);
131
132   if (link_credit > 0) {
133     dtag = cd_tag;
134     cdm = DEQ_HEAD(out_messages);
135     while (cdm) {
136       DEQ_REMOVE_HEAD(out_messages);
137       DEQ_INSERT_TAIL(to_send, cdm);
138       if (DEQ_SIZE(to_send) == (size_t)link_credit)
139         break;
140       cdm = DEQ_HEAD(out_messages);
141     }
142     cd_tag += DEQ_SIZE(to_send);
143   }
144
145   pthread_mutex_unlock(&send_lock);
146
147   /* message is already formatted and encoded */
148   cdm = DEQ_HEAD(to_send);
149   while (cdm) {
150     DEQ_REMOVE_HEAD(to_send);
151     dtag++;
152     dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag)));
153     pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
154     pn_link_advance(link);
155     if (cdm->instance->pre_settle == true) {
156       pn_delivery_settle(dlv);
157     }
158     event_count++;
159     cd_message_free(cdm);
160     cdm = DEQ_HEAD(to_send);
161   }
162
163   return event_count;
164 } /* }}} int amqp1_send_out_messages */
165
166 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
167 {
168   if (pn_condition_is_set(cond)) {
169     ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)),
170           pn_condition_get_name(cond), pn_condition_get_description(cond));
171     pn_connection_close(pn_event_connection(e));
172     conn = NULL;
173   }
174 } /* }}} void check_condition */
175
176 static bool handle(pn_event_t *event) /* {{{ */
177 {
178
179   switch (pn_event_type(event)) {
180
181   case PN_CONNECTION_INIT: {
182     conn = pn_event_connection(event);
183     pn_connection_set_container(conn, transport->name);
184     pn_connection_open(conn);
185     pn_session_t *ssn = pn_session(conn);
186     pn_session_open(ssn);
187     sender = pn_sender(ssn, "cd-sender");
188     pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
189     pn_link_open(sender);
190     break;
191   }
192
193   case PN_LINK_FLOW: {
194     /* peer has given us credit, send outbound messages */
195     amqp1_send_out_messages(sender);
196     break;
197   }
198
199   case PN_DELIVERY: {
200     /* acknowledgement from peer that a message was delivered */
201     pn_delivery_t *dlv = pn_event_delivery(event);
202     if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
203       pn_delivery_settle(dlv);
204       acknowledged++;
205     }
206     break;
207   }
208
209   case PN_CONNECTION_WAKE: {
210     if (!stopping) {
211       amqp1_send_out_messages(sender);
212     }
213     break;
214   }
215
216   case PN_TRANSPORT_CLOSED: {
217     check_condition(event, pn_transport_condition(pn_event_transport(event)));
218     break;
219   }
220
221   case PN_CONNECTION_REMOTE_CLOSE: {
222     check_condition(event,
223                     pn_session_remote_condition(pn_event_session(event)));
224     pn_connection_close(pn_event_connection(event));
225     break;
226   }
227
228   case PN_SESSION_REMOTE_CLOSE: {
229     check_condition(event,
230                     pn_session_remote_condition(pn_event_session(event)));
231     pn_connection_close(pn_event_connection(event));
232     break;
233   }
234
235   case PN_LINK_REMOTE_CLOSE:
236   case PN_LINK_REMOTE_DETACH: {
237     check_condition(event, pn_link_remote_condition(pn_event_link(event)));
238     pn_connection_close(pn_event_connection(event));
239     break;
240   }
241
242   case PN_PROACTOR_INACTIVE: {
243     return false;
244   }
245
246   default:
247     break;
248   }
249   return true;
250 } /* }}} bool handle */
251
252 static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
253 {
254   char addr[PN_MAX_ADDR];
255   cd_message_t *cdm;
256
257   /* setup proactor */
258   proactor = pn_proactor();
259   pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
260
261   while (!stopping) {
262     /* make connection */
263     conn = pn_connection();
264     if (transport->user != NULL) {
265       pn_connection_set_user(conn, transport->user);
266       pn_connection_set_password(conn, transport->password);
267     }
268     pn_proactor_connect(proactor, conn, addr);
269
270     bool engine_running = true;
271     while (engine_running && !stopping) {
272       pn_event_batch_t *events = pn_proactor_wait(proactor);
273       pn_event_t *e;
274       while ((e = pn_event_batch_next(events))) {
275         engine_running = handle(e);
276         if (!engine_running) {
277           break;
278         }
279       }
280       pn_proactor_done(proactor, events);
281     }
282
283     pn_proactor_release_connection(conn);
284
285     DEBUG("amqp1 plugin: retrying connection");
286     int delay = transport->retry_delay;
287     while (delay-- > 0 && !stopping) {
288       sleep(1.0);
289     }
290   }
291
292   pn_proactor_disconnect(proactor, NULL);
293
294   /* Free the remaining out_messages */
295   cdm = DEQ_HEAD(out_messages);
296   while (cdm) {
297     DEQ_REMOVE_HEAD(out_messages);
298     cd_message_free(cdm);
299     cdm = DEQ_HEAD(out_messages);
300   }
301
302   event_thread_running = false;
303
304   return NULL;
305 } /* }}} void event_thread */
306
307 static int encqueue(cd_message_t *cdm,
308                     amqp1_config_instance_t *instance) /* {{{ */
309 {
310   /* encode message */
311   pn_message_t *message = pn_message();
312   pn_message_set_address(message, instance->send_to);
313   pn_data_t *body = pn_message_body(message);
314   pn_data_clear(body);
315   pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
316   pn_data_exit(body);
317
318   /* put_binary copies and stores so ok to use mbuf */
319   cdm->mbuf.size = BUFSIZE;
320
321   int status;
322   char *start;
323   while ((status = pn_message_encode(message, cdm->mbuf.start,
324                                      &cdm->mbuf.size)) == PN_OVERFLOW) {
325     DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
326     cdm->mbuf.size *= 2;
327     start = realloc(cdm->mbuf.start, cdm->mbuf.size);
328     if (start == NULL) {
329       status = -1;
330       break;
331     } else {
332       cdm->mbuf.start = start;
333     }
334   }
335
336   if (status != 0) {
337     ERROR("amqp1 plugin: error encoding message: %s",
338           pn_error_text(pn_message_error(message)));
339     pn_message_free(message);
340     return -1;
341   }
342
343   pthread_mutex_lock(&send_lock);
344   DEQ_INSERT_TAIL(out_messages, cdm);
345   pthread_mutex_unlock(&send_lock);
346
347   pn_message_free(message);
348
349   /* activate the sender */
350   if (conn) {
351     pn_connection_wake(conn);
352   }
353
354   return 0;
355 } /* }}} int encqueue */
356
357 static int amqp1_notify(notification_t const *n,
358                         user_data_t *user_data) /* {{{ */
359 {
360   int status = 0;
361   size_t bfree = BUFSIZE;
362   size_t bfill = 0;
363   size_t bufsize = BUFSIZE;
364
365   if (n == NULL || user_data == NULL)
366     return EINVAL;
367
368   amqp1_config_instance_t *instance = user_data->data;
369
370   if (instance->notify != true) {
371     ERROR("amqp1 plugin: write notification failed");
372   }
373
374   cd_message_t *cdm = malloc(sizeof(*cdm));
375   if (cdm == NULL ){
376     ERROR("amqp1 plugin: notify failed");
377     return -1;
378   }
379
380   DEQ_ITEM_INIT(cdm);
381   char *start = malloc(bufsize);
382   if (start == NULL) {
383     ERROR("amqp1 plugin: malloc failed");
384     free(cdm);
385     return -1;
386   }
387   cdm->mbuf.size = bufsize;
388   cdm->mbuf.start = start;
389   cdm->instance = instance;
390
391   switch (instance->format) {
392   case AMQP1_FORMAT_JSON:
393     format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
394     status = format_json_notification(cdm->mbuf.start, bufsize, n);
395     if (status != 0) {
396       ERROR("amqp1 plugin: formatting notification failed");
397       cd_message_free(cdm);
398       return status;
399     }
400     cdm->mbuf.size = strlen(cdm->mbuf.start);
401     if (cdm->mbuf.size >= BUFSIZE) {
402       ERROR("amqp1 plugin: notify format json failed");
403       cd_message_free(cdm);
404       return -1;
405     }
406     break;
407   default:
408     ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
409     cd_message_free(cdm);
410     return -1;
411   }
412
413   /* encode message and place on outbound queue */
414   status = encqueue(cdm, instance);
415   if (status != 0) {
416     ERROR("amqp1 plugin: notify enqueue failed");
417     cd_message_free(cdm);
418   }
419   return status;
420
421 } /* }}} int amqp1_notify */
422
423 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
424                        user_data_t *user_data) {
425   int status = 0;
426   size_t bfree = BUFSIZE;
427   size_t bfill = 0;
428   size_t bufsize = BUFSIZE;
429
430   if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL)
431     return EINVAL;
432
433   amqp1_config_instance_t *instance = user_data->data;
434
435   if (instance->notify != false) {
436     ERROR("amqp1 plugin: write failed");
437   }
438
439   cd_message_t *cdm = malloc(sizeof(*cdm));
440   if (cdm == NULL) {
441     ERROR("amqp1 plugin: malloc failed.");
442     return -1;
443   }
444   DEQ_ITEM_INIT(cdm);
445   char *start = malloc(bufsize);
446   if (start == NULL) {
447     ERROR("amqp1 plugin: malloc failed.");
448     free(cdm);
449     return -1;
450   }
451   cdm->mbuf.size = bufsize;
452   cdm->mbuf.start = start;
453   cdm->instance = instance;
454
455   switch (instance->format) {
456   case AMQP1_FORMAT_COMMAND:
457     status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
458     if (status != 0) {
459       ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
460       cd_message_free(cdm);
461       return status;
462     }
463     cdm->mbuf.size = strlen(cdm->mbuf.start);
464     if (cdm->mbuf.size >= BUFSIZE) {
465       ERROR("amqp1 plugin: format cmd failed");
466       cd_message_free(cdm);
467       return -1;
468     }
469     break;
470   case AMQP1_FORMAT_JSON:
471     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
472     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
473                            instance->store_rates);
474     status= format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
475     if (status != 0) {
476       ERROR("amqp1 plugin: format_json_finalize failed with status %i.", status);
477       cd_message_free(cdm);
478       return(status);
479     }
480     cdm->mbuf.size = strlen(cdm->mbuf.start);
481     if (cdm->mbuf.size >= BUFSIZE) {
482       ERROR("amqp1 plugin: format graphite failed");
483       cd_message_free(cdm);
484       return -1;
485     }
486     break;
487   case AMQP1_FORMAT_GRAPHITE:
488     status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
489                              instance->prefix, instance->postfix,
490                              instance->escape_char, instance->graphite_flags);
491     if (status != 0) {
492       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
493       cd_message_free(cdm);
494       return status;
495     }
496     cdm->mbuf.size = strlen(cdm->mbuf.start);
497     if (cdm->mbuf.size >= BUFSIZE) {
498       ERROR("amqp1 plugin: format graphite failed");
499       cd_message_free(cdm);
500       return -1;
501     }
502     break;
503   default:
504     ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
505     cd_message_free(cdm);
506     return -1;
507   }
508
509   /* encode message and place on outbound queue */
510   status = encqueue(cdm, instance);
511   if (status != 0) {
512     ERROR("amqp1 plugin: write enqueue failed");
513     cd_message_free(cdm);
514   }
515   return status;
516
517 } /* }}} int amqp1_write */
518
519 static void amqp1_config_transport_free(void *ptr) /* {{{ */
520 {
521   amqp1_config_transport_t *transport = ptr;
522
523   if (transport == NULL)
524     return;
525
526   sfree(transport->name);
527   sfree(transport->host);
528   sfree(transport->port);
529   sfree(transport->user);
530   sfree(transport->password);
531   sfree(transport->address);
532
533   sfree(transport);
534 } /* }}} void amqp1_config_transport_free */
535
536 static void amqp1_config_instance_free(void *ptr) /* {{{ */
537 {
538   amqp1_config_instance_t *instance = ptr;
539
540   if (instance == NULL)
541     return;
542
543   sfree(instance->name);
544   sfree(instance->prefix);
545   sfree(instance->postfix);
546
547   sfree(instance);
548 } /* }}} void amqp1_config_instance_free */
549
550 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
551 {
552   amqp1_config_instance_t *instance = calloc(1, sizeof(*instance));
553   if (instance == NULL) {
554     ERROR("amqp1 plugin: calloc failed.");
555     return ENOMEM;
556   }
557
558   int status = cf_util_get_string(ci, &instance->name);
559   if (status != 0) {
560     sfree(instance);
561     return status;
562   }
563
564   for (int i = 0; i < ci->children_num; i++) {
565     oconfig_item_t *child = ci->children + i;
566
567     if (strcasecmp("PreSettle", child->key) == 0)
568       status = cf_util_get_boolean(child, &instance->pre_settle);
569     else if (strcasecmp("Notify", child->key) == 0)
570       status = cf_util_get_boolean(child, &instance->notify);
571     else if (strcasecmp("Format", child->key) == 0) {
572       char *key = NULL;
573       status = cf_util_get_string(child, &key);
574       if (status != 0)
575         return status;
576       assert(key != NULL);
577       if (strcasecmp(key, "Command") == 0) {
578         instance->format = AMQP1_FORMAT_COMMAND;
579       } else if (strcasecmp(key, "Graphite") == 0) {
580         instance->format = AMQP1_FORMAT_GRAPHITE;
581       } else if (strcasecmp(key, "JSON") == 0) {
582         instance->format = AMQP1_FORMAT_JSON;
583       } else {
584         WARNING("amqp1 plugin: Invalid format string: %s", key);
585       }
586       sfree(key);
587     } else if (strcasecmp("StoreRates", child->key) == 0)
588       status = cf_util_get_boolean(child, &instance->store_rates);
589     else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
590       status = cf_util_get_flag(child, &instance->graphite_flags,
591                                 GRAPHITE_SEPARATE_INSTANCES);
592     else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
593       status = cf_util_get_flag(child, &instance->graphite_flags,
594                                 GRAPHITE_ALWAYS_APPEND_DS);
595     else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
596       status = cf_util_get_flag(child, &instance->graphite_flags,
597                                 GRAPHITE_PRESERVE_SEPARATOR);
598     else if (strcasecmp("GraphitePrefix", child->key) == 0)
599       status = cf_util_get_string(child, &instance->prefix);
600     else if (strcasecmp("GraphitePostfix", child->key) == 0)
601       status = cf_util_get_string(child, &instance->postfix);
602     else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
603       char *tmp_buff = NULL;
604       status = cf_util_get_string(child, &tmp_buff);
605       if (status == 0) {
606         if (strlen(tmp_buff) > 1)
607           WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
608                   "only one character. Others will be ignored.");
609         instance->escape_char = tmp_buff[0];
610       }
611       sfree(tmp_buff);
612     } else
613       WARNING("amqp1 plugin: Ignoring unknown "
614               "instance configuration option "
615               "\"%s\".",
616               child->key);
617     if (status != 0)
618       break;
619   }
620
621   if (status != 0) {
622     amqp1_config_instance_free(instance);
623     return status;
624   } else {
625     char tpname[DATA_MAX_NAME_LEN];
626     status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
627     if ((status < 0) || (size_t)status >= sizeof(tpname)) {
628       ERROR("amqp1 plugin: Instance name would have been truncated.");
629       return -1;
630     }
631     status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
632                       transport->address, instance->name);
633     if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
634       ERROR("amqp1 plugin: send_to address would have been truncated.");
635       return -1;
636     }
637     if (instance->notify) {
638       status = plugin_register_notification(
639           tpname, amqp1_notify,
640           &(user_data_t){
641               .data = instance, .free_func = amqp1_config_instance_free,
642           });
643     } else {
644       status = plugin_register_write(
645           tpname, amqp1_write,
646           &(user_data_t){
647               .data = instance, .free_func = amqp1_config_instance_free,
648           });
649     }
650
651     if (status != 0) {
652       amqp1_config_instance_free(instance);
653     }
654   }
655
656   return status;
657 } /* }}} int amqp1_config_instance */
658
659 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
660 {
661   transport = calloc(1, sizeof(*transport));
662   if (transport == NULL) {
663     ERROR("amqp1 plugin: calloc failed.");
664     return ENOMEM;
665   }
666
667   /* Initialize transport configuration {{{ */
668   transport->retry_delay = 1;
669
670   int status = cf_util_get_string(ci, &transport->name);
671   if (status != 0) {
672     sfree(transport);
673     return status;
674   }
675
676   for (int i = 0; i < ci->children_num; i++) {
677     oconfig_item_t *child = ci->children + i;
678
679     if (strcasecmp("Host", child->key) == 0)
680       status = cf_util_get_string(child, &transport->host);
681     else if (strcasecmp("Port", child->key) == 0)
682       status = cf_util_get_string(child, &transport->port);
683     else if (strcasecmp("User", child->key) == 0)
684       status = cf_util_get_string(child, &transport->user);
685     else if (strcasecmp("Password", child->key) == 0)
686       status = cf_util_get_string(child, &transport->password);
687     else if (strcasecmp("Address", child->key) == 0)
688       status = cf_util_get_string(child, &transport->address);
689     else if (strcasecmp("RetryDelay", child->key) == 0)
690       status = cf_util_get_int(child, &transport->retry_delay);
691     else if (strcasecmp("Instance", child->key) == 0)
692       amqp1_config_instance(child);
693     else
694       WARNING("amqp1 plugin: Ignoring unknown "
695               "transport configuration option "
696               "\"%s\".",
697               child->key);
698
699     if (status != 0)
700       break;
701   }
702
703   if (status != 0) {
704     amqp1_config_transport_free(transport);
705   }
706   return status;
707 } /* }}} int amqp1_config_transport */
708
709 static int amqp1_config(oconfig_item_t *ci) /* {{{ */
710 {
711
712   for (int i = 0; i < ci->children_num; i++) {
713     oconfig_item_t *child = ci->children + i;
714
715     if (strcasecmp("Transport", child->key) == 0)
716       amqp1_config_transport(child);
717     else
718       WARNING("amqp1 plugin: Ignoring unknown config option \"%s\".",
719               child->key);
720   }
721
722   return 0;
723 } /* }}} int amqp1_config */
724
725 static int amqp1_init(void) /* {{{ */
726 {
727   if (transport == NULL) {
728     ERROR("amqp1: init failed, no transport configured");
729     return -1;
730   }
731
732   if (proactor == NULL) {
733     pthread_mutex_init(&send_lock, /* attr = */ NULL);
734     /* start_thread */
735     int status =
736         plugin_thread_create(&event_thread_id, NULL /* no attributes */,
737                              event_thread, NULL /* no argument */, "handle");
738     if (status != 0) {
739       ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO);
740     } else {
741       event_thread_running = true;
742     }
743   }
744   return 0;
745 } /* }}} int amqp1_init */
746
747 static int amqp1_shutdown(void) /* {{{ */
748 {
749   stopping = true;
750
751   /* Stop the proactor thread */
752   if (event_thread_running) {
753     DEBUG("amqp1 plugin: Shutting down proactor thread.");
754     pn_connection_wake(conn);
755   }
756   pthread_join(event_thread_id, NULL /* no return value */);
757   memset(&event_thread_id, 0, sizeof(event_thread_id));
758
759   DEBUG("amqp1 plugin: proactor thread exited.");
760
761   if (transport) {
762     amqp1_config_transport_free(transport);
763   }
764
765   return 0;
766 } /* }}} int amqp1_shutdown */
767
768 void module_register(void) {
769   plugin_register_complex_config("amqp1", amqp1_config);
770   plugin_register_init("amqp1", amqp1_init);
771   plugin_register_shutdown("amqp1", amqp1_shutdown);
772 } /* void module_register */