amqp plugin: Fix compabitility with current librabbitmq.
[collectd.git] / src / amqp.c
1 /**
2  * collectd - src/amqp.c
3  * Copyright (C) 2009       Sebastien Pahl
4  * Copyright (C) 2010-2012  Florian Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastien Pahl <sebastien.pahl at dotcloud.com>
26  *   Florian Forster <octo at verplant.org>
27  **/
28
29 #include "collectd.h"
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_cmd_putval.h"
33 #include "utils_format_json.h"
34
35 #include <pthread.h>
36
37 #include <amqp.h>
38 #include <amqp_framing.h>
39
40 /* Defines for the delivery mode. I have no idea why they're not defined by the
41  * library.. */
42 #define CAMQP_DM_VOLATILE   1
43 #define CAMQP_DM_PERSISTENT 2
44
45 #define CAMQP_FORMAT_COMMAND 1
46 #define CAMQP_FORMAT_JSON    2
47
48 #define CAMQP_CHANNEL 1
49
50 /*
51  * Data types
52  */
53 struct camqp_config_s
54 {
55     _Bool   publish;
56     char   *name;
57
58     char   *host;
59     int     port;
60     char   *vhost;
61     char   *user;
62     char   *password;
63
64     char   *exchange;
65     char   *routing_key;
66
67     /* publish only */
68     uint8_t delivery_mode;
69     _Bool   store_rates;
70     int     format;
71
72     /* subscribe only */
73     char   *exchange_type;
74     char   *queue;
75
76     amqp_connection_state_t connection;
77     pthread_mutex_t lock;
78 };
79 typedef struct camqp_config_s camqp_config_t;
80
81 /*
82  * Global variables
83  */
84 static const char *def_host       = "localhost";
85 static const char *def_vhost      = "/";
86 static const char *def_user       = "guest";
87 static const char *def_password   = "guest";
88 static const char *def_exchange   = "amq.fanout";
89
90 static pthread_t *subscriber_threads     = NULL;
91 static size_t     subscriber_threads_num = 0;
92 static _Bool      subscriber_threads_running = 1;
93
94 #define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
95
96 /*
97  * Functions
98  */
99 static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
100 {
101     int sockfd;
102
103     if ((conf == NULL) || (conf->connection == NULL))
104         return;
105
106     sockfd = amqp_get_sockfd (conf->connection);
107     amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
108     amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
109     amqp_destroy_connection (conf->connection);
110     close (sockfd);
111     conf->connection = NULL;
112 } /* }}} void camqp_close_connection */
113
114 static void camqp_config_free (void *ptr) /* {{{ */
115 {
116     camqp_config_t *conf = ptr;
117
118     if (conf == NULL)
119         return;
120
121     camqp_close_connection (conf);
122
123     sfree (conf->name);
124     sfree (conf->host);
125     sfree (conf->vhost);
126     sfree (conf->user);
127     sfree (conf->password);
128     sfree (conf->exchange);
129     sfree (conf->exchange_type);
130     sfree (conf->queue);
131     sfree (conf->routing_key);
132
133     sfree (conf);
134 } /* }}} void camqp_config_free */
135
136 static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */
137 {
138     char *ret;
139
140     if ((in == NULL) || (in->bytes == NULL))
141         return (NULL);
142
143     ret = malloc (in->len + 1);
144     if (ret == NULL)
145         return (NULL);
146
147     memcpy (ret, in->bytes, in->len);
148     ret[in->len] = 0;
149
150     return (ret);
151 } /* }}} char *camqp_bytes_cstring */
152
153 static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */
154 {
155     amqp_rpc_reply_t r;
156
157     r = amqp_get_rpc_reply (conf->connection);
158     if (r.reply_type == AMQP_RESPONSE_NORMAL)
159         return (0);
160
161     return (1);
162 } /* }}} _Bool camqp_is_error */
163
164 static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
165         char *buffer, size_t buffer_size)
166 {
167     amqp_rpc_reply_t r;
168
169     r = amqp_get_rpc_reply (conf->connection);
170     switch (r.reply_type)
171     {
172         case AMQP_RESPONSE_NORMAL:
173             sstrncpy (buffer, "Success", sizeof (buffer));
174             break;
175
176         case AMQP_RESPONSE_NONE:
177             sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
178             break;
179
180         case AMQP_RESPONSE_LIBRARY_EXCEPTION:
181 #if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
182             if (r.library_errno)
183                 return (sstrerror (r.library_errno, buffer, buffer_size));
184 #else
185             if (r.library_error)
186                 return (sstrerror (r.library_error, buffer, buffer_size));
187 #endif
188             else
189                 sstrncpy (buffer, "End of stream", sizeof (buffer));
190             break;
191
192         case AMQP_RESPONSE_SERVER_EXCEPTION:
193             if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
194             {
195                 amqp_connection_close_t *m = r.reply.decoded;
196                 char *tmp = camqp_bytes_cstring (&m->reply_text);
197                 ssnprintf (buffer, buffer_size, "Server connection error %d: %s",
198                         m->reply_code, tmp);
199                 sfree (tmp);
200             }
201             else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
202             {
203                 amqp_channel_close_t *m = r.reply.decoded;
204                 char *tmp = camqp_bytes_cstring (&m->reply_text);
205                 ssnprintf (buffer, buffer_size, "Server channel error %d: %s",
206                         m->reply_code, tmp);
207                 sfree (tmp);
208             }
209             else
210             {
211                 ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32,
212                         r.reply.id);
213             }
214             break;
215
216         default:
217             ssnprintf (buffer, buffer_size, "Unknown reply type %i",
218                     (int) r.reply_type);
219     }
220
221     return (buffer);
222 } /* }}} char *camqp_strerror */
223
224 #if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
225 static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
226 {
227     amqp_exchange_declare_ok_t *ed_ret;
228
229     if (conf->exchange_type == NULL)
230         return (0);
231
232     ed_ret = amqp_exchange_declare (conf->connection,
233             /* channel     = */ CAMQP_CHANNEL,
234             /* exchange    = */ amqp_cstring_bytes (conf->exchange),
235             /* type        = */ amqp_cstring_bytes (conf->exchange_type),
236             /* passive     = */ 0,
237             /* durable     = */ 0,
238             /* auto_delete = */ 1,
239             /* arguments   = */ AMQP_EMPTY_TABLE);
240     if ((ed_ret == NULL) && camqp_is_error (conf))
241     {
242         char errbuf[1024];
243         ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
244                 camqp_strerror (conf, errbuf, sizeof (errbuf)));
245         camqp_close_connection (conf);
246         return (-1);
247     }
248
249     INFO ("amqp plugin: Successfully created exchange \"%s\" "
250             "with type \"%s\".",
251             conf->exchange, conf->exchange_type);
252
253     return (0);
254 } /* }}} int camqp_create_exchange */
255 #else
256 static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
257 {
258     amqp_exchange_declare_ok_t *ed_ret;
259     amqp_table_t argument_table;
260     struct amqp_table_entry_t_ argument_table_entries[1];
261
262     if (conf->exchange_type == NULL)
263         return (0);
264
265     /* Valid arguments: "auto_delete", "internal" */
266     argument_table.num_entries = STATIC_ARRAY_SIZE (argument_table_entries);
267     argument_table.entries = argument_table_entries;
268     argument_table_entries[0].key = amqp_cstring_bytes ("auto_delete");
269     argument_table_entries[0].value.kind = AMQP_FIELD_KIND_BOOLEAN;
270     argument_table_entries[0].value.value.boolean = 1;
271
272     ed_ret = amqp_exchange_declare (conf->connection,
273             /* channel     = */ CAMQP_CHANNEL,
274             /* exchange    = */ amqp_cstring_bytes (conf->exchange),
275             /* type        = */ amqp_cstring_bytes (conf->exchange_type),
276             /* passive     = */ 0,
277             /* durable     = */ 0,
278             /* arguments   = */ argument_table);
279     if ((ed_ret == NULL) && camqp_is_error (conf))
280     {
281         char errbuf[1024];
282         ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
283                 camqp_strerror (conf, errbuf, sizeof (errbuf)));
284         camqp_close_connection (conf);
285         return (-1);
286     }
287
288     INFO ("amqp plugin: Successfully created exchange \"%s\" "
289             "with type \"%s\".",
290             conf->exchange, conf->exchange_type);
291
292     return (0);
293 } /* }}} int camqp_create_exchange */
294 #endif
295
296 static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
297 {
298     amqp_queue_declare_ok_t *qd_ret;
299     amqp_basic_consume_ok_t *cm_ret;
300
301     qd_ret = amqp_queue_declare (conf->connection,
302             /* channel     = */ CAMQP_CHANNEL,
303             /* queue       = */ (conf->queue != NULL)
304             ? amqp_cstring_bytes (conf->queue)
305             : AMQP_EMPTY_BYTES,
306             /* passive     = */ 0,
307             /* durable     = */ 0,
308             /* exclusive   = */ 0,
309             /* auto_delete = */ 1,
310             /* arguments   = */ AMQP_EMPTY_TABLE);
311     if (qd_ret == NULL)
312     {
313         ERROR ("amqp plugin: amqp_queue_declare failed.");
314         camqp_close_connection (conf);
315         return (-1);
316     }
317
318     if (conf->queue == NULL)
319     {
320         conf->queue = camqp_bytes_cstring (&qd_ret->queue);
321         if (conf->queue == NULL)
322         {
323             ERROR ("amqp plugin: camqp_bytes_cstring failed.");
324             camqp_close_connection (conf);
325             return (-1);
326         }
327
328         INFO ("amqp plugin: Created queue \"%s\".", conf->queue);
329     }
330     DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue);
331
332     /* bind to an exchange */
333     if (conf->exchange != NULL)
334     {
335         amqp_queue_bind_ok_t *qb_ret;
336
337         assert (conf->queue != NULL);
338         qb_ret = amqp_queue_bind (conf->connection,
339                 /* channel     = */ CAMQP_CHANNEL,
340                 /* queue       = */ amqp_cstring_bytes (conf->queue),
341                 /* exchange    = */ amqp_cstring_bytes (conf->exchange),
342                 /* routing_key = */ (conf->routing_key != NULL)
343                 ? amqp_cstring_bytes (conf->routing_key)
344                 : AMQP_EMPTY_BYTES,
345                 /* arguments   = */ AMQP_EMPTY_TABLE);
346         if ((qb_ret == NULL) && camqp_is_error (conf))
347         {
348             char errbuf[1024];
349             ERROR ("amqp plugin: amqp_queue_bind failed: %s",
350                     camqp_strerror (conf, errbuf, sizeof (errbuf)));
351             camqp_close_connection (conf);
352             return (-1);
353         }
354
355         DEBUG ("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".",
356                 conf->queue, conf->exchange);
357     } /* if (conf->exchange != NULL) */
358
359     cm_ret = amqp_basic_consume (conf->connection,
360             /* channel      = */ CAMQP_CHANNEL,
361             /* queue        = */ amqp_cstring_bytes (conf->queue),
362             /* consumer_tag = */ AMQP_EMPTY_BYTES,
363             /* no_local     = */ 0,
364             /* no_ack       = */ 1,
365             /* exclusive    = */ 0,
366             /* arguments    = */ AMQP_EMPTY_TABLE
367         );
368     if ((cm_ret == NULL) && camqp_is_error (conf))
369     {
370         char errbuf[1024];
371         ERROR ("amqp plugin: amqp_basic_consume failed: %s",
372                     camqp_strerror (conf, errbuf, sizeof (errbuf)));
373         camqp_close_connection (conf);
374         return (-1);
375     }
376
377     return (0);
378 } /* }}} int camqp_setup_queue */
379
380 static int camqp_connect (camqp_config_t *conf) /* {{{ */
381 {
382     amqp_rpc_reply_t reply;
383     int sockfd;
384     int status;
385
386     if (conf->connection != NULL)
387         return (0);
388
389     conf->connection = amqp_new_connection ();
390     if (conf->connection == NULL)
391     {
392         ERROR ("amqp plugin: amqp_new_connection failed.");
393         return (ENOMEM);
394     }
395
396     sockfd = amqp_open_socket (CONF(conf, host), conf->port);
397     if (sockfd < 0)
398     {
399         char errbuf[1024];
400         status = (-1) * sockfd;
401         ERROR ("amqp plugin: amqp_open_socket failed: %s",
402                 sstrerror (status, errbuf, sizeof (errbuf)));
403         amqp_destroy_connection (conf->connection);
404         conf->connection = NULL;
405         return (status);
406     }
407     amqp_set_sockfd (conf->connection, sockfd);
408
409     reply = amqp_login (conf->connection, CONF(conf, vhost),
410             /* channel max = */      0,
411             /* frame max   = */ 131072,
412             /* heartbeat   = */      0,
413             /* authentication = */ AMQP_SASL_METHOD_PLAIN,
414             CONF(conf, user), CONF(conf, password));
415     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
416     {
417         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
418                 CONF(conf, vhost), CONF(conf, user));
419         amqp_destroy_connection (conf->connection);
420         close (sockfd);
421         conf->connection = NULL;
422         return (1);
423     }
424
425     amqp_channel_open (conf->connection, /* channel = */ 1);
426     /* FIXME: Is checking "reply.reply_type" really correct here? How does
427      * it get set? --octo */
428     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
429     {
430         ERROR ("amqp plugin: amqp_channel_open failed.");
431         amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
432         amqp_destroy_connection (conf->connection);
433         close(sockfd);
434         conf->connection = NULL;
435         return (1);
436     }
437
438     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
439             "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
440
441     status = camqp_create_exchange (conf);
442     if (status != 0)
443         return (status);
444
445     if (!conf->publish)
446         return (camqp_setup_queue (conf));
447     return (0);
448 } /* }}} int camqp_connect */
449
450 static int camqp_shutdown (void) /* {{{ */
451 {
452     size_t i;
453
454     DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
455             subscriber_threads_num);
456
457     subscriber_threads_running = 0;
458     for (i = 0; i < subscriber_threads_num; i++)
459     {
460         /* FIXME: Sending a signal is not very elegant here. Maybe find out how
461          * to use a timeout in the thread and check for the variable in regular
462          * intervals. */
463         pthread_kill (subscriber_threads[i], SIGTERM);
464         pthread_join (subscriber_threads[i], /* retval = */ NULL);
465     }
466
467     subscriber_threads_num = 0;
468     sfree (subscriber_threads);
469
470     DEBUG ("amqp plugin: All subscriber threads exited.");
471
472     return (0);
473 } /* }}} int camqp_shutdown */
474
475 /*
476  * Subscribing code
477  */
478 static int camqp_read_body (camqp_config_t *conf, /* {{{ */
479         size_t body_size, const char *content_type)
480 {
481     char body[body_size + 1];
482     char *body_ptr;
483     size_t received;
484     amqp_frame_t frame;
485     int status;
486
487     memset (body, 0, sizeof (body));
488     body_ptr = &body[0];
489     received = 0;
490
491     while (received < body_size)
492     {
493         status = amqp_simple_wait_frame (conf->connection, &frame);
494         if (status < 0)
495         {
496             char errbuf[1024];
497             status = (-1) * status;
498             ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
499                     sstrerror (status, errbuf, sizeof (errbuf)));
500             camqp_close_connection (conf);
501             return (status);
502         }
503
504         if (frame.frame_type != AMQP_FRAME_BODY)
505         {
506             NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
507                     frame.frame_type);
508             return (-1);
509         }
510
511         if ((body_size - received) < frame.payload.body_fragment.len)
512         {
513             WARNING ("amqp plugin: Body is larger than indicated by header.");
514             return (-1);
515         }
516
517         memcpy (body_ptr, frame.payload.body_fragment.bytes,
518                 frame.payload.body_fragment.len);
519         body_ptr += frame.payload.body_fragment.len;
520         received += frame.payload.body_fragment.len;
521     } /* while (received < body_size) */
522
523     if (strcasecmp ("text/collectd", content_type) == 0)
524     {
525         status = handle_putval (stderr, body);
526         if (status != 0)
527             ERROR ("amqp plugin: handle_putval failed with status %i.",
528                     status);
529         return (status);
530     }
531     else if (strcasecmp ("application/json", content_type) == 0)
532     {
533         ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not "
534                 "been implemented yet. FIXME!");
535         return (0);
536     }
537     else
538     {
539         ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
540                 content_type);
541         return (EINVAL);
542     }
543
544     /* not reached */
545     return (0);
546 } /* }}} int camqp_read_body */
547
548 static int camqp_read_header (camqp_config_t *conf) /* {{{ */
549 {
550     int status;
551     amqp_frame_t frame;
552     amqp_basic_properties_t *properties;
553     char *content_type;
554
555     status = amqp_simple_wait_frame (conf->connection, &frame);
556     if (status < 0)
557     {
558         char errbuf[1024];
559         status = (-1) * status;
560         ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
561                     sstrerror (status, errbuf, sizeof (errbuf)));
562         camqp_close_connection (conf);
563         return (status);
564     }
565
566     if (frame.frame_type != AMQP_FRAME_HEADER)
567     {
568         NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
569                 frame.frame_type);
570         return (-1);
571     }
572
573     properties = frame.payload.properties.decoded;
574     content_type = camqp_bytes_cstring (&properties->content_type);
575     if (content_type == NULL)
576     {
577         ERROR ("amqp plugin: Unable to determine content type.");
578         return (-1);
579     }
580
581     status = camqp_read_body (conf,
582             (size_t) frame.payload.properties.body_size,
583             content_type);
584
585     sfree (content_type);
586     return (status);
587 } /* }}} int camqp_read_header */
588
589 static void *camqp_subscribe_thread (void *user_data) /* {{{ */
590 {
591     camqp_config_t *conf = user_data;
592     int status;
593
594     while (subscriber_threads_running)
595     {
596         amqp_frame_t frame;
597
598         status = camqp_connect (conf);
599         if (status != 0)
600         {
601             struct timespec ts_interval;
602             ERROR ("amqp plugin: camqp_connect failed. "
603                     "Will sleep for %.3f seconds.",
604                     CDTIME_T_TO_DOUBLE (interval_g));
605             CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval);
606             nanosleep (&ts_interval, /* remaining = */ NULL);
607             continue;
608         }
609
610         status = amqp_simple_wait_frame (conf->connection, &frame);
611         if (status < 0)
612         {
613             struct timespec ts_interval;
614             ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
615                     "Will sleep for %.3f seconds.",
616                     CDTIME_T_TO_DOUBLE (interval_g));
617             camqp_close_connection (conf);
618             CDTIME_T_TO_TIMESPEC (interval_g, &ts_interval);
619             nanosleep (&ts_interval, /* remaining = */ NULL);
620             continue;
621         }
622
623         if (frame.frame_type != AMQP_FRAME_METHOD)
624         {
625             DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8,
626                     frame.frame_type);
627             continue;
628         }
629
630         if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
631         {
632             DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32,
633                     frame.payload.method.id);
634             continue;
635         }
636
637         status = camqp_read_header (conf);
638
639         amqp_maybe_release_buffers (conf->connection);
640     } /* while (subscriber_threads_running) */
641
642     camqp_config_free (conf);
643     pthread_exit (NULL);
644 } /* }}} void *camqp_subscribe_thread */
645
646 static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
647 {
648     int status;
649     pthread_t *tmp;
650
651     tmp = realloc (subscriber_threads,
652             sizeof (*subscriber_threads) * (subscriber_threads_num + 1));
653     if (tmp == NULL)
654     {
655         ERROR ("amqp plugin: realloc failed.");
656         camqp_config_free (conf);
657         return (ENOMEM);
658     }
659     subscriber_threads = tmp;
660     tmp = subscriber_threads + subscriber_threads_num;
661     memset (tmp, 0, sizeof (*tmp));
662
663     status = pthread_create (tmp, /* attr = */ NULL,
664             camqp_subscribe_thread, conf);
665     if (status != 0)
666     {
667         char errbuf[1024];
668         ERROR ("amqp plugin: pthread_create failed: %s",
669                 sstrerror (status, errbuf, sizeof (errbuf)));
670         camqp_config_free (conf);
671         return (status);
672     }
673
674     subscriber_threads_num++;
675
676     return (0);
677 } /* }}} int camqp_subscribe_init */
678
679 /*
680  * Publishing code
681  */
682 /* XXX: You must hold "conf->lock" when calling this function! */
683 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
684         const char *buffer, const char *routing_key)
685 {
686     amqp_basic_properties_t props;
687     int status;
688
689     status = camqp_connect (conf);
690     if (status != 0)
691         return (status);
692
693     memset (&props, 0, sizeof (props));
694     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
695         | AMQP_BASIC_DELIVERY_MODE_FLAG
696         | AMQP_BASIC_APP_ID_FLAG;
697     if (conf->format == CAMQP_FORMAT_COMMAND)
698         props.content_type = amqp_cstring_bytes("text/collectd");
699     else if (conf->format == CAMQP_FORMAT_JSON)
700         props.content_type = amqp_cstring_bytes("application/json");
701     else
702         assert (23 == 42);
703     props.delivery_mode = conf->delivery_mode;
704     props.app_id = amqp_cstring_bytes("collectd");
705
706     status = amqp_basic_publish(conf->connection,
707                 /* channel = */ 1,
708                 amqp_cstring_bytes(CONF(conf, exchange)),
709                 amqp_cstring_bytes (routing_key),
710                 /* mandatory = */ 0,
711                 /* immediate = */ 0,
712                 &props,
713                 amqp_cstring_bytes(buffer));
714     if (status != 0)
715     {
716         ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
717                 status);
718         camqp_close_connection (conf);
719     }
720
721     return (status);
722 } /* }}} int camqp_write_locked */
723
724 static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
725         user_data_t *user_data)
726 {
727     camqp_config_t *conf = user_data->data;
728     char routing_key[6 * DATA_MAX_NAME_LEN];
729     char buffer[4096];
730     int status;
731
732     if ((ds == NULL) || (vl == NULL) || (conf == NULL))
733         return (EINVAL);
734
735     memset (buffer, 0, sizeof (buffer));
736
737     if (conf->routing_key != NULL)
738     {
739         sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
740     }
741     else
742     {
743         size_t i;
744         ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
745                 vl->host,
746                 vl->plugin, vl->plugin_instance,
747                 vl->type, vl->type_instance);
748
749         /* Switch slashes (the only character forbidden by collectd) and dots
750          * (the separation character used by AMQP). */
751         for (i = 0; routing_key[i] != 0; i++)
752         {
753             if (routing_key[i] == '.')
754                 routing_key[i] = '/';
755             else if (routing_key[i] == '/')
756                 routing_key[i] = '.';
757         }
758     }
759
760     if (conf->format == CAMQP_FORMAT_COMMAND)
761     {
762         status = create_putval (buffer, sizeof (buffer), ds, vl);
763         if (status != 0)
764         {
765             ERROR ("amqp plugin: create_putval failed with status %i.",
766                     status);
767             return (status);
768         }
769     }
770     else if (conf->format == CAMQP_FORMAT_JSON)
771     {
772         size_t bfree = sizeof (buffer);
773         size_t bfill = 0;
774
775         format_json_initialize (buffer, &bfill, &bfree);
776         format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
777         format_json_finalize (buffer, &bfill, &bfree);
778     }
779     else
780     {
781         ERROR ("amqp plugin: Invalid format (%i).", conf->format);
782         return (-1);
783     }
784
785     pthread_mutex_lock (&conf->lock);
786     status = camqp_write_locked (conf, buffer, routing_key);
787     pthread_mutex_unlock (&conf->lock);
788
789     return (status);
790 } /* }}} int camqp_write */
791
792 /*
793  * Config handling
794  */
795 static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
796         camqp_config_t *conf)
797 {
798     char *string;
799     int status;
800
801     string = NULL;
802     status = cf_util_get_string (ci, &string);
803     if (status != 0)
804         return (status);
805
806     assert (string != NULL);
807     if (strcasecmp ("Command", string) == 0)
808         conf->format = CAMQP_FORMAT_COMMAND;
809     else if (strcasecmp ("JSON", string) == 0)
810         conf->format = CAMQP_FORMAT_JSON;
811     else
812     {
813         WARNING ("amqp plugin: Invalid format string: %s",
814                 string);
815     }
816
817     free (string);
818
819     return (0);
820 } /* }}} int config_set_string */
821
822 static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
823         _Bool publish)
824 {
825     camqp_config_t *conf;
826     int status;
827     int i;
828
829     conf = malloc (sizeof (*conf));
830     if (conf == NULL)
831     {
832         ERROR ("amqp plugin: malloc failed.");
833         return (ENOMEM);
834     }
835
836     /* Initialize "conf" {{{ */
837     memset (conf, 0, sizeof (*conf));
838     conf->publish = publish;
839     conf->name = NULL;
840     conf->format = CAMQP_FORMAT_COMMAND;
841     conf->host = NULL;
842     conf->port = 5672;
843     conf->vhost = NULL;
844     conf->user = NULL;
845     conf->password = NULL;
846     conf->exchange = NULL;
847     conf->routing_key = NULL;
848     /* publish only */
849     conf->delivery_mode = CAMQP_DM_VOLATILE;
850     conf->store_rates = 0;
851     /* subscribe only */
852     conf->exchange_type = NULL;
853     conf->queue = NULL;
854     /* general */
855     conf->connection = NULL;
856     pthread_mutex_init (&conf->lock, /* attr = */ NULL);
857     /* }}} */
858
859     status = cf_util_get_string (ci, &conf->name);
860     if (status != 0)
861     {
862         sfree (conf);
863         return (status);
864     }
865
866     for (i = 0; i < ci->children_num; i++)
867     {
868         oconfig_item_t *child = ci->children + i;
869
870         if (strcasecmp ("Host", child->key) == 0)
871             status = cf_util_get_string (child, &conf->host);
872         else if (strcasecmp ("Port", child->key) == 0)
873         {
874             status = cf_util_get_port_number (child);
875             if (status > 0)
876             {
877                 conf->port = status;
878                 status = 0;
879             }
880         }
881         else if (strcasecmp ("VHost", child->key) == 0)
882             status = cf_util_get_string (child, &conf->vhost);
883         else if (strcasecmp ("User", child->key) == 0)
884             status = cf_util_get_string (child, &conf->user);
885         else if (strcasecmp ("Password", child->key) == 0)
886             status = cf_util_get_string (child, &conf->password);
887         else if (strcasecmp ("Exchange", child->key) == 0)
888             status = cf_util_get_string (child, &conf->exchange);
889         else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
890             status = cf_util_get_string (child, &conf->exchange_type);
891         else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
892             status = cf_util_get_string (child, &conf->queue);
893         else if (strcasecmp ("RoutingKey", child->key) == 0)
894             status = cf_util_get_string (child, &conf->routing_key);
895         else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
896         {
897             _Bool tmp = 0;
898             status = cf_util_get_boolean (child, &tmp);
899             if (tmp)
900                 conf->delivery_mode = CAMQP_DM_PERSISTENT;
901             else
902                 conf->delivery_mode = CAMQP_DM_VOLATILE;
903         }
904         else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
905             status = cf_util_get_boolean (child, &conf->store_rates);
906         else if ((strcasecmp ("Format", child->key) == 0) && publish)
907             status = camqp_config_set_format (child, conf);
908         else
909             WARNING ("amqp plugin: Ignoring unknown "
910                     "configuration option \"%s\".", child->key);
911
912         if (status != 0)
913             break;
914     } /* for (i = 0; i < ci->children_num; i++) */
915
916     if ((status == 0) && (conf->exchange == NULL))
917     {
918         if (conf->exchange_type != NULL)
919             WARNING ("amqp plugin: The option \"ExchangeType\" was given "
920                     "without the \"Exchange\" option. It will be ignored.");
921
922         if (!publish && (conf->routing_key != NULL))
923             WARNING ("amqp plugin: The option \"RoutingKey\" was given "
924                     "without the \"Exchange\" option. It will be ignored.");
925
926     }
927
928     if (status != 0)
929     {
930         camqp_config_free (conf);
931         return (status);
932     }
933
934     if (conf->exchange != NULL)
935     {
936         DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;",
937                 conf->exchange);
938     }
939
940     if (publish)
941     {
942         char cbname[128];
943         user_data_t ud = { conf, camqp_config_free };
944
945         ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
946
947         status = plugin_register_write (cbname, camqp_write, &ud);
948         if (status != 0)
949         {
950             camqp_config_free (conf);
951             return (status);
952         }
953     }
954     else
955     {
956         status = camqp_subscribe_init (conf);
957         if (status != 0)
958         {
959             camqp_config_free (conf);
960             return (status);
961         }
962     }
963
964     return (0);
965 } /* }}} int camqp_config_connection */
966
967 static int camqp_config (oconfig_item_t *ci) /* {{{ */
968 {
969     int i;
970
971     for (i = 0; i < ci->children_num; i++)
972     {
973         oconfig_item_t *child = ci->children + i;
974
975         if (strcasecmp ("Publish", child->key) == 0)
976             camqp_config_connection (child, /* publish = */ 1);
977         else if (strcasecmp ("Subscribe", child->key) == 0)
978             camqp_config_connection (child, /* publish = */ 0);
979         else
980             WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
981                     child->key);
982     } /* for (ci->children_num) */
983
984     return (0);
985 } /* }}} int camqp_config */
986
987 void module_register (void)
988 {
989     plugin_register_complex_config ("amqp", camqp_config);
990     plugin_register_shutdown ("amqp", camqp_shutdown);
991 } /* void module_register */
992
993 /* vim: set sw=4 sts=4 et fdm=marker : */