write_riemann: use the complain mechanism to rate limit connection failure logs
[collectd.git] / src / write_riemann.c
index f7b0388..fd82650 100644 (file)
@@ -37,6 +37,7 @@
 #include "common.h"
 #include "configfile.h"
 #include "utils_cache.h"
+#include "utils_complain.h"
 #include "write_riemann_threshold.h"
 
 #define RIEMANN_HOST           "localhost"
@@ -55,6 +56,8 @@ struct riemann_host {
        _Bool                    always_append_ds;
        char                    *node;
        int                      port;
+    c_complain_t init_complaint;
+    c_complain_t init_send_complaint;
        riemann_client_type_t    client_type;
        riemann_client_t        *client;
        double                   ttl_factor;
@@ -65,6 +68,7 @@ struct riemann_host {
        char                     *tls_ca_file;
        char                     *tls_cert_file;
        char                     *tls_key_file;
+       struct timeval timeout;
 };
 
 static char    **riemann_tags;
@@ -92,12 +96,25 @@ static int wrr_connect(struct riemann_host *host) /* {{{ */
                                             RIEMANN_CLIENT_OPTION_TLS_KEY_FILE, host->tls_key_file,
                                             RIEMANN_CLIENT_OPTION_NONE);
        if (host->client == NULL) {
-               WARNING("write_riemann plugin: Unable to connect to Riemann at %s:%d",
-                       node, port);
+        c_complain (LOG_ERR, &host->init_complaint,
+                    "write_riemann plugin: Unable to connect to Riemann at %s:%d",
+                    node, port);
                return -1;
        }
-       DEBUG("write_riemann plugin: got a successful connection for: %s:%d",
-             node, port);
+       if (host->timeout.tv_sec != 0) {
+               if (riemann_client_set_timeout(host->client, &host->timeout) != 0) {
+                       riemann_client_free(host->client);
+                       host->client = NULL;
+            c_complain (LOG_ERR, &host->init_complaint,
+                        "write_riemann plugin: Unable to connect to Riemann at %s:%d",
+                        node, port);
+                       return -1;
+               }
+       }
+
+    c_release (LOG_INFO, &host->init_complaint,
+               "write_riemann plugin: Successfully connected to %s:%d",
+               node, port);
 
        return 0;
 } /* }}} int wrr_connect */
@@ -125,8 +142,10 @@ static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
        pthread_mutex_lock (&host->lock);
 
        status = wrr_connect(host);
-       if (status != 0)
+       if (status != 0) {
+        pthread_mutex_unlock(&host->lock);
                return status;
+    }
 
        status = riemann_client_send_message(host->client, msg);
        if (status != 0) {
@@ -187,35 +206,25 @@ static riemann_message_t *wrr_notification_to_message(struct riemann_host *host,
                                     RIEMANN_EVENT_FIELD_NONE);
 
        if (n->host[0] != 0)
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create("host", n->host));
+               riemann_event_string_attribute_add(event, "host", n->host);
        if (n->plugin[0] != 0)
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create("plugin", n->plugin));
+               riemann_event_string_attribute_add(event, "plugin", n->plugin);
        if (n->plugin_instance[0] != 0)
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create("plugin_instance",
-                                                                    n->plugin_instance));
+               riemann_event_string_attribute_add(event, "plugin_instance", n->plugin_instance);
 
        if (n->type[0] != 0)
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create("type", n->type));
+               riemann_event_string_attribute_add(event, "type", n->type);
        if (n->type_instance[0] != 0)
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create("type_instance",
-                                                                    n->type_instance));
+               riemann_event_string_attribute_add(event, "type_instance", n->type_instance);
 
        for (i = 0; i < riemann_attrs_num; i += 2)
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create(riemann_attrs[i],
-                                                                    riemann_attrs[i +1]));
+               riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i+1]);
 
        for (i = 0; i < riemann_tags_num; i++)
                riemann_event_tag_add(event, riemann_tags[i]);
 
        if (n->message[0] != 0)
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create("description", n->message));
+               riemann_event_string_attribute_add(event, "description", n->message);
 
        /* Pull in values from threshold and add extra attributes */
        for (meta = n->meta; meta != NULL; meta = meta->next)
@@ -230,9 +239,7 @@ static riemann_message_t *wrr_notification_to_message(struct riemann_host *host,
                }
 
                if (meta->type == NM_TYPE_STRING) {
-                       riemann_event_attribute_add(event,
-                                                   riemann_attribute_create(meta->name,
-                                                                            meta->nm_value.nm_string));
+                       riemann_event_string_attribute_add(event, meta->name, meta->nm_value.nm_string);
                        continue;
                }
        }
@@ -294,10 +301,10 @@ static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {
                          RIEMANN_EVENT_FIELD_HOST, vl->host,
                          RIEMANN_EVENT_FIELD_TIME, (int64_t) CDTIME_T_TO_TIME_T(vl->time),
                          RIEMANN_EVENT_FIELD_TTL, (float) CDTIME_T_TO_DOUBLE(vl->interval) * host->ttl_factor,
-                         RIEMANN_EVENT_FIELD_ATTRIBUTES,
-                         riemann_attribute_create("plugin", vl->plugin),
-                         riemann_attribute_create("type", vl->type),
-                         riemann_attribute_create("ds_name", ds->ds[index].name),
+                         RIEMANN_EVENT_FIELD_STRING_ATTRIBUTES,
+                         "plugin", vl->plugin,
+                         "type", vl->type,
+                         "ds_name", ds->ds[index].name,
                          NULL,
                          RIEMANN_EVENT_FIELD_SERVICE, service_buffer,
                          RIEMANN_EVENT_FIELD_NONE);
