amqp plugin: Implement persistent connection handling.
[collectd.git] / src / amqp.c
1 /*
2 **
3 ** collectd-amqp
4 ** Copyright (c) <2009> <sebastien.pahl@dotcloud.com>
5 **
6 ** Permission is hereby granted, free of charge, to any person
7 ** obtaining a copy of this software and associated documentation
8 ** files (the "Software"), to deal in the Software without
9 ** restriction, including without limitation the rights to use,
10 ** copy, modify, merge, publish, distribute, sublicense, and/or sell
11 ** copies of the Software, and to permit persons to whom the
12 ** Software is furnished to do so, subject to the following
13 ** conditions:
14 **
15 ** The above copyright notice and this permission notice shall be
16 ** included in all copies or substantial portions of the Software.
17 **
18 ** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 ** EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
20 ** OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21 ** NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 ** HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
23 ** WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 ** FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
25 ** OTHER DEALINGS IN THE SOFTWARE.
26 **
27 */
28
29 #include <stdint.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <strings.h>
33 #include <pthread.h>
34
35 #include "collectd.h"
36 #include "common.h"
37 #include "plugin.h"
38 #include "utils_format_json.h"
39
40 #include <amqp.h>
41 #include <amqp_framing.h>
42
43 static int  port;
44 static char *host       = NULL;
45 static char *vhost      = NULL;
46 static char *user       = NULL;
47 static char *password   = NULL;
48 static char *exchange   = NULL;
49 static char *routingkey = NULL;
50
51 static amqp_connection_state_t amqp_conn = NULL;
52 static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
53
54 static const char *config_keys[] =
55 {
56     "Host",
57     "Port",
58     "VHost",
59     "User",
60     "Password",
61     "Exchange",
62     "RoutingKey"
63 };
64
65 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
66
67 static int config_set(char **var, const char *value)
68 {
69     sfree(*var);
70     if ((*var = strdup(value)) == NULL)
71         return (1);
72     return (0);
73 }
74
75 static int config(const char *key, const char *value)
76 {
77     if (strcasecmp(key, "host") == 0)
78         return (config_set(&host, value));
79     else if(strcasecmp(key, "port") == 0)
80     {
81         int tmp;
82
83         tmp = service_name_to_port_number (value);
84         if (tmp <= 0)
85         {
86             ERROR ("AMQP plugin: Cannot parse `%s' as a "
87                     "service name (port number).", value);
88             return (1);
89         }
90
91         port = tmp;
92         return (0);
93     }
94     else if (strcasecmp(key, "vhost") == 0)
95         return (config_set(&vhost, value));
96     else if (strcasecmp(key, "user") == 0)
97         return (config_set(&user, value));
98     else if (strcasecmp(key, "password") == 0)
99         return (config_set(&password, value));
100     else if (strcasecmp(key, "exchange") == 0)
101         return (config_set(&exchange, value));
102     else if (strcasecmp(key, "routingkey") == 0)
103         return (config_set(&routingkey, value));
104     return (-1);
105 }
106
107 static int amqp_write_locked (const char *buffer)
108 {
109     amqp_rpc_reply_t reply;
110     amqp_basic_properties_t props;
111     int status;
112
113     if (amqp_conn == NULL)
114     {
115         int sockfd;
116
117         amqp_conn = amqp_new_connection ();
118         if (amqp_conn == NULL)
119         {
120             ERROR ("amqp plugin: amqp_new_connection failed.");
121             return (ENOMEM);
122         }
123
124         sockfd = amqp_open_socket (host, port);
125         if (sockfd < 0)
126         {
127             char errbuf[1024];
128             status = (-1) * sockfd;
129             ERROR ("amqp plugin: amqp_open_socket failed: %s",
130                     sstrerror (status, errbuf, sizeof (errbuf)));
131             amqp_destroy_connection(amqp_conn);
132             amqp_conn = NULL;
133             return (status);
134         }
135
136         amqp_set_sockfd (amqp_conn, sockfd);
137
138         reply = amqp_login(amqp_conn, vhost,
139                 /* channel max = */      0,
140                 /* frame max = */   131072,
141                 /* heartbeat = */        0,
142                 /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
143         if (reply.reply_type != AMQP_RESPONSE_NORMAL)
144         {
145             ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
146                     vhost, user);
147             amqp_destroy_connection(amqp_conn);
148             close(sockfd);
149             amqp_conn = NULL;
150             return (1);
151         }
152
153         amqp_channel_open (amqp_conn, /* channel = */ 1);
154         /* FIXME: Is checking "reply.reply_type" really correct here? How does
155          * it get set? --octo */
156         if (reply.reply_type != AMQP_RESPONSE_NORMAL)
157         {
158             ERROR ("amqp plugin: amqp_channel_open failed.");
159             amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
160             amqp_destroy_connection(amqp_conn);
161             close(sockfd);
162             amqp_conn = NULL;
163             return (1);
164         }
165
166         INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
167                 "on %s:%i.", vhost, host, port);
168     } /* if (amqp_conn == NULL) */
169
170     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
171     props.content_type = amqp_cstring_bytes("application/json");
172     props.delivery_mode = 2; /* persistent delivery mode */
173
174     status = amqp_basic_publish(amqp_conn,
175                 /* channel = */ 1,
176                 amqp_cstring_bytes(exchange),
177                 amqp_cstring_bytes(routingkey),
178                 /* mandatory = */ 0,
179                 /* immediate = */ 0,
180                 &props,
181                 amqp_cstring_bytes(buffer));
182     if (status != 0)
183     {
184         int sockfd;
185
186         ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
187                 status);
188
189         sockfd = amqp_get_sockfd (amqp_conn);
190         amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
191         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
192         amqp_destroy_connection (amqp_conn);
193         close(sockfd);
194         amqp_conn = NULL;
195     }
196
197     return (status);
198 } /* int amqp_write_locked */
199
200 static int amqp_write (const data_set_t *ds, const value_list_t *vl,
201         __attribute__((unused)) user_data_t *user_data)
202 {
203     char buffer[4096];
204     size_t bfree;
205     size_t bfill;
206     int status;
207
208     if ((ds == NULL) || (vl == NULL))
209         return (EINVAL);
210
211     memset (buffer, 0, sizeof (buffer));
212     bfree = sizeof (buffer);
213     bfill = 0;
214
215     format_json_initialize(buffer, &bfill, &bfree);
216     /* TODO: Possibly add a config option "StoreRates" and pass the value along here. */
217     format_json_value_list(buffer, &bfill, &bfree, ds, vl, /* rates = */ 0);
218     format_json_finalize(buffer, &bfill, &bfree);
219
220     pthread_mutex_lock (&amqp_conn_lock);
221     status = amqp_write_locked (buffer);
222     pthread_mutex_unlock (&amqp_conn_lock);
223
224     return (status);
225 } /* int amqp_write */
226
227 static int shutdown(void)
228 {
229     pthread_mutex_lock (&amqp_conn_lock);
230     if (amqp_conn != NULL)
231     {
232         int sockfd;
233
234         sockfd = amqp_get_sockfd (amqp_conn);
235         amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
236         amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
237         amqp_destroy_connection (amqp_conn);
238         close(sockfd);
239         amqp_conn = NULL;
240     }
241     pthread_mutex_unlock (&amqp_conn_lock);
242
243     sfree(host);
244     sfree(vhost);
245     sfree(user);
246     sfree(password);
247     sfree(exchange);
248     sfree(routingkey);
249
250     return (0);
251 }
252
253 void module_register(void)
254 {
255     plugin_register_config("amqp", config, config_keys, config_keys_num);
256     plugin_register_write("amqp", amqp_write, NULL);
257     plugin_register_shutdown("amqp", shutdown);
258 }
259
260 /* vim: set sw=4 sts=4 et : */