X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_graphite.c;h=0b8ab41c08bf10681c04987fd37c946c2dcb3a4c;hb=db35efb33e81d0a013e09a8a6ffa362ad5962f7c;hp=a7eef3fb435616ca6093207c385eb8ba90a104a2;hpb=2bda2a5648c87a2c5b8304238cd80ff17984c5cd;p=collectd.git diff --git a/src/write_graphite.c b/src/write_graphite.c index a7eef3fb..0b8ab41c 100644 --- a/src/write_graphite.c +++ b/src/write_graphite.c @@ -54,37 +54,18 @@ /* Folks without pthread will need to disable this plugin. */ #include -#include #include -#ifndef WG_DEFAULT_NODE -# define WG_DEFAULT_NODE "localhost" -#endif - -#ifndef WG_DEFAULT_SERVICE -# define WG_DEFAULT_SERVICE "2003" -#endif - -#ifndef WG_DEFAULT_PROTOCOL -# define WG_DEFAULT_PROTOCOL "tcp" -#endif - -#ifndef WG_DEFAULT_LOG_SEND_ERRORS -# define WG_DEFAULT_LOG_SEND_ERRORS 1 -#endif - -#ifndef WG_DEFAULT_ESCAPE -# define WG_DEFAULT_ESCAPE '_' -#endif +#define WG_DEFAULT_NODE "localhost" +#define WG_DEFAULT_SERVICE "2003" +#define WG_DEFAULT_PROTOCOL "tcp" +#define WG_DEFAULT_LOG_SEND_ERRORS 1 +#define WG_DEFAULT_ESCAPE '_' /* Ethernet - (IPv6 + TCP) = 1500 - (40 + 32) = 1428 */ -#ifndef WG_SEND_BUF_SIZE -# define WG_SEND_BUF_SIZE 1428 -#endif +#define WG_SEND_BUF_SIZE 1428 -#ifndef WG_MIN_RECONNECT_INTERVAL -# define WG_MIN_RECONNECT_INTERVAL TIME_T_TO_CDTIME_T (1) -#endif +#define WG_MIN_RECONNECT_INTERVAL TIME_T_TO_CDTIME_T (1) /* * Private variables @@ -113,8 +94,36 @@ struct wg_callback pthread_mutex_t send_lock; c_complain_t init_complaint; cdtime_t last_connect_time; + + /* Force reconnect useful for load balanced environments */ + cdtime_t last_reconnect_time; + cdtime_t reconnect_interval; + _Bool reconnect_interval_reached; }; +/* wg_force_reconnect_check closes cb->sock_fd when it was open for longer + * than cb->reconnect_interval. Must hold cb->send_lock when calling. */ +static void wg_force_reconnect_check (struct wg_callback *cb) +{ + cdtime_t now; + + if (cb->reconnect_interval == 0) + return; + + /* check if address changes if addr_timeout */ + now = cdtime (); + if ((now - cb->last_reconnect_time) < cb->reconnect_interval) + return; + + /* here we should close connection on next */ + close (cb->sock_fd); + cb->sock_fd = -1; + cb->last_reconnect_time = now; + cb->reconnect_interval_reached = 1; + + INFO ("write_graphite plugin: Connection closed after %.3f seconds.", + CDTIME_T_TO_DOUBLE (now - cb->last_reconnect_time)); +} /* * Functions @@ -134,13 +143,11 @@ static int wg_send_buffer (struct wg_callback *cb) status = swrite (cb->sock_fd, cb->send_buf, strlen (cb->send_buf)); if (status != 0) { - const char *protocol = cb->protocol ? cb->protocol : WG_DEFAULT_PROTOCOL; - if (cb->log_send_errors) { char errbuf[1024]; ERROR ("write_graphite plugin: send to %s:%s (%s) failed with status %zi (%s)", - cb->node, cb->service, protocol, + cb->node, cb->service, cb->protocol, status, sstrerror (errno, errbuf, sizeof (errbuf))); } @@ -173,7 +180,7 @@ static int wg_flush_nolock (cdtime_t timeout, struct wg_callback *cb) return (0); } - if (cb->send_buf_fill <= 0) + if (cb->send_buf_fill == 0) { cb->send_buf_init_time = cdtime (); return (0); @@ -193,10 +200,6 @@ static int wg_callback_init (struct wg_callback *cb) cdtime_t now; int status; - const char *node = cb->node ? cb->node : WG_DEFAULT_NODE; - const char *service = cb->service ? cb->service : WG_DEFAULT_SERVICE; - const char *protocol = cb->protocol ? cb->protocol : WG_DEFAULT_PROTOCOL; - char connerr[1024] = ""; if (cb->sock_fd > 0) @@ -215,18 +218,18 @@ static int wg_callback_init (struct wg_callback *cb) #endif ai_hints.ai_family = AF_UNSPEC; - if (0 == strcasecmp ("tcp", protocol)) + if (0 == strcasecmp ("tcp", cb->protocol)) ai_hints.ai_socktype = SOCK_STREAM; else ai_hints.ai_socktype = SOCK_DGRAM; ai_list = NULL; - status = getaddrinfo (node, service, &ai_hints, &ai_list); + status = getaddrinfo (cb->node, cb->service, &ai_hints, &ai_list); if (status != 0) { ERROR ("write_graphite plugin: getaddrinfo (%s, %s, %s) failed: %s", - node, service, protocol, gai_strerror (status)); + cb->node, cb->service, cb->protocol, gai_strerror (status)); return (-1); } @@ -265,17 +268,23 @@ static int wg_callback_init (struct wg_callback *cb) sstrerror (errno, connerr, sizeof (connerr)); c_complain (LOG_ERR, &cb->init_complaint, "write_graphite plugin: Connecting to %s:%s via %s failed. " - "The last error was: %s", node, service, protocol, connerr); + "The last error was: %s", cb->node, cb->service, cb->protocol, connerr); return (-1); } else { c_release (LOG_INFO, &cb->init_complaint, "write_graphite plugin: Successfully connected to %s:%s via %s.", - node, service, protocol); + cb->node, cb->service, cb->protocol); } - wg_reset_buffer (cb); + /* wg_force_reconnect_check does not flush the buffer before closing a + * sending socket, so only call wg_reset_buffer() if the socket was closed + * for a different reason (tracked in cb->reconnect_interval_reached). */ + if (!cb->reconnect_interval_reached || (cb->send_buf_free == 0)) + wg_reset_buffer (cb); + else + cb->reconnect_interval_reached = 0; return (0); } @@ -351,6 +360,8 @@ static int wg_send_message (char const *message, struct wg_callback *cb) pthread_mutex_lock (&cb->send_lock); + wg_force_reconnect_check (cb); + if (cb->sock_fd < 0) { status = wg_callback_init (cb); @@ -383,9 +394,7 @@ static int wg_send_message (char const *message, struct wg_callback *cb) cb->send_buf_free -= message_len; DEBUG ("write_graphite plugin: [%s]:%s (%s) buf %zu/%zu (%.1f %%) \"%s\"", - cb->node, - cb->service, - cb->protocol, + cb->node, cb->service, cb->protocol, cb->send_buf_fill, sizeof (cb->send_buf), 100.0 * ((double) cb->send_buf_fill) / ((double) sizeof (cb->send_buf)), message); @@ -477,18 +486,20 @@ static int wg_config_node (oconfig_item_t *ci) int i; int status = 0; - cb = malloc (sizeof (*cb)); + cb = calloc (1, sizeof (*cb)); if (cb == NULL) { - ERROR ("write_graphite plugin: malloc failed."); + ERROR ("write_graphite plugin: calloc failed."); return (-1); } - memset (cb, 0, sizeof (*cb)); cb->sock_fd = -1; cb->name = NULL; - cb->node = NULL; - cb->service = NULL; - cb->protocol = NULL; + cb->node = strdup (WG_DEFAULT_NODE); + cb->service = strdup (WG_DEFAULT_SERVICE); + cb->protocol = strdup (WG_DEFAULT_PROTOCOL); + cb->last_reconnect_time = cdtime(); + cb->reconnect_interval = 0; + cb->reconnect_interval_reached = 0; cb->log_send_errors = WG_DEFAULT_LOG_SEND_ERRORS; cb->prefix = NULL; cb->postfix = NULL; @@ -529,6 +540,8 @@ static int wg_config_node (oconfig_item_t *ci) status = -1; } } + else if (strcasecmp ("ReconnectInterval", child->key) == 0) + cf_util_get_cdtime (child, &cb->reconnect_interval); else if (strcasecmp ("LogSendErrors", child->key) == 0) cf_util_get_boolean (child, &cb->log_send_errors); else if (strcasecmp ("Prefix", child->key) == 0) @@ -566,9 +579,7 @@ static int wg_config_node (oconfig_item_t *ci) /* FIXME: Legacy configuration syntax. */ if (cb->name == NULL) ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s/%s/%s", - cb->node != NULL ? cb->node : WG_DEFAULT_NODE, - cb->service != NULL ? cb->service : WG_DEFAULT_SERVICE, - cb->protocol != NULL ? cb->protocol : WG_DEFAULT_PROTOCOL); + cb->node, cb->service, cb->protocol); else ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s", cb->name);