7cbdb5fd1eed5cd09da4be5c0f7d0379a24c1740
[collectd.git] / src / amqp1.c
1 /**
2  * collectd - src/amqp1.c
3  * Copyright(c) 2017 Red Hat Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Andy Smith <ansmith@redhat.com>
25  */
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_cmd_putval.h"
32 #include "utils_deq.h"
33 #include "utils_format_graphite.h"
34 #include "utils_format_json.h"
35 #include "utils_random.h"
36
37 #include <proton/condition.h>
38 #include <proton/connection.h>
39 #include <proton/delivery.h>
40 #include <proton/link.h>
41 #include <proton/message.h>
42 #include <proton/proactor.h>
43 #include <proton/sasl.h>
44 #include <proton/session.h>
45 #include <proton/transport.h>
46
47 #include <errno.h>
48 #include <stdint.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51
52 #define BUFSIZE 8192
53 #define AMQP1_FORMAT_JSON 0
54 #define AMQP1_FORMAT_COMMAND 1
55 #define AMQP1_FORMAT_GRAPHITE 2
56
57 typedef struct amqp1_config_transport_s {
58   DEQ_LINKS(struct amqp1_config_transport_s);
59   char *name;
60   char *host;
61   char *port;
62   char *user;
63   char *password;
64   char *address;
65   int retry_delay;
66 } amqp1_config_transport_t;
67
68 typedef struct amqp1_config_instance_s {
69   DEQ_LINKS(struct amqp1_config_instance_s);
70   char *name;
71   bool notify;
72   uint8_t format;
73   unsigned int graphite_flags;
74   bool store_rates;
75   char *prefix;
76   char *postfix;
77   char escape_char;
78   bool pre_settle;
79   char send_to[1024];
80 } amqp1_config_instance_t;
81
82 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
83
84 typedef struct cd_message_s {
85   DEQ_LINKS(struct cd_message_s);
86   pn_rwbytes_t mbuf;
87   amqp1_config_instance_t *instance;
88 } cd_message_t;
89
90 DEQ_DECLARE(cd_message_t, cd_message_list_t);
91
92 /*
93  * Globals
94  */
95 static pn_connection_t *conn;
96 static pn_link_t *sender;
97 static pn_proactor_t *proactor;
98 static pthread_mutex_t send_lock;
99 static cd_message_list_t out_messages;
100 static uint64_t cd_tag = 1;
101 static uint64_t acknowledged;
102 static amqp1_config_transport_t *transport;
103 static bool stopping;
104 static bool event_thread_running;
105 static pthread_t event_thread_id;
106
107 /*
108  * Functions
109  */
110 static void cd_message_free(cd_message_t *cdm) {
111   free(cdm->mbuf.start);
112   free(cdm);
113 } /* }}} void cd_message_free */
114
115 static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
116 {
117   uint64_t dtag;
118   cd_message_list_t to_send;
119   cd_message_t *cdm;
120   int link_credit = pn_link_credit(link);
121   int event_count = 0;
122   pn_delivery_t *dlv;
123
124   if (stopping) {
125     return 0;
126   }
127
128   DEQ_INIT(to_send);
129
130   pthread_mutex_lock(&send_lock);
131
132   if (link_credit > 0) {
133     dtag = cd_tag;
134     cdm = DEQ_HEAD(out_messages);
135     while (cdm) {
136       DEQ_REMOVE_HEAD(out_messages);
137       DEQ_INSERT_TAIL(to_send, cdm);
138       if (DEQ_SIZE(to_send) == link_credit)
139         break;
140       cdm = DEQ_HEAD(out_messages);
141     }
142     cd_tag += DEQ_SIZE(to_send);
143   }
144
145   pthread_mutex_unlock(&send_lock);
146
147   /* message is already formatted and encoded */
148   cdm = DEQ_HEAD(to_send);
149   while (cdm) {
150     DEQ_REMOVE_HEAD(to_send);
151     dtag++;
152     dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag)));
153     pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
154     pn_link_advance(link);
155     if (cdm->instance->pre_settle == true) {
156       pn_delivery_settle(dlv);
157     }
158     event_count++;
159     cd_message_free(cdm);
160     cdm = DEQ_HEAD(to_send);
161   }
162
163   return event_count;
164 } /* }}} int amqp1_send_out_messages */
165
166 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
167 {
168   if (pn_condition_is_set(cond)) {
169     ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)),
170           pn_condition_get_name(cond), pn_condition_get_description(cond));
171     pn_connection_close(pn_event_connection(e));
172     conn = NULL;
173   }
174 } /* }}} void check_condition */
175
176 static bool handle(pn_event_t *event) /* {{{ */
177 {
178
179   switch (pn_event_type(event)) {
180
181   case PN_CONNECTION_INIT: {
182     conn = pn_event_connection(event);
183     pn_connection_set_container(conn, transport->name);
184     pn_connection_open(conn);
185     pn_session_t *ssn = pn_session(conn);
186     pn_session_open(ssn);
187     sender = pn_sender(ssn, "cd-sender");
188     pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
189     pn_link_open(sender);
190     break;
191   }
192
193   case PN_LINK_FLOW: {
194     /* peer has given us credit, send outbound messages */
195     amqp1_send_out_messages(sender);
196     break;
197   }
198
199   case PN_DELIVERY: {
200     /* acknowledgement from peer that a message was delivered */
201     pn_delivery_t *dlv = pn_event_delivery(event);
202     if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
203       pn_delivery_settle(dlv);
204       acknowledged++;
205     }
206     break;
207   }
208
209   case PN_CONNECTION_WAKE: {
210     if (!stopping) {
211       amqp1_send_out_messages(sender);
212     }
213     break;
214   }
215
216   case PN_TRANSPORT_CLOSED: {
217     check_condition(event, pn_transport_condition(pn_event_transport(event)));
218     break;
219   }
220
221   case PN_CONNECTION_REMOTE_CLOSE: {
222     check_condition(event,
223                     pn_session_remote_condition(pn_event_session(event)));
224     pn_connection_close(pn_event_connection(event));
225     break;
226   }
227
228   case PN_SESSION_REMOTE_CLOSE: {
229     check_condition(event,
230                     pn_session_remote_condition(pn_event_session(event)));
231     pn_connection_close(pn_event_connection(event));
232     break;
233   }
234
235   case PN_LINK_REMOTE_CLOSE:
236   case PN_LINK_REMOTE_DETACH: {
237     check_condition(event, pn_link_remote_condition(pn_event_link(event)));
238     pn_connection_close(pn_event_connection(event));
239     break;
240   }
241
242   case PN_PROACTOR_INACTIVE: {
243     return false;
244   }
245
246   default:
247     break;
248   }
249   return true;
250 } /* }}} bool handle */
251
252 static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
253 {
254   char addr[PN_MAX_ADDR];
255   cd_message_t *cdm;
256
257   /* setup proactor */
258   proactor = pn_proactor();
259   pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
260
261   while (!stopping) {
262     /* make connection */
263     conn = pn_connection();
264     if (transport->user != NULL) {
265       pn_connection_set_user(conn, transport->user);
266       pn_connection_set_password(conn, transport->password);
267     }
268     pn_proactor_connect(proactor, conn, addr);
269
270     bool engine_running = true;
271     while (engine_running && !stopping) {
272       pn_event_batch_t *events = pn_proactor_wait(proactor);
273       pn_event_t *e;
274       while ((e = pn_event_batch_next(events))) {
275         engine_running = handle(e);
276         if (!engine_running) {
277           break;
278         }
279       }
280       pn_proactor_done(proactor, events);
281     }
282
283     pn_proactor_release_connection(conn);
284
285     DEBUG("amqp1 plugin: retrying connection");
286     int delay = transport->retry_delay;
287     while (delay-- > 0 && !stopping) {
288       sleep(1.0);
289     }
290   }
291
292   pn_proactor_disconnect(proactor, NULL);
293
294   /* Free the remaining out_messages */
295   cdm = DEQ_HEAD(out_messages);
296   while (cdm) {
297     DEQ_REMOVE_HEAD(out_messages);
298     cd_message_free(cdm);
299     cdm = DEQ_HEAD(out_messages);
300   }
301
302   event_thread_running = false;
303
304   return NULL;
305 } /* }}} void event_thread */
306
307 static int encqueue(cd_message_t *cdm,
308                     amqp1_config_instance_t *instance) /* {{{ */
309 {
310   /* encode message */
311   pn_message_t *message = pn_message();
312   pn_message_set_address(message, instance->send_to);
313   pn_data_t *body = pn_message_body(message);
314   pn_data_clear(body);
315   pn_data_put_binary(body, pn_bytes(cdm->mbuf.size, cdm->mbuf.start));
316   pn_data_exit(body);
317
318   /* put_binary copies and stores so ok to use mbuf */
319   cdm->mbuf.size = BUFSIZE;
320
321   int status;
322   while ((status = pn_message_encode(message, cdm->mbuf.start,
323                                      &cdm->mbuf.size)) == PN_OVERFLOW) {
324     DEBUG("amqp1 plugin: increasing message buffer size %zu", cdm->mbuf.size);
325     cdm->mbuf.size *= 2;
326     cdm->mbuf.start = (char *)realloc(cdm->mbuf.start, cdm->mbuf.size);
327   }
328
329   if (status != 0) {
330     ERROR("amqp1 plugin: error encoding message: %s",
331           pn_error_text(pn_message_error(message)));
332     pn_message_free(message);
333     cd_message_free(cdm);
334     return -1;
335   }
336
337   pthread_mutex_lock(&send_lock);
338   DEQ_INSERT_TAIL(out_messages, cdm);
339   pthread_mutex_unlock(&send_lock);
340
341   pn_message_free(message);
342
343   /* activate the sender */
344   if (conn) {
345     pn_connection_wake(conn);
346   }
347
348   return 0;
349 } /* }}} int encqueue */
350
351 static int amqp1_notify(notification_t const *n,
352                         user_data_t *user_data) /* {{{ */
353 {
354   size_t bfree = BUFSIZE;
355   size_t bfill = 0;
356   size_t bufsize = BUFSIZE;
357
358   if (n == NULL || user_data == NULL)
359     return EINVAL;
360
361   amqp1_config_instance_t *instance = user_data->data;
362
363   if (instance->notify != true) {
364     ERROR("amqp1 plugin: write notification failed");
365   }
366
367   cd_message_t *cdm = malloc(sizeof(*cdm));
368   DEQ_ITEM_INIT(cdm);
369   cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
370   cdm->instance = instance;
371
372   switch (instance->format) {
373   case AMQP1_FORMAT_JSON:
374     format_json_initialize(cdm->mbuf.start, &bfill, &bfree);
375     int status = format_json_notification(cdm->mbuf.start, bufsize, n);
376     if (status != 0) {
377       ERROR("amqp1 plugin: formatting notification failed");
378       return status;
379     }
380     cdm->mbuf.size = strlen(cdm->mbuf.start);
381     break;
382   default:
383     ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
384     return -1;
385   }
386
387   /* encode message and place on outbound queue */
388   return encqueue(cdm, instance);
389
390 } /* }}} int amqp1_notify */
391
392 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
393                        user_data_t *user_data) {
394   int status = 0;
395   size_t bfree = BUFSIZE;
396   size_t bfill = 0;
397   size_t bufsize = BUFSIZE;
398
399   if (ds == NULL || vl == NULL || transport == NULL || user_data == NULL)
400     return EINVAL;
401
402   amqp1_config_instance_t *instance = user_data->data;
403
404   if (instance->notify != false) {
405     ERROR("amqp1 plugin: write failed");
406   }
407
408   cd_message_t *cdm = malloc(sizeof(*cdm));
409   DEQ_ITEM_INIT(cdm);
410   cdm->mbuf = pn_rwbytes(bufsize, malloc(bufsize));
411   cdm->instance = instance;
412
413   switch (instance->format) {
414   case AMQP1_FORMAT_COMMAND:
415     status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
416     if (status != 0) {
417       ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
418       return status;
419     }
420     cdm->mbuf.size = strlen(cdm->mbuf.start);
421     break;
422   case AMQP1_FORMAT_JSON:
423     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
424     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
425                            instance->store_rates);
426     format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
427     cdm->mbuf.size = strlen(cdm->mbuf.start);
428     break;
429   case AMQP1_FORMAT_GRAPHITE:
430     status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
431                              instance->prefix, instance->postfix,
432                              instance->escape_char, instance->graphite_flags);
433     if (status != 0) {
434       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
435       return status;
436     }
437     cdm->mbuf.size = strlen(cdm->mbuf.start);
438     break;
439   default:
440     ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
441     return -1;
442   }
443
444   /* encode message and place on outbound queue */
445   return encqueue(cdm, instance);
446
447 } /* }}} int amqp1_write */
448
449 static void amqp1_config_transport_free(void *ptr) /* {{{ */
450 {
451   amqp1_config_transport_t *transport = ptr;
452
453   if (transport == NULL)
454     return;
455
456   sfree(transport->name);
457   sfree(transport->host);
458   sfree(transport->user);
459   sfree(transport->password);
460   sfree(transport->address);
461
462   sfree(transport);
463 } /* }}} void amqp1_config_transport_free */
464
465 static void amqp1_config_instance_free(void *ptr) /* {{{ */
466 {
467   amqp1_config_instance_t *instance = ptr;
468
469   if (instance == NULL)
470     return;
471
472   sfree(instance->name);
473   sfree(instance->prefix);
474   sfree(instance->postfix);
475
476   sfree(instance);
477 } /* }}} void amqp1_config_instance_free */
478
479 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
480 {
481   amqp1_config_instance_t *instance = calloc(1, sizeof(*instance));
482   if (instance == NULL) {
483     ERROR("amqp1 plugin: calloc failed.");
484     return ENOMEM;
485   }
486
487   int status = cf_util_get_string(ci, &instance->name);
488   if (status != 0) {
489     sfree(instance);
490     return status;
491   }
492
493   for (int i = 0; i < ci->children_num; i++) {
494     oconfig_item_t *child = ci->children + i;
495
496     if (strcasecmp("PreSettle", child->key) == 0)
497       status = cf_util_get_boolean(child, &instance->pre_settle);
498     else if (strcasecmp("Notify", child->key) == 0)
499       status = cf_util_get_boolean(child, &instance->notify);
500     else if (strcasecmp("Format", child->key) == 0) {
501       char *key = NULL;
502       status = cf_util_get_string(child, &key);
503       if (status != 0)
504         return status;
505       assert(key != NULL);
506       if (strcasecmp(key, "Command") == 0) {
507         instance->format = AMQP1_FORMAT_COMMAND;
508       } else if (strcasecmp(key, "Graphite") == 0) {
509         instance->format = AMQP1_FORMAT_GRAPHITE;
510       } else if (strcasecmp(key, "JSON") == 0) {
511         instance->format = AMQP1_FORMAT_JSON;
512       } else {
513         WARNING("amqp1 plugin: Invalid format string: %s", key);
514       }
515       sfree(key);
516     } else if (strcasecmp("StoreRates", child->key) == 0)
517       status = cf_util_get_boolean(child, &instance->store_rates);
518     else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
519       status = cf_util_get_flag(child, &instance->graphite_flags,
520                                 GRAPHITE_SEPARATE_INSTANCES);
521     else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
522       status = cf_util_get_flag(child, &instance->graphite_flags,
523                                 GRAPHITE_ALWAYS_APPEND_DS);
524     else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
525       status = cf_util_get_flag(child, &instance->graphite_flags,
526                                 GRAPHITE_PRESERVE_SEPARATOR);
527     else if (strcasecmp("GraphitePrefix", child->key) == 0)
528       status = cf_util_get_string(child, &instance->prefix);
529     else if (strcasecmp("GraphitePostfix", child->key) == 0)
530       status = cf_util_get_string(child, &instance->postfix);
531     else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
532       char *tmp_buff = NULL;
533       status = cf_util_get_string(child, &tmp_buff);
534       if (status == 0) {
535         if (strlen(tmp_buff) > 1)
536           WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
537                   "only one character. Others will be ignored.");
538         instance->escape_char = tmp_buff[0];
539       }
540       sfree(tmp_buff);
541     } else
542       WARNING("amqp1 plugin: Ignoring unknown "
543               "instance configuration option "
544               "\%s\".",
545               child->key);
546     if (status != 0)
547       break;
548   }
549
550   if (status != 0) {
551     amqp1_config_instance_free(instance);
552     return status;
553   } else {
554     char tpname[DATA_MAX_NAME_LEN];
555     status = snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
556     if ((status < 0) || (size_t)status >= sizeof(tpname)) {
557       ERROR("amqp1 plugin: Instance name would have been truncated.");
558       return -1;
559     }
560     status = snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
561                       transport->address, instance->name);
562     if ((status < 0) || (size_t)status >= sizeof(instance->send_to)) {
563       ERROR("amqp1 plugin: send_to address would have been truncated.");
564       return -1;
565     }
566     if (instance->notify) {
567       status = plugin_register_notification(
568           tpname, amqp1_notify,
569           &(user_data_t){
570               .data = instance, .free_func = amqp1_config_instance_free,
571           });
572     } else {
573       status = plugin_register_write(
574           tpname, amqp1_write,
575           &(user_data_t){
576               .data = instance, .free_func = amqp1_config_instance_free,
577           });
578     }
579
580     if (status != 0) {
581       amqp1_config_instance_free(instance);
582     }
583   }
584
585   return status;
586 } /* }}} int amqp1_config_instance */
587
588 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
589 {
590   transport = calloc(1, sizeof(*transport));
591   if (transport == NULL) {
592     ERROR("amqp1 plugin: calloc failed.");
593     return ENOMEM;
594   }
595
596   /* Initialize transport configuration {{{ */
597   transport->retry_delay = 1;
598
599   int status = cf_util_get_string(ci, &transport->name);
600   if (status != 0) {
601     sfree(transport);
602     return status;
603   }
604
605   for (int i = 0; i < ci->children_num; i++) {
606     oconfig_item_t *child = ci->children + i;
607
608     if (strcasecmp("Host", child->key) == 0)
609       status = cf_util_get_string(child, &transport->host);
610     else if (strcasecmp("Port", child->key) == 0)
611       status = cf_util_get_string(child, &transport->port);
612     else if (strcasecmp("User", child->key) == 0)
613       status = cf_util_get_string(child, &transport->user);
614     else if (strcasecmp("Password", child->key) == 0)
615       status = cf_util_get_string(child, &transport->password);
616     else if (strcasecmp("Address", child->key) == 0)
617       status = cf_util_get_string(child, &transport->address);
618     else if (strcasecmp("RetryDelay", child->key) == 0)
619       status = cf_util_get_int(child, &transport->retry_delay);
620     else if (strcasecmp("Instance", child->key) == 0)
621       amqp1_config_instance(child);
622     else
623       WARNING("amqp1 plugin: Ignoring unknown "
624               "transport configuration option "
625               "\%s\".",
626               child->key);
627
628     if (status != 0)
629       break;
630   }
631
632   if (status != 0) {
633     amqp1_config_transport_free(transport);
634   }
635   return status;
636 } /* }}} int amqp1_config_transport */
637
638 static int amqp1_config(oconfig_item_t *ci) /* {{{ */
639 {
640
641   for (int i = 0; i < ci->children_num; i++) {
642     oconfig_item_t *child = ci->children + i;
643
644     if (strcasecmp("Transport", child->key) == 0)
645       amqp1_config_transport(child);
646     else
647       WARNING("amqp1 plugin: Ignoring unknown config option \%s\".",
648               child->key);
649   }
650
651   return 0;
652 } /* }}} int amqp1_config */
653
654 static int amqp1_init(void) /* {{{ */
655 {
656   if (transport == NULL) {
657     ERROR("amqp1: init failed, no transport configured");
658     return -1;
659   }
660
661   if (proactor == NULL) {
662     pthread_mutex_init(&send_lock, /* attr = */ NULL);
663     /* start_thread */
664     int status =
665         plugin_thread_create(&event_thread_id, NULL /* no attributes */,
666                              event_thread, NULL /* no argument */, "handle");
667     if (status != 0) {
668       ERROR("amqp1 plugin: pthread_create failed: %s", STRERRNO);
669     } else {
670       event_thread_running = true;
671     }
672   }
673   return 0;
674 } /* }}} int amqp1_init */
675
676 static int amqp1_shutdown(void) /* {{{ */
677 {
678   stopping = true;
679
680   /* Stop the proactor thread */
681   if (event_thread_running) {
682     DEBUG("amqp1 plugin: Shutting down proactor thread.");
683     pn_connection_wake(conn);
684   }
685   pthread_join(event_thread_id, NULL /* no return value */);
686   memset(&event_thread_id, 0, sizeof(event_thread_id));
687
688   DEBUG("amqp1 plugin: proactor thread exited.");
689
690   if (transport) {
691     amqp1_config_transport_free(transport);
692   }
693
694   return 0;
695 } /* }}} int amqp1_shutdown */
696
697 void module_register(void) {
698   plugin_register_complex_config("amqp1", amqp1_config);
699   plugin_register_init("amqp1", amqp1_init);
700   plugin_register_shutdown("amqp1", amqp1_shutdown);
701 } /* void module_register */