Merge pull request #799 from mfournier/hiredis-switch
authorMarc Fournier <marc.fournier@camptocamp.com>
Thu, 20 Nov 2014 07:50:24 +0000 (08:50 +0100)
committerMarc Fournier <marc.fournier@camptocamp.com>
Thu, 20 Nov 2014 07:50:24 +0000 (08:50 +0100)
Switch redis & write_redis plugins from credis to hiredis

README
configure.ac
src/Makefile.am
src/redis.c
src/types.db
src/write_redis.c

diff --git a/README b/README
index 3c7a10f..8a34781 100644 (file)
--- a/README
+++ b/README
@@ -618,9 +618,9 @@ Prerequisites
   * libclntsh (optional)
     Used by the `oracle' plugin.
 
-  * libcredis (optional)
-    Used by the `redis' plugin. Please note that you require a 0.2.2 version
-    or higher. <http://code.google.com/p/credis/>
+  * libhiredis (optional)
+    Used by the redis plugin. Please note that you require a 0.10.0 version
+    or higher. <https://github.com/redis/hiredis>
 
   * libcurl (optional)
     If you want to use the `apache', `ascent', `bind', `curl', `curl_json',
index 7b665a9..78499a9 100644 (file)
@@ -1579,62 +1579,63 @@ fi
 AM_CONDITIONAL(BUILD_WITH_LIBAQUAERO5, test "x$with_libaquaero5" = "xyes")
 # }}}
 
-# --with-libcredis {{{
-AC_ARG_WITH(libcredis, [AS_HELP_STRING([--with-libcredis@<:@=PREFIX@:>@], [Path to libcredis.])],
+# --with-libhiredis {{{
+AC_ARG_WITH(libhiredis, [AS_HELP_STRING([--with-libhiredis@<:@=PREFIX@:>@],
+      [Path to libhiredis.])],
 [
  if test "x$withval" = "xyes"
  then
-        with_libcredis="yes"
+        with_libhiredis="yes"
  else if test "x$withval" = "xno"
  then
-        with_libcredis="no"
+        with_libhiredis="no"
  else
-        with_libcredis="yes"
-        LIBCREDIS_CPPFLAGS="$LIBCREDIS_CPPFLAGS -I$withval/include"
-        LIBCREDIS_LDFLAGS="$LIBCREDIS_LDFLAGS -L$withval/lib"
+        with_libhiredis="yes"
+        LIBHIREDIS_CPPFLAGS="$LIBHIREDIS_CPPFLAGS -I$withval/include"
+        LIBHIREDIS_LDFLAGS="$LIBHIREDIS_LDFLAGS -L$withval/lib"
  fi; fi
 ],
-[with_libcredis="yes"])
+[with_libhiredis="yes"])
 
 SAVE_CPPFLAGS="$CPPFLAGS"
 SAVE_LDFLAGS="$LDFLAGS"
 
-CPPFLAGS="$CPPFLAGS $LIBCREDIS_CPPFLAGS"
-LDFLAGS="$LDFLAGS $LIBCREDIS_LDFLAGS"
+CPPFLAGS="$CPPFLAGS $LIBHIREDIS_CPPFLAGS"
+LDFLAGS="$LDFLAGS $LIBHIREDIS_LDFLAGS"
 
-if test "x$with_libcredis" = "xyes"
+if test "x$with_libhiredis" = "xyes"
 then
-       if test "x$LIBCREDIS_CPPFLAGS" != "x"
+       if test "x$LIBHIREDIS_CPPFLAGS" != "x"
        then
-               AC_MSG_NOTICE([libcredis CPPFLAGS: $LIBCREDIS_CPPFLAGS])
+               AC_MSG_NOTICE([libhiredis CPPFLAGS: $LIBHIREDIS_CPPFLAGS])
        fi
-       AC_CHECK_HEADERS(credis.h,
-       [with_libcredis="yes"],
-       [with_libcredis="no (credis.h not found)"])
+       AC_CHECK_HEADERS(hiredis/hiredis.h,
+       [with_libhiredis="yes"],
+       [with_libhiredis="no (hiredis.h not found)"])
 fi
