From cf7c7d8538822a2ad09021adf1a919b7ce1892c0 Mon Sep 17 00:00:00 2001 From: Florian Forster Date: Sat, 29 Aug 2009 10:33:19 +0200 Subject: [PATCH] write_http plugin: Implement support for multiple destinations. --- src/collectd.conf.in | 8 + src/collectd.conf.pod | 18 +- src/plugin.c | 8 +- src/write_http.c | 445 +++++++++++++++++++++++++++++--------------------- 4 files changed, 289 insertions(+), 190 deletions(-) diff --git a/src/collectd.conf.in b/src/collectd.conf.in index 19b57afa..9a542452 100644 --- a/src/collectd.conf.in +++ b/src/collectd.conf.in @@ -127,6 +127,7 @@ FQDNLookup true #@BUILD_PLUGIN_VMEM_TRUE@LoadPlugin vmem #@BUILD_PLUGIN_VSERVER_TRUE@LoadPlugin vserver #@BUILD_PLUGIN_WIRELESS_TRUE@LoadPlugin wireless +#@BUILD_PLUGIN_WRITE_HTTP_TRUE@LoadPlugin write_http #@BUILD_PLUGIN_XMMS_TRUE@LoadPlugin xmms ############################################################################## @@ -713,6 +714,13 @@ FQDNLookup true # Verbose false # +# +# +# User "collectd" +# Password "weCh3ik0" +# +# + ############################################################################## # Filter configuration # #----------------------------------------------------------------------------# diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index e7943199..786572cb 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -3454,15 +3454,23 @@ iptables to feed data for the guest IPs into the iptables plugin. =head2 Plugin C This output plugin submits values to an http server by POST them using the -PUTVAL plain-text protocol. +PUTVAL plain-text protocol. Each destination you want to post data to needs to +have one B block, within which the destination can be configured further, +for example by specifying authentication data. -The following options are accepted by the I: +Synopsis: -=over 4 + + + User "collectd" + Password "weCh3ik0" + + -=item B I +B blocks need one string argument which is used as the URL to which data +is posted. The following options are understood within B blocks. -Set the URL location where values will be sent. +=over 4 =item B I diff --git a/src/plugin.c b/src/plugin.c index b150cf67..7f37fa7a 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -1205,8 +1205,14 @@ void plugin_shutdown_all (void) (*callback) (); } - destroy_all_callbacks (&list_write); + /* Write plugins which use the `user_data' pointer usually need the + * same data available to the flush callback. If this is the case, set + * the free_function to NULL when registering the flush callback and to + * the real free function when registering the write callback. This way + * the data isn't freed twice. */ destroy_all_callbacks (&list_flush); + destroy_all_callbacks (&list_write); + destroy_all_callbacks (&list_notification); destroy_all_callbacks (&list_shutdown); destroy_all_callbacks (&list_log); diff --git a/src/write_http.c b/src/write_http.c index 27d1842c..912c92dd 100644 --- a/src/write_http.c +++ b/src/write_http.c @@ -38,84 +38,178 @@ /* * Private variables */ -static const char *config_keys[] = +struct wh_callback_s { - "URL", "User", "Password" -}; -static int config_keys_num = STATIC_ARRAY_SIZE (config_keys); + char *location; -static char *location = NULL; + char *user; + char *pass; + char *credentials; -char *user; -char *pass; -char *credentials; + CURL *curl; + char curl_errbuf[CURL_ERROR_SIZE]; -CURL *curl; -char curl_errbuf[CURL_ERROR_SIZE]; + char send_buffer[4096]; + size_t send_buffer_free; + size_t send_buffer_fill; + time_t send_buffer_init_time; -#define SEND_BUFFER_SIZE 4096 -static char send_buffer[SEND_BUFFER_SIZE]; -static size_t send_buffer_free; -static size_t send_buffer_fill; -static time_t send_buffer_init_time; + pthread_mutex_t send_lock; +}; +typedef struct wh_callback_s wh_callback_t; -static pthread_mutex_t send_lock = PTHREAD_MUTEX_INITIALIZER; +static void wh_reset_buffer (wh_callback_t *cb) /* {{{ */ +{ + memset (cb->send_buffer, 0, sizeof (cb->send_buffer)); + cb->send_buffer_free = sizeof (cb->send_buffer); + cb->send_buffer_fill = 0; + cb->send_buffer_init_time = time (NULL); +} /* }}} wh_reset_buffer */ -static void wh_init_buffer (void) /* {{{ */ +static int wh_send_buffer (wh_callback_t *cb) /* {{{ */ { - memset (send_buffer, 0, sizeof (send_buffer)); - send_buffer_free = sizeof (send_buffer); - send_buffer_fill = 0; - send_buffer_init_time = time (NULL); -} /* }}} wh_init_buffer */ + int status = 0; + + curl_easy_setopt (cb->curl, CURLOPT_POSTFIELDS, cb->send_buffer); + status = curl_easy_perform (cb->curl); + if (status != 0) + { + ERROR ("write_http plugin: curl_easy_perform failed with " + "staus %i: %s", + status, cb->curl_errbuf); + } + return (status); +} /* }}} wh_send_buffer */ -static int wh_init(void) /* {{{ */ +static int wh_callback_init (wh_callback_t *cb) /* {{{ */ { + struct curl_slist *headers; - curl = curl_easy_init (); + if (cb->curl != NULL) + return (0); - if (curl == NULL) + cb->curl = curl_easy_init (); + if (cb->curl == NULL) { ERROR ("curl plugin: curl_easy_init failed."); return (-1); } - struct curl_slist *headers=NULL; + curl_easy_setopt (cb->curl, CURLOPT_USERAGENT, PACKAGE_NAME"/"PACKAGE_VERSION); - curl_easy_setopt (curl, CURLOPT_USERAGENT, PACKAGE_NAME"/"PACKAGE_VERSION); + headers = NULL; + headers = curl_slist_append (headers, "Accept: */*"); + headers = curl_slist_append (headers, "Content-Type: text/plain"); + curl_easy_setopt (cb->curl, CURLOPT_HTTPHEADER, headers); - headers = curl_slist_append(headers, "Accept: */*"); - headers = curl_slist_append(headers, "Content-Type: text/plain"); - curl_easy_setopt (curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt (cb->curl, CURLOPT_ERRORBUFFER, cb->curl_errbuf); + curl_easy_setopt (cb->curl, CURLOPT_URL, cb->location); - curl_easy_setopt (curl, CURLOPT_ERRORBUFFER, curl_errbuf); - curl_easy_setopt (curl, CURLOPT_URL, location); - - if (user != NULL) + if (cb->user != NULL) { size_t credentials_size; - credentials_size = strlen (user) + 2; - if (pass != NULL) - credentials_size += strlen (pass); + credentials_size = strlen (cb->user) + 2; + if (cb->pass != NULL) + credentials_size += strlen (cb->pass); - credentials = (char *) malloc (credentials_size); - if (credentials == NULL) + cb->credentials = (char *) malloc (credentials_size); + if (cb->credentials == NULL) { ERROR ("curl plugin: malloc failed."); return (-1); } - ssnprintf (credentials, credentials_size, "%s:%s", - user, (pass == NULL) ? "" : pass); - curl_easy_setopt (curl, CURLOPT_USERPWD, credentials); - curl_easy_setopt (curl, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST); + ssnprintf (cb->credentials, credentials_size, "%s:%s", + cb->user, (cb->pass == NULL) ? "" : cb->pass); + curl_easy_setopt (cb->curl, CURLOPT_USERPWD, cb->credentials); + curl_easy_setopt (cb->curl, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST); } - wh_init_buffer (); + wh_reset_buffer (cb); return (0); -} /* }}} */ +} /* }}} int wh_callback_init */ + +static int wh_flush_nolock (int timeout, wh_callback_t *cb) /* {{{ */ +{ + int status; + + DEBUG ("write_http plugin: wh_flush_nolock: timeout = %i; " + "send_buffer_fill = %zu;", + timeout, cb->send_buffer_fill); + + if (timeout > 0) + { + time_t now; + + now = time (NULL); + if ((cb->send_buffer_init_time + timeout) > now) + return (0); + } + + if (cb->send_buffer_fill <= 0) + { + cb->send_buffer_init_time = time (NULL); + return (0); + } + + status = wh_send_buffer (cb); + wh_reset_buffer (cb); + + return (status); +} /* }}} wh_flush_nolock */ + +static int wh_flush (int timeout, /* {{{ */ + const char *identifier __attribute__((unused)), + user_data_t *user_data) +{ + wh_callback_t *cb; + int status; + + if (user_data == NULL) + return (-EINVAL); + + cb = user_data->data; + + pthread_mutex_lock (&cb->send_lock); + + if (cb->curl == NULL) + { + status = wh_callback_init (cb); + if (status != 0) + { + ERROR ("write_http plugin: wh_callback_init failed."); + pthread_mutex_unlock (&cb->send_lock); + return (-1); + } + } + + status = wh_flush_nolock (timeout, cb); + pthread_mutex_unlock (&cb->send_lock); + + return (status); +} /* }}} int wh_flush */ + +static void wh_callback_free (void *data) /* {{{ */ +{ + wh_callback_t *cb; + + if (data == NULL) + return; + + cb = data; + + wh_flush_nolock (/* timeout = */ -1, cb); + + curl_easy_cleanup (cb->curl); + sfree (cb->location); + sfree (cb->user); + sfree (cb->pass); + sfree (cb->credentials); + + sfree (cb); +} /* }}} void wh_callback_free */ static int wh_value_list_to_string (char *buffer, /* {{{ */ size_t buffer_size, @@ -165,132 +259,35 @@ static int wh_value_list_to_string (char *buffer, /* {{{ */ return (0); } /* }}} int wh_value_list_to_string */ -static int wh_config (const char *key, const char *value) /* {{{ */ -{ - if (strcasecmp ("URL", key) == 0) - { - if (location != NULL) - free (location); - location = strdup (value); - if (location != NULL) - { - int len = strlen (location); - while ((len > 0) && (location[len - 1] == '/')) - { - len--; - location[len] = '\0'; - } - if (len <= 0) - { - free (location); - location = NULL; - } - } - } - else if (strcasecmp ("User", key) == 0) - { - if (user != NULL) - free (user); - user = strdup (value); - if (user != NULL) - { - int len = strlen (user); - while ((len > 0) && (user[len - 1] == '/')) - { - len--; - user[len] = '\0'; - } - if (len <= 0) - { - free (user); - user = NULL; - } - } - } - else if (strcasecmp ("Password", key) == 0) - { - if (pass != NULL) - free (pass); - pass = strdup (value); - if (pass != NULL) - { - int len = strlen (pass); - while ((len > 0) && (pass[len - 1] == '/')) - { - len--; - pass[len] = '\0'; - } - if (len <= 0) - { - free (pass); - pass = NULL; - } - } - } - else - { - return (-1); - } - return (0); -} /* }}} int wh_config */ - -static int wh_send_buffer (char *buffer) /* {{{ */ -{ - int status = 0; - - curl_easy_setopt (curl, CURLOPT_POSTFIELDS, buffer); - status = curl_easy_perform (curl); - if (status != 0) - { - ERROR ("write_http plugin: curl_easy_perform failed with " - "staus %i: %s", - status, curl_errbuf); - } - return (status); -} /* }}} wh_send_buffer */ - -static int wh_flush_nolock (int timeout) /* {{{ */ +static int config_set_string (char **ret_string, /* {{{ */ + oconfig_item_t *ci) { - int status; + char *string; - DEBUG ("write_http plugin: wh_flush_nolock: timeout = %i; " - "send_buffer =\n %s", timeout, send_buffer); - - if (timeout > 0) + if ((ci->values_num != 1) + || (ci->values[0].type != OCONFIG_TYPE_STRING)) { - time_t now; - - now = time (NULL); - if ((send_buffer_init_time + timeout) > now) - return (0); + WARNING ("write_http plugin: The `%s' config option " + "needs exactly one string argument.", ci->key); + return (-1); } - if (send_buffer_fill <= 0) + string = strdup (ci->values[0].value.string); + if (string == NULL) { - send_buffer_init_time = time (NULL); - return (0); + ERROR ("write_http plugin: strdup failed."); + return (-1); } - status = wh_send_buffer (send_buffer); - wh_init_buffer (); + if (*ret_string != NULL) + free (*ret_string); + *ret_string = string; - return (status); -} /* }}} wh_flush_nolock */ - -static int wh_flush (int timeout, /* {{{ */ - const char *identifier __attribute__((unused)), - user_data_t *user_data __attribute__((unused))) -{ - int status; - - pthread_mutex_lock (&send_lock); - status = wh_flush_nolock (timeout); - pthread_mutex_unlock (&send_lock); - - return (status); -} /* }}} int wh_flush */ + return (0); +} /* }}} int config_set_string */ -static int wh_write_command (const data_set_t *ds, const value_list_t *vl) /* {{{ */ +static int wh_write_command (const data_set_t *ds, const value_list_t *vl, /* {{{ */ + wh_callback_t *cb) { char key[10*DATA_MAX_NAME_LEN]; char values[512]; @@ -331,56 +328,136 @@ static int wh_write_command (const data_set_t *ds, const value_list_t *vl) /* {{ return (-1); } - pthread_mutex_lock (&send_lock); + pthread_mutex_lock (&cb->send_lock); - /* Check if we have enough space for this command. */ - if (command_len >= send_buffer_free) + if (cb->curl == NULL) + { + status = wh_callback_init (cb); + if (status != 0) + { + ERROR ("write_http plugin: wh_callback_init failed."); + pthread_mutex_unlock (&cb->send_lock); + return (-1); + } + } + + if (command_len >= cb->send_buffer_free) { - status = wh_flush_nolock (/* timeout = */ -1); + status = wh_flush_nolock (/* timeout = */ -1, cb); if (status != 0) { - pthread_mutex_unlock (&send_lock); - return status; + pthread_mutex_unlock (&cb->send_lock); + return (status); } } - assert (command_len < send_buffer_free); + assert (command_len < cb->send_buffer_free); /* `command_len + 1' because `command_len' does not include the * trailing null byte. Neither does `send_buffer_fill'. */ - memcpy (send_buffer + send_buffer_fill, command, command_len + 1); - send_buffer_fill += command_len; - send_buffer_free -= command_len; + memcpy (cb->send_buffer + cb->send_buffer_fill, + command, command_len + 1); + cb->send_buffer_fill += command_len; + cb->send_buffer_free -= command_len; + + DEBUG ("write_http plugin: <%s> buffer %zu/%zu (%g%%) \"%s\"", + cb->location, + cb->send_buffer_fill, sizeof (cb->send_buffer), + 100.0 * ((double) cb->send_buffer_fill) / ((double) sizeof (cb->send_buffer)), + command); - pthread_mutex_unlock (&send_lock); + /* Check if we have enough space for this command. */ + pthread_mutex_unlock (&cb->send_lock); return (0); } /* }}} int wh_write_command */ static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */ - user_data_t __attribute__((unused)) *user_data) + user_data_t *user_data) { + wh_callback_t *cb; int status; - status = wh_write_command (ds, vl); + if (user_data == NULL) + return (-EINVAL); + cb = user_data->data; + + status = wh_write_command (ds, vl, cb); return (status); } /* }}} int wh_write */ -static int wh_shutdown (void) /* {{{ */ +static int wh_config_url (oconfig_item_t *ci) /* {{{ */ { - wh_flush_nolock (/* timeout = */ -1); - curl_easy_cleanup(curl); + wh_callback_t *cb; + user_data_t user_data; + int i; + + cb = malloc (sizeof (*cb)); + if (cb == NULL) + { + ERROR ("write_http plugin: malloc failed."); + return (-1); + } + memset (cb, 0, sizeof (*cb)); + + pthread_mutex_init (&cb->send_lock, /* attr = */ NULL); + + config_set_string (&cb->location, ci); + if (cb->location == NULL) + return (-1); + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("User", child->key) == 0) + config_set_string (&cb->user, child); + else if (strcasecmp ("Password", child->key) == 0) + config_set_string (&cb->pass, child); + else + { + ERROR ("write_http plugin: Invalid configuration " + "option: %s.", child->key); + } + } + + DEBUG ("write_http: Registering write callback with URL %s", + cb->location); + + memset (&user_data, 0, sizeof (user_data)); + user_data.data = cb; + user_data.free_func = NULL; + plugin_register_flush ("write_http", wh_flush, &user_data); + + user_data.free_func = wh_callback_free; + plugin_register_write ("write_http", wh_write, &user_data); + return (0); -} /* }}} int wh_shutdown */ +} /* }}} int wh_config_url */ + +static int wh_config (oconfig_item_t *ci) /* {{{ */ +{ + int i; + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("URL", child->key) == 0) + wh_config_url (child); + else + { + ERROR ("write_http plugin: Invalid configuration " + "option: %s.", child->key); + } + } + + return (0); +} /* }}} int wh_config */ void module_register (void) /* {{{ */ { - plugin_register_init("write_http", wh_init); - plugin_register_config ("write_http", wh_config, - config_keys, config_keys_num); - plugin_register_write ("write_http", wh_write, /* user_data = */ NULL); - plugin_register_flush ("write_http", wh_flush, /* user_data = */ NULL); - plugin_register_shutdown("write_http", wh_shutdown); + plugin_register_complex_config ("write_http", wh_config); } /* }}} void module_register */ /* vim: set fdm=marker sw=8 ts=8 tw=78 et : */ -- 2.11.0