d7be877b734cfad541c70c6e44ef2141279cc658
[collectd.git] / src / amqp1.c
1 /**
2  * collectd - src/amqp1.c
3  * Copyright(c) 2017 Red Hat Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Andy Smith <ansmith@redhat.com>
25  */
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_cmd_putval.h"
32 #include "utils_deq.h"
33 #include "utils_format_graphite.h"
34 #include "utils_format_json.h"
35 #include "utils_random.h"
36
37 #include <proton/condition.h>
38 #include <proton/connection.h>
39 #include <proton/delivery.h>
40 #include <proton/link.h>
41 #include <proton/message.h>
42 #include <proton/proactor.h>
43 #include <proton/sasl.h>
44 #include <proton/session.h>
45 #include <proton/transport.h>
46
47 #include <errno.h>
48 #include <stdint.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51
52 #define BUFSIZE 8192
53 #define AMQP1_FORMAT_JSON 0
54 #define AMQP1_FORMAT_COMMAND 1
55 #define AMQP1_FORMAT_GRAPHITE 2
56
57 typedef struct amqp1_config_transport_t {
58   DEQ_LINKS(struct amqp1_config_transport_t);
59   char *name;
60   char *host;
61   char *port;
62   char *user;
63   char *password;
64   char *address;
65 } amqp1_config_transport_t;
66
67 typedef struct amqp1_config_instance_t {
68   DEQ_LINKS(struct amqp1_config_instance_t);
69   char *name;
70   _Bool notify;
71   uint8_t format;
72   unsigned int graphite_flags;
73   _Bool store_rates;
74   char *prefix;
75   char *postfix;
76   char escape_char;
77   _Bool pre_settle;
78   char send_to[128];
79 } amqp1_config_instance_t;
80
81 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
82
83 typedef struct cd_message_t {
84   DEQ_LINKS(struct cd_message_t);
85   pn_bytes_t mbuf;
86   amqp1_config_instance_t *instance;
87 } cd_message_t;
88
89 DEQ_DECLARE(cd_message_t, cd_message_list_t);
90
91 /*
92  * Globals
93  */
94 pn_connection_t *conn = NULL;
95 pn_session_t *ssn = NULL;
96 pn_link_t *sender = NULL;
97 pn_proactor_t *proactor = NULL;
98 pthread_mutex_t send_lock;
99 cd_message_list_t out_messages;
100 uint64_t cd_tag = 1;
101 uint64_t acknowledged = 0;
102 amqp1_config_transport_t *transport = NULL;
103 bool finished = false;
104
105 static int event_thread_running = 0;
106 static pthread_t event_thread_id;
107
108 /*
109  * Functions
110  */
111 static void cd_message_free(cd_message_t *cdm) {
112   if (cdm->mbuf.start) {
113     free((void *)cdm->mbuf.start);
114   }
115   free(cdm);
116 } /* }}} void cd_message_free */
117
118 static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
119 {
120   uint64_t dtag;
121   cd_message_list_t to_send;
122   cd_message_t *cdm;
123   int link_credit = pn_link_credit(link);
124   int event_count = 0;
125   pn_delivery_t *dlv;
126
127   DEQ_INIT(to_send);
128
129   pthread_mutex_lock(&send_lock);
130
131   if (link_credit > 0) {
132     dtag = cd_tag;
133     cdm = DEQ_HEAD(out_messages);
134     while (cdm) {
135       DEQ_REMOVE_HEAD(out_messages);
136       DEQ_INSERT_TAIL(to_send, cdm);
137       if (DEQ_SIZE(to_send) == link_credit)
138         break;
139       cdm = DEQ_HEAD(out_messages);
140     }
141     cd_tag += DEQ_SIZE(to_send);
142   }
143
144   pthread_mutex_unlock(&send_lock);
145
146   /* message is already formatted and encoded */
147   cdm = DEQ_HEAD(to_send);
148   while (cdm) {
149     DEQ_REMOVE_HEAD(to_send);
150     dtag++;
151     dlv = pn_delivery(link, pn_dtag((const char *)&dtag, sizeof(dtag)));
152     pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
153     pn_link_advance(link);
154     if (cdm->instance->pre_settle == true) {
155       pn_delivery_settle(dlv);
156     }
157     event_count++;
158     cd_message_free(cdm);
159     cdm = DEQ_HEAD(to_send);
160   }
161
162   return event_count;
163 } /* }}} int amqp1_send_out_messages */
164
165 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
166 {
167   if (pn_condition_is_set(cond)) {
168     ERROR("amqp1 plugin: %s: %s: %s", pn_event_type_name(pn_event_type(e)),
169           pn_condition_get_name(cond), pn_condition_get_description(cond));
170     pn_connection_close(pn_event_connection(e));
171     conn = NULL;
172   }
173 } /* }}} void check_condition */
174
175 static bool handle(pn_event_t *event) /* {{{ */
176 {
177
178   switch (pn_event_type(event)) {
179
180   case PN_CONNECTION_INIT: {
181     conn = pn_event_connection(event);
182     pn_connection_set_container(conn, transport->address);
183     pn_connection_open(conn);
184     ssn = pn_session(conn);
185     pn_session_open(ssn);
186     sender = pn_sender(ssn, "cd-sender");
187     pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
188     pn_link_open(sender);
189     break;
190   }
191
192   case PN_LINK_FLOW: {
193     /* peer has given us credit, send outbound messages */
194     amqp1_send_out_messages(sender);
195     break;
196   }
197
198   case PN_DELIVERY: {
199     /* acknowledgement from peer that a message was delivered */
200     pn_delivery_t *dlv = pn_event_delivery(event);
201     if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
202       pn_delivery_settle(dlv);
203       acknowledged++;
204     }
205     break;
206   }
207
208   case PN_CONNECTION_WAKE: {
209     if (!finished) {
210       amqp1_send_out_messages(sender);
211     }
212     break;
213   }
214
215   case PN_TRANSPORT_CLOSED: {
216     check_condition(event, pn_transport_condition(pn_event_transport(event)));
217     break;
218   }
219
220   case PN_CONNECTION_REMOTE_CLOSE: {
221     check_condition(event,
222                     pn_session_remote_condition(pn_event_session(event)));
223     pn_connection_close(pn_event_connection(event));
224     break;
225   }
226
227   case PN_SESSION_REMOTE_CLOSE: {
228     check_condition(event,
229                     pn_session_remote_condition(pn_event_session(event)));
230     pn_connection_close(pn_event_connection(event));
231     break;
232   }
233
234   case PN_LINK_REMOTE_CLOSE:
235   case PN_LINK_REMOTE_DETACH: {
236     check_condition(event, pn_link_remote_condition(pn_event_link(event)));
237     pn_connection_close(pn_event_connection(event));
238     break;
239   }
240
241   case PN_PROACTOR_INACTIVE: {
242     return false;
243   }
244
245   default:
246     break;
247   }
248   return true;
249 } /* }}} bool handle */
250
251 static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
252 {
253
254   do {
255     pn_event_batch_t *events = pn_proactor_wait(proactor);
256     pn_event_t *e;
257     for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
258       if (!handle(e)) {
259         finished = true;
260       }
261     }
262     pn_proactor_done(proactor, events);
263   } while (!finished);
264
265   event_thread_running = 0;
266
267   return NULL;
268 } /* }}} void event_thread */
269
270 static void encqueue(cd_message_t *cdm,
271                      amqp1_config_instance_t *instance) /* {{{ */
272 {
273   size_t bufsize = BUFSIZE;
274   pn_data_t *body;
275   pn_message_t *message;
276
277   /* encode message */
278   message = pn_message();
279   pn_message_set_address(message, instance->send_to);
280   body = pn_message_body(message);
281   pn_data_clear(body);
282   pn_data_put_binary(body, cdm->mbuf);
283   pn_data_exit(body);
284
285   /* put_binary copies and stores so ok to use mbuf */
286   cdm->mbuf.size = bufsize;
287   pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
288
289   pthread_mutex_lock(&send_lock);
290   DEQ_INSERT_TAIL(out_messages, cdm);
291   pthread_mutex_unlock(&send_lock);
292
293   pn_message_free(message);
294
295   /* activate the sender */
296   if (conn != NULL) {
297     pn_connection_wake(conn);
298   }
299
300 } /* }}} void encqueue */
301
302 static int amqp1_notify(notification_t const *n,
303                         user_data_t *user_data) /* {{{ */
304 {
305   amqp1_config_instance_t *instance;
306   int status = 0;
307   size_t bfree = BUFSIZE;
308   size_t bfill = 0;
309   cd_message_t *cdm;
310   size_t bufsize = BUFSIZE;
311
312   if ((n == NULL) || (user_data == NULL))
313     return EINVAL;
314
315   instance = user_data->data;
316
317   if (instance->notify != true) {
318     ERROR("amqp1 plugin: write notification failed");
319   }
320
321   cdm = NEW(cd_message_t);
322   DEQ_ITEM_INIT(cdm);
323   cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
324   cdm->instance = instance;
325
326   switch (instance->format) {
327   case AMQP1_FORMAT_JSON:
328     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
329     status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
330     if (status != 0) {
331       ERROR("amqp1 plugin: formatting notification failed");
332       return status;
333     }
334     cdm->mbuf.size = strlen(cdm->mbuf.start);
335     break;
336   default:
337     ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
338     return -1;
339   }
340
341   /* encode message and place on outbound queue */
342   encqueue(cdm, instance);
343
344   return 0;
345 } /* }}} int amqp1_notify */
346
347 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
348                        user_data_t *user_data) {
349   amqp1_config_instance_t *instance;
350   int status = 0;
351   size_t bfree = BUFSIZE;
352   size_t bfill = 0;
353   cd_message_t *cdm;
354   size_t bufsize = BUFSIZE;
355
356   if ((ds == NULL) || (vl == NULL) || (transport == NULL) ||
357       (user_data == NULL))
358     return EINVAL;
359
360   instance = user_data->data;
361
362   if (instance->notify != false) {
363     ERROR("amqp1 plugin: write failed");
364   }
365
366   cdm = NEW(cd_message_t);
367   DEQ_ITEM_INIT(cdm);
368   cdm->mbuf = pn_bytes(bufsize, (char *)malloc(bufsize));
369   cdm->instance = instance;
370
371   switch (instance->format) {
372   case AMQP1_FORMAT_COMMAND:
373     status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
374     if (status != 0) {
375       ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
376       return status;
377     }
378     cdm->mbuf.size = strlen(cdm->mbuf.start);
379     break;
380   case AMQP1_FORMAT_JSON:
381     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
382     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
383                            instance->store_rates);
384     format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
385     cdm->mbuf.size = strlen(cdm->mbuf.start);
386     break;
387   case AMQP1_FORMAT_GRAPHITE:
388     status = format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl,
389                              instance->prefix, instance->postfix,
390                              instance->escape_char, instance->graphite_flags);
391     if (status != 0) {
392       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
393       return status;
394     }
395     cdm->mbuf.size = strlen(cdm->mbuf.start);
396     break;
397   default:
398     ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
399     return -1;
400   }
401
402   /* encode message and place on outboud queue */
403   encqueue(cdm, instance);
404
405   return 0;
406 } /* }}} int amqp1_write */
407
408 static void amqp1_config_transport_free(void *ptr) /* {{{ */
409 {
410   amqp1_config_transport_t *transport = ptr;
411
412   if (transport == NULL)
413     return;
414
415   sfree(transport->name);
416   sfree(transport->host);
417   sfree(transport->user);
418   sfree(transport->password);
419   sfree(transport->address);
420
421   sfree(transport);
422 } /* }}} void amqp1_config_transport_free */
423
424 static void amqp1_config_instance_free(void *ptr) /* {{{ */
425 {
426   amqp1_config_instance_t *instance = ptr;
427
428   if (instance == NULL)
429     return;
430
431   sfree(instance->name);
432   sfree(instance->prefix);
433   sfree(instance->postfix);
434
435   sfree(instance);
436 } /* }}} void amqp1_config_instance_free */
437
438 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
439 {
440   int status = 0;
441   char *key = NULL;
442   amqp1_config_instance_t *instance;
443
444   instance = calloc(1, sizeof(*instance));
445   if (instance == NULL) {
446     ERROR("amqp1 plugin: calloc failed.");
447     return ENOMEM;
448   }
449
450   /* Initialize instance configuration {{{ */
451   instance->name = NULL;
452
453   status = cf_util_get_string(ci, &instance->name);
454   if (status != 0) {
455     sfree(instance);
456     return status;
457   }
458
459   for (int i = 0; i < ci->children_num; i++) {
460     oconfig_item_t *child = ci->children + i;
461
462     if (strcasecmp("PreSettle", child->key) == 0)
463       status = cf_util_get_boolean(child, &instance->pre_settle);
464     else if (strcasecmp("Notify", child->key) == 0)
465       status = cf_util_get_boolean(child, &instance->notify);
466     else if (strcasecmp("Format", child->key) == 0) {
467       status = cf_util_get_string(child, &key);
468       if (status != 0)
469         return status;
470       /* TODO: goto errout */
471       //          goto errout;
472       assert(key != NULL);
473       if (strcasecmp(key, "Command") == 0) {
474         instance->format = AMQP1_FORMAT_COMMAND;
475       } else if (strcasecmp(key, "Graphite") == 0) {
476         instance->format = AMQP1_FORMAT_GRAPHITE;
477       } else if (strcasecmp(key, "JSON") == 0) {
478         instance->format = AMQP1_FORMAT_JSON;
479       } else {
480         WARNING("amqp1 plugin: Invalid format string: %s", key);
481       }
482       sfree(key);
483     } else if (strcasecmp("StoreRates", child->key) == 0)
484       status = cf_util_get_boolean(child, &instance->store_rates);
485     else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
486       status = cf_util_get_flag(child, &instance->graphite_flags,
487                                 GRAPHITE_SEPARATE_INSTANCES);
488     else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
489       status = cf_util_get_flag(child, &instance->graphite_flags,
490                                 GRAPHITE_ALWAYS_APPEND_DS);
491     else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
492       status = cf_util_get_flag(child, &instance->graphite_flags,
493                                 GRAPHITE_PRESERVE_SEPARATOR);
494     else if (strcasecmp("GraphitePrefix", child->key) == 0)
495       status = cf_util_get_string(child, &instance->prefix);
496     else if (strcasecmp("GraphitePostfix", child->key) == 0)
497       status = cf_util_get_string(child, &instance->postfix);
498     else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
499       char *tmp_buff = NULL;
500       status = cf_util_get_string(child, &tmp_buff);
501       if (strlen(tmp_buff) > 1)
502         WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
503                 "only one character. Others will be ignored.");
504       instance->escape_char = tmp_buff[0];
505       sfree(tmp_buff);
506     } else
507       WARNING("amqp1 plugin: Ignoring unknown "
508               "instance configuration option "
509               "\%s\".",
510               child->key);
511     if (status != 0)
512       break;
513   }
514
515   if (status != 0) {
516     amqp1_config_instance_free(instance);
517     return status;
518   } else {
519     char tpname[128];
520     snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
521     snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
522              transport->address, instance->name);
523     if (instance->notify == true) {
524       status = plugin_register_notification(
525           tpname, amqp1_notify,
526           &(user_data_t){
527               .data = instance, .free_func = amqp1_config_instance_free,
528           });
529     } else {
530       status = plugin_register_write(
531           tpname, amqp1_write,
532           &(user_data_t){
533               .data = instance, .free_func = amqp1_config_instance_free,
534           });
535     }
536
537     if (status != 0) {
538       amqp1_config_instance_free(instance);
539     }
540   }
541
542   return status;
543 } /* }}} int amqp1_config_instance */
544
545 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
546 {
547   int status = 0;
548
549   transport = calloc(1, sizeof(*transport));
550   if (transport == NULL) {
551     ERROR("amqp1 plugin: calloc failed.");
552     return ENOMEM;
553   }
554
555   /* Initialize transport configuration {{{ */
556   transport->name = NULL;
557
558   status = cf_util_get_string(ci, &transport->name);
559   if (status != 0) {
560     sfree(transport);
561     return status;
562   }
563
564   for (int i = 0; i < ci->children_num; i++) {
565     oconfig_item_t *child = ci->children + i;
566
567     if (strcasecmp("Host", child->key) == 0)
568       status = cf_util_get_string(child, &transport->host);
569     else if (strcasecmp("Port", child->key) == 0)
570       status = cf_util_get_string(child, &transport->port);
571     else if (strcasecmp("User", child->key) == 0)
572       status = cf_util_get_string(child, &transport->user);
573     else if (strcasecmp("Password", child->key) == 0)
574       status = cf_util_get_string(child, &transport->password);
575     else if (strcasecmp("Address", child->key) == 0)
576       status = cf_util_get_string(child, &transport->address);
577     else if (strcasecmp("Instance", child->key) == 0)
578       amqp1_config_instance(child);
579     else
580       WARNING("amqp1 plugin: Ignoring unknown "
581               "transport configuration option "
582               "\%s\".",
583               child->key);
584
585     if (status != 0)
586       break;
587   }
588
589   if (status != 0) {
590     amqp1_config_transport_free(transport);
591   }
592   return status;
593 } /* }}} int amqp1_config_transport */
594
595 static int amqp1_config(oconfig_item_t *ci) /* {{{ */
596 {
597
598   for (int i = 0; i < ci->children_num; i++) {
599     oconfig_item_t *child = ci->children + i;
600
601     if (strcasecmp("Transport", child->key) == 0)
602       amqp1_config_transport(child);
603     else
604       WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".",
605               child->key);
606   }
607
608   return 0;
609 } /* }}} int amqp1_config */
610
611 static int amqp1_init(void) /* {{{ */
612 {
613   char addr[PN_MAX_ADDR];
614   int status;
615   char errbuf[1024];
616
617   if (transport == NULL) {
618     ERROR("amqp1: init failed, no transport configured");
619     return -1;
620   }
621
622   if (proactor == NULL) {
623     pthread_mutex_init(&send_lock, /* attr = */ NULL);
624     proactor = pn_proactor();
625     pn_proactor_addr(addr, sizeof(addr), transport->host, transport->port);
626     conn = pn_connection();
627     if (transport->user != NULL) {
628       pn_connection_set_user(conn, transport->user);
629       pn_connection_set_password(conn, transport->password);
630     }
631     pn_proactor_connect(proactor, conn, addr);
632     /* start_thread */
633     status =
634         plugin_thread_create(&event_thread_id, NULL /* no attributes */,
635                              event_thread, NULL /* no argument */, "handle");
636     if (status != 0) {
637       ERROR("amqp1: pthread_create failed: %s",
638             sstrerror(errno, errbuf, sizeof(errbuf)));
639     } else {
640       event_thread_running = 1;
641     }
642   }
643   return 0;
644 } /* }}} int amqp1_init */
645
646 static int amqp1_shutdown(void) /* {{{ */
647 {
648   cd_message_t *cdm;
649
650   /* Stop the proactor thread */
651   if (event_thread_running != 0) {
652     finished = true;
653     /* activate the event thread */
654     pn_connection_wake(conn);
655     pthread_join(event_thread_id, NULL /* no return value */);
656     memset(&event_thread_id, 0, sizeof(event_thread_id));
657   }
658
659   /* Free the remaining out_messages */
660   cdm = DEQ_HEAD(out_messages);
661   while (cdm) {
662     DEQ_REMOVE_HEAD(out_messages);
663     cd_message_free(cdm);
664     cdm = DEQ_HEAD(out_messages);
665   }
666
667   if (proactor != NULL) {
668     pn_proactor_free(proactor);
669   }
670
671   if (transport != NULL) {
672     amqp1_config_transport_free(transport);
673   }
674
675   return 0;
676 } /* }}} int amqp1_shutdown */
677
678 void module_register(void) {
679   plugin_register_complex_config("amqp1", amqp1_config);
680   plugin_register_init("amqp1", amqp1_init);
681   plugin_register_shutdown("amqp1", amqp1_shutdown);
682 } /* void module_register */