2 * collectd - src/amqp.c
3 * Copyright (C) 2009 Sebastien Pahl
4 * Copyright (C) 2010 Florian Forster
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
25 * Sebastien Pahl <sebastien.pahl at dotcloud.com>
26 * Florian Forster <octo at verplant.org>
38 #include "utils_format_json.h"
41 #include <amqp_framing.h>
43 /* Defines for the delivery mode. I have no idea why they're not defined by the
45 #define AMQP_DM_VOLATILE 1
46 #define AMQP_DM_PERSISTENT 2
51 static const char *def_host = "localhost";
52 static const char *def_vhost = "/";
53 static const char *def_user = "guest";
54 static const char *def_password = "guest";
55 static const char *def_exchange = "amq.fanout";
56 static const char *def_routingkey = "collectd";
58 static char *conf_host = NULL;
59 static char *conf_vhost = NULL;
60 static char *conf_user = NULL;
61 static char *conf_password = NULL;
62 static char *conf_exchange = NULL;
63 static char *conf_routingkey = NULL;
64 static int conf_port = 5672;
65 static uint8_t conf_delivery_mode = AMQP_DM_VOLATILE;
66 static _Bool conf_store_rates = 0;
68 #define CONF(f) ((conf_##f != NULL) ? conf_##f : def_##f)
70 static amqp_connection_state_t amqp_conn = NULL;
71 static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
73 static const char *config_keys[] =
85 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
90 static int config_set(char **var, const char *value)
93 if ((*var = strdup(value)) == NULL)
96 } /* int config_set */
98 static int config(const char *key, const char *value)
100 if (strcasecmp(key, "host") == 0)
101 return (config_set(&conf_host, value));
102 else if(strcasecmp(key, "port") == 0)
106 tmp = service_name_to_port_number (value);
109 ERROR ("AMQP plugin: Cannot parse `%s' as a "
110 "service name (port number).", value);
117 else if (strcasecmp(key, "vhost") == 0)
118 return (config_set(&conf_vhost, value));
119 else if (strcasecmp(key, "user") == 0)
120 return (config_set(&conf_user, value));
121 else if (strcasecmp(key, "password") == 0)
122 return (config_set(&conf_password, value));
123 else if (strcasecmp(key, "exchange") == 0)
124 return (config_set(&conf_exchange, value));
125 else if (strcasecmp(key, "routingkey") == 0)
126 return (config_set(&conf_routingkey, value));
127 else if (strcasecmp ("Persistent", key) == 0)
130 conf_delivery_mode = AMQP_DM_PERSISTENT;
132 conf_delivery_mode = AMQP_DM_VOLATILE;
135 else if (strcasecmp ("StoreRates", key) == 0)
138 conf_store_rates = 1;
140 conf_store_rates = 0;
146 static int amqp_connect (void)
148 amqp_rpc_reply_t reply;
152 if (amqp_conn != NULL)
155 amqp_conn = amqp_new_connection ();
156 if (amqp_conn == NULL)
158 ERROR ("amqp plugin: amqp_new_connection failed.");
162 sockfd = amqp_open_socket (CONF(host), conf_port);
166 status = (-1) * sockfd;
167 ERROR ("amqp plugin: amqp_open_socket failed: %s",
168 sstrerror (status, errbuf, sizeof (errbuf)));
169 amqp_destroy_connection(amqp_conn);
174 amqp_set_sockfd (amqp_conn, sockfd);
176 reply = amqp_login (amqp_conn, CONF(vhost),
177 /* channel max = */ 0,
178 /* frame max = */ 131072,
180 /* authentication = */ AMQP_SASL_METHOD_PLAIN,
181 CONF(user), CONF(password));
182 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
184 ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
185 CONF(vhost), CONF(user));
186 amqp_destroy_connection (amqp_conn);
192 amqp_channel_open (amqp_conn, /* channel = */ 1);
193 /* FIXME: Is checking "reply.reply_type" really correct here? How does
194 * it get set? --octo */
195 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
197 ERROR ("amqp plugin: amqp_channel_open failed.");
198 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
199 amqp_destroy_connection(amqp_conn);
205 INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
206 "on %s:%i.", CONF(vhost), CONF(host), conf_port);
208 } /* int amqp_connect */
210 static int amqp_write_locked (const char *buffer)
212 amqp_basic_properties_t props;
215 status = amqp_connect ();
219 memset (&props, 0, sizeof (props));
220 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
221 | AMQP_BASIC_DELIVERY_MODE_FLAG
222 | AMQP_BASIC_APP_ID_FLAG;
223 props.content_type = amqp_cstring_bytes("application/json");
224 props.delivery_mode = conf_delivery_mode;
225 props.app_id = amqp_cstring_bytes("collectd");
227 status = amqp_basic_publish(amqp_conn,
229 amqp_cstring_bytes(CONF(exchange)),
230 amqp_cstring_bytes(CONF(routingkey)),
234 amqp_cstring_bytes(buffer));
239 ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
242 sockfd = amqp_get_sockfd (amqp_conn);
243 amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
244 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
245 amqp_destroy_connection (amqp_conn);
251 } /* int amqp_write_locked */
253 static int amqp_write (const data_set_t *ds, const value_list_t *vl,
254 __attribute__((unused)) user_data_t *user_data)
261 if ((ds == NULL) || (vl == NULL))
264 memset (buffer, 0, sizeof (buffer));
265 bfree = sizeof (buffer);
268 format_json_initialize (buffer, &bfill, &bfree);
269 format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf_store_rates);
270 format_json_finalize (buffer, &bfill, &bfree);
272 pthread_mutex_lock (&amqp_conn_lock);
273 status = amqp_write_locked (buffer);
274 pthread_mutex_unlock (&amqp_conn_lock);
277 } /* int amqp_write */
279 static int shutdown (void)
281 pthread_mutex_lock (&amqp_conn_lock);
282 if (amqp_conn != NULL)
286 sockfd = amqp_get_sockfd (amqp_conn);
287 amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
288 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
289 amqp_destroy_connection (amqp_conn);
293 pthread_mutex_unlock (&amqp_conn_lock);
298 sfree(conf_password);
299 sfree(conf_exchange);
300 sfree(conf_routingkey);
305 void module_register (void)
307 plugin_register_config ("amqp", config, config_keys, config_keys_num);
308 plugin_register_write ("amqp", amqp_write, NULL);
309 plugin_register_shutdown ("amqp", shutdown);
310 } /* void module_register */
312 /* vim: set sw=4 sts=4 et : */