2 * collectd - src/amqp.c
3 * Copyright (C) 2009 Sebastien Pahl
4 * Copyright (C) 2010 Florian Forster
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
25 * Sebastien Pahl <sebastien.pahl at dotcloud.com>
26 * Florian Forster <octo at verplant.org>
38 #include "utils_format_json.h"
41 #include <amqp_framing.h>
43 /* Defines for the delivery mode. I have no idea why they're not defined by the library. */
/* Delivery-mode values. One of them is stored in the config's
 * "delivery_mode" field and copied into the message properties when a
 * message is published. */
45 #define CAMQP_DM_VOLATILE 1
46 #define CAMQP_DM_PERSISTENT 2
/* Channel number passed to amqp_channel_close() when a connection is torn
 * down. */
48 #define CAMQP_CHANNEL 1
/* NOTE(review): the two members below belong to "struct camqp_config_s"; the
 * struct header and its other members are not visible in this excerpt. */
68 uint8_t delivery_mode;
72 amqp_connection_state_t connection;
75 typedef struct camqp_config_s camqp_config_t;
/* Fallback values picked by CONF() when the corresponding config field is
 * NULL. */
80 static const char *def_host = "localhost";
81 static const char *def_vhost = "/";
82 static const char *def_user = "guest";
83 static const char *def_password = "guest";
84 static const char *def_exchange = "amq.fanout";
85 static const char *def_routingkey = "collectd";
/* CONF(c,f): evaluates to field "f" of config "c" when it is non-NULL and to
 * the matching "def_<f>" default otherwise. Only valid for fields that have
 * such a default defined above. */
87 #define CONF(c,f) (((c)->f != NULL) ? (c)->f : def_##f)
/*
 * camqp_close_connection: tears down the AMQP connection held by "conf".
 *
 * Closes channel CAMQP_CHANNEL, closes and destroys the connection object and
 * finally resets "conf->connection" to NULL. A NULL "conf" or a NULL
 * connection is checked for up front (the guarded statement is not visible in
 * this excerpt, presumably an early return).
 */
92 static void camqp_close_connection (camqp_config_t *conf) /* {{{ */
96 if ((conf == NULL) || (conf->connection == NULL))
/* The socket descriptor is read before the connection object is destroyed.
 * NOTE(review): no use of "sockfd" is visible in this excerpt -- confirm the
 * descriptor is actually closed. */
99 sockfd = amqp_get_sockfd (conf->connection);
100 amqp_channel_close (conf->connection, CAMQP_CHANNEL, AMQP_REPLY_SUCCESS);
101 amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
102 amqp_destroy_connection (conf->connection);
104 conf->connection = NULL;
105 } /* }}} void camqp_close_connection */
/*
 * camqp_config_free: releases a camqp_config_t handed in as an untyped
 * pointer. The void* signature lets it serve as the free function of the
 * user_data_t that is registered together with the write callback.
 *
 * The connection is closed first, then the string members are released with
 * sfree(). NOTE(review): several lines of this function are not visible in
 * this excerpt, so the list of released members below may be incomplete.
 */
107 static void camqp_config_free (void *ptr) /* {{{ */
109 camqp_config_t *conf = ptr;
114 camqp_close_connection (conf);
120 sfree (conf->password);
121 sfree (conf->exchange);
122 sfree (conf->exchange_type);
124 sfree (conf->routingkey);
127 } /* }}} void camqp_config_free */
129 static int amqp_connect (camqp_config_t *conf) /* {{{ */
131 amqp_rpc_reply_t reply;
135 if (conf->connection != NULL)
138 conf->connection = amqp_new_connection ();
139 if (conf->connection == NULL)
141 ERROR ("amqp plugin: amqp_new_connection failed.");
145 sockfd = amqp_open_socket (CONF(conf, host), conf->port);
149 status = (-1) * sockfd;
150 ERROR ("amqp plugin: amqp_open_socket failed: %s",
151 sstrerror (status, errbuf, sizeof (errbuf)));
152 amqp_destroy_connection (conf->connection);
153 conf->connection = NULL;
156 amqp_set_sockfd (conf->connection, sockfd);
158 reply = amqp_login (conf->connection, CONF(conf, vhost),
159 /* channel max = */ 0,
160 /* frame max = */ 131072,
162 /* authentication = */ AMQP_SASL_METHOD_PLAIN,
163 CONF(conf, user), CONF(conf, password));
164 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
166 ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
167 CONF(conf, vhost), CONF(conf, user));
168 amqp_destroy_connection (conf->connection);
170 conf->connection = NULL;
174 amqp_channel_open (conf->connection, /* channel = */ 1);
175 /* FIXME: Is checking "reply.reply_type" really correct here? How does
176 * it get set? --octo */
177 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
179 ERROR ("amqp plugin: amqp_channel_open failed.");
180 amqp_connection_close (conf->connection, AMQP_REPLY_SUCCESS);
181 amqp_destroy_connection (conf->connection);
183 conf->connection = NULL;
187 INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
188 "on %s:%i.", CONF(conf, vhost), CONF(conf, host), conf->port);
190 } /* }}} int amqp_connect */
/*
 * amqp_write_locked: publishes the C string "buffer" through the connection
 * described by "conf".
 *
 * amqp_connect() is called first. The message is then published to
 * CONF(conf, exchange) with routing key CONF(conf, routingkey). Its
 * properties carry content type "application/json", app id "collectd" and
 * the delivery mode stored in conf->delivery_mode. After a failed publish
 * the error is logged and the connection is closed with
 * camqp_close_connection().
 *
 * The only caller visible in this file, amqp_write(), holds conf->lock
 * around this call.
 */
