/**
* collectd - src/write_redis.c
- * Copyright (C) 2010 Florian Forster
+ * Copyright (C) 2010-2015 Florian Forster
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
#include <sys/time.h>
#include <hiredis/hiredis.h>
+#ifndef REDIS_DEFAULT_PREFIX
+# define REDIS_DEFAULT_PREFIX "collectd/"
+#endif
+
struct wr_node_s
{
char name[DATA_MAX_NAME_LEN];
char *host;
int port;
struct timeval timeout;
+ char *prefix;
redisContext *conn;
pthread_mutex_t lock;
char ident[512];
char key[512];
char value[512];
+ char time[24];
size_t value_size;
char *value_ptr;
int status;
status = FORMAT_VL (ident, sizeof (ident), vl);
if (status != 0)
return (status);
- ssnprintf (key, sizeof (key), "collectd/%s", ident);
+ ssnprintf (key, sizeof (key), "%s%s",
+ (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX,
+ ident);
+ ssnprintf (time, sizeof (time), "%.9f", CDTIME_T_TO_DOUBLE(vl->time));
memset (value, 0, sizeof (value));
value_size = sizeof (value);
} \
} while (0)
- APPEND ("%lu", (unsigned long) vl->time);
+ APPEND ("%s:", time);
+
for (i = 0; i < ds->ds_num; i++)
{
if (ds->ds[i].type == DS_TYPE_COUNTER)
APPEND ("%llu", vl->values[i].counter);
else if (ds->ds[i].type == DS_TYPE_GAUGE)
- APPEND ("%g", vl->values[i].gauge);
+ APPEND (GAUGE_FORMAT, vl->values[i].gauge);
else if (ds->ds[i].type == DS_TYPE_DERIVE)
APPEND ("%"PRIi64, vl->values[i].derive);
else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
#undef APPEND
+ status = format_values (value_ptr, value_size, ds, vl, /* store rates = */ 0);
pthread_mutex_lock (&node->lock);
+ if (status != 0)
+ return (status);
if (node->conn == NULL)
{
node->conn = redisConnectWithTimeout ((char *)node->host, node->port, node->timeout);
if (node->conn == NULL)
{
- ERROR ("write_redis plugin: Connecting to host \"%s\" (port %i) failed.",
+ ERROR ("write_redis plugin: Connecting to host \"%s\" (port %i) failed: Unkown reason",
(node->host != NULL) ? node->host : "localhost",
(node->port != 0) ? node->port : 6379);
pthread_mutex_unlock (&node->lock);
return (-1);
}
+ else if (node->conn->err)
+ {
+ ERROR ("write_redis plugin: Connecting to host \"%s\" (port %i) failed: %s",
+ (node->host != NULL) ? node->host : "localhost",
+ (node->port != 0) ? node->port : 6379,
+ node->conn->errstr);
+ pthread_mutex_unlock (&node->lock);
+ return (-1);
+ }
}
- assert (node->conn != NULL);
- rr = redisCommand (node->conn, "ZADD %b %f %b", key, sizeof (key),
- (double) vl->time, value, sizeof (value));
+ rr = redisCommand (node->conn, "ZADD %s %s %s", key, time, value);
+ if (rr == NULL)
+ WARNING("ZADD command error. key:%s message:%s", key, node->conn->errstr);
+ else
+ freeReplyObject (rr);
+
+ /* TODO(octo): This is more overhead than necessary. Use the cache and
+ * metadata to determine if it is a new metric and call SADD only once for
+ * each metric. */
+ rr = redisCommand (node->conn, "SADD %svalues %s",
+ (node->prefix != NULL) ? node->prefix : REDIS_DEFAULT_PREFIX,
+ ident);
if (rr==NULL)
- WARNING("ZADD command error. key:%s", key);
-
- rr = redisCommand (node->conn, "SADD collectd/values %b", ident, sizeof(ident));
- if (rr==NULL)
- WARNING("SADD command error. ident:%s", ident);
+ WARNING("SADD command error. ident:%s message:%s", ident, node->conn->errstr);
+ else
+ freeReplyObject (rr);
pthread_mutex_unlock (&node->lock);
node->timeout.tv_sec = 0;
node->timeout.tv_usec = 1000;
node->conn = NULL;
+ node->prefix = NULL;
pthread_mutex_init (&node->lock, /* attr = */ NULL);
status = cf_util_get_string_buffer (ci, node->name, sizeof (node->name));
status = cf_util_get_int (child, &timeout);
if (status == 0) node->timeout.tv_usec = timeout;
}
+ else if (strcasecmp ("Prefix", child->key) == 0) {
+ status = cf_util_get_string (child, &node->prefix);
+ }
else
WARNING ("write_redis plugin: Ignoring unknown config option \"%s\".",
child->key);