67c96b750256550c90fee2776a338b170695c058
[collectd.git] / src / amqp1.c
1 /**
2  * collectd - src/amqp1.c
3  * Copyright(c) 2017 Red Hat Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Andy Smith <ansmith@redhat.com>
25  */
26
27 #include "collectd.h"
28
29 #include "plugin.h"
30 #include "utils/cmds/putval.h"
31 #include "utils/common/common.h"
32 #include "utils/deq/deq.h"
33 #include "utils/format_graphite/format_graphite.h"
34 #include "utils/format_json/format_json.h"
35 #include "utils_random.h"
36
37 #include <proton/condition.h>
38 #include <proton/connection.h>
39 #include <proton/delivery.h>
40 #include <proton/link.h>
41 #include <proton/message.h>
42 #include <proton/proactor.h>
43 #include <proton/sasl.h>
44 #include <proton/session.h>
45 #include <proton/transport.h>
46
47 #include <errno.h>
48 #include <stdint.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51
52 #define BUFSIZE 8192
53 #define AMQP1_FORMAT_JSON 0
54 #define AMQP1_FORMAT_COMMAND 1
55 #define AMQP1_FORMAT_GRAPHITE 2
56
57 typedef struct amqp1_config_transport_s {
58   DEQ_LINKS(struct amqp1_config_transport_s);
59   char *name;
60   char *host;
61   char *port;
62   char *user;
63   char *password;
64   char *address;
65   int retry_delay;
66 } amqp1_config_transport_t;
67
68 typedef struct amqp1_config_instance_s {
69   DEQ_LINKS(struct amqp1_config_instance_s);
70   char *name;
71   bool notify;
72   uint8_t format;
73   unsigned int graphite_flags;
74   bool store_rates;
75   char *prefix;
76   char *postfix;
77   char escape_char;
78   bool pre_settle;
79   char send_to[1024];
80 } amqp1_config_instance_t;
81
82 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
83
84 typedef struct cd_message_s {
85   DEQ_LINKS(struct cd_message_s);
86   pn_rwbytes_t mbuf;
87   amqp1_config_instance_t *instance;
88 } cd_message_t;
89
90 DEQ_DECLARE(cd_message_t, cd_message_list_t);
91
92 /*
93  * Globals
94  */
95 static pn_connection_t *conn;
96 static pn_link_t *sender;
97 static pn_proactor_t *proactor;
98 static pthread_mutex_t send_lock;
99 static cd_message_list_t out_messages;
100 static uint64_t cd_tag = 1;
101 static uint64_t acknowledged;
102 static amqp1_config_transport_t *transport;
103 static bool stopping;
104 static bool event_thread_running;
105 static pthread_t event_thread_id;
106
107 /*
108  * Functions
109  */
110 static void cd_message_free(cd_message_t *cdm) {
111   free(cdm->mbuf.start);
112   free(cdm);
113 } /* }}} void cd_message_free */
114
115 static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
116 {
117   uint64_t dtag;
118   cd_message_list_t to_send;
119   cd_message_t *cdm;
120   int link_credit = pn_link_credit(link);
121   int event_count = 0;
122   pn_delivery_t *dlv;
123
124   if (stopping) {
125     return 0;
126   }
127
128   DEQ_INIT(to_send);
129
130   pthread_mutex_lock(&send_lock);
131
132   if (link_credit > 0) {
133     dtag = cd_tag;
134     cdm = DEQ_HEAD(out_messages);
135     while (cdm) {
136       DEQ_REMOVE_HEAD(out_messages);
137       DEQ_INSERT_TAIL(to_send, cdm);
138       if (DEQ_SIZE(to_send) == (size_t)link_credit)
139         break;
140       cdm = DEQ_HEAD(out_messages);
141     }
142     cd_tag += DEQ_SIZE(to_send);
143   }
144
145   pthread_mutex_unlock(&send_lock);
146
147   /* message is already formatted and encoded */
148   cdm = DEQ_HEAD(to_send);
149   while (cdm) {
150     DEQ_REMOVE_HEAD(to_send);
151     dtag++;
152     dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag)));
153     pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
154     pn_link_advance(link);
155     if (cdm->instance->pre_settle == true) {
156       pn_delivery_settle(dlv);
157     }
158     event_count++;
159     cd_message_free(cdm);
160     cdm = DEQ_HEAD(to_send);
161   }
162
163   return event_count;
164 } /* }}} int amqp1_send_out_messages */
165
166 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
167 {
168   if (pn_condition_is_set(cond)) {
169     ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)),
170           pn_condition_get_name(cond), pn_condition_get_description(cond));
171     pn_connection_close(pn_event_connection(e));
172     conn = NULL;
173   }
174 } /* }}} void check_condition */
175
176 static bool handle(pn_event_t *event) /* {{{ */
177 {
178
179   switch (pn_event_type(event)) {
180
181   case PN_CONNECTION_INIT: {
182     conn = pn_event_connection(event);
183     pn_connection_set_container(conn, transport->name);
184     pn_connection_open(conn);
185     pn_session_t *ssn = pn_session(conn);
186     pn_session_open(ssn);
187     sender = pn_sender(ssn, "cd-sender");
188     pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
189     pn_link_open(sender);
190     break;
191   }
192
193   case PN_LINK_FLOW: {
194     /* peer has given us credit, send outbound messages */
195     amqp1_send_out_messages(sender);
196     break;
197   }
198
199   case PN_DELIVERY: {
200     /* acknowledgement from peer that a message was delivered */
201     pn_delivery_t *dlv = pn_event_delivery(event);
202     if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
203       pn_delivery_settle(dlv);
204       acknowledged++;
205     }
206     break;
207   }
208
209   case PN_CONNECTION_WAKE: {
210     if (!stopping) {
211       amqp1_send_out_messages(sender);
212     }
213     break;
214   }
215
216   case PN_TRANSPORT_CLOSED: {
217     check_condition(event, pn_transport_condition(pn_event_transport(event)));
218     break;
219   }
220
221   case PN_CONNECTION_REMOTE_CLOSE: {
222     check_condition(event,
223                     pn_session_remote_condition(pn_event_session(event)));
224     pn_connection_close(pn_event_connection(event));
225     break;
226   }
227
228   case PN_SESSION_REMOTE_CLOSE: {
229     check_condition(event,
230                     pn_session_remote_condition(pn_event_session(event)));
231     pn_connection_close(pn_event_connection(event));
232     break;
233   }
234
235   case PN_LINK_REMOTE_CLOSE:
236   case PN_LINK_REMOTE_DETACH: {
237     check_condition(event, pn_link_remote_condition(pn_event_link(event)));
238     pn_connection_close(pn_event_connection(event));
239     break;
240   }
241
242   case PN_PROACTOR_INACTIVE: {
243     return false;
244   }
245
246   default:
247     break;
248   }
249   return true;
250 } /* }}} bool handle */
251
252 static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
253 {
254   char addr[PN_MAX_ADDR];
255   cd_message_t *cdm;
256
257   /* setup proactor */
258   proactor = pn_proactor();
259   pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
260
261   while (!stopping) {
262     /* make connection */
263     conn = pn_connection();
264     if (transport->user != NULL) {
265       pn_connection_set_user(conn, transport->user);
266       pn_connection_set_password(conn, transport->password);
267     }
268     pn_proactor_connect(proactor, conn, addr);
269
270     bool engine_running = true;
271     while (engine_running && !stopping) {
272       pn_event_batch_t *events = pn_proactor_wait(proactor);
273       pn_event_t *e;
274       while ((e = pn_event_batch_next(events))) {
275         engine_running = handle(e);
276         if (!engine_running) {
277           break;
278         }
279       }
280       pn_proactor_done(proactor, events);
281     }
282
283     pn_proactor_release_connection(conn);
284
285     DEBUG("amqp1 plugin: retrying connection");
286     int delay = transport->retry_delay;
287     while (delay-- > 0 && !stopping) {
288       sleep(1.0);
289     }
290   }
291
292   pn_proactor_disconnect(proactor, NULL);
293
294   /* Free the remaining out_messages */
295   cdm = DEQ_HEAD(out_messages);
296   while (cdm) {
297     DEQ_REMOVE_HEAD(out_messages);
298     cd_message_free(cdm);
299     cdm = DEQ_HEAD(out_messages);
300   }
301
302   event_thread_running = false;
303
304   return NULL;
305 } /* }}} void event_thread */
306
307 static int encqueue(cd_message_t *cdm,
308                     amqp1_config_instance_t *instance) /* {{{ */
309 {
310   /* encode message */
311   pn_message_t *message = pn_message();
312   pn_message_set_address(message, instance->send_to);
313   pn_data_t *body = pn_message_body(message);
314   pn_data_clear(body);
315   pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
316   pn_data_exit(body);
317
318   /* put_binary copies and stores so ok to use mbuf */
319   cdm->mbuf.size = BUFSIZE;
320
321   int status;
322   char *start;
323   while ((status = pn_message_encode(message, cdm->mbuf.start,
324                                      &cdm->mbuf.size)) == PN_OVERFLOW) {
325     DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
326     cdm->mbuf.size *= 2;
327     start = realloc(cdm->mbuf.start, cdm->mbuf.size);
328     if (start == NULL) {
329       status = -1;
330       break;
331     } else {
332       cdm->mbuf.start = start;
333     }
334   }
335
336   if (status != 0) {
337     ERROR("amqp1 plugin: error encoding message: %s",
338           pn_error_text(pn_message_error(message)));
339     pn_message_free(message);
340     return -1;
341   }
342
343   pthread_mutex_lock(&send_lock);
344   DEQ_INSERT_TAIL(out_messages, cdm);
345   pthread_mutex_unlock(&send_lock);
346
347   pn_message_free(message);
348
349   /* activate the sender */
350   if (conn) {
351     pn_connection_wake(conn);
352   }
353
354   return 0;
355 } /* }}} int encqueue */
356
357 static int amqp1_notify(notification_t const *n,
358                         user_data_t *user_data) /* {{{ */
359 {
360   int status = 0;
361   size_t bfree = BUFSIZE;
362   size_t bfill = 0;
363   size_t bufsize = BUFSIZE;
364
365   if (n == NULL || user_data == NULL)
366     return EINVAL;
367
368   amqp1_config_instance_t *instance = user_data->data;
369
370   if (instance->notify != true) {
371     ERROR("amqp1 plugin: write notification failed");
372   }
373
374   cd_message_t *cdm = malloc(sizeof(*cdm));
375   if (cdm == NULL) {
376     ERROR("amqp1 plugin: notify failed");
377     return -1;
378   }
379
380   DEQ_ITEM_INIT(cdm);
381   char *start = malloc(bufsize);
382   if (start == NULL) {
383     ERROR("amqp1 plugin: malloc failed");
384     free(cdm);
385     return -1;
386   }
387   cdm->mbuf.size = bufsize;
388   cdm->mbuf.start = start;
389   cdm->instance = instance;
390
391   switch (instance->format) {
392   case AMQP1_FORMAT_JSON:
393     format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
394     status = format_json_notification(cdm->mbuf.start, bufsize, n);
395     if (status != 0) {
396       ERROR("amqp1 plugin: formatting notification failed");
397       cd_message_free(cdm);
398       return status;
399     }
400     cdm->mbuf.size = strlen(cdm->mbuf.start);
401     if (cdm->mbuf.size >= BUFSIZE) {
402       ERROR("amqp1 plugin: notify format json failed");
403       cd_message_free(cdm);
404       return -1;
405     }
406     break;
407   default:
408     ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
409     cd_message_free(cdm);
410     return -1;
411   }
412
413   /* encode message and place on outbound queue */
414   status = encqueue(cdm, instance);
415   if (status != 0) {
416     ERROR("amqp1 plugin: notify enqueue failed");
417     cd_message_free(cdm);
418   }
419   return status;
420
421 } /* }}} int amqp1_notify */
422
423 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
424                        user_data_t *user_data) {
425   int status = 0;
426   size_t bfree = BUFSIZE;
427   size_t bfill = 0;
428   size_t bufsize = BUFSIZE;
429
430   if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL)
431     return EINVAL;
432
433   amqp1_config_instance_t *instance = user_data->data;
434
435   if (instance->notify != false) {
436     ERROR("amqp1 plugin: write failed");
437   }
438
439   cd_message_t *cdm = malloc(sizeof(*cdm));
440   if (cdm == NULL) {
441     ERROR("amqp1 plugin: malloc failed.");
442     return -1;
443   }
444   DEQ_ITEM_INIT(cdm);
445   char *start = malloc(bufsize);
446   if (start == NULL) {
447     ERROR("amqp1 plugin: malloc failed.");
448     free(cdm);
449     return -1;
450   }
451   cdm->mbuf.size = bufsize;
452   cdm->mbuf.start = start;
453   cdm->instance = instance;
454
455   switch (instance->format) {
456   case AMQP1_FORMAT_COMMAND:
457     status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
458     if (status != 0) {
459       ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
460       cd_message_free(cdm);
461       return status;
462     }
463     cdm->mbuf.size = strlen(cdm->mbuf.start);
464     if (cdm->mbuf.size >= BUFSIZE) {
465       ERROR("amqp1 plugin: format cmd failed");
466       cd_message_free(cdm);
467       return -1;
468     }
469     break;
470   case AMQP1_FORMAT_JSON:
471     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
472     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
473                            instance->store_rates);
474     status = format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
475     if (status != 0) {
476       ERROR("amqp1 plugin: format_json_finalize failed with status %i.",
477             status);
478       cd_message_free(cdm);
479       return status;
480     }
481     cdm->mbuf.size = strlen(cdm->mbuf.start);
482     if (cdm->mbuf.size >= BUFSIZE) {
483       ERROR("amqp1 plugin: format json failed");
484       cd_message_free(cdm);
485       return -1;
486     }
487     break;
488   case AMQP1_FORMAT_GRAPHITE:
489     status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
490                              instance->prefix, instance->postfix,
491                              instance->escape_char, instance->graphite_flags);
492     if (status != 0) {
493       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
494       cd_message_free(cdm);
495       return status;
496     }
497     cdm->mbuf.size = strlen(cdm->mbuf.start);
498     if (cdm->mbuf.size >= BUFSIZE) {
499       ERROR("amqp1 plugin: format graphite failed");
500       cd_message_free(cdm);
501       return -1;
502     }
503     break;
504   default:
505     ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
506     cd_message_free(cdm);
507     return -1;
508   }
509
510   /* encode message and place on outbound queue */
511   status = encqueue(cdm, instance);
512   if (status != 0) {
513     ERROR("amqp1 plugin: write enqueue failed");
514     cd_message_free(cdm);
515   }
516   return status;
517
518 } /* }}} int amqp1_write */
519
520 static void amqp1_config_transport_free(void *ptr) /* {{{ */
521 {
522   amqp1_config_transport_t *transport = ptr;
523
524   if (transport == NULL)
525     return;
526
527   sfree(transport->name);
528   sfree(transport->host);
529   sfree(transport->port);
530   sfree(transport->user);
531   sfree(transport->password);
532   sfree(transport->address);
533
534   sfree(transport);
535 } /* }}} void amqp1_config_transport_free */
536
537 static void amqp1_config_instance_free(void *ptr) /* {{{ */
538 {
539   amqp1_config_instance_t *instance = ptr;
540
541   if (instance == NULL)
542     return;
543
544   sfree(instance->name);
545   sfree(instance->prefix);
546   sfree(instance->postfix);
547
548   sfree(instance);
549 } /* }}} void amqp1_config_instance_free */
550
551 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
552 {
553   amqp1_config_instance_t *instance = calloc(1, sizeof(*instance));
554   if (instance == NULL) {
555     ERROR("amqp1 plugin: calloc failed.");
556     return ENOMEM;
557   }
558
559   int status = cf_util_get_string(ci, &instance->name);
560   if (status != 0) {
561     sfree(instance);
562     return status;
563   }
564
565   for (int i = 0; i < ci->children_num; i++) {
566     oconfig_item_t *child = ci->children + i;
567
568     if (strcasecmp("PreSettle", child->key) == 0)
569       status = cf_util_get_boolean(child, &instance->pre_settle);
570     else if (strcasecmp("Notify", child->key) == 0)
571       status = cf_util_get_boolean(child, &instance->notify);
572     else if (strcasecmp("Format", child->key) == 0) {
573       char *key = NULL;
574       status = cf_util_get_string(child, &key);
575       if (status != 0)
576         return status;
577       assert(key != NULL);
578       if (strcasecmp(key, "Command") == 0) {
579         instance->format = AMQP1_FORMAT_COMMAND;
580       } else if (strcasecmp(key, "Graphite") == 0) {
581         instance->format = AMQP1_FORMAT_GRAPHITE;
582       } else if (strcasecmp(key, "JSON") == 0) {
583         instance->format = AMQP1_FORMAT_JSON;
584       } else {
585         WARNING("amqp1 plugin: Invalid format string: %s", key);
586       }
587       sfree(key);
588     } else if (strcasecmp("StoreRates", child->key) == 0)
589       status = cf_util_get_boolean(child, &instance->store_rates);
590     else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
591       status = cf_util_get_flag(child, &instance->graphite_flags,
592                                 GRAPHITE_SEPARATE_INSTANCES);
593     else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
594       status = cf_util_get_flag(child, &instance->graphite_flags,
595                                 GRAPHITE_ALWAYS_APPEND_DS);
596     else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
597       status = cf_util_get_flag(child, &instance->graphite_flags,
598                                 GRAPHITE_PRESERVE_SEPARATOR);
599     else if (strcasecmp("GraphitePrefix", child->key) == 0)
600       status = cf_util_get_string(child, &instance->prefix);
601     else if (strcasecmp("GraphitePostfix", child->key) == 0)
602       status = cf_util_get_string(child, &instance->postfix);
603     else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
604       char *tmp_buff = NULL;
605       status = cf_util_get_string(child, &tmp_buff);
606       if (status == 0) {
607         if (strlen(tmp_buff) > 1)
608           WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
609                   "only one character. Others will be ignored.");
610         instance->escape_char = tmp_buff[0];
611       }
612       sfree(tmp_buff);
613     } else
614       WARNING("amqp1 plugin: Ignoring unknown "
615               "instance configuration option "
616               "\"%s\".",
617               child->key);
618     if (status != 0)
619       break;
620   }
621
622   if (status != 0) {
623     amqp1_config_instance_free(instance);
624     return status;
625   } else {
626     char tpname[DATA_MAX_NAME_LEN];
627     status = ssnprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
628     if ((status < 0) || (size_t)status >= sizeof(tpname)) {
629       ERROR("amqp1 plugin: Instance name would have been truncated.");
630       return -1;
631     }
632     status = ssnprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
633                        transport->address, instance->name);
634     if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
635       ERROR("amqp1 plugin: send_to address would have been truncated.");
636       return -1;
637     }
638     if (instance->notify) {
639       status = plugin_register_notification(
640           tpname, amqp1_notify,
641           &(user_data_t){
642               .data = instance,
643               .free_func = amqp1_config_instance_free,
644           });
645     } else {
646       status =
647           plugin_register_write(tpname, amqp1_write,
648                                 &(user_data_t){
649                                     .data = instance,
650                                     .free_func = amqp1_config_instance_free,
651                                 });
652     }
653
654     if (status != 0) {
655       amqp1_config_instance_free(instance);
656     }
657   }
658
659   return status;
660 } /* }}} int amqp1_config_instance */
661
662 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
663 {
664   transport = calloc(1, sizeof(*transport));
665   if (transport == NULL) {
666     ERROR("amqp1 plugin: calloc failed.");
667     return ENOMEM;
668   }
669
670   /* Initialize transport configuration {{{ */
671   transport->retry_delay = 1;
672
673   int status = cf_util_get_string(ci, &transport->name);
674   if (status != 0) {
675     sfree(transport);
676     return status;
677   }
678
679   for (int i = 0; i < ci->children_num; i++) {
680     oconfig_item_t *child = ci->children + i;
681
682     if (strcasecmp("Host", child->key) == 0)
683       status = cf_util_get_string(child, &transport->host);
684     else if (strcasecmp("Port", child->key) == 0)
685       status = cf_util_get_string(child, &transport->port);
686     else if (strcasecmp("User", child->key) == 0)
687       status = cf_util_get_string(child, &transport->user);
688     else if (strcasecmp("Password", child->key) == 0)
689       status = cf_util_get_string(child, &transport->password);
690     else if (strcasecmp("Address", child->key) == 0)
691       status = cf_util_get_string(child, &transport->address);
692     else if (strcasecmp("RetryDelay", child->key) == 0)
693       status = cf_util_get_int(child, &transport->retry_delay);
694     else if (strcasecmp("Instance", child->key) == 0)
695       amqp1_config_instance(child);
696     else
697       WARNING("amqp1 plugin: Ignoring unknown "
698               "transport configuration option "
699               "\"%s\".",
700               child->key);
701
702     if (status != 0)
703       break;
704   }
705
706   if (status != 0) {
707     amqp1_config_transport_free(transport);
708   }
709   return status;
710 } /* }}} int amqp1_config_transport */
711
712 static int amqp1_config(oconfig_item_t *ci) /* {{{ */
713 {
714
715   for (int i = 0; i < ci->children_num; i++) {
716     oconfig_item_t *child = ci->children + i;
717
718     if (strcasecmp("Transport", child->key) == 0)
719       amqp1_config_transport(child);
720     else
721       WARNING("amqp1 plugin: Ignoring unknown config option \"%s\".",
722               child->key);
723   }
724
725   return 0;
726 } /* }}} int amqp1_config */
727
728 static int amqp1_init(void) /* {{{ */
729 {
730   if (transport == NULL) {
731     ERROR("amqp1: init failed, no transport configured");
732     return -1;
733   }
734
735   if (proactor == NULL) {
736     pthread_mutex_init(&send_lock, /* attr = */ NULL);
737     /* start_thread */
738     int status =
739         plugin_thread_create(&event_thread_id, NULL /* no attributes */,
740                              event_thread, NULL /* no argument */, "handle");
741     if (status != 0) {
742       ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO);
743     } else {
744       event_thread_running = true;
745     }
746   }
747   return 0;
748 } /* }}} int amqp1_init */
749
750 static int amqp1_shutdown(void) /* {{{ */
751 {
752   stopping = true;
753
754   /* Stop the proactor thread */
755   if (event_thread_running) {
756     DEBUG("amqp1 plugin: Shutting down proactor thread.");
757     pn_connection_wake(conn);
758   }
759   pthread_join(event_thread_id, NULL /* no return value */);
760   memset(&event_thread_id, 0, sizeof(event_thread_id));
761
762   DEBUG("amqp1 plugin: proactor thread exited.");
763
764   if (transport) {
765     amqp1_config_transport_free(transport);
766   }
767
768   return 0;
769 } /* }}} int amqp1_shutdown */
770
771 void module_register(void) {
772   plugin_register_complex_config("amqp1", amqp1_config);
773   plugin_register_init("amqp1", amqp1_init);
774   plugin_register_shutdown("amqp1", amqp1_shutdown);
775 } /* void module_register */