-if test "x$with_libcredis" = "xyes"
+if test "x$with_libhiredis" = "xyes"
 then
-       if test "x$LIBCREDIS_LDFLAGS" != "x"
+       if test "x$LIBHIREDIS_LDFLAGS" != "x"
        then
-               AC_MSG_NOTICE([libcredis LDFLAGS: $LIBCREDIS_LDFLAGS])
+               AC_MSG_NOTICE([libhiredis LDFLAGS: $LIBHIREDIS_LDFLAGS])
        fi
-       AC_CHECK_LIB(credis, credis_info,
-       [with_libcredis="yes"],
-       [with_libcredis="no (symbol 'credis_info' not found)"])
+       AC_CHECK_LIB(hiredis, redisCommand,
+       [with_libhiredis="yes"],
+       [with_libhiredis="no (symbol 'redisCommand' not found)"])
 
 fi
 
 CPPFLAGS="$SAVE_CPPFLAGS"
 LDFLAGS="$SAVE_LDFLAGS"
 
-if test "x$with_libcredis" = "xyes"
+if test "x$with_libhiredis" = "xyes"
 then
-       BUILD_WITH_LIBCREDIS_CPPFLAGS="$LIBCREDIS_CPPFLAGS"
-       BUILD_WITH_LIBCREDIS_LDFLAGS="$LIBCREDIS_LDFLAGS"
-       AC_SUBST(BUILD_WITH_LIBCREDIS_CPPFLAGS)
-       AC_SUBST(BUILD_WITH_LIBCREDIS_LDFLAGS)
+       BUILD_WITH_LIBHIREDIS_CPPFLAGS="$LIBHIREDIS_CPPFLAGS"
+       BUILD_WITH_LIBHIREDIS_LDFLAGS="$LIBHIREDIS_LDFLAGS"
+       AC_SUBST(BUILD_WITH_LIBHIREDIS_CPPFLAGS)
+       AC_SUBST(BUILD_WITH_LIBHIREDIS_LDFLAGS)
 fi
-AM_CONDITIONAL(BUILD_WITH_LIBCREDIS, test "x$with_libcredis" = "xyes")
+AM_CONDITIONAL(BUILD_WITH_LIBHIREDIS, test "x$with_libhiredis" = "xyes")
 # }}}
 
 # --with-libcurl {{{
@@ -5544,7 +5545,7 @@ AC_PLUGIN([powerdns],    [yes],                [PowerDNS statistics])
 AC_PLUGIN([processes],   [$plugin_processes],  [Process statistics])
 AC_PLUGIN([protocols],   [$plugin_protocols],  [Protocol (IP, TCP, ...) statistics])
 AC_PLUGIN([python],      [$with_python],       [Embed a Python interpreter])
-AC_PLUGIN([redis],       [$with_libcredis],    [Redis plugin])
+AC_PLUGIN([redis],       [$with_libhiredis],    [Redis plugin])
 AC_PLUGIN([routeros],    [$with_librouteros],  [RouterOS plugin])
 AC_PLUGIN([rrdcached],   [$librrd_rrdc_update], [RRDTool output plugin])
 AC_PLUGIN([rrdtool],     [$with_librrd],       [RRDTool output plugin])
