write_riemann: do not deadlock when sending from flush
[collectd.git] / src / write_riemann.c
index fd82650..f836b9d 100644 (file)
@@ -46,6 +46,7 @@
 #define RIEMANN_BATCH_MAX      8192
 
 struct riemann_host {
+    c_complain_t init_complaint;
        char                    *name;
        char                    *event_service_prefix;
        pthread_mutex_t  lock;
@@ -56,13 +57,12 @@ struct riemann_host {
        _Bool                    always_append_ds;
        char                    *node;
        int                      port;
-    c_complain_t init_complaint;
-    c_complain_t init_send_complaint;
        riemann_client_type_t    client_type;
        riemann_client_t        *client;
        double                   ttl_factor;
     cdtime_t         batch_init;
     int              batch_max;
+    int              batch_timeout;
        int                          reference_count;
   riemann_message_t    *batch_msg;
        char                     *tls_ca_file;
@@ -136,21 +136,18 @@ static int wrr_disconnect(struct riemann_host *host) /* {{{ */
  *
- * Acquires the host lock, disconnects on errors.
+ * Expects the caller to hold the host lock, disconnects on errors.
  */
-static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
+static int wrr_send_nolock(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
 {
        int status = 0;
-       pthread_mutex_lock (&host->lock);
 
        status = wrr_connect(host);
        if (status != 0) {
-        pthread_mutex_unlock(&host->lock);
                return status;
     }
 
        status = riemann_client_send_message(host->client, msg);
        if (status != 0) {
                wrr_disconnect(host);
-               pthread_mutex_unlock(&host->lock);
                return status;
        }
 
@@ -166,16 +163,24 @@ static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* {{{ */
                if (response == NULL)
                {
                        wrr_disconnect(host);
-                       pthread_mutex_unlock(&host->lock);
                        return errno;
                }
                riemann_message_free(response);
        }
 
-       pthread_mutex_unlock (&host->lock);
        return 0;
 } /* }}} int wrr_send */
 
+static int wrr_send(struct riemann_host *host, riemann_message_t *msg) /* locking wrapper around wrr_send_nolock() */
+{
+    int status = 0;
+
+    pthread_mutex_lock (&host->lock); /* callers already holding host->lock (e.g. the batch flush path) must call wrr_send_nolock() instead */
+    status = wrr_send_nolock(host, msg);
+    pthread_mutex_unlock (&host->lock); /* single unlock point: the lock is released whether the send succeeded or failed */
+    return status; /* result of wrr_send_nolock(), passed through unchanged */
+}
+
 static riemann_message_t *wrr_notification_to_message(struct riemann_host *host, /* {{{ */
                notification_t const *n)
 {
@@ -461,7 +466,7 @@ static int wrr_batch_flush_nolock(cdtime_t timeout,
                if ((host->batch_init + timeout) > now)
                        return status;
        }
-       wrr_send(host, host->batch_msg);
+       wrr_send_nolock(host, host->batch_msg);
        riemann_message_free(host->batch_msg);
 
        if (host->client_type != RIEMANN_CLIENT_UDP)
@@ -516,6 +521,7 @@ static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
        riemann_message_t *msg;
        size_t len;
        int ret;
+    cdtime_t timeout;
 
        msg = wrr_value_list_to_message(host, ds, vl, statuses);
        if (msg == NULL)
@@ -547,7 +553,12 @@ static int wrr_batch_add_value_list(struct riemann_host *host, /* {{{ */
        ret = 0;
        if ((host->batch_max < 0) || (((size_t) host->batch_max) <= len)) {
                ret = wrr_batch_flush_nolock(0, host);
-       }
+       } else {
+        if (host->batch_timeout > 0) {
+            timeout = TIME_T_TO_CDTIME_T((time_t)host->batch_timeout);
+            ret = wrr_batch_flush_nolock(timeout, host);
+        }
+    }
 
        pthread_mutex_unlock(&host->lock);
        return ret;
@@ -660,6 +671,7 @@ static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
   host->batch_mode = 1;
   host->batch_max = RIEMANN_BATCH_MAX; /* typical MSS */
   host->batch_init = cdtime();
+  host->batch_timeout = 0;
   host->ttl_factor = RIEMANN_TTL_FACTOR;
   host->client = NULL;
   host->client_type = RIEMANN_CLIENT_TCP;
@@ -705,6 +717,10 @@ static int wrr_config_node(oconfig_item_t *ci) /* {{{ */
       status = cf_util_get_int(child, &host->batch_max);
       if (status != 0)
         break;
+    } else if (strcasecmp("BatchFlushTimeout", child->key) == 0) {
+      status = cf_util_get_int(child, &host->batch_timeout);
+      if (status != 0)
+        break;
     } else if (strcasecmp("Timeout", child->key) == 0) {
       status = cf_util_get_int(child, (int *)&host->timeout.tv_sec);
       if (status != 0)