X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_graphite.c;h=8ae15ba0c5a2536f332b5de0358a7a13ed127d56;hb=e9fb3dcaad4d46f1e594fd773f6d2327822bdddf;hp=fca40d1a478fc48639686185edb2e4a8491ea918;hpb=acd2fa45d20e4445140fbad3dae7032019dff16b;p=collectd.git diff --git a/src/write_graphite.c b/src/write_graphite.c index fca40d1a..8ae15ba0 100644 --- a/src/write_graphite.c +++ b/src/write_graphite.c @@ -47,6 +47,7 @@ #include "utils_cache.h" #include "utils_parse_option.h" +#include "utils_format_graphite.h" /* Folks without pthread will need to disable this plugin. */ #include @@ -54,15 +55,6 @@ #include #include -#ifndef WG_FORMAT_NAME -#define WG_FORMAT_NAME(ret, ret_len, vl, cb, ds_name) \ - wg_format_name (ret, ret_len, (vl)->host, \ - (vl)->plugin, (vl)->plugin_instance, \ - (vl)->type, (vl)->type_instance, \ - (cb)->prefix, (cb)->postfix, \ - ds_name, (cb)->escape_char) -#endif - #ifndef WG_DEFAULT_NODE # define WG_DEFAULT_NODE "localhost" #endif @@ -75,8 +67,9 @@ # define WG_DEFAULT_ESCAPE '_' #endif +/* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */ #ifndef WG_SEND_BUF_SIZE -# define WG_SEND_BUF_SIZE 4096 +# define WG_SEND_BUF_SIZE 1428 #endif /* @@ -92,7 +85,7 @@ struct wg_callback char *postfix; char escape_char; - _Bool store_rates; + unsigned int format_flags; char send_buf[WG_SEND_BUF_SIZE]; size_t send_buf_free; @@ -116,21 +109,22 @@ static void wg_reset_buffer (struct wg_callback *cb) static int wg_send_buffer (struct wg_callback *cb) { - int status = 0; + ssize_t status = 0; - status = write (cb->sock_fd, cb->send_buf, strlen (cb->send_buf)); + status = swrite (cb->sock_fd, cb->send_buf, strlen (cb->send_buf)); if (status < 0) { - ERROR ("write_graphite plugin: send failed with " - "status %i (%s)", - status, - strerror (errno)); + char errbuf[1024]; + ERROR ("write_graphite plugin: send failed with status %zi (%s)", + status, sstrerror (errno, errbuf, sizeof (errbuf))); + close (cb->sock_fd); cb->sock_fd = -1; return (-1); } + return (0); } @@ -289,186 +283,12 @@ static int wg_flush (cdtime_t timeout, return (status); } -static int wg_format_values (char *ret, size_t ret_len, - int ds_num, const data_set_t *ds, const value_list_t *vl, - _Bool store_rates) -{ - size_t offset = 0; - int status; - gauge_t *rates = NULL; - - assert (0 == strcmp (ds->type, vl->type)); - - memset (ret, 0, ret_len); - -#define BUFFER_ADD(...) do { \ - status = ssnprintf (ret + offset, ret_len - offset, \ - __VA_ARGS__); \ - if (status < 1) \ - { \ - sfree (rates); \ - return (-1); \ - } \ - else if (((size_t) status) >= (ret_len - offset)) \ - { \ - sfree (rates); \ - return (-1); \ - } \ - else \ - offset += ((size_t) status); \ -} while (0) - - if (ds->ds[ds_num].type == DS_TYPE_GAUGE) - BUFFER_ADD ("%f", vl->values[ds_num].gauge); - else if (store_rates) - { - if (rates == NULL) - rates = uc_get_rate (ds, vl); - if (rates == NULL) - { - WARNING ("format_values: " - "uc_get_rate failed."); - return (-1); - } - BUFFER_ADD ("%g", rates[ds_num]); - } - else if (ds->ds[ds_num].type == DS_TYPE_COUNTER) - BUFFER_ADD ("%llu", vl->values[ds_num].counter); - else if (ds->ds[ds_num].type == DS_TYPE_DERIVE) - BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive); - else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE) - BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute); - else - { - ERROR ("format_values plugin: Unknown data source type: %i", - ds->ds[ds_num].type); - sfree (rates); - return (-1); - } - -#undef BUFFER_ADD - - sfree (rates); - return (0); -} - -static void wg_copy_escape_part (char *dst, const char *src, size_t dst_len, - char escape_char) -{ - size_t i; - - memset (dst, 0, dst_len); - - if (src == NULL) - return; - - for (i = 0; i < dst_len; i++) - { - if (src[i] == 0) - { - dst[i] = 0; - break; - } - - if ((src[i] == '.') - || isspace ((int) src[i]) - || iscntrl ((int) src[i])) - dst[i] = escape_char; - else - dst[i] = src[i]; - } -} - -static int wg_format_name (char *ret, int ret_len, - const char *hostname, - const char *plugin, const char *plugin_instance, - const char *type, const char *type_instance, - const char *prefix, const char *postfix, - const char *ds_name, char escape_char) -{ - char n_hostname[DATA_MAX_NAME_LEN]; - char n_plugin[DATA_MAX_NAME_LEN]; - char n_plugin_instance[DATA_MAX_NAME_LEN]; - char n_type[DATA_MAX_NAME_LEN]; - char n_type_instance[DATA_MAX_NAME_LEN]; - int status; - - assert (hostname != NULL); - assert (plugin != NULL); - assert (type != NULL); - assert (ds_name != NULL); - - if (prefix == NULL) - prefix = ""; - - if (postfix == NULL) - postfix = ""; - - wg_copy_escape_part (n_hostname, hostname, - sizeof (n_hostname), escape_char); - wg_copy_escape_part (n_plugin, plugin, - sizeof (n_plugin), escape_char); - wg_copy_escape_part (n_plugin_instance, plugin_instance, - sizeof (n_plugin_instance), escape_char); - wg_copy_escape_part (n_type, type, - sizeof (n_type), escape_char); - wg_copy_escape_part (n_type_instance, type_instance, - sizeof (n_type_instance), escape_char); - - if (n_plugin_instance[0] == '\0') - { - if (n_type_instance[0] == '\0') - { - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s", - prefix, n_hostname, postfix, n_plugin, n_type, ds_name); - } - else - { - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s-%s.%s", - prefix, n_hostname, postfix, n_plugin, n_type, - n_type_instance, ds_name); - } - } - else - { - if (n_type_instance[0] == '\0') - { - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s.%s", - prefix, n_hostname, postfix, n_plugin, - n_plugin_instance, n_type, ds_name); - } - else - { - status = ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s-%s.%s", - prefix, n_hostname, postfix, n_plugin, - n_plugin_instance, n_type, n_type_instance, ds_name); - } - } - - if ((status < 1) || (status >= ret_len)) - return (-1); - - return (0); -} - -static int wg_send_message (const char* key, const char* value, - cdtime_t time, struct wg_callback *cb) +static int wg_send_message (char const *message, struct wg_callback *cb) { int status; size_t message_len; - char message[1024]; - - message_len = (size_t) ssnprintf (message, sizeof (message), - "%s %s %.0f\n", - key, - value, - CDTIME_T_TO_DOUBLE(time)); - if (message_len >= sizeof (message)) { - ERROR ("write_graphite plugin: message buffer too small: " - "Need %zu bytes.", message_len + 1); - return (-1); - } + message_len = strlen (message); pthread_mutex_lock (&cb->send_lock); @@ -492,6 +312,8 @@ static int wg_send_message (const char* key, const char* value, return (status); } } + + /* Assert that we have enough space for this message. */ assert (message_len < cb->send_buf_free); /* `message_len + 1' because `message_len' does not include the @@ -501,14 +323,13 @@ static int wg_send_message (const char* key, const char* value, cb->send_buf_fill += message_len; cb->send_buf_free -= message_len; - DEBUG ("write_graphite plugin: <%s:%s> buf %zu/%zu (%g%%) \"%s\"", + DEBUG ("write_graphite plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", cb->node, cb->service, cb->send_buf_fill, sizeof (cb->send_buf), 100.0 * ((double) cb->send_buf_fill) / ((double) sizeof (cb->send_buf)), message); - /* Check if we have enough space for this message. */ pthread_mutex_unlock (&cb->send_lock); return (0); @@ -517,10 +338,8 @@ static int wg_send_message (const char* key, const char* value, static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, struct wg_callback *cb) { - char key[10*DATA_MAX_NAME_LEN]; - char values[512]; - - int status, i; + char buffer[4096]; + int status; if (0 != strcmp (ds->type, vl->type)) { @@ -529,74 +348,22 @@ static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, return -1; } - if (ds->ds_num > 1) - { - for (i = 0; i < ds->ds_num; i++) - { - /* Copy the identifier to `key' and escape it. */ - status = WG_FORMAT_NAME (key, sizeof (key), vl, cb, ds->ds[i].name); - if (status != 0) - { - ERROR ("write_graphite plugin: error with format_name"); - return (status); - } - - escape_string (key, sizeof (key)); - /* Convert the values to an ASCII representation and put that - * into `values'. */ - status = wg_format_values (values, sizeof (values), i, ds, vl, - cb->store_rates); - if (status != 0) - { - ERROR ("write_graphite plugin: error with " - "wg_format_values"); - return (status); - } - - /* Send the message to graphite */ - status = wg_send_message (key, values, vl->time, cb); - if (status != 0) - { - ERROR ("write_graphite plugin: error with " - "wg_send_message"); - return (status); - } - } - } - else - { - /* Copy the identifier to `key' and escape it. */ - status = WG_FORMAT_NAME (key, sizeof (key), vl, cb, NULL); - if (status != 0) - { - ERROR ("write_graphite plugin: error with format_name"); - return (status); - } - - escape_string (key, sizeof (key)); - /* Convert the values to an ASCII representation and put that into - * `values'. */ - status = wg_format_values (values, sizeof (values), 0, ds, vl, - cb->store_rates); - if (status != 0) - { - ERROR ("write_graphite plugin: error with " - "wg_format_values"); - return (status); - } + memset (buffer, 0, sizeof (buffer)); + status = format_graphite (buffer, sizeof (buffer), ds, vl, + cb->prefix, cb->postfix, cb->escape_char, cb->format_flags); + if (status != 0) /* error message has been printed already. */ + return (status); - /* Send the message to graphite */ - status = wg_send_message (key, values, vl->time, cb); - if (status != 0) - { - ERROR ("write_graphite plugin: error with " - "wg_send_message"); - return (status); - } + wg_send_message (buffer, cb); + if (status != 0) + { + ERROR ("write_graphite plugin: wg_send_message failed " + "with status %i.", status); + return (status); } return (0); -} +} /* int wg_write_messages */ static int wg_write (const data_set_t *ds, const value_list_t *vl, user_data_t *user_data) @@ -605,7 +372,7 @@ static int wg_write (const data_set_t *ds, const value_list_t *vl, int status; if (user_data == NULL) - return (-EINVAL); + return (EINVAL); cb = user_data->data; @@ -617,14 +384,30 @@ static int wg_write (const data_set_t *ds, const value_list_t *vl, static int config_set_char (char *dest, oconfig_item_t *ci) { - if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING)) + char buffer[4]; + int status; + + memset (buffer, 0, sizeof (buffer)); + + status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer)); + if (status != 0) + return (status); + + if (buffer[0] == 0) { - WARNING ("write_graphite plugin: The `%s' config option " - "needs exactly one string argument.", ci->key); + ERROR ("write_graphite plugin: Cannot use an empty string for the " + "\"EscapeCharacter\" option."); return (-1); } - *dest = ci->values[0].value.string[0]; + if (buffer[1] != 0) + { + WARNING ("write_graphite plugin: Only the first character of the " + "\"EscapeCharacter\" option ('%c') will be used.", + (int) buffer[0]); + } + + *dest = buffer[0]; return (0); } @@ -633,6 +416,7 @@ static int wg_config_carbon (oconfig_item_t *ci) { struct wg_callback *cb; user_data_t user_data; + char callback_name[DATA_MAX_NAME_LEN]; int i; cb = malloc (sizeof (*cb)); @@ -648,6 +432,7 @@ static int wg_config_carbon (oconfig_item_t *ci) cb->prefix = NULL; cb->postfix = NULL; cb->escape_char = WG_DEFAULT_ESCAPE; + cb->format_flags = GRAPHITE_STORE_RATES; pthread_mutex_init (&cb->send_lock, /* attr = */ NULL); @@ -664,7 +449,14 @@ static int wg_config_carbon (oconfig_item_t *ci) else if (strcasecmp ("Postfix", child->key) == 0) cf_util_get_string (child, &cb->postfix); else if (strcasecmp ("StoreRates", child->key) == 0) - cf_util_get_boolean (child, &cb->store_rates); + cf_util_get_flag (child, &cb->format_flags, + GRAPHITE_STORE_RATES); + else if (strcasecmp ("SeparateInstances", child->key) == 0) + cf_util_get_flag (child, &cb->format_flags, + GRAPHITE_SEPARATE_INSTANCES); + else if (strcasecmp ("AlwaysAppendDS", child->key) == 0) + cf_util_get_flag (child, &cb->format_flags, + GRAPHITE_ALWAYS_APPEND_DS); else if (strcasecmp ("EscapeCharacter", child->key) == 0) config_set_char (&cb->escape_char, child); else @@ -674,17 +466,17 @@ static int wg_config_carbon (oconfig_item_t *ci) } } - DEBUG ("write_graphite: Registering write callback to carbon agent %s:%s", - cb->node ? cb->node : WG_DEFAULT_NODE, - cb->service ? cb->service : WG_DEFAULT_SERVICE); + ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s/%s", + cb->node != NULL ? cb->node : WG_DEFAULT_NODE, + cb->service != NULL ? cb->service : WG_DEFAULT_SERVICE); memset (&user_data, 0, sizeof (user_data)); user_data.data = cb; - user_data.free_func = NULL; - plugin_register_flush ("write_graphite", wg_flush, &user_data); - user_data.free_func = wg_callback_free; - plugin_register_write ("write_graphite", wg_write, &user_data); + plugin_register_write (callback_name, wg_write, &user_data); + + user_data.free_func = NULL; + plugin_register_flush (callback_name, wg_flush, &user_data); return (0); }