Add notify handler
[collectd.git] / src / amqp1.c
1 /**
2  * collectd - src/amqp1.c
3  * Copyright(c) 2017 Red Hat Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Andy Smith <ansmith@redhat.com>
25  */
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_cmd_putval.h"
32 #include "utils_format_graphite.h"
33 #include "utils_format_json.h"
34 #include "utils_random.h"
35 #include "utils_deq.h"
36
37 #include <proton/connection.h>
38 #include <proton/condition.h>
39 #include <proton/delivery.h>
40 #include <proton/link.h>
41 #include <proton/message.h>
42 #include <proton/proactor.h>
43 #include <proton/sasl.h>
44 #include <proton/session.h>
45 #include <proton/transport.h>
46
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <errno.h>
50 #include <stdint.h>
51
52 #define BUFSIZE 8192
53 #define AMQP1_FORMAT_JSON 0
54 #define AMQP1_FORMAT_COMMAND 1
55 #define AMQP1_FORMAT_GRAPHITE 2
56
57 typedef struct amqp1_config_transport_t {
58   DEQ_LINKS(struct amqp1_config_transport_t);
59   char              *name;
60   char              *host;
61   char              *port;
62   char              *user;
63   char              *password;
64   char              *address;
65 } amqp1_config_transport_t;
66
67 typedef struct amqp1_config_instance_t {
68   DEQ_LINKS(struct amqp1_config_instance_t);
69   char              *name;
70   _Bool             notify;
71   uint8_t           format;
72   unsigned int      graphite_flags;
73   _Bool             store_rates;
74   char              *prefix;
75   char              *postfix;
76   char              escape_char;
77   _Bool             pre_settle;
78   char              send_to[128];
79 } amqp1_config_instance_t;
80
81 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
82
83 typedef struct cd_message_t {
84   DEQ_LINKS(struct cd_message_t);
85   pn_bytes_t mbuf;
86   amqp1_config_instance_t *instance;
87 } cd_message_t;
88
89 DEQ_DECLARE(cd_message_t, cd_message_list_t);
90
91 /*
92  * Globals
93  */
94 pn_connection_t          *conn = NULL;
95 pn_session_t             *ssn = NULL;
96 pn_link_t                *sender = NULL;
97 pn_proactor_t            *proactor = NULL;
98 pthread_mutex_t          send_lock;
99 cd_message_list_t        out_messages;
100 uint64_t                 cd_tag = 1;
101 uint64_t                 acknowledged = 0;
102 amqp1_config_transport_t *transport = NULL;
103 bool                     finished = false;
104
105 static int       event_thread_running = 0;
106 static pthread_t event_thread_id;
107
108 /*
109  * Functions
110  */
111 static void cd_message_free(cd_message_t *cdm)
112 {
113   if (cdm->mbuf.start) {
114     free((void *)cdm->mbuf.start);
115   }
116   free(cdm);
117 } /* }}} void cd_message_free */
118
119 static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
120 {
121   uint64_t          dtag;
122   cd_message_list_t to_send;
123   cd_message_t      *cdm;
124   int               link_credit = pn_link_credit(link);
125   int               event_count = 0;
126   pn_delivery_t     *dlv;
127
128   DEQ_INIT(to_send);
129
130   pthread_mutex_lock(&send_lock);
131
132   if (link_credit > 0) {
133     dtag = cd_tag;
134     cdm = DEQ_HEAD(out_messages);
135     while (cdm) {
136       DEQ_REMOVE_HEAD(out_messages);
137       DEQ_INSERT_TAIL(to_send, cdm);
138       if (DEQ_SIZE(to_send) == link_credit)
139         break;
140       cdm = DEQ_HEAD(out_messages);
141     }
142     cd_tag += DEQ_SIZE(to_send);
143   }
144
145   pthread_mutex_unlock(&send_lock);
146
147   /* message is already formatted and encoded */
148   cdm = DEQ_HEAD(to_send);
149   while (cdm) {
150     DEQ_REMOVE_HEAD(to_send);
151     dtag++;
152     dlv = pn_delivery(link, pn_dtag((const char*)&dtag, sizeof(dtag)));
153     pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
154     pn_link_advance(link);
155     if (cdm->instance->pre_settle == true) {
156       pn_delivery_settle(dlv);
157     }
158     event_count++;
159     cd_message_free(cdm);
160     cdm = DEQ_HEAD(to_send);
161   }
162
163   return event_count;
164 } /* }}} int amqp1_send_out_messages */
165
166 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
167 {
168   if (pn_condition_is_set(cond)) {
169     ERROR("amqp1 plugin: %s: %s: %s",
170           pn_event_type_name(pn_event_type(e)),
171           pn_condition_get_name(cond),
172           pn_condition_get_description(cond));
173     pn_connection_close(pn_event_connection(e));
174     conn = NULL;
175   }
176 } /* }}} void check_condition */
177
178 static bool handle(pn_event_t *event) /* {{{ */
179 {
180
181   switch (pn_event_type(event)) {
182
183   case PN_CONNECTION_INIT:{
184     conn = pn_event_connection(event);
185     pn_connection_set_container(conn, transport->address);
186     pn_connection_open(conn);
187     ssn = pn_session(conn);
188     pn_session_open(ssn);
189     sender = pn_sender(ssn, "cd-sender");
190     pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
191     pn_link_open(sender);
192     break;
193   }
194
195   case PN_LINK_FLOW: {
196     /* peer has given us credit, send outbound messages */
197     amqp1_send_out_messages(sender);
198     break;
199   }
200
201   case PN_DELIVERY: {
202     /* acknowledgement from peer that a message was delivered */
203     pn_delivery_t * dlv = pn_event_delivery(event);
204     if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
205       acknowledged++;
206     }
207     break;
208   }
209
210   case PN_CONNECTION_WAKE: {
211     if (!finished) {
212       amqp1_send_out_messages(sender);
213     }
214     break;
215   }
216
217   case PN_TRANSPORT_CLOSED: {
218     check_condition(event, pn_transport_condition(pn_event_transport(event)));
219     break;
220   }
221
222   case PN_CONNECTION_REMOTE_CLOSE: {
223     check_condition(event, pn_session_remote_condition(pn_event_session(event)));
224     pn_connection_close(pn_event_connection(event));
225     break;
226   }
227
228   case PN_SESSION_REMOTE_CLOSE: {
229     check_condition(event, pn_session_remote_condition(pn_event_session(event)));
230     pn_connection_close(pn_event_connection(event));
231     break;
232   }
233
234   case PN_LINK_REMOTE_CLOSE:
235   case PN_LINK_REMOTE_DETACH: {
236     check_condition(event, pn_link_remote_condition(pn_event_link(event)));
237     pn_connection_close(pn_event_connection(event));
238     break;
239   }
240
241   case PN_PROACTOR_INACTIVE: {
242     return false;
243   }
244
245   default: break;
246   }
247   return true;
248 } /* }}} bool handle */
249
250 static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
251 {
252
253   do {
254     pn_event_batch_t *events = pn_proactor_wait(proactor);
255     pn_event_t *e;
256     for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
257       if (!handle(e)) {
258         finished = true;
259       }
260     }
261     pn_proactor_done(proactor, events);
262   } while (!finished);
263
264   event_thread_running = 0;
265
266   return NULL;
267 } /* }}} void event_thread */
268
269 static void encqueue(cd_message_t *cdm, amqp1_config_instance_t *instance ) /* {{{ */
270 {
271   size_t       bufsize = BUFSIZE;
272   pn_data_t    *body;
273   pn_message_t *message;
274
275   /* encode message */
276   message = pn_message();
277   pn_message_set_address(message, instance->send_to);
278   body = pn_message_body(message);
279   pn_data_clear(body);
280   pn_data_put_binary(body, cdm->mbuf);
281   pn_data_exit(body);
282
283   /* put_binary copies and stores so ok to use mbuf */
284   cdm->mbuf.size = bufsize;
285   pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
286
287   pthread_mutex_lock(&send_lock);
288   DEQ_INSERT_TAIL(out_messages, cdm);
289   pthread_mutex_unlock(&send_lock);
290
291   pn_message_free(message);
292
293   /* activate the sender */
294   if (conn != NULL) {
295     pn_connection_wake(conn);
296   }
297
298 } /* }}} void encqueue */
299
300 static int amqp1_notify(notification_t const *n, user_data_t *user_data) /* {{{ */
301 {
302   amqp1_config_instance_t *instance;
303   int          status = 0;
304   size_t       bfree = BUFSIZE;
305   size_t       bfill = 0;
306   cd_message_t *cdm;
307   size_t       bufsize = BUFSIZE;
308
309   if ((n == NULL) || (user_data == NULL))
310     return EINVAL;
311
312   instance = user_data->data;
313
314   if (instance->notify != true) {
315     ERROR("amqp1 plugin: write notification failed");
316   }
317
318   cdm = NEW(cd_message_t);
319   DEQ_ITEM_INIT(cdm);
320   cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize));
321   cdm->instance = instance;
322
323   switch (instance->format) {
324   case AMQP1_FORMAT_JSON:
325     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
326     status = format_json_notification((char *)cdm->mbuf.start, bufsize, n);
327     if (status != 0) {
328       ERROR("amqp1 plugin: formatting notification failed");
329       return status;
330     }
331     cdm->mbuf.size = strlen(cdm->mbuf.start);
332     break;
333   default:
334     ERROR("amqp1 plugin: Invalid notify format (%i).", instance->format);
335     return -1;
336   }
337
338   /* encode message and place on outbound queue */
339   encqueue(cdm, instance);
340
341   return 0;
342 } /* }}} int amqp1_notify */
343
344 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
345                        user_data_t *user_data)
346 {
347   amqp1_config_instance_t *instance;
348   int          status = 0;
349   size_t       bfree = BUFSIZE;
350   size_t       bfill = 0;
351   cd_message_t *cdm;
352   size_t       bufsize = BUFSIZE;
353
354   if ((ds == NULL) || (vl == NULL) || (transport == NULL) || (user_data == NULL))
355     return EINVAL;
356
357   instance = user_data->data;
358
359   if (instance->notify != false) {
360     ERROR("amqp1 plugin: write failed");
361   }
362
363   cdm = NEW(cd_message_t);
364   DEQ_ITEM_INIT(cdm);
365   cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize));
366   cdm->instance = instance;
367
368   switch (instance->format) {
369   case AMQP1_FORMAT_COMMAND:
370     status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
371     if (status != 0) {
372       ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
373       return status;
374     }
375     cdm->mbuf.size = strlen(cdm->mbuf.start);
376     break;
377   case AMQP1_FORMAT_JSON:
378     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
379     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
380                              instance->store_rates);
381     format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
382     cdm->mbuf.size = strlen(cdm->mbuf.start);
383     break;
384   case AMQP1_FORMAT_GRAPHITE:
385     status =
386         format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl, instance->prefix,
387                         instance->postfix, instance->escape_char, instance->graphite_flags);
388     if (status != 0) {
389       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
390       return status;
391     }
392     cdm->mbuf.size = strlen(cdm->mbuf.start);
393     break;
394   default:
395     ERROR("amqp1 plugin: Invalid write format (%i).", instance->format);
396     return -1;
397   }
398
399   /* encode message and place on outboud queue */
400   encqueue(cdm, instance);
401
402   return 0;
403 } /* }}} int amqp1_write */
404
405 static void amqp1_config_transport_free(void *ptr) /* {{{ */
406 {
407   amqp1_config_transport_t *transport = ptr;
408
409   if (transport == NULL)
410     return;
411
412   sfree(transport->name);
413   sfree(transport->host);
414   sfree(transport->user);
415   sfree(transport->password);
416   sfree(transport->address);
417
418   sfree(transport);
419 } /* }}} void amqp1_config_transport_free */
420
421 static void amqp1_config_instance_free(void *ptr) /* {{{ */
422 {
423   amqp1_config_instance_t *instance = ptr;
424
425   if (instance == NULL)
426     return;
427
428   sfree(instance->name);
429   sfree(instance->prefix);
430   sfree(instance->postfix);
431
432   sfree(instance);
433 } /* }}} void amqp1_config_instance_free */
434
435 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
436 {
437   int  status=0;
438   char *key = NULL;
439   amqp1_config_instance_t *instance;
440
441   instance = calloc(1, sizeof(*instance));
442   if (instance == NULL) {
443     ERROR("amqp1 plugin: calloc failed.");
444     return ENOMEM;
445   }
446
447   /* Initialize instance configuration {{{ */
448   instance->name = NULL;
449
450   status = cf_util_get_string(ci, &instance->name);
451   if (status != 0) {
452     sfree(instance);
453     return status;
454   }
455
456   for (int i = 0; i < ci->children_num; i++) {
457     oconfig_item_t *child = ci->children + i;
458
459     if (strcasecmp("PreSettle", child->key) == 0)
460       status = cf_util_get_boolean(child, &instance->pre_settle);
461     else if (strcasecmp("Notify", child->key) == 0)
462       status = cf_util_get_boolean(child, &instance->notify);
463     else if (strcasecmp("Format", child->key) == 0) {
464       status = cf_util_get_string(child, &key);
465       if (status != 0)
466           return status;
467           /* TODO: goto errout */
468       //          goto errout;
469       assert(key != NULL);
470       if (strcasecmp(key, "Command") == 0) {
471         instance->format = AMQP1_FORMAT_COMMAND;
472       } else if (strcasecmp(key, "Graphite") == 0) {
473         instance->format = AMQP1_FORMAT_GRAPHITE;
474       } else if (strcasecmp(key, "JSON") == 0) {
475         instance->format = AMQP1_FORMAT_JSON;
476       } else {
477         WARNING("amqp1 plugin: Invalid format string: %s", key);
478       }
479       sfree(key);
480     }
481     else if (strcasecmp("StoreRates", child->key) == 0)
482       status = cf_util_get_boolean(child, &instance->store_rates);
483     else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
484       status = cf_util_get_flag(child, &instance->graphite_flags,
485                                 GRAPHITE_SEPARATE_INSTANCES);
486     else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
487       status = cf_util_get_flag(child, &instance->graphite_flags,
488                                 GRAPHITE_ALWAYS_APPEND_DS);
489     else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
490       status = cf_util_get_flag(child, &instance->graphite_flags,
491                                 GRAPHITE_PRESERVE_SEPARATOR);
492     else if (strcasecmp("GraphitePrefix", child->key) == 0)
493       status = cf_util_get_string(child, &instance->prefix);
494     else if (strcasecmp("GraphitePostfix", child->key) == 0)
495       status = cf_util_get_string(child, &instance->postfix);
496     else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
497       char *tmp_buff = NULL;
498       status = cf_util_get_string(child, &tmp_buff);
499       if (strlen(tmp_buff) > 1)
500         WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
501                 "only one character. Others will be ignored.");
502       instance->escape_char = tmp_buff[0];
503       sfree(tmp_buff);
504     }
505     else
506       WARNING("amqp1 plugin: Ignoring unknown "
507               "instance configuration option "
508               "\%s\".", child->key);
509     if (status != 0)
510       break;
511   }
512
513   if (status != 0) {
514     amqp1_config_instance_free(instance);
515     return status;
516   } else {
517     char tpname[128];
518     snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
519     snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
520              transport->address,instance->name);
521     if (instance->notify == true) {
522       status = plugin_register_notification(tpname, amqp1_notify, &(user_data_t) {
523               .data = instance, .free_func = amqp1_config_instance_free, });
524     } else {
525       status = plugin_register_write(tpname, amqp1_write, &(user_data_t) {
526               .data = instance, .free_func = amqp1_config_instance_free, });
527     }
528
529     if (status != 0) {
530       amqp1_config_instance_free(instance);
531     }
532   }
533
534   return status;
535 } /* }}} int amqp1_config_instance */
536
537 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
538 {
539   int status=0;
540
541   transport = calloc(1, sizeof(*transport));
542   if (transport == NULL) {
543     ERROR("amqp1 plugin: calloc failed.");
544     return ENOMEM;
545   }
546
547   /* Initialize transport configuration {{{ */
548   transport->name = NULL;
549
550   status = cf_util_get_string(ci, &transport->name);
551   if (status != 0) {
552     sfree(transport);
553     return status;
554   }
555
556   for (int i = 0; i < ci->children_num; i++) {
557     oconfig_item_t *child = ci->children + i;
558
559     if (strcasecmp("Host", child->key) == 0)
560       status = cf_util_get_string(child, &transport->host);
561     else if (strcasecmp("Port", child->key) == 0)
562       status = cf_util_get_string(child, &transport->port);
563     else if (strcasecmp("User", child->key) == 0)
564       status = cf_util_get_string(child, &transport->user);
565     else if (strcasecmp("Password", child->key) == 0)
566       status = cf_util_get_string(child, &transport->password);
567     else if (strcasecmp("Address", child->key) == 0)
568       status = cf_util_get_string(child, &transport->address);
569     else if (strcasecmp("Instance",child->key) == 0)
570       amqp1_config_instance(child);
571     else
572       WARNING("amqp1 plugin: Ignoring unknown "
573               "transport configuration option "
574               "\%s\".", child->key);
575
576     if (status != 0)
577       break;
578   }
579
580   if (status != 0) {
581     amqp1_config_transport_free(transport);
582   }
583   return status;
584 }  /* }}} int amqp1_config_transport */
585
586 static int amqp1_config(oconfig_item_t *ci) /* {{{ */
587 {
588
589   for (int i = 0; i < ci->children_num; i++) {
590     oconfig_item_t *child = ci->children + i;
591
592     if (strcasecmp("Transport", child->key) == 0)
593       amqp1_config_transport(child);
594     else
595       WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".",
596               child->key);
597   }
598
599   return 0;
600 } /* }}} int amqp1_config */
601
602 static int amqp1_init(void) /* {{{ */
603 {
604   char addr[PN_MAX_ADDR];
605   int  status;
606   char errbuf[1024];
607
608   if (transport == NULL) {
609     ERROR("amqp1: init failed, no transport configured");
610     return -1;
611   }
612
613   if (proactor == NULL) {
614     pthread_mutex_init(&send_lock, /* attr = */ NULL);
615     proactor = pn_proactor();
616     pn_proactor_addr(addr, sizeof(addr),transport->host,transport->port);
617     conn = pn_connection();
618     if (transport->user != NULL) {
619         pn_connection_set_user(conn, transport->user);
620         pn_connection_set_password(conn, transport->password);
621     }
622     pn_proactor_connect(proactor, conn, addr);
623     /* start_thread */
624     status = plugin_thread_create(&event_thread_id, NULL /* no attributes */,
625                                   event_thread, NULL /* no argument */,
626                                   "handle");
627     if (status != 0) {
628       ERROR("amqp1: pthread_create failed: %s",
629             sstrerror(errno, errbuf, sizeof(errbuf)));
630     } else {
631       event_thread_running = 1;
632     }
633   }
634   return 0;
635 } /* }}} int amqp1_init */
636
637 static int amqp1_shutdown(void) /* {{{ */
638 {
639   cd_message_t *cdm;
640
641   /* Stop the proactor thread */
642   if (event_thread_running != 0) {
643     finished=true;
644     /* activate the event thread */
645     pn_connection_wake(conn);
646     pthread_join(event_thread_id, NULL /* no return value */);
647     memset(&event_thread_id, 0, sizeof(event_thread_id));
648   }
649
650   /* Free the remaining out_messages */
651   cdm = DEQ_HEAD(out_messages);
652   while (cdm) {
653     DEQ_REMOVE_HEAD(out_messages);
654     cd_message_free(cdm);
655     cdm = DEQ_HEAD(out_messages);
656   }
657
658   if (proactor != NULL) {
659     pn_proactor_free(proactor);
660   }
661
662   if (transport != NULL) {
663     amqp1_config_transport_free(transport);
664   }
665
666   return 0;
667 } /* }}} int amqp1_shutdown */
668
669 void module_register(void)
670 {
671   plugin_register_complex_config("amqp1", amqp1_config);
672   plugin_register_init("amqp1", amqp1_init);
673   plugin_register_shutdown("amqp1",amqp1_shutdown);
674 } /* void module_register */