amqp plugin: Implement the "StoreRates" option.
[collectd.git] / src / amqp.c
1 /*
2 **
3 ** collectd-amqp
4 ** Copyright (c) <2009> <sebastien.pahl@dotcloud.com>
5 **
6 ** Permission is hereby granted, free of charge, to any person
7 ** obtaining a copy of this software and associated documentation
8 ** files (the "Software"), to deal in the Software without
9 ** restriction, including without limitation the rights to use,
10 ** copy, modify, merge, publish, distribute, sublicense, and/or sell
11 ** copies of the Software, and to permit persons to whom the
12 ** Software is furnished to do so, subject to the following
13 ** conditions:
14 **
15 ** The above copyright notice and this permission notice shall be
16 ** included in all copies or substantial portions of the Software.
17 **
18 ** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 ** EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
20 ** OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21 ** NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 ** HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
23 ** WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 ** FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
25 ** OTHER DEALINGS IN THE SOFTWARE.
26 **
27 */
28
29 #include <stdint.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <strings.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "common.h"
37 #include "plugin.h"
38 #include "utils_format_json.h"
39
40 #include <amqp.h>
41 #include <amqp_framing.h>
42
43 /* Defines for the delivery mode. I have no idea why they're not defined by the
44  * library.. */
45 #define AMQP_DM_VOLATILE   1
46 #define AMQP_DM_PERSISTENT 2
47
48 static int   port       = 5672;
49 static char *host       = NULL;
50 static char *vhost      = NULL;
51 static char *user       = NULL;
52 static char *password   = NULL;
53 static char *exchange   = NULL;
54 static char *routingkey = NULL;
55 static uint8_t delivery_mode = AMQP_DM_VOLATILE;
56 static _Bool store_rates = 0;
57
58 static amqp_connection_state_t amqp_conn = NULL;
59 static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
60
61 static const char *config_keys[] =
62 {
63     "Host",
64     "Port",
65     "VHost",
66     "User",
67     "Password",
68     "Exchange",
69     "RoutingKey",
70     "Persistent",
71     "StoreRates"
72 };
73
74 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
75
76 static int config_set(char **var, const char *value)
77 {
78     sfree(*var);
79     if ((*var = strdup(value)) == NULL)
80         return (1);
81     return (0);
82 }
83
84 static int config(const char *key, const char *value)
85 {
86     if (strcasecmp(key, "host") == 0)
87         return (config_set(&host, value));
88     else if(strcasecmp(key, "port") == 0)
89     {
90         int tmp;
91
92         tmp = service_name_to_port_number (value);
93         if (tmp <= 0)
94         {
95             ERROR ("AMQP plugin: Cannot parse `%s' as a "
96                     "service name (port number).", value);
97             return (1);
98         }
99
100         port = tmp;
101         return (0);
102     }
103     else if (strcasecmp(key, "vhost") == 0)
104         return (config_set(&vhost, value));
105     else if (strcasecmp(key, "user") == 0)
106         return (config_set(&user, value));
107     else if (strcasecmp(key, "password") == 0)
108         return (config_set(&password, value));
109     else if (strcasecmp(key, "exchange") == 0)
110         return (config_set(&exchange, value));
111     else if (strcasecmp(key, "routingkey") == 0)
112         return (config_set(&routingkey, value));
113     else if (strcasecmp ("Persistent", key) == 0)
114     {
115         if (IS_TRUE (value))
116             delivery_mode = AMQP_DM_PERSISTENT;
117         else
118             delivery_mode = AMQP_DM_VOLATILE;
119         return (0);
120     }
121     else if (strcasecmp ("StoreRates", key) == 0)
122     {
123         if (IS_TRUE (value))
124             store_rates = 1;
125         else
126             store_rates = 0;
127         return (0);
128     }
129     return (-1);
130 }
131
132 static int amqp_connect (void)
133 {
134     amqp_rpc_reply_t reply;
135     int sockfd;
136     int status;
137
138     if (amqp_conn != NULL)
139         return (0);
140
141     amqp_conn = amqp_new_connection ();
142     if (amqp_conn == NULL)
143     {
144         ERROR ("amqp plugin: amqp_new_connection failed.");
145         return (ENOMEM);
146     }
147
148     sockfd = amqp_open_socket (host, port);
149     if (sockfd < 0)
150     {
151         char errbuf[1024];
152         status = (-1) * sockfd;
153         ERROR ("amqp plugin: amqp_open_socket failed: %s",
154                 sstrerror (status, errbuf, sizeof (errbuf)));
155         amqp_destroy_connection(amqp_conn);
156         amqp_conn = NULL;
157         return (status);
158     }
159
160     amqp_set_sockfd (amqp_conn, sockfd);
161
162     reply = amqp_login(amqp_conn, vhost,
163             /* channel max = */      0,
164             /* frame max = */   131072,
165             /* heartbeat = */        0,
166             /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
167     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
168     {
169         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
170                 vhost, user);
171         amqp_destroy_connection(amqp_conn);
172         close(sockfd);
173         amqp_conn = NULL;
174         return (1);
175     }
176
177     amqp_channel_open (amqp_conn, /* channel = */ 1);
178     /* FIXME: Is checking "reply.reply_type" really correct here? How does
179      * it get set? --octo */
180     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
181     {
182         ERROR ("amqp plugin: amqp_channel_open failed.");
183         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
184         amqp_destroy_connection(amqp_conn);
185         close(sockfd);
186         amqp_conn = NULL;
187         return (1);
188     }
189
190     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
191             "on %s:%i.", vhost, host, port);
192     return (0);
193 } /* int amqp_connect */
194
195 static int amqp_write_locked (const char *buffer)
196 {
197     amqp_basic_properties_t props;
198     int status;
199
200     status = amqp_connect ();
201     if (status != 0)
202         return (status);
203
204     memset (&props, 0, sizeof (props));
205     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
206     props.content_type = amqp_cstring_bytes("application/json");
207     props.delivery_mode = delivery_mode;
208
209     status = amqp_basic_publish(amqp_conn,
210                 /* channel = */ 1,
211                 amqp_cstring_bytes(exchange),
212                 amqp_cstring_bytes(routingkey),
213                 /* mandatory = */ 0,
214                 /* immediate = */ 0,
215                 &props,
216                 amqp_cstring_bytes(buffer));
217     if (status != 0)
218     {
219         int sockfd;
220
221         ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
222                 status);
223
224         sockfd = amqp_get_sockfd (amqp_conn);
225         amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
226         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
227         amqp_destroy_connection (amqp_conn);
228         close (sockfd);
229         amqp_conn = NULL;
230     }
231
232     return (status);
233 } /* int amqp_write_locked */
234
235 static int amqp_write (const data_set_t *ds, const value_list_t *vl,
236         __attribute__((unused)) user_data_t *user_data)
237 {
238     char buffer[4096];
239     size_t bfree;
240     size_t bfill;
241     int status;
242
243     if ((ds == NULL) || (vl == NULL))
244         return (EINVAL);
245
246     memset (buffer, 0, sizeof (buffer));
247     bfree = sizeof (buffer);
248     bfill = 0;
249
250     format_json_initialize (buffer, &bfill, &bfree);
251     format_json_value_list (buffer, &bfill, &bfree, ds, vl, store_rates);
252     format_json_finalize (buffer, &bfill, &bfree);
253
254     pthread_mutex_lock (&amqp_conn_lock);
255     status = amqp_write_locked (buffer);
256     pthread_mutex_unlock (&amqp_conn_lock);
257
258     return (status);
259 } /* int amqp_write */
260
261 static int shutdown(void)
262 {
263     pthread_mutex_lock (&amqp_conn_lock);
264     if (amqp_conn != NULL)
265     {
266         int sockfd;
267
268         sockfd = amqp_get_sockfd (amqp_conn);
269         amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
270         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
271         amqp_destroy_connection (amqp_conn);
272         close(sockfd);
273         amqp_conn = NULL;
274     }
275     pthread_mutex_unlock (&amqp_conn_lock);
276
277     sfree(host);
278     sfree(vhost);
279     sfree(user);
280     sfree(password);
281     sfree(exchange);
282     sfree(routingkey);
283
284     return (0);
285 }
286
287 void module_register(void)
288 {
289     plugin_register_config("amqp", config, config_keys, config_keys_num);
290     plugin_register_write("amqp", amqp_write, NULL);
291     plugin_register_shutdown("amqp", shutdown);
292 }
293
294 /* vim: set sw=4 sts=4 et : */