2 * collectd - src/write_http.c
3 * Copyright (C) 2009 Paul Sadauskas
4 * Copyright (C) 2009 Doug MacEachern
5 * Copyright (C) 2007-2009 Florian octo Forster
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; only version 2 of the License is applicable.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 * Florian octo Forster <octo at verplant.org>
22 * Doug MacEachern <dougm@hyperic.com>
23 * Paul Sadauskas <psadauskas@gmail.com>
29 #include "utils_cache.h"
30 #include "utils_parse_option.h"
31 #include "utils_format_json.h"
37 #include <curl/curl.h>
53 #define WH_FORMAT_COMMAND 0
54 #define WH_FORMAT_JSON 1
57 char send_buffer[4096];
58 size_t send_buffer_free;
59 size_t send_buffer_fill;
60 time_t send_buffer_init_time;
62 pthread_mutex_t send_lock;
64 typedef struct wh_callback_s wh_callback_t;
69 char errbuf[CURL_ERROR_SIZE];
71 typedef struct wh_curl_s wh_curl_t;
73 static pthread_once_t curl_key_init = PTHREAD_ONCE_INIT;
74 static pthread_key_t curl_key;
76 static void wh_curl_destroy (void *data) /* {{{ */
83 DEBUG ("write_http plugin: Destroying a cURL object.");
85 curl_easy_cleanup (c->curl);
87 } /* }}} void wh_curl_destroy */
89 static void wh_curl_init (void) /* {{{ */
91 pthread_key_create(&curl_key, wh_curl_destroy);
92 } /* }}} void wh_curl_init */
94 static wh_curl_t *wh_curl_get (wh_callback_t *cb) /* {{{ */
96 struct curl_slist *headers;
99 pthread_once (&curl_key_init, wh_curl_init);
101 c = pthread_getspecific (curl_key);
105 DEBUG ("write_http plugin: Creating a cURL object.");
107 c = malloc (sizeof (*c));
110 ERROR ("write_http plugin: malloc failed.");
113 memset (c, 0, sizeof (*c));
115 c->curl = curl_easy_init ();
118 ERROR ("write_http plugin: curl_easy_init failed.");
123 curl_easy_setopt (c->curl, CURLOPT_USERAGENT, PACKAGE_NAME"/"PACKAGE_VERSION);
125 /* The fields from `cb' we read here are only written to at
126 * configuration time, therefore it's safe to read them without a
129 headers = curl_slist_append (headers, "Accept: */*");
130 if (cb->format == WH_FORMAT_JSON)
131 headers = curl_slist_append (headers, "Content-Type: application/json");
133 headers = curl_slist_append (headers, "Content-Type: text/plain");
134 headers = curl_slist_append (headers, "Expect:");
135 curl_easy_setopt (c->curl, CURLOPT_HTTPHEADER, headers);
137 curl_easy_setopt (c->curl, CURLOPT_ERRORBUFFER, c->errbuf);
138 curl_easy_setopt (c->curl, CURLOPT_URL, cb->location);
140 if (cb->credentials != NULL)
141 curl_easy_setopt (c->curl, CURLOPT_USERPWD, cb->credentials);
143 curl_easy_setopt (c->curl, CURLOPT_SSL_VERIFYPEER, cb->verify_peer);
144 curl_easy_setopt (c->curl, CURLOPT_SSL_VERIFYHOST,
145 cb->verify_host ? 2 : 0);
146 if (cb->cacert != NULL)
147 curl_easy_setopt (c->curl, CURLOPT_CAINFO, cb->cacert);
149 pthread_setspecific (curl_key, c);
152 } /* }}} int wh_curl_get */
154 static void wh_reset_buffer (wh_callback_t *cb) /* {{{ */
156 memset (cb->send_buffer, 0, sizeof (cb->send_buffer));
157 cb->send_buffer_free = sizeof (cb->send_buffer);
158 cb->send_buffer_fill = 0;
159 cb->send_buffer_init_time = time (NULL);
161 if (cb->format == WH_FORMAT_JSON)
163 format_json_initialize (cb->send_buffer,
164 &cb->send_buffer_fill,
165 &cb->send_buffer_free);
167 } /* }}} wh_reset_buffer */
169 static int wh_send_buffer (wh_callback_t *cb, /* {{{ */
175 c = wh_curl_get (cb);
179 curl_easy_setopt (c->curl, CURLOPT_POSTFIELDS, buffer);
180 status = curl_easy_perform (c->curl);
183 ERROR ("write_http plugin: curl_easy_perform failed with "
189 } /* }}} wh_send_buffer */
191 /* You must hold cb->send_lock when entering `wh_flush_nolock'. */
192 static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */
194 char buffer[sizeof (cb->send_buffer)];
197 DEBUG ("write_http plugin: wh_flush_nolock: timeout = %i; "
198 "send_buffer_fill = %zu;",
199 timeout, cb->send_buffer_fill);
206 if ((cb->send_buffer_init_time + timeout) > now)
210 /* Finalize the send buffer and copy it to `buffer'. */
211 if (cb->format == WH_FORMAT_COMMAND)
213 if (cb->send_buffer_fill <= 0)
215 cb->send_buffer_init_time = time (NULL);
219 memcpy (buffer, cb->send_buffer, sizeof (buffer));
220 wh_reset_buffer (cb);
222 else if (cb->format == WH_FORMAT_JSON)
224 if (cb->send_buffer_fill <= 2)
226 cb->send_buffer_init_time = time (NULL);
230 status = format_json_finalize (cb->send_buffer,
231 &cb->send_buffer_fill,
232 &cb->send_buffer_free);
235 ERROR ("write_http: wh_flush_nolock: "
236 "format_json_finalize failed.");
237 wh_reset_buffer (cb);
241 memcpy (buffer, cb->send_buffer, sizeof (buffer));
242 wh_reset_buffer (cb);
246 ERROR ("write_http: wh_flush_nolock: "
247 "Unknown format: %i",
252 /* We copied the send buffer to `buffer' and reset it so we can do
253 * without the `send_lock' here. This allows other read-threads to
254 * append stuff to the new buffer while we wait for the web-server to
256 pthread_mutex_unlock (&cb->send_lock);
257 status = wh_send_buffer (cb, buffer);
258 pthread_mutex_lock (&cb->send_lock);
261 } /* }}} wh_flush_nolock */
263 static int wh_flush (int timeout, /* {{{ */
264 const char *identifier __attribute__((unused)),
265 user_data_t *user_data)
270 if (user_data == NULL)
273 cb = user_data->data;
275 pthread_mutex_lock (&cb->send_lock);
276 status = wh_flush_nolock (timeout, cb);
277 pthread_mutex_unlock (&cb->send_lock);
280 } /* }}} int wh_flush */
282 static void wh_callback_free (void *data) /* {{{ */
291 wh_flush_nolock (/* timeout = */ -1, cb);
293 sfree (cb->location);
296 sfree (cb->credentials);
300 } /* }}} void wh_callback_free */
302 static int wh_value_list_to_string (char *buffer, /* {{{ */
304 const data_set_t *ds, const value_list_t *vl)
310 assert (0 == strcmp (ds->type, vl->type));
312 memset (buffer, 0, buffer_size);
314 #define BUFFER_ADD(...) do { \
315 status = ssnprintf (buffer + offset, buffer_size - offset, \
319 else if (((size_t) status) >= (buffer_size - offset)) \
322 offset += ((size_t) status); \
325 BUFFER_ADD ("%lu", (unsigned long) vl->time);
327 for (i = 0; i < ds->ds_num; i++)
329 if (ds->ds[i].type == DS_TYPE_GAUGE)
330 BUFFER_ADD (":%f", vl->values[i].gauge);
331 else if (ds->ds[i].type == DS_TYPE_COUNTER)
332 BUFFER_ADD (":%llu", vl->values[i].counter);
333 else if (ds->ds[i].type == DS_TYPE_DERIVE)
334 BUFFER_ADD (":%"PRIi64, vl->values[i].derive);
335 else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
336 BUFFER_ADD (":%"PRIu64, vl->values[i].absolute);
339 ERROR ("write_http plugin: Unknown data source type: %i",
343 } /* for ds->ds_num */
348 } /* }}} int wh_value_list_to_string */
350 static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{{ */
353 char key[10*DATA_MAX_NAME_LEN];
360 if (0 != strcmp (ds->type, vl->type)) {
361 ERROR ("write_http plugin: DS type does not match "
366 /* Copy the identifier to `key' and escape it. */
367 status = FORMAT_VL (key, sizeof (key), vl);
369 ERROR ("write_http plugin: error with format_name");
372 escape_string (key, sizeof (key));
374 /* Convert the values to an ASCII representation and put that into
376 status = wh_value_list_to_string (values, sizeof (values), ds, vl);
378 ERROR ("write_http plugin: error with "
379 "wh_value_list_to_string");
383 command_len = (size_t) ssnprintf (command, sizeof (command),
384 "PUTVAL %s interval=%i %s\r\n",
385 key, vl->interval, values);
386 if (command_len >= sizeof (command)) {
387 ERROR ("write_http plugin: Command buffer too small: "
388 "Need %zu bytes.", command_len + 1);
392 pthread_mutex_lock (&cb->send_lock);
394 if (command_len >= cb->send_buffer_free)
396 status = wh_flush_nolock (/* timeout = */ -1, cb);
399 pthread_mutex_unlock (&cb->send_lock);
403 assert (command_len < cb->send_buffer_free);
405 /* `command_len + 1' because `command_len' does not include the
406 * trailing null byte. Neither does `send_buffer_fill'. */
407 memcpy (cb->send_buffer + cb->send_buffer_fill,
408 command, command_len + 1);
409 cb->send_buffer_fill += command_len;
410 cb->send_buffer_free -= command_len;
412 DEBUG ("write_http plugin: <%s> buffer %zu/%zu (%g%%) \"%s\"",
414 cb->send_buffer_fill, sizeof (cb->send_buffer),
415 100.0 * ((double) cb->send_buffer_fill) / ((double) sizeof (cb->send_buffer)),
418 /* Check if we have enough space for this command. */
419 pthread_mutex_unlock (&cb->send_lock);
422 } /* }}} int wh_write_command */
424 static int wh_write_json (const data_set_t *ds, const value_list_t *vl, /* {{{ */
429 pthread_mutex_lock (&cb->send_lock);
431 status = format_json_value_list (cb->send_buffer,
432 &cb->send_buffer_fill,
433 &cb->send_buffer_free,
435 if (status == (-ENOMEM))
437 status = wh_flush_nolock (/* timeout = */ -1, cb);
440 wh_reset_buffer (cb);
441 pthread_mutex_unlock (&cb->send_lock);
445 status = format_json_value_list (cb->send_buffer,
446 &cb->send_buffer_fill,
447 &cb->send_buffer_free,
452 pthread_mutex_unlock (&cb->send_lock);
456 DEBUG ("write_http plugin: <%s> buffer %zu/%zu (%g%%)",
458 cb->send_buffer_fill, sizeof (cb->send_buffer),
459 100.0 * ((double) cb->send_buffer_fill) / ((double) sizeof (cb->send_buffer)));
461 /* Check if we have enough space for this command. */
462 pthread_mutex_unlock (&cb->send_lock);
465 } /* }}} int wh_write_json */
467 static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
468 user_data_t *user_data)
473 if (user_data == NULL)
476 cb = user_data->data;
478 if (cb->format == WH_FORMAT_JSON)
479 status = wh_write_json (ds, vl, cb);
481 status = wh_write_command (ds, vl, cb);
484 } /* }}} int wh_write */
486 static int config_set_string (char **ret_string, /* {{{ */
491 if ((ci->values_num != 1)
492 || (ci->values[0].type != OCONFIG_TYPE_STRING))
494 WARNING ("write_http plugin: The `%s' config option "
495 "needs exactly one string argument.", ci->key);
499 string = strdup (ci->values[0].value.string);
502 ERROR ("write_http plugin: strdup failed.");
506 if (*ret_string != NULL)
508 *ret_string = string;
511 } /* }}} int config_set_string */
513 static int config_set_boolean (int *dest, oconfig_item_t *ci) /* {{{ */
515 if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_BOOLEAN))
517 WARNING ("write_http plugin: The `%s' config option "
518 "needs exactly one boolean argument.", ci->key);
522 *dest = ci->values[0].value.boolean ? 1 : 0;
525 } /* }}} int config_set_boolean */
527 static int config_set_format (wh_callback_t *cb, /* {{{ */
532 if ((ci->values_num != 1)
533 || (ci->values[0].type != OCONFIG_TYPE_STRING))
535 WARNING ("write_http plugin: The `%s' config option "
536 "needs exactly one string argument.", ci->key);
540 string = ci->values[0].value.string;
541 if (strcasecmp ("Command", string) == 0)
542 cb->format = WH_FORMAT_COMMAND;
543 else if (strcasecmp ("JSON", string) == 0)
544 cb->format = WH_FORMAT_JSON;
547 ERROR ("write_http plugin: Invalid format string: %s",
553 } /* }}} int config_set_string */
555 static int wh_config_url (oconfig_item_t *ci) /* {{{ */
558 user_data_t user_data;
561 cb = malloc (sizeof (*cb));
564 ERROR ("write_http plugin: malloc failed.");
567 memset (cb, 0, sizeof (*cb));
571 cb->credentials = NULL;
575 cb->format = WH_FORMAT_COMMAND;
577 pthread_mutex_init (&cb->send_lock, /* attr = */ NULL);
579 config_set_string (&cb->location, ci);
580 if (cb->location == NULL)
583 for (i = 0; i < ci->children_num; i++)
585 oconfig_item_t *child = ci->children + i;
587 if (strcasecmp ("User", child->key) == 0)
588 config_set_string (&cb->user, child);
589 else if (strcasecmp ("Password", child->key) == 0)
590 config_set_string (&cb->pass, child);
591 else if (strcasecmp ("VerifyPeer", child->key) == 0)
592 config_set_boolean (&cb->verify_peer, child);
593 else if (strcasecmp ("VerifyHost", child->key) == 0)
594 config_set_boolean (&cb->verify_host, child);
595 else if (strcasecmp ("CACert", child->key) == 0)
596 config_set_string (&cb->cacert, child);
597 else if (strcasecmp ("Format", child->key) == 0)
598 config_set_format (cb, child);
601 ERROR ("write_http plugin: Invalid configuration "
602 "option: %s.", child->key);
606 DEBUG ("write_http: Registering write callback with URL %s",
609 if (cb->user != NULL)
611 size_t credentials_size;
613 credentials_size = strlen (cb->user) + 2;
614 if (cb->pass != NULL)
615 credentials_size += strlen (cb->pass);
617 cb->credentials = (char *) malloc (credentials_size);
618 if (cb->credentials == NULL)
620 ERROR ("write_http plugin: malloc failed.");
624 ssnprintf (cb->credentials, credentials_size, "%s:%s",
625 cb->user, (cb->pass == NULL) ? "" : cb->pass);
628 wh_reset_buffer (cb);
630 memset (&user_data, 0, sizeof (user_data));
632 user_data.free_func = NULL;
633 plugin_register_flush ("write_http", wh_flush, &user_data);
635 user_data.free_func = wh_callback_free;
636 plugin_register_write ("write_http", wh_write, &user_data);
639 } /* }}} int wh_config_url */
641 static int wh_config (oconfig_item_t *ci) /* {{{ */
645 for (i = 0; i < ci->children_num; i++)
647 oconfig_item_t *child = ci->children + i;
649 if (strcasecmp ("URL", child->key) == 0)
650 wh_config_url (child);
653 ERROR ("write_http plugin: Invalid configuration "
654 "option: %s.", child->key);
659 } /* }}} int wh_config */
661 void module_register (void) /* {{{ */
663 plugin_register_complex_config ("write_http", wh_config);
664 } /* }}} void module_register */
666 /* vim: set fdm=marker sw=8 ts=8 tw=78 et : */