Merge branch 'collectd-4.10' into collectd-5.3
[collectd.git] / src / amqp.c
1 /**
2  * collectd - src/amqp.c
3  * Copyright (C) 2009       Sebastien Pahl
4  * Copyright (C) 2010-2012  Florian Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastien Pahl <sebastien.pahl at dotcloud.com>
26  *   Florian Forster <octo at verplant.org>
27  **/
28
29 #include "collectd.h"
30 #include "common.h"
31 #include "plugin.h"
32 #include "utils_cmd_putval.h"
33 #include "utils_format_json.h"
34 #include "utils_format_graphite.h"
35
36 #include <pthread.h>
37
38 #include <amqp.h>
39 #include <amqp_framing.h>
40
41 /* Defines for the delivery mode. I have no idea why they're not defined by the
42  * library.. */
43 #define CAMQP_DM_VOLATILE   1
44 #define CAMQP_DM_PERSISTENT 2
45
46 #define CAMQP_FORMAT_COMMAND    1
47 #define CAMQP_FORMAT_JSON       2
48 #define CAMQP_FORMAT_GRAPHITE   3
49
50 #define CAMQP_CHANNEL 1
51
52 /*
53  * Data types
54  */
55 struct camqp_config_s
56 {
57     _Bool   publish;
58     char   *name;
59
60     char   *host;
61     int     port;
62     char   *vhost;
63     char   *user;
64     char   *password;
65
66     char   *exchange;
67     char   *routing_key;
68
69     /* publish only */
70     uint8_t delivery_mode;
71     _Bool   store_rates;
72     int     format;
73     /* publish & graphite format only */
74     char    *prefix;
75     char    *postfix;
76     char    escape_char;
77     unsigned int graphite_flags;
78
79     /* subscribe only */
80     char   *exchange_type;
81     char   *queue;
82
83     amqp_connection_state_t connection;
84     pthread_mutex_t lock;
85 };
86 typedef struct camqp_config_s camqp_config_t;
87
88 /*
89  * Global variables
90  */
91 static const char *def_host       = "localhost";
92 static const char *def_vhost      = "/";
93 static const char *def_user       = "guest";
94 static const char *def_password   = "guest";
95 static const char *def_exchange   = "amq.fanout";
96
97 static pthread_t *subscriber_threads     = NULL;
98 static size_t     subscriber_threads_num = 0;
99 static _Bool      subscriber_threads_running = 1;
100
101 #define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
102
103 /*
104  * Functions
105  */
106 static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
107 {
108     int sockfd;
109
110     if ((conf == NULL) || (conf->connection == NULL))
111         return;
112
113     sockfd = amqp_get_sockfd (conf->connection);
114     amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
115     amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
116     amqp_destroy_connection (conf->connection);
117     close (sockfd);
118     conf->connection = NULL;
119 } /* }}} void camqp_close_connection */
120
121 static void camqp_config_free (void *ptr) /* {{{ */
122 {
123     camqp_config_t *conf = ptr;
124
125     if (conf == NULL)
126         return;
127
128     camqp_close_connection (conf);
129
130     sfree (conf->name);
131     sfree (conf->host);
132     sfree (conf->vhost);
133     sfree (conf->user);
134     sfree (conf->password);
135     sfree (conf->exchange);
136     sfree (conf->exchange_type);
137     sfree (conf->queue);
138     sfree (conf->routing_key);
139     sfree (conf->prefix);
140     sfree (conf->postfix);
141
142
143     sfree (conf);
144 } /* }}} void camqp_config_free */
145
146 static char *camqp_bytes_cstring (amqp_bytes_t *in) /* {{{ */
147 {
148     char *ret;
149
150     if ((in == NULL) || (in->bytes == NULL))
151         return (NULL);
152
153     ret = malloc (in->len + 1);
154     if (ret == NULL)
155         return (NULL);
156
157     memcpy (ret, in->bytes, in->len);
158     ret[in->len] = 0;
159
160     return (ret);
161 } /* }}} char *camqp_bytes_cstring */
162
163 static _Bool camqp_is_error (camqp_config_t *conf) /* {{{ */
164 {
165     amqp_rpc_reply_t r;
166
167     r = amqp_get_rpc_reply (conf->connection);
168     if (r.reply_type == AMQP_RESPONSE_NORMAL)
169         return (0);
170
171     return (1);
172 } /* }}} _Bool camqp_is_error */
173
174 static char *camqp_strerror (camqp_config_t *conf, /* {{{ */
175         char *buffer, size_t buffer_size)
176 {
177     amqp_rpc_reply_t r;
178
179     r = amqp_get_rpc_reply (conf->connection);
180     switch (r.reply_type)
181     {
182         case AMQP_RESPONSE_NORMAL:
183             sstrncpy (buffer, "Success", sizeof (buffer));
184             break;
185
186         case AMQP_RESPONSE_NONE:
187             sstrncpy (buffer, "Missing RPC reply type", sizeof (buffer));
188             break;
189
190         case AMQP_RESPONSE_LIBRARY_EXCEPTION:
191 #if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
192             if (r.library_errno)
193                 return (sstrerror (r.library_errno, buffer, buffer_size));
194 #else
195             if (r.library_error)
196                 return (sstrerror (r.library_error, buffer, buffer_size));
197 #endif
198             else
199                 sstrncpy (buffer, "End of stream", sizeof (buffer));
200             break;
201
202         case AMQP_RESPONSE_SERVER_EXCEPTION:
203             if (r.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
204             {
205                 amqp_connection_close_t *m = r.reply.decoded;
206                 char *tmp = camqp_bytes_cstring (&m->reply_text);
207                 ssnprintf (buffer, buffer_size, "Server connection error %d: %s",
208                         m->reply_code, tmp);
209                 sfree (tmp);
210             }
211             else if (r.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
212             {
213                 amqp_channel_close_t *m = r.reply.decoded;
214                 char *tmp = camqp_bytes_cstring (&m->reply_text);
215                 ssnprintf (buffer, buffer_size, "Server channel error %d: %s",
216                         m->reply_code, tmp);
217                 sfree (tmp);
218             }
219             else
220             {
221                 ssnprintf (buffer, buffer_size, "Server error method %#"PRIx32,
222                         r.reply.id);
223             }
224             break;
225
226         default:
227             ssnprintf (buffer, buffer_size, "Unknown reply type %i",
228                     (int) r.reply_type);
229     }
230
231     return (buffer);
232 } /* }}} char *camqp_strerror */
233
234 #if HAVE_AMQP_RPC_REPLY_T_LIBRARY_ERRNO
235 static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
236 {
237     amqp_exchange_declare_ok_t *ed_ret;
238
239     if (conf->exchange_type == NULL)
240         return (0);
241
242     ed_ret = amqp_exchange_declare (conf->connection,
243             /* channel     = */ CAMQP_CHANNEL,
244             /* exchange    = */ amqp_cstring_bytes (conf->exchange),
245             /* type        = */ amqp_cstring_bytes (conf->exchange_type),
246             /* passive     = */ 0,
247             /* durable     = */ 0,
248             /* auto_delete = */ 1,
249             /* arguments   = */ AMQP_EMPTY_TABLE);
250     if ((ed_ret == NULL) && camqp_is_error (conf))
251     {
252         char errbuf[1024];
253         ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
254                 camqp_strerror (conf, errbuf, sizeof (errbuf)));
255         camqp_close_connection (conf);
256         return (-1);
257     }
258
259     INFO ("amqp plugin: Successfully created exchange \"%s\" "
260             "with type \"%s\".",
261             conf->exchange, conf->exchange_type);
262
263     return (0);
264 } /* }}} int camqp_create_exchange */
265 #else
266 static int camqp_create_exchange (camqp_config_t *conf) /* {{{ */
267 {
268     amqp_exchange_declare_ok_t *ed_ret;
269     amqp_table_t argument_table;
270     struct amqp_table_entry_t_ argument_table_entries[1];
271
272     if (conf->exchange_type == NULL)
273         return (0);
274
275     /* Valid arguments: "auto_delete", "internal" */
276     argument_table.num_entries = STATIC_ARRAY_SIZE (argument_table_entries);
277     argument_table.entries = argument_table_entries;
278     argument_table_entries[0].key = amqp_cstring_bytes ("auto_delete");
279     argument_table_entries[0].value.kind = AMQP_FIELD_KIND_BOOLEAN;
280     argument_table_entries[0].value.value.boolean = 1;
281
282     ed_ret = amqp_exchange_declare (conf->connection,
283             /* channel     = */ CAMQP_CHANNEL,
284             /* exchange    = */ amqp_cstring_bytes (conf->exchange),
285             /* type        = */ amqp_cstring_bytes (conf->exchange_type),
286             /* passive     = */ 0,
287             /* durable     = */ 0,
288             /* arguments   = */ argument_table);
289     if ((ed_ret == NULL) && camqp_is_error (conf))
290     {
291         char errbuf[1024];
292         ERROR ("amqp plugin: amqp_exchange_declare failed: %s",
293                 camqp_strerror (conf, errbuf, sizeof (errbuf)));
294         camqp_close_connection (conf);
295         return (-1);
296     }
297
298     INFO ("amqp plugin: Successfully created exchange \"%s\" "
299             "with type \"%s\".",
300             conf->exchange, conf->exchange_type);
301
302     return (0);
303 } /* }}} int camqp_create_exchange */
304 #endif
305
306 static int camqp_setup_queue (camqp_config_t *conf) /* {{{ */
307 {
308     amqp_queue_declare_ok_t *qd_ret;
309     amqp_basic_consume_ok_t *cm_ret;
310
311     qd_ret = amqp_queue_declare (conf->connection,
312             /* channel     = */ CAMQP_CHANNEL,
313             /* queue       = */ (conf->queue != NULL)
314             ? amqp_cstring_bytes (conf->queue)
315             : AMQP_EMPTY_BYTES,
316             /* passive     = */ 0,
317             /* durable     = */ 0,
318             /* exclusive   = */ 0,
319             /* auto_delete = */ 1,
320             /* arguments   = */ AMQP_EMPTY_TABLE);
321     if (qd_ret == NULL)
322     {
323         ERROR ("amqp plugin: amqp_queue_declare failed.");
324         camqp_close_connection (conf);
325         return (-1);
326     }
327
328     if (conf->queue == NULL)
329     {
330         conf->queue = camqp_bytes_cstring (&qd_ret->queue);
331         if (conf->queue == NULL)
332         {
333             ERROR ("amqp plugin: camqp_bytes_cstring failed.");
334             camqp_close_connection (conf);
335             return (-1);
336         }
337
338         INFO ("amqp plugin: Created queue \"%s\".", conf->queue);
339     }
340     DEBUG ("amqp plugin: Successfully created queue \"%s\".", conf->queue);
341
342     /* bind to an exchange */
343     if (conf->exchange != NULL)
344     {
345         amqp_queue_bind_ok_t *qb_ret;
346
347         assert (conf->queue != NULL);
348         qb_ret = amqp_queue_bind (conf->connection,
349                 /* channel     = */ CAMQP_CHANNEL,
350                 /* queue       = */ amqp_cstring_bytes (conf->queue),
351                 /* exchange    = */ amqp_cstring_bytes (conf->exchange),
352                 /* routing_key = */ (conf->routing_key != NULL)
353                 ? amqp_cstring_bytes (conf->routing_key)
354                 : AMQP_EMPTY_BYTES,
355                 /* arguments   = */ AMQP_EMPTY_TABLE);
356         if ((qb_ret == NULL) && camqp_is_error (conf))
357         {
358             char errbuf[1024];
359             ERROR ("amqp plugin: amqp_queue_bind failed: %s",
360                     camqp_strerror (conf, errbuf, sizeof (errbuf)));
361             camqp_close_connection (conf);
362             return (-1);
363         }
364
365         DEBUG ("amqp plugin: Successfully bound queue \"%s\" to exchange \"%s\".",
366                 conf->queue, conf->exchange);
367     } /* if (conf->exchange != NULL) */
368
369     cm_ret = amqp_basic_consume (conf->connection,
370             /* channel      = */ CAMQP_CHANNEL,
371             /* queue        = */ amqp_cstring_bytes (conf->queue),
372             /* consumer_tag = */ AMQP_EMPTY_BYTES,
373             /* no_local     = */ 0,
374             /* no_ack       = */ 1,
375             /* exclusive    = */ 0,
376             /* arguments    = */ AMQP_EMPTY_TABLE
377         );
378     if ((cm_ret == NULL) && camqp_is_error (conf))
379     {
380         char errbuf[1024];
381         ERROR ("amqp plugin: amqp_basic_consume failed: %s",
382                     camqp_strerror (conf, errbuf, sizeof (errbuf)));
383         camqp_close_connection (conf);
384         return (-1);
385     }
386
387     return (0);
388 } /* }}} int camqp_setup_queue */
389
390 static int camqp_connect (camqp_config_t *conf) /* {{{ */
391 {
392     amqp_rpc_reply_t reply;
393     int sockfd;
394     int status;
395
396     if (conf->connection != NULL)
397         return (0);
398
399     conf->connection = amqp_new_connection ();
400     if (conf->connection == NULL)
401     {
402         ERROR ("amqp plugin: amqp_new_connection failed.");
403         return (ENOMEM);
404     }
405
406     sockfd = amqp_open_socket (CONF(conf, host), conf->port);
407     if (sockfd < 0)
408     {
409         char errbuf[1024];
410         status = (-1) * sockfd;
411         ERROR ("amqp plugin: amqp_open_socket failed: %s",
412                 sstrerror (status, errbuf, sizeof (errbuf)));
413         amqp_destroy_connection (conf->connection);
414         conf->connection = NULL;
415         return (status);
416     }
417     amqp_set_sockfd (conf->connection, sockfd);
418
419     reply = amqp_login (conf->connection, CONF(conf, vhost),
420             /* channel max = */      0,
421             /* frame max   = */ 131072,
422             /* heartbeat   = */      0,
423             /* authentication = */ AMQP_SASL_METHOD_PLAIN,
424             CONF(conf, user), CONF(conf, password));
425     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
426     {
427         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
428                 CONF(conf, vhost), CONF(conf, user));
429         amqp_destroy_connection (conf->connection);
430         close (sockfd);
431         conf->connection = NULL;
432         return (1);
433     }
434
435     amqp_channel_open (conf->connection, /* channel = */ 1);
436     /* FIXME: Is checking "reply.reply_type" really correct here? How does
437      * it get set? --octo */
438     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
439     {
440         ERROR ("amqp plugin: amqp_channel_open failed.");
441         amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
442         amqp_destroy_connection (conf->connection);
443         close(sockfd);
444         conf->connection = NULL;
445         return (1);
446     }
447
448     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
449             "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
450
451     status = camqp_create_exchange (conf);
452     if (status != 0)
453         return (status);
454
455     if (!conf->publish)
456         return (camqp_setup_queue (conf));
457     return (0);
458 } /* }}} int camqp_connect */
459
460 static int camqp_shutdown (void) /* {{{ */
461 {
462     size_t i;
463
464     DEBUG ("amqp plugin: Shutting down %zu subscriber threads.",
465             subscriber_threads_num);
466
467     subscriber_threads_running = 0;
468     for (i = 0; i < subscriber_threads_num; i++)
469     {
470         /* FIXME: Sending a signal is not very elegant here. Maybe find out how
471          * to use a timeout in the thread and check for the variable in regular
472          * intervals. */
473         pthread_kill (subscriber_threads[i], SIGTERM);
474         pthread_join (subscriber_threads[i], /* retval = */ NULL);
475     }
476
477     subscriber_threads_num = 0;
478     sfree (subscriber_threads);
479
480     DEBUG ("amqp plugin: All subscriber threads exited.");
481
482     return (0);
483 } /* }}} int camqp_shutdown */
484
485 /*
486  * Subscribing code
487  */
488 static int camqp_read_body (camqp_config_t *conf, /* {{{ */
489         size_t body_size, const char *content_type)
490 {
491     char body[body_size + 1];
492     char *body_ptr;
493     size_t received;
494     amqp_frame_t frame;
495     int status;
496
497     memset (body, 0, sizeof (body));
498     body_ptr = &body[0];
499     received = 0;
500
501     while (received < body_size)
502     {
503         status = amqp_simple_wait_frame (conf->connection, &frame);
504         if (status < 0)
505         {
506             char errbuf[1024];
507             status = (-1) * status;
508             ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
509                     sstrerror (status, errbuf, sizeof (errbuf)));
510             camqp_close_connection (conf);
511             return (status);
512         }
513
514         if (frame.frame_type != AMQP_FRAME_BODY)
515         {
516             NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
517                     frame.frame_type);
518             return (-1);
519         }
520
521         if ((body_size - received) < frame.payload.body_fragment.len)
522         {
523             WARNING ("amqp plugin: Body is larger than indicated by header.");
524             return (-1);
525         }
526
527         memcpy (body_ptr, frame.payload.body_fragment.bytes,
528                 frame.payload.body_fragment.len);
529         body_ptr += frame.payload.body_fragment.len;
530         received += frame.payload.body_fragment.len;
531     } /* while (received < body_size) */
532
533     if (strcasecmp ("text/collectd", content_type) == 0)
534     {
535         status = handle_putval (stderr, body);
536         if (status != 0)
537             ERROR ("amqp plugin: handle_putval failed with status %i.",
538                     status);
539         return (status);
540     }
541     else if (strcasecmp ("application/json", content_type) == 0)
542     {
543         ERROR ("amqp plugin: camqp_read_body: Parsing JSON data has not "
544                 "been implemented yet. FIXME!");
545         return (0);
546     }
547     else
548     {
549         ERROR ("amqp plugin: camqp_read_body: Unknown content type \"%s\".",
550                 content_type);
551         return (EINVAL);
552     }
553
554     /* not reached */
555     return (0);
556 } /* }}} int camqp_read_body */
557
558 static int camqp_read_header (camqp_config_t *conf) /* {{{ */
559 {
560     int status;
561     amqp_frame_t frame;
562     amqp_basic_properties_t *properties;
563     char *content_type;
564
565     status = amqp_simple_wait_frame (conf->connection, &frame);
566     if (status < 0)
567     {
568         char errbuf[1024];
569         status = (-1) * status;
570         ERROR ("amqp plugin: amqp_simple_wait_frame failed: %s",
571                     sstrerror (status, errbuf, sizeof (errbuf)));
572         camqp_close_connection (conf);
573         return (status);
574     }
575
576     if (frame.frame_type != AMQP_FRAME_HEADER)
577     {
578         NOTICE ("amqp plugin: Unexpected frame type: %#"PRIx8,
579                 frame.frame_type);
580         return (-1);
581     }
582
583     properties = frame.payload.properties.decoded;
584     content_type = camqp_bytes_cstring (&properties->content_type);
585     if (content_type == NULL)
586     {
587         ERROR ("amqp plugin: Unable to determine content type.");
588         return (-1);
589     }
590
591     status = camqp_read_body (conf,
592             (size_t) frame.payload.properties.body_size,
593             content_type);
594
595     sfree (content_type);
596     return (status);
597 } /* }}} int camqp_read_header */
598
599 static void *camqp_subscribe_thread (void *user_data) /* {{{ */
600 {
601     camqp_config_t *conf = user_data;
602     int status;
603
604     cdtime_t interval = plugin_get_interval ();
605
606     while (subscriber_threads_running)
607     {
608         amqp_frame_t frame;
609
610         status = camqp_connect (conf);
611         if (status != 0)
612         {
613             struct timespec ts_interval;
614             ERROR ("amqp plugin: camqp_connect failed. "
615                     "Will sleep for %.3f seconds.",
616                     CDTIME_T_TO_DOUBLE (interval));
617             CDTIME_T_TO_TIMESPEC (interval, &ts_interval);
618             nanosleep (&ts_interval, /* remaining = */ NULL);
619             continue;
620         }
621
622         status = amqp_simple_wait_frame (conf->connection, &frame);
623         if (status < 0)
624         {
625             struct timespec ts_interval;
626             ERROR ("amqp plugin: amqp_simple_wait_frame failed. "
627                     "Will sleep for %.3f seconds.",
628                     CDTIME_T_TO_DOUBLE (interval));
629             camqp_close_connection (conf);
630             CDTIME_T_TO_TIMESPEC (interval, &ts_interval);
631             nanosleep (&ts_interval, /* remaining = */ NULL);
632             continue;
633         }
634
635         if (frame.frame_type != AMQP_FRAME_METHOD)
636         {
637             DEBUG ("amqp plugin: Unexpected frame type: %#"PRIx8,
638                     frame.frame_type);
639             continue;
640         }
641
642         if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
643         {
644             DEBUG ("amqp plugin: Unexpected method id: %#"PRIx32,
645                     frame.payload.method.id);
646             continue;
647         }
648
649         status = camqp_read_header (conf);
650
651         amqp_maybe_release_buffers (conf->connection);
652     } /* while (subscriber_threads_running) */
653
654     camqp_config_free (conf);
655     pthread_exit (NULL);
656     return (NULL);
657 } /* }}} void *camqp_subscribe_thread */
658
659 static int camqp_subscribe_init (camqp_config_t *conf) /* {{{ */
660 {
661     int status;
662     pthread_t *tmp;
663
664     tmp = realloc (subscriber_threads,
665             sizeof (*subscriber_threads) * (subscriber_threads_num + 1));
666     if (tmp == NULL)
667     {
668         ERROR ("amqp plugin: realloc failed.");
669         camqp_config_free (conf);
670         return (ENOMEM);
671     }
672     subscriber_threads = tmp;
673     tmp = subscriber_threads + subscriber_threads_num;
674     memset (tmp, 0, sizeof (*tmp));
675
676     status = plugin_thread_create (tmp, /* attr = */ NULL,
677             camqp_subscribe_thread, conf);
678     if (status != 0)
679     {
680         char errbuf[1024];
681         ERROR ("amqp plugin: pthread_create failed: %s",
682                 sstrerror (status, errbuf, sizeof (errbuf)));
683         camqp_config_free (conf);
684         return (status);
685     }
686
687     subscriber_threads_num++;
688
689     return (0);
690 } /* }}} int camqp_subscribe_init */
691
692 /*
693  * Publishing code
694  */
695 /* XXX: You must hold "conf->lock" when calling this function! */
696 static int camqp_write_locked (camqp_config_t *conf, /* {{{ */
697         const char *buffer, const char *routing_key)
698 {
699     amqp_basic_properties_t props;
700     int status;
701
702     status = camqp_connect (conf);
703     if (status != 0)
704         return (status);
705
706     memset (&props, 0, sizeof (props));
707     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
708         | AMQP_BASIC_DELIVERY_MODE_FLAG
709         | AMQP_BASIC_APP_ID_FLAG;
710     if (conf->format == CAMQP_FORMAT_COMMAND)
711         props.content_type = amqp_cstring_bytes("text/collectd");
712     else if (conf->format == CAMQP_FORMAT_JSON)
713         props.content_type = amqp_cstring_bytes("application/json");
714     else if (conf->format == CAMQP_FORMAT_GRAPHITE)
715         props.content_type = amqp_cstring_bytes("text/graphite");
716     else
717         assert (23 == 42);
718     props.delivery_mode = conf->delivery_mode;
719     props.app_id = amqp_cstring_bytes("collectd");
720
721     status = amqp_basic_publish(conf->connection,
722                 /* channel = */ 1,
723                 amqp_cstring_bytes(CONF(conf, exchange)),
724                 amqp_cstring_bytes (routing_key),
725                 /* mandatory = */ 0,
726                 /* immediate = */ 0,
727                 &props,
728                 amqp_cstring_bytes(buffer));
729     if (status != 0)
730     {
731         ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
732                 status);
733         camqp_close_connection (conf);
734     }
735
736     return (status);
737 } /* }}} int camqp_write_locked */
738
739 static int camqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
740         user_data_t *user_data)
741 {
742     camqp_config_t *conf = user_data->data;
743     char routing_key[6 * DATA_MAX_NAME_LEN];
744     char buffer[4096];
745     int status;
746
747     if ((ds == NULL) || (vl == NULL) || (conf == NULL))
748         return (EINVAL);
749
750     memset (buffer, 0, sizeof (buffer));
751
752     if (conf->routing_key != NULL)
753     {
754         sstrncpy (routing_key, conf->routing_key, sizeof (routing_key));
755     }
756     else
757     {
758         size_t i;
759         ssnprintf (routing_key, sizeof (routing_key), "collectd/%s/%s/%s/%s/%s",
760                 vl->host,
761                 vl->plugin, vl->plugin_instance,
762                 vl->type, vl->type_instance);
763
764         /* Switch slashes (the only character forbidden by collectd) and dots
765          * (the separation character used by AMQP). */
766         for (i = 0; routing_key[i] != 0; i++)
767         {
768             if (routing_key[i] == '.')
769                 routing_key[i] = '/';
770             else if (routing_key[i] == '/')
771                 routing_key[i] = '.';
772         }
773     }
774
775     if (conf->format == CAMQP_FORMAT_COMMAND)
776     {
777         status = create_putval (buffer, sizeof (buffer), ds, vl);
778         if (status != 0)
779         {
780             ERROR ("amqp plugin: create_putval failed with status %i.",
781                     status);
782             return (status);
783         }
784     }
785     else if (conf->format == CAMQP_FORMAT_JSON)
786     {
787         size_t bfree = sizeof (buffer);
788         size_t bfill = 0;
789
790         format_json_initialize (buffer, &bfill, &bfree);
791         format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
792         format_json_finalize (buffer, &bfill, &bfree);
793     }
794     else if (conf->format == CAMQP_FORMAT_GRAPHITE)
795     {
796         status = format_graphite (buffer, sizeof (buffer), ds, vl,
797                     conf->prefix, conf->postfix, conf->escape_char,
798                     conf->graphite_flags);
799         if (status != 0)
800         {
801             ERROR ("amqp plugin: format_graphite failed with status %i.",
802                     status);
803             return (status);
804         }
805     }
806     else
807     {
808         ERROR ("amqp plugin: Invalid format (%i).", conf->format);
809         return (-1);
810     }
811
812     pthread_mutex_lock (&conf->lock);
813     status = camqp_write_locked (conf, buffer, routing_key);
814     pthread_mutex_unlock (&conf->lock);
815
816     return (status);
817 } /* }}} int camqp_write */
818
819 /*
820  * Config handling
821  */
822 static int camqp_config_set_format (oconfig_item_t *ci, /* {{{ */
823         camqp_config_t *conf)
824 {
825     char *string;
826     int status;
827
828     string = NULL;
829     status = cf_util_get_string (ci, &string);
830     if (status != 0)
831         return (status);
832
833     assert (string != NULL);
834     if (strcasecmp ("Command", string) == 0)
835         conf->format = CAMQP_FORMAT_COMMAND;
836     else if (strcasecmp ("JSON", string) == 0)
837         conf->format = CAMQP_FORMAT_JSON;
838     else if (strcasecmp ("Graphite", string) == 0)
839         conf->format = CAMQP_FORMAT_GRAPHITE;
840     else
841     {
842         WARNING ("amqp plugin: Invalid format string: %s",
843                 string);
844     }
845
846     free (string);
847
848     return (0);
849 } /* }}} int config_set_string */
850
851 static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
852         _Bool publish)
853 {
854     camqp_config_t *conf;
855     int status;
856     int i;
857
858     conf = malloc (sizeof (*conf));
859     if (conf == NULL)
860     {
861         ERROR ("amqp plugin: malloc failed.");
862         return (ENOMEM);
863     }
864
865     /* Initialize "conf" {{{ */
866     memset (conf, 0, sizeof (*conf));
867     conf->publish = publish;
868     conf->name = NULL;
869     conf->format = CAMQP_FORMAT_COMMAND;
870     conf->host = NULL;
871     conf->port = 5672;
872     conf->vhost = NULL;
873     conf->user = NULL;
874     conf->password = NULL;
875     conf->exchange = NULL;
876     conf->routing_key = NULL;
877     /* publish only */
878     conf->delivery_mode = CAMQP_DM_VOLATILE;
879     conf->store_rates = 0;
880     /* publish & graphite only */
881     conf->prefix = NULL;
882     conf->postfix = NULL;
883     conf->escape_char = '_';
884     /* subscribe only */
885     conf->exchange_type = NULL;
886     conf->queue = NULL;
887     /* general */
888     conf->connection = NULL;
889     pthread_mutex_init (&conf->lock, /* attr = */ NULL);
890     /* }}} */
891
892     status = cf_util_get_string (ci, &conf->name);
893     if (status != 0)
894     {
895         sfree (conf);
896         return (status);
897     }
898
899     for (i = 0; i < ci->children_num; i++)
900     {
901         oconfig_item_t *child = ci->children + i;
902
903         if (strcasecmp ("Host", child->key) == 0)
904             status = cf_util_get_string (child, &conf->host);
905         else if (strcasecmp ("Port", child->key) == 0)
906         {
907             status = cf_util_get_port_number (child);
908             if (status > 0)
909             {
910                 conf->port = status;
911                 status = 0;
912             }
913         }
914         else if (strcasecmp ("VHost", child->key) == 0)
915             status = cf_util_get_string (child, &conf->vhost);
916         else if (strcasecmp ("User", child->key) == 0)
917             status = cf_util_get_string (child, &conf->user);
918         else if (strcasecmp ("Password", child->key) == 0)
919             status = cf_util_get_string (child, &conf->password);
920         else if (strcasecmp ("Exchange", child->key) == 0)
921             status = cf_util_get_string (child, &conf->exchange);
922         else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
923             status = cf_util_get_string (child, &conf->exchange_type);
924         else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
925             status = cf_util_get_string (child, &conf->queue);
926         else if (strcasecmp ("RoutingKey", child->key) == 0)
927             status = cf_util_get_string (child, &conf->routing_key);
928         else if ((strcasecmp ("Persistent", child->key) == 0) && publish)
929         {
930             _Bool tmp = 0;
931             status = cf_util_get_boolean (child, &tmp);
932             if (tmp)
933                 conf->delivery_mode = CAMQP_DM_PERSISTENT;
934             else
935                 conf->delivery_mode = CAMQP_DM_VOLATILE;
936         }
937         else if ((strcasecmp ("StoreRates", child->key) == 0) && publish)
938         {
939             status = cf_util_get_boolean (child, &conf->store_rates);
940             (void) cf_util_get_flag (child, &conf->graphite_flags,
941                     GRAPHITE_STORE_RATES);
942         }
943         else if ((strcasecmp ("Format", child->key) == 0) && publish)
944             status = camqp_config_set_format (child, conf);
945         else if ((strcasecmp ("GraphitePrefix", child->key) == 0) && publish)
946             status = cf_util_get_string (child, &conf->prefix);
947         else if ((strcasecmp ("GraphitePostfix", child->key) == 0) && publish)
948             status = cf_util_get_string (child, &conf->postfix);
949         else if ((strcasecmp ("GraphiteEscapeChar", child->key) == 0) && publish)
950         {
951             char *tmp_buff = NULL;
952             status = cf_util_get_string (child, &tmp_buff);
953             if (strlen (tmp_buff) > 1)
954                 WARNING ("amqp plugin: The option \"GraphiteEscapeChar\" handles "
955                         "only one character. Others will be ignored.");
956             conf->escape_char = tmp_buff[0];
957             sfree (tmp_buff);
958         }
959         else
960             WARNING ("amqp plugin: Ignoring unknown "
961                     "configuration option \"%s\".", child->key);
962
963         if (status != 0)
964             break;
965     } /* for (i = 0; i < ci->children_num; i++) */
966
967     if ((status == 0) && (conf->exchange == NULL))
968     {
969         if (conf->exchange_type != NULL)
970             WARNING ("amqp plugin: The option \"ExchangeType\" was given "
971                     "without the \"Exchange\" option. It will be ignored.");
972
973         if (!publish && (conf->routing_key != NULL))
974             WARNING ("amqp plugin: The option \"RoutingKey\" was given "
975                     "without the \"Exchange\" option. It will be ignored.");
976
977     }
978
979     if (status != 0)
980     {
981         camqp_config_free (conf);
982         return (status);
983     }
984
985     if (conf->exchange != NULL)
986     {
987         DEBUG ("amqp plugin: camqp_config_connection: exchange = %s;",
988                 conf->exchange);
989     }
990
991     if (publish)
992     {
993         char cbname[128];
994         user_data_t ud = { conf, camqp_config_free };
995
996         ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
997
998         status = plugin_register_write (cbname, camqp_write, &ud);
999         if (status != 0)
1000         {
1001             camqp_config_free (conf);
1002             return (status);
1003         }
1004     }
1005     else
1006     {
1007         status = camqp_subscribe_init (conf);
1008         if (status != 0)
1009         {
1010             camqp_config_free (conf);
1011             return (status);
1012         }
1013     }
1014
1015     return (0);
1016 } /* }}} int camqp_config_connection */
1017
1018 static int camqp_config (oconfig_item_t *ci) /* {{{ */
1019 {
1020     int i;
1021
1022     for (i = 0; i < ci->children_num; i++)
1023     {
1024         oconfig_item_t *child = ci->children + i;
1025
1026         if (strcasecmp ("Publish", child->key) == 0)
1027             camqp_config_connection (child, /* publish = */ 1);
1028         else if (strcasecmp ("Subscribe", child->key) == 0)
1029             camqp_config_connection (child, /* publish = */ 0);
1030         else
1031             WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
1032                     child->key);
1033     } /* for (ci->children_num) */
1034
1035     return (0);
1036 } /* }}} int camqp_config */
1037
1038 void module_register (void)
1039 {
1040     plugin_register_complex_config ("amqp", camqp_config);
1041     plugin_register_shutdown ("amqp", camqp_shutdown);
1042 } /* void module_register */
1043
1044 /* vim: set sw=4 sts=4 et fdm=marker : */