192 static int amqp_write_locked (camqp_config_t *conf, /* {{{ */
195 amqp_basic_properties_t props;
198 status = amqp_connect (conf);
/* Start from zeroed properties; "_flags" names the members set below. */
202 memset (&props, 0, sizeof (props));
203 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
204 | AMQP_BASIC_DELIVERY_MODE_FLAG
205 | AMQP_BASIC_APP_ID_FLAG;
206 props.content_type = amqp_cstring_bytes("application/json");
207 props.delivery_mode = conf->delivery_mode;
208 props.app_id = amqp_cstring_bytes("collectd");
/* NOTE(review): several arguments of this call (channel, flags, properties)
 * are on lines not visible in this excerpt. */
210 status = amqp_basic_publish(conf->connection,
212 amqp_cstring_bytes(CONF(conf, exchange)),
213 amqp_cstring_bytes(CONF(conf, routingkey)),
217 amqp_cstring_bytes(buffer));
220 ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
/* Drop the connection after the logged publish failure. */
222 camqp_close_connection (conf);
226 } /* }}} int amqp_write_locked */
/*
 * amqp_write: write callback that is registered with plugin_register_write()
 * in camqp_config_connection().
 *
 * Serialises value list "vl" (with its data set "ds") to JSON in a local
 * buffer using the format_json_* helpers, passing conf->store_rates to the
 * formatter, and publishes the result with amqp_write_locked() while holding
 * conf->lock. "user_data->data" carries the camqp_config_t of the connection.
 */
228 static int amqp_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
229 user_data_t *user_data)
/* NOTE(review): "user_data" is dereferenced here before any NULL check; the
 * check below only covers the resulting "conf" pointer. */
231 camqp_config_t *conf = user_data->data;
237 if ((ds == NULL) || (vl == NULL) || (conf == NULL))
240 memset (buffer, 0, sizeof (buffer));
241 bfree = sizeof (buffer);
/* NOTE(review): no check of the format_json_* return values is visible in
 * this excerpt. */
244 format_json_initialize (buffer, &bfill, &bfree);
245 format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf->store_rates);
246 format_json_finalize (buffer, &bfill, &bfree);
/* The lock covers the connect/publish sequence that uses conf->connection. */
248 pthread_mutex_lock (&conf->lock);
249 status = amqp_write_locked (conf, buffer);
250 pthread_mutex_unlock (&conf->lock);
253 } /* }}} int amqp_write */
/*
 * camqp_config_connection: parses one connection block "ci" into a newly
 * allocated camqp_config_t and registers a write callback named
 * "amqp/<name>" for it.
 *
 * The block's own argument supplies the connection name. Every child item
 * supplies one option: Host, Port, VHost, User, Password, Exchange,
 * RoutingKey, Persistent, StoreRates, plus ExchangeType and Queue, which are
 * accepted only when "publish" is zero. Unknown options are reported with a
 * warning. On the visible error paths the configuration is released with
 * camqp_config_free().
 *
 * NOTE(review): the return statements and several conditions are on lines
 * not visible in this excerpt.
 */
