Merge pull request #2618 from ajssmith/amqp1_dev1_branch
[collectd.git] / src / write_redis.c
index 0a5c5df..c17654b 100644 (file)
@@ -45,7 +45,8 @@ struct wr_node_s {
   char *prefix;
   int database;
   int max_set_size;
-  _Bool store_rates;
+  int max_set_duration;
+  bool store_rates;
 
   redisContext *conn;
   pthread_mutex_t lock;
@@ -69,17 +70,16 @@ static int wr_write(const data_set_t *ds, /* {{{ */
 
   status = FORMAT_VL(ident, sizeof(ident), vl);
   if (status != 0)
-    return (status);
-  ssnprintf(key, sizeof(key), "%s%s",
-            (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX,
-            ident);
-  ssnprintf(time, sizeof(time), "%.9f", CDTIME_T_TO_DOUBLE(vl->time));
+    return status;
+  snprintf(key, sizeof(key), "%s%s",
+           (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX, ident);
+  snprintf(time, sizeof(time), "%.9f", CDTIME_T_TO_DOUBLE(vl->time));
 
   value_size = sizeof(value);
   value_ptr = &value[0];
   status = format_values(value_ptr, value_size, ds, vl, node->store_rates);
   if (status != 0)
-    return (status);
+    return status;
 
   pthread_mutex_lock(&node->lock);
 
@@ -92,14 +92,14 @@ static int wr_write(const data_set_t *ds, /* {{{ */
             (node->host != NULL) ? node->host : "localhost",
             (node->port != 0) ? node->port : 6379);
       pthread_mutex_unlock(&node->lock);
-      return (-1);
+      return -1;
     } else if (node->conn->err) {
       ERROR(
           "write_redis plugin: Connecting to host \"%s\" (port %i) failed: %s",
           (node->host != NULL) ? node->host : "localhost",
           (node->port != 0) ? node->port : 6379, node->conn->errstr);
       pthread_mutex_unlock(&node->lock);
-      return (-1);
+      return -1;
     }
 
     rr = redisCommand(node->conn, "SELECT %d", node->database);
@@ -126,6 +126,20 @@ static int wr_write(const data_set_t *ds, /* {{{ */
       freeReplyObject(rr);
   }
 
+  if (node->max_set_duration > 0) {
+    /*
+     * remove element, scored less than 'current-max_set_duration'
+     * '(...' indicates 'less than' in redis CLI.
+     */
+    rr = redisCommand(node->conn, "ZREMRANGEBYSCORE %s -1 (%.9f", key,
+                      (CDTIME_T_TO_DOUBLE(vl->time) - node->max_set_duration));
+    if (rr == NULL)
+      WARNING("ZREMRANGEBYSCORE command error. key:%s message:%s", key,
+              node->conn->errstr);
+    else
+      freeReplyObject(rr);
+  }
+
   /* TODO(octo): This is more overhead than necessary. Use the cache and
    * metadata to determine if it is a new metric and call SADD only once for
    * each metric. */
@@ -140,7 +154,7 @@ static int wr_write(const data_set_t *ds, /* {{{ */
 
   pthread_mutex_unlock(&node->lock);
 
-  return (0);
+  return 0;
 } /* }}} int wr_write */
 
 static void wr_config_free(void *ptr) /* {{{ */
@@ -167,7 +181,7 @@ static int wr_config_node(oconfig_item_t *ci) /* {{{ */
 
   node = calloc(1, sizeof(*node));
   if (node == NULL)
-    return (ENOMEM);
+    return ENOMEM;
   node->host = NULL;
   node->port = 0;
   node->timeout.tv_sec = 0;
@@ -176,13 +190,14 @@ static int wr_config_node(oconfig_item_t *ci) /* {{{ */
   node->prefix = NULL;
   node->database = 0;
   node->max_set_size = -1;
-  node->store_rates = 1;
+  node->max_set_duration = -1;
+  node->store_rates = true;
   pthread_mutex_init(&node->lock, /* attr = */ NULL);
 
   status = cf_util_get_string_buffer(ci, node->name, sizeof(node->name));
   if (status != 0) {
     sfree(node);
-    return (status);
+    return status;
   }
 
   for (int i = 0; i < ci->children_num; i++) {
@@ -206,6 +221,8 @@ static int wr_config_node(oconfig_item_t *ci) /* {{{ */
       status = cf_util_get_int(child, &node->database);
     } else if (strcasecmp("MaxSetSize", child->key) == 0) {
       status = cf_util_get_int(child, &node->max_set_size);
+    } else if (strcasecmp("MaxSetDuration", child->key) == 0) {
+      status = cf_util_get_int(child, &node->max_set_duration);
     } else if (strcasecmp("StoreRates", child->key) == 0) {
       status = cf_util_get_boolean(child, &node->store_rates);
     } else
@@ -217,20 +234,21 @@ static int wr_config_node(oconfig_item_t *ci) /* {{{ */
   } /* for (i = 0; i < ci->children_num; i++) */
 
   if (status == 0) {
-    char cb_name[DATA_MAX_NAME_LEN];
+    char cb_name[sizeof("write_redis/") + DATA_MAX_NAME_LEN];
 
-    ssnprintf(cb_name, sizeof(cb_name), "write_redis/%s", node->name);
+    snprintf(cb_name, sizeof(cb_name), "write_redis/%s", node->name);
 
-    status = plugin_register_write(
-        cb_name, wr_write, &(user_data_t){
-                               .data = node, .free_func = wr_config_free,
-                           });
+    status =
+        plugin_register_write(cb_name, wr_write,
+                              &(user_data_t){
+                                  .data = node, .free_func = wr_config_free,
+                              });
   }
 
   if (status != 0)
     wr_config_free(node);
 
-  return (status);
+  return status;
 } /* }}} int wr_config_node */
 
 static int wr_config(oconfig_item_t *ci) /* {{{ */
@@ -246,11 +264,9 @@ static int wr_config(oconfig_item_t *ci) /* {{{ */
               child->key);
   }
 
-  return (0);
+  return 0;
 } /* }}} int wr_config */
 
 void module_register(void) {
   plugin_register_complex_config("write_redis", wr_config);
 }
-
-/* vim: set sw=2 sts=2 tw=78 et fdm=marker : */