amqp plugin: Chose (hopefully sane) default values for all config options.
[collectd.git] / src / amqp.c
1 /**
2  * collectd - src/amqp.c
3  * Copyright (C) 2009  Sebastien Pahl
4  * Copyright (C) 2010  Florian Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastien Pahl <sebastien.pahl at dotcloud.com>
26  *   Florian Forster <octo at verplant.org>
27  **/
28
29 #include <stdint.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <strings.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "common.h"
37 #include "plugin.h"
38 #include "utils_format_json.h"
39
40 #include <amqp.h>
41 #include <amqp_framing.h>
42
43 /* Defines for the delivery mode. I have no idea why they're not defined by the
44  * library.. */
45 #define AMQP_DM_VOLATILE   1
46 #define AMQP_DM_PERSISTENT 2
47
48 /*
49  * Global variables
50  */
51 static const char *def_host       = "localhost";
52 static const char *def_vhost      = "/";
53 static const char *def_user       = "guest";
54 static const char *def_password   = "guest";
55 static const char *def_exchange   = "amq.fanout";
56 static const char *def_routingkey = "collectd";
57
58 static char *conf_host       = NULL;
59 static char *conf_vhost      = NULL;
60 static char *conf_user       = NULL;
61 static char *conf_password   = NULL;
62 static char *conf_exchange   = NULL;
63 static char *conf_routingkey = NULL;
64 static int   conf_port       = 5672;
65 static uint8_t conf_delivery_mode = AMQP_DM_VOLATILE;
66 static _Bool conf_store_rates = 0;
67
68 #define CONF(f) ((conf_##f != NULL) ? conf_##f : def_##f)
69
70 static amqp_connection_state_t amqp_conn = NULL;
71 static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
72
73 static const char *config_keys[] =
74 {
75     "Host",
76     "Port",
77     "VHost",
78     "User",
79     "Password",
80     "Exchange",
81     "RoutingKey",
82     "Persistent",
83     "StoreRates"
84 };
85 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
86
87 /*
88  * Functions
89  */
90 static int config_set(char **var, const char *value)
91 {
92     sfree(*var);
93     if ((*var = strdup(value)) == NULL)
94         return (1);
95     return (0);
96 } /* int config_set */
97
98 static int config(const char *key, const char *value)
99 {
100     if (strcasecmp(key, "host") == 0)
101         return (config_set(&conf_host, value));
102     else if(strcasecmp(key, "port") == 0)
103     {
104         int tmp;
105
106         tmp = service_name_to_port_number (value);
107         if (tmp <= 0)
108         {
109             ERROR ("AMQP plugin: Cannot parse `%s' as a "
110                     "service name (port number).", value);
111             return (1);
112         }
113
114         conf_port = tmp;
115         return (0);
116     }
117     else if (strcasecmp(key, "vhost") == 0)
118         return (config_set(&conf_vhost, value));
119     else if (strcasecmp(key, "user") == 0)
120         return (config_set(&conf_user, value));
121     else if (strcasecmp(key, "password") == 0)
122         return (config_set(&conf_password, value));
123     else if (strcasecmp(key, "exchange") == 0)
124         return (config_set(&conf_exchange, value));
125     else if (strcasecmp(key, "routingkey") == 0)
126         return (config_set(&conf_routingkey, value));
127     else if (strcasecmp ("Persistent", key) == 0)
128     {
129         if (IS_TRUE (value))
130             conf_delivery_mode = AMQP_DM_PERSISTENT;
131         else
132             conf_delivery_mode = AMQP_DM_VOLATILE;
133         return (0);
134     }
135     else if (strcasecmp ("StoreRates", key) == 0)
136     {
137         if (IS_TRUE (value))
138             conf_store_rates = 1;
139         else
140             conf_store_rates = 0;
141         return (0);
142     }
143     return (-1);
144 } /* int config */
145
146 static int amqp_connect (void)
147 {
148     amqp_rpc_reply_t reply;
149     int sockfd;
150     int status;
151
152     if (amqp_conn != NULL)
153         return (0);
154
155     amqp_conn = amqp_new_connection ();
156     if (amqp_conn == NULL)
157     {
158         ERROR ("amqp plugin: amqp_new_connection failed.");
159         return (ENOMEM);
160     }
161
162     sockfd = amqp_open_socket (CONF(host), conf_port);
163     if (sockfd < 0)
164     {
165         char errbuf[1024];
166         status = (-1) * sockfd;
167         ERROR ("amqp plugin: amqp_open_socket failed: %s",
168                 sstrerror (status, errbuf, sizeof (errbuf)));
169         amqp_destroy_connection(amqp_conn);
170         amqp_conn = NULL;
171         return (status);
172     }
173
174     amqp_set_sockfd (amqp_conn, sockfd);
175
176     reply = amqp_login (amqp_conn, CONF(vhost),
177             /* channel max = */      0,
178             /* frame max   = */ 131072,
179             /* heartbeat   = */      0,
180             /* authentication = */ AMQP_SASL_METHOD_PLAIN,
181             CONF(user), CONF(password));
182     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
183     {
184         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
185                 CONF(vhost), CONF(user));
186         amqp_destroy_connection (amqp_conn);
187         close (sockfd);
188         amqp_conn = NULL;
189         return (1);
190     }
191
192     amqp_channel_open (amqp_conn, /* channel = */ 1);
193     /* FIXME: Is checking "reply.reply_type" really correct here? How does
194      * it get set? --octo */
195     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
196     {
197         ERROR ("amqp plugin: amqp_channel_open failed.");
198         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
199         amqp_destroy_connection(amqp_conn);
200         close(sockfd);
201         amqp_conn = NULL;
202         return (1);
203     }
204
205     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
206             "on %s:%i.", CONF(vhost), CONF(host), conf_port);
207     return (0);
208 } /* int amqp_connect */
209
210 static int amqp_write_locked (const char *buffer)
211 {
212     amqp_basic_properties_t props;
213     int status;
214
215     status = amqp_connect ();
216     if (status != 0)
217         return (status);
218
219     memset (&props, 0, sizeof (props));
220     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
221         | AMQP_BASIC_DELIVERY_MODE_FLAG
222         | AMQP_BASIC_APP_ID_FLAG;
223     props.content_type = amqp_cstring_bytes("application/json");
224     props.delivery_mode = conf_delivery_mode;
225     props.app_id = amqp_cstring_bytes("collectd");
226
227     status = amqp_basic_publish(amqp_conn,
228                 /* channel = */ 1,
229                 amqp_cstring_bytes(CONF(exchange)),
230                 amqp_cstring_bytes(CONF(routingkey)),
231                 /* mandatory = */ 0,
232                 /* immediate = */ 0,
233                 &props,
234                 amqp_cstring_bytes(buffer));
235     if (status != 0)
236     {
237         int sockfd;
238
239         ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
240                 status);
241
242         sockfd = amqp_get_sockfd (amqp_conn);
243         amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
244         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
245         amqp_destroy_connection (amqp_conn);
246         close (sockfd);
247         amqp_conn = NULL;
248     }
249
250     return (status);
251 } /* int amqp_write_locked */
252
253 static int amqp_write (const data_set_t *ds, const value_list_t *vl,
254         __attribute__((unused)) user_data_t *user_data)
255 {
256     char buffer[4096];
257     size_t bfree;
258     size_t bfill;
259     int status;
260
261     if ((ds == NULL) || (vl == NULL))
262         return (EINVAL);
263
264     memset (buffer, 0, sizeof (buffer));
265     bfree = sizeof (buffer);
266     bfill = 0;
267
268     format_json_initialize (buffer, &bfill, &bfree);
269     format_json_value_list (buffer, &bfill, &bfree, ds, vl, conf_store_rates);
270     format_json_finalize (buffer, &bfill, &bfree);
271
272     pthread_mutex_lock (&amqp_conn_lock);
273     status = amqp_write_locked (buffer);
274     pthread_mutex_unlock (&amqp_conn_lock);
275
276     return (status);
277 } /* int amqp_write */
278
279 static int shutdown (void)
280 {
281     pthread_mutex_lock (&amqp_conn_lock);
282     if (amqp_conn != NULL)
283     {
284         int sockfd;
285
286         sockfd = amqp_get_sockfd (amqp_conn);
287         amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
288         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
289         amqp_destroy_connection (amqp_conn);
290         close(sockfd);
291         amqp_conn = NULL;
292     }
293     pthread_mutex_unlock (&amqp_conn_lock);
294
295     sfree(conf_host);
296     sfree(conf_vhost);
297     sfree(conf_user);
298     sfree(conf_password);
299     sfree(conf_exchange);
300     sfree(conf_routingkey);
301
302     return (0);
303 } /* int shutdown */
304
305 void module_register (void)
306 {
307     plugin_register_config ("amqp", config, config_keys, config_keys_num);
308     plugin_register_write ("amqp", amqp_write, NULL);
309     plugin_register_shutdown ("amqp", shutdown);
310 } /* void module_register */
311
312 /* vim: set sw=4 sts=4 et : */