initial commit for amqp1 plugin
[collectd.git] / src / amqp1.c
1 /**
2  * collectd - src/amqp1.c
3  * Copyright(c) 2017 Red Hat Inc.
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
21  * DEALINGS IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Andy Smith <ansmith@redhat.com>
25  */
26
27 #include "collectd.h"
28
29 #include "common.h"
30 #include "plugin.h"
31 #include "utils_cmd_putval.h"
32 #include "utils_format_graphite.h"
33 #include "utils_format_json.h"
34 #include "utils_random.h"
35 #include "utils_deq.h"
36
37 #include <proton/connection.h>
38 #include <proton/condition.h>
39 #include <proton/delivery.h>
40 #include <proton/link.h>
41 #include <proton/message.h>
42 #include <proton/proactor.h>
43 #include <proton/sasl.h>
44 #include <proton/session.h>
45 #include <proton/transport.h>
46
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <errno.h>
50 #include <stdint.h>
51
52 #define BUFSIZE 8192
53 #define AMQP1_FORMAT_JSON 0
54 #define AMQP1_FORMAT_COMMAND 1
55 #define AMQP1_FORMAT_GRAPHITE 2
56
57 typedef struct amqp1_config_transport_t {
58   DEQ_LINKS(struct amqp1_config_transport_t);
59   char              *name;
60   char              *host;
61   char              *port;
62   char              *user;
63   char              *password;
64   char              *address;
65 } amqp1_config_transport_t;
66
67 typedef struct amqp1_config_instance_t {
68   DEQ_LINKS(struct amqp1_config_instance_t);
69   char              *name;
70   uint8_t           format;
71   unsigned int      graphite_flags;
72   _Bool             store_rates;
73   char              *prefix;
74   char              *postfix;
75   char              escape_char;
76   _Bool             pre_settle;
77   char              send_to[128];
78 } amqp1_config_instance_t;
79
80 DEQ_DECLARE(amqp1_config_instance_t, amqp1_config_instance_list_t);
81
82 typedef struct cd_message_t {
83   DEQ_LINKS(struct cd_message_t);
84   pn_bytes_t mbuf;
85   amqp1_config_instance_t *instance;
86 } cd_message_t;
87
88 DEQ_DECLARE(cd_message_t, cd_message_list_t);
89
90 /* 
91  * Globals
92  */
93 pn_connection_t          *conn = NULL;
94 pn_session_t             *ssn = NULL;
95 pn_link_t                *sender = NULL;
96 pn_proactor_t            *proactor = NULL;
97 pthread_mutex_t          send_lock;
98 cd_message_list_t        out_messages;
99 uint64_t                 cd_tag = 1;
100 uint64_t                 acknowledged = 0;
101 amqp1_config_transport_t *transport = NULL;
102 bool                     finished = false;
103
104 static int       event_thread_running = 0;
105 static pthread_t event_thread_id;
106
107 /*
108  * Functions
109  */
110 static void cd_message_free(cd_message_t *cdm)
111 {
112   if (cdm->mbuf.start) {
113     free((void *)cdm->mbuf.start);
114   }
115   free(cdm);
116 } /* }}} void cd_message_free */
117
118 static int amqp1_send_out_messages(pn_link_t *link) /* {{{ */
119 {
120   uint64_t          dtag;
121   cd_message_list_t to_send;
122   cd_message_t      *cdm;
123   int               link_credit = pn_link_credit(link);
124   int               event_count = 0;
125   pn_delivery_t     *dlv;
126
127   DEQ_INIT(to_send);
128
129   pthread_mutex_lock(&send_lock);
130
131   if (link_credit > 0) {
132     dtag = cd_tag;
133     cdm = DEQ_HEAD(out_messages);
134     while (cdm) {
135       DEQ_REMOVE_HEAD(out_messages);
136       DEQ_INSERT_TAIL(to_send, cdm);
137       if (DEQ_SIZE(to_send) == link_credit)
138         break;
139       cdm = DEQ_HEAD(out_messages);
140     }
141     cd_tag += DEQ_SIZE(to_send);
142   }
143
144   pthread_mutex_unlock(&send_lock);
145
146   /* message is already formatted and encoded */
147   cdm = DEQ_HEAD(to_send);
148   while (cdm) {
149     DEQ_REMOVE_HEAD(to_send);
150     dtag++;
151     dlv = pn_delivery(link, pn_dtag((const char*)&dtag, sizeof(dtag)));
152     pn_link_send(link, cdm->mbuf.start, cdm->mbuf.size);
153     pn_link_advance(link);
154     if (cdm->instance->pre_settle == true) {
155       pn_delivery_settle(dlv);
156     }    
157     event_count++;
158     cd_message_free(cdm);
159     cdm = DEQ_HEAD(to_send);
160   }
161
162   return event_count;
163 } /* }}} int amqp1_send_out_messages */
164
165 static void check_condition(pn_event_t *e, pn_condition_t *cond) /* {{{ */
166 {
167   if (pn_condition_is_set(cond)) {
168     ERROR("amqp1 plugin: %s: %s: %s",
169           pn_event_type_name(pn_event_type(e)),
170           pn_condition_get_name(cond),
171           pn_condition_get_description(cond));
172     pn_connection_close(pn_event_connection(e));
173     conn = NULL;
174   }
175 } /* }}} void check_condition */
176
177 static bool handle(pn_event_t *event) /* {{{ */
178 {
179
180   switch (pn_event_type(event)) {
181
182   case PN_CONNECTION_INIT:{
183     conn = pn_event_connection(event);
184     pn_connection_set_container(conn, transport->address);
185     pn_connection_open(conn);
186     ssn = pn_session(conn);
187     pn_session_open(ssn);
188     sender = pn_sender(ssn, "cd-sender");
189     pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
190     pn_link_open(sender);
191     break;
192   }
193
194   case PN_LINK_FLOW: {
195     /* peer has given us credit, send outbound messages */
196     amqp1_send_out_messages(sender);
197     break;
198   }
199
200   case PN_DELIVERY: {
201     /* acknowledgement from peer that a message was delivered */
202     pn_delivery_t * dlv = pn_event_delivery(event);
203     if (pn_delivery_remote_state(dlv) == PN_ACCEPTED) {
204       acknowledged++;
205     }
206     break;
207   }
208
209   case PN_CONNECTION_WAKE: {
210     if (!finished) {
211       amqp1_send_out_messages(sender);
212     }
213     break;
214   }
215        
216   case PN_TRANSPORT_CLOSED: {
217     check_condition(event, pn_transport_condition(pn_event_transport(event)));
218     break;
219   }
220
221   case PN_CONNECTION_REMOTE_CLOSE: {
222     check_condition(event, pn_session_remote_condition(pn_event_session(event)));
223     pn_connection_close(pn_event_connection(event));
224     break;
225   }
226
227   case PN_SESSION_REMOTE_CLOSE: {
228     check_condition(event, pn_session_remote_condition(pn_event_session(event)));
229     pn_connection_close(pn_event_connection(event));
230     break;
231   }
232
233   case PN_LINK_REMOTE_CLOSE:
234   case PN_LINK_REMOTE_DETACH: {
235     check_condition(event, pn_link_remote_condition(pn_event_link(event)));
236     pn_connection_close(pn_event_connection(event));
237     break;
238   }    
239
240   case PN_PROACTOR_INACTIVE: {
241     return false;
242   }
243
244   default: break;
245   }
246   return true;
247 } /* }}} bool handle */
248
249 static void *event_thread(void __attribute__((unused)) * arg) /* {{{ */
250 {
251
252   do {
253     pn_event_batch_t *events = pn_proactor_wait(proactor);
254     pn_event_t *e;
255     for (e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
256       if (!handle(e)) {
257         finished = true;
258       }
259     }
260     pn_proactor_done(proactor, events);
261   } while (!finished);
262
263   event_thread_running = 0;
264
265   return NULL;
266 } /* }}} void event_thread */
267
268 static int amqp1_write(const data_set_t *ds, const value_list_t *vl, /* {{{ */
269                        user_data_t *user_data) 
270 {
271   amqp1_config_instance_t *instance = user_data->data;
272   int          status = 0;
273   size_t       bfree = BUFSIZE;
274   size_t       bfill = 0;
275   cd_message_t *cdm;
276   size_t       bufsize = BUFSIZE;
277   pn_data_t    *body;
278   pn_message_t *message;
279
280   if ((ds == NULL) || (vl == NULL) || (transport == NULL))
281     return EINVAL;
282
283   cdm = NEW(cd_message_t);
284   DEQ_ITEM_INIT(cdm);
285   cdm->mbuf = pn_bytes(bufsize, (char *) malloc(bufsize));
286   cdm->instance = instance;
287
288   switch (instance->format) {
289   case AMQP1_FORMAT_COMMAND:
290     status = cmd_create_putval((char *)cdm->mbuf.start, bufsize, ds, vl);
291     if (status != 0) {
292       ERROR("amqp1 plugin: cmd_create_putval failed with status %i.", status);
293       return status;
294     }
295     cdm->mbuf.size = strlen(cdm->mbuf.start);
296     break;
297   case AMQP1_FORMAT_JSON:
298     format_json_initialize((char *)cdm->mbuf.start, &bfill, &bfree);
299     format_json_value_list((char *)cdm->mbuf.start, &bfill, &bfree, ds, vl,
300                              instance->store_rates);
301     format_json_finalize((char *)cdm->mbuf.start, &bfill, &bfree);
302     cdm->mbuf.size = strlen(cdm->mbuf.start);
303     break;
304   case AMQP1_FORMAT_GRAPHITE:
305     status =
306         format_graphite((char *)cdm->mbuf.start, bufsize, ds, vl, instance->prefix,
307                         instance->postfix, instance->escape_char, instance->graphite_flags);
308     if (status != 0) {
309       ERROR("amqp1 plugin: format_graphite failed with status %i.", status);
310       return status;
311     }
312     cdm->mbuf.size = strlen(cdm->mbuf.start);
313     break;
314   default:
315     ERROR("amqp1 plugin: Invalid format (%i).", instance->format);
316     return -1;
317   }
318
319   /* encode message */
320   message = pn_message();
321   pn_message_set_address(message, instance->send_to);
322   body = pn_message_body(message);
323   pn_data_clear(body);
324   pn_data_put_binary(body, cdm->mbuf);
325   pn_data_exit(body);
326
327   /* put_binary copies and stores so ok to use mbuf */
328   cdm->mbuf.size = bufsize;
329   pn_message_encode(message, (char *)cdm->mbuf.start, &cdm->mbuf.size);
330
331   pthread_mutex_lock(&send_lock);
332   DEQ_INSERT_TAIL(out_messages, cdm);
333   pthread_mutex_unlock(&send_lock);
334
335   pn_message_free(message);
336
337   /* activate the sender */
338   if (conn != NULL) {
339     pn_connection_wake(conn);
340   }
341
342   return 0;
343 } /* }}} int amqp_write1 */
344
345 static void amqp1_config_transport_free(void *ptr) /* {{{ */
346 {
347   amqp1_config_transport_t *transport = ptr;
348  
349   if (transport == NULL)
350     return;
351
352   sfree(transport->name);
353   sfree(transport->host);
354   sfree(transport->user);
355   sfree(transport->password);
356   sfree(transport->address);
357
358   sfree(transport);
359 } /* }}} void amqp1_config_transport_free */
360
361 static void amqp1_config_instance_free(void *ptr) /* {{{ */
362 {
363   amqp1_config_instance_t *instance = ptr;
364   
365   if (instance == NULL)
366     return;
367
368   sfree(instance->name);
369   sfree(instance->prefix);
370   sfree(instance->postfix);
371
372   sfree(instance);
373 } /* }}} void amqp1_config_instance_free */
374
375 static int amqp1_config_instance(oconfig_item_t *ci) /* {{{ */
376 {
377   int  status=0;
378   char *key = NULL;
379   amqp1_config_instance_t *instance;
380
381   instance = calloc(1, sizeof(*instance));
382   if (instance == NULL) {
383     ERROR("amqp1 plugin: calloc failed.");
384     return ENOMEM;
385   }
386
387   /* Initialize instance configuration {{{ */
388   instance->name = NULL;
389
390   status = cf_util_get_string(ci, &instance->name);
391   if (status != 0) {
392     sfree(instance);
393     return status;
394   }
395
396   for (int i = 0; i < ci->children_num; i++) {
397     oconfig_item_t *child = ci->children + i;
398
399     if (strcasecmp("PreSettle", child->key) == 0)
400       status = cf_util_get_boolean(child, &instance->pre_settle);
401     else if (strcasecmp("Format", child->key) == 0) {
402       status = cf_util_get_string(child, &key);
403       if (status != 0)
404           return status;
405           /* TODO: goto errout */
406       //          goto errout;
407       assert(key != NULL);
408       if (strcasecmp(key, "Command") == 0) {
409         instance->format = AMQP1_FORMAT_COMMAND;
410       } else if (strcasecmp(key, "Graphite") == 0) {
411         instance->format = AMQP1_FORMAT_GRAPHITE;
412       } else if (strcasecmp(key, "JSON") == 0) {
413         instance->format = AMQP1_FORMAT_JSON;
414       } else {
415         WARNING("amqp1 plugin: Invalid format string: %s", key);
416       }
417       sfree(key);
418     }
419     else if (strcasecmp("StoreRates", child->key) == 0)
420       status = cf_util_get_boolean(child, &instance->store_rates);
421     else if (strcasecmp("GraphiteSeparateInstances", child->key) == 0)
422       status = cf_util_get_flag(child, &instance->graphite_flags,
423                                 GRAPHITE_SEPARATE_INSTANCES);
424     else if (strcasecmp("GraphiteAlwaysAppendDS", child->key) == 0)
425       status = cf_util_get_flag(child, &instance->graphite_flags,
426                                 GRAPHITE_ALWAYS_APPEND_DS);
427     else if (strcasecmp("GraphitePreserveSeparator", child->key) == 0)
428       status = cf_util_get_flag(child, &instance->graphite_flags,
429                                 GRAPHITE_PRESERVE_SEPARATOR);
430     else if (strcasecmp("GraphitePrefix", child->key) == 0)
431       status = cf_util_get_string(child, &instance->prefix);
432     else if (strcasecmp("GraphitePostfix", child->key) == 0)
433       status = cf_util_get_string(child, &instance->postfix);
434     else if (strcasecmp("GraphiteEscapeChar", child->key) == 0) {
435       char *tmp_buff = NULL;
436       status = cf_util_get_string(child, &tmp_buff);
437       if (strlen(tmp_buff) > 1)
438         WARNING("amqp1 plugin: The option \"GraphiteEscapeChar\" handles "
439                 "only one character. Others will be ignored.");
440       instance->escape_char = tmp_buff[0];
441       sfree(tmp_buff);
442     }
443     else
444       WARNING("amqp1 plugin: Ignoring unknown "
445               "instance configuration option "
446               "\%s\".", child->key);
447     if (status != 0)
448       break;
449   }
450
451   if (status != 0) {
452     amqp1_config_instance_free(instance);
453     return status;
454   } else {
455     char tpname[128];
456     snprintf(tpname, sizeof(tpname), "amqp1/%s", instance->name);
457     snprintf(instance->send_to, sizeof(instance->send_to), "/%s/%s",
458              transport->address,instance->name);
459     status = plugin_register_write(tpname, amqp1_write, &(user_data_t) {
460             .data = instance, .free_func = amqp1_config_instance_free, });
461     if (status != 0) {
462       amqp1_config_instance_free(instance);
463     }
464   }
465
466   return status;
467 } /* }}} int amqp1_config_instance */
468   
469 static int amqp1_config_transport(oconfig_item_t *ci) /* {{{ */
470 {
471   int status=0;
472   
473   transport = calloc(1, sizeof(*transport));
474   if (transport == NULL) {
475     ERROR("amqp1 plugin: calloc failed.");
476     return ENOMEM;
477   }
478
479   /* Initialize transport configuration {{{ */
480   transport->name = NULL;
481
482   status = cf_util_get_string(ci, &transport->name);
483   if (status != 0) {
484     sfree(transport);
485     return status;
486   }
487  
488   for (int i = 0; i < ci->children_num; i++) {
489     oconfig_item_t *child = ci->children + i;
490
491     if (strcasecmp("Host", child->key) == 0)
492       status = cf_util_get_string(child, &transport->host);
493     else if (strcasecmp("Port", child->key) == 0)
494       status = cf_util_get_string(child, &transport->port);
495     else if (strcasecmp("User", child->key) == 0)
496       status = cf_util_get_string(child, &transport->user);
497     else if (strcasecmp("Password", child->key) == 0)
498       status = cf_util_get_string(child, &transport->password);
499     else if (strcasecmp("Address", child->key) == 0)
500       status = cf_util_get_string(child, &transport->address);
501     else if (strcasecmp("Instance",child->key) == 0)
502       amqp1_config_instance(child);
503     else
504       WARNING("amqp1 plugin: Ignoring unknown "
505               "transport configuration option "
506               "\%s\".", child->key);
507    
508     if (status != 0)
509       break;
510   }
511
512   if (status != 0) {
513     amqp1_config_transport_free(transport);
514   }
515   return status;
516 }  /* }}} int amqp1_config_transport */
517
518 static int amqp1_config(oconfig_item_t *ci) /* {{{ */
519 {
520
521   for (int i = 0; i < ci->children_num; i++) {
522     oconfig_item_t *child = ci->children + i;
523
524     if (strcasecmp("Transport", child->key) == 0)
525       amqp1_config_transport(child);
526     else
527       WARNING("amqp1 plugin: Ignoring unknown config iption \%s\".",
528               child->key);
529   }
530
531   return 0;
532 } /* }}} int amqp1_config */
533
534 static int amqp1_init(void) /* {{{ */
535 {
536   char addr[PN_MAX_ADDR];
537   int  status;
538   char errbuf[1024];
539
540   if (transport == NULL) {
541     ERROR("amqp1: init failed, no transport configured");
542     return -1;
543   }
544
545   if (proactor == NULL) {
546     pthread_mutex_init(&send_lock, /* attr = */ NULL);
547     proactor = pn_proactor();
548     pn_proactor_addr(addr, sizeof(addr),transport->host,transport->port);
549     conn = pn_connection();
550     if (transport->user != NULL) {
551         pn_connection_set_user(conn, transport->user);
552         pn_connection_set_password(conn, transport->password);
553     }
554     pn_proactor_connect(proactor, conn, addr);
555     /* start_thread */
556     status = plugin_thread_create(&event_thread_id, NULL /* no attributes */,
557                                   event_thread, NULL /* no argument */,
558                                   "handle");
559     if (status != 0) {
560       ERROR("amqp1: pthread_create failed: %s",
561             sstrerror(errno, errbuf, sizeof(errbuf)));
562     } else {
563       event_thread_running = 1;
564     }
565   }
566   return 0;
567 } /* }}} int amqp1_init */
568
569 static int amqp1_shutdown
570 (void) /* {{{ */
571 {
572   cd_message_t *cdm;
573
574   /* Stop the proactor thread */
575   if (event_thread_running != 0) {
576     finished=true;
577     /* activate the event thread */
578     pn_connection_wake(conn);
579     pthread_join(event_thread_id, NULL /* no return value */);
580     memset(&event_thread_id, 0, sizeof(event_thread_id));
581   }
582
583   /* Free the remaining out_messages */
584   cdm = DEQ_HEAD(out_messages);
585   while (cdm) {
586     DEQ_REMOVE_HEAD(out_messages);
587     cd_message_free(cdm);
588     cdm = DEQ_HEAD(out_messages);
589   }
590
591   if (proactor != NULL) {
592     pn_proactor_free(proactor);
593   }
594
595   if (transport != NULL) {
596     amqp1_config_transport_free(transport);
597   }
598
599   return 0;
600 } /* }}} int amqp1_shutdown */
601
602 void module_register(void)
603 {
604   plugin_register_complex_config("amqp1", amqp1_config);
605   plugin_register_init("amqp1", amqp1_init);
606   plugin_register_shutdown("amqp1",amqp1_shutdown);
607 } /* void module_register */