write_http plugin: Implement support for multiple destinations.
[collectd.git] / src / write_http.c
1 /**
2  * collectd - src/write_http.c
3  * Copyright (C) 2009       Paul Sadauskas
4  * Copyright (C) 2009       Doug MacEachern
5  * Copyright (C) 2007-2009  Florian octo Forster
6  *
7  * This program is free software; you can redistribute it and/or modify it
8  * under the terms of the GNU General Public License as published by the
9  * Free Software Foundation; only version 2 of the License is applicable.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19  *
20  * Authors:
21  *   Florian octo Forster <octo at verplant.org>
22  *   Doug MacEachern <dougm@hyperic.com>
23  *   Paul Sadauskas <psadauskas@gmail.com>
24  **/
25
26 #include "collectd.h"
27 #include "plugin.h"
28 #include "common.h"
29 #include "utils_cache.h"
30 #include "utils_parse_option.h"
31
32 #if HAVE_PTHREAD_H
33 # include <pthread.h>
34 #endif
35
36 #include <curl/curl.h>
37
38 /*
39  * Private variables
40  */
41 struct wh_callback_s
42 {
43         char *location;
44
45         char *user;
46         char *pass;
47         char *credentials;
48
49         CURL *curl;
50         char curl_errbuf[CURL_ERROR_SIZE];
51
52         char   send_buffer[4096];
53         size_t send_buffer_free;
54         size_t send_buffer_fill;
55         time_t send_buffer_init_time;
56
57         pthread_mutex_t send_lock;
58 };
59 typedef struct wh_callback_s wh_callback_t;
60
61 static void wh_reset_buffer (wh_callback_t *cb)  /* {{{ */
62 {
63         memset (cb->send_buffer, 0, sizeof (cb->send_buffer));
64         cb->send_buffer_free = sizeof (cb->send_buffer);
65         cb->send_buffer_fill = 0;
66         cb->send_buffer_init_time = time (NULL);
67 } /* }}} wh_reset_buffer */
68
69 static int wh_send_buffer (wh_callback_t *cb) /* {{{ */
70 {
71         int status = 0;
72
73         curl_easy_setopt (cb->curl, CURLOPT_POSTFIELDS, cb->send_buffer);
74         status = curl_easy_perform (cb->curl);
75         if (status != 0)
76         {
77                 ERROR ("write_http plugin: curl_easy_perform failed with "
78                                 "staus %i: %s",
79                                 status, cb->curl_errbuf);
80         }
81         return (status);
82 } /* }}} wh_send_buffer */
83
84 static int wh_callback_init (wh_callback_t *cb) /* {{{ */
85 {
86         struct curl_slist *headers;
87
88         if (cb->curl != NULL)
89                 return (0);
90
91         cb->curl = curl_easy_init ();
92         if (cb->curl == NULL)
93         {
94                 ERROR ("curl plugin: curl_easy_init failed.");
95                 return (-1);
96         }
97
98         curl_easy_setopt (cb->curl, CURLOPT_USERAGENT, PACKAGE_NAME"/"PACKAGE_VERSION);
99
100         headers = NULL;
101         headers = curl_slist_append (headers, "Accept:  */*");
102         headers = curl_slist_append (headers, "Content-Type: text/plain");
103         curl_easy_setopt (cb->curl, CURLOPT_HTTPHEADER, headers);
104
105         curl_easy_setopt (cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf);
106         curl_easy_setopt (cb->curl, CURLOPT_URL, cb->location);
107
108         if (cb->user != NULL)
109         {
110                 size_t credentials_size;
111
112                 credentials_size = strlen (cb->user) + 2;
113                 if (cb->pass != NULL)
114                         credentials_size += strlen (cb->pass);
115
116                 cb->credentials = (char *) malloc (credentials_size);
117                 if (cb->credentials == NULL)
118                 {
119                         ERROR ("curl plugin: malloc failed.");
120                         return (-1);
121                 }
122
123                 ssnprintf (cb->credentials, credentials_size, "%s:%s",
124                                 cb->user, (cb->pass == NULL) ? "" : cb->pass);
125                 curl_easy_setopt (cb->curl, CURLOPT_USERPWD, cb->credentials);
126                 curl_easy_setopt (cb->curl, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST);
127         }
128
129         wh_reset_buffer (cb);
130
131         return (0);
132 } /* }}} int wh_callback_init */
133
134 static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */
135 {
136         int status;
137
138         DEBUG ("write_http plugin: wh_flush_nolock: timeout = %i; "
139                         "send_buffer_fill = %zu;",
140                         timeout, cb->send_buffer_fill);
141
142         if (timeout > 0)
143         {
144                 time_t now;
145
146                 now = time (NULL);
147                 if ((cb->send_buffer_init_time + timeout) > now)
148                         return (0);
149         }
150
151         if (cb->send_buffer_fill <= 0)
152         {
153                 cb->send_buffer_init_time = time (NULL);
154                 return (0);
155         }
156
157         status = wh_send_buffer (cb);
158         wh_reset_buffer (cb);
159
160         return (status);
161 } /* }}} wh_flush_nolock */
162
163 static int wh_flush (int timeout, /* {{{ */
164                 const char *identifier __attribute__((unused)),
165                 user_data_t *user_data)
166 {
167         wh_callback_t *cb;
168         int status;
169
170         if (user_data == NULL)
171                 return (-EINVAL);
172
173         cb = user_data->data;
174
175         pthread_mutex_lock (&cb->send_lock);
176
177         if (cb->curl == NULL)
178         {
179                 status = wh_callback_init (cb);
180                 if (status != 0)
181                 {
182                         ERROR ("write_http plugin: wh_callback_init failed.");
183                         pthread_mutex_unlock (&cb->send_lock);
184                         return (-1);
185                 }
186         }
187
188         status = wh_flush_nolock (timeout, cb);
189         pthread_mutex_unlock (&cb->send_lock);
190
191         return (status);
192 } /* }}} int wh_flush */
193
194 static void wh_callback_free (void *data) /* {{{ */
195 {
196         wh_callback_t *cb;
197
198         if (data == NULL)
199                 return;
200
201         cb = data;
202
203         wh_flush_nolock (/* timeout = */ -1, cb);
204
205         curl_easy_cleanup (cb->curl);
206         sfree (cb->location);
207         sfree (cb->user);
208         sfree (cb->pass);
209         sfree (cb->credentials);
210
211         sfree (cb);
212 } /* }}} void wh_callback_free */
213
214 static int wh_value_list_to_string (char *buffer, /* {{{ */
215                 size_t buffer_size,
216                 const data_set_t *ds, const value_list_t *vl)
217 {
218         size_t offset = 0;
219         int status;
220         int i;
221
222         assert (0 == strcmp (ds->type, vl->type));
223
224         memset (buffer, 0, buffer_size);
225
226 #define BUFFER_ADD(...) do { \
227         status = ssnprintf (buffer + offset, buffer_size - offset, \
228                         __VA_ARGS__); \
229         if (status < 1) \
230                 return (-1); \
231         else if (((size_t) status) >= (buffer_size - offset)) \
232                 return (-1); \
233         else \
234                 offset += ((size_t) status); \
235 } while (0)
236
237         BUFFER_ADD ("%lu", (unsigned long) vl->time);
238
239         for (i = 0; i < ds->ds_num; i++)
240 {
241         if (ds->ds[i].type == DS_TYPE_GAUGE)
242                 BUFFER_ADD (":%f", vl->values[i].gauge);
243         else if (ds->ds[i].type == DS_TYPE_COUNTER)
244                 BUFFER_ADD (":%llu", vl->values[i].counter);
245         else if (ds->ds[i].type == DS_TYPE_DERIVE)
246                 BUFFER_ADD (":%"PRIi64, vl->values[i].derive);
247         else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
248                 BUFFER_ADD (":%"PRIu64, vl->values[i].absolute);
249         else
250         {
251                 ERROR ("write_http plugin: Unknown data source type: %i",
252                                 ds->ds[i].type);
253                 return (-1);
254         }
255 } /* for ds->ds_num */
256
257 #undef BUFFER_ADD
258
259 return (0);
260 } /* }}} int wh_value_list_to_string */
261
262 static int config_set_string (char **ret_string, /* {{{ */
263                 oconfig_item_t *ci)
264 {
265         char *string;
266
267         if ((ci->values_num != 1)
268                         || (ci->values[0].type != OCONFIG_TYPE_STRING))
269         {
270                 WARNING ("write_http plugin: The `%s' config option "
271                                 "needs exactly one string argument.", ci->key);
272                 return (-1);
273         }
274
275         string = strdup (ci->values[0].value.string);
276         if (string == NULL)
277         {
278                 ERROR ("write_http plugin: strdup failed.");
279                 return (-1);
280         }
281
282         if (*ret_string != NULL)
283                 free (*ret_string);
284         *ret_string = string;
285
286         return (0);
287 } /* }}} int config_set_string */
288
289 static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{{ */
290                 wh_callback_t *cb)
291 {
292         char key[10*DATA_MAX_NAME_LEN];
293         char values[512];
294         char command[1024];
295         size_t command_len;
296
297         int status;
298
299         if (0 != strcmp (ds->type, vl->type)) {
300                 ERROR ("write_http plugin: DS type does not match "
301                                 "value list type");
302                 return -1;
303         }
304
305         /* Copy the identifier to `key' and escape it. */
306         status = FORMAT_VL (key, sizeof (key), vl);
307         if (status != 0) {
308                 ERROR ("write_http plugin: error with format_name");
309                 return (status);
310         }
311         escape_string (key, sizeof (key));
312
313         /* Convert the values to an ASCII representation and put that into
314          * `values'. */
315         status = wh_value_list_to_string (values, sizeof (values), ds, vl);
316         if (status != 0) {
317                 ERROR ("write_http plugin: error with "
318                                 "wh_value_list_to_string");
319                 return (status);
320         }
321
322         command_len = (size_t) ssnprintf (command, sizeof (command),
323                         "PUTVAL %s interval=%i %s\n",
324                         key, vl->interval, values);
325         if (command_len >= sizeof (command)) {
326                 ERROR ("write_http plugin: Command buffer too small: "
327                                 "Need %zu bytes.", command_len + 1);
328                 return (-1);
329         }
330
331         pthread_mutex_lock (&cb->send_lock);
332
333         if (cb->curl == NULL)
334         {
335                 status = wh_callback_init (cb);
336                 if (status != 0)
337                 {
338                         ERROR ("write_http plugin: wh_callback_init failed.");
339                         pthread_mutex_unlock (&cb->send_lock);
340                         return (-1);
341                 }
342         }
343
344         if (command_len >= cb->send_buffer_free)
345         {
346                 status = wh_flush_nolock (/* timeout = */ -1, cb);
347                 if (status != 0)
348                 {
349                         pthread_mutex_unlock (&cb->send_lock);
350                         return (status);
351                 }
352         }
353         assert (command_len < cb->send_buffer_free);
354
355         /* `command_len + 1' because `command_len' does not include the
356          * trailing null byte. Neither does `send_buffer_fill'. */
357         memcpy (cb->send_buffer + cb->send_buffer_fill,
358                         command, command_len + 1);
359         cb->send_buffer_fill += command_len;
360         cb->send_buffer_free -= command_len;
361
362         DEBUG ("write_http plugin: <%s> buffer %zu/%zu (%g%%) \"%s\"",
363                         cb->location,
364                         cb->send_buffer_fill, sizeof (cb->send_buffer),
365                         100.0 * ((double) cb->send_buffer_fill) / ((double) sizeof (cb->send_buffer)),
366                         command);
367
368         /* Check if we have enough space for this command. */
369         pthread_mutex_unlock (&cb->send_lock);
370
371         return (0);
372 } /* }}} int wh_write_command */
373
374 static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
375                 user_data_t *user_data)
376 {
377         wh_callback_t *cb;
378         int status;
379
380         if (user_data == NULL)
381                 return (-EINVAL);
382
383         cb = user_data->data;
384
385         status = wh_write_command (ds, vl, cb);
386         return (status);
387 } /* }}} int wh_write */
388
389 static int wh_config_url (oconfig_item_t *ci) /* {{{ */
390 {
391         wh_callback_t *cb;
392         user_data_t user_data;
393         int i;
394
395         cb = malloc (sizeof (*cb));
396         if (cb == NULL)
397         {
398                 ERROR ("write_http plugin: malloc failed.");
399                 return (-1);
400         }
401         memset (cb, 0, sizeof (*cb));
402
403         pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
404
405         config_set_string (&cb->location, ci);
406         if (cb->location == NULL)
407                 return (-1);
408
409         for (i = 0; i < ci->children_num; i++)
410         {
411                 oconfig_item_t *child = ci->children + i;
412
413                 if (strcasecmp ("User", child->key) == 0)
414                         config_set_string (&cb->user, child);
415                 else if (strcasecmp ("Password", child->key) == 0)
416                         config_set_string (&cb->pass, child);
417                 else
418                 {
419                         ERROR ("write_http plugin: Invalid configuration "
420                                         "option: %s.", child->key);
421                 }
422         }
423
424         DEBUG ("write_http: Registering write callback with URL %s",
425                         cb->location);
426
427         memset (&user_data, 0, sizeof (user_data));
428         user_data.data = cb;
429         user_data.free_func = NULL;
430         plugin_register_flush ("write_http", wh_flush, &user_data);
431
432         user_data.free_func = wh_callback_free;
433         plugin_register_write ("write_http", wh_write, &user_data);
434
435         return (0);
436 } /* }}} int wh_config_url */
437
438 static int wh_config (oconfig_item_t *ci) /* {{{ */
439 {
440         int i;
441
442         for (i = 0; i < ci->children_num; i++)
443         {
444                 oconfig_item_t *child = ci->children + i;
445
446                 if (strcasecmp ("URL", child->key) == 0)
447                         wh_config_url (child);
448                 else
449                 {
450                         ERROR ("write_http plugin: Invalid configuration "
451                                         "option: %s.", child->key);
452                 }
453         }
454
455         return (0);
456 } /* }}} int wh_config */
457
458 void module_register (void) /* {{{ */
459 {
460         plugin_register_complex_config ("write_http", wh_config);
461 } /* }}} void module_register */
462
463 /* vim: set fdm=marker sw=8 ts=8 tw=78 et : */