514e4be7fb8c64592eda2c735734c7c72412848d
[collectd.git] / src / amqp1.c
1 /**
2  * collectd - src/amqp1.c
3  * Copyright(c) 2017 Red Hat Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Andy Smith <ansmith@redhat.com>
25  */
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_cmd_putval.h"
32 #include "utils_deq.h"
33 #include "utils_format_graphite.h"
34 #include "utils_format_json.h"
35 #include "utils_random.h"
36
37 #include <proton/condition.h>
38 #include <proton/connection.h>
39 #include <proton/delivery.h>
40 #include <proton/link.h>
41 #include <proton/message.h>
42 #include <proton/proactor.h>
43 #include <proton/sasl.h>
44 #include <proton/session.h>
45 #include <proton/transport.h>
46
47 #include <errno.h>
48 #include <stdint.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51
52 #define BUFSIZE 8192
53 #define AMQP1_FORMAT_JSON 0
54 #define AMQP1_FORMAT_COMMAND 1
55 #define AMQP1_FORMAT_GRAPHITE 2
56
57 typedef struct amqp1_config_transport_t {
58   DEQ_LINKS(struct amqp1_config_transport_t);
59   char *name;
60   char *host;
61   char *port;
62   char *user;
63   char *password;
64   char *address;
65 } amqp1_config_transport_t;
66
67 typedef struct amqp1_config_instance_t {
68   DEQ_LINKS(struct amqp1_config_instance_t);
69   char *name;
70   _Bool notify;
71   uint8_t format;
72   unsigned int graphite_flags;
73   _Bool store_rates;
74   char *prefix;
75   char *postfix;
76   char escape_char;
77   _Bool pre_settle;
78   char send_to[128];
79 } amqp1_config_instance_t;
80
81 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
82
83 typedef struct cd_message_t {
84   DEQ_LINKS(struct cd_message_t);
85   pn_bytes_t mbuf;
86   amqp1_config_instance_t *instance;
87 } cd_message_t;
88
89 DEQ_DECLARE(cd_message_t, cd_message_list_t);
90
91 /*
92  * Globals
93  */
94 pn_connection_t *conn = NULL;
95 pn_session_t *ssn = NULL;
96 pn_link_t *sender = NULL;
97 pn_proactor_t *proactor = NULL;
98 pthread_mutex_t send_lock;
99 cd_message_list_t out_messages;
100 uint64_t cd_tag = 1;
101 uint64_t acknowledged = 0;
102 amqp1_config_transport_t *transport = NULL;
103 bool finished = false;
104
105 static int event_thread_running = 0;
106 static pthread_t event_thread_id;
107
108 /*
109  * Functions
110  */
111 static void cd_message_free(cd_message_t *cdm) {
112   if (cdm->mbuf.start) {
113     free((void *)cdm->mbuf.start);
114   }
115   free(cdm);
116 } /* }}} void cd_message_free */
117
118 static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
119 {
120   uint64_t dtag;
121   cd_message_list_t to_send;
122   cd_message_t *cdm;
123   int link_credit = pn_link_credit(link);
124   int event_count = 0;
125   pn_delivery_t *dlv;
126
127   DEQ_INIT(to_send);
128
129   pthread_mutex_lock(&send_lock);
130
131   if (link_credit > 0) {
132     dtag = cd_tag;
133     cdm = DEQ_HEAD(out_messages);
134     while (cdm) {
135       DEQ_REMOVE_HEAD(out_messages);
136       DEQ_INSERT_TAIL(to_send, cdm);
137       if (DEQ_SIZE(to_send) == link_credit)
138         break;
139       cdm = DEQ_HEAD(out_messages);
140     }
141     cd_tag += DEQ_SIZE(to_send);
142   }
143
144   pthread_mutex_unlock(&send_lock);
145
146   /* message is already formatted and encoded */
147   cdm = DEQ_HEAD(to_send);
148   while (cdm) {
149     DEQ_REMOVE_HEAD(to_send);
150     dtag++;
151     dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag)));
152     pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
153     pn_link_advance(link);
154     if (cdm->instance->pre_settle == true) {
155       pn_delivery_settle(dlv);
156     }
157     event_count++;
158     cd_message_free(cdm);
159     cdm = DEQ_HEAD(to_send);
160   }
161
162   return event_count;
163 } /* }}} int amqp1_send_out_messages */
164
165 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
166 {
167   if (pn_condition_is_set(cond)) {
168     ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)),
169           pn_condition_get_name(cond), pn_condition_get_description(cond));
170     pn_connection_close(pn_event_connection(e));
171     conn = NULL;
172   }
173 } /* }}} void check_condition */
174
175 static bool handle(pn_event_t *event) /* {{{ */
176 {
177
178   switch (pn_event_type(event)) {
179
180   case PN_CONNECTION_INIT: {
181     conn = pn_event_connection(event);
182     pn_connection_set_container(conn, transport->address);
183     pn_connection_open(conn);
184     ssn = pn_session(conn);
185     pn_session_open(ssn);
186     sender = pn_sender(ssn, "cd-sender");
187     pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
188     pn_link_open(sender);
189     break;
190   }
191
192   case PN_LINK_FLOW: {
193     /* peer has given us credit, send outbound messages */
194     amqp1_send_out_messages(sender);
195     break;
196   }
197
198   case PN_DELIVERY: {
199     /* acknowledgement from peer that a message was delivered */
200     pn_delivery_t *dlv = pn_event_delivery(event);
201     if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
202       acknowledged++;
203     }
204     break;
205   }
206
207   case PN_CONNECTION_WAKE: {
208     if (!finished) {
209       amqp1_send_out_messages(sender);
210     }
211     break;
212   }
213
214   case PN_TRANSPORT_CLOSED: {
215     check_condition(event, pn_transport_condition(pn_event_transport(event)));
216     break;
217   }
218
219   case PN_CONNECTION_REMOTE_CLOSE: {
220     check_condition(event,
221                     pn_session_remote_condition(pn_event_session(event)));
222     pn_connection_close(pn_event_connection(event));
223     break;
224   }
225
226   case PN_SESSION_REMOTE_CLOSE: {
227     check_condition(event,
228                     pn_session_remote_condition(pn_event_session(event)));
229     pn_connection_close(pn_event_connection(event));
230     break;
231   }
232
233   case PN_LINK_REMOTE_CLOSE:
234   case PN_LINK_REMOTE_DETACH: {
235     check_condition(event, pn_link_remote_condition(pn_event_link(event)));
236     pn_connection_close(pn_event_connection(event));
237     break;
238   }
239
240   case PN_PROACTOR_INACTIVE: {
241     return false;
242   }
243
244   default:
245     break;
246   }
247   return true;
248 } /* }}} bool handle */
249
250 static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
251 {
252
253   do {
254     pn_event_batch_t *events = pn_proactor_wait(proactor);
255     pn_event_t *e;
256     for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
257       if (!handle(e)) {
258         finished = true;
259       }
260     }
261     pn_proactor_done(proactor, events);
262   } while (!finished);
263
264   event_thread_running = 0;
265
266   return NULL;
267 } /* }}} void event_thread */
268
269 static void encqueue(cd_message_t *cdm,
270                      amqp1_config_instance_t *instance) /* {{{ */
271 {
272   size_t bufsize = BUFSIZE;
273   pn_data_t *body;
274   pn_message_t *message;
275
276   /* encode message */
277   message = pn_message();
278   pn_message_set_address(message, instance->send_to);
279   body = pn_message_body(message);
280   pn_data_clear(body);
281   pn_data_put_binary(body, cdm->mbuf);
282   pn_data_exit(body);
283
284   /* put_binary copies and stores so ok to use mbuf */
285   cdm->mbuf.size = bufsize;
286   pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
287
288   pthread_mutex_lock(&send_lock);
289   DEQ_INSERT_TAIL(out_messages, cdm);
290   pthread_mutex_unlock(&send_lock);
291
292   pn_message_free(message);
293
294   /* activate the sender */
295   if (conn != NULL) {
296     pn_connection_wake(conn);
297   }
298
299 } /* }}} void encqueue */
300
301 static int amqp1_notify(notification_t const *n,
302                         user_data_t *user_data) /* {{{ */
303 {
304   amqp1_config_instance_t *instance;
305   int status = 0;
306   size_t bfree = BUFSIZE;
307   size_t bfill = 0;
308   cd_message_t *cdm;
309   size_t bufsize = BUFSIZE;
310
311   if ((n == NULL) || (user_data == NULL))
312     return EINVAL;
313
314   instance = user_data->data;
315
316   if (instance->notify != true) {
317     ERROR("amqp1 plugin: write notification failed");
318   }
319
320   cdm = NEW(cd_message_t);
321   DEQ_ITEM_INIT(cdm);
322   cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
323   cdm->instance = instance;
324
325   switch (instance->format) {
326   case AMQP1_FORMAT_JSON:
327     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
328     status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
329     if (status != 0) {
330       ERROR("amqp1 plugin: formatting notification failed");
331       return status;
332     }
333     cdm->mbuf.size = strlen(cdm->mbuf.start);
334     break;
335   default:
336     ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
337     return -1;
338   }
339
340   /* encode message and place on outbound queue */
341   encqueue(cdm, instance);
342
343   return 0;
344 } /* }}} int amqp1_notify */
345
346 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
347                        user_data_t *user_data) {
348   amqp1_config_instance_t *instance;
349   int status = 0;
350   size_t bfree = BUFSIZE;
351   size_t bfill = 0;
352   cd_message_t *cdm;
353   size_t bufsize = BUFSIZE;
354
355   if ((ds == NULL) || (vl == NULL) || (transport == NULL) ||
356       (user_data == NULL))
357     return EINVAL;
358
359   instance = user_data->data;
360
361   if (instance->notify != false) {
362     ERROR("amqp1 plugin: write failed");
363   }
364
365   cdm = NEW(cd_message_t);
366   DEQ_ITEM_INIT(cdm);
367   cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
368   cdm->instance = instance;
369
370   switch (instance->format) {
371   case AMQP1_FORMAT_COMMAND:
372     status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
373     if (status != 0) {
374       ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
375       return status;
376     }
377     cdm->mbuf.size = strlen(cdm->mbuf.start);
378     break;
379   case AMQP1_FORMAT_JSON:
380     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
381     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
382                            instance->store_rates);
383     format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
384     cdm->mbuf.size = strlen(cdm->mbuf.start);
385     break;
386   case AMQP1_FORMAT_GRAPHITE:
387     status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
388                              instance->prefix, instance->postfix,
389                              instance->escape_char, instance->graphite_flags);
390     if (status != 0) {
391       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
392       return status;
393     }
394     cdm->mbuf.size = strlen(cdm->mbuf.start);
395     break;
396   default:
397     ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
398     return -1;
399   }
400
401   /* encode message and place on outboud queue */
402   encqueue(cdm, instance);
403
404   return 0;
405 } /* }}} int amqp1_write */
406
407 static void amqp1_config_transport_free(void *ptr) /* {{{ */
408 {
409   amqp1_config_transport_t *transport = ptr;
410
411   if (transport == NULL)
412     return;
413
414   sfree(transport->name);
415   sfree(transport->host);
416   sfree(transport->user);
417   sfree(transport->password);
418   sfree(transport->address);
419
420   sfree(transport);
421 } /* }}} void amqp1_config_transport_free */
422
423 static void amqp1_config_instance_free(void *ptr) /* {{{ */
424 {
425   amqp1_config_instance_t *instance = ptr;
426
427   if (instance == NULL)
428     return;
429
430   sfree(instance->name);
431   sfree(instance->prefix);
432   sfree(instance->postfix);
433
434   sfree(instance);
435 } /* }}} void amqp1_config_instance_free */
436
437 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
438 {
439   int status = 0;
440   char *key = NULL;
441   amqp1_config_instance_t *instance;
442
443   instance = calloc(1, sizeof(*instance));
444   if (instance == NULL) {
445     ERROR("amqp1 plugin: calloc failed.");
446     return ENOMEM;
447   }
448
449   /* Initialize instance configuration {{{ */
450   instance->name = NULL;
451
452   status = cf_util_get_string(ci, &instance->name);
453   if (status != 0) {
454     sfree(instance);
455     return status;
456   }
457
458   for (int i = 0; i < ci->children_num; i++) {
459     oconfig_item_t *child = ci->children + i;
460
461     if (strcasecmp("PreSettle", child->key) == 0)
462       status = cf_util_get_boolean(child, &instance->pre_settle);
463     else if (strcasecmp("Notify", child->key) == 0)
464       status = cf_util_get_boolean(child, &instance->notify);
465     else if (strcasecmp("Format", child->key) == 0) {
466       status = cf_util_get_string(child, &key);
467       if (status != 0)
468         return status;
469       /* TODO: goto errout */
470       //          goto errout;
471       assert(key != NULL);
472       if (strcasecmp(key, "Command") == 0) {
473         instance->format = AMQP1_FORMAT_COMMAND;
474       } else if (strcasecmp(key, "Graphite") == 0) {
475         instance->format = AMQP1_FORMAT_GRAPHITE;
476       } else if (strcasecmp(key, "JSON") == 0) {
477         instance->format = AMQP1_FORMAT_JSON;
478       } else {
479         WARNING("amqp1 plugin: Invalid format string: %s", key);
480       }
481       sfree(key);
482     } else if (strcasecmp("StoreRates", child->key) == 0)
483       status = cf_util_get_boolean(child, &instance->store_rates);
484     else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
485       status = cf_util_get_flag(child, &instance->graphite_flags,
486                                 GRAPHITE_SEPARATE_INSTANCES);
487     else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
488       status = cf_util_get_flag(child, &instance->graphite_flags,
489                                 GRAPHITE_ALWAYS_APPEND_DS);
490     else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
491       status = cf_util_get_flag(child, &instance->graphite_flags,
492                                 GRAPHITE_PRESERVE_SEPARATOR);
493     else if (strcasecmp("GraphitePrefix", child->key) == 0)
494       status = cf_util_get_string(child, &instance->prefix);
495     else if (strcasecmp("GraphitePostfix", child->key) == 0)
496       status = cf_util_get_string(child, &instance->postfix);
497     else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
498       char *tmp_buff = NULL;
499       status = cf_util_get_string(child, &tmp_buff);
500       if (strlen(tmp_buff) > 1)
501         WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
502                 "only one character. Others will be ignored.");
503       instance->escape_char = tmp_buff[0];
504       sfree(tmp_buff);
505     } else
506       WARNING("amqp1 plugin: Ignoring unknown "
507               "instance configuration option "
508               "\%s\".",
509               child->key);
510     if (status != 0)
511       break;
512   }
513
514   if (status != 0) {
515     amqp1_config_instance_free(instance);
516     return status;
517   } else {
518     char tpname[128];
519     snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
520     snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
521              transport->address, instance->name);
522     if (instance->notify == true) {
523       status = plugin_register_notification(
524           tpname, amqp1_notify,
525           &(user_data_t){
526               .data = instance, .free_func = amqp1_config_instance_free,
527           });
528     } else {
529       status = plugin_register_write(
530           tpname, amqp1_write,
531           &(user_data_t){
532               .data = instance, .free_func = amqp1_config_instance_free,
533           });
534     }
535
536     if (status != 0) {
537       amqp1_config_instance_free(instance);
538     }
539   }
540
541   return status;
542 } /* }}} int amqp1_config_instance */
543
544 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
545 {
546   int status = 0;
547
548   transport = calloc(1, sizeof(*transport));
549   if (transport == NULL) {
550     ERROR("amqp1 plugin: calloc failed.");
551     return ENOMEM;
552   }
553
554   /* Initialize transport configuration {{{ */
555   transport->name = NULL;
556
557   status = cf_util_get_string(ci, &transport->name);
558   if (status != 0) {
559     sfree(transport);
560     return status;
561   }
562
563   for (int i = 0; i < ci->children_num; i++) {
564     oconfig_item_t *child = ci->children + i;
565
566     if (strcasecmp("Host", child->key) == 0)
567       status = cf_util_get_string(child, &transport->host);
568     else if (strcasecmp("Port", child->key) == 0)
569       status = cf_util_get_string(child, &transport->port);
570     else if (strcasecmp("User", child->key) == 0)
571       status = cf_util_get_string(child, &transport->user);
572     else if (strcasecmp("Password", child->key) == 0)
573       status = cf_util_get_string(child, &transport->password);
574     else if (strcasecmp("Address", child->key) == 0)
575       status = cf_util_get_string(child, &transport->address);
576     else if (strcasecmp("Instance", child->key) == 0)
577       amqp1_config_instance(child);
578     else
579       WARNING("amqp1 plugin: Ignoring unknown "
580               "transport configuration option "
581               "\%s\".",
582               child->key);
583
584     if (status != 0)
585       break;
586   }
587
588   if (status != 0) {
589     amqp1_config_transport_free(transport);
590   }
591   return status;
592 } /* }}} int amqp1_config_transport */
593
594 static int amqp1_config(oconfig_item_t *ci) /* {{{ */
595 {
596
597   for (int i = 0; i < ci->children_num; i++) {
598     oconfig_item_t *child = ci->children + i;
599
600     if (strcasecmp("Transport", child->key) == 0)
601       amqp1_config_transport(child);
602     else
603       WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".",
604               child->key);
605   }
606
607   return 0;
608 } /* }}} int amqp1_config */
609
610 static int amqp1_init(void) /* {{{ */
611 {
612   char addr[PN_MAX_ADDR];
613   int status;
614   char errbuf[1024];
615
616   if (transport == NULL) {
617     ERROR("amqp1: init failed, no transport configured");
618     return -1;
619   }
620
621   if (proactor == NULL) {
622     pthread_mutex_init(&send_lock, /* attr = */ NULL);
623     proactor = pn_proactor();
624     pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
625     conn = pn_connection();
626     if (transport->user != NULL) {
627       pn_connection_set_user(conn, transport->user);
628       pn_connection_set_password(conn, transport->password);
629     }
630     pn_proactor_connect(proactor, conn, addr);
631     /* start_thread */
632     status =
633         plugin_thread_create(&event_thread_id, NULL /* no attributes */,
634                              event_thread, NULL /* no argument */, "handle");
635     if (status != 0) {
636       ERROR("amqp1: pthread_create failed: %s",
637             sstrerror(errno, errbuf, sizeof(errbuf)));
638     } else {
639       event_thread_running = 1;
640     }
641   }
642   return 0;
643 } /* }}} int amqp1_init */
644
645 static int amqp1_shutdown(void) /* {{{ */
646 {
647   cd_message_t *cdm;
648
649   /* Stop the proactor thread */
650   if (event_thread_running != 0) {
651     finished = true;
652     /* activate the event thread */
653     pn_connection_wake(conn);
654     pthread_join(event_thread_id, NULL /* no return value */);
655     memset(&event_thread_id, 0, sizeof(event_thread_id));
656   }
657
658   /* Free the remaining out_messages */
659   cdm = DEQ_HEAD(out_messages);
660   while (cdm) {
661     DEQ_REMOVE_HEAD(out_messages);
662     cd_message_free(cdm);
663     cdm = DEQ_HEAD(out_messages);
664   }
665
666   if (proactor != NULL) {
667     pn_proactor_free(proactor);
668   }
669
670   if (transport != NULL) {
671     amqp1_config_transport_free(transport);
672   }
673
674   return 0;
675 } /* }}} int amqp1_shutdown */
676
677 void module_register(void) {
678   plugin_register_complex_config("amqp1", amqp1_config);
679   plugin_register_init("amqp1", amqp1_init);
680   plugin_register_shutdown("amqp1", amqp1_shutdown);
681 } /* void module_register */