/**
* collectd - src/write_redis.c
- * Copyright (C) 2010 Florian Forster
+ * Copyright (C) 2010-2015 Florian Forster
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
#include <sys/time.h>
#include <hiredis/hiredis.h>
+#ifndef REDIS_DEFAULT_PREFIX
+# define REDIS_DEFAULT_PREFIX "collectd/"
+#endif
+
struct wr_node_s
{
char name[DATA_MAX_NAME_LEN];
char *host;
int port;
struct timeval timeout;
+ char *prefix;
+ int database;
+ int max_set_size;
+ _Bool store_rates;
redisContext *conn;
pthread_mutex_t lock;
char *value_ptr;
int status;
redisReply *rr;
- int i;
status = FORMAT_VL (ident, sizeof (ident), vl);
if (status != 0)
return (status);
- ssnprintf (key, sizeof (key), "collectd/%s", ident);
+ ssnprintf (key, sizeof (key), "%s%s",
+ (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX,
+ ident);
ssnprintf (time, sizeof (time), "%.9f", CDTIME_T_TO_DOUBLE(vl->time));
memset (value, 0, sizeof (value));
value_size = sizeof (value);
value_ptr = &value[0];
-
-#define APPEND(...) do { \
- status = snprintf (value_ptr, value_size, __VA_ARGS__); \
- if (((size_t) status) > value_size) \
- { \
- value_ptr += value_size; \
- value_size = 0; \
- } \
- else \
- { \
- value_ptr += status; \
- value_size -= status; \
- } \
-} while (0)
-
- APPEND ("%s:", time);
-
- for (i = 0; i < ds->ds_num; i++)
- {
- if (ds->ds[i].type == DS_TYPE_COUNTER)
- APPEND ("%llu", vl->values[i].counter);
- else if (ds->ds[i].type == DS_TYPE_GAUGE)
- APPEND (GAUGE_FORMAT, vl->values[i].gauge);
- else if (ds->ds[i].type == DS_TYPE_DERIVE)
- APPEND ("%"PRIi64, vl->values[i].derive);
- else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
- APPEND ("%"PRIu64, vl->values[i].absolute);
- else
- assert (23 == 42);
- }
-
-#undef APPEND
-
- status = format_values (value_ptr, value_size, ds, vl, /* store rates = */ 0);
- pthread_mutex_lock (&node->lock);
+ status = format_values (value_ptr, value_size, ds, vl, node->store_rates);
if (status != 0)
return (status);
+ pthread_mutex_lock (&node->lock);
+
if (node->conn == NULL)
{
node->conn = redisConnectWithTimeout ((char *)node->host, node->port, node->timeout);
pthread_mutex_unlock (&node->lock);
return (-1);
}
+
+ rr = redisCommand(node->conn, "SELECT %d", node->database);
+ if (rr == NULL)
+ WARNING("SELECT command error. database:%d message:%s", node->database, node->conn->errstr);
+ else
+ freeReplyObject (rr);
}
rr = redisCommand (node->conn, "ZADD %s %s %s", key, time, value);
else
freeReplyObject (rr);
+ if (node->max_set_size >= 0)
+ {
+ rr = redisCommand (node->conn, "ZREMRANGEBYRANK %s %d %d", key, 0, (-1 * node->max_set_size) - 1);
+ if (rr == NULL)
+ WARNING("ZREMRANGEBYRANK command error. key:%s message:%s", key, node->conn->errstr);
+ else
+ freeReplyObject (rr);
+ }
+
/* TODO(octo): This is more overhead than necessary. Use the cache and
* metadata to determine if it is a new metric and call SADD only once for
* each metric. */
- rr = redisCommand (node->conn, "SADD collectd/values %s", ident);
+ rr = redisCommand (node->conn, "SADD %svalues %s",
+ (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX,
+ ident);
if (rr==NULL)
WARNING("SADD command error. ident:%s message:%s", ident, node->conn->errstr);
else
int status;
int i;
- node = malloc (sizeof (*node));
+ node = calloc (1, sizeof (*node));
if (node == NULL)
return (ENOMEM);
- memset (node, 0, sizeof (*node));
node->host = NULL;
node->port = 0;
node->timeout.tv_sec = 0;
node->timeout.tv_usec = 1000;
node->conn = NULL;
+ node->prefix = NULL;
+ node->database = 0;
+ node->max_set_size = -1;
+ node->store_rates = 1;
pthread_mutex_init (&node->lock, /* attr = */ NULL);
status = cf_util_get_string_buffer (ci, node->name, sizeof (node->name));
status = cf_util_get_int (child, &timeout);
if (status == 0) node->timeout.tv_usec = timeout;
}
+ else if (strcasecmp ("Prefix", child->key) == 0) {
+ status = cf_util_get_string (child, &node->prefix);
+ }
+ else if (strcasecmp ("Database", child->key) == 0) {
+ status = cf_util_get_int (child, &node->database);
+ }
+ else if (strcasecmp ("MaxSetSize", child->key) == 0) {
+ status = cf_util_get_int (child, &node->max_set_size);
+ }
+ else if (strcasecmp ("StoreRates", child->key) == 0) {
+ status = cf_util_get_boolean (child, &node->store_rates);
+ }
else
WARNING ("write_redis plugin: Ignoring unknown config option \"%s\".",
child->key);