amqp plugin: Use the global (and widely used) "sfree" macro …
[collectd.git] / src / amqp.c
1 /*
2 **
3 ** collectd-amqp
4 ** Copyright (c) <2009> <sebastien.pahl@dotcloud.com>
5 **
6 ** Permission is hereby granted, free of charge, to any person
7 ** obtaining a copy of this software and associated documentation
8 ** files (the "Software"), to deal in the Software without
9 ** restriction, including without limitation the rights to use,
10 ** copy, modify, merge, publish, distribute, sublicense, and/or sell
11 ** copies of the Software, and to permit persons to whom the
12 ** Software is furnished to do so, subject to the following
13 ** conditions:
14 **
15 ** The above copyright notice and this permission notice shall be
16 ** included in all copies or substantial portions of the Software.
17 **
18 ** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 ** EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
20 ** OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21 ** NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 ** HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
23 ** WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 ** FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
25 ** OTHER DEALINGS IN THE SOFTWARE.
26 **
27 */
28
29 #include <stdint.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <strings.h>
33
34 #include <collectd.h>
35 #include <common.h>
36 #include <plugin.h>
37 #include <utils_format_json.h>
38
39 #include <amqp.h>
40 #include <amqp_framing.h>
41
42 #define PLUGIN_NAME "amqp"
43
44 static int  port;
45 static char *host       = NULL;
46 static char *vhost      = NULL;
47 static char *user       = NULL;
48 static char *password   = NULL;
49 static char *exchange   = NULL;
50 static char *routingkey = NULL;
51
52 static const char *config_keys[] =
53 {
54     "Host",
55     "Port",
56     "VHost",
57     "User",
58     "Password",
59     "Exchange",
60     "RoutingKey"
61 };
62
63 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
64
65 static int config_set(char **var, const char *value)
66 {
67     sfree(*var);
68     if ((*var = strdup(value)) == NULL)
69         return (1);
70     return (0);
71 }
72
73 static int config(const char *key, const char *value)
74 {
75     if (strcasecmp(key, "host") == 0)
76         return (config_set(&host, value));
77     else if(strcasecmp(key, "port") == 0)
78     {
79         int tmp;
80
81         tmp = service_name_to_port_number (value);
82         if (tmp <= 0)
83         {
84             ERROR ("AMQP plugin: Cannot parse `%s' as a "
85                     "service name (port number).", value);
86             return (1);
87         }
88
89         port = tmp;
90         return (0);
91     }
92     else if (strcasecmp(key, "vhost") == 0)
93         return (config_set(&vhost, value));
94     else if (strcasecmp(key, "user") == 0)
95         return (config_set(&user, value));
96     else if (strcasecmp(key, "password") == 0)
97         return (config_set(&password, value));
98     else if (strcasecmp(key, "exchange") == 0)
99         return (config_set(&exchange, value));
100     else if (strcasecmp(key, "routingkey") == 0)
101         return (config_set(&routingkey, value));
102     return (-1);
103 }
104
105 static int amqp_write(const data_set_t *ds, const value_list_t *vl, user_data_t *user_data)
106 {
107     int error;
108     int sockfd;
109     size_t bfree;
110     size_t bfill;
111     char buffer[4096];
112     amqp_rpc_reply_t reply;
113     amqp_connection_state_t conn;
114     amqp_basic_properties_t props;
115
116     conn = amqp_new_connection();
117     if ((sockfd = amqp_open_socket(host, port)) < 0)
118     {
119         amqp_destroy_connection(conn);
120         return (1);
121     }
122     amqp_set_sockfd(conn, sockfd);
123     reply = amqp_login(conn, vhost,
124             /* channel max = */      0,
125             /* frame max = */   131072,
126             /* heartbeat = */        0,
127             /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
128     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
129     {
130         amqp_destroy_connection(conn);
131         close(sockfd);
132         return (1);
133     }
134     amqp_channel_open(conn, 1);
135     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
136     {
137         amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
138         amqp_destroy_connection(conn);
139         close(sockfd);
140         return (1);
141     }
142     error = 0;
143     memset(buffer, 0, sizeof(buffer));
144     bfree = sizeof(buffer);
145     bfill = 0;
146     format_json_initialize(buffer, &bfill, &bfree);
147     format_json_value_list(buffer, &bfill, &bfree, ds, vl);
148     format_json_finalize(buffer, &bfill, &bfree);
149     props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
150     props.content_type = amqp_cstring_bytes("application/json");
151     props.delivery_mode = 2; /* persistent delivery mode */
152     error = amqp_basic_publish(conn,
153                 /* channel = */ 1,
154                 amqp_cstring_bytes(exchange),
155                 amqp_cstring_bytes(routingkey),
156                 /* mandatory = */ 0,
157                 /* immediate = */ 0,
158                 &props,
159                 amqp_cstring_bytes(buffer));
160     reply = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
161     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
162         error = 1;
163     reply = amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
164     if (reply.reply_type != AMQP_RESPONSE_NORMAL)
165         error = 1;
166     amqp_destroy_connection(conn);
167     if (close(sockfd) < 0)
168         error = 1;
169     return (error);
170 }
171
172 static int shutdown(void)
173 {
174     sfree(host);
175     sfree(vhost);
176     sfree(user);
177     sfree(password);
178     sfree(exchange);
179     sfree(routingkey);
180     return (0);
181 }
182
183 void module_register(void)
184 {
185     plugin_register_config(PLUGIN_NAME, config, config_keys, config_keys_num);
186     plugin_register_write(PLUGIN_NAME, amqp_write, NULL);
187     plugin_register_shutdown(PLUGIN_NAME, shutdown);
188 }
189
190 /* vim: set sw=4 sts=4 et : */