255 static int camqp_config_connection (oconfig_item_t *ci, /* {{{ */
258 camqp_config_t *conf;
262 conf = malloc (sizeof (*conf));
265 ERROR ("amqp plugin: malloc failed.");
269 /* Initialize "conf" {{{ */
270 memset (conf, 0, sizeof (*conf));
271 conf->publish = publish;
277 conf->password = NULL;
278 conf->exchange = NULL;
279 conf->exchange_type = NULL;
281 conf->routingkey = NULL;
282 conf->delivery_mode = CAMQP_DM_VOLATILE;
283 conf->store_rates = 0;
284 conf->connection = NULL;
285 pthread_mutex_init (&conf->lock, /* attr = */ NULL);
/* The connection name is the argument of the block itself, hence "ci". */
288 status = cf_util_get_string (ci, &conf->name);
295 for (i = 0; i < ci->children_num; i++)
297 oconfig_item_t *child = ci->children + i;
/* Each option value lives in the child item. Reading it from "ci" would
 * hand back the block's own argument (the connection name) for every
 * option, so all cf_util_get_* calls below take "child". */
299 if (strcasecmp ("Host", child->key) == 0)
300 status = cf_util_get_string (child, &conf->host);
301 else if (strcasecmp ("Port", child->key) == 0)
303 status = cf_util_get_port_number (child);
310 else if (strcasecmp ("VHost", child->key) == 0)
311 status = cf_util_get_string (child, &conf->vhost);
312 else if (strcasecmp ("User", child->key) == 0)
313 status = cf_util_get_string (child, &conf->user);
314 else if (strcasecmp ("Password", child->key) == 0)
315 status = cf_util_get_string (child, &conf->password);
316 else if (strcasecmp ("Exchange", child->key) == 0)
317 status = cf_util_get_string (child, &conf->exchange);
318 else if ((strcasecmp ("ExchangeType", child->key) == 0) && !publish)
319 status = cf_util_get_string (child, &conf->exchange_type);
320 else if ((strcasecmp ("Queue", child->key) == 0) && !publish)
321 status = cf_util_get_string (child, &conf->queue);
322 else if (strcasecmp ("RoutingKey", child->key) == 0)
323 status = cf_util_get_string (child, &conf->routingkey);
324 else if (strcasecmp ("Persistent", child->key) == 0)
/* The boolean is parsed into a temporary and mapped onto the two
 * delivery-mode constants. */
327 status = cf_util_get_boolean (child, &tmp);
329 conf->delivery_mode = CAMQP_DM_PERSISTENT;
331 conf->delivery_mode = CAMQP_DM_VOLATILE;
333 else if (strcasecmp ("StoreRates", child->key) == 0)
334 status = cf_util_get_boolean (child, &conf->store_rates);
336 WARNING ("amqp plugin: Ignoring unknown "
337 "configuration option \"%s\".", child->key);
341 } /* for (i = 0; i < ci->children_num; i++) */
/* Without "Exchange" on the non-publishing side, "RoutingKey" and
 * "ExchangeType" are reported as ignored. */
343 if ((status == 0) && !publish && (conf->exchange == NULL))
345 if (conf->routingkey != NULL)
346 WARNING ("amqp plugin: The option \"RoutingKey\" was given "
347 "without the \"Exchange\" option. It will be ignored.");
349 if (conf->exchange_type != NULL)
350 WARNING ("amqp plugin: The option \"ExchangeType\" was given "
351 "without the \"Exchange\" option. It will be ignored.");
356 camqp_config_free (conf);
/* "conf" is passed as callback user data with camqp_config_free as its free
 * function. */
363 user_data_t ud = { conf, camqp_config_free };
365 ssnprintf (cbname, sizeof (cbname), "amqp/%s", conf->name);
367 status = plugin_register_write (cbname, amqp_write, &ud);
370 camqp_config_free (conf);
376 } /* }}} int camqp_config_connection */
/*
 * camqp_config: configuration callback registered for the "amqp" plugin in
 * module_register().
 *
 * Walks the children of "ci": every "Publish" child is handed to
 * camqp_config_connection() with publish = 1, any other key produces a
 * warning.
 *
 * NOTE(review): the status returned by camqp_config_connection() is not used
 * here, so a failing block does not stop processing of the remaining ones.
 */
378 static int camqp_config (oconfig_item_t *ci) /* {{{ */
382 for (i = 0; i < ci->children_num; i++)
384 oconfig_item_t *child = ci->children + i;
386 if (strcasecmp ("Publish", child->key) == 0)
387 camqp_config_connection (child, /* publish = */ 1);
389 WARNING ("amqp plugin: Ignoring unknown config option \"%s\".",
391 } /* for (ci->children_num) */
394 } /* }}} int camqp_config */
/*
 * camqp_shutdown: shutdown callback registered for the "amqp" plugin.
 *
 * It carries the plugin prefix so that it cannot clash with the POSIX socket
 * function shutdown() declared in <sys/socket.h>. The function is static and
 * referenced only from module_register() below.
 */
396 static int camqp_shutdown (void) /* {{{ */
398 /* FIXME: Set a global shutdown variable here. */
400 } /* }}} int camqp_shutdown */
/*
 * module_register: plugin entry point. Registers the configuration callback
 * and the shutdown callback under the plugin name "amqp".
 */
402 void module_register (void)
404 plugin_register_complex_config ("amqp", camqp_config);
405 plugin_register_shutdown ("amqp", camqp_shutdown);
406 } /* void module_register */
408 /* vim: set sw=4 sts=4 et fdm=marker : */