amqp plugin: Document the lock required to hold when calling "camqp_write_locked".
[collectd.git] / src / amqp.c
1 /**
2  * collectd - src/amqp.c
3  * Copyright (C) 2009  Sebastien Pahl
4  * Copyright (C) 2010  Florian Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastien Pahl <sebastien.pahl at dotcloud.com>
26  *   Florian Forster <octo at verplant.org>
27  **/
28
29 #include "collectd.h"
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_cmd_putval.h"
33 #include "utils_format_json.h"
34
35 #include <pthread.h>
36
37 #include <amqp.h>
38 #include <amqp_framing.h>
39
40 /* Defines for the delivery mode. I have no idea why they're not defined by the
41  * library.. */
42 #define CAMQP_DM_VOLATILE   1
43 #define CAMQP_DM_PERSISTENT 2
44
45 #define CAMQP_FORMAT_COMMAND 1
46 #define CAMQP_FORMAT_JSON    2
47
48 #define CAMQP_CHANNEL 1
49
50 /*
51  * Data types
52  */
53 struct camqp_config_s
54 {
55     _Bool   publish;
56     char   *name;
57
58     char   *host;
59     int     port;
60     char   *vhost;
61     char   *user;
62     char   *password;
63
64     char   *exchange;
65     char   *routing_key;
66
67     /* publish only */
68     uint8_t delivery_mode;
69     _Bool   store_rates;
70     int     format;
71
72     /* subscribe only */
73     char   *exchange_type;
74     char   *queue;
75
76     amqp_connection_state_t connection;
77     pthread_mutex_t lock;
78 };
79 typedef struct camqp_config_s camqp_config_t;
80
81 /*
82  * Global variables
83  */
84 static const char *def_host       = "localhost";
85 static const char *def_vhost      = "/";
86 static const char *def_user       = "guest";
87 static const char *def_password   = "guest";
88 static const char *def_exchange   = "amq.fanout";
89
90 static pthread_t *subscriber_threads     = NULL;
91 static size_t     subscriber_threads_num = 0;
92 static _Bool      subscriber_threads_running = 1;
93
94 #define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
95
96 /*
97  * Functions
98  */
99 static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
100 {
101     int sockfd;
102
103     if ((conf == NULL) || (conf->connection == NULL))
104         return;
105
106     sockfd = amqp_get_sockfd (conf->connection);
107     amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
108     amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
109     amqp_destroy_connection (conf->connection);
110     close (sockfd);
111     conf->connection = NULL;
112 } /* }}} void camqp_close_connection */
113
114 static void camqp_config_free (void *ptr) /* {{{ */
115 {
116     camqp_config_t *conf = ptr;
117
118     if (conf == NULL)
119         return;
120
121     camqp_close_connection (conf);
122
123     sfree (conf->name);
124     sfree (conf->host);
125     sfree (conf->vhost);
126     sfree (conf->user);
127     sfree (conf->password);
128     sfree (conf->exchange);
129     sfree (conf->exchange_type);
130     sfree (conf->queue);
131     sfree (conf->routing_key);
132
133     sfree (conf);
134 } /* }}} void camqp_config_free */
135
136 static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */
137 {
138     char *ret;
139
140     if ((in == NULL) || (in->bytes == NULL))
141         return (NULL);
142
143     ret = malloc (in->len + 1);
144     if (ret == NULL)
145         return (NULL);
146
147     memcpy (ret, in->bytes, in->len);
148     ret[in->len] = 0;
149
150     return (ret);
151 } /* }}} char *camqp_bytes_cstring */
152
153 static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */
154 {
155     amqp_rpc_reply_t r;
156
157     r = amqp_get_rpc_reply (conf->connection);
158     if (r.reply_type == AMQP_RESPONSE_NORMAL)
159         return (0);
160
161     return (1);
162 } /* }}} _Bool camqp_is_error */
163
164 static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
165         char *buffer, size_t buffer_size)
166 {
167     amqp_rpc_reply_t r;
168
169     r = amqp_get_rpc_reply (conf->connection);
170     switch (r.reply_type)
171     {
172         case AMQP_RESPONSE_NORMAL:
173             sstrncpy (buffer, "Success", sizeof (buffer));
174             break;
175
176         case AMQP_RESPONSE_NONE:
177             sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
178             break;
179
180         case AMQP_RESPONSE_LIBRARY_EXCEPTION:
181             if (r.library_errno)
182                 return (sstrerror (r.library_errno, buffer, buffer_size));
183             else
184                 sstrncpy (buffer, "End of stream", sizeof (buffer));
185             break;
186
187         case AMQP_RESPONSE_SERVER_EXCEPTION:
188             if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
189             {
190                 amqp_connection_close_t *m = r.reply.decoded;
191                 char *tmp = camqp_bytes_cstring (&m->reply_text);
192                 ssnprintf (buffer, buffer_size, "Server connection error %d: %s",
193                         m->reply_code, tmp);
194                 sfree (tmp);
195             }
196             else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
197             {
198                 amqp_channel_close_t *m = r.reply.decoded;
199                 char *tmp = camqp_bytes_cstring (&m->reply_text);
200                 ssnprintf (buffer, buffer_size, "Server channel error %d: %s",
201                         m->reply_code, tmp);
202                 sfree (tmp);
203             }
204             else
205             {
206                 ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32,
207                         r.reply.id);
208             }
209             break;
210
211         default:
212             ssnprintf (buffer, buffer_size, "Unknown reply type %i",
213                     (int) r.reply_type);
214     }
215
216     return (buffer);
217 } /* }}} char *camqp_strerror */
218
219 static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
220 {
221     amqp_exchange_declare_ok_t *ed_ret;
222
223     if (conf->exchange_type == NULL)
224         return (0);
225
226     ed_ret = amqp_exchange_declare (conf->connection,
227             /* channel     = */ CAMQP_CHANNEL,
228             /* exchange    = */ amqp_cstring_bytes (conf->exchange),
229             /* type        = */ amqp_cstring_bytes (conf->exchange_type),
230             /* passive     = */ 0,
231             /* durable     = */ 0,
232             /* auto_delete = */ 1,
233             /* arguments   = */ AMQP_EMPTY_TABLE);
234     if ((ed_ret == NULL) && camqp_is_error (conf))
235     {
236         char errbuf[1024];
237         ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
238                 camqp_strerror (conf, errbuf, sizeof (errbuf)));
239         camqp_close_connection (conf);
240         return (-1);
241     }
242
243     INFO ("amqp plugin: Successfully created exchange \"%s\" "
244             "with type \"%s\".",
245             conf->exchange, conf->exchange_type);
246
247     return (0);
248 } /* }}} int camqp_create_exchange */
249
250 static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
251 {
252     amqp_queue_declare_ok_t *qd_ret;
253     amqp_basic_consume_ok_t *cm_ret;
254
255     qd_ret = amqp_queue_declare (conf->connection,
256             /* channel     = */ CAMQP_CHANNEL,
257             /* queue       = */ (conf->queue != NULL)
258             ? amqp_cstring_bytes (conf->queue)
259             : AMQP_EMPTY_BYTES,
260             /* passive     = */ 0,
261             /* durable     = */ 0,
262             /* exclusive   = */ 0,
263             /* auto_delete = */ 1,
264             /* arguments   = */ AMQP_EMPTY_TABLE);
265     if (qd_ret == NULL)
266     {
267         ERROR ("amqp plugin: amqp_queue_declare failed.");
268         camqp_close_connection (conf);
269         return (-1);
270     }
271
272     if (conf->queue == NULL)
273     {
274         conf->queue = camqp_bytes_cstring (&qd_ret->queue);
275         if (conf->queue == NULL)
276         {
277             ERROR ("amqp plugin: camqp_bytes_cstring failed.");
278             camqp_close_connection (conf);
279             return (-1);
280         }
281
282         INFO ("amqp plugin: Created queue \"%s\".", conf->queue);
283     }
284     DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue);
285
286     /* bind to an exchange */
287     if (conf->exchange != NULL)
288     {
289         amqp_queue_bind_ok_t *qb_ret;
290
291         assert (conf->queue != NULL);
292         qb_ret = amqp_queue_bind (conf->connection,
293                 /* channel     = */ CAMQP_CHANNEL,
294                 /* queue       = */ amqp_cstring_bytes (conf->queue),
295                 /* exchange    = */ amqp_cstring_bytes (conf->exchange),
296                 /* routing_key = */ (conf->routing_key != NULL)
297                 ? amqp_cstring_bytes (conf->routing_key)
298                 : AMQP_EMPTY_BYTES,
299                 /* arguments   = */ AMQP_EMPTY_TABLE);
300         if ((qb_ret == NULL) && camqp_is_error (conf))
301         {
302             char errbuf[1024];
303             ERROR ("amqp plugin: amqp_queue_bind failed: %s",
304                     camqp_strerror (conf, errbuf, sizeof (errbuf)));
305             camqp_close_connection (conf);
306             return (-1);
307         }
308
309         DEBUG ("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".",
310                 conf->queue, conf->exchange);
311     } /* if (conf->exchange != NULL) */
312
313     cm_ret = amqp_basic_consume (conf->connection,
314             /* channel      = */ CAMQP_CHANNEL,
315             /* queue        = */ amqp_cstring_bytes (conf->queue),
316             /* consumer_tag = */ AMQP_EMPTY_BYTES,
317             /* no_local     = */ 0,
318             /* no_ack       = */ 1,
319             /* exclusive    = */ 0);
320     if ((cm_ret == NULL) && camqp_is_error (conf))
321     {
322         char errbuf[1024];
323         ERROR ("amqp plugin: amqp_basic_consume failed: %s",
324                     camqp_strerror (conf, errbuf, sizeof (errbuf)));
325         camqp_close_connection (conf);
326         return (-1);
327     }
328
329     return (0);
330 } /* }}} int camqp_setup_queue */
331
332 static int camqp_connect (camqp_config_t *conf) /* {{{ */
333 {
334     amqp_rpc_reply_t reply;
335     int sockfd;
336     int status;
337
338     if (conf->connection != NULL)
339         return (0);
340
341     conf->connection = amqp_new_connection ();
342     if (conf->connection == NULL)
343     {
344         ERROR ("amqp plugin: amqp_new_connection failed.");
345         return (ENOMEM);
346     }
347
348     sockfd = amqp_open_socket (CONF(conf, host), conf->port);
349     if (sockfd < 0)
350     {
351         char errbuf[1024];
352         status = (-1) * sockfd;
353         ERROR ("amqp plugin: amqp_open_socket failed: %s",
354                 sstrerror (status, errbuf, sizeof (errbuf)));
355         amqp_destroy_connection (conf->connection);
356         conf->connection = NULL;
357         return (status);
358     }
359     amqp_set_sockfd (conf->connection, sockfd);
360
361     reply = amqp_login (conf->connection, CONF(conf, vhost),
362             /* channel max = */      0,
363             /* frame max   = */ 131072,
364             /* heartbeat   = */      0,
365             /* authentication = */ AMQP_SASL_METHOD_PLAIN,
366             CONF(conf, user), CONF(conf, password));
367     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
368     {
369         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
370                 CONF(conf, vhost), CONF(conf, user));
371         amqp_destroy_connection (conf->connection);
372         close (sockfd);
373         conf->connection = NULL;
374         return (1);
375     }
376
377     amqp_channel_open (conf->connection, /* channel = */ 1);
378     /* FIXME: Is checking "reply.reply_type" really correct here? How does
379      * it get set? --octo */
380     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
381     {
382         ERROR ("amqp plugin: amqp_channel_open failed.");
383         amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
384         amqp_destroy_connection (conf->connection);
385         close(sockfd);
386         conf->connection = NULL;
387         return (1);
388     }
389
390     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
391             "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
392
393     status = camqp_create_exchange (conf);
394     if (status != 0)
395         return (status);
396
397     if (!conf->publish)
398         return (camqp_setup_queue (conf));
399     return (0);
400 } /* }}} int camqp_connect */
401
402 static int camqp_shutdown (void) /* {{{ */
403 {
404     size_t i;
405
406     DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
407             subscriber_threads_num);
408
409     subscriber_threads_running = 0;
410     for (i = 0; i < subscriber_threads_num; i++)
411     {
412         /* FIXME: Sending a signal is not very elegant here. Maybe find out how
413          * to use a timeout in the thread and check for the variable in regular
414          * intervals. */
415         pthread_kill (subscriber_threads[i], SIGTERM);
416         pthread_join (subscriber_threads[i], /* retval = */ NULL);
417     }
418
419     subscriber_threads_num = 0;
420     sfree (subscriber_threads);
421
422     DEBUG ("amqp plugin: All subscriber threads exited.");
423
424     return (0);
425 } /* }}} int camqp_shutdown */
426
427 /*
428  * Subscribing code
429  */
430 static int camqp_read_body (camqp_config_t *conf, /* {{{ */
431         size_t body_size, const char *content_type)
432 {
433     char body[body_size + 1];
434     char *body_ptr;
435     size_t received;
436     amqp_frame_t frame;
437     int status;
438
439     memset (body, 0, sizeof (body));
440     body_ptr = &body[0];
441     received = 0;
442
443     while (received < body_size)
444     {
445         status = amqp_simple_wait_frame (conf->connection, &frame);
446         if (status < 0)
447         {
448             char errbuf[1024];
449             status = (-1) * status;
450             ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
451                     sstrerror (status, errbuf, sizeof (errbuf)));
452             camqp_close_connection (conf);
453             return (status);
454         }
455
456         if (frame.frame_type != AMQP_FRAME_BODY)
457         {
458             NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
459                     frame.frame_type);
460             return (-1);
461         }
462
463         if ((body_size - received) < frame.payload.body_fragment.len)
464         {
465             WARNING ("amqp plugin: Body is larger than indicated by header.");
466             return (-1);
467         }
468
469         memcpy (body_ptr, frame.payload.body_fragment.bytes,
470                 frame.payload.body_fragment.len);
471         body_ptr += frame.payload.body_fragment.len;
472         received += frame.payload.body_fragment.len;
473     } /* while (received < body_size) */
474
475     if (strcasecmp ("text/collectd", content_type) == 0)
476     {
477         status = handle_putval (stderr, body);
478         if (status != 0)
479             ERROR ("amqp plugin: handle_putval failed with status %i.",
480                     status);
481         return (status);
482     }
483     else if (strcasecmp ("application/json", content_type) == 0)
484     {
485         ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not "
486                 "been implemented yet. FIXME!");
487         return (0);
488     }
489     else
490     {
491         ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
492                 content_type);
493         return (EINVAL);
494     }
495
496     /* not reached */
497     return (0);
498 } /* }}} int camqp_read_body */
499
500 static int camqp_read_header (camqp_config_t *conf) /* {{{ */
501 {
502     int status;
503     amqp_frame_t frame;
504     amqp_basic_properties_t *properties;
505     char *content_type;
506
507     status = amqp_simple_wait_frame (conf->connection, &frame);
508     if (status < 0)
509     {
510         char errbuf[1024];
511         status = (-1) * status;
512         ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
513                     sstrerror (status, errbuf, sizeof (errbuf)));
514         camqp_close_connection (conf);
515         return (status);
516     }
517
518     if (frame.frame_type != AMQP_FRAME_HEADER)
519     {
520         NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
521                 frame.frame_type);
522         return (-1);
523     }
524
525     properties = frame.payload.properties.decoded;
526     content_type = camqp_bytes_cstring (&properties->content_type);
527     if (content_type == NULL)
528     {
529         ERROR ("amqp plugin: Unable to determine content type.");
530         return (-1);
531     }
532
533     status = camqp_read_body (conf,
534             (size_t) frame.payload.properties.body_size,
535             content_type);
536
537     sfree (content_type);
538     return (status);
539 } /* }}} int camqp_read_header */
540
541 static void *camqp_subscribe_thread (void *user_data) /* {{{ */
542 {
543     camqp_config_t *conf = user_data;
544     int status;
545
546     while (subscriber_threads_running)
547     {
548         amqp_frame_t frame;
549
550         status = camqp_connect (conf);
551         if (status != 0)
552         {
553             ERROR ("amqp plugin: camqp_connect failed. "
554                     "Will sleep for %i seconds.", interval_g);
555             sleep (interval_g);
556             continue;
557         }
558
559         status = amqp_simple_wait_frame (conf->connection, &frame);
560         if (status < 0)
561         {
562             ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
563                     "Will sleep for %i seconds.", interval_g);
564             camqp_close_connection (conf);
565             sleep (interval_g);
566             continue;
567         }
568
569         if (frame.frame_type != AMQP_FRAME_METHOD)
570         {
571             DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8,
572                     frame.frame_type);
573             continue;
574         }
575
576         if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
577         {
578             DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32,
579                     frame.payload.method.id);
580             continue;
581         }
582
583         status = camqp_read_header (conf);
584
585         amqp_maybe_release_buffers (conf->connection);
586     } /* while (subscriber_threads_running) */
587
588     camqp_config_free (conf);
589     pthread_exit (NULL);
590 } /* }}} void *camqp_subscribe_thread */
591
592 static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
593 {
594     int status;
595     pthread_t *tmp;
596
597     tmp = realloc (subscriber_threads,
598             sizeof (*subscriber_threads) * (subscriber_threads_num + 1));
599     if (tmp == NULL)
600     {
601         ERROR ("amqp plugin: realloc failed.");
602         camqp_config_free (conf);
603         return (ENOMEM);
604     }
605     subscriber_threads = tmp;
606     tmp = subscriber_threads + subscriber_threads_num;
607     memset (tmp, 0, sizeof (*tmp));
608
609     status = pthread_create (tmp, /* attr = */ NULL,
610             camqp_subscribe_thread, conf);
611     if (status != 0)
612     {
613         char errbuf[1024];
614         ERROR ("amqp plugin: pthread_create failed: %s",
615                 sstrerror (status, errbuf, sizeof (errbuf)));
616         camqp_config_free (conf);
617         return (status);
618     }
619
620     subscriber_threads_num++;
621
622     return (0);
623 } /* }}} int camqp_subscribe_init */
624
625 /*
626  * Publishing code
627  */
628 /* XXX: You must hold "conf->lock" when calling this function! */
629 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
630         const char *buffer, const char *routing_key)
631 {
632     amqp_basic_properties_t props;
633     int status;
634
635     status = camqp_connect (conf);
636     if (status != 0)
637         return (status);
638
639     memset (&props, 0, sizeof (props));
640     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
641         | AMQP_BASIC_DELIVERY_MODE_FLAG
642         | AMQP_BASIC_APP_ID_FLAG;
643     if (conf->format == CAMQP_FORMAT_COMMAND)
644         props.content_type = amqp_cstring_bytes("text/collectd");
645     else if (conf->format == CAMQP_FORMAT_JSON)
646         props.content_type = amqp_cstring_bytes("application/json");
647     else
648         assert (23 == 42);
649     props.delivery_mode = conf->delivery_mode;
650     props.app_id = amqp_cstring_bytes("collectd");
651
652     status = amqp_basic_publish(conf->connection,
653                 /* channel = */ 1,
654                 amqp_cstring_bytes(CONF(conf, exchange)),
655                 amqp_cstring_bytes (routing_key),
656                 /* mandatory = */ 0,
657                 /* immediate = */ 0,
658                 &props,
659                 amqp_cstring_bytes(buffer));
660     if (status != 0)
661     {
662         ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
663                 status);
664         camqp_close_connection (conf);
665     }
666
667     return (status);
668 } /* }}} int camqp_write_locked */
669
670 static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
671         user_data_t *user_data)
672 {
673     camqp_config_t *conf = user_data->data;
674     char routing_key[6 * DATA_MAX_NAME_LEN];
675     char buffer[4096];
676     int status;
677
678     if ((ds == NULL) || (vl == NULL) || (conf == NULL))
679         return (EINVAL);
680
681     memset (buffer, 0, sizeof (buffer));
682
683     if (conf->routing_key != NULL)
684     {
685         sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
686     }
687     else
688     {
689         size_t i;
690         ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
691                 vl->host,
692                 vl->plugin, vl->plugin_instance,
693                 vl->type, vl->type_instance);
694
695         /* Switch slashes (the only character forbidden by collectd) and dots
696          * (the separation character used by AMQP). */
697         for (i = 0; routing_key[i] != 0; i++)
698         {
699             if (routing_key[i] == '.')
700                 routing_key[i] = '/';
701             else if (routing_key[i] == '/')
702                 routing_key[i] = '.';
703         }
704     }
705
706     if (conf->format == CAMQP_FORMAT_COMMAND)
707     {
708         status = create_putval (buffer, sizeof (buffer), ds, vl);
709         if (status != 0)
710         {
711             ERROR ("amqp plugin: create_putval failed with status %i.",
712                     status);
713             return (status);
714         }
715     }
716     else if (conf->format == CAMQP_FORMAT_JSON)
717     {
718         size_t bfree = sizeof (buffer);
719         size_t bfill = 0;
720
721         format_json_initialize (buffer, &bfill, &bfree);
722         format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
723         format_json_finalize (buffer, &bfill, &bfree);
724     }
725     else
726     {
727         ERROR ("amqp plugin: Invalid format (%i).", conf->format);
728         return (-1);
729     }
730
731     pthread_mutex_lock (&conf->lock);
732     status = camqp_write_locked (conf, buffer, routing_key);
733     pthread_mutex_unlock (&conf->lock);
734
735     return (status);
736 } /* }}} int camqp_write */
737
738 /*
739  * Config handling
740  */
741 static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
742         camqp_config_t *conf)
743 {
744     char *string;
745     int status;
746
747     string = NULL;
748     status = cf_util_get_string (ci, &string);
749     if (status != 0)
750         return (status);
751
752     assert (string != NULL);
753     if (strcasecmp ("Command", string) == 0)
754         conf->format = CAMQP_FORMAT_COMMAND;
755     else if (strcasecmp ("JSON", string) == 0)
756         conf->format = CAMQP_FORMAT_JSON;
757     else
758     {
759         WARNING ("amqp plugin: Invalid format string: %s",
760                 string);
761     }
762
763     free (string);
764
765     return (0);
766 } /* }}} int config_set_string */
767
768 static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
769         _Bool publish)
770 {
771     camqp_config_t *conf;
772     int status;
773     int i;
774
775     conf = malloc (sizeof (*conf));
776     if (conf == NULL)
777     {
778         ERROR ("amqp plugin: malloc failed.");
779         return (ENOMEM);
780     }
781
782     /* Initialize "conf" {{{ */
783     memset (conf, 0, sizeof (*conf));
784     conf->publish = publish;
785     conf->name = NULL;
786     conf->format = CAMQP_FORMAT_COMMAND;
787     conf->host = NULL;
788     conf->port = 5672;
789     conf->vhost = NULL;
790     conf->user = NULL;
791     conf->password = NULL;
792     conf->exchange = NULL;
793     conf->routing_key = NULL;
794     /* publish only */
795     conf->delivery_mode = CAMQP_DM_VOLATILE;
796     conf->store_rates = 0;
797     /* subscribe only */
798     conf->exchange_type = NULL;
799     conf->queue = NULL;
800     /* general */
801     conf->connection = NULL;
802     pthread_mutex_init (&conf->lock, /* attr = */ NULL);
803     /* }}} */
804
805     status = cf_util_get_string (ci, &conf->name);
806     if (status != 0)
807     {
808         sfree (conf);
809         return (status);
810     }
811
812     for (i = 0; i < ci->children_num; i++)
813     {
814         oconfig_item_t *child = ci->children + i;
815
816         if (strcasecmp ("Host", child->key) == 0)
817             status = cf_util_get_string (child, &conf->host);
818         else if (strcasecmp ("Port", child->key) == 0)
819         {
820             status = cf_util_get_port_number (child);
821             if (status > 0)
822             {
823                 conf->port = status;
824                 status = 0;
825             }
826         }
827         else if (strcasecmp ("VHost", child->key) == 0)
828             status = cf_util_get_string (child, &conf->vhost);
829         else if (strcasecmp ("User", child->key) == 0)
830             status = cf_util_get_string (child, &conf->user);
831         else if (strcasecmp ("Password", child->key) == 0)
832             status = cf_util_get_string (child, &conf->password);
833         else if (strcasecmp ("Exchange", child->key) == 0)
834             status = cf_util_get_string (child, &conf->exchange);
835         else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
836             status = cf_util_get_string (child, &conf->exchange_type);
837         else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
838             status = cf_util_get_string (child, &conf->queue);
839         else if (strcasecmp ("RoutingKey", child->key) == 0)
840             status = cf_util_get_string (child, &conf->routing_key);
841         else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
842         {
843             _Bool tmp = 0;
844             status = cf_util_get_boolean (child, &tmp);
845             if (tmp)
846                 conf->delivery_mode = CAMQP_DM_PERSISTENT;
847             else
848                 conf->delivery_mode = CAMQP_DM_VOLATILE;
849         }
850         else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
851             status = cf_util_get_boolean (child, &conf->store_rates);
852         else if ((strcasecmp ("Format", child->key) == 0) && publish)
853             status = camqp_config_set_format (child, conf);
854         else
855             WARNING ("amqp plugin: Ignoring unknown "
856                     "configuration option \"%s\".", child->key);
857
858         if (status != 0)
859             break;
860     } /* for (i = 0; i < ci->children_num; i++) */
861
862     if ((status == 0) && (conf->exchange == NULL))
863     {
864         if (conf->exchange_type != NULL)
865             WARNING ("amqp plugin: The option \"ExchangeType\" was given "
866                     "without the \"Exchange\" option. It will be ignored.");
867
868         if (!publish && (conf->routing_key != NULL))
869             WARNING ("amqp plugin: The option \"RoutingKey\" was given "
870                     "without the \"Exchange\" option. It will be ignored.");
871
872     }
873
874     if (status != 0)
875     {
876         camqp_config_free (conf);
877         return (status);
878     }
879
880     if (conf->exchange != NULL)
881     {
882         DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;",
883                 conf->exchange);
884     }
885
886     if (publish)
887     {
888         char cbname[128];
889         user_data_t ud = { conf, camqp_config_free };
890
891         ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
892
893         status = plugin_register_write (cbname, camqp_write, &ud);
894         if (status != 0)
895         {
896             camqp_config_free (conf);
897             return (status);
898         }
899     }
900     else
901     {
902         status = camqp_subscribe_init (conf);
903         if (status != 0)
904         {
905             camqp_config_free (conf);
906             return (status);
907         }
908     }
909
910     return (0);
911 } /* }}} int camqp_config_connection */
912
913 static int camqp_config (oconfig_item_t *ci) /* {{{ */
914 {
915     int i;
916
917     for (i = 0; i < ci->children_num; i++)
918     {
919         oconfig_item_t *child = ci->children + i;
920
921         if (strcasecmp ("Publish", child->key) == 0)
922             camqp_config_connection (child, /* publish = */ 1);
923         else if (strcasecmp ("Subscribe", child->key) == 0)
924             camqp_config_connection (child, /* publish = */ 0);
925         else
926             WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
927                     child->key);
928     } /* for (ci->children_num) */
929
930     return (0);
931 } /* }}} int camqp_config */
932
933 void module_register (void)
934 {
935     plugin_register_complex_config ("amqp", camqp_config);
936     plugin_register_shutdown ("amqp", camqp_shutdown);
937 } /* void module_register */
938
939 /* vim: set sw=4 sts=4 et fdm=marker : */