@@ -5584,7 +5585,7 @@ AC_PLUGIN([write_graphite], [yes],             [Graphite / Carbon output plugin]
 AC_PLUGIN([write_http],  [$with_libcurl],      [HTTP output plugin])
 AC_PLUGIN([write_kafka],  [$with_librdkafka],  [Kafka output plugin])
 AC_PLUGIN([write_mongodb], [$with_libmongoc],  [MongoDB output plugin])
-AC_PLUGIN([write_redis], [$with_libcredis],    [Redis output plugin])
+AC_PLUGIN([write_redis], [$with_libhiredis],    [Redis output plugin])
 AC_PLUGIN([write_riemann], [$have_protoc_c],   [Riemann output plugin])
 AC_PLUGIN([write_tsdb],  [yes],                [TSDB output plugin])
 AC_PLUGIN([xmms],        [$with_libxmms],      [XMMS statistics])
@@ -5775,7 +5776,7 @@ Configuration:
     libatasmart . . . . . $with_libatasmart
     libcurl . . . . . . . $with_libcurl
     libdbi  . . . . . . . $with_libdbi
-    libcredis . . . . . . $with_libcredis
+    libhiredis  . . . . . $with_libhiredis
     libesmtp  . . . . . . $with_libesmtp
     libganglia  . . . . . $with_libganglia
     libgcrypt . . . . . . $with_libgcrypt
index 04c77a3..e942869 100644 (file)
@@ -841,9 +841,9 @@ endif
 if BUILD_PLUGIN_REDIS
 pkglib_LTLIBRARIES += redis.la
 redis_la_SOURCES = redis.c
-redis_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBCREDIS_LDFLAGS)
-redis_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCREDIS_CPPFLAGS)
-redis_la_LIBADD = -lcredis
+redis_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBHIREDIS_LDFLAGS)
+redis_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBHIREDIS_CPPFLAGS)
+redis_la_LIBADD = -lhiredis
 endif
 
 if BUILD_PLUGIN_ROUTEROS
@@ -1186,9 +1186,9 @@ endif
 if BUILD_PLUGIN_WRITE_REDIS
 pkglib_LTLIBRARIES += write_redis.la
 write_redis_la_SOURCES = write_redis.c
-write_redis_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBCREDIS_LDFLAGS)
-write_redis_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBCREDIS_CPPFLAGS)
-write_redis_la_LIBADD = -lcredis
+write_redis_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBHIREDIS_LDFLAGS)
+write_redis_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_LIBHIREDIS_CPPFLAGS)
+write_redis_la_LIBADD = -lhiredis
 endif
 
 if BUILD_PLUGIN_WRITE_RIEMANN
index 92be18f..f5bb304 100644 (file)
@@ -26,7 +26,8 @@
 #include "configfile.h"
 
 #include <pthread.h>
-#include <credis.h>
+#include <sys/time.h>
+#include <hiredis/hiredis.h>
 
 #ifndef HOST_NAME_MAX
 # define HOST_NAME_MAX _POSIX_HOST_NAME_MAX
@@ -38,6 +39,7 @@
 #define REDIS_DEF_TIMEOUT 2000
 #define MAX_REDIS_NODE_NAME 64
 #define MAX_REDIS_PASSWD_LENGTH 512
+#define MAX_REDIS_VAL_SIZE 256
 
 /* Redis plugin configuration example:
  *
@@ -45,7 +47,8 @@
  *   <Node "mynode">
  *     Host "localhost"
  *     Port "6379"
- *     Timeout 2000
+ *     Timeout 2
+ *     Password "foobar"
  *   </Node>
  * </Plugin>
  */
@@ -58,7 +61,7 @@ struct redis_node_s
   char host[HOST_NAME_MAX];
   char passwd[MAX_REDIS_PASSWD_LENGTH];
   int port;
-  int timeout;
+  struct timeval timeout;
 
   redis_node_t *next;
 };
@@ -107,16 +110,18 @@ static int redis_node_add (const redis_node_t *rn) /* {{{ */
   return (0);
 } /* }}} */
 
+
 static int redis_config_node (oconfig_item_t *ci) /* {{{ */
 {
   redis_node_t rn;
   int i;
   int status;
+  int timeout;
 
   memset (&rn, 0, sizeof (rn));
   sstrncpy (rn.host, REDIS_DEF_HOST, sizeof (rn.host));
   rn.port = REDIS_DEF_PORT;
-  rn.timeout = REDIS_DEF_TIMEOUT;
+  rn.timeout.tv_usec = REDIS_DEF_TIMEOUT;
 
   status = cf_util_get_string_buffer (ci, rn.name, sizeof (rn.name));
   if (status != 0)
@@ -138,7 +143,10 @@ static int redis_config_node (oconfig_item_t *ci) /* {{{ */
       }
     }
     else if (strcasecmp ("Timeout", option->key) == 0)