@@ -325,13 +332,9 @@ static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {
        }
 
        if (vl->plugin_instance[0] != 0)
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create("plugin_instance",
-                                                                    vl->plugin_instance));
+               riemann_event_string_attribute_add(event, "plugin_instance", vl->plugin_instance);
        if (vl->type_instance[0] != 0)
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create("type_instance",
-                                                                    vl->type_instance));
+               riemann_event_string_attribute_add(event, "type_instance", vl->type_instance);
 
        if ((ds->ds[index].type != DS_TYPE_GAUGE) && (rates != NULL))
        {
@@ -339,28 +342,23 @@ static riemann_event_t *wrr_value_to_event(struct riemann_host const *host, /* {
 
                ssnprintf(ds_type, sizeof(ds_type), "%s:rate",
                          DS_TYPE_TO_STRING(ds->ds[index].type));
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create("ds_type", ds_type));
+               riemann_event_string_attribute_add(event, "ds_type", ds_type);
        }
        else
        {
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create("ds_type",
-                                                                    DS_TYPE_TO_STRING(ds->ds[index].type)));
+               riemann_event_string_attribute_add(event, "ds_type",
+                                       DS_TYPE_TO_STRING(ds->ds[index].type));
        }
 
        {
                char ds_index[DATA_MAX_NAME_LEN];
 
                ssnprintf(ds_index, sizeof(ds_index), "%zu", index);
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create("ds_index", ds_index));
+               riemann_event_string_attribute_add(event, "ds_index", ds_index);
        }
 
        for (i = 0; i < riemann_attrs_num; i += 2)
-               riemann_event_attribute_add(event,
-                                           riemann_attribute_create(riemann_attrs[i],
-                                                                    riemann_attrs[i +1]));
+               riemann_event_string_attribute_add(event, riemann_attrs[i], riemann_attrs[i +1]);
 
        for (i = 0; i < riemann_tags_num; i++)
                riemann_event_tag_add(event, riemann_tags[i]);
@@ -500,8 +498,11 @@ static int wrr_batch_flush(cdtime_t timeout,
        pthread_mutex_lock(&host->lock);
        status = wrr_batch_flush_nolock(timeout, host);
        if (status != 0)
-               ERROR("write_riemann plugin: riemann_client_send failed with status %i",
-                     status);
+        c_complain (LOG_ERR, &host->init_complaint,
+                    "write_riemann plugin: riemann_client_send failed with status %i",
+                    status);
+    else
+        c_release (LOG_DEBUG, &host->init_complaint, "write_riemann plugin: batch sent.");
 
        pthread_mutex_unlock(&host->lock);
        return status;
@@ -542,7 +543,7 @@ static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
                }
        }
 
-       len = protobuf_c_message_get_packed_size((const ProtobufCMessage*)(host->batch_msg));
+       len = riemann_message_get_packed_size(host->batch_msg);
        ret = 0;
        if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
                ret = wrr_batch_flush_nolock(0, host);
@@ -570,8 +571,12 @@ static int wrr_notification(const notification_t *n, user_data_t *ud) /* {{{ */
 
        status = wrr_send(host, msg);
        if (status != 0)
-               ERROR("write_riemann plugin: riemann_client_send failed with status %i",
-                     status);
+        c_complain (LOG_ERR, &host->init_complaint,
+                    "write_riemann plugin: riemann_client_send failed with status %i",
+                    status);
+    else
+        c_release (LOG_DEBUG, &host->init_complaint,
+                   "write_riemann plugin: riemann_client_send succeeded");
 
        riemann_message_free(msg);
        return (status);
@@ -602,9 +607,6 @@ static int wrr_write(const data_set_t *ds, /* {{{ */
       return (-1);
 
     status = wrr_send(host, msg);
-    if (status != 0)
-      ERROR("write_riemann plugin: riemann_client_send failed with status %i",
-            status);
 
     riemann_message_free(msg);
   }
@@ -647,6 +649,7 @@ static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
     return ENOMEM;
   }
   pthread_mutex_init(&host->lock, NULL);
+  C_COMPLAIN_INIT (&host->init_complaint);
   host->reference_count = 1;
   host->node = NULL;
   host->port = 0;
@@ -660,6 +663,8 @@ static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
   host->ttl_factor = RIEMANN_TTL_FACTOR;
   host->client = NULL;
   host->client_type = RIEMANN_CLIENT_TCP;
+  host->timeout.tv_sec = 0;
+  host->timeout.tv_usec = 0;
 
   status = cf_util_get_string(ci, &host->name);
   if (status != 0) {
@@ -700,6 +705,10 @@ static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
       status = cf_util_get_int(child, &host->batch_max);
       if (status != 0)
         break;
+    } else if (strcasecmp("Timeout", child->key) == 0) {
+      status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
+      if (status != 0)
+        break;
     } else if (strcasecmp("Port", child->key) == 0) {
       host->port = cf_util_get_port_number(child);
       if (host->port == -1) {