/**
- * collectd - src/target_set.c
- * Copyright (C) 2008-2010 Florian Forster
+ * collectd - src/write_redis.c
+ * Copyright (C) 2010 Florian Forster
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
#include "configfile.h"
#include <pthread.h>
-#include <credis.h>
+#include <sys/time.h>
+#include <hiredis/hiredis.h>
struct wr_node_s
{
char *host;
int port;
- int timeout;
+ struct timeval timeout;
- REDIS conn;
+ redisContext *conn;
pthread_mutex_t lock;
};
typedef struct wr_node_s wr_node_t;
user_data_t *ud)
{
wr_node_t *node = ud->data;
+ char ident[512];
char key[512];
char value[512];
- char tmp[512];
+ char time[24];
+ size_t value_size;
+ char *value_ptr;
int status;
+ redisReply *rr;
int i;
- status = FORMAT_VL (tmp, sizeof (tmp), vl);
+ status = FORMAT_VL (ident, sizeof (ident), vl);
if (status != 0)
return (status);
- ssnprintf (key, sizeof (key), "collectd/%s", tmp);
+ ssnprintf (key, sizeof (key), "collectd/%s", ident);
+ ssnprintf (time, sizeof (time), "%.9f", CDTIME_T_TO_DOUBLE(vl->time));
+
+ memset (value, 0, sizeof (value));
+ value_size = sizeof (value);
+ value_ptr = &value[0];
+
+#define APPEND(...) do { \
+ status = snprintf (value_ptr, value_size, __VA_ARGS__); \
+ if (((size_t) status) > value_size) \
+ { \
+ value_ptr += value_size; \
+ value_size = 0; \
+ } \
+ else \
+ { \
+ value_ptr += status; \
+ value_size -= status; \
+ } \
+} while (0)
+
+ APPEND ("%s:", time);
- ssnprintf (value, sizeof (value), "%lu", (unsigned long) vl->time);
for (i = 0; i < ds->ds_num; i++)
{
if (ds->ds[i].type == DS_TYPE_COUNTER)
- ssnprintf (tmp, sizeof (tmp), "%s:%llu",
- value, vl->values[i].counter);
+ APPEND ("%llu", vl->values[i].counter);
else if (ds->ds[i].type == DS_TYPE_GAUGE)
- ssnprintf (tmp, sizeof (tmp), "%s:%g",
- value, vl->values[i].gauge);
+ APPEND ("%g", vl->values[i].gauge);
else if (ds->ds[i].type == DS_TYPE_DERIVE)
- ssnprintf (tmp, sizeof (tmp), "%s:%"PRIi64,
- value, vl->values[i].derive);
+ APPEND ("%"PRIi64, vl->values[i].derive);
else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
- ssnprintf (tmp, sizeof (tmp), "%s:%"PRIu64,
- value, vl->values[i].absolute);
+ APPEND ("%"PRIu64, vl->values[i].absolute);
else
assert (23 == 42);
-
- memcpy (value, tmp, sizeof (value));
}
+#undef APPEND
+
pthread_mutex_lock (&node->lock);
if (node->conn == NULL)
{
- node->conn = credis_connect (node->host, node->port, node->timeout);
+ node->conn = redisConnectWithTimeout ((char *)node->host, node->port, node->timeout);
if (node->conn == NULL)
{
ERROR ("write_redis plugin: Connecting to host \"%s\" (port %i) failed.",
}
}
- /* "credis_zadd" doesn't handle a NULL pointer gracefully, so I'd rather
- * have a meaningful assertion message than a normal segmentation fault. */
assert (node->conn != NULL);
- status = credis_zadd (node->conn, key, (double) vl->time, value);
+ rr = redisCommand (node->conn, "ZADD %s %s %s", key, time, value);
+ if (rr==NULL)
+ WARNING("ZADD command error. key:%s", key);
+
+ rr = redisCommand (node->conn, "SADD collectd/values %s", ident);
+ if (rr==NULL)
+ WARNING("SADD command error. ident:%s", ident);
pthread_mutex_unlock (&node->lock);
if (node->conn != NULL)
{
- credis_close (node->conn);
+ redisFree (node->conn);
node->conn = NULL;
}
static int wr_config_node (oconfig_item_t *ci) /* {{{ */
{
wr_node_t *node;
+ int timeout;
int status;
int i;
memset (node, 0, sizeof (*node));
node->host = NULL;
node->port = 0;
- node->timeout = 1000;
+ node->timeout.tv_sec = 0;
+ node->timeout.tv_usec = 1000;
node->conn = NULL;
pthread_mutex_init (&node->lock, /* attr = */ NULL);
status = 0;
}
}
- else if (strcasecmp ("Timeout", child->key) == 0)
- status = cf_util_get_int (child, &node->timeout);
+ else if (strcasecmp ("Timeout", child->key) == 0) {
+ status = cf_util_get_int (child, &timeout);
+ if (status == 0) node->timeout.tv_usec = timeout;
+ }
else
WARNING ("write_redis plugin: Ignoring unknown config option \"%s\".",
child->key);