X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_graphite.c;h=41451a8cee8f8d0bab4dda5d81361b3efd30a229;hb=ca316d91e178412604ea8462dc60a8bc32cbfc87;hp=a3bead39b49147a8a72fca8ebf9c16489d095464;hpb=4bfd1fe12c8b7d1e212c04d2e0a069d37a2ffe0c;p=collectd.git diff --git a/src/write_graphite.c b/src/write_graphite.c index a3bead39..41451a8c 100644 --- a/src/write_graphite.c +++ b/src/write_graphite.c @@ -35,6 +35,8 @@ * * Host "localhost" * Port "2003" + * Protocol "udp" + * LogSendErrors true * Prefix "collectd" * * @@ -47,7 +49,7 @@ #include "utils_cache.h" #include "utils_complain.h" -#include "utils_parse_option.h" +#include "utils_format_graphite.h" /* Folks without pthread will need to disable this plugin. */ #include @@ -63,6 +65,14 @@ # define WG_DEFAULT_SERVICE "2003" #endif +#ifndef WG_DEFAULT_PROTOCOL +# define WG_DEFAULT_PROTOCOL "tcp" +#endif + +#ifndef WG_DEFAULT_LOG_SEND_ERRORS +# define WG_DEFAULT_LOG_SEND_ERRORS 1 +#endif + #ifndef WG_DEFAULT_ESCAPE # define WG_DEFAULT_ESCAPE '_' #endif @@ -72,6 +82,10 @@ # define WG_SEND_BUF_SIZE 1428 #endif +#ifndef WG_MIN_RECONNECT_INTERVAL +# define WG_MIN_RECONNECT_INTERVAL TIME_T_TO_CDTIME_T (1) +#endif + /* * Private variables */ @@ -79,15 +93,17 @@ struct wg_callback { int sock_fd; + char *name; + char *node; char *service; + char *protocol; + _Bool log_send_errors; char *prefix; char *postfix; char escape_char; - _Bool store_rates; - _Bool separate_instances; - _Bool always_append_ds; + unsigned int format_flags; char send_buf[WG_SEND_BUF_SIZE]; size_t send_buf_free; @@ -96,6 +112,7 @@ struct wg_callback pthread_mutex_t send_lock; c_complain_t init_complaint; + cdtime_t last_connect_time; }; @@ -117,10 +134,15 @@ static int wg_send_buffer (struct wg_callback *cb) status = swrite (cb->sock_fd, cb->send_buf, strlen (cb->send_buf)); if (status < 0) { - char errbuf[1024]; - ERROR ("write_graphite plugin: send failed with status %zi (%s)", - status, sstrerror (errno, errbuf, sizeof (errbuf))); + const char *protocol = cb->protocol ? cb->protocol : WG_DEFAULT_PROTOCOL; + if (cb->log_send_errors) + { + char errbuf[1024]; + ERROR ("write_graphite plugin: send to %s:%s (%s) failed with status %zi (%s)", + cb->node, cb->service, protocol, + status, sstrerror (errno, errbuf, sizeof (errbuf))); + } close (cb->sock_fd); cb->sock_fd = -1; @@ -168,28 +190,43 @@ static int wg_callback_init (struct wg_callback *cb) struct addrinfo ai_hints; struct addrinfo *ai_list; struct addrinfo *ai_ptr; + cdtime_t now; int status; const char *node = cb->node ? cb->node : WG_DEFAULT_NODE; const char *service = cb->service ? cb->service : WG_DEFAULT_SERVICE; + const char *protocol = cb->protocol ? cb->protocol : WG_DEFAULT_PROTOCOL; + + char connerr[1024] = ""; if (cb->sock_fd > 0) return (0); + /* Don't try to reconnect too often. By default, one reconnection attempt + * is made per second. */ + now = cdtime (); + if ((now - cb->last_connect_time) < WG_MIN_RECONNECT_INTERVAL) + return (EAGAIN); + cb->last_connect_time = now; + memset (&ai_hints, 0, sizeof (ai_hints)); #ifdef AI_ADDRCONFIG ai_hints.ai_flags |= AI_ADDRCONFIG; #endif ai_hints.ai_family = AF_UNSPEC; - ai_hints.ai_socktype = SOCK_STREAM; + + if (0 == strcasecmp ("tcp", protocol)) + ai_hints.ai_socktype = SOCK_STREAM; + else + ai_hints.ai_socktype = SOCK_DGRAM; ai_list = NULL; status = getaddrinfo (node, service, &ai_hints, &ai_list); if (status != 0) { - ERROR ("write_graphite plugin: getaddrinfo (%s, %s) failed: %s", - node, service, gai_strerror (status)); + ERROR ("write_graphite plugin: getaddrinfo (%s, %s, %s) failed: %s", + node, service, protocol, gai_strerror (status)); return (-1); } @@ -198,12 +235,19 @@ static int wg_callback_init (struct wg_callback *cb) { cb->sock_fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); - if (cb->sock_fd < 0) + if (cb->sock_fd < 0) { + char errbuf[1024]; + snprintf (connerr, sizeof (connerr), "failed to open socket: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); continue; + } status = connect (cb->sock_fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); if (status != 0) { + char errbuf[1024]; + snprintf (connerr, sizeof (connerr), "failed to connect to remote " + "host: %s", sstrerror (errno, errbuf, sizeof (errbuf))); close (cb->sock_fd); cb->sock_fd = -1; continue; @@ -216,19 +260,19 @@ static int wg_callback_init (struct wg_callback *cb) if (cb->sock_fd < 0) { - char errbuf[1024]; + if (connerr[0] == '\0') + /* this should not happen but try to get a message anyway */ + sstrerror (errno, connerr, sizeof (connerr)); c_complain (LOG_ERR, &cb->init_complaint, - "write_graphite plugin: Connecting to %s:%s failed. " - "The last error was: %s", node, service, - sstrerror (errno, errbuf, sizeof (errbuf))); - close (cb->sock_fd); + "write_graphite plugin: Connecting to %s:%s via %s failed. " + "The last error was: %s", node, service, protocol, connerr); return (-1); } else { c_release (LOG_INFO, &cb->init_complaint, - "write_graphite plugin: Successfully connected to %s:%s.", - node, service); + "write_graphite plugin: Successfully connected to %s:%s via %s.", + node, service, protocol); } wg_reset_buffer (cb); @@ -249,10 +293,15 @@ static void wg_callback_free (void *data) wg_flush_nolock (/* timeout = */ 0, cb); - close(cb->sock_fd); - cb->sock_fd = -1; + if (cb->sock_fd >= 0) + { + close (cb->sock_fd); + cb->sock_fd = -1; + } + sfree(cb->name); sfree(cb->node); + sfree(cb->protocol); sfree(cb->service); sfree(cb->prefix); sfree(cb->postfix); @@ -293,175 +342,12 @@ static int wg_flush (cdtime_t timeout, return (status); } -static int wg_format_values (char *ret, size_t ret_len, - int ds_num, const data_set_t *ds, const value_list_t *vl, - _Bool store_rates) -{ - size_t offset = 0; - int status; - gauge_t *rates = NULL; - - assert (0 == strcmp (ds->type, vl->type)); - - memset (ret, 0, ret_len); - -#define BUFFER_ADD(...) do { \ - status = ssnprintf (ret + offset, ret_len - offset, \ - __VA_ARGS__); \ - if (status < 1) \ - { \ - sfree (rates); \ - return (-1); \ - } \ - else if (((size_t) status) >= (ret_len - offset)) \ - { \ - sfree (rates); \ - return (-1); \ - } \ - else \ - offset += ((size_t) status); \ -} while (0) - - if (ds->ds[ds_num].type == DS_TYPE_GAUGE) - BUFFER_ADD ("%f", vl->values[ds_num].gauge); - else if (store_rates) - { - if (rates == NULL) - rates = uc_get_rate (ds, vl); - if (rates == NULL) - { - WARNING ("format_values: " - "uc_get_rate failed."); - return (-1); - } - BUFFER_ADD ("%g", rates[ds_num]); - } - else if (ds->ds[ds_num].type == DS_TYPE_COUNTER) - BUFFER_ADD ("%llu", vl->values[ds_num].counter); - else if (ds->ds[ds_num].type == DS_TYPE_DERIVE) - BUFFER_ADD ("%"PRIi64, vl->values[ds_num].derive); - else if (ds->ds[ds_num].type == DS_TYPE_ABSOLUTE) - BUFFER_ADD ("%"PRIu64, vl->values[ds_num].absolute); - else - { - ERROR ("format_values plugin: Unknown data source type: %i", - ds->ds[ds_num].type); - sfree (rates); - return (-1); - } - -#undef BUFFER_ADD - - sfree (rates); - return (0); -} - -static void wg_copy_escape_part (char *dst, const char *src, size_t dst_len, - char escape_char) -{ - size_t i; - - memset (dst, 0, dst_len); - - if (src == NULL) - return; - - for (i = 0; i < dst_len; i++) - { - if (src[i] == 0) - { - dst[i] = 0; - break; - } - - if ((src[i] == '.') - || isspace ((int) src[i]) - || iscntrl ((int) src[i])) - dst[i] = escape_char; - else - dst[i] = src[i]; - } -} - -static int wg_format_name (char *ret, int ret_len, - const value_list_t *vl, - const struct wg_callback *cb, - const char *ds_name) -{ - char n_host[DATA_MAX_NAME_LEN]; - char n_plugin[DATA_MAX_NAME_LEN]; - char n_plugin_instance[DATA_MAX_NAME_LEN]; - char n_type[DATA_MAX_NAME_LEN]; - char n_type_instance[DATA_MAX_NAME_LEN]; - - char *prefix; - char *postfix; - - char tmp_plugin[2 * DATA_MAX_NAME_LEN + 1]; - char tmp_type[2 * DATA_MAX_NAME_LEN + 1]; - - prefix = cb->prefix; - if (prefix == NULL) - prefix = ""; - - postfix = cb->postfix; - if (postfix == NULL) - postfix = ""; - - wg_copy_escape_part (n_host, vl->host, - sizeof (n_host), cb->escape_char); - wg_copy_escape_part (n_plugin, vl->plugin, - sizeof (n_plugin), cb->escape_char); - wg_copy_escape_part (n_plugin_instance, vl->plugin_instance, - sizeof (n_plugin_instance), cb->escape_char); - wg_copy_escape_part (n_type, vl->type, - sizeof (n_type), cb->escape_char); - wg_copy_escape_part (n_type_instance, vl->type_instance, - sizeof (n_type_instance), cb->escape_char); - - if (n_plugin_instance[0] != '\0') - ssnprintf (tmp_plugin, sizeof (tmp_plugin), "%s%c%s", - n_plugin, - cb->separate_instances ? '.' : '-', - n_plugin_instance); - else - sstrncpy (tmp_plugin, n_plugin, sizeof (tmp_plugin)); - - if (n_type_instance[0] != '\0') - ssnprintf (tmp_type, sizeof (tmp_type), "%s%c%s", - n_type, - cb->separate_instances ? '.' : '-', - n_type_instance); - else - sstrncpy (tmp_type, n_type, sizeof (tmp_type)); - - if (ds_name != NULL) - ssnprintf (ret, ret_len, "%s%s%s.%s.%s.%s", - prefix, n_host, postfix, tmp_plugin, tmp_type, ds_name); - else - ssnprintf (ret, ret_len, "%s%s%s.%s.%s", - prefix, n_host, postfix, tmp_plugin, tmp_type); - - return (0); -} - -static int wg_send_message (const char* key, const char* value, - cdtime_t time, struct wg_callback *cb) +static int wg_send_message (char const *message, struct wg_callback *cb) { int status; size_t message_len; - char message[1024]; - - message_len = (size_t) ssnprintf (message, sizeof (message), - "%s %s %u\r\n", - key, - value, - (unsigned int) CDTIME_T_TO_TIME_T (time)); - if (message_len >= sizeof (message)) { - ERROR ("write_graphite plugin: message buffer too small: " - "Need %zu bytes.", message_len + 1); - return (-1); - } + + message_len = strlen (message); pthread_mutex_lock (&cb->send_lock); @@ -496,9 +382,10 @@ static int wg_send_message (const char* key, const char* value, cb->send_buf_fill += message_len; cb->send_buf_free -= message_len; - DEBUG ("write_graphite plugin: [%s]:%s buf %zu/%zu (%.1f %%) \"%s\"", + DEBUG ("write_graphite plugin: [%s]:%s (%s) buf %zu/%zu (%.1f %%) \"%s\"", cb->node, cb->service, + cb->protocol, cb->send_buf_fill, sizeof (cb->send_buf), 100.0 * ((double) cb->send_buf_fill) / ((double) sizeof (cb->send_buf)), message); @@ -511,10 +398,8 @@ static int wg_send_message (const char* key, const char* value, static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, struct wg_callback *cb) { - char key[10*DATA_MAX_NAME_LEN]; - char values[512]; - - int status, i; + char buffer[WG_SEND_BUF_SIZE]; + int status; if (0 != strcmp (ds->type, vl->type)) { @@ -523,44 +408,19 @@ static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, return -1; } - for (i = 0; i < ds->ds_num; i++) - { - const char *ds_name = NULL; - - if (cb->always_append_ds || (ds->ds_num > 1)) - ds_name = ds->ds[i].name; - - /* Copy the identifier to `key' and escape it. */ - status = wg_format_name (key, sizeof (key), vl, cb, ds_name); - if (status != 0) - { - ERROR ("write_graphite plugin: error with format_name"); - return (status); - } - - escape_string (key, sizeof (key)); - /* Convert the values to an ASCII representation and put that into - * `values'. */ - status = wg_format_values (values, sizeof (values), i, ds, vl, - cb->store_rates); - if (status != 0) - { - ERROR ("write_graphite plugin: error with " - "wg_format_values"); - return (status); - } + memset (buffer, 0, sizeof (buffer)); + status = format_graphite (buffer, sizeof (buffer), ds, vl, + cb->prefix, cb->postfix, cb->escape_char, cb->format_flags); + if (status != 0) /* error message has been printed already. */ + return (status); - /* Send the message to graphite */ - status = wg_send_message (key, values, vl->time, cb); - if (status != 0) - { - /* An error message has already been printed. */ - return (status); - } - } + /* Send the message to graphite */ + status = wg_send_message (buffer, cb); + if (status != 0) /* error message has been printed already. */ + return (status); return (0); -} +} /* int wg_write_messages */ static int wg_write (const data_set_t *ds, const value_list_t *vl, user_data_t *user_data) @@ -609,12 +469,13 @@ static int config_set_char (char *dest, return (0); } -static int wg_config_carbon (oconfig_item_t *ci) +static int wg_config_node (oconfig_item_t *ci) { struct wg_callback *cb; user_data_t user_data; char callback_name[DATA_MAX_NAME_LEN]; int i; + int status = 0; cb = malloc (sizeof (*cb)); if (cb == NULL) @@ -624,12 +485,26 @@ static int wg_config_carbon (oconfig_item_t *ci) } memset (cb, 0, sizeof (*cb)); cb->sock_fd = -1; + cb->name = NULL; cb->node = NULL; cb->service = NULL; + cb->protocol = NULL; + cb->log_send_errors = WG_DEFAULT_LOG_SEND_ERRORS; cb->prefix = NULL; cb->postfix = NULL; cb->escape_char = WG_DEFAULT_ESCAPE; - cb->store_rates = 1; + cb->format_flags = GRAPHITE_STORE_RATES; + + /* FIXME: Legacy configuration syntax. */ + if (strcasecmp ("Carbon", ci->key) != 0) + { + status = cf_util_get_string (ci, &cb->name); + if (status != 0) + { + wg_callback_free (cb); + return (status); + } + } pthread_mutex_init (&cb->send_lock, /* attr = */ NULL); C_COMPLAIN_INIT (&cb->init_complaint); @@ -642,28 +517,61 @@ static int wg_config_carbon (oconfig_item_t *ci) cf_util_get_string (child, &cb->node); else if (strcasecmp ("Port", child->key) == 0) cf_util_get_service (child, &cb->service); + else if (strcasecmp ("Protocol", child->key) == 0) + { + cf_util_get_string (child, &cb->protocol); + + if (strcasecmp ("UDP", cb->protocol) != 0 && + strcasecmp ("TCP", cb->protocol) != 0) + { + ERROR ("write_graphite plugin: Unknown protocol (%s)", + cb->protocol); + status = -1; + } + } + else if (strcasecmp ("LogSendErrors", child->key) == 0) + cf_util_get_boolean (child, &cb->log_send_errors); else if (strcasecmp ("Prefix", child->key) == 0) cf_util_get_string (child, &cb->prefix); else if (strcasecmp ("Postfix", child->key) == 0) cf_util_get_string (child, &cb->postfix); else if (strcasecmp ("StoreRates", child->key) == 0) - cf_util_get_boolean (child, &cb->store_rates); + cf_util_get_flag (child, &cb->format_flags, + GRAPHITE_STORE_RATES); else if (strcasecmp ("SeparateInstances", child->key) == 0) - cf_util_get_boolean (child, &cb->separate_instances); + cf_util_get_flag (child, &cb->format_flags, + GRAPHITE_SEPARATE_INSTANCES); else if (strcasecmp ("AlwaysAppendDS", child->key) == 0) - cf_util_get_boolean (child, &cb->always_append_ds); + cf_util_get_flag (child, &cb->format_flags, + GRAPHITE_ALWAYS_APPEND_DS); else if (strcasecmp ("EscapeCharacter", child->key) == 0) config_set_char (&cb->escape_char, child); else { ERROR ("write_graphite plugin: Invalid configuration " "option: %s.", child->key); + status = -1; } + + if (status != 0) + break; } - ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s/%s", - cb->node != NULL ? cb->node : WG_DEFAULT_NODE, - cb->service != NULL ? cb->service : WG_DEFAULT_SERVICE); + if (status != 0) + { + wg_callback_free (cb); + return (status); + } + + /* FIXME: Legacy configuration syntax. */ + if (cb->name == NULL) + ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s/%s/%s", + cb->node != NULL ? cb->node : WG_DEFAULT_NODE, + cb->service != NULL ? cb->service : WG_DEFAULT_SERVICE, + cb->protocol != NULL ? cb->protocol : WG_DEFAULT_PROTOCOL); + else + ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s", + cb->name); memset (&user_data, 0, sizeof (user_data)); user_data.data = cb; @@ -684,8 +592,11 @@ static int wg_config (oconfig_item_t *ci) { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("Carbon", child->key) == 0) - wg_config_carbon (child); + if (strcasecmp ("Node", child->key) == 0) + wg_config_node (child); + /* FIXME: Remove this legacy mode in version 6. */ + else if (strcasecmp ("Carbon", child->key) == 0) + wg_config_node (child); else { ERROR ("write_graphite plugin: Invalid configuration "