-      status = cf_util_get_int (option, &rn.timeout);
+    {
+      status = cf_util_get_int (option, &timeout);
+      if (status == 0) rn.timeout.tv_usec = timeout;
+    }
     else if (strcasecmp ("Password", option->key) == 0)
       status = cf_util_get_string_buffer (option, rn.passwd, sizeof (rn.passwd));
     else
@@ -180,39 +188,14 @@ static int redis_config (oconfig_item_t *ci) /* {{{ */
 } /* }}} */
 
   __attribute__ ((nonnull(2)))
-static void redis_submit_g (char *plugin_instance,
-    const char *type, const char *type_instance,
-    gauge_t value) /* {{{ */
-{
-  value_t values[1];
-  value_list_t vl = VALUE_LIST_INIT;
-
-  values[0].gauge = value;
-
-  vl.values = values;
-  vl.values_len = 1;
-  sstrncpy (vl.host, hostname_g, sizeof (vl.host));
-  sstrncpy (vl.plugin, "redis", sizeof (vl.plugin));
-  if (plugin_instance != NULL)
-    sstrncpy (vl.plugin_instance, plugin_instance,
-        sizeof (vl.plugin_instance));
-  sstrncpy (vl.type, type, sizeof (vl.type));
-  if (type_instance != NULL)
-    sstrncpy (vl.type_instance, type_instance,
-        sizeof (vl.type_instance));
-
-  plugin_dispatch_values (&vl);
-} /* }}} */
-
-  __attribute__ ((nonnull(2)))
-static void redis_submit_d (char *plugin_instance,
+static void redis_submit (char *plugin_instance,
     const char *type, const char *type_instance,
-    derive_t value) /* {{{ */
+    value_t value) /* {{{ */
 {
   value_t values[1];
   value_list_t vl = VALUE_LIST_INIT;
 
-  values[0].derive = value;
+  values[0] = value;
 
   vl.values = values;
   vl.values_len = 1;
@@ -231,8 +214,14 @@ static void redis_submit_d (char *plugin_instance,
 
 static int redis_init (void) /* {{{ */
 {
-  redis_node_t rn = { "default", REDIS_DEF_HOST, REDIS_DEF_PASSWD,
-    REDIS_DEF_PORT, REDIS_DEF_TIMEOUT, /* next = */ NULL };
+  redis_node_t rn = {
+    .name = "default",
+    .host = REDIS_DEF_HOST,
+    .port = REDIS_DEF_PORT,
+    .timeout.tv_sec = 0,
+    .timeout.tv_usec = REDIS_DEF_TIMEOUT,
+    .next = NULL
+};
 
   if (nodes_head == NULL)
     redis_node_add (&rn);
@@ -240,20 +229,45 @@ static int redis_init (void) /* {{{ */
   return (0);
 } /* }}} int redis_init */
 
+int redis_handle_info (char *node, char const *info_line, char const *type, char const *type_instance, char const *field_name, int ds_type) /* {{{ */
+{
+  char *str = strstr (info_line, field_name);
+  static char buf[MAX_REDIS_VAL_SIZE];
+  value_t val;
+  if (str)
+  {
+    int i;
+
+    str += strlen (field_name) + 1; /* also skip the ':' */
+    for(i=0;(*str && (isdigit(*str) || *str == '.'));i++,str++)
+      buf[i] = *str;
+    buf[i] ='\0';
+
+    if(parse_value (buf, &val, ds_type) == -1)
+    {
+      WARNING ("redis plugin: Unable to parse field `%s'.", field_name);
+      return (-1);
+    }
+
+    redis_submit (node, type, type_instance, val);
+    return (0);
+  }
+  return (-1);
+
+} /* }}} int redis_handle_info */
+
 static int redis_read (void) /* {{{ */
 {
   redis_node_t *rn;
 
   for (rn = nodes_head; rn != NULL; rn = rn->next)
   {
-    REDIS rh;
-    REDIS_INFO info;
-
-    int status;
+    redisContext *rh;
+    redisReply   *rr;
 
     DEBUG ("redis plugin: querying info from node `%s' (%s:%d).", rn->name, rn->host, rn->port);
 
-    rh = credis_connect (rn->host, rn->port, rn->timeout);
+    rh = redisConnectWithTimeout ((char *)rn->host, rn->port, rn->timeout);
     if (rh == NULL)
     {
       ERROR ("redis plugin: unable to connect to node `%s' (%s:%d).", rn->name, rn->host, rn->port);
@@ -263,56 +277,42 @@ static int redis_read (void) /* {{{ */
     if (strlen (rn->passwd) > 0)
     {
       DEBUG ("redis plugin: authenticanting node `%s' passwd(%s).", rn->name, rn->passwd);
-      status = credis_auth(rh, rn->passwd);
-      if (status != 0)
+      rr = redisCommand (rh, "AUTH %s", rn->passwd);
+
+      if (rr == NULL || rr->type != REDIS_REPLY_STATUS)
       {
         WARNING ("redis plugin: unable to authenticate on node `%s'.", rn->name);
-        credis_close (rh);
+        if (rr != NULL)
+          freeReplyObject (rr);
+
+        redisFree (rh);
         continue;
       }
     }
 
-    memset (&info, 0, sizeof (info));
-    status = credis_info (rh, &info);
-    if (status != 0)
+    if ((rr = redisCommand(rh, "INFO")) == NULL)
     {
-      WARNING ("redis plugin: unable to get info from node `%s'.", rn->name);
-      credis_close (rh);
+      WARNING ("redis plugin: unable to connect to node `%s'.", rn->name);
+      redisFree (rh);
       continue;
     }
 
-    /* typedef struct _cr_info {
-     *   char redis_version[CREDIS_VERSION_STRING_SIZE];
-     *   int bgsave_in_progress;
-     *   int connected_clients;
-     *   int connected_slaves;
-     *   unsigned int used_memory;
-     *   long long changes_since_last_save;
-     *   int last_save_time;
-     *   long long total_connections_received;
-     *   long long total_commands_processed;
-     *   int uptime_in_seconds;
-     *   int uptime_in_days;
-     *   int role;
-     * } REDIS_INFO; */
-
-    DEBUG ("redis plugin: received info from node `%s': connected_clients = %d; "
-        "connected_slaves = %d; used_memory = %lu; changes_since_last_save = %lld; "
-        "bgsave_in_progress = %d; total_connections_received = %lld; "
-        "total_commands_processed = %lld; uptime_in_seconds = %ld", rn->name,
-        info.connected_clients, info.connected_slaves, info.used_memory,
-        info.changes_since_last_save, info.bgsave_in_progress,
-        info.total_connections_received, info.total_commands_processed,
-        info.uptime_in_seconds);
-
-    redis_submit_g (rn->name, "current_connections", "clients", info.connected_clients);
-    redis_submit_g (rn->name, "current_connections", "slaves", info.connected_slaves);
-    redis_submit_g (rn->name, "memory", "used", info.used_memory);
-    redis_submit_g (rn->name, "volatile_changes", NULL, info.changes_since_last_save);
-    redis_submit_d (rn->name, "total_connections", NULL, info.total_connections_received);
-    redis_submit_d (rn->name, "total_operations", NULL, info.total_commands_processed);
-
-    credis_close (rh);
+    redis_handle_info (rn->name, rr->str, "uptime", NULL, "uptime_in_seconds", DS_TYPE_GAUGE);
+    redis_handle_info (rn->name, rr->str, "current_connections", "clients", "connected_clients", DS_TYPE_GAUGE);
+    redis_handle_info (rn->name, rr->str, "blocked_clients", NULL, "blocked_clients", DS_TYPE_GAUGE);
+    redis_handle_info (rn->name, rr->str, "memory", NULL, "used_memory", DS_TYPE_GAUGE);
+    redis_handle_info (rn->name, rr->str, "memory_lua", NULL, "used_memory_lua", DS_TYPE_GAUGE);
+    /* changes_since_last_save: Deprecated in redis version 2.6 and above */
+    redis_handle_info (rn->name, rr->str, "volatile_changes", NULL, "changes_since_last_save", DS_TYPE_GAUGE);
+    redis_handle_info (rn->name, rr->str, "total_connections", NULL, "total_connections_received", DS_TYPE_DERIVE);
+    redis_handle_info (rn->name, rr->str, "total_operations", NULL, "total_commands_processed", DS_TYPE_DERIVE);
+    redis_handle_info (rn->name, rr->str, "expired_keys", NULL, "expired_keys", DS_TYPE_GAUGE);
+    redis_handle_info (rn->name, rr->str, "pubsub", "channels", "pubsub_channels", DS_TYPE_GAUGE);
+    redis_handle_info (rn->name, rr->str, "pubsub", "patterns", "pubsub_patterns", DS_TYPE_GAUGE);
+    redis_handle_info (rn->name, rr->str, "current_connections", "slaves", "connected_slaves", DS_TYPE_GAUGE);
+
+    freeReplyObject (rr);
+    redisFree (rh);
   }
 
   return 0;
index ec34bd4..f6c8433 100644 (file)
@@ -8,12 +8,14 @@ ath_nodes             value:GAUGE:0:65535
 ath_stat               value:DERIVE:0:U
 backends               value:GAUGE:0:65535
 bitrate                        value:GAUGE:0:4294967295
+blocked_clients value:GAUGE:0:U
 bytes                  value:GAUGE:0:U
 cache_eviction         value:DERIVE:0:U
 cache_operation                value:DERIVE:0:U
 cache_ratio            value:GAUGE:0:100
 cache_result           value:DERIVE:0:U
 cache_size             value:GAUGE:0:U
+changes_since_last_save   value:GAUGE:0:U
 charge                 value:GAUGE:0:U
 compression_ratio      value:GAUGE:0:2
 compression            uncompressed:DERIVE:0:U, compressed:DERIVE:0:U
@@ -59,6 +61,7 @@ email_check           value:GAUGE:0:U
 email_count            value:GAUGE:0:U
 email_size             value:GAUGE:0:U
 entropy                        value:GAUGE:0:4294967295
+expired_keys    value:GAUGE:0:U
 fanspeed               value:GAUGE:0:U
 file_size              value:GAUGE:0:U
 files                  value:GAUGE:0:U
@@ -99,6 +102,7 @@ memcached_items              value:GAUGE:0:U
 memcached_octets       rx:DERIVE:0:U, tx:DERIVE:0:U
 memcached_ops          value:DERIVE:0:U
 memory                 value:GAUGE:0:281474976710656
+memory_lua             value:GAUGE:0:281474976710656
 multimeter             value:GAUGE:U:U
 mutex_operations       value:DERIVE:0:U
 mysql_commands         value:DERIVE:0:U
@@ -156,6 +160,7 @@ ps_rss                      value:GAUGE:0:9223372036854775807
 ps_stacksize           value:GAUGE:0:9223372036854775807
 ps_state               value:GAUGE:0:65535
 ps_vm                  value:GAUGE:0:9223372036854775807
+pubsub        value:GAUGE:0:U
 queue_length           value:GAUGE:0:U
 records                        value:GAUGE:0:U
 requests               value:GAUGE:0:U
index 3defaca..b47650d 100644 (file)
@@ -30,7 +30,8 @@
 #include "configfile.h"
 
 #include <pthread.h>
-#include <credis.h>
+#include <sys/time.h>
+#include <hiredis/hiredis.h>
 
 struct wr_node_s
 {
@@ -38,9 +39,9 @@ struct wr_node_s
 
   char *host;
   int port;
-  int timeout;
+  struct timeval timeout;
 
-  REDIS conn;
+  redisContext *conn;
   pthread_mutex_t lock;
 };
 typedef struct wr_node_s wr_node_t;
@@ -56,15 +57,18 @@ static int wr_write (const data_set_t *ds, /* {{{ */
   char ident[512];
   char key[512];
   char value[512];
+  char time[24];
   size_t value_size;
   char *value_ptr;
   int status;
+  redisReply   *rr;
   int i;
 
   status = FORMAT_VL (ident, sizeof (ident), vl);
   if (status != 0)
     return (status);
   ssnprintf (key, sizeof (key), "collectd/%s", ident);
+  ssnprintf (time, sizeof (time), "%.9f", CDTIME_T_TO_DOUBLE(vl->time));
 
   memset (value, 0, sizeof (value));
   value_size = sizeof (value);
@@ -84,7 +88,9 @@ static int wr_write (const data_set_t *ds, /* {{{ */
   }                                                                  \
 } while (0)
 
-  APPEND ("%lu", (unsigned long) vl->time);
+  APPEND (time);
+  APPEND (":");
+
   for (i = 0; i < ds->ds_num; i++)
   {
     if (ds->ds[i].type == DS_TYPE_COUNTER)
@@ -105,7 +111,7 @@ static int wr_write (const data_set_t *ds, /* {{{ */
 
   if (node->conn == NULL)
   {
-    node->conn = credis_connect (node->host, node->port, node->timeout);
+    node->conn = redisConnectWithTimeout ((char *)node->host, node->port, node->timeout);
     if (node->conn == NULL)
     {
       ERROR ("write_redis plugin: Connecting to host \"%s\" (port %i) failed.",
@@ -116,12 +122,14 @@ static int wr_write (const data_set_t *ds, /* {{{ */
     }
   }
 
-  /* "credis_zadd" doesn't handle a NULL pointer gracefully, so I'd rather
-   * have a meaningful assertion message than a normal segmentation fault. */
   assert (node->conn != NULL);
-  status = credis_zadd (node->conn, key, (double) vl->time, value);
+  rr = redisCommand (node->conn, "ZADD %s %s %s", key, time, value);
+  if (rr==NULL)
+    WARNING("ZADD command error. key:%s", key);
 
-  credis_sadd (node->conn, "collectd/values", ident);
+  rr = redisCommand (node->conn, "SADD collectd/values %s", ident);
+  if (rr==NULL)
+    WARNING("SADD command error. ident:%s", ident);
 
   pthread_mutex_unlock (&node->lock);
 
@@ -137,7 +145,7 @@ static void wr_config_free (void *ptr) /* {{{ */
 
   if (node->conn != NULL)
   {
-    credis_close (node->conn);
+    redisFree (node->conn);
     node->conn = NULL;
   }
 
@@ -148,6 +156,7 @@ static void wr_config_free (void *ptr) /* {{{ */
 static int wr_config_node (oconfig_item_t *ci) /* {{{ */
 {
   wr_node_t *node;
+  int timeout;
   int status;
   int i;
 
@@ -157,7 +166,8 @@ static int wr_config_node (oconfig_item_t *ci) /* {{{ */
   memset (node, 0, sizeof (*node));
   node->host = NULL;
   node->port = 0;
-  node->timeout = 1000;
+  node->timeout.tv_sec = 0;
+  node->timeout.tv_usec = 1000;
   node->conn = NULL;
   pthread_mutex_init (&node->lock, /* attr = */ NULL);
 
@@ -183,8 +193,10 @@ static int wr_config_node (oconfig_item_t *ci) /* {{{ */
         status = 0;
       }
     }
-    else if (strcasecmp ("Timeout", child->key) == 0)
-      status = cf_util_get_int (child, &node->timeout);
+    else if (strcasecmp ("Timeout", child->key) == 0) {
+      status = cf_util_get_int (child, &timeout);
+      if (status == 0) node->timeout.tv_usec = timeout;
+    }
     else
       WARNING ("write_redis plugin: Ignoring unknown config option \"%s\".",
           child->key);