amqp plugin: Added some comments. Rewrapped lines of the license header.
[collectd.git] / src / amqp.c
1 /**
2  * collectd - src/amqp.c
3  * Copyright (C) 2009  Sebastien Pahl
4  * Copyright (C) 2010  Florian Forster
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, including without limitation
9  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10  * and/or sell copies of the Software, and to permit persons to whom the
11  * Software is furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  *
24  * Authors:
25  *   Sebastien Pahl <sebastien.pahl at dotcloud.com>
26  *   Florian Forster <octo at verplant.org>
27  **/
28
29 #include <stdint.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <strings.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "common.h"
37 #include "plugin.h"
38 #include "utils_format_json.h"
39
40 #include <amqp.h>
41 #include <amqp_framing.h>
42
43 /* Defines for the delivery mode. I have no idea why they're not defined by the
44  * library.. */
45 #define AMQP_DM_VOLATILE   1
46 #define AMQP_DM_PERSISTENT 2
47
48 /*
49  * Global variables
50  */
51 static int   port       = 5672;
52 static char *host       = NULL;
53 static char *vhost      = NULL;
54 static char *user       = NULL;
55 static char *password   = NULL;
56 static char *exchange   = NULL;
57 static char *routingkey = NULL;
58 static uint8_t delivery_mode = AMQP_DM_VOLATILE;
59 static _Bool store_rates = 0;
60
61 static amqp_connection_state_t amqp_conn = NULL;
62 static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
63
64 static const char *config_keys[] =
65 {
66     "Host",
67     "Port",
68     "VHost",
69     "User",
70     "Password",
71     "Exchange",
72     "RoutingKey",
73     "Persistent",
74     "StoreRates"
75 };
76 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
77
78 /*
79  * Functions
80  */
81 static int config_set(char **var, const char *value)
82 {
83     sfree(*var);
84     if ((*var = strdup(value)) == NULL)
85         return (1);
86     return (0);
87 } /* int config_set */
88
89 static int config(const char *key, const char *value)
90 {
91     if (strcasecmp(key, "host") == 0)
92         return (config_set(&host, value));
93     else if(strcasecmp(key, "port") == 0)
94     {
95         int tmp;
96
97         tmp = service_name_to_port_number (value);
98         if (tmp <= 0)
99         {
100             ERROR ("AMQP plugin: Cannot parse `%s' as a "
101                     "service name (port number).", value);
102             return (1);
103         }
104
105         port = tmp;
106         return (0);
107     }
108     else if (strcasecmp(key, "vhost") == 0)
109         return (config_set(&vhost, value));
110     else if (strcasecmp(key, "user") == 0)
111         return (config_set(&user, value));
112     else if (strcasecmp(key, "password") == 0)
113         return (config_set(&password, value));
114     else if (strcasecmp(key, "exchange") == 0)
115         return (config_set(&exchange, value));
116     else if (strcasecmp(key, "routingkey") == 0)
117         return (config_set(&routingkey, value));
118     else if (strcasecmp ("Persistent", key) == 0)
119     {
120         if (IS_TRUE (value))
121             delivery_mode = AMQP_DM_PERSISTENT;
122         else
123             delivery_mode = AMQP_DM_VOLATILE;
124         return (0);
125     }
126     else if (strcasecmp ("StoreRates", key) == 0)
127     {
128         if (IS_TRUE (value))
129             store_rates = 1;
130         else
131             store_rates = 0;
132         return (0);
133     }
134     return (-1);
135 } /* int config */
136
137 static int amqp_connect (void)
138 {
139     amqp_rpc_reply_t reply;
140     int sockfd;
141     int status;
142
143     if (amqp_conn != NULL)
144         return (0);
145
146     amqp_conn = amqp_new_connection ();
147     if (amqp_conn == NULL)
148     {
149         ERROR ("amqp plugin: amqp_new_connection failed.");
150         return (ENOMEM);
151     }
152
153     sockfd = amqp_open_socket (host, port);
154     if (sockfd < 0)
155     {
156         char errbuf[1024];
157         status = (-1) * sockfd;
158         ERROR ("amqp plugin: amqp_open_socket failed: %s",
159                 sstrerror (status, errbuf, sizeof (errbuf)));
160         amqp_destroy_connection(amqp_conn);
161         amqp_conn = NULL;
162         return (status);
163     }
164
165     amqp_set_sockfd (amqp_conn, sockfd);
166
167     reply = amqp_login(amqp_conn, vhost,
168             /* channel max = */      0,
169             /* frame max = */   131072,
170             /* heartbeat = */        0,
171             /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
172     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
173     {
174         ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
175                 vhost, user);
176         amqp_destroy_connection(amqp_conn);
177         close(sockfd);
178         amqp_conn = NULL;
179         return (1);
180     }
181
182     amqp_channel_open (amqp_conn, /* channel = */ 1);
183     /* FIXME: Is checking "reply.reply_type" really correct here? How does
184      * it get set? --octo */
185     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
186     {
187         ERROR ("amqp plugin: amqp_channel_open failed.");
188         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
189         amqp_destroy_connection(amqp_conn);
190         close(sockfd);
191         amqp_conn = NULL;
192         return (1);
193     }
194
195     INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
196             "on %s:%i.", vhost, host, port);
197     return (0);
198 } /* int amqp_connect */
199
200 static int amqp_write_locked (const char *buffer)
201 {
202     amqp_basic_properties_t props;
203     int status;
204
205     status = amqp_connect ();
206     if (status != 0)
207         return (status);
208
209     memset (&props, 0, sizeof (props));
210     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
211     props.content_type = amqp_cstring_bytes("application/json");
212     props.delivery_mode = delivery_mode;
213
214     status = amqp_basic_publish(amqp_conn,
215                 /* channel = */ 1,
216                 amqp_cstring_bytes(exchange),
217                 amqp_cstring_bytes(routingkey),
218                 /* mandatory = */ 0,
219                 /* immediate = */ 0,
220                 &props,
221                 amqp_cstring_bytes(buffer));
222     if (status != 0)
223     {
224         int sockfd;
225
226         ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
227                 status);
228
229         sockfd = amqp_get_sockfd (amqp_conn);
230         amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
231         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
232         amqp_destroy_connection (amqp_conn);
233         close (sockfd);
234         amqp_conn = NULL;
235     }
236
237     return (status);
238 } /* int amqp_write_locked */
239
240 static int amqp_write (const data_set_t *ds, const value_list_t *vl,
241         __attribute__((unused)) user_data_t *user_data)
242 {
243     char buffer[4096];
244     size_t bfree;
245     size_t bfill;
246     int status;
247
248     if ((ds == NULL) || (vl == NULL))
249         return (EINVAL);
250
251     memset (buffer, 0, sizeof (buffer));
252     bfree = sizeof (buffer);
253     bfill = 0;
254
255     format_json_initialize (buffer, &bfill, &bfree);
256     format_json_value_list (buffer, &bfill, &bfree, ds, vl, store_rates);
257     format_json_finalize (buffer, &bfill, &bfree);
258
259     pthread_mutex_lock (&amqp_conn_lock);
260     status = amqp_write_locked (buffer);
261     pthread_mutex_unlock (&amqp_conn_lock);
262
263     return (status);
264 } /* int amqp_write */
265
266 static int shutdown (void)
267 {
268     pthread_mutex_lock (&amqp_conn_lock);
269     if (amqp_conn != NULL)
270     {
271         int sockfd;
272
273         sockfd = amqp_get_sockfd (amqp_conn);
274         amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
275         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
276         amqp_destroy_connection (amqp_conn);
277         close(sockfd);
278         amqp_conn = NULL;
279     }
280     pthread_mutex_unlock (&amqp_conn_lock);
281
282     sfree(host);
283     sfree(vhost);
284     sfree(user);
285     sfree(password);
286     sfree(exchange);
287     sfree(routingkey);
288
289     return (0);
290 } /* int shutdown */
291
292 void module_register (void)
293 {
294     plugin_register_config ("amqp", config, config_keys, config_keys_num);
295     plugin_register_write ("amqp", amqp_write, NULL);
296     plugin_register_shutdown ("amqp", shutdown);
297 } /* void module_register */
298
299 /* vim: set sw=4 sts=4 et : */