X-Git-Url: https://git.octo.it/?a=blobdiff_plain;f=src%2Fwrite_graphite.c;h=2dce2d716cfb64e8e2802eb1f9e18564979b92d9;hb=5b4053d27b6a24e2f18e678f0d8c3343b7dfad7c;hp=e65f3d8a4ca822f6966b0f75aed5fdc088945cfc;hpb=6b94b7b11db35ec179dc447ddb2ceba6eec97d37;p=collectd.git diff --git a/src/write_graphite.c b/src/write_graphite.c index e65f3d8a..2dce2d71 100644 --- a/src/write_graphite.c +++ b/src/write_graphite.c @@ -43,17 +43,14 @@ */ #include "collectd.h" + #include "common.h" #include "plugin.h" #include "configfile.h" -#include "utils_cache.h" #include "utils_complain.h" #include "utils_format_graphite.h" -/* Folks without pthread will need to disable this plugin. */ -#include - #include #define WG_DEFAULT_NODE "localhost" @@ -95,35 +92,35 @@ struct wg_callback c_complain_t init_complaint; cdtime_t last_connect_time; - /*Force reconnect useful for load balanced environments*/ - cdtime_t last_force_reconnect_time; - int force_reconnect_timeout; - int conn_forced_closed; + /* Force reconnect useful for load balanced environments */ + cdtime_t last_reconnect_time; + cdtime_t reconnect_interval; + _Bool reconnect_interval_reached; }; -/* -* Force Reconnect functions -*/ - -static void wg_force_reconnect_check(struct wg_callback *cb) +/* wg_force_reconnect_check closes cb->sock_fd when it was open for longer + * than cb->reconnect_interval. Must hold cb->send_lock when calling. */ +static void wg_force_reconnect_check (struct wg_callback *cb) { cdtime_t now; - if(!cb->force_reconnect_timeout) return; - //check if address changes if addr_timeout + + if (cb->reconnect_interval == 0) + return; + + /* check if address changes if addr_timeout */ now = cdtime (); - DEBUG("wg_force_reconnect_check: now %ld last: %ld ",CDTIME_T_TO_TIME_T(now),CDTIME_T_TO_TIME_T(cb->last_force_reconnect_time)); - if ((now - cb->last_force_reconnect_time) < TIME_T_TO_CDTIME_T(cb->force_reconnect_timeout)){ - return; - } - //here we should close connection on next + if ((now - cb->last_reconnect_time) < cb->reconnect_interval) + return; + + /* here we should close connection on next */ close (cb->sock_fd); cb->sock_fd = -1; - INFO("Connection Forced closed after %ld seconds ",CDTIME_T_TO_TIME_T(now - cb->last_force_reconnect_time)); - cb->last_force_reconnect_time = now; - cb->conn_forced_closed=1; -} - + cb->last_reconnect_time = now; + cb->reconnect_interval_reached = 1; + INFO ("write_graphite plugin: Connection closed after %.3f seconds.", + CDTIME_T_TO_DOUBLE (now - cb->last_reconnect_time)); +} /* * Functions @@ -138,10 +135,13 @@ static void wg_reset_buffer (struct wg_callback *cb) static int wg_send_buffer (struct wg_callback *cb) { - ssize_t status = 0; + ssize_t status; + + if (cb->sock_fd < 0) + return (-1); status = swrite (cb->sock_fd, cb->send_buf, strlen (cb->send_buf)); - if (status < 0) + if (status != 0) { if (cb->log_send_errors) { @@ -180,7 +180,7 @@ static int wg_flush_nolock (cdtime_t timeout, struct wg_callback *cb) return (0); } - if (cb->send_buf_fill <= 0) + if (cb->send_buf_fill == 0) { cb->send_buf_init_time = cdtime (); return (0); @@ -194,9 +194,7 @@ static int wg_flush_nolock (cdtime_t timeout, struct wg_callback *cb) static int wg_callback_init (struct wg_callback *cb) { - struct addrinfo ai_hints; struct addrinfo *ai_list; - struct addrinfo *ai_ptr; cdtime_t now; int status; @@ -212,19 +210,16 @@ static int wg_callback_init (struct wg_callback *cb) return (EAGAIN); cb->last_connect_time = now; - memset (&ai_hints, 0, sizeof (ai_hints)); -#ifdef AI_ADDRCONFIG - ai_hints.ai_flags |= AI_ADDRCONFIG; -#endif - ai_hints.ai_family = AF_UNSPEC; + struct addrinfo ai_hints = { + .ai_family = AF_UNSPEC, + .ai_flags = AI_ADDRCONFIG + }; if (0 == strcasecmp ("tcp", cb->protocol)) ai_hints.ai_socktype = SOCK_STREAM; else ai_hints.ai_socktype = SOCK_DGRAM; - ai_list = NULL; - status = getaddrinfo (cb->node, cb->service, &ai_hints, &ai_list); if (status != 0) { @@ -234,7 +229,7 @@ static int wg_callback_init (struct wg_callback *cb) } assert (ai_list != NULL); - for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + for (struct addrinfo *ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) { cb->sock_fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); @@ -277,16 +272,15 @@ static int wg_callback_init (struct wg_callback *cb) "write_graphite plugin: Successfully connected to %s:%s via %s.", cb->node, cb->service, cb->protocol); } - if(!cb->conn_forced_closed || cb->send_buf_free== 0) - { - /*when not forced connection*/ - /*or buffer not initialized -- happens if forceReconnect happens before first connection*/ + + /* wg_force_reconnect_check does not flush the buffer before closing a + * sending socket, so only call wg_reset_buffer() if the socket was closed + * for a different reason (tracked in cb->reconnect_interval_reached). */ + if (!cb->reconnect_interval_reached || (cb->send_buf_free == 0)) wg_reset_buffer (cb); - } - else { - /*if forced connection don't reset buffer with valid metrics when reconnect*/ - cb->conn_forced_closed=0; - } + else + cb->reconnect_interval_reached = 0; + return (0); } @@ -361,7 +355,7 @@ static int wg_send_message (char const *message, struct wg_callback *cb) pthread_mutex_lock (&cb->send_lock); - wg_force_reconnect_check(cb); + wg_force_reconnect_check (cb); if (cb->sock_fd < 0) { @@ -408,7 +402,7 @@ static int wg_send_message (char const *message, struct wg_callback *cb) static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, struct wg_callback *cb) { - char buffer[WG_SEND_BUF_SIZE]; + char buffer[WG_SEND_BUF_SIZE] = { 0 }; int status; if (0 != strcmp (ds->type, vl->type)) @@ -418,7 +412,6 @@ static int wg_write_messages (const data_set_t *ds, const value_list_t *vl, return -1; } - memset (buffer, 0, sizeof (buffer)); status = format_graphite (buffer, sizeof (buffer), ds, vl, cb->prefix, cb->postfix, cb->escape_char, cb->format_flags); if (status != 0) /* error message has been printed already. */ @@ -451,11 +444,9 @@ static int wg_write (const data_set_t *ds, const value_list_t *vl, static int config_set_char (char *dest, oconfig_item_t *ci) { - char buffer[4]; + char buffer[4] = { 0 }; int status; - memset (buffer, 0, sizeof (buffer)); - status = cf_util_get_string_buffer (ci, buffer, sizeof (buffer)); if (status != 0) return (status); @@ -482,26 +473,24 @@ static int config_set_char (char *dest, static int wg_config_node (oconfig_item_t *ci) { struct wg_callback *cb; - user_data_t user_data; + user_data_t user_data = { 0 }; char callback_name[DATA_MAX_NAME_LEN]; - int i; int status = 0; - cb = malloc (sizeof (*cb)); + cb = calloc (1, sizeof (*cb)); if (cb == NULL) { - ERROR ("write_graphite plugin: malloc failed."); + ERROR ("write_graphite plugin: calloc failed."); return (-1); } - memset (cb, 0, sizeof (*cb)); cb->sock_fd = -1; cb->name = NULL; cb->node = strdup (WG_DEFAULT_NODE); cb->service = strdup (WG_DEFAULT_SERVICE); cb->protocol = strdup (WG_DEFAULT_PROTOCOL); - cb->last_force_reconnect_time=cdtime(); - cb->force_reconnect_timeout=0; - cb->conn_forced_closed=0; + cb->last_reconnect_time = cdtime(); + cb->reconnect_interval = 0; + cb->reconnect_interval_reached = 0; cb->log_send_errors = WG_DEFAULT_LOG_SEND_ERRORS; cb->prefix = NULL; cb->postfix = NULL; @@ -522,7 +511,7 @@ static int wg_config_node (oconfig_item_t *ci) pthread_mutex_init (&cb->send_lock, /* attr = */ NULL); C_COMPLAIN_INIT (&cb->init_complaint); - for (i = 0; i < ci->children_num; i++) + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i; @@ -542,8 +531,8 @@ static int wg_config_node (oconfig_item_t *ci) status = -1; } } - else if (strcasecmp ("ForceReconnectTimeout", child->key) == 0) - cf_util_get_int (child,&cb->force_reconnect_timeout); + else if (strcasecmp ("ReconnectInterval", child->key) == 0) + cf_util_get_cdtime (child, &cb->reconnect_interval); else if (strcasecmp ("LogSendErrors", child->key) == 0) cf_util_get_boolean (child, &cb->log_send_errors); else if (strcasecmp ("Prefix", child->key) == 0) @@ -586,7 +575,6 @@ static int wg_config_node (oconfig_item_t *ci) ssnprintf (callback_name, sizeof (callback_name), "write_graphite/%s", cb->name); - memset (&user_data, 0, sizeof (user_data)); user_data.data = cb; user_data.free_func = wg_callback_free; plugin_register_write (callback_name, wg_write, &user_data); @@ -599,9 +587,7 @@ static int wg_config_node (oconfig_item_t *ci) static int wg_config (oconfig_item_t *ci) { - int i; - - for (i = 0; i < ci->children_num; i++) + for (int i = 0; i < ci->children_num; i++) { oconfig_item_t *child = ci->children + i;