Add connection retry
[collectd.git] / src / amqp1.c
1 /**
2  * collectd - src/amqp1.c
3  * Copyright(c) 2017 Red Hat Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Andy Smith <ansmith@redhat.com>
25  */
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_cmd_putval.h"
32 #include "utils_deq.h"
33 #include "utils_format_graphite.h"
34 #include "utils_format_json.h"
35 #include "utils_random.h"
36
37 #include <proton/condition.h>
38 #include <proton/connection.h>
39 #include <proton/delivery.h>
40 #include <proton/link.h>
41 #include <proton/message.h>
42 #include <proton/proactor.h>
43 #include <proton/sasl.h>
44 #include <proton/session.h>
45 #include <proton/transport.h>
46
47 #include <errno.h>
48 #include <stdint.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51
52 #define BUFSIZE 8192
53 #define AMQP1_FORMAT_JSON 0
54 #define AMQP1_FORMAT_COMMAND 1
55 #define AMQP1_FORMAT_GRAPHITE 2
56
57 typedef struct amqp1_config_transport_t {
58   DEQ_LINKS(struct amqp1_config_transport_t);
59   char *name;
60   char *host;
61   char *port;
62   char *user;
63   char *password;
64   char *address;
65   int  retry_delay;
66 } amqp1_config_transport_t;
67
68 typedef struct amqp1_config_instance_t {
69   DEQ_LINKS(struct amqp1_config_instance_t);
70   char *name;
71   _Bool notify;
72   uint8_t format;
73   unsigned int graphite_flags;
74   _Bool store_rates;
75   char *prefix;
76   char *postfix;
77   char escape_char;
78   _Bool pre_settle;
79   char send_to[128];
80 } amqp1_config_instance_t;
81
82 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
83
84 typedef struct cd_message_t {
85   DEQ_LINKS(struct cd_message_t);
86   pn_bytes_t mbuf;
87   amqp1_config_instance_t *instance;
88 } cd_message_t;
89
90 DEQ_DECLARE(cd_message_t, cd_message_list_t);
91
92 /*
93  * Globals
94  */
95 pn_connection_t *conn = NULL;
96 pn_link_t *sender = NULL;
97 pn_proactor_t *proactor = NULL;
98 pthread_mutex_t send_lock;
99 cd_message_list_t out_messages;
100 uint64_t cd_tag = 1;
101 uint64_t acknowledged = 0;
102 amqp1_config_transport_t *transport = NULL;
103
104 static bool stopping = false;
105 static int event_thread_running = 0;
106 static pthread_t event_thread_id;
107
108 /*
109  * Functions
110  */
111 static void cd_message_free(cd_message_t *cdm) {
112   if (cdm->mbuf.start) {
113     free((void *)cdm->mbuf.start);
114   }
115   free(cdm);
116 } /* }}} void cd_message_free */
117
118 static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
119 {
120   uint64_t dtag;
121   cd_message_list_t to_send;
122   cd_message_t *cdm;
123   int link_credit = pn_link_credit(link);
124   int event_count = 0;
125   pn_delivery_t *dlv;
126
127   if (stopping){
128     return 0;
129   }
130
131   DEQ_INIT(to_send);
132
133   pthread_mutex_lock(&send_lock);
134
135   if (link_credit > 0) {
136     dtag = cd_tag;
137     cdm = DEQ_HEAD(out_messages);
138     while (cdm) {
139       DEQ_REMOVE_HEAD(out_messages);
140       DEQ_INSERT_TAIL(to_send, cdm);
141       if (DEQ_SIZE(to_send) == link_credit)
142         break;
143       cdm = DEQ_HEAD(out_messages);
144     }
145     cd_tag += DEQ_SIZE(to_send);
146   }
147
148   pthread_mutex_unlock(&send_lock);
149
150   /* message is already formatted and encoded */
151   cdm = DEQ_HEAD(to_send);
152   while (cdm) {
153     DEQ_REMOVE_HEAD(to_send);
154     dtag++;
155     dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag)));
156     pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
157     pn_link_advance(link);
158     if (cdm->instance->pre_settle == true) {
159       pn_delivery_settle(dlv);
160     }
161     event_count++;
162     cd_message_free(cdm);
163     cdm = DEQ_HEAD(to_send);
164   }
165
166   return event_count;
167 } /* }}} int amqp1_send_out_messages */
168
169
170 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
171 {
172   if (pn_condition_is_set(cond)) {
173     ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)),
174           pn_condition_get_name(cond), pn_condition_get_description(cond));
175     pn_connection_close(pn_event_connection(e));
176     conn = NULL;
177   }
178 } /* }}} void check_condition */
179
180 static bool handle(pn_event_t *event) /* {{{ */
181 {
182
183   switch (pn_event_type(event)) {
184
185   case PN_CONNECTION_INIT: {
186     conn = pn_event_connection(event);
187     pn_connection_set_container(conn, transport->address);
188     pn_connection_open(conn);
189     pn_session_t *ssn = pn_session(conn);
190     pn_session_open(ssn);
191     sender = pn_sender(ssn, "cd-sender");
192     pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
193     pn_link_open(sender);
194     break;
195   }
196
197   case PN_LINK_FLOW: {
198     /* peer has given us credit, send outbound messages */
199     amqp1_send_out_messages(sender);
200     break;
201   }
202
203   case PN_DELIVERY: {
204     /* acknowledgement from peer that a message was delivered */
205     pn_delivery_t *dlv = pn_event_delivery(event);
206     if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
207       pn_delivery_settle(dlv);
208       acknowledged++;
209     }
210     break;
211   }
212
213   case PN_CONNECTION_WAKE: {
214     if (!stopping) {
215       amqp1_send_out_messages(sender);
216     }
217     break;
218   }
219
220   case PN_TRANSPORT_CLOSED: {
221     check_condition(event, pn_transport_condition(pn_event_transport(event)));
222     break;
223   }
224
225   case PN_CONNECTION_REMOTE_CLOSE: {
226     check_condition(event,
227                     pn_session_remote_condition(pn_event_session(event)));
228     pn_connection_close(pn_event_connection(event));
229     break;
230   }
231
232   case PN_SESSION_REMOTE_CLOSE: {
233     check_condition(event,
234                     pn_session_remote_condition(pn_event_session(event)));
235     pn_connection_close(pn_event_connection(event));
236     break;
237   }
238
239   case PN_LINK_REMOTE_CLOSE:
240   case PN_LINK_REMOTE_DETACH: {
241     check_condition(event, pn_link_remote_condition(pn_event_link(event)));
242     pn_connection_close(pn_event_connection(event));
243     break;
244   }
245
246   case PN_PROACTOR_INACTIVE: {
247     return false;
248   }
249
250   default:
251     break;
252   }
253   return true;
254 } /* }}} bool handle */
255
256 static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
257 {
258   char addr[PN_MAX_ADDR];
259   cd_message_t *cdm;
260
261   /* setup proactor */
262   proactor = pn_proactor();
263   pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
264
265   while (!stopping) {
266     /* make connection */
267     conn = pn_connection();
268     if (transport->user != NULL) {
269       pn_connection_set_user(conn, transport->user);
270       pn_connection_set_password(conn, transport->password);
271     }
272     pn_proactor_connect(proactor, conn, addr);
273
274     bool engine_running = true;
275     while (engine_running && !stopping) {
276       pn_event_batch_t *events = pn_proactor_wait(proactor);
277       pn_event_t *e;
278       while (( e = pn_event_batch_next(events))){
279         engine_running = handle(e);
280         if (!engine_running) {
281           break;
282         }
283       }
284       pn_proactor_done(proactor, events);
285     }
286
287     pn_proactor_release_connection(conn);
288
289     DEBUG("amqp1 plugin: retrying connection");
290     int delay = transport->retry_delay;
291     while (delay-- > 0 && !stopping) {
292       sleep(1.0);
293     }
294   }
295
296   pn_proactor_disconnect(proactor, NULL);
297
298   /* Free the remaining out_messages */
299   cdm = DEQ_HEAD(out_messages);
300   while (cdm) {
301     DEQ_REMOVE_HEAD(out_messages);
302     cd_message_free(cdm);
303     cdm = DEQ_HEAD(out_messages);
304   }
305
306   event_thread_running = 0;
307
308   return NULL;
309 } /* }}} void event_thread */
310
311 static void encqueue(cd_message_t *cdm,
312                      amqp1_config_instance_t *instance) /* {{{ */
313 {
314   size_t bufsize = BUFSIZE;
315   pn_data_t *body;
316   pn_message_t *message;
317
318   /* encode message */
319   message = pn_message();
320   pn_message_set_address(message, instance->send_to);
321   body = pn_message_body(message);
322   pn_data_clear(body);
323   pn_data_put_binary(body, cdm->mbuf);
324   pn_data_exit(body);
325
326   /* put_binary copies and stores so ok to use mbuf */
327   cdm->mbuf.size = bufsize;
328   pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
329
330   pthread_mutex_lock(&send_lock);
331   DEQ_INSERT_TAIL(out_messages, cdm);
332   pthread_mutex_unlock(&send_lock);
333
334   pn_message_free(message);
335
336   /* activate the sender */
337   if (conn != NULL) {
338     pn_connection_wake(conn);
339   }
340
341 } /* }}} void encqueue */
342
343 static int amqp1_notify(notification_t const *n,
344                         user_data_t *user_data) /* {{{ */
345 {
346   amqp1_config_instance_t *instance;
347   int status = 0;
348   size_t bfree = BUFSIZE;
349   size_t bfill = 0;
350   cd_message_t *cdm;
351   size_t bufsize = BUFSIZE;
352
353   if ((n == NULL) || (user_data == NULL))
354     return EINVAL;
355
356   instance = user_data->data;
357
358   if (instance->notify != true) {
359     ERROR("amqp1 plugin: write notification failed");
360   }
361
362   cdm = NEW(cd_message_t);
363   DEQ_ITEM_INIT(cdm);
364   cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
365   cdm->instance = instance;
366
367   switch (instance->format) {
368   case AMQP1_FORMAT_JSON:
369     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
370     status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
371     if (status != 0) {
372       ERROR("amqp1 plugin: formatting notification failed");
373       return status;
374     }
375     cdm->mbuf.size = strlen(cdm->mbuf.start);
376     break;
377   default:
378     ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
379     return -1;
380   }
381
382   /* encode message and place on outbound queue */
383   encqueue(cdm, instance);
384
385   return 0;
386 } /* }}} int amqp1_notify */
387
388 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
389                        user_data_t *user_data) {
390   amqp1_config_instance_t *instance;
391   int status = 0;
392   size_t bfree = BUFSIZE;
393   size_t bfill = 0;
394   cd_message_t *cdm;
395   size_t bufsize = BUFSIZE;
396
397   if ((ds == NULL) || (vl == NULL) || (transport == NULL) ||
398       (user_data == NULL))
399     return EINVAL;
400
401   instance = user_data->data;
402
403   if (instance->notify != false) {
404     ERROR("amqp1 plugin: write failed");
405   }
406
407   cdm = NEW(cd_message_t);
408   DEQ_ITEM_INIT(cdm);
409   cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
410   cdm->instance = instance;
411
412   switch (instance->format) {
413   case AMQP1_FORMAT_COMMAND:
414     status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
415     if (status != 0) {
416       ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
417       return status;
418     }
419     cdm->mbuf.size = strlen(cdm->mbuf.start);
420     break;
421   case AMQP1_FORMAT_JSON:
422     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
423     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
424                            instance->store_rates);
425     format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
426     cdm->mbuf.size = strlen(cdm->mbuf.start);
427     break;
428   case AMQP1_FORMAT_GRAPHITE:
429     status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
430                              instance->prefix, instance->postfix,
431                              instance->escape_char, instance->graphite_flags);
432     if (status != 0) {
433       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
434       return status;
435     }
436     cdm->mbuf.size = strlen(cdm->mbuf.start);
437     break;
438   default:
439     ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
440     return -1;
441   }
442
443   /* encode message and place on outboud queue */
444   encqueue(cdm, instance);
445
446   return 0;
447 } /* }}} int amqp1_write */
448
449 static void amqp1_config_transport_free(void *ptr) /* {{{ */
450 {
451   amqp1_config_transport_t *transport = ptr;
452
453   if (transport == NULL)
454     return;
455
456   sfree(transport->name);
457   sfree(transport->host);
458   sfree(transport->user);
459   sfree(transport->password);
460   sfree(transport->address);
461
462   sfree(transport);
463 } /* }}} void amqp1_config_transport_free */
464
465 static void amqp1_config_instance_free(void *ptr) /* {{{ */
466 {
467   amqp1_config_instance_t *instance = ptr;
468
469   if (instance == NULL)
470     return;
471
472   sfree(instance->name);
473   sfree(instance->prefix);
474   sfree(instance->postfix);
475
476   sfree(instance);
477 } /* }}} void amqp1_config_instance_free */
478
479 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
480 {
481   int status = 0;
482   char *key = NULL;
483   amqp1_config_instance_t *instance;
484
485   instance = calloc(1, sizeof(*instance));
486   if (instance == NULL) {
487     ERROR("amqp1 plugin: calloc failed.");
488     return ENOMEM;
489   }
490
491   /* Initialize instance configuration {{{ */
492   instance->name = NULL;
493
494   status = cf_util_get_string(ci, &instance->name);
495   if (status != 0) {
496     sfree(instance);
497     return status;
498   }
499
500   for (int i = 0; i < ci->children_num; i++) {
501     oconfig_item_t *child = ci->children + i;
502
503     if (strcasecmp("PreSettle", child->key) == 0)
504       status = cf_util_get_boolean(child, &instance->pre_settle);
505     else if (strcasecmp("Notify", child->key) == 0)
506       status = cf_util_get_boolean(child, &instance->notify);
507     else if (strcasecmp("Format", child->key) == 0) {
508       status = cf_util_get_string(child, &key);
509       if (status != 0)
510         return status;
511       /* TODO: goto errout */
512       //          goto errout;
513       assert(key != NULL);
514       if (strcasecmp(key, "Command") == 0) {
515         instance->format = AMQP1_FORMAT_COMMAND;
516       } else if (strcasecmp(key, "Graphite") == 0) {
517         instance->format = AMQP1_FORMAT_GRAPHITE;
518       } else if (strcasecmp(key, "JSON") == 0) {
519         instance->format = AMQP1_FORMAT_JSON;
520       } else {
521         WARNING("amqp1 plugin: Invalid format string: %s", key);
522       }
523       sfree(key);
524     } else if (strcasecmp("StoreRates", child->key) == 0)
525       status = cf_util_get_boolean(child, &instance->store_rates);
526     else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
527       status = cf_util_get_flag(child, &instance->graphite_flags,
528                                 GRAPHITE_SEPARATE_INSTANCES);
529     else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
530       status = cf_util_get_flag(child, &instance->graphite_flags,
531                                 GRAPHITE_ALWAYS_APPEND_DS);
532     else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
533       status = cf_util_get_flag(child, &instance->graphite_flags,
534                                 GRAPHITE_PRESERVE_SEPARATOR);
535     else if (strcasecmp("GraphitePrefix", child->key) == 0)
536       status = cf_util_get_string(child, &instance->prefix);
537     else if (strcasecmp("GraphitePostfix", child->key) == 0)
538       status = cf_util_get_string(child, &instance->postfix);
539     else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
540       char *tmp_buff = NULL;
541       status = cf_util_get_string(child, &tmp_buff);
542       if (strlen(tmp_buff) > 1)
543         WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
544                 "only one character. Others will be ignored.");
545       instance->escape_char = tmp_buff[0];
546       sfree(tmp_buff);
547     } else
548       WARNING("amqp1 plugin: Ignoring unknown "
549               "instance configuration option "
550               "\%s\".",
551               child->key);
552     if (status != 0)
553       break;
554   }
555
556   if (status != 0) {
557     amqp1_config_instance_free(instance);
558     return status;
559   } else {
560     char tpname[128];
561     snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
562     snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
563              transport->address, instance->name);
564     if (instance->notify == true) {
565       status = plugin_register_notification(
566           tpname, amqp1_notify,
567           &(user_data_t){
568               .data = instance, .free_func = amqp1_config_instance_free,
569           });
570     } else {
571       status = plugin_register_write(
572           tpname, amqp1_write,
573           &(user_data_t){
574               .data = instance, .free_func = amqp1_config_instance_free,
575           });
576     }
577
578     if (status != 0) {
579       amqp1_config_instance_free(instance);
580     }
581   }
582
583   return status;
584 } /* }}} int amqp1_config_instance */
585
586 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
587 {
588   int status = 0;
589
590   transport = calloc(1, sizeof(*transport));
591   if (transport == NULL) {
592     ERROR("amqp1 plugin: calloc failed.");
593     return ENOMEM;
594   }
595
596   /* Initialize transport configuration {{{ */
597   transport->name = NULL;
598   transport->retry_delay = 1;
599
600   status = cf_util_get_string(ci, &transport->name);
601   if (status != 0) {
602     sfree(transport);
603     return status;
604   }
605
606   for (int i = 0; i < ci->children_num; i++) {
607     oconfig_item_t *child = ci->children + i;
608
609     if (strcasecmp("Host", child->key) == 0)
610       status = cf_util_get_string(child, &transport->host);
611     else if (strcasecmp("Port", child->key) == 0)
612       status = cf_util_get_string(child, &transport->port);
613     else if (strcasecmp("User", child->key) == 0)
614       status = cf_util_get_string(child, &transport->user);
615     else if (strcasecmp("Password", child->key) == 0)
616       status = cf_util_get_string(child, &transport->password);
617     else if (strcasecmp("Address", child->key) == 0)
618       status = cf_util_get_string(child, &transport->address);
619     else if (strcasecmp("RetryDelay", child->key) == 0)
620       status = cf_util_get_int(child, &transport->retry_delay);
621     else if (strcasecmp("Instance", child->key) == 0)
622       amqp1_config_instance(child);
623     else
624       WARNING("amqp1 plugin: Ignoring unknown "
625               "transport configuration option "
626               "\%s\".",
627               child->key);
628
629     if (status != 0)
630       break;
631   }
632
633   if (status != 0) {
634     amqp1_config_transport_free(transport);
635   }
636   return status;
637 } /* }}} int amqp1_config_transport */
638
639 static int amqp1_config(oconfig_item_t *ci) /* {{{ */
640 {
641
642   for (int i = 0; i < ci->children_num; i++) {
643     oconfig_item_t *child = ci->children + i;
644
645     if (strcasecmp("Transport", child->key) == 0)
646       amqp1_config_transport(child);
647     else
648       WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".",
649               child->key);
650   }
651
652   return 0;
653 } /* }}} int amqp1_config */
654
655 static int amqp1_init(void) /* {{{ */
656 {
657   int status;
658   char errbuf[1024];
659
660   if (transport == NULL) {
661     ERROR("amqp1: init failed, no transport configured");
662     return -1;
663   }
664
665   if (proactor == NULL) {
666     pthread_mutex_init(&send_lock, /* attr = */ NULL);
667     /* start_thread */
668     status =
669         plugin_thread_create(&event_thread_id, NULL /* no attributes */,
670                              event_thread, NULL /* no argument */, "handle");
671     if (status != 0) {
672       ERROR("amqp1 plugin: pthread_create failed: %s",
673             sstrerror(errno, errbuf, sizeof(errbuf)));
674     } else {
675       event_thread_running = 1;
676     }
677   }
678   return 0;
679 } /* }}} int amqp1_init */
680
681 static int amqp1_shutdown(void) /* {{{ */
682 {
683   stopping = true;
684
685   /* Stop the proactor thread */
686   if (event_thread_running == 1) {
687     DEBUG("amqp1 plugin: Shutting down proactor thread.");
688     pn_connection_wake(conn);
689   }
690   pthread_join(event_thread_id, NULL /* no return value */);
691   memset(&event_thread_id, 0, sizeof(event_thread_id));
692
693   DEBUG("amqp1 plugin: proactor thread exited.");
694
695   if (transport != NULL) {
696     amqp1_config_transport_free(transport);
697   }
698
699   return 0;
700 } /* }}} int amqp1_shutdown */
701
702 void module_register(void) {
703   plugin_register_complex_config("amqp1", amqp1_config);
704   plugin_register_init("amqp1", amqp1_init);
705   plugin_register_shutdown("amqp1", amqp1_shutdown);
706 } /* void module_register */