amqp plugin: Implement publishing to multiple brokers.
[collectd.git] / src / amqp.c
1 /**
2  * collectd - src/amqp.c
3  * Copyright (C) 2009  Sebastien Pahl
4  * Copyright (C) 2010  Florian Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastien Pahl <sebastien.pahl at dotcloud.com>
26  *   Florian Forster <octo at verplant.org>
27  **/
28
29 #include <stdint.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <strings.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "common.h"
37 #include "plugin.h"
38 #include "utils_format_json.h"
39
40 #include <amqp.h>
41 #include <amqp_framing.h>
42
43 /* Defines for the delivery mode. I have no idea why they're not defined by the
44  * library.. */
45 #define CAMQP_DM_VOLATILE   1
46 #define CAMQP_DM_PERSISTENT 2
47
48 #define CAMQP_CHANNEL 1
49
50 /*
51  * Data types
52  */
53 struct camqp_config_s
54 {
55     _Bool   publish;
56     char   *name;
57
58     char   *host;
59     int     port;
60     char   *vhost;
61     char   *user;
62     char   *password;
63
64     char   *exchange;
65     char   *exchange_type;
66     char   *queue;
67     char   *routingkey;
68     uint8_t delivery_mode;
69
70     _Bool   store_rates;
71
72     amqp_connection_state_t connection;
73     pthread_mutex_t lock;
74 };
75 typedef struct camqp_config_s camqp_config_t;
76
77 /*
78  * Global variables
79  */
80 static const char *def_host       = "localhost";
81 static const char *def_vhost      = "/";
82 static const char *def_user       = "guest";
83 static const char *def_password   = "guest";
84 static const char *def_exchange   = "amq.fanout";
85 static const char *def_routingkey = "collectd";
86
87 #define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
88
89 /*
90  * Functions
91  */
92 static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
93 {
94     int sockfd;
95
96     if ((conf == NULL) || (conf->connection == NULL))
97         return;
98
99     sockfd = amqp_get_sockfd (conf->connection);
100     amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
101     amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
102     amqp_destroy_connection (conf->connection);
103     close (sockfd);
104     conf->connection = NULL;
105 } /* }}} void camqp_close_connection */
106
107 static void camqp_config_free (void *ptr) /* {{{ */
108 {
109     camqp_config_t *conf = ptr;
110
111     if (conf == NULL)
112         return;
113
114     camqp_close_connection (conf);
115
116     sfree (conf->name);
117     sfree (conf->host);
118     sfree (conf->vhost);
119     sfree (conf->user);
120     sfree (conf->password);
121     sfree (conf->exchange);
122     sfree (conf->exchange_type);
123     sfree (conf->queue);
124     sfree (conf->routingkey);
125
126     sfree (conf);
127 } /* }}} void camqp_config_free */
128
129 static int amqp_connect (camqp_config_t *conf) /* {{{ */
130 {
131     amqp_rpc_reply_t reply;
132     int sockfd;
133     int status;
134
135     if (conf->connection != NULL)
136         return (0);
137
138     conf->connection = amqp_new_connection ();
139     if (conf->connection == NULL)
140     {
141         ERROR ("amqp plugin: amqp_new_connection failed.");
142         return (ENOMEM);
143     }
144
145     sockfd = amqp_open_socket (CONF(conf, host), conf->port);
146     if (sockfd < 0)
147     {
148         char errbuf[1024];
149         status = (-1) * sockfd;
150         ERROR ("amqp plugin: amqp_open_socket failed: %s",
151                 sstrerror (status, errbuf, sizeof (errbuf)));
152         amqp_destroy_connection (conf->connection);
153         conf->connection = NULL;
154         return (status);
155     }
156     amqp_set_sockfd (conf->connection, sockfd);
157
158     reply = amqp_login (conf->connection, CONF(conf, vhost),
159             /* channel max = */      0,
160             /* frame max   = */ 131072,
161             /* heartbeat   = */      0,
162             /* authentication = */ AMQP_SASL_METHOD_PLAIN,
163             CONF(conf, user), CONF(conf, password));
164     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
165     {
166         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
167                 CONF(conf, vhost), CONF(conf, user));
168         amqp_destroy_connection (conf->connection);
169         close (sockfd);
170         conf->connection = NULL;
171         return (1);
172     }
173
174     amqp_channel_open (conf->connection, /* channel = */ 1);
175     /* FIXME: Is checking "reply.reply_type" really correct here? How does
176      * it get set? --octo */
177     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
178     {
179         ERROR ("amqp plugin: amqp_channel_open failed.");
180         amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
181         amqp_destroy_connection (conf->connection);
182         close(sockfd);
183         conf->connection = NULL;
184         return (1);
185     }
186
187     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
188             "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
189     return (0);
190 } /* }}} int amqp_connect */
191
192 static int amqp_write_locked (camqp_config_t *conf, /* {{{ */
193         const char *buffer)
194 {
195     amqp_basic_properties_t props;
196     int status;
197
198     status = amqp_connect (conf);
199     if (status != 0)
200         return (status);
201
202     memset (&props, 0, sizeof (props));
203     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
204         | AMQP_BASIC_DELIVERY_MODE_FLAG
205         | AMQP_BASIC_APP_ID_FLAG;
206     props.content_type = amqp_cstring_bytes("application/json");
207     props.delivery_mode = conf->delivery_mode;
208     props.app_id = amqp_cstring_bytes("collectd");
209
210     status = amqp_basic_publish(conf->connection,
211                 /* channel = */ 1,
212                 amqp_cstring_bytes(CONF(conf, exchange)),
213                 amqp_cstring_bytes(CONF(conf, routingkey)),
214                 /* mandatory = */ 0,
215                 /* immediate = */ 0,
216                 &props,
217                 amqp_cstring_bytes(buffer));
218     if (status != 0)
219     {
220         ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
221                 status);
222         camqp_close_connection (conf);
223     }
224
225     return (status);
226 } /* }}} int amqp_write_locked */
227
228 static int amqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
229         user_data_t *user_data)
230 {
231     camqp_config_t *conf = user_data->data;
232     char buffer[4096];
233     size_t bfree;
234     size_t bfill;
235     int status;
236
237     if ((ds == NULL) || (vl == NULL) || (conf == NULL))
238         return (EINVAL);
239
240     memset (buffer, 0, sizeof (buffer));
241     bfree = sizeof (buffer);
242     bfill = 0;
243
244     format_json_initialize (buffer, &bfill, &bfree);
245     format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
246     format_json_finalize (buffer, &bfill, &bfree);
247
248     pthread_mutex_lock (&conf->lock);
249     status = amqp_write_locked (conf, buffer);
250     pthread_mutex_unlock (&conf->lock);
251
252     return (status);
253 } /* }}} int amqp_write */
254
255 static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
256         _Bool publish)
257 {
258     camqp_config_t *conf;
259     int status;
260     int i;
261
262     conf = malloc (sizeof (*conf));
263     if (conf == NULL)
264     {
265         ERROR ("amqp plugin: malloc failed.");
266         return (ENOMEM);
267     }
268
269     /* Initialize "conf" {{{ */
270     memset (conf, 0, sizeof (*conf));
271     conf->publish = publish;
272     conf->name = NULL;
273     conf->host = NULL;
274     conf->port = 5672;
275     conf->vhost = NULL;
276     conf->user = NULL;
277     conf->password = NULL;
278     conf->exchange = NULL;
279     conf->exchange_type = NULL;
280     conf->queue = NULL;
281     conf->routingkey = NULL;
282     conf->delivery_mode = CAMQP_DM_VOLATILE;
283     conf->store_rates = 0;
284     conf->connection = NULL;
285     pthread_mutex_init (&conf->lock, /* attr = */ NULL);
286     /* }}} */
287
288     status = cf_util_get_string (ci, &conf->name);
289     if (status != 0)
290     {
291         sfree (conf);
292         return (status);
293     }
294
295     for (i = 0; i < ci->children_num; i++)
296     {
297         oconfig_item_t *child = ci->children + i;
298
299         if (strcasecmp ("Host", child->key) == 0)
300             status = cf_util_get_string (ci, &conf->host);
301         else if (strcasecmp ("Port", child->key) == 0)
302         {
303             status = cf_util_get_port_number (child);
304             if (status > 0)
305             {
306                 conf->port = status;
307                 status = 0;
308             }
309         }
310         else if (strcasecmp ("VHost", child->key) == 0)
311             status = cf_util_get_string (ci, &conf->vhost);
312         else if (strcasecmp ("User", child->key) == 0)
313             status = cf_util_get_string (ci, &conf->user);
314         else if (strcasecmp ("Password", child->key) == 0)
315             status = cf_util_get_string (ci, &conf->password);
316         else if (strcasecmp ("Exchange", child->key) == 0)
317             status = cf_util_get_string (ci, &conf->exchange);
318         else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
319             status = cf_util_get_string (ci, &conf->exchange_type);
320         else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
321             status = cf_util_get_string (ci, &conf->queue);
322         else if (strcasecmp ("RoutingKey", child->key) == 0)
323             status = cf_util_get_string (ci, &conf->routingkey);
324         else if (strcasecmp ("Persistent", child->key) == 0)
325         {
326             _Bool tmp = 0;
327             status = cf_util_get_boolean (ci, &tmp);
328             if (tmp)
329                 conf->delivery_mode = CAMQP_DM_PERSISTENT;
330             else
331                 conf->delivery_mode = CAMQP_DM_VOLATILE;
332         }
333         else if (strcasecmp ("StoreRates", child->key) == 0)
334             status = cf_util_get_boolean (ci, &conf->store_rates);
335         else
336             WARNING ("amqp plugin: Ignoring unknown "
337                     "configuration option \"%s\".", child->key);
338
339         if (status != 0)
340             break;
341     } /* for (i = 0; i < ci->children_num; i++) */
342
343     if ((status == 0) && !publish && (conf->exchange == NULL))
344     {
345         if (conf->routingkey != NULL)
346             WARNING ("amqp plugin: The option \"RoutingKey\" was given "
347                     "without the \"Exchange\" option. It will be ignored.");
348
349         if (conf->exchange_type != NULL)
350             WARNING ("amqp plugin: The option \"ExchangeType\" was given "
351                     "without the \"Exchange\" option. It will be ignored.");
352     }
353
354     if (status != 0)
355     {
356         camqp_config_free (conf);
357         return (status);
358     }
359
360     if (publish)
361     {
362         char cbname[128];
363         user_data_t ud = { conf, camqp_config_free };
364
365         ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
366
367         status = plugin_register_write (cbname, amqp_write, &ud);
368         if (status != 0)
369         {
370             camqp_config_free (conf);
371             return (status);
372         }
373     }
374
375     return (0);
376 } /* }}} int camqp_config_connection */
377
378 static int camqp_config (oconfig_item_t *ci) /* {{{ */
379 {
380     int i;
381
382     for (i = 0; i < ci->children_num; i++)
383     {
384         oconfig_item_t *child = ci->children + i;
385
386         if (strcasecmp ("Publish", child->key) == 0)
387             camqp_config_connection (child, /* publish = */ 1);
388         else
389             WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
390                     child->key);
391     } /* for (ci->children_num) */
392
393     return (0);
394 } /* }}} int camqp_config */
395
396 static int shutdown (void) /* {{{ */
397 {
398     /* FIXME: Set a global shutdown variable here. */
399     return (0);
400 } /* }}} int shutdown */
401
402 void module_register (void)
403 {
404     plugin_register_complex_config ("amqp", camqp_config);
405     plugin_register_shutdown ("amqp", shutdown);
406 } /* void module_register */
407
408 /* vim: set sw=4 sts=4 et fdm=marker : */